// ----------------------------------------------------------------------
// File: connection.cc
// Author: Georgios Bitzes - CERN
// ----------------------------------------------------------------------
/************************************************************************
* quarkdb - a redis-like highly available key-value store *
* Copyright (C) 2016 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see .*
************************************************************************/
#include "Connection.hh"
#include "RedisParser.hh"
#include "StateMachine.hh"
#include "Dispatcher.hh"
#include "test-utils.hh"
#include
using namespace quarkdb;
class tConnection : public TestCluster3NodesFixture {};
TEST_F(tConnection, basic_sanity) {
const int BUFFER_SIZE = 1024;
char buffer[BUFFER_SIZE];
RedisDispatcher dispatcher(*stateMachine(), *publisher());
Link link;
Connection conn(&link);
conn.setResponseBuffering(false);
conn.addPendingTransaction(&dispatcher, Transaction({"get", "abc"}));
int len = link.Recv(buffer, BUFFER_SIZE, 0);
ASSERT_EQ(std::string(buffer, len), "$-1\r\n");
conn.err("fatality");
len = link.Recv(buffer, BUFFER_SIZE, 0);
ASSERT_EQ(std::string(buffer, len), "-ERR fatality\r\n");
conn.addPendingTransaction(&dispatcher, Transaction({"set", "abc", "qwerty"}), 1);
ASSERT_THROW(conn.addPendingTransaction(&dispatcher, Transaction({"set", "abc", "qwerty"}), 1), FatalException);
// verify the request has NOT been dispatched yet
std::string tmp;
ASSERT_FALSE(stateMachine()->get("abc", tmp).ok());
conn.addPendingTransaction(&dispatcher, Transaction({"get", "abc"}) );
ASSERT_EQ(link.Recv(buffer, BUFFER_SIZE, 0), 0); // "set" is blocking any replies
conn.addPendingTransaction(&dispatcher, Transaction({"set", "abc", "12345"}), 2);
ASSERT_EQ(conn.dispatchPending(&dispatcher, 1), 2);
len = link.Recv(buffer, BUFFER_SIZE, 0);
ASSERT_EQ(std::string(buffer, len), "+OK\r\n$6\r\nqwerty\r\n");
conn.err("fatality^2");
conn.addPendingTransaction(&dispatcher, Transaction({"get", "abc"}) );
ASSERT_EQ(link.Recv(buffer, BUFFER_SIZE, 0), 0); // "set" is blocking any replies
ASSERT_TRUE(stateMachine()->get("abc", tmp).ok());
ASSERT_EQ(tmp, "qwerty");
ASSERT_EQ(conn.dispatchPending(&dispatcher, 2), -1);
ASSERT_TRUE(stateMachine()->get("abc", tmp).ok());
ASSERT_EQ(tmp, "12345");
len = link.Recv(buffer, BUFFER_SIZE, 0);
ASSERT_EQ(std::string(buffer, len), "+OK\r\n-ERR fatality^2\r\n$5\r\n12345\r\n");
ASSERT_THROW(conn.addPendingTransaction(&dispatcher, Transaction({"set", "asdf", "qwerty"}), 1), FatalException);
}