// ----------------------------------------------------------------------
// File: AsioPoller.cc
// Author: Georgios Bitzes - CERN
// ----------------------------------------------------------------------
/************************************************************************
* quarkdb - a redis-like highly available key-value store *
* Copyright (C) 2019 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see .*
************************************************************************/
#include "netio/AsioPoller.hh"
#include "Link.hh"
#include "Connection.hh"
#include
namespace quarkdb {
//------------------------------------------------------------------------------
// Constructor
//------------------------------------------------------------------------------
AsioPoller::AsioPoller(int port, size_t threadPoolSize, Dispatcher *disp)
: mPort(port), mThreadPoolSize(threadPoolSize), mDispatcher(disp),
mResolver(mContext),
mAcceptor4(mContext),
mAcceptor6(mContext),
mNextSocket4(mContext),
mNextSocket6(mContext)
{
std::error_code ec;
mAcceptor4.open(asio::ip::tcp::v4(), ec);
if(ec.value() == 0) {
mAcceptor4.set_option(asio::socket_base::reuse_address(true));
mAcceptor4.bind(asio::ip::tcp::endpoint(asio::ip::tcp::v4(), mPort));
mAcceptor4.listen();
}
mAcceptor6.open(asio::ip::tcp::v6(), ec);
if(ec.value() == 0) {
mAcceptor6.set_option(asio::socket_base::reuse_address(true));
mAcceptor6.set_option(asio::ip::v6_only(true), ec);
mAcceptor6.bind(asio::ip::tcp::endpoint(asio::ip::tcp::v6(), mPort));
mAcceptor6.listen();
}
requestAccept4();
requestAccept6();
for(size_t i = 0; i < mThreadPoolSize; i++) {
mThreadPool.emplace_back(&AsioPoller::workerThread, this);
}
}
//------------------------------------------------------------------------------
// Destructor
//------------------------------------------------------------------------------
AsioPoller::~AsioPoller() {
mShutdown = true;
std::unique_lock lock(mAcceptorMtx);
mAcceptor4.close();
mAcceptor6.close();
lock.unlock();
mContext.stop();
mInFlightTracker.setAcceptingRequests(false);
for(size_t i = 0; i < mThreadPool.size(); i++) {
mThreadPool[i].join();
}
mEntries.clear();
}
//------------------------------------------------------------------------------
// Thread pool
//------------------------------------------------------------------------------
void AsioPoller::workerThread(ThreadAssistant &assistant) {
mContext.run();
}
//------------------------------------------------------------------------------
// Request next async accept
//------------------------------------------------------------------------------
void AsioPoller::requestAccept4() {
mNextSocket4 = asio::ip::tcp::socket(mContext);
std::scoped_lock lock(mAcceptorMtx);
mAcceptor4.async_accept(mNextSocket4, std::bind(&AsioPoller::handleAccept4, this, std::placeholders::_1));
}
void AsioPoller::requestAccept6() {
mNextSocket6 = asio::ip::tcp::socket(mContext);
std::scoped_lock lock(mAcceptorMtx);
mAcceptor6.async_accept(mNextSocket6, std::bind(&AsioPoller::handleAccept6, this, std::placeholders::_1));
}
//------------------------------------------------------------------------------
// Handle incoming TCP connect
//------------------------------------------------------------------------------
void AsioPoller::handleAccept4(const std::error_code& ec) {
if(!ec) {
handleAccept(std::move(mNextSocket4));
}
if(!mShutdown) {
requestAccept4();
}
}
void AsioPoller::handleAccept6(const std::error_code& ec) {
if(!ec) {
handleAccept(std::move(mNextSocket6));
}
if(!mShutdown) {
requestAccept6();
}
}
void AsioPoller::handleAccept(asio::ip::tcp::socket socket) {
std::error_code ec;
socket.non_blocking(true, ec);
if(ec) {
// client connected and disconnected immediately, before we even
// had a chance to set the socket to non-blocking
return;
}
asio::ip::tcp::endpoint remoteEndpoint = socket.remote_endpoint(ec);
std::shared_ptr socketPtr;
socketPtr.reset(new asio::ip::tcp::socket(std::move(socket)));
mResolver.async_resolve(remoteEndpoint,
std::bind(&AsioPoller::handleResolve, this, socketPtr, std::placeholders::_1, std::placeholders::_2));
}
//------------------------------------------------------------------------------
// Handle resolve
//------------------------------------------------------------------------------
void AsioPoller::handleResolve(std::shared_ptr socketPtr, const std::error_code &ec,
asio::ip::tcp::resolver::iterator resultIterator) {
std::string resolvedHostname = "N/A";
asio::ip::tcp::resolver::iterator end;
if(resultIterator != end) {
resolvedHostname = resultIterator->host_name();
}
asio::ip::tcp::socket socket = std::move(*socketPtr.get());
std::unique_ptr activeEntry;
activeEntry.reset(new ActiveEntry(std::move(socket)));
qclient::TlsConfig tlsconfig;
activeEntry->link = new Link(activeEntry->socket, resolvedHostname, tlsconfig);
activeEntry->conn = new Connection(activeEntry->link);
ActiveEntry *ptr = activeEntry.get();
std::scoped_lock lock(mEntriesMtx);
mEntries[ptr] = std::move(activeEntry);
ptr->socket.async_wait(asio::ip::tcp::socket::wait_read,
std::bind(&AsioPoller::handleWait, this, ptr, std::placeholders::_1));
}
//------------------------------------------------------------------------------
// ActiveEntry destructor
//------------------------------------------------------------------------------
ActiveEntry::~ActiveEntry() {
delete conn;
delete link;
}
//------------------------------------------------------------------------------
// Handle wait
//------------------------------------------------------------------------------
void AsioPoller::handleWait(ActiveEntry *entry, const std::error_code& ec) {
LinkStatus status = entry->conn->processRequests(mDispatcher, mInFlightTracker);
if(ec.value() == 0 && status >= 0) {
entry->socket.async_wait(asio::ip::tcp::socket::wait_read,
std::bind(&AsioPoller::handleWait, this, entry, std::placeholders::_1));
}
else {
std::scoped_lock lock(mEntriesMtx);
mEntries.erase(entry);
}
}
}