// ----------------------------------------------------------------------
// File: RaftReplicator.cc
// Author: Georgios Bitzes - CERN
// ----------------------------------------------------------------------
/************************************************************************
* quarkdb - a redis-like highly available key-value store *
* Copyright (C) 2016 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see <http://www.gnu.org/licenses/>.*
************************************************************************/
#include <algorithm>
#include <cstring>
#include "raft/RaftReplicator.hh"
#include "raft/RaftTalker.hh"
#include "raft/RaftUtils.hh"
#include "raft/RaftResilverer.hh"
#include "raft/RaftConfig.hh"
#include "raft/RaftState.hh"
#include "raft/RaftJournal.hh"
#include "raft/RaftCommitTracker.hh"
#include "raft/RaftTimeouts.hh"
#include "raft/RaftLease.hh"
#include "raft/RaftTrimmer.hh"
#include "raft/RaftContactDetails.hh"
#include "utils/FileUtils.hh"
#include <thread>
#include <future>
using namespace quarkdb;
RaftReplicator::RaftReplicator(RaftJournal &journal_, RaftState &state_, RaftLease &lease_, RaftCommitTracker &ct, RaftTrimmer &trim, ShardDirectory &sharddir, RaftConfig &conf, const RaftContactDetails &cd)
: journal(journal_), state(state_), lease(lease_), commitTracker(ct), trimmer(trim), shardDirectory(sharddir), config(conf), contactDetails(cd) {
}
RaftReplicator::~RaftReplicator() {
deactivate();
}
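// Track replication towards a single target node. The constructor validates
// that we are (still) leader for the given state snapshot, then spawns the
// main replication thread plus a dedicated heartbeat thread for this target.
// If the snapshot is already stale, no threads are started.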
RaftReplicaTracker::RaftReplicaTracker(const RaftServer &target_, const RaftStateSnapshotPtr &snapshot_, RaftJournal &journal_, RaftState &state_, RaftLease &lease_, RaftCommitTracker &ct, RaftTrimmer &trim, ShardDirectory &sharddir, RaftConfig &conf, const RaftContactDetails &cd)
: target(target_), snapshot(snapshot_), journal(journal_),
state(state_), lease(lease_), commitTracker(ct), trimmer(trim), shardDirectory(sharddir), config(conf), contactDetails(cd),
matchIndex(commitTracker.getHandler(target)),
lastContact(lease.getHandler(target)),
trimmingBlock(trimmer, 0) {
if(target == state.getMyself()) {
qdb_throw("attempted to run replication on myself");
}
RaftStateSnapshotPtr current = state.getSnapshot();
if(snapshot->term > current->term) {
qdb_throw("bug, a state snapshot has a larger term than the current state");
}
if(snapshot->term < current->term) {
return;
}
if(current->status != RaftStatus::LEADER && current->status != RaftStatus::SHUTDOWN) {
qdb_throw("bug, attempted to initiate replication for a term in which I'm not a leader");
}
running = true;
thread = std::thread(&RaftReplicaTracker::main, this);
heartbeatThread.reset(&RaftReplicaTracker::sendHeartbeats, this);
heartbeatThread.setName(SSTR("heartbeat-thread-for-" << target.toString()));
}
RaftReplicaTracker::~RaftReplicaTracker() {
shutdown = 1;
while(running) {
journal.notifyWaitingThreads();
}
if(thread.joinable()) {
thread.join();
}
}
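// Assemble up to payloadLimit serialized journal entries starting at nextIndex.
// Fails if an entry cannot be fetched, or if an entry carries a term higher
// than our state snapshot, meaning we are no longer leader for it.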
bool RaftReplicaTracker::buildPayload(LogIndex nextIndex, int64_t payloadLimit,
std::vector<std::string> &entries, RaftTerm &lastEntryTerm) {
int64_t payloadSize = std::min(payloadLimit, journal.getLogSize() - nextIndex);
entries.resize(payloadSize);
RaftJournal::Iterator iterator = journal.getIterator(nextIndex, true);
RaftTerm entryTerm = -1;
for(int64_t i = nextIndex; i < nextIndex+payloadSize; i++) {
if(!iterator.valid()) {
qdb_critical("could not fetch entry with index " << i << " .. aborting building payload");
return false;
}
iterator.current(entries[i-nextIndex]);
entryTerm = RaftEntry::fetchTerm(entries[i-nextIndex]);
if(snapshot->term < entryTerm) {
qdb_warn("Found journal entry with higher term than my snapshot, " << snapshot->term << " vs " << entryTerm);
return false;
}
iterator.next();
}
lastEntryTerm = entryTerm;
return true;
}
enum class AppendEntriesReception {
kOk = 0,
kNotArrivedYet = 1,
kError = 2
};
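// Wait up to 'timeout' for an appendEntries reply, then parse it. Unparseable
// replies are logged as critical, unless the target answered "ERR unavailable",
// which is treated as an ordinary error.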
static AppendEntriesReception retrieve_response(
std::future<redisReplyPtr> &fut,
RaftAppendEntriesResponse &resp,
const std::chrono::milliseconds &timeout
) {
std::future_status status = fut.wait_for(timeout);
if(status != std::future_status::ready) {
return AppendEntriesReception::kNotArrivedYet;
}
redisReplyPtr rep = fut.get();
if(rep == nullptr) return AppendEntriesReception::kError;
if(!RaftParser::appendEntriesResponse(rep, resp)) {
if(strncmp(rep->str, "ERR unavailable", strlen("ERR unavailable")) != 0) {
// unexpected response
qdb_critical("cannot parse response from append entries");
}
return AppendEntriesReception::kError;
}
return AppendEntriesReception::kOk;
}
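// Same, but for heartbeat replies: wait up to 500ms for the reply and parse it.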
static bool retrieve_heartbeat_reply(std::future<redisReplyPtr> &fut, RaftHeartbeatResponse &resp) {
std::future_status status = fut.wait_for(std::chrono::milliseconds(500));
if(status != std::future_status::ready) {
return false;
}
redisReplyPtr rep = fut.get();
if(rep == nullptr) return false;
if(!RaftParser::heartbeatResponse(rep, resp)) {
if(strncmp(rep->str, "ERR unavailable", strlen("ERR unavailable")) != 0) {
// unexpected response
qdb_critical("cannot parse response from heartbeat");
}
return false;
}
return true;
}
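// Start resilvering the target (shipping it a full copy of the shard), unless
// an attempt is already in progress. A failed attempt is logged as critical
// and retried during a later round.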
void RaftReplicaTracker::triggerResilvering() {
// Check: Already resilvering target?
if(resilverer && resilverer->getStatus().state == ResilveringState::INPROGRESS) {
return;
}
if(resilverer && resilverer->getStatus().state == ResilveringState::FAILED) {
qdb_critical("Resilvering attempt for " << target.toString() << " failed: " << resilverer->getStatus().err);
resilverer.reset();
// Try again during the next round
return;
}
// Start the resilverer
resilverer.reset(new RaftResilverer(shardDirectory, target, contactDetails, trimmer));
}
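// Small RAII helper: notifies the given condition variable on destruction, so
// threads blocked on it wake up when the owner goes out of scope.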
class ConditionVariableNotifier {
public:
ConditionVariableNotifier(std::mutex &mtx_, std::condition_variable &cv_)
: mtx(mtx_), cv(cv_) {}
~ConditionVariableNotifier() {
std::unique_lock lock(mtx);
cv.notify_one();
}
private:
std::mutex &mtx;
std::condition_variable &cv;
};
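// Runs in a companion thread while streaming updates: consumes in-flight
// appendEntries futures in order, validates each acknowledgement, and advances
// the commit tracker, the lease and the trimming block. Any anomaly clears
// streamingUpdates, signalling the streaming loop to fall back to conservative
// replication.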
void RaftReplicaTracker::monitorAckReception(ThreadAssistant &assistant) {
ConditionVariableNotifier destructionNotifier(inFlightMtx, inFlightPoppedCV);
std::unique_lock lock(inFlightMtx);
while(!assistant.terminationRequested()) {
if(inFlight.size() == 0) {
// Empty queue, sleep
inFlightCV.wait_for(lock, contactDetails.getRaftTimeouts().getHeartbeatInterval());
continue;
}
// Fetch top item
PendingResponse item = std::move(inFlight.front());
inFlight.pop();
inFlightPoppedCV.notify_one();
lock.unlock();
RaftAppendEntriesResponse response;
while(true) {
if(assistant.terminationRequested()) {
streamingUpdates = false;
return;
}
AppendEntriesReception reception = retrieve_response(
item.fut,
response,
std::chrono::milliseconds(500)
);
if(reception == AppendEntriesReception::kOk) {
// Exit inner loop to verify acknowledgement
break;
}
if(reception == AppendEntriesReception::kError) {
// Stop streaming, we need to stabilize the target
streamingUpdates = false;
return;
}
}
// If we're here, an acknowledgement to AppendEntries has been received.
// Verify it makes sense.
state.observed(response.term, {});
if(!response.outcome) {
streamingUpdates = false;
return;
}
if(response.term != snapshot->term) {
streamingUpdates = false;
return;
}
if(response.logSize != item.pushedFrom + item.payloadSize) {
qdb_warn("Mismatch in expected logSize when streaming updates: response.logsize: " << response.logSize <<
", pushedFrom: " << item.pushedFrom << ", payloadSize: " << item.payloadSize);
streamingUpdates = false;
return;
}
// All clear, acknowledgement is OK, carry on.
updateStatus(true, response.logSize);
lastContact.heartbeat(item.sent);
// Only update the commit tracker once we're replicating entries from our
// snapshot term. (Figure 8 and section 5.4.2 from the raft paper)
if(item.lastEntryTerm == snapshot->term) {
matchIndex.update(response.logSize-1);
}
// Progress trimming block.
trimmingBlock.enforce(response.logSize-2);
lock.lock();
}
streamingUpdates = false;
}
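// Fire a single appendEntries request towards the target, containing up to
// payloadLimit entries starting at nextIndex. The commit index is read before
// building the payload -- see the comment below for why the order matters.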
bool RaftReplicaTracker::sendPayload(RaftTalker &talker, LogIndex nextIndex, int64_t payloadLimit,
std::future<redisReplyPtr> &reply, std::chrono::steady_clock::time_point &contact, int64_t &payloadSize,
RaftTerm &lastEntryTerm) {
RaftTerm prevTerm;
if(!journal.fetch(nextIndex-1, prevTerm).ok()) {
qdb_critical("unable to fetch log entry " << nextIndex-1 << " when tracking " << target.toString() << ". My log start: " << journal.getLogStart());
state.observed(snapshot->term+1, {});
return false;
}
if(snapshot->term < prevTerm) {
qdb_warn("Last journal entry has higher term than my snapshot, halting replication.");
state.observed(snapshot->term+1, {});
return false;
}
// It's critical that we retrieve the commit index before the actual entries.
// The following could happen:
// - We build the payload.
// - We recognize a different leader in the meantime.
// - The other leader overwrites some of our entries as inconsistent, and progresses
// our commit index.
// - We now send an AppendEntries to the poor target, marking potentially inconsistent
// entries as committed.
// - The target crashes after detecting journal inconsistency once the new
// leader tries to replicate entries onto it.
LogIndex commitIndexForTarget = journal.getCommitIndex();
std::vector<std::string> entries;
if(!buildPayload(nextIndex, payloadLimit, entries, lastEntryTerm)) {
state.observed(snapshot->term+1, {});
return false;
}
payloadSize = entries.size();
contact = std::chrono::steady_clock::now();
reply = talker.appendEntries(
snapshot->term,
state.getMyself(),
nextIndex-1,
prevTerm,
commitIndexForTarget,
entries
);
return true;
}
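// Streaming (pipelined) replication: push appendEntries batches without
// waiting for each reply, while monitorAckReception validates the
// acknowledgements concurrently.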
LogIndex RaftReplicaTracker::streamUpdates(RaftTalker &talker, LogIndex firstNextIndex) {
// If we're here, it means our target is very stable, so we should be able to
// continuously stream updates without waiting for the replies.
//
// As soon as an error is discovered we return, and let the parent function
// deal with it to stabilize the target once more.
streamingUpdates = true;
AssistedThread ackmonitor(&RaftReplicaTracker::monitorAckReception, this);
ackmonitor.setName(SSTR("streaming-replication-ack-monitor-for-" << SSTR(target.toString())));
const int64_t payloadLimit = 512;
LogIndex nextIndex = firstNextIndex;
while(shutdown == 0 && streamingUpdates && state.isSnapshotCurrent(snapshot.get())) {
std::chrono::steady_clock::time_point contact;
std::future<redisReplyPtr> fut;
int64_t payloadSize;
RaftTerm lastEntryTerm;
if(!sendPayload(talker, nextIndex, payloadLimit, fut, contact, payloadSize, lastEntryTerm)) {
qdb_warn("Unexpected error when sending payload to target " << target.toString() << ", halting replication");
break;
}
std::unique_lock lock(inFlightMtx);
inFlight.emplace(
std::move(fut),
contact,
nextIndex,
payloadSize,
lastEntryTerm
);
inFlightCV.notify_one();
while(inFlight.size() >= 512 && shutdown == 0 && streamingUpdates && state.isSnapshotCurrent(snapshot.get())) {
inFlightPoppedCV.wait_for(lock, contactDetails.getRaftTimeouts().getHeartbeatInterval());
}
lock.unlock();
// Assume a positive response from the target, and keep pushing
// if there are more entries.
nextIndex += payloadSize;
if(nextIndex >= journal.getLogSize()) {
journal.waitForUpdates(nextIndex, contactDetails.getRaftTimeouts().getHeartbeatInterval());
}
else {
// fire next round
}
}
// Again, no guarantees this is the actual, current logSize of the target,
// but the parent will figure it out.
return nextIndex;
}
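// Record the latest observed state of this replica, consumed by getStatus().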
void RaftReplicaTracker::updateStatus(bool online, LogIndex logSize) {
statusOnline = online;
statusLogSize = logSize;
if(resilverer) {
statusResilveringProgress.set(SSTR(resilverer->getProgress() << "/" << resilverer->getTotalToSend()));
}
else {
statusResilveringProgress.set("");
}
}
ReplicaStatus RaftReplicaTracker::getStatus() {
return { target, statusOnline, statusLogSize, statusNodeVersion.get(), statusResilveringProgress.get() };
}
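// Dedicated heartbeat loop: contacts the target once per heartbeat interval,
// independently of replication traffic, refreshing its reported version and
// registering successful contact with the lease.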
void RaftReplicaTracker::sendHeartbeats(ThreadAssistant &assistant) {
RaftTalker talker(target, contactDetails, "internal-heartbeat-sender");
while(!assistant.terminationRequested() && shutdown == 0 && state.isSnapshotCurrent(snapshot.get())) {
statusNodeVersion.set(talker.getNodeVersion());
std::chrono::steady_clock::time_point contact = std::chrono::steady_clock::now();
std::future<redisReplyPtr> fut = talker.heartbeat(snapshot->term, state.getMyself());
RaftHeartbeatResponse resp;
if(!retrieve_heartbeat_reply(fut, resp)) {
goto nextRound;
}
state.observed(resp.term, {});
if(snapshot->term < resp.term || !resp.nodeRecognizedAsLeader) continue;
lastContact.heartbeat(contact);
nextRound:
state.wait(contactDetails.getRaftTimeouts().getHeartbeatInterval());
}
}
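// Tracks whether the target currently looks online, and whether it has been
// unreachable for long enough (one minute) that it should no longer hold back
// journal trimming.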
class OnlineTracker {
public:
OnlineTracker() : online(false) { }
void seenOnline(){
online = true;
lastSeen = std::chrono::steady_clock::now();
}
void seenOffline() {
online = false;
}
bool isOnline() const {
return online;
}
bool hasBeenOfflineForLong() const {
if(online) return false;
return std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now() - lastSeen) > std::chrono::minutes(1);
}
private:
bool online;
std::chrono::steady_clock::time_point lastSeen;
};
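// Main replication loop, in conservative mode: send one batch, wait for the
// response, and react to it -- doubling the batch size while the target keeps
// acknowledging, stepping nextIndex back on journal inconsistencies,
// triggering resilvering when the target is too far behind, and switching to
// streaming replication once the target looks stable.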
void RaftReplicaTracker::main() {
RaftTalker talker(target, contactDetails, "internal-replicator");
LogIndex nextIndex = journal.getLogSize();
RaftMatchIndexTracker &matchIndex = commitTracker.getHandler(target);
RaftLastContact &lastContact = lease.getHandler(target);
OnlineTracker onlineTracker;
int64_t payloadLimit = 1;
bool warnStreamingHiccup = false;
bool needResilvering = false;
while(shutdown == 0 && state.isSnapshotCurrent(snapshot.get())) {
if(warnStreamingHiccup) {
qdb_warn("Hiccup during streaming replication of " << target.toString() << ", switching back to conservative replication.");
warnStreamingHiccup = false;
}
// Target looks pretty stable, start continuous stream
if(onlineTracker.isOnline() && payloadLimit >= 8) {
qdb_info("Target " << target.toString() << " appears stable, initiating streaming replication.");
resilverer.reset();
nextIndex = streamUpdates(talker, nextIndex);
inFlight = std::queue<PendingResponse>(); // clear queue
warnStreamingHiccup = true;
onlineTracker.seenOnline();
// Something happened when streaming updates, switch back to conservative
// mode and wait for each response
payloadLimit = 1;
continue;
}
if(nextIndex <= 0) qdb_throw("nextIndex has invalid value: " << nextIndex);
if(nextIndex <= journal.getLogStart()) nextIndex = journal.getLogSize();
std::chrono::steady_clock::time_point contact;
std::future<redisReplyPtr> fut;
int64_t payloadSize;
RaftTerm lastEntryTerm;
if(!sendPayload(talker, nextIndex, payloadLimit, fut, contact, payloadSize, lastEntryTerm)) {
qdb_warn("Unexpected error when sending payload to target " << target.toString() << ", halting replication");
break;
}
RaftAppendEntriesResponse resp;
// Check: Is the target even online?
if(retrieve_response(fut, resp, std::chrono::milliseconds(500)) != AppendEntriesReception::kOk) {
if(onlineTracker.isOnline()) {
payloadLimit = 1;
qdb_event("Replication target " << target.toString() << " went offline.");
onlineTracker.seenOffline();
}
goto nextRound;
}
if(!onlineTracker.isOnline()) {
// Print an event if the target just came back online
onlineTracker.seenOnline();
qdb_event("Replication target " << target.toString() << " came back online. Log size: " << resp.logSize << ", lagging " << (journal.getLogSize() - resp.logSize) << " entries behind me. (approximate)");
}
state.observed(resp.term, {});
if(snapshot->term < resp.term) continue;
lastContact.heartbeat(contact);
// Check: Does the target need resilvering?
if(resp.logSize <= journal.getLogStart()) {
nextIndex = journal.getLogSize();
if(!needResilvering) {
qdb_event("Unable to perform replication on " << target.toString() << ", it's too far behind (its logsize: " << resp.logSize << ") and my journal starts at " << journal.getLogStart() << ".");
needResilvering = true;
payloadLimit = 1;
}
if(config.getResilveringEnabled()) {
triggerResilvering();
}
goto nextRound;
}
needResilvering = false;
resilverer.reset();
// Check: Is my current view of the target's journal correct? (nextIndex)
if(!resp.outcome) {
// never try to touch entry #0
if(nextIndex >= 2 && nextIndex <= resp.logSize) {
// There are journal inconsistencies. Move back a step to remove a single
// inconsistent entry in the next round.
nextIndex--;
} else if(resp.logSize > 0) {
// Our nextIndex is outdated, update
nextIndex = resp.logSize;
}
goto nextRound;
}
// All checks have passed
if(nextIndex+payloadSize != resp.logSize) {
qdb_warn("mismatch in expected logSize. nextIndex = " << nextIndex << ", payloadSize = " << payloadSize << ", logSize: " << resp.logSize << ", resp.term: " << resp.term << ", my term: " << snapshot->term << ", journal size: " << journal.getLogSize());
}
// Only update the commit tracker once we're replicating entries from our
// snapshot term. (Figure 8 and section 5.4.2 from the raft paper)
if(lastEntryTerm == snapshot->term) {
matchIndex.update(resp.logSize-1);
}
nextIndex = resp.logSize;
if(payloadLimit < 1024) {
payloadLimit *= 2;
}
nextRound:
if(onlineTracker.hasBeenOfflineForLong()) {
// Don't let a "permanently offline" node block journal trimming indefinitely
trimmingBlock.lift();
}
else {
trimmingBlock.enforce(nextIndex-2);
}
updateStatus(onlineTracker.isOnline(), resp.logSize);
if(!onlineTracker.isOnline() || needResilvering) {
state.wait(contactDetails.getRaftTimeouts().getHeartbeatInterval());
}
else if(onlineTracker.isOnline() && nextIndex >= journal.getLogSize()) {
journal.waitForUpdates(nextIndex, contactDetails.getRaftTimeouts().getHeartbeatInterval());
}
else {
// don't wait, fire next round of updates
}
}
qdb_event("Shutting down replicator tracker for " << target.toString());
running = false;
}
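// Called when this node becomes leader: remember the state snapshot for the
// new term and spawn one replica tracker per replication target.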
void RaftReplicator::activate(RaftStateSnapshotPtr &snapshot_) {
std::scoped_lock lock(mtx);
qdb_event("Activating replicator for term " << snapshot_->term);
qdb_assert(targets.empty());
snapshot = snapshot_;
commitTracker.reset();
reconfigure();
}
void RaftReplicator::deactivate() {
std::scoped_lock lock(mtx);
qdb_event("De-activating replicator");
for(auto it = targets.begin(); it != targets.end(); it++) {
delete it->second;
}
targets.clear();
snapshot = {};
commitTracker.reset();
}
ReplicationStatus RaftReplicator::getStatus() {
std::scoped_lock lock(mtx);
ReplicationStatus ret;
for(auto it = targets.begin(); it != targets.end(); it++) {
ret.addReplica(it->second->getStatus());
}
ret.shakyQuorum = lease.getShakyQuorumDeadline() < std::chrono::steady_clock::now();
return ret;
}
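// Return all nodes except myself, asserting that I appear in the list exactly once.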
static std::vector<RaftServer> all_servers_except_myself(const std::vector<RaftServer> &nodes, const RaftServer &myself) {
std::vector<RaftServer> remaining;
size_t skipped = 0;
for(size_t i = 0; i < nodes.size(); i++) {
if(myself == nodes[i]) {
if(skipped != 0) qdb_throw("found myself in the nodes list twice");
skipped++;
continue;
}
remaining.push_back(nodes[i]);
}
if(skipped != 1) qdb_throw("unexpected value for 'skipped', got " << skipped << " instead of 1");
if(remaining.size() != nodes.size()-1) qdb_throw("unexpected size for remaining: " << remaining.size() << " instead of " << nodes.size()-1);
return remaining;
}
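// Recompute the set of replication targets from the current membership:
// full nodes plus observers, excluding myself. Only full nodes count towards
// the commit tracker and the leadership lease.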
void RaftReplicator::reconfigure() {
RaftMembership membership = journal.getMembership();
qdb_info("Reconfiguring replicator for membership epoch " << membership.epoch);
// Build list of targets
std::vector<RaftServer> full_nodes = all_servers_except_myself(membership.nodes, state.getMyself());
std::vector<RaftServer> targets = full_nodes;
// add observers
for(const RaftServer& srv : membership.observers) {
if(srv == state.getMyself()) qdb_throw("found myself in the list of observers, even though I'm leader: " << serializeNodes(membership.observers));
targets.push_back(srv);
}
// reconfigure lease and commit tracker - only take into account full nodes!
commitTracker.updateTargets(full_nodes);
lease.updateTargets(full_nodes);
// now set them
setTargets(targets);
}
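// Reconcile the running replica trackers with the desired target list:
// spawn trackers for new targets, tear down trackers for removed ones.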
void RaftReplicator::setTargets(const std::vector<RaftServer> &newTargets) {
std::scoped_lock lock(mtx);
// add targets?
for(size_t i = 0; i < newTargets.size(); i++) {
if(targets.find(newTargets[i]) == targets.end()) {
targets[newTargets[i]] = new RaftReplicaTracker(newTargets[i], snapshot, journal, state, lease, commitTracker, trimmer, shardDirectory, config, contactDetails);
}
}
// remove targets?
std::vector<RaftServer> todel;
for(auto it = targets.begin(); it != targets.end(); it++) {
if(!contains(newTargets, it->first)) {
todel.push_back(it->first);
}
}
for(size_t i = 0; i < todel.size(); i++) {
delete targets[todel[i]];
targets.erase(todel[i]);
}
}