// ----------------------------------------------------------------------
// File: replication.cc
// Author: Georgios Bitzes - CERN
// ----------------------------------------------------------------------
/************************************************************************
* quarkdb - a redis-like highly available key-value store *
* Copyright (C) 2016 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program.  If not, see <http://www.gnu.org/licenses/>.  *
************************************************************************/
#include "raft/RaftDispatcher.hh"
#include "raft/RaftReplicator.hh"
#include "raft/RaftTalker.hh"
#include "raft/RaftTimeouts.hh"
#include "raft/RaftCommitTracker.hh"
#include "raft/RaftReplicator.hh"
#include "raft/RaftConfig.hh"
#include "raft/RaftTrimmer.hh"
#include "utils/ParseUtils.hh"
#include "storage/ConsistencyScanner.hh"
#include "Configuration.hh"
#include "QuarkDBNode.hh"
#include "../test-utils.hh"
#include "RedisParser.hh"
#include <gtest/gtest.h>
#include <qclient/QClient.hh>
#include <qclient/EncodedRequest.hh>
#include <qclient/pubsub/Subscriber.hh>
#include <qclient/shared/Communicator.hh>
#include <qclient/shared/CommunicatorListener.hh>
#include "utils/AssistedThread.hh"
#include "../test-reply-macros.hh"
using namespace quarkdb;
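// Test fixtures, named after the cluster they provision: three nodes, five
// nodes, or ten nodes of which only one is part of the initial membership.
// Each test starts the nodes it needs explicitly via spinup().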
class Replication : public TestCluster3NodesFixture {};
class CommunicatorTest : public TestCluster3NodesFixture {};
class Membership : public TestCluster5NodesFixture {};
class SingleNodeInitially : public TestCluster10Nodes1InitialFixture {};
TEST_F(Replication, ArtificiallySlowWrite) {
spinup(0); spinup(1); spinup(2);
RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));
int leaderID = getLeaderID();
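// a negative or non-numeric argument must be rejected, a plain integer accepted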
ASSERT_REPLY_DESCRIBE(tunnel(leaderID)->exec("artificially-slow-write-never-use-this", "-1").get(),
"(error) ERR Invalid argument: value is not an integer or out of range");
ASSERT_REPLY_DESCRIBE(tunnel(leaderID)->exec("artificially-slow-write-never-use-this", "test").get(),
"(error) ERR Invalid argument: value is not an integer or out of range");
ASSERT_REPLY_DESCRIBE(tunnel(leaderID)->exec("artificially-slow-write-never-use-this", "1000").get(),
"OK");
}
TEST_F(Replication, entries_50k_with_follower_loss) {
Connection::setPhantomBatchLimit(1);
// let's get this party started
spinup(0); spinup(1); spinup(2);
RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));
int leaderID = getLeaderID();
// push lots of updates
const int64_t NENTRIES = 50000;
for(size_t i = 0; i < NENTRIES; i++) {
tunnel(leaderID)->exec("set", SSTR("key-" << i), SSTR("value-" << i));
}
int victimFollower = (leaderID+1)%3;
int activeFollower = (leaderID+2)%3;
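// victimFollower will be taken down mid-replication; activeFollower must still receive everything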
// verify the leader has started replicating some of the entries already
RETRY_ASSERT_TRUE(journal(victimFollower)->getCommitIndex() > 5000);
// bring down one of the followers, ensure replication is not complete
spindown(victimFollower);
ASSERT_TRUE(journal(victimFollower)->getLogSize() < NENTRIES);
// ensure that eventually, the other follower gets all entries
RETRY_ASSERT_TRUE(journal(activeFollower)->getLogSize() >= NENTRIES+2);
ASSERT_TRUE(journal(leaderID)->getLogSize() >= NENTRIES+2);
RETRY_ASSERT_TRUE(stateMachine(activeFollower)->getLastApplied() >= NENTRIES+1);
RETRY_ASSERT_TRUE(stateMachine(leaderID)->getLastApplied() >= NENTRIES+1);
}
TEST_F(Replication, lease_expires_under_load) {
Connection::setPhantomBatchLimit(1);
// only nodes #0 and #1 are active
spinup(0); spinup(1);
RETRY_ASSERT_TRUE(checkStateConsensus(0, 1));
int leaderID = getLeaderID();
int followerID = (getLeaderID()+1) % 2;
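// Only #0 and #1 of the three configured nodes are up: losing the single
// follower leaves the leader without a majority, yet pending requests must
// still receive replies rather than hang.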
// push lots of updates
const int64_t NENTRIES = 50000;
for(size_t i = 0; i < NENTRIES; i++) {
tunnel(leaderID)->exec("set", SSTR("key-" << i), SSTR("value-" << i));
}
// verify the leader has started replicating some of the entries already
RETRY_ASSERT_TRUE(journal(followerID)->getCommitIndex() > 5000);
// bring down the follower, ensure replication is not complete
spindown(followerID);
ASSERT_TRUE(journal(followerID)->getLogSize() < NENTRIES);
// ensure the connection doesn't hang
std::future<redisReplyPtr> reply = tunnel(leaderID)->exec("ping");
tunnel(leaderID)->exec("set", "random evil intertwined", "write");
ASSERT_TRUE(reply.wait_for(std::chrono::seconds(25)) == std::future_status::ready);
ASSERT_REPLY(reply, "PONG");
ASSERT_REPLY(tunnel(leaderID)->exec("ping", "abc123"), "abc123");
}
TEST_F(Replication, node_has_committed_entries_no_one_else_has_ensure_it_vetoes) {
// node #0 has committed entries that no other node has. It should veto any
// election attempts, so that it alone can win the election.
ASSERT_TRUE(state(0)->observed(5, {}));
ASSERT_TRUE(state(1)->observed(5, {}));
ASSERT_TRUE(state(2)->observed(5, {}));
// add a few requests to the log
ASSERT_TRUE(journal()->append(1, RaftEntry(3, testreqs[0])));
ASSERT_TRUE(journal()->append(2, RaftEntry(4, testreqs[1])));
ASSERT_TRUE(journal()->append(3, RaftEntry(5, testreqs[2])));
// commit all of them
ASSERT_TRUE(journal()->setCommitIndex(3));
// Here, timeouts are really important, as the veto message must go through
// in time. Prepare the DBs before spinning up.
prepare(0); prepare(1); prepare(2);
// node #0 must win
spinup(0); spinup(1); spinup(2);
RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));
ASSERT_EQ(state(0)->getSnapshot()->status, RaftStatus::LEADER);
}
TEST_F(Replication, connection_shuts_down_before_all_replies_arrive) {
Connection::setPhantomBatchLimit(1);
spinup(0); spinup(1); spinup(2);
RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));
int leaderID = getLeaderID();
const int64_t NENTRIES = 10000;
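// repeatedly flood the leader with writes, then kill the client connection
// before all replies have been delivered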
for(size_t repeat = 0; repeat < 3; repeat++) {
// push lots of updates
for(size_t i = 0; i < NENTRIES; i++) {
tunnel(leaderID)->exec("set", SSTR("key-" << i), SSTR("value-" << i));
}
killTunnel(leaderID);
}
for(size_t i = 0; i < NENTRIES; i++) {
tunnel(leaderID)->exec("set", SSTR("key-" << i), SSTR("value-" << i));
}
tunnel(leaderID)->exec("ping").get();
ASSERT_TRUE(leaderID == getLeaderID());
ASSERT_TRUE(journal()->getCommitIndex() > NENTRIES);
// if we haven't crashed or gotten hung by now, we're grand
}
// blindly generate load, ignore any errors
static void generateLoad(qclient::QClient *qcl, std::string prefix, quarkdb::ThreadAssistant &assistant) {
int counter = 0;
while(!assistant.terminationRequested()) {
qcl->exec("set", SSTR(prefix << "-key-" << counter), SSTR(prefix << "value-" << counter));
counter++;
}
qdb_info("Stopping load generation towards '" << prefix << "', waiting on pending replies");
qcl->exec("ping").get();
qdb_info("Shutting down load generator towards '" << prefix << "'");
}
TEST_F(Replication, load_during_election) {
Connection::setPhantomBatchLimit(1);
// add backpressure to load-generating threads
qclient::Options opts0 = makeNoRedirectOptions();
opts0.backpressureStrategy = qclient::BackpressureStrategy::RateLimitPendingRequests(1024);
qclient::QClient qcl0(myself(0).hostname, myself(0).port, std::move(opts0));
qclient::Options opts1 = makeNoRedirectOptions();
opts1.backpressureStrategy = qclient::BackpressureStrategy::RateLimitPendingRequests(1024);
qclient::QClient qcl1(myself(1).hostname, myself(1).port, std::move(opts1));
qclient::Options opts2 = makeNoRedirectOptions();
opts2.backpressureStrategy = qclient::BackpressureStrategy::RateLimitPendingRequests(1024);
qclient::QClient qcl2(myself(2).hostname, myself(2).port, std::move(opts2));
// let's be extra evil and start generating load even before the nodes start up
quarkdb::AssistedThread t1(generateLoad, &qcl0, "node0");
quarkdb::AssistedThread t2(generateLoad, &qcl1, "node1");
quarkdb::AssistedThread t3(generateLoad, &qcl2, "node2");
// start the cluster
spinup(0); spinup(1); spinup(2);
RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));
// terminate once we reach a decent number of writes
int leaderID = getLeaderID();
RETRY_ASSERT_TRUE(journal(leaderID)->getCommitIndex() > 20000);
ASSERT_TRUE(leaderID == getLeaderID());
t1.stop();
t2.stop();
t3.stop();
}
static void assert_linearizability(std::future<redisReplyPtr> &future, std::string expectedValue, std::atomic<int64_t> &responses, std::atomic<int64_t> &violations) {
redisReplyPtr reply = future.get();
if(reply->type != REDIS_REPLY_STRING) return;
responses++;
std::string receivedValue = std::string(reply->str, reply->len);
if(expectedValue != receivedValue) {
violations++;
qdb_critical("Linearizability violation. Received " << quotes(receivedValue) << ", expected: " << quotes(expectedValue));
}
}
// Given an endpoint, try to read a key again and again and again.
// If we get ERR or MOVED, no problem.
// If we get a response other than expectedValue, linearizability has been violated.
static void obsessiveReader(qclient::QClient *qcl, std::string key, std::string expectedValue, std::atomic<int64_t> &responses, std::atomic<int64_t> &violations, quarkdb::ThreadAssistant &assistant) {
std::list<std::future<redisReplyPtr>> futures;
qdb_info("Issuing a flood of reads for key " << quotes(key));
while(!assistant.terminationRequested()) {
futures.emplace_back(qcl->exec("get", key));
while(futures.size() >= 1000) {
assert_linearizability(futures.front(), expectedValue, responses, violations);
futures.pop_front();
}
}
while(futures.size() != 0) {
assert_linearizability(futures.front(), expectedValue, responses, violations);
futures.pop_front();
}
}
TEST_F(Replication, linearizability_during_transition) {
// start the cluster
spinup(0); spinup(1); spinup(2);
RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));
int leaderID = getLeaderID();
// activate full consistency scan every second, across every node
ASSERT_REPLY(tunnel(leaderID)->exec("config_set", ConsistencyScanner::kConfigurationKey, "1"), "OK");
std::vector<std::future<redisReplyPtr>> futures;
// Issue a bunch of writes, all towards the same key
const int nWrites = 10000;
for(size_t i = 0; i <= nWrites; i++) {
futures.push_back(tunnel(leaderID)->exec("set", "key", SSTR("value-" << i)));
}
// Receive responses
for(size_t i = 0; i < futures.size(); i++) {
ASSERT_REPLY(futures[i], "OK");
}
// our followers..
int node1 = (leaderID + 1) % 3;
int node2 = (leaderID + 2) % 3;
// start reading "key"
std::atomic<int64_t> responses(0), violations(0);
quarkdb::AssistedThread reader1(obsessiveReader, tunnel(node1), "key", SSTR("value-" << nWrites), std::ref(responses), std::ref(violations));
quarkdb::AssistedThread reader2(obsessiveReader, tunnel(node2), "key", SSTR("value-" << nWrites), std::ref(responses), std::ref(violations));
RaftTerm firstTerm = state(leaderID)->getSnapshot()->term;
// stop the leader
spindown(leaderID);
// Ensure failover happens..
RETRY_ASSERT_NE(state(node1)->getSnapshot()->term, firstTerm);
RETRY_ASSERT_TRUE(checkStateConsensus(node1, node2));
int newLeaderID = getLeaderID();
ASSERT_NE(leaderID, newLeaderID);
// Wait until we have 1k real responses (not errors or "moved")
RETRY_ASSERT_TRUE(responses >= 1000);
reader1.stop(); reader2.stop();
reader1.join(); reader2.join();
qdb_info("After " << responses << " reads, linearizability was violated " << violations << " times.");
ASSERT_EQ(violations, 0);
RETRY_ASSERT_TRUE(checkFullConsensus(node1, node2));
ASSERT_TRUE(crossCheckJournals(node1, node2));
}
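// CommunicatorListener callback: parse the round number out of a request of
// the form "round-N", and reply with status N and contents "reply-N".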
void consumeListener(CommunicatorRequest &&req) {
int64_t round = 0;
ParseUtils::parseInt64(req.getContents().substr(6), round);
req.sendReply(round, SSTR("reply-" << round));
}
TEST_F(CommunicatorTest, ChaosTest) {
// start the cluster
spinup(0); spinup(1); spinup(2);
RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));
qclient::Subscriber subscriber1(members(), reasonableSubscriptionOptions(true));
qclient::Subscriber subscriber2(members(), reasonableSubscriptionOptions(true));
qclient::Communicator communicator(&subscriber1, "comm-channel", nullptr, std::chrono::seconds(1),
std::chrono::minutes(5));
qclient::CommunicatorListener communicatorListener(&subscriber2, "comm-channel");
using namespace std::placeholders;
communicatorListener.attach(std::bind(&consumeListener, _1));
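// keep the cluster unstable for the entire duration of the test, while the
// requests below are in flight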
ClusterDestabilizer destabilizer(this);
std::vector<std::future<CommunicatorReply>> futReplies;
for(size_t round = 0; round < 1000; round++) {
futReplies.emplace_back(communicator.issue(SSTR("round-" << round)));
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
for(size_t i = 0; i < futReplies.size(); i++) {
ASSERT_EQ(futReplies[i].wait_for(std::chrono::seconds(30)), std::future_status::ready) << i;
CommunicatorReply reply = futReplies[i].get();
ASSERT_EQ(reply.status, (int) i);
ASSERT_EQ(reply.contents, SSTR("reply-" << i));
}
}
TEST_F(Replication, several_transitions) {
Connection::setPhantomBatchLimit(1);
// Start load generation..
quarkdb::AssistedThread t1(generateLoad, tunnel(0), "node0");
quarkdb::AssistedThread t2(generateLoad, tunnel(1), "node1");
quarkdb::AssistedThread t3(generateLoad, tunnel(2), "node2");
// start the cluster
spinup(0); spinup(1); spinup(2);
RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));
// Transition the leader several times, while the
// load generators are hammering all nodes at the
// same time. :)
for(size_t iteration = 0; iteration < 5; iteration++) {
std::this_thread::sleep_for(timeouts().getHigh() * 5);
int leaderID = getLeaderID();
int follower1 = (getLeaderID() + 1) % 3;
int follower2 = (getLeaderID() + 2) % 3;
RaftTerm oldTerm = state(leaderID)->getSnapshot()->term;
spindown(leaderID);
RETRY_ASSERT_TRUE(oldTerm < state(follower1)->getSnapshot()->term);
RETRY_ASSERT_TRUE(oldTerm < state(follower2)->getSnapshot()->term);
RETRY_ASSERT_TRUE(checkStateConsensus(follower1, follower2));
spinup(leaderID);
}
t1.stop(); t2.stop(); t3.stop();
t1.join(); t2.join(); t3.join();
RETRY_ASSERT_TRUE(checkFullConsensus(0, 1, 2));
ASSERT_TRUE(crossCheckJournals(0, 1, 2));
}
TEST_F(Replication, ArtificiallyObservedTerm) {
spinup(0); spinup(1); spinup(2);
RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));
int leaderID = getLeaderID();
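// the test expects raft-observe-term to reject malformed terms, to reply
// (integer) 1 when the supplied term is newer than the node's current term
// (which is then adopted), and (integer) 0 otherwise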
ASSERT_REPLY_DESCRIBE(tunnel(leaderID)->exec("raft-observe-term").get(),
"(error) ERR wrong number of arguments for 'raft-observe-term' command");
ASSERT_REPLY_DESCRIBE(tunnel(leaderID)->exec("raft-observe-term", "aaa").get(),
"(error) ERR cannot parse term: aaa");
ASSERT_REPLY_DESCRIBE(tunnel(leaderID)->exec("raft-observe-term", "-1").get(),
"(error) ERR cannot parse term: -1");
ASSERT_REPLY_DESCRIBE(tunnel(leaderID)->exec("raft-observe-term", "1.1").get(),
"(error) ERR cannot parse term: 1.1");
ASSERT_REPLY_DESCRIBE(tunnel(leaderID)->exec("raft-observe-term", "2").get(),
"(integer) 1");
RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));
leaderID = getLeaderID();
ASSERT_REPLY_DESCRIBE(tunnel(leaderID)->exec("raft-observe-term", "1").get(),
"(integer) 0");
ASSERT_REPLY_DESCRIBE(tunnel(leaderID)->exec("raft-observe-term", "2").get(),
"(integer) 0");
ASSERT_REPLY_DESCRIBE(tunnel(leaderID)->exec("raft-observe-term", "100").get(),
"(integer) 1");
RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));
}
TEST_F(Replication, FollowerLaggingBy1m) {
Connection::setPhantomBatchLimit(1);
spinup(0); spinup(1);
RETRY_ASSERT_TRUE(checkStateConsensus(0, 1));
int leaderID = getLeaderID();
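// issue one million writes while node #2 is still down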
std::vector<std::future<redisReplyPtr>> futs;
for(size_t i = 0; i < 1'000'000; i++) {
futs.emplace_back(tunnel(leaderID)->exec("set", "abc", SSTR(i)));
}
for(size_t i = 0; i < futs.size(); i++) {
ASSERT_REPLY_DESCRIBE(futs[i].get(), "OK");
}
qdb_info("All writes processed, waiting until follower catches up...");
spinup(2);
RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));
// bringing the lagging follower up-to-date could take a while
RETRY_ASSERT_TRUE_20MIN(checkFullConsensus(0, 1, 2));
ASSERT_TRUE(crossCheckJournals(0, 1, 2));
}
TEST_F(Membership, prevent_promotion_of_outdated_observer) {
// Only start nodes 0, 1, 2 of a 5-node cluster
spinup(0); spinup(1); spinup(2);
RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));
int leaderID = getLeaderID();
// Remove #3 and #4
ASSERT_REPLY(tunnel(leaderID)->exec("RAFT_REMOVE_MEMBER", myself(3).toString()), "OK");
RETRY_ASSERT_TRUE(journal(leaderID)->getEpoch() <= journal(leaderID)->getCommitIndex());
ASSERT_REPLY(tunnel(leaderID)->exec("RAFT_REMOVE_MEMBER", myself(4).toString()), "OK");
RETRY_ASSERT_TRUE(journal(leaderID)->getEpoch() <= journal(leaderID)->getCommitIndex());
// turn off one of the active nodes - now only 2 out of 3 are active
int victim = (leaderID + 1) % 3;
spindown(victim);
// push lots of updates
std::vector<std::future<redisReplyPtr>> futures;
const int64_t NENTRIES = 50000;
for(size_t i = 0; i < NENTRIES; i++) {
futures.emplace_back(tunnel(leaderID)->exec("set", SSTR("key-" << i), SSTR("value-" << i)));
}
for(size_t i = 0; i < futures.size(); i++) {
ASSERT_REPLY(futures[i], "OK");
}
// add back #4 as observer, then try to promote it right away and ensure the
// promotion gets blocked.
ASSERT_REPLY(tunnel(leaderID)->exec("RAFT_ADD_OBSERVER", myself(4).toString()), "OK");
RETRY_ASSERT_TRUE(journal(leaderID)->getEpoch() <= journal(leaderID)->getCommitIndex());
ASSERT_REPLY(tunnel(leaderID)->exec("RAFT_PROMOTE_OBSERVER", myself(4).toString()), "ERR membership update blocked, observer is not up-to-date");
// once #4 has been brought up to date, the promotion should go through
spinup(4);
RETRY_ASSERT_EQ(journal(4)->getCommitIndex(), journal(leaderID)->getCommitIndex());
ASSERT_REPLY(tunnel(leaderID)->exec("RAFT_PROMOTE_OBSERVER", myself(4).toString()), "OK");
}
TEST_F(Replication, no_committing_entries_from_previous_terms) {
// Try to emulate Figure 8 of the raft paper. Certain entries are
// present in a majority of replicas, but can still be removed as
// conflicting. Ensure the leader never marks them as committed!
state(0)->observed(2, {});
state(1)->observed(2, {});
state(2)->observed(2, {});
for(size_t i = 1; i < 50000; i++) {
RaftEntry entry;
entry.term = 2;
entry.request = make_req("set", SSTR("entry-" << i), SSTR("contents-" << i));
ASSERT_TRUE(journal(1)->append(i, entry));
}
state(0)->observed(100, {});
state(1)->observed(100, {});
state(2)->observed(100, {});
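// every node has now observed term 100, so the entries appended above at
// term 2 come from a previous term relative to any new leader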
spinup(0); spinup(1);
RETRY_ASSERT_TRUE(checkStateConsensus(0, 1));
ASSERT_TRUE(getLeaderID() == 1);
// We have node #1 as leader for term > 100 - wait until it replicates
// some of the entries, then shut it down.
RETRY_ASSERT_TRUE(journal(0)->getLogSize() >= 100);
spindown(1); spindown(0);
// Ensure no entry got committed!
ASSERT_TRUE(journal(0)->getCommitIndex() == 0);
ASSERT_TRUE(journal(1)->getCommitIndex() == 0);
// Make things even more interesting by starting up node #2, which has a
// more up-to-date log. :) Ensure it overwrites the rest.
state(2)->observed(2000, {});
RaftEntry entry;
entry.term = 2000;
entry.request = make_req("set", "one entry", "to rule them all");
ASSERT_TRUE(journal(2)->append(1, entry));
spinup(2); spinup(1);
RETRY_ASSERT_TRUE(checkStateConsensus(1, 2));
ASSERT_TRUE(state(1)->getSnapshot()->leader == myself(2));
LogIndex leadershipMarker = state(2)->getSnapshot()->leadershipMarker;
RETRY_ASSERT_EQ(journal(1)->getLogSize(), leadershipMarker+1);
RETRY_ASSERT_EQ(journal(1)->getCommitIndex(), leadershipMarker);
RETRY_ASSERT_EQ(journal(2)->getCommitIndex(), leadershipMarker);
// now start node #0, too, ensure its contents are overwritten as well
spinup(0);
RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));
ASSERT_TRUE(state(0)->getSnapshot()->leader == myself(2));
RETRY_ASSERT_EQ(journal(0)->getLogSize(), leadershipMarker+1);
RETRY_ASSERT_EQ(journal(0)->getCommitIndex(), leadershipMarker);
}
TEST_F(Replication, TrimmingBlock) {
spinup(0); spinup(1); spinup(2);
RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));
int leaderID = getLeaderID();
// Set journal trim config to ridiculously low values.
TrimmingConfig trimConfig { 2, 1 };
EncodedConfigChange configChange = raftconfig(leaderID)->setTrimmingConfig(trimConfig, true);
ASSERT_TRUE(configChange.error.empty());
ASSERT_REPLY(tunnel(leaderID)->execute(configChange.request), "OK");
// Ensure it took effect...
for(size_t i = 0; i < 100; i++) {
ASSERT_REPLY(tunnel(leaderID)->exec("set", SSTR("entry-" << i), SSTR("val-" << i)), "OK");
}
// Ensure everyone apart from the leader trimmed their journals.
// (The replicator might have blocked such aggressive trimming)
LogIndex logSize = journal(leaderID)->getLogSize();
RETRY_ASSERT_EQ(journal((leaderID+1) % 3)->getLogStart(), (logSize - 3));
RETRY_ASSERT_EQ(journal((leaderID+2) % 3)->getLogStart(), (logSize - 3));
// Put a trimming block in place for one of the followers.
RaftTrimmingBlock trimmingBlock(*trimmer((leaderID+1)%3), 0);
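// while the block is in place at index 0, the test expects no further
// trimming for this follower; moving it forward with enforce(N) should let
// the log start advance accordingly, and lift() removes the restriction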
for(size_t i = 0; i < 100; i++) {
ASSERT_REPLY(tunnel(leaderID)->exec("set", SSTR("entry2-" << i), SSTR("val2-" << i)), "OK");
}
std::this_thread::sleep_for(std::chrono::seconds(5));
ASSERT_TRUE(journal((leaderID+1) % 3)->getLogStart() == (logSize - 3));
LogIndex newLogSize = journal(leaderID)->getLogSize();
RETRY_ASSERT_EQ(journal((leaderID+2) % 3)->getLogStart(), (newLogSize - 3));
// Lift block, ensure journal is trimmed
trimmingBlock.enforce(150);
RETRY_ASSERT_EQ(journal((leaderID+1) % 3)->getLogStart(), 149);
trimmingBlock.enforce(151);
RETRY_ASSERT_EQ(journal((leaderID+1) % 3)->getLogStart(), 150);
trimmingBlock.enforce(155);
RETRY_ASSERT_EQ(journal((leaderID+1) % 3)->getLogStart(), 154);
trimmingBlock.enforce(173);
RETRY_ASSERT_EQ(journal((leaderID+1) % 3)->getLogStart(), 172);
trimmingBlock.lift();
RETRY_ASSERT_EQ(journal((leaderID+1) % 3)->getLogStart(), (newLogSize - 3));
}
TEST_F(Replication, EnsureEntriesBeingReplicatedAreNotTrimmed) {
Connection::setPhantomBatchLimit(1);
spinup(0); spinup(1);
RETRY_ASSERT_TRUE(checkStateConsensus(0, 1));
int leaderID = getLeaderID();
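// fill the journal with 10k writes while node #2 is still down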
std::vector<std::future<redisReplyPtr>> replies;
for(size_t i = 0; i < 10000; i++) {
replies.emplace_back(tunnel(leaderID)->exec("set", SSTR("entry-" << i), SSTR("contents-" << i)));
}
for(size_t i = 0; i < 10000; i++) {
ASSERT_REPLY(replies[i], "OK");
}
// Spin up node #2
spinup(2);
RETRY_ASSERT_TRUE(journal(2)->getLogSize() >= 10);
// Set an insane trimming limit.
TrimmingConfig trimConfig { 2, 1 };
EncodedConfigChange configChange = raftconfig(leaderID)->setTrimmingConfig(trimConfig, true);
ASSERT_TRUE(configChange.error.empty());
ASSERT_REPLY(tunnel(leaderID)->execute(configChange.request), "OK");
// Ensure the replicator doesn't blow up
while(journal(2)->getLogSize() != journal(leaderID)->getLogSize()) {
ASSERT_FALSE(trimmer(leaderID)->canTrimUntil(journal(2)->getLogSize()));
}
// Ensure contents are OK.
for(size_t i = 0; i < 10000; i++) {
std::string value;
ASSERT_TRUE(stateMachine(leaderID)->get(SSTR("entry-" << i), value).ok());
ASSERT_EQ(value, SSTR("contents-" << i));
}
// Ensure journals are eventually trimmed.
RETRY_ASSERT_EQ(journal(0)->getLogStart(), 10000);
RETRY_ASSERT_EQ(journal(1)->getLogStart(), 10000);
RETRY_ASSERT_EQ(journal(2)->getLogStart(), 10000);
ASSERT_TRUE(trimmer(0)->canTrimUntil(9999));
ASSERT_TRUE(trimmer(1)->canTrimUntil(9999));
ASSERT_TRUE(trimmer(2)->canTrimUntil(9999));
ASSERT_FALSE(trimmer(leaderID)->canTrimUntil(10001));
}
TEST_F(SingleNodeInitially, Multi) {
spinup(0);
RETRY_ASSERT_EQ(state(0)->getSnapshot()->status, RaftStatus::LEADER);
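// submit four requests as a single batch through execute(); the reply should
// come back as one array, in order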
std::deque<qclient::EncodedRequest> multi1;
multi1.emplace_back(qclient::EncodedRequest::make("set", "my-awesome-counter", "1"));
multi1.emplace_back(qclient::EncodedRequest::make("set", "other-counter", "12345"));
multi1.emplace_back(qclient::EncodedRequest::make("get", "other-counter"));
multi1.emplace_back(qclient::EncodedRequest::make("get", "my-awesome-counter"));
ASSERT_EQ(
qclient::describeRedisReply(tunnel(0)->execute(std::move(multi1)).get()),
"1) OK\n"
"2) OK\n"
"3) \"12345\"\n"
"4) \"1\"\n"
);
}
TEST_F(SingleNodeInitially, SingleNodeRaftMode) {
spinup(0);
RETRY_ASSERT_EQ(state(0)->getSnapshot()->status, RaftStatus::LEADER);
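// 100 writes followed by 100 reads, all against the single node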
std::vector<std::future<redisReplyPtr>> replies;
for(size_t i = 0; i < 100; i++) {
replies.emplace_back(tunnel(0)->exec("set", SSTR("entry-" << i), SSTR("contents-" << i)));
}
for(size_t i = 0; i < 100; i++) {
replies.emplace_back(tunnel(0)->exec("get", SSTR("entry-" << i)));
}
for(size_t i = 0; i < 100; i++) {
ASSERT_REPLY(replies[i], "OK");
}
for(size_t i = 100; i < 200; i++) {
ASSERT_REPLY(replies[i], SSTR("contents-" << i-100));
}
}
TEST_F(SingleNodeInitially, BuildClusterFromSingle) {
spinup(0);
RETRY_ASSERT_EQ(state(0)->getSnapshot()->status, RaftStatus::LEADER);
std::vector<std::future<redisReplyPtr>> replies;
for(size_t i = 0; i < 100; i++) {
replies.emplace_back(tunnel(0)->exec("set", SSTR("entry-" << i), SSTR("contents-" << i)));
}
for(size_t i = 0; i < 100; i++) {
ASSERT_REPLY(replies[i], "OK");
}
// spinup node #1, add to cluster
spinup(1);
ASSERT_REPLY(tunnel(0)->exec("RAFT_ADD_OBSERVER", myself(1).toString()), "OK");
RETRY_ASSERT_TRUE(journal(0)->getEpoch() <= journal(0)->getCommitIndex());
// promote
RETRY_ASSERT_TRUE(
qclient::describeRedisReply(tunnel(0)->exec("RAFT_PROMOTE_OBSERVER", myself(1).toString()).get())
== "OK"
);
// More writes
replies.clear();
for(size_t i = 100; i < 200; i++) {
replies.emplace_back(tunnel(0)->exec("set", SSTR("entry-" << i), SSTR("contents-" << i)));
}
for(size_t i = 0; i < 100; i++) {
ASSERT_REPLY(replies[i], "OK");
}
// Switch over leadership to #1
while(true) {
RETRY_ASSERT_TRUE(checkStateConsensus(0, 1));
int leaderID = getLeaderID();
if(leaderID == 1) break;
tunnel(1)->exec("raft-attempt-coup").get();
std::this_thread::sleep_for(timeouts().getLow());
}
// commit a write
ASSERT_REPLY(tunnel(1)->exec("set", "aaa", "123"), "OK");
// get a previous read
ASSERT_REPLY(tunnel(1)->exec("get", "entry-100"), "contents-100");
// remove #0
ASSERT_REPLY(tunnel(1)->exec("raft-remove-member", myself(0).toString()), "OK");
// commit one more write
ASSERT_REPLY(tunnel(1)->exec("set", "bbb", "123"), "OK");
// read
ASSERT_REPLY(tunnel(1)->exec("get", "entry-5"), "contents-5");
}