/*!
* @project XRootD SSI/Protocol Buffer Interface Project
* @brief Input stream to receive a stream of protocol buffers from the server
* @copyright Copyright 2018 CERN
* @license This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <cstdint>
#include <cstring>
#include <memory>
#include <string>

#include <google/protobuf/io/coded_stream.h>

#include "XrdSsiPbException.hpp"
#include "XrdSsiPbLog.hpp"
namespace XrdSsiPb {
/*!
* Input Stream Buffer class
*
* This implementation is for a record-based stream. The client should be configured with a XrdSsi stream buffer size
* which is large with respect to the maximum size of DataType. This is mainly for efficiency reasons as there is a
* little extra data copying overhead in handling records which are split across two SSI buffers. Note that there is
* also a hard limit: the record size cannot exceed the buffer size.
*
* The buffer size parameter is set in the constructor to XrdSsiPbServiceClientSide. The size of DataType is the maximum
* encoded size of the DataType protocol buffer on the wire.
*
* If there is a requirement to stream arbitrarily large binary blobs rather than records, this functionality will need
* to be added. See the comments on Request::ProcessResponseData().
*/
template
class IStreamBuffer
{
public:
IStreamBuffer(uint32_t bufsize) :
m_max_msglen(bufsize-UINT32_SIZE),
m_split_buffer(std::unique_ptr(new uint8_t[m_max_msglen])),
m_split_buflen(0)
{
Log::Msg(Log::DEBUG, LOG_SUFFIX, "Called IStreamBuffer() constructor");
}
~IStreamBuffer() {
Log::Msg(Log::DEBUG, LOG_SUFFIX, "Called ~IStreamBuffer() destructor");
}
/*!
* Push a new buffer onto the input stream
*
* NOTE: This method is not reentrant; it is assumed it will be called from the XrdSsi framework
* in single-threaded mode. Each client or client thread must set up its own stream.
*
* @param[in] buf_ptr XRootD SSI stream or data buffer
* @param[in] buf_len Size of buf_ptr
*/
void Push(const char *buf_ptr, int buf_len);
private:
/*!
* Pop a single record from an input stream and pass it to the client
*
* If the message is split across the boundary between buffers, the partial message is saved and
* the method returns false.
*
* @param[in] msg_len Size of the next Protocol Buffer message on the wire
* @param[in,out] input_stream Protocol Buffer Coded Input Stream object wrapping the XRootD
* SSI buffer
*
* @retval true There are more messages to process on the stream
* @retval false End of the input stream was reached
*/
bool popRecord(int msg_len, google::protobuf::io::CodedInputStream &input_stream);
/*!
* Callback to handle stream data
*
* Define a specialised version of this method on the client side to handle a specific type of data stream
*/
void DataCallback(DataType record) const {
throw XrdSsiException("Stream/data payload received, but IStreamBuffer::DataCallback() has not been defined");
}
// Member variables
const uint32_t m_max_msglen; //!< Maximum allowed length of a protobuf on the wire
std::unique_ptr m_split_buffer; //!< Holding buffer for partial messages split across two input buffers
/*!
* NOTE: m_split_buflen must be a signed integer because we need to pass it to
* google::protobuf::io::CodedInputStream, which takes an int* parameter
*
* UINT32_SIZE is a signed int constexpr, to stop gcc complaining about
* signed/unsigned comparisons
*/
int m_split_buflen; //!< Length of data stored in m_split_buffer
static constexpr const char* const LOG_SUFFIX = "Pb::IStreamBuffer"; //!< Identifier for log messages
static constexpr int UINT32_SIZE = sizeof(uint32_t); //!< Size of a uint32_t = 4 bytes
};
template
void IStreamBuffer::Push(const char *buf_ptr, int buf_len)
{
google::protobuf::io::CodedInputStream input_stream(reinterpret_cast(buf_ptr), buf_len);
uint32_t msg_len;
if(m_split_buflen > 0) {
// Stitch together the saved partial record and the incoming record
if(m_split_buflen <= UINT32_SIZE) {
// The size field is split across the boundary, just copy that one field
int bytes_to_copy = UINT32_SIZE - m_split_buflen;
memcpy(m_split_buffer.get() + m_split_buflen, buf_ptr, bytes_to_copy);
input_stream.Skip(bytes_to_copy);
google::protobuf::io::CodedInputStream::ReadLittleEndian32FromArray(m_split_buffer.get(), &msg_len);
popRecord(msg_len, input_stream);
} else {
// The payload is split across the boundary, copy the entire record
google::protobuf::io::CodedInputStream::ReadLittleEndian32FromArray(m_split_buffer.get(), &msg_len);
if(msg_len > m_max_msglen) {
throw XrdSsiException("IStreamBuffer::Push(): Data record size (" + std::to_string(msg_len) +
" bytes) exceeds XRootD SSI buffer size (" + std::to_string(m_max_msglen) + " bytes)");
}
int bytes_to_copy = msg_len + UINT32_SIZE - m_split_buflen;
memcpy(m_split_buffer.get() + m_split_buflen, buf_ptr, bytes_to_copy);
input_stream.Skip(bytes_to_copy);
google::protobuf::io::CodedInputStream split_stream(reinterpret_cast(m_split_buffer.get() + UINT32_SIZE), msg_len);
popRecord(msg_len, split_stream);
}
m_split_buflen = 0;
}
// Extract remaining records from the input buffer
do {
const char *buf_ptr;
// Get pointer to next record
if(!input_stream.GetDirectBufferPointer(reinterpret_cast(&buf_ptr), &buf_len)) break;
if(buf_len < UINT32_SIZE) {
// Size field is split across the boundary, save the partial field and finish
m_split_buflen = buf_len;
memcpy(m_split_buffer.get(), buf_ptr, m_split_buflen);
break;
}
// Get size of next item on the stream
input_stream.ReadLittleEndian32(&msg_len);
} while(popRecord(msg_len, input_stream));
}
template
bool IStreamBuffer::popRecord(int msg_len, google::protobuf::io::CodedInputStream &input_stream)
{
const char *buf_ptr;
int buf_len;
if(msg_len > static_cast(m_max_msglen)) {
throw XrdSsiException("IStreamBuffer::popRecord(): Data record size (" + std::to_string(msg_len) +
" bytes) exceeds XRootD SSI buffer size (" + std::to_string(m_max_msglen) + " bytes)");
}
// Get pointer to next record
if(!input_stream.GetDirectBufferPointer(reinterpret_cast(&buf_ptr), &buf_len)) buf_len = 0;
if(buf_len < msg_len) {
// Record payload is split across the boundary, save the partial record
google::protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(msg_len, m_split_buffer.get());
memcpy(m_split_buffer.get() + UINT32_SIZE, buf_ptr, buf_len);
m_split_buflen = buf_len + UINT32_SIZE;
return false;
} else {
DataType record;
record.ParseFromArray(buf_ptr, msg_len);
input_stream.Skip(msg_len);
Log::DumpProtobuf(Log::PROTOBUF, &record);
DataCallback(record);
// If the message terminates at the end of the buffer, we are done, otherwise keep going
return buf_len != msg_len;
}
}
} // namespace XrdSsiPb