/*! * @project XRootD SSI/Protocol Buffer Interface Project * @brief Command-line test client for XRootD SSI/Protocol Buffers * @copyright Copyright 2018 CERN * @license This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ #include #include #include #include #include #include #include "TestClient.hpp" bool is_metadata_done = false; // // Define XRootD SSI callbacks // namespace XrdSsiPb { /* * Alert callback. * * Defines how Alert messages should be logged */ template<> void RequestCallback::operator()(const test::Alert &alert) { Log::Msg(Log::INFO, LOG_SUFFIX, "Alert received:"); // Output message in Json format Log::DumpProtobuf(Log::INFO, &alert); } /* * Data/Stream callback. * * Defines how incoming records from the stream should be handled */ template<> void IStreamBuffer::DataCallback(test::Data record) const { // Processing of Metadata and Data Responses are asynchronous. To guarantee that Metadata Responses // are fully processed before handing the Data Responses, we need to add some kind of synchronisation. // Probably something more portable than this for production code. while(!is_metadata_done) ; const test::Record &test_record = record.record(); std::cout << std::setfill(' ') << std::setw(7) << std::right << test_record.test_int32() << ' ' << std::setfill(' ') << std::setw(15) << std::right << test_record.test_int64() << ' ' << std::setfill(' ') << std::setw(15) << std::right << test_record.test_double() << ' ' << (test_record.test_bool() ? "true " : "false") << ' ' << test_record.test_string() << std::endl; } } // namespace XrdSsiPb // // TestClient implementation // const std::string DEFAULT_ENDPOINT = "localhost:10956"; // Change console text colour const char* const TEXT_RED = "\x1b[31;1m"; const char* const TEXT_NORMAL = "\x1b[0m\n"; // Convert string to bool bool to_bool(std::string str) { std::transform(str.begin(), str.end(), str.begin(), ::tolower); if(str == "true") return true; if(str == "false") return false; throw std::runtime_error(str + " is not a valid boolean value"); } TestClientCmd::TestClientCmd(int argc, const char *const *const argv) : m_execname(argv[0]), m_endpoint(DEFAULT_ENDPOINT), m_repeat(1) { // Strip path from execname size_t p = m_execname.find_last_of('/'); if(p != std::string::npos) m_execname.erase(0, p+1); // Parse the command if(argc < 2) throwUsage("Missing command"); std::string cmd(argv[1]); int next_arg = 2; if(cmd == "help") { throwUsage(); } else if(cmd == "metadata") { m_request.set_cmd(test::Request::SEND_METADATA); } else if(cmd == "data") { m_request.set_cmd(test::Request::SEND_DATA); } else if(cmd == "stream") { if(argc < 3) throwUsage("Must specify number of records to return"); m_request.set_cmd(test::Request::SEND_STREAM); m_request.set_repeat(strtol(argv[next_arg++], nullptr, 0)); if(m_request.repeat() < 1) throwUsage("Number of records to return must be a valid positive integer"); } else { throwUsage("Unrecognized command: " + cmd); } // Parse command line options while(next_arg < argc) { std::string option(argv[next_arg++]); if(next_arg == argc) throwUsage("Unrecognised option or missing parameter"); if(option == "--endpoint") { m_endpoint = argv[next_arg++]; } else if(option == "--repeat") { m_repeat = std::atoi(argv[next_arg++]); } else if(option == "--bool") { m_request.mutable_record()->set_test_bool(to_bool(argv[next_arg++])); } else if(option == "--int64") { m_request.mutable_record()->set_test_int64(strtoll(argv[next_arg++], nullptr, 0)); } else if(option == "--double") { m_request.mutable_record()->set_test_double(strtod(argv[next_arg++], nullptr)); } else if(option == "--string") { m_request.mutable_record()->set_test_string(argv[next_arg++]); } } // Read environment variables XrdSsiPb::Config config; // If XRDDEBUG=1, switch on all logging if(getenv("XRDDEBUG")) { config.set("log", "all"); } // XrdSsiPbLogLevel gives more fine-grained control over log level config.getEnv("log", "XrdSsiPbLogLevel"); // If the server is down, we want an immediate failure. Set client retry to a single attempt. XrdSsiProviderClient->SetTimeout(XrdSsiProvider::connect_N, 1); // Obtain a Service Provider std::string resource("/test"); m_test_service_ptr = std::make_unique(m_endpoint, resource, config); } void TestClientCmd::Send() { if(m_request.cmd() == test::Request::SEND_METADATA) { try { // Send the Request to the Service and get a Response test::Response response; m_test_service_ptr->Send(m_request, response); // Handle responses switch(response.type()) { using namespace test; case Response::RSP_SUCCESS: std::cout << response.message_txt(); break; case Response::RSP_ERR_PROTOBUF: throw XrdSsiPb::PbException(response.message_txt()); case Response::RSP_ERR_USER: case Response::RSP_ERR_SERVER: throw std::runtime_error(response.message_txt()); default: throw XrdSsiPb::PbException("Invalid response type."); } } catch(XrdSsiPb::XrdSsiException &ex) { std::cerr << ex.what() << std::endl; } } else { SendAsync(); } } void TestClientCmd::SendAsync() { while(m_repeat-- > 0) { try { // Send the Request to the Service and get a Response test::Response response; XrdSsiPbServiceType::DataFuture stream_future(m_test_service_ptr->SendAsync(m_request, response)); // Handle responses switch(response.type()) { using namespace test; case Response::RSP_SUCCESS: std::cout << response.message_txt(); break; case Response::RSP_ERR_PROTOBUF: throw XrdSsiPb::PbException(response.message_txt()); case Response::RSP_ERR_USER: case Response::RSP_ERR_SERVER: throw std::runtime_error(response.message_txt()); default: throw XrdSsiPb::PbException("Invalid response type."); } is_metadata_done = true; // If there is a Data/Stream payload, wait until it has been processed before exiting stream_future.wait(); } catch(XrdSsiPb::XrdSsiException &ex) { // Don't stop for client-side XRootD errors, just report it and try again next time std::cerr << ex.what() << std::endl; } // Sleep for 5 secs between outgoing requests if(m_repeat > 0) sleep(5); } } void TestClientCmd::throwUsage(const std::string &error_txt) const { std::stringstream help; if(error_txt != "") { help << m_execname << ": " << error_txt << std::endl << std::endl; } help << TEXT_RED << "XRootD SSI/Google Protocol Buffers 3 Test Client" << TEXT_NORMAL << std::endl << "Usage:" << std::endl << " " << m_execname << " help Show this help message" << std::endl << " " << m_execname << " metadata Request a metadata-only response" << std::endl << " " << m_execname << " data Request a simple data response" << std::endl << " " << m_execname << " stream Request a stream response comprising the specified number of records" << std::endl << std::endl << "Where is specified with one or more of these options: " << std::endl << " [--bool true|false] [--int64 ] [--double ] [--string ]" << std::endl << std::endl << "Additional options:" << std::endl << " [--endpoint :] Address of the XRootD SSI server to connect to (default localhost:10955)" << std::endl << " [--repeat ] Number of times to repeat the command using the same Service object (default 1)" << std::endl; throw std::runtime_error(help.str()); } /*! * Start here * * @param argc[in] The number of command-line arguments * @param argv[in] The command-line arguments * * @retval 0 Success * @retval 1 The client threw an exception */ int main(int argc, const char **argv) { try { // Test uninitialised logging : logging is not available until the test_service object is // instantiated, so this message should be silently ignored XrdSsiPb::Log::Msg(XrdSsiPb::Log::ERROR, "main", "Logging is not initialised"); // Parse the command line arguments TestClientCmd cmd(argc, argv); // Send the protocol buffer cmd.Send(); // Delete all global objects allocated by libprotobuf google::protobuf::ShutdownProtobufLibrary(); return 0; } catch (XrdSsiPb::PbException &ex) { std::cerr << "Error in Google Protocol Buffers: " << ex.what() << std::endl; } catch (XrdSsiPb::XrdSsiException &ex) { std::cerr << "Error from XRootD SSI Framework: " << ex.what() << std::endl; } catch (std::runtime_error &ex) { std::cerr << ex.what() << std::endl; } catch (std::exception &ex) { std::cerr << "Caught exception: " << ex.what() << std::endl; } catch (...) { std::cerr << "Caught an unknown exception" << std::endl; } return 1; }