// ---------------------------------------------------------------------- // File: Publisher.cc // Author: Georgios Bitzes - CERN // ---------------------------------------------------------------------- /************************************************************************ * quarkdb - a redis-like highly available key-value store * * Copyright (C) 2016 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #include "pubsub/Publisher.hh" #include "Formatter.hh" #include "storage/PatternMatching.hh" using namespace quarkdb; // Constructor Publisher::Publisher() { asyncPublishingThread.reset(&Publisher::asyncPublisher, this); } void Publisher::asyncPublisher(ThreadAssistant &assistant) { auto frontier = revisionQueue.begin(); while(!assistant.terminationRequested()) { VersionedHashRevisionTracker *nextItem = frontier.getItemBlockOrNull(); if(!nextItem) continue; for(auto it = nextItem->begin(); it != nextItem->end(); it++) { publish(SSTR("__vhash@" << it->first), it->second.serialize()); } frontier.next(); revisionQueue.pop_front(); } } Publisher::~Publisher() { asyncPublishingThread.stop(); revisionQueue.setBlockingMode(false); asyncPublishingThread.join(); purgeListeners(Formatter::err("unavailable")); } void Publisher::purgeListeners(RedisEncodedResponse resp) { for(auto it = channelSubscriptions.getFullIterator(); it.valid(); it.next()) { it.getValue()->appendIfAttached(RedisEncodedResponse(resp)); it.erase(); } for(auto it = patternMatcher.getFullIterator(); it.valid(); it.next()) { it.getValue()->appendIfAttached(RedisEncodedResponse(resp)); it.erase(); } } bool Publisher::unsubscribe(std::shared_ptr connection, std::string_view channel) { connection->unsubscribe(std::string(channel)); return channelSubscriptions.erase(std::string(channel), connection); } bool Publisher::punsubscribe(std::shared_ptr connection, std::string_view pattern) { connection->punsubscribe(std::string(pattern)); return patternMatcher.erase(std::string(pattern), connection); } int Publisher::subscribe(std::shared_ptr connection, std::string_view channel) { connection->subscribe(std::string(channel)); return channelSubscriptions.insert(std::string(channel), connection); } int Publisher::psubscribe(std::shared_ptr connection, std::string_view pattern) { connection->psubscribe(std::string(pattern)); return patternMatcher.insert(std::string(pattern), connection); } int Publisher::publishChannels(const std::string &channel, std::string_view payload) { int hits = 0; // publish to matching channels for(auto it = channelSubscriptions.findMatching(std::string(channel)); it.valid(); it.next()) { bool stillAlive = it.getValue()->addMessageIfAttached(channel, payload); if(!stillAlive) { it.erase(); } else { hits++; } } return hits; } int Publisher::publishPatterns(const std::string& channel, std::string_view payload) { int hits = 0; // publish to matching patterns for(auto it = patternMatcher.find(std::string(channel)); it.valid(); it.next()) { bool stillAlive = it.getValue()->addPatternMessageIfAttached(it.getPattern(), channel, payload); if(!stillAlive) { it.erase(); } else { hits++; } } return hits; } int Publisher::publish(const std::string &channel, std::string_view payload) { return publishChannels(channel, payload) + publishPatterns(channel, payload); } LinkStatus Publisher::dispatch(Connection *conn, RedisRequest &req) { bool pushTypes = conn->hasPushTypesActive(); switch(req.getCommand()) { case RedisCommand::SUBSCRIBE: { if(req.size() <= 1) return conn->errArgs(req[0]); int retval = 1; for(size_t i = 1; i < req.size(); i++) { conn->getQueue()->subscriptions += subscribe(conn->getQueue(), req[i]); if(retval >= 0) { retval = conn->raw(Formatter::subscribe(pushTypes, req[i], conn->getQueue()->subscriptions)); } } if(conn->hasPushTypesActive()) { conn->ok(); } return retval; } case RedisCommand::PSUBSCRIBE: { if(req.size() <= 1) return conn->errArgs(req[0]); int retval = 1; for(size_t i = 1; i < req.size(); i++) { conn->getQueue()->subscriptions += psubscribe(conn->getQueue(), req[i]); if(retval >= 0) { retval = conn->raw(Formatter::psubscribe(pushTypes, req[i], conn->getQueue()->subscriptions)); } } if(conn->hasPushTypesActive()) { conn->ok(); } return retval; } case RedisCommand::UNSUBSCRIBE: { if(req.size() <= 1) return conn->errArgs(req[0]); int retval = 1; for(size_t i = 1; i < req.size(); i++) { conn->getQueue()->subscriptions -= unsubscribe(conn->getQueue(), req[i]); if(retval >= 0) { retval = conn->raw(Formatter::unsubscribe(pushTypes, req[i], conn->getQueue()->subscriptions)); } } if(conn->hasPushTypesActive()) { conn->ok(); } return retval; } case RedisCommand::PUNSUBSCRIBE: { if(req.size() <= 1) return conn->errArgs(req[0]); int retval = 1; for(size_t i = 1; i < req.size(); i++) { conn->getQueue()->subscriptions -= punsubscribe(conn->getQueue(), req[i]); if(retval >= 0) { retval = conn->raw(Formatter::punsubscribe(pushTypes, req[i], conn->getQueue()->subscriptions)); } } if(conn->hasPushTypesActive()) { conn->ok(); } return retval; } case RedisCommand::PUBLISH: { if(req.size() != 3) return conn->errArgs(req[0]); int hits = publish(std::string(req[1]), req[2]); return conn->integer(hits); } default: { qdb_throw("should never reach here"); } } } void Publisher::schedulePublishing(VersionedHashRevisionTracker &&revisionTracker) { revisionQueue.emplace_back(std::move(revisionTracker)); } LinkStatus Publisher::dispatch(Connection *conn, Transaction &tx) { qdb_throw("internal dispatching error, Publisher does not support transactions"); }