// ---------------------------------------------------------------------- // File: test-utils.cc // Author: Georgios Bitzes - CERN // ---------------------------------------------------------------------- /************************************************************************ * quarkdb - a redis-like highly available key-value store * * Copyright (C) 2016 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #include "ShardDirectory.hh" #include "config/test-config.hh" #include "test-utils.hh" #include "Utils.hh" #include #include #include "Shard.hh" #include "raft/RaftGroup.hh" #include "raft/RaftJournal.hh" #include "QuarkDBNode.hh" #include "qclient/GlobalInterceptor.hh" #include "raft/RaftDispatcher.hh" #include namespace quarkdb { std::vector testreqs = { {"set", "abc", "123"}, {"set", "123", "abc"}, {"hset", "myhash", "value", "234" }, {"sadd", "myset", "a"}, {"sadd", "myset", "b"}, {"sadd", "myset", "c"}, {"sadd", "myset", "d"}, {"hset", "myhash", "key1", "val1"}, {"hset", "myhash", "key2", "val2"}, {"hset", "myhash", "key3", "val3"}, {"hset", "myhash", "key4", "val4"}, {"hset", "myhash", "key5", "val5"}, {"hset", "myhash", "key6", "val6"}, {"hset", "myhash", "key7", "val7"}, {"hset", "myhash", "key8", "val8"}, {"hset", "myhash", "key9", "val9"} }; void GlobalEnv::clearConnectionCache() { qdb_info("Global environment: clearing connection cache."); for(auto& kv : shardDirCache) { delete kv.second; } shardDirCache.clear(); if(!testdir.empty()) { if (system(SSTR("rm -r " << testdir).c_str())) { qdb_error("Failed to remove directory: " << testdir); } if (system(SSTR("mkdir " << testdir).c_str())) { qdb_error("Failed to create directory: " << testdir); } } } void GlobalEnv::SetUp() { clearConnectionCache(); } void GlobalEnv::TearDown() { clearConnectionCache(); } ShardDirectory* GlobalEnv::getShardDirectory(const std::string &path, RaftClusterID clusterID, const std::vector &nodes) { ShardDirectory *ret = shardDirCache[path]; if(ret == nullptr) { Status st; ret = ShardDirectory::create(path, clusterID, "default", nodes, 0, FsyncPolicy::kAsync, {}, st); st.assertOk(); shardDirCache[path] = ret; } ret->obliterate(clusterID, nodes, 0, FsyncPolicy::kAsync, {} ); return ret; } RaftServer GlobalEnv::server(int id) { RaftServer srv; srv.hostname = SSTR("server" << id); srv.port = 23456 + id; qclient::GlobalInterceptor::addIntercept( qclient::Endpoint(srv.hostname, srv.port), qclient::Endpoint("127.0.0.1", srv.port) ); return srv; } ::testing::Environment* const commonStatePtr = ::testing::AddGlobalTestEnvironment(new GlobalEnv); GlobalEnv &commonState(*(GlobalEnv*)commonStatePtr); TestCluster::TestCluster(RaftTimeouts timeouts, RaftClusterID clust, const std::vector &nd, int initialActiveNodes) : clusterid(clust), clusterTimeouts(timeouts), allNodes(nd) { if(initialActiveNodes < 0) { initialNodes = allNodes; } for(int i = 0; i < initialActiveNodes; i++) { initialNodes.emplace_back(allNodes[i]); } Connection::setPhantomBatchLimit(100); } TestCluster::TestCluster(RaftClusterID clust, const std::vector &nd, int initialActiveNodes) : TestCluster(testconfig.raftTimeouts, clust, nd, initialActiveNodes) {} TestCluster::~TestCluster() { for(auto &kv : testnodes) { delete kv.second; } if(!testconfig.databaseReuse) { commonState.clearConnectionCache(); } } RaftClusterID TestCluster::clusterID() { return clusterid; } size_t TestCluster::getClusterSize() const { return testnodes.size(); } ShardDirectory* TestCluster::shardDirectory(int id) { return node(id)->shardDirectory(); } StateMachine* TestCluster::stateMachine(int id) { return node(id)->group()->stateMachine(); } RaftJournal* TestCluster::journal(int id) { return node(id)->group()->journal(); } RaftDispatcher* TestCluster::dispatcher(int id) { return node(id)->group()->dispatcher(); } RaftState* TestCluster::state(int id) { return node(id)->group()->state(); } AsioPoller* TestCluster::poller(int id) { return node(id)->poller(); } RaftDirector* TestCluster::director(int id) { return node(id)->group()->director(); } RaftServer TestCluster::myself(int id) { return node(id)->group()->myself(); } std::vector TestCluster::nodes(int id) { return node(id)->nodes(); } qclient::Members TestCluster::members(int id) { return node(id)->members(); } qclient::QClient* TestCluster::tunnel(int id) { return node(id)->tunnel(); } std::unique_ptr TestCluster::makeQClientHandshake(int id) { return node(id)->makeQClientHandshake(); } qclient::Options TestCluster::makeNoRedirectOptions(int id) { return node(id)->makeNoRedirectOptions(); } RaftHeartbeatTracker* TestCluster::heartbeatTracker(int id) { return node(id)->group()->heartbeatTracker(); } const RaftContactDetails* TestCluster::contactDetails(int id) { return node(id)->group()->contactDetails(); } RaftLease* TestCluster::lease(int id) { return node(id)->group()->lease(); } RaftCommitTracker* TestCluster::commitTracker(int id) { return node(id)->group()->commitTracker(); } RaftConfig* TestCluster::raftconfig(int id) { return node(id)->group()->config(); } RaftTrimmer* TestCluster::trimmer(int id) { return node(id)->group()->trimmer(); } Publisher* TestCluster::publisher(int id) { return node(id)->group()->publisher(); } void TestCluster::killTunnel(int id) { return node(id)->killTunnel(); } TestNode* TestCluster::node(int id, const RaftServer &srv) { auto it = testnodes.find(id); if(it != testnodes.end()) { return it->second; } RaftServer newserver = srv; if(newserver.empty()) newserver = allNodes[id]; TestNode *ret = new TestNode(newserver, clusterID(), timeouts(), initialNodes); testnodes[id] = ret; return ret; } void TestCluster::spinup(int id) { qdb_info("Spinning up node #" << id) node(id)->spinup(); } void TestCluster::spindown(int id) { qdb_info("Spinning down node #" << id) node(id)->spindown(); } void TestCluster::prepare(int id) { qdb_info("Preparing node #" << id); journal(id); stateMachine(id); } int TestCluster::getServerID(const RaftServer &srv) { for(size_t i = 0; i < allNodes.size(); i++) { if(myself(i) == srv) return i; } return -1; } std::vector TestCluster::retrieveLeaders() { std::vector ret; for(size_t i = 0; i < initialNodes.size(); i++) { if(testnodes.count(i) > 0) { ret.push_back(state(i)->getSnapshot()->leader); } } return ret; } int TestCluster::getLeaderID() { return getServerID(state(0)->getSnapshot()->leader); } qclient::SubscriptionOptions TestCluster::reasonableSubscriptionOptions(bool pushtypes) { qclient::SubscriptionOptions opts; opts.usePushTypes = pushtypes; opts.handshake = makeQClientHandshake(); return opts; } RaftTimeouts TestCluster::timeouts() { return clusterTimeouts; } RaftVoteResponse TestCluster::issueManualVote(const RaftVoteRequest &votereq, int id) { RaftStateSnapshotPtr snapshot = state(id)->getSnapshot(); // pre-vote RaftVoteResponse preVoteOutcome = dispatcher(id)->requestVote(votereq, true); // ensure nothing changed EXPECT_EQ(snapshot, state(id)->getSnapshot()); EXPECT_EQ(snapshot->term, state(id)->getSnapshot()->term); // real-vote RaftVoteResponse realVoteOutcome = dispatcher(id)->requestVote(votereq, false); EXPECT_EQ(preVoteOutcome.vote, realVoteOutcome.vote); return realVoteOutcome; } TestNode::TestNode(RaftServer me, RaftClusterID clust, RaftTimeouts timeouts, const std::vector &nd) : myselfSrv(me), clusterID(clust), initialNodes(nd) { std::string shardPath = SSTR(commonState.testdir << "/" << myself().hostname << "-" << myself().port); Configuration config; bool status = Configuration::fromString(SSTR( "redis.mode raft\n" << "redis.database " << shardPath << "\n" "redis.myself " << myselfSrv.toString() << "\n" "redis.password 1234567890-qwerty-0987654321-ytrewq\n" ), config); if(!status) { qdb_throw("error reading configuration"); } shardPath += "/"; // We inject the shard directory in QDB node. sharddirptr = commonState.getShardDirectory(shardPath, clusterID, initialNodes); qdbnodeptr = new QuarkDBNode(config, timeouts, sharddirptr); } ShardDirectory* TestNode::shardDirectory() { return sharddirptr; } Shard* TestNode::shard() { return quarkdbNode()->getShard(); } RaftGroup* TestNode::group() { return shard()->getRaftGroup(); } QuarkDBNode* TestNode::quarkdbNode() { return qdbnodeptr; } TestNode::~TestNode() { if(pollerptr) delete pollerptr; if(tunnelptr) delete tunnelptr; if(qdbnodeptr) delete qdbnodeptr; } RaftServer TestNode::myself() { return myselfSrv; } std::vector TestNode::nodes() { return group()->journal()->getNodes(); } qclient::Members TestNode::members() { qclient::Members memb; std::vector clusterNodes = this->nodes(); for(auto it = clusterNodes.begin(); it != clusterNodes.end(); it++) { memb.push_back(it->hostname, it->port); } return memb; } AsioPoller* TestNode::poller() { if(pollerptr == nullptr) { pollerptr = new AsioPoller(myself().port, 5, quarkdbNode()); } return pollerptr; } qclient::Options TestNode::makeNoRedirectOptions() { qclient::Options options; options.transparentRedirects = false; options.handshake = makeQClientHandshake(); return options; } std::unique_ptr TestNode::makeQClientHandshake() { if(group()->contactDetails()->getPassword().empty()) { return {}; } return std::unique_ptr(new qclient::HmacAuthHandshake(group()->contactDetails()->getPassword())); } qclient::QClient* TestNode::tunnel() { if(tunnelptr == nullptr) { tunnelptr = new qclient::QClient(myself().hostname, myself().port, makeNoRedirectOptions()); } return tunnelptr; } void TestNode::killTunnel() { if(tunnelptr != nullptr) { delete tunnelptr; tunnelptr = nullptr; } } void TestNode::spinup() { shard()->spinup(); poller(); } void TestNode::spindown() { if(pollerptr) { delete pollerptr; pollerptr = nullptr; } shard()->spindown(); } bool IptablesHelper::singleDropPackets(int port) { return system(SSTR("iptables -I OUTPUT -p tcp --dest 127.0.0.1 --dport " << port << " -j DROP").c_str()) == 0; } bool IptablesHelper::singleAcceptPackets(int port) { return system(SSTR("iptables -I OUTPUT -p tcp --dest 127.0.0.1 --dport " << port << " -j ACCEPT").c_str()) == 0; } ClusterDestabilizer::ClusterDestabilizer(TestCluster *testCluster) : mTestCluster(testCluster) { mThread.reset(&ClusterDestabilizer::main, this); } ClusterDestabilizer::~ClusterDestabilizer() {} void ClusterDestabilizer::main(ThreadAssistant &assistant) { assistant.wait_for(mTestCluster->timeouts().getLow()); while(!assistant.terminationRequested()) { int leaderID = mTestCluster->getLeaderID(); if(leaderID >= 0) { mTestCluster->spindown(leaderID); assistant.wait_for(mTestCluster->timeouts().getLow()); mTestCluster->spinup(leaderID); } assistant.wait_for(mTestCluster->timeouts().getHigh() * 2); } } }