// ----------------------------------------------------------------------
// File: Raft.cc
// Author: Georgios Bitzes - CERN
// ----------------------------------------------------------------------
/************************************************************************
* quarkdb - a redis-like highly available key-value store *
* Copyright (C) 2016 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see .*
************************************************************************/
#include "utils/IntToBinaryString.hh"
#include "raft/RaftTalker.hh"
#include "raft/RaftContactDetails.hh"
#include "raft/RaftTimeouts.hh"
#include "Version.hh"
#include
namespace quarkdb {
class RaftHandshake : public qclient::Handshake {
public:
virtual ~RaftHandshake() override {}
RaftHandshake(const RaftContactDetails &cd)
: contactDetails(cd) {
restart();
}
virtual std::vector provideHandshake() override {
return {"RAFT_HANDSHAKE", VERSION_FULL_STRING, contactDetails.getClusterID(), contactDetails.getRaftTimeouts().toString() };
}
virtual Status validateResponse(const redisReplyPtr &reply) override {
if(!reply) {
return Status::INVALID;
}
if(reply->type != REDIS_REPLY_STATUS) {
return Status::INVALID;
}
if(std::string(reply->str, reply->len) != "OK") {
return Status::INVALID;
}
return Status::VALID_COMPLETE;
}
virtual void restart() override {}
virtual std::unique_ptr clone() const override{
return std::unique_ptr(new RaftHandshake(contactDetails));
}
private:
const RaftContactDetails &contactDetails;
};
//------------------------------------------------------------------------------
// VersionHandshake is used to determine which QDB version a node is at.
//------------------------------------------------------------------------------
class VersionHandshake : public qclient::Handshake {
public:
VersionHandshake() {
version = "N/A";
}
virtual std::vector provideHandshake() override {
return {"QUARKDB_VERSION"};
}
virtual Status validateResponse(const redisReplyPtr &reply) override {
std::unique_lock lock(mtx);
version = "N/A";
if(!reply) {
return Status::INVALID;
}
if(reply->type != REDIS_REPLY_STRING) {
// cannot parse output of quarkdb-version.. maybe the other node
// is running a really old version without support for quarkdb-version
// command.
// TODO(gbitzes): Eventually make this an error, and refuse to communicate
// if the other node is *that* old.
return Status::VALID_COMPLETE;
}
version = std::string(reply->str, reply->len);
return Status::VALID_COMPLETE;
}
virtual void restart() override {
std::unique_lock lock(mtx);
version = "N/A";
}
std::string getVersion() const {
std::unique_lock lock(mtx);
return version;
}
virtual std::unique_ptr clone() const override {
return std::unique_ptr(new VersionHandshake());
}
private:
mutable std::mutex mtx;
std::string version;
};
class QuarkDBLogger : public qclient::Logger {
public:
QuarkDBLogger() {
logLevel = qclient::LogLevel::kWarn;
}
void print(LogLevel level, int line, const std::string &file, const std::string &msg) override {
___log("QCLIENT (" << logLevelToString(level) << "): " << msg);
}
};
RaftTalker::RaftTalker(const RaftServer &server_, const RaftContactDetails &contactDetails, std::string_view name)
: server(server_) {
qclient::Options opts;
opts.transparentRedirects = false;
opts.retryStrategy = qclient::RetryStrategy::NoRetries();
opts.backpressureStrategy = qclient::BackpressureStrategy::Default();
opts.logger.reset(new QuarkDBLogger());
opts.chainHmacHandshake(contactDetails.getPassword());
opts.chainHandshake(std::unique_ptr(new RaftHandshake(contactDetails)));
opts.chainHandshake(std::unique_ptr(new qclient::SetClientNameHandshake(std::string(name))));
// Make a version handshake - capture ownership inside QClient, but keep pointer
// to it here.
versionHandshake = new VersionHandshake();
opts.chainHandshake(std::unique_ptr(versionHandshake));
qcl.reset(new QClient(server.hostname, server.port, std::move(opts)));
}
std::string RaftTalker::getNodeVersion() {
return versionHandshake->getVersion();
}
std::future RaftTalker::heartbeat(RaftTerm term, const RaftServer &leader) {
RedisRequest payload;
payload.emplace_back("RAFT_HEARTBEAT");
payload.emplace_back(std::to_string(term));
payload.emplace_back(leader.toString());
return qcl->execute(payload);
}
std::future RaftTalker::appendEntries(
RaftTerm term, RaftServer leader, LogIndex prevIndex,
RaftTerm prevTerm, LogIndex commit,
const std::vector &entries) {
if(term < prevTerm) {
qdb_throw(SSTR("term < prevTerm.. " << prevTerm << "," << term));
}
RedisRequest payload;
payload.reserve(3 + entries.size());
payload.emplace_back("RAFT_APPEND_ENTRIES");
payload.emplace_back(leader.toString());
char buffer[sizeof(int64_t) * 5];
intToBinaryString(term, buffer + 0*sizeof(int64_t));
intToBinaryString(prevIndex, buffer + 1*sizeof(int64_t));
intToBinaryString(prevTerm, buffer + 2*sizeof(int64_t));
intToBinaryString(commit, buffer + 3*sizeof(int64_t));
intToBinaryString(entries.size(), buffer + 4*sizeof(int64_t));
payload.emplace_back(buffer, 5*sizeof(int64_t));
for(size_t i = 0; i < entries.size(); i++) {
payload.push_back(entries[i]);
qdb_assert(RaftEntry::fetchTerm(entries[i]) <= term);
}
return qcl->execute(payload);
}
std::future RaftTalker::requestVote(const RaftVoteRequest &req, bool preVote) {
RedisRequest payload;
if(preVote) {
payload.emplace_back("RAFT_REQUEST_PRE_VOTE");
}
else {
payload.emplace_back("RAFT_REQUEST_VOTE");
}
payload.emplace_back(std::to_string(req.term));
payload.emplace_back(req.candidate.toString());
payload.emplace_back(std::to_string(req.lastIndex));
payload.emplace_back(std::to_string(req.lastTerm));
return qcl->execute(payload);
}
std::future RaftTalker::fetch(LogIndex index) {
RedisRequest payload;
payload.emplace_back("RAFT_FETCH");
payload.emplace_back(std::to_string(index));
return qcl->execute(payload);
}
std::future RaftTalker::resilveringStart(const ResilveringEventID &id) {
return qcl->exec("quarkdb_start_resilvering", id);
}
std::future RaftTalker::resilveringCopy(const ResilveringEventID &id, const std::string &filename, const std::string &contents) {
return qcl->exec("quarkdb_resilvering_copy_file", id, filename, contents);
}
std::future RaftTalker::resilveringFinish(const ResilveringEventID &id) {
return qcl->exec("quarkdb_finish_resilvering", id);
}
std::future RaftTalker::resilveringCancel(const ResilveringEventID &id, const std::string &reason) {
return qcl->exec("quarkdb_cancel_resilvering");
}
}