// ---------------------------------------------------------------------- // File: e2e.cc // Author: Georgios Bitzes - CERN // ---------------------------------------------------------------------- /************************************************************************ * quarkdb - a redis-like highly available key-value store * * Copyright (C) 2016 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #include "raft/RaftDispatcher.hh" #include "raft/RaftReplicator.hh" #include "raft/RaftTimeouts.hh" #include "raft/RaftCommitTracker.hh" #include "raft/RaftConfig.hh" #include "raft/RaftContactDetails.hh" #include "ShardDirectory.hh" #include "Version.hh" #include "Configuration.hh" #include "QuarkDBNode.hh" #include "test-utils.hh" #include "RedisParser.hh" #include #include "test-reply-macros.hh" #include "qclient/structures/QDeque.hh" #include "qclient/structures/QScanner.hh" #include "qclient/structures/QSet.hh" #include "qclient/structures/QLocalityHash.hh" #include "qclient/structures/QHash.hh" #include "qclient/pubsub/MessageQueue.hh" #include "qclient/pubsub/BaseSubscriber.hh" #include "qclient/pubsub/Subscriber.hh" #include "qclient/network/AsyncConnector.hh" #include "qclient/network/HostResolver.hh" #include "qclient/shared/SharedDeque.hh" #include "qclient/shared/PersistentSharedHash.hh" #include 
"qclient/shared/SharedHash.hh" #include "qclient/shared/UpdateBatch.hh" #include "qclient/shared/SharedHashSubscription.hh" #include "qclient/shared/SharedManager.hh" #include "qclient/shared/TransientSharedHash.hh" #include "qclient/shared/Communicator.hh" #include "qclient/shared/CommunicatorListener.hh" using namespace quarkdb; #define ASSERT_OK(msg) ASSERT_TRUE(msg.ok()) class Raft_e2e : public TestCluster3NodesFixture {}; class Raft_e2e5 : public TestCluster5NodesFixture {}; TEST_F(Raft_e2e, coup) { spinup(0); spinup(1); spinup(2); RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2)); int leaderID = getLeaderID(); ASSERT_GE(leaderID, 0); ASSERT_LE(leaderID, 2); int instigator = (leaderID+1)%3; for(int i = 1; i < 10; i++) { RaftTerm term = state(instigator)->getSnapshot()->term; ASSERT_REPLY(tunnel(instigator)->exec("RAFT_ATTEMPT_COUP"), "vive la revolution"); RETRY_ASSERT_TRUE(state(instigator)->getSnapshot()->term > term); RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2)); if(instigator == getLeaderID()) { qdb_info("Successful coup in " << i << " attempts"); return; // pass test } } ASSERT_TRUE(false) << "Test has failed"; } TEST_F(Raft_e2e, simultaneous_clients) { spinup(0); spinup(1); spinup(2); RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2)); int leaderID = getServerID(state(0)->getSnapshot()->leader); ASSERT_GE(leaderID, 0); ASSERT_LE(leaderID, 2); LogIndex lastEntry = journal(leaderID)->getLogSize() - 1; std::vector> futures; // send off many requests, pipeline them futures.emplace_back(tunnel(leaderID)->exec("get", "asdf")); futures.emplace_back(tunnel(leaderID)->exec("ping")); futures.emplace_back(tunnel(leaderID)->exec("client", "setname", "aaa")); futures.emplace_back(tunnel(leaderID)->exec("set", "asdf", "1234")); futures.emplace_back(tunnel(leaderID)->exec("get", "asdf")); futures.emplace_back(tunnel(leaderID)->exec("raft-fetch", SSTR(lastEntry+1), "raw")); ASSERT_REPLY(futures[0], ""); ASSERT_REPLY(futures[1], "PONG"); ASSERT_REPLY(futures[2], "OK"); 
ASSERT_REPLY(futures[3], "OK"); ASSERT_REPLY(futures[4], "1234"); RaftEntry entry; ASSERT_TRUE(RaftParser::fetchResponse(futures[5].get().get(), entry)); ASSERT_EQ(entry.term, state(0)->getSnapshot()->term); ASSERT_EQ(entry.request, make_req("set", "asdf", "1234")); futures.clear(); futures.emplace_back(tunnel(leaderID)->exec("set", "asdf", "3456")); futures.emplace_back(tunnel(leaderID)->exec("get", "asdf")); ASSERT_REPLY(futures[0], "OK"); ASSERT_REPLY(futures[1], "3456"); // make sure the log entry has been propagated to all nodes for(size_t i = 0; i < 3; i++) { std::string value; RETRY_ASSERT_TRUE(stateMachine(i)->get("asdf", value).ok() && value == "3456"); } ASSERT_REPLY(tunnel(leaderID)->exec("set", "qwerty", "789"), "OK"); futures.clear(); // interwine pipelined requests from three connections qclient::QClient tunnel2(myself(leaderID).hostname, myself(leaderID).port, makeNoRedirectOptions()); qclient::QClient tunnel3(myself(leaderID).hostname, myself(leaderID).port, makeNoRedirectOptions()); futures.emplace_back(tunnel2.exec("get", "qwerty")); futures.emplace_back(tunnel(leaderID)->exec("set", "client2", "val")); futures.emplace_back(tunnel(leaderID)->exec("get", "client2")); futures.emplace_back(tunnel(leaderID)->exec("sadd", "myset", "a")); futures.emplace_back(tunnel2.exec("sadd", "myset", "b")); futures.emplace_back(tunnel2.exec("sadd", "myset")); // malformed request futures.emplace_back(tunnel3.exec("set", "client3", "myval")); futures.emplace_back(tunnel3.exec("get", "client3")); // not guaranteed that response will be "myval" here, since it's on a different connection futures.emplace_back(tunnel2.exec("get", "client3")); ASSERT_REPLY(futures[0], "789"); ASSERT_REPLY(futures[1], "OK"); ASSERT_REPLY(futures[2], "val"); ASSERT_REPLY(futures[3], 1); ASSERT_REPLY(futures[4], 1); ASSERT_REPLY(futures[5], "ERR wrong number of arguments for 'sadd' command"); ASSERT_REPLY(futures[6], "OK"); ASSERT_REPLY(futures[7], "myval"); redisReplyPtr reply = 
futures[8].get(); std::string str = std::string(reply->str, reply->len); qdb_info("Race-y request: GET client3 ==> " << str); ASSERT_TRUE(str == "myval" || str == ""); ASSERT_REPLY(tunnel2.exec("scard", "myset"), 2); // but here we've received an ack - response _must_ be myval ASSERT_REPLY(tunnel2.exec("get", "client3"), "myval"); RaftInfo info = dispatcher(leaderID)->info(); ASSERT_EQ(info.blockedWrites, 0u); ASSERT_EQ(info.leader, myself(leaderID)); std::string err; std::string checkpointPath = SSTR(commonState.testdir << "/checkpoint"); // Before taking a checkpoint, ensure node #0 is caught up RETRY_ASSERT_EQ(stateMachine(0)->getLastApplied(), stateMachine(leaderID)->getLastApplied()); ASSERT_TRUE(shardDirectory()->checkpoint(checkpointPath).empty()); ASSERT_FALSE(shardDirectory()->checkpoint(checkpointPath).empty()); // exists already // pretty expensive to open two extra databases, but necessary StateMachine checkpointSM(SSTR(checkpointPath << "/current/state-machine")); std::string tmp; ASSERT_OK(checkpointSM.get("client3", tmp)); ASSERT_EQ(tmp, "myval"); ASSERT_OK(checkpointSM.get("client2", tmp)); ASSERT_EQ(tmp, "val"); // TODO: verify checkpointSM last applied, once atomic commits are implemented // ensure the checkpoint journal is identical to the original RaftJournal checkpointJournal(SSTR(checkpointPath << "/current/raft-journal")); ASSERT_EQ(checkpointJournal.getLogSize(), journal()->getLogSize()); for(LogIndex i = 0; i < journal()->getLogSize(); i++) { RaftEntry entry1, entry2; ASSERT_OK(checkpointJournal.fetch(i, entry1)); ASSERT_OK(journal()->fetch(i, entry2)); ASSERT_EQ(entry1, entry2); } } TEST_F(Raft_e2e, hscan) { spinup(0); spinup(1); spinup(2); RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2)); int leaderID = getServerID(state(0)->getSnapshot()->leader); for(size_t i = 1; i < 10; i++) { ASSERT_REPLY(tunnel(leaderID)->exec("hset", "hash", SSTR("f" << i), SSTR("v" << i)), 1); } redisReplyPtr reply = tunnel(leaderID)->exec("hscan", "hash", "0", 
"cOUnT", "3").get(); ASSERT_REPLY(reply, std::make_pair("next:f4", make_vec("f1", "v1", "f2", "v2", "f3", "v3"))); reply = tunnel(leaderID)->exec("hscan", "hash", "0", "asdf", "123").get(); ASSERT_ERR(reply, "ERR syntax error"); reply = tunnel(leaderID)->exec("hscan", "hash", "next:f4", "COUNT", "3").get(); ASSERT_REPLY(reply, std::make_pair("next:f7", make_vec("f4", "v4", "f5", "v5", "f6", "v6"))); reply = tunnel(leaderID)->exec("hscan", "hash", "next:f7", "COUNT", "30").get(); ASSERT_REPLY(reply, std::make_pair("0", make_vec("f7", "v7", "f8", "v8", "f9", "v9"))); reply = tunnel(leaderID)->exec("hscan", "hash", "adfaf").get(); ASSERT_ERR(reply, "ERR invalid cursor"); reply = tunnel(leaderID)->exec("hscan", "hash", "next:zz").get(); ASSERT_REPLY(reply, std::make_pair("0", make_vec())); } TEST_F(Raft_e2e, scan) { spinup(0); spinup(1); spinup(2); RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2)); int leaderID = getLeaderID(); for(size_t i = 1; i < 10; i++) { ASSERT_REPLY(tunnel(leaderID)->exec("set", SSTR("f" << i), SSTR("v" << i)), "OK"); } redisReplyPtr reply = tunnel(leaderID)->exec("scan", "0", "MATCH", "f[1-2]").get(); ASSERT_REPLY(reply, std::make_pair("0", make_vec("f1", "f2"))); reply = tunnel(leaderID)->exec("scan", "0", "MATCH", "f*", "COUNT", "3").get(); ASSERT_REPLY(reply, std::make_pair("next:f4", make_vec("f1", "f2", "f3"))); // without MATCH reply = tunnel(leaderID)->exec("scan", "0", "COUNT", "3").get(); ASSERT_REPLY(reply, std::make_pair("next:f4", make_vec("f1", "f2", "f3"))); // with "*" MATCH pattern reply = tunnel(leaderID)->exec("scan", "0", "COUNT", "3", "MATCH", "*").get(); ASSERT_REPLY(reply, std::make_pair("next:f4", make_vec("f1", "f2", "f3"))); QScanner scanner(*tunnel(leaderID), "f*", 3); ASSERT_TRUE(scanner.valid()); ASSERT_EQ(scanner.requestsSoFar(), 1u); ASSERT_EQ(scanner.getValue(), "f1"); scanner.next(); ASSERT_TRUE(scanner.valid()); ASSERT_EQ(scanner.requestsSoFar(), 1u); ASSERT_EQ(scanner.getValue(), "f2"); scanner.next(); 
ASSERT_TRUE(scanner.valid()); ASSERT_EQ(scanner.requestsSoFar(), 1u); ASSERT_EQ(scanner.getValue(), "f3"); scanner.next(); ASSERT_TRUE(scanner.valid()); ASSERT_EQ(scanner.requestsSoFar(), 2u); ASSERT_EQ(scanner.getValue(), "f4"); scanner.next(); ASSERT_TRUE(scanner.valid()); ASSERT_EQ(scanner.requestsSoFar(), 2u); ASSERT_EQ(scanner.getValue(), "f5"); scanner.next(); ASSERT_TRUE(scanner.valid()); ASSERT_EQ(scanner.requestsSoFar(), 2u); ASSERT_EQ(scanner.getValue(), "f6"); scanner.next(); ASSERT_TRUE(scanner.valid()); ASSERT_EQ(scanner.requestsSoFar(), 3u); ASSERT_EQ(scanner.getValue(), "f7"); scanner.next(); ASSERT_TRUE(scanner.valid()); ASSERT_EQ(scanner.requestsSoFar(), 3u); ASSERT_EQ(scanner.getValue(), "f8"); scanner.next(); ASSERT_TRUE(scanner.valid()); ASSERT_EQ(scanner.requestsSoFar(), 3u); ASSERT_EQ(scanner.getValue(), "f9"); scanner.next(); ASSERT_FALSE(scanner.valid()); } TEST_F(Raft_e2e, test_qclient_convenience_classes) { spinup(0); spinup(1); spinup(2); RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2)); int leaderID = getLeaderID(); // QHash iterator std::vector> replies; for(size_t i = 0; i < 9; i++) { replies.push_back(tunnel(leaderID)->exec("HSET", "myhash", SSTR("f" << i), SSTR("v" << i))); } for(size_t i = 0; i < 9; i++) { ASSERT_REPLY(replies[i], 1); } qclient::QHash qhash(*tunnel(leaderID), "myhash"); qclient::QHash::Iterator it = qhash.getIterator(2); for(size_t i = 0; i < 9; i++) { ASSERT_TRUE(it.valid()); ASSERT_EQ(it.getKey(), SSTR("f" << i)); ASSERT_EQ(it.getValue(), SSTR("v" << i)); it.next(); } ASSERT_FALSE(it.valid()); ASSERT_EQ(it.requestsSoFar(), 5u); // QSet iterator replies.clear(); for(size_t i = 0; i < 9; i++) { replies.push_back(tunnel(leaderID)->exec("SADD", "myset", SSTR("item-" << i))); } for(size_t i = 0; i < 9; i++) { ASSERT_REPLY(replies[i], 1); } qclient::QSet qset(*tunnel(leaderID), "myset"); for(size_t count = 1; count < 15; count++) { qclient::QSet::Iterator it = qset.getIterator(count); for(size_t i = 0; i < 9; i++) { 
ASSERT_TRUE(it.valid()); ASSERT_EQ(it.getElement(), SSTR("item-" << i)); it.next(); } ASSERT_FALSE(it.valid()); ASSERT_EQ(it.requestsSoFar(), (9 / count) + (9%count != 0) ); } qclient::QSet::Iterator it2 = qset.getIterator(3, "next:item-4"); for(size_t i = 4; i < 9; i++) { ASSERT_TRUE(it2.valid()); ASSERT_EQ(it2.getElement(), SSTR("item-" << i)); it2.next(); } ASSERT_FALSE(it2.valid()); ASSERT_EQ(it2.requestsSoFar(), 2u); } TEST_F(Raft_e2e, test_many_redis_commands) { spinup(0); spinup(1); spinup(2); RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2)); int leaderID = getServerID(state(0)->getSnapshot()->leader); std::vector> futures; futures.emplace_back(tunnel(leaderID)->exec("SADD", "myset", "a", "b", "c")); futures.emplace_back(tunnel(leaderID)->exec("TYPE", "myset")); futures.emplace_back(tunnel(leaderID)->exec("SCARD", "myset")); futures.emplace_back(tunnel(leaderID)->exec("Smembers", "myset")); futures.emplace_back(tunnel(leaderID)->exec("srem", "myset", "a", "b")); futures.emplace_back(tunnel(leaderID)->exec("srem", "myset", "b")); futures.emplace_back(tunnel(leaderID)->exec("scard", "myset")); futures.emplace_back(tunnel(leaderID)->exec("smembers", "myset")); futures.emplace_back(tunnel(leaderID)->exec("get", "empty_key")); futures.emplace_back(tunnel(leaderID)->exec("timestamped-lease-acquire", "123")); futures.emplace_back(tunnel(leaderID)->exec("timestamped-lease-get", "123")); futures.emplace_back(tunnel(leaderID)->exec("timestamped-lease-release", "123")); size_t count = 0; ASSERT_REPLY(futures[count++], 3); ASSERT_REPLY(futures[count++], "set"); ASSERT_REPLY(futures[count++], 3); ASSERT_REPLY(futures[count++], make_vec("a", "b", "c")); ASSERT_REPLY(futures[count++], 2); ASSERT_REPLY(futures[count++], 0); ASSERT_REPLY(futures[count++], 1); ASSERT_REPLY(futures[count++], make_vec("c")); ASSERT_NIL(futures[count++]); ASSERT_REPLY(futures[count++], "ERR unknown command 'timestamped-lease-acquire'" ); ASSERT_REPLY(futures[count++], "ERR unknown command 
'timestamped-lease-get'" ); ASSERT_REPLY(futures[count++], "ERR unknown command 'timestamped-lease-release'" ); futures.clear(); futures.emplace_back(tunnel(leaderID)->exec("hset", "myhash", "a", "b")); futures.emplace_back(tunnel(leaderID)->exec("hset", "myhash", "b", "c")); futures.emplace_back(tunnel(leaderID)->exec("hset", "myhash", "c", "d")); futures.emplace_back(tunnel(leaderID)->exec("hset", "myhash", "a", "d")); futures.emplace_back(tunnel(leaderID)->exec("hdel", "myhash", "a", "b", "b")); futures.emplace_back(tunnel(leaderID)->exec("hdel", "myhash", "a")); futures.emplace_back(tunnel(leaderID)->exec("sadd", "myhash", "wrongtype")); futures.emplace_back(tunnel(leaderID)->exec("exists", "myhash")); futures.emplace_back(tunnel(leaderID)->exec("hdel", "myhash", "c")); futures.emplace_back(tunnel(leaderID)->exec("exists", "myhash")); futures.emplace_back(tunnel(leaderID)->exec("sadd", "myhash", "wrongtype")); futures.emplace_back(tunnel(leaderID)->exec("exists", "myhash")); futures.emplace_back(tunnel(leaderID)->exec("hset", "myhash", "a", "b")); futures.emplace_back(tunnel(leaderID)->exec("srem", "myhash", "wrongtype")); futures.emplace_back(tunnel(leaderID)->exec("exists", "myhash")); futures.emplace_back(tunnel(leaderID)->exec("hset", "myhash", "a", "b")); futures.emplace_back(tunnel(leaderID)->exec("exists", "myhash", "myhash", "asdf")); futures.emplace_back(tunnel(leaderID)->exec("hexists", "myhash", "a")); futures.emplace_back(tunnel(leaderID)->exec("hexists", "myhash", "b")); futures.emplace_back(tunnel(leaderID)->exec("sismember", "myhash", "b")); futures.emplace_back(tunnel(leaderID)->exec("scard", "myhash")); futures.emplace_back(tunnel(leaderID)->exec("scard", "does-not-exist")); futures.emplace_back(tunnel(leaderID)->exec("quarkdb_invalid_command")); futures.emplace_back(tunnel(leaderID)->exec("raft-fetch-last", "7", "raw")); count = 0; ASSERT_REPLY(futures[count++], 1); ASSERT_REPLY(futures[count++], 1); ASSERT_REPLY(futures[count++], 1); 
ASSERT_REPLY(futures[count++], 0); ASSERT_REPLY(futures[count++], 2); ASSERT_REPLY(futures[count++], 0); ASSERT_REPLY(futures[count++], "ERR Invalid argument: WRONGTYPE Operation against a key holding the wrong kind of value"); ASSERT_REPLY(futures[count++], 1); ASSERT_REPLY(futures[count++], 1); ASSERT_REPLY(futures[count++], 0); ASSERT_REPLY(futures[count++], 1); ASSERT_REPLY(futures[count++], 1); ASSERT_REPLY(futures[count++], "ERR Invalid argument: WRONGTYPE Operation against a key holding the wrong kind of value"); ASSERT_REPLY(futures[count++], 1); ASSERT_REPLY(futures[count++], 0); ASSERT_REPLY(futures[count++], 1); ASSERT_REPLY(futures[count++], 2); ASSERT_REPLY(futures[count++], 1); ASSERT_REPLY(futures[count++], 0); ASSERT_REPLY(futures[count++], "ERR Invalid argument: WRONGTYPE Operation against a key holding the wrong kind of value"); ASSERT_REPLY(futures[count++], "ERR Invalid argument: WRONGTYPE Operation against a key holding the wrong kind of value"); ASSERT_REPLY(futures[count++], 0); ASSERT_REPLY(futures[count++], "ERR internal dispatching error"); redisReplyPtr entries = futures[count++].get(); std::vector lastEntries; ASSERT_TRUE(RaftParser::fetchLastResponse(entries, lastEntries)); for(size_t i = 1; i <= 7; i++) { RaftEntry comparison; LogIndex index = journal(leaderID)->getLogSize() - i; ASSERT_OK(journal(leaderID)->fetch(index, comparison)); ASSERT_EQ(lastEntries[7 - i], comparison); } futures.clear(); futures.emplace_back(tunnel(leaderID)->exec("set", "mystring", "asdf")); futures.emplace_back(tunnel(leaderID)->exec("keys", "*")); futures.emplace_back(tunnel(leaderID)->exec("exists", "mystring", "myset", "myhash", "adfa", "myhash")); futures.emplace_back(tunnel(leaderID)->exec("del", "myhash", "myset", "mystring")); futures.emplace_back(tunnel(leaderID)->exec("exists", "mystring", "myset", "myhash", "adfa", "myhash")); futures.emplace_back(tunnel(leaderID)->exec("del", "myhash", "myset")); 
futures.emplace_back(tunnel(leaderID)->exec("clock-get")); ASSERT_REPLY(futures[0], "OK"); ASSERT_REPLY(futures[1], make_vec("myhash", "myset", "mystring")); ASSERT_REPLY(futures[2], 4); ASSERT_REPLY(futures[3], 3); ASSERT_REPLY(futures[4], 0); ASSERT_REPLY(futures[5], 0); qdb_info(qclient::describeRedisReply(futures[6].get())); futures.clear(); futures.emplace_back(tunnel(leaderID)->exec("set", "a", "aa")); futures.emplace_back(tunnel(leaderID)->exec("set", "aa", "a")); futures.emplace_back(tunnel(leaderID)->exec("get", "a")); futures.emplace_back(tunnel(leaderID)->exec("del", "a")); futures.emplace_back(tunnel(leaderID)->exec("get", "aa")); futures.emplace_back(tunnel(leaderID)->exec("keys", "*")); ASSERT_REPLY(futures[0], "OK"); ASSERT_REPLY(futures[1], "OK"); ASSERT_REPLY(futures[2], "aa"); ASSERT_REPLY(futures[3], 1); ASSERT_REPLY(futures[4], "a"); ASSERT_REPLY(futures[5], make_vec("aa")); futures.clear(); futures.emplace_back(tunnel(leaderID)->exec("config_getall")); futures.emplace_back(tunnel(leaderID)->exec("config_set", "some.config.value", "1234")); futures.emplace_back(tunnel(leaderID)->exec("flushall")); futures.emplace_back(tunnel(leaderID)->exec("del", "aa")); futures.emplace_back(tunnel(leaderID)->exec("config_get", "some.config.value", "1234")); futures.emplace_back(tunnel(leaderID)->exec("config_get", "some.config.value")); futures.emplace_back(tunnel(leaderID)->exec("config_getall")); ASSERT_REPLY(futures[0], ""); ASSERT_REPLY(futures[1], "OK"); ASSERT_REPLY(futures[2], "OK"); ASSERT_REPLY(futures[3], 0); ASSERT_REPLY(futures[4], "ERR wrong number of arguments for 'config_get' command"); ASSERT_REPLY(futures[5], "1234"); ASSERT_REPLY(futures[6], make_vec("some.config.value", "1234")); futures.clear(); futures.emplace_back(tunnel(leaderID)->exec("hset", "hash", "key1", "v1")); futures.emplace_back(tunnel(leaderID)->exec("hset", "hash2", "key1", "v1")); futures.emplace_back(tunnel(leaderID)->exec("exists", "hash", "hash2")); 
futures.emplace_back(tunnel(leaderID)->exec("del", "hash")); futures.emplace_back(tunnel(leaderID)->exec("raft_info")); futures.emplace_back(tunnel(leaderID)->exec("bad_command")); futures.emplace_back(tunnel(leaderID)->exec("exists", "hash")); futures.emplace_back(tunnel(leaderID)->exec("exists", "hash2")); futures.emplace_back(tunnel(leaderID)->exec("raft_info", "leader")); futures.emplace_back(tunnel(leaderID)->exec("recovery_get", "test")); ASSERT_REPLY(futures[0], 1); ASSERT_REPLY(futures[1], 1); ASSERT_REPLY(futures[2], 2); ASSERT_REPLY(futures[3], 1); // ignore futures[4] ASSERT_REPLY(futures[5], "ERR unknown command 'bad_command'"); ASSERT_REPLY(futures[6], 0); ASSERT_REPLY(futures[7], 1); ASSERT_REPLY(futures[8], myself(leaderID).toString() ); ASSERT_REPLY(futures[9], "ERR recovery commands not allowed, not in recovery mode"); futures.clear(); futures.emplace_back(tunnel(leaderID)->exec("hmset", "hmset_test", "f1", "v1", "f2", "v2")); futures.emplace_back(tunnel(leaderID)->exec("exists", "hmset_test")); futures.emplace_back(tunnel(leaderID)->exec("hmset", "test")); futures.emplace_back(tunnel(leaderID)->exec("hmset", "hmset_test", "f2", "v3", "f4")); futures.emplace_back(tunnel(leaderID)->exec("hget", "hmset_test", "f1")); futures.emplace_back(tunnel(leaderID)->exec("hlen", "hmset_test")); futures.emplace_back(tunnel(leaderID)->exec("hmset", "hmset_test", "f2", "value2", "f3", "value3")); futures.emplace_back(tunnel(leaderID)->exec("hlen", "hmset_test")); futures.emplace_back(tunnel(leaderID)->exec("hget", "hmset_test", "f2")); futures.emplace_back(tunnel(leaderID)->exec("hmset", "hmset_test", "f3", "v3")); futures.emplace_back(tunnel(leaderID)->exec("hget", "hmset_test", "f3")); futures.emplace_back(tunnel(leaderID)->exec("hlen", "hmset_test")); ASSERT_REPLY(futures[0], "OK"); ASSERT_REPLY(futures[1], 1); ASSERT_REPLY(futures[2], "ERR wrong number of arguments for 'hmset' command"); ASSERT_REPLY(futures[3], "ERR wrong number of arguments for 'hmset' 
command"); ASSERT_REPLY(futures[4], "v1"); ASSERT_REPLY(futures[5], 2); ASSERT_REPLY(futures[6], "OK"); ASSERT_REPLY(futures[7], 3); ASSERT_REPLY(futures[8], "value2"); ASSERT_REPLY(futures[9], "OK"); ASSERT_REPLY(futures[10], "v3"); ASSERT_REPLY(futures[11], 3); futures.clear(); futures.emplace_back(tunnel(leaderID)->exec("deque-push-front", "list_test", "i1", "i2", "i3", "i4")); futures.emplace_back(tunnel(leaderID)->exec("exists", "list_test")); futures.emplace_back(tunnel(leaderID)->exec("deque-len", "list_test")); futures.emplace_back(tunnel(leaderID)->exec("deque-pop-front", "list_test")); futures.emplace_back(tunnel(leaderID)->exec("deque-len", "list_test")); futures.emplace_back(tunnel(leaderID)->exec("deque-pop-back", "list_test")); futures.emplace_back(tunnel(leaderID)->exec("deque-len", "list_test")); futures.emplace_back(tunnel(leaderID)->exec("del", "list_test")); futures.emplace_back(tunnel(leaderID)->exec("deque-len", "list_test")); futures.emplace_back(tunnel(leaderID)->exec("deque-pop-front", "list_test")); futures.emplace_back(tunnel(leaderID)->exec("deque-push-back", "list_test", "i5", "i6", "i7", "i8")); futures.emplace_back(tunnel(leaderID)->exec("set", "list_test", "asdf")); futures.emplace_back(tunnel(leaderID)->exec("deque-pop-front", "list_test")); futures.emplace_back(tunnel(leaderID)->exec("deque-pop-back", "list_test")); futures.emplace_back(tunnel(leaderID)->exec("deque-pop-back", "list_test")); futures.emplace_back(tunnel(leaderID)->exec("deque-pop-front", "list_test")); futures.emplace_back(tunnel(leaderID)->exec("set", "list_test", "asdf")); futures.emplace_back(tunnel(leaderID)->exec("deque-pop-front", "list_test")); futures.emplace_back(tunnel(leaderID)->exec("deque-push-back", "my-deque", "1", "2", "3", "4", "5", "6", "7", "8", "9" )); futures.emplace_back(tunnel(leaderID)->exec("deque-scan-back", "my-deque", "0", "COUNT", "3")); futures.emplace_back(tunnel(leaderID)->exec("deque-scan-back", "my-deque", "0", "COUNT", "3000")); 
futures.emplace_back(tunnel(leaderID)->exec("deque-scan-back", "my-deque", "next:\x80\x00\x00\x00\x00\x00\x00\x05", "COUNT", "3")); futures.emplace_back(tunnel(leaderID)->exec("deque-scan-back", "my-deque", "next:\x80\x00\x00\x00\x00\x00\x00\x02", "COUNT", "3")); futures.emplace_back(tunnel(leaderID)->exec("deque-scan-back", "my-deque", "next:\x80\x00\x00\x00\x00\x00\x00\x02", "COUNT", "4")); futures.emplace_back(tunnel(leaderID)->exec("deque-scan-back", "my-deque", "next:\x80\x00\x00\x00\x00\x00\x00\x02", "COUNT", "2")); futures.emplace_back(tunnel(leaderID)->exec("deque-scan-back", "my-deque", "next:\x80\x00\x00\x00\x00\x00\x00\x00", "COUNT", "2")); futures.emplace_back(tunnel(leaderID)->exec("deque-scan-back", "my-deque", "next:\x80\x00\x00\x00\x00\x00\x00\x00", "COUNT", "1")); futures.emplace_back(tunnel(leaderID)->exec("deque-scan-back", "my-deque", "next:\x70\x00\x00\x00\x00\x00\x00\x00", "COUNT", "1")); futures.emplace_back(tunnel(leaderID)->exec("deque-scan-back", "my-deque", "next:\x7f\xff\xff\xff\xff\xff\xff\xff", "COUNT", "1")); futures.emplace_back(tunnel(leaderID)->exec("deque-scan-back", "my-deque", "next:\x7f\xff\xff\xfd\xf3\xff\x1f\x0f", "COUNT", "1")); futures.emplace_back(tunnel(leaderID)->exec("deque-scan-back", "my-deque", "next:\x7f\xff\xff\xfd\xf3\xff\x1f\x0f", "COUNT", "100")); futures.emplace_back(tunnel(leaderID)->exec("deque-scan-back", "my-deque", "next:\x80\x00\x00\x00\x00\x00\x00\x06", "COUNT", "3")); futures.emplace_back(tunnel(leaderID)->exec("deque-scan-back", "my-deque", "next:\x80\x00\x00\x00\x00\x00\x00\x08", "COUNT", "3")); futures.emplace_back(tunnel(leaderID)->exec("deque-scan-back", "my-deque", "next:\x80\x00\x00\x00\x00\x00\x00\x09", "COUNT", "3")); futures.emplace_back(tunnel(leaderID)->exec("deque-scan-back", "not-existing", "next:\x80\x00\x00\x00\x00\x00\x00\x09", "COUNT", "3")); int i = 0; ASSERT_REPLY(futures[i++], 4); ASSERT_REPLY(futures[i++], 1); ASSERT_REPLY(futures[i++], 4); ASSERT_REPLY(futures[i++], "i4"); 
ASSERT_REPLY(futures[i++], 3); ASSERT_REPLY(futures[i++], "i1"); ASSERT_REPLY(futures[i++], 2); ASSERT_REPLY(futures[i++], 1); ASSERT_REPLY(futures[i++], 0); ASSERT_NIL(futures[i++]); ASSERT_REPLY(futures[i++], 4); ASSERT_REPLY(futures[i++], "ERR Invalid argument: WRONGTYPE Operation against a key holding the wrong kind of value"); ASSERT_REPLY(futures[i++], "i5"); ASSERT_REPLY(futures[i++], "i8"); ASSERT_REPLY(futures[i++], "i7"); ASSERT_REPLY(futures[i++], "i6"); ASSERT_REPLY(futures[i++], "OK"); ASSERT_REPLY(futures[i++], "ERR Invalid argument: WRONGTYPE Operation against a key holding the wrong kind of value"); ASSERT_REPLY(futures[i++], 9); ASSERT_REPLY_DESCRIBE(futures[i++], "1) \"next:\\x80\\x00\\x00\\x00\\x00\\x00\\x00\\x05\"\n" "2) 1) \"7\"\n" " 2) \"8\"\n" " 3) \"9\"\n"); ASSERT_REPLY_DESCRIBE(futures[i++], "1) \"0\"\n" "2) 1) \"1\"\n" " 2) \"2\"\n" " 3) \"3\"\n" " 4) \"4\"\n" " 5) \"5\"\n" " 6) \"6\"\n" " 7) \"7\"\n" " 8) \"8\"\n" " 9) \"9\"\n"); ASSERT_REPLY_DESCRIBE(futures[i++], "1) \"next:\\x80\\x00\\x00\\x00\\x00\\x00\\x00\\x02\"\n" "2) 1) \"4\"\n" " 2) \"5\"\n" " 3) \"6\"\n" ); ASSERT_REPLY_DESCRIBE(futures[i++], "1) \"0\"\n" "2) 1) \"1\"\n" " 2) \"2\"\n" " 3) \"3\"\n"); ASSERT_REPLY_DESCRIBE(futures[i++], "1) \"0\"\n" "2) 1) \"1\"\n" " 2) \"2\"\n" " 3) \"3\"\n"); ASSERT_REPLY_DESCRIBE(futures[i++], "1) \"next:\\x80\\x00\\x00\\x00\\x00\\x00\\x00\\x00\"\n" "2) 1) \"2\"\n" " 2) \"3\"\n"); ASSERT_REPLY_DESCRIBE(futures[i++], "1) \"0\"\n" "2) 1) \"1\"\n"); ASSERT_REPLY_DESCRIBE(futures[i++], "1) \"0\"\n" "2) 1) \"1\"\n"); ASSERT_REPLY_DESCRIBE(futures[i++], "1) \"0\"\n" "2) (empty list or set)\n"); ASSERT_REPLY_DESCRIBE(futures[i++], "1) \"0\"\n" "2) (empty list or set)\n"); ASSERT_REPLY_DESCRIBE(futures[i++], "1) \"0\"\n" "2) (empty list or set)\n"); ASSERT_REPLY_DESCRIBE(futures[i++], "1) \"0\"\n" "2) (empty list or set)\n"); ASSERT_REPLY_DESCRIBE(futures[i++], "1) \"next:\\x80\\x00\\x00\\x00\\x00\\x00\\x00\\x03\"\n" "2) 1) \"5\"\n" " 2) \"6\"\n" " 
3) \"7\"\n"); ASSERT_REPLY_DESCRIBE(futures[i++], "1) \"next:\\x80\\x00\\x00\\x00\\x00\\x00\\x00\\x05\"\n" "2) 1) \"7\"\n" " 2) \"8\"\n" " 3) \"9\"\n"); ASSERT_REPLY_DESCRIBE(futures[i++], "1) \"next:\\x80\\x00\\x00\\x00\\x00\\x00\\x00\\x05\"\n" "2) 1) \"7\"\n" " 2) \"8\"\n" " 3) \"9\"\n"); ASSERT_REPLY_DESCRIBE(futures[i++], "1) \"0\"\n" "2) (empty list or set)\n"); // Now test qclient callbacks, ensure things stay reasonable when we mix them // with futures. TrivialQCallback c1; tunnel(leaderID)->execCB(&c1, "set", "qcl-counter", "1"); TrivialQCallback c2; tunnel(leaderID)->execCB(&c2, "get", "qcl-counter"); std::future fut1 = tunnel(leaderID)->exec("get", "qcl-counter"); std::future fut2 = tunnel(leaderID)->exec("set", "qcl-counter", "2"); std::future fut3 = tunnel(leaderID)->exec("get", "qcl-counter"); TrivialQCallback c3; tunnel(leaderID)->execCB(&c3, "get", "qcl-counter"); TrivialQCallback c4; tunnel(leaderID)->execCB(&c4, "set", "qcl-counter", "3"); TrivialQCallback c5; tunnel(leaderID)->execCB(&c5, "get", "qcl-counter"); std::future fut4 = tunnel(leaderID)->exec("get", "qcl-counter"); ASSERT_REPLY(c1.getFuture(), "OK"); ASSERT_REPLY(c2.getFuture(), "1"); ASSERT_REPLY(fut1, "1"); ASSERT_REPLY(fut2, "OK"); ASSERT_REPLY(fut3, "2"); ASSERT_REPLY(c3.getFuture(), "2"); ASSERT_REPLY(c4.getFuture(), "OK"); ASSERT_REPLY(c5.getFuture(), "3"); ASSERT_REPLY(fut4, "3"); // Test lease commands. 
std::future l0 = tunnel(leaderID)->exec("lease-acquire", "qcl-counter", "holder1", "10000"); std::future l1 = tunnel(leaderID)->exec("lease-acquire", "mykey", "holder1", "10000"); std::future l2 = tunnel(leaderID)->exec("lease-acquire", "mykey", "holder2", "10000"); std::future l3 = tunnel(leaderID)->exec("lease-acquire", "mykey", "holder1", "10000"); ASSERT_REPLY(l0, "ERR Invalid Argument: WRONGTYPE Operation against a key holding the wrong kind of value"); ASSERT_REPLY(l1, "ACQUIRED"); redisReplyPtr replyL2 = l2.get(); std::string reply = std::string(replyL2->str, replyL2->len); ASSERT_TRUE(StringUtils::startsWith(reply, "ERR lease held by 'holder1', time remaining")); ASSERT_REPLY(l3, "RENEWED"); std::future l4 = tunnel(leaderID)->exec("lease-get", "mykey"); std::future l5 = tunnel(leaderID)->exec("lease-get", "mykey-2"); redisReplyPtr replyL4 = l4.get(); qdb_info(qclient::describeRedisReply(replyL4)); ASSERT_TRUE(StringUtils::startsWith(qclient::describeRedisReply(replyL4), "1) HOLDER: holder1\n2) REMAINING: ")); ASSERT_NIL(l5); std::future l6 = tunnel(leaderID)->exec("lease-release", "mykey"); std::future l7 = tunnel(leaderID)->exec("lease-release", "mykey-2"); std::future l8 = tunnel(leaderID)->exec("lease-release", "qcl-counter"); std::future l9 = tunnel(leaderID)->exec("lease-release", "mykey"); std::future l10 = tunnel(leaderID)->exec("lease-get", "mykey"); ASSERT_REPLY(l6, "OK"); ASSERT_NIL(l7); ASSERT_REPLY(l8, "ERR Invalid argument: WRONGTYPE Operation against a key holding the wrong kind of value"); ASSERT_NIL(l9); ASSERT_NIL(l10); std::future l11 = tunnel(leaderID)->exec("lease-acquire", "mykey", "holder2", "10000"); std::future l12 = tunnel(leaderID)->exec("lease-acquire", "mykey", "holder2", "10000"); std::future l13 = tunnel(leaderID)->exec("del", "mykey"); std::future l14 = tunnel(leaderID)->exec("lease-acquire", "mykey", "holder2", "10000"); std::future l15 = tunnel(leaderID)->exec("lease-get-pending-expiration-events"); ASSERT_REPLY(l11, 
"ACQUIRED"); ASSERT_REPLY(l12, "RENEWED"); ASSERT_REPLY(l13, 1); ASSERT_REPLY(l14, "ACQUIRED"); l15.get(); // ignore for now.. // Ensure the followers return the correct number of responses on MOVED for // pipelined writes. int follower1 = (leaderID+1) % 3; std::vector> moved; for(size_t i = 0; i < 10; i++) { moved.emplace_back(tunnel(follower1)->exec("set", "abc", "123")); } for(size_t i = 0; i < 10; i++) { ASSERT_REPLY(moved[i], SSTR("MOVED 0 " << myself(leaderID).toString())); } // Make sure the connection did not hang. ASSERT_REPLY(tunnel(follower1)->exec("ping", "zxcvbnm"), "zxcvbnm"); // Test integer <-> binary string conversion functions. redisReplyPtr conv1 = tunnel(follower1)->exec("convert-int-to-string", "999").get(); ASSERT_EQ(qclient::describeRedisReply(conv1), "1) \"As int64_t: \\x00\\x00\\x00\\x00\\x00\\x00\\x03\\xE7\"\n2) \"As uint64_t: \\x00\\x00\\x00\\x00\\x00\\x00\\x03\\xE7\"\n"); ASSERT_REPLY(tunnel(follower1)->exec("convert-int-to-string", "adfs"), "ERR cannot parse integer"); ASSERT_REPLY(tunnel(follower1)->exec("convert-string-to-int", "qqqq"), "ERR expected string with 8 characters, was given 4 instead"); redisReplyPtr conv2 = tunnel(follower1)->exec("convert-string-to-int", unsignedIntToBinaryString(999u)).get(); ASSERT_EQ(qclient::describeRedisReply(conv2), "1) Interpreted as int64_t: 999\n2) Interpreted as uint64_t: 999\n"); std::deque multi1; multi1.emplace_back(qclient::EncodedRequest::make("set", "my-awesome-counter", "1")); multi1.emplace_back(qclient::EncodedRequest::make("set", "other-counter", "12345")); multi1.emplace_back(qclient::EncodedRequest::make("get", "other-counter")); multi1.emplace_back(qclient::EncodedRequest::make("get", "my-awesome-counter")); ASSERT_EQ( qclient::describeRedisReply(tunnel(leaderID)->execute(std::move(multi1)).get()), "1) OK\n" "2) OK\n" "3) \"12345\"\n" "4) \"1\"\n" ); ASSERT_REPLY_DESCRIBE( tunnel(leaderID)->exec("quarkdb-verify-checksum").get(), "1) state-machine: OK\n" ); } TEST_F(Raft_e2e, 
DequeTrimming) {
  // Exercises deque-trim-front: wrong key type, non-numeric limit, and
  // trimming an existing deque down to a maximum length.
  spinup(0); spinup(1); spinup(2);
  RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));
  int leaderID = getServerID(state(0)->getSnapshot()->leader);

  ASSERT_REPLY(tunnel(leaderID)->exec("deque-push-back", "dq", "1", "2", "3", "4", "5", "6"), 6);
  ASSERT_REPLY(tunnel(leaderID)->exec("deque-len", "dq"), 6);

  ASSERT_REPLY(tunnel(leaderID)->exec("set", "test", "abc"), "OK");
  ASSERT_REPLY(tunnel(leaderID)->exec("deque-trim-front", "test", "1"), "ERR Invalid argument: WRONGTYPE Operation against a key holding the wrong kind of value");
  ASSERT_REPLY(tunnel(leaderID)->exec("deque-trim-front", "dq", "chicken"), "ERR Invalid argument: value is not an integer or out of range");

  ASSERT_REPLY(tunnel(leaderID)->exec("deque-trim-front", "dq", "3"), 3);
  ASSERT_REPLY(tunnel(leaderID)->exec("deque-pop-front", "dq"), "4");
  ASSERT_REPLY(tunnel(leaderID)->exec("deque-pop-front", "dq"), "5");
  ASSERT_REPLY(tunnel(leaderID)->exec("deque-trim-front", "dq", "1"), 0);
  ASSERT_REPLY(tunnel(leaderID)->exec("deque-trim-front", "dq", "0"), 1);

  // The key name is expected to be reusable as a plain string afterwards.
  ASSERT_REPLY(tunnel(leaderID)->exec("set", "dq", "abc"), "OK");
  ASSERT_REPLY(tunnel(leaderID)->exec("del", "dq", "test"), 2);
}

