/*!
* @project XRootD SSI/Protocol Buffer Interface Project
* @brief XRootD SSI Request class
* @copyright Copyright 2018 CERN
* @license This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see .
*/
#pragma once
#include
#include
#include
#include "XrdSsiPbIStreamBuffer.hpp"
#include "XrdVersion.hh"
namespace XrdSsiPb {
/*!
* Request Callback class
*
* The client should specialize on this class for the Alert Response type. This permits an arbitrary
* number of Alert messages to be sent before each Response. These can be used for any purpose defined
* by the client and server, for example writing messages to the client log.
*/
template
class RequestCallback
{
public:
void operator()(const CallbackArg &arg);
private:
static constexpr const char* const LOG_SUFFIX = "Pb::RequestCallback"; //!< Identifier for log messages
};
/*!
* Request class
*/
template
class Request : public XrdSsiRequest
{
public:
Request(const RequestType &request, unsigned int response_bufsize);
virtual ~Request() {
Log::Msg(Log::DEBUG, LOG_SUFFIX, "Called ~Request() destructor");
}
/*!
* The implementation of GetRequest() must create request data, save it in some manner, and provide
* it to the framework.
*/
virtual char *GetRequest(int &reqlen) override
{
reqlen = m_request_str.size();
return const_cast(m_request_str.c_str());
}
/*
* Optionally also define the RelRequestBuffer() method to clean up when the framework no longer
* needs access to the data. The thread used to initiate a request may be the same one used in the
* GetRequest() call.
*/
virtual bool ProcessResponse(const XrdSsiErrInfo &eInfo, const XrdSsiRespInfo &rInfo) override;
#if XrdMajorVNUM( XrdVNUMBER ) > 4
virtual void ProcessResponseData(const XrdSsiErrInfo &eInfo, char *response_bufptr, int response_buflen, bool is_last) override;
#else
virtual XrdSsiRequest::PRD_Xeq ProcessResponseData(const XrdSsiErrInfo &eInfo, char *response_bufptr, int response_buflen, bool is_last) override;
#endif
virtual void Alert(XrdSsiRespInfoMsg &alert_msg) override;
/*!
* Return the future associated with this object's Metadata promise
*/
std::future GetMetadataFuture() { return m_metadata_promise.get_future(); }
/*!
* Return the future associated with this object's Data/Stream promise
*/
std::future GetDataFuture() { return m_data_promise.get_future(); }
private:
void ProcessResponseMetadata();
std::string m_request_str; //!< Request buffer
MetadataType m_response_metadata; //!< Response metadata object
std::unique_ptr m_response_buffer; //!< Pointer to storage for Data responses
char *m_response_bufptr; //!< Pointer to the Response buffer
int m_response_bufsize; //!< Size of the Response buffer
std::promise m_metadata_promise; //!< Promise a reply of Metadata type
std::promise m_data_promise; //!< Promise a data or stream response
IStreamBuffer m_istream_buffer; //!< Input stream buffer object
RequestCallback AlertCallback; //!< Callback for Alert messages
static constexpr const char* const LOG_SUFFIX = "Pb::Request"; //!< Identifier for log messages
};
/*!
* Request constructor
*/
template
Request::
Request(const RequestType &request, unsigned int response_bufsize) :
m_response_bufsize(response_bufsize),
m_istream_buffer(response_bufsize)
{
Log::Msg(Log::DEBUG, LOG_SUFFIX, "Request(): Response buffer size = ", m_response_bufsize, " bytes");
// Serialize the Request
if(!request.SerializeToString(&m_request_str))
{
throw PbException("request.SerializeToString() failed");
}
}
/*!
* Process Responses from the server
*
* Requests are sent to the server asynchronously via the service object. ProcessResponse() informs
* the Request object on the client side if it completed or failed.
*/
template
bool Request::ProcessResponse(const XrdSsiErrInfo &eInfo, const XrdSsiRespInfo &rInfo)
{
Log::Msg(Log::DEBUG, LOG_SUFFIX, "ProcessResponse(): response type = ", rInfo.State());
try {
switch(rInfo.rType) {
// Data and Metadata responses
case XrdSsiRespInfo::isData:
// Process Metadata
ProcessResponseMetadata();
// Process Data
if(rInfo.blen > 0)
{
// For Data responses, we need to allocate the buffer to receive the data
m_response_buffer = std::unique_ptr(new char[m_response_bufsize]);
m_response_bufptr = m_response_buffer.get();
// Process Data Response: copy one chunk of data into the buffer, then call ProcessResponseData()
GetResponseData(m_response_bufptr, m_response_bufsize);
} else {
// Return control of the object to the calling thread and delete rInfo
Finished();
// Response is Metadata-only
m_data_promise.set_value();
// It is now safe to delete the Request object
}
break;
// Stream response
case XrdSsiRespInfo::isStream:
// Process Metadata
ProcessResponseMetadata();
// For Stream responses, we need to allocate the buffer to receive the data
m_response_buffer = std::unique_ptr(new char[m_response_bufsize]);
m_response_bufptr = m_response_buffer.get();
// Process Stream Response: copy one chunk of data into the buffer, then call ProcessResponseData()
GetResponseData(m_response_bufptr, m_response_bufsize);
break;
// Handle errors in the XRootD framework (e.g. no response from server)
case XrdSsiRespInfo::isError: throw XrdSsiException(eInfo);
// To implement detached requests, add another callback type which saves the handle
case XrdSsiRespInfo::isHandle: throw XrdSsiException("Detached requests are not implemented.");
// To implement file requests, add another callback type
case XrdSsiRespInfo::isFile: throw XrdSsiException("File requests are not implemented.");
// Handle invalid responses
case XrdSsiRespInfo::isNone:
default: throw XrdSsiException("Invalid Response.");
}
} catch(std::exception &ex) {
std::exception_ptr p(std::current_exception());
try {
// Use the exception to fulfil the promise
m_metadata_promise.set_exception(p);
} catch(std::future_error &f_ex) {
Log::Msg(Log::ERROR, LOG_SUFFIX, "ProcessResponse(): ", f_ex.what());
// Metadata promise is already fulfilled, so set a Data/Stream exception instead
m_data_promise.set_exception(p);
}
Finished();
}
// Response was a valid Data or Stream object, set response metadata
return true;
}
/*!
* Process Response Metadata
*
* A Response can (optionally) contain Metadata. This can be used for simple responses (e.g. status
* code, short message) or as the header for large asynchronous data transfers or streaming data.
*/
template
void Request::ProcessResponseMetadata()
{
int metadata_len;
const char *metadata_buffer = GetMetadata(metadata_len);
Log::Msg(Log::DEBUG, LOG_SUFFIX, "ProcessResponseMetadata(): received ", metadata_len, " bytes");
Log::DumpBuffer(Log::PROTORAW, metadata_buffer, metadata_len);
// Deserialize the metadata
MetadataType metadata;
if(metadata.ParseFromArray(metadata_buffer, metadata_len))
{
Log::DumpProtobuf(Log::PROTOBUF, &metadata);
m_metadata_promise.set_value(metadata);
}
else
{
throw PbException("metadata.ParseFromArray() failed");
}
}
/*!
* Process Response Data.
*
* Data Responses are implemented as a binary stream, which is received one chunk at a time.
* The chunk size is defined when the Request object is instantiated (see m_response_bufsize).
*
* In this implementation, the data returned in the response buffer is record-based, where each
* record is a protocol buffer of type DataType. The framework ensures that the client application
* receives only complete records.
*
* An alternative implementation would be to return typeless blobs to the client application,
* possibly with the format defined in the metadata. This would make sense for cases where the
* data size is in excess of the chunk size. Currently there is no use case for this but
* it could be added in future if required. A possible implementation would be to use type traits
* on DataType to decide how it should be handled.
*
* ProcessResponseData() is called either by GetResponseData(), or asynchronously at any time for data
* streams.
*
* @retval PRD_Normal The response was accepted for processing
* @retval PRD_Hold The response could not be handled at this time. The callback will be placed
* in a global hold queue and the thread will be released. The client is
* responsible for calling the static method XrdSsiRequest::RestartDataResponse()
* to restart processing responses (in FIFO order).
*/
template
#if XrdMajorVNUM( XrdVNUMBER ) > 4
void Request
#else
XrdSsiRequest::PRD_Xeq Request
#endif
::ProcessResponseData(const XrdSsiErrInfo &eInfo, char *response_bufptr, int response_buflen, bool is_last)
{
Log::Msg(Log::DEBUG, LOG_SUFFIX, "ProcessResponseData(): received ", response_buflen, " bytes");
Log::DumpBuffer(Log::PROTORAW, response_bufptr, response_buflen);
// The buffer length is set to -1 if an error occurred setting up the response
if(response_buflen == -1)
{
// Report an error message and abort the stream operation
Log::Msg(Log::ERROR, LOG_SUFFIX, "ProcessResponseData(): fatal error from XRootD framework\n", eInfo.Get());
m_data_promise.set_value();
Finished();
#if XrdMajorVNUM( XrdVNUMBER ) > 4
return;
#else
return XrdSsiRequest::PRD_Normal;
#endif
}
// The buffer length can be 0 if the response is metadata only
if(response_buflen != 0)
{
// Push stream/data buffer onto the input stream for the client
m_istream_buffer.Push(response_bufptr, response_buflen);
}
if(is_last) // No more data to come
{
Log::Msg(Log::DEBUG, LOG_SUFFIX, "ProcessResponseData(): done");
// Clean up
// Set the data promise
m_data_promise.set_value();
// If Request objects are uniform, we could re-use them instead of deleting them, to avoid the
// overhead of repeated object creation. This would require a more complex Request factory. For
// now we just delete.
Finished();
}
else
{
Log::Msg(Log::DEBUG, LOG_SUFFIX, "ProcessResponseData(): request more response data");
// If there is more data, get the next chunk
GetResponseData(m_response_bufptr, m_response_bufsize);
}
#if XrdMajorVNUM( XrdVNUMBER ) < 5
return XrdSsiRequest::PRD_Normal;
#endif
}
/*!
* Deserialize Alert messages and call the Alert callback
*/
template
void Request::Alert(XrdSsiRespInfoMsg &alert_msg)
{
try {
// Get the Alert
int alert_len;
char *alert_buffer = alert_msg.GetMsg(alert_len);
// Deserialize the Alert
AlertType alert;
if(alert.ParseFromArray(alert_buffer, alert_len))
{
AlertCallback(alert);
}
else
{
throw PbException("alert.ParseFromArray() failed");
}
} catch(std::exception &ex) {
Log::Msg(Log::ERROR, LOG_SUFFIX, "Alert() could not process the Alert message: ", ex.what());
}
// Recycle the message to free memory
alert_msg.RecycleMsg();
}
} // namespace XrdSsiPb