// ---------------------------------------------------------------------- // File: Verify.cc // Author: Andreas-Joachim Peters - CERN // ---------------------------------------------------------------------- /************************************************************************ * EOS - the CERN Disk Storage System * * Copyright (C) 2011 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #define __STDC_FORMAT_MACROS #include #include "fst/storage/Storage.hh" #include "fst/XrdFstOfs.hh" #include "fst/XrdFstOss.hh" #include "fst/Verify.hh" #include "fst/checksum/ChecksumPlugins.hh" #include "fst/io/FileIoPluginCommon.hh" #include "common/Path.hh" extern eos::fst::XrdFstOss* XrdOfsOss; EOSFSTNAMESPACE_BEGIN /*----------------------------------------------------------------------------*/ void Storage::Verify() { using eos::common::FileId; std::map open_w_out; // Thread that verifies stored files while (1) { mVerifyMutex.Lock(); if (!mVerifications.size()) { mVerifyMutex.UnLock(); sleep(1); continue; } eos::fst::Verify* verifyfile = mVerifications.front(); if (verifyfile) { eos_static_debug("got %llu\n", (unsigned long long) verifyfile); mVerifications.pop(); mRunningVerify = verifyfile; if (gOFS.openedForWriting.isOpen(verifyfile->fsId, verifyfile->fId)) { time_t now = time(NULL); if (open_w_out[verifyfile->fId] < now) { eos_static_warning("file is currently opened for writing id=%x on " "fs=%u - skipping verification", verifyfile->fId, verifyfile->fsId); // Spit this message out only once pre minute open_w_out[verifyfile->fId] = now + 60; } mVerifications.push(verifyfile); mVerifyMutex.UnLock(); continue; } } else { eos_static_debug("got nothing"); mVerifyMutex.UnLock(); mRunningVerify = 0; continue; } mVerifyMutex.UnLock(); eos_static_debug("verifying File Id=%x on Fs=%u", verifyfile->fId, verifyfile->fsId); // verify the file const std::string hex_fid = FileId::Fid2Hex(verifyfile->fId); XrdOucErrInfo error; std::string fstPath = FileId::FidPrefix2FullPath(hex_fid.c_str(), verifyfile->localPrefix.c_str()); { auto fMd = gOFS.mFmdHandler->LocalGetFmd(verifyfile->fId, verifyfile->fsId, true); if (fMd) { // force a resync of meta data from the MGM // e.g. store in the WrittenFilesQueue to have it done asynchronous gOFS.WrittenFilesQueueMutex.Lock(); gOFS.WrittenFilesQueue.push(*fMd.get()); gOFS.WrittenFilesQueueMutex.UnLock(); } } FileIo* io = eos::fst::FileIoPluginHelper::GetIoObject(fstPath.c_str()); // get current size on disk struct stat statinfo; int open_rc = -1; if (!io || (open_rc = io->fileOpen(0, 0)) || io->fileStat(&statinfo)) { eos_static_err("unable to verify file id=%x on fs=%u path=%s - stat on " "local disk failed", verifyfile->fId, verifyfile->fsId, fstPath.c_str()); // If there is no file, we should not commit anything to the MGM verifyfile->commitSize = 0; verifyfile->commitChecksum = 0; statinfo.st_size = 0; // indicates the missing file - not perfect though } // even if the stat failed, we run this code to tag the file as is ... // attach meta data bool localUpdate = false; auto fMd = gOFS.mFmdHandler->LocalGetFmd(verifyfile->fId, verifyfile->fsId, true, verifyfile->commitFmd); if (!fMd) { eos_static_err("unable to verify id=%x on fs=%u path=%s - no local MD stored", verifyfile->fId, verifyfile->fsId, fstPath.c_str()); } else { if (fMd->mProtoFmd.disksize() != (unsigned long long) statinfo.st_size) { eos_static_err("msg=\"updating disk size\" path=\"%s\" fxid=%s " "stat_sz=%llu disk_sz=%llu", verifyfile->path.c_str(), hex_fid.c_str(), statinfo.st_size, fMd->mProtoFmd.disksize()); fMd->mProtoFmd.set_disksize(statinfo.st_size); localUpdate = true; } if (fMd->mProtoFmd.lid() != verifyfile->lId) { eos_static_err("msg=\"updating layout id\" path=\"%s\" fxid=%s " "central value %u - changelog value %u", verifyfile->path.c_str(), hex_fid.c_str(), verifyfile->lId, fMd->mProtoFmd.lid()); localUpdate = true; } if (fMd->mProtoFmd.cid() != verifyfile->cId) { eos_static_err("msg=\"updating container id\" path=\"%s\" fxid=%s " "central value %llu - changelog value %llu", verifyfile->path.c_str(), hex_fid.c_str(), verifyfile->cId, fMd->mProtoFmd.cid()); localUpdate = true; } // Update reference size if (eos::common::LayoutId::IsRain(fMd->mProtoFmd.lid())) { // This is the best he have, no easy way to know the logical size // for a RAIN file if (fMd->mProtoFmd.size() != fMd->mProtoFmd.mgmsize()) { fMd->mProtoFmd.set_size(fMd->mProtoFmd.mgmsize()); localUpdate = true; } } else { if (fMd->mProtoFmd.size() != (unsigned long long)statinfo.st_size) { fMd->mProtoFmd.set_size(statinfo.st_size); localUpdate = true; } } fMd->mProtoFmd.set_lid(verifyfile->lId); fMd->mProtoFmd.set_cid(verifyfile->cId); std::unique_ptr checksummer = ChecksumPlugins::GetChecksumObject(fMd->mProtoFmd.lid()); unsigned long long scansize = 0; float scantime = 0; // is ms eos::fst::CheckSum::ReadCallBack::callback_data_t cbd; cbd.caller = (void*) io; eos::fst::CheckSum::ReadCallBack cb(eos::fst::XrdFstOfsFile::FileIoReadCB, cbd); if ((checksummer) && verifyfile->computeChecksum && (!checksummer->ScanFile(cb, scansize, scantime, verifyfile->verifyRate))) { eos_static_crit("cannot scan file to recalculate the checksum id=%llu on fs=%u path=%s", verifyfile->fId, verifyfile->fsId, fstPath.c_str()); } else { XrdOucString sizestring; if (checksummer && verifyfile->computeChecksum) { eos_static_info("rescanned checksum - size=%s time=%.02fms rate=%.02f " "MB/s limit=%d MB/s", eos::common::StringConversion::GetReadableSizeString( sizestring, scansize, "B"), scantime, 1.0 * scansize / 1000 / (scantime ? scantime : 99999999999999LL), verifyfile->verifyRate); } if (checksummer && verifyfile->computeChecksum) { int checksumlen = 0; checksummer->GetBinChecksum(checksumlen); bool cxError = false; std::string computedchecksum = checksummer->GetHexChecksum(); if (fMd->mProtoFmd.checksum() != computedchecksum) { cxError = true; } // commit the disk checksum in case of differences between the in-memory value if (fMd->mProtoFmd.diskchecksum() != computedchecksum) { cxError = true; localUpdate = true; } if (cxError) { eos_static_err("checksum invalid : path=%s fxid=%s checksum=%s stored-checksum=%s", verifyfile->path.c_str(), hex_fid.c_str(), checksummer->GetHexChecksum(), fMd->mProtoFmd.checksum().c_str()); fMd->mProtoFmd.set_checksum(computedchecksum); fMd->mProtoFmd.set_diskchecksum(computedchecksum); fMd->mProtoFmd.set_disksize(fMd->mProtoFmd.size()); if (verifyfile->commitSize) { fMd->mProtoFmd.set_mgmsize(fMd->mProtoFmd.size()); } if (verifyfile->commitChecksum) { fMd->mProtoFmd.set_mgmchecksum(computedchecksum); fMd->mProtoFmd.set_blockcxerror(0); fMd->mProtoFmd.set_filecxerror(0); } localUpdate = true; } else { eos_static_info("checksum OK : path=%s fxid=%s checksum=%s", verifyfile->path.c_str(), hex_fid.c_str(), checksummer->GetHexChecksum()); // Reset error flags if needed if (fMd->mProtoFmd.blockcxerror() || fMd->mProtoFmd.filecxerror()) { fMd->mProtoFmd.set_blockcxerror(0); fMd->mProtoFmd.set_filecxerror(0); localUpdate = true; } } // Update the extended attributes if (io) { (void)io->attrSet("user.eos.checksum", checksummer->GetBinChecksum(checksumlen), checksumlen); (void)io->attrSet("user.eos.checksumtype", checksummer->GetName(), strlen(checksummer->GetName())); (void)io->attrSet("user.eos.filecxerror", "0", 1); (void)io->attrSet("user.eos.blockcxerror", "0"); } } eos::common::Path cPath(verifyfile->path.c_str()); // commit local if (localUpdate && (!gOFS.mFmdHandler->Commit(fMd.get()))) { eos_static_err("unable to verify file id=%llu on fs=%u path=%s - commit " "to local MD storage failed", verifyfile->fId, verifyfile->fsId, fstPath.c_str()); } else { if (localUpdate) { eos_static_info("committed verified meta data locally id=%llu on fs=%u path=%s", verifyfile->fId, verifyfile->fsId, fstPath.c_str()); } // commit to central mgm cache, only if commitSize or commitChecksum is set XrdOucString capOpaqueFile = ""; XrdOucString mTimeString = ""; capOpaqueFile += "/?"; capOpaqueFile += "&mgm.pcmd=commit"; capOpaqueFile += "&mgm.verify.checksum=1"; capOpaqueFile += "&mgm.size="; char filesize[1024]; sprintf(filesize, "%" PRIu64 "", fMd->mProtoFmd.size()); capOpaqueFile += filesize; capOpaqueFile += "&mgm.fid="; capOpaqueFile += hex_fid.c_str(); capOpaqueFile += "&mgm.path="; capOpaqueFile += verifyfile->path.c_str(); if (checksummer && verifyfile->computeChecksum) { capOpaqueFile += "&mgm.checksum="; capOpaqueFile += checksummer->GetHexChecksum(); if (verifyfile->commitChecksum) { capOpaqueFile += "&mgm.commit.checksum=1"; } } if (verifyfile->commitSize) { capOpaqueFile += "&mgm.commit.size=1"; } capOpaqueFile += "&mgm.commit.verify=1"; capOpaqueFile += "&mgm.mtime="; capOpaqueFile += eos::common::StringConversion::GetSizeString(mTimeString, (unsigned long long) fMd->mProtoFmd.mtime()); capOpaqueFile += "&mgm.mtime_ns="; capOpaqueFile += eos::common::StringConversion::GetSizeString(mTimeString, (unsigned long long) fMd->mProtoFmd.mtime_ns()); capOpaqueFile += "&mgm.add.fsid="; capOpaqueFile += (int) fMd->mProtoFmd.fsid(); if (verifyfile->commitSize || verifyfile->commitChecksum) { if (localUpdate) { eos_static_info("committed verified meta data centrally id=%llu on fs=%u path=%s", verifyfile->fId, verifyfile->fsId, fstPath.c_str()); } int rc = gOFS.CallManager(&error, verifyfile->path.c_str(), 0, capOpaqueFile); if (rc) { eos_static_err("unable to verify file id=%s fs=%u at manager %s", hex_fid.c_str(), verifyfile->fsId, verifyfile->managerId.c_str()); } } } } } if (!open_rc) { io->fileClose(); } mRunningVerify = 0; if (verifyfile) { delete verifyfile; } } } EOSFSTNAMESPACE_END