//---------------------------------------------------------------------- // File: Connection.cc // Author: Georgios Bitzes - CERN // ---------------------------------------------------------------------- /************************************************************************ * quarkdb - a redis-like highly available key-value store * * Copyright (C) 2016 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #include "Connection.hh" #include "Dispatcher.hh" #include "Formatter.hh" #include "utils/InFlightTracker.hh" #include "redis/InternalFilter.hh" using namespace quarkdb; LinkStatus PendingQueue::flushPending(const RedisEncodedResponse &msg) { std::scoped_lock lock(mtx); while(!pending.empty()) { if(conn) { if(!pending.front().rawResp.empty()) { conn->writer.send(std::move(pending.front().rawResp.val)); } else { conn->writer.send(Formatter::multiply(msg, pending.front().tx.expectedResponses() ).val); } } pending.pop(); } if(conn) conn->writer.flush(); lastIndex = -1; return 1; } void PendingQueue::subscribe(const std::string &item) { std::scoped_lock lock(mtx); subscriptionTracker.addChannel(item); } void PendingQueue::psubscribe(const std::string &item) { std::scoped_lock lock(mtx); subscriptionTracker.addPattern(item); } void PendingQueue::unsubscribe(const std::string &item) { std::scoped_lock lock(mtx); subscriptionTracker.removeChannel(item); } void PendingQueue::punsubscribe(const std::string &item) { std::scoped_lock lock(mtx); subscriptionTracker.removePattern(item); } bool PendingQueue::addMessageIfAttached(const std::string &channel, std::string_view payload) { std::scoped_lock lock(mtx); if(!conn) return false; if(!subscriptionTracker.hasChannel(channel)) return true; Connection::FlushGuard guard(conn); appendResponseNoLock(Formatter::message(supportsPushTypes, channel, payload)); return true; } bool PendingQueue::addPatternMessageIfAttached(const std::string &pattern, std::string_view channel, std::string_view payload) { std::scoped_lock lock(mtx); if(!conn) return false; if(!subscriptionTracker.hasPattern(pattern)) return true; Connection::FlushGuard guard(conn); appendResponseNoLock(Formatter::pmessage(supportsPushTypes, pattern, channel, payload)); return true; } bool PendingQueue::appendIfAttachedNoLock(RedisEncodedResponse &&raw) { if(!conn) return false; Connection::FlushGuard guard(conn); appendResponseNoLock(std::move(raw)); return true; } bool PendingQueue::appendIfAttached(RedisEncodedResponse &&raw) { std::scoped_lock lock(mtx); return appendIfAttachedNoLock(std::move(raw)); } LinkStatus PendingQueue::appendResponseNoLock(RedisEncodedResponse &&raw) { if(!conn) qdb_throw("attempted to append a raw response to a pendingQueue while being detached from a Connection. Contents: '" << raw.val << "'"); if(pending.empty()) return conn->writer.send(std::move(raw.val)); // we're being blocked by a write, must queue PendingRequest req; req.rawResp = std::move(raw); pending.push(std::move(req)); return 1; } LinkStatus PendingQueue::appendResponse(RedisEncodedResponse &&raw) { std::scoped_lock lock(mtx); return appendResponseNoLock(std::move(raw)); } LinkStatus PendingQueue::addPendingTransaction(RedisDispatcher *dispatcher, Transaction &&tx, LogIndex index) { std::scoped_lock lock(mtx); if(!conn) qdb_throw("attempted to append a pending request to a pendingQueue while being detached from a Connection, command " << tx.toPrintableString() << ", log index: " << index); if(pending.empty() && index < 0) { // This is a read, and we're not being blocked by any writes. Forward directly // to the state machine, no need to do any queueing. qdb_assert(!tx.containsWrites()); return conn->writer.send(dispatcher->dispatch(tx, 0).val); } if(index > 0) { if(index <= lastIndex) { qdb_throw("attempted to insert queued request with index " << index << " while the last one had index " << lastIndex); } lastIndex = index; } PendingRequest penreq; penreq.tx = std::move(tx); penreq.index = index; pending.push(std::move(penreq)); return 1; } LogIndex PendingQueue::dispatchPending(RedisDispatcher *dispatcher, LogIndex commitIndex) { std::scoped_lock lock(mtx); Connection::FlushGuard guard(conn); bool found = false; while(!pending.empty()) { PendingRequest &req = pending.front(); if(commitIndex < req.index) { // the rest of the items are blocked, return new blocking index return req.index; } if(!req.rawResp.empty()) { if(conn) conn->writer.send(std::move(req.rawResp.val)); } else { if(req.index > 0) { if(found) qdb_throw("queue corruption: " << this << " found entry with positive index twice (" << req.index << ")"); found = true; if(req.index != commitIndex) qdb_throw("queue corruption: " << this << " expected entry with index " << commitIndex << ", found " << req.index); } // we must dispatch the request even if the connection has died, since // writes increase lastApplied of the state machine RedisEncodedResponse response = dispatcher->dispatch(req.tx, req.index); if(conn) conn->writer.send(std::move(response.val)); } pending.pop(); } if(!found) qdb_throw("entry with index " << commitIndex << " not found"); // no more pending requests return -1; } void PendingQueue::activatePushTypes() { supportsPushTypes = true; } bool PendingQueue::hasPushTypesActive() const { return supportsPushTypes; } size_t phantomBatchLimit = 100; void Connection::setPhantomBatchLimit(size_t newval) { phantomBatchLimit = newval; } void Connection::setName(std::string_view name) { clientName.set(name); } std::string Connection::getName() const { return clientName.get(); } Connection::Connection(Link *l) : writer(l), parser(l), pendingQueue(new PendingQueue(this)), description(l->describe()), uuid(l->getID()), localhost(l->isLocalhost()) { } Connection::~Connection() { pendingQueue->detachConnection(); } LinkStatus Connection::raw(RedisEncodedResponse &&encoded) { return pendingQueue->appendResponse(std::move(encoded)); } LinkStatus Connection::moved(int64_t shardId, const RaftServer &location) { return pendingQueue->appendResponse(Formatter::moved(shardId, location)); } LinkStatus Connection::err(std::string_view msg) { return pendingQueue->appendResponse(Formatter::err(msg)); } LinkStatus Connection::errArgs(std::string_view cmd) { return pendingQueue->appendResponse(Formatter::errArgs(cmd)); } LinkStatus Connection::pong() { return pendingQueue->appendResponse(Formatter::pong()); } LinkStatus Connection::string(std::string_view str) { return pendingQueue->appendResponse(Formatter::string(str)); } LinkStatus Connection::fromStatus(const rocksdb::Status &status) { return pendingQueue->appendResponse(Formatter::fromStatus(status)); } LinkStatus Connection::status(std::string_view msg) { return pendingQueue->appendResponse(Formatter::status(msg)); } LinkStatus Connection::ok() { return pendingQueue->appendResponse(Formatter::ok()); } LinkStatus Connection::null() { return pendingQueue->appendResponse(Formatter::null()); } LinkStatus Connection::integer(int64_t number) { return pendingQueue->appendResponse(Formatter::integer(number)); } LinkStatus Connection::vector(const std::vector &vec) { return pendingQueue->appendResponse(Formatter::vector(vec)); } LinkStatus Connection::statusVector(const std::vector &vec) { return pendingQueue->appendResponse(Formatter::statusVector(vec)); } LinkStatus Connection::scan(std::string_view marker, const std::vector &vec) { return pendingQueue->appendResponse(Formatter::scan(marker, vec)); } LinkStatus Connection::noauth(std::string_view msg) { return pendingQueue->appendResponse(Formatter::noauth(msg)); } LinkStatus Connection::processRequests(Dispatcher *dispatcher, const InFlightTracker &inFlightTracker) { FlushGuard guard(this); while(inFlightTracker.isAcceptingRequests()) { if(monitor) { // This connection is in "MONITOR" mode, we don't accept any more commands. // Do nothing for all received data. LinkStatus status = parser.purge(); if(status == 0) return 1; // slow link if(status < 0) return status; // link error qdb_throw("should never reach here"); } LinkStatus status = parser.fetch(currentRequest, authorization); InternalFilter::process(currentRequest); if(status < 0) { return status; // link error } if(status == 0) { // slow link - process the write batch, if needed multiHandler.finalizePhantomTransaction(dispatcher, this); return 1; // slow link } // Beginning of a MULTI block: Finalize phantom transactions if(currentRequest.getCommand() == RedisCommand::MULTI) { multiHandler.finalizePhantomTransaction(dispatcher, this); multiHandler.process(dispatcher, this, currentRequest); continue; } // EXEC without MULTI? if(currentRequest.getCommand() == RedisCommand::EXEC && !multiHandler.active()) { this->err("EXEC without MULTI"); continue; } if(currentRequest.getCommand() == RedisCommand::TX_READWRITE) { multiHandler.finalizePhantomTransaction(dispatcher, this); dispatcher->dispatch(this, currentRequest); continue; } if(multiHandler.size() >= phantomBatchLimit) { multiHandler.finalizePhantomTransaction(dispatcher, this); } if(multiHandler.active()) { if(multiHandler.isPhantom() && currentRequest.getCommandType() != CommandType::WRITE) { multiHandler.finalizePhantomTransaction(dispatcher, this); } else { multiHandler.process(dispatcher, this, currentRequest); continue; } } if(currentRequest.getCommand() == RedisCommand::DISCARD) { multiHandler.finalizePhantomTransaction(dispatcher, this); this->err("DISCARD without MULTI"); continue; } if(currentRequest.getCommandType() == CommandType::WRITE) { multiHandler.activatePhantom(); multiHandler.process(dispatcher, this, currentRequest); continue; } else { multiHandler.finalizePhantomTransaction(dispatcher, this); dispatcher->dispatch(this, currentRequest); } } multiHandler.finalizePhantomTransaction(dispatcher, this); return 1; } void Connection::setResponseBuffering(bool value) { writer.setActive(value); } void Connection::flush() { writer.flush(); } std::string Connection::describe() const { return description; } void Connection::activatePushTypes() { pendingQueue->activatePushTypes(); } bool Connection::hasPushTypesActive() const { return pendingQueue->hasPushTypesActive(); }