// ----------------------------------------------------------------------
// File: RaftWriteTracker.cc
// Author: Georgios Bitzes - CERN
// ----------------------------------------------------------------------
/************************************************************************
* quarkdb - a redis-like highly available key-value store *
* Copyright (C) 2016 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see .*
************************************************************************/
#include "raft/RaftBlockedWrites.hh"
#include "raft/RaftWriteTracker.hh"
#include "raft/RaftJournal.hh"
#include "raft/RaftState.hh"
#include "Formatter.hh"
#include "StateMachine.hh"
#include "Utils.hh"
using namespace quarkdb;
RaftWriteTracker::RaftWriteTracker(RaftJournal &jr, StateMachine &sm, Publisher &pub)
: journal(jr), stateMachine(sm), redisDispatcher(sm, pub) {
commitApplier = std::thread(&RaftWriteTracker::applyCommits, this);
}
RaftWriteTracker::~RaftWriteTracker() {
shutdown = true;
while(commitApplierActive) {
journal.notifyWaitingThreads();
}
commitApplier.join();
flushQueues(Formatter::err("unavailable"));
}
void RaftWriteTracker::applySingleCommit(LogIndex index) {
// Determine if this particular index entry is associated to a request queue.
std::shared_ptr blockedQueue = blockedWrites.popIndex(index);
if(blockedQueue.get() == nullptr) {
// this journal entry is not related to any connection,
// let's just apply it manually from the journal
RaftEntry entry;
if(!journal.fetch(index, entry).ok()) {
// serious error, threatens consistency. Bail out
qdb_throw("failed to fetch log entry " << index << " when applying commits");
}
redisDispatcher.dispatch(entry.request, index);
return;
}
LogIndex newBlockingIndex = blockedQueue->dispatchPending(&redisDispatcher, index);
if(newBlockingIndex > 0) {
if(newBlockingIndex <= index) qdb_throw("blocking index of queue went backwards: " << index << " => " << newBlockingIndex);
blockedWrites.insert(newBlockingIndex, blockedQueue);
}
}
void RaftWriteTracker::updatedCommitIndex(LogIndex commitIndex) {
std::scoped_lock lock(mtx);
for(LogIndex index = stateMachine.getLastApplied()+1; index <= commitIndex; index++) {
applySingleCommit(index);
}
}
void RaftWriteTracker::applyCommits() {
LogIndex commitIndex = journal.getCommitIndex(); // local cached value
updatedCommitIndex(commitIndex);
while(journal.waitForCommits(commitIndex)) {
if(shutdown) break;
commitIndex = journal.getCommitIndex();
updatedCommitIndex(journal.getCommitIndex());
}
commitApplierActive = false;
}
void RaftWriteTracker::flushQueues(const RedisEncodedResponse &response) {
std::scoped_lock lock(mtx);
blockedWrites.flush(response);
}
bool RaftWriteTracker::append(LogIndex index, RaftTerm term, Transaction &&tx, const std::shared_ptr &queue, RedisDispatcher &dispatcher) {
std::scoped_lock lock(mtx);
if(!journal.append(index, RaftEntry(term, tx.toRedisRequest()), false)) {
qdb_warn("appending to journal failed for index = " << index <<
" and term " << term << " when appending to write tracker");
return false;
}
blockedWrites.insert(index, queue);
queue->addPendingTransaction(&dispatcher, std::move(tx), index);
return true;
}