// ---------------------------------------------------------------------- // File: RaftDispatcher.cc // Author: Georgios Bitzes - CERN // ---------------------------------------------------------------------- /************************************************************************ * quarkdb - a redis-like highly available key-value store * * Copyright (C) 2016 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #include "raft/RaftDispatcher.hh" #include "raft/RaftUtils.hh" #include "raft/RaftJournal.hh" #include "raft/RaftWriteTracker.hh" #include "raft/RaftState.hh" #include "raft/RaftReplicator.hh" #include "redis/LeaseFilter.hh" #include "StateMachine.hh" #include "Formatter.hh" #include "utils/ParseUtils.hh" #include "utils/CommandParsing.hh" #include "Version.hh" #include #include using namespace quarkdb; RaftDispatcher::RaftDispatcher(RaftJournal &jour, StateMachine &sm, RaftState &st, RaftHeartbeatTracker &rht, RaftWriteTracker &wt, RaftReplicator &rep, Publisher &pub) : journal(jour), stateMachine(sm), state(st), heartbeatTracker(rht), redisDispatcher(sm, pub), writeTracker(wt), replicator(rep), publisher(pub) { } void RaftDispatcher::notifyDisconnect(Connection *conn) { publisher.notifyDisconnect(conn); } LinkStatus RaftDispatcher::dispatchInfo(Connection *conn, RedisRequest &req) { if(req.size() == 2 && caseInsensitiveEquals(req[1], "leader")) { return conn->string(state.getSnapshot()->leader.toString()); } return conn->statusVector(this->info().toVector()); } LinkStatus RaftDispatcher::dispatch(Connection *conn, Transaction &transaction) { return this->service(conn, transaction); } LinkStatus RaftDispatcher::dispatchPubsub(Connection *conn, RedisRequest &req) { // Only leaders should service pubsub requests. RaftStateSnapshotPtr snapshot = state.getSnapshot(); if(snapshot->status != RaftStatus::LEADER) { if(snapshot->leader.empty()) { return conn->raw(Formatter::err("unavailable")); } // Redirect. return conn->raw(Formatter::moved(0, snapshot->leader)); } // We're good, submit to publisher. return publisher.dispatch(conn, req); } //------------------------------------------------------------------------------ // Check if the removal of the given node would be acceptable //------------------------------------------------------------------------------ bool RaftDispatcher::checkIfNodeRemovalAcceptable(const RaftServer &srv) { // Build a replication status object with how the full members would // look like after the node removal ReplicationStatus replicationStatus = replicator.getStatus(); replicationStatus.removeReplicas(journal.getMembership().observers); ReplicaStatus leaderStatus = { state.getMyself(), true, journal.getLogSize() }; replicationStatus.addReplica(leaderStatus); if(replicationStatus.contains(srv)) { replicationStatus.removeReplica(srv); } return replicationStatus.quorumUpToDate(leaderStatus.logSize); } LinkStatus RaftDispatcher::dispatch(Connection *conn, RedisRequest &req) { if(req.getCommandType() == CommandType::PUBSUB) { return dispatchPubsub(conn, req); } switch(req.getCommand()) { case RedisCommand::RAFT_INFO: { // safe, read-only request, does not need authorization return dispatchInfo(conn, req); } case RedisCommand::RAFT_LEADER_INFO: { // safe, read-only request, does not need authorization RaftStateSnapshotPtr snapshot = state.getSnapshot(); if(snapshot->status != RaftStatus::LEADER) { if(snapshot->leader.empty()) { return conn->err("unavailable"); } return conn->moved(0, snapshot->leader); } return dispatchInfo(conn, req); } case RedisCommand::RAFT_FETCH_LAST: { // safe, read-only request, does not need authorization if(req.size() != 2 && req.size() != 3) return conn->errArgs(req[0]); int64_t nentries; if(!ParseUtils::parseInt64(req[1], nentries) || nentries <= 0) return conn->err(SSTR("could not parse " << req[1])); bool raw = false; if(req.size() == 3) { if(req[2] == "raw") { raw = true; } else { return conn->err(SSTR("could not parse " << req[2])); } } std::vector entries; journal.fetch_last(nentries, entries); return conn->raw(Formatter::raftEntries(entries, raw)); } case RedisCommand::RAFT_FETCH: { // safe, read-only request, does not need authorization if(req.size() != 2 && req.size() != 3) return conn->errArgs(req[0]); LogIndex index; if(!ParseUtils::parseInt64(req[1], index)) return conn->err(SSTR("could not parse " << req[1])); bool raw = false; if(req.size() == 3) { if(req[2] == "raw") { raw = true; } else { return conn->err(SSTR("could not parse " << req[2])); } } RaftEntry entry; std::vector ret; if(this->fetch(index, entry)) { return conn->raw(Formatter::raftEntry(entry, raw)); } return conn->null(); } case RedisCommand::RAFT_HEARTBEAT: { if(!conn->raftAuthorization) return conn->err("not authorized to issue raft commands"); RaftHeartbeatRequest dest; if(!RaftParser::heartbeat(std::move(req), dest)) { return conn->err("malformed request"); } RaftHeartbeatResponse resp = heartbeat(std::move(dest)); return conn->vector(resp.toVector()); } case RedisCommand::RAFT_APPEND_ENTRIES: { Connection::FlushGuard guard(conn); if(!conn->raftAuthorization) return conn->err("not authorized to issue raft commands"); RaftAppendEntriesRequest dest; if(!RaftParser::appendEntries(std::move(req), dest)) { return conn->err("malformed request"); } RaftAppendEntriesResponse resp = appendEntries(std::move(dest)); return conn->vector(resp.toVector()); } case RedisCommand::RAFT_SET_FSYNC_POLICY: { if(req.size() != 2u) return conn->errArgs(req[0]); FsyncPolicy policy; if(!parseFsyncPolicy(req[1], policy)) { return conn->err(SSTR("could not parse '" << req[1] << "', available choices: always,async,sync-important-updates")); } journal.setFsyncPolicy(policy); return conn->ok(); } case RedisCommand::RAFT_REQUEST_PRE_VOTE: { if(!conn->raftAuthorization) return conn->err("not authorized to issue raft commands"); RaftVoteRequest votereq; if(!RaftParser::voteRequest(req, votereq)) { return conn->err("malformed request"); } RaftVoteResponse resp = requestVote(votereq, true); return conn->vector(resp.toVector()); } case RedisCommand::RAFT_REQUEST_VOTE: { if(!conn->raftAuthorization) return conn->err("not authorized to issue raft commands"); RaftVoteRequest votereq; if(!RaftParser::voteRequest(req, votereq)) { return conn->err("malformed request"); } RaftVoteResponse resp = requestVote(votereq); return conn->vector(resp.toVector()); } case RedisCommand::RAFT_HANDSHAKE: { conn->raftAuthorization = false; if(req.size() != 4) return conn->errArgs(req[0]); if(req[2] != journal.getClusterID()) { qdb_misconfig("received handshake with wrong cluster id: " << req[2] << " (mine is " << journal.getClusterID() << ")"); return conn->err("wrong cluster id"); } if(req[3] != heartbeatTracker.getTimeouts().toString()) { qdb_misconfig("received handshake with different raft timeouts (" << req[3] << ") than mine (" << heartbeatTracker.getTimeouts().toString() << ")"); return conn->err("incompatible raft timeouts"); } conn->raftAuthorization = true; return conn->ok(); } case RedisCommand::RAFT_ATTEMPT_COUP: { RaftStateSnapshotPtr snapshot = state.getSnapshot(); if(snapshot->leader.empty()) { return conn->err("I have no leader, cannot start a coup"); } if(snapshot->leader == state.getMyself()) { return conn->err("I am the leader! I can't revolt against myself, you know."); } if(!contains(journal.getMembership().nodes, state.getMyself())) { return conn->err("I am not a full cluster member, pointless to start a coup. First promote me from observer status."); } qdb_event("Received request to attempt a coup d'etat against the current leader."); heartbeatTracker.triggerTimeout(); return conn->status("vive la revolution"); } case RedisCommand::RAFT_ADD_OBSERVER: case RedisCommand::RAFT_REMOVE_MEMBER: case RedisCommand::RAFT_PROMOTE_OBSERVER: case RedisCommand::RAFT_DEMOTE_TO_OBSERVER: { std::scoped_lock lock(raftCommand); // We need to lock the journal for writes during a membership update. // Otherwise, a different client might race to acquire the same position // in the journal to place a different entry, and cause a crash. if(req.size() != 2) return conn->errArgs(req[0]); RaftServer srv; if(!parseServer(req[1], srv)) { return conn->err(SSTR("cannot parse server: " << req[1])); } RaftStateSnapshotPtr snapshot = state.getSnapshot(); if(snapshot->status != RaftStatus::LEADER) return conn->err("not a leader"); if(srv == state.getMyself()) return conn->err("cannot perform membership changes on current leader"); std::string err; bool rc; if(req.getCommand() == RedisCommand::RAFT_ADD_OBSERVER) { rc = journal.addObserver(snapshot->term, srv, err); } else if(req.getCommand() == RedisCommand::RAFT_REMOVE_MEMBER) { if(!checkIfNodeRemovalAcceptable(srv)) { return conn->err("membership update blocked, new cluster would not have an up-to-date quorum"); } rc = journal.removeMember(snapshot->term, srv, err); } else if(req.getCommand() == RedisCommand::RAFT_PROMOTE_OBSERVER) { ReplicationStatus replicationStatus = replicator.getStatus(); if(!replicationStatus.getReplicaStatus(srv).upToDate(journal.getLogSize())) { return conn->err("membership update blocked, observer is not up-to-date"); } rc = journal.promoteObserver(snapshot->term, srv, err); } else if(req.getCommand() == RedisCommand::RAFT_DEMOTE_TO_OBSERVER) { if(!checkIfNodeRemovalAcceptable(srv)) { return conn->err("membership update blocked, new cluster would not have an up-to-date quorum"); } rc = journal.demoteToObserver(snapshot->term, srv, err); } else { qdb_throw("should never happen"); } if(!rc) return conn->err(err); // All clear, propagate the update replicator.reconfigure(); return conn->ok(); } case RedisCommand::ACTIVATE_STALE_READS: { conn->raftStaleReads = true; return conn->ok(); } case RedisCommand::RAFT_JOURNAL_SCAN: { if(req.size() <= 1) { return conn->errArgs(req[0]); } ScanCommandArguments args = parseScanCommand(req.begin()+1, req.end(), true); if(!args.error.empty()) { return conn->err(args.error); } LogIndex cursor; if(!ParseUtils::parseInt64(args.cursor, cursor)) { return conn->err(SSTR("invalid cursor: " << args.cursor)); } std::vector entries; LogIndex nextCursor; rocksdb::Status st = journal.scanContents(cursor, args.count, args.match, entries, nextCursor); if(!st.ok()) { return conn->raw(Formatter::fromStatus(st)); } return conn->raw(Formatter::journalScan(nextCursor, entries)); } case RedisCommand::RAFT_JOURNAL_MANUAL_COMPACTION: { if(req.size() != 1) { return conn->errArgs(req[0]); } return conn->fromStatus(journal.manualCompaction()); } case RedisCommand::RAFT_OBSERVE_TERM: { if(req.size() != 2) { return conn->errArgs(req[0]); } RaftTerm term = 0; if(!ParseUtils::parseInt64(req[1], term) || term < 0) { return conn->err(SSTR("cannot parse term: " << req[1])); } qdb_event("Artificially observing raft term " << term); bool outcome = state.observed(term, {}); return conn->integer(outcome); } default: { // Must be either a read, or write at this point. qdb_assert(req.getCommandType() == CommandType::WRITE || req.getCommandType() == CommandType::READ); Transaction tx(std::move(req)); return this->service(conn, tx); } } } LinkStatus RaftDispatcher::service(Connection *conn, Transaction &tx) { // if not leader, redirect... except if this is a read, // and stale reads are active! RaftStateSnapshotPtr snapshot = state.getSnapshot(); if(snapshot->status != RaftStatus::LEADER) { if(snapshot->leader.empty()) { return conn->raw(Formatter::multiply(Formatter::err("unavailable"), tx.expectedResponses())); } if(conn->raftStaleReads && !tx.containsWrites()) { // Forward directly to the state machine. return redisDispatcher.dispatch(conn, tx); } // Redirect. return conn->raw(Formatter::multiply(Formatter::moved(0, snapshot->leader), tx.expectedResponses())); } // What happens if I was just elected as leader, but my state machine is // behind leadershipMarker? // // It means I have committed entries on the journal, which haven't been applied // to the state machine. If I were to service a read, I'd be giving out potentially // stale values! // // Ensure the state machine is all caught-up before servicing reads, in order // to prevent a linearizability violation. // // But we do the same thing for writes: // - Ensures a leader is stable before actually inserting writes into the // journal. // - Ensures no race conditions exist between committing the leadership marker // (which causes a hard-synchronization of the dynamic clock to the static // one), and the time we service lease requests. // // This adds some latency to writes right after a leader is elected, as we // need some extra roundtrips to commit the leadership marker. But since // leaders usually last weeks, who cares. if(stateMachine.getLastApplied() < snapshot->leadershipMarker) { // Stall client request until state machine is caught-up, or we lose leadership while(!stateMachine.waitUntilTargetLastApplied(snapshot->leadershipMarker, std::chrono::milliseconds(500))) { if(!state.isSnapshotCurrent(snapshot.get())) { // Ouch, we're no longer a leader.. start from scratch return this->service(conn, tx); } } // If we've made it this far, the state machine should be all caught-up // by now. Proceed to service this request. qdb_assert(snapshot->leadershipMarker <= stateMachine.getLastApplied()); } if(!tx.containsWrites()) { // Forward request to the state machine, without going through the // raft journal. return conn->addPendingTransaction(&redisDispatcher, std::move(tx)); } // At this point, the received command *must* be a write - verify! qdb_assert(tx.containsWrites()); // Do lease filtering ClockValue txTimestamp = stateMachine.getDynamicClock(); LeaseFilter::transform(tx, txTimestamp); // send request to the write tracker std::scoped_lock lock(raftCommand); LogIndex index = journal.getLogSize(); if(!writeTracker.append(index, snapshot->term, std::move(tx), conn->getQueue(), redisDispatcher)) { // We were most likely hit by the following race: // - We retrieved the state snapshot. // - The raft term was changed in the meantime, we lost leadership. // - The journal rejected the entry due to term mismatch. // Let's simply retry. return this->service(conn, tx); } return 1; } RaftHeartbeatResponse RaftDispatcher::heartbeat(const RaftHeartbeatRequest &req) { RaftStateSnapshotPtr snapshot; return heartbeat(req, snapshot); } RaftHeartbeatResponse RaftDispatcher::heartbeat(const RaftHeartbeatRequest &req, RaftStateSnapshotPtr &snapshot) { //---------------------------------------------------------------------------- // This RPC is a custom extension to raft - coupling appendEntries to // heartbeats creates certain issues: We can't aggressively pipeline the // replicated entries, for example, out of caution of losing the lease, // or the follower timing out, since pipelining will affect latencies of // acknowledgement reception. // // Having a separate RPC which is sent strictly every heartbeat interval in // addition to appendEntries should alleviate this, and make the cluster // far more robust against spurious timeouts in the presence of pipelined, // gigantic in size appendEntries messages. // // We don't lock raftCommand here - this is intentional! We only access // thread-safe objects, thus preventing the possibility of an appendEntries // storm blocking the heartbeats. //---------------------------------------------------------------------------- if(req.leader == state.getMyself()) { qdb_throw("received heartbeat from myself"); } state.observed(req.term, req.leader); snapshot = state.getSnapshot(); if(snapshot->status == RaftStatus::SHUTDOWN) { return {snapshot->term, false, "in shutdown"}; } if(req.term < snapshot->term) { return {snapshot->term, false, "My raft term is newer"}; } qdb_assert(req.term == snapshot->term); if(req.leader != snapshot->leader) { qdb_throw("Received append entries from " << req.leader.toString() << ", while I believe leader for term " << snapshot->term << " is " << snapshot->leader.toString()); } heartbeatTracker.heartbeat(std::chrono::steady_clock::now()); return {snapshot->term, true, ""}; } RaftAppendEntriesResponse RaftDispatcher::appendEntries(RaftAppendEntriesRequest &&req) { std::scoped_lock lock(raftCommand); //---------------------------------------------------------------------------- // An appendEntries RPC also serves as a heartbeat. We need to preserve the // state snapshot taken inside heartbeat. //---------------------------------------------------------------------------- RaftStateSnapshotPtr snapshot; RaftHeartbeatResponse heartbeatResponse = heartbeat({req.term, req.leader}, snapshot); if(!heartbeatResponse.nodeRecognizedAsLeader) { return {heartbeatResponse.term, journal.getLogSize(), false, heartbeatResponse.err}; } //---------------------------------------------------------------------------- // The contacting node is recognized as leader, proceed with the // requested journal modifications, if any. //---------------------------------------------------------------------------- writeTracker.flushQueues(Formatter::moved(0, snapshot->leader)); publisher.purgeListeners(Formatter::moved(0, snapshot->leader)); if(!journal.matchEntries(req.prevIndex, req.prevTerm)) { return {snapshot->term, journal.getLogSize(), false, "Log entry mismatch"}; } //---------------------------------------------------------------------------- // Four cases. // 1. All entries are new; we're grand. By far the most common case. // 2. The leader is sligthly confused and is sending entries that I have // already. Perform a quick check to ensure they're identical to mine and // continue on like nothing happened. // 3. Some of the entries are different than mine. This can be caused by mild // log inconsistencies when switching leaders. This is normal and expected // to happen rarely, so let's remove the inconsistent entries. // 4. Some of the entries are different, AND they've already been committed // or applied. This is a major safety violation and should never happen. //---------------------------------------------------------------------------- LogIndex firstInconsistency = journal.compareEntries(req.prevIndex+1, req.entries); LogIndex appendFrom = firstInconsistency - (req.prevIndex+1); // check if ALL entries are duplicates. If so, I don't need to do anything. if(appendFrom < LogIndex(req.entries.size()) ) { if(firstInconsistency <= journal.getCommitIndex()) { qdb_throw("detected inconsistent entries for index " << firstInconsistency << ". " << " Leader attempted to overwrite a committed entry with one with different contents."); } if(firstInconsistency != journal.getLogSize() && firstInconsistency <= stateMachine.getLastApplied()) { qdb_throw("raft invariant violation: Attempted to remove already applied entries as inconsistent. (first inconsistency: " << firstInconsistency << ", last applied: " << stateMachine.getLastApplied()); } journal.removeEntries(firstInconsistency); for(size_t i = appendFrom; i < req.entries.size(); i++) { if(!journal.append(req.prevIndex+1+i, req.entries[i], false)) { qdb_warn("something odd happened when adding entries to the journal.. probably a race condition, but should be harmless"); return {snapshot->term, journal.getLogSize(), false, "Unknown error"}; } } } journal.setCommitIndex(std::min(journal.getLogSize()-1, req.commitIndex)); warnIfLagging(req.commitIndex); return {snapshot->term, journal.getLogSize(), true, ""}; } void RaftDispatcher::warnIfLagging(LogIndex leaderCommitIndex) { const LogIndex threshold = 10000; LogIndex entriesBehind = leaderCommitIndex - journal.getCommitIndex(); if(entriesBehind > threshold && std::chrono::steady_clock::now() - lastLaggingWarning > std::chrono::seconds(10)) { qdb_warn("My commit index is " << entriesBehind << " entries behind that of the leader."); lastLaggingWarning = std::chrono::steady_clock::now(); } else if(entriesBehind <= threshold && lastLaggingWarning != std::chrono::steady_clock::time_point()) { qdb_info("No longer lagging significantly behind the leader. (" << entriesBehind << " entries)"); lastLaggingWarning = {}; } } RaftVoteResponse RaftDispatcher::requestVote(const RaftVoteRequest &req, bool preVote) { std::string reqDescr = req.describe(preVote); std::scoped_lock lock(raftCommand); if(req.candidate == state.getMyself()) { qdb_throw("received vote request from myself: " << reqDescr); } //---------------------------------------------------------------------------- // Defend against disruptive servers. // A node that has been removed from the cluster will often not know that, // and will repeatedly start elections trying to depose the current leader, // effectively making the cluster unavailable. // // If this node is not part of the cluster (that I know of) and I am already // in contact with the leader, completely ignore its vote request, and don't // take its term into consideration. // // If I don't have a leader, the situation is different though. Maybe this // node was added later, and I just don't know about it yet. Process the // request normally, since there's no leader to depose of, anyway. //---------------------------------------------------------------------------- if(!contains(state.getNodes(), req.candidate)) { RaftStateSnapshotPtr snapshot = state.getSnapshot(); if(!snapshot->leader.empty()) { qdb_misconfig("Non-voting " << req.candidate.toString() << " attempted to disrupt the cluster by starting an election for term " << req.term << ". Ignoring its request - shut down that node!"); return {snapshot->term, RaftVote::VETO}; } qdb_warn("Non-voting " << req.candidate.toString() << " is requesting a vote, even though it is not a voting member of the cluster as far I know. Will still process its request, since I have no leader."); } if(!preVote) { state.observed(req.term, {}); } RaftStateSnapshotPtr snapshot = state.getSnapshot(); //---------------------------------------------------------------------------- // If the contacting node were to be elected, would they potentially overwrite // any of my committed entries? // // Raft should prevent this, but let's be extra paranoid and send a 'veto' // vote if that's the case. Even a single 'veto' response will prevent a node // from ascending, even if they have a quorum of positive votes. // // If this safety mechanism doesn't work for some reason (the network loses // the message, or whatever), this node will simply crash later on // with an exception instead of overwriting committed entries, in case the // candidate does ascend. // // Under normal circumstances, a 'veto' vote should never affect the outcome // of an election, and it ought to be identical to a 'refused' vote. //---------------------------------------------------------------------------- if(req.lastIndex <= journal.getCommitIndex()) { if(req.lastIndex < journal.getLogStart()) { qdb_event("Vetoing " << reqDescr << " because its lastIndex (" << req.lastIndex << ") is before my log start (" << journal.getLogStart() << ") - way too far behind me."); return {snapshot->term, RaftVote::VETO}; } RaftTerm myLastIndexTerm; if(!journal.fetch(req.lastIndex, myLastIndexTerm).ok()) { qdb_critical("Error when reading journal entry " << req.lastIndex << " when trying to determine if accepting a vote request could potentially overwrite my committed entries."); // It could be that I just have a corrupted journal - don't prevent the // node from ascending in this case... If I crash afterwards during // replication, so be it. return {snapshot->term, RaftVote::REFUSED}; } // If the node were to ascend, it'll try and remove my req.lastIndex entry // as inconsistent, which I consider committed already... Veto! if(req.lastTerm != myLastIndexTerm) { qdb_event("Vetoing " << reqDescr << " because its ascension would overwrite my committed entry with index " << req.lastIndex); return {snapshot->term, RaftVote::VETO}; } if(req.lastIndex+1 <= journal.getCommitIndex()) { // If the node were to ascend, it would add a leadership marker, and try // to remove my committed req.lastIndex+1 entry as conflicting. Veto! qdb_event("Vetoing " << reqDescr << " because its ascension would overwrite my committed entry with index " << req.lastIndex+1 << " through the addition of a leadership marker."); return {snapshot->term, RaftVote::VETO}; } } if(snapshot->term > req.term) { qdb_event("Rejecting " << reqDescr << " because of term mismatch: " << snapshot->term << " vs " << req.term); return {snapshot->term, RaftVote::REFUSED}; } if(!preVote) { qdb_assert(snapshot->term == req.term); } if(snapshot->term == req.term) { if(!snapshot->votedFor.empty() && snapshot->votedFor != req.candidate) { qdb_event("Rejecting " << reqDescr << " since I've voted already in this term (" << snapshot->term << ") for " << snapshot->votedFor.toString()); return {snapshot->term, RaftVote::REFUSED}; } } LogIndex myLastIndex = journal.getLogSize()-1; RaftTerm myLastTerm; if(!journal.fetch(myLastIndex, myLastTerm).ok()) { qdb_critical("Error when reading journal entry " << myLastIndex << " when processing request vote."); return {snapshot->term, RaftVote::REFUSED}; } if(req.lastTerm < myLastTerm) { qdb_event("Rejecting " << reqDescr << " since my journal is more up-to-date, based on last term: " << myLastIndex << "," << myLastTerm << " vs " << req.lastIndex << "," << req.lastTerm); return {snapshot->term, RaftVote::REFUSED}; } if(req.lastTerm == myLastTerm && req.lastIndex < myLastIndex) { qdb_event("Rejecting " << reqDescr << " since my journal is more up-to-date, based on last index: " << myLastIndex << "," << myLastTerm << " vs " << req.lastIndex << "," << req.lastTerm); return {snapshot->term, RaftVote::REFUSED}; } // Grant vote - be generous with the heartbeats to increase robustness. // A heartbeat that is registered only _after_ grantVote has been called suffers // from the following race: // - RaftDirector followerLoop is sleeping in state.wait // - grantVote triggers RaftDirector to wake up // - HeartbeatTracker has timed-out - followerLoop attempts to start an election, // and all this happens before we reach heartbeatTracker.heartbeat in this thread. // // ... even though we JUST voted for a different node! // // Therefore, register the heartbeat twice just to be sure. if(!preVote) { heartbeatTracker.heartbeat(std::chrono::steady_clock::now()); if(!state.grantVote(req.term, req.candidate)) { qdb_warn("RaftState rejected " << reqDescr << " - probably benign race condition?"); return {snapshot->term, RaftVote::REFUSED}; } heartbeatTracker.heartbeat(std::chrono::steady_clock::now()); } qdb_event("Granted " << reqDescr); return {snapshot->term, RaftVote::GRANTED}; } //------------------------------------------------------------------------------ // Return health information //------------------------------------------------------------------------------ NodeHealth RaftDispatcher::getHealth() { std::vector indicators = stateMachine.getHealthIndicators(); //---------------------------------------------------------------------------- // Am I currently part of the quorum? //---------------------------------------------------------------------------- RaftStateSnapshotPtr snapshot = state.getSnapshot(); if(snapshot->leader.empty()) { indicators.emplace_back(HealthStatus::kRed, "PART-OF-QUORUM", "No"); } else { indicators.emplace_back(HealthStatus::kGreen, "PART-OF-QUORUM", SSTR("Yes | LEADER " << snapshot->leader.toString())); } //---------------------------------------------------------------------------- // Leader? If so, show replication status //---------------------------------------------------------------------------- if(snapshot->status == RaftStatus::LEADER) { ReplicationStatus replicationStatus = replicator.getStatus(); LogIndex logSize = journal.getLogSize(); if(replicationStatus.shakyQuorum) { indicators.emplace_back(HealthStatus::kYellow, "QUORUM-STABILITY", "Shaky"); } else { indicators.emplace_back(HealthStatus::kGreen, "QUORUM-STABILITY", "Good"); } for(auto it = replicationStatus.replicas.begin(); it != replicationStatus.replicas.end(); it++) { HealthStatus replicaStatus = HealthStatus::kGreen; if(!it->online) { replicaStatus = HealthStatus::kYellow; } else if(!it->upToDate(logSize)) { replicaStatus = HealthStatus::kYellow; } indicators.emplace_back(replicaStatus, "REPLICA", it->toString(logSize)); } } return NodeHealth(VERSION_FULL_STRING, state.getMyself().toString(), indicators); } RaftInfo RaftDispatcher::info() { std::scoped_lock lock(raftCommand); RaftStateSnapshotPtr snapshot = state.getSnapshot(); RaftMembership membership = journal.getMembership(); ReplicationStatus replicationStatus = replicator.getStatus(); HealthStatus nodeHealthStatus = chooseWorstHealth(getHealth().getIndicators()); return {journal.getClusterID(), state.getMyself(), snapshot->leader, nodeHealthStatus, journal.getFsyncPolicy(), membership.epoch, membership.nodes, membership.observers, snapshot->term, journal.getLogStart(), journal.getLogSize(), snapshot->status, journal.getCommitIndex(), stateMachine.getLastApplied(), writeTracker.size(), std::chrono::duration_cast(std::chrono::steady_clock::now() - snapshot->timeCreated).count(), replicationStatus, VERSION_FULL_STRING }; } bool RaftDispatcher::fetch(LogIndex index, RaftEntry &entry) { rocksdb::Status st = journal.fetch(index, entry); return st.ok(); }