TEST_F(Raft_e2e, DequeClear) {
  // Checks deque-clear through the raw command and through qclient::QDeque.
  spinup(0); spinup(1); spinup(2);
  RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));
  int leaderID = getServerID(state(0)->getSnapshot()->leader);

  ASSERT_REPLY(tunnel(leaderID)->exec("deque-push-back", "dq", "1", "2", "3", "4"), 4);
  ASSERT_REPLY(tunnel(leaderID)->exec("deque-clear", "dq"), 4);
  ASSERT_REPLY(tunnel(leaderID)->exec("deque-len", "dq"), 0);
  ASSERT_REPLY(tunnel(leaderID)->exec("set", "dq", "abc"), "OK");

  qclient::QDeque dq(*tunnel(leaderID), "dq2");

  size_t len = 0;
  ASSERT_OK(dq.size(len));
  ASSERT_EQ(len, 0u);

  ASSERT_OK(dq.push_back("123"));
  ASSERT_OK(dq.push_back("333"));

  ASSERT_OK(dq.size(len));
  ASSERT_EQ(len, 2u);

  std::string val;
  ASSERT_OK(dq.pop_front(val));
  ASSERT_EQ(val, "123");

  ASSERT_OK(dq.size(len));
  ASSERT_EQ(len, 1u);

  ASSERT_OK(dq.clear());
  ASSERT_OK(dq.size(len));
  ASSERT_EQ(len, 0u);
}

