// ---------------------------------------------------------------------- // File: Link.cc // Author: Georgios Bitzes - CERN // ---------------------------------------------------------------------- /************************************************************************ * quarkdb - a redis-like highly available key-value store * * Copyright (C) 2016 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #include #include #include "Link.hh" #include "Common.hh" #include "Utils.hh" #include "utils/Uuid.hh" #include "utils/Stacktrace.hh" #include #include using namespace quarkdb; namespace { // No external linkage bool connectionLogging = true; } void Link::setConnectionLogging(bool val) { connectionLogging = val; } using std::placeholders::_1; using std::placeholders::_2; using std::placeholders::_3; qclient::RecvStatus Link::recvStatus(char *buff, int blen, int timeout) { int rc = this->rawRecv(buff, blen, timeout); if(rc == 0) return qclient::RecvStatus(true, 0, 0); // no pending data to read if(rc < 0) return qclient::RecvStatus(false, rc, 0); // connection error return qclient::RecvStatus(true, 0, rc); // return data } Link::Link(const qclient::TlsConfig &tlsconfig_) : tlsconfig(tlsconfig_), tlsfilter(tlsconfig, qclient::FilterType::SERVER, std::bind(&Link::recvStatus, this, _1, _2, _3), std::bind(&Link::rawSend, this, _1, _2)) { uuid = generateUuid(); } Link::Link(asio::ip::tcp::socket &socket, const std::string &hostname, qclient::TlsConfig tlsconfig_) : Link(tlsconfig_) { asioSocket = &socket; uuid = generateUuid(); host = hostname; if(connectionLogging) qdb_info("New link from " << describe()); } Link::Link(int fd_, qclient::TlsConfig tlsconfig_) : Link(tlsconfig_) { uuid = generateUuid(); fd = fd_; fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK); } Link::Link(XrdLink *lp, qclient::TlsConfig tlsconfig_) : Link(tlsconfig_) { uuid = generateUuid(); host = lp->Host(); link = lp; if(connectionLogging) qdb_info("New link from " << describe()); } Link::~Link() { if(connectionLogging) qdb_info("Shutting down link from " << describe()); Close(); } std::string Link::describe() const { return SSTR(host << " [" << uuid << "]"); } void Link::preventXrdLinkClose() { xrdLinkCloseDisabled = true; } LinkStatus Link::rawRecv(char *buff, int blen, int timeout) { if(link) return link->Recv(buff, blen, timeout); if(asioSocket) return asioRecv(buff, blen, timeout); if(fd >= 0) return fdRecv(buff, blen, timeout); return streamRecv(buff, blen, timeout); } LinkStatus Link::Recv(char *buff, int blen, int timeout) { if(tlsconfig.active) { qclient::RecvStatus status = tlsfilter.recv(buff, blen, timeout); if(!status.connectionAlive) return -1; return status.bytesRead; } return rawRecv(buff, blen, timeout); } LinkStatus Link::Close(int defer) { if(tlsconfig.active) tlsfilter.close(defer); if(link) { if(xrdLinkCloseDisabled) return 1; return link->Close(defer); } if(asioSocket) return asioClose(defer); if(fd >= 0) return fdClose(defer); return streamClose(defer); } LinkStatus Link::rawSend(const char *buff, int blen) { if(link) return link->Send(buff, blen); if(asioSocket) return asioSend(buff, blen); if(fd >= 0) return fdSend(buff, blen); return streamSend(buff, blen); } LinkStatus Link::Send(const char *buff, int blen) { if(dead) return -1; LinkStatus ret; if(tlsconfig.active) ret = tlsfilter.send(buff, blen); ret = rawSend(buff, blen); if(ret != blen) { dead = true; if(ret >= 0) { qdb_critical("wrote " << ret << " bytes into Link, even though it should be " << blen); } } return ret; } LinkStatus Link::Send(const std::string &str) { return Send(str.c_str(), str.size()); } LinkStatus Link::asioRecv(char *buff, int blen, int timeout) { asio::mutable_buffer asioBuff(buff, blen); std::error_code ec; int len = asioSocket->receive(asioBuff, 0, ec); if(ec.value() == 0) { return len; } else if(ec.value() == EAGAIN || ec.value() == EWOULDBLOCK) { return 0; } return - ec.value(); } LinkStatus Link::asioSend(const char *buff, int blen) { asio::error_code ec; int bytesWritten = asio::write(*asioSocket, asio::buffer(buff, blen), ec); if(ec.value() != 0) { return -1; } return bytesWritten; } LinkStatus Link::asioClose(int defer) { asio::error_code ignored_ec; asioSocket->shutdown(asio::ip::tcp::socket::shutdown_both, ignored_ec); return 0; } LinkStatus Link::streamSend(const char *buff, int blen) { if(stream.eof()) return -1; stream.write(buff, blen); return blen; } LinkStatus Link::streamClose(int defer) { stream.ignore(std::numeric_limits::max()); return 0; } LinkStatus Link::streamRecv(char *buff, int blen, int timeout) { if(stream.eof()) return -1; int totalRead = 0; while(true) { int rc = stream.readsome(buff, blen); totalRead += rc; blen -= rc; buff += rc; if(rc == 0 || blen == 0) break; } return totalRead; } LinkStatus Link::fdRecv(char *buff, int blen, int timeout) { int rc = recv(fd, buff, blen, 0); if(rc == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) return 0; return rc; } LinkStatus Link::fdSend(const char *buff, int blen) { return send(fd, buff, blen, 0); } LinkStatus Link::fdClose(int defer) { return close(fd); } void Link::overrideHost(const std::string &newhost) { host = newhost; } bool Link::isLocalhost() const { return ( host == "localhost.localdomain" || host == "localhost" || host == "127.0.0.1" || host == "::1" || host == "localhost6" || host == "localhost6.localdomain6" ); }