Eigen-unsupported 3.3.3 — ThreadPool module: EventCount.h
// This file is part of Eigen, a lightweight C++ template library
// for linear algebra.
//
// Copyright (C) 2016 Dmitry Vyukov <dvyukov@google.com>
//
// This Source Code Form is subject to the terms of the Mozilla
// Public License v. 2.0. If a copy of the MPL was not distributed
// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.

#ifndef EIGEN_CXX11_THREADPOOL_EVENTCOUNT_H_
#define EIGEN_CXX11_THREADPOOL_EVENTCOUNT_H_

namespace Eigen {

// EventCount allows to wait for arbitrary predicates in non-blocking
// algorithms. Think of condition variable, but wait predicate does not need to
// be protected by a mutex. Usage:
// Waiting thread does:
//
//   if (predicate)
//     return act();
//   EventCount::Waiter& w = waiters[my_index];
//   ec.Prewait(&w);
//   if (predicate) {
//     ec.CancelWait(&w);
//     return act();
//   }
//   ec.CommitWait(&w);
//
// Notifying thread does:
//
//   predicate = true;
//   ec.Notify(true);
//
// Notify is cheap if there are no waiting threads. Prewait/CommitWait are not
// cheap, but they are executed only if the preceding predicate check has
// failed.
//
// Algorithm outline:
// There are two main variables: predicate (managed by user) and state_.
// Operation closely resembles Dekker mutual algorithm:
// https://en.wikipedia.org/wiki/Dekker%27s_algorithm
// Waiting thread sets state_ then checks predicate, Notifying thread sets
// predicate then checks state_. Due to seq_cst fences in between these
// operations it is guaranteed that either waiter will see predicate change
// and won't block, or notifying thread will see state_ change and will unblock
// the waiter, or both. But it can't happen that both threads don't see each
// other's changes, which would lead to deadlock.
class EventCount {
 public:
  class Waiter;

  EventCount(MaxSizeVector<Waiter>& waiters) : waiters_(waiters) {
    // The waiter stack encodes indices into waiters_ in kWaiterBits bits,
    // so the vector must be small enough for every index (plus the reserved
    // "empty stack" value kStackMask) to be representable.
    eigen_assert(waiters.size() < (1 << kWaiterBits) - 1);
    // Initialize epoch to something close to overflow to test overflow.
    state_ = kStackMask | (kEpochMask - kEpochInc * waiters.size() * 2);
  }

  ~EventCount() {
    // Ensure there are no waiters.
    eigen_assert((state_.load() & (kStackMask | kWaiterMask)) == kStackMask);
  }

  // Prewait prepares for waiting.
  // After calling this function the thread must re-check the wait predicate
  // and call either CancelWait or CommitWait passing the same Waiter object.
  void Prewait(Waiter* w) {
    // Register this thread in the pre-wait counter and remember the epoch at
    // which it did so; the epoch is used by CommitWait/CancelWait to order
    // this registration against concurrent notifications.
    w->epoch = state_.fetch_add(kWaiterInc, std::memory_order_relaxed);
    // Pairs with the seq_cst fence in Notify: guarantees the subsequent
    // user-side predicate re-check cannot be reordered before the counter
    // increment above (the Dekker-style handshake described in the header
    // comment).
    std::atomic_thread_fence(std::memory_order_seq_cst);
  }

  // CommitWait commits waiting: pushes the waiter onto the lock-free stack
  // and blocks until a notifying thread unparks it.
  void CommitWait(Waiter* w) {
    w->state = Waiter::kNotSignaled;
    // Modification epoch of this waiter.
    uint64_t epoch =
        (w->epoch & kEpochMask) +
        (((w->epoch & kWaiterMask) >> kWaiterShift) << kEpochShift);
    uint64_t state = state_.load(std::memory_order_seq_cst);
    for (;;) {
      if (int64_t((state & kEpochMask) - epoch) < 0) {
        // The preceding waiter has not decided on its fate. Wait until it
        // calls either CancelWait or CommitWait, or is notified.
        EIGEN_THREAD_YIELD();
        state = state_.load(std::memory_order_seq_cst);
        continue;
      }
      // We've already been notified.
      if (int64_t((state & kEpochMask) - epoch) > 0) return;
      // Remove this thread from prewait counter and add it to the waiter list.
      eigen_assert((state & kWaiterMask) != 0);
      uint64_t newstate = state - kWaiterInc + kEpochInc;
      // Store this waiter's index in waiters_ in the low (stack) bits; the
      // stack is an intrusive singly-linked list threaded through w->next.
      newstate = (newstate & ~kStackMask) | (w - &waiters_[0]);
      if ((state & kStackMask) == kStackMask)
        w->next.store(nullptr, std::memory_order_relaxed);
      else
        w->next.store(&waiters_[state & kStackMask], std::memory_order_relaxed);
      if (state_.compare_exchange_weak(state, newstate,
                                       std::memory_order_release))
        break;
    }
    Park(w);
  }

  // CancelWait cancels effects of the previous Prewait call.
  void CancelWait(Waiter* w) {
    uint64_t epoch =
        (w->epoch & kEpochMask) +
        (((w->epoch & kWaiterMask) >> kWaiterShift) << kEpochShift);
    uint64_t state = state_.load(std::memory_order_relaxed);
    for (;;) {
      if (int64_t((state & kEpochMask) - epoch) < 0) {
        // The preceding waiter has not decided on its fate. Wait until it
        // calls either CancelWait or CommitWait, or is notified.
        EIGEN_THREAD_YIELD();
        state = state_.load(std::memory_order_relaxed);
        continue;
      }
      // We've already been notified.
      if (int64_t((state & kEpochMask) - epoch) > 0) return;
      // Remove this thread from prewait counter.
      eigen_assert((state & kWaiterMask) != 0);
      if (state_.compare_exchange_weak(state, state - kWaiterInc + kEpochInc,
                                       std::memory_order_relaxed))
        return;
    }
  }