TEST_F(Raft_e2e, replication_with_trimmed_journal) {
  // Two nodes replicate a batch of writes and then trim their journals.
  // Node #2 joins afterwards and must stay behind until the trimmed entries
  // are appended to its journal by hand; with resilvering disabled, its
  // journal is expected to keep starting at index 0.
  Connection::setPhantomBatchLimit(1);

  spinup(0); spinup(1);
  RETRY_ASSERT_TRUE(checkStateConsensus(0, 1));

  int leaderID = getServerID(state(0)->getSnapshot()->leader);
  int firstSlaveID = (leaderID+1)%2;
  ASSERT_GE(leaderID, 0);
  ASSERT_LE(leaderID, 1);

  // First, disable automatic resilvering..
  EncodedConfigChange configChange = raftconfig(leaderID)->setResilveringEnabled(false);
  ASSERT_TRUE(configChange.error.empty());
  ASSERT_REPLY(tunnel(leaderID)->execute(configChange.request), "OK");

  // send off many requests, pipeline them
  std::vector<std::future<redisReplyPtr>> futures;
  futures.reserve(testreqs.size());
  for(size_t i = 0; i < testreqs.size(); i++) {
    futures.emplace_back(tunnel(leaderID)->execute(testreqs[i]));
  }

  // The first two requests are expected to answer "OK", the rest integer 1.
  for(size_t i = 0; i < 2; i++) {
    ASSERT_REPLY(futures[i], "OK");
  }

  for(size_t i = 2; i < futures.size(); i++) {
    ASSERT_REPLY(futures[i], 1);
  }

  // ensure the two nodes have reached complete consensus
  RETRY_ASSERT_TRUE(checkFullConsensus(0, 1));

  // now let's trim their journals..
  // Keep copies of entries 1..4 first, so they can be re-injected below.
  std::vector<RaftEntry> entryBackup;
  for(size_t i = 1; i < 5; i++) {
    RaftEntry entry;
    ASSERT_OK(journal(firstSlaveID)->fetch(i, entry)) << i;
    entryBackup.emplace_back(std::move(entry));
  }

  journal(0)->trimUntil(4);
  journal(1)->trimUntil(4);

  // and verify it's NOT possible to bring node #2 up to date
  spinup(2);
  RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));

  ASSERT_EQ(journal(2)->getLogSize(), 1);
  ASSERT_EQ(journal(2)->getLogStart(), 0);

  // a divine intervention fills up the missing entries in node #2 journal
  for(size_t i = 0; i < 4; i++) {
    journal(2)->append(i+1, entryBackup[i]);
  }

  // now verify node #2 can be brought up to date successfully
  RETRY_ASSERT_TRUE(
    journal(0)->getLogSize() == journal(1)->getLogSize() &&
    journal(1)->getLogSize() == journal(2)->getLogSize()
  );

  ASSERT_EQ(journal(2)->getLogSize(), journal(leaderID)->getLogSize());
  ASSERT_EQ(journal(2)->getLogSize(), journal(firstSlaveID)->getLogSize());

  // Verify resilvering didn't happen.
  ASSERT_EQ(journal(2)->getLogStart(), 0);
}

