/*! * @project XRootD SSI/Protocol Buffer Interface Project * @brief XRootD SSI Stream response class implementation * @copyright Copyright 2018 CERN * @license This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ #pragma once #include "XrdSsiPbOStreamBuffer.hpp" #include "test.pb.h" /*! * Test response stream object */ class TestStream : public XrdSsiStream { public: TestStream(const test::Request &request) : XrdSsiStream(XrdSsiStream::isActive), m_repeat(request.repeat()), m_record(std::move(request.record())) { using namespace XrdSsiPb; Log::Msg(Log::DEBUG, LOG_SUFFIX, "Called TestStream() constructor"); } virtual ~TestStream() { using namespace XrdSsiPb; Log::Msg(Log::DEBUG, LOG_SUFFIX, "Called ~TestStream() destructor"); } /*! * Synchronously obtain data from an active stream * * Active streams can only exist on the server-side. This XRootD SSI Stream class is marked as an * active stream in the constructor. * * @param[out] eInfo The object to receive any error description. * @param[in,out] dlen input: the optimal amount of data wanted (this is a hint) * output: the actual amount of data returned in the buffer. * @param[in,out] last input: should be set to false. * output: if true it indicates that no more data remains to be returned * either for this call or on the next call. * * @return Pointer to the Buffer object that contains a pointer to the the data (see below). The * buffer must be returned to the stream using Buffer::Recycle(). The next member is usable. * @retval 0 No more data remains or an error occurred: * last = true: No more data remains. * last = false: A fatal error occurred, eRef has the reason. */ virtual Buffer *GetBuff(XrdSsiErrInfo &eInfo, int &dlen, bool &last) { using namespace XrdSsiPb; Log::Msg(Log::INFO, LOG_SUFFIX, "GetBuff(): XrdSsi buffer fill request (", dlen, " bytes)"); XrdSsiPb::OStreamBuffer *streambuf; try { if(m_repeat <= 0) { // Nothing more to send, close the stream last = true; return nullptr; } sleep(10); streambuf = new XrdSsiPb::OStreamBuffer(dlen); for(bool is_buffer_full = false; m_repeat > 0 && !is_buffer_full; --m_repeat) { test::Data data; // Increment record number m_record.set_test_int32(m_record.test_int32() + 1); // Set record pointer in data object data.set_allocated_record(&m_record); is_buffer_full = streambuf->Push(data); // Prevent protobuf from trying to delete m_record data.release_record(); } dlen = streambuf->Size(); Log::Msg(Log::INFO, LOG_SUFFIX, "GetBuff(): Returning buffer with ", dlen, " bytes of data."); Log::DumpBuffer(Log::PROTORAW, streambuf->data, dlen); } catch(std::exception &ex) { std::ostringstream errMsg; errMsg << __FUNCTION__ << " failed: " << ex.what(); eInfo.Set(errMsg.str().c_str(), ECANCELED); delete streambuf; } catch(...) { std::ostringstream errMsg; errMsg << __FUNCTION__ << " failed: Caught an unknown exception"; eInfo.Set(errMsg.str().c_str(), ECANCELED); delete streambuf; } return streambuf; } private: int32_t m_repeat; //!< Number of records to send in the response test::Record m_record; //!< Request record for round-robin Response test static constexpr const char* const LOG_SUFFIX = "TestStream"; //!< Identifier for log messages };