// ---------------------------------------------------------------------- // File: XrdMqQueueFeeder.cc // Author: Andreas-Joachim Peters - CERN // ---------------------------------------------------------------------- /************************************************************************ * EOS - the CERN Disk Storage System * * Copyright (C) 2011 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #include "mq/XrdMqClient.hh" #include #include int main(int argc, char* argv[]) { uint64_t max_feeds = 0ull; uint64_t num_feeds = 0ull; uint64_t ms_sleep = 0ull; uint64_t msg_size = 10; if ((argc < 2) || (argc > 5)) { std::cerr << "Usage: " << argv[0] << " / num_feeds " << " ms_sleep_between_feeds msg_size" << std::endl; exit(-1); } if (argc >= 3) { max_feeds = std::stoll(argv[2]); } if (argc >= 4) { ms_sleep = strtoll(argv[3], 0, 10); } if (argc >= 5) { msg_size = strtoll(argv[4], 0, 10); } XrdOucString broker = argv[1]; if (!broker.beginswith("root://")) { std::cerr << "error: must have the following format " << "root://host[:port]/" << std::endl; exit(-1); } XrdMqClient mqc; if (!mqc.AddBroker(broker.c_str())) { std::cerr << "error: failed to add broker " << broker.c_str() << std::endl; exit(-1); } XrdOucString queue = broker; int apos = broker.find("//"); if (apos == STR_NPOS) { std::cerr << "error: must have the following format " << "root://host[:port]/" << std::endl; exit(-1); } int bpos = broker.find("/", apos + 2); if (bpos == STR_NPOS) { std::cerr << "error: must have the following format " << "root://host[:port]/" << std::endl; exit(-1); } queue.erase(0, bpos + 1); std::cout << "info: feeding into queue: " << queue.c_str() << std::endl; mqc.SetDefaultReceiverQueue(queue.c_str()); XrdMqMessage message("HelloDumper"); message.Configure(0); // Creates a logger object for the message std::string body; uint64_t successful_feeds = 0ull; for (uint64_t i = 0; i < msg_size; ++i) { body += "a"; } while (true) { message.NewId(); message.kMessageHeader.kDescription = "Hello Dumper "; message.kMessageHeader.kDescription += (int)num_feeds; message.SetBody(body.c_str()); ++num_feeds; if (!(mqc << message)) { std::cerr << "error: failed to send msg #" << num_feeds << std::endl; } else { std::cout << "info: feeding msg #" << num_feeds << std::endl; ++successful_feeds; } // Exit after max_feeds messages if (max_feeds && (num_feeds >= max_feeds)) { std::cout << "info: successfully sent " << successful_feeds << "/" << num_feeds << " feeds" << std::endl; exit(0); } std::this_thread::sleep_for(std::chrono::milliseconds(ms_sleep)); } }