// ----------------------------------------------------------------------
// File: RaftJournal.cc
// Author: Georgios Bitzes - CERN
// ----------------------------------------------------------------------
/************************************************************************
* quarkdb - a redis-like highly available key-value store *
* Copyright (C) 2016 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see https://www.gnu.org/licenses/. *
************************************************************************/
#include "raft/RaftJournal.hh"
#include "raft/RaftMembers.hh"
#include "storage/KeyConstants.hh"
#include "Common.hh"
#include "Utils.hh"
#include "utils/IntToBinaryString.hh"
#include "utils/StaticBuffer.hh"
#include "utils/StringUtils.hh"
#include "../deps/StringMatchLen.h"
#include "raft/RaftState.hh"
#include
#include
#include
#include
using namespace quarkdb;
//------------------------------------------------------------------------------
// Evaluate an expression yielding a rocksdb::Status and, if it is not OK,
// throw via qdb_throw with the status text.
//
// The do { } while(0) wrapper makes the macro a single statement, so it can
// be used safely in unbraced if / else branches (a bare { } block followed by
// the caller's ';' would break an if / else). The argument is parenthesized
// and the local has an unlikely name so it cannot collide with names used
// inside `stmt`.
//------------------------------------------------------------------------------
#define THROW_ON_ERROR(stmt) do { rocksdb::Status throwOnErrorStatus_ = (stmt); if(!throwOnErrorStatus_.ok()) qdb_throw(throwOnErrorStatus_.ToString()); } while(0)
//------------------------------------------------------------------------------
// Helper functions
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
// Journal entry keys are the character 'E' followed by the binary encoding of
// the entry index, as produced by intToBinaryString - kEntryKeySize bytes in
// total.
//------------------------------------------------------------------------------
constexpr size_t kEntryKeySize = 1 + sizeof(LogIndex);
using KeyBuffer = StaticBuffer;
static std::string encodeEntryKey(LogIndex index) {
  // Build the key directly inside a pre-sized string. Going through SSTR
  // would construct an ostringstream for every key, and this function is
  // called once per entry in the trimUntil / removeEntries loops.
  std::string key(kEntryKeySize, '\0');
  key[0] = 'E';
  intToBinaryString(index, &key[1]);
  return key;
}
//------------------------------------------------------------------------------
// Decode a journal entry key ("E" + binary index). Returns false, leaving
// `index` untouched, when the key does not have that exact shape.
//------------------------------------------------------------------------------
static bool parseEntryKey(std::string_view key, LogIndex &index) {
  const bool wellFormed = (key.size() == 1 + sizeof(index)) && (key.front() == 'E');
  if(!wellFormed) {
    return false;
  }

  index = binaryStringToInt(key.data() + 1);
  return true;
}
//------------------------------------------------------------------------------
// Write the entry key for `index` ('E' + binary index) into a caller-provided
// KeyBuffer, instead of returning a freshly built std::string.
//------------------------------------------------------------------------------
QDB_ALWAYS_INLINE
inline void encodeEntryKey(LogIndex index, KeyBuffer &key) {
  auto *out = key.data();
  *out = 'E';
  intToBinaryString(index, out + 1);
}
//------------------------------------------------------------------------------
// Initialize fsync policy, if not already. Ensures compatibility with
// pre-0.4.1 versions of QuarkDB.
//------------------------------------------------------------------------------
void RaftJournal::ensureFsyncPolicyInitialized() {
std::string tmp;
rocksdb::Status st = db->Get(rocksdb::ReadOptions(), KeyConstants::kJournal_FsyncPolicy, &tmp);
if(!st.ok() && !st.IsNotFound()) {
qdb_throw(st.ToString());
}
if(st.ok()) {
return;
}
this->set_or_die(KeyConstants::kJournal_FsyncPolicy, fsyncPolicyToString(FsyncPolicy::kSyncImportantUpdates));
}
//------------------------------------------------------------------------------
// Should we sync this write?
//------------------------------------------------------------------------------
bool RaftJournal::shouldSync(bool important) {
  // kAlways syncs every write, kAsync none; the remaining policy
  // (kSyncImportantUpdates) syncs only writes flagged as important.
  switch(fsyncPolicy) {
    case FsyncPolicy::kAlways:
      return true;
    case FsyncPolicy::kAsync:
      return false;
    default:
      break;
  }

  qdb_assert(fsyncPolicy == FsyncPolicy::kSyncImportantUpdates);
  return important;
}
//------------------------------------------------------------------------------
// RaftJournal
//------------------------------------------------------------------------------
void RaftJournal::ObliterateAndReinitializeJournal(const std::string &path, RaftClusterID clusterID, std::vector nodes, LogIndex startIndex, FsyncPolicy fsyncPolicy) {
RaftJournal journal(path, clusterID, nodes, startIndex, fsyncPolicy);
}
//------------------------------------------------------------------------------
// Delete every key in the journal database and write a fresh initial state:
// term 0, no vote, a log holding a single JOURNAL_UPDATE_MEMBERS entry at
// `startIndex` (which also becomes logStart, commitIndex and the membership
// epoch), then reload the in-memory state via initialize().
//
// Any failed deletion or iteration error now throws, instead of silently
// leaving stale keys behind in a supposedly wiped journal.
//------------------------------------------------------------------------------
void RaftJournal::obliterate(RaftClusterID newClusterID, const std::vector &newNodes, LogIndex startIndex, FsyncPolicy fsyncPolicy) {
  {
    IteratorPtr iter(db->NewIterator(rocksdb::ReadOptions()));
    for(iter->SeekToFirst(); iter->Valid(); iter->Next()) {
      // The key slice stays valid until the iterator is advanced.
      THROW_ON_ERROR(db->Delete(rocksdb::WriteOptions(), iter->key()));
    }

    // Valid() also becomes false on an iteration error - make sure we did
    // not stop early.
    THROW_ON_ERROR(iter->status());
  }

  this->set_int_or_die(KeyConstants::kJournal_CurrentTerm, 0);
  this->set_int_or_die(KeyConstants::kJournal_LogSize, startIndex+1);
  this->set_int_or_die(KeyConstants::kJournal_LogStart, startIndex);
  this->set_or_die(KeyConstants::kJournal_ClusterID, newClusterID);
  this->set_or_die(KeyConstants::kJournal_VotedFor, "");
  this->set_int_or_die(KeyConstants::kJournal_CommitIndex, startIndex);

  RaftMembers newMembers(newNodes, {});
  this->set_or_die(KeyConstants::kJournal_Members, newMembers.toString());
  this->set_int_or_die(KeyConstants::kJournal_MembershipEpoch, startIndex);
  this->set_or_die(KeyConstants::kJournal_FsyncPolicy, fsyncPolicyToString(fsyncPolicy) );

  // The first (and only) entry records the initial membership.
  RaftEntry entry(0, "JOURNAL_UPDATE_MEMBERS", newMembers.toString(), newClusterID);
  this->set_or_die(encodeEntryKey(startIndex), entry.serialize());

  initialize();
}
void RaftJournal::initializeFsyncPolicy() {
std::string policyStr = this->get_or_die(KeyConstants::kJournal_FsyncPolicy);
FsyncPolicy tmp = FsyncPolicy::kSyncImportantUpdates;
if(!parseFsyncPolicy(policyStr, tmp)) {
qdb_critical("Invalid fsync policy in journal: " << policyStr);
}
fsyncPolicy = tmp;
}
//------------------------------------------------------------------------------
// Load the persisted journal state (term, log bounds, cluster ID, commit
// index, vote, last entry term, membership, fsync policy) into memory and
// (re)start the fsync thread. Throws if a required key or the last entry is
// missing, or if the stored vote cannot be parsed.
//------------------------------------------------------------------------------
void RaftJournal::initialize() {
currentTerm = this->get_int_or_die(KeyConstants::kJournal_CurrentTerm);
logSize = this->get_int_or_die(KeyConstants::kJournal_LogSize);
logStart = this->get_int_or_die(KeyConstants::kJournal_LogStart);
clusterID = this->get_or_die(KeyConstants::kJournal_ClusterID);
commitIndex = this->get_int_or_die(KeyConstants::kJournal_CommitIndex);
std::string vote = this->get_or_die(KeyConstants::kJournal_VotedFor);
// Cache the term of the last entry (at logSize-1); appendNoLock() uses it to
// reject entries with a lower term.
this->fetch_or_die(logSize-1, termOfLastEntry);
membershipEpoch = this->get_int_or_die(KeyConstants::kJournal_MembershipEpoch);
members = RaftMembers(this->get_or_die(KeyConstants::kJournal_Members));
initializeFsyncPolicy();
// An empty vote string means no vote has been cast in the current term.
if(!vote.empty() && !parseServer(vote, votedFor)) {
qdb_throw("journal corruption, cannot parse " << KeyConstants::kJournal_VotedFor << ": " << vote);
}
// Background FsyncThread on the journal DB with a 1-second interval.
// NOTE(review): presumably it periodically syncs writes that were not synced
// inline (see shouldSync) - confirm in FsyncThread.
fsyncThread.reset(new FsyncThread(db, std::chrono::seconds(1)));
}
void RaftJournal::openDB(const std::string &path) {
qdb_info("Opening raft journal " << quotes(path));
dbPath = path;
rocksdb::Options options;
rocksdb::BlockBasedTableOptions table_options;
table_options.filter_policy.reset(rocksdb::NewBloomFilterPolicy(10, false));
table_options.block_size = 16 * 1024;
options.compression = rocksdb::kNoCompression;
options.bottommost_compression = rocksdb::kNoCompression;
options.table_factory.reset(rocksdb::NewBlockBasedTableFactory(table_options));
options.create_if_missing = true;
options.max_manifest_file_size = 1024 * 1024;
// Warn on write stalls
writeStallWarner.reset(new WriteStallWarner("raft-journal"));
options.listeners.emplace_back(writeStallWarner);
rocksdb::Status status = rocksdb::DB::Open(options, path, &db);
if(!status.ok()) qdb_throw("Error while opening journal in " << path << ":" << status.ToString());
}
//------------------------------------------------------------------------------
// Create a brand new journal at `filename`, wiping anything already there.
// obliterate() ends by calling initialize(), loading the new state.
//------------------------------------------------------------------------------
RaftJournal::RaftJournal(const std::string &filename, RaftClusterID clusterID, const std::vector &nodes, LogIndex startIndex, FsyncPolicy fsyncPolicy) {
  openDB(filename);
  obliterate(clusterID, nodes, startIndex, fsyncPolicy);
}

