// ---------------------------------------------------------------------- // File: CommitHelper.cc // Author: Andreas-Joachim Peters - CERN // ---------------------------------------------------------------------- /************************************************************************ * EOS - the CERN Disk Storage System * * Copyright (C) 2018 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #include "mgm/XrdMgmOfs/fsctl/CommitHelper.hh" #include "mgm/XrdMgmOfs.hh" #include "mgm/FsView.hh" #include "mgm/Stat.hh" #include "mgm/FuseServer/FusexCastBatch.hh" #include "common/http/OwnCloud.hh" #include "common/LayoutId.hh" #include "namespace/interface/IQuota.hh" #include "namespace/interface/IView.hh" #include "namespace/Prefetcher.hh" EOSMGMNAMESPACE_BEGIN // Initialize static variables thread_local eos::common::LogId CommitHelper::tlLogId; //------------------------------------------------------------------------------ // convert hex to binary checksum //------------------------------------------------------------------------------ void CommitHelper::hex2bin_checksum(std::string& checksum, char* binchecksum) { // hex2binary conversion memset(binchecksum, 0, 32); for (unsigned int i = 0; i < checksum.length(); i += 2) { char hex[3]; hex[0] = checksum.at(i); hex[1] = checksum.at(i + 1); hex[2] = 0; binchecksum[i / 2] = strtol(hex, 0, 16); } } //------------------------------------------------------------------------------ // check if the filesystem to commit to is writable state //------------------------------------------------------------------------------ int CommitHelper::check_filesystem(eos::common::VirtualIdentity& vid, unsigned long fsid, CommitHelper::cgi_t& cgi, CommitHelper::option_t& option, CommitHelper::param_t& params, std::string& emsg) { // Check that the file system is still allowed to accept replica's eos::common::RWMutexReadLock vlock(FsView::gFsView.ViewMutex); eos::mgm::FileSystem* fs = FsView::gFsView.mIdView.lookupByID(fsid); if ((!fs) || (fs->GetConfigStatus() < eos::common::ConfigStatus::kDrain)) { eos_thread_err("msg=\"commit suppressed\" configstatus=%s subcmd=commit " "path=%s size=%s fxid=%s fsid=%s dropfsid=%s checksum=%s" " mtime=%s mtime.nsec=%s oc-chunk=%d oc-n=%d oc-max=%d " "oc-uuid=%s", (fs ? eos::common::FileSystem::GetConfigStatusAsString( fs->GetConfigStatus()) : "deleted"), cgi["path"].c_str(), cgi["size"].c_str(), cgi["fid"].c_str(), cgi["fsid"].c_str(), cgi["dropfsid"].c_str(), cgi["checksum"].c_str(), cgi["mtime"].c_str(), cgi["mtimensec"].c_str(), option["occhunk"], params["oc_n"], params["oc_max"], cgi["oc_uuid"].c_str()); emsg = "commit file metadata - " "filesystem is in non-operational state [EIO]"; return EIO; } return 0; } //------------------------------------------------------------------------------ // extract all CGI KV pairs //------------------------------------------------------------------------------ void CommitHelper::grab_cgi(XrdOucEnv& env, CommitHelper::cgi_t& cgi) { if (env.Get("mgm.size")) { cgi["size"] = env.Get("mgm.size"); } if (env.Get("mgm.path")) { cgi["path"] = env.Get("mgm.path"); } if (env.Get("mgm.fid")) { cgi["fid"] = env.Get("mgm.fid"); } if (env.Get("mgm.add.fsid")) { cgi["fsid"] = env.Get("mgm.add.fsid"); } if (env.Get("mgm.mtime")) { cgi["mtime"] = env.Get("mgm.mtime"); } if (env.Get("mgm.mtime_ns")) { cgi["mtimensec"] = env.Get("mgm.mtime_ns"); } if (env.Get("mgm.logid")) { cgi["logid"] = env.Get("mgm.logid"); } if (env.Get("mgm.verify.checksum")) { cgi["verifychecksum"] = env.Get("mgm.verify.checksum"); } if (env.Get("mgm.commit.checksum")) { cgi["commitchecksum"] = env.Get("mgm.commit.checksum"); } if (env.Get("mgm.commit.verify")) { cgi["commitverify"] = env.Get("mgm.commit.verify"); } if (env.Get("mgm.verify.size")) { cgi["verifysize"] = env.Get("mgm.verify.size"); } if (env.Get("mgm.commit.size")) { cgi["commitsize"] = env.Get("mgm.commit.size"); } if (env.Get("mgm.drop.fsid")) { cgi["dropfsid"] = env.Get("mgm.drop.fsid"); } if (env.Get("mgm.replication")) { cgi["replication"] = env.Get("mgm.replication"); } if (env.Get("mgm.reconstruction")) { cgi["reconstruction"] = env.Get("mgm.reconstruction"); } if (env.Get("mgm.modified")) { cgi["ismodified"] = env.Get("mgm.modified"); } if (env.Get("mgm.fusex")) { cgi["fusex"] = env.Get("mgm.fusex"); } if (env.Get("mgm.checksum")) { cgi["checksum"] = env.Get("mgm.checksum"); } } //------------------------------------------------------------------------------ // log commit information //------------------------------------------------------------------------------ void CommitHelper::log_info(eos::common::VirtualIdentity& vid, const eos::common::LogId& thread_logid, CommitHelper::cgi_t& cgi, CommitHelper::option_t& option, CommitHelper::param_t& params) { // Set the thread local variable that will be used when calling eos_thread_* tlLogId = thread_logid; if (cgi["checksum"].length()) { eos_thread_info("subcmd=commit path=%s size=%s fxid=%s fsid=%s dropfsid=%s " "checksum=%s mtime=%s mtime.nsec=%s oc-chunk=%d oc-n=%d " "oc-max=%d oc-uuid=%s", cgi["path"].c_str(), cgi["size"].c_str(), cgi["fid"].c_str(), cgi["fsid"].c_str(), cgi["dropfsid"].c_str(), cgi["checksum"].c_str(), cgi["mtime"].c_str(), cgi["mtimensec"].c_str(), option["occhunk"], params["oc_n"], params["oc_max"], cgi["ocuuid"].c_str()); } else { eos_thread_info("subcmd=commit path=%s size=%s fxid=%s fsid=%s dropfsid=%s " "mtime=%s mtime.nsec=%s oc-chunk=%d oc-n=%d " "oc-max=%d oc-uuid=%s", cgi["path"].c_str(), cgi["size"].c_str(), cgi["fid"].c_str(), cgi["fsid"].c_str(), cgi["dropfsid"].c_str(), cgi["mtime"].c_str(), cgi["mtimensec"].c_str(), option["occhunk"], params["oc_n"], params["oc_max"], cgi["ocuuid"].c_str()); } } //------------------------------------------------------------------------------ // extract options from CGI //------------------------------------------------------------------------------ void CommitHelper::set_options(CommitHelper::option_t& option, CommitHelper::cgi_t& cgi) { option["verifychecksum"] = (cgi["verifychecksum"] == "1"); option["commitchecksum"] = (cgi["commitchecksum"] == "1"); option["commitsize"] = (cgi["commitsize"] == "1"); option["commitverify"] = (cgi["commitverify"] == "1"); option["verifysize"] = (cgi["verifysize"] == "1"); option["replication"] = (cgi["replication"] == "1"); option["reconstruction"] = (cgi["reconstruction"] == "1"); option["modified"] = (cgi["ismodified"] == "1"); option["fusex"] = (cgi["fusex"] == "1"); option["abort"] = false; // indicate to abort a commit option["versioning"] = false; // indicate versioning option["atomic"] = false; // indicate an atomic upload option["occhunk"] = false; // indicate owncloud chunking option["ocdone"] = false; // indicate when the last chunk of a chunked OC upload has been committed } //------------------------------------------------------------------------------ // initialize OC commit parameters //------------------------------------------------------------------------------ void CommitHelper::init_oc(XrdOucEnv& env, CommitHelper::cgi_t& cgi, CommitHelper::option_t& option, CommitHelper::param_t& params) { int envlen; int oc_n = 0; int oc_max = 0; XrdOucString oc_uuid = ""; option["occhunk"] = eos::common::OwnCloud::GetChunkInfo(env.Env(envlen), oc_n, oc_max, oc_uuid); cgi["ocuuid"] = oc_uuid.c_str(); params["oc_n"] = oc_n; params["oc_max"] = oc_max; } //------------------------------------------------------------------------------ // check for a reconstruction commit //------------------------------------------------------------------------------ bool CommitHelper::is_reconstruction(CommitHelper::option_t& option) { if (option["reconstruction"]) { option["verifysize"] = false; option["verifychecksum"] = false; option["commitsize"] = false; option["commitchecksum"] = false; option["commitverify"] = false; option["replication"] = false; return true; } return false; } //------------------------------------------------------------------------------ // check proper commit parameter //------------------------------------------------------------------------------ bool CommitHelper::check_commit_params(CommitHelper::cgi_t& cgi) { if (cgi["size"].length() && cgi["fid"].length() && cgi["path"].length() && cgi["fsid"].length() && cgi["mtime"].length() && cgi["mtimensec"].length()) { return true; } else { return false; } } //------------------------------------------------------------------------------ // Remove fid from the trakced maps //------------------------------------------------------------------------------ void CommitHelper::remove_scheduler(unsigned long long fid) { // Remove tracked entry from balancing gOFS->mFidTracker.RemoveEntry(fid); } //------------------------------------------------------------------------------ // validate the given size information //------------------------------------------------------------------------------ bool CommitHelper::validate_size(eos::common::VirtualIdentity& vid, std::shared_ptr fmd, unsigned long fsid, unsigned long long size, CommitHelper::option_t& option) { if (fmd->getSize() != size) { eos_thread_err("replication for fxid=%08llx resulted in a different file " "size on fsid=%llu - %llu vs %llu - rejecting replica", fmd->getId(), fsid, fmd->getSize(), size); gOFS->MgmStats.Add("ReplicaFailedSize", 0, 0, 1); // ----------------------------------------------------------- // if we come via FUSE, we have to remove this replica // ----------------------------------------------------------- if (option["fusex"]) { if (fmd->hasLocation((unsigned short) fsid)) { fmd->unlinkLocation((unsigned short) fsid); fmd->removeLocation((unsigned short) fsid); try { gOFS->eosView->updateFileStore(fmd.get()); } catch (eos::MDException& e) { errno = e.getErrno(); std::string errmsg = e.getMessage().str(); eos_thread_crit("msg=\"exception\" ec=%d emsg=\"%s\"\n", e.getErrno(), e.getMessage().str().c_str()); } } } return false; } return true; } //------------------------------------------------------------------------------ // validate the given checksum //------------------------------------------------------------------------------ bool CommitHelper::validate_checksum(eos::common::VirtualIdentity& vid, std::shared_ptr fmd, eos::Buffer& checksumbuffer, unsigned long long fsid, CommitHelper::option_t& option) { bool cxError = false; size_t cxlen = eos::common::LayoutId::GetChecksumLen(fmd->getLayoutId()); for (size_t i = 0; i < cxlen; i++) { if (fmd->getChecksum().getDataPadded(i) != checksumbuffer.getDataPadded(i)) { cxError = true; } } if (cxError) { eos_thread_err("replication for fxid=%08llx resulted in a different checksum " "on fsid=%llu - rejecting replica", fmd->getId(), fsid); gOFS->MgmStats.Add("ReplicaFailedChecksum", 0, 0, 1); // ----------------------------------------------------------- // if we don't come via FUSEX, we have to remove this replica // ----------------------------------------------------------- if (!option["fusex"]) { if (fmd->hasLocation((unsigned short) fsid)) { fmd->unlinkLocation((unsigned short) fsid); fmd->removeLocation((unsigned short) fsid); eos_thread_err("replication for fxid=%08llx resulted in a different checksum " "on fsid=%llu - dropping replica", fmd->getId(), fsid); try { gOFS->eosView->updateFileStore(fmd.get()); } catch (eos::MDException& e) { errno = e.getErrno(); std::string errmsg = e.getMessage().str(); eos_thread_crit("msg=\"exception\" ec=%d emsg=\"%s\"\n", e.getErrno(), e.getMessage().str().c_str()); } } } return false; } return true; } //------------------------------------------------------------------------------ // log checksum verification //------------------------------------------------------------------------------ void CommitHelper::log_verifychecksum(eos::common::VirtualIdentity& vid, std::shared_ptrfmd, eos::Buffer& checksumbuffer, unsigned long fsid, CommitHelper::cgi_t& cgi, CommitHelper::option_t& option) { if (cgi["checksum"].length()) { if (option["verifychecksum"]) { bool cxError = false; size_t cxlen = eos::common::LayoutId::GetChecksumLen(fmd->getLayoutId()); for (size_t i = 0; i < cxlen; i++) { if (fmd->getChecksum().getDataPadded(i) != checksumbuffer.getDataPadded(i)) { cxError = true; } } if (cxError) { eos_thread_err("commit for fxid=%08llx gave a different checksum after " "verification on fsid=%llu", fmd->getId(), fsid); } } } } //------------------------------------------------------------------------------ // handle replica location updates //------------------------------------------------------------------------------ bool CommitHelper::handle_location(eos::common::VirtualIdentity& vid, unsigned long cid, std::shared_ptr fmd, unsigned long fsid, unsigned long long size, CommitHelper::cgi_t& cgi, CommitHelper::option_t& option) { // For changing the modification time we have to figure out if we // just attach a new replica or if we have a change of the contents std::shared_ptr dir; try { dir = gOFS->eosDirectoryService->getContainerMD(cid); } catch (eos::MDException& e) { eos_thread_err("parent_id=%llu not found", cid); gOFS->MgmStats.Add("CommitFailedUnlinked", 0, 0, 1); return false; } eos::IQuotaNode* ns_quota = nullptr; try { ns_quota = gOFS->eosView->getQuotaNode(dir.get()); } catch (const eos::MDException& e) { // empty } // Free previous quota if (ns_quota) { ns_quota->removeFile(fmd.get()); } std::string locations; try { locations = fmd->getAttribute("sys.fs.tracking"); } catch (...) {} if (!fmd->hasLocation(fsid)) { locations += "+"; locations += std::to_string(fsid); } fmd->addLocation(fsid); // If fsid is in the deletion list, we try to remove it if there // is something in the deletion list if (fmd->getNumUnlinkedLocation()) { fmd->removeLocation(fsid); } if (cgi["dropfsid"].length()) { std::vector fsid_tokens; eos::common::StringConversion::Tokenize(cgi["dropfsid"], fsid_tokens, ","); for (auto id : fsid_tokens) { unsigned long drop_fsid = std::stoul(id); eos_thread_info("commit: dropping replica on fs %lu", drop_fsid); fmd->unlinkLocation((unsigned short)drop_fsid); locations += "-"; locations += std::to_string(drop_fsid); } } std::string tracking = eos::common::StringConversion::ReduceString(locations); if (tracking.length()) { fmd->setAttribute("sys.fs.tracking", tracking.c_str()); } option["update"] = false; if (option["commitsize"]) { if ((fmd->getSize() != size) || option["modified"]) { eos_thread_debug("size difference forces mtime %lld %lld or " "ismodified=%d", fmd->getSize(), size, option.count("modified")); option["update"] = true; } fmd->setSize(size); } if (ns_quota) { ns_quota->addFile(fmd.get()); } return true; } //------------------------------------------------------------------------------ // handle OC chunk uploads //------------------------------------------------------------------------------ void CommitHelper::handle_occhunk(eos::common::VirtualIdentity& vid, std::shared_ptr& fmd, CommitHelper::option_t& option, CommitHelper::param_t& params) { if (option["occhunk"] && option["commitsize"]) { // store the index in flags; fmd->setFlags(params["oc_n"] + 1); eos_thread_info("subcmd=commit max-chunks=%d committed-chunks=%d", params["oc_max"], fmd->getFlags()); // The last chunk terminates all if (params["oc_max"] == (params["oc_n"] + 1)) { // we are done with chunked upload, remove the flags counter fmd->setFlags((S_IRWXU | S_IRWXG | S_IRWXO)); option["ocdone"] = true; } } } //------------------------------------------------------------------------------ // handle new checksums //------------------------------------------------------------------------------ void CommitHelper::handle_checksum(eos::common::VirtualIdentity& vid, std::shared_ptrfmd, CommitHelper::option_t& option, eos::Buffer& checksumbuffer) { if (option["commitchecksum"]) { if (!option["update"]) { for (int i = 0; i < SHA256_DIGEST_LENGTH; i++) { if (fmd->getChecksum().getDataPadded(i) != checksumbuffer.getDataPadded(i)) { eos_thread_debug("checksum difference forces mtime"); option["update"] = true; } } } fmd->setChecksum(checksumbuffer); } } //------------------------------------------------------------------------------ // commit new file meta data //------------------------------------------------------------------------------ bool CommitHelper::commit_fmd(eos::common::VirtualIdentity& vid, unsigned long cid, std::shared_ptrfmd, unsigned long long replica_size, CommitHelper::option_t& option, std::string& errmsg, eos::ContainerIdentifier& p_ident ) { std::shared_ptr cmd; try { // check for a temporary Etag and remove it std::string tmpEtag = "sys.tmp.etag"; if (fmd->hasAttribute(tmpEtag) && (!option["atomic"] || option["occhunk"]) && (option["commitsize"] || option["commitchecksum"])) { // Drop the tmp etag only if this was not a creation of a 0-size file if ((fmd->getSize() != 0) || (replica_size != 0)) { fmd->removeAttribute(tmpEtag); } } gOFS->eosView->updateFileStore(fmd.get()); cmd = gOFS->eosDirectoryService->getContainerMD(cid); p_ident = cmd->getParentIdentifier(); if (option["update"]) { if (cmd->hasAttribute(tmpEtag)) { // Drop the tmp etag only if this was not a creation of a 0-size file if ((fmd->getSize() != 0) || (replica_size != 0)) { cmd->removeAttribute(tmpEtag); } } // update parent mtime and ctime cmd->setMTimeNow(); eos::IContainerMD::mtime_t new_mtime; cmd->getMTime(new_mtime); cmd->setCTime(new_mtime); gOFS->eosView->updateContainerStore(cmd.get()); cmd->notifyMTimeChange(gOFS->eosDirectoryService); } return true; } catch (eos::MDException& e) { errno = e.getErrno(); errmsg = e.getMessage().str(); eos_thread_debug("msg=\"exception\" ec=%d emsg=\"%s\"\n", e.getErrno(), e.getMessage().str().c_str()); gOFS->MgmStats.Add("CommitFailedNamespace", 0, 0, 1); return false; } } //------------------------------------------------------------------------------ // identify latest version file id //------------------------------------------------------------------------------ unsigned long long CommitHelper::get_version_fid(eos::common::VirtualIdentity& vid, unsigned long long fid, CommitHelper::path_t& paths, CommitHelper::option_t& option) { eos::Prefetcher::prefetchFileMDWithParentsAndWait(gOFS->eosView, fid); std::shared_ptr versionfmd; eos::common::RWMutexReadLock lock(gOFS->eosViewRWMutex); try { auto fmd = gOFS->eosFileService->getFileMD(fid); paths["versiondir"] = gOFS->eosView->getUri(fmd.get()); if (option["versioning"]) { versionfmd = gOFS->eosView->getFile(std::string( paths["versiondir"].GetParentPath()) + std::string(paths["atomic"].GetPath())); return versionfmd->getId(); } } catch (eos::MDException& e) { errno = e.getErrno(); eos_thread_debug("msg=\"exception\" ec=%d emsg=\"%s\"\n", e.getErrno(), e.getMessage().str().c_str()); } return 0; } //------------------------------------------------------------------------------ // handle creation of a new version during commit //------------------------------------------------------------------------------ void CommitHelper::handle_versioning(eos::common::VirtualIdentity& vid, unsigned long fid, CommitHelper::path_t& paths, CommitHelper::option_t& option, std::string& delete_path ) { eos::mgm::FusexCastBatch fuse_batch; eos::common::RWMutexWriteLock lock(gOFS->eosViewRWMutex); // We have to de-atomize the fmd name here e.g. make the temporary // atomic name a persistent name try { std::shared_ptr fmd; std::shared_ptr dir; std::shared_ptr versiondir; std::shared_ptr versionfmd; eos::IFileMD::XAttrMap attrmapF; dir = gOFS->eosView->getContainer(paths["versiondir"].GetParentPath()); fmd = gOFS->eosFileService->getFileMD(fid); if (fmd->getName() == paths["atomic"].GetName()) { // defere version handling for an overlapping secondary commit // due to lock-release during commit return; } if (option["versioning"] && (std::string(paths["version"].GetPath()) != "/")) { try { versiondir = gOFS->eosView->getContainer(paths["version"].GetParentPath()); // rename the existing path to the version path versionfmd = gOFS->eosView->getFile(std::string( paths["versiondir"].GetParentPath()) + std::string(paths["atomic"].GetPath())); dir->removeFile(paths["atomic"].GetName()); versionfmd->setName(paths["version"].GetName()); versionfmd->setContainerId(versiondir->getId()); attrmapF = versionfmd->getAttributes(); versiondir->addFile(versionfmd.get()); versiondir->setMTimeNow(); gOFS->eosView->updateFileStore(versionfmd.get()); const eos::FileIdentifier vfid = versionfmd->getIdentifier(); const eos::ContainerIdentifier did = dir->getIdentifier(); const eos::ContainerIdentifier vdid = versiondir->getIdentifier(); const std::string atomic_name = paths["atomic"].GetName(); fuse_batch.Register([&, vfid, did, vdid, atomic_name]() { gOFS->FuseXCastDeletion(did, atomic_name); gOFS->FuseXCastRefresh(vfid, vdid); }); // Update the ownership and mode of the new file to the original one fmd->setCUid(versionfmd->getCUid()); fmd->setCGid(versionfmd->getCGid()); fmd->setFlags(versionfmd->getFlags()); static std::set skip_tag {"sys.eos.btime", "sys.fs.tracking", "sys.utrace", "sys.vtrace", "sys.tmp.atomic"}; for (const auto& xattr : attrmapF) { if (skip_tag.find(xattr.first) == skip_tag.end()) { fmd->setAttribute(xattr.first, xattr.second); } } gOFS->eosView->updateFileStore(fmd.get()); } catch (eos::MDException& e) { errno = e.getErrno(); eos_thread_debug("msg=\"exception\" ec=%d emsg=\"%s\"\n", e.getErrno(), e.getMessage().str().c_str()); } } std::shared_ptr pfmd; // Rename the temporary upload path to the final path if ((pfmd = dir->findFile(paths["atomic"].GetName()))) { // Check if we are tagged as that 'latest' atomic upload std::string atomic_tag; try { atomic_tag = pfmd->getAttribute("sys.tmp.atomic"); } catch (eos::MDException& e) { } if ((!option["ocdone"]) && (atomic_tag != fmd->getName())) { // this is not our atomic upload, just abort that and delete the temporary artefact delete_path = fmd->getName(); eos_thread_err("msg=\"we are not the last atomic upload - cleaning %s\"", delete_path.c_str()); option["abort"] = true; } else { eos_thread_info("msg=\"found final path\" %s", paths["atomic"].GetName()); // If the target exists we swap the two and then delete the // previous one delete_path = fmd->getName(); delete_path += ".delete"; gOFS->eosView->renameFile(pfmd.get(), delete_path); } } else { eos_thread_info("msg=\"didn't find path\" %s", paths["atomic"].GetName()); } if (!option["abort"]) { gOFS->eosView->renameFile(fmd.get(), paths["atomic"].GetName()); eos_thread_info("msg=\"de-atomize file\" fxid=%08llx atomic-name=%s " "final-name=%s", fmd->getId(), fmd->getName().c_str(), paths["atomic"].GetName()); } } catch (eos::MDException& e) { delete_path = ""; errno = e.getErrno(); std::string errmsg = e.getMessage().str(); eos_thread_err("msg=\"exception\" ec=%d emsg=\"%s\"\n", e.getErrno(), e.getMessage().str().c_str()); } lock.Release(); // the fuse_batch guard will run broadcasts here } EOSMGMNAMESPACE_END