// ---------------------------------------------------------------------- // File: RaftUtils.cc // Author: Georgios Bitzes - CERN // ---------------------------------------------------------------------- /************************************************************************ * quarkdb - a redis-like highly available key-value store * * Copyright (C) 2016 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #include "utils/IntToBinaryString.hh" #include "RaftUtils.hh" #include "RaftTalker.hh" #include "RaftState.hh" #include "RaftLease.hh" #include "RaftContactDetails.hh" #include "raft/RaftVoteRegistry.hh" #include "utils/ParseUtils.hh" #include "utils/StringUtils.hh" namespace quarkdb { ElectionOutcome RaftElection::performPreVote(RaftVoteRequest votereq, RaftState &state, const RaftContactDetails &contactDetails) { if(!votereq.candidate.empty()) { qdb_throw("candidate member of votereq must be empty, it is filled out by this function"); } votereq.candidate = state.getMyself(); qdb_info("Starting pre-vote round for term " << votereq.term); std::chrono::steady_clock::time_point broadcastTimepoint = std::chrono::steady_clock::now(); std::vector> talkers; std::map> futures; for(const RaftServer &node : state.getNodes()) { if(node != votereq.candidate) { talkers.emplace_back(new RaftTalker(node, contactDetails, "internal-vote-request")); futures[node] = talkers.back()->requestVote(votereq, true); } } std::chrono::steady_clock::time_point deadline = broadcastTimepoint + contactDetails.getRaftTimeouts().getHeartbeatInterval()*2; qdb_info("Pre-vote requests have been sent off, will allow a window of " << contactDetails.getRaftTimeouts().getLow().count() << "ms to receive replies."); RaftVoteRegistry registry(votereq.term, true); for(auto it = futures.begin(); it != futures.end(); it++) { registry.registerVote(it->first, it->second, deadline); } qdb_info(registry.describeOutcome()); return registry.determineOutcome(); } ElectionOutcome RaftElection::perform(RaftVoteRequest votereq, RaftState &state, RaftLease &lease, const RaftContactDetails &contactDetails) { if(!votereq.candidate.empty()) { qdb_throw("candidate member of votereq must be empty, it is filled out by this function"); } votereq.candidate = state.getMyself(); RaftStateSnapshotPtr snapshot = state.getSnapshot(); if(votereq.term != snapshot->term) { qdb_warn("Aborting election, received stale term: " << votereq.term << " vs " << snapshot->term); return ElectionOutcome::kNotElected; } if(!snapshot->leader.empty()) { qdb_warn("Aborting election, we already have a recognized leader already for term " << snapshot->term << " which is " << snapshot->leader.toString()); return ElectionOutcome::kNotElected; } if(snapshot->status != RaftStatus::CANDIDATE) { qdb_warn("Aborting election, I am not a candidate for " << snapshot->term << ", but in status " << statusToString(snapshot->status)); return ElectionOutcome::kNotElected; } qdb_info("Starting election round for term " << votereq.term); std::chrono::steady_clock::time_point broadcastTimepoint = std::chrono::steady_clock::now(); std::vector> talkers; std::map> futures; for(const RaftServer &node : state.getNodes()) { if(node != votereq.candidate) { talkers.emplace_back(new RaftTalker(node, contactDetails, "internal-vote-request")); futures[node] = talkers.back()->requestVote(votereq); } } std::chrono::steady_clock::time_point deadline = broadcastTimepoint + contactDetails.getRaftTimeouts().getHeartbeatInterval()*2; qdb_info("Vote requests have been sent off, will allow a window of " << contactDetails.getRaftTimeouts().getLow().count() << "ms to receive replies."); RaftVoteRegistry registry(votereq.term, false); for(auto it = futures.begin(); it != futures.end(); it++) { registry.registerVote(it->first, it->second, deadline); } registry.observeTermsAndLeases(state, lease, broadcastTimepoint); ElectionOutcome outcome = registry.determineOutcome(); qdb_info(registry.describeOutcome()); if(outcome == ElectionOutcome::kElected) { if(state.ascend(votereq.term)) { return outcome; } // Race condition, term must have progressed. outcome = ElectionOutcome::kNotElected; } return outcome; } bool RaftParser::appendEntries(RedisRequest &&source, RaftAppendEntriesRequest &dest) { //---------------------------------------------------------------------------- // We assume source[0] is correct, ie "raft_append_entries" //---------------------------------------------------------------------------- // 3 chunks is the minimum for a 0-entries request if(source.size() < 3) return false; if(!parseServer(source[1], dest.leader)) return false; if(source[2].size() != sizeof(int64_t) * 5) return false; int64_t nreqs; dest.term = binaryStringToInt(source[2].data() + 0*sizeof(int64_t) ); dest.prevIndex = binaryStringToInt(source[2].data() + 1*sizeof(int64_t) ); dest.prevTerm = binaryStringToInt(source[2].data() + 2*sizeof(int64_t) ); dest.commitIndex = binaryStringToInt(source[2].data() + 3*sizeof(int64_t) ); nreqs = binaryStringToInt(source[2].data() + 4*sizeof(int64_t) ); if((int) source.size() != 3 + nreqs) return false; dest.entries.resize(nreqs); int64_t index = 3; for(int64_t i = 0; i < nreqs; i++) { RaftEntry::deserialize(dest.entries[i], source[index]); index++; } if(index != (int64_t) source.size()) return false; return true; } bool RaftParser::appendEntriesResponse(const redisReplyPtr &source, RaftAppendEntriesResponse &dest) { if(source == nullptr || source->type != REDIS_REPLY_ARRAY || source->elements != 4) { return false; } for(size_t i = 0; i < source->elements; i++) { if(source->element[i]->type != REDIS_REPLY_STRING) { return false; } } std::string_view tmp(source->element[0]->str, source->element[0]->len); if(!ParseUtils::parseInt64(tmp, dest.term)) return false; tmp = std::string_view(source->element[1]->str, source->element[1]->len); if(!ParseUtils::parseInt64(tmp, dest.logSize)) return false; tmp = std::string_view(source->element[2]->str, source->element[2]->len); if(tmp == "0") dest.outcome = false; else if(tmp == "1") dest.outcome = true; else return false; dest.err = std::string(source->element[3]->str, source->element[3]->len); return true; } bool RaftParser::heartbeat(const RedisRequest &source, RaftHeartbeatRequest &dest) { //---------------------------------------------------------------------------- // We assume source[0] is correct, ie "raft_heartbeat" //---------------------------------------------------------------------------- if(source.size() != 3) return false; if(!ParseUtils::parseInt64(source[1], dest.term)) return false; if(!parseServer(source[2], dest.leader)) return false; return true; } bool RaftParser::heartbeatResponse(const qclient::redisReplyPtr &source, RaftHeartbeatResponse &dest) { if(source == nullptr || source->type != REDIS_REPLY_ARRAY || source->elements != 3) { return false; } for(size_t i = 0; i < source->elements; i++) { if(source->element[i]->type != REDIS_REPLY_STRING) { return false; } } std::string_view tmp(source->element[0]->str, source->element[0]->len); if(!ParseUtils::parseInt64(tmp, dest.term)) return false; tmp = std::string_view(source->element[1]->str, source->element[1]->len); if(tmp == "0") dest.nodeRecognizedAsLeader = false; else if(tmp == "1") dest.nodeRecognizedAsLeader = true; else return false; dest.err = std::string(source->element[2]->str, source->element[2]->len); return true; } bool RaftParser::voteRequest(RedisRequest &source, RaftVoteRequest &dest) { //---------------------------------------------------------------------------- // We assume source[0] is correct, ie "raft_request_vote" //---------------------------------------------------------------------------- if(source.size() != 5) return false; if(!ParseUtils::parseInt64(source[1], dest.term)) return false; if(!parseServer(source[2], dest.candidate)) return false; if(!ParseUtils::parseInt64(source[3], dest.lastIndex)) return false; if(!ParseUtils::parseInt64(source[4], dest.lastTerm)) return false; return true; } bool RaftParser::voteResponse(const redisReplyPtr &source, RaftVoteResponse &dest) { if(source == nullptr || source->type != REDIS_REPLY_ARRAY || source->elements != 2) { return false; } for(size_t i = 0; i < source->elements; i++) { if(source->element[i]->type != REDIS_REPLY_STRING) { return false; } } std::string_view tmp(source->element[0]->str, source->element[0]->len); if(!ParseUtils::parseInt64(tmp, dest.term)) return false; tmp = std::string_view(source->element[1]->str, source->element[1]->len); if(tmp == "granted") { dest.vote = RaftVote::GRANTED; } else if(tmp == "refused") { dest.vote = RaftVote::REFUSED; } else if(tmp == "veto") { dest.vote = RaftVote::VETO; } else { return false; // parse error } return true; } bool RaftParser::fetchResponse(redisReply *source, RaftEntry &entry) { if(source == nullptr || source->type != REDIS_REPLY_ARRAY || source->elements != 2) { return false; } if(source->element[0]->type != REDIS_REPLY_STRING) { return false; } if(source->element[1]->type != REDIS_REPLY_ARRAY) { return false; } redisReply *req = source->element[1]; for(size_t i = 0; i < req->elements; i++) { if(req->element[i]->type != REDIS_REPLY_STRING) { return false; } } std::string_view tmp(source->element[0]->str, source->element[0]->len); if(!StringUtils::startsWith(tmp, "TERM: ")) return false; tmp = std::string_view(tmp.data()+ 6, tmp.size()-6); if(!ParseUtils::parseInt64(tmp, entry.term)) return false; entry.request.clear(); for(size_t i = 0; i < req->elements; i++) { entry.request.emplace_back(req->element[i]->str, req->element[i]->len); } return true; } bool RaftParser::fetchLastResponse(const qclient::redisReplyPtr &source, std::vector &entries) { if(source == nullptr || source->type != REDIS_REPLY_ARRAY) { return false; } entries.clear(); entries.resize(source->elements); for(size_t i = 0; i < source->elements; i++) { if(!fetchResponse(source->element[i], entries[i])) { return false; } } return true; } }