//------------------------------------------------------------------------------
// Stop the fsync thread (it was handed `db` in initialize()) before closing
// the database.
//------------------------------------------------------------------------------
RaftJournal::~RaftJournal() {
  qdb_info("Closing raft journal " << quotes(dbPath));
  fsyncThread.reset();
  delete db; // deleting a null pointer is a no-op
  db = nullptr;
}

//------------------------------------------------------------------------------
// Open an existing journal at `filename` and load its persisted state.
//------------------------------------------------------------------------------
RaftJournal::RaftJournal(const std::string &filename) {
  openDB(filename);
  ensureFsyncPolicyInitialized();
  initialize();
}
//------------------------------------------------------------------------------
// Atomically persist a new (term, vote) pair. Returns false without changing
// anything if `term` is older than the current term, or if a vote has
// already been recorded for the current term.
//------------------------------------------------------------------------------
bool RaftJournal::setCurrentTerm(RaftTerm term, RaftServer vote) {
  std::scoped_lock lock(currentTermMutex);

  //----------------------------------------------------------------------------
  // Terms should never go back in time
  //----------------------------------------------------------------------------
  if(term < currentTerm) {
    return false;
  }

  //----------------------------------------------------------------------------
  // The vote for the current term should never change. Reading votedFor here
  // is safe: every writer of votedFor holds currentTermMutex.
  //----------------------------------------------------------------------------
  if(term == currentTerm && !votedFor.empty()) {
    return false;
  }

  //----------------------------------------------------------------------------
  // Atomically update currentTerm and votedFor
  //----------------------------------------------------------------------------
  rocksdb::WriteBatch batch;
  THROW_ON_ERROR(batch.Put(KeyConstants::kJournal_CurrentTerm, intToBinaryString(term)));
  THROW_ON_ERROR(batch.Put(KeyConstants::kJournal_VotedFor, vote.toString()));
  commitBatch(batch, -1, true);

  currentTerm = term;

  //----------------------------------------------------------------------------
  // getVotedFor() reads votedFor under votedForMutex, so the write must hold
  // that mutex too; holding only currentTermMutex is a data race.
  // Lock order: currentTermMutex -> votedForMutex.
  //----------------------------------------------------------------------------
  {
    std::scoped_lock voteLock(votedForMutex);
    votedFor = std::move(vote);
  }
  return true;
}
bool RaftJournal::simulateDataLoss(size_t numberOfEntries) {
LogIndex newLogSize = logSize - numberOfEntries;
if(newLogSize <= commitIndex) {
rawSetCommitIndex(newLogSize-1);
}
return removeEntries(newLogSize);
}
//------------------------------------------------------------------------------
// Advance the commit index to `newIndex`. Moving backwards is refused with a
// warning; committing an entry that does not exist is fatal.
//------------------------------------------------------------------------------
bool RaftJournal::setCommitIndex(LogIndex newIndex) {
  std::scoped_lock lock(commitIndexMutex);

  if(newIndex < commitIndex) {
    qdb_warn("attempted to set commit index in the past, from " << commitIndex << " ==> " << newIndex);
    return false;
  }

  if(newIndex >= logSize) {
    qdb_throw("attempted to mark as committed a non-existing entry. Journal size: " << logSize << ", new index: " << newIndex);
  }

  // Equal index: nothing to do, but still a success.
  if(newIndex > commitIndex) {
    rawSetCommitIndex(newIndex);
  }
  return true;
}

