// ----------------------------------------------------------------------
// File: XrdMqQueueInjection.cc
// Author: Andreas-Joachim Peters - CERN
// ----------------------------------------------------------------------
/************************************************************************
* EOS - the CERN Disk Storage System *
* Copyright (C) 2011 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see .*
************************************************************************/
#define TRACE_debug 0xffff
#include "mq/XrdMqClient.hh"
#include
int main(int argc, char* argv[])
{
XrdMqClient mqc;
if ((argc < 3) || (argc > 3)) {
fprintf(stderr, "usage: xrdmqinjection / \n");
exit(-1);
}
XrdOucString injectionfile = argv[2];
XrdOucString broker = argv[1];
if (!broker.beginswith("root://")) {
fprintf(stderr,
"error: has to be like root://host[:port]/\n");
exit(-1);
}
if (!mqc.AddBroker(broker.c_str())) {
fprintf(stderr, "error: failed to add broker %s\n", broker.c_str());
exit(-1);
}
XrdOucString queue = broker;
int apos = broker.find("//");
if (apos == STR_NPOS) {
fprintf(stderr,
"error: has to be like root://host[:port]/\n");
exit(-1);
}
int bpos = broker.find("/", apos + 2);
if (bpos == STR_NPOS) {
fprintf(stderr,
"error: has to be like root://host[:port]/\n");
exit(-1);
}
queue.erase(0, bpos + 1);
fprintf(stdout, "=> feeding into %s\n", queue.c_str());
mqc.SetDefaultReceiverQueue(queue.c_str());
FILE* fd = fopen(injectionfile.c_str(), "r");
if (!fd) {
fprintf(stderr, "error: unable to open injection file <%s>\n",
injectionfile.c_str());
exit(-1);
}
ssize_t linein = 0;
char* lineptr = (char*) malloc(4096);
size_t lineptr_n = 4096;
int injected = 0;
while ((linein = getline(&lineptr, &lineptr_n, fd)) > 0) {
fprintf(stdout, "< %s >\n", lineptr);
XrdMqMessage message("Injection");
message.Configure(0);
XrdOucString body = lineptr;
body.erase(body.length() - 1);
message.NewId();
message.MarkAsMonitor();
message.kMessageHeader.kDescription = "Monitor Injection";
message.SetBody(body.c_str());
if (!(mqc << message)) {
fprintf(stderr, "error: failed to send message\n");
} else {
++injected;
}
}
fprintf(stdout, "info: injected %i messages\n", injected);
}