// ----------------------------------------------------------------------
// File: hset.cc
// Author: Georgios Bitzes - CERN
// ----------------------------------------------------------------------
/************************************************************************
* quarkdb - a redis-like highly available key-value store *
* Copyright (C) 2016 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see <http://www.gnu.org/licenses/>.*
************************************************************************/
#include "raft/RaftContactDetails.hh"
#include "StateMachine.hh"
#include "Dispatcher.hh"
#include "../test-utils.hh"
#include "bench-utils.hh"
#include
using namespace quarkdb;
// Selects the path benchmark traffic takes to reach the state machine.
// modeToStr() turns each value into the label embedded in test names.
enum class Mode {
// Operations are invoked straight on the StateMachine object.
kDirect = 1,
// Operations are sent with a QClient to a poller started on localhost.
kRedisStandalone = 2,
// Operations are sent with a QClient to the leader of a three-node cluster.
kConsensus = 3
};
// Translates a benchmark Mode into the lowercase label that ends up inside
// generated test names. A value outside the enumeration falls out of the
// switch and is reported through qdb_throw.
std::string modeToStr(Mode mode) {
  switch(mode) {
    case Mode::kDirect:
      return "direct";
    case Mode::kRedisStandalone:
      return "standalone";
    case Mode::kConsensus:
      return "consensus";
  }

  qdb_throw("should never happen");
}
// One point in the benchmark parameter space: the number of worker threads,
// the total number of events to issue, and the Mode to exercise.
struct BenchmarkParams {
  int nthreads;
  int events;
  Mode mode;

  BenchmarkParams(int threads, int ev, Mode m)
  : nthreads{threads}, events{ev}, mode{m} {}

  // Renders the parameters as "threads<N>_events<M>_<mode label>". The
  // conversion is deliberately implicit: BenchmarkParamsPrinter returns a
  // BenchmarkParams where a std::string is expected.
  operator std::string() const {
    const std::string modeLabel = modeToStr(mode);
    return SSTR("threads" << nthreads << "_events" << events << "_" << modeLabel);
  }
};
// Abstract unit of benchmark work. A concrete executor knows how to run the
// workload from a worker thread and how to name itself.
class Executor {
public:
  // ev: number of events the benchmark is configured to issue.
  explicit Executor(size_t ev) : events(ev) {}
  virtual ~Executor() = default;

  // Entry point run by every worker thread; threadId is the index of the
  // calling worker.
  virtual void main(int threadId) = 0;

  // Name of the workload, used when logging the start of a benchmark.
  virtual std::string describe() = 0;