  // Notify wakes one or all waiting threads.
  // Must be called after changing the associated wait predicate.
  void Notify(bool all) {
    // Pairs with the seq_cst fence in Prewait: guarantees the caller's
    // predicate write is ordered before the state_ inspection below.
    std::atomic_thread_fence(std::memory_order_seq_cst);
    uint64_t state = state_.load(std::memory_order_acquire);
    for (;;) {
      // Easy case: no waiters.
      if ((state & kStackMask) == kStackMask && (state & kWaiterMask) == 0)
        return;
      uint64_t waiters = (state & kWaiterMask) >> kWaiterShift;
      uint64_t newstate;
      if (all) {
        // Reset prewait counter and empty wait list.
        newstate = (state & kEpochMask) + (kEpochInc * waiters) + kStackMask;
      } else if (waiters) {
        // There is a thread in pre-wait state, unblock it.
        newstate = state + kEpochInc - kWaiterInc;
      } else {
        // Pop a waiter from list and unpark it.
        Waiter* w = &waiters_[state & kStackMask];
        Waiter* wnext = w->next.load(std::memory_order_relaxed);
        uint64_t next = kStackMask;
        if (wnext != nullptr) next = wnext - &waiters_[0];
        // Note: we don't add kEpochInc here. ABA problem on the lock-free stack
        // can't happen because a waiter is re-pushed onto the stack only after
        // it was in the pre-wait state which inevitably leads to epoch
        // increment.
        newstate = (state & kEpochMask) + next;
      }
      if (state_.compare_exchange_weak(state, newstate,
                                       std::memory_order_acquire)) {
        if (!all && waiters) return;  // unblocked pre-wait thread
        if ((state & kStackMask) == kStackMask) return;
        Waiter* w = &waiters_[state & kStackMask];
        // For notify-one the popped waiter is detached from the list so that
        // Unpark wakes only it; for notify-all the whole chain is walked.
        if (!all) w->next.store(nullptr, std::memory_order_relaxed);
        Unpark(w);
        return;
      }
    }
  }

  // Per-thread waiter slot. One Waiter per thread lives in the caller-owned
  // waiters vector; EventCount threads them into a lock-free stack via next.
  class Waiter {
    friend class EventCount;
    // Align to 128 byte boundary to prevent false sharing with other Waiter
    // objects in the same vector.
    EIGEN_ALIGN_TO_BOUNDARY(128) std::atomic<Waiter*> next;
    std::mutex mu;
    std::condition_variable cv;
    uint64_t epoch;  // state_ snapshot taken in Prewait
    unsigned state;  // one of the values below, protected by mu
    enum {
      kNotSignaled,
      kWaiting,
      kSignaled,
    };
  };

 private:
  // State_ layout:
  // - low kStackBits is a stack of waiters committed wait
  //   (index of the top waiter in waiters_, or kStackMask when empty).
  // - next kWaiterBits is count of waiters in prewait state.
  // - next kEpochBits is modification counter.
  static const uint64_t kStackBits = 16;
  static const uint64_t kStackMask = (1ull << kStackBits) - 1;
  static const uint64_t kWaiterBits = 16;
  static const uint64_t kWaiterShift = 16;
  static const uint64_t kWaiterMask = ((1ull << kWaiterBits) - 1)
                                      << kWaiterShift;
  static const uint64_t kWaiterInc = 1ull << kWaiterBits;
  static const uint64_t kEpochBits = 32;
  static const uint64_t kEpochShift = 32;
  static const uint64_t kEpochMask = ((1ull << kEpochBits) - 1) << kEpochShift;
  static const uint64_t kEpochInc = 1ull << kEpochShift;
  std::atomic<uint64_t> state_;
  MaxSizeVector<Waiter>& waiters_;

  // Blocks the calling thread on w's condition variable until a notifier
  // marks it kSignaled. The while-loop re-check (under w->mu) guards against
  // spurious wakeups and against a signal that arrives before the wait.
  void Park(Waiter* w) {
    std::unique_lock<std::mutex> lock(w->mu);
    while (w->state != Waiter::kSignaled) {
      w->state = Waiter::kWaiting;
      w->cv.wait(lock);
    }
  }

  // Wakes every waiter on the (already detached) list starting at waiters.
  // next is read before signaling because a woken waiter may immediately
  // reuse its slot; cv.notify_one is called outside the mutex to avoid
  // waking a thread that then blocks on the lock.
  void Unpark(Waiter* waiters) {
    Waiter* next = nullptr;
    for (Waiter* w = waiters; w; w = next) {
      next = w->next.load(std::memory_order_relaxed);
      unsigned state;
      {
        std::unique_lock<std::mutex> lock(w->mu);
        state = w->state;
        w->state = Waiter::kSignaled;
      }
      // Avoid notifying if it wasn't waiting.
      if (state == Waiter::kWaiting) w->cv.notify_one();
    }
  }

  EventCount(const EventCount&) = delete;
  void operator=(const EventCount&) = delete;
};

}  // namespace Eigen

#endif  // EIGEN_CXX11_THREADPOOL_EVENTCOUNT_H_