// ----------------------------------------------------------------------
// File: Commit.cc
// Author: Andreas-Joachim Peters - CERN
// ----------------------------------------------------------------------
/************************************************************************
* EOS - the CERN Disk Storage System *
* Copyright (C) 2018 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see .*
************************************************************************/
#include "common/Logging.hh"
#include "common/LayoutId.hh"
#include "namespace/interface/IFileMD.hh"
#include "namespace/interface/IFileMDSvc.hh"
#include "namespace/interface/IView.hh"
#include "mgm/Stat.hh"
#include "mgm/XrdMgmOfs.hh"
#include "mgm/Macros.hh"
#include "mgm/tracker/ReplicationTracker.hh"
#include "mgm/XrdMgmOfs/fsctl/CommitHelper.hh"
#include "namespace/Prefetcher.hh"
#include
#include
//----------------------------------------------------------------------------
// Commit a replica
//----------------------------------------------------------------------------
int
XrdMgmOfs::Commit(const char* path,
const char* ininfo,
XrdOucEnv& env,
XrdOucErrInfo& error,
eos::common::VirtualIdentity& vid,
const XrdSecEntity* client)
{
static const char* epname = "Commit";
REQUIRE_SSS_OR_LOCAL_AUTH;
ACCESSMODE_W;
MAYSTALL;
MAYREDIRECT;
EXEC_TIMING_BEGIN("Commit");
// Checksum string
char binchecksum[SHA256_DIGEST_LENGTH];
// Process CGI parameters
CommitHelper::cgi_t cgi;
CommitHelper::grab_cgi(env, cgi);
// Initialize logging
if (cgi.count("logid")) {
tlLogId.SetLogId(cgi["logid"].c_str(), error.getErrUser());
}
// OC parameters
CommitHelper::param_t params;
params["oc_n"] = 0;
params["oc_max"] = 0;
// Selected options
CommitHelper::option_t option;
CommitHelper::set_options(option, cgi);
// Check 'path' parameter
CommitHelper::path_t paths;
paths["atomic"] = std::string("");
if (cgi.count("path")) {
paths["commit"] = cgi["path"];
}
// Extract all OC upload relevant parameters
CommitHelper::init_oc(env, cgi, option, params);
if (CommitHelper::is_reconstruction(option)) {
// Remove checksum in case of a chunk reconstruction
// (they have to be ignored)
cgi["checksum"] = "";
}
if (cgi["checksum"].length()) {
// Compute binary checksum
CommitHelper::hex2bin_checksum(cgi["checksum"], binchecksum);
}
// Check all commit required parameters are defined
if (CommitHelper::check_commit_params(cgi)) {
// Convert the main CGI parameters into numbers
unsigned long long size = std::stoull(cgi["size"]);
unsigned long long fid = strtoull(cgi["fid"].c_str(), 0, 16);
unsigned long fsid = std::stoul(cgi["fsid"]);
unsigned long mtime = std::stoul(cgi["mtime"]);
unsigned long mtimens = std::stoul(cgi["mtimensec"]);
std::string emsg;
CommitHelper::log_info(vid, tlLogId, cgi, option, params);
int rc = CommitHelper::check_filesystem(vid, fsid, cgi, option,
params, emsg);
if (rc) {
return Emsg(epname, error, rc, emsg.c_str(), "");
}
// Create a checksum buffer object
eos::Buffer checksumbuffer;
checksumbuffer.putData(binchecksum, SHA256_DIGEST_LENGTH);
// Attempt file meta data retrieval
std::shared_ptr fmd;
eos::IContainerMD::id_t cid = 0;
std::string fmdname;
{
eos::Prefetcher::prefetchFileMDAndWait(gOFS->eosView, fid);
// Keep the lock order View => Namespace => Quota
eos::common::RWMutexWriteLock ns_wr_lock(gOFS->eosViewRWMutex);
errno = 0;
try {
fmd = gOFS->eosFileService->getFileMD(fid);
} catch (eos::MDException& e) {
errno = e.getErrno();
eos_thread_debug("msg=\"exception\" ec=%d emsg=\"%s\"",
e.getErrno(), e.getMessage().str().c_str());
emsg = "retc=";
emsg += e.getErrno();
emsg += " msg=";
emsg += e.getMessage().str().c_str();
}
if (!fmd) {
if (errno == ENOENT) {
return Emsg(epname, error, ENOENT,
"commit filesize change - file is already removed [EIDRM]", "");
}
emsg.insert(0, "commit filesize change [EIO]");
return Emsg(epname, error, errno, emsg.c_str(), cgi["path"].c_str());
}
unsigned long lid = fmd->getLayoutId();
// Check if fsid and fid are ok
if (fmd->getId() != fid) {
eos_thread_notice("commit for fxid=%08llx != fmd_fxid=%08llx",
fid, fmd->getId());
gOFS->MgmStats.Add("CommitFailedFid", 0, 0, 1);
return Emsg(epname, error, EINVAL,
"commit filesize change - file id is wrong [EINVAL]",
cgi["path"].c_str());
}
// Check if file is already unlinked from the visible namespace
if (!(cid = fmd->getContainerId())) {
eos_thread_debug("commit for fxid=%08llx but file is disconnected "
"from any container", fmd->getId());
gOFS->MgmStats.Add("CommitFailedUnlinked", 0, 0, 1);
return Emsg(epname, error, EIDRM,
"commit filesize change - file is already removed [EIDRM]", "");
}
// Check if we have this replica in the unlink list or not linked list, if yes, the commit has to be suppressed
if (option["fusex"] &&
(fmd->hasUnlinkedLocation((unsigned int) fsid) ||
(!fmd->hasLocation((unsigned int) fsid)))) {
eos_thread_err("suppressing possible recovery replica for fxid=%08llx "
"on unlinked/not linked fsid=%llu - rejecting replica",
fmd->getId(), fsid);
// This happens when a FUSEX recovery has been triggered.
// To avoid to reattach replicas, we clean them up here
return Emsg(epname, error, EBADE,
"commit replica - file size is wrong [EBADE] "
"- suppressing recovery replica", "");
}
// Check if commit comes from a replication procedure
// and if the size/checksum is ok
if (option["replication"]) {
CommitHelper::remove_scheduler(fid);
if (eos::common::LayoutId::GetLayoutType(lid) ==
eos::common::LayoutId::kReplica) {
// We check filesize and the checksum only for replica layouts
eos_thread_debug("fmd_size=%llu, size=%lli", fmd->getSize(), size);
// Validate size parameters
if (!CommitHelper::validate_size(vid, fmd, fsid, size, option)) {
return Emsg(epname, error, EBADE,
"commit replica - file size is wrong [EBADE]", "");
}
// Validate checksum parameters
if (option["verifychecksum"] &&
!CommitHelper::validate_checksum(vid, fmd, checksumbuffer,
fsid, option)) {
return Emsg(epname, error, EBADR,
"commit replica - file checksum is wrong [EBADR]", "");
}
}
}
if (option["verifysize"]) {
// Check if a file size change was detected
if (fmd->getSize() != size) {
eos_thread_err("commit for fxid=%08llx gave a file size change after "
"verification on fsid=%llu", fmd->getId(), fsid);
}
}
if (option["verifychecksum"]) {
CommitHelper::log_verifychecksum(vid, fmd, checksumbuffer, fsid,
cgi, option);
}
if (!CommitHelper::handle_location(vid, cid, fmd, fsid, size,
cgi, option)) {
return Emsg(epname, error, EIDRM,
"commit file, parent container removed [EIDRM]", "");
}
if (option["fusex"]) {
std::string fusexstate;
try {
fusexstate = fmd->getAttribute("sys.fusex.state");
} catch (...) {}
if (option["update"] || option["replication"]) {
fusexstate += "+";
fusexstate += std::to_string(fsid);
}
if (eos::common::LayoutId::GetChecksum(lid) != eos::common::LayoutId::kNone) {
if (option["commitsize"]) {
fusexstate += "s|";
}
} else {
if (option["commitsize"]) {
fusexstate += "s";
}
}
if (option["commitchecksum"]) {
fusexstate += "c|";
}
if (option["verifychecksum"]) {
fusexstate += "v|";
}
if (option["verifysize"]) {
fusexstate += "V|";
}
fmd->setAttribute("sys.fusex.state", fusexstate);
}
// Advance oc upload parameters if concerned
CommitHelper::handle_occhunk(vid, fmd, option, params);
// Set checksum if concerned
CommitHelper::handle_checksum(vid, fmd, option, checksumbuffer);
fmdname = fmd->getName();
paths["atomic"].Init(fmdname.c_str());
paths["atomic"].DecodeAtomicPath(option["versioning"]);
option["atomic"] = (paths["atomic"].GetName() != fmdname);
if (option["commitverify"]) {
// disable atomic and versioning functionality for commits originated by "verify --commitxyz"
option["atomic"] = false;
option["versioning"] = false;
}
if (option["update"] && mtime) {
// Update the modification time only if the file contents changed and
// mtime != 0
// - FUSE clients will commit mtime=0 to indicate they call utimes anyway
// - OC clients set the mtime during a commit
if (!option["atomic"] || option["occhunk"]) {
eos::IFileMD::ctime_t mt;
mt.tv_sec = mtime;
mt.tv_nsec = mtimens;
fmd->setMTime(mt);
}
}
eos_thread_debug("commit: setting size to %llu", fmd->getSize());
eos::ContainerIdentifier p_ident;
if (!CommitHelper::commit_fmd(vid, cid, fmd, size, option, emsg, p_ident)) {
return Emsg(epname, error, errno, "commit filesize change", emsg.c_str());
}
eos::FileIdentifier f_ident = fmd->getIdentifier();
eos::ContainerIdentifier c_ident = eos::ContainerIdentifier(
fmd->getContainerId());
ns_wr_lock.Release();
if (option["update"]) {
// broadcast file md
gOFS->FuseXCastRefresh(f_ident,
c_ident);
// Broadcast to the fusex network only if the change has been
// triggered outside the fusex client network e.g. xrdcp etc.
if (!option["fusex"]) {
gOFS->FuseXCastRefresh(c_ident, p_ident);
}
}
}
{
eos::common::VirtualIdentity rootvid = eos::common::VirtualIdentity::Root();
// Path of a previous version existing before an atomic/versioning upload
std::string delete_path = "";
eos_thread_info("commitsize=%d n1=%s n2=%s occhunk=%d ocdone=%d",
option["commitsize"],
fmdname.c_str(), paths["atomic"].GetName(),
option["occhunk"], option["ocdone"]);
// -----------------------------------------------------------------------
// We are asked to commit the size and this commit changes the current
// atomic name to the final name and we are not an OC upload
// -----------------------------------------------------------------------
if ((option["commitsize"]) && (fmdname != paths["atomic"].GetName()) &&
(!option["occhunk"] || option["ocdone"])) {
eos_thread_info("commit: de-atomize file %s => %s",
fmdname.c_str(), paths["atomic"].GetName());
unsigned long long vfid =
CommitHelper::get_version_fid(vid, fid, paths, option);
// Check for versioning request
if (option["versioning"]) {
eos_static_info("checked %s%s vfxid=%08llx",
paths["versiondir"].GetParentPath(),
paths["atomic"].GetPath(), vfid);
// We purged the versions before during open, so we just simulate
// a new one and do the final rename in a transaction
if (vfid) {
XrdOucString versionedname = "";
if (gOFS->Version(vfid, error, rootvid, 0xffff, &versionedname, true)) {
eos_static_crit("versioning failed %s/%s vfxid=%08lxx",
paths["versiondir"].GetParentPath(),
paths["atomic"].GetPath(), vfid);
const char* errmsg = "commit - versioning failed";
return Emsg(epname, error, EREMCHG, errmsg, paths["atomic"].GetName());
} else {
paths["version"].Init(versionedname.c_str());
}
}
}
CommitHelper::handle_versioning(vid, fid, paths,
option, delete_path);
}
gOFS->mReplicationTracker->Commit(fmd);
// -----------------------------------------------------------------------
// If there was a previous target file we have to delete the renamed
// atomic left-over
// -----------------------------------------------------------------------
if (delete_path.length()) {
delete_path.insert(0, paths["versiondir"].GetParentPath());
eos_thread_info("msg=\"delete path\" path=%s", delete_path.c_str());
if (gOFS->_rem(delete_path.c_str(), error, rootvid, "")) {
eos_thread_err("msg=\"failed to remove atomic left-over\" path=%s",
delete_path.c_str());
}
}
if (option["abort"]) {
return Emsg(epname, error, EREMCHG, "commit replica - overlapping "
"atomic upload - discarding atomic upload [EREMCHG]", "");
}
}
} else {
int envlen = 0;
eos_thread_err("commit message does not contain all meta information: %s",
env.Env(envlen));
gOFS->MgmStats.Add("CommitFailedParameters", 0, 0, 1);
const char* errtarget = "unknown";
const char* errmsg =
"commit filesize change - size, fid, fsid, mtime, path not complete";
if (cgi.count("path")) {
errmsg = "commit filesize change - size, fid, fsid, mtime not complete";
errtarget = cgi["path"].c_str();
}
return Emsg(epname, error, EINVAL, errmsg, errtarget);
}
gOFS->MgmStats.Add("Commit", 0, 0, 1);
const char* ok = "OK";
error.setErrInfo(strlen(ok) + 1, ok);
EXEC_TIMING_END("Commit");
return SFS_DATA;
}