// ---------------------------------------------------------------------- // File: XrdQuarkDB.cc // Author: Georgios Bitzes - CERN // ---------------------------------------------------------------------- /************************************************************************ * quarkdb - a redis-like highly available key-value store * * Copyright (C) 2016 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #include #include #include "XrdVersion.hh" #include "XrdQuarkDB.hh" #include "raft/RaftDispatcher.hh" #include "utils/ScopedAdder.hh" #include "QuarkDBNode.hh" using namespace quarkdb; //------------------------------------------------------------------------------ // Globals //------------------------------------------------------------------------------ QuarkDBNode *XrdQuarkDB::quarkdbNode = 0; InFlightTracker XrdQuarkDB::inFlightTracker; EventFD XrdQuarkDB::shutdownFD; //------------------------------------------------------------------------------ // Shutdown mechanism. Here's how it works. // The signal handler sets inShutdown and notifies shutdownMonitor. Since we can // only call signal-safe functions there, using a condition variable is not // safe. write() is signal-safe, so let's use an eventfd. // // After inShutdown is set, all new requests are rejected, and we wait until // all requests currently in flight are completed before deleting the main node. //------------------------------------------------------------------------------ void XrdQuarkDB::shutdownMonitor() { while(inFlightTracker.isAcceptingRequests()) { shutdownFD.wait(); } qdb_event("Received request to shut down. Spinning until all requests in flight (" << inFlightTracker.getInFlight() << ") have been processed.."); inFlightTracker.spinUntilNoRequestsInFlight(); delete quarkdbNode; qdb_event("SHUTTING DOWN"); std::quick_exit(0); } //------------------------------------------------------------------------------ // XrdQuarkDB class //------------------------------------------------------------------------------ XrdQuarkDB::XrdQuarkDB(bool tls) : XrdProtocol("Redis protocol handler") { Reset(); tlsconfig.active = tls; if(tls) { tlsconfig.certificatePath = quarkdbNode->getConfiguration().getCertificatePath(); tlsconfig.keyPath = quarkdbNode->getConfiguration().getCertificateKeyPath(); } } int XrdQuarkDB::Process(XrdLink *lp) { InFlightRegistration registration(inFlightTracker); if(!registration.ok()) { return -1; } // TODO log client DN if(!link && tlsconfig.active) qdb_info("handling TLS connection. Security is intensifying"); if(!link) link = new Link(lp, tlsconfig); if(!conn) conn = new Connection(link); return conn->processRequests(quarkdbNode, inFlightTracker); } XrdProtocol* XrdQuarkDB::Match(XrdLink *lp) { char buffer[2]; // Peek at the first bytes of data int dlen = lp->Peek(buffer, (int) sizeof (buffer), 10000); if(dlen <= 0) return nullptr; if(buffer[0] != '*') { // This is probably a TLS connection. Reject if there's no certificate // configured. if(quarkdbNode->getConfiguration().getCertificatePath().empty()) { return nullptr; } return new XrdQuarkDB(true); } return new XrdQuarkDB(false); // TLS not enabled } void XrdQuarkDB::Reset() { if(conn) { quarkdbNode->notifyDisconnect(conn); delete conn; conn = nullptr; } if(link) { link->preventXrdLinkClose(); delete link; link = nullptr; } } void XrdQuarkDB::Recycle(XrdLink *lp,int consec,const char *reason) { Reset(); } int XrdQuarkDB::Stats(char *buff, int blen, int do_sync) { return 0; } void XrdQuarkDB::DoIt() { } static void handle_sigint(int sig) { XrdQuarkDB::inFlightTracker.setAcceptingRequests(false); XrdQuarkDB::shutdownFD.notify(); } int XrdQuarkDB::Configure(char *parms, XrdProtocol_Config * pi) { char* rdf = (parms && *parms ? parms : pi->ConfigFN); Configuration configuration; bool success = Configuration::fromFile(rdf, configuration); if(!success) return 0; if(configuration.getMode() == Mode::raft && pi->Port != configuration.getMyself().port) { qdb_throw("configuration error: xrootd listening port doesn't match redis.myself"); return 0; } quarkdbNode = new QuarkDBNode(configuration, defaultTimeouts); std::thread(&XrdQuarkDB::shutdownMonitor).detach(); signal(SIGINT, handle_sigint); signal(SIGTERM, handle_sigint); return 1; } XrdQuarkDB::~XrdQuarkDB() { Reset(); }