// ----------------------------------------------------------------------
// File: GlobalConfigChangeListener.cc
// Author: Georgios Bitzes - CERN
// ----------------------------------------------------------------------
/************************************************************************
* EOS - the CERN Disk Storage System *
* Copyright (C) 2020 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see .*
************************************************************************/
#include "mq/GlobalConfigChangeListener.hh"
#include "mq/XrdMqSharedObject.hh"
#include "mq/MessagingRealm.hh"
#include "common/Locators.hh"
#include
#include
EOSMQNAMESPACE_BEGIN
//------------------------------------------------------------------------------
// Constructor
//------------------------------------------------------------------------------
GlobalConfigChangeListener::GlobalConfigChangeListener(mq::MessagingRealm*
realm, const std::string& name, const std::string& configQueue)
: mMessagingRealm(realm), mNotifier(nullptr),
mListenerName(name), mConfigQueue(configQueue),
mSubscription(nullptr)
{
if (mMessagingRealm->haveQDB()) {
mSharedHash = mMessagingRealm->getHashProvider()->Get(
eos::common::SharedHashLocator::makeForGlobalHash());
mSubscription = mSharedHash->subscribe(true);
using namespace std::placeholders;
mSubscription->attachCallback(std::bind(
&GlobalConfigChangeListener::ProcessUpdateCb,
this, _1));
} else {
mNotifier = mMessagingRealm->getChangeNotifier();
mNotifier->SubscribesToSubject(mListenerName.c_str(), mConfigQueue.c_str(),
XrdMqSharedObjectChangeNotifier::kMqSubjectModification);
mNotifier->SubscribesToSubject(mListenerName.c_str(), mConfigQueue.c_str(),
XrdMqSharedObjectChangeNotifier::kMqSubjectDeletion);
mNotifier->SubscribesToSubject(mListenerName.c_str(), mConfigQueue.c_str(),
XrdMqSharedObjectChangeNotifier::kMqSubjectKeyDeletion);
mNotifier->BindCurrentThread(mListenerName);
mNotifier->StartNotifyCurrentThread();
}
}
//------------------------------------------------------------------------------
// Destructor
//------------------------------------------------------------------------------
GlobalConfigChangeListener::~GlobalConfigChangeListener()
{
if (mSubscription) {
mSubscription->detachCallback();
}
}
//----------------------------------------------------------------------------
// Callback to process update for the shared hash
//----------------------------------------------------------------------------
void
GlobalConfigChangeListener::ProcessUpdateCb(qclient::SharedHashUpdate&& upd)
{
{
std::lock_guard lock(mMutex);
mPendingUpdates.emplace_back(upd);
}
mCv.notify_one();
}
//------------------------------------------------------------------------------
// Block waiting for an event
//------------------------------------------------------------------------------
bool
GlobalConfigChangeListener::WaitForEvent(Event& out,
std::chrono::seconds timeout)
{
std::unique_lock lock(mMutex);
if (!mCv.wait_for(lock, timeout, [&] {return !mPendingUpdates.empty();})) {
return false;
}
auto update = mPendingUpdates.front();
mPendingUpdates.pop_front();
lock.unlock();
out.key = update.key;
out.deletion = update.value.empty();
return true;
}
//------------------------------------------------------------------------------
// Consume next event, block until there's one
//------------------------------------------------------------------------------
bool GlobalConfigChangeListener::fetch(ThreadAssistant& assistant, Event& out)
{
if (mSharedHash) {
// New QDB implementation
return WaitForEvent(out);
} else {
// Old implementation
mNotifier->tlSubscriber->mSubjMtx.Lock();
if (mNotifier->tlSubscriber->NotificationSubjects.size() == 0u) {
mNotifier->tlSubscriber->mSubjMtx.UnLock();
mNotifier->tlSubscriber->mSubjSem.Wait(1);
mNotifier->tlSubscriber->mSubjMtx.Lock();
}
if (mNotifier->tlSubscriber->NotificationSubjects.size() == 0u) {
mNotifier->tlSubscriber->mSubjMtx.UnLock();
return false;
}
XrdMqSharedObjectManager::Notification event;
event = mNotifier->tlSubscriber->NotificationSubjects.front();
mNotifier->tlSubscriber->NotificationSubjects.pop_front();
mNotifier->tlSubscriber->mSubjMtx.UnLock();
out.key = event.mSubject.c_str();
size_t dpos = out.key.find(";");
if (dpos != std::string::npos) {
out.key.erase(0, dpos + 1);
}
out.deletion = (event.mType == XrdMqSharedObjectManager::kMqSubjectKeyDeletion);
return true;
}
}
EOSMQNAMESPACE_END