  // Number of events given at construction. Const, so it can be queried
  // through a const reference as well.
  size_t getEvents() const {
    return events;
  }

protected:
  size_t events;
};
// Workload definition for the HSET benchmark. ExecutorHelper receives a class
// like this one as its TestcaseProvider and calls the static members below by
// name. We provide this class to avoid the overhead of dynamic dispatch with
// virtual functions.
class HsetProvider {
public:
// Direct flavour: calls hset() on the given state machine, storing
// "some_contents" in field "field" of key "key-<eventId>", and asserts that
// the returned status is ok. threadId is not used.
static inline void handleEventDirect(StateMachine *sm, size_t threadId, size_t eventId) {
// Handed to hset(); its value is not inspected afterwards.
bool created;
ASSERT_TRUE(sm->hset(SSTR("key-" << eventId), "field", "some_contents", created, 0).ok());
}
// Network flavour: sends the same "hset" command through the supplied QClient
// and returns whatever exec() hands back without waiting on it. threadId is
// not used.
// NOTE(review): the return type reads as a bare std::future here - its
// template argument looks like it was lost; confirm against QClient::exec.
static inline std::future handleEventRedis(qclient::QClient &tunnel, size_t threadId, size_t eventId) {
return tunnel.exec("hset", SSTR("key-" << eventId), "field", "some_contents");
}
// Label reported for this workload.
static std::string describe() {
return "HSET";
}
};
template
class ExecutorHelper : public Executor {
public:
ExecutorHelper(size_t ev, StateMachine *sm) : Executor(ev), stateMachine(sm) {}
ExecutorHelper(size_t ev, const RaftServer &srv, const std::string &pw)
: Executor(ev), server(srv), password(pw) {}
void main(int threadId) override {
if(stateMachine) {
return mainDirect(threadId);
}
return mainRedis(threadId);
}
void mainDirect(int threadId) {
while(true) {
size_t next = nextEvent++;
if(next > events) break;
TestcaseProvider::handleEventDirect(stateMachine, threadId, next);
}
}
void mainRedis(int threadId) {
qclient::Options opts;
if(!password.empty()) {
opts.handshake.reset(new qclient::HmacAuthHandshake(password));
}
qclient::QClient tunnel(server.hostname, server.port, std::move(opts) );
while(true) {
size_t next = nextEvent++;
if(next > events) break;
TestcaseProvider::handleEventRedis(tunnel, threadId, next);
}
tunnel.exec("ping").get(); // receive all responses
}
std::string describe() override {
return TestcaseProvider::describe();
}
private:
RaftServer server;
StateMachine *stateMachine = nullptr;
std::atomic nextEvent {0};
std::string password;
};
class Benchmarker {
public:
static float measureRate(Executor &executor, size_t nthreads) {
Stopwatch stopwatch(executor.getEvents());
std::vector threads;
for(size_t i = 0; i < nthreads; i++) {
threads.emplace_back(&Executor::main, &executor, i);
}
for(size_t i = 0; i < threads.size(); i++) {
threads[i].join();
}
stopwatch.stop();
return stopwatch.rate();
}
void run(Executor &executor, size_t nthreads) {
qdb_info("Starting benchmark: " << executor.describe());
float rate = measureRate(executor, nthreads);
qdb_info("Benchmark has ended. Rate: " << rate << " Hz");
}
};
template
class BenchmarkHelper : public TestCluster3Nodes {
public:
~BenchmarkHelper() {
if(customPoller) delete customPoller;
if(customDispatcher) delete customDispatcher;
if(executor) delete executor;
}
void createExecutor(const BenchmarkParams ¶ms) {
if(params.mode == Mode::kDirect) {
executor = new ExecutorHelper(params.events, stateMachine());
}
else if(params.mode == Mode::kRedisStandalone) {
customDispatcher = new RedisDispatcher(*stateMachine(), *publisher());
customPoller = new AsioPoller(34567, 10, customDispatcher);
executor = new ExecutorHelper(params.events, {"localhost", 34567}, "");
}
else if(params.mode == Mode::kConsensus) {
spinup(0); spinup(1); spinup(2);
RETRY_ASSERT_TRUE(checkStateConsensus(0, 1, 2));
executor = new ExecutorHelper(params.events, myself(getLeaderID()), contactDetails()->getPassword() );
}
}
protected:
RedisDispatcher *customDispatcher {nullptr};
AsioPoller *customPoller {nullptr};
Executor *executor {nullptr};
};
class hset : public BenchmarkHelper, public ::testing::TestWithParam {
public:
void run() {
createExecutor(GetParam());
Benchmarker benchmarker;
benchmarker.run(*executor, GetParam().nthreads);
}
};
static std::vector generateParams() {
std::vector ret;
for(size_t threads : testconfig.benchmarkThreads.get() ) {
for(size_t events : testconfig.benchmarkEvents.get() ) {
for(Mode mode : {Mode::kDirect, Mode::kRedisStandalone, Mode::kConsensus}) {
ret.emplace_back(threads, events, mode);
}
}
}
return ret;
}
// Name generator for the parameterised tests: given an object exposing a
// `param` member, returns that member converted to std::string.
struct BenchmarkParamsPrinter {
  template<typename T>
  std::string operator()(const T &info) const {
    // Relies on `param` being implicitly convertible to std::string.
    return info.param;
  }
};
// Instantiates the hset suite under the "Benchmark" prefix, once for every
// parameter set produced by generateParams(). BenchmarkParamsPrinter supplies
// the per-parameter part of each test name.
INSTANTIATE_TEST_SUITE_P(Benchmark,
hset,
::testing::ValuesIn(generateParams()),
BenchmarkParamsPrinter());
// Test body shared by all instantiations: runs the benchmark for the current
// parameter set.
TEST_P(hset, hset) {
run();
}