//------------------------------------------------------------------------------
// File: ScanDir.cc
// Author: Elvin Sindrilaru - CERN
//------------------------------------------------------------------------------
/************************************************************************
* EOS - the CERN Disk Storage System *
* Copyright (C) 2019 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see <http://www.gnu.org/licenses/>.*
************************************************************************/
#include "fst/ScanDir.hh"
#include "common/Path.hh"
#include "common/Constants.hh"
#include "common/IoPriority.hh"
#include "common/StringSplit.hh"
#include "common/StringTokenizer.hh"
#include "console/commands/helpers/FsHelper.hh"
#include "fst/Config.hh"
#include "fst/XrdFstOfs.hh"
#include "fst/Deletion.hh"
#include "fst/filemd/FmdMgm.hh"
#include "fst/storage/FileSystem.hh"
#include "fst/checksum/ChecksumPlugins.hh"
#include "fst/io/FileIoPluginCommon.hh"
#include "fst/layout/HeaderCRC.hh"
#include "fst/layout/ReedSLayout.hh"
#include "fst/layout/RaidDpLayout.hh"
#include "namespace/ns_quarkdb/Constants.hh"
#include "qclient/structures/QSet.hh"
#include
#include
#include
#include
#include
#include
#ifndef __APPLE__
#include
#include
#endif
EOSFSTNAMESPACE_BEGIN
//------------------------------------------------------------------------------
// Constructor
//
// Allocates the buffer used for reading files during the scan and, when
// running in background mode, opens the syslog connection and starts the disk
// scan thread plus (unless built with _NOOFS) the namespace scan thread.
//
// @param dirpath path of the directory to scan
// @param fsid id of the file system the directory belongs to
// @param fstload load object, stored in mFstLoad
// @param bgthread if true the scanner runs in dedicated background threads
// @param file_rescan_interval initial value of the per-entry rescan interval
// @param ratebandwidth initial value of the scan IO rate
// @param fake_clock forwarded to the internal clock object (testing)
//------------------------------------------------------------------------------
ScanDir::ScanDir(const char* dirpath, eos::common::FileSystem::fsid_t fsid,
                 eos::fst::Load* fstload, bool bgthread,
                 long int file_rescan_interval, int ratebandwidth,
                 bool fake_clock) :
  mFstLoad(fstload), mFsId(fsid), mDirPath(dirpath),
  mRateBandwidth(ratebandwidth), mEntryIntervalSec(file_rescan_interval),
  mRainEntryIntervalSec(DEFAULT_RAIN_RESCAN_INTERVAL),
  mDiskIntervalSec(DEFAULT_DISK_INTERVAL), mNsIntervalSec(DEFAULT_NS_INTERVAL),
  mConfDiskIntervalSec(DEFAULT_DISK_INTERVAL),
  mNumScannedFiles(0), mNumCorruptedFiles(0),
  mNumHWCorruptedFiles(0), mTotalScanSize(0), mNumTotalFiles(0),
  mNumSkippedFiles(0), mBuffer(nullptr),
  mBufferSize(0), mBgThread(bgthread), mClock(fake_clock), mRateLimit(nullptr)
{
  // Query the transfer alignment of the scanned path, falling back to "/"
  // when the given path is not absolute
  long alignment = pathconf((mDirPath[0] != '/') ? "/" : mDirPath.c_str(),
                            _PC_REC_XFER_ALIGN);

  if (alignment > 0) {
    mBufferSize = 256 * alignment;

    if (posix_memalign((void**) &mBuffer, alignment, mBufferSize)) {
      fprintf(stderr, "error: error calling posix_memaling on dirpath=%s. \n",
              mDirPath.c_str());
      std::abort();
    }
  } else {
    // No alignment information available, use a plain 256kB buffer
    mBufferSize = 256 * 1024;
    mBuffer = (char*) malloc(mBufferSize);
    fprintf(stderr,
            "error: OS does not provide alignment or path does not exist\n");

    // The buffer is the target of every read done by the scanner, so a failed
    // allocation is treated like the posix_memalign failure above
    if (mBuffer == nullptr) {
      fprintf(stderr, "error: failed to allocate scan buffer for dirpath=%s\n",
              mDirPath.c_str());
      std::abort();
    }
  }

  if (mBgThread) {
    openlog("scandir", LOG_PID | LOG_NDELAY, LOG_USER);
    mDiskThread.reset(&ScanDir::RunDiskScan, this);
#ifndef _NOOFS
    mRateLimit.reset(new eos::common::RequestRateLimit());
    mRateLimit->SetRatePerSecond(sDefaultNsScanRate);
    mNsThread.reset(&ScanDir::RunNsScan, this);
#endif
  }
}
//------------------------------------------------------------------------------
// Destructor
//
// In background mode the scanner threads are joined and the syslog connection
// is closed. The scan buffer is released in all cases.
//------------------------------------------------------------------------------
ScanDir::~ScanDir()
{
  if (mBgThread) {
    mDiskThread.join();
    mNsThread.join();
    closelog();
  }

  // free() is a no-op for a null pointer, no explicit check required
  free(mBuffer);
}
//------------------------------------------------------------------------------
// Update scanner configuration
//------------------------------------------------------------------------------
void
ScanDir::SetConfig(const std::string& key, long long value)
{
eos_info("msg=\"update scanner configuration\" key=\"%s\" value=\"%s\"",
key.c_str(), std::to_string(value).c_str());
if (key == eos::common::SCAN_IO_RATE_NAME) {
mRateBandwidth.store(static_cast(value), std::memory_order_relaxed);
} else if (key == eos::common::SCAN_ENTRY_INTERVAL_NAME) {
mEntryIntervalSec.store(value, std::memory_order_release);
} else if (key == eos::common::SCAN_RAIN_ENTRY_INTERVAL_NAME) {
mRainEntryIntervalSec.store(value, std::memory_order_release);
} else if (key == eos::common::SCAN_DISK_INTERVAL_NAME) {
if (mDiskIntervalSec.compare_exchange_strong(mConfDiskIntervalSec,
static_cast(value),
std::memory_order_acq_rel)) {
// Move the following line after join if you want to prevent a toggle until join
mConfDiskIntervalSec = static_cast(value);
mDiskThread.join();
mDiskThread.reset(&ScanDir::RunDiskScan, this);
}
} else if (key == eos::common::SCAN_NS_INTERVAL_NAME) {
#ifndef _NOOFS
if (mNsIntervalSec != static_cast(value)) {
mNsIntervalSec.store(value, std::memory_order_relaxed);
mNsThread.join();
mNsThread.reset(&ScanDir::RunNsScan, this);
}
#endif
} else if (key == eos::common::SCAN_NS_RATE_NAME) {
mRateLimit->SetRatePerSecond(value);
}
}
#ifndef _NOOFS
//------------------------------------------------------------------------------
// Infinite loop doing the scanning of namespace entries
//
// Returns immediately if there is no qclient object. Otherwise waits for the
// file system to be registered and booted, sleeps for a random fraction of
// the ns scan interval and then periodically accounts for missing replicas
// and cleans up unlinked ones until termination is requested.
//
// @param assistant thread assistant used for sleeping and termination checks
//------------------------------------------------------------------------------
void
ScanDir::RunNsScan(ThreadAssistant& assistant) noexcept
{
  using namespace std::chrono;
  eos_info("msg=\"started the ns scan thread\" fsid=%lu dirpath=\"%s\" "
           "ns_scan_interval_sec=%llu", mFsId, mDirPath.c_str(),
           mNsIntervalSec.load(std::memory_order_relaxed));

  if (gOFS.mFsckQcl == nullptr) {
    eos_notice("%s", "msg=\"no qclient present, skipping ns scan\"");
    return;
  }

  // Hold off until the corresponding file system exists and is done booting
  for (;;) {
    const bool fs_ready = gOFS.Storage->ExistsFs(mFsId) &&
                          !gOFS.Storage->IsFsBooting(mFsId);

    if (fs_ready) {
      break;
    }

    assistant.wait_for(seconds(5));

    if (assistant.terminationRequested()) {
      eos_info("%s", "msg=\"stopping ns scan thread\"");
      return;
    }
  }

  // Random smearing of the start time so that not all scanners start at once
  const auto ns_interval_sec = mNsIntervalSec.load(std::memory_order_relaxed);
  const size_t init_delay_sec =
    static_cast<size_t>(1.0 * ns_interval_sec * random() / RAND_MAX);
  eos_info("msg=\"delay ns scan thread by %llu seconds\" fsid=%lu dirpath=\"%s\"",
           init_delay_sec, mFsId, mDirPath.c_str());
  assistant.wait_for(seconds(init_delay_sec));

  for (;;) {
    if (assistant.terminationRequested()) {
      break;
    }

    AccountMissing();
    CleanupUnlinked();
    // The interval is re-read every round to pick up configuration changes
    assistant.wait_for(seconds(mNsIntervalSec.load(std::memory_order_relaxed)));
  }
}
//----------------------------------------------------------------------------
// Account for missing replicas
//----------------------------------------------------------------------------
void
ScanDir::AccountMissing()
{
using eos::common::FileId;
struct stat info;
eos::common::FsckErrsPerFsMap errs_map;
auto fids = CollectNsFids(eos::fsview::sFilesSuffix);
eos_info("msg=\"scanning %llu attached namespace entries\"", fids.size());
while (!fids.empty()) {
// Tag any missing replicas
eos::IFileMD::id_t fid = fids.front();
fids.pop_front();
std::string fpath =
FileId::FidPrefix2FullPath(FileId::Fid2Hex(fid).c_str(), mDirPath.c_str());
if (stat(fpath.c_str(), &info)) {
// Double check that this not a file which was deleted in the meantime
try {
if (IsBeingDeleted(fid)) {
// Give it one more kick by dropping the file from disk and QDB
XrdOucErrInfo tmp_err;
if (gOFS._rem("/DELETION_FSCK", tmp_err, nullptr, nullptr, fpath.c_str(),
fid, mFsId, true)) {
eos_err("msg=\"failed to remove local file\" path=%s fxid=%08llx "
"fsid=%lu", fpath.c_str(), fid, mFsId);
}
} else {
// File missing on disk, mark it but check the MGM info since the
// file might be 0-size so we need to remove the kMissing flag
eos::common::FmdHelper ns_fmd;
auto file = eos::MetadataFetcher::getFileFromId(*gOFS.mFsckQcl.get(),
eos::FileIdentifier(fid));
FmdMgmHandler::NsFileProtoToFmd(std::move(file).get(), ns_fmd);
if (ns_fmd.mProtoFmd.mgmsize() != 0) {
// Mark as missing and also mark the current fsid
ns_fmd.mProtoFmd.set_fsid(mFsId);
ns_fmd.mProtoFmd.set_layouterror(ns_fmd.mProtoFmd.layouterror() |
LayoutId::kMissing);
}
CollectInconsistencies(ns_fmd, mFsId, errs_map);
}
} catch (eos::MDException& e) {
// No file on disk, no ns file metadata object but we have a ghost entry
// in the file system view - delete it
if (!DropGhostFid(mFsId, fid)) {
eos_err("msg=\"failed to drop ghost entry\" fxid=%08llx fsid=%lu",
fid, mFsId);
}
}
}
// Rate limit enforced for the current disk
mRateLimit->Allow();
}
// Push collected errors to QDB
if (!gOFS.Storage->PushToQdb(mFsId, errs_map)) {
eos_err("msg=\"failed to push fsck errors to QDB\" fsid=%lu", mFsId);
}
}
//----------------------------------------------------------------------------
// Cleanup unlinked replicas still laying around
//
// Collects the file ids from the unlinked view of the current file system.
// Entries for which IsBeingDeleted returns false are resubmitted for
// deletion. If the metadata lookup throws eos::MDException the local file is
// removed and the ghost entry is dropped from the file system view.
//
// NOTE(review): the original description mentions replicas "older than
// 10 min" but no age check is visible in this function - confirm where this
// is enforced.
//----------------------------------------------------------------------------
void
ScanDir::CleanupUnlinked()
{
using eos::common::FileId;
// Loop over the unlinked files and force unlink them if too old
auto unlinked_fids = CollectNsFids(eos::fsview::sUnlinkedSuffix);
eos_info("msg=\"scanning %llu unlinked namespace entries\"",
unlinked_fids.size());
while (!unlinked_fids.empty()) {
eos::IFileMD::id_t fid = unlinked_fids.front();
unlinked_fids.pop_front();
try {
// IsBeingDeleted throws if there is no file metadata object
if (IsBeingDeleted(fid) == false) {
// Put the fid in the queue of files to be deleted and this should
// clean both the disk file and update the namespace entry
eos_info("msg=\"resubmit for deletion\" fxid=%08llx fsid=%lu",
fid, mFsId);
std::vector id_vect {fid};
auto deletion = std::make_unique
(id_vect, mFsId, mDirPath.c_str());
gOFS.Storage->AddDeletion(std::move(deletion));
}
} catch (eos::MDException& e) {
// There is no file metadata object so we delete any potential file from
// the local disk and also drop the ghost entry from the file system view
eos_info("msg=\"cleanup ghost unlinked file\" fxid=%08llx fsid=%lu",
fid, mFsId);
std::string fpath =
FileId::FidPrefix2FullPath(FileId::Fid2Hex(fid).c_str(), mDirPath.c_str());
// Drop the file from disk and local DB
XrdOucErrInfo tmp_err;
if (gOFS._rem("/DELETION_FSCK", tmp_err, nullptr, nullptr, fpath.c_str(),
fid, mFsId, true)) {
eos_err("msg=\"failed remove local file\" path=%s fxid=%08llx fsid=%lu",
fpath.c_str(), fid, mFsId);
}
if (!DropGhostFid(mFsId, fid)) {
eos_err("msg=\"failed to drop ghost entry\" fxid=%08llx fsid=%lu",
fid, mFsId);
}
}
// Rate limit applied once per processed entry
mRateLimit->Allow();
}
}
//------------------------------------------------------------------------------
// Drop ghost fid from the given file system id
//
// Builds and executes an "fs dropghosts <fsid> --fid <fid>" command with sss
// authentication forced.
//
// @param fsid file system id holding the ghost entry
// @param fid file id of the ghost entry
// @return true if the command was parsed and executed successfully
//------------------------------------------------------------------------------
bool
ScanDir::DropGhostFid(const eos::common::FileSystem::fsid_t fsid,
                      const eos::IFileMD::id_t fid) const
{
  GlobalOptions cmd_opts;
  cmd_opts.mForceSss = true;
  FsHelper drop_cmd(cmd_opts);
  const auto cmd_line = SSTR("fs dropghosts " << fsid << " --fid " << fid);

  if (drop_cmd.ParseCommand(cmd_line.c_str())) {
    eos_err("%s", "msg=\"failed to parse fs dropghosts command\"");
    return false;
  }

  // A non-zero return value signals a failed execution
  return !drop_cmd.ExecuteWithoutPrint();
}
//------------------------------------------------------------------------------
// Check if file is unlinked from the namespace and in the process of being
// deleted from the disk. The decision is based on the container id stored in
// the file metadata object: a value of 0 means the file is being deleted.
//
// @param fid file id to check
// @return true if the container id of the file is 0, otherwise false
// @note throws if the file metadata object can not be retrieved
//------------------------------------------------------------------------------
bool
ScanDir::IsBeingDeleted(const eos::IFileMD::id_t fid) const
{
  // Retrieving the result throws if the file metadata object doesn't exist
  const eos::ns::FileMdProto fmd_proto =
    eos::MetadataFetcher::getFileFromId(*gOFS.mFsckQcl.get(),
                                        eos::FileIdentifier(fid)).get();
  const bool no_parent_container = (fmd_proto.cont_id() == 0ull);
  return no_parent_container;
}
//------------------------------------------------------------------------------
// Collect all file ids present on the current file system from the NS view
//
// @param type suffix selecting the view to read, only
// eos::fsview::sFilesSuffix and eos::fsview::sUnlinkedSuffix are accepted
//
// @return queue of file ids, empty if the type is not supported or if
// iterating the set throws std::runtime_error
//------------------------------------------------------------------------------
std::deque
ScanDir::CollectNsFids(const std::string& type) const
{
std::deque queue;
if ((type != eos::fsview::sFilesSuffix) &&
(type != eos::fsview::sUnlinkedSuffix)) {
eos_err("msg=\"unsupported type %s\"", type.c_str());
return queue;
}
// Key of the set: <prefix><fsid>:<type>
std::ostringstream oss;
oss << eos::fsview::sPrefix << mFsId << ":" << type;
const std::string key = oss.str();
qclient::QSet qset(*gOFS.mFsckQcl.get(), key);
try {
for (qclient::QSet::Iterator it = qset.getIterator(); it.valid(); it.next()) {
try {
queue.push_back(std::stoull(it.getElement()));
} catch (...) {
// Entries which can not be converted to a number are logged and skipped
eos_err("msg=\"failed to convert fid entry\" data=\"%s\"",
it.getElement().c_str());
}
}
} catch (const std::runtime_error& e) {
// There is no such set in QDB
}
return queue;
}
#endif
//------------------------------------------------------------------------------
// Infinite loop doing the scanning
//
// In background mode the IO priority of the thread is set to the lowest
// best-effort level, the first pass is delayed by a random fraction of the
// disk scan interval and the scan is repeated every mDiskIntervalSec seconds
// until termination is requested. Otherwise one single pass is done.
// A summary of each pass is logged once it is finished.
//
// @param assistant thread assistant used for sleeping and termination checks
//------------------------------------------------------------------------------
void
ScanDir::RunDiskScan(ThreadAssistant& assistant) noexcept
{
using namespace std::chrono;
pid_t tid = 0;
if (mBgThread) {
tid = (pid_t) syscall(SYS_gettid);
int retc = 0;
// A non-zero return value from ioprio_set is treated as failure
if ((retc = ioprio_set(IOPRIO_WHO_PROCESS,
IOPRIO_PRIO_VALUE(IOPRIO_CLASS_BE, 7)))) {
eos_err("msg=\"cannot set io priority to lowest best effort\" "
"retc=%d errno=%d\n", retc, errno);
} else {
eos_notice("msg=\"set io priority to 7(lowest best-effort)\" pid=%u "
"fsid=%lu", tid, mFsId);
}
}
#ifndef _NOOFS
// Wait for the corresponding file system to boot before starting
while (gOFS.Storage->IsFsBooting(mFsId)) {
assistant.wait_for(std::chrono::seconds(5));
if (assistant.terminationRequested()) {
eos_info("%s", "msg=\"stopping disk scan thread\"");
return;
}
}
// Without a qclient object the disk scan is skipped altogether
if (gOFS.mFsckQcl == nullptr) {
eos_notice("%s", "msg=\"no qclient present, skipping disk scan\"");
return;
}
#endif
// If there is a reconfiguration of Disk/Fsck Interval, reload these only
// after current run
uint64_t disk_interval_sec = mDiskIntervalSec.load(std::memory_order_acquire);
if (mBgThread) {
// Get a random smearing and avoid that all start at the same time! 0-4 hours
// (the upper bound is the configured disk scan interval)
size_t sleeper = (1.0 * disk_interval_sec * random() / RAND_MAX);
eos_info("msg=\"start scanning\" fsid=%lu disk_scan_interval_sec=%llu "
"init_delay_sec=%llu", mFsId, disk_interval_sec, sleeper);
assistant.wait_for(seconds(sleeper));
}
while (!assistant.terminationRequested()) {
// Reset the statistics before each pass
mNumScannedFiles = mTotalScanSize = mNumCorruptedFiles = 0;
mNumHWCorruptedFiles = mNumTotalFiles = mNumSkippedFiles = 0;
auto start_ts = std::chrono::system_clock::now();
// Do the heavy work
ScanSubtree(assistant);
auto finish_ts = std::chrono::system_clock::now();
seconds duration = duration_cast(finish_ts - start_ts);
// Check if there was a config update before we sleep
disk_interval_sec = mDiskIntervalSec.load(std::memory_order_acquire);
std::string log_msg =
SSTR("[ScanDir] Directory: " << mDirPath << " files=" << mNumTotalFiles
<< " scanduration=" << duration.count() << " [s] scansize="
<< mTotalScanSize << " [Bytes] [ " << (mTotalScanSize / 1e6)
<< " MB ] scannedfiles=" << mNumScannedFiles << " corruptedfiles="
<< mNumCorruptedFiles << " hwcorrupted=" << mNumHWCorruptedFiles
<< " skippedfiles=" << mNumSkippedFiles
<< " disk_scan_interval_sec=" << disk_interval_sec);
// The summary goes to syslog and the EOS log in background mode, otherwise
// to stderr
if (mBgThread) {
syslog(LOG_ERR, "%s\n", log_msg.c_str());
eos_notice("%s", log_msg.c_str());
} else {
fprintf(stderr, "%s\n", log_msg.c_str());
}
if (mBgThread) {
// Run again after (default) 4 hours
assistant.wait_for(std::chrono::seconds(disk_interval_sec));
} else {
// Only one pass when not running in background mode
break;
}
}
eos_notice("msg=\"done disk scan\" pid=%u fsid=%lu", tid, mFsId);
}
//------------------------------------------------------------------------------
// Method traversing all the files in the subtree and potentially rescanning
// some of them.
//
// Every path returned by the fts traversal of mDirPath is passed to
// CheckFile. Unless built with _NOOFS, the inconsistencies of the files for
// which CheckFile returned true are collected and pushed to QDB once the
// traversal is done.
//
// @param assistant thread assistant used for termination checks
//------------------------------------------------------------------------------
void
ScanDir::ScanSubtree(ThreadAssistant& assistant) noexcept
{
std::unique_ptr io(FileIoPluginHelper::GetIoObject(mDirPath.c_str()));
if (!io) {
LogMsg(LOG_ERR, "msg=\"no IO plug-in available\" url=\"%s\"",
mDirPath.c_str());
return;
}
std::unique_ptr handle {io->ftsOpen()};
if (!handle) {
LogMsg(LOG_ERR, "msg=\"fts_open failed\" dir=%s", mDirPath.c_str());
return;
}
std::string fpath;
eos::common::FsckErrsPerFsMap errs_map;
// An empty path returned by ftsRead ends the traversal
while ((fpath = io->ftsRead(handle.get())) != "") {
if (!mBgThread) {
fprintf(stderr, "[ScanDir] processing file %s\n", fpath.c_str());
}
if (CheckFile(fpath)) {
#ifndef _NOOFS
// Collect fsck errors and save them to be sent later on to QDB
auto fid = eos::common::FileId::PathToFid(fpath.c_str());
if (!fid) {
eos_static_info("msg=\"skip file which is not a eos data file\", "
"path=\"%s\"", fpath.c_str());
continue;
}
auto fmd = gOFS.mFmdHandler->LocalGetFmd(fid, mFsId, true, false);
if (fmd) {
CollectInconsistencies(*fmd.get(), mFsId, errs_map);
}
#endif
}
// NOTE(review): returning here skips both the ftsClose call and the push of
// the already collected errors to QDB below - confirm this is intended
if (assistant.terminationRequested()) {
return;
}
}
if (io->ftsClose(handle.get())) {
LogMsg(LOG_ERR, "msg=\"fts_close failed\" dir=%s", mDirPath.c_str());
}
#ifndef _NOOFS
// Push collected errors to QDB
if (!gOFS.Storage->PushToQdb(mFsId, errs_map)) {
eos_err("msg=\"failed to push fsck errors to QDB\" fsid=%lu", mFsId);
}
#endif
}
//------------------------------------------------------------------------------
// Check the given file for errors and properly account them both at the
// scanner level and also by setting the proper xattrs on the file.
//
// Orphan, scrub and block checksum (.xsmap) files are skipped, as well as
// paths which do not map to a file id and, in background mode, files
// currently open for writing. A file scan and/or a rain scan is triggered
// depending on the timestamps saved as extended attributes.
//
// @param fpath path of the file to check
//
// @return true if the file scan or the rain scan returned true, otherwise
// false
//------------------------------------------------------------------------------
bool
ScanDir::CheckFile(const std::string& fpath)
{
using eos::common::LayoutId;
eos_debug("msg=\"start file check\" path=\"%s\"", fpath.c_str());
// Skip scanning orphan files
if (fpath.find("/.eosorphans") != std::string::npos) {
eos_debug("msg=\"skip orphan file\" path=\"%s\"", fpath.c_str());
return false;
}
// Skip scanning our scrub files (/scrub.write-once.X, /scrub.re-write.X)
if ((fpath.find("/scrub.") != std::string::npos) ||
(fpath.find(".xsmap") != std::string::npos)) {
eos_debug("msg=\"skip scrub/xs file\" path=\"%s\"", fpath.c_str());
return false;
}
// NOTE(review): unlike in ScanSubtree the object returned by GetIoObject is
// not checked for null before being used below - confirm it can't be null
std::unique_ptr io(FileIoPluginHelper::GetIoObject(fpath.c_str()));
auto fid = eos::common::FileId::PathToFid(fpath.c_str());
if (!fid) {
eos_static_info("msg=\"skip file which is not an eos data file\", "
"path=\"%s\"", fpath.c_str());
return false;
}
// Get last modification time
struct stat info;
if ((io->fileOpen(0, 0)) || io->fileStat(&info)) {
LogMsg(LOG_ERR, "msg=\"open/stat failed\" path=%s\"", fpath.c_str());
return false;
}
++mNumTotalFiles;
#ifndef _NOOFS
if (mBgThread) {
// Files open for writing are not scanned
if (gOFS.openedForWriting.isOpen(mFsId, fid)) {
syslog(LOG_ERR, "skipping scan w-open file: localpath=%s fsid=%d fxid=%08llx\n",
fpath.c_str(), mFsId, fid);
eos_warning("msg=\"skipping scan of w-open file\" localpath=%s fsid=%d "
"fxid=%08llx", fpath.c_str(), mFsId, fid);
return false;
}
gOFS.mFmdHandler->ClearErrors(fid, mFsId);
}
#endif
// Timestamp of the last scan, "0" is the initial value passed to attrGet
std::string scan_ts_sec = "0";
io->attrGet("user.eos.timestamp", scan_ts_sec);
// Handle the old format in microseconds, truncate to seconds
if (scan_ts_sec.length() > 10) {
scan_ts_sec.erase(10);
}
bool scan_result = false;
if (DoRescan(scan_ts_sec)) {
scan_result = ScanFile(io, fpath, fid, scan_ts_sec, info.st_mtime);
} else {
++mNumSkippedFiles;
}
#ifndef _NOOFS
// Check for block xs file otherwise it has no sense even to attempt a
// rain rescan of the file
struct stat info_xs;
const std::string filexs_path = fpath + ".xsmap";
if ((stat(filexs_path.c_str(), &info_xs) == 0) && info_xs.st_size) {
// Grab the latest rain scan timestamp if it exists
scan_ts_sec = "0";
io->attrGet("user.eos.rain_timestamp", scan_ts_sec);
if (DoRescan(scan_ts_sec, true)) {
// The rain scan is always attempted, its result is or-ed with the
// result of the file scan
scan_result = (ScanRainFile(io, fpath, fid, scan_ts_sec) || scan_result);
}
}
#endif
return scan_result;
}
//------------------------------------------------------------------------------
// Get block checksum object for the given file. First we need to check if
// there is a block checksum file (.xsmap) corresponding to the given raw file.
//
// The block checksum type and the block size are read from the extended
// attributes of the .xsmap file.
//
// @param file_path path of the raw file
//
// @return checksum object with the block checksum map opened, or nullptr if
// the .xsmap file can not be stat-ed, has no block checksum attribute,
// no checksum object could be created or opening the map failed
//------------------------------------------------------------------------------
std::unique_ptr
ScanDir::GetBlockXS(const std::string& file_path)
{
using eos::common::LayoutId;
std::string str_bxs_type, str_bxs_size;
std::string filexs_path = file_path + ".xsmap";
// NOTE(review): the object returned by GetIoObject is used without a null
// check - confirm it can't be null
std::unique_ptr io(FileIoPluginHelper::GetIoObject(
filexs_path));
struct stat info;
if (!io->fileStat(&info, 0)) {
io->attrGet("user.eos.blockchecksum", str_bxs_type);
io->attrGet("user.eos.blocksize", str_bxs_size);
// compare() returns non-zero when the checksum type is not empty
if (str_bxs_type.compare("")) {
unsigned long bxs_type = LayoutId::GetBlockChecksumFromString(str_bxs_type);
int bxs_size = atoi(str_bxs_size.c_str());
int bxs_size_type = LayoutId::BlockSizeEnum(bxs_size);
auto layoutid = LayoutId::GetId(LayoutId::kPlain, LayoutId::kNone, 0,
bxs_size_type, bxs_type);
std::unique_ptr checksum =
eos::fst::ChecksumPlugins::GetChecksumObject(layoutid, true);
if (checksum) {
if (checksum->OpenMap(filexs_path.c_str(), info.st_size, bxs_size, false)) {
return checksum;
} else {
return nullptr;
}
} else {
LogMsg(LOG_ERR, "%s", SSTR("msg=\"failed to get checksum object\" "
<< "layoutid=" << std::hex << layoutid
<< std::dec << "path=" << filexs_path).c_str());
}
} else {
LogMsg(LOG_ERR, "%s", SSTR("msg=\"file has no blockchecksum xattr\""
<< " path=" << filexs_path).c_str());
}
}
return nullptr;
}
//------------------------------------------------------------------------------
// Decide if a rescan is needed based on the timestamp provided and the
// configured rescan interval
//------------------------------------------------------------------------------
bool
ScanDir::DoRescan(const std::string& timestamp_sec, bool rain_ts) const
{
using namespace std::chrono;
uint64_t rescan_interval = rain_ts ?
mRainEntryIntervalSec.load(std::memory_order_acquire) :
mEntryIntervalSec.load(std::memory_order_acquire);
if (timestamp_sec == "") {
if (rescan_interval == 0ull) {
return false;
} else {
// Check the first time if scanner is not completely disabled
return true;
}
}
uint64_t elapsed_sec {0ull};
// Used only during testing
if (mClock.IsFake()) {
steady_clock::time_point old_ts(seconds(std::stoull(timestamp_sec)));
steady_clock::time_point now_ts(mClock.getTime());
elapsed_sec = duration_cast(now_ts - old_ts).count();
} else {
system_clock::time_point old_ts(seconds(std::stoull(timestamp_sec)));
system_clock::time_point now_ts(system_clock::now());
elapsed_sec = duration_cast(now_ts - old_ts).count();
}
if (elapsed_sec < rescan_interval) {
return false;
} else {
if (rescan_interval) {
return true;
} else {
return false;
}
}
}
//------------------------------------------------------------------------------
// Check the given file for errors and properly account them both at the
// scanner level and also by setting the proper xattrs on the file.
//
// @param io io object attached to the file
// @param fpath path of the file
// @param fid file id
// @param scan_ts_sec timestamp of the previous scan in seconds
// @param mtime modification time of the file taken before the scan
//
// @return true if the file was scanned, false if the scan failed or if the
// file was modified or reopened for writing during the scan
//------------------------------------------------------------------------------
bool
ScanDir::ScanFile(const std::unique_ptr& io,
const std::string& fpath,
eos::common::FileId::fileid_t fid,
const std::string& scan_ts_sec,
time_t mtime)
{
std::string lfn, previous_xs_err;
io->attrGet("user.eos.lfn", lfn);
io->attrGet("user.eos.filecxerror", previous_xs_err);
// The previous scan recorded no file checksum error
bool was_healthy = (previous_xs_err == "0");
// Flag if file has been modified since the last time we scanned it
bool didnt_change = (mtime < atoll(scan_ts_sec.c_str()));
bool blockxs_err = false;
bool filexs_err = false;
unsigned long long scan_size{0ull};
std::string scan_xs_hex;
if (!ScanFileLoadAware(io, scan_size, scan_xs_hex, filexs_err, blockxs_err)) {
return false;
}
bool reopened = false;
#ifndef _NOOFS
if (mBgThread) {
if (gOFS.openedForWriting.isOpen(mFsId, fid)) {
eos_err("msg=\"file reopened during the scan, ignore checksum error\" "
"path=%s", fpath.c_str());
reopened = true;
}
}
#endif
// If file changed or opened for update in the meantime then skip the scan
struct stat info;
if (reopened || io->fileStat(&info) || (mtime != info.st_mtime)) {
LogMsg(LOG_ERR, "msg=\"[ScanDir] skip file modified during scan path=%s",
fpath.c_str());
return false;
}
if (filexs_err) {
if (mBgThread) {
syslog(LOG_ERR, "corrupted file checksum path=%s lfn=%s\n",
fpath.c_str(), lfn.c_str());
eos_err("msg=\"corrupted file checksum\" path=\"%s\" lfn=\"%s\"",
fpath.c_str(), lfn.c_str());
} else {
fprintf(stderr, "[ScanDir] corrupted file checksum path=%s lfn=%s\n",
fpath.c_str(), lfn.c_str());
}
// A checksum error on a file which was healthy at the previous scan and
// was not modified since is accounted as hardware corruption
if (was_healthy && didnt_change) {
++mNumHWCorruptedFiles;
if (mBgThread) {
syslog(LOG_ERR, "HW corrupted file found path=%s lfn=%s\n",
fpath.c_str(), lfn.c_str());
} else {
fprintf(stderr, "HW corrupted file found path=%s lfn=%s\n",
fpath.c_str(), lfn.c_str());
}
}
}
// Collect statistics
mTotalScanSize += scan_size;
// Save the new scan timestamp and the outcome of the checksum checks. The
// || chain stops at the first attrSet which fails.
if ((io->attrSet("user.eos.timestamp", GetTimestampSmearedSec())) ||
(io->attrSet("user.eos.filecxerror", filexs_err ? "1" : "0")) ||
(io->attrSet("user.eos.blockcxerror", blockxs_err ? "1" : "0"))) {
LogMsg(LOG_ERR, "msg=\"failed to set xattrs\" path=%s", fpath.c_str());
}
#ifndef _NOOFS
if (mBgThread) {
gOFS.mFmdHandler->UpdateWithScanInfo(fid, mFsId, fpath, scan_size,
scan_xs_hex, gOFS.mFsckQcl);
}
#endif
return true;
}
//------------------------------------------------------------------------------
// Scan file taking the load into consideration
//
// Reads the file in chunks of mBufferSize bytes, computes the file checksum
// and verifies the block checksums if a block checksum map is available.
//
// @param io io object attached to the file
// @param scan_size output, number of bytes read from the file
// @param scan_xs_hex output, hex representation of the computed file
// checksum, set only if a file checksum object is available
// @param filexs_err output, true if the computed file checksum does not match
// the one stored in the extended attributes
// @param blockxs_err output, true if a block checksum check failed
//
// @return true if the file was read completely, false if the stat or a read
// failed
//------------------------------------------------------------------------------
bool
ScanDir::ScanFileLoadAware(const std::unique_ptr& io,
unsigned long long& scan_size,
std::string& scan_xs_hex,
bool& filexs_err, bool& blockxs_err)
{
scan_size = 0ull;
filexs_err = blockxs_err = false;
int scan_rate = mRateBandwidth.load(std::memory_order_relaxed);
std::string file_path = io->GetPath();
struct stat info;
if (io->fileStat(&info)) {
eos_err("msg=\"failed stat\" path=%s\"", file_path.c_str());
return false;
}
// Get checksum type and value
std::string xs_type;
char xs_val[SHA256_DIGEST_LENGTH];
memset(xs_val, 0, sizeof(xs_val));
size_t xs_len = SHA256_DIGEST_LENGTH;
io->attrGet("user.eos.checksumtype", xs_type);
io->attrGet("user.eos.checksum", xs_val, xs_len);
auto comp_file_xs = eos::fst::ChecksumPlugins::GetXsObj(xs_type);
std::unique_ptr blockXS {GetBlockXS(file_path)};
if (comp_file_xs) {
comp_file_xs->Reset();
}
int64_t nread = 0;
off_t offset = 0;
const auto open_ts = std::chrono::system_clock::now();
// Read until a read returns less than a full buffer
do {
nread = io->fileRead(offset, mBuffer, mBufferSize);
if (nread < 0) {
if (blockXS) {
blockXS->CloseMap();
}
eos_err("msg=\"failed read\" offset=%llu path=%s", offset,
file_path.c_str());
return false;
}
if (nread) {
if (nread > mBufferSize) {
eos_err("msg=\"read returned more than the buffer size\" buff_sz=%llu "
"nread=%lli\"", mBufferSize, nread);
if (blockXS) {
blockXS->CloseMap();
}
return false;
}
// Block checksums are no longer checked after the first mismatch
if (blockXS && (blockxs_err == false)) {
if (!blockXS->CheckBlockSum(offset, mBuffer, nread)) {
blockxs_err = true;
}
}
if (comp_file_xs) {
comp_file_xs->Add(mBuffer, nread, offset);
}
offset += nread;
EnforceAndAdjustScanRate(offset, open_ts, scan_rate);
}
} while (nread == mBufferSize);
scan_size = (unsigned long long) offset;
const auto close_ts = std::chrono::system_clock().now();
auto tx_duration_ms = std::chrono::duration_cast
(close_ts - open_ts).count();
// NOTE(review): there is no ';' after the macro call below, this depends on
// how eos_static_debug expands - confirm. Also tx_duration_ms can be 0 for
// a very short scan.
eos_static_debug("path=%s size(bytes)=%llu scan_duration_ms=%llu rate(MB/s)=%.02f",
file_path.c_str(), scan_size, tx_duration_ms,
(((1.0 * offset) / (1024 * 1024)) * 1000) / tx_duration_ms)
if (comp_file_xs) {
comp_file_xs->Finalize();
}
// Check file checksum only for replica layouts
if (comp_file_xs) {
scan_xs_hex = comp_file_xs->GetHexChecksum();
if (!comp_file_xs->Compare(xs_val)) {
// Second checksum object used only to print the expected value
auto exp_file_xs = eos::fst::ChecksumPlugins::GetXsObj(xs_type);
exp_file_xs->SetBinChecksum(xs_val, xs_len);
LogMsg(LOG_ERR, "msg=\"file checksum error\" expected_file_xs=%s "
"computed_file_xs=%s scan_size=%llu path=%s",
exp_file_xs->GetHexChecksum(), comp_file_xs->GetHexChecksum(),
scan_size, file_path.c_str());
++mNumCorruptedFiles;
filexs_err = true;
}
}
// Check block checksum
if (blockxs_err) {
LogMsg(LOG_ERR, "msg=\"corrupted block checksum\" path=%s "
"blockxs_path=%s.xsmap", file_path.c_str(), file_path.c_str());
if (mBgThread) {
syslog(LOG_ERR, "corrupted block checksum path=%s blockxs_path=%s.xsmap\n",
file_path.c_str(), file_path.c_str());
}
}
if (blockXS) {
blockXS->CloseMap();
}
++mNumScannedFiles;
return true;
}
#ifndef _NOOFS
//------------------------------------------------------------------------------
// Check the given file for rain stripes errors
//
// The check is done only if the local file holds a valid header with stripe
// id 0 and only if the change time of the file is not older than the
// previous rain scan timestamp.
//
// @param io io object attached to the local stripe file
// @param fpath path of the local stripe file
// @param fid file id
// @param scan_ts_sec timestamp of the previous rain scan in seconds
//
// @return true if the stripe check was done, false if it was skipped or
// failed, or if the file was modified or reopened for writing in the
// meantime
//------------------------------------------------------------------------------
bool
ScanDir::ScanRainFile(const std::unique_ptr& io,
const std::string& fpath, eos::common::FileId::fileid_t fid,
const std::string& scan_ts_sec)
{
if (mBgThread) {
// Skip check if file is open for reading, as this can mean we are in the
// middle of a recovery operation, and another stripe is open for write
if (gOFS.openedForReading.isOpen(mFsId, fid)) {
syslog(LOG_ERR, "skipping rain scan r-open file: localpath=%s fsid=%d "
"fxid=%08llx\n", fpath.c_str(), mFsId, fid);
eos_warning("msg=\"skipping rain scan of r-open file\" localpath=%s "
"fsid=%d fxid=%08llx", fpath.c_str(), mFsId, fid);
return false;
}
}
// NOTE(review): a plain new expression reports failure by throwing, so the
// null check below looks unreachable - confirm
std::unique_ptr hd(new HeaderCRC(0, 0));
if (!hd) {
eos_static_err("msg=\"failed to allocate header\" path=\"%s\"",
fpath.c_str());
return false;
}
hd->ReadFromFile(io.get(), 0);
// Only do the check from the fst with the first stripe
if (!hd->IsValid() || hd->GetIdStripe() != 0) {
return false;
}
struct stat info1;
if (io->fileStat(&info1)) {
LogMsg(LOG_ERR, "msg=\"stat failed\" path=%s\"", fpath.c_str());
return false;
}
// NOTE(review): std::stoll throws if scan_ts_sec is not a number - confirm
// that the callers guarantee a numeric value
if (info1.st_ctime < std::stoll(scan_ts_sec)) {
eos_static_debug("msg=\"skip rain check for unmodified file\" path=\"%s\"",
fpath.c_str());
return false;
}
// Filled in by the stripe check below
std::set invalid_fsid;
if (!ScanRainFileLoadAware(fid, invalid_fsid)) {
return false;
}
bool reopened = false;
if (mBgThread) {
if (gOFS.openedForWriting.isOpen(mFsId, fid)) {
eos_static_err("msg=\"file reopened during the scan, ignore stripe "
"error\" path=\"%s\"", fpath.c_str());
reopened = true;
}
}
// If file changed or opened for update in the meantime then skip the scan
struct stat info2;
if (reopened || io->fileStat(&info2) || (info1.st_ctime != info2.st_ctime)) {
eos_static_err("msg=\"skip file modified during scan\" path=\"%s\"",
fpath.c_str());
return false;
}
// Save the timestamp of this rain scan, a failure is only logged
if (io->attrSet("user.eos.rain_timestamp", GetTimestampSmearedSec(true))) {
eos_static_err("msg=\"failed to set xattr rain_timestamp\" path=\"%s\"",
fpath.c_str());
}
if (mBgThread) {
gOFS.mFmdHandler->UpdateWithStripeCheckInfo(fid, mFsId, invalid_fsid);
}
return true;
}
//------------------------------------------------------------------------------
// Check if a given stripe combination can recreate the original file
//
// The given stripes are opened through a RaidDP or a Reed-Solomon layout
// object, depending on the layout type, and the whole content is read
// while computing its checksum.
//
// @param stripes stripes to be used for reading the file
// @param xs_val expected checksum as hex string
// @param xs_obj checksum object used for the computation, reset before use
// @param layout layout id of the file
// @param opaqueInfo opaque information passed when opening the stripes
//
// @return true if the computed hex checksum equals xs_val, false otherwise
// or if opening or reading the stripes failed
//------------------------------------------------------------------------------
bool
ScanDir::IsValidStripeCombination(
const std::vector>& stripes,
const std::string& xs_val, std::unique_ptr& xs_obj,
LayoutId::layoutid_t layout, const std::string& opaqueInfo)
{
std::unique_ptr redundancyObj;
if (LayoutId::GetLayoutType(layout) == LayoutId::kRaidDP) {
redundancyObj = std::make_unique
(nullptr, layout, nullptr, nullptr,
stripes.front().second.c_str(), 0, false);
} else {
redundancyObj = std::make_unique
(nullptr, layout, nullptr, nullptr,
stripes.front().second.c_str(), 0, false);
}
// A non-zero return value from OpenPio is treated as failure
if (redundancyObj->OpenPio(stripes, 0, 0, opaqueInfo.c_str())) {
eos_static_err("msg=\"unable to pio open\" opaque=\"%s\"", opaqueInfo.c_str());
redundancyObj->Close();
return false;
}
// NOTE(review): the offset is a 32-bit value so it wraps around once more
// than 4GiB were read - confirm against the offset type expected by Read
// and the file sizes which can be encountered
uint32_t offsetXrd = 0;
const auto open_ts = std::chrono::system_clock::now();
int scan_rate = mRateBandwidth.load(std::memory_order_relaxed);
xs_obj->Reset();
// Read until Read returns 0, only -1 is handled as a read error
while (true) {
int64_t nread = redundancyObj->Read(offsetXrd, mBuffer, mBufferSize);
if (nread == 0) {
break;
}
if (nread == -1) {
redundancyObj->Close();
return false;
}
xs_obj->Add(mBuffer, nread, offsetXrd);
offsetXrd += nread;
EnforceAndAdjustScanRate(offsetXrd, open_ts, scan_rate);
}
redundancyObj->Close();
xs_obj->Finalize();
return !strcmp(xs_obj->GetHexChecksum(), xs_val.c_str());
}
//------------------------------------------------------------------------------
// Check for stripes that are unable to reconstruct the original file
//------------------------------------------------------------------------------
bool
ScanDir::ScanRainFileLoadAware(eos::common::FileId::fileid_t fid,
std::set&
invalid_fsid)
{
const std::string mgr = gConfig.GetManager();
if (mgr.empty()) {
eos_static_err("%s", "msg=\"no manager info available\"");
return false;
}
std::string xs_mgm;
uint32_t num_locations;
LayoutId::layoutid_t layout;
{
// Reduce scope of the FmdHelper object
auto fmd = gOFS.mFmdHandler->LocalGetFmd(fid, mFsId, true, false);
if (!fmd) {
eos_static_err("msg=\"could not get fmd from manager\" fxid=%08llx", fid);
return false;
}
layout = fmd->mProtoFmd.lid();
if (!LayoutId::IsRain(layout)) {
eos_static_err("msg=\"layout is not rain\" fixd=%08llx", fid);
return false;
}
num_locations = fmd->GetLocations().size();
xs_mgm = fmd->mProtoFmd.mgmchecksum();
}
if (xs_mgm.empty() || (num_locations == 0)) {
eos_static_err("msg=\"mgm checksum empty or no locations\" fxid=%08llx",
fid);
return false;
}
const std::string address = SSTR("root://" << mgr << "/");
const XrdCl::URL url(address.c_str());
if (!url.IsValid()) {
eos_static_err("msg=\"invalid url\" url=\"%s\"", address.c_str());
return false;
}
// Query MGM for list of stripes to open
XrdCl::Buffer arg;
XrdCl::Buffer* resp_raw = nullptr;
const std::string opaque = SSTR("/.fxid:" << std::hex << fid << std::dec
<< "?mgm.pcmd=open&eos.ruid=0&eos.rgid=0&"
<< "xrd.wantprot=sss");
arg.FromString(opaque);
XrdCl::FileSystem fs(url);
const XrdCl::XRootDStatus status =
fs.Query(XrdCl::QueryCode::OpaqueFile, arg, resp_raw);
if (!status.IsOK()) {
eos_static_err("msg=\"MGM query failed\" opaque=\"%s\"", opaque.c_str());
delete resp_raw;
return false;
}
const std::string response(resp_raw->GetBuffer(), resp_raw->GetSize());
delete resp_raw;
// @note: fragile design as it depends on the location of mgm.logid!
const std::string opaqueInfo = strstr(response.c_str(), "&mgm.logid");
std::unique_ptr openOpaque(new XrdOucEnv(response.c_str()));
XrdOucEnv* raw_cap_opaque = nullptr;
eos::common::SymKey::ExtractCapability(openOpaque.get(), raw_cap_opaque);
std::unique_ptr capOpaque(raw_cap_opaque);
if (!capOpaque->Get("mgm.path")) {
eos_static_err("msg=\"no path in mgm cap response\" response=\"%s\"",
response.c_str());
return false;
}
const std::string ns_path = capOpaque->Get("mgm.path");
const auto nStripes = LayoutId::GetStripeNumber(layout) + 1;
const auto nParityStripes = LayoutId::GetRedundancyStripeNumber(layout);
const auto nDataStripes = nStripes - nParityStripes;
FileSystem::fsid_t stripeFsId = 0;
std::string stripeUrl;
std::string pio;
std::string tag;
// Struct for stripe extra information
struct stripe_s {
FileSystem::fsid_t fsid;
std::string url;
enum { Unknown, Valid, Invalid } state;
unsigned int id; // logical stripe id
};
std::vector stripes;
stripes.reserve(num_locations);
for (unsigned long i = 0; i < num_locations; ++i) {
tag = SSTR("pio." << i);
// Skip files with missing replicas, they will be detected elsewhere.
if (!openOpaque->Get(tag.c_str())) {
eos_static_err("msg=\"missing pio entry in mgm response\" fxid=%08llx",
fid);
return false;
}
pio = openOpaque->Get(tag.c_str());
stripeUrl = SSTR("root://" << pio << "/" << ns_path);
stripeFsId = capOpaque->GetInt(SSTR("mgm.fsid" << i).c_str());
// Start by marking all stripes invalid. Mark them unknown once we
// have successfully read their headers.
stripes.push_back({stripeFsId, stripeUrl, stripe_s::Invalid, 0});
}
std::unique_ptr hd {new HeaderCRC(0, 0)};
if (!hd) {
eos_static_err("%s", "msg=\"failed to instantiate header object\"");
return false;
}
// Read each header to check if it is valid
std::map> mapPL;
for (unsigned long i = 0; i < stripes.size(); ++i) {
std::unique_ptr file{FileIoPlugin::GetIoObject(stripes[i].url)};
if (file) {
const std::string new_opaque =
SSTR(opaqueInfo << "&mgm.replicaindex=" << i);
file->fileOpen(SFS_O_RDONLY, 0, new_opaque);
hd->ReadFromFile(file.get(), 0);
file->fileClose();
// If stripe id is greater than nStripe, it's invalid
if (hd->IsValid() && (hd->GetIdStripe() < nStripes)) {
stripes[i].id = hd->GetIdStripe();
stripes[i].state = stripe_s::Unknown;
mapPL[hd->GetIdStripe()].insert(i);
}
}
}
if (mapPL.size() < nDataStripes) {
eos_static_err("msg=\"not enough valid stripes to reconstruct\" "
"fxid=%08llx", fid);
invalid_fsid.insert(0);
return true;
}
std::unique_ptr xs_obj(ChecksumPlugins::GetChecksumObject(layout));
if (!xs_obj) {
eos_static_err("msg=\"invalid xs_type\" fxid=%08llx", fid);
return false;
}
std::vector combinations(num_locations, false);
std::fill(combinations.begin(), combinations.begin() + nDataStripes, true);
std::vector>
stripeCombination(nParityStripes, std::make_pair(0, "root://__offline_"));
stripeCombination.reserve(nStripes);
// Try to find a valid stripe combination
do {
stripeCombination.erase(stripeCombination.begin() + nParityStripes,
stripeCombination.end());
for (unsigned long i = 0; i < combinations.size(); ++i) {
if (combinations[i]) {
// Skip combinations with invalid stripes
if (stripes[i].state == stripe_s::Invalid) {
break;
}
// Skip if multiple duplicated stripes are in the same combination
auto HasDuplicate = [i, &combinations](unsigned long j) {
return i != j && combinations[j];
};
auto& stripe_loc = mapPL[stripes[i].id];
if (std::find_if(stripe_loc.begin(), stripe_loc.end(), HasDuplicate) !=
stripe_loc.end()) {
break;
}
stripeCombination.emplace_back(i, stripes[i].url);
}
}
// Skip combination if we exited early from previous loop
if (stripeCombination.size() != nStripes) {
continue;
}
if (IsValidStripeCombination(stripeCombination, xs_mgm, xs_obj,
layout, opaqueInfo)) {
for (unsigned long i = 0; i < combinations.size(); ++i) {
if (combinations[i]) {
stripes[i].state = stripe_s::Valid;
}
}
break;
}
} while (std::prev_permutation(combinations.begin(), combinations.end()));
auto isValid = [](const stripe_s & s) {
return s.state == stripe_s::Valid;
};
if (std::none_of(stripes.begin(), stripes.end(), isValid)) {
eos_static_err("msg=\"not enough valid stripes for reconstruction\" "
"fxid=%08llx", fid);
invalid_fsid.insert(0);
return true;
}
// Found a valid combination, check the rest of the stripes
for (unsigned long i = 0; i < stripes.size(); ++i) {
if (stripes[i].state == stripe_s::Unknown) {
stripeCombination.erase(stripeCombination.begin() + nParityStripes,
stripeCombination.end());
// Try combinations with 1 unknown stripe and nDataStripes - 1 valid stripes.
// Exclude duplicates from the combination.
stripeCombination.emplace_back(i, stripes[i].url);
auto& skipStripes = mapPL[stripes[i].id];
for (unsigned long j = 0; j < stripes.size(); ++j) {
if (stripes[j].state == stripe_s::Valid &&
skipStripes.find(j) == skipStripes.end()) {
stripeCombination.emplace_back(j, stripes[j].url);
if (stripeCombination.size() == nStripes) {
break;
}
}
}
if (IsValidStripeCombination(stripeCombination, xs_mgm, xs_obj,
layout, opaqueInfo)) {
stripes[i].state = stripe_s::Valid;
} else {
stripes[i].state = stripe_s::Invalid;
}
}
}
// Collect the fsids of all the invalid stripes
for (unsigned long i = 0; i < stripes.size(); i++) {
if (stripes[i].state == stripe_s::Invalid) {
eos_static_err("msg=\"stripe %d on fst %d is invalid\" fxid=%08llx", i,
stripes[i].fsid, fid);
invalid_fsid.insert(stripes[i].fsid);
}
}
// Collect the fsids of all the duplicated stripes, keeping the replica
// with the lowest fsid
for (auto [_, replicas] : mapPL) {
if (replicas.size() > 1) {
// Used to get the index of the replica which is both valid
// and has the lowest fsid.
auto fsidCmp = [&stripes](const auto & a, const auto & b) {
return stripes[b].state != stripe_s::Valid ||
(stripes[a].state == stripe_s::Valid &&
stripes[a].fsid < stripes[b].fsid);
};
auto min = *std::min_element(replicas.begin(), replicas.end(), fsidCmp);
// Skip if all replicas are invalid
if (stripes[min].state == stripe_s::Valid) {
for (auto i : replicas) {
if ((i != min) && (stripes[i].state == stripe_s::Valid)) {
eos_static_info("msg=\"marking excess stripe %d on fst %d as "
"invalid\" fxid=%08llx", i, stripes[i].fsid, fid);
invalid_fsid.insert(stripes[i].fsid);
}
}
}
}
}
return true;
}
#endif
//------------------------------------------------------------------------------
// Enforce the scan rate by throttling the current thread and also adjust it
// depending on the IO load on the mountpoint
//
// @param offset number of bytes read so far from the current file
// @param open_ts time point when the scan of the current file started
// @param scan_rate current scan rate in MB/s, updated in place: reduced by 10%
//        (but not below 5 MB/s) when the disk load is above 0.7, otherwise
//        reset to the configured mRateBandwidth. A value of 0 disables both
//        the throttling and the adjustment.
//------------------------------------------------------------------------------
void
ScanDir::EnforceAndAdjustScanRate(const off_t offset,
                                  const std::chrono::time_point
                                  <std::chrono::system_clock> open_ts,
                                  int& scan_rate)
{
  using namespace std::chrono;

  if (scan_rate && mFstLoad) {
    const auto now_ts = system_clock::now();
    const uint64_t scan_duration_msec =
      duration_cast<milliseconds>(now_ts - open_ts).count();
    uint64_t expect_duration_msec = 0;

    // Compute in floating point: scan_rate * 1024 * 1024 overflows an int for
    // rates >= 2048 MB/s. Only truncate the final millisecond value so that
    // the throttling does not act at whole second granularity. Non-positive
    // inputs give no expected duration and therefore no sleep.
    if ((offset > 0) && (scan_rate > 0)) {
      const double rate_bytes_per_sec = 1024.0 * 1024.0 * scan_rate;
      expect_duration_msec =
        static_cast<uint64_t>((1000.0 * offset) / rate_bytes_per_sec);
    }

    if (expect_duration_msec > scan_duration_msec) {
      std::this_thread::sleep_for(milliseconds(expect_duration_msec -
                                  scan_duration_msec));
    }

    // Adjust the rate according to the load information
    const double load =
      mFstLoad->GetDiskRate(mDirPath.c_str(), "millisIO") / 1000.0;

    if (load > 0.7) {
      // Adjust the scan_rate which is in MB/s but no lower than 5 MB/s
      if (scan_rate > 5) {
        scan_rate = static_cast<int>(0.9 * scan_rate);
      }
    } else {
      scan_rate = mRateBandwidth.load(std::memory_order_relaxed);
    }
  }
}
//------------------------------------------------------------------------------
// Get timestamp smeared +/-20% of mEntryIntervalSec around the current
// timestamp value
//------------------------------------------------------------------------------
std::string
ScanDir::GetTimestampSmearedSec(bool rain_ts) const
{
using namespace std::chrono;
uint64_t entry_interval_sec;
if (rain_ts) {
entry_interval_sec = mRainEntryIntervalSec.load(std::memory_order_relaxed);
} else {
entry_interval_sec = mEntryIntervalSec.load(std::memory_order_relaxed);
}
int64_t smearing =
(int64_t)(0.2 * 2 * entry_interval_sec * random() / RAND_MAX) -
(int64_t)(0.2 * entry_interval_sec);
uint64_t ts_sec;
if (mClock.IsFake()) {
ts_sec = duration_cast(mClock.getTime().time_since_epoch()).count();
} else {
ts_sec = duration_cast(system_clock::now().time_since_epoch()).count();
}
// Avoid underflow when using the steady_clock for testing
if ((uint64_t)std::abs(smearing) < ts_sec) {
ts_sec += smearing;
}
return std::to_string(ts_sec);
}
EOSFSTNAMESPACE_END