// ---------------------------------------------------------------------- // File: RaftResilverer.cc // Author: Georgios Bitzes - CERN // ---------------------------------------------------------------------- /************************************************************************ * quarkdb - a redis-like highly available key-value store * * Copyright (C) 2016 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #include "raft/RaftResilverer.hh" #include "raft/RaftTrimmer.hh" #include "raft/RaftContactDetails.hh" #include "ShardDirectory.hh" #include "Utils.hh" #include "utils/Uuid.hh" #include "utils/DirectoryIterator.hh" #include "utils/FileUtils.hh" #include #include using namespace quarkdb; class OkResponseVerifier { public: OkResponseVerifier(std::future &&fut, size_t timeout = 15) { std::future_status status = fut.wait_for(std::chrono::seconds(timeout)); if(status != std::future_status::ready) { error = SSTR("Timeout after " << timeout << " seconds"); return; } redisReplyPtr rep = fut.get(); if(rep == nullptr) { error = SSTR("Received nullptr response (should never happen)"); return; } if(rep->type != REDIS_REPLY_STATUS) { error = SSTR("Unexpected response type: " << rep->type); return; } std::string response = std::string(rep->str, rep->len); if(response != "OK") { error = SSTR("Unexpected response: " << response); return; } } bool ok() { return error.empty(); } std::string err() { return error; } private: std::string error; }; RaftResilverer::RaftResilverer(ShardDirectory &dir, const RaftServer &trg, const RaftContactDetails &contactDetails, RaftTrimmer &trimmer) : shardDirectory(dir), target(trg), trimmingBlock(new RaftTrimmingBlock(trimmer, 0)), talker(target, contactDetails, "internal-resilverer") { resilveringID = generateUuid(); setStatus(ResilveringState::INPROGRESS, ""); mainThread.reset(&RaftResilverer::main, this); mainThread.setName(SSTR("resilvering-thread-targetting-" << target.toString())); } RaftResilverer::~RaftResilverer() { mainThread.join(); } ResilveringStatus RaftResilverer::getStatus() { std::scoped_lock lock(statusMtx); return status; } void RaftResilverer::setStatus(const ResilveringState &state, const std::string &err) { std::scoped_lock lock(statusMtx); status.state = state; status.err = err; if(status.state == ResilveringState::FAILED) { qdb_critical("Attempt to resilver " << target.toString() << " has failed: " << status.err); cancel(status.err); } else if(status.state == ResilveringState::SUCCEEDED) { qdb_event("Target " << target.toString() << " has been successfully resilvered."); } } void RaftResilverer::cancel(const std::string &reason) { // Fire and forget. The target should be able to automatically cancel failed // resilverings after some timeout, anyway. talker.resilveringCancel(resilveringID, reason); } bool RaftResilverer::copyFile(const std::string &path, const std::string &prefix, std::string &err) { std::ifstream t(path); std::stringstream buffer; buffer << t.rdbuf(); OkResponseVerifier verifier(talker.resilveringCopy(resilveringID, prefix, buffer.str())); if(!verifier.ok()) { err = SSTR("Error when coping " << path << ": " << verifier.err()); } mFilesSent++; return verifier.ok(); } bool RaftResilverer::copyDirectory(const std::string &target, const std::string &prefix, std::string &err) { qdb_info("Resilvering: Copying directory " << target << " under prefix '" << prefix << "' of remote target"); DirectoryIterator dirIterator(target); struct dirent *entry; while( (entry = dirIterator.next()) ) { if(strcmp(entry->d_name, ".") == 0 || strcmp(entry->d_name, "..") == 0) continue; std::string currentPath = SSTR(target << "/" << entry->d_name); std::string currentPrefix; if(prefix.empty()) { currentPrefix = SSTR(entry->d_name); } else { currentPrefix = SSTR(prefix << "/" << entry->d_name); } if(entry->d_type == DT_DIR) { if(!copyDirectory(currentPath, currentPrefix, err)) { return false; } } else { if(!copyFile(currentPath, currentPrefix, err)) { return false; } } } if(!dirIterator.ok()) { err = SSTR("copyDirectory failed, unable to iterate directory: " << dirIterator.err()); return false; } return true; } void RaftResilverer::main(ThreadAssistant &assistant) { OkResponseVerifier verifier(talker.resilveringStart(resilveringID)); if(!verifier.ok()) { setStatus(ResilveringState::FAILED, SSTR("Could not initiate resilvering: " << verifier.err())); return; } std::string err; std::unique_ptr shardSnapshot = shardDirectory.takeSnapshot(resilveringID, err); if(shardSnapshot == nullptr || !err.empty()) { setStatus(ResilveringState::FAILED, SSTR("Could not create snapshot: " << err)); return; } size_t totalFiles = 0; if(!countFilesInDirectoryRecursively(shardSnapshot->getPath(), err, totalFiles)) { setStatus(ResilveringState::FAILED, err); } mFilesTotal = totalFiles; if(!copyDirectory(shardSnapshot->getPath(), "", err)) { setStatus(ResilveringState::FAILED, err); return; } verifier = OkResponseVerifier(talker.resilveringFinish(resilveringID), 60); if(!verifier.ok()) { setStatus(ResilveringState::FAILED, SSTR("Error when finishing resilvering: " << verifier.err())); return; } setStatus(ResilveringState::SUCCEEDED, ""); }