TEST_F(Raft_e2e, membership_updates) {
  spinup(0); spinup(1); spinup(2);
  RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));

  int leaderID = getServerID(state(0)->getSnapshot()->leader);
  ASSERT_REPLY(tunnel(leaderID)->exec("set", "pi", "3.141516"), "OK");

  // throw a node out of the cluster
  int victim = (leaderID+1) % 3;
  RETRY_ASSERT_TRUE(checkFullConsensus(0, 1, 2));
  int index = journal(leaderID)->getLogSize() - 1;
  ASSERT_REPLY(tunnel(leaderID)->exec("RAFT_REMOVE_MEMBER", myself(victim).toString()), "OK");
  RETRY_ASSERT_EQ(dispatcher(leaderID)->info().commitIndex, index + 1);

  // verify the cluster has not been disrupted
  ASSERT_EQ(state(leaderID)->getSnapshot()->leader, myself(leaderID));

  // add it back as an observer, verify consensus
  ASSERT_REPLY(tunnel(leaderID)->exec("RAFT_ADD_OBSERVER", myself(victim).toString()), "OK");
  RETRY_ASSERT_EQ(dispatcher(0)->info().commitIndex, index + 2);
  RETRY_ASSERT_EQ(dispatcher(1)->info().commitIndex, index + 2);
