// ---------------------------------------------------------------------- // File: raft-journal.cc // Author: Georgios Bitzes - CERN // ---------------------------------------------------------------------- /************************************************************************ * quarkdb - a redis-like highly available key-value store * * Copyright (C) 2016 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #include "raft/RaftJournal.hh" #include "test-utils.hh" #include using namespace quarkdb; #define ASSERT_OK(msg) ASSERT_TRUE(msg.ok()) #define ASSERT_NOTFOUND(msg) ASSERT_TRUE(msg.IsNotFound()) class Raft_Journal : public ::testing::Test { protected: virtual void SetUp() { nodes.emplace_back("server1", 7776); nodes.emplace_back("server2", 7777); nodes.emplace_back("server3", 7778); RaftJournal::ObliterateAndReinitializeJournal(dbpath, clusterID, nodes, 0, FsyncPolicy::kAsync); } virtual void TearDown() { } std::vector nodes; std::vector observers; std::string dbpath = "/tmp/raft-journal"; RaftClusterID clusterID = "55cd595d-7306-4971-b92c-4b9ba5930d40"; RaftEntry entry1, entry2; RedisRequest req; RedisRequest req2; RaftServer srv; RaftTerm term; }; TEST_F(Raft_Journal, T1) { { RaftJournal journal(dbpath); srv = { "localhost", 1234 }; ASSERT_EQ(journal.getCurrentTerm(), 0); ASSERT_EQ(journal.getLogSize(), 1); ASSERT_EQ(journal.getClusterID(), clusterID); ASSERT_EQ(journal.getCommitIndex(), 0); ASSERT_TRUE(journal.setCurrentTerm(2, srv)); ASSERT_EQ(journal.getCurrentTerm(), 2); ASSERT_FALSE(journal.setCurrentTerm(1, srv)); srv = {"server2", 2345 }; ASSERT_FALSE(journal.setCurrentTerm(2, srv)); srv = { "", 0 }; ASSERT_TRUE(journal.setCurrentTerm(3, srv)); srv = { "server3", 89 }; ASSERT_TRUE(journal.setCurrentTerm(3, srv)); srv = { "server4", 89 }; ASSERT_FALSE(journal.setCurrentTerm(3, srv)); ASSERT_FALSE(journal.setCurrentTerm(2, srv)); ASSERT_EQ(journal.getNodes(), nodes); entry1 = RaftEntry(2, "set", "abc", "123"); ASSERT_TRUE(journal.append(1, entry1)); ASSERT_OK(journal.fetch(1, entry2)); ASSERT_EQ(entry1, entry2); ASSERT_TRUE(journal.matchEntries(1, 2)); ASSERT_THROW(journal.setCommitIndex(2), FatalException); entry1 = RaftEntry(4, "set", "qwerty", "asdf"); ASSERT_FALSE(journal.append(2, entry1)); entry1.term = 2; ASSERT_TRUE(journal.append(2, entry1)); ASSERT_TRUE(journal.matchEntries(2, 2)); journal.setCommitIndex(2); entry1 = RaftEntry(1, "set", "123", "456"); req = entry1.request; ASSERT_FALSE(journal.append(3, entry1)); ASSERT_TRUE(journal.setCurrentTerm(4, srv)); entry1.term = 4; ASSERT_TRUE(journal.append(3, entry1)); ASSERT_TRUE(journal.matchEntries(3, 4)); ASSERT_EQ(journal.getLogSize(), 4); ASSERT_EQ(journal.getMembership().observers, observers); } { RaftJournal journal(dbpath); ASSERT_EQ(journal.getCommitIndex(), 2); ASSERT_EQ(journal.getLogSize(), 4); ASSERT_EQ(journal.getNodes(), nodes); ASSERT_EQ(journal.getCurrentTerm(), 4); ASSERT_EQ(journal.getClusterID(), clusterID); ASSERT_EQ(journal.getVotedFor(), srv); journal.fetch_or_die(3, entry1); ASSERT_EQ(entry1.term, 4); ASSERT_EQ(entry1.request, req); entry1.term = 3; ASSERT_FALSE(journal.append(4, entry1)); ASSERT_EQ(journal.getLogSize(), 4); // try to remove committed entry ASSERT_THROW(journal.removeEntries(2), FatalException); ASSERT_FALSE(journal.removeEntries(5)); ASSERT_TRUE(journal.removeEntries(3)); ASSERT_FALSE(journal.removeEntries(3)); ASSERT_EQ(journal.getLogSize(), 3); ASSERT_NOTFOUND(journal.fetch(3, entry2)); ASSERT_NOTFOUND(journal.fetch(4, entry2)); req = { "set", "qwerty", "asdf" }; journal.fetch_or_die(2, entry2); ASSERT_EQ(entry2.term, 2); ASSERT_EQ(entry2.request, req); for(size_t i = 0; i < testreqs.size(); i++) { RaftEntry tempEntry; tempEntry.term = 4; tempEntry.request = testreqs[i]; ASSERT_TRUE(journal.append(3+i, tempEntry)) << i; } ASSERT_THROW(journal.trimUntil(4), FatalException); // commit index is 2 ASSERT_THROW(journal.trimUntil(3), FatalException); journal.trimUntil(2); journal.trimUntil(2); // no op ASSERT_EQ(journal.getLogStart(), 2); RaftEntry tmp; ASSERT_NOTFOUND(journal.fetch(0, tmp)); ASSERT_NOTFOUND(journal.fetch(1, tmp)); ASSERT_OK(journal.fetch(2, tmp)); journal.setCommitIndex(3); journal.trimUntil(3); ASSERT_NOTFOUND(journal.fetch(2, tmp)); ASSERT_OK(journal.fetch(3, tmp)); ASSERT_THROW(journal.trimUntil(4), FatalException); // commit index is 3 RaftJournal::Iterator it = journal.getIterator(2, true); ASSERT_FALSE(it.valid()); it = journal.getIterator(2, false); ASSERT_TRUE(it.valid()); ASSERT_EQ(it.getCurrentIndex(), 3); journal.setCommitIndex(6); journal.trimUntil(5); ASSERT_NOTFOUND(journal.fetch(4, tmp)); ASSERT_EQ(journal.getLogStart(), 5); LogIndex size = 3+testreqs.size(); ASSERT_EQ(journal.getLogSize(), size); // add and remove observers std::string err; observers.emplace_back("observer1", 123); ASSERT_TRUE(journal.addObserver(4, observers[0], err)); ASSERT_EQ(journal.getMembership().observers, observers); observers.emplace_back("observer2", 345); // previous membership change not committed yet, cannot add another ASSERT_FALSE(journal.addObserver(4, observers[1], err)); ASSERT_EQ(journal.getMembership().observers.size(), 1u); ASSERT_EQ(journal.getMembership().observers[0], observers[0]); ASSERT_EQ(journal.getLogSize(), size+1); ASSERT_OK(journal.fetch(size, tmp)); ASSERT_EQ(tmp.request[0], "JOURNAL_UPDATE_MEMBERS"); ASSERT_TRUE(journal.setCommitIndex(size)); // try to add the same observer again ASSERT_FALSE(journal.addObserver(4, observers[0], err)); // try to add an existing node as observer ASSERT_FALSE(journal.addObserver(4, nodes[1], err)); // try to remove non-existing nodes ASSERT_FALSE(journal.removeMember(4, observers[1], err)); ASSERT_FALSE(journal.removeMember(4, RaftServer("asdfad", 13), err)); // ASSERT_FALSE(journal.removeObserver(4, nodes[2], err)); // try to add an observer again, after committing the previous membership epoch ASSERT_TRUE(journal.addObserver(4, observers[1], err)); ASSERT_EQ(journal.getMembership().observers, observers); // roll-back previous membership change ASSERT_TRUE(journal.removeEntries(size+1)); ASSERT_EQ(journal.getMembership().observers.size(), 1u); ASSERT_EQ(journal.getMembership().observers[0], observers[0]); // add it again, and commit ASSERT_TRUE(journal.addObserver(4, observers[1], err)); ASSERT_EQ(journal.getMembership().observers, observers); ASSERT_TRUE(journal.setCommitIndex(size+1)); ASSERT_EQ(journal.getMembership().observers, observers); ASSERT_TRUE(journal.removeMember(4, observers[0], err)); observers.erase(observers.begin()); ASSERT_EQ(journal.getMembership().observers, observers); ASSERT_TRUE(journal.setCommitIndex(size+2)); ASSERT_TRUE(journal.removeMember(4, observers[0], err)); ASSERT_TRUE(journal.getMembership().observers.empty()); // roll-back ASSERT_TRUE(journal.removeEntries(size+3)); ASSERT_EQ(journal.getMembership().observers, observers); // push a regular entry, just because entry1 = RaftEntry(4, "set", "regular_entry", "just_because"); ASSERT_TRUE(journal.append(size+3, entry1)); // remove the last observer again ASSERT_TRUE(journal.removeMember(4, observers[0], err)); ASSERT_TRUE(journal.getMembership().observers.empty()); ASSERT_TRUE(journal.setCommitIndex(size+4)); ASSERT_TRUE(journal.getMembership().observers.empty()); // add two observers, promote the first ASSERT_TRUE(journal.addObserver(4, observers[0], err)); ASSERT_TRUE(journal.setCommitIndex(size+5)); observers.emplace_back("observer3", 789); ASSERT_TRUE(journal.addObserver(4, observers[1], err)); ASSERT_TRUE(journal.setCommitIndex(size+6)); ASSERT_EQ(journal.getMembership().observers, observers); observers.emplace_back("observer4", 1111); ASSERT_FALSE(journal.promoteObserver(4, observers[2], err)); ASSERT_TRUE(journal.promoteObserver(4, observers[1], err)); ASSERT_EQ(journal.getMembership().observers.size(), 1u); ASSERT_EQ(journal.getMembership().nodes.size(), 4u); ASSERT_EQ(journal.getMembership().observers[0], observers[0]); ASSERT_EQ(journal.getMembership().nodes[3], observers[1]); ASSERT_TRUE(journal.setCommitIndex(size+7)); ASSERT_TRUE(journal.removeMember(4, nodes[0], err)); ASSERT_EQ(journal.getMembership().nodes.size(), 3u); ASSERT_EQ(journal.getMembership().nodes[0], nodes[1]); ASSERT_TRUE(journal.setCommitIndex(size+8)); // try to update members by appending a journal entry which has a wrong clusterID // verify it is ignored RaftMembership originalMembership = journal.getMembership(); nodes = originalMembership.nodes; observers = originalMembership.observers; observers.emplace_back("some_node", 567); ASSERT_TRUE(journal.append(journal.getLogSize(), RaftEntry(4, "JOURNAL_UPDATE_MEMBERS", RaftMembers(nodes, observers).toString(), "wrong_cluster_id"))); ASSERT_EQ(journal.getMembership(), originalMembership); } } TEST(FsyncPolicy, Parsing) { FsyncPolicy policy; ASSERT_TRUE(parseFsyncPolicy("async", policy)); ASSERT_EQ(policy, FsyncPolicy::kAsync); ASSERT_TRUE(parseFsyncPolicy("always", policy)); ASSERT_EQ(policy, FsyncPolicy::kAlways); ASSERT_TRUE(parseFsyncPolicy("sync-important-updates", policy)); ASSERT_EQ(policy, FsyncPolicy::kSyncImportantUpdates); ASSERT_FALSE(parseFsyncPolicy("aaaa", policy)); ASSERT_FALSE(parseFsyncPolicy("ALWAYS", policy)); }