/*! * @project XRootD SSI/Protocol Buffer Interface Project * @brief XRootD SSI Responder class implementation * @copyright Copyright 2018 CERN * @license This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ #include #include #include #include #include #include "TestServiceProvider.hpp" #include "TestStream.hpp" /* * Class to process Request Messages */ class RequestMessage { public: RequestMessage(const XrdSsiEntity &client, const TestServiceProvider *service) { using namespace XrdSsiPb; Log::Msg(Log::DEBUG, LOG_SUFFIX, "RequestMessage() constructor: request received from client ", client.name, '@', client.host); } /*! * Process a Notification request or an Admin command request * * @param[in] request * @param[out] response Response protocol buffer * @param[out] stream Reference to Response stream pointer */ void process(const test::Request &request, test::Response &response, std::string &data_buffer, XrdSsiStream* &stream) { using namespace XrdSsiPb; // Artifical delay to simulate a heavier workload if(request.delay_ms() > 0) { Log::Msg(Log::DEBUG, LOG_SUFFIX, "process(): delay = ", request.delay_ms(), " ms"); std::this_thread::sleep_for(std::chrono::milliseconds(request.delay_ms())); } switch(request.cmd()) { using namespace test; case Request::SEND_METADATA: { std::stringstream message; message << "Request payload:" << std::endl << "bool = " << (request.record().test_bool() ? "true" : "false") << std::endl << "int64 = " << request.record().test_int64() << std::endl << "double = " << request.record().test_double() << std::endl << "string = \"" << request.record().test_string() << "\"" << std::endl; response.set_message_txt(message.str()); response.set_type(Response::RSP_SUCCESS); break; } case Request::SEND_DATA: { // Probably we should provide a function in the generic headers to make this simpler for the client. // As we now have to prepend the length, there is no advantage in using a string for the serialized // buffer. Probably better to change it to a char array managed using a unique pointer. // Return response header in metadata set_header(response); response.set_type(Response::RSP_SUCCESS); // Send data response test::Data data; // Round-robin test: set pointer in data object to record in request object data.set_allocated_record(const_cast(&request.record())); Log::Msg(Log::PROTOBUF, LOG_SUFFIX, "process(): Set Data response:"); Log::DumpProtobuf(Log::PROTOBUF, &data); data.SerializeToString(&data_buffer); // Prevent protobuf from trying to delete request record data.release_record(); // Prepend length so Data and Streams can be handled the same way by the client char bufsize[sizeof(uint32_t)]; google::protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(data_buffer.size(), reinterpret_cast(bufsize)); data_buffer.insert(0, bufsize, sizeof(uint32_t)); break; } case Request::SEND_STREAM: // Create a XrdSsi stream object to return the results stream = new TestStream(request); // Return response header in metadata set_header(response); response.set_type(Response::RSP_SUCCESS); break; default: response.set_message_txt("Invalid cmd."); response.set_type(Response::RSP_ERR_PROTOBUF); } } private: /*! * Set reply header in metadata */ void set_header(test::Response &response) { const char* const TEXT_RED = "\x1b[31;1m"; const char* const TEXT_NORMAL = "\x1b[0m\n"; std::stringstream header; header << TEXT_RED << " Count " << " Int64 Value " << " Double Value " << "Bool String Value" << TEXT_NORMAL; response.set_message_txt(header.str()); } static constexpr const char* const LOG_SUFFIX = "Pb::RequestMessage"; //!< Identifier for log messages }; /* * Implementation of XRootD SSI subclasses */ namespace XrdSsiPb { /* * Convert a framework exception into a Response */ template<> void ExceptionHandler::operator()(test::Response &response, const PbException &ex) { response.set_type(test::Response::RSP_ERR_PROTOBUF); response.set_message_txt(ex.what()); } /* * Process the Notification Request */ template <> void RequestProc::ExecuteAction() { try { // Perform a capability query on the XrdSsiProviderServer object: it must be a TestServiceProvider TestServiceProvider *test_service_ptr; if(!(test_service_ptr = dynamic_cast(XrdSsiProviderServer))) { throw std::runtime_error("XRootD Service is not the Test Service"); } RequestMessage request_msg(*(m_resource.client), test_service_ptr); request_msg.process(m_request, m_metadata, m_response_str, m_response_stream_ptr); } catch(PbException &ex) { m_metadata.set_type(test::Response::RSP_ERR_PROTOBUF); m_metadata.set_message_txt(ex.what()); } catch(std::exception &ex) { // Serialize and send a log message test::Alert alert_msg; alert_msg.set_message_txt("Something bad happened"); Alert(alert_msg); // Send the metadata response m_metadata.set_type(test::Response::RSP_ERR_SERVER); m_metadata.set_message_txt(ex.what()); } } } // namespace XrdSsiPb