// ----------------------------------------------------------------------
// File: communicator.cc
// Author: Georgios Bitzes - CERN
// ----------------------------------------------------------------------
/************************************************************************
* qclient - A simple redis C++ client with support for redirects *
* Copyright (C) 2020 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see .*
************************************************************************/
#include "qclient/shared/PendingRequestVault.hh"
#include "qclient/shared/Communicator.hh"
#include "qclient/shared/CommunicatorListener.hh"
#include "qclient/pubsub/Subscriber.hh"
#include "qclient/pubsub/Message.hh"
#include "qclient/utils/SteadyClock.hh"
#include "shared/SharedSerialization.hh"
#include "qclient/SSTR.hh"
#include
using namespace qclient;
TEST(CommunicatorRequest, Serialization) {
std::string sourceUuid = "qwerty";
std::string contents = "uiop";
std::string payload = serializeCommunicatorRequest(sourceUuid, contents);
std::string parsedUuid;
std::string parsedContents;
ASSERT_TRUE(parseCommunicatorRequest(payload, parsedUuid, parsedContents));
}
TEST(Communicator, IssueWithReply) {
Subscriber subscriber;
Communicator communicator(&subscriber, "abc");
std::string reqid;
std::future fut = communicator.issue("1234", reqid);
ASSERT_EQ(fut.wait_for(std::chrono::seconds(0)), std::future_status::timeout);
std::cerr << "Assigned RequestID: " << reqid << std::endl;
CommunicatorReply reply;
reply.status = 999;
reply.contents = "AAAA";
Message msg = Message::createMessage("abc", serializeCommunicatorReply(reqid, reply));
subscriber.feedFakeMessage(msg);
CommunicatorReply rep = fut.get();
ASSERT_EQ(rep.status, 999);
ASSERT_EQ(rep.contents, "AAAA");
}
TEST(Communicator, WithRetries) {
Subscriber subscriber;
SteadyClock steadyClock(true);
Communicator communicator(&subscriber, "abc", &steadyClock);
std::string reqid;
std::future fut = communicator.issue("987", reqid);
ASSERT_EQ(fut.wait_for(std::chrono::seconds(0)), std::future_status::timeout);
std::cerr << "Assigned RequestID: " << reqid << std::endl;
std::string retryChannel, retryContents, retryID;
ASSERT_FALSE(communicator.runNextToRetry(retryChannel, retryContents, retryID));
steadyClock.advance(std::chrono::seconds(9));
ASSERT_FALSE(communicator.runNextToRetry(retryChannel, retryContents, retryID));
steadyClock.advance(std::chrono::seconds(1));
ASSERT_TRUE(communicator.runNextToRetry(retryChannel, retryContents, retryID));
ASSERT_EQ(retryChannel, "abc");
ASSERT_EQ(retryContents, "987");
ASSERT_EQ(retryID, reqid);
ASSERT_FALSE(communicator.runNextToRetry(retryChannel, retryContents, retryID));
steadyClock.advance(std::chrono::seconds(10));
ASSERT_TRUE(communicator.runNextToRetry(retryChannel, retryContents, retryID));
// Test expiry
steadyClock.advance(std::chrono::seconds(40));
ASSERT_FALSE(communicator.runNextToRetry(retryChannel, retryContents, retryID));
steadyClock.advance(std::chrono::seconds(9000));
ASSERT_FALSE(communicator.runNextToRetry(retryChannel, retryContents, retryID));
}
TEST(PendingRequestVault, BasicSanity) {
PendingRequestVault requestVault;
ASSERT_EQ(requestVault.size(), 0u);
std::chrono::steady_clock::time_point tp2;
ASSERT_FALSE(requestVault.getEarliestRetry(tp2));
std::chrono::steady_clock::time_point tp;
tp += std::chrono::seconds(1);
PendingRequestVault::InsertOutcome outcome = requestVault.insert("ch1", "123", tp);
std::cerr << "RequestID: " << outcome.id << std::endl;
ASSERT_EQ(requestVault.size(), 1u);
ASSERT_EQ(outcome.fut.wait_for(std::chrono::seconds(0)), std::future_status::timeout);
ASSERT_TRUE(requestVault.getEarliestRetry(tp2));
ASSERT_EQ(tp, tp2);
requestVault.blockUntilNonEmpty();
CommunicatorReply reply;
reply.status = 123;
reply.contents = "aaa";
ASSERT_FALSE(requestVault.satisfy("123", std::move(reply)));
ASSERT_TRUE(requestVault.satisfy(outcome.id, std::move(reply)));
ASSERT_EQ(requestVault.size(), 0u);
ASSERT_FALSE(requestVault.getEarliestRetry(tp2));
CommunicatorReply rep = outcome.fut.get();
ASSERT_EQ(rep.status, 123);
ASSERT_EQ(rep.contents, "aaa");
}
TEST(PendingRequestVault, WithRetries) {
PendingRequestVault requestVault;
std::chrono::steady_clock::time_point start;
requestVault.insert("ch1", "123", start+std::chrono::seconds(1));
requestVault.insert("ch1", "1234", start+std::chrono::seconds(2));
ASSERT_EQ(requestVault.size(), 2u);
std::chrono::steady_clock::time_point tp;
std::string channel, contents, id;
ASSERT_TRUE(requestVault.getEarliestRetry(tp));
ASSERT_EQ(start+std::chrono::seconds(1), tp);
ASSERT_TRUE(requestVault.retryFrontItem(start+std::chrono::seconds(3),
channel, contents, id));
ASSERT_EQ(channel, "ch1");
ASSERT_EQ(contents, "123");
ASSERT_TRUE(requestVault.getEarliestRetry(tp));
ASSERT_EQ(start+std::chrono::seconds(2), tp);
ASSERT_TRUE(requestVault.retryFrontItem(start+std::chrono::seconds(4),
channel, contents, id));
ASSERT_EQ(channel, "ch1");
ASSERT_EQ(contents, "1234");
ASSERT_TRUE(requestVault.getEarliestRetry(tp));
ASSERT_EQ(start+std::chrono::seconds(3), tp);
ASSERT_TRUE(requestVault.retryFrontItem(start+std::chrono::seconds(5),
channel, contents, id));
ASSERT_EQ(channel, "ch1");
ASSERT_EQ(contents, "123");
ASSERT_TRUE(requestVault.getEarliestRetry(tp));
ASSERT_EQ(start+std::chrono::seconds(4), tp);
ASSERT_TRUE(requestVault.retryFrontItem(start+std::chrono::seconds(6),
channel, contents, id));
ASSERT_EQ(channel, "ch1");
ASSERT_EQ(contents, "1234");
ASSERT_EQ(requestVault.expire(start), 0u);
ASSERT_EQ(requestVault.expire(start+std::chrono::seconds(1)), 1u);
ASSERT_EQ(requestVault.size(), 1u);
ASSERT_EQ(requestVault.expire(start+std::chrono::seconds(1)), 0u);
ASSERT_EQ(requestVault.expire(start+std::chrono::seconds(2)), 1u);
ASSERT_EQ(requestVault.size(), 0u);
}
TEST(PendingRequestVault, SingleItemRetry) {
PendingRequestVault requestVault;
std::chrono::steady_clock::time_point start;
requestVault.insert("ch1", "123", start+std::chrono::seconds(1));
std::string channel, contents, id;
ASSERT_TRUE(requestVault.retryFrontItem(start+std::chrono::seconds(3),
channel, contents, id));
ASSERT_EQ(channel, "ch1");
ASSERT_EQ(contents, "123");
CommunicatorReply reply;
reply.status = 123;
reply.contents = "aaa";
ASSERT_TRUE(requestVault.satisfy(id, std::move(reply)));
}
TEST(CommunicatorListener, BasicSanity) {
Subscriber subscriber;
CommunicatorListener communicator(&subscriber, "abc");
ASSERT_EQ(communicator.size(), 0u);
Message msg = Message::createMessage("abc", serializeCommunicatorRequest("1-2-3-4", "qqq"));
subscriber.feedFakeMessage(msg);
ASSERT_EQ(communicator.size(), 1u);
CommunicatorRequest req = communicator.front();
communicator.pop_front();
ASSERT_EQ(req.getID(), "1-2-3-4");
ASSERT_EQ(req.getContents(), "qqq");
subscriber.feedFakeMessage(msg);
ASSERT_EQ(communicator.size(), 0u);
}