// ----------------------------------------------------------------------
// File: RaftState.cc
// Author: Georgios Bitzes - CERN
// ----------------------------------------------------------------------
/************************************************************************
* quarkdb - a redis-like highly available key-value store *
* Copyright (C) 2016 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see <http://www.gnu.org/licenses/>.*
************************************************************************/
#include "RaftState.hh"
#include "RaftJournal.hh"
using namespace quarkdb;
//------------------------------------------------------------------------------
// Sentinel value stored in votedFor once a leader has been recognized for a
// term, so that no further vote can be granted in that term (see observed()).
//------------------------------------------------------------------------------
RaftServer RaftState::BLOCKED_VOTE{ "VOTING_BLOCKED_FOR_THIS_TERM", -1 };
//------------------------------------------------------------------------------
// Constructor: start out as a follower without a leadership marker, load the
// current term and vote from the journal, then publish the first snapshot.
//------------------------------------------------------------------------------
RaftState::RaftState(RaftJournal &jr, const RaftServer &me)
: journal(jr), myself(me) {
  // Term and vote come from the journal.
  this->term = journal.getCurrentTerm();
  this->votedFor = journal.getVotedFor();

  // Everything else starts from fixed defaults.
  this->status = RaftStatus::FOLLOWER;
  this->leadershipMarker = -1;

  this->updateSnapshot();
}
//------------------------------------------------------------------------------
// Return a copy of the server identity this state object was constructed with.
//------------------------------------------------------------------------------
RaftServer RaftState::getMyself() {
  return this->myself;
}
std::vector RaftState::getNodes() {
return journal.getNodes();
}
//------------------------------------------------------------------------------
// Return the cluster ID, exactly as currently reported by the journal.
//------------------------------------------------------------------------------
RaftClusterID RaftState::getClusterID() {
  return this->journal.getClusterID();
}
//------------------------------------------------------------------------------
// Log what we are about to learn from an observation, and wake up any waiters
// if the term is progressing.
//
// Must be called BEFORE term is overwritten: the comparison and the log
// message both rely on term still holding the old value. This function does
// not take the update lock itself; observed() calls it with the lock held.
//------------------------------------------------------------------------------
void RaftState::declareEvent(RaftTerm observedTerm, const RaftServer &observedLeader) {
  const bool termProgressed = (observedTerm > term);
  if(termProgressed) {
    qdb_event("Progressing raft term: " << term << " ==> " << observedTerm);
    notifier.notify_all();
  }

  const bool leaderKnown = !observedLeader.empty();
  if(leaderKnown) {
    qdb_event("Recognizing leader " << observedLeader.toString() << " for term " << observedTerm);
  }
}
//------------------------------------------------------------------------------
// Switch to newstatus, logging the transition. A no-op if we are already in
// that status. Any status other than LEADER resets the leadership marker.
// Does not update the snapshot - callers in this file do that afterwards.
//------------------------------------------------------------------------------
void RaftState::updateStatus(RaftStatus newstatus) {
  if(status == newstatus) {
    return;
  }

  qdb_event("Status transition: " << statusToString(status) << " ==> " << statusToString(newstatus));
  status = newstatus;

  // The marker only has meaning while we are the leader.
  if(newstatus != RaftStatus::LEADER) {
    leadershipMarker = -1;
  }
}
//------------------------------------------------------------------------------
// Give up candidacy for the given term and fall back to follower.
// Returns true only if we were a candidate and forTerm is the current term;
// otherwise nothing changes and false is returned.
//------------------------------------------------------------------------------
bool RaftState::dropOut(RaftTerm forTerm) {
  std::scoped_lock lock(update);

  if(status != RaftStatus::CANDIDATE || forTerm != term) {
    return false;
  }

  updateStatus(RaftStatus::FOLLOWER);
  updateSnapshot();
  return true;
}
//------------------------------------------------------------------------------
// Attempt the transition FOLLOWER ==> CANDIDATE for the given term.
//
// Succeeds only if forTerm is still the current term, we are a follower, no
// leader has been recognized and no vote has been cast in this term, and we
// appear in the journal's node list. On success we vote for ourselves, persist
// the vote through updateJournal(), and return true. In every other case
// nothing changes and false is returned.
//------------------------------------------------------------------------------
bool RaftState::becomeCandidate(RaftTerm forTerm) {
  std::scoped_lock lock(update);

  // The term moved on since the caller made its decision: a benign race.
  if(forTerm != term) {
    return false;
  }

  if(status != RaftStatus::FOLLOWER) {
    qdb_warn("attempted to become a candidate without first being a follower for term " << forTerm);
    return false;
  }

  if(!leader.empty()) {
    qdb_warn("attempted to become a candidate for term " << term << " while having recognized "
      << leader.toString() << " as leader already");
    return false;
  }

  if(!votedFor.empty()) {
    qdb_warn("attempted to become a candidate for term " << term << " while having voted already for " << votedFor.toString());
    return false;
  }

  if(!contains(journal.getNodes(), myself)) {
    qdb_warn("attempted to become a candidate even though I'm not a full voting member");
    return false;
  }

  // All checks passed: vote for ourselves and make that vote durable before
  // changing status.
  votedFor = myself;
  this->updateJournal();

  updateStatus(RaftStatus::CANDIDATE);
  updateSnapshot();
  return true;
}
//------------------------------------------------------------------------------
// Attempt the transition CANDIDATE ==> LEADER for the given term.
//
// Succeeds only if forTerm is still the current term, we are a candidate that
// voted for itself, no leader has been recognized for this term, we appear in
// the journal's node list, and a leadership marker could be appended to the
// journal. Returns true on success; otherwise nothing changes and false is
// returned.
//------------------------------------------------------------------------------
bool RaftState::ascend(RaftTerm forTerm) {
  std::scoped_lock lock(update);

  // The term moved on since the caller made its decision: a benign race.
  if(forTerm != term) {
    return false;
  }

  if(status != RaftStatus::CANDIDATE) {
    qdb_critical("attempted to ascend without being a candidate for term " << forTerm << ".");
    return false;
  }

  // A leader is already known for this term, so something is wrong. Refuse.
  if(!leader.empty()) {
    qdb_critical("attempted to ascend for term " << term << " while having recognized "
      << leader.toString() << " as leader already");
    return false;
  }

  if(votedFor != myself) {
    qdb_critical("attempted to ascend in term " << forTerm << " without having voted for myself first");
    return false;
  }

  if(!contains(journal.getNodes(), myself)) {
    qdb_critical("attempted to ascend even though I'm not a full voting member");
    return false;
  }

  // The marker goes at the current end of the log; remember its index so it
  // can be recorded as leadershipMarker once the append succeeds.
  const LogIndex markerIndex = journal.getLogSize();
  if(!journal.appendLeadershipMarker(markerIndex, forTerm, myself)) {
    qdb_warn("could not append leadership marker to journal for term " << forTerm << ", unable to ascend");
    return false;
  }

  leader = myself;
  leadershipMarker = markerIndex;
  updateStatus(RaftStatus::LEADER);
  updateSnapshot();

  qdb_event("Ascending as leader for term " << forTerm << ". Long may I reign.");
  return true;
}
//------------------------------------------------------------------------------
// Grant our vote for the given term to the given server.
//
// This function should be called AFTER we have established that the raft log
// of the server asking for a vote is at least as up-to-date as ours.
//
// The vote is granted only if we are a follower, forTerm is still the current
// term, no leader has been recognized and no vote has been cast in this term.
// On success the vote is persisted through updateJournal() and true is
// returned; otherwise nothing changes and false is returned.
//------------------------------------------------------------------------------
bool RaftState::grantVote(RaftTerm forTerm, const RaftServer &vote) {
  std::scoped_lock lock(update);

  if(status != RaftStatus::FOLLOWER) {
    qdb_warn("attempted to vote for " << vote.toString() << " while in status " << statusToString(status));
    return false;
  }

  // The term has progressed since this function got called: a benign race.
  if(forTerm != term) {
    return false;
  }

  // A leader is already recognized for the current term, so voting for
  // anybody else makes zero sense.
  if(!leader.empty()) {
    qdb_critical("attempted to vote for " << vote.toString() << " and term "
      << term << " while there's already an established leader: " << leader.toString());
    return false;
  }

  // Worrying, but could still be explained by a race. Should not normally
  // happen, given that servicing of requestVote is serialized.
  if(!votedFor.empty()) {
    qdb_critical("attempted to change vote for term " << term << ": " << votedFor.toString() << " ==> " << vote.toString());
    return false;
  }

  qdb_event("Granting vote for term " << forTerm << " to " << vote.toString());

  votedFor = vote;
  this->updateJournal();
  updateSnapshot();
  return true;
}
void RaftState::shutdown() {
std::scoped_lock lock(update);
updateStatus(RaftStatus::SHUTDOWN);
updateSnapshot();
notifier.notify_all();
}
//------------------------------------------------------------------------------
// Block for at most the given duration, returning immediately if we are
// already in shutdown mode.
//
// The wait ends when the timeout expires or as soon as the notifier is
// signalled. In this file both shutdown() and a term progression signal it.
// No condition is re-checked after waking up, so callers must not assume the
// full timeout has elapsed when this returns.
//------------------------------------------------------------------------------
void RaftState::wait(const std::chrono::milliseconds &t) {
  std::unique_lock lock(update);

  if(status != RaftStatus::SHUTDOWN) {
    notifier.wait_for(lock, t);
  }
}
//------------------------------------------------------------------------------
// Block until the given time_point at the latest, returning immediately if we
// are already in shutdown mode.
//
// The wait ends when the deadline passes or as soon as the notifier is
// signalled. In this file both shutdown() and a term progression signal it.
// No condition is re-checked after waking up, so callers must not assume the
// deadline has been reached when this returns.
//------------------------------------------------------------------------------
void RaftState::wait_until(const std::chrono::steady_clock::time_point &t) {
  std::unique_lock lock(update);

  if(status != RaftStatus::SHUTDOWN) {
    notifier.wait_until(lock, t);
  }
}
//------------------------------------------------------------------------------
// We must call updateJournal after having made changes to either term
// or votedFor.
//------------------------------------------------------------------------------
void RaftState::updateJournal() {
journal.setCurrentTerm(term, votedFor);
}
//------------------------------------------------------------------------------
// Feed an observation of (term, leader) into the state machine.
// observedLeader may be empty if only the term is known.
//
// Returns true if the observation was taken into account: either the term
// progressed, or it matched the current term while no leader had been
// recorded yet. Returns false if we are shutting down, if the observed term
// is older than ours, or if a leader is already recorded for this term.
//------------------------------------------------------------------------------
bool RaftState::observed(RaftTerm observedTerm, const RaftServer &observedLeader) {
  std::scoped_lock lock(update);

  // Reject any changes if we're in shutdown mode.
  if(status == RaftStatus::SHUTDOWN) {
    return false;
  }

  //----------------------------------------------------------------------------
  // Case 1: a newer term. Step down if leader / candidate, and adopt it.
  //----------------------------------------------------------------------------
  if(observedTerm > term) {
    updateStatus(RaftStatus::FOLLOWER);

    // Must run before term is overwritten, as it logs the old value.
    declareEvent(observedTerm, observedLeader);

    votedFor.clear();
    term = observedTerm;
    leader = observedLeader;

    //--------------------------------------------------------------------------
    // If observedLeader is not empty, we have already discovered the leader
    // for this term, which should never change.
    // We set votedFor to an invalid value to prevent this node from voting
    // for another server in this term after a crash.
    // This is not strictly necessary to do, according to the raft
    // description, but let's be conservative.
    //--------------------------------------------------------------------------
    if(!observedLeader.empty()) {
      votedFor = BLOCKED_VOTE;
    }

    updateJournal();
    updateSnapshot();
    return true;
  }

  //----------------------------------------------------------------------------
  // Case 2: an older term. Nothing to learn from it.
  //----------------------------------------------------------------------------
  if(observedTerm != term) {
    return false;
  }

  //----------------------------------------------------------------------------
  // Case 3: the current term, and no leader recorded for it yet.
  //----------------------------------------------------------------------------
  if(leader.empty()) {
    declareEvent(observedTerm, observedLeader);
    leader = observedLeader;

    // Block any more votes for the current term, same reason as above.
    if(!leader.empty() && votedFor.empty()) {
      votedFor = BLOCKED_VOTE;
      this->updateJournal();
    }

    updateSnapshot();
    return true;
  }

  //----------------------------------------------------------------------------
  // Case 4: the current term, with a leader already recorded. A different,
  // non-empty leader for the same term is only reported, never adopted.
  //----------------------------------------------------------------------------
  if(leader != observedLeader && !observedLeader.empty()) {
    qdb_critical("attempted to change leader for term " << term << ": " << leader.toString() << " ==> " << observedLeader.toString());
  }

  return false;
}
//------------------------------------------------------------------------------
// Update state snapshot
//------------------------------------------------------------------------------
void RaftState::updateSnapshot() {
std::atomic_store(
¤tSnapshot,
std::make_shared(term, status, leader, votedFor, leadershipMarker)
);
}
//------------------------------------------------------------------------------
// Helper function: the upper-case name of a RaftStatus value, as used in log
// messages. Any value other than the four handled below goes to qdb_throw.
//------------------------------------------------------------------------------
std::string quarkdb::statusToString(RaftStatus st) {
  switch(st) {
    case RaftStatus::LEADER:
      return "LEADER";
    case RaftStatus::FOLLOWER:
      return "FOLLOWER";
    case RaftStatus::CANDIDATE:
      return "CANDIDATE";
    case RaftStatus::SHUTDOWN:
      return "SHUTDOWN";
    default:
      break;
  }

  qdb_throw("unrecognized RaftStatus");
}