//------------------------------------------------------------------------------ // File: ConnectionCore.cc // Author: Georgios Bitzes - CERN //------------------------------------------------------------------------------ /************************************************************************ * qclient - A simple redis C++ client with support for redirects * * Copyright (C) 2018 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #include "ConnectionCore.hh" #include "pubsub/MessageParser.hh" #include "qclient/Handshake.hh" #include "qclient/pubsub/MessageListener.hh" #include "qclient/QClient.hh" #define DBG(message) std::cerr << __FILE__ << ":" << __LINE__ << " -- " << #message << " = " << message << std::endl; namespace qclient { ConnectionCore::ConnectionCore(Logger *log, Handshake *hs, BackpressureStrategy bp, bool transUnavail, MessageListener *ms, bool exclpubsub, QPerfCallback* perf_cb) : logger(log), handshake(hs), backpressure(bp), transparentUnavailable(transUnavail), listener(ms), exclusivePubsub(exclpubsub), mPerfCb(perf_cb) { reconnection(); } //------------------------------------------------------------------------------ // Check for "unavailable" response - specific to QDB //------------------------------------------------------------------------------ static bool isUnavailable(redisReply* reply) { if(reply->type != REDIS_REPLY_ERROR) { return false; } static const std::string kFirstType("ERR unavailable"); static const std::string kSecondType("UNAVAILABLE"); if(strncmp(reply->str, kFirstType.c_str(), kFirstType.size()) == 0) { return true; } if(strncmp(reply->str, kSecondType.c_str(), kSecondType.size()) == 0) { return true; } return false; } void ConnectionCore::reconnection() { //---------------------------------------------------------------------------- // The connection has dropped. This means: // - We're in handshake mode once again - forbidden to process user requests // until handshake has completed. // - Any requests written onto the socket which have not been acknowledged // yet will have to be processed anew. // - Any un-acknowledged transactions will have to start from scratch. // // We may or may not purge un-acknowledged requests without trying them // again, but that's for clearAllPending() to decide, not us. //---------------------------------------------------------------------------- if(handshake) { //-------------------------------------------------------------------------- // Re-initialize handshake. //-------------------------------------------------------------------------- inHandshake = true; handshake->restart(); handshakeRequests.reset(); handshakeRequests.emplace_back(nullptr, handshake->provideHandshake()); handshakeIterator = handshakeRequests.begin(); } else { inHandshake = false; } //---------------------------------------------------------------------------- // Re-initialize ignored responses for transactions, reset iterators. //---------------------------------------------------------------------------- ignoredResponses = 0u; nextToWriteIterator = requestQueue.begin(); nextToAcknowledgeIterator = requestQueue.begin(); } size_t ConnectionCore::clearAllPending() { std::lock_guard lock(mtx); //---------------------------------------------------------------------------- // The party's over, any requests that still remain un-acknowledged // will get a null response. //---------------------------------------------------------------------------- inHandshake = false; redisReplyPtr nullReply; while(nextToAcknowledgeIterator.itemHasArrived()) { acknowledgePending(std::move(nullReply)); } size_t retval = requestQueue.size(); requestQueue.reset(); reconnection(); return retval; } void ConnectionCore::stage(QCallback *callback, EncodedRequest &&req, size_t multiSize) { backpressure.reserve(); std::lock_guard lock(mtx); requestQueue.emplace_back(callback, std::move(req), multiSize); } std::future ConnectionCore::stage(EncodedRequest &&req, size_t multiSize) { std::lock_guard lock(mtx); std::future retval = futureHandler.stage(); requestQueue.emplace_back(&futureHandler, std::move(req), multiSize); return retval; } #if HAVE_FOLLY == 1 folly::Future ConnectionCore::follyStage(EncodedRequest &&req, size_t multiSize) { backpressure.reserve(); std::lock_guard lock(mtx); folly::Future retval = follyFutureHandler.stage(); requestQueue.emplace_back(&follyFutureHandler, std::move(req), multiSize); return retval; } #endif void ConnectionCore::acknowledgePending(redisReplyPtr &&reply) { auto& stage_req = nextToAcknowledgeIterator.item(); if (mPerfCb) { measurePerf(stage_req); } cbExecutor.stage(stage_req.getCallback(), std::move(reply)); discardPending(); } void ConnectionCore::discardPending() { nextToAcknowledgeIterator.next(); requestQueue.pop_front(); backpressure.release(); } static bool isOK(const redisReplyPtr &reply) { if(reply->type != REDIS_REPLY_STATUS) { return false; } if(reply->len != 2) { return false; } if(strncasecmp(reply->str, "OK", 2) != 0) { return false; } return true; } static bool isQueued(const redisReplyPtr &reply) { if(reply->type != REDIS_REPLY_STATUS) { return false; } if(reply->len != 6) { return false; } if(strncasecmp(reply->str, "QUEUED", 6) != 0) { return false; } return true; } bool ConnectionCore::consumeResponse(redisReplyPtr &&reply) { // Is this a transient "unavailable" error? Specific to QDB. if(transparentUnavailable && isUnavailable(reply.get())) { // Break connection, try again. QCLIENT_LOG(logger, LogLevel::kWarn, "Cluster is temporarily unavailable: " << std::string(reply->str, reply->len)); return false; } // Is this a response to the handshake? if(inHandshake) { // Forward reply to handshake object, and check the response. Handshake::Status status = handshake->validateResponse(reply); if(status == Handshake::Status::INVALID) { // Error during handshaking, drop connection return false; } if(status == Handshake::Status::VALID_COMPLETE) { // We're done handshaking inHandshake = false; handshakeRequests.setBlockingMode(false); return true; } if(status == Handshake::Status::VALID_INCOMPLETE) { // Still more requests to go handshakeRequests.emplace_back(nullptr, handshake->provideHandshake()); return true; } qclient_assert("should never happen"); } if(reply->type == REDIS_REPLY_PUSH) { if(listener) { Message msg; if(!MessageParser::parse(std::move(reply), msg)) { //---------------------------------------------------------------------- // Parse error, doesn't look like a valid pub/sub message //---------------------------------------------------------------------- QCLIENT_LOG(logger, LogLevel::kWarn, "Unable to parse incoming PUSH type message: " << qclient::describeRedisReply(reply)); return false; } listener->handleIncomingMessage(std::move(msg)); return true; } //-------------------------------------------------------------------------- // Even if message parsing failed, or there's no listener set, not much // more we can do here, we're done. //-------------------------------------------------------------------------- return true; } if(listener && exclusivePubsub) { //-------------------------------------------------------------------------- // Connection is in exclusive pub-sub mode, deliver all replies to message // listener. //-------------------------------------------------------------------------- Message msg; if(!MessageParser::parse(std::move(reply), msg)) { //------------------------------------------------------------------------ // Parse error, doesn't look like a valid pub/sub message //------------------------------------------------------------------------ QCLIENT_LOG(logger, LogLevel::kWarn, "Unable to parse incoming message while connection is in PUB/SUB mode: " << qclient::describeRedisReply(reply)); return false; } listener->handleIncomingMessage(std::move(msg)); return true; } if(!nextToAcknowledgeIterator.itemHasArrived()) { //-------------------------------------------------------------------------- // The server is sending more responses than we sent requests... wtf. // Break connection. //-------------------------------------------------------------------------- QCLIENT_LOG(logger, LogLevel::kError, "Server is sending more responses than there were requests ?!?"); return false; } if(nextToAcknowledgeIterator.item().getMultiSize() != 0u) { ignoredResponses++; if(ignoredResponses == 1u && nextToAcknowledgeIterator.item().getMultiSize() != 0u) { //------------------------------------------------------------------------ // This has to be an OK response, is it? //------------------------------------------------------------------------ if(!isOK(reply)) { QCLIENT_LOG(logger, LogLevel::kError, "Expected OK response at start of MULTI block (multi-size=" << nextToAcknowledgeIterator.item().getMultiSize() << ", current response number=" << ignoredResponses << "), received: " << describeRedisReply(reply)); return false; } return true; } if(ignoredResponses <= nextToAcknowledgeIterator.item().getMultiSize()) { //------------------------------------------------------------------------ // This has to be a QUEUED response, is it? //------------------------------------------------------------------------ if(!isQueued(reply)) { QCLIENT_LOG(logger, LogLevel::kError, "Expected QUEUED response within MULTI block (multi-size=" << nextToAcknowledgeIterator.item().getMultiSize() << ", current response number=" << ignoredResponses << "), received: " << describeRedisReply(reply)); return false; } //------------------------------------------------------------------------ // Yes, ignore //------------------------------------------------------------------------ return true; } // This is the real response. ignoredResponses = 0u; } acknowledgePending(std::move(reply)); return true; } void ConnectionCore::setBlockingMode(bool value) { handshakeRequests.setBlockingMode(value); requestQueue.setBlockingMode(value); } StagedRequest* ConnectionCore::getNextToWrite() { if(inHandshake) { StagedRequest *item = handshakeIterator.getItemBlockOrNull(); if(!item) return nullptr; handshakeIterator.next(); return item; } StagedRequest *item = nextToWriteIterator.getItemBlockOrNull(); if (listener && exclusivePubsub ) { //-------------------------------------------------------------------------- // The connection is in exclusive pub-sub mode, which means normal requests // are no longer being acknowledged. The request queue can potentially // grow to infinity - let's trim no-longer-needed items. //-------------------------------------------------------------------------- while(nextToWriteIterator.seq() > nextToAcknowledgeIterator.seq()) { discardPending(); } } if(!item) return nullptr; nextToWriteIterator.next(); return item; } //------------------------------------------------------------------------------ // Mesasure request performance and sent info to the perf callback //------------------------------------------------------------------------------ void ConnectionCore::measurePerf(const StagedRequest& req) const { if (mPerfCb) { unsigned long long rtt_val = std::chrono::duration_cast (std::chrono::system_clock::now() - req.getTimestamp()).count(); mPerfCb->SendPerfMarker("rtt_us", rtt_val); } } }