//------------------------------------------------------------------------------
// File: XrdMqClientWorker.cc
//------------------------------------------------------------------------------
/************************************************************************
* EOS - the CERN Disk Storage System *
* Copyright (C) 2018 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see .*
************************************************************************/
#include "mq/XrdMqClient.hh"
#include "mq/XrdMqTiming.hh"
#include
int main(int argc, char* argv[])
{
if (argc != 2) {
std::cerr << "error: at least one argument neeeds to be provided"
<< std::endl;
exit(-1);
}
XrdOucString myid = "root://localhost:1097//eos/";
myid += argv[1];
myid += "/worker";
XrdMqClient mqc;
if (!mqc.AddBroker(myid.c_str())) {
std::cerr << "error: failed to add broker " << myid.c_str() << std::endl;
exit(-1);
}
mqc.Subscribe();
mqc.SetDefaultReceiverQueue("/eos/*/master");
XrdMqMessage message("Msg for master");
message.Configure();
message.Encode();
XrdMqTiming mq("send");
TIMING("START", &mq);
uint64_t count = 0ull;
while (true) {
message.NewId();
message.kMessageHeader.kDescription = "Hello Master Test";
(mqc << message);
std::unique_ptr new_msg {mqc.RecvMessage()};
if (new_msg) {
new_msg->Print();
++count;
std::string expected = "Hello Worker Test " + std::to_string(count);
if (new_msg->kMessageHeader.kDescription != expected.c_str()) {
std::cerr << "expected: " << expected << " received: " <<
new_msg->kMessageHeader.kDescription << std::endl;
std::terminate();
}
}
do {
new_msg.reset(mqc.RecvFromInternalBuffer());
if (new_msg == nullptr) {
break;
} else {
++count;
new_msg->Print();
std::string expected = "Hello Worker Test " + std::to_string(count);
if (new_msg->kMessageHeader.kDescription != expected.c_str()) {
std::cerr << "expected: " << expected << " received: " <<
new_msg->kMessageHeader.kDescription << std::endl;
std::terminate();
}
}
} while (true);
}
TIMING("SEND+RECV", &mq);
mq.Print();
}