// ---------------------------------------------------------------------- // File: MessagingRealm.cc // Author: Georgios Bitzes - CERN // ---------------------------------------------------------------------- /************************************************************************ * EOS - the CERN Disk Storage System * * Copyright (C) 2020 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #include #include #include #include "mq/MessagingRealm.hh" #include "mq/XrdMqMessage.hh" #include "mq/XrdMqClient.hh" #include "mq/FsChangeListener.hh" EOSMQNAMESPACE_BEGIN //------------------------------------------------------------------------------ // Initialize legacy-MQ-based messaging realm. //------------------------------------------------------------------------------ MessagingRealm::MessagingRealm(XrdMqSharedObjectManager* som, XrdMqSharedObjectChangeNotifier* notif, XrdMqClient* mqcl, qclient::SharedManager* qsom) : mSom(som), mNotifier(notif), mMessageClient(mqcl), mQSom(qsom), mHashProvider(qsom), mDequeProvider(qsom) {} //------------------------------------------------------------------------------ // Is this a QDB realm? //------------------------------------------------------------------------------ bool MessagingRealm::haveQDB() const { return mQSom != nullptr; } //------------------------------------------------------------------------------ // Get som //------------------------------------------------------------------------------ XrdMqSharedObjectManager* MessagingRealm::getSom() const { return mSom; } //------------------------------------------------------------------------------ // Get legacy change notifier //------------------------------------------------------------------------------ XrdMqSharedObjectChangeNotifier* MessagingRealm::getChangeNotifier() const { return mNotifier; } //------------------------------------------------------------------------------ // Get qclient shared manager //------------------------------------------------------------------------------ qclient::SharedManager* MessagingRealm::getQSom() const { return mQSom; } //------------------------------------------------------------------------------ // Get pointer to hash provider //------------------------------------------------------------------------------ SharedHashProvider* MessagingRealm::getHashProvider() { return &mHashProvider; } //------------------------------------------------------------------------------ // Get pointer to deque provider //------------------------------------------------------------------------------ SharedDequeProvider* MessagingRealm::getDequeProvider() { return &mDequeProvider; } //------------------------------------------------------------------------------ //! Send message to the given receiver queue //------------------------------------------------------------------------------ MessagingRealm::Response MessagingRealm::sendMessage(const std::string& descr, const std::string& payload, const std::string& receiver, bool is_monitor) { Response resp; if (haveQDB()) { // The reply to publish is the number of subscribers that receive the msg qclient::redisReplyPtr reply = mQSom->getQClient()->exec("PUBLISH", receiver, payload).get(); if (reply->type == REDIS_REPLY_INTEGER) { resp.status = (reply->integer == 0 ? 1 : 0); } else { resp.status = 1; } } else { XrdMqMessage message(descr.c_str()); message.SetBody(payload.c_str()); if (is_monitor) { message.MarkAsMonitor(); } if (mMessageClient->SendMessage(message, receiver.c_str())) { resp.status = 0; } else { resp.status = 1; } } return resp; } //------------------------------------------------------------------------------ // Set instance name //------------------------------------------------------------------------------ bool MessagingRealm::setInstanceName(const std::string& name) { if (!haveQDB()) { return true; } qclient::QClient* qcl = mQSom->getQClient(); qclient::redisReplyPtr reply = qcl->exec("SET", "eos-instance-name", name).get(); qclient::StatusParser parser(reply); if (!parser.ok()) { eos_static_crit("error while setting instance name in QDB: %s", parser.err().c_str()); return false; } if (parser.value() != "OK") { eos_static_crit("unexpected response while setting instance name in QDB: %s", parser.value().c_str()); return false; } return true; } //------------------------------------------------------------------------------ // Get instance name //------------------------------------------------------------------------------ bool MessagingRealm::getInstanceName(std::string& name) { if (!haveQDB()) { return false; } qclient::QClient* qcl = mQSom->getQClient(); qclient::redisReplyPtr reply = qcl->exec("GET", "eos-instance-name").get(); qclient::StringParser parser(reply); if (!parser.ok()) { return false; } name = parser.value(); if (name.empty()) { return false; } return true; } //------------------------------------------------------------------------------ // Get FsChange listener with given name //------------------------------------------------------------------------------ std::shared_ptr MessagingRealm::GetFsChangeListener(const std::string& name) { std::scoped_lock lock(mMutexListeners); auto it = mFsListeners.find(name); if (it != mFsListeners.end()) { return it->second; } mFsListeners[name] = std::make_shared(this, name); return mFsListeners[name]; } //------------------------------------------------------------------------------ // Get map of listeners and the keys they are interested in for the given // channel i.e. file system queue path //------------------------------------------------------------------------------ std::map, std::set> MessagingRealm::GetInterestedListeners(const std::string& channel) { std::map, std::set> map_interest; std::scoped_lock lock(mMutexListeners); for (auto& elem : mFsListeners) { auto& listener = elem.second; std::set interested_keys = listener->GetInterests(channel); if (!interested_keys.empty()) { map_interest.emplace(listener, std::move(interested_keys)); } } return map_interest; } EOSMQNAMESPACE_END