// ----------------------------------------------------------------------
//! @file ConcurrentQueue.hh
//! @author Elvin-Alin Sindrilaru - CERN
//! @brief Implementation of a thread-safe queue.
// ----------------------------------------------------------------------
/************************************************************************
* EOS - the CERN Disk Storage System *
* Copyright (C) 2011 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see <http://www.gnu.org/licenses/>.*
************************************************************************/
#pragma once
#include "common/Namespace.hh"
// NOTE(review): the header declaring LogId / eos_static_debug is not visible
// in this listing - confirm that it is included.
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <utility>
EOSCOMMONNAMESPACE_BEGIN
//------------------------------------------------------------------------------
//! Thread-safe FIFO queue: a std::queue protected by a mutex, with a condition
//! variable used to wake up consumers blocked in wait_pop().
//!
//! Objects of this type can be neither copied nor moved.
//!
//! @tparam Data type of the elements stored in the queue
//------------------------------------------------------------------------------
template <typename Data>
class ConcurrentQueue: public LogId
{
public:
  ConcurrentQueue() = default;
  ~ConcurrentQueue() = default;

  // Copying and moving are disabled
  ConcurrentQueue(const ConcurrentQueue& other) = delete;
  ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
  ConcurrentQueue(ConcurrentQueue&& other) = delete;
  ConcurrentQueue& operator=(ConcurrentQueue&& other) = delete;

  //! Number of elements currently stored
  size_t size() const;

  //! Append a copy of data and notify waiting consumers
  void push(Data& data);

  //! Construct a new element in place from args and notify waiting consumers
  template <typename... Ts>
  void emplace(Ts&& ... args);

  //! Append a copy of data only while the queue size is within the max_size
  //! limit (see the definition for the exact bound)
  //! @return true if the element was pushed, false otherwise
  bool push_size(Data& data, size_t max_size);

  //! Check whether the queue holds no elements
  bool empty() const;

  //! Pop the front element into popped_value without blocking
  //! @return true if an element was popped, false if the queue was empty
  bool try_pop(Data& popped_value);

  //! Block until an element is available, then pop it into popped_value
  void wait_pop(Data& popped_value);

  //! Drop all the elements
  void clear();

private:
  std::queue<Data> queue; ///< Underlying container, accessed under mMutex
  mutable std::mutex mMutex; ///< Mutable so that const methods can lock it
  std::condition_variable mCondVar; ///< Signalled whenever an element is added
};
//------------------------------------------------------------------------------
//! Get size of the queue
//------------------------------------------------------------------------------
template
size_t
ConcurrentQueue::size() const
{
std::lock_guard lock(mMutex);
return queue.size();
}
//------------------------------------------------------------------------------
//! Push data to the queue
//------------------------------------------------------------------------------
template
void
ConcurrentQueue::push(Data& data)
{
{
std::lock_guard lock(mMutex);
queue.push(data);
}
mCondVar.notify_all();
}
//------------------------------------------------------------------------------
//! Push data to the queue while constructing the object in place
//------------------------------------------------------------------------------
template
template
void ConcurrentQueue::emplace(Ts&& ... args)
{
{
std::lock_guard lock(mMutex);
queue.emplace(std::forward(args)...);
}
mCondVar.notify_all();
}
//------------------------------------------------------------------------------
//! Push data to the queue if queue size is less then max_size
//!
//! @param data object to be pushed in the queue
//! @param max_size max size allowed of the queue
//------------------------------------------------------------------------------
template
bool
ConcurrentQueue::push_size(Data& data, size_t max_size)
{
bool ret_val = false;
std::unique_lock lock(mMutex);
if (queue.size() <= max_size) {
queue.push(data);
lock.unlock();
mCondVar.notify_all();
ret_val = true;
}
return ret_val;
}
//------------------------------------------------------------------------------
//! Test if queue is empty
//------------------------------------------------------------------------------
template
bool ConcurrentQueue::empty() const
{
std::lock_guard lock(mMutex);
return queue.empty();
}
//------------------------------------------------------------------------------
//! Try to get data from queue
//------------------------------------------------------------------------------
template
bool
ConcurrentQueue::try_pop(Data& popped_value)
{
std::lock_guard lock(mMutex);
if (queue.empty()) {
return false;
}
popped_value = queue.front();
queue.pop();
return true;
}
//------------------------------------------------------------------------------
//! Get data from queue, if empty queue then block until at least one element
//! is added
//------------------------------------------------------------------------------
template
void
ConcurrentQueue::wait_pop(Data& popped_value)
{
std::unique_lock lock(mMutex);
mCondVar.wait(lock, [&]() {
return !queue.empty();
});
eos_static_debug("%s", "msg=\"wait on concurrent queue signalled\"");
popped_value = queue.front();
queue.pop();
}
//------------------------------------------------------------------------------
//! Remove all elements from the queue
//------------------------------------------------------------------------------
template
void
ConcurrentQueue::clear()
{
std::lock_guard lock(mMutex);
while (!queue.empty()) {
queue.pop();
}
}
EOSCOMMONNAMESPACE_END