RETRY_ASSERT_EQ(dispatcher(2)->info().commitIndex, index + 2); ASSERT_EQ(state(victim)->getSnapshot()->status, RaftStatus::FOLLOWER); ASSERT_EQ(state(0)->getSnapshot()->leader, state(1)->getSnapshot()->leader); ASSERT_EQ(state(1)->getSnapshot()->leader, state(2)->getSnapshot()->leader); ASSERT_EQ(journal(0)->getLogSize(), journal(1)->getLogSize()); ASSERT_EQ(journal(1)->getLogSize(), journal(2)->getLogSize()); // cannot be a leader, it's an observer ASSERT_NE(state(0)->getSnapshot()->leader, myself(victim)); // add back as a full voting member leaderID = getServerID(state(0)->getSnapshot()->leader); ASSERT_REPLY(tunnel(leaderID)->exec("RAFT_PROMOTE_OBSERVER", myself(victim).toString()), "OK"); RETRY_ASSERT_EQ(dispatcher(leaderID)->info().commitIndex, index + 3); RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2)); // .. and demote again ASSERT_REPLY(tunnel(leaderID)->exec("RAFT_DEMOTE_TO_OBSERVER", myself(victim).toString()), "OK"); RETRY_ASSERT_EQ(dispatcher(leaderID)->info().commitIndex, index + 4); RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2)); } TEST_F(Raft_e2e, reject_dangerous_membership_update) { spinup(0); spinup(1); RETRY_ASSERT_TRUE(checkFullConsensus(0, 1)); int leaderID = getLeaderID(); // make sure dangerous node removal is prevented int victim = (leaderID+1) % 2; redisReplyPtr reply = tunnel(leaderID)->exec("RAFT_REMOVE_MEMBER", myself(victim).toString()).get(); ASSERT_ERR(reply, "ERR membership update blocked, new cluster would not have an up-to-date quorum"); // Try to remove a non-existent node ASSERT_REPLY(tunnel(leaderID)->exec("RAFT_REMOVE_MEMBER", RaftServer("random_host", 123).toString()), "ERR random_host:123 is neither an observer nor a full node."); // Make sure we can remove the third node ASSERT_REPLY(tunnel(leaderID)->exec("RAFT_REMOVE_MEMBER", myself(2).toString()), "OK"); RaftMembership membership = journal(leaderID)->getMembership(); RETRY_ASSERT_EQ(journal(leaderID)->getCommitIndex(), membership.epoch); // Add it back as observer 
ASSERT_REPLY(tunnel(leaderID)->exec("RAFT_ADD_OBSERVER", myself(2).toString()), "OK"); membership = journal(leaderID)->getMembership(); RETRY_ASSERT_EQ(journal(leaderID)->getCommitIndex(), membership.epoch); // Remove it again ASSERT_REPLY(tunnel(leaderID)->exec("RAFT_REMOVE_MEMBER", myself(2).toString()), "OK"); membership = journal(leaderID)->getMembership(); RETRY_ASSERT_EQ(journal(leaderID)->getCommitIndex(), membership.epoch); } TEST_F(Raft_e2e5, membership_updates_with_disruptions) { // let's get this party started spinup(0); spinup(1); spinup(2); spinup(3); RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2, 3)); // throw node #4 out of the cluster int leaderID = getServerID(state(0)->getSnapshot()->leader); ASSERT_REPLY(tunnel(leaderID)->exec("RAFT_REMOVE_MEMBER", myself(4).toString()), "OK"); // wait until membership update has been committed RaftMembership membership = journal(leaderID)->getMembership(); ASSERT_GT(membership.epoch, 0u); ASSERT_EQ(membership.nodes.size(), 4u); RETRY_ASSERT_EQ(journal(leaderID)->getCommitIndex(), membership.epoch); // .. 
and now spinup node #4 :> Ensure it doesn't disrupt the current leader spinup(4); std::this_thread::sleep_for(heartbeatTracker()->getTimeouts().getHigh()*2); ASSERT_EQ(leaderID, getServerID(state(0)->getSnapshot()->leader)); // verify the cluster has not been disrupted ASSERT_EQ(state(leaderID)->getSnapshot()->leader, myself(leaderID)); // remove one more node int victim = (leaderID+1) % 5; if(victim == 4) victim = 2; ASSERT_REPLY(tunnel(leaderID)->exec("RAFT_REMOVE_MEMBER", myself(victim).toString()), "OK"); std::this_thread::sleep_for(heartbeatTracker()->getTimeouts().getHigh()*2); // verify the cluster has not been disrupted ASSERT_EQ(state(leaderID)->getSnapshot()->leader, myself(leaderID)); // issue a bunch of writes and reads ASSERT_REPLY(tunnel(leaderID)->exec("set", "123", "abc"), "OK"); ASSERT_REPLY(tunnel(leaderID)->exec("get", "123"), "abc"); } TEST_F(Raft_e2e, leader_steps_down_after_follower_loss) { // cluster with 2 nodes spinup(0); spinup(1); RETRY_ASSERT_TRUE(checkStateConsensus(0, 1)); int leaderID = getLeaderID(); ASSERT_GE(leaderID, 0); ASSERT_LE(leaderID, 1); RaftTerm term = state(leaderID)->getSnapshot()->term; int followerID = (leaderID + 1)%2; ASSERT_TRUE(tunnel(followerID)->checkConnection(std::chrono::milliseconds(100))); spindown(followerID); ASSERT_FALSE(tunnel(followerID)->checkConnection(std::chrono::milliseconds(100))); RETRY_ASSERT_TRUE(term < state(leaderID)->getSnapshot()->term); ASSERT_TRUE(state(leaderID)->getSnapshot()->leader.empty()); } TEST_F(Raft_e2e, stale_reads) { spinup(0); spinup(1); spinup(2); RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2)); int leaderID = getLeaderID(); int follower = (getLeaderID() + 1) % 3; ASSERT_REPLY(tunnel(leaderID)->exec("set", "abc", "1234"), "OK"); ASSERT_REPLY(tunnel(follower)->exec("get", "abc"), SSTR("MOVED 0 " << myself(leaderID).toString())); ASSERT_REPLY(tunnel(follower)->exec("activate-stale-reads"), "OK"); redisReplyPtr reply = tunnel(follower)->exec("get", "abc").get(); 
qdb_info("Race-y read: " << std::string(reply->str, reply->len)); RETRY_ASSERT_TRUE(checkFullConsensus(0, 1, 2)); ASSERT_REPLY(tunnel(follower)->exec("get", "abc"), "1234"); } TEST_F(Raft_e2e, monitor) { spinup(0); spinup(1); spinup(2); RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2)); int leaderID = getLeaderID(); // Get connection ID redisReplyPtr connIDReply = tunnel(leaderID)->exec("client-id").get(); std::string connID(connIDReply->str, connIDReply->len); qdb_info("Connection ID: " << connID); // We can't use QClient for this, it can't handle the output of MONITOR qclient::HostResolver resolver(nullptr); qclient::Status st; std::vector endpoints = resolver.resolve("localhost", myself(leaderID).port, st); ASSERT_TRUE(st.ok()); ASSERT_GE(endpoints.size(), 1u); int ipv4 = 0; for(size_t i = 0; i < endpoints.size(); i++) { if(endpoints[i].getProtocolType() == qclient::ProtocolType::kIPv4) { ipv4 = i; break; } } qclient::AsyncConnector connector(endpoints[ipv4]); ASSERT_TRUE(connector.blockUntilReady()); ASSERT_TRUE(connector.ok()); Link link(connector.release()); BufferedReader reader(&link); ASSERT_EQ(link.Send(SSTR("*2\r\n$4\r\nAUTH\r\n$" << contactDetails()->getPassword().size() << "\r\n" << contactDetails()->getPassword() << "\r\n")), 56); std::string response; RETRY_ASSERT_TRUE(reader.consume(5, response)); ASSERT_EQ(response, "+OK\r\n"); ASSERT_EQ(link.Send("*1\r\n$7\r\nMONITOR\r\n"), 17); ASSERT_EQ(link.Send("random string"), 13); RETRY_ASSERT_TRUE(reader.consume(5, response)); ASSERT_EQ(response, "+OK\r\n"); tunnel(leaderID)->exec("set", "abc", "aaaa" "\xab" "bbb"); response.clear(); std::string expectedReply = SSTR("+localhost [" << connID << "]: \"set\" \"abc\" \"aaaa\\xABbbb\"\r\n"); RETRY_ASSERT_TRUE(reader.consume(expectedReply.size(), response)); ASSERT_EQ(response, expectedReply); tunnel(leaderID)->exec("get", "abc"); response.clear(); expectedReply = SSTR("+localhost [" << connID << "]: \"get\" \"abc\"\r\n"); 
RETRY_ASSERT_TRUE(reader.consume(expectedReply.size(), response)); ASSERT_EQ(response, expectedReply); } class PingCallback : qclient::QCallback { public: PingCallback(qclient::QClient &q) : qcl(q) { flag = prom.get_future(); qcl.execCB(this, "PING", SSTR(pingCounter)); } void finalize(bool result) { isOk = result; prom.set_value(); } virtual void handleResponse(redisReplyPtr &&reply) { if(!reply) finalize(false); if(reply->type != REDIS_REPLY_STRING) finalize(false); if(std::string(reply->str, reply->len) != SSTR(pingCounter)) finalize(false); qdb_info("Received successful ping response: " << pingCounter); pingCounter++; if(pingCounter == 5) return finalize(true); qcl.execCB(this, "PING", SSTR(pingCounter)); } bool ok() { return isOk; } void wait() { flag.get(); } private: size_t pingCounter = 0; std::promise prom; std::future flag; bool isOk = true; qclient::QClient &qcl; }; TEST_F(Raft_e2e, PingExtravaganza) { // A most efficient and sophisticated ping machinery. spinup(0); spinup(1); spinup(2); RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2)); int leaderID = getLeaderID(); PingCallback pinger(*tunnel(leaderID)); pinger.wait(); ASSERT_TRUE(pinger.ok()); } TEST_F(Raft_e2e, hincrbymulti) { spinup(0); spinup(1); spinup(2); RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2)); int leaderID = getLeaderID(); ASSERT_REPLY(tunnel(leaderID)->exec("hincrbymulti", "h1", "h2", "3", "h2", "h3", "4"), 7); ASSERT_REPLY(tunnel(leaderID)->exec("hget", "h1", "h2"), "3"); ASSERT_REPLY(tunnel(leaderID)->exec("hget", "h2", "h3"), "4"); ASSERT_REPLY(tunnel(leaderID)->exec("hincrbymulti", "h1", "h2", "-5", "h2", "h3", "20", "h4", "h8"), "ERR wrong number of arguments for 'hincrbymulti' command"); ASSERT_REPLY(tunnel(leaderID)->exec("hincrbymulti", "h1", "h2", "-5", "h2", "h3", "20", "h4", "h8", "13"), 35); ASSERT_REPLY(tunnel(leaderID)->exec("hget", "h1", "h2"), "-2"); ASSERT_REPLY(tunnel(leaderID)->exec("hget", "h2", "h3"), "24"); ASSERT_REPLY(tunnel(leaderID)->exec("hget", "h4", "h8"), 
"13"); } TEST_F(Raft_e2e, smove) { spinup(0); spinup(1); spinup(2); RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2)); int leaderID = getLeaderID(); ASSERT_REPLY(tunnel(leaderID)->exec("sadd", "set1", "i1", "i2", "i3", "i4", "i5"), 5); ASSERT_REPLY(tunnel(leaderID)->exec("scard", "set1"), 5); ASSERT_REPLY(tunnel(leaderID)->exec("sadd", "set2", "t1", "t2", "t3", "t4", "t5"), 5); ASSERT_REPLY(tunnel(leaderID)->exec("scard", "set2"), 5); ASSERT_REPLY(tunnel(leaderID)->exec("set", "mykey", "myval"), "OK"); ASSERT_REPLY(tunnel(leaderID)->exec("smove", "set1", "mykey", "i1"), "ERR Invalid argument: WRONGTYPE Operation against a key holding the wrong kind of value"); ASSERT_REPLY(tunnel(leaderID)->exec("smove", "mykey", "set1", "i1"), "ERR Invalid argument: WRONGTYPE Operation against a key holding the wrong kind of value"); ASSERT_REPLY(tunnel(leaderID)->exec("scard", "set1"), 5); ASSERT_REPLY(tunnel(leaderID)->exec("scard", "set2"), 5); ASSERT_REPLY(tunnel(leaderID)->exec("smove", "set1", "set2", "i1"), 1); ASSERT_REPLY(tunnel(leaderID)->exec("scard", "set1"), 4); ASSERT_REPLY(tunnel(leaderID)->exec("scard", "set2"), 6); ASSERT_REPLY(tunnel(leaderID)->exec("smembers", "set1"), make_vec("i2", "i3", "i4", "i5")); ASSERT_REPLY(tunnel(leaderID)->exec("smembers", "set2"), make_vec("i1", "t1", "t2", "t3", "t4", "t5")); ASSERT_REPLY(tunnel(leaderID)->exec("smove", "set1", "set2", "not-existing"), 0); ASSERT_REPLY(tunnel(leaderID)->exec("scard", "set1"), 4); ASSERT_REPLY(tunnel(leaderID)->exec("scard", "set2"), 6); ASSERT_REPLY(tunnel(leaderID)->exec("sadd", "set1", "i1"), 1); ASSERT_REPLY(tunnel(leaderID)->exec("smembers", "set1"), make_vec("i1", "i2", "i3", "i4", "i5")); ASSERT_REPLY(tunnel(leaderID)->exec("smembers", "set2"), make_vec("i1", "t1", "t2", "t3", "t4", "t5")); ASSERT_REPLY(tunnel(leaderID)->exec("scard", "set1"), 5); ASSERT_REPLY(tunnel(leaderID)->exec("scard", "set2"), 6); ASSERT_REPLY(tunnel(leaderID)->exec("smove", "set1", "set2", "i1"), 1); 
ASSERT_REPLY(tunnel(leaderID)->exec("scard", "set1"), 4); ASSERT_REPLY(tunnel(leaderID)->exec("scard", "set2"), 6); ASSERT_REPLY(tunnel(leaderID)->exec("smembers", "set1"), make_vec("i2", "i3", "i4", "i5")); ASSERT_REPLY(tunnel(leaderID)->exec("smembers", "set2"), make_vec("i1", "t1", "t2", "t3", "t4", "t5")); ASSERT_REPLY(tunnel(leaderID)->exec("quarkdb-manual-compaction"), "OK"); ASSERT_REPLY(tunnel(leaderID)->exec("raft-journal-manual-compaction"), "OK"); ASSERT_REPLY(tunnel(leaderID)->exec("quarkdb-version"), VERSION_FULL_STRING); qdb_info(qclient::describeRedisReply(tunnel(leaderID)->exec("quarkdb-level-stats").get())); qdb_info(qclient::describeRedisReply(tunnel(leaderID)->exec("quarkdb-compression-stats").get())); } TEST_F(Raft_e2e, sscan) { spinup(0); spinup(1); spinup(2); RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2)); int leaderID = getLeaderID(); redisReplyPtr reply = tunnel(leaderID)->exec("sscan", "myset", "0", "asdf", "123").get(); ASSERT_ERR(reply, "ERR syntax error"); ASSERT_REPLY(tunnel(leaderID)->exec("sadd", "myset", "a", "b", "c", "d", "e", "f", "g"), 7); reply = tunnel(leaderID)->exec("sscan", "myset", "0", "COUNT", "3").get(); ASSERT_REPLY(reply, std::make_pair("next:d", make_vec("a", "b", "c"))); reply = tunnel(leaderID)->exec("sscan", "myset", "next:d", "COUNT", "2").get(); ASSERT_REPLY(reply, std::make_pair("next:f", make_vec("d", "e"))); reply = tunnel(leaderID)->exec("sscan", "myset", "next:f", "COUNT", "2").get(); ASSERT_REPLY(reply, std::make_pair("0", make_vec("f", "g"))); reply = tunnel(leaderID)->exec("sscan", "myset", "next:zz").get(); ASSERT_REPLY(reply, std::make_pair("0", make_vec())); reply = tunnel(leaderID)->exec("sscan", "not-existing", "next:zz").get(); ASSERT_REPLY(reply, std::make_pair("0", make_vec())); QSet qset(*tunnel(leaderID), "myset"); auto pair = qset.sscan("0", 2); ASSERT_EQ(pair.first, "next:c"); ASSERT_EQ(pair.second, make_vec("a", "b")); pair = qset.sscan(pair.first, 2); ASSERT_EQ(pair.first, "next:e"); 
ASSERT_EQ(pair.second, make_vec("c", "d")); QSet qset2(*tunnel(leaderID), "not-existing"); pair = qset2.sscan("0", 2); ASSERT_EQ(pair.first, "0"); ASSERT_EQ(pair.second, make_vec()); } TEST_F(Raft_e2e, LocalityHash) { spinup(0); spinup(1); spinup(2); RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2)); int leaderID = getLeaderID(); // Insert new field. ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "mykey"), 0); ASSERT_REPLY(tunnel(leaderID)->exec("lhset", "mykey", "f1", "hint1", "v1"), 1); ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f1"), "v1"); ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f1", "hint1"), "v1"); ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f1", "ayy-lmao"), "v1"); ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f1", "emptykey"), "v1"); ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f1", "hint1", "emptykey"), "v1"); ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f1", "ayy-lmao", "emptykey"), "v1"); // Update old field, no changes to locality hint. ASSERT_REPLY(tunnel(leaderID)->exec("lhset-and-del-fallback", "mykey", "f1", "hint1", "v2", "fallback"), 0); ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f1"), "v2"); ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f1", "hint1"), "v2"); ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f1", "ayy-lmao"), "v2"); ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f1", "emptykey"), "v2"); ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f1", "hint1", "emptykey"), "v2"); ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f1", "ayy-lmao", "emptykey"), "v2"); ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "mykey"), 1); // Insert one more field. 
ASSERT_REPLY(tunnel(leaderID)->exec("lhset", "mykey", "f2", "hint2", "v3"), 1); ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f2"), "v3"); ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f2", "hint2"), "v3"); ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f2", "hint1"), "v3"); ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f2", "emptykey"), "v3"); ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f2", "hint2", "emptykey"), "v3"); ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f2", "hint1", "emptykey"), "v3"); ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "mykey"), 2); // Update locality hint of first field. ASSERT_REPLY(tunnel(leaderID)->exec("lhset", "mykey", "f1", "hint2", "v2"), 0); ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f1"), "v2"); ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f1", "hint2"), "v2"); ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f1", "hint1"), "v2"); ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f1", "emptykey"), "v2"); ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f1", "hint2", "emptykey"), "v2"); ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f1", "hint1", "emptykey"), "v2"); ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "mykey"), 2); // Update value and locality hint of second field. 
ASSERT_REPLY(tunnel(leaderID)->exec("lhset-and-del-fallback", "mykey", "f2", "hint3", "v4", "fallback"), 0); ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f2"), "v4"); ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f2", "hint3"), "v4"); ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f2", "hint1"), "v4"); ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f2", "emptykey"), "v4"); ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f2", "hint3", "emptykey"), "v4"); ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f2", "hint1", "emptykey"), "v4"); ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "mykey"), 2); // Insert one more field. ASSERT_REPLY(tunnel(leaderID)->exec("lhset", "mykey", "f3", "aaaaa", "v5"), 1); ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f3"), "v5"); ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f3", "aaaaa"), "v5"); ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f3", "wrong-hint"), "v5"); ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f3", "emptykey"), "v5"); ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f3", "aaaaa", "emptykey"), "v5"); ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f3", "wrong-hint"), "v5"); ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "mykey"), 3); // Re-read everything. 
ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f2"), "v4"); ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f2", "hint3"), "v4"); ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f2", "hint1"), "v4"); ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f1"), "v2"); ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f1", "hint2"), "v2"); ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f1", "hint1"), "v2"); ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f2", "emptykey"), "v4"); ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f2", "hint3", "emptykey"), "v4"); ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f2", "hint1", "emptykey"), "v4"); ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f1", "emptykey"), "v2"); ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f1", "hint2", "emptykey"), "v2"); ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f1", "hint1", "emptykey"), "v2"); // Delete key. ASSERT_REPLY(tunnel(leaderID)->exec("exists", "mykey"), 1); ASSERT_REPLY(tunnel(leaderID)->exec("exists", "mykey", "mykey"), 2); ASSERT_REPLY(tunnel(leaderID)->exec("del", "mykey"), 1); ASSERT_REPLY(tunnel(leaderID)->exec("exists", "mykey"), 0); ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "mykey"), 0); ASSERT_REPLY(tunnel(leaderID)->exec("del", "mykey"), 0); ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f3", "aaaaa"), ""); ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f3", "aaaaa", "emptykey"), ""); // Recreate with five fields. 
ASSERT_REPLY(tunnel(leaderID)->exec("lhset", "mykey", "f1", "hint1", "v1"), 1); ASSERT_REPLY(tunnel(leaderID)->exec("lhset", "mykey", "f2", "hint2", "v2"), 1); ASSERT_REPLY(tunnel(leaderID)->exec("lhset", "mykey", "f3", "hint3", "v3"), 1); ASSERT_REPLY(tunnel(leaderID)->exec("lhset", "mykey", "f4", "hint4", "v4"), 1); ASSERT_REPLY(tunnel(leaderID)->exec("lhset", "mykey", "f5", "hint5", "v5"), 1); ASSERT_REPLY(tunnel(leaderID)->exec("exists", "mykey"), 1); ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "mykey"), 5); ASSERT_REPLY(tunnel(leaderID)->exec("lhdel", "mykey", "f2", "hint1"), 1); ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "mykey"), 4); ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f2"), ""); ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f2", "hint2"), ""); ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f2", "emptykey"), ""); ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f2", "hint2", "emptykey"), ""); ASSERT_REPLY(tunnel(leaderID)->exec("lhdel", "mykey", "f2", "hint1"), 0); ASSERT_REPLY(tunnel(leaderID)->exec("lhdel", "mykey", "f1", "f3"), 2); ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "mykey"), 2); ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f4"), "v4"); ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f5"), "v5"); ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f4", "emptykey"), "v4"); ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f5", "emptykey"), "v5"); ASSERT_REPLY(tunnel(leaderID)->exec("lhdel", "mykey", "f4", "f4", "f4", "f4"), 1); ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f4"), ""); ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f4", "emptykey"), ""); ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "mykey"), 1); ASSERT_REPLY(tunnel(leaderID)->exec("get", "mykey"), "ERR Invalid argument: WRONGTYPE Operation against a key holding the wrong kind of value"); 
ASSERT_REPLY(tunnel(leaderID)->exec("lhdel", "mykey", "f4"), 0); ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "mykey"), 1); ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f5", "hint5"), "v5"); ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f5", "hint5", "emptykey"), "v5"); ASSERT_REPLY(tunnel(leaderID)->exec("lhdel", "mykey", "f5"), 1); ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f5", "hint5"), ""); ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f5", "hint5", "emptykey"), ""); ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "mykey"), 0); ASSERT_REPLY(tunnel(leaderID)->exec("lhmset", "mykey", "f1", "hint1", "v1", "ayy"), "ERR wrong number of arguments for 'lhmset' command"); ASSERT_REPLY(tunnel(leaderID)->exec("lhmset", "a", "b", "c"), "ERR wrong number of arguments for 'lhmset' command"); ASSERT_REPLY(tunnel(leaderID)->exec("lhmset", "a", "b"), "ERR wrong number of arguments for 'lhmset' command"); ASSERT_REPLY(tunnel(leaderID)->exec("lhmset", "a"), "ERR wrong number of arguments for 'lhmset' command"); ASSERT_REPLY(tunnel(leaderID)->exec("lhmset", "mykey", "f1", "hint1", "v1"), "OK"); ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "mykey"), 1); ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f1"), "v1"); ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f1", "emptykey"), "v1"); ASSERT_REPLY(tunnel(leaderID)->exec("lhmset", "mykey", "f1", "hint1", "v2", "f1", "hint3", "v3"), "OK"); ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f1"), "v3"); ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f1", "emptykey"), "v3"); ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "mykey"), 1); ASSERT_REPLY(tunnel(leaderID)->exec("lhmset", "mykey", "f2", "hint2", "v5", "f3", "hint1", "v6"), "OK"); ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "mykey"), 3); ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f1"), "v3"); ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", 
"f2"), "v5"); ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f3"), "v6"); ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f1", "emptykey"), "v3"); ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f2", "emptykey"), "v5"); ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f3", "emptykey"), "v6"); // Test fallback ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f9", "fb"), ""); ASSERT_REPLY(tunnel(leaderID)->exec("hset", "fb", "f9", "V"), 1); ASSERT_REPLY(tunnel(leaderID)->exec("hset", "fb", "f8", "Z"), 1); ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f9", "fb"), "V"); ASSERT_REPLY(tunnel(leaderID)->exec("lhset", "mykey", "f9", "hint1", "VVV"), 1); ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f9", "fb"), "VVV"); ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "mykey"), 4); ASSERT_REPLY(tunnel(leaderID)->exec("hlen", "fb"), 2); ASSERT_REPLY(tunnel(leaderID)->exec("lhset-and-del-fallback", "mykey", "f9", "hint", "ZZZ", "fb"), 0); ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "mykey"), 4); ASSERT_REPLY(tunnel(leaderID)->exec("hlen", "fb"), 1); ASSERT_REPLY(tunnel(leaderID)->exec("hget", "fb", "f9"), ""); ASSERT_REPLY(tunnel(leaderID)->exec("hget", "fb", "f8"), "Z"); ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f9", "fb"), "ZZZ"); ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "mykey", "f9"), "ZZZ"); ASSERT_REPLY(tunnel(leaderID)->exec("lhdel-with-fallback", "mykey", "f9", "fb"), 1); ASSERT_REPLY(tunnel(leaderID)->exec("lhdel-with-fallback", "mykey", "f9", "fb"), 0); ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "mykey"), 3); ASSERT_REPLY(tunnel(leaderID)->exec("lhget-with-fallback", "mykey", "f9", "fb"), ""); ASSERT_REPLY(tunnel(leaderID)->exec("lhdel-with-fallback", "mykey", "f8", "fb"), 1); ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "mykey"), 3); ASSERT_REPLY(tunnel(leaderID)->exec("hlen", "fb"), 0); 
ASSERT_REPLY(tunnel(leaderID)->exec("hget", "fb", "f8"), ""); redisReplyPtr reply = tunnel(leaderID)->exec("raw-scan", "\x01", "count", "2000").get(); qdb_info(qclient::describeRedisReply(reply)); LogIndex lastApplied = stateMachine(leaderID)->getLastApplied(); std::string lastAppliedStr = qclient::describeRedisReply(qclient::ResponseBuilder::makeStr(intToBinaryString(lastApplied))); ASSERT_EQ( qclient::describeRedisReply(reply), SSTR( "1) \"!mykey\"\n" "2) \"e\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x03\"\n" "3) \"__clock\"\n" "4) \"\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\"\n" "5) \"__format\"\n" "6) \"0\"\n" "7) \"__in-bulkload\"\n" "8) \"FALSE\"\n" "9) \"__last-applied\"\n" "10) " << lastAppliedStr << "\n" "11) \"emykey##dhint1##f3\"\n" "12) \"v6\"\n" "13) \"emykey##dhint2##f2\"\n" "14) \"v5\"\n" "15) \"emykey##dhint3##f1\"\n" "16) \"v3\"\n" "17) \"emykey##if1\"\n" "18) \"hint3\"\n" "19) \"emykey##if2\"\n" "20) \"hint2\"\n" "21) \"emykey##if3\"\n" "22) \"hint1\"\n" ) ); // QLocalityHash::Iterator on empty key std::string errMsg; QLocalityHash::Iterator iter(tunnel(leaderID), "empty-key"); ASSERT_FALSE(iter.valid()); ASSERT_FALSE(iter.hasError(errMsg)); // QLocalityHash::Iterator on wrong type ASSERT_REPLY(tunnel(leaderID)->exec("set", "my-string", "aaaa"), "OK"); iter = QLocalityHash::Iterator(tunnel(leaderID), "my-string"); ASSERT_FALSE(iter.valid()); ASSERT_TRUE(iter.hasError(errMsg)); ASSERT_EQ(errMsg, "malformed server response to LHSCAN: (error) ERR Invalid argument: WRONGTYPE Operation against a key holding the wrong kind of value"); // QLocalityHash::Iterator on correct type iter = QLocalityHash::Iterator(tunnel(leaderID), "mykey"); ASSERT_TRUE(iter.valid()); ASSERT_FALSE(iter.hasError(errMsg)); ASSERT_EQ(iter.getLocalityHint(), "hint1"); ASSERT_EQ(iter.getKey(), "f3"); ASSERT_EQ(iter.getValue(), "v6"); iter.next(); ASSERT_TRUE(iter.valid()); ASSERT_EQ(iter.getLocalityHint(), "hint2"); ASSERT_EQ(iter.getKey(), "f2"); ASSERT_EQ(iter.getValue(), "v5"); 
iter.next(); ASSERT_TRUE(iter.valid()); ASSERT_EQ(iter.getLocalityHint(), "hint3"); ASSERT_EQ(iter.getKey(), "f1"); ASSERT_EQ(iter.getValue(), "v3"); ASSERT_EQ(iter.requestsSoFar(), 1u); iter.next(); ASSERT_FALSE(iter.valid()); ASSERT_FALSE(iter.hasError(errMsg)); // QLocalityHash::Iterator as above, but with much smaller COUNT of 2 iter = QLocalityHash::Iterator(tunnel(leaderID), "mykey", 2u); ASSERT_TRUE(iter.valid()); ASSERT_FALSE(iter.hasError(errMsg)); ASSERT_EQ(iter.requestsSoFar(), 1u); ASSERT_EQ(iter.getLocalityHint(), "hint1"); ASSERT_EQ(iter.getKey(), "f3"); ASSERT_EQ(iter.getValue(), "v6"); iter.next(); ASSERT_EQ(iter.requestsSoFar(), 1u); ASSERT_EQ(iter.getLocalityHint(), "hint2"); ASSERT_EQ(iter.getKey(), "f2"); ASSERT_EQ(iter.getValue(), "v5"); iter.next(); ASSERT_EQ(iter.requestsSoFar(), 2u); ASSERT_EQ(iter.getLocalityHint(), "hint3"); ASSERT_EQ(iter.getKey(), "f1"); ASSERT_EQ(iter.getValue(), "v3"); iter.next(); ASSERT_EQ(iter.requestsSoFar(), 2u); ASSERT_FALSE(iter.valid()); ASSERT_FALSE(iter.hasError(errMsg)); std::vector> replies; replies.emplace_back(tunnel(leaderID)->exec("lhscan", "mykey", "0" )); replies.emplace_back(tunnel(leaderID)->exec("lhscan", "mykey", "0", "COUNT", "2" )); replies.emplace_back(tunnel(leaderID)->exec("lhscan", "mykey", "next:hint3##f1", "COUNT", "2" )); replies.emplace_back(tunnel(leaderID)->exec("lhscan", "mykey", "next:hint3##", "COUNT", "5")); ASSERT_REPLY_DESCRIBE(replies[0], "1) \"0\"\n" "2) 1) \"hint1\"\n" " 2) \"f3\"\n" " 3) \"v6\"\n" " 4) \"hint2\"\n" " 5) \"f2\"\n" " 6) \"v5\"\n" " 7) \"hint3\"\n" " 8) \"f1\"\n" " 9) \"v3\"\n" ); ASSERT_REPLY_DESCRIBE(replies[1], "1) \"next:hint3##f1\"\n" "2) 1) \"hint1\"\n" " 2) \"f3\"\n" " 3) \"v6\"\n" " 4) \"hint2\"\n" " 5) \"f2\"\n" " 6) \"v5\"\n" ); ASSERT_REPLY_DESCRIBE(replies[2], "1) \"0\"\n" "2) 1) \"hint3\"\n" " 2) \"f1\"\n" " 3) \"v3\"\n" ); ASSERT_REPLY_DESCRIBE(replies[3], "1) \"0\"\n" "2) 1) \"hint3\"\n" " 2) \"f1\"\n" " 3) \"v3\"\n" ); // Now test with evil 
characters, too replies.clear(); replies.emplace_back(tunnel(leaderID)->exec("lhset", "my#key", "f#1", "hint#1", "v1")); replies.emplace_back(tunnel(leaderID)->exec("lhset", "my#key", "f2", "hint2", "v2")); replies.emplace_back(tunnel(leaderID)->exec("lhset", "my#key", "f#3", "hint3", "v3")); replies.emplace_back(tunnel(leaderID)->exec("lhset", "my#key", "f#4", "hint#4", "v#4")); replies.emplace_back(tunnel(leaderID)->exec("lhset", "my#key", "f#5##", "##hint5##", "v5")); replies.emplace_back(tunnel(leaderID)->exec("lhscan", "my#key", "0")); replies.emplace_back(tunnel(leaderID)->exec("lhscan", "my#key", "0", "COUNT", "2")); replies.emplace_back(tunnel(leaderID)->exec("lhscan", "my#key", "next:hint|#1##f#1", "COUNT", "2")); replies.emplace_back(tunnel(leaderID)->exec("lhscan", "my#key", "next:|#|#hint5|#|###f#5##")); size_t count = 0; ASSERT_REPLY(replies[count++], 1); ASSERT_REPLY(replies[count++], 1); ASSERT_REPLY(replies[count++], 1); ASSERT_REPLY(replies[count++], 1); ASSERT_REPLY(replies[count++], 1); ASSERT_REPLY_DESCRIBE(replies[count++], "1) \"0\"\n" "2) 1) \"hint2\"\n" " 2) \"f2\"\n" " 3) \"v2\"\n" " 4) \"hint3\"\n" " 5) \"f#3\"\n" " 6) \"v3\"\n" " 7) \"hint#1\"\n" " 8) \"f#1\"\n" " 9) \"v1\"\n" " 10) \"hint#4\"\n" " 11) \"f#4\"\n" " 12) \"v#4\"\n" " 13) \"##hint5##\"\n" " 14) \"f#5##\"\n" " 15) \"v5\"\n" ); ASSERT_REPLY_DESCRIBE(replies[count++], "1) \"next:hint|#1##f#1\"\n" "2) 1) \"hint2\"\n" " 2) \"f2\"\n" " 3) \"v2\"\n" " 4) \"hint3\"\n" " 5) \"f#3\"\n" " 6) \"v3\"\n" ); ASSERT_REPLY_DESCRIBE(replies[count++], "1) \"next:|#|#hint5|#|###f#5##\"\n" "2) 1) \"hint#1\"\n" " 2) \"f#1\"\n" " 3) \"v1\"\n" " 4) \"hint#4\"\n" " 5) \"f#4\"\n" " 6) \"v#4\"\n" ); ASSERT_REPLY_DESCRIBE(replies[count++], "1) \"0\"\n" "2) 1) \"##hint5##\"\n" " 2) \"f#5##\"\n" " 3) \"v5\"\n" ); } TEST_F(Raft_e2e, LHSCAN) { spinup(0); spinup(1); spinup(2); RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2)); int leaderID = getLeaderID(); std::vector> replies; 
replies.emplace_back(tunnel(leaderID)->exec("lhset", "mykey", "f1", "hint1111", "v1")); replies.emplace_back(tunnel(leaderID)->exec("lhset", "mykey", "f2", "hint1112", "v2")); replies.emplace_back(tunnel(leaderID)->exec("lhset", "mykey", "f3", "hint1212", "v3")); replies.emplace_back(tunnel(leaderID)->exec("lhset", "mykey", "f4", "hint3212", "v4")); replies.emplace_back(tunnel(leaderID)->exec("lhset", "mykey", "f5", "hint1212", "v5")); for(size_t i = 0; i < replies.size(); i++) { ASSERT_REPLY(replies[i], 1); } replies.clear(); replies.emplace_back(tunnel(leaderID)->exec("lhscan", "mykey", "0", "MATCHLOC", "hint1212")); replies.emplace_back(tunnel(leaderID)->exec("lhscan", "mykey", "0", "MATCHLOC", "hint1212", "COUNT", "1")); replies.emplace_back(tunnel(leaderID)->exec("lhscan", "mykey", "next:hint1212##f5", "MATCHLOC", "hint1212", "COUNT", "1")); replies.emplace_back(tunnel(leaderID)->exec("lhscan", "mykey", "0", "MATCHLOC", "hint1*")); replies.emplace_back(tunnel(leaderID)->exec("lhscan", "mykey", "0", "MATCHLOC", "hint1*", "COUNT", "2")); replies.emplace_back(tunnel(leaderID)->exec("lhscan", "mykey", "next:hint1212##f3", "MATCHLOC", "hint1*", "COUNT", "2")); replies.emplace_back(tunnel(leaderID)->exec("lhscan", "mykey", "0", "MATCHLOC", "hint3*", "COUNT", "2")); size_t count = 0; ASSERT_REPLY_DESCRIBE(replies[count++], "1) \"0\"\n" "2) 1) \"hint1212\"\n" " 2) \"f3\"\n" " 3) \"v3\"\n" " 4) \"hint1212\"\n" " 5) \"f5\"\n" " 6) \"v5\"\n" ); ASSERT_REPLY_DESCRIBE(replies[count++], "1) \"next:hint1212##f5\"\n" "2) 1) \"hint1212\"\n" " 2) \"f3\"\n" " 3) \"v3\"\n" ); ASSERT_REPLY_DESCRIBE(replies[count++], "1) \"0\"\n" "2) 1) \"hint1212\"\n" " 2) \"f5\"\n" " 3) \"v5\"\n" ); ASSERT_REPLY_DESCRIBE(replies[count++], "1) \"0\"\n" "2) 1) \"hint1111\"\n" " 2) \"f1\"\n" " 3) \"v1\"\n" " 4) \"hint1112\"\n" " 5) \"f2\"\n" " 6) \"v2\"\n" " 7) \"hint1212\"\n" " 8) \"f3\"\n" " 9) \"v3\"\n" " 10) \"hint1212\"\n" " 11) \"f5\"\n" " 12) \"v5\"\n" ); 
ASSERT_REPLY_DESCRIBE(replies[count++], "1) \"next:hint1212##f3\"\n" "2) 1) \"hint1111\"\n" " 2) \"f1\"\n" " 3) \"v1\"\n" " 4) \"hint1112\"\n" " 5) \"f2\"\n" " 6) \"v2\"\n" ); ASSERT_REPLY_DESCRIBE(replies[count++], "1) \"0\"\n" "2) 1) \"hint1212\"\n" " 2) \"f3\"\n" " 3) \"v3\"\n" " 4) \"hint1212\"\n" " 5) \"f5\"\n" " 6) \"v5\"\n" ); ASSERT_REPLY_DESCRIBE(replies[count++], "1) \"0\"\n" "2) 1) \"hint3212\"\n" " 2) \"f4\"\n" " 3) \"v4\"\n" ); } TEST_F(Raft_e2e, lhlocdel) { spinup(0); spinup(1); spinup(2); RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2)); int leaderID = getLeaderID(); ASSERT_REPLY(tunnel(leaderID)->exec("lhset", "myhash", "f1", "h1", "v1"), 1); ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "myhash", "f1"), "v1"); // wrong locality hint, no deletion ASSERT_REPLY(tunnel(leaderID)->exec("lhlocdel", "myhash", "f1", "h2"), 0); ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "myhash", "f1"), "v1"); // correct locality hint, bye bye ASSERT_REPLY(tunnel(leaderID)->exec("lhlocdel", "myhash", "f1", "h1"), 1); ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "myhash", "f1"), ""); } TEST_F(Raft_e2e, RawGetAllVersions) { spinup(0); spinup(1); spinup(2); RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2)); int leaderID = getLeaderID(); ASSERT_REPLY(tunnel(leaderID)->exec("sadd", "myset-for-raw-get", "s1"), 1); ASSERT_REPLY(tunnel(leaderID)->exec("sadd", "myset-for-raw-get", "s2"), 1); redisReplyPtr reply = tunnel(leaderID)->exec("raw-get-all-versions", "cmyset-for-raw-get##s1").get(); qdb_info(qclient::describeRedisReply(reply)); ASSERT_EQ(reply->elements, 4u); ASSERT_EQ(std::string(reply->element[0]->str, reply->element[0]->len), "KEY: cmyset-for-raw-get##s1"); ASSERT_EQ(std::string(reply->element[1]->str, reply->element[1]->len), "VALUE: 1"); // Ignore sequence number ASSERT_EQ(std::string(reply->element[3]->str, reply->element[3]->len), "TYPE: 1"); reply = tunnel(leaderID)->exec("raw-get-all-versions", "!myset-for-raw-get").get(); ASSERT_EQ(reply->elements, 8u); 
qdb_info(qclient::describeRedisReply(reply)); ASSERT_EQ(std::string(reply->element[0]->str, reply->element[0]->len), "KEY: !myset-for-raw-get"); ASSERT_EQ(std::string(reply->element[1]->str, reply->element[1]->len), SSTR("VALUE: c" << intToBinaryString(2))); ASSERT_EQ(std::string(reply->element[3]->str, reply->element[3]->len), "TYPE: 1"); ASSERT_EQ(std::string(reply->element[4]->str, reply->element[4]->len), "KEY: !myset-for-raw-get"); ASSERT_EQ(std::string(reply->element[5]->str, reply->element[5]->len), SSTR("VALUE: c" << intToBinaryString(1))); ASSERT_EQ(std::string(reply->element[7]->str, reply->element[7]->len), "TYPE: 1"); } TEST_F(Raft_e2e, ConvertHashToLHash) { spinup(0); spinup(1); spinup(2); RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2)); int leaderID = getLeaderID(); ASSERT_REPLY(tunnel(leaderID)->exec("hset", "hash", "f1", "v1"), 1); ASSERT_REPLY(tunnel(leaderID)->exec("convert-hash-field-to-lhash", "hash", "f1", "lhash", "f1", "hint"), "OK"); ASSERT_REPLY(tunnel(leaderID)->exec("hlen", "hash"), 0); ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "lhash"), 1); ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "lhash", "f1", "hint"), "v1"); ASSERT_REPLY(tunnel(leaderID)->exec("convert-hash-field-to-lhash", "hash", "f1", "lhash", "f1", "hint"), "ERR Destination field already exists!"); ASSERT_REPLY(tunnel(leaderID)->exec("convert-hash-field-to-lhash", "hash", "f2", "lhash", "f2", "hint"), "ERR NotFound: "); ASSERT_REPLY(tunnel(leaderID)->exec("hset", "hash", "f2", "v2"), 1); ASSERT_REPLY(tunnel(leaderID)->exec("hlen", "hash"), 1); ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "lhash"), 1); ASSERT_REPLY(tunnel(leaderID)->exec("convert-hash-field-to-lhash", "hash", "f2", "lhash", "f2", "hint"), "OK"); ASSERT_REPLY(tunnel(leaderID)->exec("lhget", "lhash", "f2", "hint"), "v2"); ASSERT_REPLY(tunnel(leaderID)->exec("hlen", "hash"), 0); ASSERT_REPLY(tunnel(leaderID)->exec("lhlen", "lhash"), 2); } TEST_F(Raft_e2e, InconsistentIteratorsTest) { // Try to trigger 
"inconsistent iterators" condition spinup(0); spinup(1); spinup(2); RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2)); int leaderID = getLeaderID(); std::vector> futs; for(size_t i = 0; i < 100; i++) { futs.emplace_back(tunnel(leaderID)->exec("hset", "hash", SSTR("f" << i), SSTR("v" << i))); } std::future delReply = tunnel(leaderID)->exec("del", "hash"); for(size_t i = 0; i < 100; i++) { ASSERT_REPLY(futs[i], 1); } ASSERT_REPLY(delReply, 1); } TEST_F(Raft_e2e, CloneHash) { spinup(0); spinup(1); spinup(2); RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2)); int leaderID = getLeaderID(); std::vector> replies; for(size_t i = 0; i < 10; i++) { replies.emplace_back(tunnel(leaderID)->exec("HSET", "hash", SSTR("f" << i), SSTR("v" << i))); } for(size_t i = 0; i < 10; i++) { ASSERT_REPLY(replies[i], 1); } ASSERT_REPLY(tunnel(leaderID)->exec("hclone", "hash", "hash2"), "OK"); redisReplyPtr hgetall = tunnel(leaderID)->exec("hgetall", "hash2").get(); ASSERT_EQ( qclient::describeRedisReply(hgetall), "1) \"f0\"\n" "2) \"v0\"\n" "3) \"f1\"\n" "4) \"v1\"\n" "5) \"f2\"\n" "6) \"v2\"\n" "7) \"f3\"\n" "8) \"v3\"\n" "9) \"f4\"\n" "10) \"v4\"\n" "11) \"f5\"\n" "12) \"v5\"\n" "13) \"f6\"\n" "14) \"v6\"\n" "15) \"f7\"\n" "16) \"v7\"\n" "17) \"f8\"\n" "18) \"v8\"\n" "19) \"f9\"\n" "20) \"v9\"\n" ); ASSERT_REPLY(tunnel(leaderID)->exec("hclone", "hash", "hash2"), "ERR Invalid argument: ERR target key already exists, will not overwrite"); ASSERT_REPLY(tunnel(leaderID)->exec("sadd", "my-set", "s1"), 1); ASSERT_REPLY(tunnel(leaderID)->exec("hclone", "my-set", "hash3"), "ERR Invalid argument: WRONGTYPE Operation against a key holding the wrong kind of value"); ASSERT_REPLY(tunnel(leaderID)->exec("hclone", "hash", "my-set"), "ERR Invalid argument: WRONGTYPE Operation against a key holding the wrong kind of value"); ASSERT_REPLY(tunnel(leaderID)->exec("hclone", "not-existing", "hash"), "ERR Invalid argument: ERR target key already exists, will not overwrite"); 
// Tail of TEST_F(Raft_e2e, CloneHash), which opens before this span: cloning a
// missing source answers "OK", and EXISTS on both keys afterwards counts 0.
  ASSERT_REPLY(tunnel(leaderID)->exec("hclone", "not-existing", "not-existing-2"), "OK");
  ASSERT_REPLY(tunnel(leaderID)->exec("exists", "not-existing", "not-existing-2"), 0);
}

