// ---------------------------------------------------------------------- // File: Shard.cc // Author: Georgios Bitzes - CERN // ---------------------------------------------------------------------- /************************************************************************ * quarkdb - a redis-like highly available key-value store * * Copyright (C) 2016 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #include "StateMachine.hh" #include "Shard.hh" #include "ShardDirectory.hh" #include "StandaloneGroup.hh" #include "raft/RaftGroup.hh" #include "raft/RaftDispatcher.hh" #include "redis/LeaseFilter.hh" #include "utils/ScopedAdder.hh" #include "utils/VectorUtils.hh" #include "Version.hh" using namespace quarkdb; Shard::Shard(ShardDirectory *shardDir, const RaftServer &me, Mode m, const RaftTimeouts &t, const std::string &pw) : shardDirectory(shardDir), myself(me), mode(m), timeouts(t), password(pw), inFlightTracker(false) { attach(); } void Shard::attach() { qdb_assert(!inFlightTracker.isAcceptingRequests()); if(mode == Mode::standalone) { standaloneGroup.reset(new StandaloneGroup(*shardDirectory, false)); dispatcher = standaloneGroup->getDispatcher(); stateMachine = standaloneGroup->getStateMachine(); } else if(mode == Mode::raft) { raftGroup.reset(new RaftGroup(*shardDirectory, myself, timeouts, password)); dispatcher = static_cast(raftGroup->dispatcher()); stateMachine = shardDirectory->getStateMachine(); } else if(mode == Mode::bulkload) { standaloneGroup.reset(new StandaloneGroup(*shardDirectory, true)); dispatcher = standaloneGroup->getDispatcher(); stateMachine = standaloneGroup->getStateMachine(); } else { qdb_throw("cannot determine configuration mode"); // should never happen } inFlightTracker.setAcceptingRequests(true); } void Shard::start() { attach(); spinup(); } void Shard::stopAcceptingRequests() { inFlightTracker.setAcceptingRequests(false); qdb_event("Spinning until all requests being dispatched (" << inFlightTracker.getInFlight() << ") have been processed."); inFlightTracker.spinUntilNoRequestsInFlight(); } void Shard::detach() { if(!inFlightTracker.isAcceptingRequests()) return; stopAcceptingRequests(); qdb_info("All requests processed, detaching."); stateMachine = nullptr; dispatcher = nullptr; raftGroup.reset(); standaloneGroup.reset(); qdb_info("Backend has been detached from this quarkdb shard."); } Shard::~Shard() { detach(); } RaftGroup* Shard::getRaftGroup() { std::scoped_lock lock(raftGroupMtx); return raftGroup.get(); } void Shard::spinup() { raftGroup->spinup(); dispatcher = static_cast(raftGroup->dispatcher()); } void Shard::spindown() { raftGroup->spindown(); } LinkStatus Shard::dispatch(Connection *conn, Transaction &transaction) { commandMonitor.broadcast(conn->describe(), transaction); InFlightRegistration registration(inFlightTracker); if(!registration.ok()) { return conn->raw(Formatter::multiply(Formatter::err("unavailable"), transaction.expectedResponses())); } return dispatcher->dispatch(conn, transaction); } NodeHealth Shard::getHealth() { NodeHealth nodeHealth; InFlightRegistration registration(inFlightTracker); if(!registration.ok()) { std::vector indicators; indicators.emplace_back(HealthStatus::kRed, "BACKEND-GROUP-ATTACHED", "No"); return NodeHealth(VERSION_FULL_STRING, indicators); } if(standaloneGroup) { return standaloneGroup->getHealth(); } else if(raftGroup) { return raftGroup->dispatcher()->getHealth(); } qdb_throw("should never reach here"); } LinkStatus Shard::dispatch(Connection *conn, RedisRequest &req) { commandMonitor.broadcast(conn->describe(), req); if(req.getCommandType() == CommandType::RECOVERY) { return conn->err("recovery commands not allowed, not in recovery mode"); } switch(req.getCommand()) { case RedisCommand::MONITOR: { commandMonitor.addRegistration(conn); return conn->ok(); } case RedisCommand::INVALID: { qdb_warn("Received unrecognized command: " << quotes(req[0])); return conn->err(SSTR("unknown command " << quotes(req[0]))); } case RedisCommand::QUARKDB_START_RESILVERING: { if(!conn->raftAuthorization) return conn->err("not authorized to issue raft commands"); if(req.size() != 2) return conn->errArgs(req[0]); ResilveringEventID eventID(req[1]); std::string err; if(!shardDirectory->resilveringStart(eventID, err)) { return conn->err(err); } return conn->ok(); } case RedisCommand::QUARKDB_RESILVERING_COPY_FILE: { if(!conn->raftAuthorization) return conn->err("not authorized to issue raft commands"); if(req.size() != 4) return conn->errArgs(req[0]); ResilveringEventID eventID(req[1]); std::string err; if(!shardDirectory->resilveringCopy(eventID, req[2], req[3], err)) { return conn->err(err); } return conn->ok(); } case RedisCommand::QUARKDB_FINISH_RESILVERING: { if(!conn->raftAuthorization) return conn->err("not authorized to issue raft commands"); if(req.size() != 2) return conn->errArgs(req[0]); ResilveringEventID eventID(req[1]); std::scoped_lock lock(raftGroupMtx); detach(); std::string err; if(!shardDirectory->resilveringFinish(eventID, err)) { start(); return conn->err(err); } start(); return conn->ok(); } case RedisCommand::QUARKDB_BULKLOAD_FINALIZE: { if(req.size() != 1) return conn->errArgs(req[0]); if(mode != Mode::bulkload) { qdb_warn("received command QUARKDB_BULKLOAD_FINALIZE while in mode " << modeToString(mode)); return conn->err("not in bulkload mode"); } stopAcceptingRequests(); stateMachine->finalizeBulkload(); return conn->ok(); } case RedisCommand::QUARKDB_MANUAL_COMPACTION: { if(req.size() != 1) return conn->errArgs(req[0]); InFlightRegistration registration(inFlightTracker); if(!registration.ok()) { return conn->err("unavailable"); } return conn->fromStatus(stateMachine->manualCompaction()); } case RedisCommand::QUARKDB_LEVEL_STATS: { if(req.size() != 1) return conn->errArgs(req[0]); InFlightRegistration registration(inFlightTracker); if(!registration.ok()) { return conn->err("unavailable"); } return conn->status(stateMachine->levelStats()); } case RedisCommand::QUARKDB_COMPRESSION_STATS: { if(req.size() != 1) return conn->errArgs(req[0]); InFlightRegistration registration(inFlightTracker); if(!registration.ok()) { return conn->err("unavailable"); } std::ostringstream ss; std::vector stats = stateMachine->compressionStats(); for(size_t i = 0; i < stats.size(); i++) { ss << "Level " << i << ": " << stats[i] << std::endl; } return conn->status(ss.str()); } case RedisCommand::QUARKDB_HEALTH: { if(req.size() != 1) return conn->errArgs(req[0]); return conn->raw(Formatter::nodeHealth(getHealth())); } case RedisCommand::COMMAND_STATS: { if(req.size() != 1) return conn->errArgs(req[0]); InFlightRegistration registration(inFlightTracker); if(!registration.ok()) { return conn->err("unavailable"); } std::vector headers; std::vector> data; stateMachine->getRequestCounter().fillHistorical(headers, data); return conn->raw(Formatter::vectorsWithHeaders(headers, data)); } case RedisCommand::QUARKDB_VERIFY_CHECKSUM: { if(req.size() != 1) return conn->errArgs(req[0]); InFlightRegistration registration(inFlightTracker); if(!registration.ok()) { return conn->err("unavailable"); } rocksdb::Status st = stateMachine->verifyChecksum(); std::vector output; output.emplace_back(SSTR("state-machine: " << st.ToString())); return conn->statusVector(output); } default: { if(req.getCommandType() == CommandType::QUARKDB) { qdb_critical("Unable to dispatch command '" << req[0] << "' of type QUARKDB"); return conn->err("internal dispatching error"); } InFlightRegistration registration(inFlightTracker); if(!registration.ok()) { return conn->err("unavailable"); } LinkStatus ret = dispatcher->dispatch(conn, req); return ret; } } }