//------------------------------------------------------------------------------
// Unchecked commit index update: persist, publish in memory, then wake up
// anyone blocked in waitForCommits().
//------------------------------------------------------------------------------
void RaftJournal::rawSetCommitIndex(LogIndex newIndex) {
  this->set_int_or_die(KeyConstants::kJournal_CommitIndex, newIndex);
  commitIndex = newIndex;
  commitNotifier.notify_all();
}
//------------------------------------------------------------------------------
// Block until the commit index may have moved past `currentCommit`. Returns
// immediately if it already has; otherwise a single wake-up (which may be
// spurious, or come from notifyWaitingThreads) ends the wait.
//------------------------------------------------------------------------------
bool RaftJournal::waitForCommits(const LogIndex currentCommit) {
  std::unique_lock lock(commitIndexMutex);
  if(commitIndex <= currentCommit) {
    commitNotifier.wait(lock);
  }
  return true;
}
//------------------------------------------------------------------------------
// Write `batch` to the journal. A non-negative `index` is the new log size:
// it must lie above the commit index and is persisted in the same batch when
// it differs from the current one. A negative index leaves logSize alone.
// `important` feeds the fsync decision (shouldSync).
//------------------------------------------------------------------------------
void RaftJournal::commitBatch(rocksdb::WriteBatch &batch, LogIndex index, bool important) {
  const bool updatesLogSize = (index >= 0);

  if(updatesLogSize) {
    if(index <= commitIndex) {
      qdb_throw("Attempted to remove committed entries by setting logSize to " << index << " while commitIndex = " << commitIndex);
    }
    if(index != logSize) {
      THROW_ON_ERROR(batch.Put(KeyConstants::kJournal_LogSize, intToBinaryString(index)));
    }
  }

  rocksdb::WriteOptions writeOpts;
  writeOpts.sync = shouldSync(important);

  rocksdb::Status st = db->Write(writeOpts, &batch);
  if(!st.ok()) {
    qdb_throw("unable to commit journal transaction: " << st.ToString());
  }

  if(updatesLogSize) {
    logSize = index;
  }
}
//------------------------------------------------------------------------------
// Snapshot of the current members, taken under membersMutex.
//------------------------------------------------------------------------------
RaftMembers RaftJournal::getMembers() {
  std::scoped_lock lock(membersMutex);
  RaftMembers snapshot = members;
  return snapshot;
}