//------------------------------------------------------------------------------
// Walk the queue from begin() for size() steps and report whether a message
// with payload "penguins" AND a message with payload "chickens" were both seen.
//------------------------------------------------------------------------------
bool lookForSentinelValues(qclient::MessageQueue *queue) {
  bool penguinsFound = false;
  bool chickensFound = false;

  auto iterator = queue->begin();
  for(size_t i = 0; i < queue->size(); i++) {
    Message& item = iterator.item();

    if(item.getPayload() == "penguins") {
      penguinsFound = true;
    }

    if(item.getPayload() == "chickens") {
      chickensFound = true;
    }

    iterator.next();
  }

  return penguinsFound && chickensFound;
}

//------------------------------------------------------------------------------
// Walk the queue from begin() for size() steps and report whether it holds a
// pattern message for pattern "abc-*" on channel "abc-cde" with payload
// "turtles".
//------------------------------------------------------------------------------
bool lookForTurtles(qclient::MessageQueue *queue) {
  bool turtlesFound = false;

  auto iterator = queue->begin();
  for(size_t i = 0; i < queue->size(); i++) {
    Message& item = iterator.item();

    if(item.getMessageType() == MessageType::kPatternMessage &&
       item.getPattern() == "abc-*" &&
       item.getChannel() == "abc-cde" &&
       item.getPayload() == "turtles") {
      turtlesFound = true;
    }

    iterator.next();
  }

  return turtlesFound;
}

