// ----------------------------------------------------------------------
// File: RaftJournal.hh
// Author: Georgios Bitzes - CERN
// ----------------------------------------------------------------------
/************************************************************************
* quarkdb - a redis-like highly available key-value store *
* Copyright (C) 2016 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see <http://www.gnu.org/licenses/>.*
************************************************************************/
#ifndef __QUARKDB_RAFT_JOURNAL_H__
#define __QUARKDB_RAFT_JOURNAL_H__
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <mutex>
#include <string>
#include <string_view>
#include <vector>
#include <rocksdb/db.h>
#include "RaftCommon.hh"
#include "RaftMembers.hh"
#include "utils/FsyncThread.hh"
#include "storage/WriteStallWarner.hh"
namespace quarkdb {
class RaftJournal {
public:
static void ObliterateAndReinitializeJournal(const std::string &path, RaftClusterID clusterID, std::vector nodes, LogIndex startIndex, FsyncPolicy fsyncPolicy);
// opens an existing journal
RaftJournal(const std::string &path);
// re-initializes a journal, obliterates the contents of the old one, if it exists
RaftJournal(const std::string &path, RaftClusterID clusterID, const std::vector &nodes, LogIndex startIndex, FsyncPolicy fscynPolicy);
~RaftJournal();
// should never have to be called during normal operation, only in the tests
// assumes there's no other concurrent access to the journal
void obliterate(RaftClusterID clusterID, const std::vector &nodes, LogIndex startIndex, FsyncPolicy fscynPolicy);
bool setCurrentTerm(RaftTerm term, RaftServer vote);
bool setCommitIndex(LogIndex index);
void setFsyncPolicy(FsyncPolicy pol);
FsyncPolicy getFsyncPolicy();
RaftTerm getCurrentTerm() const { return currentTerm; }
LogIndex getLogSize() const { return logSize; }
LogIndex getLogStart() const { return logStart; }
RaftClusterID getClusterID() const { return clusterID; }
LogIndex getCommitIndex() const { return commitIndex; }
std::vector getNodes();
RaftServer getVotedFor();
LogIndex getEpoch() const { return membershipEpoch; }
RaftMembership getMembership();
bool append(LogIndex index, const RaftEntry &entry, bool important = false);
rocksdb::Status fetch(LogIndex index, RaftEntry &entry);
rocksdb::Status fetch(LogIndex index, RaftTerm &term);
rocksdb::Status fetch(LogIndex index, RaftSerializedEntry &data);
void fetch_last(int lastEntries, std::vector &entry);
void fetch_or_die(LogIndex index, RaftEntry &entry);
void fetch_or_die(LogIndex index, RaftTerm &term);
bool matchEntries(LogIndex index, RaftTerm term);
bool removeEntries(LogIndex start);
LogIndex compareEntries(LogIndex start, const std::vector entries);
void waitForUpdates(LogIndex currentSize, const std::chrono::milliseconds &timeout);
bool waitForCommits(const LogIndex currentCommit);
void notifyWaitingThreads();
std::string getDBPath() { return dbPath; }
rocksdb::Status checkpoint(const std::string &path);
void trimUntil(LogIndex newLogStart);
bool addObserver(RaftTerm term, const RaftServer &observer, std::string &err);
bool promoteObserver(RaftTerm term, const RaftServer &obserer, std::string &err);
bool removeMember(RaftTerm term, const RaftServer &member, std::string &err);
bool demoteToObserver(RaftTerm term, const RaftServer &member, std::string &err);
bool appendLeadershipMarker(LogIndex index, RaftTerm term, const RaftServer &leader);
bool simulateDataLoss(size_t numberOfEntries);
class Iterator {
public:
Iterator(std::unique_ptr iter, LogIndex startingPoint, bool mustMatchStartingPoint);
bool valid();
void next();
void current(RaftSerializedEntry &entry);
LogIndex getCurrentIndex() const;
private:
void validate();
LogIndex currentIndex;
std::unique_ptr iter;
};
Iterator getIterator(LogIndex startingPoint, bool mustMatchStartingPoint);
rocksdb::Status scanContents(LogIndex startingPoint, size_t count, std::string_view match, std::vector &out, LogIndex &nextCursor);
rocksdb::Status manualCompaction();
private:
void openDB(const std::string &path);
void rawSetCommitIndex(LogIndex index);
void ensureFsyncPolicyInitialized();
bool shouldSync(bool important);
void initializeFsyncPolicy();
void initialize();
rocksdb::DB* db = nullptr;
std::string dbPath;
std::unique_ptr fsyncThread;
using IteratorPtr = std::unique_ptr;
//----------------------------------------------------------------------------
// Cached values, always backed to stable storage
//----------------------------------------------------------------------------
std::atomic currentTerm;
std::atomic commitIndex;
std::atomic logSize;
std::atomic logStart;
std::atomic membershipEpoch;
RaftMembers members;
RaftServer votedFor;
RaftClusterID clusterID;
std::atomic fsyncPolicy;
std::mutex currentTermMutex;
std::mutex lastAppliedMutex;
std::mutex commitIndexMutex;
std::mutex contentMutex;
std::mutex membersMutex;
std::mutex votedForMutex;
std::mutex fsyncPolicyMutex;
std::condition_variable commitNotifier;
std::condition_variable logUpdated;
std::shared_ptr writeStallWarner;
//----------------------------------------------------------------------------
// Utility functions for write batches
//----------------------------------------------------------------------------
void commitBatch(rocksdb::WriteBatch &batch, LogIndex index = -1, bool important = false);
//----------------------------------------------------------------------------
// Transient values, can always be inferred from stable storage
//----------------------------------------------------------------------------
RaftTerm termOfLastEntry;
//----------------------------------------------------------------------------
// Helper functions
//----------------------------------------------------------------------------
RaftMembers getMembers();
bool membershipUpdate(RaftTerm term, const RaftMembers &newMembers, std::string &err);
bool appendNoLock(LogIndex index, const RaftEntry &entry, bool important);
void set_or_die(const std::string &key, const std::string &value);
void set_int_or_die(const std::string &key, int64_t value);
std::string get_or_die(const std::string &key);
int64_t get_int_or_die(const std::string &key);
};
}
#endif