// ---------------------------------------------------------------------- // File: multi.cc // Author: Georgios Bitzes - CERN // ---------------------------------------------------------------------- /************************************************************************ * quarkdb - a redis-like highly available key-value store * * Copyright (C) 2016 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #include "Dispatcher.hh" #include "redis/Transaction.hh" #include "raft/RaftContactDetails.hh" #include "test-utils.hh" #include "test-reply-macros.hh" #include #include using namespace quarkdb; using namespace qclient; class Multi : public TestCluster3NodesFixture {}; TEST_F(Multi, Dispatching) { RedisDispatcher dispatcher(*stateMachine(), *publisher()); Transaction tx1; tx1.emplace_back("GET", "aaa"); tx1.emplace_back("SET", "aaa", "bbb"); tx1.emplace_back("GET", "aaa"); tx1.setPhantom(false); RedisEncodedResponse resp = dispatcher.dispatch(tx1, 1); ASSERT_EQ(resp.val, "*3\r\n$-1\r\n+OK\r\n$3\r\nbbb\r\n"); RedisRequest req = {"GET", "aaa"}; resp = dispatcher.dispatch(req, 0); ASSERT_EQ(resp.val, "$3\r\nbbb\r\n"); } TEST_F(Multi, HandlerBasicSanity) { spinup(0); spinup(1); spinup(2); RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2)); int leaderID = getLeaderID(); std::vector> replies; replies.push_back(tunnel(leaderID)->exec("MULTI")); replies.push_back(tunnel(leaderID)->exec("SET", "key", "value")); replies.push_back(tunnel(leaderID)->exec("SET", "key-2", "val-2")); ASSERT_REPLY(replies[0], "OK"); ASSERT_REPLY(replies[1], "QUEUED"); ASSERT_REPLY(replies[2], "QUEUED"); // No dirty reads QClient connection2(myself(leaderID).hostname, myself(leaderID).port, makeNoRedirectOptions()); ASSERT_REPLY(connection2.exec("GET", "key"), ""); redisReplyPtr reply = tunnel(leaderID)->exec("EXEC").get(); ASSERT_EQ(qclient::describeRedisReply(reply), "1) OK\n2) OK\n"); ASSERT_REPLY(connection2.exec("GET", "key"), "value"); // Empty multi/exec block replies.clear(); replies.push_back(tunnel(leaderID)->exec("MULTI")); replies.push_back(tunnel(leaderID)->exec("EXEC")); ASSERT_REPLY(replies[0], "OK"); reply = replies[1].get(); ASSERT_EQ(reply->type, REDIS_REPLY_ARRAY); ASSERT_EQ(reply->elements, 0u); // No double MULTI replies.clear(); replies.push_back(tunnel(leaderID)->exec("MULTI")); replies.push_back(tunnel(leaderID)->exec("MULTI", "aaaa")); replies.push_back(tunnel(leaderID)->exec("MULTI")); replies.push_back(tunnel(leaderID)->exec("SET", "counter", "1")); replies.push_back(tunnel(leaderID)->exec("HSET", "myhash", "f1", "v1")); replies.push_back(tunnel(leaderID)->exec("EXEC")); ASSERT_REPLY(replies[0], "OK"); ASSERT_REPLY(replies[1], "ERR wrong number of arguments for 'MULTI' command"); ASSERT_REPLY(replies[2], "ERR MULTI calls can not be nested"); ASSERT_REPLY(replies[3], "QUEUED"); ASSERT_REPLY(replies[4], "QUEUED"); reply = replies[5].get(); ASSERT_EQ(qclient::describeRedisReply(reply), "1) OK\n2) (integer) 1\n"); // Discard without multi ASSERT_REPLY(tunnel(leaderID)->exec("DISCARD"), "ERR DISCARD without MULTI"); // Discard replies.clear(); replies.push_back(tunnel(leaderID)->exec("MULTI")); replies.push_back(tunnel(leaderID)->exec("HSET", "myhash", "f1", "v2")); replies.push_back(tunnel(leaderID)->exec("DISCARD")); replies.push_back(tunnel(leaderID)->exec("HGET", "myhash", "f1")); ASSERT_REPLY(replies[0], "OK"); ASSERT_REPLY(replies[1], "QUEUED"); ASSERT_REPLY(replies[2], "OK"); ASSERT_REPLY(replies[3], "v1"); // EXEC without MULTI replies.clear(); ASSERT_REPLY(tunnel(leaderID)->exec("EXEC"), "ERR EXEC without MULTI"); ASSERT_REPLY(tunnel(leaderID)->exec("HGET", "myhash", "f1"), "v1"); // Read MULTI-EXEC right after a write replies.clear(); replies.push_back(tunnel(leaderID)->exec("MULTI")); replies.push_back(tunnel(leaderID)->exec("SET", "abc", "123")); replies.push_back(tunnel(leaderID)->exec("EXEC")); ASSERT_REPLY(replies[0], "OK"); ASSERT_REPLY(replies[1], "QUEUED"); ASSERT_REPLY_DESCRIBE(replies[2], "1) OK\n"); replies.clear(); replies.push_back(tunnel(leaderID)->exec("MULTI")); replies.push_back(tunnel(leaderID)->exec("GET", "abc")); replies.push_back(tunnel(leaderID)->exec("EXEC")); ASSERT_REPLY(replies[0], "OK"); ASSERT_REPLY(replies[1], "QUEUED"); ASSERT_REPLY_DESCRIBE(replies[2], "1) \"123\"\n"); } TEST_F(Multi, WithRaft) { spinup(0); spinup(1); spinup(2); RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2)); Transaction write; write.emplace_back("SET", "aaa", "bbb"); write.emplace_back("SET", "bbb", "ccc"); ASSERT_EQ(write.getFusedCommand(), "TX_READWRITE"); int leaderID = getLeaderID(); redisReplyPtr reply = tunnel(leaderID)->exec( write.getFusedCommand(), write.serialize(), "real" ).get(); ASSERT_EQ(qclient::describeRedisReply(reply), "1) OK\n" "2) OK\n" ); write.clear(); write.emplace_back("SET", "bbb", "ddd"); write.emplace_back("GET", "aaa"); ASSERT_EQ(write.getFusedCommand(), "TX_READWRITE"); reply = tunnel(leaderID)->exec( write.getFusedCommand(), write.serialize(), "real" ).get(); ASSERT_EQ(qclient::describeRedisReply(reply), "1) OK\n" "2) \"bbb\"\n" ); Transaction read; read.emplace_back("GET", "aaa"); read.emplace_back("GET", "bbb"); ASSERT_FALSE(read.containsWrites()); ASSERT_EQ(read.getFusedCommand(), "TX_READONLY"); reply = tunnel(leaderID)->exec( read.getFusedCommand(), read.serialize(), "real" ).get(); ASSERT_EQ(qclient::describeRedisReply(reply), "1) \"bbb\"\n" "2) \"ddd\"\n" ); }