// Copyright 2017 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef HIGHWAYHASH_DATA_PARALLEL_H_
#define HIGHWAYHASH_DATA_PARALLEL_H_

// Portable C++11 alternative to OpenMP for data-parallel computations:
// provides low-overhead ThreadPool, plus PerThread with support for reduction.

#include <stdio.h>

#include <algorithm>  // find_if
#include <atomic>
#include <condition_variable>  //NOLINT
#include <cstdint>
#include <cstdlib>
#include <functional>
#include <memory>
#include <mutex>   //NOLINT
#include <thread>  //NOLINT
#include <utility>
#include <vector>

#define DATA_PARALLEL_CHECK(condition)                             \
  while (!(condition)) {                                           \
    printf("data_parallel check failed at line %d\n", __LINE__);   \
    abort();                                                       \
  }

namespace highwayhash {

// Highly scalable thread pool, especially suitable for data-parallel
// computations in the fork-join model, where clients need to know when all
// tasks have completed.
//
// Thread pools usually store small numbers of heterogeneous tasks in a queue.
// When tasks are identical or differ only by an integer input parameter, it is
// much faster to store just one function of an integer parameter and call it
// for each value.
//
// This thread pool can efficiently load-balance millions of tasks using an
// atomic counter, thus avoiding per-task syscalls. With 48 hyperthreads and
// 1M tasks that add to an atomic counter, overall runtime is 10-20x higher
// when using std::async, and up to 200x for a queue-based ThreadPool.
//
// Usage:
//   ThreadPool pool;
//   pool.Run(0, 1000000, [](const int i) { Func1(i); });
//   // When Run returns, all of its tasks have finished.
//
//   pool.RunTasks({Func2, Func3, Func4});
//
//   // The destructor waits until all worker threads have exited cleanly.
class ThreadPool {
 public:
  // Starts the given number of worker threads and blocks until they are ready.
  // "num_threads" defaults to one per hyperthread.
  explicit ThreadPool(
      const int num_threads = std::thread::hardware_concurrency())
      : num_threads_(num_threads) {
    DATA_PARALLEL_CHECK(num_threads_ > 0);
    threads_.reserve(num_threads_);
    for (int i = 0; i < num_threads_; ++i) {
      threads_.emplace_back(ThreadFunc, this);
    }
    padding_[0] = 0;  // avoid unused member warning.
    WorkersReadyBarrier();
  }

  ThreadPool(const ThreadPool&) = delete;
  ThreadPool& operator=(const ThreadPool&) = delete;

  // Waits for all threads to exit.
  ~ThreadPool() {
    StartWorkers(kWorkerExit);
    for (std::thread& thread : threads_) {
      thread.join();
    }
  }

  // Runs func(i) on worker thread(s) for every i in [begin, end).
  // Not thread-safe - no two calls to Run or RunTasks may overlap.
  // Subsequent calls will reuse the same threads.
  //
  // Precondition: 0 <= begin <= end.
  template <class Func>
  void Run(const int begin, const int end, const Func& func) {
    DATA_PARALLEL_CHECK(0 <= begin && begin <= end);
    if (begin == end) {
      return;
    }
    const WorkerCommand worker_command = (WorkerCommand(end) << 32) + begin;
    // Ensure the inputs do not result in a reserved command.
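    // ("end" occupies the upper 32 bits and "begin" the lower 32 bits;
    // RunRange() decodes the same layout.)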
    DATA_PARALLEL_CHECK(worker_command != kWorkerWait);
    DATA_PARALLEL_CHECK(worker_command != kWorkerExit);

    // If Func is large (many captures), this will allocate memory, but it is
    // still slower to use a std::ref wrapper.
    task_ = func;
    num_reserved_.store(0);

    StartWorkers(worker_command);
    WorkersReadyBarrier();
  }

  // Runs each task (closure, typically a lambda function) on worker thread(s).
  // Not thread-safe - no two calls to Run or RunTasks may overlap.
  // Subsequent calls will reuse the same threads.
  //
  // This is a more conventional interface for heterogeneous tasks that may be
  // independent/unrelated.
  void RunTasks(const std::vector<std::function<void(void)>>& tasks) {
    Run(0, static_cast<int>(tasks.size()),
        [&tasks](const int i) { tasks[i](); });
  }

  // Statically (and deterministically) splits [begin, end) into ranges and
  // calls "func" for each of them. Useful when "func" involves some overhead
  // (e.g. for PerThread<T>::Get or random seeding) that should be amortized
  // over a range of values. "func" is void(int chunk, uint32_t begin,
  // uint32_t end).
  template <class Func>
  void RunRanges(const uint32_t begin, const uint32_t end, const Func& func) {
    const uint32_t length = end - begin;
    // Use a constant rather than num_threads_ for machine-independent
    // splitting.
    const uint32_t chunk = std::max(1U, (length + 127) / 128);
    std::vector<std::pair<uint32_t, uint32_t>> ranges;  // begin/end
    ranges.reserve(length / chunk + 1);
    for (uint32_t i = 0; i < length; i += chunk) {
      ranges.emplace_back(begin + i, begin + std::min(i + chunk, length));
    }

    Run(0, static_cast<int>(ranges.size()), [&ranges, func](const int i) {
      func(i, ranges[i].first, ranges[i].second);
    });
  }

 private:
  // After construction and between calls to Run, workers are "ready", i.e.
  // waiting on worker_start_cv_. They are "started" by sending a "command"
  // and notifying all worker_start_cv_ waiters. (That is why all workers
  // must be ready/waiting - otherwise, the notification will not reach all of
  // them and the main thread waits in vain for them to report readiness.)

  using WorkerCommand = uint64_t;

  // Special values; all others encode the begin/end parameters.
  static constexpr WorkerCommand kWorkerWait = 0;
  static constexpr WorkerCommand kWorkerExit = ~0ULL;

  void WorkersReadyBarrier() {
    std::unique_lock<std::mutex> lock(mutex_);
    workers_ready_cv_.wait(
        lock, [this]() { return workers_ready_ == num_threads_; });
    workers_ready_ = 0;
  }

  // Precondition: all workers are ready.
  void StartWorkers(const WorkerCommand worker_command) {
    std::unique_lock<std::mutex> lock(mutex_);
    worker_start_command_ = worker_command;
    // Workers will need this lock, so release it before they wake up.
    lock.unlock();
    worker_start_cv_.notify_all();
  }

  // Attempts to reserve and perform some work from the global range of tasks,
  // which is encoded within "command". Returns after all tasks are reserved.
  static void RunRange(ThreadPool* self, const WorkerCommand command) {
    const int begin = command & 0xFFFFFFFF;
    const int end = command >> 32;
    const int num_tasks = end - begin;

    // OpenMP introduced several "schedule" strategies:
    //   "single" (static assignment of exactly one chunk per thread): slower.
    //   "dynamic" (allocates k tasks at a time): competitive for well-chosen k.
    //   "guided" (allocates k tasks, decreases k): computing k = remaining/n
    //     is faster than halving k each iteration. We prefer this strategy
    //     because it avoids user-specified parameters.
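    // For example, with num_threads_ = 4 and 1000 tasks, the first reservation
    // below claims max(1000 / (4 * 2), 1) = 125 tasks; chunks then shrink as
    // num_remaining decreases, ending with single tasks that smooth out any
    // load imbalance near the end.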
    for (;;) {
      const int num_reserved = self->num_reserved_.load();
      const int num_remaining = num_tasks - num_reserved;
      const int my_size =
          std::max(num_remaining / (self->num_threads_ * 2), 1);
      const int my_begin = begin + self->num_reserved_.fetch_add(my_size);
      const int my_end = std::min(my_begin + my_size, begin + num_tasks);
      // Another thread already reserved the last task.
      if (my_begin >= my_end) {
        break;
      }
      for (int i = my_begin; i < my_end; ++i) {
        self->task_(i);
      }
    }
  }

  static void ThreadFunc(ThreadPool* self) {
    // Until kWorkerExit command received:
    for (;;) {
      std::unique_lock<std::mutex> lock(self->mutex_);
      // Notify main thread that this thread is ready.
      if (++self->workers_ready_ == self->num_threads_) {
        self->workers_ready_cv_.notify_one();
      }
    RESUME_WAIT:
      // Wait for a command.
      self->worker_start_cv_.wait(lock);
      const WorkerCommand command = self->worker_start_command_;
      switch (command) {
        case kWorkerWait:    // spurious wakeup:
          goto RESUME_WAIT;  // lock still held, avoid incrementing ready.
        case kWorkerExit:
          return;  // exits thread
      }

      lock.unlock();
      RunRange(self, command);
    }
  }

  const int num_threads_;

  // Unmodified after ctor, but cannot be const because we call thread::join().
  std::vector<std::thread> threads_;

  std::mutex mutex_;  // guards both condition variables and their variables.
  std::condition_variable workers_ready_cv_;
  int workers_ready_ = 0;
  std::condition_variable worker_start_cv_;
  WorkerCommand worker_start_command_;

  // Written by the main thread, read by workers (after mutex lock/unlock).
  std::function<void(int)> task_;

  // Updated by workers; alignment/padding avoids false sharing.
  alignas(64) std::atomic<int> num_reserved_{0};
  int padding_[15];
};

// Thread-local storage with support for reduction (combining into one result).
// The "T" type must be unique to the call site because the list of threads'
// copies is a static member. (With knowledge of the underlying threads, we
// could eliminate this list and T allocations, but that is difficult to
// arrange and we prefer this to be usable independently of ThreadPool.)
//
// Usage:
//   for (int i = 0; i < N; ++i) {
//     // in each thread:
//     T& my_copy = PerThread<T>::Get();
//     my_copy.Modify();
//
//     // single-threaded:
//     T& combined = PerThread<T>::Reduce();
//     Use(combined);
//     PerThread<T>::Destroy();
//   }
//
// T is duck-typed and implements the following interface:
//
//   // Returns true if T is default-initialized or Destroy was called without
//   // any subsequent re-initialization.
//   bool IsNull() const;
//
//   // Releases any resources. Postcondition: IsNull() == true.
//   void Destroy();
//
//   // Merges in data from "victim".
//   // Precondition: !IsNull() && !victim.IsNull().
//   void Assimilate(const T& victim);
template <class T>
class PerThread {
 public:
  // Returns a reference to this thread's T instance (dynamically allocated,
  // so its address is unique). Callers are responsible for any initialization
  // beyond the default ctor.
  static T& Get() {
    static thread_local T* t;
    if (t == nullptr) {
      t = new T;
      static std::mutex mutex;
      std::lock_guard<std::mutex> lock(mutex);
      Threads().push_back(t);
    }
    return *t;
  }

  // Returns a vector of all per-thread T. Used inside Reduce() or by clients
  // that require direct access to T instead of Assimilating them.
  // Function wrapper avoids a separate static member variable definition.
  static std::vector<T*>& Threads() {
    static std::vector<T*> threads;
    return threads;
  }

  // Returns the first non-null T after assimilating all other threads' T
  // into it. Precondition: at least one non-null T exists (the caller must
  // have called Get() and initialized the result).
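  //
  // Example (sketch with a hypothetical duck-typed "Histogram" type): once
  // every worker has called PerThread<Histogram>::Get() and recorded samples,
  // a single call to PerThread<Histogram>::Reduce() on the main thread
  // Assimilates all other non-null copies into the first one and returns it.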
  static T& Reduce() {
    std::vector<T*>& threads = Threads();

    // Find the first non-null T.
    const auto it = std::find_if(threads.begin(), threads.end(),
                                 [](const T* t) { return !t->IsNull(); });
    if (it == threads.end()) {
      abort();
    }
    T* const first = *it;

    for (const T* t : threads) {
      if (t != first && !t->IsNull()) {
        first->Assimilate(*t);
      }
    }
    return *first;
  }

  // Calls each thread's T::Destroy to release resources and/or prepare for
  // reuse by the same threads/ThreadPool. Note that all T remain allocated
  // (we need thread-independent pointers for iterating over each thread's T,
  // and deleting them would leave dangling pointers in each thread, which is
  // unacceptable because the same thread may call Get() again later.)
  static void Destroy() {
    for (T* t : Threads()) {
      t->Destroy();
    }
  }
};

}  // namespace highwayhash

#endif  // HIGHWAYHASH_DATA_PARALLEL_H_
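
// Usage sketch (illustration only; "Accumulator" is a hypothetical duck-typed
// T, not part of this header): RunRanges amortizes the PerThread lookup over
// each chunk, as suggested in the RunRanges comment above.
//
//   highwayhash::ThreadPool pool;
//   pool.RunRanges(0, 1000000,
//                  [](int chunk, uint32_t begin, uint32_t end) {
//                    auto& acc = highwayhash::PerThread<Accumulator>::Get();
//                    for (uint32_t i = begin; i < end; ++i) acc.Add(i);
//                  });
//   Accumulator& total = highwayhash::PerThread<Accumulator>::Reduce();
//   highwayhash::PerThread<Accumulator>::Destroy();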