//------------------------------------------------------------------------------
// File: RCULite.hh
// Author: Abhishek Lekshmanan - CERN
//------------------------------------------------------------------------------

/************************************************************************
 * EOS - the CERN Disk Storage System                                   *
 * Copyright (C) 2023 CERN/Switzerland                                  *
 *                                                                      *
 * This program is free software: you can redistribute it and/or modify *
 * it under the terms of the GNU General Public License as published by *
 * the Free Software Foundation, either version 3 of the License, or    *
 * (at your option) any later version.                                  *
 *                                                                      *
 * This program is distributed in the hope that it will be useful,      *
 * but WITHOUT ANY WARRANTY; without even the implied warranty of       *
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the        *
 * GNU General Public License for more details.                         *
 *                                                                      *
 * You should have received a copy of the GNU General Public License    *
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.*
 ************************************************************************/

#pragma once

#include "common/concurrency/ThreadEpochCounter.hh"
#include <atomic>
#include <thread>
#include <type_traits>

namespace eos::common {

constexpr size_t MAX_THREADS = 4096;

/* A Read-Copy-Update-like primitive that is wait-free on the reader side and
   guarantees that memory is not deleted while readers may still be accessing
   it. This is similar to folly's RCU implementation, but a bit simpler, to
   accommodate our use cases.

   Say you have a data structure that is almost always read and only very
   rarely updated. With classical RW locks the reader would look like this:

   void reader() {
     std::shared_lock lock(shared_mutex);
     process(myconfig);
   }

   A rather simple way to avoid paying the lock cost on every read is to use
   something like an atomic_unique_ptr:

   void reader() {
     auto* config_data = myconfig.get();
     process(config_data);
   }

   void writer() {
     auto* old_config_data = myconfig.reset(new ConfigData(config_data));
     // This works and is safe, however we don't know when there is a good
     // checkpoint in the program to delete old_config_data. Deleting while
     // another reader is still accessing the data is exactly what we want
     // to avoid.
   }
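   To make the hazard concrete, here is a minimal sketch of the interleaving
   that has to be ruled out (ConfigData and an atomic_unique_ptr whose reset()
   atomically swaps the managed pointer and returns the old raw pointer are
   assumptions carried over from the snippet above):

     reader thread                    writer thread
     -------------                    -------------
     auto* cfg = myconfig.get();
                                      auto* old = myconfig.reset(new ConfigData(...));
                                      delete old;   // no way to tell cfg still points here
     process(cfg);                    // use-after-free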
   With an RCU domain the reader takes a cheap read lock and the writer knows
   when it is safe to reclaim the old data:

   void reader() {
     RCUReadLock rlock(my_rcu_domain);
     process(myconfig.get());
   }

   void writer() {
     ConfigData* old_config_data(nullptr);
     {
       RCUWriteLock wlock(my_rcu_domain);
       old_config_data = myconfig.reset(new ConfigData(config_data));
     }
     delete old_config_data;
   }

   // Alternatively a ScopedRCUWrite will swap the pointer, drain the readers
   // and wait for them to complete before deleting the old value:
   void writer() {
     ScopedRCUWrite write_guard(my_rcu_domain, myconfig,
                                new ConfigData(config_data));
   }
*/
template <typename ListT = VersionEpochCounter<MAX_THREADS>, size_t MaxWriters = 1>
class RCUDomain
{
public:
  RCUDomain() = default;

  inline uint64_t get_current_epoch(std::memory_order order =
                                      std::memory_order_acquire) noexcept
  {
    return mEpoch.load(order);
  }

  inline size_t rcu_read_lock(uint64_t epoch) noexcept
  {
    return mReadersCounter.increment(epoch);
  }

  inline size_t rcu_read_lock() noexcept
  {
    return rcu_read_lock(mEpoch.load(std::memory_order_acquire));
  }

  inline void rcu_read_unlock(uint64_t epoch, uint64_t tag) noexcept
  {
    mReadersCounter.decrement(epoch, tag);
  }

  // rcu_read_unlock for a stateless list, which doesn't depend on the return
  // value of the lock call
  template <typename T = ListT>
  inline auto rcu_read_unlock() noexcept
  -> std::enable_if_t<std::is_same_v<T, ThreadEpochCounter>, void>
  {
    mReadersCounter.decrement();
  }

  inline void rcu_read_unlock(uint64_t tag) noexcept
  {
    mReadersCounter.decrement(mEpoch.load(std::memory_order_acquire), tag);
  }

  inline void rcu_write_lock() noexcept
  {
    uint64_t expected_writers = MaxWriters - 1;
    uint64_t counter {0};

    // Spin until the writer count can be raised without exceeding MaxWriters;
    // when the domain is full, retry from MaxWriters - 1 so the CAS can
    // succeed once a writer leaves. Yield periodically while spinning.
    while (!mWritersCount.compare_exchange_strong(expected_writers,
                                                  expected_writers + 1,
                                                  std::memory_order_acq_rel)) {
      if (expected_writers >= MaxWriters) {
        expected_writers = MaxWriters - 1;
      }

      if (counter++ % 20 == 0) {
        std::this_thread::yield();
      }
    }
  }

  inline void rcu_synchronize() noexcept
  {
    // Bump the global epoch so that new readers register on the next epoch,
    // then wait until all readers of the old epoch have drained.
    auto curr_epoch = mEpoch.load(std::memory_order_acquire);

    while (!mEpoch.compare_exchange_strong(curr_epoch, curr_epoch + 1,
                                           std::memory_order_acq_rel));

    int i = 0;

    while (mReadersCounter.epochHasReaders(curr_epoch)) {
      if (i++ % 20 == 0) {
        std::this_thread::yield();
      }
    }

    mWritersCount.fetch_sub(1, std::memory_order_release);
  }

  inline void rcu_write_unlock() noexcept
  {
    rcu_synchronize();
  }

private:
  ListT mReadersCounter;
  alignas(hardware_destructive_interference_size) std::atomic<uint64_t> mEpoch{0};
  alignas(hardware_destructive_interference_size) std::atomic<uint64_t> mWritersCount{0};
};

template <typename RCUDomain>
struct RCUReadLock {
  RCUReadLock(RCUDomain& _rcu_domain) : rcu_domain(_rcu_domain)
  {
    epoch = rcu_domain.get_current_epoch();
    tag = rcu_domain.rcu_read_lock(epoch);
  }

  ~RCUReadLock()
  {
    rcu_domain.rcu_read_unlock(epoch, tag);
  }

  uint64_t tag;
  uint64_t epoch;
  RCUDomain& rcu_domain;
};

template <typename RCUDomain>
struct RCUWriteLock {
  RCUWriteLock(RCUDomain& _rcu_domain) : rcu_domain(_rcu_domain)
  {
    rcu_domain.rcu_write_lock();
  }

  ~RCUWriteLock()
  {
    rcu_domain.rcu_synchronize();
  }

  RCUDomain& rcu_domain;
};

template <typename RCUDomain, typename Ptr>
struct ScopedRCUWrite {
  ScopedRCUWrite(RCUDomain& _rcu_domain, Ptr& ptr,
                 typename Ptr::pointer new_val) : rcu_domain(_rcu_domain)
  {
    rcu_domain.rcu_write_lock();
    old_val = ptr.reset(new_val);
  }

  ~ScopedRCUWrite()
  {
    rcu_domain.rcu_synchronize();
    delete old_val;
  }

  RCUDomain& rcu_domain;
  typename Ptr::pointer old_val;
};

using VersionedRCUDomain = RCUDomain<VersionEpochCounter<MAX_THREADS>, 1>;
using EpochRCUDomain = RCUDomain<ThreadEpochCounter>;

} // eos::common
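
/* Usage sketch (illustration only, not part of this header): a minimal,
   hypothetical example of wiring a default-constructed RCUDomain together
   with RCUReadLock and ScopedRCUWrite. ConfigData and the atomic_ptr_holder
   below are assumptions made to keep the sketch self-contained; EOS code
   would typically use its own atomic pointer wrapper instead.

   #include "common/concurrency/RCULite.hh"
   #include <atomic>

   // Minimal pointer holder with the interface ScopedRCUWrite expects:
   // a ::pointer typedef and a reset() that atomically swaps the managed
   // pointer and returns the old raw pointer.
   template <typename T>
   struct atomic_ptr_holder {
     using pointer = T*;
     std::atomic<T*> raw{nullptr};

     explicit atomic_ptr_holder(T* p) : raw(p) {}
     T* get() const noexcept { return raw.load(std::memory_order_acquire); }
     T* reset(T* next) noexcept { return raw.exchange(next, std::memory_order_acq_rel); }
   };

   struct ConfigData {
     int max_connections{128};
   };

   eos::common::RCUDomain<> my_rcu_domain;
   atomic_ptr_holder<ConfigData> myconfig{new ConfigData{}};

   int read_limit()
   {
     // Wait-free read: register on the current epoch, read the pointer and
     // deregister on scope exit.
     eos::common::RCUReadLock rlock(my_rcu_domain);
     return myconfig.get()->max_connections;
   }

   void update_limit(int limit)
   {
     // Swap in the new config under the write lock; the guard drains readers
     // of the old epoch and deletes the previous ConfigData on scope exit.
     eos::common::ScopedRCUWrite guard(my_rcu_domain, myconfig,
                                       new ConfigData{limit});
   }
*/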