// ---------------------------------------------------------------------- // File: resilvering.cc // Author: Georgios Bitzes - CERN // ---------------------------------------------------------------------- /************************************************************************ * quarkdb - a redis-like highly available key-value store * * Copyright (C) 2016 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #include "../test-reply-macros.hh" #include "../test-utils.hh" #include "raft/RaftConfig.hh" #include "ShardDirectory.hh" #include "StateMachine.hh" #include "Connection.hh" #include "raft/RaftResilverer.hh" #include "raft/RaftJournal.hh" using namespace quarkdb; class Trimming : public TestCluster3NodesFixture {}; class Resilvering : public TestCluster3NodesFixture {}; #define ASSERT_NOTFOUND(msg) ASSERT_TRUE(msg.IsNotFound()) TEST_F(Trimming, configurable_trimming_limit) { Connection::setPhantomBatchLimit(1); spinup(0); spinup(1); spinup(2); RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2)); int leaderID = getLeaderID(); std::vector> futures; // push lots of updates const int64_t NENTRIES = 500; for(size_t i = 0; i < NENTRIES; i++) { futures.emplace_back(tunnel(leaderID)->exec("set", SSTR("key-" << i), SSTR("value-" << i))); } // Set journal trim config to ridiculously low values. // This is to ensure the trimmer never tries to remove non-committed or non-applied entries. // With a sane trim limit in the millions, this would never happen anyway, but let's be paranoid. TrimmingConfig trimConfig { 2, 1 }; EncodedConfigChange configChange = raftconfig(leaderID)->setTrimmingConfig(trimConfig, true); ASSERT_TRUE(configChange.error.empty()); ASSERT_REPLY(tunnel(leaderID)->execute(configChange.request), "OK"); // some more updates... for(size_t i = NENTRIES; i < NENTRIES*2; i++) { futures.emplace_back(tunnel(leaderID)->exec("set", SSTR("key-" << i), SSTR("value-" << i))); } // Get responses for(size_t i = 0; i < futures.size(); i++) { ASSERT_REPLY(futures[i], "OK"); } RETRY_ASSERT_EQ(journal(0)->getLogStart(), 1000); RETRY_ASSERT_EQ(journal(1)->getLogStart(), 1000); RETRY_ASSERT_EQ(journal(2)->getLogStart(), 1000); RETRY_ASSERT_EQ(stateMachine(0)->getLastApplied(), 1002); RETRY_ASSERT_EQ(stateMachine(1)->getLastApplied(), 1002); RETRY_ASSERT_EQ(stateMachine(2)->getLastApplied(), 1002); } TEST_F(Resilvering, manual) { // Don't spin up #2 yet.. We'll try to resilver that node manually later. spinup(0); spinup(1); RETRY_ASSERT_TRUE(checkStateConsensus(0, 1)); int leaderID = getLeaderID(); // push lots of updates const int64_t NENTRIES = 5000; for(size_t i = 0; i < NENTRIES; i++) { ASSERT_REPLY(tunnel(leaderID)->exec("set", SSTR("key-" << i), SSTR("value-" << i)), "OK"); } LogIndex commitIndex = journal(leaderID)->getCommitIndex(); RETRY_ASSERT_EQ(journal(0)->getCommitIndex(), commitIndex); RETRY_ASSERT_EQ(journal(1)->getCommitIndex(), commitIndex); ASSERT_EQ(journal(2)->getCommitIndex(), 0); // Stop the stable cluster and start node #2 spindown(0); spindown(1); spinup(2); // Ensure node #2 is empty. std::string tmp; for(size_t i = 0; i < NENTRIES; i++) { ASSERT_NOTFOUND(stateMachine(2)->get(SSTR("key-" << i), tmp)); } // Let's drive the resilvering logic of #2 manually. RaftResilverer resilverer(*shardDirectory(0), myself(2), *contactDetails(), *trimmer(0)); RETRY_ASSERT_EQ(resilverer.getStatus().state, ResilveringState::SUCCEEDED); ASSERT_GE(resilverer.getProgress(), 4u); qdb_info("Files copied: " << resilverer.getProgress()); ASSERT_EQ(resilverer.getProgress(), resilverer.getTotalToSend()); // Ensure the data is there after resilvering. for(size_t i = 0; i < NENTRIES; i++) { std::string value; ASSERT_TRUE(stateMachine(2)->get(SSTR("key-" << i), value).ok()); ASSERT_EQ(value, SSTR("value-" << i)); } ASSERT_TRUE(journal(2)->getCommitIndex() == commitIndex); } TEST_F(Resilvering, automatic) { // Don't spin up #2 yet.. Will be resilvered later on. spinup(0); spinup(1); prepare(2); RETRY_ASSERT_TRUE(checkStateConsensus(0, 1)); int leaderID = getLeaderID(); // Lower the journal trim limit, so as to trigger a resilvering. TrimmingConfig trimConfig { 1000, 1000 }; EncodedConfigChange configChange = raftconfig(leaderID)->setTrimmingConfig(trimConfig, true); ASSERT_TRUE(configChange.error.empty()); ASSERT_REPLY(tunnel(leaderID)->execute(configChange.request), "OK"); // push lots of updates const int64_t NENTRIES = 5000; for(size_t i = 0; i < NENTRIES; i++) { ASSERT_REPLY(tunnel(leaderID)->exec("set", SSTR("key-" << i), SSTR("value-" << i)), "OK"); } RETRY_ASSERT_EQ(journal(0)->getCommitIndex(), NENTRIES+2); RETRY_ASSERT_EQ(journal(1)->getCommitIndex(), NENTRIES+2); ASSERT_EQ(journal(2)->getCommitIndex(), 0); RETRY_ASSERT_EQ(journal(0)->getLogStart(), NENTRIES - 1000); RETRY_ASSERT_EQ(journal(1)->getLogStart(), NENTRIES - 1000); ASSERT_EQ(journal(2)->getLogStart(), 0); const ResilveringHistory &resilveringHistory = shardDirectory(2)->getResilveringHistory(); ASSERT_EQ(resilveringHistory.size(), 1u); ASSERT_EQ(resilveringHistory.at(0).getID(), "GENESIS"); // Start up node #2, verify it gets resilvered spinup(2); // Attention here.. when resilvering is in progress, we can't access the journal // or state machine. Wait until resilvering is done. RETRY_ASSERT_EQ(resilveringHistory.size(), 2u); RETRY_ASSERT_EQ(journal(2)->getCommitIndex(), NENTRIES+2); RETRY_ASSERT_EQ(journal(2)->getLogStart(), NENTRIES - 1000); // Ensure the data is there after resilvering. for(size_t i = 0; i < NENTRIES; i++) { std::string value; ASSERT_TRUE(stateMachine(2)->get(SSTR("key-" << i), value).ok()); ASSERT_EQ(value, SSTR("value-" << i)); } }