// ---------------------------------------------------------------------- // File: poller.cc // Author: Georgios Bitzes - CERN // ---------------------------------------------------------------------- /************************************************************************ * quarkdb - a redis-like highly available key-value store * * Copyright (C) 2016 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #include "Dispatcher.hh" #include "netio/AsioPoller.hh" #include "test-utils.hh" #include #include #include using namespace quarkdb; using namespace qclient; #define ASSERT_REPLY(reply, val) { ASSERT_NE(reply, nullptr); ASSERT_EQ(std::string(((reply))->str, ((reply))->len), val); } class tPoller : public TestCluster3NodesFixture {}; TEST_F(tPoller, SimpleConstruction) { RedisDispatcher dispatcher(*stateMachine(), *publisher()); AsioPoller smPoller(myself().port, 3, &dispatcher); } TEST_F(tPoller, OneRequest) { RedisDispatcher dispatcher(*stateMachine(), *publisher()); AsioPoller smPoller(myself().port, 3, &dispatcher); QClient tunnel(myself().hostname, myself().port, {} ); redisReplyPtr reply = tunnel.exec("set", "abc", "1234").get(); ASSERT_REPLY(reply, "OK"); } TEST_F(tPoller, T1) { RedisDispatcher dispatcher(*stateMachine(), *publisher()); AsioPoller smPoller(myself().port, 3, &dispatcher); // start first connection QClient tunnel(myself().hostname, myself().port, {} ); redisReplyPtr reply = tunnel.exec("set", "abc", "1234").get(); ASSERT_REPLY(reply, "OK"); reply = tunnel.exec("get", "abc").get(); ASSERT_REPLY(reply, "1234"); // start second connection, ensure the poller can handle them concurrently QClient tunnel2(myself().hostname, myself().port, {} ); reply = tunnel2.exec("get", "abc").get(); ASSERT_REPLY(reply, "1234"); reply = tunnel2.exec("set", "qwert", "asdf").get(); ASSERT_REPLY(reply, "OK"); // now try a third QClient tunnel3(myself().hostname, myself().port, {} ); reply = tunnel3.exec("get", "qwert").get(); ASSERT_REPLY(reply, "asdf"); } class ReconnectionCounter : public ReconnectionListener { public: virtual void notifyConnectionLost(int64_t epoch, int errc, const std::string &msg) override { } virtual void notifyConnectionEstablished(int64_t epoch) override { lastEpoch = epoch; } int64_t getEpoch() const { return lastEpoch; } private: int64_t lastEpoch = 0u; }; TEST_F(tPoller, test_reconnect) { RedisDispatcher dispatcher(*stateMachine(), *publisher()); std::shared_ptr listener = std::make_shared(); QClient tunnel(myself().hostname, myself().port, qclient::Options()); tunnel.attachListener(listener.get()); for(size_t reconnects = 0; reconnects < 5; reconnects++) { AsioPoller rocksdbpoller(myself().port, 3, &dispatcher); bool success = false; for(size_t i = 0; i < 30; i++) { redisReplyPtr reply = tunnel.exec("set", "abc", "1234").get(); if(reply != nullptr) { ASSERT_REPLY(reply, "OK"); success = true; break; } else { ASSERT_FALSE(success); std::this_thread::sleep_for(std::chrono::milliseconds(1)); } } ASSERT_TRUE(success); } std::cout << "Number of reconnections in total: " << listener->getEpoch() << std::endl; ASSERT_GE(listener->getEpoch(), 5u); }