// ---------------------------------------------------------------------- // File: raft-state.cc // Author: Georgios Bitzes - CERN // ---------------------------------------------------------------------- /************************************************************************ * quarkdb - a redis-like highly available key-value store * * Copyright (C) 2016 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #include "raft/RaftState.hh" #include "raft/RaftJournal.hh" #include using namespace quarkdb; #define ASSERT_OK(msg) ASSERT_TRUE(msg.ok()) class Raft_State : public ::testing::Test { protected: virtual void SetUp() { nodes.emplace_back("server1", 7776); nodes.emplace_back("server2", 7777); nodes.emplace_back("server3", 7778); RaftJournal::ObliterateAndReinitializeJournal(dbpath, clusterID, nodes, 0, FsyncPolicy::kAsync); } virtual void TearDown() { } std::vector nodes; std::vector observers; std::string dbpath = "/tmp/raft-journal"; RaftClusterID clusterID = "55cd595d-7306-4971-b92c-4b9ba5930d40"; RedisRequest req; RedisRequest req2; RaftServer srv; RaftTerm term; RaftServer myself = {"server2", 7777}; }; TEST_F(Raft_State, T1) { { RaftJournal journal(dbpath); RaftState state(journal, myself); ASSERT_EQ(state.getSnapshot()->term, 0); ASSERT_TRUE(state.observed(1, {})); ASSERT_FALSE(state.observed(0, {})); ASSERT_EQ(myself, state.getMyself()); RaftStateSnapshot snapshot = {1, RaftStatus::FOLLOWER, {}, {}, -1}; ASSERT_TRUE(snapshot.equals(state.getSnapshot())); ASSERT_FALSE(state.observed(0, {"server1", 1234})); ASSERT_TRUE(state.observed(1, {"server1", 1234})); ASSERT_FALSE(state.observed(1, {"server1", 1234})); ASSERT_FALSE(state.observed(1, {"server1", 321})); // i've already recognized a leader ASSERT_FALSE(state.grantVote(1, {"server1", 7776} )); ASSERT_FALSE(state.grantVote(1, {"server2", 7778} )); ASSERT_TRUE(state.observed(2, {})); ASSERT_TRUE(state.grantVote(2, {"server1", 7778})); // cannot vote again ASSERT_FALSE(state.grantVote(2, {"server2", 7778})); ASSERT_FALSE(state.ascend(2)); // not a candidate, plus have recognized leader ASSERT_TRUE(state.observed(3, RaftServer{})); ASSERT_TRUE(state.becomeCandidate(3)); snapshot = {3, RaftStatus::CANDIDATE, {}, myself, -1}; ASSERT_TRUE(snapshot.equals(state.getSnapshot())); // drop out from candidacy, make sure I can't try again in the same term ASSERT_TRUE(state.dropOut(3)); snapshot = {3, RaftStatus::FOLLOWER, {}, myself, -1}; ASSERT_TRUE(snapshot.equals(state.getSnapshot())); ASSERT_FALSE(state.becomeCandidate(3)); // observed new term, no longer a candidate ASSERT_TRUE(state.observed(4, {})); snapshot = {4, RaftStatus::FOLLOWER, {}, {}, -1}; ASSERT_TRUE(snapshot.equals(state.getSnapshot())); ASSERT_TRUE(state.observed(4, nodes[0])); snapshot.leader = nodes[0]; snapshot.votedFor = RaftState::BLOCKED_VOTE; ASSERT_TRUE(snapshot.equals(state.getSnapshot())); ASSERT_FALSE(state.becomeCandidate(4)); // already recognized a leader ASSERT_FALSE(state.becomeCandidate(3)); ASSERT_FALSE(state.becomeCandidate(5)); ASSERT_TRUE(state.observed(5, {})); ASSERT_TRUE(state.becomeCandidate(5)); ASSERT_TRUE(state.ascend(5)); snapshot = {5, RaftStatus::LEADER, myself, myself, 1}; ASSERT_TRUE(snapshot.equals(state.getSnapshot())); ASSERT_FALSE(state.dropOut(5)); ASSERT_TRUE(snapshot.equals(state.getSnapshot())); ASSERT_TRUE(state.observed(6, nodes[0])); snapshot = {6, RaftStatus::FOLLOWER, nodes[0], RaftState::BLOCKED_VOTE, -1}; ASSERT_TRUE(snapshot.equals(state.getSnapshot())); ASSERT_NE(state.getSnapshot()->status, RaftStatus::SHUTDOWN); state.shutdown(); ASSERT_FALSE(state.observed(200, nodes[0])); ASSERT_EQ(state.getSnapshot()->status, RaftStatus::SHUTDOWN); } { RaftJournal journal(dbpath); RaftState state(journal, myself); RaftStateSnapshot snapshot = {6, RaftStatus::FOLLOWER, {}, RaftState::BLOCKED_VOTE, -1}; ASSERT_TRUE(snapshot.equals(state.getSnapshot())); // let's erase ourselves from the cluster.. nodes.erase(nodes.begin()+1); std::string err; ASSERT_TRUE(journal.removeMember(6, myself, err)); ASSERT_TRUE(journal.setCommitIndex(2)); snapshot = {6, RaftStatus::FOLLOWER, {}, RaftState::BLOCKED_VOTE, -1}; ASSERT_TRUE(snapshot.equals(state.getSnapshot())); ASSERT_TRUE(state.observed(6, nodes[0])); snapshot = {6, RaftStatus::FOLLOWER, nodes[0], RaftState::BLOCKED_VOTE, -1}; ASSERT_TRUE(snapshot.equals(state.getSnapshot())); ASSERT_TRUE(state.observed(7, {})); snapshot = {7, RaftStatus::FOLLOWER, {}, {}, -1}; ASSERT_TRUE(snapshot.equals(state.getSnapshot())); // cannot become candidate, not part of the cluster ASSERT_FALSE(state.becomeCandidate(7)); ASSERT_FALSE(state.ascend(7)); // re-enter the cluster as an observer nodes.push_back(myself); ASSERT_TRUE(journal.addObserver(7, myself, err)); ASSERT_TRUE(journal.setCommitIndex(3)); // still cannot call election, not a full node ASSERT_FALSE(state.becomeCandidate(7)); ASSERT_FALSE(state.ascend(7)); // become full-node ASSERT_TRUE(journal.promoteObserver(7, myself, err)); ASSERT_TRUE(journal.setCommitIndex(4)); ASSERT_TRUE(state.observed(7, nodes[0])); snapshot = {7, RaftStatus::FOLLOWER, nodes[0], RaftState::BLOCKED_VOTE, -1}; ASSERT_TRUE(snapshot.equals(state.getSnapshot())); // push two changes to the log // mark the first as applied, the other as committed ASSERT_TRUE(journal.append(5, RaftEntry(7, "set", "qwerty", "asdf"))); ASSERT_TRUE(journal.append(6, RaftEntry(7, "set", "1234", "9876"))); ASSERT_TRUE(journal.setCommitIndex(4)); ASSERT_FALSE(journal.setCommitIndex(0)); ASSERT_THROW(journal.setCommitIndex(7), FatalException); ASSERT_TRUE(journal.setCommitIndex(6)); // exit again.. ASSERT_TRUE(journal.removeMember(7, nodes[2], err)); nodes.erase(nodes.begin()+2); snapshot = {7, RaftStatus::FOLLOWER, nodes[0], RaftState::BLOCKED_VOTE, -1}; ASSERT_TRUE(snapshot.equals(state.getSnapshot())); } { RaftJournal journal(dbpath); RaftState state(journal, myself); RaftStateSnapshot snapshot = {7, RaftStatus::FOLLOWER, {}, RaftState::BLOCKED_VOTE, -1}; ASSERT_TRUE(snapshot.equals(state.getSnapshot())); ASSERT_EQ(journal.getCurrentTerm(), 7); ASSERT_EQ(journal.getVotedFor(), RaftState::BLOCKED_VOTE); // verify we remember commit index ASSERT_EQ(journal.getCommitIndex(), 6); // re-enter cluster nodes.push_back(myself); ASSERT_TRUE(journal.setCommitIndex(7)); std::string err; ASSERT_TRUE(journal.addObserver(7, myself, err)); ASSERT_TRUE(journal.setCommitIndex(8)); ASSERT_TRUE(journal.promoteObserver(7, myself, err)); // become leader ASSERT_TRUE(state.observed(8, {})); ASSERT_TRUE(state.becomeCandidate(8)); ASSERT_TRUE(state.ascend(8)); snapshot = {8, RaftStatus::LEADER, myself, myself, 10}; ASSERT_TRUE(snapshot.equals(state.getSnapshot())); } { RaftJournal journal(dbpath); RaftState state(journal, myself); RaftStateSnapshot snapshot = {8, RaftStatus::FOLLOWER, {}, myself, -1}; ASSERT_TRUE(snapshot.equals(state.getSnapshot())); } }