//------------------------------------------------------------------------------
// File: AsyncConnector.cc
// Author: Georgios Bitzes - CERN
//------------------------------------------------------------------------------
/************************************************************************
* qclient - A simple redis C++ client with support for redirects *
* Copyright (C) 2019 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see .*
************************************************************************/
#include "qclient/network/AsyncConnector.hh"
#include "qclient/network/HostResolver.hh"
#include
#include
#include
#include
#include
#include
#include
#include
namespace qclient {
#define SSTR(message) static_cast(std::ostringstream().flush() << message).str()
//------------------------------------------------------------------------------
// Constructor - initiate connection towards the given ServiceEndpoint. Does
// not block the calling thread until connected - issues an asynchronous
// request the OS, asking to connect.
//------------------------------------------------------------------------------
AsyncConnector::AsyncConnector(const ServiceEndpoint &endpoint) {
//----------------------------------------------------------------------------
// Create the socket..
//----------------------------------------------------------------------------
fd = FileDescriptor(socket(endpoint.getAiFamily(), endpoint.getAiSocktype(), endpoint.getAiProtocol()));
if(fd.get() < 0) {
localerrno = errno;
error = SSTR("Unable to create a socket: " << strerror(localerrno));
return;
}
#ifndef __APPLE__
#define CUSTOM_TCP_USER_TIMEOUT 18
//----------------------------------------------------------------------------
// Set TCP timeout to 30 sec. Allow failure, as it's not supported on SLC6.
//----------------------------------------------------------------------------
int timeout = 30 * 1000;
if(setsockopt(fd.get(), IPPROTO_TCP, CUSTOM_TCP_USER_TIMEOUT, &timeout, sizeof(timeout)) != 0) {
std::cerr << "qclient: could not set TCP_USER_TIMEOUT: " << strerror(localerrno) << std::endl;
}
#endif
//----------------------------------------------------------------------------
// Make non-blocking..
//----------------------------------------------------------------------------
int rv = fcntl(fd.get(), F_SETFL, fcntl(fd.get(), F_GETFL) | O_NONBLOCK);
if(rv != 0) {
localerrno = errno;
error = SSTR("Unable to make socket non-blocking: " << strerror(localerrno));
fd.reset();
return;
}
//----------------------------------------------------------------------------
// Initiate connect..
//----------------------------------------------------------------------------
const std::vector& addr = endpoint.getAddressBytes();
rv = ::connect(fd.get(), (const sockaddr*) addr.data(), addr.size());
if(rv < 0 && errno != EINPROGRESS) {
localerrno = errno;
fd.reset();
error = SSTR("Unable to connect to " << endpoint.getOriginalHostname() << ":" << strerror(localerrno));
return;
}
else if(rv == 0) {
//--------------------------------------------------------------------------
// ::connect succeeded immediately
//--------------------------------------------------------------------------
finished = true;
}
}
//------------------------------------------------------------------------------
// Is ::connect ready yet?
//------------------------------------------------------------------------------
bool AsyncConnector::isReady() {
if(finished || localerrno != 0 || fd.get() < 0) {
return true;
}
//----------------------------------------------------------------------------
// We don't know yet, need to ask kernel..
// poll() should be instantaneous here.
//----------------------------------------------------------------------------
struct pollfd polls[1];
polls[0].fd = fd.get();
polls[0].events = POLLOUT;
int rpoll = poll(polls, 1, 0);
if(rpoll == 1) {
finished = true;
}
return finished;
}
//------------------------------------------------------------------------------
// Block until file descriptor is ready, OR a POLLIN event occurs in the
// given shutdown fd.
//
// Return true if file descriptor is ready, false if we had to cancel due
// to events in shutdownFd.
//------------------------------------------------------------------------------
bool AsyncConnector::blockUntilReady(int shutdownFd, std::chrono::seconds timeout) {
if(finished || localerrno != 0 || fd.get() < 0) {
return true;
}
//----------------------------------------------------------------------------
// Calculate deadline
//----------------------------------------------------------------------------
std::chrono::steady_clock::time_point deadline = std::chrono::steady_clock::now()
+ timeout;
//----------------------------------------------------------------------------
// Sleep until something happens in either file descriptor..
//----------------------------------------------------------------------------
struct pollfd polls[2];
polls[0].fd = shutdownFd;
polls[0].events = POLLIN;
polls[1].fd = fd.get();
polls[1].events = POLLOUT;
while(true) {
//--------------------------------------------------------------------------
// Timed-out?
//--------------------------------------------------------------------------
if(deadline < std::chrono::steady_clock::now()) {
return false;
}
int rpoll = poll(polls, 2, 1);
if(rpoll < 0 && errno != EINTR) {
//------------------------------------------------------------------------
// Something is wrong, bail out
//------------------------------------------------------------------------
return false;
}
if(rpoll < 0) {
//------------------------------------------------------------------------
// EINTR
//------------------------------------------------------------------------
continue;
}
if(polls[1].revents != 0) {
//------------------------------------------------------------------------
// An event on our file descriptor.. we could check POLLOUT and POLLERR,
// but getsockopt seems more robust, and we can get the errno on failure.
//------------------------------------------------------------------------
int valopt = 0;
socklen_t optlen = sizeof(int);
if(getsockopt(fd.get(), SOL_SOCKET, SO_ERROR, (void*)(&valopt), &optlen) < 0) {
//----------------------------------------------------------------------
// Not really supposed to happen..
//----------------------------------------------------------------------
localerrno = errno;
error = SSTR("Unable to run getsockopt() after poll(), errno=" << localerrno << strerror(localerrno));
finished = true;
return true;
}
if(valopt == EINTR || valopt == EINPROGRESS) {
//----------------------------------------------------------------------
// Strange, but ok.. retry.. might never happen.
//----------------------------------------------------------------------
continue;
}
finished = true;
if(valopt != 0) {
localerrno = valopt;
error = SSTR("Unable to connect (" << localerrno << ")" << ":" << strerror(localerrno));
return true;
}
//------------------------------------------------------------------------
// Success, connection is active
//------------------------------------------------------------------------
return true;
}
if(polls[0].revents != 0) {
//------------------------------------------------------------------------
// Signalled to break
//------------------------------------------------------------------------
return false;
}
}
}
//------------------------------------------------------------------------------
// Has there been an error yet? Note that, if ::connect is still pending,
// there might be an error in the future.
//------------------------------------------------------------------------------
bool AsyncConnector::ok() const {
return (fd.get() > 0) && (localerrno == 0) && (error.empty());
}
//------------------------------------------------------------------------------
// Get file descriptor - could be -1 if an error has occurred.
//------------------------------------------------------------------------------
int AsyncConnector::release() {
return fd.release();
}
//------------------------------------------------------------------------------
// If an error has occurred, return its errno. Returns 0 if no errors have
// occurred.
//------------------------------------------------------------------------------
int AsyncConnector::getErrno() const {
return localerrno;
}
//------------------------------------------------------------------------------
// If an error has occurred, return a string description. Returns empty string
// if no errors have occurred.
//------------------------------------------------------------------------------
std::string AsyncConnector::getError() const {
return error;
}
}