// ----------------------------------------------------------------------
// File: Messaging.cc
// Author: Andreas-Joachim Peters - CERN
// ----------------------------------------------------------------------
/************************************************************************
* EOS - the CERN Disk Storage System *
* Copyright (C) 2011 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see <http://www.gnu.org/licenses/>.*
************************************************************************/
#include
#include "mgm/Messaging.hh"
#include "mgm/XrdMgmOfs.hh"
#include "mgm/FsView.hh"
#include "mq/MessagingRealm.hh"
EOSMGMNAMESPACE_BEGIN
//------------------------------------------------------------------------------
// Constructor
//
// Takes the shared object manager from the given messaging realm (which must
// not be null - it is dereferenced unconditionally), registers the broker
// given by url with the message client, derives the client id from the url,
// subscribes and sets the default receiver queue. The object is flagged as a
// zombie when the broker could not be added.
//------------------------------------------------------------------------------
Messaging::Messaging(const char* url, const char* defaultreceiverqueue,
                     mq::MessagingRealm* realm)
{
  mSom = realm->getSom();
  // The broker is added with the flushbacklog flag: in a master/slave MGM
  // setup the message flow must not block when one side is stuck or slow.
  const bool broker_added = gMessageClient.AddBroker(url, true, true, true);
  mIsZombie = !broker_added;
  // The client id is the url with everything up to and including the first
  // slash of the second "//" removed, e.g. "root://host:port//a/b" -> "/a/b".
  XrdOucString client_id = url;
  int slash_pos = client_id.find("//");

  if (slash_pos != STR_NPOS) {
    slash_pos = client_id.find("//", slash_pos + 1);
    client_id.erase(0, slash_pos + 1);
    gMessageClient.SetClientId(client_id.c_str());
  }

  gMessageClient.Subscribe();
  gMessageClient.SetDefaultReceiverQueue(defaultreceiverqueue);
}
//------------------------------------------------------------------------------
// Infinite loop processing messages
//
// Receives messages from the message client until termination is requested
// and hands each one to Process(). Emits a warning when either receiving or
// processing a single message takes longer than 2000 ms. When no message is
// returned, waits up to one second (interruptible by the assistant).
//------------------------------------------------------------------------------
void
Messaging::Listen(ThreadAssistant& assistant) noexcept
{
  // Monotonic clock reading in milliseconds; only used to measure durations
  auto now_ms = []() -> int64_t {
    return std::chrono::duration_cast<std::chrono::milliseconds>
           (std::chrono::steady_clock::now().time_since_epoch()).count();
  };
  std::unique_ptr<XrdMqMessage> new_msg;

  while (!assistant.terminationRequested()) {
    const int64_t t1 = now_ms();
    new_msg.reset(XrdMqMessaging::gMessageClient.RecvMessage(&assistant));
    const int64_t t2 = now_ms();

    if ((t2 - t1) > 2000) {
      eos_warning("MQ heartbeat recv lasted %ld milliseconds",
                  static_cast<long>(t2 - t1));
    }

    if (new_msg) {
      Process(new_msg.get());
      // Sample the end timestamp only after Process() returns so that the
      // warning reflects the actual processing time of the message.
      const int64_t t3 = now_ms();

      if ((t3 - t2) > 2000) {
        eos_warning("MQ heartbeat processing lasted %ld milliseconds",
                    static_cast<long>(t3 - t2));
      }
    } else {
      assistant.wait_for(std::chrono::seconds(1));
    }
  }
}
//------------------------------------------------------------------------------
// Process heartbeat information for a node
//
// @param nodequeue     node queue name used as key into FsView mNodeView
// @param online        whether the sender reports the node as online
// @param senderTimeSec sender timestamp stored as the node's heartbeat
//
// Unknown nodes are ignored. An online report marks the node kOnline; an
// offline report marks it kOffline and, on the transition only, sets every
// file system of that node to BootStatus::kDown. Callers in this file hold
// FsView::gFsView.ViewMutex (read or write) while calling this.
//------------------------------------------------------------------------------
void
Messaging::ProcessIncomingHeartbeat(const std::string& nodequeue, bool online,
                                    time_t senderTimeSec)
{
  // Single lookup instead of count() followed by find()
  auto it_node = FsView::gFsView.mNodeView.find(nodequeue);

  if (it_node == FsView::gFsView.mNodeView.end()) {
    return;
  }

  auto* node = it_node->second;

  if (online) {
    if (node->GetActiveStatus() != eos::common::ActiveStatus::kOnline) {
      node->SetActiveStatus(eos::common::ActiveStatus::kOnline);
    }
  } else if (node->GetActiveStatus() != eos::common::ActiveStatus::kOffline) {
    node->SetActiveStatus(eos::common::ActiveStatus::kOffline);

    // Propagate into filesystem states
    for (auto fsid : *node) {
      FileSystem* entry = FsView::gFsView.mIdView.lookupByID(fsid);

      if (entry) {
        entry->SetStatus(eos::common::BootStatus::kDown, false);
      }
    }
  }

  eos_static_debug("msg=\"setting heart beat to %llu for node queue=%s\"",
                   (unsigned long long) senderTimeSec, nodequeue.c_str());
  node->SetHeartBeat(senderTimeSec);
}
//------------------------------------------------------------------------------
// Update node state based on an advisory message
//
// @param advmsg advisory message; its kQueue names the node queue
// @return false if advmsg is null, true otherwise
//
// For a known node the heartbeat is processed under a read lock on the FS
// view. For an unknown node the read lock is dropped, a write lock taken, the
// node registered (initialising its config queue if newly registered) and the
// heartbeat processed under the write lock.
//------------------------------------------------------------------------------
bool
Messaging::Update(XrdAdvisoryMqMessage* advmsg)
{
  if (advmsg == nullptr) {
    return false;
  }

  const std::string nodequeue = advmsg->kQueue.c_str();
  eos::common::RWMutexReadLock rd_fs_lock(FsView::gFsView.ViewMutex);

  if (FsView::gFsView.mNodeView.count(nodequeue) != 0) {
    // Known node - a read lock on the view is enough
    ProcessIncomingHeartbeat(nodequeue, advmsg->kOnline,
                             advmsg->kMessageHeader.kSenderTime_sec);
    return true;
  }

  // Rare case of a node which is not yet known: switch to a write lock and
  // register it in the global view and config.
  rd_fs_lock.Release();
  eos_static_info("Registering node queue %s ..", nodequeue.c_str());
  eos::common::RWMutexWriteLock wr_fs_lock(FsView::gFsView.ViewMutex);

  if (FsView::gFsView.RegisterNode(nodequeue.c_str())) {
    // Constructing the wrapper initialises the node's config queue; the
    // temporary itself is not needed afterwards.
    mq::SharedHashWrapper(gOFS->mMessagingRealm.get(),
                          common::SharedHashLocator::makeForNode(nodequeue));
  }

  ProcessIncomingHeartbeat(nodequeue, advmsg->kOnline,
                           advmsg->kMessageHeader.kSenderTime_sec);
  return true;
}
//------------------------------------------------------------------------------
// Process message
//
// Status and query messages are parsed as advisory messages; those whose
// queue ends in "/fst" update the node state via Update(). All other messages
// are handed to the shared object manager (if any) for parsing.
//
// A sticky discard mode protects against backlog: it is entered when a shared
// object message arrives more than 60 seconds after the broker time, and left
// once that delay drops to 5 seconds or less. While in discard mode shared
// object messages with a larger delay, as well as all status/query messages,
// are dropped.
//------------------------------------------------------------------------------
void
Messaging::Process(XrdMqMessage* new_msg)
{
  static bool discardmode = false;

  if (!new_msg) {
    return;
  }

  const auto msg_type = new_msg->kMessageHeader.kType;

  if ((msg_type == XrdMqMessageHeader::kStatusMessage) ||
      (msg_type == XrdMqMessageHeader::kQueryMessage)) {
    if (discardmode) {
      return;
    }

    std::unique_ptr<XrdAdvisoryMqMessage> advisorymessage(
      XrdAdvisoryMqMessage::Create(new_msg->GetMessageBuffer()));

    if (advisorymessage) {
      eos_debug("queue=%s online=%d", advisorymessage->kQueue.c_str(),
                advisorymessage->kOnline);

      if (advisorymessage->kQueue.endswith("/fst")) {
        if (!Update(advisorymessage.get())) {
          eos_err("cannot update node status for %s", advisorymessage->GetBody());
        }
      }
    }

    return;
  }

  // Everything else is a shared object exchange message; without a shared
  // object manager there is nothing to do with it.
  if (!mSom) {
    return;
  }

  // Delay between broker and receiver. Cast explicitly so that the value
  // matches the %lld conversion used in the log messages below.
  const long long delay = static_cast<long long>(
                            new_msg->kMessageHeader.kReceiverTime_sec -
                            new_msg->kMessageHeader.kBrokerTime_sec);

  // Cut on the maximum allowed delay for shared object messages
  if (!discardmode && (delay > 60)) {
    eos_crit("dropping shared object message because of message delays of %lld seconds",
             delay);
    discardmode = true;
    return;
  }

  if (delay <= 5) {
    // We accept again once we have caught up
    discardmode = false;
  } else if (discardmode) {
    eos_crit("dropping shared object message because of message delays of %lld seconds",
             delay);
    return;
  }

  // Parse as shared object manager message
  XrdOucString error = "";

  if (!mSom->ParseEnvMessage(new_msg, error)) {
    if ((error != "no subject in message body") &&
        (error != "no pairs in message body")) {
      eos_err("%s", error.c_str());
    } else {
      eos_debug("%s", error.c_str());
    }
  }
}
EOSMGMNAMESPACE_END