/*!
* @project XRootD SSI/Protocol Buffer Interface Project
* @brief Command-line test client for XRootD SSI/Protocol Buffers
* @copyright Copyright 2018 CERN
* @license This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see .
*/
#include
#include
#include
#include
#include
#include
#include
#include "TestClient.hpp"
bool is_metadata_done = false;
// Thread handler
void SendOnce(XrdSsiPbServiceType *service_ptr, test::Request request);
//
// Hook into XRootD SSI globals
//
namespace XrdSsi {
extern XrdSysError Log;
}
//
// Define XRootD SSI callbacks
//
namespace XrdSsiPb {
/*
* Alert callback.
*
* Defines how Alert messages should be logged
*/
template<>
void RequestCallback::operator()(const test::Alert &alert)
{
Log::Msg(Log::INFO, LOG_SUFFIX, "Alert received:");
// Output message in Json format
Log::DumpProtobuf(Log::INFO, &alert);
}
/*
* Data/Stream callback.
*
* Defines how incoming records from the stream should be handled
*/
template<>
void IStreamBuffer::DataCallback(test::Data record) const
{
// Processing of Metadata and Data Responses are asynchronous. To guarantee that Metadata Responses
// are fully processed before handing the Data Responses, we need to add some kind of synchronisation.
// Probably something more portable than this for production code.
while(!is_metadata_done) ;
const test::Record &test_record = record.record();
std::cout << std::setfill(' ') << std::setw(7) << std::right << test_record.test_int32() << ' '
<< std::setfill(' ') << std::setw(15) << std::right << test_record.test_int64() << ' '
<< std::setfill(' ') << std::setw(15) << std::right << test_record.test_double() << ' '
<< (test_record.test_bool() ? "true " : "false") << ' '
<< test_record.test_string() << std::endl;
}
} // namespace XrdSsiPb
//
// TestClient implementation
//
const std::string DEFAULT_ENDPOINT = "localhost:10956";
// Change console text colour
const char* const TEXT_RED = "\x1b[31;1m";
const char* const TEXT_NORMAL = "\x1b[0m\n";
// Convert string to bool
bool to_bool(std::string str) {
std::transform(str.begin(), str.end(), str.begin(), ::tolower);
if(str == "true") return true;
if(str == "false") return false;
throw std::runtime_error(str + " is not a valid boolean value");
}
TestClientCmd::TestClientCmd(int argc, const char *const *const argv) :
m_execname(argv[0]),
m_endpoint(DEFAULT_ENDPOINT),
m_repeat(1)
{
// Read environment variables
XrdSsiPb::Config config;
// If XRDDEBUG=1, switch on all logging
if(getenv("XRDDEBUG")) {
config.set("log", "all");
}
// XrdSsiPbLogLevel gives more fine-grained control over log level
config.getEnv("log", "XrdSsiPbLogLevel");
// Enable microsecond resolution in timestamps
config.set("log.hiRes", "true");
// Strip path from execname
size_t p = m_execname.find_last_of('/');
if(p != std::string::npos) m_execname.erase(0, p+1);
// Threading test is metadata-only
m_request.set_cmd(test::Request::SEND_METADATA);
// Parse command line options
for(int next_arg = 1; next_arg < argc; ) {
std::string option(argv[next_arg++]);
// Options with no argument
if(option == "--reusable") {
config.set("resource.options", "Reusable");
continue;
}
if(next_arg == argc) throwUsage("Unrecognised option or missing parameter");
// Options which take an argument
if(option == "--endpoint") {
m_endpoint = argv[next_arg++];
} else if(option == "--repeat") {
m_repeat = std::atoi(argv[next_arg++]);
} else if(option == "--delay") {
m_request.set_delay_ms(strtoll(argv[next_arg++], nullptr, 0));
} else if(option == "--bool") {
m_request.mutable_record()->set_test_bool(to_bool(argv[next_arg++]));
} else if(option == "--int64") {
m_request.mutable_record()->set_test_int64(strtoll(argv[next_arg++], nullptr, 0));
} else if(option == "--double") {
m_request.mutable_record()->set_test_double(strtod(argv[next_arg++], nullptr));
} else if(option == "--string") {
m_request.mutable_record()->set_test_string(argv[next_arg++]);
}
}
// If the server is down, we want an immediate failure. Set client retry to a single attempt.
XrdSsiProviderClient->SetTimeout(XrdSsiProvider::connect_N, 1);
// Obtain a Service Provider
std::string resource("/test");
m_test_service_ptr = std::make_unique(m_endpoint, resource, config);
}
void TestClientCmd::Send()
{
std::vector threads;
std::vector> connections;
XrdSsiPb::Config config;
std::string resource("/test");
while(m_repeat-- > 0) {
#if 1
// All threads use same connection
threads.push_back(std::thread(SendOnce, m_test_service_ptr.get(), m_request));
#else
// Create a separate connection for each thread
connections.push_back(std::make_unique(m_endpoint, resource, config));
threads.push_back(std::thread(SendOnce, connections.back().get(), m_request));
#endif
}
for(auto &t : threads) {
t.join();
}
}
void SendOnce(XrdSsiPbServiceType *service_ptr, test::Request request)
{
try {
// Send the Request to the Service and get a Response
test::Response response;
// Output the round-trip time for the request (ms)
auto sentAt = std::chrono::steady_clock::now();
service_ptr->Send(request, response);
auto receivedAt = std::chrono::steady_clock::now();
auto timeSpent = std::chrono::duration_cast(receivedAt-sentAt);
std::stringstream ss;
ss << timeSpent.count() << std::endl;
std::cout << ss.str();
// Handle responses
switch(response.type())
{
using namespace test;
case Response::RSP_SUCCESS: std::cout << response.message_txt(); break;
case Response::RSP_ERR_PROTOBUF: throw XrdSsiPb::PbException(response.message_txt());
case Response::RSP_ERR_USER:
case Response::RSP_ERR_SERVER: throw std::runtime_error(response.message_txt());
default: throw XrdSsiPb::PbException("Invalid response type.");
}
} catch(XrdSsiPb::XrdSsiException &ex) {
std::cerr << ex.what() << std::endl;
}
}
void TestClientCmd::throwUsage(const std::string &error_txt) const
{
std::stringstream help;
if(error_txt != "") {
help << m_execname << ": " << error_txt << std::endl << std::endl;
}
help << TEXT_RED << "XRootD SSI/Google Protocol Buffers 3 Thread Test Client" << TEXT_NORMAL << std::endl
<< "Usage:" << std::endl
<< " " << m_execname << " [--endpoint :] [--repeat ] " << std::endl
<< "Where is specified with one or more of these options: " << std::endl
<< " [--bool true|false] [--int64 ] [--double ] [--string ]" << std::endl << std::endl
<< "Additional options:" << std::endl
<< " [--endpoint :] Address of the XRootD SSI server to connect to (default localhost:10955)" << std::endl
<< " [--reusable] Switch on XRootD SSI Reusable Resources" << std::endl
<< " [--delay ] Delay in ms before server sends its Response" << std::endl
<< " [--repeat ] Number of times to repeat the command using the same Service object (default 1)" << std::endl;
throw std::runtime_error(help.str());
}
/*!
* Start here
*
* @param argc[in] The number of command-line arguments
* @param argv[in] The command-line arguments
*
* @retval 0 Success
* @retval 1 The client threw an exception
*/
int main(int argc, const char **argv)
{
try {
// Test uninitialised logging : logging is not available until the test_service object is
// instantiated, so this message should be silently ignored
XrdSsiPb::Log::Msg(XrdSsiPb::Log::ERROR, "main", "Logging is not initialised");
// Parse the command line arguments
TestClientCmd cmd(argc, argv);
// Send the protocol buffer
cmd.Send();
// Delete all global objects allocated by libprotobuf
google::protobuf::ShutdownProtobufLibrary();
return 0;
} catch (XrdSsiPb::PbException &ex) {
std::cerr << "Error in Google Protocol Buffers: " << ex.what() << std::endl;
} catch (XrdSsiPb::XrdSsiException &ex) {
std::cerr << "Error from XRootD SSI Framework: " << ex.what() << std::endl;
} catch (std::runtime_error &ex) {
std::cerr << ex.what() << std::endl;
} catch (std::exception &ex) {
std::cerr << "Caught exception: " << ex.what() << std::endl;
} catch (...) {
std::cerr << "Caught an unknown exception" << std::endl;
}
return 1;
}