// ----------------------------------------------------------------------
// File: BaseSubscriber.cc
// Author: Georgios Bitzes - CERN
// ----------------------------------------------------------------------
/************************************************************************
* qclient - A simple redis C++ client with support for redirects *
* Copyright (C) 2018 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see <http://www.gnu.org/licenses/>.*
************************************************************************/
#include "qclient/pubsub/BaseSubscriber.hh"
#include "qclient/Handshake.hh"
#include "qclient/Logger.hh"
#include "qclient/ReconnectionListener.hh"
namespace qclient {
//------------------------------------------------------------------------------
// Reconnection listener which relays "connection established" events to the
// BaseSubscriber given at construction. Only a raw pointer to the subscriber
// is stored; this class never deletes it.
//------------------------------------------------------------------------------
class BaseSubscriberListener : public qclient::ReconnectionListener {
public:
  // explicit: a BaseSubscriber* must not silently convert into a listener.
  explicit BaseSubscriberListener(BaseSubscriber *sub) : subscriber(sub) {}

  // Losing the connection requires no action here. Parameters are left
  // unnamed as they are intentionally unused.
  void notifyConnectionLost(int64_t /*epoch*/, int /*errc*/,
    const std::string & /*msg*/) override {}

  // Forward the event, together with its epoch, to the subscriber.
  void notifyConnectionEstablished(int64_t epoch) override {
    subscriber->notifyConnectionEstablished(epoch);
  }

private:
  BaseSubscriber *subscriber = nullptr;
};
//------------------------------------------------------------------------------
// Make QClient options
//
// Translates the given SubscriptionOptions into the qclient::Options used for
// the subscriber's QClient, and installs the given message listener.
//
// Only the handshake is moved out of opts; tlsconfig, logger and
// retryStrategy are copied.
//------------------------------------------------------------------------------
static Options makeOptions(SubscriptionOptions &&opts,
  std::shared_ptr<MessageListener> listener) {

  qclient::Options options;
  options.tlsconfig = opts.tlsconfig;
  options.handshake = std::move(opts.handshake);
  options.logger = opts.logger;
  options.ensureConnectionIsPrimed = true;
  options.transparentRedirects = true;
  options.retryStrategy = opts.retryStrategy;
  options.backpressureStrategy = BackpressureStrategy::Default();

  // listener is our own by-value copy, so hand it over instead of copying
  // it a second time.
  options.messageListener = std::move(listener);

  // Exclusive pubsub mode is enabled exactly when push types are NOT in use.
  options.exclusivePubsub = !opts.usePushTypes;

  if(opts.usePushTypes) {
    // Chain the push-type activation onto whatever handshake is configured.
    options.chainHandshake(
      std::unique_ptr<Handshake>(new ActivatePushTypesHandshake()));
  }

  return options;
}
//------------------------------------------------------------------------------
// Constructor taking a list of members for the cluster
//
// memb: cluster members handed to the underlying QClient
// list: message listener; must not be nullptr, otherwise the process aborts
// opt:  subscription options, consumed to build the QClient options
//------------------------------------------------------------------------------
BaseSubscriber::BaseSubscriber(const Members &memb,
  std::shared_ptr<MessageListener> list, SubscriptionOptions &&opt)
: reconnectionListener(new BaseSubscriberListener(this)), members(memb),
  listener(list),
  qcl(members, makeOptions(std::move(opt), list)) {

  // Invalid listener?
  // NOTE(review): this check only runs after qcl has already been constructed
  // with the listener, since the initializer list precedes the body.
  if(!listener) {
    // NOTE(review): "options" is not set in the initializer list above -
    // confirm it refers to something holding a valid logger at this point.
    QCLIENT_LOG(options.logger, LogLevel::kFatal, "Attempted to initialize "
      "qclient::BaseSubscriber object with nullptr message listener!");
    std::abort();
  }

  // Have qcl notify our reconnection listener from now on.
  qcl.attachListener(reconnectionListener.get());
}
//------------------------------------------------------------------------------
// Destructor - nothing to do beyond destroying the members.
//
// NOTE(review): the listener attached to qcl in the constructor is never
// explicitly detached; confirm the member declaration order makes this safe.
//------------------------------------------------------------------------------
BaseSubscriber::~BaseSubscriber() = default;
//------------------------------------------------------------------------------
// Notify of a reconnection in the underlying qclient - re-subscribe
// to everything
//------------------------------------------------------------------------------
void BaseSubscriber::notifyConnectionEstablished(int64_t epoch) {
std::unique_lock lock(mtx);
std::vector payloadChannels = {"subscribe"};
for(auto it = channels.begin(); it != channels.end(); it++) {
payloadChannels.emplace_back(*it);
}
std::vector payloadPatterns = {"psubscribe"};
for(auto it = patterns.begin(); it != patterns.end(); it++) {
payloadPatterns.emplace_back(*it);
}
if(payloadChannels.size() != 1) {
qcl.execute(nullptr, payloadChannels);
}
if(payloadPatterns.size() != 1) {
qcl.execute(nullptr, payloadPatterns);
}
}
//------------------------------------------------------------------------------
// Subscribe to the given channels, in addition to any other subscriptions
// we may currently have.
//------------------------------------------------------------------------------
void BaseSubscriber::subscribe(const std::vector &newChannels)
{
std::unique_lock lock(mtx);
std::vector payload = {"subscribe"};
for(auto it = newChannels.begin(); it != newChannels.end(); it++) {
if(channels.find(*it) == channels.end()) {
payload.emplace_back(*it);
channels.emplace(*it);
}
}
if(payload.size() != 1) {
qcl.execute(nullptr, payload);
}
}
//------------------------------------------------------------------------------
// Subscribe to the given patterns, in addition to any other subscriptions
// we may currently have.
//------------------------------------------------------------------------------
void BaseSubscriber::psubscribe(const std::vector &newPatterns) {
std::unique_lock lock(mtx);
std::vector payload = {"psubscribe"};
for(auto it = newPatterns.begin(); it != newPatterns.end(); it++) {
if(patterns.find(*it) == patterns.end()) {
payload.emplace_back(*it);
patterns.emplace(*it);
}
}
if(payload.size() != 1) {
qcl.execute(nullptr, payload);
}
}
//------------------------------------------------------------------------------
// Unsubscribe from the given channels. If an empty vector is given, we are
// unsubscribed from all channels. (but not patterns!)
//------------------------------------------------------------------------------
void BaseSubscriber::unsubscribe(const std::vector &remChannels) {
std::unique_lock lock(mtx);
std::vector payload = {"unsubscribe"};
for(auto it = remChannels.begin(); it != remChannels.end(); it++) {
payload.emplace_back(*it);
channels.erase(*it);
}
if(remChannels.size() == 0) {
channels.clear();
}
qcl.execute(nullptr, payload);
}
//------------------------------------------------------------------------------
// Unsubscribe from the given patterns. If an empty vector is given, we are
// unsubscribed from all patterns. (but not channels!)
//------------------------------------------------------------------------------
void BaseSubscriber::punsubscribe(const std::vector &remPatterns) {
std::unique_lock lock(mtx);
std::vector payload = {"punsubscribe"};
for(auto it = remPatterns.begin(); it != remPatterns.end(); it++) {
payload.emplace_back(*it);
patterns.erase(*it);
}
if(remPatterns.size() == 0) {
patterns.clear();
}
qcl.execute(nullptr, payload);
}
//------------------------------------------------------------------------------
// Get underlying QClient object - lifetime tied to this object
//------------------------------------------------------------------------------
qclient::QClient* BaseSubscriber::getQcl() {
return &qcl;
}
}