// ---------------------------------------------------------------------- // File: qclient.cc // Author: Georgios Bitzes - CERN // ---------------------------------------------------------------------- /************************************************************************ * quarkdb - a redis-like highly available key-value store * * Copyright (C) 2016 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #include "raft/RaftDispatcher.hh" #include "raft/RaftReplicator.hh" #include "raft/RaftTimeouts.hh" #include "raft/RaftCommitTracker.hh" #include "raft/RaftReplicator.hh" #include "Configuration.hh" #include "QuarkDBNode.hh" #include "../test-utils.hh" #include "RedisParser.hh" #include #include #include "utils/AssistedThread.hh" #include "../test-reply-macros.hh" using namespace quarkdb; class QClientTests : public TestCluster3NodesFixture {}; TEST_F(QClientTests, hide_transient_failures) { qclient::Members members; members.push_back(myself(0).hostname, myself(0).port); members.push_back(myself(1).hostname, myself(1).port); members.push_back(myself(2).hostname, myself(2).port); qclient::Options opts; opts.transparentRedirects = true; opts.retryStrategy = qclient::RetryStrategy::WithTimeout(std::chrono::seconds(30)); opts.handshake = makeQClientHandshake(); QClient qcl(members, std::move(opts)); // Issue request _before_ spinning up the cluster! Verify it succeeds. std::future reply = qcl.exec("HSET", "aaaaa", "bbbbb", "cccc"); spinup(0); spinup(1); spinup(2); RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2)); ASSERT_REPLY(reply, 1); ASSERT_REPLY(qcl.exec("HGET", "aaaaa", "bbbbb"), "cccc"); int leaderID = getLeaderID(); spindown(leaderID); ASSERT_REPLY(qcl.exec("HSET", "aaaaa", "bbbbb", "ddd"), 0); ASSERT_REPLY(qcl.exec("HGET", "aaaaa", "bbbbb"), "ddd"); spinup(leaderID); std::vector> replies; for(size_t i = 0; i < 10000; i++) { replies.emplace_back(qcl.exec("SET", SSTR("key-" << i), SSTR("val-" << i))); if(i % 1024 == 0) { // huehueue int leaderID = getLeaderID(); if(leaderID >= 0) { spindown(leaderID); spinup(leaderID); } std::this_thread::sleep_for(std::chrono::seconds(2)); } } for(size_t i = 0; i < 10000; i++) { ASSERT_REPLY(replies[i], "OK"); } for(size_t i = 0; i < 10000; i++) { ASSERT_REPLY(qcl.exec("GET", SSTR("key-" << i)), SSTR("val-" << i)); } } TEST_F(QClientTests, nullptr_only_after_timeout) { spinup(0); spinup(1); spinup(2); RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2)); qclient::Members members; members.push_back(myself(0).hostname, myself(0).port); members.push_back(myself(1).hostname, myself(1).port); members.push_back(myself(2).hostname, myself(2).port); qclient::Options opts; opts.transparentRedirects = true; opts.handshake = makeQClientHandshake(); opts.retryStrategy = qclient::RetryStrategy::WithTimeout(std::chrono::seconds(3)); QClient qcl(members, std::move(opts)); ASSERT_REPLY(qcl.exec("HSET", "aaaaa", "bbbbb", "cccc"), 1); ASSERT_REPLY(qcl.exec("HGET", "aaaaa", "bbbbb"), "cccc"); // kill cluster spindown(0); spindown(1); spindown(2); // ensure qclient responses don't hang std::chrono::steady_clock::time_point t0 = std::chrono::steady_clock::now(); std::future reply = qcl.exec("HGET", "aaaaa", "bbbbb"); ASSERT_EQ(reply.get(), nullptr); std::chrono::steady_clock::time_point t1 = std::chrono::steady_clock::now(); reply = qcl.exec("HGET", "aaaaa", "bbbbb"); ASSERT_EQ(reply.get(), nullptr); std::chrono::steady_clock::time_point t2 = std::chrono::steady_clock::now(); reply = qcl.exec("HGET", "aaaaa", "bbbbb"); ASSERT_EQ(reply.get(), nullptr); std::chrono::steady_clock::time_point t3 = std::chrono::steady_clock::now(); std::cerr << "t1 - t0: " << std::chrono::duration_cast(t1 - t0).count() << " ms" << std::endl; std::cerr << "t2 - t1: " << std::chrono::duration_cast(t2 - t1).count() << " ms" << std::endl; std::cerr << "t3 - t2: " << std::chrono::duration_cast(t3 - t2).count() << " ms" << std::endl; // Ensure qclient can recover after the timeout, when the cluster is back online. spinup(0); spinup(1); spinup(2); RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2)); // After a long unavailability, qclient opts to fail fast - thus we have to // try a few times manually. bool success = false; for(size_t i = 0; i < 10; i++) { redisReplyPtr reply = qcl.exec("HGET", "aaaaa", "bbbbb").get(); // Verify that after qclient comes back online, _all_ consequent responses // are valid, and not just if(reply) success = true; ASSERT_TRUE(reply == nullptr || success); if(!reply) { ASSERT_FALSE(success); continue; } ASSERT_REPLY(reply, "cccc"); } ASSERT_TRUE(success); } static void pingerThread(QClient *qcl, size_t id, bool expectValid) { std::vector> futures; for(size_t i = 0; i < 10000; i++) { futures.emplace_back(qcl->exec("PING", SSTR("thread-" << id << "-req-" << i))); } for(size_t i = 0; i < 10000; i++) { redisReplyPtr reply = futures[i].get(); if(expectValid) { ASSERT_REPLY(reply, SSTR("thread-" << id << "-req-" << i)); } } } TEST_F(QClientTests, MultipleWriterThreads) { spinup(0); spinup(1); spinup(2); RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2)); int leaderID = getLeaderID(); // Launch many threads doing pings, using the same QClient object. qclient::Options opts; opts.backpressureStrategy = BackpressureStrategy::RateLimitPendingRequests(2048); opts.handshake = makeQClientHandshake(); QClient qcl(myself(leaderID).hostname, myself(leaderID).port, std::move(opts)); std::vector threads; for(size_t i = 0; i < 20; i++) { threads.emplace_back(pingerThread, &qcl, i, true); } for(size_t i = 0; i < 20; i++) { threads[i].join(); } threads.clear(); // Let's do the above all over again, but shut down the cluster in the middle // of sending pings. Don't expect correct replies this time, of course. for(size_t i = 0; i < 20; i++) { threads.emplace_back(pingerThread, &qcl, i, false); } spindown(0); spindown(1); spindown(2); for(size_t i = 0; i < 20; i++) { threads[i].join(); } } // TEST_F(QClientTests, Partitions) { // spinup(0); spinup(1); spinup(2); // RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2)); // int leaderID = getLeaderID(); // ASSERT_REPLY(tunnel(leaderID)->exec("PING", "pickles"), "pickles"); // qclient::FaultInjector &faultInjector = tunnel(leaderID)->getFaultInjector(); // for(size_t i = 0; i < 10; i++) { // faultInjector.enforceTotalBlackout(); // redisReplyPtr reply; // int attempts = 0; // while(attempts++ < 10 && reply) { // reply = tunnel(leaderID)->exec("PING", "pickles-3").get(); // } // ASSERT_EQ(reply, nullptr); // faultInjector.liftTotalBlackout(); // redisReplyPtr reply2; // attempts = 0; // while(attempts++ < 10 && !reply2) { // reply2 = tunnel(leaderID)->exec("PING", "pickles-3").get(); // } // ASSERT_TRUE(reply2 != nullptr); // ASSERT_REPLY(reply2, "pickles-3"); // } // }