// ----------------------------------------------------------------------
// File: ShardDirectory.cc
// Author: Georgios Bitzes - CERN
// ----------------------------------------------------------------------
/************************************************************************
* quarkdb - a redis-like highly available key-value store *
* Copyright (C) 2016 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see <http://www.gnu.org/licenses/>.*
************************************************************************/
#include "ShardDirectory.hh"
#include "utils/FileUtils.hh"
#include "StateMachine.hh"
#include "raft/RaftJournal.hh"
#include
using namespace quarkdb;
//------------------------------------------------------------------------------
// Wrap an already-created snapshot directory located at path_.
//------------------------------------------------------------------------------
ShardSnapshot::ShardSnapshot(const std::string &path_)
: path(path_) {
}
//------------------------------------------------------------------------------
// Remove the snapshot directory from disk. Failure is logged, not thrown.
// The path is passed through quotes(), the same helper used for the `mv`
// in initializeStateMachine. This keeps paths containing spaces or shell
// metacharacters from being split or misinterpreted by the shell.
//------------------------------------------------------------------------------
ShardSnapshot::~ShardSnapshot() {
  if(system(SSTR("rm -rf " << quotes(path)).c_str()) != 0) {
    qdb_error("Failed to delete: " << path);
  }
}
//------------------------------------------------------------------------------
// Location of the snapshot directory on disk.
//------------------------------------------------------------------------------
std::string ShardSnapshot::getPath() {
  std::string snapshotLocation = path;
  return snapshotLocation;
}
//------------------------------------------------------------------------------
// Path of the file storing the serialized resilvering history of this shard.
//------------------------------------------------------------------------------
std::string ShardDirectory::resilveringHistoryPath() {
  const std::string historyFile = pathJoin(path, "RESILVERING-HISTORY");
  return historyFile;
}
void ShardDirectory::parseResilveringHistory() {
std::string historyPath(resilveringHistoryPath()), tmp;
if(!readFile(historyPath, tmp)) {
qdb_throw("Unable to read resilvering history from '" << historyPath << "'");
}
if(!ResilveringHistory::deserialize(tmp, resilveringHistory)) {
qdb_throw("Unable to parse resilvering history from '" << historyPath << "'");
}
}
//------------------------------------------------------------------------------
// Persist the in-memory resilveringHistory to its file (via write_file_or_die).
//------------------------------------------------------------------------------
void ShardDirectory::storeResilveringHistory() {
  const std::string target = resilveringHistoryPath();
  write_file_or_die(target, resilveringHistory.serialize());
}
//------------------------------------------------------------------------------
// Open an existing shard directory rooted at initpath.
//
// The directory must already exist (qdb_fatal otherwise). This reads the
// SHARD-ID file into shardID and loads the resilvering history; either
// failing results in a qdb_throw.
//------------------------------------------------------------------------------
ShardDirectory::ShardDirectory(const std::string &initpath, Configuration config)
: path(initpath), configuration(config) {
  std::string dirError;
  const bool exists = directoryExists(path, dirError);
  if(!exists) {
    qdb_fatal("Cannot initialize shard directory at '" << path << "': " << dirError);
  }

  const std::string shardIdFile = pathJoin(path, "SHARD-ID");
  const bool idRead = readFile(shardIdFile, shardID);
  if(!idRead) {
    qdb_throw("Unable to read shard id from '" << shardIdFile << "'");
  }

  parseResilveringHistory();
}
//------------------------------------------------------------------------------
// Release any state machine / journal this directory still holds open.
//------------------------------------------------------------------------------
ShardDirectory::~ShardDirectory() {
  this->detach();
}
//------------------------------------------------------------------------------
// Close (delete) the state machine and then the raft journal, if open, and
// reset both pointers so later getters will reopen them from disk.
//------------------------------------------------------------------------------
void ShardDirectory::detach() {
  // Deleting a null pointer is a no-op, so no explicit checks are needed.
  delete smptr;
  smptr = nullptr;

  delete journalptr;
  journalptr = nullptr;
}
//------------------------------------------------------------------------------
// Open the state machine in bulkload mode. It must not already be open.
// The returned pointer remains owned by this ShardDirectory.
//------------------------------------------------------------------------------
StateMachine* ShardDirectory::getStateMachineForBulkload() {
  qdb_assert(!smptr);

  const std::string location = stateMachinePath();
  smptr = new StateMachine(location, false, true);
  return smptr;
}
//------------------------------------------------------------------------------
// Return the state machine, lazily opening it on first use with the
// write-ahead-log setting from our configuration. Owned by this object.
//------------------------------------------------------------------------------
StateMachine* ShardDirectory::getStateMachine() {
  if(!smptr) {
    smptr = new StateMachine(stateMachinePath(), configuration.getWriteAheadLog());
  }

  return smptr;
}
//------------------------------------------------------------------------------
// Whether the raft journal directory exists; on false, err explains why.
//------------------------------------------------------------------------------
bool ShardDirectory::hasRaftJournal(std::string &err) const {
  const std::string journalDir = raftJournalPath();
  return directoryExists(journalDir, err);
}
//------------------------------------------------------------------------------
// Return the raft journal, lazily opening it on first use. Throws if the
// journal directory does not exist. Owned by this object.
//------------------------------------------------------------------------------
RaftJournal* ShardDirectory::getRaftJournal() {
  if(!journalptr) {
    std::string openError;
    if(!hasRaftJournal(openError)) {
      qdb_throw("Cannot open raft journal: " << openError);
    }

    journalptr = new RaftJournal(raftJournalPath());
  }

  return journalptr;
}
//------------------------------------------------------------------------------
// Directory holding the live state machine and raft journal.
//------------------------------------------------------------------------------
std::string ShardDirectory::currentPath() const {
  const std::string current = pathJoin(path, "current");
  return current;
}
//------------------------------------------------------------------------------
// Location of the live state machine, under currentPath().
//------------------------------------------------------------------------------
std::string ShardDirectory::stateMachinePath() const {
  const std::string base = currentPath();
  return pathJoin(base, "state-machine");
}
//------------------------------------------------------------------------------
// Location of the live raft journal, under currentPath().
//------------------------------------------------------------------------------
std::string ShardDirectory::raftJournalPath() const {
  const std::string base = currentPath();
  return pathJoin(base, "raft-journal");
}
//------------------------------------------------------------------------------
// Wipe out StateMachine contents.
//
// If the state machine is currently open, its contents are cleared through
// reset(). Otherwise, its directory is removed from disk entirely.
//------------------------------------------------------------------------------
void ShardDirectory::wipeoutStateMachineContents() {
  if(!smptr) {
    //--------------------------------------------------------------------------
    // Nothing is open - delete the whole folder.
    //--------------------------------------------------------------------------
    qdb_assert(system(SSTR("rm -rf '" << stateMachinePath() << "'").c_str()) == 0);
    return;
  }

  //----------------------------------------------------------------------------
  // Already open: deleting files underneath a live SM is unsafe, so reset it.
  //----------------------------------------------------------------------------
  smptr->reset();
}
//------------------------------------------------------------------------------
// Initialize our StateMachine with the given source, if any.
// If no source is given, create a brand new one.
//
// sm                 - optional seed StateMachine. When given, it is shut down
//                      and its on-disk directory is moved into this shard's
//                      state-machine path.
// initialLastApplied - value forced as the state machine's lastApplied. With
//                      no seed and a value of 0, the contents are only wiped.
//------------------------------------------------------------------------------
void ShardDirectory::initializeStateMachine(std::unique_ptr sm, LogIndex initialLastApplied) {
  if(!sm) {
    //--------------------------------------------------------------------------
    // No seed machine: ensure any existing SM is wiped out and, when starting
    // from a non-zero index, force lastApplied. We must return here - the
    // seed-handling code below dereferences sm.
    //--------------------------------------------------------------------------
    wipeoutStateMachineContents();

    if(initialLastApplied != 0) {
      getStateMachine()->forceResetLastApplied(initialLastApplied);
    }

    return;
  }

  //----------------------------------------------------------------------------
  // We have to reset any old contents with those of the pre-existing seed
  // StateMachine. First, get the source location.
  //----------------------------------------------------------------------------
  std::string sourceStateMahchine = sm->getPhysicalLocation();

  //----------------------------------------------------------------------------
  // Shut it down - we don't want to be moving files of a live SM.
  //----------------------------------------------------------------------------
  sm.reset();

  //----------------------------------------------------------------------------
  // Close our own handles (detach() deletes both the SM and the journal), then
  // remove the to-be-replaced SM directory.
  //----------------------------------------------------------------------------
  detach();
  wipeoutStateMachineContents();

  //----------------------------------------------------------------------------
  // Do the actual move.
  //----------------------------------------------------------------------------
  qdb_assert(system(SSTR("mv " << quotes(sourceStateMahchine) << " " << quotes(stateMachinePath()) ).c_str()) == 0);

  //----------------------------------------------------------------------------
  // Re-open the moved SM and force reset lastApplied.
  //----------------------------------------------------------------------------
  getStateMachine()->forceResetLastApplied(initialLastApplied);
}
//------------------------------------------------------------------------------
// Reset this shard to a fresh raft state starting at startIndex.
//
// The state machine is (re)initialized from the optional seed 'sm'. The raft
// journal is created if not open, otherwise obliterated in place. The
// resilvering history is replaced with a single genesis event, whose name
// records whether a seed SM was used.
//------------------------------------------------------------------------------
void ShardDirectory::obliterate(RaftClusterID clusterID, const std::vector &nodes, LogIndex startIndex, FsyncPolicy fsyncPolicy, std::unique_ptr sm) {
  // Must be captured before ownership of sm is handed off below.
  const bool seeded = (sm.get() != nullptr);

  initializeStateMachine(std::move(sm), startIndex);

  if(journalptr) {
    journalptr->obliterate(clusterID, nodes, startIndex, fsyncPolicy);
  }
  else {
    journalptr = new RaftJournal(raftJournalPath(), clusterID, nodes, startIndex, fsyncPolicy);
  }

  resilveringHistory.clear();

  std::string genesisEvent = "GENESIS";
  if(seeded) {
    genesisEvent = SSTR("GENESIS-FROM-EXISTING-SM-AT-INDEX:" << startIndex);
  }

  resilveringHistory.append(ResilveringEvent(genesisEvent, time(NULL)));
  storeResilveringHistory();
}
//------------------------------------------------------------------------------
// Lay out a brand-new shard directory at 'path': SHARD-ID file, empty
// "current" subdirectory, and a RESILVERING-HISTORY containing GENESIS.
// Returns EEXIST if 'path' already exists. clusterID is currently unused here.
//------------------------------------------------------------------------------
Status ShardDirectory::initializeDirectory(const std::string &path, RaftClusterID clusterID, ShardID shardID) {
  std::string existsError;
  const bool alreadyThere = directoryExists(path, existsError);
  if(alreadyThere) {
    return Status(EEXIST, SSTR("Cannot initialize shard directory for '" << shardID << "', path already exists: " << path));
  }

  const std::string currentDir = pathJoin(path, "current");
  const std::string idFile = pathJoin(path, "SHARD-ID");
  const std::string historyFile = pathJoin(path, "RESILVERING-HISTORY");

  mkpath_or_die(path + "/", 0755);
  write_file_or_die(idFile, shardID);
  mkpath_or_die(currentDir + "/", 0755);

  ResilveringHistory genesisHistory;
  genesisHistory.append(ResilveringEvent("GENESIS", time(NULL)));
  write_file_or_die(historyFile, genesisHistory.serialize());

  return Status();
}
//------------------------------------------------------------------------------
// Create a standalone (non-raft) shard directory at 'path', seeding its state
// machine from 'sm' if given.
//
// On failure, 'st' carries the error and nullptr is returned. On success, the
// caller owns the returned object.
//------------------------------------------------------------------------------
ShardDirectory* ShardDirectory::create(const std::string &path, RaftClusterID clusterID, ShardID shardID, std::unique_ptr sm, Status &st) {
  st = initializeDirectory(path, clusterID, shardID);
  if(!st.ok()) {
    return nullptr;
  }

  // Keep ownership in a unique_ptr until fully initialized, so the object is
  // not leaked if initialization throws.
  std::unique_ptr<ShardDirectory> shardDirectory(new ShardDirectory(path));

  // Standalone shard, we start from LogIndex 0
  shardDirectory->initializeStateMachine(std::move(sm), 0);

  // Hand back the very instance initialized above. It may already hold the
  // state machine open, so constructing a second instance here would leak
  // this one together with its open resources.
  return shardDirectory.release();
}
//------------------------------------------------------------------------------
// Create a raft shard directory at 'path'. The journal is initialized with the
// given cluster ID, nodes, start index and fsync policy, and the state machine
// is optionally seeded from 'sm'.
//
// On failure, 'st' carries the error and nullptr is returned. On success, the
// caller owns the returned object.
//------------------------------------------------------------------------------
ShardDirectory* ShardDirectory::create(const std::string &path, RaftClusterID clusterID, ShardID shardID, const std::vector &nodes, LogIndex startIndex, FsyncPolicy fsyncPolicy, std::unique_ptr sm, Status &st) {
  st = initializeDirectory(path, clusterID, shardID);
  if(!st.ok()) {
    return nullptr;
  }

  // Keep ownership in a unique_ptr until fully initialized, so the object is
  // not leaked if obliterate() throws.
  std::unique_ptr<ShardDirectory> shardDirectory(new ShardDirectory(path));
  shardDirectory->obliterate(clusterID, nodes, startIndex, fsyncPolicy, std::move(sm));
  return shardDirectory.release();
}
//------------------------------------------------------------------------------
// Take a snapshot of the state machine and raft journal (via their
// checkpoint() calls) into a temporary snapshot directory for 'id'.
// Returns nullptr on failure. 'err' is populated only if the directory cannot
// be created.
//
// Before calling this function, journal trimming should have been turned off!
//------------------------------------------------------------------------------
std::unique_ptr ShardDirectory::takeSnapshot(const SnapshotID &id, std::string &err) {
  const std::string snapshotDirectory = getTempSnapshot(id);
  const std::string smCheckpoint = pathJoin(snapshotDirectory, "state-machine");
  const std::string journalCheckpoint = pathJoin(snapshotDirectory, "raft-journal");

  if(!mkpath(snapshotDirectory + "/", 0755, err)) {
    qdb_critical(err);
    return nullptr;
  }

  rocksdb::Status smStatus = getStateMachine()->checkpoint(smCheckpoint);
  if(!smStatus.ok()) {
    qdb_critical("cannot create state machine checkpoint in " << smCheckpoint << ": " << smStatus.ToString());
    return nullptr;
  }

  rocksdb::Status journalStatus = getRaftJournal()->checkpoint(journalCheckpoint);
  if(!journalStatus.ok()) {
    qdb_critical("cannot create journal checkpoint in " << journalCheckpoint << ": " << journalStatus.ToString());
    return nullptr;
  }

  return std::unique_ptr(new ShardSnapshot(snapshotDirectory));
}
//------------------------------------------------------------------------------
// Begin a resilvering event by creating its arena directory. On failure,
// 'err' is overwritten with a descriptive message and false is returned.
//------------------------------------------------------------------------------
bool ShardDirectory::resilveringStart(const ResilveringEventID &id, std::string &err) {
  const std::string arena = getResilveringArena(id);
  const bool created = mkpath(arena + "/", 0755, err);

  if(!created) {
    err = SSTR("Unable to create resilvering-arena for '" << id << "'");
    qdb_critical(err);
  }

  return created;
}
//------------------------------------------------------------------------------
// Write one file received during resilvering into the arena for 'id'.
// Parent directories of the target are created first. On any failure, 'err'
// holds the reason, it is logged, and false is returned.
//------------------------------------------------------------------------------
bool ShardDirectory::resilveringCopy(const ResilveringEventID &id, std::string_view filename, std::string_view contents, std::string &err) {
  const std::string targetPath = pathJoin(getResilveringArena(id), filename);

  // Short-circuit: write_file is only attempted if mkpath succeeded.
  const bool succeeded = mkpath(targetPath, 0755, err) && write_file(targetPath, contents, err);

  if(!succeeded) {
    qdb_critical("error during resilveringCopy: " << err);
  }

  return succeeded;
}
// When calling this function, we assume caller has released any references
// to the journal and state machine!
bool ShardDirectory::resilveringFinish(const ResilveringEventID &id, std::string &err) {
std::string resilveringArena = getResilveringArena(id);
if(!directoryExists(resilveringArena, err)) {
return false;
}
detach();
qdb_event("Finalizing resilvering, id '" << id << "'.");
std::string supplanted = getSupplanted(id);
mkpath_or_die(supplanted, 0755);
std::string source = currentPath();
std::string destination = supplanted;
rename_directory_or_die(source, destination);
source = resilveringArena;
destination = currentPath();
rename_directory_or_die(source, destination);
// By some kind of miracle, we have survived up to this point. Attach!
getStateMachine();
getRaftJournal();
// Store the resilvering event into the history.
resilveringHistory.append(ResilveringEvent(id, time(NULL)));
storeResilveringHistory();
qdb_event("Resilvering '" << id << "'" << " has been successful!");
return true;
}
//------------------------------------------------------------------------------
// Where the pre-resilvering "current" directory is moved for event 'id'.
//------------------------------------------------------------------------------
std::string ShardDirectory::getSupplanted(const ResilveringEventID &id) {
  const std::string supplantedRoot = pathJoin(path, "supplanted");
  return pathJoin(supplantedRoot, id);
}
//------------------------------------------------------------------------------
// Staging directory receiving files for resilvering event 'id'.
//------------------------------------------------------------------------------
std::string ShardDirectory::getResilveringArena(const ResilveringEventID &id) {
  const std::string arenaRoot = pathJoin(path, "resilvering-arena");
  return pathJoin(arenaRoot, id);
}
//------------------------------------------------------------------------------
// Temporary directory holding snapshot 'id'.
//------------------------------------------------------------------------------
std::string ShardDirectory::getTempSnapshot(const SnapshotID &id) {
  const std::string snapshotsRoot = pathJoin(path, "temp-snapshots");
  return pathJoin(snapshotsRoot, id);
}
//------------------------------------------------------------------------------
// Read-only access to the in-memory resilvering history.
//------------------------------------------------------------------------------
const ResilveringHistory& ShardDirectory::getResilveringHistory() const {
  return this->resilveringHistory;
}
std::string ShardDirectory::checkpoint(std::string_view path) {
if(mkdir(std::string(path).c_str(), S_IRWXU) != 0) {
return SSTR("Could not mkdir " << path << ": " << errno << " (" << strerror(errno) << ")");
}
Status sameFs = ensureSameFilesystem(path, this->path);
if(!sameFs.ok()) {
return SSTR(sameFs.getMsg() << " (checkpoint needs to be on same physical fs as " << this->path << ")");
}
std::string checkpointCurrent = pathJoin(path, "current");
if(mkdir(checkpointCurrent.c_str(), S_IRWXU) != 0) {
return SSTR("Could not mkdir " << checkpointCurrent << ": " << errno << " (" << strerror(errno) << ")");
}
std::string smCheckpoint = pathJoin(checkpointCurrent, "state-machine");
rocksdb::Status st = getStateMachine()->checkpoint(smCheckpoint);
if(!st.ok()) {
std::string err = SSTR("Could not create state machine checkpoint in " << smCheckpoint << ": " << st.ToString());
qdb_critical(err);
return err;
}
if(journalptr) {
std::string journalCheckpoint = pathJoin(checkpointCurrent, "raft-journal");
rocksdb::Status st = getRaftJournal()->checkpoint(journalCheckpoint);
if(!st.ok()) {
std::string err = SSTR("Could not create journal checkpoint in " << journalCheckpoint << ": " << st.ToString());
qdb_critical(err);
return err;
}
}
std::string resilvHist = pathJoin(path, "RESILVERING-HISTORY");
std::string err;
if(!write_file(resilvHist, resilveringHistory.serialize(), err)) {
qdb_critical(err);
return err;
}
std::string shardIdentPath = pathJoin(path, "SHARD-ID");
if(!write_file(shardIdentPath, shardID, err)) {
qdb_critical(err);
return err;
}
return {}; // success
}