/*! * @project XRootD SSI/Protocol Buffer Interface Project * @brief Output stream to send a stream of protocol buffers from server to client * @copyright Copyright 2018 CERN * @license This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ #pragma once #include #include namespace XrdSsiPb { /*! * Output Stream Buffer class. * * This class binds XrdSsiStream::Buffer to the stream interface. * * This is a naïve implementation, where memory is allocated and deallocated on every use. It favours * computation efficiency over memory efficiency: the buffer allocated is twice the hint size, but data * copying is minimized. * * A more performant implementation could be implemented using a buffer pool, using the Recycle() * method to return buffers to the pool. */ template class OStreamBuffer : public XrdSsiStream::Buffer { public: /*! * Constructor * * data is a public member of XrdSsiStream::Buffer. It is an unmanaged char* pointer. We initialize * it to double the hint size, with the implicit rule that the size of an individual serialized * record on the wire cannot exceed the hint size. */ OStreamBuffer(uint32_t hint_size) : XrdSsiStream::Buffer(new char[hint_size * 2]), m_hint_size(hint_size), m_data_ptr(data), m_data_size(0) { Log::Msg(Log::DEBUG, LOG_SUFFIX, "Called OStreamBuffer() constructor"); } ~OStreamBuffer() { Log::Msg(Log::DEBUG, LOG_SUFFIX, "Called ~OStreamBuffer() destructor"); delete[] data; } /*! * Get the data size */ uint32_t Size() const { return m_data_size; } /*! * Push a protobuf into the queue * * @retval true The buffer has been filled and is ready for sending * @retval false There is room in the buffer for more records */ bool Push(const DataType &record) { Log::Msg(Log::PROTOBUF, LOG_SUFFIX, "Push(): adding record to stream:"); Log::DumpProtobuf(Log::PROTOBUF, &record); uint32_t bytesize = record.ByteSize(); if(m_data_size + bytesize > m_hint_size * 2) { throw XrdSsiException("OStreamBuffer::Push(): Stream buffer overflow"); } // Write the size of the next record into the buffer google::protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(bytesize, reinterpret_cast(m_data_ptr)); m_data_ptr += sizeof(uint32_t); // Serialize the Protocol Buffer record.SerializeToArray(m_data_ptr, bytesize); m_data_ptr += bytesize; m_data_size += sizeof(uint32_t) + bytesize; return m_data_size >= m_hint_size; } private: /*! * Called by the XrdSsi framework when it is finished with the object */ virtual void Recycle() { Log::Msg(Log::DEBUG, LOG_SUFFIX, "Called Recycle()"); delete this; } // Member variables const uint32_t m_hint_size; //!< Requested size of the buffer from the XRootD framework char *m_data_ptr; //!< Pointer to the raw storage uint32_t m_data_size; //!< Total size of the buffer on the wire std::vector> m_protobuf_q; //!< Queue of protobufs to be serialized static constexpr const char* const LOG_SUFFIX = "Pb::OStreamBuffer"; //!< Identifier for log messages }; } // namespace XrdSsiPb