// ----------------------------------------------------------------------
// File: AssistedThread.hh
// Author: Georgios Bitzes - CERN
// ----------------------------------------------------------------------
/************************************************************************
* quarkdb - a redis-like highly available key-value store *
* Copyright (C) 2018 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see .*
************************************************************************/
#ifndef QUARKDB_ASSISTED_THREAD_H
#define QUARKDB_ASSISTED_THREAD_H
#include
#include
#include
#include
#include
#include
namespace quarkdb {
//------------------------------------------------------------------------------
// C++ threads offer no easy way to stop a thread once it's started. Signalling
// "stop" to a (potentially sleeping) background thread involves a subtle dance
// involving a mutex, condition variable, and possibly an atomic.
//
// Doing this correctly for every thread is a huge pain, which this class
// tries to alleviate.
//
// How to create a thread: Just like std::thread, ie
// AssistedThread(&SomeClass::SomeFunction, this, some_int_value)
//
// The function will receive a thread assistant object as *one extra*
// parameter *at the end*, for example:
//
// void SomeClass::SomeFunction(int some_int_value, ThreadAssistant &assistant)
//
// The assistant object can then be used to check if thread termination has been
// requested, or sleep for a specified amount of time but wake up immediatelly
// the moment termination is requested.
//
// A common pattern for background threads is then:
// while(!assistant.terminationRequested()) {
// doStuff();
// assistant.sleep_for(std::chrono::seconds(1));
// }
//------------------------------------------------------------------------------
class AssistedThread;
//------------------------------------------------------------------------------
//! Class ThreadAssistant
//------------------------------------------------------------------------------
class ThreadAssistant {
public:
void reset() {
stopFlag = false;
terminationCallbacks.clear();
}
void requestTermination() {
std::scoped_lock lock(mtx);
if(!stopFlag) {
stopFlag = true;
notifier.notify_all();
for(size_t i = 0; i < terminationCallbacks.size(); i++) {
terminationCallbacks[i]();
}
}
}
void registerCallback(std::function callable) {
std::scoped_lock lock(mtx);
terminationCallbacks.emplace_back(std::move(callable));
if(stopFlag) {
//------------------------------------------------------------------------
// Careful here.. This is a race condition where thread termination has
// already been requested, even though we're not done yet registering
// callbacks, apparently.
//
// Let's simply call the callback ourselves.
//------------------------------------------------------------------------
(terminationCallbacks.back())();
}
}
void dropCallbacks() {
std::scoped_lock lock(mtx);
terminationCallbacks.clear();
}
bool terminationRequested() {
return stopFlag;
}
template
void wait_for(T duration) {
std::unique_lock lock(mtx);
if(stopFlag) return;
notifier.wait_for(lock, duration);
}
template
void wait_until(T duration) {
std::unique_lock lock(mtx);
if(stopFlag) return;
notifier.wait_until(lock, duration);
}
//----------------------------------------------------------------------------
// Ok, this is a bit weird: Consider an AssistedThread which "owns" or
// coordinates a bunch of other threads:
//
// void Coordinator(ThreadAssistant &assistant) {
// AssistedThread worker1( ... );
// AssistedThread worker2( ... );
// AssistedThread worker3( ... );
//
// worker1.blockUntilThreadJoins();
// worker2.blockUntilThreadJoins();
// worker3.blockUntilThreadJoins();
// }
//
// We would like that any requests to shut down Coordinator propagate to all
// workers. Otherwise, since Coordinator blocks waiting for the workers to
// terminate, its own early termination signal would get ignored.
//
// propagateTerminationSignal does just this. In the above example, call:
// assistant.propagateTerminationSignal(worker1);
// assistant.propagateTerminationSignal(worker2);
// assistant.propagateTerminationSignal(worker3);
//
// And the moment Coordinator is asked to terminate, all registered threads
// will, too.
//
// NOTE: assistant object must belong to a different thread!
//----------------------------------------------------------------------------
void propagateTerminationSignal(AssistedThread &thread);
private:
// Private constructor - only AssistedThread can create such an object.
ThreadAssistant(bool flag) : stopFlag(flag) {}
friend class AssistedThread;
std::atomic stopFlag;
std::mutex mtx;
std::condition_variable notifier;
std::vector> terminationCallbacks;
};
class AssistedThread {
public:
//----------------------------------------------------------------------------
//! null constructor, no underlying thread
//----------------------------------------------------------------------------
AssistedThread() : assistant(new ThreadAssistant(true)), joined(true) { }
//----------------------------------------------------------------------------
// universal references, perfect forwarding, variadic template
// (C++ is intensifying)
//----------------------------------------------------------------------------
template
AssistedThread(Args&&... args) : assistant(new ThreadAssistant(false)), joined(false), th(std::forward(args)..., std::ref(*assistant)) {
}
// No assignment, no copying
AssistedThread& operator=(const AssistedThread&) = delete;
// Moving is allowed.
AssistedThread(AssistedThread&& other) {
assistant = std::move(other.assistant);
joined = other.joined;
th = std::move(other.th);
other.joined = true;
}
template
void reset(Args&&... args) {
join();
assistant.get()->reset();
joined = false;
th = std::thread(std::forward(args)..., std::ref(*assistant));
}
virtual ~AssistedThread() {
join();
}
void stop() {
if(joined) return;
assistant->requestTermination();
}
void join() {
if(joined) return;
stop();
blockUntilThreadJoins();
}
// Different meaning than join, which explicitly asks the thread to
// terminate. Here, we simply wait until the thread exits on its own.
void blockUntilThreadJoins() {
if(joined) return;
th.join();
joined = true;
}
void registerCallback(std::function callable) {
assistant->registerCallback(std::move(callable));
}
void dropCallbacks() {
assistant->dropCallbacks();
}
//----------------------------------------------------------------------------
//! Set thread name. Useful to have in GDB traces, for example.
//----------------------------------------------------------------------------
void setName(const std::string &threadName) {
pthread_setname_np(th.native_handle(), threadName.c_str());
}
private:
std::unique_ptr assistant;
bool joined;
std::thread th;
};
inline void ThreadAssistant::propagateTerminationSignal(AssistedThread &thread) {
registerCallback(std::bind(&AssistedThread::stop, &thread));
}
}
#endif