/** * @project The CERN Tape Archive (CTA) * @copyright Copyright © 2023 CERN * @license This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ #include "scheduler/PostgresSchedDB/RetrieveRequest.hpp" #include "scheduler/PostgresSchedDB/sql/Enums.hpp" // generated during build in the build tree #include "PostgresSchedDB/rowblobs.pb.h" namespace cta::postgresscheddb { void RetrieveRequest::insert() { postgresscheddb::sql::RetrieveJobQueueRow row; postgresscheddb::blobser::RetrieveJobs rj; postgresscheddb::blobser::RetrieveRequestRepackInfo ri; row.retrieveReqId = m_requestId; row.mountId = m_mountId; row.status = m_status; row.vid = m_vid; row.priority = m_priority; row.retMinReqAge = m_retrieveMinReqAge; row.startTime = m_startTime; row.failureReportUrl = m_failureReportUrl; row.failureReportLog = m_failureReportLog; row.isFailed = m_isFailed; row.retrieveRequest = m_schedRetrieveReq; row.archiveFile = m_archiveFile; // the tapeFiles from the archiveFile are not stored in the row // Instead each tapefile is a part of the retrieve job protobuf. // So fill in the protobuf Job and Tapefile info here: for(auto &j: m_jobs) { postgresscheddb::blobser::RetrieveJob *pb_job = rj.add_jobs(); postgresscheddb::blobser::TapeFile *pb_tf = pb_job->mutable_tapefile(); const cta::common::dataStructures::TapeFile *tf = 0; for(auto &f: m_archiveFile.tapeFiles) { if (f.copyNb == j.copyNb) { tf = &f; break; } } if (!tf) { throw std::runtime_error("Found job without tapefile"); } pb_tf->set_vid(tf->vid); pb_tf->set_fseq(tf->fSeq); pb_tf->set_blockid(tf->blockId); pb_tf->set_filesize(tf->fileSize); pb_tf->set_copynb(tf->copyNb); pb_tf->set_creationtime(tf->creationTime); pb_tf->set_checksumblob(tf->checksumBlob.serialize()); pb_job->set_copynb(j.copyNb); pb_job->set_maxtotalretries(j.maxtotalretries); pb_job->set_maxretrieswithinmount(j.maxretrieswithinmount); pb_job->set_retrieswithinmount(j.retrieswithinmount); pb_job->set_totalretries(j.totalretries); pb_job->set_lastmountwithfailure(j.lastmountwithfailure); pb_job->set_maxreportretries(j.maxreportretries); pb_job->set_totalreportretries(j.totalreportretries); pb_job->set_isfailed(j.isfailed); for(auto &s: j.failurelogs) { pb_job->add_failurelogs(s); } for(auto &s: j.reportfailurelogs) { pb_job->add_reportfailurelogs(s); } switch(j.status) { case postgresscheddb::RetrieveJobStatus::RJS_ToTransfer: pb_job->set_status(postgresscheddb::blobser::RetrieveJobStatus::RJS_ToTransfer); break; case postgresscheddb::RetrieveJobStatus::RJS_ToReportToUserForFailure: pb_job->set_status(postgresscheddb::blobser::RetrieveJobStatus::RJS_ToReportToUserForFailure); break; case postgresscheddb::RetrieveJobStatus::RJS_Failed: pb_job->set_status(postgresscheddb::blobser::RetrieveJobStatus::RJS_Failed); break; case postgresscheddb::RetrieveJobStatus::RJS_ToReportToRepackForSuccess: pb_job->set_status(postgresscheddb::blobser::RetrieveJobStatus::RJS_ToReportToRepackForSuccess); break; case postgresscheddb::RetrieveJobStatus::RJS_ToReportToRepackForFailure: pb_job->set_status(postgresscheddb::blobser::RetrieveJobStatus::RJS_ToReportToRepackForFailure); break; default: throw std::runtime_error("unexpected status in RetrieveRequest insert"); break; } } row.mountPolicyName= m_mountPolicyName; row.activity = m_activity; row.diskSystemName = m_diskSystemName; row.actCopyNb = m_actCopyNb; // isrepack & repackReqId are stored both individually in the row and inside // the repackinfo protobuf row.isRepack = m_repackInfo.isRepack; row.repackReqId = m_repackInfo.repackRequestId; ri.set_fseq(m_repackInfo.fSeq); ri.set_file_buffer_url(m_repackInfo.fileBufferURL); ri.set_has_user_provided_file(m_repackInfo.hasUserProvidedFile); for(auto &m: m_repackInfo.archiveRouteMap) { postgresscheddb::blobser::RetrieveRequestArchiveRoute *ar = ri.add_archive_routes(); ar->set_copynb(m.first); ar->set_tapepool(m.second); } for(auto &c: m_repackInfo.copyNbsToRearchive) { ri.add_copy_nbs_to_rearchive(c); } rj.SerializeToString(&row.retrieveJobsProtoBuf); ri.SerializeToString(&row.repackInfoProtoBuf); log::ScopedParamContainer params(m_lc); row.addParamsToLogContext(params); m_txn.reset(new postgresscheddb::Transaction(*m_connPool)); try { row.insert(*m_txn); } catch(exception::Exception &ex) { params.add("exeptionMessage", ex.getMessageValue()); m_lc.log(log::ERR, "In RetrieveRequest::insert(): failed to queue job."); throw; } m_lc.log(log::INFO, "In RetrieveRequest::insert(): added job to queue."); } void RetrieveRequest::update() { throw std::runtime_error("update not implemented."); } void RetrieveRequest::commit() { if (m_txn) { m_txn->commit(); } m_txn.reset(); } void RetrieveRequest::setFailureReason(const std::string & reason) { throw std::runtime_error("setFailureReason not implemented."); } bool RetrieveRequest::addJobFailure(uint32_t copyNumber, uint64_t mountId, const std::string & failureReason, log::LogContext & lc) { throw std::runtime_error("addJobFailure not implemented."); } void RetrieveRequest::setRepackInfo(const cta::postgresscheddb::RetrieveRequest::RetrieveReqRepackInfo & repackInfo) { throw std::runtime_error("setRepackInfo not implemented."); } void RetrieveRequest::setJobStatus(uint32_t copyNumber, const cta::postgresscheddb::RetrieveJobStatus &status) { throw std::runtime_error("setJobStatus not implemented."); } void RetrieveRequest::setSchedulerRequest(const cta::common::dataStructures::RetrieveRequest & retrieveRequest) { m_schedRetrieveReq = retrieveRequest; } void RetrieveRequest::setRetrieveFileQueueCriteria(const cta::common::dataStructures::RetrieveFileQueueCriteria& criteria) { m_archiveFile = criteria.archiveFile; m_mountPolicyName = criteria.mountPolicy.name; m_priority = criteria.mountPolicy.retrievePriority; m_retrieveMinReqAge = criteria.mountPolicy.retrieveMinRequestAge; const uint32_t hardcodedRetriesWithinMount = m_repackInfo.isRepack ? 1 : 3; const uint32_t hardcodedTotalRetries = m_repackInfo.isRepack ? 1 : 6; const uint32_t hardcodedReportRetries = 2; m_jobs.clear(); // create jobs for each tapefile in the archiveFile; for(auto &tf: m_archiveFile.tapeFiles) { m_jobs.emplace_back(); m_jobs.back().copyNb = tf.copyNb; m_jobs.back().maxretrieswithinmount = hardcodedRetriesWithinMount; m_jobs.back().maxtotalretries = hardcodedTotalRetries; m_jobs.back().maxreportretries = hardcodedReportRetries; } } void RetrieveRequest::setActivityIfNeeded(const cta::common::dataStructures::RetrieveRequest & retrieveRequest, const cta::common::dataStructures::RetrieveFileQueueCriteria& criteria) { m_activity = retrieveRequest.activity; } void RetrieveRequest::setDiskSystemName(const std::string & diskSystemName) { m_diskSystemName = diskSystemName; } void RetrieveRequest::setCreationTime(const uint64_t creationTime) { m_schedRetrieveReq.lifecycleTimings.creation_time = creationTime; } void RetrieveRequest::setFirstSelectedTime(const uint64_t firstSelectedTime) { throw std::runtime_error("setFirstSelectedTime not implemented."); } void RetrieveRequest::setCompletedTime(const uint64_t completedTime) { throw std::runtime_error("setCompletedTime not implemented."); } void RetrieveRequest::setReportedTime(const uint64_t reportedTime) { throw std::runtime_error("setReportedTime not implemented."); } void RetrieveRequest::setActiveCopyNumber(uint32_t activeCopyNb) { m_actCopyNb = activeCopyNb; // copy the active job info to the request level columns for(auto &j: m_jobs) { if (j.copyNb != m_actCopyNb) continue; for(auto &tf: m_archiveFile.tapeFiles) { if (tf.copyNb != m_actCopyNb) continue; m_status = j.status; m_vid = tf.vid; } } } void RetrieveRequest::setIsVerifyOnly(bool isVerifyOnly) { m_schedRetrieveReq.isVerifyOnly = isVerifyOnly; } void RetrieveRequest::setFailed() { throw std::runtime_error("setFailed not implemented."); } std::list RetrieveRequest::dumpJobs() { throw std::runtime_error("dumpJobs not implemented."); } RetrieveRequest& RetrieveRequest::operator=(const postgresscheddb::sql::RetrieveJobQueueRow &row) { postgresscheddb::blobser::RetrieveJobs rj; postgresscheddb::blobser::RetrieveRequestRepackInfo ri; rj.ParseFromString(row.retrieveJobsProtoBuf); ri.ParseFromString(row.repackInfoProtoBuf); m_requestId = row.retrieveReqId; m_mountId = row.mountId; m_status = row.status; m_vid = row.vid; m_priority = row.priority; m_retrieveMinReqAge = row.retMinReqAge; m_startTime = row.startTime; m_failureReportUrl = row.failureReportUrl; m_failureReportLog = row.failureReportLog; m_isFailed = row.isFailed; m_schedRetrieveReq = row.retrieveRequest; m_archiveFile = row.archiveFile; // the archiveFile above doesn't include the tapeFiles list. We only consider // tapefiles that the scheduler originally gave us for in the criteria of // the retrieve request (which might be a subset of those in the catalogue // for the given archiveFile). The tape files are packed inside the jobs list // in the row. for(auto &j: rj.jobs()) { m_archiveFile.tapeFiles.emplace_back(); m_archiveFile.tapeFiles.back().vid = j.tapefile().vid(); m_archiveFile.tapeFiles.back().fSeq = j.tapefile().fseq(); m_archiveFile.tapeFiles.back().blockId = j.tapefile().blockid(); m_archiveFile.tapeFiles.back().fileSize = j.tapefile().filesize(); m_archiveFile.tapeFiles.back().copyNb = j.tapefile().copynb(); m_archiveFile.tapeFiles.back().creationTime = j.tapefile().creationtime(); m_archiveFile.tapeFiles.back().checksumBlob.deserialize( j.tapefile().checksumblob() ); } m_mountPolicyName = row.mountPolicyName; m_activity = row.activity; m_diskSystemName = row.diskSystemName; m_actCopyNb = row.actCopyNb; m_repackInfo.isRepack = row.isRepack; m_repackInfo.repackRequestId = row.repackReqId; m_repackInfo.archiveRouteMap.clear(); for(auto &rm: ri.archive_routes()) { m_repackInfo.archiveRouteMap[rm.copynb()] = rm.tapepool(); } m_repackInfo.copyNbsToRearchive.clear(); for(auto &cn: ri.copy_nbs_to_rearchive()) { m_repackInfo.copyNbsToRearchive.insert(cn); } m_repackInfo.fSeq = ri.fseq(); m_repackInfo.fileBufferURL = ri.file_buffer_url(); m_repackInfo.hasUserProvidedFile = ri.has_user_provided_file(); m_jobs.clear(); for(auto &j: rj.jobs()) { m_jobs.emplace_back(); m_jobs.back().copyNb = j.copynb(); m_jobs.back().maxtotalretries = j.maxtotalretries(); m_jobs.back().maxretrieswithinmount = j.maxretrieswithinmount(); m_jobs.back().retrieswithinmount = j.retrieswithinmount(); m_jobs.back().totalretries = j.totalretries(); m_jobs.back().lastmountwithfailure = j.lastmountwithfailure(); for(auto &fl: j.failurelogs()) { m_jobs.back().failurelogs.push_back(fl); } m_jobs.back().maxreportretries = j.maxreportretries(); m_jobs.back().totalreportretries = j.totalreportretries(); for(auto &rfl: j.reportfailurelogs()) { m_jobs.back().reportfailurelogs.push_back(rfl); } m_jobs.back().isfailed = j.isfailed(); switch(j.status()) { case postgresscheddb::blobser::RetrieveJobStatus::RJS_ToTransfer: m_jobs.back().status = postgresscheddb::RetrieveJobStatus::RJS_ToTransfer; break; case postgresscheddb::blobser::RetrieveJobStatus::RJS_ToReportToUserForFailure: m_jobs.back().status = postgresscheddb::RetrieveJobStatus::RJS_ToReportToUserForFailure; break; case postgresscheddb::blobser::RetrieveJobStatus::RJS_Failed: m_jobs.back().status = postgresscheddb::RetrieveJobStatus::RJS_Failed; break; case postgresscheddb::blobser::RetrieveJobStatus::RJS_ToReportToRepackForSuccess: m_jobs.back().status = postgresscheddb::RetrieveJobStatus::RJS_ToReportToRepackForSuccess; break; case postgresscheddb::blobser::RetrieveJobStatus::RJS_ToReportToRepackForFailure: m_jobs.back().status = postgresscheddb::RetrieveJobStatus::RJS_ToReportToRepackForFailure; break; default: throw std::runtime_error("unexpected status in RetrieveRequest assign from row"); break; } } return *this; } } // namespace cta::postgresscheddb