//---------------------------------------------------------------------- // File: Connection.hh // Author: Georgios Bitzes - CERN // ---------------------------------------------------------------------- /************************************************************************ * quarkdb - a redis-like highly available key-value store * * Copyright (C) 2016 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #ifndef QUARKDB_CONNECTION_H #define QUARKDB_CONNECTION_H #include "Link.hh" #include "RedisParser.hh" #include "BufferedWriter.hh" #include "Formatter.hh" #include "redis/MultiHandler.hh" #include "redis/Authenticator.hh" #include "pubsub/SubscriptionTracker.hh" #include "utils/Synchronized.hh" #include namespace rocksdb { class Status; } namespace quarkdb { //------------------------------------------------------------------------------ // Keeps track of a list of pending requests, which can optionally be related // to a Connection. // // Why "optionally"? There's no guarantee that by the time a pending request // is ready to be serviced, the connection will still be alive! The client // might have disconnected in the meantime, even after issuing writes that // have already been appended to the raft journal. //------------------------------------------------------------------------------ class Connection; class RedisDispatcher; class PendingQueue { public: PendingQueue(Connection *c) : conn(c) {} ~PendingQueue() {} void detachConnection() { std::scoped_lock lock(mtx); conn = nullptr; } LinkStatus flushPending(const RedisEncodedResponse &msg); LinkStatus appendResponse(RedisEncodedResponse &&raw); LinkStatus addPendingTransaction(RedisDispatcher *dispatcher, Transaction &&tx, LogIndex index = -1); LogIndex dispatchPending(RedisDispatcher *dispatcher, LogIndex commitIndex); bool appendIfAttached(RedisEncodedResponse &&raw); bool appendIfAttachedNoLock(RedisEncodedResponse &&raw); size_t subscriptions = 0u; void subscribe(const std::string &item); void psubscribe(const std::string &item); void unsubscribe(const std::string &item); void punsubscribe(const std::string &item); bool addMessageIfAttached(const std::string &channel, std::string_view payload); bool addPatternMessageIfAttached(const std::string &pattern, std::string_view channel, std::string_view payload); void activatePushTypes(); bool hasPushTypesActive() const; private: LinkStatus appendResponseNoLock(RedisEncodedResponse &&raw); Connection *conn; std::mutex mtx; //---------------------------------------------------------------------------- // Information about a pending request, which can be either a read or a write. // Every write corresponds to exactly one entry in the raft journal. Naturally, // we have to wait until it's committed before responding to the client. // // But why do reads need to wait, too? If a read request is made right after a // write with pipelining, we have to give the responses in the correct order, // so a read has to be queued until the write that's blocking us has finished. // // The queue will usually look like this: // write, read, read, read, write, read, read, read, write, write // // All read requests are being blocked by one or more writes, and each write // corresponds to a unique raft journal entry. // // Reads will be processed as soon as they aren't being blocked by a write. If // all a client does is read, the queue will not be used. //---------------------------------------------------------------------------- struct PendingRequest { Transaction tx; RedisEncodedResponse rawResp; // if not empty, we're just storing a raw, pre-formatted response LogIndex index = -1; // the corresponding entry in the raft journal - only relevant for write requests }; LogIndex lastIndex = -1; std::queue pending; SubscriptionTracker subscriptionTracker; std::atomic supportsPushTypes {false}; }; //------------------------------------------------------------------------------ // Keeps track of connection-specific state. //------------------------------------------------------------------------------ class Dispatcher; class InFlightTracker; class RedisEncodedResponse; class Authenticator; class Connection { public: Connection(Link *link); ~Connection(); std::string describe() const; std::string getID() const { return uuid; } LinkStatus raw(RedisEncodedResponse &&encoded); LinkStatus moved(int64_t shardId, const RaftServer &location); LinkStatus err(std::string_view msg); LinkStatus errArgs(std::string_view cmd); LinkStatus pong(); LinkStatus string(std::string_view str); LinkStatus fromStatus(const rocksdb::Status &status); LinkStatus status(std::string_view msg); LinkStatus ok(); LinkStatus null(); LinkStatus integer(int64_t number); LinkStatus vector(const std::vector &vec); LinkStatus statusVector(const std::vector &vec); LinkStatus scan(std::string_view marker, const std::vector &vec); LinkStatus noauth(std::string_view msg); bool monitor = false; void setMonitor() { // There's no function setting monitor back to false. This is intentional, // there's no going back after issuing 'MONITOR'. monitor = true; } bool raftStaleReads = false; bool raftAuthorization = false; bool authorization = false; std::unique_ptr authenticator; LinkStatus processRequests(Dispatcher *dispatcher, const InFlightTracker &tracker); void setResponseBuffering(bool value); void flush(); LinkStatus addPendingTransaction(RedisDispatcher *dispatcher, Transaction &&tx, LogIndex index = -1) { return pendingQueue->addPendingTransaction(dispatcher, std::move(tx), index); } LinkStatus flushPending(const RedisEncodedResponse &msg) { return pendingQueue->flushPending(msg); } LogIndex dispatchPending(RedisDispatcher *dispatcher, LogIndex commitIndex) { return pendingQueue->dispatchPending(dispatcher, commitIndex); } std::shared_ptr getQueue() { return pendingQueue; } bool hasPushTypesActive() { return pendingQueue->hasPushTypesActive(); } bool isLocalhost() const { return localhost; } class FlushGuard { public: FlushGuard(Connection *c) : conn(c) { } ~FlushGuard() { if(conn) { conn->flush(); } } private: Connection *conn; }; void activatePushTypes(); bool hasPushTypesActive() const; static void setPhantomBatchLimit(size_t newval); void setName(std::string_view name); std::string getName() const; private: BufferedWriter writer; RedisRequest currentRequest; RedisParser parser; std::shared_ptr pendingQueue; std::string description; std::string uuid; bool localhost; Synchronized clientName; MultiHandler multiHandler; friend class PendingQueue; }; } #endif