//------------------------------------------------------------------------------
// File: ConnectionCore.hh
// Author: Georgios Bitzes - CERN
//------------------------------------------------------------------------------
/************************************************************************
* qclient - A simple redis C++ client with support for redirects *
* Copyright (C) 2016 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see <http://www.gnu.org/licenses/>.*
************************************************************************/
#ifndef QCLIENT_CONNECTION_HANDLER_HH
#define QCLIENT_CONNECTION_HANDLER_HH
#include "qclient/queueing/WaitableQueue.hh"
#include "BackpressureApplier.hh"
#include "RequestQueue.hh"
#include "FutureHandler.hh"
#include "CallbackExecutorThread.hh"
#include "qclient/Logger.hh"
namespace qclient {
class Handshake;
class MessageListener;
//------------------------------------------------------------------------------
// Handles a particular connection, deciding what should be written into the
// socket, and consumes bytes out of it. However, this class is decoupled from
// the actual networking code.
//------------------------------------------------------------------------------
class ConnectionCore {
public:
  //----------------------------------------------------------------------------
  //! Constructor
  //!
  //! The destructor is defaulted, so none of the raw pointers held by this
  //! class are deleted by it; presumably the pointees must outlive this
  //! object - confirm against the implementation.
  //!
  //! @param log logger
  //! @param hs handshake object
  //! @param backpressure backpressure strategy
  //! @param transparentUnavailable flag kept in the like-named member
  //! @param listener optional message listener, defaults to nullptr
  //! @param exclusivePubsub flag kept in the like-named member, defaults to
  //!        true
  //! @param perf_cb optional performance callback, defaults to nullptr
  //----------------------------------------------------------------------------
  ConnectionCore(Logger *log, Handshake *hs, BackpressureStrategy backpressure,
                 bool transparentUnavailable,
                 MessageListener *listener = nullptr,
                 bool exclusivePubsub = true,
                 QPerfCallback* perf_cb = nullptr);

  ~ConnectionCore() = default;

  // Presumably invoked whenever the underlying connection is re-established -
  // see the implementation for the exact state that gets reset.
  void reconnection();

  // Returns whether connection is still alive after consuming this response.
  // False can happen during a failed handshake, for example.
  bool consumeResponse(redisReplyPtr &&reply);

  // Stage a request, supplying a callback object alongside it.
  void stage(QCallback *callback, EncodedRequest &&req, size_t multiSize = 0u);

  // Stage a request, handing back a future.
  // NOTE(review): the future's value type is not spelled out in this
  // declaration; presumably redisReplyPtr, the type consumeResponse()
  // receives - confirm against the implementation.
  std::future stage(EncodedRequest &&req, size_t multiSize = 0u);

#if HAVE_FOLLY == 1
  // Same as the future-returning stage(), but hands back a folly future.
  // NOTE(review): value type not spelled out here either - confirm.
  folly::Future follyStage(EncodedRequest &&req,
                           size_t multiSize = 0u);
#endif

  void setBlockingMode(bool value);

  StagedRequest* getNextToWrite();

  // Wipe out pending request queue - return size of queue
  size_t clearAllPending();

  //----------------------------------------------------------------------------
  //! Measure request performance and send info to the perf callback
  //!
  //! @param req concerned request
  //----------------------------------------------------------------------------
  void measurePerf(const StagedRequest& req) const;

  //----------------------------------------------------------------------------
  //! Check if we have a performance monitor callback registered
  //!
  //! @return true if mPerfCb is not null, false otherwise
  //----------------------------------------------------------------------------
  bool hasPerfCb() const {
    return mPerfCb != nullptr;
  }

private:
  // NOTE: data members are constructed in declaration order and destroyed in
  // reverse, so the relative order of the declarations below must be kept.
  Logger *logger;
  Handshake *handshake;
  BackpressureApplier backpressure;
  bool transparentUnavailable;
  MessageListener *listener = nullptr;
  bool exclusivePubsub;

  void acknowledgePending(redisReplyPtr &&reply);
  void discardPending();

  size_t ignoredResponses = 0u;

  // NOTE(review): WaitableQueue is declared without template arguments here;
  // confirm whether its element type / parameters need to be specified.
  WaitableQueue handshakeRequests;
  decltype(handshakeRequests)::Iterator handshakeIterator;

  // Starts out as true. The value type is spelled out explicitly because class
  // template argument deduction is not available for non-static data members.
  std::atomic<bool> inHandshake {true};

  RequestQueue::Iterator nextToWriteIterator;
  RequestQueue::Iterator nextToAcknowledgeIterator;
  RequestQueue requestQueue;

  FutureHandler futureHandler;
#if HAVE_FOLLY == 1
  FollyFutureHandler follyFutureHandler;
#endif

  // NOTE: cbExecutor must be destroyed before FutureHandler, so it has to be
  // below it in the member variables definition.
  CallbackExecutorThread cbExecutor;

  QPerfCallback* mPerfCb = nullptr; ///< Performance measurement callback
  std::mutex mtx;
};
}
#endif