// ---------------------------------------------------------------------- // File: qclient.cc // Author: Georgios Bitzes - CERN // ---------------------------------------------------------------------- /************************************************************************ * quarkdb - a redis-like highly available key-value store * * Copyright (C) 2016 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #include "test-utils.hh" #include "test-reply-macros.hh" #include "Link.hh" #include "RedisParser.hh" #include #include #include using namespace quarkdb; using namespace qclient; static void assert_receive(int fd, const std::string &contents) { char buffer[contents.size()]; int len = recv(fd, buffer, contents.size(), 0); EXPECT_EQ(len, (int) contents.size()); ASSERT_EQ(std::string(buffer, len), contents); } static void socket_send(int fd, const std::string &contents) { ASSERT_GT(send(fd, contents.c_str(), contents.size(), 0), 0); } static std::string str_from_reply(redisReplyPtr &reply) { return std::string(reply->str, reply->len); } TEST(Tunnel, T1) { qclient::Options opts; opts.ensureConnectionIsPrimed = false; QClient tunnel("localhost", 1234, std::move(opts) ); RedisRequest req { "set", "abc", "123" }; std::future fut = tunnel.execute(req); ASSERT_EQ(fut.get(), nullptr); SocketListener listener(1234); int s2 = listener.accept(); ASSERT_GT(s2, 0); // connected fut = tunnel.execute(req); assert_receive(s2, "*3\r\n$3\r\nset\r\n$3\r\nabc\r\n$3\r\n123\r\n"); socket_send(s2, "+OK\r\n"); redisReplyPtr reply = fut.get(); ASSERT_EQ(reply->type, REDIS_REPLY_STATUS); ASSERT_EQ(str_from_reply(reply), "OK"); req = { "get", "abc" }; fut = tunnel.execute(req); req = { "get", "qwerty" }; std::future fut2 = tunnel.execute(req); assert_receive(s2, "*2\r\n$3\r\nget\r\n$3\r\nabc\r\n"); assert_receive(s2, "*2\r\n$3\r\nget\r\n$6\r\nqwerty\r\n"); socket_send(s2, "$-1\r\n"); socket_send(s2, "$7\r\n1234567\r\n"); reply = fut.get(); ASSERT_EQ(reply->type, REDIS_REPLY_NIL); reply = fut2.get(); ASSERT_EQ(reply->type, REDIS_REPLY_STRING); ASSERT_EQ(str_from_reply(reply), "1234567"); close(s2); } TEST(QClient, T2) { class SimpleHandshake : public qclient::Handshake { public: std::vector provideHandshake() override { return {"RAFT_HANDSHAKE", "some-cluster-id"}; } Status validateResponse(const redisReplyPtr &reply) override { return Status::VALID_COMPLETE; } virtual void restart() override { } virtual std::unique_ptr clone() const override { return std::unique_ptr(new SimpleHandshake()); } }; // with handshake qclient::Options options; options.handshake.reset(new SimpleHandshake()); QClient tunnel("localhost", 1234, std::move(options)); RedisRequest req { "set", "abc", "123" }; std::future fut = tunnel.execute(req); ASSERT_EQ(fut.get(), nullptr); SocketListener listener(1234); int s2 = listener.accept(); ASSERT_GT(s2, 0); // connected fut = tunnel.execute(req); assert_receive(s2, "*2\r\n$14\r\nRAFT_HANDSHAKE\r\n$15\r\nsome-cluster-id\r\n"); socket_send(s2, "+OK\r\n"); assert_receive(s2, "*3\r\n$3\r\nset\r\n$3\r\nabc\r\n$3\r\n123\r\n"); socket_send(s2, "+OK\r\n"); close(s2); } TEST(QClient, T3) { class PingHandshake : public qclient::Handshake { public: std::vector provideHandshake() override { if(count >= 10) { qdb_throw("invalid count: " << count); } return {"PING", std::to_string(count) }; } Status validateResponse(const redisReplyPtr &reply) override { if(!reply) return Status::INVALID; if(reply->type != REDIS_REPLY_STATUS) return Status::INVALID; if(std::string(reply->str, reply->len) != SSTR(count)) { return Status::INVALID; } qdb_info("Validated ping handshake response #" << count); count++; if(count == 10) { return Status::VALID_COMPLETE; } return Status::VALID_INCOMPLETE; } virtual void restart() override { count = 0; } virtual std::unique_ptr clone() const override { return std::unique_ptr(new PingHandshake()); } private: int count = 0; }; // with handshake qclient::Options options; options.retryStrategy = qclient::RetryStrategy::WithTimeout(std::chrono::seconds(60)); options.handshake.reset(new PingHandshake()); QClient tunnel("localhost", 1234, std::move(options)); for(size_t attempts = 0; attempts < 2; attempts++) { SocketListener listener(1234); int s2 = listener.accept(); ASSERT_GT(s2, 0); Link link(s2); RedisParser parser(&link); RedisRequest req1 { "set", "abc", "123" }; std::future fut1 = tunnel.execute(req1); RedisRequest req2 { "set", "aaa", "bbb" }; std::future fut2 = tunnel.execute(req2); RedisRequest incoming; for(size_t i = 0; i < 10; i++) { RETRY_ASSERT_EQ_SPIN(parser.fetch(incoming, true), 1); ASSERT_EQ(incoming, make_req("PING", std::to_string(i))); link.Send(SSTR("+" << i << "\r\n")); } RETRY_ASSERT_EQ_SPIN(parser.fetch(incoming, true), 1); ASSERT_EQ(incoming, req1); link.Send("+OK\r\n"); ASSERT_REPLY(fut1, "OK"); RETRY_ASSERT_EQ_SPIN(parser.fetch(incoming, true), 1); ASSERT_EQ(incoming, req2); link.Send("+ZZZ\r\n"); ASSERT_REPLY(fut2, "ZZZ"); link.Close(); } } TEST(QClient, AuthHandshake) { // with handshake qclient::Options opts; opts.retryStrategy = qclient::RetryStrategy::WithTimeout(std::chrono::seconds(60)); opts.handshake.reset(new qclient::AuthHandshake("hunter2")); QClient tunnel("localhost", 1235, std::move(opts)); for(size_t attempts = 0; attempts < 2; attempts++) { SocketListener listener(1235); int s2 = listener.accept(); ASSERT_GT(s2, 0); Link link(s2); RedisParser parser(&link); RedisRequest req1 { "set", "abc", "123" }; std::future fut1 = tunnel.execute(req1); RedisRequest req2 { "set", "aaa", "bbb" }; std::future fut2 = tunnel.execute(req2); RedisRequest incoming; RETRY_ASSERT_EQ_SPIN(parser.fetch(incoming, true), 1); ASSERT_EQ(incoming, make_req("AUTH", "hunter2")); link.Send("+OK\r\n"); RETRY_ASSERT_EQ_SPIN(parser.fetch(incoming, true), 1); ASSERT_EQ(incoming, req1); link.Send("+OK\r\n"); ASSERT_REPLY(fut1, "OK"); RETRY_ASSERT_EQ_SPIN(parser.fetch(incoming, true), 1); ASSERT_EQ(incoming, req2); link.Send("+ZZZ\r\n"); ASSERT_REPLY(fut2, "ZZZ"); link.Close(); } }