//------------------------------------------------------------------------------ // File: common/ThreadPool.cc // Author: Jozsef Makai - CERN //------------------------------------------------------------------------------ /************************************************************************ * EOS - the CERN Disk Storage System * * Copyright (C) 2017 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #pragma once #include "common/Namespace.hh" #include "common/ConcurrentQueue.hh" #include #include #include #ifdef __APPLE__ #include #endif EOSCOMMONNAMESPACE_BEGIN //------------------------------------------------------------------------------------ //! @brief Dynamically scaling pool of threads which will asynchronously execute tasks //------------------------------------------------------------------------------------ class ThreadPool { public: //---------------------------------------------------------------------------------- //! @brief Create a new thread pool //! //! @param threadsMin the minimum and starting number of allocated threads, //! defaults to hardware concurrency //! @param threadsMax the maximum number of allocated threads, //! defaults to hardware concurrency //! @param samplingInterval sampling interval in seconds for the waiting jobs, //! required for dynamic scaling, defaults to 10 seconds //! @param samplingNumber number of samples to collect before making a scaling //! decision, scaling decision will be made after samplingInterval * //! samplingNumber seconds //! @param averageWaitingJobsPerNewThread the average number of waiting jobs per which //! one new thread should be started, defaults to 10, //! e.g. if in average 27.8 jobs were waiting for execution, then 2 new //! threads will be added to the pool //! @param name identifier for the thread pool //---------------------------------------------------------------------------------- explicit ThreadPool(unsigned int threadsMin = std::thread::hardware_concurrency(), unsigned int threadsMax = std::thread::hardware_concurrency(), unsigned int samplingInterval = 10, unsigned int samplingNumber = 12, unsigned int averageWaitingJobsPerNewThread = 10, const std::string& identifier = "default"): mThreadsMin(threadsMin), mThreadsMax(threadsMin > threadsMax ? threadsMin : threadsMax), mPoolSize(0ul), mId(identifier) { auto threadPoolFunc = [this] { bool toContinue = true; do { std::pair>> task; mTasks.wait_pop(task); toContinue = task.first; // Termination is signalled by false if (toContinue) { (*(task.second))(); } } while (toContinue); }; for (auto i = 0u; i < std::max(mThreadsMin.load(), 1u); ++i) { try { mThreadPool.emplace_back(std::async(std::launch::async, threadPoolFunc)); } catch (const std::exception& e) { std::cerr << "error: std::async couldn't start a new thread " << "and threw an exception: " << e.what() << std::endl; continue; } ++mThreadCount; } mPoolSize = mThreadPool.size(); if (mThreadsMax > mThreadsMin) { auto maintainerThreadFunc = [this, threadPoolFunc, samplingInterval, samplingNumber, averageWaitingJobsPerNewThread] { auto rounds = 0u, sumQueueSize = 0u; auto signalFuture = mMaintainerSignal.get_future(); while (true) { if (signalFuture.valid()) { if (signalFuture.wait_for(std::chrono::seconds(samplingInterval)) == std::future_status::ready) { break; } } else { break; } // Check first if we have finished, removable threads/futures and remove them mThreadPool.erase( std::remove_if(mThreadPool.begin(), mThreadPool.end(), [](std::future& future) { return (future.wait_for(std::chrono::seconds(0)) == std::future_status::ready); }), mThreadPool.end()); sumQueueSize += mTasks.size(); if (++rounds == samplingNumber) { auto averageQueueSize = (double) sumQueueSize / rounds; if ((averageQueueSize > mThreadCount) && (mThreadCount <= mThreadsMax)) { auto threadsToAdd = std::min((unsigned int) floor(averageQueueSize / averageWaitingJobsPerNewThread), mThreadsMax - mThreadCount); while (threadsToAdd > 0) { try { mThreadPool.emplace_back(std::async(std::launch::async, threadPoolFunc)); } catch (const std::exception& e) { std::cerr << "error: std::async couldn't start a new thread " << "and threw an exception: " << e.what() << std::endl; continue; } ++mThreadCount; --threadsToAdd; } } else { unsigned int threadsToRemove = 0ull; if (mThreadCount > mThreadsMax) { threadsToRemove = mThreadCount - mThreadsMax; } else { threadsToRemove = mThreadCount - std::max((unsigned int) floor(averageQueueSize), mThreadsMin.load()); } // Push in fake tasks for each thread to be stopped so threads can wake up and // notice that they should terminate. Termination is signalled with false. for (auto i = 0u; i < threadsToRemove; ++i) { auto fake_task = std::make_pair(false, std::make_shared > ([] {})); mTasks.push(fake_task); } mThreadCount -= threadsToRemove; } sumQueueSize = 0u; rounds = 0u; } mPoolSize = mThreadPool.size(); } }; mMaintainerThread.reset(new std::thread(maintainerThreadFunc)); } } //---------------------------------------------------------------------------- //! @brief Push a task for execution, the task can have a return type but //! inputs should be either captured in case of lambdas or bound using //! std::bind in case of regular functions. //! //! @param Ret return type of the task //! @param func the function for the task to execute //! //! @return future of the return type to communicate with your task //---------------------------------------------------------------------------- template std::future PushTask(std::function func) { auto task = std::make_shared>(func); auto taskFunc = std::make_pair(true, std::make_shared>([task] { (*task)(); })); mTasks.push(taskFunc); return task->get_future(); } template std::future PushTask(std::shared_ptr>&& task) { auto taskFunc = std::make_pair(true, std::make_shared>([task] { (*task)(); })); mTasks.push(taskFunc); return task->get_future(); } //---------------------------------------------------------------------------- //! @brief Stop the thread pool. All threads will be stopped and the pool //! cannot be used again. //---------------------------------------------------------------------------- void Stop() { if (mMaintainerThread && mMaintainerThread->joinable()) { mMaintainerSignal.set_value(); mMaintainerThread->join(); } // Push in fake tasks for each threads so all waiting can wake up and // notice that running is over. Termination is signalled with false. for (auto i = 0u; i < mThreadPool.size(); ++i) { auto fake_task = std::make_pair(false, std::make_shared > ([] {})); mTasks.push(fake_task); } for (auto& future : mThreadPool) { if (future.valid()) { future.get(); } } mTasks.clear(); mThreadPool.clear(); } //---------------------------------------------------------------------------- //! Destructor //---------------------------------------------------------------------------- ~ThreadPool() { Stop(); } //---------------------------------------------------------------------------- //! Get thread pool information //---------------------------------------------------------------------------- std::string GetInfo() const { std::ostringstream oss; oss << "pool=" << std::setw(14) << std::left << mId << " min=" << std::setw(3) << std::left << mThreadsMin << " max=" << std::setw(4) << std::left << mThreadsMax << " size=" << std::setw(4) << std::left << mPoolSize << " queue_sz=" << mTasks.size(); return oss.str(); } //---------------------------------------------------------------------------- //! Set min number of threads. If the new minimum is greater than the current //! max value then this one is also updated. //! //! @param num new min number of threads //---------------------------------------------------------------------------- void SetMinThreads(unsigned int num) { mThreadsMin = num; if (mThreadsMax < num) { mThreadsMax = num; } } //---------------------------------------------------------------------------- //! Set max number of threads. If the new maximum is smaller than the current //! min value then this one is also updated. //! //! @param num new max number of threads > 0 //---------------------------------------------------------------------------- void SetMaxThreads(unsigned int num) { if (num == 0) { return; } mThreadsMax = num; if (mThreadsMin > num) { mThreadsMin = num; } } //---------------------------------------------------------------------------- //! Get size of thread pool //---------------------------------------------------------------------------- unsigned int GetSize() { return mPoolSize; } //---------------------------------------------------------------------------- //! Get size of the queue of jobs //---------------------------------------------------------------------------- size_t GetQueueSize() const { return mTasks.size(); } // Disable copy/move constructors and assignment operators ThreadPool(const ThreadPool&) = delete; ThreadPool(ThreadPool&&) = delete; ThreadPool& operator=(const ThreadPool&) = delete; ThreadPool& operator=(ThreadPool&&) = delete; private: std::vector> mThreadPool; eos::common::ConcurrentQueue>>> mTasks; std::unique_ptr mMaintainerThread; std::promise mMaintainerSignal; std::atomic_uint mThreadCount {0}; std::atomic_uint mThreadsMin, mThreadsMax, mPoolSize; std::string mId; ///< Thread pool identifier }; EOSCOMMONNAMESPACE_END