// ---------------------------------------------------------------------- // File: FsChangeListener.cc // Author: Georgios Bitzes - CERN // ---------------------------------------------------------------------- /************************************************************************ * EOS - the CERN Disk Storage System * * Copyright (C) 2019 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #include "mq/FsChangeListener.hh" #include "mq/XrdMqSharedObject.hh" #include "mq/MessagingRealm.hh" EOSMQNAMESPACE_BEGIN std::string FsChangeListener::sAllMatchTag = "*"; //------------------------------------------------------------------------------ // Constructor //------------------------------------------------------------------------------ FsChangeListener::FsChangeListener(mq::MessagingRealm* realm, const std::string& name) : mMessagingRealm(realm), mNotifier(nullptr), mListenerName(name) { if (!mMessagingRealm->haveQDB()) { mNotifier = mMessagingRealm->getChangeNotifier(); } } //------------------------------------------------------------------------------ // Subscribe to the given key, such as "stat.errc" or "stat.geotag" //------------------------------------------------------------------------------ bool FsChangeListener::subscribe(const std::string& key) { if (mNotifier) { return mNotifier->SubscribesToKey(mListenerName.c_str(), key, XrdMqSharedObjectChangeNotifier::kMqSubjectModification); } else { eos::common::RWMutexWriteLock wr_lock(mMutexMap); mMapInterests[sAllMatchTag].insert(key); return true; } } //------------------------------------------------------------------------------ // Subscribe to the given channel and key combination - MUST NOT be used // directly but only from FileSystem::AttachFsListener //------------------------------------------------------------------------------ bool FsChangeListener::subscribe(const std::string& channel, const std::set& keys) { if (mNotifier) { return mNotifier->SubscribesToSubjectAndKey(mListenerName.c_str(), channel, keys, XrdMqSharedObjectChangeNotifier::kMqSubjectModification); } else { eos::common::RWMutexWriteLock wr_lock(mMutexMap); auto resp = mMapInterests.emplace(channel, std::set()); auto& set_keys = resp.first->second; set_keys.insert(keys.begin(), keys.end()); return true; } } //---------------------------------------------------------------------------- // Unsubscribe from the given channel and key combination - MUST NOT be used // directly but only from FileSystem::DetachFsListener //---------------------------------------------------------------------------- bool FsChangeListener::unsubscribe(const std::string& channel, const std::set& keys) { if (mNotifier) { return mNotifier->UnsubscribesToSubjectAndKey(mListenerName.c_str(), channel, keys, XrdMqSharedObjectChangeNotifier::kMqSubjectModification); } else { eos::common::RWMutexWriteLock wr_lock(mMutexMap); auto it = mMapInterests.find(channel); if (it != mMapInterests.end()) { for (const auto& key : keys) { it->second.erase(key); } if (it->second.empty()) { mMapInterests.erase(it); } } return true; } } //------------------------------------------------------------------------------ // Check if current listener is interested in updates from the given // channel. Return set of keys that listener is interested in. //------------------------------------------------------------------------------ std::set FsChangeListener::GetInterests(const std::string& channel) const { std::set keys; eos::common::RWMutexReadLock rd_lock(mMutexMap); // Check if this listener is interested in some updates from all channels auto it = mMapInterests.find(sAllMatchTag); if (it != mMapInterests.end()) { keys.insert(it->second.begin(), it->second.end()); } // Check lister has some special interests in this particular channel it = mMapInterests.find(channel); if (it != mMapInterests.end()) { keys.insert(it->second.begin(), it->second.end()); } return keys; } //------------------------------------------------------------------------------ // Start listening //------------------------------------------------------------------------------ bool FsChangeListener::startListening() { if (mNotifier) { mNotifier->BindCurrentThread(mListenerName); return mNotifier->StartNotifyCurrentThread(); } return true; } //------------------------------------------------------------------------------ // Consume next event, block until there's one //------------------------------------------------------------------------------ bool FsChangeListener::fetch(ThreadAssistant& assistant, Event& out, std::chrono::seconds timeout) { if (mNotifier == nullptr) { // New QDB implementation return WaitForEvent(out, timeout); } else { // Old implementation mNotifier->tlSubscriber->mSubjMtx.Lock(); if (mNotifier->tlSubscriber->NotificationSubjects.size() == 0u) { mNotifier->tlSubscriber->mSubjMtx.UnLock(); mNotifier->tlSubscriber->mSubjSem.Wait(1); mNotifier->tlSubscriber->mSubjMtx.Lock(); } if (mNotifier->tlSubscriber->NotificationSubjects.size() == 0u) { mNotifier->tlSubscriber->mSubjMtx.UnLock(); return false; } XrdMqSharedObjectManager::Notification event; event = mNotifier->tlSubscriber->NotificationSubjects.front(); mNotifier->tlSubscriber->NotificationSubjects.pop_front(); mNotifier->tlSubscriber->mSubjMtx.UnLock(); out.fileSystemQueue = event.mSubject.c_str(); size_t dpos = out.fileSystemQueue.find(";"); if (dpos != std::string::npos) { out.key = out.fileSystemQueue; out.key.erase(0, dpos + 1); out.fileSystemQueue.erase(dpos); } out.deletion = (event.mType == XrdMqSharedObjectManager::kMqSubjectDeletion); return true; } } //------------------------------------------------------------------------------ // Check if given event is interesting for the current listener given its // interests //------------------------------------------------------------------------------ bool FsChangeListener::IsEventInteresting(const Event& event) const { const std::set key_interest = GetInterests(event.fileSystemQueue); if (key_interest.find(event.key) != key_interest.cend()) { return true; } return false; } //------------------------------------------------------------------------------ // Notify new event //------------------------------------------------------------------------------ void FsChangeListener::NotifyEvent(const Event& event) { if (IsEventInteresting(event)) { { std::lock_guard lock(mMutex); mPendingEvents.emplace_back(event); } mCv.notify_one(); } } //------------------------------------------------------------------------------ // Waiting at most timout seconds for an event //------------------------------------------------------------------------------ bool FsChangeListener::WaitForEvent(Event& out, std::chrono::seconds timeout) { std::unique_lock lock(mMutex); if (!mCv.wait_for(lock, timeout, [&] {return !mPendingEvents.empty();})) { return false; } out = mPendingEvents.front(); mPendingEvents.pop_front(); lock.unlock(); return true; } EOSMQNAMESPACE_END