// ---------------------------------------------------------------------- // File: pubsub.cc // Author: Georgios Bitzes - CERN // ---------------------------------------------------------------------- /************************************************************************ * qclient - A simple redis C++ client with support for redirects * * Copyright (C) 2016 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #include "pubsub/MessageParser.hh" #include "qclient/ResponseBuilder.hh" #include "qclient/pubsub/Message.hh" #include "qclient/pubsub/MessageQueue.hh" #include "qclient/pubsub/Subscriber.hh" #include "gtest/gtest.h" using namespace qclient; TEST(MessageParser, ParseFailure) { Message msg; ASSERT_FALSE(MessageParser::parse(ResponseBuilder::makeStr("adfaf"), msg)); ASSERT_FALSE(MessageParser::parse(ResponseBuilder::makeInt(3), msg)); } TEST(MessageParser, kMessage) { Message msg; std::vector vec = { "message", "mychannel", "test" }; ASSERT_TRUE(MessageParser::parse(ResponseBuilder::makeStringArray(vec), msg)); ASSERT_EQ(msg.getMessageType(), MessageType::kMessage); ASSERT_EQ(msg.getChannel(), "mychannel"); ASSERT_EQ(msg.getPayload(), "test"); } TEST(MessageParser, kMessagePush) { Message msg; std::vector vec = { "pubsub", "message", "mychannel", "test" }; ASSERT_TRUE(MessageParser::parse(ResponseBuilder::makePushArray(vec), msg)); ASSERT_EQ(msg.getMessageType(), MessageType::kMessage); ASSERT_EQ(msg.getChannel(), "mychannel"); ASSERT_EQ(msg.getPayload(), "test"); } TEST(MessageParser, kPatternMessage) { Message msg; std::vector vec = { "pmessage", "pattern*", "channel-name", "aaa" }; ASSERT_TRUE(MessageParser::parse(ResponseBuilder::makeStringArray(vec), msg)); ASSERT_EQ(msg.getMessageType(), MessageType::kPatternMessage); ASSERT_EQ(msg.getPattern(), "pattern*"); ASSERT_EQ(msg.getChannel(), "channel-name"); ASSERT_EQ(msg.getPayload(), "aaa"); } TEST(MessageParser, kPatternMessagePush) { Message msg; std::vector vec = { "pubsub", "pmessage", "pattern*", "channel-name", "aaa" }; ASSERT_TRUE(MessageParser::parse(ResponseBuilder::makePushArray(vec), msg)); ASSERT_EQ(msg.getMessageType(), MessageType::kPatternMessage); ASSERT_EQ(msg.getPattern(), "pattern*"); ASSERT_EQ(msg.getChannel(), "channel-name"); ASSERT_EQ(msg.getPayload(), "aaa"); } TEST(MessageParser, kSubscribe) { Message msg; ASSERT_TRUE(MessageParser::parse(ResponseBuilder::makeArr("subscribe", "chan", 4), msg)); ASSERT_EQ(msg.getMessageType(), MessageType::kSubscribe); ASSERT_EQ(msg.getChannel(), "chan"); ASSERT_EQ(msg.getActiveSubscriptions(), 4); } TEST(MessageParser, kSubscribePush) { Message msg; ASSERT_TRUE(MessageParser::parse(ResponseBuilder::makePushArr("pubsub", "subscribe", "chan", 4), msg)); ASSERT_EQ(msg.getMessageType(), MessageType::kSubscribe); ASSERT_EQ(msg.getChannel(), "chan"); ASSERT_EQ(msg.getActiveSubscriptions(), 4); } TEST(MessageParser, kPatternSubscribe) { Message msg; ASSERT_TRUE(MessageParser::parse(ResponseBuilder::makeArr("psubscribe", "chan2", 3), msg)); ASSERT_EQ(msg.getMessageType(), MessageType::kPatternSubscribe); ASSERT_EQ(msg.getPattern(), "chan2"); ASSERT_EQ(msg.getActiveSubscriptions(), 3); } TEST(MessageParser, kPatternSubscribePush) { Message msg; ASSERT_TRUE(MessageParser::parse(ResponseBuilder::makePushArr("pubsub", "psubscribe", "chan2", 3), msg)); ASSERT_EQ(msg.getMessageType(), MessageType::kPatternSubscribe); ASSERT_EQ(msg.getPattern(), "chan2"); ASSERT_EQ(msg.getActiveSubscriptions(), 3); } TEST(MessageParser, kUnsubscribe) { Message msg; ASSERT_TRUE(MessageParser::parse(ResponseBuilder::makeArr("unsubscribe", "mychan", 99), msg)); ASSERT_EQ(msg.getMessageType(), MessageType::kUnsubscribe); ASSERT_EQ(msg.getChannel(), "mychan"); ASSERT_EQ(msg.getActiveSubscriptions(), 99); } TEST(MessageParser, kUnsubscribePush) { Message msg; ASSERT_TRUE(MessageParser::parse(ResponseBuilder::makePushArr("pubsub", "unsubscribe", "mychan", 99), msg)); ASSERT_EQ(msg.getMessageType(), MessageType::kUnsubscribe); ASSERT_EQ(msg.getChannel(), "mychan"); ASSERT_EQ(msg.getActiveSubscriptions(), 99); } TEST(MessageParser, kPatternUnsubscribe) { Message msg; ASSERT_TRUE(MessageParser::parse(ResponseBuilder::makeArr("punsubscribe", "p*", 9999), msg)); ASSERT_EQ(msg.getMessageType(), MessageType::kPatternUnsubscribe); ASSERT_EQ(msg.getPattern(), "p*"); ASSERT_EQ(msg.getActiveSubscriptions(), 9999); } TEST(MessageQueue, BasicSanity) { MessageQueue queue; Message msg; std::vector vec = { "message", "mychannel", "test" }; ASSERT_TRUE(MessageParser::parse(ResponseBuilder::makeStringArray(vec), msg)); queue.handleIncomingMessage(std::move(msg)); ASSERT_EQ(queue.size(), 1u); auto it = queue.begin(); ASSERT_TRUE(it.itemHasArrived()); ASSERT_EQ(it.item().getMessageType(), MessageType::kMessage); ASSERT_EQ(it.item().getChannel(), "mychannel"); it.next(); queue.pop_front(); ASSERT_EQ(queue.size(), 0u); } TEST(Subscriber, BasicSanity) { Subscriber subscriber; std::unique_ptr ch1 = subscriber.subscribe("ch1"); ASSERT_TRUE(ch1->empty()); subscriber.feedFakeMessage(Message::createMessage("ch2", "test")); ASSERT_TRUE(ch1->empty()); subscriber.feedFakeMessage(Message::createMessage("ch1", "aaaa")); Message expected = Message::createMessage("ch1", "aaaa"); Message msg; ASSERT_TRUE(ch1->front(msg)); ASSERT_EQ(msg, expected); ch1->pop_front(); ASSERT_TRUE(ch1->empty()); std::unique_ptr ch1clone = subscriber.subscribe("ch1"); subscriber.feedFakeMessage(Message::createMessage("ch1", "aaaa")); ASSERT_TRUE(ch1->front(msg)); ASSERT_EQ(msg, expected); ch1->pop_front(); ASSERT_TRUE(ch1->empty()); ASSERT_TRUE(ch1clone->front(msg)); ASSERT_EQ(msg, expected); ch1clone->pop_front(); ASSERT_TRUE(ch1clone->empty()); }