//------------------------------------------------------------------------------
// File: ThreadEpochCounter.hh
// Author: Abhishek Lekshmanan - CERN
//------------------------------------------------------------------------------

/************************************************************************
 * EOS - the CERN Disk Storage System                                   *
 * Copyright (C) 2023 CERN/Switzerland                                  *
 *                                                                      *
 * This program is free software: you can redistribute it and/or modify *
 * it under the terms of the GNU General Public License as published by *
 * the Free Software Foundation, either version 3 of the License, or    *
 * (at your option) any later version.                                  *
 *                                                                      *
 * This program is distributed in the hope that it will be useful,      *
 * but WITHOUT ANY WARRANTY; without even the implied warranty of       *
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the        *
 * GNU General Public License for more details.                         *
 *                                                                      *
 * You should have received a copy of the GNU General Public License    *
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.*
 ************************************************************************/

#pragma once

#include "common/concurrency/AlignMacros.hh"
#include <array>
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <type_traits>

namespace eos::common {
namespace detail {

// Detection idiom: a counter is "state-less" if it declares a nested
// `is_state_less` type, i.e. callers need not remember a per-epoch index.
template <typename T, typename = void>
struct is_state_less : std::false_type {};

template <typename T>
struct is_state_less<T, std::void_t<typename T::is_state_less>>
  : std::true_type {};

template <typename T>
constexpr bool is_state_less_v = is_state_less<T>::value;

} // namespace detail

template <size_t kMaxEpochs>
class VersionEpochCounter {
public:
  inline uint64_t getEpochIndex(uint64_t epoch) noexcept
  {
    if (epoch < kMaxEpochs) {
      return epoch;
    }

    // TODO: This only works assuming that we wouldn't really have
    // readers at epoch 0 by the time kMaxEpochs is reached, which
    // is relatively safe given kMaxEpochs amount of writes don't happen
    // before the first reader finishes.
    return epoch % kMaxEpochs;
  }

  inline size_t increment(uint64_t epoch, uint16_t count = 1) noexcept
  {
    auto index = getEpochIndex(epoch);
    mCounter[index].fetch_add(count, std::memory_order_release);
    return index;
  }

  inline void decrement(uint64_t epoch) noexcept
  {
    auto index = getEpochIndex(epoch);
    mCounter[index].fetch_sub(1, std::memory_order_release);
  }

  inline void decrement(uint64_t epoch, uint64_t index) noexcept
  {
    mCounter[index].fetch_sub(1, std::memory_order_release);
  }

  inline size_t getReaders(uint64_t epoch) noexcept
  {
    return mCounter[getEpochIndex(epoch)].load(std::memory_order_relaxed);
  }

  bool epochHasReaders(uint64_t epoch) noexcept
  {
    auto index = getEpochIndex(epoch);
    return mCounter[index].load(std::memory_order_acquire) > 0;
  }

private:
  alignas(hardware_destructive_interference_size)
  std::array<std::atomic<uint64_t>, kMaxEpochs> mCounter{0};
};
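//------------------------------------------------------------------------------
//! Illustrative sketch (not part of this header's API): one way to drive
//! VersionEpochCounter in an RCU-like reader/writer pattern. The
//! `currentEpoch` atomic and the reclamation step are hypothetical glue code,
//! shown only to indicate the intended call sequence.
//!
//!   VersionEpochCounter<64> gCounter;
//!   std::atomic<uint64_t> currentEpoch{0};
//!
//!   // Reader: pin the current epoch for the duration of the read.
//!   uint64_t epoch = currentEpoch.load(std::memory_order_acquire);
//!   size_t index = gCounter.increment(epoch);
//!   // ... read shared state published at or before `epoch` ...
//!   gCounter.decrement(epoch, index);
//!
//!   // Writer: publish a new epoch, then wait for the old one to drain
//!   // before reclaiming anything a reader of `old` might still touch.
//!   uint64_t old = currentEpoch.fetch_add(1, std::memory_order_acq_rel);
//!   while (gCounter.epochHasReaders(old)) { /* spin or yield */ }
//!   // ... safe to reclaim state belonging to epoch `old` ...
//------------------------------------------------------------------------------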
namespace experimental {

// The idea of a thread-local ID is borrowed from
// https://github.com/cmuparlay/concurrent_deferred_rcu
// "Turning Manual Concurrent Memory Reclamation into Automatic Reference
// Counting", Daniel Anderson, Guy E. Blelloch, Yuanhao Wei (PLDI 2022)
static constexpr size_t EOS_MAX_THREADS = 32768;
static std::array<std::atomic<bool>, EOS_MAX_THREADS> g_thread_in_use{false};

struct ThreadID {
  ThreadID()
  {
    for (size_t i = 0; i < EOS_MAX_THREADS; ++i) {
      bool expected = false;

      if (!g_thread_in_use[i] &&
          g_thread_in_use[i].compare_exchange_strong(expected, true)) {
        tid = i;
        return;
      }
    }

    // Could not find a free thread ID: in the rare event we reach here we
    // can't guarantee EpochCounter correctness, so fail loudly.
    throw std::runtime_error("Could not find a free thread id");
  }

  ~ThreadID()
  {
    g_thread_in_use[tid].store(false, std::memory_order_release);
  }

  size_t get() { return tid; }

  size_t tid;
};

static thread_local ThreadID tlocalID;

/**
 * @brief A simple per-thread epoch counter that can be used to implement
 * RCU-like algorithms. Each slot stores a bitfield of a 16-bit reader count
 * and a 48-bit epoch. As long as no two threads map to the same slot this is
 * fairly simple: an increment only needs a load followed by a
 * memory_order_release store. With slot collisions, however, we would have to
 * track the oldest epoch seen per slot, which this class does not do.
 *
 * This counter is therefore not meant to be stable: since thread IDs can
 * collide, epoch tracking breaks once a thread with a colliding ID moves on
 * to a different epoch. An alternative design is a purely thread_local
 * reader/epoch counter, where a write lock walks the list of thread_local
 * pointers.
 */
struct alignas(hardware_destructive_interference_size) ThreadEpoch {
  auto get(std::memory_order order = std::memory_order_acquire)
  {
    return epoch_counter.load(order);
  }

  auto get_counter(std::memory_order order = std::memory_order_acquire)
  {
    return get(order) & 0xFFFF;
  }

  std::atomic<uint64_t> epoch_counter;
};

class ThreadEpochCounter {
public:
  using is_state_less = void;

  size_t increment(uint64_t epoch, uint16_t count = 1) noexcept
  {
    auto tid = tlocalID.get();
    // This is 2 instructions instead of a single CAS. Given that no two
    // threads map to the same slot, we can guarantee there is only one epoch
    // per thread.
    auto old = mCounter[tid].get();
    auto new_val = (epoch << 16) | ((old & 0xFFFF) + count);
    mCounter[tid].epoch_counter.store(new_val, std::memory_order_release);
    return tid;
  }

  inline void decrement(uint64_t epoch, size_t tid)
  {
    // assert((old >> 16) == epoch);
    mCounter[tid].epoch_counter.fetch_sub(1, std::memory_order_release);
  }

  inline void decrement()
  {
    auto tid = tlocalID.get();
    mCounter[tid].epoch_counter.fetch_sub(1, std::memory_order_release);
  }

  size_t getReaders(size_t tid) noexcept
  {
    return mCounter[tid].get_counter();
  }

  bool epochHasReaders(uint64_t epoch) noexcept
  {
    for (size_t i = 0; i < EOS_MAX_THREADS; ++i) {
      auto val = mCounter[i].get();

      if ((val >> 16) == epoch && (val & 0xFFFF) > 0) {
        return true;
      }
    }

    return false;
  }

private:
  std::array<ThreadEpoch, EOS_MAX_THREADS> mCounter{};
};

static_assert(detail::is_state_less_v<ThreadEpochCounter>);

} // namespace experimental
} // namespace eos::common
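//------------------------------------------------------------------------------
//! Illustrative sketch (hypothetical usage, not shipped code): the
//! experimental ThreadEpochCounter follows the same reader protocol as
//! VersionEpochCounter above, but is "state-less" for callers on the fast
//! path: the slot is derived from the thread-local ID, so a reader can
//! decrement without remembering an index.
//!
//!   eos::common::experimental::ThreadEpochCounter gCounter;
//!   std::atomic<uint64_t> currentEpoch{0};
//!
//!   uint64_t epoch = currentEpoch.load(std::memory_order_acquire);
//!   gCounter.increment(epoch);   // tags this thread's slot with `epoch`
//!   // ... read shared state ...
//!   gCounter.decrement();        // slot is found again via the thread-local ID
//!
//! A writer checks `gCounter.epochHasReaders(old)` as above, at the cost of
//! scanning all EOS_MAX_THREADS slots.
//------------------------------------------------------------------------------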