//------------------------------------------------------------------------------
// Pub/sub through a BaseSubscriber feeding a MessageQueue: subscribe, publish,
// restart the whole cluster, and check that published messages still arrive.
//------------------------------------------------------------------------------
TEST_F(Raft_e2e, pubsub) {
  spinup(0); spinup(1); spinup(2);
  RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));

  int leaderID = getLeaderID();

  // Template argument restored: mq.get() is handed to the helpers above, which
  // take a qclient::MessageQueue*.
  auto mq = std::make_shared<qclient::MessageQueue>();
  qclient::BaseSubscriber subscriber(members(), mq, reasonableSubscriptionOptions());

  // Nobody is subscribed yet, so the publish reaches 0 receivers.
  ASSERT_REPLY(tunnel(leaderID)->exec("publish", "test-channel", "giraffes"), 0);

  subscriber.subscribe( {"test-channel"} );

  RETRY_ASSERT_TRUE(
    qclient::describeRedisReply(tunnel(leaderID)->exec("publish", "test-channel", "penguins").get()) ==
    "(integer) 1"
  );

  spindown(0); spindown(1); spindown(2);
  spinup(0); spinup(1); spinup(2);
  RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));

  leaderID = getLeaderID();

  // Ensure subscriber is able to re-subscribe!
RETRY_ASSERT_TRUE( qclient::describeRedisReply(tunnel(leaderID)->exec("publish", "test-channel", "chickens").get()) == "(integer) 1" ); RETRY_ASSERT_TRUE(lookForSentinelValues(mq.get())); // now subscribe to a pattern subscriber.psubscribe( {"abc-*" } ); RETRY_ASSERT_TRUE( qclient::describeRedisReply(tunnel(leaderID)->exec("publish", "abc-cde", "turtles").get()) == "(integer) 1" ); RETRY_ASSERT_TRUE(lookForTurtles(mq.get())); mq->clear(); subscriber.unsubscribe( {"test-channel"} ); subscriber.punsubscribe( {"abc-*"} ); Message* item = mq->begin().getItemBlockOrNull(); ASSERT_NE(item, nullptr); ASSERT_EQ(item->getMessageType(), MessageType::kUnsubscribe); ASSERT_EQ(item->getChannel(), "test-channel"); ASSERT_EQ(item->getActiveSubscriptions(), 1); mq->pop_front(); item = mq->begin().getItemBlockOrNull(); ASSERT_NE(item, nullptr); ASSERT_EQ(item->getMessageType(), MessageType::kPatternUnsubscribe); ASSERT_EQ(item->getPattern(), "abc-*"); ASSERT_EQ(item->getActiveSubscriptions(), 0); mq->pop_front(); ASSERT_EQ(mq->size(), 0u); } class ReconnectionCounter : public ReconnectionListener { public: virtual void notifyConnectionLost(int64_t epoch, int errc, const std::string &msg) override { } virtual void notifyConnectionEstablished(int64_t epoch) override { lastEpoch = epoch; } int64_t getEpoch() const { return lastEpoch; } private: int64_t lastEpoch = 0u; }; TEST_F(Raft_e2e, MultiSubscribeWithPushtypes) { spinup(0); spinup(1); spinup(2); RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2)); int leaderID = getLeaderID(); std::shared_ptr listener = std::make_shared(); tunnel(leaderID)->attachListener(listener.get()); ASSERT_REPLY_DESCRIBE(tunnel(leaderID)->exec("activate-push-types").get(), "OK"); ASSERT_EQ(listener->getEpoch(), 1u); ASSERT_REPLY_DESCRIBE(tunnel(leaderID)->exec("subscribe", "a", "b").get(), "OK"); ASSERT_REPLY_DESCRIBE(tunnel(leaderID)->exec("ping").get(), "PONG"); ASSERT_EQ(listener->getEpoch(), 1u); 
// Tail of TEST_F(Raft_e2e, MultiSubscribeWithPushtypes), which opens before
// this span: detach the listener before releasing it.
  ASSERT_TRUE(tunnel(leaderID)->detachListener(listener.get()));
  listener.reset();
}

//------------------------------------------------------------------------------
// Two SharedDeque objects with the same name, each behind its own
// SharedManager: a push_back through one must become visible in the size
// reported by the other.
//------------------------------------------------------------------------------
TEST_F(Raft_e2e, SharedDeque) {
  spinup(0); spinup(1); spinup(2);
  RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));

  qclient::SubscriptionOptions subopts;
  subopts.handshake = makeQClientHandshake();
  qclient::SharedManager sm(members(), std::move(subopts));

  // subopts has been moved from; its handshake is re-populated before reuse.
  subopts.handshake = makeQClientHandshake();
  qclient::SharedManager sm2(members(), std::move(subopts));

  qclient::SharedDeque deque1(&sm, "shared-deque");
  qclient::SharedDeque deque2(&sm2, "shared-deque");

  size_t sz = 0u;
  RETRY_ASSERT_TRUE(deque1.size(sz));
  ASSERT_EQ(sz, 0u);

  RETRY_ASSERT_TRUE(deque2.size(sz));
  ASSERT_EQ(sz, 0u);

  qclient::Status st = deque2.push_back("turtles");
  ASSERT_TRUE(st);

  RETRY_ASSERT_TRUE(deque1.size(sz) && sz == 1);
}

//------------------------------------------------------------------------------
// Two transient shared hashes with the same name, each behind its own
// SharedManager: values set through one must become readable through the other.
//------------------------------------------------------------------------------
TEST_F(Raft_e2e, TransientSharedHash) {
  spinup(0); spinup(1); spinup(2);
  RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));

  qclient::SubscriptionOptions subopts;
  subopts.handshake = makeQClientHandshake();
  qclient::SharedManager sm(members(), std::move(subopts));

  // subopts has been moved from; its handshake is re-populated before reuse.
  subopts.handshake = makeQClientHandshake();
  qclient::SharedManager sm2(members(), std::move(subopts));

  auto hash1 = sm.makeTransientSharedHash("hash1");
  auto hash2 = sm2.makeTransientSharedHash("hash1");

  // Key/value types restored from how the map is filled below.
  // NOTE(review): confirm they match the parameter type of set().
  std::map<std::string, std::string> batch;
  batch["aaa"] = "bbb";
  batch["test"] = "meow";

  std::string val1;
  std::string val2;

  // Keep re-sending the batch until the second hash has seen it.
  // NOTE(review): this loop has no upper bound, so if the update never arrives
  // the test hangs instead of failing.
  while(true) {
    hash1->set(batch);

    if(hash2->get("aaa", val1)) {
      // Both keys were sent together in one batch, so "test" is expected to be
      // present as soon as "aaa" is.
      ASSERT_TRUE(hash2->get("test", val2));
      break;
    }

    std::this_thread::sleep_for(std::chrono::milliseconds(1));
  }

  ASSERT_EQ(val1, "bbb");
  ASSERT_EQ(val2, "meow");
}

//------------------------------------------------------------------------------
// qclient::Subscriber: publishes before and after subscribing to a channel and
// checks the receiver count and the delivered message.
//------------------------------------------------------------------------------
TEST_F(Raft_e2e, Subscriber) {
  spinup(0); spinup(1); spinup(2);
  RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));

  int leaderID = getLeaderID();

  qclient::Subscriber subscriber(members(), reasonableSubscriptionOptions());

  // No subscription exists yet, so nobody receives this message.
  ASSERT_REPLY_DESCRIBE(tunnel(leaderID)->exec("publish", "test-channel", "giraffes").get(), "(integer) 0");

  auto subscription = subscriber.subscribe("test-channel");
ASSERT_TRUE(subscription->empty()); RETRY_ASSERT_TRUE(subscription->acknowledged()); ASSERT_EQ(qclient::describeRedisReply(tunnel(leaderID)->exec("publish", "test-channel", "giraffes").get()), "(integer) 1"); while(true) { tunnel(leaderID)->exec("publish", "test-channel", "giraffes"); if(!subscription->empty()) break; } ASSERT_FALSE(subscription->empty()); qclient::Message msg; ASSERT_TRUE(subscription->front(msg)); ASSERT_EQ(msg, qclient::Message::createMessage("test-channel", "giraffes")); } TEST_F(Raft_e2e, vhset) { spinup(0); spinup(1); spinup(2); RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2)); int leaderID = getLeaderID(); qclient::SubscriptionOptions opts; opts.handshake = makeQClientHandshake(); qclient::Subscriber subscriber(members(), std::move(opts)); std::unique_ptr subscription = subscriber.subscribe(SSTR("__vhash@" << "key-1")); ASSERT_TRUE(subscription->empty()); RETRY_ASSERT_TRUE(subscription->acknowledged()); std::vector> replies; replies.emplace_back(tunnel(leaderID)->exec("set", "key-0", "val")); replies.emplace_back(tunnel(leaderID)->exec("vhset", "key-0", "f1", "v1")); replies.emplace_back(tunnel(leaderID)->exec("vhset", "key-1", "f1", "v1")); ASSERT_REPLY(replies[0], "OK"); ASSERT_REPLY(replies[1], "ERR Invalid argument: WRONGTYPE Operation against a key holding the wrong kind of value"); ASSERT_REPLY(replies[2], 1); qclient::Message msg; RETRY_ASSERT_EQ(subscription->size(), 1u); ASSERT_TRUE(subscription->front(msg)); ASSERT_EQ( qclient::describeRedisReply(msg.getPayload()), "1) (integer) 1\n" "2) 1) \"f1\"\n" " 2) \"v1\"\n" ); ASSERT_REPLY(tunnel(leaderID)->exec("vhset", "key-1", "f2", "v2"), 2); ASSERT_REPLY(tunnel(leaderID)->exec("vhset", "key-1", "f3", "v3"), 3); ASSERT_REPLY(tunnel(leaderID)->exec("vhset", "key-1", "f4", "v4"), 4); ASSERT_REPLY_DESCRIBE(tunnel(leaderID)->exec("vhgetall", "key-1").get(), "1) (integer) 4\n" "2) 1) \"f1\"\n" " 2) \"v1\"\n" " 3) \"f2\"\n" " 4) \"v2\"\n" " 5) \"f3\"\n" " 6) \"v3\"\n" " 7) \"f4\"\n" " 8) 
\"v4\"\n" ); ASSERT_REPLY(tunnel(leaderID)->exec("vhdel", "key-1", "f3"), 5); ASSERT_REPLY_DESCRIBE(tunnel(leaderID)->exec("vhgetall", "key-1").get(), "1) (integer) 5\n" "2) 1) \"f1\"\n" " 2) \"v1\"\n" " 3) \"f2\"\n" " 4) \"v2\"\n" " 5) \"f4\"\n" " 6) \"v4\"\n" ); ASSERT_REPLY_DESCRIBE(tunnel(leaderID)->exec("vhkeys", "key-1").get(), "1) (integer) 5\n" "2) 1) \"f1\"\n" " 2) \"f2\"\n" " 3) \"f4\"\n" ); ASSERT_REPLY(tunnel(leaderID)->exec("vhlen", "key-1"), 3); ASSERT_REPLY(tunnel(leaderID)->exec("vhdel", "key-1", "f1"), 6); ASSERT_REPLY_DESCRIBE(tunnel(leaderID)->exec("vhgetall", "key-1").get(), "1) (integer) 6\n" "2) 1) \"f2\"\n" " 2) \"v2\"\n" " 3) \"f4\"\n" " 4) \"v4\"\n" ); ASSERT_REPLY(tunnel(leaderID)->exec("vhlen", "key-1"), 2); ASSERT_REPLY(tunnel(leaderID)->exec("vhdel", "key-1", "f4"), 7); ASSERT_REPLY_DESCRIBE(tunnel(leaderID)->exec("vhgetall", "key-1").get(), "1) (integer) 7\n" "2) 1) \"f2\"\n" " 2) \"v2\"\n" ); ASSERT_REPLY(tunnel(leaderID)->exec("vhlen", "key-1"), 1); ASSERT_REPLY(tunnel(leaderID)->exec("vhdel", "key-1", "not-existing"), 7); ASSERT_REPLY(tunnel(leaderID)->exec("vhlen", "key-1"), 1); ASSERT_REPLY(tunnel(leaderID)->exec("vhdel", "key-1", "f2"), 8); ASSERT_REPLY_DESCRIBE(tunnel(leaderID)->exec("vhgetall", "key-1").get(), "1) (integer) 8\n" "2) (empty list or set)\n" ); ASSERT_REPLY(tunnel(leaderID)->exec("vhdel", "key-1", "f2"), 8); ASSERT_REPLY(tunnel(leaderID)->exec("vhset", "key-1", "f3", "v3"), 9); ASSERT_REPLY_DESCRIBE(tunnel(leaderID)->exec("vhgetall", "key-1").get(), "1) (integer) 9\n" "2) 1) \"f3\"\n" " 2) \"v3\"\n" ); ASSERT_REPLY(tunnel(leaderID)->exec("del", "key-1"), 1); ASSERT_REPLY_DESCRIBE(tunnel(leaderID)->exec("vhgetall", "key-1").get(), "1) (integer) 0\n" "2) (empty list or set)\n" ); ASSERT_REPLY_DESCRIBE(tunnel(leaderID)->exec("vhkeys", "key-1").get(), "1) (integer) 0\n" "2) (empty list or set)\n" ); ASSERT_REPLY(tunnel(leaderID)->exec("vhset", "key-1", "f9", "v9"), 1); 
// Tail of TEST_F(Raft_e2e, vhset), which opens before this span.
  ASSERT_REPLY_DESCRIBE(tunnel(leaderID)->exec("vhgetall", "key-1").get(),
    "1) (integer) 1\n"
    "2) 1) \"f9\"\n"
    " 2) \"v9\"\n"
  );

  ASSERT_REPLY_DESCRIBE(tunnel(leaderID)->exec("vhkeys", "key-1").get(),
    "1) (integer) 1\n"
    "2) 1) \"f9\"\n"
  );
}

