// ----------------------------------------------------------------------
// File: RaftCommitTracker.cc
// Author: Georgios Bitzes - CERN
// ----------------------------------------------------------------------
/************************************************************************
* quarkdb - a redis-like highly available key-value store *
* Copyright (C) 2016 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see .*
************************************************************************/
#include "raft/RaftUtils.hh"
#include "raft/RaftCommitTracker.hh"
#include "raft/RaftJournal.hh"
#include
#include
using namespace quarkdb;
RaftMatchIndexTracker::RaftMatchIndexTracker(RaftCommitTracker &tr, const RaftServer &srv)
: tracker(tr), server(srv) {
}
void RaftMatchIndexTracker::update(LogIndex newMatchIndex) {
if(newMatchIndex < matchIndex) {
qdb_critical("Detected reduction in matchIndex: Target's journal went back in time: " << matchIndex << " ==> " << newMatchIndex);
}
matchIndex = newMatchIndex;
tracker.updated(matchIndex);
}
RaftCommitTracker::RaftCommitTracker(RaftJournal &jr)
: journal(jr) {
updateTargets(journal.getMembership().nodes);
}
RaftCommitTracker::~RaftCommitTracker() {
reset();
}
void RaftCommitTracker::reset() {
for(auto it = registrations.begin(); it != registrations.end(); it++) {
delete it->second;
}
registrations.clear();
commitIndex = 0;
}
RaftMatchIndexTracker& RaftCommitTracker::getHandler(const RaftServer &srv) {
std::scoped_lock lock(mtx);
return this->getHandlerInternal(srv);
}
RaftMatchIndexTracker& RaftCommitTracker::getHandlerInternal(const RaftServer &srv) {
auto it = registrations.find(srv);
if(it == registrations.end()) {
registrations[srv] = new RaftMatchIndexTracker(*this, srv);
}
return *registrations[srv];
}
void RaftCommitTracker::updateTargets(const std::vector &trgt) {
std::scoped_lock lock(mtx);
// shut autoCommitter down, if running
autoCommitter.join();
// clear the map of the old targets
targets.clear();
// update to new targets - the matchIndex is NOT lost
// for servers which exist in both sets!
quorumSize = calculateQuorumSize(trgt.size() + 1);
qdb_assert(quorumSize > 0);
if(quorumSize == 1) {
qdb_assert(trgt.empty());
autoCommitter.reset(&RaftCommitTracker::runAutoCommit, this);
autoCommitter.setName("autocommitter");
}
for(const RaftServer& target : trgt) {
targets[target] = &this->getHandlerInternal(target);
}
}
void RaftCommitTracker::runAutoCommit(ThreadAssistant &assistant) {
qdb_assert(quorumSize == 1);
LogIndex commitIndex = journal.getCommitIndex();
while(true) {
journal.waitForUpdates(commitIndex+1, std::chrono::milliseconds(500));
if(assistant.terminationRequested()) return;
// Progress commit index?
commitIndex = journal.getCommitIndex();
if(journal.getLogSize()-1 != commitIndex) {
qdb_assert(journal.setCommitIndex(journal.getLogSize() - 1));
}
}
}
void RaftCommitTracker::updateCommitIndex(LogIndex newCommitIndex) {
LogIndex journalCommitIndex = journal.getCommitIndex();
if(newCommitIndex < journalCommitIndex) {
qdb_warn("calculated a commitIndex which is smaller than journal.commitIndex: " << newCommitIndex << ", " << journalCommitIndex << ". Will be unable to commit new entries until this is resolved.");
commitIndexLagging = true;
}
else {
if(commitIndexLagging) {
qdb_info("commitIndex no longer lagging behind journal.commitIndex, committing of new entries is now possible again.");
commitIndexLagging = false;
}
commitIndex = newCommitIndex;
journal.setCommitIndex(commitIndex);
}
}
void RaftCommitTracker::recalculateCommitIndex() {
// If targets are empty, auto-committer is active and we don't need to
// recalculate the commit index.
if(targets.empty()) return;
// remember, we also take into account the current node, which is a leader.
// (otherwise we wouldn't be running the commit tracker)
// The leader is by definition always up-to-date, so we don't run
// a RaftMatchIndexTracker on it. But it has to be taken into account in the
// commitIndex calculation.
matchIndexes.resize(targets.size());
size_t i = 0;
for(auto it = targets.begin(); it != targets.end(); it++) {
matchIndexes[i++] = it->second->get();
}
std::sort(matchIndexes.begin(), matchIndexes.end());
size_t threshold = (matchIndexes.size()+1) - quorumSize;
updateCommitIndex(matchIndexes[threshold]);
}
void RaftCommitTracker::updated(LogIndex val) {
std::scoped_lock lock(mtx);
if(val <= commitIndex) return; // nothing to do, we've already notified journal of the change
recalculateCommitIndex();
}