00001
00010 #include <memory>
00011 #include <vector>
00012 #include <string>
00013
00014 #include <cstring>
00015
00016 struct stat;
00017
00018 class XrdSfsFile;
00019 class XrdSysError;
00020
00021 namespace TPC {
00022 class Stream {
00023 public:
00024 Stream(std::unique_ptr<XrdSfsFile> fh, size_t max_blocks, size_t buffer_size, XrdSysError &log)
00025 : m_open_for_write(false),
00026 m_avail_count(max_blocks),
00027 m_fh(std::move(fh)),
00028 m_offset(0),
00029 m_log(log)
00030 {
00031 m_buffers.reserve(max_blocks);
00032 for (size_t idx=0; idx < max_blocks; idx++) {
00033 m_buffers.push_back(new Entry(buffer_size));
00034 }
00035 m_open_for_write = true;
00036 }
00037
00038 ~Stream();
00039
00040 int Stat(struct stat *);
00041
00042 int Read(off_t offset, char *buffer, size_t size);
00043
00044 int Write(off_t offset, const char *buffer, size_t size);
00045
00046 size_t AvailableBuffers() const {return m_avail_count;}
00047
00048 void DumpBuffers() const;
00049
00050
00051
00052
00053
00054
00055
00056
00057 bool Finalize();
00058
00059 std::string GetErrorMessage() const {return m_error_buf;}
00060
00061 private:
00062
00063 class Entry {
00064 public:
00065 Entry(size_t capacity) :
00066 m_offset(-1),
00067 m_capacity(capacity),
00068 m_size(0)
00069 {}
00070
00071 bool Available() const {return m_offset == -1;}
00072
00073 int Write(Stream &stream) {
00074 if (Available() || !CanWrite(stream)) {return 0;}
00075
00076 int size_desired = m_size;
00077 int retval = stream.Write(m_offset, &m_buffer[0], size_desired);
00078 m_size = 0;
00079 m_offset = -1;
00080 if (retval != size_desired) {
00081 return -1;
00082 }
00083 return retval;
00084 }
00085
00086 bool Accept(off_t offset, const char *buf, size_t size) {
00087
00088 if ((m_offset != -1) && (offset != m_offset + static_cast<ssize_t>(m_size))) {
00089 return false;
00090 }
00091 if (size > m_capacity - m_size) {
00092 return false;
00093 }
00094
00095
00096 ssize_t new_bytes_needed = (m_size + size) - m_buffer.capacity();
00097 if (new_bytes_needed > 0) {
00098 m_buffer.reserve(m_capacity);
00099 }
00100
00101
00102 memcpy(&m_buffer[0] + m_size, buf, size);
00103 m_size += size;
00104 if (m_offset == -1) {
00105 m_offset = offset;
00106 }
00107 return true;
00108 }
00109
00110 void ShrinkIfUnused() {
00111 if (!Available()) {return;}
00112 #if __cplusplus > 199711L
00113 m_buffer.shrink_to_fit();
00114 #endif
00115 }
00116
00117 void Move(Entry &other) {
00118 m_buffer.swap(other.m_buffer);
00119 m_offset = other.m_offset;
00120 m_size = other.m_size;
00121 }
00122
00123 off_t GetOffset() const {return m_offset;}
00124 size_t GetCapacity() const {return m_capacity;}
00125 size_t GetSize() const {return m_size;}
00126
00127 private:
00128
00129 Entry(const Entry&) = delete;
00130
00131 bool CanWrite(Stream &stream) const {
00132 return (m_size > 0) && (m_offset == stream.m_offset);
00133 }
00134
00135 off_t m_offset;
00136 size_t m_capacity;
00137 size_t m_size;
00138 std::vector<char> m_buffer;
00139 };
00140
00141 bool m_open_for_write;
00142 size_t m_avail_count;
00143 std::unique_ptr<XrdSfsFile> m_fh;
00144 off_t m_offset;
00145 std::vector<Entry*> m_buffers;
00146 XrdSysError &m_log;
00147 std::string m_error_buf;
00148 };
00149 }