// ---------------------------------------------------------------------- // File: OpaqueFuture.hh // Author: Abhishek Lekshmanan - CERN // ---------------------------------------------------------------------- /************************************************************************ * EOS - the CERN Disk Storage System * * Copyright (C) 2022 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #pragma once #include #include "common/async/OpaqueFuture.hh" #include "common/ThreadPool.hh" #include #include namespace eos::common { namespace detail { // A function that runs a given function via a given folly executor. We wrap the // the result type in a type erased OpaqueFuture to allow for interop with std::future // and folly::Future, we also transform the type tag folly::Unit to a void to interop // with std::future template auto execVia(folly::ThreadPoolExecutor* executor, F&& f) -> std::enable_if_t>::value, OpaqueFuture>> { // Folly's void futures are mapped to a folly::Unit empty type // since this is not void, do this mapping where in we return // an OpaqueFuture of in case the function returns a // void instead of a folly::Unit which we cannot work with. using ResultType = std::invoke_result_t; // a type holding either result of F or folly::Unit if void using follyType = folly::lift_unit_t; folly::Promise promise; auto fut = promise.getFuture(); executor->add([promise = std::move(promise), f = std::move(f)]() mutable { promise.setWith(std::move(f)); }); return OpaqueFuture(std::move(fut)); } // A function that runs a given function via eos::common::Threadpool and returns an opaque future // The task is wrapped as packeged_task over the std::function variant as // lambdas don't decompose to this signature. Since the folly variant of this // function takes a folly::Function, we use the packaged task and transfer ownership. // We wrap the the result type in a type erased OpaqueFuture to allow for interop with folly::future template auto execVia(eos::common::ThreadPool* threadpool, F&& f) -> OpaqueFuture> { using ResultType = std::invoke_result_t; auto task = std::make_shared>(std::move(f)); auto fut = threadpool->PushTask(std::move(task)); return OpaqueFuture(std::move(fut)); } inline void ShutdownExecutor(folly::ThreadPoolExecutor* executor) { executor->stop(); } inline void ShutdownExecutor(eos::common::ThreadPool* threadpool) { threadpool->Stop(); } inline size_t GetQueueSize(folly::ThreadPoolExecutor* executor) { return executor->getPendingTaskCount(); } inline size_t GetQueueSize(eos::common::ThreadPool* threadpool) { return threadpool->GetQueueSize(); } } // detail enum class ExecutorType { kThreadPool, kFollyExecutor, kFollyIOExecutor, }; inline constexpr ExecutorType GetExecutorType(std::string_view exec_type) { if (exec_type == "folly" || exec_type == "follyCPU") { return ExecutorType::kFollyExecutor; } else if (exec_type == "follyIO") { return ExecutorType::kFollyIOExecutor; } // std is the default return ExecutorType::kThreadPool; } /* * A class to hold folly or eos::common::threadpool executors * while it would have been easy to inherit from folly::ThreadPoolExecutor and make our * threadpool use this, we are exposed to potential folly impl bugs. This is to * get around that fact. Also we have two disjoint executor like implementations, which * doesn't make that much sense to combine under a single one. * folly::executors take a folly::function which * is a non copyable type in contrast to std::function. So we can avoid this by * templating on the function type, so that the various executors can be their * own variant of a callable/function/packaged_task etc. */ static constexpr unsigned int MIN_THREADPOOL_SIZE=2; class ExecutorMgr { public: template using future_result_t = OpaqueFuture>; template auto PushTask(F&& f) -> future_result_t { if (auto executor = std::get_if>(&mExecutor)) { return detail::execVia(executor->get(), std::forward(f)); } else if (auto threadpool = std::get_if>(&mExecutor)) { return detail::execVia(threadpool->get(), f); } else { throw std::runtime_error("Invalid executor type"); } } void Shutdown() { std::visit([](auto&& executor) { detail::ShutdownExecutor(executor.get()); }, mExecutor); } size_t GetQueueSize() const { return std::visit([](auto&& executor) { return detail::GetQueueSize(executor.get()); }, mExecutor); } template constexpr bool holdsType() const { return std::holds_alternative(mExecutor); } constexpr bool IsFollyExecutor() const { return holdsType>(); } constexpr bool IsThreadPool() const { return holdsType>(); } ExecutorMgr(ExecutorType type, size_t num_threads) { switch (type) { case ExecutorType::kThreadPool: mExecutor = std::make_shared(MIN_THREADPOOL_SIZE, num_threads); break; case ExecutorType::kFollyExecutor: mExecutor = std::make_shared(num_threads); break; case ExecutorType::kFollyIOExecutor: mExecutor = std::make_shared(num_threads); } } template ExecutorMgr(ExecutorType type, size_t min_threads, Args... args) { switch (type) { case ExecutorType::kThreadPool: mExecutor = std::make_shared(min_threads, args...); break; default: ExecutorMgr(type, min_threads); } } template ExecutorMgr(std::string_view executor_type, size_t num_threads, Args... args) : ExecutorMgr(GetExecutorType(executor_type), num_threads, args...) {} ExecutorMgr(std::shared_ptr executor) : mExecutor(executor) {} ExecutorMgr(std::shared_ptr threadpool) : mExecutor(threadpool) {} ~ExecutorMgr() = default; private: std::variant, std::shared_ptr> mExecutor; }; } // eos::common