//------------------------------------------------------------------------------
// File: RequestQueue.hh
// Author: Georgios Bitzes - CERN
//------------------------------------------------------------------------------
/************************************************************************
* qclient - A simple redis C++ client with support for redirects *
* Copyright (C) 2016 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see .*
************************************************************************/
#ifndef QCLIENT_REQUEST_QUEUE_HH
#define QCLIENT_REQUEST_QUEUE_HH
#include "qclient/queueing/WaitableQueue.hh"
#include "StagedRequest.hh"
namespace qclient {
//------------------------------------------------------------------------------
// A WaitableQueue which holds pending, un-acknowledged requests, with a twist:
// At all times, this class holds one extra, hidden request at the front.
//
// When you do pop_front(), you're not actually destroying the item which you'd
// have gotten with begin(), but the one prior to it.
//
// pop_front() always lags one item behind, but you have no way to actually
// access the hidden element.
//
// Why? The writer event loop may still be accessing the front element, while
// the reader loop has already received a response. We give a leeway of a
// single extra request before deallocating stuff, even after it has been
// satisfied in order to allow the writer loop to progress safely.
//
// EXAMPLE:
// - Writer loop does a ::send with the top request.
// - Response comes very quickly, the reader loop calls pop_front().
// - If we were to free that item now, the writer loop might segfault as it
// has to access that item again after ::send.
//
// Could there be more responses coming? Well, no, as the writer thread hasn't
// sent those yet.
//
// To prevent the above, we always keep around one extra item at the front of
// the queue, but the interface does not reflect this: Only this class knows
// that this is happening.
//
// begin() will thus return the second item, instead of the first.
//
// To be able to hold this invariant even at startup and keep the code simple,
// we insert a dummy request during construction.
//------------------------------------------------------------------------------
class RequestQueue {
public:
using QueueType = WaitableQueue;
using Iterator = QueueType::Iterator;
//----------------------------------------------------------------------------
// Constructor
//----------------------------------------------------------------------------
RequestQueue() {
insertDummyRequest();
}
//----------------------------------------------------------------------------
// Reset queue contents - identical interface to WaitableQueue
//----------------------------------------------------------------------------
void reset() {
queue.reset();
insertDummyRequest();
}
//----------------------------------------------------------------------------
// Constructs an item inside the queue - identical interface to WaitableQueue
//----------------------------------------------------------------------------
template
void emplace_back(Args&&... args) {
queue.emplace_back(std::forward(args)...);
}
//----------------------------------------------------------------------------
// Pop an item from the front - identical interface to WaitableQueue.
//----------------------------------------------------------------------------
void pop_front() {
queue.pop_front();
}
//----------------------------------------------------------------------------
// Get iterator to the queue - identical interface to WaitableQueue
//----------------------------------------------------------------------------
Iterator begin() {
auto iter = queue.begin();
iter.next();
return iter;
}
//----------------------------------------------------------------------------
// Set blocking mode - identical interface to WaitableQueue
//----------------------------------------------------------------------------
void setBlockingMode(bool value) {
queue.setBlockingMode(value);
}
//----------------------------------------------------------------------------
// If we were to add a new element, what sequence number would it be assigned
// to?
//----------------------------------------------------------------------------
int64_t getNextSequenceNumber() const {
return queue.getNextSequenceNumber();
}
//----------------------------------------------------------------------------
// What is the size of the queue? Dummy item not included.
//----------------------------------------------------------------------------
size_t size() const {
return queue.size() - 1;
}
private:
void insertDummyRequest() {
queue.emplace_back(nullptr, EncodedRequest(std::vector{"dummy"}));
}
QueueType queue;
};
}
#endif