//------------------------------------------------------------------------------ //! @file BufferManager.hh //! @author Elvin-Alin Sindrilaru - CERN //------------------------------------------------------------------------------ /************************************************************************ * EOS - the CERN Disk Storage System * * Copyright (C) 2020 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #pragma once #include "common/Namespace.hh" #include "common/Logging.hh" #include "common/StringConversion.hh" #include #include #include #include #include EOSCOMMONNAMESPACE_BEGIN //------------------------------------------------------------------------------ //! Get the nearest power of 2 value bigger then the given input but always //! greater than given min //! //! @param input input value //! @param min min power of 2 to be used!!! //! //! @return nearest power of 2 bigger than input //------------------------------------------------------------------------------ uint32_t GetPowerCeil(const uint32_t input, const uint32_t min = 1024); //------------------------------------------------------------------------------ //! Get amount of system memory //------------------------------------------------------------------------------ uint64_t GetSystemMemorySize(); //------------------------------------------------------------------------------ //! Get OS page size aligned buffer //! //! @param size buffer size to be allocated //! //! @return unique_ptr to buffer or null if there is any error //------------------------------------------------------------------------------ std::unique_ptr GetAlignedBuffer(const size_t size); //------------------------------------------------------------------------------ //! Class Buffer //------------------------------------------------------------------------------ class Buffer { friend class BufferManager; public: //---------------------------------------------------------------------------- //! Constructor //---------------------------------------------------------------------------- Buffer(uint64_t size): mCapacity(size), mLength(0ull), mData(nullptr, free) { mData = GetAlignedBuffer(mCapacity); } //---------------------------------------------------------------------------- //! Destructor //---------------------------------------------------------------------------- ~Buffer() = default; //---------------------------------------------------------------------------- //! Get pointer to underlying data //---------------------------------------------------------------------------- inline char* GetDataPtr() { return mData.get(); } uint64_t mCapacity; ///< Available size of the buffer uint64_t mLength; ///< Length of the useful data std::unique_ptr mData; ///< Buffer holding the data }; //------------------------------------------------------------------------------ //! Class BufferSlot //------------------------------------------------------------------------------ class BufferSlot { friend class BufferManager; public: //---------------------------------------------------------------------------- //! Constructor //! //! @param size size of buffers allocated by the current slot //---------------------------------------------------------------------------- BufferSlot(uint64_t size): mBuffSize(size), mNumBuffers(0) {} //---------------------------------------------------------------------------- //! Destructor //---------------------------------------------------------------------------- ~BufferSlot() { std::unique_lock lock(mSlotMutex); mAvailableBuffers.clear(); } //---------------------------------------------------------------------------- //! Move assignment operator //---------------------------------------------------------------------------- BufferSlot& operator =(BufferSlot&& other) noexcept { if (this != &other) { mBuffSize = other.mBuffSize; mNumBuffers.store(other.mNumBuffers); mAvailableBuffers = other.mAvailableBuffers; other.mAvailableBuffers.clear(); } return *this; } //---------------------------------------------------------------------------- //! Move constructor //---------------------------------------------------------------------------- BufferSlot(BufferSlot&& other) noexcept { *this = std::move(other); } //---------------------------------------------------------------------------- //! Get buffer //---------------------------------------------------------------------------- std::pair, bool> GetBuffer() { bool new_alloc = false; std::unique_lock lock(mSlotMutex); if (!mAvailableBuffers.empty()) { auto buff = mAvailableBuffers.front(); mAvailableBuffers.pop_front(); return std::make_pair(buff, new_alloc); } ++mNumBuffers; new_alloc = true; return std::make_pair(std::make_shared(mBuffSize), new_alloc); } //---------------------------------------------------------------------------- //! Recycle buffer object //! //! @param buffer buffer object to be recycled //! @param keep true if buffer is to be saved otherwise false //---------------------------------------------------------------------------- void Recycle(std::shared_ptr buffer, bool keep) { if (keep) { std::unique_lock lock(mSlotMutex); mAvailableBuffers.push_back(buffer); } else { --mNumBuffers; } } //---------------------------------------------------------------------------- //! Try to pop a buffer from the list of available ones if possible //---------------------------------------------------------------------------- void Pop() { std::unique_lock lock(mSlotMutex); if (!mAvailableBuffers.empty()) { mAvailableBuffers.pop_front(); --mNumBuffers; } } private: std::mutex mSlotMutex; std::list> mAvailableBuffers; std::atomic mNumBuffers; uint64_t mBuffSize; }; //------------------------------------------------------------------------------ //! Class BufferManager //------------------------------------------------------------------------------ class BufferManager: public eos::common::LogId { public: //---------------------------------------------------------------------------- //! Constructor //! //! @param max_size maximum total size of allocated buffers //! @param slots number of slots for different buffer sizes which are power //! of 2 and multiple of slot_base_size e.g. 1MB //! slot 0 -> 1MB //! slot 1 -> 2MB //! slot 2 -> 4MB //! ... //! slot 6 -> 64MB //! @param slot_base_sz size of the blocks in the first slot //---------------------------------------------------------------------------- BufferManager(uint64_t max_size = 256 * 1024 * 1024 , uint32_t slots = 6, uint64_t slot_base_sz = 1024 * 1024): mMaxSize(max_size), mAllocatedSize(0ull), mNumSlots(slots), mSlotBaseSize(slot_base_sz) { for (uint32_t i = 0u; i <= mNumSlots; ++i) { mSlots.emplace_back((1 << i) * mSlotBaseSize); } } //---------------------------------------------------------------------------- //! Destructor //---------------------------------------------------------------------------- ~BufferManager() = default; //---------------------------------------------------------------------------- //! Get buffer for the given length //! //! @param size minimum size for requested buffer //! //! @return buffer object //---------------------------------------------------------------------------- std::shared_ptr GetBuffer(uint64_t size) { // No new buffer if we already hold more than half of system memory if (mAllocatedSize > (GetSystemMemorySize() >> 1)) { return nullptr; } uint32_t slot {UINT32_MAX}; // Find appropriate slot for the given size for (uint32_t i = 0; i <= mNumSlots; ++i) { if (size <= (mSlotBaseSize * std::pow(2, i))) { slot = i; break; } } // No slot big enough for the given request if (slot == UINT32_MAX) { // No buffer if size is unreasonably large > 512MB if (size > 512 * eos::common::MB) { return nullptr; } mAllocatedSize += size; return std::make_shared(size); } std::pair, bool> pair = mSlots[slot].GetBuffer(); if (pair.second) { mAllocatedSize += pair.first->mCapacity; } return pair.first; } //---------------------------------------------------------------------------- //! Recycle buffer object //! //! @param buffer objec to be recycled //---------------------------------------------------------------------------- void Recycle(std::shared_ptr buffer) { if (buffer == nullptr) { return; } uint32_t slot {UINT32_MAX}; // Find appropriate slot for given buffer for (uint32_t i = 0; i <= mNumSlots; ++i) { if (buffer->mCapacity == (mSlotBaseSize * std::pow(2, i))) { slot = i; break; } } // Buffer larger then our biggest slot, just deallocate if (slot == UINT32_MAX) { mAllocatedSize -= buffer->mCapacity; buffer.reset(); return; } uint64_t total_size {0ull}; auto sorted_slots = GetSortedSlotSizes(total_size); bool keep = (total_size <= mMaxSize); if (!keep) { eos_debug("msg=\"buffer pool is full\" max_size=%s", eos::common::StringConversion::GetPrettySize(mMaxSize).c_str()); // Perform clean up for rest of slots depending on their size for (auto it = sorted_slots.rbegin(); it != sorted_slots.rend(); ++it) { if (it->first > slot) { mSlots[it->first].Pop(); break; } if (it->first < slot) { // Free the equivalent of a block from the current slot int free_blocks = 1 << (slot - it->first); while (free_blocks) { mSlots[it->first].Pop(); --free_blocks; } break; } } } mSlots[slot].Recycle(buffer, keep); if (!keep) { mAllocatedSize -= buffer->mCapacity; } } //---------------------------------------------------------------------------- //! Get sorted distribution of slot sizes from smallest to biggest //! //! @param total_size compute the total size allocated so far //! //! @return sorted vector of pairs of slot ids and size of allocated buffers //! for that corresponding slot //---------------------------------------------------------------------------- std::vector< std::pair > GetSortedSlotSizes(uint64_t& total_size) const { std::vector< std::pair > elem; total_size = 0ull; for (uint32_t i = 0; i <= mNumSlots; ++i) { elem.push_back(std::make_pair(i, (mSlots[i].mNumBuffers * (1 << i) * 1024 * 1024))); total_size += elem.rbegin()->second; } auto comparator = [](std::pair a, std::pair b) { return (a.second < b.second); }; std::sort(elem.begin(), elem.end(), comparator); return elem; } //---------------------------------------------------------------------------- //! Get number of slots handled by the current buffer manager //---------------------------------------------------------------------------- uint32_t GetNumSlots() const { return mNumSlots.load(); } //---------------------------------------------------------------------------- //! Get max size of buffers stored by buffer manager //---------------------------------------------------------------------------- uint64_t GetMaxSize() const { return mMaxSize.load(); } private: std::atomic mMaxSize; std::atomic mAllocatedSize; std::atomic mNumSlots; const uint64_t mSlotBaseSize; std::vector mSlots; }; EOSCOMMONNAMESPACE_END