//------------------------------------------------------------------------------ // File: ObserverMgr.hh // Author: Abhishek Lekshmanan - CERN //------------------------------------------------------------------------------ /************************************************************************ * EOS - the CERN Disk Storage System * * Copyright (C) 2022 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #pragma once #include "common/SharedCallbackList.hh" #include "common/Logging.hh" #include "common/ThreadPool.hh" #include "common/utils/BindArguments.hh" namespace eos::common { /*! * A mediator like class that basically holds a list of observers and * notifies all of them of changes * @tparam Args - the list of args that you'd need the callbacks to accept */ // Exposing this here as it'll be hard to use typename ObserverMgr::tag using observer_tag_t = shared_callback_slot_t; template class ObserverMgr { public: ObserverMgr() : mThreadPool(2) {} ObserverMgr(size_t min_threads, size_t max_threads = std::thread::hardware_concurrency()) : mThreadPool(min_threads, max_threads) {} virtual ~ObserverMgr() { syncAllNotifications(); } // reap all notifications, blocks the calling thread; not meant to be called often void syncAllNotifications() { for (auto it = async_completions.begin(); it != async_completions.end();) { it->wait(); it = async_completions.erase(it); } } /*! * * @tparam F the type of function, will be inferred from the arg * @param f the actual callable object that will be invoked on notification, * can be a lambda or bind_like object that a std::function accepts * @return a tag (uint32_t) that should be supplied when removing th observer */ template [[nodiscard]] observer_tag_t addObserver(F&& f) { return mObservers.addCallback(std::forward(f)); } void rmObserver(observer_tag_t tag) { mObservers.rmCallback(tag); } /*! * Synchronously notify all the listeners of the changes, note that this will * block the calling thread, so only meant to be called if it can be ensured * that the callbacks would be really small to affect the calling thread * @param args arguments to be provided for each callback */ void notifyChangeSync(Args... args) { auto callbacks = mObservers.getCallbacks(); for (auto callback : callbacks) { if (auto shared_fn = callback.lock()) { std::invoke(*shared_fn, args...); } } } /*! * Asynchronously notify all the listeners of the changes, this job runs * in the ObserverMgr Threadpool and hence doesn't block the calling thread * @param args arguments to be provided for each callback */ void notifyChange(Args... args) { auto callbacks = mObservers.getCallbacks(); for (auto callback : callbacks) { if (auto shared_fn = callback.lock()) { async_completions.emplace_back(mThreadPool.PushTask(std::make_shared < std::packaged_task> (bindArgs(*shared_fn, args...)))); } } // reap the finished completions every time! async_completions.erase(std::remove_if(async_completions.begin(), async_completions.end(), [](std::future& fut) { return (fut.wait_for(std::chrono::seconds(0)) == std::future_status::ready); }), async_completions.end()); } private: mutable eos::common::ThreadPool mThreadPool; mutable std::vector> async_completions; SharedCallbackList mObservers; }; } // eos::common