// ----------------------------------------------------------------------
// File: RaftReplicator.hh
// Author: Georgios Bitzes - CERN
// ----------------------------------------------------------------------
/************************************************************************
* quarkdb - a redis-like highly available key-value store *
* Copyright (C) 2016 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see .*
************************************************************************/
#ifndef QUARKDB_RAFT_REPLICATOR_H
#define QUARKDB_RAFT_REPLICATOR_H
#include "RaftTimeouts.hh"
#include
#include
#include "raft/RaftTalker.hh"
#include "raft/RaftState.hh"
#include "raft/RaftTrimmer.hh"
#include "utils/AssistedThread.hh"
#include "utils/Synchronized.hh"
namespace quarkdb {
//------------------------------------------------------------------------------
// Forward declarations
//------------------------------------------------------------------------------
class RaftTalker; class RaftResilverer; class RaftTrimmer;
class ShardDirectory; class RaftConfig; class RaftState;
class StateMachine; class RaftJournal; class RaftLease;
class RaftCommitTracker; class RaftMatchIndexTracker; class RaftLastContact;
class RaftContactDetails;
//------------------------------------------------------------------------------
// Tracks a single raft replica
//------------------------------------------------------------------------------
class RaftReplicaTracker {
public:
RaftReplicaTracker(const RaftServer &target, const RaftStateSnapshotPtr &snapshot, RaftJournal &journal, RaftState &state, RaftLease &lease, RaftCommitTracker &commitTracker, RaftTrimmer &trimmer, ShardDirectory &shardDirectory, RaftConfig &config, const RaftContactDetails &contactDetails);
~RaftReplicaTracker();
ReplicaStatus getStatus();
bool isRunning() { return running; }
private:
struct PendingResponse {
PendingResponse(std::future &&f, std::chrono::steady_clock::time_point s, LogIndex pushed, int64_t payload, RaftTerm let)
: fut(std::move(f)), sent(s), pushedFrom(pushed), payloadSize(payload), lastEntryTerm(let) {}
std::future fut;
std::chrono::steady_clock::time_point sent;
LogIndex pushedFrom;
int64_t payloadSize;
RaftTerm lastEntryTerm;
};
void sendHeartbeats(ThreadAssistant &assistant);
void main();
void monitorAckReception(ThreadAssistant &assistant);
std::mutex inFlightMtx;
std::condition_variable inFlightCV;
std::condition_variable inFlightPoppedCV;
std::queue inFlight;
std::atomic streamingUpdates;
LogIndex streamUpdates(RaftTalker &talker, LogIndex nextIndex);
void triggerResilvering();
bool buildPayload(LogIndex nextIndex, int64_t payloadLimit, std::vector &entries,
RaftTerm &lastEntryTerm);
bool sendPayload(RaftTalker &talker, LogIndex nextIndex, int64_t payloadLimit,
std::future &reply, std::chrono::steady_clock::time_point &contact, int64_t &payloadSize,
RaftTerm &lastEntryTerm);
RaftServer target;
RaftStateSnapshotPtr snapshot;
void updateStatus(bool online, LogIndex logSize);
// Values to report when getStatus is called. Updated infrequently to avoid
// overhead of atomics.
std::atomic statusOnline {false};
std::atomic statusLogSize {-1};
Synchronized statusNodeVersion {"N/A"};
Synchronized statusResilveringProgress {""};
RaftJournal &journal;
RaftState &state;
RaftLease &lease;
RaftCommitTracker &commitTracker;
RaftTrimmer &trimmer;
ShardDirectory &shardDirectory;
RaftConfig &config;
const RaftContactDetails &contactDetails;
RaftMatchIndexTracker &matchIndex;
RaftLastContact &lastContact;
std::atomic running {false};
std::atomic shutdown {false};
std::thread thread;
AssistedThread heartbeatThread;
std::unique_ptr resilverer;
RaftTrimmingBlock trimmingBlock;
};
//------------------------------------------------------------------------------
// A class that tracks multiple raft replicas over the duration of a single
// term
//------------------------------------------------------------------------------
class RaftReplicator {
public:
RaftReplicator(RaftJournal &journal, RaftState &state, RaftLease &lease, RaftCommitTracker &commitTracker, RaftTrimmer &trimmer, ShardDirectory &shardDirectory, RaftConfig &config, const RaftContactDetails &contactDetails);
~RaftReplicator();
void activate(RaftStateSnapshotPtr &snapshot);
void deactivate();
ReplicationStatus getStatus();
void reconfigure();
private:
void setTargets(const std::vector &targets);
RaftStateSnapshotPtr snapshot;
RaftJournal &journal;
RaftState &state;
RaftLease &lease;
RaftCommitTracker &commitTracker;
RaftTrimmer &trimmer;
ShardDirectory &shardDirectory;
RaftConfig &config;
const RaftContactDetails &contactDetails;
std::map targets;
std::recursive_mutex mtx;
};
}
#endif