// ---------------------------------------------------------------------- // File: QClient.cc // Author: Georgios Bitzes - CERN // ---------------------------------------------------------------------- /************************************************************************ * qclient - A simple redis C++ client with support for redirects * * Copyright (C) 2016 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #include "qclient/QClient.hh" #include "qclient/Utils.hh" #include "qclient/network/HostResolver.hh" #include "qclient/network/AsyncConnector.hh" #include "network/NetworkStream.hh" #include #include #include #include #include #include #include #include "qclient/Logger.hh" #include "WriterThread.hh" #include "EndpointDecider.hh" #include "ConnectionCore.hh" #include "qclient/GlobalInterceptor.hh" //------------------------------------------------------------------------------ //! Instantiate a few templates inside this compilation unit, to save compile //! time. The alternative is to have every single compilation unit which //! includes QClient.hh instantiate them, which increases compilation time. //------------------------------------------------------------------------------ template class std::future; #if HAVE_FOLLY == 1 template class folly::Future; #endif using namespace qclient; #define SSTR(message) static_cast(std::ostringstream().flush() << message).str() #define DBG(message) std::cerr << __FILE__ << ":" << __LINE__ << " -- " << #message << " = " << message << std::endl; //------------------------------------------------------------------------------ // Constructor taking host and port //----------------------------------------------------------------------------- QClient::QClient(const std::string& host_, const int port_, Options &&opts) : members(host_, port_), options(std::move(opts)), faultInjector(*this) { startEventLoop(); } //------------------------------------------------------------------------------ // Constructor taking list of members for the cluster //------------------------------------------------------------------------------ QClient::QClient(const Members& members_, Options &&opts) : members(members_), options(std::move(opts)), faultInjector(*this) { startEventLoop(); } //------------------------------------------------------------------------------ // Destructor //------------------------------------------------------------------------------ QClient::~QClient() { shutdownEventFD.notify(); eventLoopThread.join(); cleanup(true); } //------------------------------------------------------------------------------ // Primary execute command that takes a redis encoded buffer and sends it // over the network //------------------------------------------------------------------------------ void QClient::execute(QCallback *callback, EncodedRequest &&req) { connectionCore->stage(callback, std::move(req)); } std::future QClient::execute(EncodedRequest &&req) { return connectionCore->stage(std::move(req)); } #if HAVE_FOLLY == 1 folly::Future QClient::follyExecute(EncodedRequest &&req) { return connectionCore->follyStage(std::move(req)); } #endif //------------------------------------------------------------------------------ // Execute a MULTI block. //------------------------------------------------------------------------------ void QClient::execute(QCallback *callback, std::deque &&reqs) { size_t ignoredResponses = reqs.size() + 1; connectionCore->stage( callback, EncodedRequest::fuseIntoBlockAndSurround(std::move(reqs)), ignoredResponses); } std::future QClient::execute(std::deque &&reqs) { size_t ignoredResponses = reqs.size() + 1; return connectionCore->stage( EncodedRequest::fuseIntoBlockAndSurround(std::move(reqs)), ignoredResponses); } #if HAVE_FOLLY == 1 folly::Future QClient::follyExecute(std::deque &&req) { size_t ignoredResponses = req.size() + 1; return connectionCore->follyStage( EncodedRequest::fuseIntoBlockAndSurround(std::move(req)), ignoredResponses); } #endif //------------------------------------------------------------------------------ // Event loop for the client //------------------------------------------------------------------------------ void QClient::startEventLoop() { // Initialize a simple logger if user has not provided one if(!options.logger) { options.logger = std::make_shared(); } // If no handshake is present, and user is asking to prime // connection: Set handshake to a simple Pinger. if(!options.handshake && options.ensureConnectionIsPrimed) { options.handshake.reset(new PingHandshake()); } hostResolver = std::make_unique(options.logger.get()); endpointDecider = std::make_unique(options.logger.get(), hostResolver.get(), members); // Give some leeway when starting up before declaring the cluster broken. lastAvailable = std::chrono::steady_clock::now(); connectionCore.reset(new ConnectionCore(options.logger.get(), options.handshake.get(), options.backpressureStrategy, options.transparentRedirects, options.messageListener.get(), options.exclusivePubsub, options.mPerfCb.get())); writerThread.reset(new WriterThread(options.logger.get(), *connectionCore.get(), shutdownEventFD)); eventLoopThread.reset(&QClient::eventLoop, this); } //------------------------------------------------------------------------------ // Feed bytes from the socket into the response builder //------------------------------------------------------------------------------ bool QClient::feed(const char* buf, size_t len) { responseBuilder.feed(buf, len); while (true) { redisReplyPtr rr; ResponseBuilder::Status status = responseBuilder.pull(rr); if(status == ResponseBuilder::Status::kProtocolError) { return false; } if(status == ResponseBuilder::Status::kIncomplete) { // We need more bytes before a full response can be built, go back // to the event loop to pull more bytes. return true; } // --- We have a new response from the server! // Is this a redirect? if (options.transparentRedirects && rr->type == REDIS_REPLY_ERROR && strncmp(rr->str, "MOVED ", strlen("MOVED ")) == 0) { std::vector response = split(std::string(rr->str, rr->len), " "); RedisServer redirect; if (response.size() == 3 && parseServer(response[2], redirect)) { endpointDecider->registerRedirection(Endpoint(redirect.host, redirect.port)); return false; } } // "Normal" response, let the connection handler take care of it. if(!connectionCore->consumeResponse(std::move(rr))) { // An error has been signalled, this connection cannot go on. return false; } // We're all good, satisfy request. successfulResponses = true; } return true; } //------------------------------------------------------------------------------ // Should we purge any requests currently queued inside WriterThread? //------------------------------------------------------------------------------ bool QClient::shouldPurgePendingRequests() { if(options.retryStrategy.getMode() == RetryStrategy::Mode::kInfiniteRetries) { //-------------------------------------------------------------------------- // Infinite retries, nope. //-------------------------------------------------------------------------- return false; } if(options.retryStrategy.getMode() == RetryStrategy::Mode::kRetryWithTimeout && lastAvailable + options.retryStrategy.getTimeout() >= std::chrono::steady_clock::now()) { //-------------------------------------------------------------------------- // Timeout has not expired yet, nope. //-------------------------------------------------------------------------- return false; } if(options.retryStrategy.getMode() == RetryStrategy::Mode::kNRetries && currentConnectionEpoch <= options.retryStrategy.getRetries()) { //-------------------------------------------------------------------------- // Retries have not been done yet, nope. //-------------------------------------------------------------------------- return false; } //---------------------------------------------------------------------------- // We shouldn't purge while trying to connect.. Otherwise, the following // will occur: // // qclient::QClient qcl( ... ); // qcl.execute("PING") -> failed because the first ServiceEndpoint was // invalid, even though the rest were good // // Instead, we should consider purging only after we've tried all service // endpoints. //---------------------------------------------------------------------------- if(!successfulResponsesEver && !endpointDecider->madeFullCircle()) { //-------------------------------------------------------------------------- // We're still trying out endpoints, no purge //-------------------------------------------------------------------------- return false; } //---------------------------------------------------------------------------- // Yes, purge. //---------------------------------------------------------------------------- return true; } //------------------------------------------------------------------------------ // Cleanup before reconnection or when exiting //------------------------------------------------------------------------------ void QClient::cleanup(bool shutdown) { writerThread->deactivate(); networkStream.reset(); responseBuilder.restart(); successfulResponsesEver = successfulResponsesEver | successfulResponses; successfulResponses = false; if(shouldPurgePendingRequests()) { size_t previouslyPending = connectionCore->clearAllPending(); if(shutdown) { QCLIENT_LOG(options.logger, LogLevel::kDebug, SSTR("Shutting down QClient, discarding " << previouslyPending << " pending requests")); } else { QCLIENT_LOG(options.logger, LogLevel::kInfo, SSTR("Backend is unavailable, discarding " << previouslyPending << " pending requests")); } } connectionCore->reconnection(); } //------------------------------------------------------------------------------ // Set up TCP connection //------------------------------------------------------------------------------ void QClient::connectTCP() { // TODO(gbitzes): Fix fault injection eventually.. // if(faultInjector.hasPartition(untranslatedTargetEndpoint)) { // networkStream.reset(); // return; // } ServiceEndpoint endpoint; if(!endpointDecider->getNextEndpoint(endpoint)) { return; } AsyncConnector connector(endpoint); if(!connector.blockUntilReady(shutdownEventFD.getFD(), options.tcpTimeout)) { return; } if(!connector.ok()) { QCLIENT_LOG(options.logger, LogLevel::kInfo, "Encountered an error when connecting to " << endpoint.getString() << ": " << connector.getError()); return; } networkStream.reset(new NetworkStream(connector.release(), options.tlsconfig)); if(!networkStream->ok()) { return; } notifyConnectionEstablished(); writerThread->activate(networkStream.get()); } //------------------------------------------------------------------------------ // Connect //------------------------------------------------------------------------------ void QClient::connect() { currentConnectionEpoch++; if(currentConnectionEpoch != 1) { cleanup(false); } connectTCP(); } //------------------------------------------------------------------------------ // Main event loop thread, handles processing of incoming requests and // reconnects in case of network instabilities. //------------------------------------------------------------------------------ void QClient::eventLoop(ThreadAssistant &assistant) { signal(SIGPIPE, SIG_IGN); std::chrono::milliseconds backoff(1); while (true) { this->connect(); bool receivedBytes = handleConnectionEpoch(assistant); if(receivedBytes) { backoff = std::chrono::milliseconds(1); } assistant.wait_for(backoff); if (assistant.terminationRequested()) { feed(NULL, 0); break; } // Give some more leeway, update lastAvailable after sleeping. if(successfulResponses) { lastAvailable = std::chrono::steady_clock::now(); } if (backoff < std::chrono::milliseconds(2048)) { backoff++; } } } //------------------------------------------------------------------------------ // Return fault injector object for this QClient //------------------------------------------------------------------------------ FaultInjector& QClient::getFaultInjector() { return faultInjector; } //------------------------------------------------------------------------------ // Notification from FaultInjector that fault injections were updated //------------------------------------------------------------------------------ void QClient::notifyFaultInjectionsUpdated() { // shutdownEventFD.notify(); } //------------------------------------------------------------------------------ // Attach reconnection listener. The underlying object must remain alive // as long as the reconnection listener is attached! //------------------------------------------------------------------------------ void QClient::attachListener(ReconnectionListener *listener) { std::unique_lock lock(reconnectionListenersMtx); reconnectionListeners.insert(listener); } //------------------------------------------------------------------------------ // Detach reconnection listener. It's now safe to delete the underlying // object. // // Returns true if the given object was found to be registered and was // removed, false otherwise. //------------------------------------------------------------------------------ bool QClient::detachListener(ReconnectionListener *listener) { std::unique_lock lock(reconnectionListenersMtx); auto it = reconnectionListeners.find(listener); if(it == reconnectionListeners.end()) { return false; } reconnectionListeners.erase(it); return true; } //------------------------------------------------------------------------------ // Notify that a connection has been established //------------------------------------------------------------------------------ void QClient::notifyConnectionEstablished() { std::unique_lock lock(reconnectionListenersMtx); for(auto it = reconnectionListeners.begin(); it != reconnectionListeners.end(); it++) { (*it)->notifyConnectionEstablished(currentConnectionEpoch); } } //------------------------------------------------------------------------------ // Notify that a connection has been lost //------------------------------------------------------------------------------ void QClient::notifyConnectionLost(int errc, const std::string &err) { std::unique_lock lock(reconnectionListenersMtx); for(auto it = reconnectionListeners.begin(); it != reconnectionListeners.end(); it++) { (*it)->notifyConnectionLost(currentConnectionEpoch, errc, err); } } //------------------------------------------------------------------------------ // Handles a single "connection epoch". If the current socket breaks, we return // and let the parent handle the error. // // Returns whether, during this connection epoch, any bytes at all were received // from the server. //------------------------------------------------------------------------------ bool QClient::handleConnectionEpoch(ThreadAssistant &assistant) { const size_t BUFFER_SIZE = 1024 * 2; char buffer[BUFFER_SIZE]; bool receivedBytes = false; if(!networkStream || !networkStream->ok()) { return false; } struct pollfd polls[2]; polls[0].fd = shutdownEventFD.getFD(); polls[0].events = POLLIN; polls[1].fd = networkStream->getFd(); polls[1].events = POLLIN; RecvStatus status(true, 0, 0); while (networkStream->ok()) { // If the previous iteration returned any bytes at all, try to read again // without polling. It could be that there's more data cached inside // OpenSSL, which poll() will not detect. if(status.bytesRead <= 0) { int rpoll = poll(polls, 2, 60); if(rpoll < 0 && errno != EINTR) { // something's wrong, try to reconnect break; } } if( (polls[0].revents != 0) || assistant.terminationRequested()) { notifyConnectionLost(0, "shutdown requested"); break; } // looks like a legit connection status = networkStream->recv(buffer, BUFFER_SIZE, 0); if(!status.connectionAlive) { break; // connection died on us } if(status.bytesRead > 0 && !feed(buffer, status.bytesRead)) { notifyConnectionLost(EINVAL, "protocol violation"); break; // protocol violation } else { receivedBytes = true; } } if(!networkStream->ok()) { notifyConnectionLost(networkStream->getErrno(), networkStream->getError()); } return receivedBytes; } //------------------------------------------------------------------------------ // Wrapper function for exists command //------------------------------------------------------------------------------ long long int QClient::exists(const std::string& key) { redisReplyPtr reply = exec("EXISTS", key).get(); if ((reply == nullptr) || (reply->type != REDIS_REPLY_INTEGER)) { throw std::runtime_error("[FATAL] Error exists key: " + key + ": Unexpected/null reply "); } return reply->integer; } //------------------------------------------------------------------------------ // Wrapper function for del command //------------------------------------------------------------------------------ long long int QClient::del(const std::string& key) { redisReplyPtr reply = exec("DEL", key).get(); if ((reply == nullptr) || (reply->type != REDIS_REPLY_INTEGER)) { throw std::runtime_error("[FATAL] Error del key: " + key + ": Unexpected/null reply "); } return reply->integer; } //------------------------------------------------------------------------------ // Check whether we're currently connected by sending a PING -- synchronous // operation with the given timeout. //------------------------------------------------------------------------------ Status QClient::checkConnection(std::chrono::milliseconds timeout) { std::future fut = this->exec("PING"); if(fut.wait_for(timeout) != std::future_status::ready) { return Status(ETIME, "time-out while waiting on PING reply"); } qclient::redisReplyPtr reply = fut.get(); if (!reply) { return Status(ENOTCONN, "connection not active"); } if (reply->type != REDIS_REPLY_STATUS || std::string(reply->str, reply->len) != "PONG") { return Status(EINVAL, SSTR("Received unexpected response to PING request: " << qclient::describeRedisReply(reply))); } return Status(); }