// This file is part of Eigen, a lightweight C++ template library
// for linear algebra.
//
// Copyright (C) 2016 Dmitry Vyukov <dvyukov@google.com>
//
// This Source Code Form is subject to the terms of the Mozilla
// Public License v. 2.0. If a copy of the MPL was not distributed
// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.

#ifndef EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_
#define EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_

#include <atomic>
#include <cstdint>
#include <mutex>
#include <utility>
#include <vector>

namespace Eigen {

// RunQueue is a fixed-size, partially non-blocking deque of Work items.
// Operations on the front of the queue must be done by a single thread
// (owner); operations on the back of the queue can be done by multiple
// threads concurrently.
//
// Algorithm outline:
// All remote threads operating on the queue back are serialized by a mutex.
// This ensures that at most two threads access state: the owner and one
// remote thread (Size aside). The algorithm ensures that the occupied region
// of the underlying array is logically contiguous (it can wrap around, but
// there are no stray occupied elements). The owner operates on one end of
// this region, the remote thread on the other. Synchronization between these
// threads (potential consumption of the last element and claiming of the
// last empty element) happens by means of a state variable in each element.
// The states are: empty, busy (insertion or removal in progress) and ready.
// Threads claim elements (empty->busy and ready->busy transitions) by means
// of a CAS operation. The finishing transitions (busy->empty and
// busy->ready) are done with a plain store, as the element is exclusively
// owned by the current thread.
//
// Note: we could permit only pointers as elements; then we would not need a
// separate state variable, as a null/non-null pointer value would serve as
// the state. But that would require a malloc/free per operation for large,
// complex values (and this queue is designed to store std::function<()>).
template <typename Work, unsigned kSize>
class RunQueue {
 public:
  RunQueue() : front_(0), back_(0) {
    // require power-of-two for fast masking
    eigen_assert((kSize & (kSize - 1)) == 0);
    eigen_assert(kSize > 2);            // why would you do this?
    eigen_assert(kSize <= (64 << 10));  // leave enough space for counter
    for (unsigned i = 0; i < kSize; i++)
      array_[i].state.store(kEmpty, std::memory_order_relaxed);
  }

  ~RunQueue() { eigen_assert(Size() == 0); }

  // PushFront inserts w at the beginning of the queue.
  // If the queue is full, returns w; otherwise returns default-constructed
  // Work.
  Work PushFront(Work w) {
    unsigned front = front_.load(std::memory_order_relaxed);
    Elem* e = &array_[front & kMask];
    uint8_t s = e->state.load(std::memory_order_relaxed);
    if (s != kEmpty ||
        !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
      return w;
    front_.store(front + 1 + (kSize << 1), std::memory_order_relaxed);
    e->w = std::move(w);
    e->state.store(kReady, std::memory_order_release);
    return Work();
  }
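  // Worked example of the front_/back_ encoding used above (illustration
  // only, assuming kSize = 4): kMask = 3 selects the array slot, kMask2 = 7
  // holds the rolling index that distinguishes empty from full, and the bits
  // above kMask2 form the modification counter. PushFront's
  // front + 1 + (kSize << 1) adds 9: the rolling index advances by one and
  // the counter advances by one. PopFront (below) decrements only the low
  // kMask2 bits and leaves the counter untouched, which is what lets Size()
  // detect a concurrent push via a changed counter.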
  // PopFront removes and returns the first element in the queue.
  // If the queue was empty, returns default-constructed Work.
  Work PopFront() {
    unsigned front = front_.load(std::memory_order_relaxed);
    Elem* e = &array_[(front - 1) & kMask];
    uint8_t s = e->state.load(std::memory_order_relaxed);
    if (s != kReady ||
        !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
      return Work();
    Work w = std::move(e->w);
    e->state.store(kEmpty, std::memory_order_release);
    front = ((front - 1) & kMask2) | (front & ~kMask2);
    front_.store(front, std::memory_order_relaxed);
    return w;
  }

  // PushBack adds w at the end of the queue.
  // If the queue is full, returns w; otherwise returns default-constructed
  // Work.
  Work PushBack(Work w) {
    std::unique_lock<std::mutex> lock(mutex_);
    unsigned back = back_.load(std::memory_order_relaxed);
    Elem* e = &array_[(back - 1) & kMask];
    uint8_t s = e->state.load(std::memory_order_relaxed);
    if (s != kEmpty ||
        !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
      return w;
    back = ((back - 1) & kMask2) | (back & ~kMask2);
    back_.store(back, std::memory_order_relaxed);
    e->w = std::move(w);
    e->state.store(kReady, std::memory_order_release);
    return Work();
  }

  // PopBack removes and returns the last element in the queue.
  // Can fail spuriously.
  Work PopBack() {
    if (Empty()) return Work();
    std::unique_lock<std::mutex> lock(mutex_, std::try_to_lock);
    if (!lock) return Work();
    unsigned back = back_.load(std::memory_order_relaxed);
    Elem* e = &array_[back & kMask];
    uint8_t s = e->state.load(std::memory_order_relaxed);
    if (s != kReady ||
        !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
      return Work();
    Work w = std::move(e->w);
    e->state.store(kEmpty, std::memory_order_release);
    back_.store(back + 1 + (kSize << 1), std::memory_order_relaxed);
    return w;
  }

  // PopBackHalf removes and returns the last half of the elements in the
  // queue. Returns the number of elements removed. Can also fail spuriously.
  unsigned PopBackHalf(std::vector<Work>* result) {
    if (Empty()) return 0;
    std::unique_lock<std::mutex> lock(mutex_, std::try_to_lock);
    if (!lock) return 0;
    unsigned back = back_.load(std::memory_order_relaxed);
    unsigned size = Size();
    unsigned mid = back;
    if (size > 1) mid = back + (size - 1) / 2;
    unsigned n = 0;
    unsigned start = 0;
    for (; static_cast<int>(mid - back) >= 0; mid--) {
      Elem* e = &array_[mid & kMask];
      uint8_t s = e->state.load(std::memory_order_relaxed);
      if (n == 0) {
        if (s != kReady ||
            !e->state.compare_exchange_strong(s, kBusy,
                                              std::memory_order_acquire))
          continue;
        start = mid;
      } else {
        // Note: no need to store a temporary kBusy; we exclusively own these
        // elements.
        eigen_assert(s == kReady);
      }
      result->push_back(std::move(e->w));
      e->state.store(kEmpty, std::memory_order_release);
      n++;
    }
    if (n != 0)
      back_.store(start + 1 + (kSize << 1), std::memory_order_relaxed);
    return n;
  }
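  // Usage sketch for PopBackHalf (illustration only; Execute is a
  // hypothetical caller-side function). A work-stealing thread typically
  // drains half of a victim's queue in one locked pass rather than stealing
  // one element at a time:
  //
  //   std::vector<Work> stolen;
  //   unsigned n = victim_queue.PopBackHalf(&stolen);
  //   for (unsigned i = 0; i < n; i++) Execute(std::move(stolen[i]));
  //
  // Because the lock is only try-acquired, PopBackHalf can return 0 even for
  // a non-empty queue; callers must tolerate this spurious failure.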
  // Size returns the current queue size.
  // Can be called by any thread at any time.
  unsigned Size() const {
    // Emptiness plays a critical role in thread pool blocking. So we go to
    // great effort to not produce false positives (claim a non-empty queue
    // as empty).
    for (;;) {
      // Capture a consistent snapshot of front/back.
      unsigned front = front_.load(std::memory_order_acquire);
      unsigned back = back_.load(std::memory_order_acquire);
      unsigned front1 = front_.load(std::memory_order_relaxed);
      if (front != front1) continue;
      int size = (front & kMask2) - (back & kMask2);
      // Fix overflow.
      if (size < 0) size += 2 * kSize;
      // The order of modification in push/pop is crafted to make the queue
      // look larger than it is during concurrent modifications. E.g. pop can
      // decrement size before the corresponding push has incremented it.
      // So the computed size can be up to kSize + 1; fix it.
      if (size > static_cast<int>(kSize)) size = kSize;
      return size;
    }
  }

  // Empty tests whether the container is empty.
  // Can be called by any thread at any time.
  bool Empty() const { return Size() == 0; }

 private:
  static const unsigned kMask = kSize - 1;
  static const unsigned kMask2 = (kSize << 1) - 1;
  struct Elem {
    std::atomic<uint8_t> state;
    Work w;
  };
  enum {
    kEmpty,
    kBusy,
    kReady,
  };
  std::mutex mutex_;
  // The low log(kSize) + 1 bits in front_ and back_ contain a rolling index
  // of the front/back, respectively. The remaining bits contain modification
  // counters that are incremented on Push operations. This allows us to
  // (1) distinguish between empty and full conditions (if we used only
  // log(kSize) bits for the position, these conditions would be
  // indistinguishable); (2) obtain a consistent snapshot of front_/back_ for
  // the Size operation using the modification counters.
  std::atomic<unsigned> front_;
  std::atomic<unsigned> back_;
  Elem array_[kSize];

  RunQueue(const RunQueue&) = delete;
  void operator=(const RunQueue&) = delete;
};

}  // namespace Eigen

#endif  // EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_
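// Usage sketch (illustration only, not part of the original header): the
// owner thread works on the front of its own queue while other threads push
// or steal from the back. Work is modeled here as std::function<void()>,
// matching the note at the top of the file.
//
//   Eigen::RunQueue<std::function<void()>, 1024> q;
//
//   // Owner thread: LIFO on the front for cache locality.
//   q.PushFront([] { /* task */ });
//   std::function<void()> t = q.PopFront();
//   if (t) t();
//
//   // Any other thread: pushes and (possibly spuriously failing) steals on
//   // the back, serialized by the internal mutex.
//   q.PushBack([] { /* task */ });
//   std::function<void()> stolen = q.PopBack();
//   if (stolen) stolen();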