//------------------------------------------------------------------------------
// Snapshot of nodes, observers and the membership epoch, taken together
// under membersMutex so they are mutually consistent.
//------------------------------------------------------------------------------
RaftMembership RaftJournal::getMembership() {
  std::scoped_lock lock(membersMutex);
  return RaftMembership{members.nodes, members.observers, membershipEpoch};
}
//------------------------------------------------------------------------------
// Append a JOURNAL_UPDATE_MEMBERS entry with `newMembers`. Refused (with
// `err` set) while the current membership epoch is still uncommitted, so at
// most one uncommitted membership change exists at a time.
//------------------------------------------------------------------------------
bool RaftJournal::membershipUpdate(RaftTerm term, const RaftMembers &newMembers, std::string &err) {
  std::scoped_lock lock(contentMutex);

  const bool currentEpochCommitted = (membershipEpoch <= commitIndex);
  if(!currentEpochCommitted) {
    err = SSTR("the current membership epoch has not been committed yet: " << membershipEpoch);
    return false;
  }

  const RaftEntry updateEntry(term, "JOURNAL_UPDATE_MEMBERS", newMembers.toString(), clusterID);
  return appendNoLock(logSize, updateEntry, true);
}
//------------------------------------------------------------------------------
// Membership changes: apply the change to a copy of the current members and,
// only if RaftMembers accepts it, append it as a membership update. On
// failure `err` explains why.
//------------------------------------------------------------------------------
bool RaftJournal::addObserver(RaftTerm term, const RaftServer &observer, std::string &err) {
  RaftMembers updated = getMembers();
  return updated.addObserver(observer, err) && membershipUpdate(term, updated, err);
}

bool RaftJournal::removeMember(RaftTerm term, const RaftServer &member, std::string &err) {
  RaftMembers updated = getMembers();
  return updated.removeMember(member, err) && membershipUpdate(term, updated, err);
}

bool RaftJournal::promoteObserver(RaftTerm term, const RaftServer &observer, std::string &err) {
  RaftMembers updated = getMembers();
  return updated.promoteObserver(observer, err) && membershipUpdate(term, updated, err);
}

bool RaftJournal::demoteToObserver(RaftTerm term, const RaftServer &member, std::string &err) {
  RaftMembers updated = getMembers();
  return updated.demoteToObserver(member, err) && membershipUpdate(term, updated, err);
}
//------------------------------------------------------------------------------
// Append `entry` at position `index` without taking contentMutex; the callers
// in this file (append, membershipUpdate) already hold it.
//
// Returns false (with a warning) if the entry is not placed exactly at the
// end of the log, if its term is higher than currentTerm, or if its term is
// lower than the term of the current last entry.
//
// JOURNAL_UPDATE_MEMBERS entries whose cluster ID matches ours also persist
// the new membership (and the previous one, for rollback) in the same batch,
// and take effect in memory immediately. Membership updates are always
// written as important.
//------------------------------------------------------------------------------
bool RaftJournal::appendNoLock(LogIndex index, const RaftEntry &entry, bool important) {
  if(index != logSize) {
    qdb_warn("attempted to insert journal entry at an invalid position. index = " << index << ", logSize = " << logSize);
    return false;
  }

  if(entry.term > currentTerm) {
    qdb_warn("attempted to insert journal entry with a higher term than the current one: " << entry.term << " vs " << currentTerm);
    return false;
  }

  if(entry.term < termOfLastEntry) {
    qdb_warn("attempted to insert journal entry with lower term " << entry.term << ", while last one is " << termOfLastEntry);
    return false;
  }

  rocksdb::WriteBatch batch;

  // Check for an empty request before looking at request[0]; a malformed
  // entry would otherwise cause an out-of-bounds access.
  const bool isMembershipUpdate = entry.request.size() > 0 && entry.request[0] == "JOURNAL_UPDATE_MEMBERS";
  if(isMembershipUpdate) {
    if(entry.request.size() != 3) qdb_throw("Journal corruption, invalid journal_update_members: " << entry.request);

    //--------------------------------------------------------------------------
    // Special case for membership updates
    // We don't wait until the entry is committed, and it takes effect
    // immediately.
    // The commit applier will ignore such entries, and apply a no-op to the
    // state machine.
    //--------------------------------------------------------------------------
    if(entry.request[2] == clusterID) {
      THROW_ON_ERROR(batch.Put(KeyConstants::kJournal_Members, entry.request[1]));
      THROW_ON_ERROR(batch.Put(KeyConstants::kJournal_MembershipEpoch, intToBinaryString(index)));
      // Remember the outgoing membership so removeEntries() can roll back an
      // uncommitted epoch.
      THROW_ON_ERROR(batch.Put(KeyConstants::kJournal_PreviousMembers, members.toString()));
      THROW_ON_ERROR(batch.Put(KeyConstants::kJournal_PreviousMembershipEpoch, intToBinaryString(membershipEpoch)));

      qdb_event("Transitioning into a new membership epoch: " << membershipEpoch << " => " << index
        << ". Old members: " << members.toString() << ", new members: " << entry.request[1]);

      std::scoped_lock lock(membersMutex);
      members = RaftMembers(entry.request[1]);
      membershipEpoch = index;
    }
    else {
      qdb_critical("Received request for membership update " << entry.request << ", but the clusterIDs do not match - mine is " << clusterID
        << ". THE MEMBERSHIP UPDATE ENTRY WILL BE IGNORED. Something is either corrupted or you force-reconfigured " <<
        " the nodes recently - if it's the latter, this message is nothing to worry about.");
    }

    important = true;
  }

  KeyBuffer keyBuffer;
  encodeEntryKey(index, keyBuffer);
  THROW_ON_ERROR(batch.Put(keyBuffer.toView(), entry.serialize()));

  commitBatch(batch, index+1, important);
  termOfLastEntry = entry.term;
  logUpdated.notify_all();
  return true;
}
//------------------------------------------------------------------------------
// Thread-safe append: appendNoLock() under contentMutex.
//------------------------------------------------------------------------------
bool RaftJournal::append(LogIndex index, const RaftEntry &entry, bool important) {
  std::scoped_lock lock(contentMutex);
  const bool appended = appendNoLock(index, entry, important);
  return appended;
}

