// ---------------------------------------------------------------------- // File: raft.cc // Author: Georgios Bitzes - CERN // ---------------------------------------------------------------------- /************************************************************************ * quarkdb - a redis-like highly available key-value store * * Copyright (C) 2016 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #include "raft/RaftDispatcher.hh" #include "raft/RaftReplicator.hh" #include "raft/RaftTalker.hh" #include "raft/RaftTimeouts.hh" #include "raft/RaftCommitTracker.hh" #include "raft/RaftBlockedWrites.hh" #include "raft/RaftMembers.hh" #include "raft/RaftJournal.hh" #include "raft/RaftLease.hh" #include "raft/RaftContactDetails.hh" #include "raft/RaftVoteRegistry.hh" #include "Version.hh" #include "test-utils.hh" #include "RedisParser.hh" #include using namespace quarkdb; #define ASSERT_OK(msg) ASSERT_TRUE(msg.ok()) #define ASSERT_REPLY(reply, val) { ASSERT_NE(reply, nullptr); ASSERT_EQ(std::string(((reply))->str, ((reply))->len), val); } class Raft_Replicator : public TestCluster3NodesFixture {}; class Raft_Voting : public TestCluster3NodesFixture {}; class Raft_Dispatcher : public TestCluster3NodesRelaxedTimeoutsFixture {}; class Raft_Election : public TestCluster3NodesRelaxedTimeoutsFixture {}; class Raft_Director : public TestCluster3NodesFixture {}; class Raft_CommitTracker : public TestCluster3NodesFixture {}; class Raft_JournalIterator : public TestCluster3NodesFixture {}; TEST_F(Raft_Replicator, no_replication_on_myself) { ASSERT_TRUE(state()->observed(2, {})); ASSERT_TRUE(state()->becomeCandidate(2)); ASSERT_TRUE(state()->ascend(2)); ASSERT_THROW(RaftReplicaTracker(myself(), state()->getSnapshot(), *journal(), *state(), *lease(), *commitTracker(), *trimmer(), *shardDirectory(), *raftconfig(), *contactDetails()), FatalException); } TEST_F(Raft_Replicator, only_leader_can_launch_replicator) { ASSERT_THROW(RaftReplicaTracker(nodes()[1], state()->getSnapshot(), *journal(), *state(), *lease(), *commitTracker(), *trimmer(), *shardDirectory(), *raftconfig(), *contactDetails()), FatalException); } TEST_F(Raft_Replicator, verify_sane_snapshot_term) { ASSERT_TRUE(state()->observed(2, {})); ASSERT_TRUE(state()->becomeCandidate(2)); ASSERT_TRUE(state()->ascend(2)); // trying to replicate for a term in the future RaftStateSnapshotPtr snapshot = state()->getSnapshot(); RaftStateSnapshot snapshot2(*snapshot.get()); snapshot2.term = 3; ASSERT_THROW(RaftReplicaTracker(nodes()[1], RaftStateSnapshotPtr(new RaftStateSnapshot(snapshot2)), *journal(), *state(), *lease(), *commitTracker(), *trimmer(), *shardDirectory(), *raftconfig(), *contactDetails()), FatalException); // stale term - this can naturally happen, so it is not an exception ASSERT_TRUE(state()->observed(4, {})); RaftReplicaTracker tracker(nodes()[1], snapshot, *journal(), *state(), *lease(), *commitTracker(), *trimmer(), *shardDirectory(), *raftconfig(), *contactDetails()); ASSERT_FALSE(tracker.isRunning()); } TEST_F(Raft_Replicator, do_simple_replication) { // node #0 will replicate its log to node #1 ASSERT_TRUE(state(0)->observed(2, {})); ASSERT_TRUE(state(0)->becomeCandidate(2)); ASSERT_TRUE(state(0)->ascend(2)); // add an inconsistent journal entry to #1, just for fun ASSERT_TRUE(journal(1)->append(1, RaftEntry(0, "supposed", "to", "be", "removed"))); ASSERT_EQ(state(1)->getSnapshot()->term, 0); // activate poller for #1 poller(1); // launch! RaftReplicaTracker tracker(myself(1), state(0)->getSnapshot(), *journal(), *state(), *lease(), *commitTracker(), *trimmer(), *shardDirectory(), *raftconfig(), *contactDetails()); ASSERT_TRUE(tracker.isRunning()); // populate #0's journal for(size_t i = 0; i < testreqs.size(); i++) { ASSERT_TRUE(journal(0)->append(i+2, RaftEntry(2, testreqs[i]))); } // verify #1 recognized #0 as leader and that replication was successful RETRY_ASSERT_EQ(journal(1)->getLogSize(), (int64_t) testreqs.size()+2); RaftStateSnapshotPtr snapshot = state(1)->getSnapshot(); ASSERT_EQ(snapshot->term, 2); ASSERT_EQ(snapshot->leader, myself(0)); for(size_t i = 0; i < testreqs.size(); i++) { RaftEntry entry; ASSERT_TRUE(dispatcher(1)->fetch(i+2, entry)); ASSERT_EQ(entry.term, 2); ASSERT_EQ(entry.request, testreqs[i]); } } TEST_F(Raft_Replicator, test_replication_with_empty_journals) { // node #0 will do replication to #1, but with a journal that only contains // 1 entry. ASSERT_TRUE(state(0)->observed(2, {})); ASSERT_TRUE(state(0)->becomeCandidate(2)); ASSERT_TRUE(state(0)->ascend(2)); // active poller for #1 poller(1); // launch RaftReplicaTracker tracker(myself(1), state(0)->getSnapshot(), *journal(), *state(), *lease(), *commitTracker(), *trimmer(), *shardDirectory(), *raftconfig(), *contactDetails()); ASSERT_TRUE(tracker.isRunning()); // verify everything's sane RETRY_ASSERT_EQ(state(1)->getSnapshot()->leader, myself(0)); RaftStateSnapshotPtr snapshot = state(1)->getSnapshot(); ASSERT_EQ(snapshot->term, 2); ASSERT_EQ(snapshot->leader, myself(0)); RETRY_ASSERT_EQ(journal(1)->getLogSize(), 2); RaftEntry entry; journal(1)->fetch_or_die(1, entry); ASSERT_EQ(entry.request, make_req("JOURNAL_LEADERSHIP_MARKER", SSTR(2), myself(0).toString())); ASSERT_EQ(journal(1)->getLogSize(), 2); } TEST_F(Raft_Replicator, follower_has_larger_journal_than_leader) { // through the addition of several inconsistent entries, a follower // ended up with a larger journal than the leader ASSERT_TRUE(state(0)->observed(2, {})); ASSERT_TRUE(state(0)->becomeCandidate(2)); ASSERT_TRUE(state(0)->ascend(2)); ASSERT_TRUE(journal(1)->append(1, RaftEntry(0, "supposed", "to", "be", "removed1"))); ASSERT_TRUE(journal(1)->append(2, RaftEntry(0, "supposed", "to", "be", "removed2"))); ASSERT_TRUE(journal(1)->append(3, RaftEntry(0, "supposed", "to", "be", "removed3"))); ASSERT_EQ(state(1)->getSnapshot()->term, 0); // activate poller for #1 poller(1); // launch! RaftReplicaTracker tracker(myself(1), state(0)->getSnapshot(), *journal(), *state(), *lease(), *commitTracker(), *trimmer(), *shardDirectory(), *raftconfig(), *contactDetails()); ASSERT_TRUE(tracker.isRunning()); // verify #1 recognized #0 as leader and that replication was successful RETRY_ASSERT_EQ(journal(1)->getLogSize(), 2); RaftStateSnapshotPtr snapshot = state(1)->getSnapshot(); ASSERT_EQ(snapshot->term, 2); ASSERT_EQ(snapshot->leader, myself(0)); } TEST_F(Raft_Replicator, no_replication_of_higher_term_entries) { // Try to trick the replicator into sending entries of higher term // than its snapshot, verify it doesn't succeed. That's a race // condition that could happen once a new leader starts replicating // entries into a node which used to be leader, but its replicator // hasn't shut down completely yet. ASSERT_TRUE(state(0)->observed(1, {})); ASSERT_TRUE(state(0)->becomeCandidate(1)); ASSERT_TRUE(state(0)->ascend(1)); ASSERT_TRUE(journal(0)->setCurrentTerm(2, {})); ASSERT_TRUE(journal(0)->append(2, RaftEntry(2, "should", "not", "get", "replicated"))); // activate poller for #1 poller(1); // launch! RaftReplicaTracker tracker(myself(1), state(0)->getSnapshot(), *journal(), *state(), *lease(), *commitTracker(), *trimmer(), *shardDirectory(), *raftconfig(), *contactDetails()); RETRY_ASSERT_TRUE(!tracker.isRunning()); ASSERT_TRUE(journal(0)->getCommitIndex() == 0); ASSERT_TRUE(journal(1)->getCommitIndex() == 0); ASSERT_TRUE(journal(0)->getLogSize() == 3); ASSERT_TRUE(journal(1)->getLogSize() == 1); } TEST_F(Raft_Dispatcher, validate_initial_state) { RaftInfo info = dispatcher()->info(); ASSERT_EQ(info.clusterID, clusterID()); ASSERT_EQ(info.myself, myself()); ASSERT_EQ(info.term, 0); ASSERT_EQ(info.logSize, 1); ASSERT_TRUE(info.observers.empty()); ASSERT_EQ(info.nodes, nodes()); ASSERT_EQ(info.membershipEpoch, 0); ASSERT_EQ(info.myVersion, VERSION_FULL_STRING); RaftEntry entry; ASSERT_TRUE(dispatcher()->fetch(0, entry)); ASSERT_EQ(entry.term, 0); ASSERT_EQ(entry.request, make_req("JOURNAL_UPDATE_MEMBERS", RaftMembers(nodes(), {}).toString(), info.clusterID)); } TEST_F(Raft_Dispatcher, send_first_heartbeat) { // simulate heartbeat from #1 to #0 RaftAppendEntriesRequest req; req.term = 1; req.leader = myself(1); req.prevIndex = 0; req.prevTerm = 0; req.commitIndex = 0; RaftAppendEntriesResponse resp = dispatcher()->appendEntries(std::move(req)); ASSERT_EQ(resp.term, 1); ASSERT_TRUE(resp.outcome); ASSERT_EQ(resp.logSize, 1); } TEST_F(Raft_Dispatcher, throw_on_append_entries_from_myself) { RaftAppendEntriesRequest req; req.term = 2; req.leader = myself(0); req.prevIndex = 0; req.prevTerm = 0; req.commitIndex = 0; ASSERT_THROW(dispatcher()->appendEntries(std::move(req)), FatalException); } TEST_F(Raft_Dispatcher, add_entries) { RaftAppendEntriesRequest req; req.term = 2; req.leader = myself(1); req.prevIndex = 0; req.prevTerm = 0; req.commitIndex = 0; req.entries.emplace_back(1, "set", "qwerty", "123"); req.entries.emplace_back(1, "hset", "abc", "123", "234"); RaftAppendEntriesResponse resp = dispatcher()->appendEntries(std::move(req)); ASSERT_EQ(resp.term, 2); ASSERT_TRUE(resp.outcome); ASSERT_EQ(resp.logSize, 3); // previous entry term mismatch, but verify term progressed req = RaftAppendEntriesRequest(); req.term = 3; req.leader = myself(1); req.prevIndex = 2; req.prevTerm = 0; req.commitIndex = 0; resp = dispatcher()->appendEntries(std::move(req)); ASSERT_EQ(resp.term, 3); ASSERT_FALSE(resp.outcome); ASSERT_EQ(resp.logSize, 3); // add three more entries with a different leader, while removing the last // entry as inconsistent req = RaftAppendEntriesRequest(); req.term = 5; req.leader = myself(2); req.prevIndex = 1; req.prevTerm = 1; req.commitIndex = 1; req.entries.emplace_back(3, "sadd", "myset", "a"); req.entries.emplace_back(3, "sadd", "myset", "b"); req.entries.emplace_back(3, "sadd", "myset", "c"); resp = dispatcher()->appendEntries(std::move(req)); ASSERT_EQ(resp.term, 5); ASSERT_TRUE(resp.outcome) << resp.err; ASSERT_EQ(resp.logSize, 5); RaftEntry entry; ASSERT_TRUE(dispatcher()->fetch(2, entry)); ASSERT_EQ(entry.term, 3); ASSERT_EQ(entry.request, make_req("sadd", "myset", "a")); // let's commit all entries req = RaftAppendEntriesRequest(); req.term = 5; req.leader = myself(2); req.prevIndex = 4; req.prevTerm = 3; req.commitIndex = 4; resp = dispatcher()->appendEntries(std::move(req)); ASSERT_EQ(resp.term, 5); ASSERT_TRUE(resp.outcome); ASSERT_EQ(resp.logSize, 5); // now let's say the new leader is a little confused, and tries to replicate the // last *committed* entry once again. Ensure the follower plays along req = RaftAppendEntriesRequest(); req.term = 5; req.leader = myself(2); req.prevIndex = 3; req.prevTerm = 3; req.commitIndex = 4; req.entries.emplace_back(3, "sadd", "myset", "c"); resp = dispatcher()->appendEntries(std::move(req)); ASSERT_EQ(resp.term, 5); ASSERT_TRUE(resp.outcome); ASSERT_EQ(resp.logSize, 5); // the leader is still confused, and is sending an even older entry req = RaftAppendEntriesRequest(); req.term = 5; req.leader = myself(2); req.prevIndex = 2; req.prevTerm = 3; req.commitIndex = 4; req.entries.emplace_back(3, "sadd", "myset", "b"); resp = dispatcher()->appendEntries(std::move(req)); ASSERT_EQ(resp.term, 5); ASSERT_TRUE(resp.outcome); ASSERT_EQ(resp.logSize, 5); // the leader is drunk and tries to overwrite the last committed entry with // a different one. req = RaftAppendEntriesRequest(); req.term = 5; req.leader = myself(2); req.prevIndex = 3; req.prevTerm = 3; req.commitIndex = 4; req.entries.emplace_back(3, "sadd", "a different set", "c"); ASSERT_THROW(dispatcher()->appendEntries(std::move(req)), FatalException); } TEST_F(Raft_Dispatcher, incompatible_timeouts) { // try to talk to a raft server while providing the wrong timeouts poller(0); RaftContactDetails cd(clusterID(), RaftTimeouts(std::chrono::milliseconds(1), std::chrono::milliseconds(2), std::chrono::milliseconds(3)), ""); RaftTalker talker(myself(0), cd, "tests"); RaftVoteRequest votereq; votereq.term = 1337; votereq.candidate = {"its_me_ur_leader", 1234}; votereq.lastIndex = 35000000; votereq.lastTerm = 1000; ASSERT_FALSE(talker.requestVote(votereq).get()); } TEST_F(Raft_Dispatcher, test_wrong_cluster_id) { // try to talk to a raft server while providing the wrong // cluster id, verify it sends us to hell poller(0); RaftContactDetails cd("random_cluster_id", timeouts(), ""); RaftTalker talker(myself(0), cd, "tests"); RaftVoteRequest votereq; votereq.term = 1337; votereq.candidate = {"its_me_ur_leader", 1234}; votereq.lastIndex = 35000000; votereq.lastTerm = 1000; ASSERT_FALSE(talker.requestVote(votereq).get()); std::vector entries; redisReplyPtr reply = talker.appendEntries(13737, myself(1), 3000, 100, 500, entries).get(); ASSERT_FALSE(reply); } TEST_F(Raft_Voting, throws_with_requestvote_to_myself) { RaftVoteRequest req; req.term = 1; req.candidate = myself(); req.lastTerm = 0; req.lastIndex = 2; ASSERT_THROW(dispatcher()->requestVote(req), FatalException); } TEST_F(Raft_Voting, no_double_voting_on_same_term) { RaftVoteRequest req; req.term = 1; req.candidate = myself(1); req.lastTerm = 0; req.lastIndex = 2; RaftVoteResponse resp = issueManualVote(req); ASSERT_EQ(resp.vote, RaftVote::GRANTED); req.candidate = myself(2); resp = issueManualVote(req); ASSERT_EQ(resp.vote, RaftVote::REFUSED); } TEST_F(Raft_Voting, no_votes_for_previous_terms) { RaftVoteRequest req; req.term = 1; req.candidate = myself(1); req.lastTerm = 0; req.lastIndex = 2; RaftVoteResponse resp = issueManualVote(req); ASSERT_EQ(resp.vote, RaftVote::GRANTED); req.term = 0; resp = issueManualVote(req); ASSERT_EQ(resp.vote, RaftVote::REFUSED); } TEST_F(Raft_Voting, no_votes_to_outdated_logs) { RaftVoteRequest req; req.term = 5; req.candidate = myself(1); req.lastTerm = 0; req.lastIndex = 1; RaftVoteResponse resp = issueManualVote(req); ASSERT_EQ(resp.vote, RaftVote::GRANTED); // add a few requests to the log ASSERT_TRUE(journal()->append(1, RaftEntry(3, testreqs[0]))); ASSERT_TRUE(journal()->append(2, RaftEntry(4, testreqs[1]))); ASSERT_TRUE(journal()->append(3, RaftEntry(5, testreqs[2]))); req.term = 6; req.candidate = myself(2); req.lastTerm = 4; req.lastIndex = 30; resp = issueManualVote(req); ASSERT_EQ(resp.vote, RaftVote::REFUSED); req.lastTerm = 5; req.lastIndex = 2; resp = issueManualVote(req); ASSERT_EQ(resp.vote, RaftVote::REFUSED); req.lastIndex = 4; resp = issueManualVote(req); ASSERT_EQ(resp.vote, RaftVote::GRANTED); } TEST_F(Raft_Voting, veto_if_new_leader_would_overwrite_committed_entries) { RaftVoteRequest req; req.term = 5; req.candidate = myself(1); req.lastTerm = 0; req.lastIndex = 1; RaftVoteResponse resp = issueManualVote(req); ASSERT_EQ(resp.vote, RaftVote::GRANTED); // add a few requests to the log ASSERT_TRUE(journal()->append(1, RaftEntry(3, testreqs[0]))); ASSERT_TRUE(journal()->append(2, RaftEntry(4, testreqs[1]))); ASSERT_TRUE(journal()->append(3, RaftEntry(5, testreqs[2]))); // commit all of them ASSERT_TRUE(journal()->setCommitIndex(3)); req.term = 6; req.candidate = myself(2); req.lastTerm = 2; req.lastIndex = 1; // would overwrite committed entry #1 resp = issueManualVote(req); ASSERT_EQ(resp.vote, RaftVote::VETO); req.lastTerm = 3; // contacting node is too far behind, and the addition of the leadership marker // would overwrite entry #2 resp = issueManualVote(req); ASSERT_EQ(resp.vote, RaftVote::VETO); // contacting node's lastIndex has a higher term than local, committed lastIndex. req.lastTerm = 4; ASSERT_EQ(issueManualVote(req).vote, RaftVote::VETO); // Case where lastIndex has been trimmed already RETRY_ASSERT_TRUE(stateMachine()->getLastApplied() >= 2); journal()->trimUntil(2); req.lastIndex = 1; req.lastTerm = 3; resp = issueManualVote(req); ASSERT_EQ(resp.vote, RaftVote::VETO); } TEST_F(Raft_Voting, smaller_log_but_last_index_higher_term) { ASSERT_TRUE(state()->observed(5, {})); // add a few entries ASSERT_TRUE(journal()->append(1, RaftEntry(3, testreqs[0]))); ASSERT_TRUE(journal()->append(2, RaftEntry(3, testreqs[1]))); ASSERT_TRUE(journal()->append(3, RaftEntry(3, testreqs[2]))); RaftVoteRequest req; req.term = 9; req.candidate = myself(1); req.lastTerm = 5; req.lastIndex = 2; RaftVoteResponse resp = issueManualVote(req); ASSERT_EQ(resp.vote, RaftVote::GRANTED); } TEST(RaftTimeouts, basic_sanity) { RaftTimeouts timeouts(std::chrono::milliseconds(100), std::chrono::milliseconds(200), std::chrono::milliseconds(50)); ASSERT_EQ(timeouts.getLow(), std::chrono::milliseconds(100)); ASSERT_EQ(timeouts.getHigh(), std::chrono::milliseconds(200)); ASSERT_EQ(timeouts.getHeartbeatInterval(), std::chrono::milliseconds(50)); for(size_t i = 0; i < 10; i++) { std::chrono::milliseconds random = timeouts.getRandom(); ASSERT_LE(random.count(), 200); ASSERT_GE(random.count(), 100); } } TEST(RaftTimeouts, serialization) { RaftTimeouts timeouts(std::chrono::milliseconds(133), std::chrono::milliseconds(166), std::chrono::milliseconds(30) ); ASSERT_EQ(timeouts.toString(), "133:166:30"); RaftTimeouts deserialized(std::chrono::milliseconds(1), std::chrono::milliseconds(2), std::chrono::milliseconds(3) ); ASSERT_EQ(deserialized.toString(), "1:2:3"); ASSERT_TRUE(RaftTimeouts::fromString(deserialized, timeouts.toString())); ASSERT_EQ(timeouts, deserialized); ASSERT_EQ(timeouts.toString(), deserialized.toString()); std::string description = "1337:1338:1339"; ASSERT_TRUE(RaftTimeouts::fromString(deserialized, description)); ASSERT_EQ(deserialized.toString(), description); ASSERT_FALSE(RaftTimeouts::fromString(deserialized, "adfas")); ASSERT_FALSE(RaftTimeouts::fromString(deserialized, "1234:dfa:134")); ASSERT_FALSE(RaftTimeouts::fromString(deserialized, "pquf:13:134")); ASSERT_FALSE(RaftTimeouts::fromString(deserialized, "11:13:kajshf")); ASSERT_FALSE(RaftTimeouts::fromString(deserialized, "1234:1234:134:1341")); } TEST_F(Raft_Election, basic_sanity) { ASSERT_TRUE(state()->observed(2, {})); // term mismatch, can't perform election RaftVoteRequest votereq; votereq.lastIndex = 1; votereq.term = 1; votereq.lastTerm = 0; ASSERT_EQ(ElectionOutcome::kNotElected, RaftElection::perform(votereq, *state(), *lease(), *contactDetails())); // we have a leader already, can't do election ASSERT_TRUE(state()->observed(2, myself(1))); votereq.term = 2; ASSERT_EQ( ElectionOutcome::kNotElected, RaftElection::perform(votereq, *state(), *lease(), *contactDetails())); // votereq.candidate must be empty votereq.candidate = myself(1); votereq.term = 3; ASSERT_TRUE(state()->observed(3, {})); ASSERT_THROW(RaftElection::perform(votereq, *state(), *lease(), *contactDetails()), FatalException); } TEST_F(Raft_Election, leader_cannot_call_election) { ASSERT_TRUE(state()->observed(2, {})); ASSERT_TRUE(state()->becomeCandidate(2)); ASSERT_TRUE(state()->ascend(2)); RaftVoteRequest votereq; votereq.lastIndex = 5; votereq.term = 2; votereq.lastTerm = 1; ASSERT_EQ( ElectionOutcome::kNotElected, RaftElection::perform(votereq, *state(), *lease(), *contactDetails())); } TEST_F(Raft_Election, observer_cannot_call_election) { // initialize node #0 not to be part of the cluster, thus an observer node(0, GlobalEnv::server(3)); RaftStateSnapshotPtr snapshot = state()->getSnapshot(); ASSERT_EQ(snapshot->status, RaftStatus::FOLLOWER); ASSERT_TRUE(state()->observed(1, {})); RaftVoteRequest votereq; votereq.term = 1; votereq.lastTerm = 0; votereq.lastIndex = 5; ASSERT_EQ( ElectionOutcome::kNotElected, RaftElection::perform(votereq, *state(), *lease(), *contactDetails())); } TEST_F(Raft_Election, complete_simple_election) { // initialize our raft cluster .. poller(0); poller(1); poller(2); ASSERT_TRUE(state(0)->observed(2, {})); ASSERT_TRUE(state(0)->becomeCandidate(2)); RaftVoteRequest votereq; votereq.term = 2; votereq.lastIndex = 0; votereq.lastTerm = 0; ASSERT_EQ( ElectionOutcome::kElected, RaftElection::perform(votereq, *state(0), *lease(0), *contactDetails(0))); RaftStateSnapshotPtr snapshot0 = state(0)->getSnapshot(); ASSERT_EQ(snapshot0->term, 2); ASSERT_EQ(snapshot0->leader, myself(0)); ASSERT_EQ(snapshot0->status, RaftStatus::LEADER); // the rest of the nodes have not recognized the leadership yet, would need to // send heartbeats } TEST_F(Raft_Election, unsuccessful_election_not_enough_votes) { // #0 is alone in the cluster, its election rounds should always fail poller(); ASSERT_TRUE(state()->observed(2, {})); ASSERT_TRUE(state()->becomeCandidate(2)); RaftVoteRequest votereq; votereq.term = 2; votereq.lastIndex = 0; votereq.lastTerm = 0; ASSERT_EQ( ElectionOutcome::kNotElected, RaftElection::perform(votereq, *state(0), *lease(0), *contactDetails(0))); } TEST_F(Raft_Election, split_votes_successful_election) { // let's have some more fun - have #1 already vote for term 2 for itself, // so it rejects any further requests // still possible to achieve quorum with #0 and #2 poller(0); poller(1); poller(2); ASSERT_TRUE(state(0)->observed(2, {})); ASSERT_TRUE(state(0)->becomeCandidate(2)); ASSERT_TRUE(state(1)->observed(2, {})); // #1 has already voted in term 2 ASSERT_TRUE(state(1)->grantVote(2, myself(1))); RaftVoteRequest votereq; votereq.term = 2; votereq.lastIndex = 0; votereq.lastTerm = 0; ASSERT_EQ( ElectionOutcome::kElected, RaftElection::perform(votereq, *state(0), *lease(0), *contactDetails(0))); RaftStateSnapshotPtr snapshot0 = state(0)->getSnapshot(); ASSERT_EQ(snapshot0->term, 2); ASSERT_EQ(snapshot0->leader, myself(0)); ASSERT_EQ(snapshot0->status, RaftStatus::LEADER); } TEST_F(Raft_Election, split_votes_unsuccessful_election) { // this time both #1 and #2 have voted for themselves, should not be possible to // get a quorum poller(0); poller(1); poller(2); ASSERT_TRUE(state(0)->observed(2, {})); ASSERT_TRUE(state(0)->becomeCandidate(2)); ASSERT_TRUE(state(1)->observed(2, {})); ASSERT_TRUE(state(2)->observed(2, {})); ASSERT_TRUE(state(1)->grantVote(2, myself(1))); ASSERT_TRUE(state(2)->grantVote(2, myself(2))); RaftVoteRequest votereq; votereq.term = 2; votereq.lastIndex = 0; votereq.lastTerm = 0; ASSERT_EQ( ElectionOutcome::kNotElected, RaftElection::perform(votereq, *state(0), *lease(0), *contactDetails(0))); RaftStateSnapshotPtr snapshot0 = state(0)->getSnapshot(); ASSERT_EQ(snapshot0->term, 2); ASSERT_TRUE(snapshot0->leader.empty()); ASSERT_EQ(snapshot0->status, RaftStatus::CANDIDATE); } TEST_F(Raft_Director, achieve_natural_election) { // spin up the directors and pollers - this fully simulates a 3-node cluster spinup(0); spinup(1); spinup(2); std::vector snapshots; RETRY_ASSERT_TRUE(checkStateConsensusWithSnapshots(true, snapshots, 0, 1, 2)); // verify all have agreed on the same term ASSERT_EQ(snapshots[0]->term, snapshots[1]->term); ASSERT_EQ(snapshots[1]->term, snapshots[2]->term); // verify all have agreed on the same leader ASSERT_FALSE(snapshots[0]->leader.empty()); ASSERT_EQ(snapshots[0]->leader, snapshots[1]->leader); ASSERT_EQ(snapshots[1]->leader, snapshots[2]->leader); int leaderID = getServerID(snapshots[0]->leader); ASSERT_GE(leaderID, 0); ASSERT_LE(leaderID, 2); ASSERT_EQ(snapshots[leaderID]->status, RaftStatus::LEADER); for(int i = 0; i < 3; i++) { if(i != leaderID) { ASSERT_EQ(snapshots[i]->status, RaftStatus::FOLLOWER) << i; } } LogIndex startingReqIndex = journal(leaderID)->getLogSize(); // let's push a bunch of entries to the leader, and verify they get committed for(size_t i = 0; i < testreqs.size(); i++) { ASSERT_TRUE(journal(leaderID)->append(startingReqIndex+i, RaftEntry(snapshots[0]->term, testreqs[i]))); } LogIndex expectedIndex = startingReqIndex + testreqs.size() - 1; RETRY_ASSERT_EQ(journal(0)->getCommitIndex(), expectedIndex); RETRY_ASSERT_EQ(journal(1)->getCommitIndex(), expectedIndex); RETRY_ASSERT_EQ(journal(2)->getCommitIndex(), expectedIndex); // verify entries one by one, for all three journals for(size_t i = 0; i < testreqs.size(); i++) { for(size_t j = 0; j < 3; j++) { RaftEntry entry; ASSERT_TRUE(journal(j)->fetch(startingReqIndex+i, entry).ok()); ASSERT_EQ(entry.request, testreqs[i]); ASSERT_EQ(entry.term, snapshots[0]->term); } } } TEST_F(Raft_Director, late_arrival_in_established_cluster) { // spin up only two nodes spinup(0); spinup(1); RETRY_ASSERT_TRUE(checkStateConsensus(0, 1)); // verify they reached consensus std::vector snapshots; snapshots.push_back(state(0)->getSnapshot()); snapshots.push_back(state(1)->getSnapshot()); ASSERT_EQ(snapshots[0]->term, snapshots[1]->term); ASSERT_FALSE(snapshots[0]->leader.empty()); ASSERT_EQ(snapshots[0]->leader, snapshots[1]->leader); // spin up node #2, make sure it joins the cluster and doesn't disrupt the current leader spinup(2); RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2)); RaftStateSnapshotPtr late_arrival = state(2)->getSnapshot(); ASSERT_EQ(late_arrival->term, snapshots[0]->term); ASSERT_EQ(late_arrival->leader, snapshots[1]->leader); } TEST_F(Raft_Director, late_consensus) { // at first, node #0 is all alone and should not be able to ascend spinup(0); std::this_thread::sleep_for(heartbeatTracker()->getTimeouts().getHigh()*2); // verify the node tried to ascend, and failed RaftStateSnapshotPtr snapshot = state(0)->getSnapshot(); ASSERT_EQ(snapshot->term, 0); ASSERT_TRUE(snapshot->leader.empty()); ASSERT_TRUE( (snapshot->status == RaftStatus::FOLLOWER) || (snapshot->status == RaftStatus::CANDIDATE) ); spinup(1); RETRY_ASSERT_TRUE(checkStateConsensus(0, 1)); // verify consensus reached std::vector snapshots; snapshots.push_back(state(0)->getSnapshot()); snapshots.push_back(state(1)->getSnapshot()); ASSERT_EQ(snapshots[0]->term, snapshots[1]->term); ASSERT_FALSE(snapshots[0]->leader.empty()); ASSERT_EQ(snapshots[0]->leader, snapshots[1]->leader); // spin up node #2, ensure it doesn't disrupt current leader spinup(2); RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2)); RaftStateSnapshotPtr late_arrival = state(2)->getSnapshot(); ASSERT_EQ(late_arrival->term, snapshots[0]->term); ASSERT_EQ(late_arrival->leader, snapshots[0]->leader); ASSERT_EQ(late_arrival->status, RaftStatus::FOLLOWER); } TEST_F(Raft_Director, election_with_different_journals) { // start an election between #0 and #1 where #1 is guaranteed to win due // to more up-to-date journal ASSERT_TRUE(journal(1)->append(1, RaftEntry(0, "set", "asdf", "abc"))); spinup(0); spinup(1); RETRY_ASSERT_TRUE(checkStateConsensus(0, 1)); RaftStateSnapshotPtr snapshot = state(0)->getSnapshot(); ASSERT_EQ(snapshot->leader, myself(1)); ASSERT_EQ(snapshot->status, RaftStatus::FOLLOWER); snapshot = state(1)->getSnapshot(); ASSERT_EQ(snapshot->leader, myself(1)); ASSERT_EQ(snapshot->status, RaftStatus::LEADER); } TEST_F(Raft_CommitTracker, basic_sanity) { std::vector members; members.push_back(myself(1)); members.push_back(myself(2)); commitTracker()->updateTargets(members); ASSERT_EQ(journal(0)->getCommitIndex(), 0); // populate #0's journal for(size_t i = 0; i < testreqs.size(); i++) { ASSERT_TRUE(journal(0)->append(i+1, RaftEntry(0, testreqs[i]))); } RaftMatchIndexTracker &matchIndex1 = commitTracker()->getHandler(myself(1)); RaftMatchIndexTracker &matchIndex2 = commitTracker()->getHandler(myself(2)); matchIndex1.update(1); ASSERT_EQ(journal(0)->getCommitIndex(), 1); matchIndex1.update(0); ASSERT_EQ(journal(0)->getCommitIndex(), 1); matchIndex2.update(1); ASSERT_EQ(journal(0)->getCommitIndex(), 1); matchIndex2.update(2); ASSERT_EQ(journal(0)->getCommitIndex(), 2); matchIndex1.update(3); ASSERT_EQ(journal(0)->getCommitIndex(), 3); members.emplace_back("random", 123); members.emplace_back("random", 234); commitTracker()->updateTargets(members); matchIndex1.update(4); ASSERT_EQ(journal(0)->getCommitIndex(), 3); matchIndex2.update(4); ASSERT_EQ(journal(0)->getCommitIndex(), 4); matchIndex1.update(10); ASSERT_EQ(journal(0)->getCommitIndex(), 4); RaftMatchIndexTracker &matchIndex3 = commitTracker()->getHandler({"random", 123}); matchIndex3.update(15); // now we have 10, 4, 15 ASSERT_EQ(journal(0)->getCommitIndex(), 10); matchIndex2.update(11); // now we have 10, 11, 15 ASSERT_EQ(journal(0)->getCommitIndex(), 11); matchIndex1.update(16); // now we have 16, 11, 15 ASSERT_EQ(journal(0)->getCommitIndex(), 15); } TEST_F(Raft_CommitTracker, AutoCommit) { std::vector members; commitTracker()->updateTargets(members); ASSERT_EQ(journal()->getCommitIndex(), 0); // Ensure commitIndex is auto-updated for(size_t i = 0; i < testreqs.size(); i++) { ASSERT_TRUE(journal()->append(i+1, RaftEntry(0, testreqs[i]))); RETRY_ASSERT_EQ(journal()->getCommitIndex(), (int) i+1); } } TEST(RaftMembers, basic_sanity) { std::vector nodes = { {"server1", 245}, {"localhost", 789}, {"server2.cern.ch", 1789} }; std::vector observers = { {"observer1", 1234}, {"observer2", 789}, {"observer3.cern.ch", 111} }; RaftMembers members(nodes, observers); ASSERT_EQ(members.nodes, nodes); ASSERT_EQ(members.observers, observers); RaftMembers members2(members.toString()); ASSERT_EQ(members, members2); ASSERT_EQ(members.nodes, members2.nodes); ASSERT_EQ(members.observers, members2.observers); ASSERT_EQ(members.toString(), members2.toString()); } TEST(RaftMembers, no_observers) { std::vector nodes = { {"server1", 245}, {"localhost", 789}, {"server2.cern.ch", 1789} }; std::vector observers; RaftMembers members(nodes, observers); ASSERT_EQ(members.nodes, nodes); ASSERT_EQ(members.observers, observers); RaftMembers members2(members.toString()); ASSERT_EQ(members, members2); ASSERT_EQ(members.nodes, members2.nodes); ASSERT_EQ(members.observers, members2.observers); ASSERT_EQ(members.toString(), members2.toString()); } TEST(RaftMembers, DemoteToObserver) { std::vector nodes = { {"server1", 245}, {"localhost", 789}, {"server2.cern.ch", 1789} }; std::vector observers; RaftMembers members(nodes, observers); ASSERT_EQ(members.nodes, nodes); ASSERT_EQ(members.observers, observers); std::string err; ASSERT_FALSE(members.demoteToObserver(RaftServer("localhost", 788), err)); ASSERT_EQ(err, "localhost:788 is not a full member."); ASSERT_TRUE(members.demoteToObserver(RaftServer("localhost", 789), err)); nodes = { {"server1", 245}, {"server2.cern.ch", 1789} }; observers = { {"localhost", 789}, }; ASSERT_EQ(members.nodes, nodes); ASSERT_EQ(members.observers, observers); } TEST(Raft_BlockedWrites, basic_sanity) { RaftBlockedWrites blockedWrites; std::shared_ptr q1 { new PendingQueue(nullptr) }; std::shared_ptr q2 { new PendingQueue(nullptr) }; std::shared_ptr q3 { new PendingQueue(nullptr) }; std::shared_ptr q4 { new PendingQueue(nullptr) }; blockedWrites.insert(1, q1); blockedWrites.insert(2, q2); blockedWrites.insert(3, q3); blockedWrites.insert(4, q4); ASSERT_EQ(blockedWrites.size(), 4u); ASSERT_EQ(blockedWrites.popIndex(6), nullptr); ASSERT_EQ(blockedWrites.popIndex(1), q1); ASSERT_EQ(blockedWrites.popIndex(1), nullptr); ASSERT_EQ(blockedWrites.size(), 3u); blockedWrites.insert(5, q1); ASSERT_EQ(blockedWrites.size(), 4u); ASSERT_EQ(blockedWrites.popIndex(2), q2); ASSERT_EQ(blockedWrites.popIndex(3), q3); ASSERT_EQ(blockedWrites.size(), 2u); ASSERT_EQ(blockedWrites.popIndex(5), q1); ASSERT_EQ(blockedWrites.popIndex(4), q4); ASSERT_EQ(blockedWrites.size(), 0u); } TEST_F(Raft_JournalIterator, basic_sanity) { for(size_t i = 0; i < testreqs.size(); i++) { RaftEntry tempEntry; tempEntry.term = 0; tempEntry.request = testreqs[i]; ASSERT_TRUE(journal(0)->append(i+1, tempEntry)) << i; } RaftJournal::Iterator it = journal(0)->getIterator(1, true); ASSERT_TRUE(it.valid()); for(size_t i = 0; i < testreqs.size(); i++) { ASSERT_TRUE(it.valid()); std::string tmp; it.current(tmp); RaftEntry entry; RaftEntry::deserialize(entry, tmp); ASSERT_EQ(entry.term, 0); ASSERT_EQ(entry.request, testreqs[i]); it.next(); } ASSERT_FALSE(it.valid()); } TEST(RaftEntry, parsing) { RaftEntry entry; entry.term = 13; entry.request = make_req("set", "abc", "123"); RaftSerializedEntry serialized = entry.serialize(); ASSERT_EQ(RaftEntry::fetchTerm(serialized), 13); } TEST(RaftHeartbeatTracker, BasicSanity) { RaftHeartbeatTracker tracker(defaultTimeouts); std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now(); ASSERT_EQ(tracker.getLastHeartbeat(), std::chrono::steady_clock::time_point()); tracker.heartbeat(now); ASSERT_EQ(tracker.getTimeouts(), defaultTimeouts); std::chrono::milliseconds timeout = tracker.getRandomTimeout(); ASSERT_GE(timeout, defaultTimeouts.getLow()); ASSERT_LE(timeout, defaultTimeouts.getHigh()); ASSERT_EQ(TimeoutStatus::kNo, tracker.timeout(now + timeout - std::chrono::milliseconds(1))); ASSERT_EQ(TimeoutStatus::kNo, tracker.timeout(now+timeout)); ASSERT_EQ(TimeoutStatus::kYes, tracker.timeout(now+timeout+std::chrono::milliseconds(1))); tracker.heartbeat(now - std::chrono::milliseconds(1)); ASSERT_EQ(TimeoutStatus::kNo, tracker.timeout(now + timeout - std::chrono::milliseconds(1))); ASSERT_EQ(TimeoutStatus::kNo, tracker.timeout(now+timeout)); ASSERT_EQ(TimeoutStatus::kYes, tracker.timeout(now+timeout+std::chrono::milliseconds(1))); tracker.heartbeat(now + std::chrono::milliseconds(1)); ASSERT_EQ(TimeoutStatus::kNo, tracker.timeout(now + timeout - std::chrono::milliseconds(1))); ASSERT_EQ(TimeoutStatus::kNo, tracker.timeout(now+timeout)); ASSERT_EQ(TimeoutStatus::kNo, tracker.timeout(now+timeout+std::chrono::milliseconds(1))); } TEST(RaftVoteRequest, Describe) { RaftVoteRequest voteReq; voteReq.candidate = RaftServer("localhost", 1234); voteReq.term = 777; voteReq.lastIndex = 999; voteReq.lastTerm = 555; ASSERT_EQ("vote request [candidate=localhost:1234, term=777, lastIndex=999, lastTerm=555]", voteReq.describe(false)); ASSERT_EQ("pre-vote request [candidate=localhost:1234, term=777, lastIndex=999, lastTerm=555]", voteReq.describe(true)); } TEST(RaftVoteRegistry, DoubleVote) { RaftVoteRegistry registry(1, false); registry.registerVote(RaftServer("localhost", 7777), RaftVoteResponse(1, RaftVote::GRANTED)); ASSERT_THROW(registry.registerVote(RaftServer("localhost", 7777), RaftVoteResponse(1, RaftVote::REFUSED)), FatalException); } TEST(RaftVoteRegistry, OneForOneAgainst) { RaftVoteRegistry registry(1, false); registry.registerVote(RaftServer("localhost", 7777), RaftVoteResponse(1, RaftVote::GRANTED)); registry.registerVote(RaftServer("localhost", 7778), RaftVoteResponse(1, RaftVote::REFUSED)); ASSERT_EQ(registry.count(RaftVote::GRANTED), 1u); ASSERT_EQ(registry.count(RaftVote::REFUSED), 1u); ASSERT_EQ(registry.count(RaftVote::VETO), 0u); ASSERT_EQ(registry.countNetworkError(), 0u); ASSERT_EQ(registry.countParseError(), 0u); ASSERT_EQ(registry.determineOutcome(), ElectionOutcome::kElected); } TEST(RaftVoteRegistry, OneForOneVeto) { RaftVoteRegistry registry(1, false); registry.registerVote(RaftServer("localhost", 7777), RaftVoteResponse(1, RaftVote::GRANTED)); registry.registerVote(RaftServer("localhost", 7778), RaftVoteResponse(1, RaftVote::VETO)); ASSERT_EQ(registry.count(RaftVote::GRANTED), 1u); ASSERT_EQ(registry.count(RaftVote::REFUSED), 0u); ASSERT_EQ(registry.count(RaftVote::VETO), 1u); ASSERT_EQ(registry.countNetworkError(), 0u); ASSERT_EQ(registry.countParseError(), 0u); ASSERT_EQ(registry.determineOutcome(), ElectionOutcome::kVetoed); ASSERT_EQ(registry.describeOutcome(), "Election round unsuccessful for term 1. Contacted 2 nodes, received 2 replies with a tally of 1 positive votes, 0 refused votes, and 1 vetoes."); } TEST(RaftVoteRegistry, OneForOneNetErr) { RaftVoteRegistry registry(1, false); registry.registerVote(RaftServer("localhost", 7777), RaftVoteResponse(1, RaftVote::GRANTED)); registry.registerNetworkError(RaftServer("localhost", 7778)); ASSERT_EQ(registry.count(RaftVote::GRANTED), 1u); ASSERT_EQ(registry.count(RaftVote::REFUSED), 0u); ASSERT_EQ(registry.count(RaftVote::VETO), 0u); ASSERT_EQ(registry.countNetworkError(), 1u); ASSERT_EQ(registry.countParseError(), 0u); ASSERT_EQ(registry.determineOutcome(), ElectionOutcome::kElected); } TEST(RaftVoteRegistry, OneForOneParseErr) { RaftVoteRegistry registry(1, false); registry.registerVote(RaftServer("localhost", 7777), RaftVoteResponse(1, RaftVote::GRANTED)); registry.registerParseError(RaftServer("localhost", 7778)); ASSERT_EQ(registry.count(RaftVote::GRANTED), 1u); ASSERT_EQ(registry.count(RaftVote::REFUSED), 0u); ASSERT_EQ(registry.count(RaftVote::VETO), 0u); ASSERT_EQ(registry.countNetworkError(), 0u); ASSERT_EQ(registry.countParseError(), 1u); ASSERT_EQ(registry.determineOutcome(), ElectionOutcome::kElected); } TEST(RaftVoteRegistry, ParsingError) { RaftVoteRegistry registry(1, false); registry.registerParseError(RaftServer("localhost", 7777)); registry.registerParseError(RaftServer("localhost", 7778)); ASSERT_EQ(registry.count(RaftVote::GRANTED), 0u); ASSERT_EQ(registry.count(RaftVote::REFUSED), 0u); ASSERT_EQ(registry.count(RaftVote::VETO), 0u); ASSERT_EQ(registry.countNetworkError(), 0u); ASSERT_EQ(registry.countParseError(), 2u); ASSERT_EQ(registry.determineOutcome(), ElectionOutcome::kNotElected); } TEST(RaftVoteRegistry, PreVoteParsingError) { RaftVoteRegistry registry(1, true); registry.registerParseError(RaftServer("localhost", 7777)); registry.registerParseError(RaftServer("localhost", 7778)); ASSERT_EQ(registry.count(RaftVote::GRANTED), 0u); ASSERT_EQ(registry.count(RaftVote::REFUSED), 0u); ASSERT_EQ(registry.count(RaftVote::VETO), 0u); ASSERT_EQ(registry.countNetworkError(), 0u); ASSERT_EQ(registry.countParseError(), 2u); ASSERT_EQ(registry.determineOutcome(), ElectionOutcome::kElected); ASSERT_EQ(registry.describeOutcome(), "Pre-vote round successful for term 1. Contacted 2 nodes, received 0 replies with a tally of 0 positive votes, 0 refused votes, and 0 vetoes."); } TEST(RaftVoteRegistry, TwoAgainst) { RaftVoteRegistry registry(1, false); registry.registerVote(RaftServer("localhost", 7777), RaftVoteResponse(1, RaftVote::REFUSED)); registry.registerVote(RaftServer("localhost", 7778), RaftVoteResponse(1, RaftVote::REFUSED)); ASSERT_EQ(registry.count(RaftVote::GRANTED), 0u); ASSERT_EQ(registry.count(RaftVote::REFUSED), 2u); ASSERT_EQ(registry.count(RaftVote::VETO), 0u); ASSERT_EQ(registry.countNetworkError(), 0u); ASSERT_EQ(registry.countParseError(), 0u); ASSERT_EQ(registry.determineOutcome(), ElectionOutcome::kNotElected); } TEST(RaftVoteRegistry, TwoVetoes) { RaftVoteRegistry registry(1, false); registry.registerVote(RaftServer("localhost", 7777), RaftVoteResponse(1, RaftVote::VETO)); registry.registerVote(RaftServer("localhost", 7778), RaftVoteResponse(1, RaftVote::VETO)); ASSERT_EQ(registry.count(RaftVote::GRANTED), 0u); ASSERT_EQ(registry.count(RaftVote::REFUSED), 0u); ASSERT_EQ(registry.count(RaftVote::VETO), 2u); ASSERT_EQ(registry.countNetworkError(), 0u); ASSERT_EQ(registry.countParseError(), 0u); ASSERT_EQ(registry.determineOutcome(), ElectionOutcome::kVetoed); } TEST(RaftVoteRegistry, TwoFor) { RaftVoteRegistry registry(1, false); registry.registerVote(RaftServer("localhost", 7777), RaftVoteResponse(1, RaftVote::GRANTED)); registry.registerVote(RaftServer("localhost", 7778), RaftVoteResponse(1, RaftVote::GRANTED)); ASSERT_EQ(registry.count(RaftVote::GRANTED), 2u); ASSERT_EQ(registry.count(RaftVote::REFUSED), 0u); ASSERT_EQ(registry.count(RaftVote::VETO), 0u); ASSERT_EQ(registry.countNetworkError(), 0u); ASSERT_EQ(registry.countParseError(), 0u); ASSERT_EQ(registry.determineOutcome(), ElectionOutcome::kElected); ASSERT_EQ(registry.describeOutcome(), "Election round successful for term 1. Contacted 2 nodes, received 2 replies with a tally of 2 positive votes, 0 refused votes, and 0 vetoes."); } TEST(RaftVoteRegistry, OneFor) { RaftVoteRegistry registry(1, false); registry.registerVote(RaftServer("localhost", 7777), RaftVoteResponse(1, RaftVote::GRANTED)); ASSERT_EQ(registry.count(RaftVote::GRANTED), 1u); ASSERT_EQ(registry.count(RaftVote::REFUSED), 0u); ASSERT_EQ(registry.count(RaftVote::VETO), 0u); ASSERT_EQ(registry.countNetworkError(), 0u); ASSERT_EQ(registry.countParseError(), 0u); ASSERT_EQ(registry.determineOutcome(), ElectionOutcome::kElected); } TEST(RaftVoteRegistry, OneAgainst) { RaftVoteRegistry registry(1, false); registry.registerVote(RaftServer("localhost", 7777), RaftVoteResponse(1, RaftVote::REFUSED)); ASSERT_EQ(registry.count(RaftVote::GRANTED), 0u); ASSERT_EQ(registry.count(RaftVote::REFUSED), 1u); ASSERT_EQ(registry.count(RaftVote::VETO), 0u); ASSERT_EQ(registry.countNetworkError(), 0u); ASSERT_EQ(registry.countParseError(), 0u); ASSERT_EQ(registry.determineOutcome(), ElectionOutcome::kNotElected); } TEST(RaftVoteRegistry, TwoForOneAgainst) { RaftVoteRegistry registry(1, false); registry.registerVote(RaftServer("localhost", 7777), RaftVoteResponse(1, RaftVote::GRANTED)); registry.registerVote(RaftServer("localhost", 7778), RaftVoteResponse(1, RaftVote::GRANTED)); registry.registerVote(RaftServer("localhost", 7780), RaftVoteResponse(1, RaftVote::REFUSED)); registry.registerVote(RaftServer("localhost", 7781), RaftVoteResponse(1, RaftVote::REFUSED)); ASSERT_EQ(registry.count(RaftVote::GRANTED), 2u); ASSERT_EQ(registry.count(RaftVote::REFUSED), 2u); ASSERT_EQ(registry.count(RaftVote::VETO), 0u); ASSERT_EQ(registry.countNetworkError(), 0u); ASSERT_EQ(registry.countParseError(), 0u); ASSERT_EQ(registry.determineOutcome(), ElectionOutcome::kElected); } TEST(RaftVoteRegistry, TwoForOneVeto) { RaftVoteRegistry registry(1, false); registry.registerVote(RaftServer("localhost", 7777), RaftVoteResponse(1, RaftVote::GRANTED)); registry.registerVote(RaftServer("localhost", 7778), RaftVoteResponse(1, RaftVote::GRANTED)); registry.registerVote(RaftServer("localhost", 7780), RaftVoteResponse(1, RaftVote::REFUSED)); registry.registerVote(RaftServer("localhost", 7781), RaftVoteResponse(1, RaftVote::VETO)); ASSERT_EQ(registry.count(RaftVote::GRANTED), 2u); ASSERT_EQ(registry.count(RaftVote::REFUSED), 1u); ASSERT_EQ(registry.count(RaftVote::VETO), 1u); ASSERT_EQ(registry.countNetworkError(), 0u); ASSERT_EQ(registry.countParseError(), 0u); ASSERT_EQ(registry.determineOutcome(), ElectionOutcome::kVetoed); }