// ---------------------------------------------------------------------- // File: AssistedThread.hh // Author: Georgios Bitzes - CERN // ---------------------------------------------------------------------- /************************************************************************ * quarkdb - a redis-like highly available key-value store * * Copyright (C) 2018 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #ifndef QUARKDB_ASSISTED_THREAD_H #define QUARKDB_ASSISTED_THREAD_H #include #include #include #include #include #include namespace quarkdb { //------------------------------------------------------------------------------ // C++ threads offer no easy way to stop a thread once it's started. Signalling // "stop" to a (potentially sleeping) background thread involves a subtle dance // involving a mutex, condition variable, and possibly an atomic. // // Doing this correctly for every thread is a huge pain, which this class // tries to alleviate. // // How to create a thread: Just like std::thread, ie // AssistedThread(&SomeClass::SomeFunction, this, some_int_value) // // The function will receive a thread assistant object as *one extra* // parameter *at the end*, for example: // // void SomeClass::SomeFunction(int some_int_value, ThreadAssistant &assistant) // // The assistant object can then be used to check if thread termination has been // requested, or sleep for a specified amount of time but wake up immediatelly // the moment termination is requested. // // A common pattern for background threads is then: // while(!assistant.terminationRequested()) { // doStuff(); // assistant.sleep_for(std::chrono::seconds(1)); // } //------------------------------------------------------------------------------ class AssistedThread; //------------------------------------------------------------------------------ //! Class ThreadAssistant //------------------------------------------------------------------------------ class ThreadAssistant { public: void reset() { stopFlag = false; terminationCallbacks.clear(); } void requestTermination() { std::scoped_lock lock(mtx); if(!stopFlag) { stopFlag = true; notifier.notify_all(); for(size_t i = 0; i < terminationCallbacks.size(); i++) { terminationCallbacks[i](); } } } void registerCallback(std::function callable) { std::scoped_lock lock(mtx); terminationCallbacks.emplace_back(std::move(callable)); if(stopFlag) { //------------------------------------------------------------------------ // Careful here.. This is a race condition where thread termination has // already been requested, even though we're not done yet registering // callbacks, apparently. // // Let's simply call the callback ourselves. //------------------------------------------------------------------------ (terminationCallbacks.back())(); } } void dropCallbacks() { std::scoped_lock lock(mtx); terminationCallbacks.clear(); } bool terminationRequested() { return stopFlag; } template void wait_for(T duration) { std::unique_lock lock(mtx); if(stopFlag) return; notifier.wait_for(lock, duration); } template void wait_until(T duration) { std::unique_lock lock(mtx); if(stopFlag) return; notifier.wait_until(lock, duration); } //---------------------------------------------------------------------------- // Ok, this is a bit weird: Consider an AssistedThread which "owns" or // coordinates a bunch of other threads: // // void Coordinator(ThreadAssistant &assistant) { // AssistedThread worker1( ... ); // AssistedThread worker2( ... ); // AssistedThread worker3( ... ); // // worker1.blockUntilThreadJoins(); // worker2.blockUntilThreadJoins(); // worker3.blockUntilThreadJoins(); // } // // We would like that any requests to shut down Coordinator propagate to all // workers. Otherwise, since Coordinator blocks waiting for the workers to // terminate, its own early termination signal would get ignored. // // propagateTerminationSignal does just this. In the above example, call: // assistant.propagateTerminationSignal(worker1); // assistant.propagateTerminationSignal(worker2); // assistant.propagateTerminationSignal(worker3); // // And the moment Coordinator is asked to terminate, all registered threads // will, too. // // NOTE: assistant object must belong to a different thread! //---------------------------------------------------------------------------- void propagateTerminationSignal(AssistedThread &thread); private: // Private constructor - only AssistedThread can create such an object. ThreadAssistant(bool flag) : stopFlag(flag) {} friend class AssistedThread; std::atomic stopFlag; std::mutex mtx; std::condition_variable notifier; std::vector> terminationCallbacks; }; class AssistedThread { public: //---------------------------------------------------------------------------- //! null constructor, no underlying thread //---------------------------------------------------------------------------- AssistedThread() : assistant(new ThreadAssistant(true)), joined(true) { } //---------------------------------------------------------------------------- // universal references, perfect forwarding, variadic template // (C++ is intensifying) //---------------------------------------------------------------------------- template AssistedThread(Args&&... args) : assistant(new ThreadAssistant(false)), joined(false), th(std::forward(args)..., std::ref(*assistant)) { } // No assignment, no copying AssistedThread& operator=(const AssistedThread&) = delete; // Moving is allowed. AssistedThread(AssistedThread&& other) { assistant = std::move(other.assistant); joined = other.joined; th = std::move(other.th); other.joined = true; } template void reset(Args&&... args) { join(); assistant.get()->reset(); joined = false; th = std::thread(std::forward(args)..., std::ref(*assistant)); } virtual ~AssistedThread() { join(); } void stop() { if(joined) return; assistant->requestTermination(); } void join() { if(joined) return; stop(); blockUntilThreadJoins(); } // Different meaning than join, which explicitly asks the thread to // terminate. Here, we simply wait until the thread exits on its own. void blockUntilThreadJoins() { if(joined) return; th.join(); joined = true; } void registerCallback(std::function callable) { assistant->registerCallback(std::move(callable)); } void dropCallbacks() { assistant->dropCallbacks(); } //---------------------------------------------------------------------------- //! Set thread name. Useful to have in GDB traces, for example. //---------------------------------------------------------------------------- void setName(const std::string &threadName) { pthread_setname_np(th.native_handle(), threadName.c_str()); } private: std::unique_ptr assistant; bool joined; std::thread th; }; inline void ThreadAssistant::propagateTerminationSignal(AssistedThread &thread) { registerCallback(std::bind(&AssistedThread::stop, &thread)); } } #endif