// ---------------------------------------------------------------------- // File: RaftVoteRegistry.cc // Author: Georgios Bitzes - CERN // ---------------------------------------------------------------------- /************************************************************************ * quarkdb - a redis-like highly available key-value store * * Copyright (C) 2020 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #include "raft/RaftVoteRegistry.hh" #include "raft/RaftUtils.hh" #include "raft/RaftState.hh" #include "raft/RaftLease.hh" namespace quarkdb { //------------------------------------------------------------------------------ // Constructor //------------------------------------------------------------------------------ RaftVoteRegistry::RaftVoteRegistry(RaftTerm term, bool prevote) : mTerm(term), mPreVote(prevote) {} //------------------------------------------------------------------------------ // Register vote //------------------------------------------------------------------------------ void RaftVoteRegistry::registerVote(const RaftServer &srv, RaftVoteResponse resp) { qdb_assert(mContents.find(srv) == mContents.end()); SingleVote vote; vote.netError = false; vote.parseError = false; vote.resp = resp; mContents[srv] = vote; } //------------------------------------------------------------------------------ // Register vote //------------------------------------------------------------------------------ void RaftVoteRegistry::registerParseError(const RaftServer &srv) { qdb_assert(mContents.find(srv) == mContents.end()); SingleVote vote; vote.netError = false; vote.parseError = true; mContents[srv] = vote; } //------------------------------------------------------------------------------ // Register vote //------------------------------------------------------------------------------ void RaftVoteRegistry::registerNetworkError(const RaftServer &srv) { qdb_assert(mContents.find(srv) == mContents.end()); SingleVote vote; vote.netError = true; vote.parseError = false; mContents[srv] = vote; } //------------------------------------------------------------------------------ // Determine outcome //------------------------------------------------------------------------------ ElectionOutcome RaftVoteRegistry::determineOutcome() const { size_t positives = 0; for(auto it = mContents.begin(); it != mContents.end(); it++) { const SingleVote& sv = it->second; if(sv.netError) { continue; } else if(sv.parseError) { if(mPreVote) { // Does not support pre-vote... assume granted positives++; } } else if(sv.resp.vote == RaftVote::GRANTED) { positives++; } else if(sv.resp.vote == RaftVote::VETO) { return ElectionOutcome::kVetoed; } } // Implicit vote for myself positives++; if(positives >= calculateQuorumSize(mContents.size()+1)) { return ElectionOutcome::kElected; } return ElectionOutcome::kNotElected; } //------------------------------------------------------------------------------ // Count a specific type of vote //------------------------------------------------------------------------------ size_t RaftVoteRegistry::count(RaftVote vote) const { size_t num = 0; for(auto it = mContents.begin(); it != mContents.end(); it++) { const SingleVote& sv = it->second; if(sv.netError || sv.parseError) { continue; } if(sv.resp.vote == vote) { num++; } } return num; } //------------------------------------------------------------------------------ // Count network errors //------------------------------------------------------------------------------ size_t RaftVoteRegistry::countNetworkError() const { size_t num = 0; for(auto it = mContents.begin(); it != mContents.end(); it++) { const SingleVote& sv = it->second; if(sv.netError) { num++; } } return num; } //------------------------------------------------------------------------------ // Count parse errors //------------------------------------------------------------------------------ size_t RaftVoteRegistry::countParseError() const { size_t num = 0; for(auto it = mContents.begin(); it != mContents.end(); it++) { const SingleVote& sv = it->second; if(sv.parseError) { num++; } } return num; } //------------------------------------------------------------------------------ // Register vote //------------------------------------------------------------------------------ void RaftVoteRegistry::registerVote(const RaftServer &srv, std::future &fut, std::chrono::steady_clock::time_point deadline) { if(fut.wait_until(deadline) != std::future_status::ready) { return registerNetworkError(srv); } qclient::redisReplyPtr reply = fut.get(); if(reply == nullptr) { return registerNetworkError(srv); } RaftVoteResponse resp; if(!RaftParser::voteResponse(reply, resp)) { if(!mPreVote) { qdb_critical("Could not parse vote response from " << srv.toString() << ": " << qclient::describeRedisReply(reply)); } return registerParseError(srv); } return registerVote(srv, resp); } //------------------------------------------------------------------------------ // Describe outcome //------------------------------------------------------------------------------ std::string RaftVoteRegistry::describeOutcome() const { std::ostringstream ss; if(mPreVote) { ss << "Pre-vote round"; } else { ss << "Election round"; } const ElectionOutcome outcome = determineOutcome(); const size_t granted = count(RaftVote::GRANTED); const size_t refused = count(RaftVote::REFUSED); const size_t veto = count(RaftVote::VETO); if(outcome == ElectionOutcome::kElected) { ss << " successful"; } else { ss << " unsuccessful"; } ss << " for term " << mTerm << ". Contacted " << mContents.size() << " nodes,"; ss << " received " << granted+refused+veto << " replies with a tally of "; ss << granted << " positive votes, " << refused << " refused votes, and "; ss << veto << " vetoes."; if(granted >= calculateQuorumSize(mContents.size()+1) && veto > 0) { qdb_critical("Received a quorum of positive votes (" << granted << ") plus vetoes: " << veto); } return ss.str(); } //------------------------------------------------------------------------------ // Observe terms and leases //------------------------------------------------------------------------------ void RaftVoteRegistry::observeTermsAndLeases(RaftState &state, RaftLease &lease, std::chrono::steady_clock::time_point broadcastTimepoint) { qdb_assert(!mPreVote); for(auto it = mContents.begin(); it != mContents.end(); it++) { const SingleVote& sv = it->second; if(sv.netError || sv.parseError) { continue; } state.observed(sv.resp.term, {}); if(sv.resp.vote == RaftVote::GRANTED) { lease.getHandler(it->first).heartbeat(broadcastTimepoint); } } } }