// ----------------------------------------------------------------------
// File: XrdMqOfs.cc
// Author: Andreas-Joachim Peters - CERN
// ----------------------------------------------------------------------
/************************************************************************
* EOS - the CERN Disk Storage System *
* Copyright (C) 2011 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see <http://www.gnu.org/licenses/>.*
************************************************************************/
#include "XrdVersion.hh"
#include "XrdOss/XrdOss.hh"
#include "XrdOuc/XrdOucTrace.hh"
#include "XrdSys/XrdSysError.hh"
#include "XrdSys/XrdSysLogger.hh"
#include "XrdSys/XrdSysTimer.hh"
#include "XrdSec/XrdSecInterface.hh"
#include "XrdSfs/XrdSfsAio.hh"
#include "XrdNet/XrdNetUtils.hh"
#include "XrdNet/XrdNetAddr.hh"
#include "mq/XrdMqOfs.hh"
#include "mq/XrdMqMessage.hh"
#include "mq/XrdMqOfsTrace.hh"
#include "common/PasswordHandler.hh"
#include "common/Strerror_r_wrapper.hh"
#include "common/StringUtils.hh"
#include "namespace/ns_quarkdb/qclient/include/qclient/QClient.hh"
#include
#include
#include
#include
#include
#include
#include
#include
// Size limit for the path argument (Arg1) copied into a local buffer by FSctl
#define XRDMQOFS_FSCTLPATHLEN 1024
// Key used with the "lease-get" command when querying the master lease holder
std::string XrdMqOfs::sLeaseKey {"master_lease"};
// Error routing object of the plugin and the trace object attached to it
XrdSysError gMqOfsEroute(0);
XrdOucTrace gMqOfsTrace(&gMqOfsEroute);
// Global pointer to the plugin instance, assigned in XrdSfsGetFileSystem
XrdMqOfs* gMqFS = 0;
#ifdef COVERAGE_BUILD
// Forward declaration of gcov flush API
extern "C" void __gcov_flush();
#endif
//------------------------------------------------------------------------------
// Shutdown handler - terminates the process with exit status 0. It is
// registered for SIGINT by the XrdMqOfs constructor.
//------------------------------------------------------------------------------
void
xrdmqofs_shutdown(int sig)
{
  (void) sig; // the signal number is not needed
  exit(0);
}
//------------------------------------------------------------------------------
// Coverage report handler - flushes the gcov data when the plugin was built
// with COVERAGE_BUILD, otherwise only logs that coverage is unavailable.
//------------------------------------------------------------------------------
void
xrdmqofs_coverage(int sig)
{
#ifdef COVERAGE_BUILD
  eos_static_notice("printing coverage data");
  __gcov_flush();
#else
  eos_static_notice("compiled without coverage support");
#endif
}
//------------------------------------------------------------------------------
// File open
//------------------------------------------------------------------------------
int
XrdMqOfsFile::open(const char* queuename, XrdSfsFileOpenMode openMode,
mode_t createMode, const XrdSecEntity* client,
const char* opaque)
{
EPNAME("open");
tident = error.getErrUser();
SetLogId(nullptr, tident);
eos_info("connecting queue: %s", queuename);
MAYREDIRECT;
mQueueName = queuename;
XrdSysMutexHelper scope_lock(gMqFS->mQueueOutMutex);
// printf("%s %s %s\n",mQueueName.c_str(),gMqFS->QueuePrefix.c_str(),opaque);
// check if this queue is accepted by the broker
if (mQueueName.find(gMqFS->QueuePrefix.c_str()) != 0) {
// this queue is not supported by us
return gMqFS->Emsg(epname, error, EINVAL,
"connect queue - the broker does not serve the requested queue");
}
if (gMqFS->mQueueOut.count(mQueueName)) {
fprintf(stderr, "EBUSY: Queue %s is busy\n", mQueueName.c_str());
// this is already open by 'someone'
return gMqFS->Emsg(epname, error, EBUSY, "connect queue - already connected",
queuename);
}
mMsgOut = new XrdMqMessageOut(queuename);
// check if advisory messages are requested
XrdOucEnv queueenv((opaque) ? opaque : "");
bool advisorystatus = false;
bool advisoryquery = false;
bool advisoryflushbacklog = false;
const char* val;
if ((val = queueenv.Get(XMQCADVISORYSTATUS))) {
advisorystatus = atoi(val);
}
if ((val = queueenv.Get(XMQCADVISORYQUERY))) {
advisoryquery = atoi(val);
}
if ((val = queueenv.Get(XMQCADVISORYFLUSHBACKLOG))) {
advisoryflushbacklog = atoi(val);
}
mMsgOut->AdvisoryStatus = advisorystatus;
mMsgOut->AdvisoryQuery = advisoryquery;
mMsgOut->AdvisoryFlushBackLog = advisoryflushbacklog;
mMsgOut->BrokenByFlush = false;
gMqFS->mQueueOut.insert(std::make_pair(mQueueName, mMsgOut));
eos_info("connected queue: %s", mQueueName.c_str());
mIsOpen = true;
return SFS_OK;
}
//------------------------------------------------------------------------------
// File stat
//------------------------------------------------------------------------------
int
XrdMqOfsFile::stat(struct stat* buf)
{
EPNAME("stat");
int port = 0;
XrdOucString host = "";
if (gMqFS->ShouldRedirect(host, port)) {
// we have to close this object to make the client reopen it to be redirected
// this->close();
eos_static_info("%s", "msg=\"stat force close - should redirect\"");
return gMqFS->Emsg(epname, error, EINVAL,
"stat - forced close - you should be redirected");
}
MAYREDIRECT;
gMqFS->Statistics();
if (mMsgOut) {
mMsgOut->DeletionSem.Wait();
// this should be the case always ...
ZTRACE(stat, "Waiting for message");
gMqFS->AdvisoryMessages++;
// Submit an advisory message
XrdAdvisoryMqMessage amg("AdvisoryQuery", mQueueName.c_str(), true,
XrdMqMessageHeader::kQueryMessage);
XrdMqMessageHeader::GetTime(amg.kMessageHeader.kSenderTime_sec,
amg.kMessageHeader.kSenderTime_nsec);
XrdMqMessageHeader::GetTime(amg.kMessageHeader.kBrokerTime_sec,
amg.kMessageHeader.kBrokerTime_nsec);
amg.kMessageHeader.kSenderId = gMqFS->BrokerId;
amg.Encode();
// amg.Print();
XrdSmartOucEnv* env = new XrdSmartOucEnv(amg.GetMessageBuffer());
XrdMqOfsMatches matches(gMqFS->QueueAdvisory.c_str(), env, tident,
XrdMqMessageHeader::kQueryMessage, mQueueName.c_str());
if (!gMqFS->Deliver(matches)) {
delete env;
}
ZTRACE(stat, "Grabbing message");
memset(buf, 0, sizeof(struct stat));
buf->st_blksize = 1024;
buf->st_dev = 0;
buf->st_rdev = 0;
buf->st_nlink = 1;
buf->st_uid = 0;
buf->st_gid = 0;
buf->st_size = mMsgOut->RetrieveMessages();
buf->st_atime = 0;
buf->st_mtime = 0;
buf->st_ctime = 0;
buf->st_blocks = 1024;
buf->st_ino = 0;
buf->st_mode = S_IXUSR | S_IRUSR | S_IWUSR | S_IFREG;
mMsgOut->DeletionSem.Post();
if (buf->st_size == 0) {
gMqFS->NoMessages++;
}
return SFS_OK;
}
ZTRACE(stat, "No message queue");
return SFS_ERROR;
}
//------------------------------------------------------------------------------
// File read - hands out the content of the queue's message buffer.
//
// Copies at most 'buffer_size' bytes from the front of the message buffer into
// 'buffer' and removes them from the message buffer. The file offset is not
// used. Returns the number of bytes copied, or SFS_ERROR if there is no queue.
//------------------------------------------------------------------------------
XrdSfsXferSize
XrdMqOfsFile::read(XrdSfsFileOffset fileOffset, char* buffer,
                   XrdSfsXferSize buffer_size)
{
  EPNAME("read");
  ZTRACE(read, "read");

  if (!mMsgOut) {
    error.setErrInfo(-1, "");
    return SFS_ERROR;
  }

  auto& pending = mMsgOut->mMsgBuffer;
  const unsigned int available = pending.length();
  ZTRACE(read, "reading size:" << buffer_size);

  if ((unsigned long) buffer_size >= available) {
    // The caller's buffer takes everything that is pending
    memcpy(buffer, pending.c_str(), available);
    pending.clear();
    pending.reserve(0);
    return available;
  }

  // Partial read: hand out the first buffer_size bytes and keep the rest
  memcpy(buffer, pending.c_str(), buffer_size);
  pending.erase(0, buffer_size);
  return buffer_size;
}
//------------------------------------------------------------------------------
// File close
//------------------------------------------------------------------------------
int
XrdMqOfsFile::close()
{
if (!mIsOpen) {
return SFS_OK;
}
mIsOpen = false;
eos_info("disconnecting queue: %s", mQueueName.c_str());
{
XrdSysMutexHelper scope_lock(gMqFS->mQueueOutMutex);
if ((gMqFS->mQueueOut.count(mQueueName)) &&
(mMsgOut = gMqFS->mQueueOut[mQueueName])) {
// hmm this could create a dead lock
// mMsgOut->DeletionSem.Wait();
// Take away all pending messages
mMsgOut->RetrieveMessages();
gMqFS->mQueueOut.erase(mQueueName);
delete mMsgOut;
}
mMsgOut = nullptr;
}
{
gMqFS->AdvisoryMessages++;
// submit an advisory message
XrdAdvisoryMqMessage amg("AdvisoryStatus", mQueueName.c_str(), false,
XrdMqMessageHeader::kStatusMessage);
XrdMqMessageHeader::GetTime(amg.kMessageHeader.kSenderTime_sec,
amg.kMessageHeader.kSenderTime_nsec);
XrdMqMessageHeader::GetTime(amg.kMessageHeader.kBrokerTime_sec,
amg.kMessageHeader.kBrokerTime_nsec);
amg.kMessageHeader.kSenderId = gMqFS->BrokerId;
amg.Encode();
// amg.Print();
XrdSmartOucEnv* env = new XrdSmartOucEnv(amg.GetMessageBuffer());
XrdMqOfsMatches matches(gMqFS->QueueAdvisory.c_str(), env, tident,
XrdMqMessageHeader::kStatusMessage, mQueueName.c_str());
if (!gMqFS->Deliver(matches)) {
delete env;
}
}
eos_info("disconnected queue: %s", mQueueName.c_str());
return SFS_OK;
}
/******************************************************************************/
/* G e t F i l e S y s t e m */
/******************************************************************************/
// Set the version information
XrdVERSIONINFO(XrdSfsGetFileSystem, MqOfs);
//------------------------------------------------------------------------------
// Plugin entry point: sets up the error routing, creates the single XrdMqOfs
// instance, stores the configuration file name and configures the plugin.
// Returns the file system object, or a null pointer if Configure() fails.
//------------------------------------------------------------------------------
extern "C"
XrdSfsFileSystem* XrdSfsGetFileSystem(XrdSfsFileSystem* native_fs,
                                      XrdSysLogger* lp,
                                      const char* configfn)
{
  // Announce ourselves through the error routing object
  gMqOfsEroute.SetPrefix("MqOfs_");
  gMqOfsEroute.logger(lp);
  gMqOfsEroute.Say("++++++ (c) 2018 CERN/IT-DSS ",
                   VERSION);
  static XrdMqOfs myFS(&gMqOfsEroute);
  lp->setRotate(0); // disable XRootD log rotation
  gMqFS = &myFS;
  // Only a non-empty configuration file name is kept
  const bool have_config = (configfn != nullptr) && (*configfn != '\0');
  gMqFS->ConfigFN = have_config ? strdup(configfn) : nullptr;
  // A non-zero return value from Configure() means the setup failed
  return gMqFS->Configure(gMqOfsEroute) ? nullptr : gMqFS;
}
//------------------------------------------------------------------------------
// Constructor - initialises the counters and limits, installs the SIGINT
// shutdown handler and, when EOS_COVERAGE_REPORT is set in the environment,
// the SIGPROF coverage handler.
//
// @param ep error routing object (not used inside the constructor body)
//------------------------------------------------------------------------------
XrdMqOfs::XrdMqOfs(XrdSysError* ep):
  myPort(1097), mDeliveredMessages(0ull), mFanOutMessages(0ull),
  mMaxQueueBacklog(MQOFSMAXQUEUEBACKLOG),
  mRejectQueueBacklog(MQOFSREJECTQUEUEBACKLOG), mQdbCluster(), mQdbPassword(),
  mQdbContactDetails(), mQcl(nullptr), mMgmId()
{
  ConfigFN = 0;
  StartupTime = time(0);
  LastOutputTime = time(0);
  ReceivedMessages = 0;
  AdvisoryMessages = 0;
  UndeliverableMessages = 0;
  DiscardedMonitoringMessages = 0;
  BacklogDeferred = NoMessages = QueueBacklogHits = 0;
  MaxMessageBacklog = MQOFSMAXMESSAGEBACKLOG;
  (void) signal(SIGINT, xrdmqofs_shutdown);

  if (getenv("EOS_COVERAGE_REPORT")) {
    (void) signal(SIGPROF, xrdmqofs_coverage);
  }

  HostName = 0;
  HostPref = 0;
  // A pointer must not be passed to a %llx conversion directly; convert it to
  // an integer of the matching type so the printed value stays the same.
  eos_info("Addr:mQueueOutMutex: 0x%llx",
           static_cast<unsigned long long>(
             reinterpret_cast<uintptr_t>(&mQueueOutMutex)));
  eos_info("Addr:MessageMutex: 0x%llx",
           static_cast<unsigned long long>(
             reinterpret_cast<uintptr_t>(&mMsgsMutex)));
}
//------------------------------------------------------------------------------
// OFS plugin configure
//
// Determines the port (XRDPORT) and host name, derives the manager, MGM and
// broker identifiers and parses the "mq.*" directives of the configuration
// file: queue, maxmessagebacklog, maxqueuebacklog, rejectqueuebacklog, trace,
// statfile, qdbcluster, qdbpassword and qdbpassword_file. Finally creates the
// directory holding the statistics file.
//
// @param Eroute error routing object used for messages
//
// @return 0 on success, non-zero otherwise (this includes the return value of
//         the "mkdir -p" command used for the statistics directory)
//------------------------------------------------------------------------------
int XrdMqOfs::Configure(XrdSysError& Eroute)
{
  int rc = 0;
  char* var;
  const char* val;
  int cfgFD;
  StatisticsFile = "/var/log/eos/mq/proc/stats";
  QueuePrefix = "/xmessage/";
  QueueAdvisory = "/xmessage/*";
  // extract the manager from the config file
  XrdOucStream Config(&Eroute, getenv("XRDINSTANCE"));
  {
    // Obtain port number we will be using
    const char* port_env = getenv("XRDPORT");
    myPort = port_env ? strtol(port_env, (char**)0, 10) : 0;
    // Establish our hostname
    const char* errtext = 0;
    HostName = XrdNetUtils::MyHostName(0, &errtext);

    if (!HostName) {
      // Emsg() does not expand printf-style conversions; the error text is
      // passed as the second text argument instead
      return Eroute.Emsg("Config", errno, "cannot get hostname :", errtext);
    }

    // The host prefix is the host name up to (excluding) the first dot. It is
    // copied without modifying HostName, so a host name without any dot keeps
    // its terminating NUL.
    HostPref = strndup(HostName, strcspn(HostName, "."));
    Eroute.Say("=====> mq.hostname: ", HostName, "");
    Eroute.Say("=====> mq.hostpref: ", HostPref, "");
    ManagerId = HostName;
    ManagerId += ":";
    ManagerId += (int)myPort;
    Eroute.Say("=====> mq.managerid: ", ManagerId.c_str(), "");
    // The MGM port defaults to 1094 unless overridden from the environment
    int mgm_port = 1094;
    char* ptr = getenv("EOS_MGM_DAEMON_PORT_FOR_MQ");

    if (ptr) {
      (void) eos::common::StringToNumeric(std::string(ptr), mgm_port);
    }

    mMgmId = SSTR(HostName << ':' << mgm_port).c_str();
  }
  gMqOfsTrace.What = TRACE_getstats | TRACE_close | TRACE_open;

  // Without a configuration file the defaults set above stay in effect
  if (ConfigFN && *ConfigFN) {
    // Try to open the configuration file.
    if ((cfgFD = open(ConfigFN, O_RDONLY, 0)) < 0) {
      return Eroute.Emsg("Config", errno, "open config file fn=", ConfigFN);
    }

    Config.Attach(cfgFD);

    // Now start reading records until eof.
    while ((var = Config.GetMyFirstWord())) {
      // Only directives carrying the "mq." prefix are of interest
      if (strncmp(var, "mq.", 3)) {
        continue;
      }

      var += 3;

      if (!strcmp("queue", var)) {
        if ((val = Config.GetWord())) {
          QueuePrefix = val;
          QueueAdvisory = QueuePrefix;
          QueueAdvisory += "*";
        }
      }

      if (!strcmp("maxmessagebacklog", var)) {
        if ((val = Config.GetWord())) {
          (void) sscanf(val, "%lld", &MaxMessageBacklog);
        }
      }

      if (!strcmp("maxqueuebacklog", var)) {
        if ((val = Config.GetWord())) {
          uint64_t tmp_val {0};
          (void) sscanf(val, "%lu", &tmp_val);
          mMaxQueueBacklog = tmp_val;
        }
      }

      if (!strcmp("rejectqueuebacklog", var)) {
        if ((val = Config.GetWord())) {
          uint64_t tmp_val {0};
          (void) sscanf(val, "%lu", &tmp_val);
          mRejectQueueBacklog = tmp_val;
        }
      }

      if (!strcmp("trace", var)) {
        if ((val = Config.GetWord())) {
          auto& g_logging = eos::common::Logging::GetInstance();
          g_logging.SetLogPriority(LOG_INFO);
          g_logging.SetUnit(SSTR("mq@" << ManagerId).c_str());
          XrdOucString tracelevel = val;

          if (tracelevel == "low") {
            gMqOfsTrace.What = TRACE_close | TRACE_open;
            g_logging.SetLogPriority(LOG_INFO);
          }

          if (tracelevel == "medium") {
            gMqOfsTrace.What = TRACE_getstats | TRACE_open | TRACE_close;
            g_logging.SetLogPriority(LOG_NOTICE);
          }

          if (tracelevel == "high") {
            gMqOfsTrace.What = TRACE_ALL;
            g_logging.SetLogPriority(LOG_DEBUG);
          }
        }
      }

      if (!strcmp("statfile", var)) {
        if ((val = Config.GetWord())) {
          StatisticsFile = val;
        }
      }

      if (!strcmp("qdbcluster", var)) {
        // All remaining words of the line are cluster members
        while ((val = Config.GetWord())) {
          mQdbCluster += val;
          mQdbCluster += " ";
        }

        Eroute.Say("=====> mq.qdbcluster : ", mQdbCluster.c_str());
        mQdbContactDetails.members.parse(mQdbCluster);
      }

      if (!strcmp("qdbpassword", var)) {
        while ((val = Config.GetWord())) {
          mQdbPassword += val;
        }

        // Trim whitespace at the end
        mQdbPassword.erase(mQdbPassword.find_last_not_of(" \t\n\r\f\v") + 1);
        // Only the length of the password is printed, never the password
        std::string pwlen = std::to_string(mQdbPassword.size());
        Eroute.Say("=====> mq.qdbpassword length : ", pwlen.c_str());
        mQdbContactDetails.password = mQdbPassword;
      }

      if (!strcmp("qdbpassword_file", var)) {
        std::string path;

        while ((val = Config.GetWord())) {
          path += val;
        }

        if (!eos::common::PasswordHandler::readPasswordFile(path, mQdbPassword)) {
          Eroute.Emsg("Config", "failed to open path pointed by qdbpassword_file");
          rc = 1;
        }

        std::string pwlen = std::to_string(mQdbPassword.size());
        Eroute.Say("=====> mq.qdbpassword length : ", pwlen.c_str());
        mQdbContactDetails.password = mQdbPassword;
      }
    }

    Config.Close();
  }

  if (rc) {
    eos_err("msg=\"failed while parsing the configuration file\"");
    return rc;
  }

  if (!mQdbContactDetails.members.empty() &&
      mQdbContactDetails.password.empty()) {
    // The directive names quoted here are the ones accepted by the parser above
    Eroute.Say("=====> Configuration error: Found QDB cluster members, but no password."
               " EOS will only connect to password-protected QDB instances. (mq.qdbpassword / mq.qdbpassword_file missing)");
    return 1;
  }

  // Create a qclient object if cluster information provided
  if (!mQdbCluster.empty()) {
    mQcl = std::make_unique<qclient::QClient>(mQdbContactDetails.members,
           mQdbContactDetails.constructOptions());
  }

  // Make sure the directory of the statistics file exists
  XrdOucString basestats = StatisticsFile;
  basestats.erase(basestats.rfind("/"));
  XrdOucString mkdirbasestats = "mkdir -p ";
  mkdirbasestats += basestats;
  mkdirbasestats += " 2>/dev/null";
  rc = system(mkdirbasestats.c_str());

  if (rc) {
    fprintf(stderr, "error {%s/%s/%d}: system command failed;retc=%d", __FUNCTION__,
            __FILE__, __LINE__, WEXITSTATUS(rc));
  }

  BrokerId = "root://";
  BrokerId += ManagerId;
  BrokerId += "/";
  BrokerId += QueuePrefix;
  Eroute.Say("=====> mq.queue: ", QueuePrefix.c_str());
  Eroute.Say("=====> mq.brokerid: ", BrokerId.c_str());
  return rc;
}
//------------------------------------------------------------------------------
// File system stat
//------------------------------------------------------------------------------
int
XrdMqOfs::stat(const char* queuename, struct stat* buf, XrdOucErrInfo& error,
const XrdSecEntity* client, const char* opaque)
{
EPNAME("stat");
const char* tident = error.getErrUser();
if (!strcmp(queuename, "/eos/")) {
// this is just a ping test if we are alive
memset(buf, 0, sizeof(struct stat));
buf->st_blksize = 1024;
buf->st_dev = 0;
buf->st_rdev = 0;
buf->st_nlink = 1;
buf->st_uid = 0;
buf->st_gid = 0;
buf->st_size = 0;
buf->st_atime = 0;
buf->st_mtime = 0;
buf->st_ctime = 0;
buf->st_blocks = 1024;
buf->st_ino = 0;
buf->st_mode = S_IXUSR | S_IRUSR | S_IWUSR | S_IFREG;
return SFS_OK;
}
MAYREDIRECT;
XrdMqMessageOut* msg_out = nullptr;
Statistics();
ZTRACE(stat, "stat by buf: " << queuename);
std::string squeue = queuename;
{
XrdSysMutexHelper scope_lock(mQueueOutMutex);
if ((!gMqFS->mQueueOut.count(squeue)) ||
(!(msg_out = gMqFS->mQueueOut[squeue]))) {
return gMqFS->Emsg(epname, error, EINVAL, "check queue - no such queue");
}
msg_out->DeletionSem.Wait();
}
{
gMqFS->AdvisoryMessages++;
// submit an advisory message
XrdAdvisoryMqMessage amg("AdvisoryQuery", queuename, true,
XrdMqMessageHeader::kQueryMessage);
XrdMqMessageHeader::GetTime(amg.kMessageHeader.kSenderTime_sec,
amg.kMessageHeader.kSenderTime_nsec);
XrdMqMessageHeader::GetTime(amg.kMessageHeader.kBrokerTime_sec,
amg.kMessageHeader.kBrokerTime_nsec);
amg.kMessageHeader.kSenderId = gMqFS->BrokerId;
amg.Encode();
// amg.Print();
XrdSmartOucEnv* env = new XrdSmartOucEnv(amg.GetMessageBuffer());
XrdMqOfsMatches matches(gMqFS->QueueAdvisory.c_str(), env, tident,
XrdMqMessageHeader::kQueryMessage, queuename);
if (!gMqFS->Deliver(matches)) {
delete env;
}
}
// this should be the case always ...
ZTRACE(stat, "Waiting for message");
ZTRACE(stat, "Grabbing message");
memset(buf, 0, sizeof(struct stat));
buf->st_blksize = 1024;
buf->st_dev = 0;
buf->st_rdev = 0;
buf->st_nlink = 1;
buf->st_uid = 0;
buf->st_gid = 0;
buf->st_size = msg_out->RetrieveMessages();
buf->st_atime = 0;
buf->st_mtime = 0;
buf->st_ctime = 0;
buf->st_blocks = 1024;
buf->st_ino = 0;
buf->st_mode = S_IXUSR | S_IRUSR | S_IWUSR | S_IFREG;
msg_out->DeletionSem.Post();
if (buf->st_size == 0) {
gMqFS->NoMessages++;
}
return SFS_OK;
}
//------------------------------------------------------------------------------
// Stat by mode - not supported: emits a trace message and always returns
// SFS_ERROR without touching 'mode'.
//------------------------------------------------------------------------------
int
XrdMqOfs::stat(const char* Name,
               mode_t& mode,
               XrdOucErrInfo& error,
               const XrdSecEntity* client,
               const char* opaque)
{
  EPNAME("stat");
  // The trace macro picks up the client identity through 'tident'
  const char* tident = error.getErrUser();
  ZTRACE(stat, "stat by mode");
  return SFS_ERROR;
}
//------------------------------------------------------------------------------
// Statistics
//------------------------------------------------------------------------------
void
XrdMqOfs::Statistics()
{
EPNAME("Statistics");
StatLock.Lock();
static bool startup = true;
static struct timeval tstart;
static struct timeval tstop;
static struct timezone tz;
static uint64_t LastDeliveredMessages;
static uint64_t LastFanOutMessages;
static long long LastReceivedMessages,
LastAdvisoryMessages, LastUndeliverableMessages,
LastNoMessages, LastDiscardedMonitoringMessages;
if (startup) {
tstart.tv_sec = 0;
tstart.tv_usec = 0;
LastDeliveredMessages = LastFanOutMessages = 0ull;
LastReceivedMessages = LastAdvisoryMessages =
LastUndeliverableMessages = LastNoMessages =
LastDiscardedMonitoringMessages = 0;
startup = false;
}
gettimeofday(&tstop, &tz);
if (!tstart.tv_sec) {
gettimeofday(&tstart, &tz);
StatLock.UnLock();
return;
}
const char* tident = "";
time_t now = time(0);
float tdiff = ((tstop.tv_sec - tstart.tv_sec) * 1000) +
(tstop.tv_usec - tstart.tv_usec) / 1000;
if (tdiff > (10 * 1000)) {
// every minute
XrdOucString tmpfile = StatisticsFile;
tmpfile += ".tmp";
int fd = open(tmpfile.c_str(), O_CREAT | O_RDWR | O_TRUNC,
S_IROTH | S_IRGRP | S_IRUSR);
if (fd >= 0) {
char line[4096];
int rc;
sprintf(line, "mq.received %lld\n", ReceivedMessages);
rc = write(fd, line, strlen(line));
sprintf(line, "mq.delivered %lu\n", mDeliveredMessages.load());
rc = write(fd, line, strlen(line));
sprintf(line, "mq.fanout %lu\n", mFanOutMessages.load());
rc = write(fd, line, strlen(line));
sprintf(line, "mq.advisory %lld\n", AdvisoryMessages);
rc = write(fd, line, strlen(line));
sprintf(line, "mq.undeliverable %lld\n", UndeliverableMessages);
rc = write(fd, line, strlen(line));
sprintf(line, "mq.droppedmonitoring %lld\n", DiscardedMonitoringMessages);
rc = write(fd, line, strlen(line));
sprintf(line, "mq.total %lld\n", NoMessages);
rc = write(fd, line, strlen(line));
sprintf(line, "mq.queued %d\n", (int)Messages.size());
rc = write(fd, line, strlen(line));
sprintf(line, "mq.nqueues %d\n", (int)mQueueOut.size());
rc = write(fd, line, strlen(line));
sprintf(line, "mq.backloghits %lld\n", QueueBacklogHits);
rc = write(fd, line, strlen(line));
sprintf(line, "mq.in_rate %f\n",
(1000.0 * (ReceivedMessages - LastReceivedMessages) / (tdiff)));
rc = write(fd, line, strlen(line));
sprintf(line, "mq.out_rate %f\n",
(1000.0 * (mDeliveredMessages - LastDeliveredMessages) / (tdiff)));
rc = write(fd, line, strlen(line));
sprintf(line, "mq.fan_rate %f\n",
(1000.0 * (mFanOutMessages - LastFanOutMessages) / (tdiff)));
rc = write(fd, line, strlen(line));
sprintf(line, "mq.advisory_rate %f\n",
(1000.0 * (AdvisoryMessages - LastAdvisoryMessages) / (tdiff)));
rc = write(fd, line, strlen(line));
sprintf(line, "mq.undeliverable_rate %f\n",
(1000.0 * (UndeliverableMessages - LastUndeliverableMessages) / (tdiff)));
rc = write(fd, line, strlen(line));
sprintf(line, "mq.droppedmonitoring_rate %f\n",
(1000.0 * (DiscardedMonitoringMessages - LastDiscardedMonitoringMessages) /
(tdiff)));
rc = write(fd, line, strlen(line));
sprintf(line, "mq.total_rate %f\n",
(1000.0 * (NoMessages - LastNoMessages) / (tdiff)));
rc = write(fd, line, strlen(line));
close(fd);
rc = ::rename(tmpfile.c_str(), StatisticsFile.c_str());
if (rc) {
fprintf(stderr, "error {%s/%s/%d}: system command failed;retc=%d", __FUNCTION__,
__FILE__, __LINE__, WEXITSTATUS(rc));
}
}
gettimeofday(&tstart, &tz);
ZTRACE(getstats, "*****************************************************");
ZTRACE(getstats, "Received Messages : " << ReceivedMessages);
ZTRACE(getstats, "Delivered Messages : " << mDeliveredMessages);
ZTRACE(getstats, "FanOut Messages : " << mFanOutMessages);
ZTRACE(getstats, "Advisory Messages : " << AdvisoryMessages);
ZTRACE(getstats, "Undeliverable Messages : " << UndeliverableMessages);
ZTRACE(getstats, "Discarded Monitoring Messages : " <<
DiscardedMonitoringMessages);
ZTRACE(getstats, "No Messages : " << NoMessages);
ZTRACE(getstats, "Queue Messages : " << Messages.size());
ZTRACE(getstats, "#Queues : " << mQueueOut.size());
ZTRACE(getstats, "Deferred Messages (backlog) : " << BacklogDeferred);
ZTRACE(getstats, "Backlog Messages Hits : " << QueueBacklogHits);
char rates[4096];
sprintf(rates,
"Rates: IN: %.02f OUT: %.02f FAN: %.02f ADV: %.02f: UNDEV: %.02f DISCMON: %.02f NOMSG: %.02f"
, (1000.0 * (ReceivedMessages - LastReceivedMessages) / (tdiff))
, (1000.0 * (mDeliveredMessages - LastDeliveredMessages) / (tdiff))
, (1000.0 * (mFanOutMessages - LastFanOutMessages) / (tdiff))
, (1000.0 * (AdvisoryMessages - LastAdvisoryMessages) / (tdiff))
, (1000.0 * (UndeliverableMessages - LastUndeliverableMessages) / (tdiff))
, (1000.0 * (DiscardedMonitoringMessages - LastDiscardedMonitoringMessages) /
(tdiff))
, (1000.0 * (NoMessages - LastNoMessages) / (tdiff)));
ZTRACE(getstats, rates);
ZTRACE(getstats, "*****************************************************");
LastOutputTime = now;
LastReceivedMessages = ReceivedMessages;
LastDeliveredMessages = mDeliveredMessages;
LastFanOutMessages = mFanOutMessages;
LastAdvisoryMessages = AdvisoryMessages;
LastUndeliverableMessages = UndeliverableMessages;
LastNoMessages = NoMessages;
LastDiscardedMonitoringMessages = DiscardedMonitoringMessages;
}
StatLock.UnLock();
}
//------------------------------------------------------------------------------
// Get the identity of the current lease holder
//
// Sends "lease-get <sLeaseKey>" through the qclient object and extracts the
// text following the "HOLDER: " tag from the first element of the reply, up
// to (and excluding) the next newline or the end of the element.
//
// @return holder identity, or an empty string if there is no qclient object,
//         no usable reply or no "HOLDER: " tag in the reply
//------------------------------------------------------------------------------
std::string
XrdMqOfs::GetLeaseHolder()
{
  std::string holder;

  if (!mQcl) {
    return holder;
  }

  auto reply_future = mQcl->exec("lease-get", sLeaseKey);
  qclient::redisReplyPtr reply = reply_future.get();

  if ((reply == nullptr) || (reply->type == REDIS_REPLY_NIL)) {
    eos_debug("%s", "msg=\"lease-get is NULL\"");
    return holder;
  }

  // Only replies carrying at least one string element can be parsed
  if ((reply->elements == 0) || (reply->element == nullptr) ||
      (reply->element[0] == nullptr) || (reply->element[0]->str == nullptr)) {
    eos_debug("%s", "msg=\"lease-get reply has no elements\"");
    return holder;
  }

  const std::string reply_msg(reply->element[0]->str, reply->element[0]->len);
  eos_debug("lease-get reply: %s", reply_msg.c_str());
  const std::string tag {"HOLDER: "};
  size_t pos = reply_msg.find(tag);

  if (pos == std::string::npos) {
    return holder;
  }

  pos += tag.length();
  const size_t pos_end = reply_msg.find('\n', pos);

  if (pos_end == std::string::npos) {
    holder = reply_msg.substr(pos);
  } else {
    // The newline itself is not part of the holder identity
    holder = reply_msg.substr(pos, pos_end - pos);
  }

  return holder;
}
//------------------------------------------------------------------------------
// Decide if client should be redirected to a different host based on the
// current master-slave status. The QuarkDB based check is used whenever a
// qclient object exists, the in-memory one otherwise.
//------------------------------------------------------------------------------
bool XrdMqOfs::ShouldRedirect(XrdOucString& host, int& port)
{
  return mQcl ? ShouldRedirectQdb(host, port)
         : ShouldRedirectInMem(host, port);
}
//------------------------------------------------------------------------------
// Decide if client should be redirected to a different host based on the
// current master-slave status. Used for QuarkDB namespace.
//
// The lease holder is cached and refreshed at most every 5 seconds. All the
// cached state is only accessed with the mutex held.
//
// @param host set to the host part of the master identity when redirecting
// @param port set to the MQ port (myPort) when redirecting
//
// @return true if the client should be redirected, false if this instance is
//         the master, there is no master, or the master identity is malformed
//------------------------------------------------------------------------------
bool XrdMqOfs::ShouldRedirectQdb(XrdOucString& host, int& port)
{
  static time_t s_last_check = 0;
  static time_t s_last_log = 0;
  static std::string s_master_id;
  static std::mutex s_mutex;
  const time_t now = time(nullptr);
  std::lock_guard<std::mutex> lock(s_mutex);

  // The master lease is taken for 10 seconds so we can check every 5 seconds
  if (now - s_last_check > 5) {
    s_last_check = now;
    s_master_id = GetLeaseHolder();
  }

  const std::string& master_id = s_master_id;
  eos_static_debug("master_id=\"%s\", current_mq_id=\"%s\"", master_id.c_str(),
                   mMgmId.c_str());

  // If we are the current master or there is no master then don't redirect
  if ((master_id == mMgmId) || master_id.empty()) {
    return false;
  }

  // The identity is expected to have the form <host>:<port>
  const size_t pos = master_id.find(':');

  if ((pos == std::string::npos) || (pos == 0)) {
    eos_static_notice("msg=\"unset or unexpected master identity format\" "
                      "master_id=\"%s\"", master_id.c_str());
    return false;
  }

  host = master_id.substr(0, pos).c_str();
  port = myPort; // 1097

  // Report redirections at most once every 10 seconds
  if (now - s_last_log > 10) {
    s_last_log = now;
    eos_static_info("msg=\"redirect to new master mq\" id=%s:%i", host.c_str(),
                    port);
  }

  return true;
}
//------------------------------------------------------------------------------
// Decide if client should be redirected to a different host based on the
// current master-slave status. Used for in-memory namespace.
//
// The decision is re-evaluated when more than 10 seconds have passed since
// the last evaluation, otherwise the cached result is used. This instance
// acts as slave when /var/eos/eos.mq.master does not exist while
// /var/eos/eos.mq.remote.up does.
//
// @param host set to the remote MQ host when redirecting, else "localhost"
// @param port always set to myPort
//
// @return true if the client should be redirected, otherwise false
//------------------------------------------------------------------------------
bool XrdMqOfs::ShouldRedirectInMem(XrdOucString& host, int& port)
{
  EPNAME("ShouldRedirect");
  const char* tident = "internal";
  static time_t lastaliascheck = 0;
  static bool isSlave = false;
  static XrdOucString remoteMq = "localhost";
  static XrdSysMutex sMutex;
  XrdSysMutexHelper sLock(sMutex);
  const time_t now = time(NULL);
  const bool refresh = ((now - lastaliascheck) > 10);

  if (refresh) {
    XrdOucString myName = HostName;
    XrdOucString master1Name;
    XrdOucString master2Name;
    const char* master1_env = getenv("EOS_MGM_MASTER1");
    const char* master2_env = getenv("EOS_MGM_MASTER2");

    // An unset variable must not be handed to a %s conversion as NULL
    if (!ResolveName(master1_env, master1Name)) {
      fprintf(stderr, "error: unable to resolve %s\n",
              master1_env ? master1_env : "(null)");
    }

    if (!ResolveName(master2_env, master2Name)) {
      fprintf(stderr, "error: unable to resolve %s\n",
              master2_env ? master2_env : "(null)");
    }

    // The remote MQ is the master host which is not ourselves
    remoteMq = "localhost";

    if (myName == master1Name) {
      remoteMq = master2Name;
    }

    if (myName == master2Name) {
      remoteMq = master1Name;
    }

    // check if we should be master or slave MQ
    XrdOucString remotemqfile = "/var/eos/eos.mq.remote.up";
    XrdOucString localmqfile = "/var/eos/eos.mq.master";
    struct stat buf;
    // Without the local master tag we are slave, unless the remote MQ is
    // down - in that case the clients are kept here
    isSlave = (::stat(localmqfile.c_str(), &buf) != 0) &&
              (::stat(remotemqfile.c_str(), &buf) == 0);
    lastaliascheck = now;
  }

  port = myPort;

  if (isSlave) {
    host = remoteMq;
    ZTRACE(redirect, (refresh ? "Redirect (resolv)" : "Redirect (cached) ")
           << host.c_str() << ":" << port);
    return true;
  }

  host = "localhost";
  ZTRACE(redirect, (refresh ? "Stay (resolve)" : "Stay (cached) ")
         << host.c_str() << ":" << port);
  return false;
}
//------------------------------------------------------------------------------
// Resolve a host name to the name obtained from a reverse lookup of its first
// IPv4 address.
//
// @param inhost  host name to resolve, may be null
// @param outhost left untouched if inhost is null; otherwise set to
//                "localhost" and, if the name resolves to an IPv4 address,
//                to the dotted address or - when the reverse lookup works -
//                to the host name of that address
//
// @return true if inhost could be resolved to an IPv4 address, else false
//------------------------------------------------------------------------------
bool XrdMqOfs::ResolveName(const char* inhost, XrdOucString& outhost)
{
  if (!inhost) {
    return false;
  }

  outhost = "localhost";
  // getaddrinfo/getnameinfo do not share static result buffers, unlike
  // gethostbyname/gethostbyaddr
  struct addrinfo hints;
  memset(&hints, 0, sizeof(hints));
  hints.ai_family = AF_INET;
  hints.ai_socktype = SOCK_STREAM;
  struct addrinfo* result = nullptr;

  if ((getaddrinfo(inhost, nullptr, &hints, &result) != 0) || !result) {
    return false;
  }

  bool resolved = false;

  // Only the first address is considered
  if ((result->ai_family == AF_INET) && result->ai_addr) {
    const struct sockaddr_in* sin =
      reinterpret_cast<const struct sockaddr_in*>(result->ai_addr);
    char dotted[INET_ADDRSTRLEN];

    if (inet_ntop(AF_INET, &sin->sin_addr, dotted, sizeof(dotted))) {
      outhost = dotted;
    }

    // Prefer the name from the reverse lookup over the numeric address
    char name[1025];

    if (getnameinfo(result->ai_addr, result->ai_addrlen, name, sizeof(name),
                    nullptr, 0, NI_NAMEREQD) == 0) {
      outhost = name;
    }

    resolved = true;
  }

  freeaddrinfo(result);
  return resolved;
}
//------------------------------------------------------------------------------
// Error message formatting
//
// Builds the text "Unable to <op> <target>; <reason>", sends it to the error
// routing object and stores it together with the (positive) error code in the
// error object. Always returns SFS_ERROR.
//------------------------------------------------------------------------------
int XrdMqOfs::Emsg(const char* pfx, // Message prefix value
                   XrdOucErrInfo& einfo, // Place to put text & error code
                   int ecode, // The error code
                   const char* op, // Operation being performed
                   const char* target) // The target (e.g., fname)
{
  // Negative error codes are mapped to their positive counterpart
  if (ecode < 0) {
    ecode = -ecode;
  }

  // Textual reason for the error code, with a fallback for unknown codes
  char reason[128];

  if (eos::common::strerror_r(ecode, reason, sizeof(reason))) {
    snprintf(reason, sizeof(reason), "reason unknown (%d)", ecode);
  }

  char message[4096];
  snprintf(message, sizeof(message), "Unable to %s %s; %s", op, target, reason);
  // Report the message and hand it back to the caller via the error object
  gMqOfsEroute.Emsg(pfx, message);
  einfo.setErrInfo(ecode, message);
  return SFS_ERROR;
}
//------------------------------------------------------------------------------
// Stall method - stores "<msg>; come back in <stime> seconds!" in the error
// object and returns the stall time in seconds.
//------------------------------------------------------------------------------
int XrdMqOfs::Stall(XrdOucErrInfo& error, // Error text & code
                    int stime, // Seconds to stall
                    const char* msg) // Message to give
{
  EPNAME("Stall");
  const char* tident = error.getErrUser();
  // Compose the text sent back to the client
  XrdOucString reply = msg;
  reply += "; come back in ";
  reply += stime;
  reply += " seconds!";
  ZTRACE(delay, "Stall " << stime << ": " << reply.c_str());
  // Place the error message in the error object and return
  error.setErrInfo(0, reply.c_str());
  return stime;
}
//------------------------------------------------------------------------------
// Build redirect response - stores the target host as error text and the
// target port as error code in the error object and returns SFS_REDIRECT.
//------------------------------------------------------------------------------
int XrdMqOfs::Redirect(XrdOucErrInfo& error, XrdOucString& host, int& port)
{
  EPNAME("Redirect");
  const char* tident = error.getErrUser();
  const char* target_host = host.c_str();
  ZTRACE(delay, "Redirect " << target_host << ":" << port);
  // The redirection target travels in the error object
  error.setErrInfo(port, target_host);
  return SFS_REDIRECT;
}
//------------------------------------------------------------------------------
// Get XRootD version
//------------------------------------------------------------------------------
const char* XrdMqOfs::getVersion()
{
  // Report the version string provided by the XrdVERSION definition
  const char* version = XrdVERSION;
  return version;
}
//------------------------------------------------------------------------------
// FSctl plugin function
//------------------------------------------------------------------------------
int
XrdMqOfs::FSctl(const int cmd, XrdSfsFSctl& args, XrdOucErrInfo& error,
const XrdSecEntity* client)
{
char ipath[XRDMQOFS_FSCTLPATHLEN + 1];
static const char* epname = "FSctl";
const char* tident = client->tident;
SetLogId(nullptr, tident);
eos_static_debug("arg1=\"%s\" arg2=\"%s\"", args.Arg1, args.Arg2);
MAYREDIRECT;
if (cmd != SFS_FSCTL_PLUGIN) {
gMqFS->Emsg(epname, error, EINVAL, "to call FSctl - not supported", "");
return SFS_ERROR;
}
// check for backlog
if ((long long)Messages.size() > MaxMessageBacklog) {
// this is not absolutely threadsafe .... better would lock
BacklogDeferred++;
eos_static_err("%s", "msg=\"too many pending messages, reject message\"");
gMqFS->Emsg(epname, error, ENOMEM, "accept message - too many pending messages",
"");
return SFS_ERROR;
}
if (args.Arg1Len) {
if (args.Arg1Len < XRDMQOFS_FSCTLPATHLEN) {
strncpy(ipath, args.Arg1, args.Arg1Len);
ipath[args.Arg1Len] = 0;
} else {
gMqFS->Emsg(epname, error, EINVAL, "convert path argument - string too long",
"");
return SFS_ERROR;
}
} else {
ipath[0] = 0;
}
// from here on we can deal with XrdOucString which is more 'comfortable'
XrdOucString path = ipath;
XrdOucString result = "";
XrdOucString opaque = "";
if (args.Arg2Len) {
opaque.assign(args.Arg2, 0, args.Arg2Len);
}
XrdSmartOucEnv* env = new XrdSmartOucEnv(opaque.c_str());
if (!env) {
gMqFS->Emsg(epname, error, ENOMEM, "allocate memory", "");
return SFS_ERROR;
}
// look into the header
XrdMqMessageHeader mh;
if (!mh.Decode(opaque.c_str())) {
gMqFS->Emsg(epname, error, EINVAL, "decode message header", "");
delete env;
return SFS_ERROR;
}
// add the broker ID
mh.kBrokerId = BrokerId;
// update broker time
mh.GetTime(mh.kBrokerTime_sec, mh.kBrokerTime_nsec);
// dump it
// mh.Print();
// encode the new values
mh.Encode();
// replace the old header with the new one .... that's ugly :-(
int envlen;
XrdOucString envstring = env->Env(envlen);
int p1 = envstring.find(XMQHEADER);
int p2 = envstring.find("&", p1 + 1);
envstring.erase(p1, p2 - 1);
envstring.insert(mh.GetHeaderBuffer(), p1);
delete env;
env = new XrdSmartOucEnv(envstring.c_str());
XrdMqOfsMatches matches(mh.kReceiverQueue.c_str(), env, tident, mh.kType,
mh.kSenderId.c_str());
Deliver(matches);
if (matches.backlogrejected) {
XrdOucString backlogmessage =
"queue message on all receivers - maximum backlog exceeded on queues: ";
backlogmessage += matches.backlogqueues;
gMqFS->Emsg(epname, error, E2BIG, backlogmessage.c_str(), ipath);
if (backlogmessage.length() > 255) {
backlogmessage.erase(255);
backlogmessage += "...";
}
TRACES(backlogmessage.c_str());
if (!matches.message->Refs()) {
delete env;
}
return SFS_ERROR;
}
if (matches.backlog) {
XrdOucString backlogmessage =
"guarantee quick delivery - backlog exceeded on queues: ";
backlogmessage += matches.backlogqueues;
if (backlogmessage.length() > 255) {
backlogmessage.erase(255);
backlogmessage += "...";
}
gMqFS->Emsg(epname, error, ENFILE, backlogmessage.c_str(), ipath);
TRACES(backlogmessage.c_str());
return SFS_ERROR;
}
if (matches.matches) {
const char* result = "OK";
error.setErrInfo(3, (char*)result);
if (((matches.messagetype) != XrdMqMessageHeader::kStatusMessage) &&
((matches.messagetype) != XrdMqMessageHeader::kQueryMessage)) {
gMqFS->ReceivedMessages++;
}
return SFS_DATA;
} else {
bool ismonitor = false;
if (env->Get(XMQMONITOR)) {
ismonitor = true;
}
int envlen;
std::string c = env->Env(envlen);
delete env;
// This is a new hook for special monitoring message, to just accept them
// and if nobody listens they just go to nirvana.
if (!ismonitor) {
gMqFS->UndeliverableMessages++;
gMqFS->Emsg(epname, error, EINVAL,
"submit message - no listener on requested queue: ", ipath);
TRACES("no listener on requested queue: ");
TRACES(ipath);
return SFS_ERROR;
} else {
// fprintf(stderr,"Dropped Monitor message %s\n",c.c_str());
ZTRACE(fsctl, "Discarding monitor message without receiver");
const char* result = "OK";
error.setErrInfo(3, (char*)result);
gMqFS->DiscardedMonitoringMessages++;
return SFS_DATA;
}
}
}
//------------------------------------------------------------------------------
// Helper Classes & Functions
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
//! Add a message to every output queue it is addressed to.
//!
//! Status and query messages go to all queues (except the sender's own) that
//! have the corresponding advisory flag set. Other messages go either to all
//! queues matching a wildcard receiver name or to the single queue with the
//! exact receiver name.
//!
//! For each selected queue the backlog limits are checked; the outcome is
//! reported through Matches.backlog, Matches.backlogrejected and
//! Matches.backlogqueues. Every queue that accepts the message adds one
//! reference to it and increments Matches.matches.
//!
//! @param Matches description of the message and collector for the result
//!
//! @return true if the message was added to at least one queue
//------------------------------------------------------------------------------
bool
XrdMqOfs::Deliver(XrdMqOfsMatches& Matches)
{
  // epname and tident are picked up by the tracing macros used below
  EPNAME("Deliver");
  XrdSysMutexHelper scope_lock(mQueueOutMutex);
  const char* tident = Matches.mTident;
  const std::string sendername = Matches.sendername.c_str();
  // Store all the queues where we need to deliver this message
  std::vector<XrdMqMessageOut*> matched_out_queues;
  Matches.message->procmutex.Lock();
  const bool is_status =
    (Matches.messagetype == XrdMqMessageHeader::kStatusMessage);
  const bool is_query =
    (Matches.messagetype == XrdMqMessageHeader::kQueryMessage);

  if (is_status || is_query) {
    // If we have a status or query message we have to do a complete loop
    for (const auto& entry : mQueueOut) {
      XrdMqMessageOut* msg_out = entry.second;

      // If this would be a loop back message we skip the queue to avoid
      // feedback to the same queue
      if (sendername == entry.first) {
        continue;
      }

      // If this queue does not take advisory status messages we continue
      if (is_status && !msg_out->AdvisoryStatus) {
        continue;
      }

      // If this queue does not take advisory query messages we continue
      if (is_query && !msg_out->AdvisoryQuery) {
        continue;
      }

      matched_out_queues.push_back(msg_out);
    }
  } else if (Matches.queuename.find("*") != STR_NPOS) {
    // If we have a wildcard match we have to do a complete loop. A queue
    // matches if the number of matched characters equals the length of the
    // receiver name without wildcards - this length does not depend on the
    // queue and is therefore computed only once.
    XrdOucString nowildcard = Matches.queuename;
    nowildcard.replace("*", "");
    const int nowildcard_len = nowildcard.length();

    for (const auto& entry : mQueueOut) {
      XrdMqMessageOut* msg_out = entry.second;

      // If this would be a loop back message we skip the queue to avoid
      // feedback to the same queue
      if (sendername == entry.first) {
        continue;
      }

      XrdOucString Key = entry.first.c_str();
      const int nmatch = Key.matches(Matches.queuename.c_str(), '*');

      if (nmatch == nowildcard_len) {
        // this is a match
        ZTRACE(fsctl, "Adding Wildcard matched Message to Queuename: "
               << msg_out->QueueName.c_str());
        matched_out_queues.push_back(msg_out);
      }
    }
  } else {
    // We have just to find one named queue
    const std::string queuename = Matches.queuename.c_str();
    auto it = mQueueOut.find(queuename);

    if ((it != mQueueOut.end()) && it->second) {
      XrdMqMessageOut* msg_out = it->second;
      ZTRACE(fsctl, "Adding full matched Message to Queuename: " <<
             msg_out->QueueName.c_str());
      matched_out_queues.push_back(msg_out);
    }
  }

  // This is a match
  if (matched_out_queues.size()) {
    Matches.backlog = false;
    Matches.backlogrejected = false;

    // Lock all matched queues at once
    for (auto msg_out : matched_out_queues) {
      msg_out->Lock();
    }

    for (auto msg_out : matched_out_queues) {
      // check for backlog on this queue and set a warning flag
      if (msg_out->mMsgQueue.size() > mMaxQueueBacklog) {
        // Only set the backlog flag if the queue has not set the advisory
        // flush back log flag
        if (!msg_out->AdvisoryFlushBackLog) {
          Matches.backlog = true;
        } else {
          if (!msg_out->BrokenByFlush) {
            msg_out->BrokenByFlush = true;
            TRACES("warning: queue " << msg_out->QueueName
                   << " is broken by backlog flush of "
                   << mMaxQueueBacklog << " message!");
          }
        }

        Matches.backlogqueues += msg_out->QueueName;
        Matches.backlogqueues += ":";
        gMqFS->QueueBacklogHits++;

        if (!msg_out->BrokenByFlush) {
          TRACES("warning: queue " << msg_out->QueueName
                 << " exceeds backlog of " << mMaxQueueBacklog
                 << " message!");
        }
      } else {
        if (msg_out->BrokenByFlush) {
          msg_out->BrokenByFlush = false;
          TRACES("warning: re-enabling queue " << msg_out->QueueName
                 << " backlog is now " << msg_out->mMsgQueue.size()
                 << " messages!");
        }
      }

      if (msg_out->mMsgQueue.size() > mRejectQueueBacklog) {
        // Only set the reject flag if the queue has not set the advisory
        // flush back log flag
        if (!msg_out->AdvisoryFlushBackLog) {
          Matches.backlogrejected = true;
        } else {
          if (!msg_out->BrokenByFlush) {
            msg_out->BrokenByFlush = true;
            TRACES("warning: queue " << msg_out->QueueName
                   << " is broken by backlog flush of " << mRejectQueueBacklog
                   << " message!");
          }
        }

        Matches.backlogqueues += msg_out->QueueName;
        Matches.backlogqueues += ":";
        gMqFS->BacklogDeferred++;

        if (!msg_out->BrokenByFlush) {
          TRACES("error: queue " << msg_out->QueueName
                 << " exceeds max. accepted backlog of " << mRejectQueueBacklog
                 << " message!");
        }
      } else {
        if (!msg_out->BrokenByFlush) {
          // We deliver only to not broken clients, they have to reconnect to
          // get out of this situation
          Matches.matches++;

          if (Matches.matches == 1) {
            // Register the message in the message hash on first delivery.
            // Without a header there is no id to register it under, which
            // is the same case the retrieval side already tolerates.
            const char* header = Matches.message->Get(XMQHEADER);

            if (header) {
              XrdSysMutexHelper msgs_lock(gMqFS->mMsgsMutex);
              gMqFS->Messages.emplace(std::string(header), Matches.message);
            }
          }

          ZTRACE(fsctl, "Adding Message to Queuename: " << msg_out->QueueName.c_str());
          msg_out->mMsgQueue.push_back(Matches.message);
          Matches.message->AddRefs(1);
        }
      }
    }

    // Unlock all matched queues at once
    for (auto msg_out : matched_out_queues) {
      msg_out->UnLock();
    }
  }

  Matches.message->procmutex.UnLock();
  return (Matches.matches > 0);
}
//------------------------------------------------------------------------------
// Collect all messages from the queue and append them to the internal
// buffer. Also delete messages if this was the last reference towards them.
//------------------------------------------------------------------------------
size_t
XrdMqMessageOut::RetrieveMessages()
{
XrdSmartOucEnv* message {nullptr};
XrdSysMutexHelper scope_lock(mMutex);
while (mMsgQueue.size()) {
message = mMsgQueue.front();
mMsgQueue.pop_front();
message->procmutex.Lock();
// fprintf(stderr,"%llu %s Message %llu nref: %d\n", (unsigned long long)
// &mMsgQueue, QueueName.c_str(), (unsigned long long) message, message->Refs());
int len;
mMsgBuffer += message->Env(len);
++gMqFS->mDeliveredMessages;
message->DecRefs();
if (message->Refs() <= 0) {
// We can delete this message from the queue!
const char* ptr = message->Get(XMQHEADER);
if (ptr) {
std::string msg_id = message->Get(XMQHEADER);
XrdSysMutexHelper scope_lock(gMqFS->mMsgsMutex);
gMqFS->Messages.erase(msg_id.c_str());
}
message->procmutex.UnLock();
// fprintf(stderr,"%s delete %llu \n", QueueName.c_str(),
// (unsigned long long) message);
delete message;
++gMqFS->mFanOutMessages;
} else {
message->procmutex.UnLock();
}
}
return mMsgBuffer.length();
}