//------------------------------------------------------------------------------ // File: Messaging.cc // Author: Andreas-Joachim Peters - CERN //------------------------------------------------------------------------------ /************************************************************************ * EOS - the CERN Disk Storage System * * Copyright (C) 2011 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #include "XrdOuc/XrdOucEnv.hh" #include "fst/storage/Storage.hh" #include "fst/Messaging.hh" #include "fst/Deletion.hh" #include "fst/Verify.hh" #include "fst/XrdFstOfs.hh" #include "common/SymKeys.hh" EOSFSTNAMESPACE_BEGIN //------------------------------------------------------------------------------ // Listen for incoming messages //------------------------------------------------------------------------------ void Messaging::Listen(ThreadAssistant& assistant) noexcept { std::unique_ptr new_msg; while (!assistant.terminationRequested()) { new_msg.reset(XrdMqMessaging::gMessageClient.RecvMessage(&assistant)); // We were redirected to a new MQ endponint request broadcast if (XrdMqMessaging::gMessageClient.GetAndResetNewMqFlag()) { gOFS.RequestBroadcasts(); } if (new_msg) { Process(new_msg.get()); } else { assistant.wait_for(std::chrono::seconds(2)); } } } //------------------------------------------------------------------------------ // Process incomming messages //------------------------------------------------------------------------------ void Messaging::Process(XrdMqMessage* newmessage) { XrdOucString saction = newmessage->GetBody(); XrdOucEnv action(saction.c_str()); XrdOucString cmd = action.Get("mgm.cmd"); XrdOucString subcmd = action.Get("mgm.subcmd"); // Shared object communication point if (mSom) { XrdOucString error = ""; bool result = mSom->ParseEnvMessage(newmessage, error); if (!result) { if (error != "no subject in message body") { eos_info("msg=\"%s\" body=\"%s\"", error.c_str(), saction.c_str()); } else { eos_debug("%s", error.c_str()); } } else { return; } } } EOSFSTNAMESPACE_END