//------------------------------------------------------------------------------
// File: WriterThread.cc
// Author: Georgios Bitzes - CERN
//------------------------------------------------------------------------------
/************************************************************************
* qclient - A simple redis C++ client with support for redirects *
* Copyright (C) 2016 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see <http://www.gnu.org/licenses/>.*
************************************************************************/
#include "WriterThread.hh"
#include "ConnectionCore.hh"
#include "network/NetworkStream.hh"
#include "qclient/Handshake.hh"
#include "qclient/Logger.hh"
#include <poll.h>
#include <cerrno>
#include <cstdlib>
#include <cstring>
// Debugging aid: prints "<file>:<line> -- <expression text> = <expression value>"
// to std::cerr, followed by std::endl (which also flushes the stream).
#define DBG(message) std::cerr << __FILE__ << ":" << __LINE__ << " -- " << #message << " = " << message << std::endl
// Everything defined below lives in the qclient namespace.
using namespace qclient;
//------------------------------------------------------------------------------
// Constructor: only records the collaborators this object works with.
//
// @param log         logger used for diagnostics emitted by the event loop
// @param core        connection core that hands out the requests to write
// @param shutdownFD  event descriptor polled by the event loop next to the
//                    socket
//
// No thread is started here; that happens in activate().
//------------------------------------------------------------------------------
WriterThread::WriterThread(Logger *log, ConnectionCore &core, EventFD &shutdownFD)
  : logger(log),
    connectionCore(core),
    shutdownEventFD(shutdownFD)
{
}
//------------------------------------------------------------------------------
// Destructor: delegates to deactivate(), which stops and joins the writer
// thread before the members it uses are destroyed.
//------------------------------------------------------------------------------
WriterThread::~WriterThread()
{
  this->deactivate();
}
//------------------------------------------------------------------------------
// Start writing into the given stream.
//
// @param stream  network stream handed to eventLoop() as its first argument
//
// Blocking mode is switched on in the connection core first, and only then is
// the thread (re)armed to run eventLoop() on this object.
// NOTE(review): thread.reset() presumably replaces any previously running
// thread - confirm against the thread helper's documentation.
//------------------------------------------------------------------------------
void WriterThread::activate(NetworkStream *stream)
{
  // Must precede the thread start: the loop pulls from connectionCore.
  connectionCore.setBlockingMode(true);

  thread.reset(&WriterThread::eventLoop, this, stream);
}
//------------------------------------------------------------------------------
// Stop the writer thread and wait for it to finish.
//
// The three steps run in a fixed order:
//   1. request termination of the thread,
//   2. switch the connection core out of blocking mode,
//   3. join the thread.
// NOTE(review): step 2 presumably wakes an event loop that is blocked inside
// connectionCore.getNextToWrite() so that the join can complete - confirm
// against ConnectionCore.
//------------------------------------------------------------------------------
void WriterThread::deactivate()
{
  thread.stop();                          // 1. ask the loop to terminate
  connectionCore.setBlockingMode(false);  // 2. leave blocking mode
  thread.join();                          // 3. wait for the loop to exit
}
//------------------------------------------------------------------------------
// Body of the writer thread: takes staged requests from connectionCore one at
// a time and pushes their bytes into networkStream.
//
// @param networkStream  stream to write into; its descriptor is also polled
//                       for writability
// @param assistant      consulted on every iteration to find out whether
//                       termination has been requested
//
// The loop runs until termination is requested or networkStream->ok() turns
// false. It also returns early, after shutting the stream down, when send()
// fails with a non-recoverable error.
//
// Handling of send() failures:
//   - EINTR               : interrupted before any data was sent; retry
//   - EAGAIN/EWOULDBLOCK  : socket not writable; poll(), then retry
//   - anything else       : log, shut the stream down, return
//------------------------------------------------------------------------------
void WriterThread::eventLoop(NetworkStream *networkStream, ThreadAssistant &assistant) {
  // polls[0]: shutdown notification, polls[1]: socket writability.
  struct pollfd polls[2];
  polls[0].fd = shutdownEventFD.getFD();
  polls[0].events = POLLIN;
  polls[1].fd = networkStream->getFd();
  polls[1].events = POLLOUT;

  StagedRequest *beingProcessed = nullptr; // request currently being written
  size_t bytesWritten = 0;                 // bytes of beingProcessed already sent
  bool canWrite = true;                    // false: poll before the next send

  while(!assistant.terminationRequested() && networkStream->ok()) {
    // What should we do during this round?
    if(!canWrite) {
      // We have data to write but cannot, because the kernel buffers are full.
      // Poll until the socket is writable.
      int rpoll = poll(polls, 2, -1);

      // Snapshot errno right away so that the check and the log message are
      // guaranteed to talk about the same value.
      const int pollErrno = errno;

      if(rpoll < 0 && pollErrno != EINTR) {
        QCLIENT_LOG(logger, LogLevel::kError,
                    "error during poll() in WriterThread::eventLoop. errno="
                    << pollErrno << ":" << strerror(pollErrno));
      }

      canWrite = true; // try writing again, regardless of poll outcome
    }

    // Determine what exactly we should be writing into the socket.
    // getNextToWrite will block until there's something to write, or shutdown
    // has been requested.
    if(beingProcessed == nullptr) {
      bytesWritten = 0;
      beingProcessed = connectionCore.getNextToWrite();

      // Nothing handed out: go back to the loop condition.
      if(!beingProcessed) continue;

      if(connectionCore.hasPerfCb()) {
        beingProcessed->setTimestamp();
      }
    }

    // The socket is (believed to be) writable AND there's a staged request
    // waiting to be written. Clear errno first, so that a failure which does
    // not set it cannot be misclassified through a stale value left behind by
    // an earlier call.
    errno = 0;
    int bytes = networkStream->send(
      beingProcessed->getBuffer() + bytesWritten,
      beingProcessed->getLen() - bytesWritten
    );
    const int sendErrno = errno;

    // Determine what happened during sending.
    if(bytes < 0 && sendErrno == EINTR) {
      // Recoverable: a signal interrupted the call before any data went out.
      // Nothing was written, simply try again on the next round.
      continue;
    }

    if(bytes < 0 && (sendErrno == EAGAIN || sendErrno == EWOULDBLOCK)) {
      // Recoverable: the socket is not writable right now. POSIX lets send()
      // report either code and does not require them to be equal.
      // All is good, we just need to poll before writing again.
      canWrite = false;
      continue;
    }

    if(bytes < 0) {
      // Non-recoverable error, this looks bad. Kill connection.
      QCLIENT_LOG(logger, LogLevel::kError, "Bad return value from send(): "
                  << bytes << ", errno: " << sendErrno << "," << strerror(sendErrno));
      networkStream->shutdown();

      // Stop the loop. The parent class will activate us again with a
      // new network stream if need be.
      return;
    }

    // Seems good, at least some bytes were written. bytes is non-negative
    // here, so the conversion to size_t is lossless.
    bytesWritten += static_cast<size_t>(bytes);

    if(bytesWritten > beingProcessed->getLen()) {
      // Should be impossible: we never ask send() for more than the remainder.
      QCLIENT_LOG(logger, LogLevel::kFatal, "Wrote more bytes for a request than its length: "
                  << bytesWritten << ", " << beingProcessed->getLen());
      std::abort();
    }

    // Are we done with 'beingProcessed' yet?
    if(bytesWritten == beingProcessed->getLen()) {
      // Yep, set to null and process next one.
      beingProcessed = nullptr;
    }
    else {
      // Fewer bytes were written than the full length of the request, the
      // kernel buffers must be full. Poll until the socket is writable.
      canWrite = false;
    }
  }
}