//------------------------------------------------------------------------------
// Append a JOURNAL_LEADERSHIP_MARKER entry for `leader` in `term`, always
// written as important.
//------------------------------------------------------------------------------
bool RaftJournal::appendLeadershipMarker(LogIndex index, RaftTerm term, const RaftServer &leader) {
  const RaftEntry marker(term, "JOURNAL_LEADERSHIP_MARKER", SSTR(term), leader.toString());
  return append(index, marker, true);
}
//------------------------------------------------------------------------------
// Persist and apply a new fsync policy; no write happens if it is unchanged.
//------------------------------------------------------------------------------
void RaftJournal::setFsyncPolicy(FsyncPolicy pol) {
  std::unique_lock lock(fsyncPolicyMutex);
  if(fsyncPolicy == pol) {
    return;
  }

  this->set_or_die(KeyConstants::kJournal_FsyncPolicy, fsyncPolicyToString(pol));
  fsyncPolicy = pol;
}

//------------------------------------------------------------------------------
// Currently active fsync policy.
//------------------------------------------------------------------------------
FsyncPolicy RaftJournal::getFsyncPolicy() {
  FsyncPolicy current = fsyncPolicy;
  return current;
}
//------------------------------------------------------------------------------
// Delete all entries in [logStart, newLogStart) and move logStart forward.
// Only committed entries may be trimmed; trimming backwards is a no-op.
//------------------------------------------------------------------------------
void RaftJournal::trimUntil(LogIndex newLogStart) {
  // no locking - trimmed entries should be so old
  // that they are not being accessed anymore
  if(logStart >= newLogStart) {
    return; // no entries to trim
  }

  if(newLogStart > logSize) {
    qdb_throw("attempted to trim a journal past its end. logSize: " << logSize << ", new log start: " << newLogStart);
  }
  if(newLogStart > commitIndex) {
    qdb_throw("attempted to trim non-committed entries. commitIndex: " << commitIndex << ", new log start: " << newLogStart);
  }

  qdb_info("Trimming raft journal from #" << logStart << " until #" << newLogStart);

  // Drop the entries and record the new start in one atomic batch.
  rocksdb::WriteBatch trimBatch;
  for(LogIndex idx = logStart; idx < newLogStart; ++idx) {
    THROW_ON_ERROR(trimBatch.Delete(encodeEntryKey(idx)));
  }
  THROW_ON_ERROR(trimBatch.Put(KeyConstants::kJournal_LogStart, intToBinaryString(newLogStart)));

  commitBatch(trimBatch);
  logStart = newLogStart;
}
//------------------------------------------------------------------------------
// The server voted for in the current term (empty if none), read under
// votedForMutex.
//------------------------------------------------------------------------------
RaftServer RaftJournal::getVotedFor() {
  std::scoped_lock lock(votedForMutex);
  RaftServer vote = votedFor;
  return vote;
}

//------------------------------------------------------------------------------
// Full (non-observer) members of the current membership.
//------------------------------------------------------------------------------
std::vector RaftJournal::getNodes() {
  return getMembership().nodes;
}

