//------------------------------------------------------------------------------
// File: XrdMqClientMaster.cc
//------------------------------------------------------------------------------
/************************************************************************
* EOS - the CERN Disk Storage System *
* Copyright (C) 2018 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see <http://www.gnu.org/licenses/>. *
************************************************************************/
#include "mq/XrdMqClient.hh"
#include "mq/XrdMqTiming.hh"
#include <chrono>
#include <cstdint>
#include <cstdlib>
#include <exception>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
int main(int argc, char* argv[])
{
uint64_t num_loops = 1000;
if (argc == 2) {
num_loops = std::stoi(argv[1]);
}
XrdMqClient mqc;
if (!mqc.AddBroker("root://localhost:1097//eos/localhost/master", true, true)) {
std::cerr << "error: failed to add broker" << std::endl;
exit(-1);
}
mqc.Subscribe();
mqc.SetDefaultReceiverQueue("/eos/*/worker");
XrdMqMessage message("Hello Worker");
message.Configure();
message.Encode();
message.Print();
XrdMqTiming mq("send");
TIMING("START", &mq);
do {
for (uint64_t i = 0; i < num_loops; ++i) {
message.NewId();
message.kMessageHeader.kDescription = "Hello Worker Test ";
message.kMessageHeader.kDescription += (int)i;
(mqc << message);
for (int j = 0; j < 10; j++) {
std::unique_ptr new_msg {mqc.RecvMessage()};
if (new_msg == nullptr) {
std::this_thread::sleep_for(std::chrono::seconds(2));
continue;
}
if ((new_msg->kMessageHeader.kType == XrdMqMessageHeader::kStatusMessage) ||
(new_msg->kMessageHeader.kType == XrdMqMessageHeader::kQueryMessage)) {
std::unique_ptr adv_msg {
XrdAdvisoryMqMessage::Create(new_msg->GetMessageBuffer())};
// adv_msg->Print();
} else {
new_msg->Print();
if (new_msg->kMessageHeader.kDescription != "Hello Master Test") {
std::terminate();
}
}
do {
new_msg.reset(mqc.RecvFromInternalBuffer());
if (new_msg == nullptr) {
break;
} else {
if ((new_msg->kMessageHeader.kType == XrdMqMessageHeader::kStatusMessage) ||
(new_msg->kMessageHeader.kType == XrdMqMessageHeader::kQueryMessage)) {
std::unique_ptr adv_msg {
XrdAdvisoryMqMessage::Create(new_msg->GetMessageBuffer())};
// adv_msg->Print();
} else {
new_msg->Print();
if (new_msg->kMessageHeader.kDescription != "Hello Master Test") {
std::terminate();
}
}
}
} while (true);
}
}
} while (true);
TIMING("SEND+RECV", &mq);
mq.Print();
}