//------------------------------------------------------------------------------
// Journal scanning: the same five "set" entries are appended to every node's
// journal before the cluster is started. scanContents() is exercised directly
// with a count limit and with a match pattern, then the raft-journal-scan
// command is exercised through node 0.
//------------------------------------------------------------------------------
TEST_F(Raft_e2e, JournalScanning) {
  for(size_t i = 1; i <= 5; i ++) {
    RaftEntry entry(0, {"set", SSTR("k" << i), SSTR("v" << i) } );

    ASSERT_TRUE(journal(0)->append(i, entry));
    ASSERT_TRUE(journal(1)->append(i, entry));
    ASSERT_TRUE(journal(2)->append(i, entry));
  }

  // NOTE(review): the element type is not visible in this part of the file.
  // RaftEntryWithIndex is inferred from the .entry / .index accesses below;
  // confirm against the declaration of scanContents().
  std::vector<RaftEntryWithIndex> entries;
  LogIndex cursor;

  // Start at index 1, at most 3 entries: the returned cursor is 4.
  ASSERT_OK(journal(0)->scanContents(1, 3, "", entries, cursor));
  ASSERT_EQ(entries.size(), 3u);
  ASSERT_EQ(cursor, 4);

  for(size_t i = 1; i <= 3; i ++) {
    RaftEntry entry(0, {"set", SSTR("k" << i), SSTR("v" << i) } );
    ASSERT_EQ(entries[i-1].entry, entry);
    ASSERT_EQ(entries[i-1].index, static_cast<int>(i));
  }

  // Pattern scan from index 0: only the k2 entry matches and the cursor comes
  // back as 0.
  ASSERT_OK(journal(0)->scanContents(0, 300, "*k2*", entries, cursor));
  ASSERT_EQ(entries.size(), 1u);
  ASSERT_EQ(cursor, 0);

  RaftEntry entry(0, {"set", "k2", "v2"});
  ASSERT_EQ(entries[0].entry, entry);
  ASSERT_EQ(entries[0].index, 2);

  spinup(0); spinup(1); spinup(2);
  RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));

  ASSERT_REPLY_DESCRIBE(tunnel(0)->exec("raft-journal-scan", "next:1", "COUNT", "2").get(),
    "1) \"next:3\"\n"
    "2) 1) 1) \"INDEX: 1\"\n"
    " 2) \"TERM: 0\"\n"
    " 3) 1) \"set\"\n"
    " 2) \"k1\"\n"
    " 3) \"v1\"\n"
    " 2) 1) \"INDEX: 2\"\n"
    " 2) \"TERM: 0\"\n"
    " 3) 1) \"set\"\n"
    " 2) \"k2\"\n"
    " 3) \"v2\"\n"
  );

  ASSERT_REPLY_DESCRIBE(tunnel(0)->exec("raft-journal-scan").get(),
    "(error) ERR wrong number of arguments for 'raft-journal-scan' command");
}

//------------------------------------------------------------------------------
// Push types: one QClient with the ActivatePushTypes handshake chained in and
// exclusivePubsub switched off issues ordinary commands and a SUBSCRIBE on the
// same connection; pub/sub traffic is collected in the message queue.
//------------------------------------------------------------------------------
TEST_F(Raft_e2e, PushTypes) {
  spinup(0); spinup(1); spinup(2);
  RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));

  int leaderID = getLeaderID();

  // Template argument restored from usage: size(), begin() and
  // getItemBlockOrNull() are called on it below.
  // NOTE(review): confirm qclient::MessageQueue is the intended type.
  auto mq = std::make_shared<qclient::MessageQueue>();

  qclient::Options opts;
  opts.handshake = makeQClientHandshake();
  // make_unique replaces the untyped "std::unique_ptr(new ...)" so no base
  // class has to be named here.
  opts.chainHandshake(std::make_unique<qclient::ActivatePushTypesHandshake>());
  opts.transparentRedirects = true;
  opts.messageListener = mq;
  opts.exclusivePubsub = false;

  qclient::QClient qcl(myself(leaderID).hostname, myself(leaderID).port, std::move(opts));

  // Element type restored; exec(...).get() is assigned to redisReplyPtr elsewhere in this file.
  std::vector<std::future<redisReplyPtr>> replies;
  replies.emplace_back(qcl.exec("SET", "aaa", "bbb"));
  replies.emplace_back(qcl.exec("SUBSCRIBE", "__vhash@key-1"));
  replies.emplace_back(qcl.exec("GET", "aaa"));

  ASSERT_REPLY_DESCRIBE(replies[0].get(), "OK");
  ASSERT_REPLY_DESCRIBE(replies[1].get(), "OK");
  ASSERT_REPLY_DESCRIBE(replies[2].get(), "\"bbb\"");

  replies.emplace_back(qcl.exec("VHSET", "key-1", "field-1", "v999"));
  ASSERT_REPLY_DESCRIBE(replies[3].get(), "(integer) 1");

  // Two queued items are expected: the subscribe acknowledgement, then the
  // message announcing the VHSET.
  RETRY_ASSERT_EQ(mq->size(), 2u);

  auto it = mq->begin();
  Message *msg = it.getItemBlockOrNull();
  ASSERT_NE(msg, nullptr);

  ASSERT_EQ(msg->getMessageType(), MessageType::kSubscribe);
  ASSERT_EQ(msg->getChannel(), "__vhash@key-1");
  ASSERT_EQ(msg->getActiveSubscriptions(), 1);

  it.next();
  msg = it.getItemBlockOrNull();
  ASSERT_NE(msg, nullptr);

  ASSERT_EQ(msg->getMessageType(), MessageType::kMessage);
  ASSERT_EQ(msg->getChannel(), "__vhash@key-1");
  ASSERT_EQ(qclient::describeRedisReply(msg->getPayload()),
    "1) (integer) 1\n"
    "2) 1) \"field-1\"\n"
    " 2) \"v999\"\n"
  );
}

//------------------------------------------------------------------------------
// SharedHash: durable, transient and local values written through one
// SharedManager, then read back through the same manager and through a second.
//------------------------------------------------------------------------------
TEST_F(Raft_e2e, SharedHash) {
  spinup(0); spinup(1); spinup(2);
  RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));

  int leaderID = getLeaderID();
  qdb_info("LeaderID: " << leaderID);

  qclient::SubscriptionOptions subopts;
  subopts.handshake = makeQClientHandshake();
  subopts.retryStrategy = qclient::RetryStrategy::WithTimeout(
    std::chrono::minutes(2));
  qclient::SharedManager sm(members(), std::move(subopts));

  // subopts has been moved from; only its handshake is re-populated before reuse.
  subopts.handshake = makeQClientHandshake();
  qclient::SharedManager sm2(members(), std::move(subopts));

  // hash3 exists only to provide the update subscription.
  qclient::SharedHash hash3(&sm2, "my-shared-hash");
  auto subscription = hash3.subscribe();

  qclient::SharedHash hash1(&sm, "my-shared-hash");
  qclient::SharedHash hash2(&sm2, "my-shared-hash");

  ASSERT_REPLY_DESCRIBE(sm.getQClient()->exec("PING").get(), "PONG");
  ASSERT_REPLY_DESCRIBE(sm2.getQClient()->exec("PING").get(), "PONG");
RETRY_ASSERT_EQ(hash1.getPersistentRevision(), 0u); qclient::UpdateBatch batch; batch.setDurable("durable value", "123"); batch.setTransient("transient value", "345"); batch.setLocal("local value", "999"); hash1.set(batch).get(); RETRY_ASSERT_EQ(hash1.getPersistentRevision(), 1u); std::set exp_keys {"durable value", "transient value", "local value"}; auto keys = hash1.getKeys(); for (const auto& elem: keys) { ASSERT_TRUE(exp_keys.find(elem) != exp_keys.end()); } std::string tmp; RETRY_ASSERT_TRUE(hash1.get("durable value", tmp)); ASSERT_EQ(tmp, "123"); RETRY_ASSERT_TRUE(hash1.get("transient value", tmp)); ASSERT_EQ(tmp, "345"); ASSERT_TRUE(hash1.get("local value", tmp)); ASSERT_EQ(tmp, "999"); RETRY_ASSERT_TRUE(hash2.get("durable value", tmp)); ASSERT_EQ(tmp, "123"); RETRY_ASSERT_TRUE(hash2.get("transient value", tmp)); ASSERT_EQ(tmp, "345"); ASSERT_FALSE(hash2.get("local value", tmp)); RETRY_ASSERT_EQ(subscription->size(), 2u); qclient::SharedHashUpdate update; ASSERT_TRUE(subscription->front(update)); ASSERT_EQ(update.key, "transient value"); ASSERT_EQ(update.value, "345"); subscription->pop_front(); ASSERT_TRUE(subscription->front(update)); ASSERT_EQ(update.key, "durable value"); ASSERT_EQ(update.value, "123"); } TEST_F(Raft_e2e, PersistentSharedHash) { spinup(0); spinup(1); spinup(2); RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2)); int leaderID = getLeaderID(); ASSERT_REPLY_DESCRIBE(tunnel(leaderID)->exec("vhset", "my-shared-hash", "k1", "v1").get(), "(integer) 1"); qclient::SubscriptionOptions subopts; subopts.handshake = makeQClientHandshake(); qclient::SharedManager sm(members(), std::move(subopts)); subopts.handshake = makeQClientHandshake(); qclient::SharedManager sm2(members(), std::move(subopts)); qclient::PersistentSharedHash hash1(&sm, "my-shared-hash"); RETRY_ASSERT_EQ(hash1.getCurrentVersion(), 1u); hash1.set("k2", "v2"); RETRY_ASSERT_EQ(hash1.getCurrentVersion(), 2u); std::string tmp; ASSERT_TRUE(hash1.get("k2", tmp)); ASSERT_EQ(tmp, "v2"); 
qclient::PersistentSharedHash hash2(&sm, "my-shared-hash"); RETRY_ASSERT_EQ(hash2.getCurrentVersion(), 2u); ASSERT_TRUE(hash2.get("k2", tmp)); ASSERT_EQ(tmp, "v2"); ASSERT_TRUE(hash2.get("k1", tmp)); ASSERT_EQ(tmp, "v1"); } TEST_F(Raft_e2e, Communicator) { spinup(0); spinup(1); spinup(2); RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2)); qclient::Subscriber subscriber1(members(), reasonableSubscriptionOptions(true)); qclient::Subscriber subscriber2(members(), reasonableSubscriptionOptions(true)); Communicator communicator(&subscriber1, "comm-channel", nullptr, std::chrono::milliseconds(1), std::chrono::seconds(60)); CommunicatorListener communicatorListener(&subscriber2, "comm-channel"); std::string reqID; std::future fut = communicator.issue("i-like-trains", reqID); RETRY_ASSERT_EQ(communicatorListener.size(), 1u); qclient::CommunicatorRequest req = communicatorListener.front(); ASSERT_EQ(req.getID(), reqID); ASSERT_EQ(req.getContents(), "i-like-trains"); req.sendReply(888, "aaaaa"); ASSERT_EQ(fut.wait_for(std::chrono::seconds(3)), std::future_status::ready); CommunicatorReply reply = fut.get(); ASSERT_EQ(reply.status, 888); ASSERT_EQ(reply.contents, "aaaaa"); } TEST_F(Raft_e2e, NoAuth) { spinup(0); spinup(1); spinup(2); RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2)); int leaderID = getLeaderID(); // de-auth ASSERT_REPLY_DESCRIBE(tunnel(leaderID)->exec("auth", "aaaa").get(), "(error) ERR invalid password"); // try command with too many arguments for un-authenticated clients ASSERT_EQ(tunnel(leaderID)->exec("1", "2", "3", "4", "5", "6", "7", "8", "9", "10").get(), nullptr); // re-connect, re-auth ASSERT_REPLY_DESCRIBE(tunnel(leaderID)->exec("ping").get(), "PONG"); // try command again ASSERT_REPLY_DESCRIBE(tunnel(leaderID)->exec("1", "2", "3", "4", "5", "6", "7", "8", "9", "10").get(), "(error) ERR unknown command '1'"); // de-auth ASSERT_REPLY_DESCRIBE(tunnel(leaderID)->exec("auth", "aaaa").get(), "(error) ERR invalid password"); // try command with huge payload, 
1 MB std::string payload(1048577, 'a'); ASSERT_EQ(tunnel(leaderID)->exec("SET", "key", payload).get(), nullptr); // re-connect, re-auth ASSERT_REPLY_DESCRIBE(tunnel(leaderID)->exec("ping").get(), "PONG"); // ensure huge payload now succeeds ASSERT_REPLY_DESCRIBE(tunnel(leaderID)->exec("SET", "key", payload).get(), "OK"); }