// ----------------------------------------------------------------------
// File: RequestCounter.cc
// Author: Georgios Bitzes - CERN
// ----------------------------------------------------------------------
/************************************************************************
* quarkdb - a redis-like highly available key-value store *
* Copyright (C) 2016 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see .*
************************************************************************/
#include "Utils.hh"
#include "utils/RequestCounter.hh"
#include "Commands.hh"
#include "redis/Transaction.hh"
using namespace quarkdb;
RequestCounter::RequestCounter(std::chrono::seconds intv)
: interval(intv), historical(100), thread(&RequestCounter::mainThread, this) {
thread.setName("request-count-reporter");
}
void RequestCounter::account(const RedisRequest &req, Statistics *stats) {
if(req.getCommandType() == CommandType::READ) {
stats->reads++;
}
else if(req.getCommandType() == CommandType::WRITE) {
stats->writes++;
}
}
void RequestCounter::account(const RedisRequest &req) {
Statistics *stats = aggregator.getStats();
account(req, stats);
}
void RequestCounter::account(const Transaction &transaction) {
Statistics *stats = aggregator.getStats();
if(transaction.containsWrites()) {
stats->txreadwrite++;
}
else {
stats->txread++;
}
for(size_t i = 0; i < transaction.size(); i++) {
account(transaction[i], stats);
}
}
std::string RequestCounter::toRate(int64_t val) {
return SSTR("(" << val / interval.count() << " Hz)");
}
void RequestCounter::setReportingStatus(bool val) {
activated = val;
}
Statistics RequestCounter::getOverallStats() {
return aggregator.getOverallStats();
}
void RequestCounter::mainThread(ThreadAssistant &assistant) {
while(!assistant.terminationRequested()) {
Statistics local = aggregator.getOverallStatsSinceLastTime();
if(local.reads != 0 || local.writes != 0) {
paused = false;
if(activated) {
qdb_info("During the last " << interval.count() << " seconds, I serviced " << local.reads << " reads " << toRate(local.reads) << ", and " << local.writes << " writes " << toRate(local.writes) << " over " << local.txreadwrite << " write transactions");
}
}
else if(!paused) {
paused = true;
if(activated) {
qdb_info("No reads or writes during the last " << interval.count() << " seconds - will report again once load re-appears.");
}
}
historical.push(local, std::chrono::system_clock::now());
assistant.wait_for(interval);
}
}
void RequestCounter::fillHistorical(std::vector &headers,
std::vector> &data) {
headers.clear();
data.clear();
headers.emplace_back("TOTALS");
data.emplace_back(aggregator.getOverallStats().serialize());
historical.serialize(headers, data);
}