//------------------------------------------------------------------------------
// Wake up everyone blocked in waitForUpdates() or waitForCommits().
//------------------------------------------------------------------------------
void RaftJournal::notifyWaitingThreads() {
  logUpdated.notify_all();
  commitNotifier.notify_all();
}
//------------------------------------------------------------------------------
// Wait up to `timeout` for the log to grow beyond `currentSize`. Returns
// immediately if it already has (the update raced ahead of us).
//------------------------------------------------------------------------------
void RaftJournal::waitForUpdates(LogIndex currentSize, const std::chrono::milliseconds &timeout) {
  std::unique_lock lock(contentMutex);
  if(logSize <= currentSize) {
    logUpdated.wait_for(lock, timeout);
  }
}
//------------------------------------------------------------------------------
// Remove all entries in [from, logSize) - used to discard log entries that
// conflict with the leader. Returns false if there is nothing to remove;
// throws if the range would include committed entries.
// If the removed range contains the current membership epoch, the previous
// membership (as recorded by appendNoLock under kJournal_PreviousMembers /
// kJournal_PreviousMembershipEpoch) is restored, both on disk and in memory.
// Only one level of rollback is available.
// NOTE(review): the in-memory members are switched before commitBatch() runs;
// if that write throws, memory and disk disagree.
//------------------------------------------------------------------------------
bool RaftJournal::removeEntries(LogIndex from) {
std::unique_lock lock(contentMutex);
if(logSize <= from) return false;
if(from <= commitIndex) qdb_throw("attempted to remove committed entries. commitIndex: " << commitIndex << ", from: " << from);
qdb_warn("Removing inconsistent log entries: [" << from << "," << logSize-1 << "]");
rocksdb::WriteBatch batch;
for(LogIndex i = from; i < logSize; i++) {
THROW_ON_ERROR(batch.Delete(encodeEntryKey(i)));
}
//----------------------------------------------------------------------------
// Membership epochs take effect immediately, without waiting for the entries
// to be committed. (as per the Raft PhD thesis)
// This means that an uncommitted membership epoch can be theoretically rolled
// back.
// This should be extremely uncommon, so we log a critical message.
//----------------------------------------------------------------------------
if(from <= membershipEpoch) {
std::scoped_lock lock2(membersMutex);
LogIndex previousMembershipEpoch = this->get_int_or_die(KeyConstants::kJournal_PreviousMembershipEpoch);
std::string previousMembers = this->get_or_die(KeyConstants::kJournal_PreviousMembers);
THROW_ON_ERROR(batch.Put(KeyConstants::kJournal_MembershipEpoch, intToBinaryString(previousMembershipEpoch)));
THROW_ON_ERROR(batch.Put(KeyConstants::kJournal_Members, previousMembers));
qdb_critical("Rolling back an uncommitted membership epoch. Transitioning from " <<
membershipEpoch << " => " << previousMembershipEpoch << ". Old members: " << members.toString() <<
", new members: " << previousMembers);
members = RaftMembers(previousMembers);
membershipEpoch = previousMembershipEpoch;
}
// Persist the deletions (plus any membership rollback) and shrink logSize to
// `from`, then refresh the cached term of what is now the last entry.
commitBatch(batch, from);
fetch_or_die(from-1, termOfLastEntry);
return true;
}
// return the first entry which is not identical to the ones in the vector
LogIndex RaftJournal::compareEntries(LogIndex start, const std::vector entries) {
  // Compare `entries` (meant to sit at indices start, start+1, ...) against
  // our journal. Returns the index of the first mismatching entry, or the end
  // of the compared range if all overlapping entries are identical. Entries
  // below logStart have been trimmed and are assumed to match.
  std::scoped_lock lock(contentMutex);

  const LogIndex firstComparable = std::max(start, LogIndex(logStart));
  const LogIndex pastLast = std::min(LogIndex(logSize), LogIndex(start+entries.size()));

  if(firstComparable != start) {
    qdb_critical("Tried to compare entries which have already been trimmed.. will assume they contain no inconsistencies. logStart: " << logStart << ", asked to compare starting from: " << start);
  }

  for(LogIndex idx = firstComparable; idx < pastLast; ++idx) {
    RaftEntry mine;
    fetch_or_die(idx, mine);

    const auto &theirs = entries[idx - start];
    if(theirs != mine) {
      qdb_warn("Detected inconsistency for entry #" << idx << ". Contents of my journal: " << mine << ". Contents of what the leader sent: " << theirs);
      return idx;
    }
  }

  return pastLast;
}
//------------------------------------------------------------------------------
// Does our journal contain an entry at `index` with term `term`? A missing
// entry (beyond logSize, or not found in the DB) is simply a mismatch; any
// other storage error is fatal.
//------------------------------------------------------------------------------
bool RaftJournal::matchEntries(LogIndex index, RaftTerm term) {
  std::scoped_lock lock(contentMutex);

  if(index >= logSize) {
    return false;
  }

  RaftTerm storedTerm;
  rocksdb::Status status = this->fetch(index, storedTerm);

  if(status.IsNotFound()) {
    return false;
  }
  if(!status.ok()) {
    qdb_throw("rocksdb error: " << status.ToString());
  }

  return storedTerm == term;
}
//------------------------------------------------------------------------------
// Log entry fetch operations
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
// Read and deserialize the entry at `index`. `entry` is only modified on
// success.
//------------------------------------------------------------------------------
rocksdb::Status RaftJournal::fetch(LogIndex index, RaftEntry &entry) {
  // we intentionally do not check logSize and logStart, so as to be able to
  // catch potential inconsistencies between the counters and what is
  // really contained in the journal
  std::string serialized;
  rocksdb::Status status = db->Get(rocksdb::ReadOptions(), encodeEntryKey(index), &serialized);
  if(status.ok()) {
    RaftEntry::deserialize(entry, serialized);
  }
  return status;
}

