// ---------------------------------------------------------------------- // File: Subscriber.cc // Author: Georgios Bitzes - CERN // ---------------------------------------------------------------------- /************************************************************************ * qclient - A simple redis C++ client with support for redirects * * Copyright (C) 2019 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #include "qclient/pubsub/Subscriber.hh" #include "qclient/pubsub/Message.hh" #include "qclient/pubsub/MessageListener.hh" namespace qclient { //------------------------------------------------------------------------------ // Listener, static to this object //------------------------------------------------------------------------------ class SubscriberListener : public MessageListener { public: SubscriberListener(Subscriber *sub) : subscriber(sub) {} virtual ~SubscriberListener() {} virtual void handleIncomingMessage(const Message& msg) { subscriber->processIncomingMessage(msg); } private: Subscriber *subscriber; }; //------------------------------------------------------------------------------ // Constructor //------------------------------------------------------------------------------ Subscription::Subscription(Subscriber* sub) : subscriber(sub) {} //------------------------------------------------------------------------------ // Destructor - notify subscriber we're shutting down //------------------------------------------------------------------------------ Subscription::~Subscription() { if(subscriber) { subscriber->unsubscribe(this); subscriber = nullptr; } } //------------------------------------------------------------------------------ // Process incoming message //------------------------------------------------------------------------------ void Subscription::processIncoming(const Message &msg) { queue.emplace_back(msg); } //------------------------------------------------------------------------------ // Is the queue empty? //------------------------------------------------------------------------------ bool Subscription::empty() const { return queue.size() == 0; } //------------------------------------------------------------------------------ // Remove the oldest received message, ie the front of the queue. //------------------------------------------------------------------------------ void Subscription::pop_front() { return queue.pop_front(); } //------------------------------------------------------------------------------ // Get oldest message, ie the front of the queue. Return false if the queue // is empty. //------------------------------------------------------------------------------ bool Subscription::front(Message &out) const { if(queue.size() == 0) { return false; } out = queue.front(); return true; } //------------------------------------------------------------------------------ // Return queue size //------------------------------------------------------------------------------ size_t Subscription::size() const { return queue.size(); } //------------------------------------------------------------------------------ // Stop behaving like a queue, forward incoming messages to the given // callback. //------------------------------------------------------------------------------ void Subscription::attachCallback(const Callback &cb) { queue.attach(cb); } //------------------------------------------------------------------------------ // Detach callback, start behaving like a queue again //------------------------------------------------------------------------------ void Subscription::detachCallback() { queue.detach(); } //------------------------------------------------------------------------------ // Has this subscription been acknowledged by the server yet? //------------------------------------------------------------------------------ bool Subscription::acknowledged() const { return isAcknowledged; } //------------------------------------------------------------------------------ // Mark subscription as acknowledged //------------------------------------------------------------------------------ void Subscription::markAcknowledged() { isAcknowledged = true; } //------------------------------------------------------------------------------ // Constructor - real mode, connect to a real server //------------------------------------------------------------------------------ Subscriber::Subscriber(const Members &members, SubscriptionOptions &&options, Logger *log) : /*logger(log),*/ listener(new SubscriberListener(this)), base(new BaseSubscriber(members, listener, std::move(options))) {} //------------------------------------------------------------------------------ // Simulated mode - enable ability to feed fake messages for testing // this class //------------------------------------------------------------------------------ Subscriber::Subscriber() {} //------------------------------------------------------------------------------ // Receive notification about a Subscription being destroyed //------------------------------------------------------------------------------ void Subscriber::unsubscribe(Subscription *subscription) { std::lock_guard lock(mtx); auto it = reverseChannelSubscriptions.find(subscription); if(it == reverseChannelSubscriptions.end()) { // Something is not right, warn.. TODO return; } channelSubscriptions.erase(it->second); reverseChannelSubscriptions.erase(it); } //------------------------------------------------------------------------------ // Feed fake message - only has an effect in sumulated mode //------------------------------------------------------------------------------ void Subscriber::feedFakeMessage(const Message& msg) { processIncomingMessage(msg); } //------------------------------------------------------------------------------ // Process incoming message //------------------------------------------------------------------------------ void Subscriber::processIncomingMessage(const Message &msg) { std::lock_guard lock(mtx); if(msg.getMessageType() == MessageType::kSubscribe) { auto targetChannel = channelSubscriptions.find(msg.getChannel()); if(targetChannel != channelSubscriptions.end()) { targetChannel->second->markAcknowledged(); } return; } if(msg.getMessageType() != MessageType::kMessage && msg.getMessageType() != MessageType::kPatternMessage) { return; } // Feed to channel subscriptions auto channels = channelSubscriptions.equal_range(msg.getChannel()); for(auto it = channels.first; it != channels.second; it++) { it->second->processIncoming(msg); } } //------------------------------------------------------------------------------ // Subscribe to the given channel through a Subscription object //------------------------------------------------------------------------------ std::unique_ptr Subscriber::subscribe(const std::string &channel) { std::lock_guard lock(mtx); std::unique_ptr subscription = std::make_unique(this); auto it = channelSubscriptions.emplace(channel, subscription.get()); reverseChannelSubscriptions.emplace(subscription.get(), it); if(base) { base->subscribe( {channel} ); } return subscription; } //------------------------------------------------------------------------------ // Get underlying QClient - lifetime tied to this object //------------------------------------------------------------------------------ qclient::QClient* Subscriber::getQcl() { if(base) { return base->getQcl(); } return nullptr; } }