// ----------------------------------------------------------------------
// File: RaftCommon.hh
// Author: Georgios Bitzes - CERN
// ----------------------------------------------------------------------
/************************************************************************
* quarkdb - a redis-like highly available key-value store *
* Copyright (C) 2016 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see .*
************************************************************************/
#ifndef __QUARKDB_RAFT_COMMON_H__
#define __QUARKDB_RAFT_COMMON_H__
#include "RedisRequest.hh"
#include "utils/TimeFormatting.hh"
#include "Common.hh"
#include "utils/Macros.hh"
#include "health/HealthIndicator.hh"
#include "Utils.hh"
namespace quarkdb {
enum class RaftStatus {
LEADER,
FOLLOWER,
CANDIDATE,
SHUTDOWN
};
std::string statusToString(RaftStatus st);
inline void append_int_to_string(int64_t source, std::ostringstream &target) {
char buff[sizeof(source)];
memcpy(&buff, &source, sizeof(source));
target.write(buff, sizeof(source));
}
inline int64_t fetch_int_from_string(const char *pos) {
int64_t result;
memcpy(&result, pos, sizeof(result));
return result;
}
using RaftSerializedEntry = std::string;
struct RaftEntry {
RaftTerm term;
RedisRequest request;
RaftEntry() {}
RaftEntry(RaftTerm term_, RedisRequest&& req) : term(term_), request(std::move(req)) {}
RaftEntry(RaftTerm term_, const RedisRequest& req) : term(term_), request(req) {}
template
RaftEntry(RaftTerm term_, Args&&... args) : term(term_), request{args...} {}
RaftSerializedEntry serialize() const {
std::ostringstream ss;
append_int_to_string(term, ss);
for(size_t i = 0; i < request.size(); i++) {
append_int_to_string(request[i].size(), ss);
ss << request[i];
}
return ss.str();
}
static void deserialize(RaftEntry &entry, std::string_view data) {
entry.request.clear();
entry.term = fetch_int_from_string(data.data());
const char *pos = data.data() + sizeof(term);
const char *end = data.data() + data.size();
while(pos < end) {
int64_t len = fetch_int_from_string(pos);
pos += sizeof(len);
entry.request.emplace_back(pos, len);
pos += len;
}
}
static int64_t fetchTerm(std::string_view data) {
return fetch_int_from_string(data.data());
}
bool operator==(const RaftEntry &rhs) const {
return term == rhs.term && request == rhs.request;
}
bool operator!=(const RaftEntry &rhs) const {
return !(*this == rhs);
}
};
struct RaftEntryWithIndex {
RaftEntry entry;
LogIndex index;
RaftEntryWithIndex(const RaftEntry &entr, LogIndex idx)
: entry(entr), index(idx) {}
};
inline std::ostream& operator<<(std::ostream& out, const RaftEntry& entry) {
out << "term: " << entry.term << " -> " << entry.request;
return out;
}
struct RaftHeartbeatRequest {
RaftTerm term;
RaftServer leader;
};
// The response to the node which sent us a heartbeat: our current term,
// whether we recognize the heartbeat-sender as leader, and if not, the reason
// why.
struct RaftHeartbeatResponse {
RaftTerm term;
bool nodeRecognizedAsLeader;
std::string err;
std::vector toVector() {
std::vector ret;
ret.push_back(std::to_string(term));
ret.push_back(std::to_string(nodeRecognizedAsLeader));
ret.push_back(err);
return ret;
}
};
struct RaftAppendEntriesRequest {
RaftTerm term;
RaftServer leader;
LogIndex prevIndex;
RaftTerm prevTerm;
LogIndex commitIndex;
std::vector entries;
};
struct RaftAppendEntriesResponse {
RaftAppendEntriesResponse(RaftTerm tr, LogIndex ind, bool out, const std::string &er)
: term(tr), logSize(ind), outcome(out), err(er) {}
RaftAppendEntriesResponse() {}
RaftTerm term = -1;
LogIndex logSize = -1;
bool outcome = false;
std::string err;
std::vector toVector() {
std::vector ret;
ret.push_back(std::to_string(term));
ret.push_back(std::to_string(logSize));
ret.push_back(std::to_string(outcome));
ret.push_back(err);
return ret;
}
};
struct RaftVoteRequest {
RaftTerm term;
RaftServer candidate;
LogIndex lastIndex;
RaftTerm lastTerm;
std::string describe(bool preVote) const {
std::ostringstream ss;
if(preVote) {
ss << "pre-vote request ";
}
else {
ss << "vote request ";
}
ss << "[candidate=" << candidate.toString() <<
", term=" << term <<
", lastIndex=" << lastIndex <<
", lastTerm=" << lastTerm <<
"]";
return ss.str();
}
};
enum class RaftVote {
VETO = -1,
REFUSED = 0,
GRANTED = 1
};
struct RaftVoteResponse {
RaftVoteResponse(RaftTerm tr, RaftVote vt) : term(tr), vote(vt) {}
RaftVoteResponse() : term(0), vote(RaftVote::VETO) {}
RaftTerm term;
RaftVote vote;
std::vector toVector() {
std::vector ret;
ret.push_back(std::to_string(term));
if(vote == RaftVote::GRANTED) {
ret.push_back("granted");
}
else if(vote == RaftVote::REFUSED) {
ret.push_back("refused");
}
else if(vote == RaftVote::VETO) {
ret.push_back("veto");
}
else {
qdb_throw("unable to convert vote to string in RaftVoteResponse::toVector");
}
return ret;
}
};
enum class ElectionOutcome {
kElected,
kNotElected,
kVetoed
};
inline size_t calculateQuorumSize(size_t members) {
return (members / 2) + 1;
}
struct ReplicaStatus {
RaftServer target;
bool online;
LogIndex logSize;
std::string version;
std::string resilveringProgress;
ReplicaStatus() {}
ReplicaStatus(const RaftServer &trg, bool onl, LogIndex indx, const std::string &ver = "N/A", const std::string &resilv = "")
: target(trg), online(onl), logSize(indx), version(ver), resilveringProgress(resilv) {}
bool upToDate(LogIndex leaderLogSize) const {
if(!online) return false;
if(logSize < 0) return false;
return (leaderLogSize - logSize < 30000);
}
std::string toString(LogIndex currentLogSize) const {
std::ostringstream ss;
toString(ss, currentLogSize);
return ss.str();
}
void toString(std::ostringstream &ss, LogIndex currentLogSize) const {
ss << target.toString() << " ";
if(online) {
ss << "| ONLINE | ";
if(!resilveringProgress.empty()) {
ss << "RESILVERING-PROGRESS " << resilveringProgress << " | ";
}
else if(upToDate(currentLogSize)) {
ss << "UP-TO-DATE | ";
}
else {
ss << "LAGGING | ";
}
ss << "LOG-SIZE ";
if(logSize < 0) {
ss << "N/A";
}
else {
ss << logSize;
}
ss << " | VERSION " << version;
}
else {
ss << "| OFFLINE";
}
}
};
struct ReplicationStatus {
std::vector replicas;
bool shakyQuorum = false;
size_t replicasOnline() {
size_t ret = 0;
for(size_t i = 0; i < replicas.size(); i++) {
if(replicas[i].online) {
ret++;
}
}
return ret;
}
size_t replicasUpToDate(LogIndex leaderLogSize) {
size_t ret = 0;
for(size_t i = 0; i < replicas.size(); i++) {
if(replicas[i].upToDate(leaderLogSize)) {
ret++;
}
}
return ret;
}
bool quorumUpToDate(LogIndex leaderLogSize) {
if(replicas.size() == 1) return true;
return calculateQuorumSize(replicas.size()) <= replicasUpToDate(leaderLogSize);
}
ReplicaStatus getReplicaStatus(const RaftServer &replica) {
for(size_t i = 0; i < replicas.size(); i++) {
if(replicas[i].target == replica) {
return replicas[i];
}
}
qdb_throw("Replica " << replica.toString() << " not found");
}
void removeReplica(const RaftServer &replica) {
for(size_t i = 0; i < replicas.size(); i++) {
if(replicas[i].target == replica) {
replicas.erase(replicas.begin()+i);
return;
}
}
qdb_throw("Replica " << replica.toString() << " not found");
}
void removeReplicas(const std::vector &replicas) {
for(size_t i = 0; i < replicas.size(); i++) {
removeReplica(replicas[i]);
}
}
void addReplica(const ReplicaStatus &replica) {
for(size_t i = 0; i < replicas.size(); i++) {
if(replicas[i].target == replica.target) {
qdb_throw("Targer " << replica.target.toString() << " already exists in the list");
}
}
replicas.push_back(replica);
}
bool contains(const RaftServer &replica) {
for(size_t i = 0; i < replicas.size(); i++) {
if(replicas[i].target == replica) return true;
}
return false;
}
};
struct RaftInfo {
RaftClusterID clusterID;
RaftServer myself;
RaftServer leader;
HealthStatus nodeHealthStatus;
FsyncPolicy fsyncPolicy;
LogIndex membershipEpoch;
std::vector nodes;
std::vector observers;
RaftTerm term;
LogIndex logStart;
LogIndex logSize;
RaftStatus status;
LogIndex commitIndex;
LogIndex lastApplied;
size_t blockedWrites;
int64_t lastStateChange;
ReplicationStatus replicationStatus;
std::string myVersion;
std::vector toVector() {
std::vector ret;
ret.push_back(SSTR("TERM " << term));
ret.push_back(SSTR("LOG-START " << logStart));
ret.push_back(SSTR("LOG-SIZE " << logSize));
ret.push_back(SSTR("LEADER " << leader.toString()));
ret.push_back(SSTR("CLUSTER-ID " << clusterID));
ret.push_back(SSTR("COMMIT-INDEX " << commitIndex));
ret.push_back(SSTR("LAST-APPLIED " << lastApplied));
ret.push_back(SSTR("BLOCKED-WRITES " << blockedWrites));
ret.push_back(SSTR("LAST-STATE-CHANGE " << lastStateChange << " (" << formatTime(std::chrono::seconds(lastStateChange)) << ")"));
ret.push_back("----------");
ret.push_back(SSTR("MYSELF " << myself.toString()));
ret.push_back(SSTR("VERSION " << myVersion));
ret.push_back(SSTR("STATUS " << statusToString(status)));
ret.push_back(SSTR("NODE-HEALTH " << healthStatusAsString(nodeHealthStatus)));
ret.push_back(SSTR("JOURNAL-FSYNC-POLICY " << fsyncPolicyToString(fsyncPolicy)));
ret.push_back("----------");
ret.push_back(SSTR("MEMBERSHIP-EPOCH " << membershipEpoch));
ret.push_back(SSTR("NODES " << serializeNodes(nodes)));
ret.push_back(SSTR("OBSERVERS " << serializeNodes(observers)));
ret.push_back(SSTR("QUORUM-SIZE " << calculateQuorumSize(nodes.size())));
if(!replicationStatus.replicas.empty()) {
ret.push_back("----------");
}
for(auto it = replicationStatus.replicas.begin(); it != replicationStatus.replicas.end(); it++) {
std::ostringstream ss;
ss << "REPLICA ";
it->toString(ss, logSize);
ret.push_back(ss.str());
}
return ret;
}
};
}
#endif