//------------------------------------------------------------------------------
// Read only the term of the entry at `index`. `term` is assigned even on
// failure (then it is the term of a default-constructed RaftEntry).
//------------------------------------------------------------------------------
rocksdb::Status RaftJournal::fetch(LogIndex index, RaftTerm &term) {
  RaftEntry full;
  rocksdb::Status status = fetch(index, full);
  term = full.term;
  return status;
}

//------------------------------------------------------------------------------
// Read the raw serialized bytes of the entry at `index`, without decoding.
//------------------------------------------------------------------------------
rocksdb::Status RaftJournal::fetch(LogIndex index, RaftSerializedEntry &data) {
  const std::string key = encodeEntryKey(index);
  return db->Get(rocksdb::ReadOptions(), key, &data);
}
void RaftJournal::fetch_last(int last, std::vector &entries) {
LogIndex endIndex = logSize;
LogIndex startIndex = endIndex - last;
if(startIndex < 0) startIndex = 0;
for(LogIndex index = startIndex; index < endIndex; index++) {
RaftEntry entry;
fetch(index, entry);
entries.emplace_back(entry);
}
}
//------------------------------------------------------------------------------
// Fetch the entry (or just its term) at `index`, throwing if it cannot be
// read. Both overloads now go through qdb_throw (the first one used to
// construct FatalException by hand), and the RocksDB status is included in
// the message to tell "missing" apart from I/O errors.
//------------------------------------------------------------------------------
void RaftJournal::fetch_or_die(LogIndex index, RaftEntry &entry) {
  rocksdb::Status st = fetch(index, entry);
  if(!st.ok()) {
    qdb_throw("unable to fetch entry with index " << index << ": " << st.ToString());
  }
}

void RaftJournal::fetch_or_die(LogIndex index, RaftTerm &term) {
  rocksdb::Status st = fetch(index, term);
  if(!st.ok()) {
    qdb_throw("unable to fetch entry with index " << index << ": " << st.ToString());
  }
}
//------------------------------------------------------------------------------
// Store `value` under `key` (not synced explicitly), throwing on failure.
//------------------------------------------------------------------------------
void RaftJournal::set_or_die(const std::string &key, const std::string &value) {
  rocksdb::Status st = db->Put(rocksdb::WriteOptions(), key, value);
  if(!st.ok()) {
    qdb_throw("unable to set journal key " << key << ". Error: " << st.ToString());
  }
}

//------------------------------------------------------------------------------
// Store an integer under `key`, in the fixed-width encoding of
// intToBinaryString.
//------------------------------------------------------------------------------
void RaftJournal::set_int_or_die(const std::string &key, int64_t value) {
  this->set_or_die(key, intToBinaryString(value));
}

//------------------------------------------------------------------------------
// Read an integer written by set_int_or_die. binaryStringToInt decodes a
// fixed number of bytes from a raw pointer, so a short (corrupt) value would
// make it read past the end of the string; the size is validated first.
//------------------------------------------------------------------------------
int64_t RaftJournal::get_int_or_die(const std::string &key) {
  const std::string value = this->get_or_die(key);
  if(value.size() != sizeof(int64_t)) {
    qdb_throw("journal corruption, key " << key << " should hold a " << sizeof(int64_t) << "-byte integer, but its value is " << value.size() << " bytes long");
  }
  return binaryStringToInt(value.c_str());
}

