/* * @project The CERN Tape Archive (CTA) * @copyright Copyright © 2022 CERN * @license This program is free software, distributed under the terms of the GNU General Public * Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING". You can * redistribute it and/or modify it under the terms of the GPL Version 3, or (at your * option) any later version. * * This program is distributed in the hope that it will be useful, but WITHOUT ANY * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A * PARTICULAR PURPOSE. See the GNU General Public License for more details. * * In applying this licence, CERN does not waive the privileges and immunities * granted to it by virtue of its status as an Intergovernmental Organization or * submit itself to any jurisdiction. */ #include "catalogue/rdbms/postgres/PostgresArchiveFileCatalogue.hpp" #include "catalogue/rdbms/RdbmsCatalogue.hpp" #include "catalogue/rdbms/RdbmsCatalogueUtils.hpp" #include "catalogue/rdbms/RdbmsFileRecycleLogCatalogue.hpp" #include "catalogue/rdbms/RdbmsTapeFileCatalogue.hpp" #include "catalogue/TapeFileWritten.hpp" #include "common/dataStructures/ArchiveFile.hpp" #include "common/dataStructures/DeleteArchiveRequest.hpp" #include "common/exception/Exception.hpp" #include "common/exception/UserError.hpp" #include "common/log/TimingList.hpp" #include "common/Timer.hpp" #include "rdbms/AutoRollback.hpp" #include "rdbms/Conn.hpp" #include "rdbms/ConnPool.hpp" namespace cta::catalogue { PostgresArchiveFileCatalogue::PostgresArchiveFileCatalogue(log::Logger &log, std::shared_ptr connPool, RdbmsCatalogue* rdbmsCatalogue) : RdbmsArchiveFileCatalogue(log, connPool, rdbmsCatalogue) {} void PostgresArchiveFileCatalogue::DO_NOT_USE_deleteArchiveFile_DO_NOT_USE(const std::string &diskInstanceName, const uint64_t archiveFileId, log::LogContext &lc) { try { const char *selectSql = "SELECT " "ARCHIVE_FILE.ARCHIVE_FILE_ID AS ARCHIVE_FILE_ID," "ARCHIVE_FILE.DISK_INSTANCE_NAME AS DISK_INSTANCE_NAME," "ARCHIVE_FILE.DISK_FILE_ID AS DISK_FILE_ID," "ARCHIVE_FILE.DISK_FILE_UID AS DISK_FILE_UID," "ARCHIVE_FILE.DISK_FILE_GID AS DISK_FILE_GID," "ARCHIVE_FILE.SIZE_IN_BYTES AS SIZE_IN_BYTES," "ARCHIVE_FILE.CHECKSUM_BLOB AS CHECKSUM_BLOB," "ARCHIVE_FILE.CHECKSUM_ADLER32 AS CHECKSUM_ADLER32," "STORAGE_CLASS.STORAGE_CLASS_NAME AS STORAGE_CLASS_NAME," "ARCHIVE_FILE.CREATION_TIME AS ARCHIVE_FILE_CREATION_TIME," "ARCHIVE_FILE.RECONCILIATION_TIME AS RECONCILIATION_TIME," "TAPE_FILE.VID AS VID," "TAPE_FILE.FSEQ AS FSEQ," "TAPE_FILE.BLOCK_ID AS BLOCK_ID," "TAPE_FILE.LOGICAL_SIZE_IN_BYTES AS LOGICAL_SIZE_IN_BYTES," "TAPE_FILE.COPY_NB AS COPY_NB," "TAPE_FILE.CREATION_TIME AS TAPE_FILE_CREATION_TIME " "FROM " "ARCHIVE_FILE " "INNER JOIN STORAGE_CLASS ON " "ARCHIVE_FILE.STORAGE_CLASS_ID = STORAGE_CLASS.STORAGE_CLASS_ID " "INNER JOIN TAPE_FILE ON " "ARCHIVE_FILE.ARCHIVE_FILE_ID = TAPE_FILE.ARCHIVE_FILE_ID " "WHERE " "ARCHIVE_FILE.ARCHIVE_FILE_ID = :ARCHIVE_FILE_ID " "FOR UPDATE OF ARCHIVE_FILE"; utils::Timer t; auto conn = m_connPool->getConn(); rdbms::AutoRollback autoRollback(conn); conn.executeNonQuery("BEGIN"); const auto getConnTime = t.secs(utils::Timer::resetCounter); auto selectStmt = conn.createStmt(selectSql); const auto createStmtTime = t.secs(); selectStmt.bindUint64(":ARCHIVE_FILE_ID", archiveFileId); t.reset(); rdbms::Rset selectRset = selectStmt.executeQuery(); const auto selectFromArchiveFileTime = t.secs(); std::unique_ptr archiveFile; std::set vidsToSetDirty; while(selectRset.next()) { if(nullptr == archiveFile.get()) { archiveFile = std::make_unique(); archiveFile->archiveFileID = selectRset.columnUint64("ARCHIVE_FILE_ID"); archiveFile->diskInstance = selectRset.columnString("DISK_INSTANCE_NAME"); archiveFile->diskFileId = selectRset.columnString("DISK_FILE_ID"); archiveFile->diskFileInfo.owner_uid = selectRset.columnUint64("DISK_FILE_UID"); archiveFile->diskFileInfo.gid = selectRset.columnUint64("DISK_FILE_GID"); archiveFile->fileSize = selectRset.columnUint64("SIZE_IN_BYTES"); archiveFile->checksumBlob.deserializeOrSetAdler32(selectRset.columnBlob("CHECKSUM_BLOB"), selectRset.columnUint64("CHECKSUM_ADLER32")); archiveFile->storageClass = selectRset.columnString("STORAGE_CLASS_NAME"); archiveFile->creationTime = selectRset.columnUint64("ARCHIVE_FILE_CREATION_TIME"); archiveFile->reconciliationTime = selectRset.columnUint64("RECONCILIATION_TIME"); } // If there is a tape file if(!selectRset.columnIsNull("VID")) { // Add the tape file to the archive file's in-memory structure common::dataStructures::TapeFile tapeFile; tapeFile.vid = selectRset.columnString("VID"); vidsToSetDirty.insert(tapeFile.vid); tapeFile.fSeq = selectRset.columnUint64("FSEQ"); tapeFile.blockId = selectRset.columnUint64("BLOCK_ID"); tapeFile.fileSize = selectRset.columnUint64("LOGICAL_SIZE_IN_BYTES"); tapeFile.copyNb = selectRset.columnUint64("COPY_NB"); tapeFile.creationTime = selectRset.columnUint64("TAPE_FILE_CREATION_TIME"); tapeFile.checksumBlob = archiveFile->checksumBlob; // Duplicated for convenience archiveFile->tapeFiles.push_back(tapeFile); } } if(nullptr == archiveFile.get()) { log::ScopedParamContainer spc(lc); spc.add("fileId", archiveFileId); lc.log(log::WARNING, "Ignoring request to delete archive file because it does not exist in the catalogue"); return; } if(diskInstanceName != archiveFile->diskInstance) { log::ScopedParamContainer spc(lc); spc.add("fileId", std::to_string(archiveFile->archiveFileID)) .add("diskInstance", archiveFile->diskInstance) .add("requestDiskInstance", diskInstanceName) .add("diskFileId", archiveFile->diskFileId) .add("diskFileInfo.owner_uid", archiveFile->diskFileInfo.owner_uid) .add("diskFileInfo.gid", archiveFile->diskFileInfo.gid) .add("fileSize", std::to_string(archiveFile->fileSize)) .add("creationTime", std::to_string(archiveFile->creationTime)) .add("reconciliationTime", std::to_string(archiveFile->reconciliationTime)) .add("storageClass", archiveFile->storageClass) .add("getConnTime", getConnTime) .add("createStmtTime", createStmtTime) .add("selectFromArchiveFileTime", selectFromArchiveFileTime); archiveFile->checksumBlob.addFirstChecksumToLog(spc); for(auto it=archiveFile->tapeFiles.begin(); it!=archiveFile->tapeFiles.end(); it++) { std::stringstream tapeCopyLogStream; tapeCopyLogStream << "copy number: " << it->copyNb << " vid: " << it->vid << " fSeq: " << it->fSeq << " blockId: " << it->blockId << " creationTime: " << it->creationTime << " fileSize: " << it->fileSize << " checksumBlob: " << it->checksumBlob //this shouldn't be here: repeated field << " copyNb: " << it->copyNb; spc.add("TAPE FILE", tapeCopyLogStream.str()); } lc.log(log::WARNING, "Failed to delete archive file because the disk instance of the request does not match that " "of the archived file"); exception::UserError ue; ue.getMessage() << "Failed to delete archive file with ID " << archiveFileId << " because the disk instance of " "the request does not match that of the archived file: archiveFileId=" << archiveFileId << " requestDiskInstance=" << diskInstanceName << " archiveFileDiskInstance=" << archiveFile->diskInstance; throw ue; } t.reset(); { const char *const sql = "DELETE FROM TAPE_FILE WHERE ARCHIVE_FILE_ID = :ARCHIVE_FILE_ID"; auto stmt = conn.createStmt(sql); stmt.bindUint64(":ARCHIVE_FILE_ID", archiveFileId); stmt.executeNonQuery(); } const auto deleteFromTapeFileTime = t.secs(utils::Timer::resetCounter); for(auto &vidToSetDirty: vidsToSetDirty){ //We deleted the TAPE_FILE so the tapes containing them should be set as dirty RdbmsCatalogueUtils::setTapeDirty(conn,vidToSetDirty); } const auto setTapeDirtyTime = t.secs(utils::Timer::resetCounter); { const char *const sql = "DELETE FROM ARCHIVE_FILE WHERE ARCHIVE_FILE_ID = :ARCHIVE_FILE_ID"; auto stmt = conn.createStmt(sql); stmt.bindUint64(":ARCHIVE_FILE_ID", archiveFileId); stmt.executeNonQuery(); } const auto deleteFromArchiveFileTime = t.secs(utils::Timer::resetCounter); conn.commit(); autoRollback.cancel(); const auto commitTime = t.secs(); log::ScopedParamContainer spc(lc); spc.add("fileId", std::to_string(archiveFile->archiveFileID)) .add("diskInstance", archiveFile->diskInstance) .add("diskFileId", archiveFile->diskFileId) .add("diskFileInfo.owner_uid", archiveFile->diskFileInfo.owner_uid) .add("diskFileInfo.gid", archiveFile->diskFileInfo.gid) .add("fileSize", std::to_string(archiveFile->fileSize)) .add("creationTime", std::to_string(archiveFile->creationTime)) .add("reconciliationTime", std::to_string(archiveFile->reconciliationTime)) .add("storageClass", archiveFile->storageClass) .add("getConnTime", getConnTime) .add("createStmtTime", createStmtTime) .add("selectFromArchiveFileTime", selectFromArchiveFileTime) .add("deleteFromTapeFileTime", deleteFromTapeFileTime) .add("deleteFromArchiveFileTime", deleteFromArchiveFileTime) .add("setTapeDirtyTime",setTapeDirtyTime) .add("commitTime", commitTime); archiveFile->checksumBlob.addFirstChecksumToLog(spc); for(auto it=archiveFile->tapeFiles.begin(); it!=archiveFile->tapeFiles.end(); it++) { std::stringstream tapeCopyLogStream; tapeCopyLogStream << "copy number: " << it->copyNb << " vid: " << it->vid << " fSeq: " << it->fSeq << " blockId: " << it->blockId << " creationTime: " << it->creationTime << " fileSize: " << it->fileSize << " checksumBlob: " << it->checksumBlob //this shouldn't be here: repeated field << " copyNb: " << static_cast(it->copyNb); //this shouldn't be here: repeated field spc.add("TAPE FILE", tapeCopyLogStream.str()); } lc.log(log::INFO, "Archive file deleted from CTA catalogue"); } catch(exception::UserError &) { throw; } catch(exception::Exception &ex) { ex.getMessage().str(std::string(__FUNCTION__) + ": " + ex.getMessage().str()); throw; } } uint64_t PostgresArchiveFileCatalogue::getNextArchiveFileId(rdbms::Conn &conn) { try { const char *const sql = "select NEXTVAL('ARCHIVE_FILE_ID_SEQ') AS ARCHIVE_FILE_ID"; auto stmt = conn.createStmt(sql); auto rset = stmt.executeQuery(); if(!rset.next()) { throw exception::Exception("Result set is unexpectedly empty"); } return rset.columnUint64("ARCHIVE_FILE_ID"); } catch(exception::UserError &) { throw; } catch(exception::Exception &ex) { ex.getMessage().str(std::string(__FUNCTION__) + ": " + ex.getMessage().str()); throw; } } void PostgresArchiveFileCatalogue::copyArchiveFileToFileRecyleLogAndDelete(rdbms::Conn & conn, const common::dataStructures::DeleteArchiveRequest &request, log::LogContext & lc) { try { utils::Timer t; log::TimingList tl; //We currently do an INSERT INTO and a DELETE FROM //in a single transaction conn.executeNonQuery("BEGIN"); const auto fileRecycleLog = static_cast(m_rdbmsCatalogue->FileRecycleLog().get()); fileRecycleLog->copyArchiveFileToFileRecycleLog(conn,request); tl.insertAndReset("insertToRecycleBinTime",t); RdbmsCatalogueUtils::setTapeDirty(conn,request.archiveFileID); tl.insertAndReset("setTapeDirtyTime",t); const auto tapeFileCatalogue = static_cast(m_rdbmsCatalogue->TapeFile().get()); tapeFileCatalogue->deleteTapeFiles(conn,request); tl.insertAndReset("deleteTapeFilesTime",t); deleteArchiveFile(conn,request); tl.insertAndReset("deleteArchiveFileTime",t); conn.commit(); tl.insertAndReset("commitTime",t); log::ScopedParamContainer spc(lc); spc.add("archiveFileId",request.archiveFileID); spc.add("diskFileId",request.diskFileId); spc.add("diskFilePath",request.diskFilePath); spc.add("diskInstance",request.diskInstance); tl.addToLog(spc); lc.log(log::INFO,"In PostgresCatalogue::copyArchiveFileToRecycleBinAndDelete: ArchiveFile moved to the recycle-bin."); } catch(exception::UserError &) { throw; } catch(exception::Exception &ex) { ex.getMessage().str(std::string(__FUNCTION__) + ": " + ex.getMessage().str()); throw; } } //------------------------------------------------------------------------------ // selectArchiveFileSizeAndChecksum //------------------------------------------------------------------------------ std::map PostgresArchiveFileCatalogue::selectArchiveFileSizesAndChecksums( rdbms::Conn &conn, const std::set &events) const { try { std::vector archiveFileIdList(events.size()); for (const auto &event: events) { archiveFileIdList.push_back(event.archiveFileId); } const char *const sql = "SELECT " "ARCHIVE_FILE.ARCHIVE_FILE_ID AS ARCHIVE_FILE_ID," "ARCHIVE_FILE.SIZE_IN_BYTES AS SIZE_IN_BYTES," "ARCHIVE_FILE.CHECKSUM_BLOB AS CHECKSUM_BLOB," "ARCHIVE_FILE.CHECKSUM_ADLER32 AS CHECKSUM_ADLER32 " "FROM " "ARCHIVE_FILE " "INNER JOIN TEMP_TAPE_FILE_BATCH ON " "ARCHIVE_FILE.ARCHIVE_FILE_ID = TEMP_TAPE_FILE_BATCH.ARCHIVE_FILE_ID"; auto stmt = conn.createStmt(sql); auto rset = stmt.executeQuery(); std::map fileSizesAndChecksums; while (rset.next()) { const uint64_t archiveFileId = rset.columnUint64("ARCHIVE_FILE_ID"); if (fileSizesAndChecksums.end() != fileSizesAndChecksums.find(archiveFileId)) { exception::Exception ex; ex.getMessage() << __FUNCTION__ << " failed: " "Found duplicate archive file identifier in batch of files written to tape: archiveFileId=" << archiveFileId; throw ex; } FileSizeAndChecksum fileSizeAndChecksum; fileSizeAndChecksum.fileSize = rset.columnUint64("SIZE_IN_BYTES"); fileSizeAndChecksum.checksumBlob.deserializeOrSetAdler32(rset.columnBlob("CHECKSUM_BLOB"), rset.columnUint64("CHECKSUM_ADLER32")); fileSizesAndChecksums[archiveFileId] = fileSizeAndChecksum; } return fileSizesAndChecksums; } catch(exception::UserError &) { throw; } catch(exception::Exception &ex) { ex.getMessage().str(std::string(__FUNCTION__) + ": " + ex.getMessage().str()); throw; } } } // namespace cta::catalogue