// ---------------------------------------------------------------------- // File: RecoveryDispatcher.cc // Author: Georgios Bitzes - CERN // ---------------------------------------------------------------------- /************************************************************************ * quarkdb - a redis-like highly available key-value store * * Copyright (C) 2016 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #include "recovery/RecoveryDispatcher.hh" #include "Formatter.hh" #include "utils/CommandParsing.hh" #include "utils/IntToBinaryString.hh" #include "storage/KeyConstants.hh" #include "raft/RaftMembers.hh" #include "utils/CommandParsing.hh" using namespace quarkdb; RecoveryDispatcher::RecoveryDispatcher(RecoveryEditor &ed) : editor(ed) { } LinkStatus RecoveryDispatcher::dispatch(Connection *conn, Transaction &tx) { qdb_throw("Transactions not supported in RecoveryDispatcher"); } LinkStatus RecoveryDispatcher::dispatch(Connection *conn, RedisRequest &req) { return conn->raw(dispatch(req)); } RedisEncodedResponse RecoveryDispatcher::dispatch(RedisRequest &request) { switch(request.getCommand()) { case RedisCommand::CONVERT_STRING_TO_INT: case RedisCommand::CONVERT_INT_TO_STRING: { return handleConversion(request); } default: { // no-op, continue } } if(request.getCommandType() != CommandType::RECOVERY) { std::string msg = SSTR("unable to dispatch command " << quotes(request[0]) << " - remember we're running in recovery mode, not all operations are available"); qdb_warn(msg); return Formatter::err(msg); } switch(request.getCommand()) { case RedisCommand::RECOVERY_GET: { if(request.size() != 2) return Formatter::errArgs(request[0]); std::string value; rocksdb::Status st = editor.get(request[1], value); if(!st.ok()) return Formatter::fromStatus(st); return Formatter::string(value); } case RedisCommand::RECOVERY_SET: { if(request.size() != 3) return Formatter::errArgs(request[0]); return Formatter::fromStatus(editor.set(request[1], request[2])); } case RedisCommand::RECOVERY_DEL: { if(request.size() != 2) return Formatter::errArgs(request[0]); return Formatter::fromStatus(editor.del(request[1])); } case RedisCommand::RECOVERY_INFO: { if(request.size() != 1) return Formatter::errArgs(request[0]); return Formatter::vector(editor.retrieveMagicValues()); } case RedisCommand::RECOVERY_FORCE_RECONFIGURE_JOURNAL: { if(request.size() != 3) return Formatter::errArgs(request[0]); RaftMembers members; if(!members.parse(request[1])) { return Formatter::err("cannot parse new members"); } std::string clusterID; rocksdb::Status st = editor.get(KeyConstants::kJournal_ClusterID, clusterID); if(!st.ok()) { return Formatter::err(SSTR("unable to retrieve clusterID, status " << st.ToString() << " - are you sure this is a journal?")); } if(clusterID == request[2]) { return Formatter::err("when force reconfiguring, new clusterID must be different than old one"); } // All checks are clear, proceed qdb_assert(editor.set(KeyConstants::kJournal_ClusterID, request[2]).ok()); qdb_assert(editor.set(KeyConstants::kJournal_Members, request[1]).ok()); qdb_assert(editor.set(KeyConstants::kJournal_MembershipEpoch, intToBinaryString(0)).ok()); editor.del(KeyConstants::kJournal_PreviousMembers); editor.del(KeyConstants::kJournal_PreviousMembershipEpoch); return Formatter::ok(); } case RedisCommand::RECOVERY_SCAN: { if(request.size() < 2) return Formatter::errArgs(request[0]); ScanCommandArguments args = parseScanCommand(request.begin()+1, request.end(), false); if(!args.error.empty()) { return Formatter::err(args.error); } std::string nextCursor; std::vector results; editor.scan(args.cursor, args.count, nextCursor, results); if(nextCursor == "") nextCursor = "0"; else nextCursor = "next:" + nextCursor; return Formatter::scan(nextCursor, results); } case RedisCommand::RECOVERY_GET_ALL_VERSIONS: { if(request.size() != 2) return Formatter::errArgs(request[0]); std::vector results; editor.getAllVersions(request[1], results); return Formatter::vector(results); } default: { qdb_throw("should never reach here"); } } }