//------------------------------------------------------------------------------
// Read the value stored under `key`; throws on any error, including NotFound.
//------------------------------------------------------------------------------
std::string RaftJournal::get_or_die(const std::string &key) {
  std::string tmp;
  rocksdb::Status st = db->Get(rocksdb::ReadOptions(), key, &tmp);
  if(!st.ok()) qdb_throw("error when getting journal key " << key << ": " << st.ToString());
  return tmp;
}
//------------------------------------------------------------------------------
// Checkpoint for online backup
//------------------------------------------------------------------------------
rocksdb::Status RaftJournal::checkpoint(const std::string &path) {
  // Create a RocksDB checkpoint of the journal in `path`. The Checkpoint
  // helper object is owned by a unique_ptr, so it is released on every path.
  rocksdb::Checkpoint *rawCheckpoint = nullptr;
  rocksdb::Status status = rocksdb::Checkpoint::Create(db, &rawCheckpoint);
  if(!status.ok()) {
    return status;
  }

  std::unique_ptr<rocksdb::Checkpoint> owner(rawCheckpoint);
  return owner->CreateCheckpoint(path);
}
//------------------------------------------------------------------------------
// Scan through the contents of the journal, starting from the given index
//------------------------------------------------------------------------------
rocksdb::Status RaftJournal::scanContents(LogIndex startingPoint, size_t count, std::string_view match, std::vector &out, LogIndex &nextCursor) {
  // Visit at most `count` entries starting at the first one >= startingPoint.
  // An entry is returned if `match` is empty, or if the Redis-style glob
  // pattern (stringmatchlen) matches its *serialized* bytes. nextCursor is
  // where the next scan should resume, or 0 once the journal is exhausted.
  out.clear();
  RaftJournal::Iterator iter = getIterator(startingPoint, false);

  for(size_t visited = 0; visited < count && iter.valid(); visited++, iter.next()) {
    RaftSerializedEntry raw;
    iter.current(raw);

    const bool wanted = match.empty() || stringmatchlen(match.data(), match.length(), raw.data(), raw.length(), 0) == 1;
    if(wanted) {
      RaftEntry decoded;
      RaftEntry::deserialize(decoded, raw);
      out.emplace_back(decoded, iter.getCurrentIndex());
    }
  }

  nextCursor = iter.valid() ? iter.getCurrentIndex() : 0;
  return rocksdb::Status::OK();
}
//------------------------------------------------------------------------------
// Trigger manual compaction of the journal
//------------------------------------------------------------------------------
// Compacts the whole key range (nullptr begin / end), forcing the bottommost
// level to be rewritten as well. The compaction status is returned rather
// than thrown; failing to toggle auto-compactions throws via THROW_ON_ERROR.
rocksdb::Status RaftJournal::manualCompaction() {
qdb_event("Triggering manual journal compaction.. auto-compaction will be disabled while the manual one is running.");
// Disabling auto-compactions is a hack to prevent write-stalling. Pending compaction
// bytes will jump to the total size of the DB as soon as a manual compaction is
// issued, which will most likely stall or completely stop writes for a long time.
// (depends on the size of the DB)
// This is a recommendation by rocksdb devs as a workaround: Disabling auto
// compactions will disable write-stalling as well.
THROW_ON_ERROR(db->SetOptions( { {"disable_auto_compactions", "true"} } ));
rocksdb::CompactRangeOptions opts;
opts.bottommost_level_compaction = rocksdb::BottommostLevelCompaction::kForce;
rocksdb::Status st = db->CompactRange(opts, nullptr, nullptr);
// Re-enable auto-compactions regardless of whether the compaction succeeded.
THROW_ON_ERROR(db->SetOptions( { {"disable_auto_compactions", "false"} } ));
qdb_event("Manual journal compaction has completed with status " << st.ToString());
return st;
}
//------------------------------------------------------------------------------
// Iterator
//------------------------------------------------------------------------------
// Create an iterator over journal entries, positioned at `startingPoint`
// (see the Iterator constructor for what mustMatchStartingPoint does).
RaftJournal::Iterator RaftJournal::getIterator(LogIndex startingPoint, bool mustMatchStartingPoint) {
rocksdb::ReadOptions readOpts;
// Ask RocksDB for a total-order seek, ignoring any prefix-based seek
// optimizations. NOTE(review): openDB() configures no prefix extractor, so
// this looks defensive - confirm.
readOpts.total_order_seek = true;
std::unique_ptr it;
it.reset(db->NewIterator(readOpts));
return RaftJournal::Iterator(std::move(it), startingPoint, mustMatchStartingPoint);
}
// Take ownership of `it` and seek to the entry key for `startingPoint`.
// - mustMatchStartingPoint == true: the iterator is empty (invalid) unless
//   that exact entry exists.
// - otherwise: it lands on the first key >= the starting key, and
//   currentIndex is updated to that entry's index; it is empty if that key is
//   not an entry key.
// An empty iterator is represented by resetting `iter`.
RaftJournal::Iterator::Iterator(std::unique_ptr it, LogIndex startingPoint, bool mustMatchStartingPoint) {
iter = std::move(it);
currentIndex = startingPoint;
iter->Seek(encodeEntryKey(currentIndex));
if(!this->valid()) {
iter.reset();
return;
}
// Maybe the startingPoint does not exist.. return an empty iterator in
// such case.
if(mustMatchStartingPoint && iter->key() != encodeEntryKey(currentIndex)) {
iter.reset();
return;
}
else {
// Figure out which index we ended up on
if(!parseEntryKey(iter->key().ToStringView(), currentIndex)) {
iter.reset();
return;
}
}
validate();
}
void RaftJournal::Iterator::validate() {
qdb_assert(this->valid());
if(iter->key()[0] != 'E') {
iter.reset();
return;
}
qdb_assert(iter->key() == encodeEntryKey(currentIndex));
}
bool RaftJournal::Iterator::valid() {
return iter && iter->Valid();
}
void RaftJournal::Iterator::next() {
qdb_assert(this->valid());
iter->Next();
if(iter->Valid()) {
currentIndex++;
validate();
}
}
void RaftJournal::Iterator::current(RaftSerializedEntry &entry) {
qdb_assert(this->valid());
entry = iter->value().ToString();
}
LogIndex RaftJournal::Iterator::getCurrentIndex() const {
return currentIndex;
}