// ---------------------------------------------------------------------- // File: QuarkDBNode.cc // Author: Georgios Bitzes - CERN // ---------------------------------------------------------------------- /************************************************************************ * quarkdb - a redis-like highly available key-value store * * Copyright (C) 2016 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #include "StateMachine.hh" #include "QuarkDBNode.hh" #include "Version.hh" #include "Shard.hh" #include "ShardDirectory.hh" #include "utils/FileUtils.hh" #include "utils/ScopedAdder.hh" #include "utils/TimeFormatting.hh" #include "XrdVersion.hh" #include using namespace quarkdb; QuarkDBNode::~QuarkDBNode() { qdb_info("Shutting down QuarkDB node.") } QuarkDBNode::QuarkDBNode(const Configuration &config, const RaftTimeouts &t, ShardDirectory *injectedDirectory) : configuration(config), timeouts(t), password(config.extractPasswordOrDie()), authDispatcher(password) { bootStart = std::chrono::steady_clock::now(); if(injectedDirectory) { shardDirectory = injectedDirectory; // no ownership!!! } else { shardDirectoryOwnership.reset(new ShardDirectory(configuration.getDatabase(), configuration)); shardDirectory = shardDirectoryOwnership.get(); } if(configuration.getMode() == Mode::raft) { shard.reset(new Shard(shardDirectory, configuration.getMyself(), configuration.getMode(), timeouts, password)); if(!injectedDirectory) { shard->spinup(); } } else { shard.reset(new Shard(shardDirectory, {}, configuration.getMode(), timeouts, password)); } bootEnd = std::chrono::steady_clock::now(); } bool QuarkDBNode::isAuthenticated(Connection *conn) const { // Always permit access if: // - No password is set. (duh) // - Link is from localhost, AND we don't require that all localhost connections be authenticated. if(password.empty() || (conn->isLocalhost() && !configuration.getRequirePasswordForLocalhost() )) { conn->authorization = true; } return conn->authorization; } LinkStatus QuarkDBNode::dispatch(Connection *conn, Transaction &transaction) { // We need to be authenticated past this point. Are we? if(!isAuthenticated(conn)) { return conn->noauth("Authentication required."); } return shard->dispatch(conn, transaction); } LinkStatus QuarkDBNode::dispatch(Connection *conn, RedisRequest &req) { // Authentication command? if(req.getCommandType() == CommandType::AUTHENTICATION) { return authDispatcher.dispatch(conn, req); } // We need to be authenticated past this point. Are we? if(!isAuthenticated(conn)) { qdb_warn("Unauthenticated client attempted to execute command " << req[0]); return conn->noauth("Authentication required."); } switch(req.getCommand()) { case RedisCommand::PING: { return conn->raw(handlePing(req)); } case RedisCommand::CLIENT: { if (req.size() < 2) { return conn->err("malformed request"); } if(caseInsensitiveEquals(req[1], "setname")) { if(req.size() != 3) return conn->errArgs(req[0]); qdb_info("Connection with UUID " << conn->getID() << " identifying as '" << StringUtils::escapeNonPrintable(req[2]) << "'"); conn->setName(req[2]); return conn->ok(); } else if(caseInsensitiveEquals(req[1], "getname")) { if(req.size() != 2) return conn->errArgs(req[0]); return conn->string(conn->getName()); } return conn->err("malformed request"); } case RedisCommand::DEBUG: { if(req.size() != 2) return conn->errArgs(req[0]); if(caseInsensitiveEquals(req[1], "segfault")) { qdb_event("Performing harakiri on client request: SEGV"); if (system(SSTR("kill -11 " << getpid()).c_str())) { qdb_error("Failed whe sending SEGV"); } return conn->ok(); } if(caseInsensitiveEquals(req[1], "kill")) { qdb_event("Performing harakiri on client request: SIGKILL"); if (system(SSTR("kill -9 " << getpid()).c_str())) { qdb_error("Failed when sending SIGKILL"); } return conn->ok(); } if(caseInsensitiveEquals(req[1], "terminate")) { qdb_event("Performing harakiri on client request: SIGTERM"); if (system(SSTR("kill " << getpid()).c_str())) { qdb_error("Failed when sending SIGTERM"); } return conn->ok(); } return conn->err(SSTR("unknown argument '" << req[1] << "'")); } case RedisCommand::CLIENT_ID: { return conn->status(conn->getID()); } case RedisCommand::ACTIVATE_PUSH_TYPES: { conn->activatePushTypes(); return conn->ok(); } case RedisCommand::QUARKDB_INFO: { return conn->statusVector(this->info().toVector()); } case RedisCommand::QUARKDB_VERSION: { return conn->string(VERSION_FULL_STRING); } case RedisCommand::QUARKDB_CHECKPOINT: { if(req.size() != 2) return conn->errArgs(req[0]); std::string err = shardDirectory->checkpoint(req[1]); if(!err.empty()) { return conn->err(err); } return conn->ok(); } case RedisCommand::CONVERT_STRING_TO_INT: case RedisCommand::CONVERT_INT_TO_STRING: { return conn->raw(handleConversion(req)); } default: { return shard->dispatch(conn, req); } } } QuarkDBInfo QuarkDBNode::info() { return {configuration.getMode(), configuration.getDatabase(), configuration.getConfigurationPath(), VERSION_FULL_STRING, SSTR(ROCKSDB_MAJOR << "." << ROCKSDB_MINOR << "." << ROCKSDB_PATCH), SSTR(XrdVERSION), chooseWorstHealth(shard->getHealth().getIndicators()), shard->monitors(), std::chrono::duration_cast(bootEnd - bootStart).count(), std::chrono::duration_cast(std::chrono::steady_clock::now() - bootEnd).count() }; } std::vector QuarkDBInfo::toVector() const { std::vector ret; ret.emplace_back(SSTR("MODE " << modeToString(mode))); ret.emplace_back(SSTR("BASE-DIRECTORY " << baseDir)); ret.emplace_back(SSTR("CONFIGURATION-PATH " << configurationPath)); ret.emplace_back(SSTR("QUARKDB-VERSION " << version)); ret.emplace_back(SSTR("ROCKSDB-VERSION " << rocksdbVersion)); ret.emplace_back(SSTR("XROOTD-HEADERS " << xrootdHeaders)); ret.emplace_back(SSTR("NODE-HEALTH " << healthStatusAsString(nodeHealthStatus))); ret.emplace_back(SSTR("MONITORS " << monitors)); ret.emplace_back(SSTR("BOOT-TIME " << bootTime << " (" << formatTime(std::chrono::seconds(bootTime)) << ")")); ret.emplace_back(SSTR("UPTIME " << uptime << " (" << formatTime(std::chrono::seconds(uptime)) << ")")); return ret; }