//------------------------------------------------------------------------------
// File: ConversionJob.cc
// Author: Mihai Patrascoiu - CERN
//------------------------------------------------------------------------------
/************************************************************************
* EOS - the CERN Disk Storage System *
* Copyright (C) 2019 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see <http://www.gnu.org/licenses/>.*
************************************************************************/
#include "mgm/convert/ConversionJob.hh"
#include "mgm/Stat.hh"
#include "mgm/Quota.hh"
#include "mgm/FsView.hh"
#include "mgm/tgc/MultiSpaceTapeGc.hh"
#include "common/Constants.hh"
#include "common/Timing.hh"
#include "namespace/Prefetcher.hh"
#include "namespace/utils/Checksum.hh"
#include "namespace/interface/IView.hh"
#include "namespace/interface/IFileMDSvc.hh"
#include "namespace/ns_quarkdb/NamespaceGroup.hh"
#include "namespace/ns_quarkdb/flusher/MetadataFlusher.hh"
//------------------------------------------------------------------------------
// Utility functions to help with file conversion
//------------------------------------------------------------------------------
namespace
{
//------------------------------------------------------------------------------
// Build the base XRootD URL used for both ends of a conversion transfer.
//
// The URL uses the "root" protocol and the "root" user name, and points at the
// host alias and manager port published by the global gOFS object. Path and
// opaque parameters are left for the caller to fill in.
//
// @return URL with protocol, user name, host and port set
//------------------------------------------------------------------------------
XrdCl::URL NewUrl()
{
  XrdCl::URL mgm_url;
  mgm_url.SetProtocol("root");
  mgm_url.SetUserName("root");
  mgm_url.SetHostPort(gOFS->MgmOfsAlias.c_str(), gOFS->ManagerPort);
  return mgm_url;
}
//------------------------------------------------------------------------------
// Build the XrdCl copy-job property list shared by all conversion transfers.
//
// @param size size in bytes of the file to transfer; it drives the "tpcTimeout"
//             value (via FileId::EstimateTpcTimeout) and decides whether the
//             "thirdParty" property is set at all
//
// @return property list without "source" and "target", which the caller adds
//------------------------------------------------------------------------------
XrdCl::PropertyList TpcProperties(uint64_t size)
{
  using eos::common::FileId;
  // Typed constants: the property list stores values according to their type
  constexpr uint16_t kSingleSource = static_cast<uint16_t>(1);
  constexpr uint16_t kSingleChunkInFlight = static_cast<uint16_t>(1);
  constexpr uint32_t kChunkSizeBytes = static_cast<uint32_t>(4 * 1024 * 1024);
  XrdCl::PropertyList tpc_props;
  tpc_props.Set("force", true);
  tpc_props.Set("posc", false);
  tpc_props.Set("coerce", false);
  tpc_props.Set("sourceLimit", kSingleSource);
  tpc_props.Set("chunkSize", kChunkSizeBytes);
  tpc_props.Set("parallelChunks", kSingleChunkInFlight);
  tpc_props.Set("tpcTimeout", FileId::EstimateTpcTimeout(size).count());

  // "thirdParty=only" is requested for non-empty files only
  if (size != 0) {
    tpc_props.Set("thirdParty", "only");
  }

  return tpc_props;
}
//----------------------------------------------------------------------------
//! Thrown if an EOS file system cannot be determined.
//!
//! Carries only a message; all constructors of std::runtime_error are
//! inherited unchanged.
//----------------------------------------------------------------------------
struct FileSystemNotFound : std::runtime_error {
  using std::runtime_error::runtime_error;
};
//----------------------------------------------------------------------------
//! Pick the disk (i.e. non-tape) file system of the given file.
//!
//! Only the first two locations are inspected: the first one is returned
//! unless it equals EOS_TAPE_FSID, in which case the second one is returned.
//!
//! @param fmd file metadata object to inspect
//!
//! @return ID of the disk file system as opposed to the tape file system
//! @throw FileSystemNotFound if the file has no locations at all, or if its
//!        only location is the tape one
//----------------------------------------------------------------------------
eos::IFileMD::location_t
getDiskFsIdOfFile(eos::IFileMD& fmd)
{
  // Formats the common error text; the file id is printed as zero-padded hex
  const auto describe_failure = [&fmd](const char* reason) {
    std::ostringstream text;
    text << "Failed to find disk file system for fxid=" << std::hex
         << std::setfill('0') << std::setw(8) << fmd.getId() << reason;
    return text.str();
  };
  const auto fsids = fmd.getLocations();

  if (fsids.empty()) {
    throw FileSystemNotFound(describe_failure(": The file has no locations"));
  }

  const auto first_fsid = fsids.at(0);

  if (first_fsid != EOS_TAPE_FSID) {
    return first_fsid;
  }

  // The first location is tape, so a second location is required
  if (fsids.size() < 2) {
    throw FileSystemNotFound(
      describe_failure(": The file only has a tape location"));
  }

  return fsids.at(1);
}
}
EOSMGMNAMESPACE_BEGIN
//------------------------------------------------------------------------------
// Constructor
//
// Stores the file id and conversion description, marks the job as PENDING and
// derives the path of the conversion entry as
// "<gOFS->MgmProcConversionPath>/<conversion info string>".
//
// @param fid             identifier of the file to convert
// @param conversion_info description of the requested conversion
//------------------------------------------------------------------------------
ConversionJob::ConversionJob(const eos::IFileMD::id_t fid,
                             const ConversionInfo& conversion_info) :
  mFid(fid), mConversionInfo(conversion_info), mStatus(Status::PENDING)
{
  std::ostringstream proc_entry;
  proc_entry << gOFS->MgmProcConversionPath << "/" << mConversionInfo.ToString();
  mConversionPath = proc_entry.str();
}
//------------------------------------------------------------------------------
// Destructor - nothing to release explicitly
//------------------------------------------------------------------------------
ConversionJob::~ConversionJob() = default;
//------------------------------------------------------------------------------
// Execute a third-party copy
//
// Runs the whole conversion of the file identified by mConversionInfo.mFid:
//  1. snapshot the source metadata (path, size, locations, checksum)
//  2. copy the file onto the conversion entry (mConversionPath) through an
//     XrdCl copy job configured with the target layout
//  3. verify the converted entry has the expected number of locations and
//     that the source checksum did not change in the meantime
//  4. merge the converted entry into the original file (see Merge())
//  5. finalize a pending QoS transition and notify the tape garbage collector
//
// Nothing is returned: the outcome is published through mStatus (RUNNING, then
// DONE or FAILED). Every failure path goes through HandleError(), which also
// records the error message.
//
// NOTE(review): the method is noexcept but the try blocks around the namespace
// calls only catch eos::MDException; any other exception escaping from the
// code below would end in std::terminate - confirm this is acceptable.
//------------------------------------------------------------------------------
void ConversionJob::DoIt() noexcept
{
using eos::common::FileId;
using eos::common::LayoutId;
// Checksum of the source file before and after the transfer, as hex strings
std::string source_xs;
std::string source_xs_postconversion;
// The two values below are only assigned inside the first try block; every
// path that skips the assignment returns before they are read
bool overwrite_checksum;
uint64_t source_size;
eos::IFileMD::LocationVector src_locations;
eos::IFileMD::LocationVector src_unlink_loc;
gOFS->MgmStats.Add("ConversionJobStarted", 0, 0, 1);
eos_static_debug("msg=\"starting conversion job\" conversion_id=%s",
mConversionInfo.ToString().c_str());
// Avoid running cancelled jobs
if (mProgressHandler.ShouldCancel(0)) {
HandleError("conversion job cancelled before start");
return;
}
mStatus.store(Status::RUNNING, std::memory_order_relaxed);
// Retrieve file metadata
// The namespace read lock is held only for the duration of this try block
try {
eos::common::RWMutexReadLock ns_rd_lock(gOFS->eosViewRWMutex);
auto fmd = gOFS->eosFileService->getFileMD(mConversionInfo.mFid);
mSourcePath = gOFS->eosView->getUri(fmd.get());
source_size = fmd->getSize();
src_locations = fmd->getLocations();
src_unlink_loc = fmd->getUnlinkedLocations();
eos::appendChecksumOnStringAsHex(fmd.get(), source_xs);
// Check if conversion requests a checksum rewrite
// i.e. the checksum string of the current layout differs from the one of
// the target layout
std::string file_checksum = LayoutId::GetChecksumString(fmd->getLayoutId());
std::string conversion_checksum =
LayoutId::GetChecksumString(mConversionInfo.mLid);
overwrite_checksum = (file_checksum != conversion_checksum);
} catch (eos::MDException& e) {
HandleError("failed to retrieve file metadata",
SSTR("fxid=" << FileId::Fid2Hex(mConversionInfo.mFid)
<< " ec=" << e.getErrno()
<< " emsg=\"" << e.getMessage().str() << "\""));
return;
}
// Fall back to EOS_APP_NAME when the conversion carries no application tag
const std::string& app_tag = mConversionInfo.mAppTag.empty() ? EOS_APP_NAME :
mConversionInfo.mAppTag;
// Construct destination CGI
std::ostringstream dst_cgi;
dst_cgi << "&eos.ruid=" << DAEMONUID << "&eos.rgid=" << DAEMONGID
<< "&" << ConversionCGI(mConversionInfo)
<< "&eos.app=" << app_tag
<< "&eos.targetsize=" << source_size;
// Forward the source checksum only if there is one and the checksum type is
// not changed by the conversion
if (source_xs.size() && !overwrite_checksum) {
dst_cgi << "&eos.checksum=" << source_xs;
}
// Add the list of file systems to exclude for the new entry
// Both the current and the unlinked locations of the source are listed
std::string exclude_fsids = "&eos.excludefsid=";
for (const auto& fsid : src_locations) {
exclude_fsids += std::to_string(fsid);
exclude_fsids += ",";
}
for (const auto& fsid : src_unlink_loc) {
exclude_fsids += std::to_string(fsid);
exclude_fsids += ",";
}
// Drop the trailing separator; the string is never empty at this point
// since it always starts with the parameter name
if (*exclude_fsids.rbegin() == ',') {
exclude_fsids.pop_back();
}
dst_cgi << exclude_fsids;
// Prepare the TPC job
// The source is read with ruid/rgid 0, the destination is the conversion entry
XrdCl::URL url_src = NewUrl();
std::string url_params = "eos.ruid=0&eos.rgid=0&eos.app=" + app_tag;
url_src.SetParams(url_params);
url_src.SetPath(mSourcePath);
XrdCl::URL url_dst = NewUrl();
url_dst.SetParams(dst_cgi.str());
url_dst.SetPath(mConversionPath);
// NOTE(review): the helpers are not referenced again below; presumably they
// adjust the connection id of each URL for their lifetime - confirm
eos::common::XrdConnIdHelper src_id_helper(gOFS->mXrdConnPool, url_src);
eos::common::XrdConnIdHelper dst_id_helper(gOFS->mXrdConnPool, url_dst);
XrdCl::PropertyList properties = TpcProperties(source_size);
properties.Set("source", url_src);
properties.Set("target", url_dst);
// Create the TPC job
XrdCl::PropertyList result;
XrdCl::CopyProcess copy;
copy.AddJob(properties, &result);
XrdCl::XRootDStatus prepare_status = copy.Prepare();
eos_static_info("[tpc]: %s@%s => %s@%s prepare_msg=%s",
url_src.GetHostId().c_str(), url_src.GetLocation().c_str(),
url_dst.GetHostId().c_str(), url_dst.GetLocation().c_str(),
prepare_status.ToStr().c_str());
// Check the TPC prepare status
if (!prepare_status.IsOK()) {
HandleError("prepare conversion failed");
return;
}
// Trigger the TPC job
// mProgressHandler is handed over as the progress callback of the copy
XrdCl::XRootDStatus tpc_status = copy.Run(&mProgressHandler);
if (!tpc_status.IsOK()) {
HandleError(tpc_status.ToStr(),
SSTR("tpc_src=" << url_src.GetLocation()
<< " tpc_dst=" << url_dst.GetLocation()));
return;
}
eos_static_info("[tpc]: %s => %s status=success tpc_msg=%s",
url_src.GetLocation().c_str(), url_dst.GetLocation().c_str(),
tpc_status.ToStr().c_str());
// TPC job succeeded:
// - Verify new file has all fragments according to layout
// - Verify initial file hasn't changed
// - Merge the conversion entry
// Verify new file has all fragments according to layout
try {
eos::common::RWMutexReadLock ns_rd_lock(gOFS->eosViewRWMutex);
auto fmd = gOFS->eosView->getFile(mConversionPath);
// Expected number of locations is the layout stripe number plus one
size_t expected = LayoutId::GetStripeNumber(mConversionInfo.mLid) + 1;
size_t actual = fmd->getNumLocation();
if (expected != actual) {
HandleError("converted file replica number mismatch",
SSTR("expected=" << expected << " actual=" << actual));
return;
}
} catch (eos::MDException& e) {
HandleError("failed to retrieve converted file metadata",
SSTR("path=" << mConversionPath << " ec=" << e.getErrno()
<< " emsg=\"" << e.getMessage().str() << "\""));
return;
}
// Verify initial file hasn't changed
// A failure to re-read the source is only logged at debug level here; the
// comparison below then runs against an empty post-conversion checksum
try {
eos::Prefetcher::prefetchFileMDAndWait(gOFS->eosView, mConversionInfo.mFid);
eos::common::RWMutexReadLock ns_rd_lock(gOFS->eosViewRWMutex);
auto fmd = gOFS->eosFileService->getFileMD(mConversionInfo.mFid);
eos::appendChecksumOnStringAsHex(fmd.get(), source_xs_postconversion);
} catch (eos::MDException& e) {
eos_static_debug("msg=\"failed to retrieve file metadata\" fxid=%08llx "
"ec=%d emsg=\"%s\" conversion_id=%s", mConversionInfo.mFid,
e.getErrno(), e.getMessage().str().c_str(),
mConversionInfo.ToString().c_str());
}
if (source_xs != source_xs_postconversion) {
HandleError("file checksum changed during conversion",
SSTR("fxid=" << FileId::Fid2Hex(mConversionInfo.mFid)
<< " initial_xs=" << source_xs << " final_xs="
<< source_xs_postconversion));
return;
}
// Error object and root identity used by the QoS / attribute calls below
XrdOucErrInfo error;
eos::common::VirtualIdentity rootvid = eos::common::VirtualIdentity::Root();
// Merge the conversion entry
if (!Merge()) {
HandleError("failed to merge conversion entry",
SSTR("path=" << mSourcePath << " converted_path="
<< mConversionPath));
return;
}
// Finalize QoS transition
// A non-zero return value of the gOFS calls below is treated as a failure
XrdOucString target_qos;
XrdOucString current_qos;
if (gOFS->_qos_get(mSourcePath.c_str(), error, rootvid,
"target_qos", target_qos)) {
HandleError("error retrieving target_qos", SSTR("path=" << mSourcePath
<< " emsg=\"" << error.getErrText() << "\""));
return;
}
// The literal "null" is treated as "no QoS transition pending"
if (target_qos != "null") {
if (gOFS->_qos_get(mSourcePath.c_str(), error, rootvid,
"current_qos", current_qos)) {
HandleError("error retrieving current_qos", SSTR("path=" << mSourcePath
<< " emsg=\"" << error.getErrText() << "\""));
return;
}
// The target QoS has been reached: drop the target attribute
if (target_qos == current_qos) {
if (gOFS->_attr_rem(mSourcePath.c_str(), error, rootvid,
(const char*) 0, "user.eos.qos.target")) {
HandleError("error removing target_qos", SSTR("path=" << mSourcePath
<< " emsg=\"" << error.getErrText() << "\""));
return;
}
}
}
gOFS->MgmStats.Add("ConversionJobSuccessful", 0, 0, 1);
eos_static_info("msg=\"conversion successful\" conversion_id=%s",
mConversionInfo.ToString().c_str());
// The job is reported as DONE before the best-effort notification below
mStatus.store(Status::DONE, std::memory_order_relaxed);
// Notify the tape garbage collector if tape support is enabled
// Only files carrying the "sys.archive.file_id" attribute are reported
if (gOFS->mTapeEnabled) {
try {
eos::common::RWMutexReadLock fs_rd_lock(FsView::gFsView.ViewMutex);
eos::common::RWMutexReadLock ns_rd_lock(gOFS->eosViewRWMutex);
const auto fmd = gOFS->eosView->getFile(mSourcePath);
if (nullptr != fmd && fmd->hasAttribute("sys.archive.file_id")) {
const auto fsId = getDiskFsIdOfFile(*fmd);
const std::string tgcSpace = FsView::gFsView.mIdView.lookupSpaceByID(fsId);
gOFS->mTapeGc->fileConverted(tgcSpace, fmd->getId());
}
} catch (...) {
// Ignore any garbage collection exceptions
}
}
return;
}
//------------------------------------------------------------------------------
// Record a job failure.
//
// Bumps the "ConversionJobFailed" counter, writes an error log line, stores
// "<ISO8601 timestamp> | <emsg>[ | <details>]" in mErrorString and finally
// flags the job as FAILED.
//
// @param emsg    short description of the failure
// @param details additional context; appended to the stored message only when
//                not empty
//------------------------------------------------------------------------------
void ConversionJob::HandleError(const std::string& emsg,
                                const std::string& details)
{
  gOFS->MgmStats.Add("ConversionJobFailed", 0, 0, 1);
  eos_static_err("msg=\"%s\" %s conversion_id=%s", emsg.c_str(), details.c_str(),
                 mConversionInfo.ToString().c_str());
  // Prefix the stored message with the current wall-clock time
  const std::time_t failure_time =
    std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
  std::string report =
    eos::common::Timing::UnixTimestamp_to_ISO8601(failure_time);
  report += " | ";
  report += emsg;

  if (!details.empty()) {
    report += " | ";
    report += details;
  }

  mErrorString = report;
  mStatus.store(Status::FAILED, std::memory_order_relaxed);
}
//------------------------------------------------------------------------------
// Construct the opaque CGI string describing the target of a conversion.
//
// The layout related keys are derived from info.mLid, the space from
// info.mLocation. "eos.group" and "eos.placementpolicy" are optional.
//
// @param info conversion description
//
// @return "key=value" pairs joined by '&', without a leading '&'
//------------------------------------------------------------------------------
std::string ConversionJob::ConversionCGI(const ConversionInfo& info) const
{
  using eos::common::LayoutId;
  const auto& target_lid = info.mLid;
  const auto& target_location = info.mLocation;
  std::ostringstream opaque;
  opaque << "eos.layout.type=" << LayoutId::GetLayoutTypeString(target_lid);
  opaque << "&eos.layout.nstripes="
         << LayoutId::GetStripeNumberString(target_lid);
  opaque << "&eos.layout.blockchecksum="
         << LayoutId::GetBlockChecksumString(target_lid);
  opaque << "&eos.layout.checksum=" << LayoutId::GetChecksumString(target_lid);
  opaque << "&eos.layout.blocksize="
         << LayoutId::GetBlockSizeString(target_lid);
  opaque << "&eos.space=" << target_location.getSpace();

  // Append the scheduling group only if present explicitly, i.e. when the
  // group differs from the space
  if (target_location.getSpace() != target_location.getGroup()) {
    opaque << "&eos.group=" << target_location.getIndex();
  }

  // The placement policy is optional as well
  if (!info.mPlctPolicy.empty()) {
    opaque << "&eos.placementpolicy=" << info.mPlctPolicy;
  }

  return opaque.str();
}
//------------------------------------------------------------------------------
// Merge original and the newly converted one so that the initial file
// identifier and all the rest of the metadata information is preserved.
// Steps for a successful conversion
// 1. Update the new locations for original fid
// 2. Trigger FST rename of the physical files from conv_fid to fid
// 3. Unlink the locations for original fid
// 4. Update the layout information for original fid
// 5. Remove the conv_fid and FST local info
// 6. Trigger an MGM resync for the new location of fid
//------------------------------------------------------------------------------
bool
ConversionJob::Merge()
{
std::list conv_locations;
eos::IFileMD::id_t orig_fid {0ull}, conv_fid {0ull};
std::shared_ptr orig_fmd, conv_fmd;
bool has_quota = Quota::RemoveFile(mFid);
{
eos::common::RWMutexReadLock ns_rd_lock(gOFS->eosViewRWMutex);
try {
orig_fmd = gOFS->eosFileService->getFileMD(mFid);
conv_fmd = gOFS->eosView->getFile(mConversionPath);
} catch (const eos::MDException& e) {
eos_static_err("msg=\"failed to retrieve file metadata\" msg=\"%s\"",
e.what());
if (has_quota) {
ns_rd_lock.Release();
Quota::AddFile(mFid);
}
return false;
}
orig_fid = orig_fmd->getId();
conv_fid = conv_fmd->getId();
// Add the new locations
for (const auto& loc : conv_fmd->getLocations()) {
orig_fmd->addLocation(loc);
conv_locations.push_back(loc);
}
gOFS->eosView->updateFileStore(orig_fmd.get());
}
// For each location get the FST information and trigger a physical file
// rename from the conv_fmd(fid) to the orig_fmd(fid)
bool failed_rename = false;
std::string fst_host;
int fst_port;
for (const auto& loc : conv_locations) {
{
eos::common::RWMutexReadLock fs_rd_lock(FsView::gFsView.ViewMutex);
FileSystem* fs = FsView::gFsView.mIdView.lookupByID(loc);
if ((fs == nullptr) ||
(fs->GetStatus() != eos::common::BootStatus::kBooted) ||
(fs->GetConfigStatus() != eos::common::ConfigStatus::kRW)) {
eos_static_err("msg=\"file system config cannot accept conversion\" "
"fsid=%u", loc);
failed_rename = true;
break;
}
fst_host = fs->GetHost();
fst_port = fs->getCoreParams().getLocator().getPort();
}
std::ostringstream oss;
oss << "root://" << fst_host << ":" << fst_port << "/?xrd.wantprot=sss";
XrdCl::URL url(oss.str());
if (!url.IsValid()) {
eos_static_err("msg=\"invalid FST url\" url=\"%s\"", oss.str().c_str());
failed_rename = true;
break;
}
oss.str("");
// Build up the actual query string
oss << "/?fst.pcmd=local_rename"
<< "&fst.rename.ofid=" << eos::common::FileId::Fid2Hex(conv_fid)
<< "&fst.rename.nfid=" << eos::common::FileId::Fid2Hex(orig_fid)
<< "&fst.rename.fsid=" << loc
<< "&fst.nspath=" << mSourcePath;
uint16_t timeout = 10;
XrdCl::Buffer arg;
XrdCl::Buffer* response {nullptr};
XrdCl::FileSystem fs {url};
arg.FromString(oss.str());
XrdCl::XRootDStatus status = fs.Query(XrdCl::QueryCode::OpaqueFile, arg,
response, timeout);
if (!status.IsOK() || (response->ToString() != "OK")) {
eos_static_err("msg=\"failed local rename on file system\" "
"orig_fxid=%08llx conv_fxid=%08llx fsid=%u "
"status=%d err_msg=\"%s\" response=\"%s\"",
orig_fid, conv_fid, loc, status.IsOK(),
status.GetErrorMessage().c_str(),
response->ToString().c_str());
failed_rename = true;
delete response;
break;
}
delete response;
eos_static_debug("msg=\"successful rename on file system\" orig_fxid=%08llx "
"conv_fxid=%08llx fsid=%u", orig_fid, conv_fid, loc);
}
// Do cleanup in case of failures
if (failed_rename) {
// Update locations and clean up conversion file object
eos::common::RWMutexReadLock ns_rd_lock(gOFS->eosViewRWMutex);
try {
orig_fmd = gOFS->eosFileService->getFileMD(orig_fid);
} catch (const eos::MDException& e) {
eos_static_err("msg=\"failed to retrieve file metadata\" msg=\"%s\" "
"orig_fxid=%08llx", e.what(), orig_fid);
return false;
}
// Unlink all the newly added locations
for (const auto& loc : orig_fmd->getLocations()) {
if (std::find(conv_locations.begin(), conv_locations.end(), loc) !=
conv_locations.end()) {
orig_fmd->unlinkLocation(loc);
}
}
gOFS->eosView->updateFileStore(orig_fmd.get());
if (has_quota) {
ns_rd_lock.Release();
Quota::AddFile(mFid);
}
return false;
}
{
// Update locations and clean up conversion file object
eos::common::RWMutexReadLock ns_rd_lock(gOFS->eosViewRWMutex);
try {
orig_fmd = gOFS->eosFileService->getFileMD(orig_fid);
conv_fmd = gOFS->eosFileService->getFileMD(conv_fid);
} catch (const eos::MDException& e) {
eos_static_err("msg=\"failed to retrieve file metadata\" msg=\"%s\"",
e.what());
return false;
}
// Unlink the old locations from the original file object
for (const auto& loc : orig_fmd->getLocations()) {
if (loc == eos::common::TAPE_FS_ID) {
continue;
}
if (std::find(conv_locations.begin(), conv_locations.end(), loc) ==
conv_locations.end()) {
orig_fmd->unlinkLocation(loc);
}
}
// Update the new layout id
orig_fmd->setLayoutId(mConversionInfo.mLid);
// If requested then also update the ctime of the original file
if (mConversionInfo.mUpdateCtime) {
orig_fmd->setCTimeNow();
}
gOFS->eosView->updateFileStore(orig_fmd.get());
}
// Update quota node given the new possible layout
if (has_quota) {
Quota::AddFile(mFid);
}
// Trigger a resync of the local information for the new locations
for (const auto& loc : conv_locations) {
if (gOFS->QueryResync(orig_fid, loc, true)) {
eos_static_err("msg=\"failed to send resync\" fxid=%08llx fsid=%u",
orig_fid, loc);
}
}
return true;
}
EOSMGMNAMESPACE_END