//------------------------------------------------------------------------------
// File: FileEos.cc
// Author: Elvin-Alin Sindrilaru - CERN
//------------------------------------------------------------------------------

/************************************************************************
 * EOS - the CERN Disk Storage System                                   *
 * Copyright (C) 2011 CERN/Switzerland                                  *
 *                                                                      *
 * This program is free software: you can redistribute it and/or modify *
 * it under the terms of the GNU General Public License as published by *
 * the Free Software Foundation, either version 3 of the License, or    *
 * (at your option) any later version.                                  *
 *                                                                      *
 * This program is distributed in the hope that it will be useful,      *
 * but WITHOUT ANY WARRANTY; without even the implied warranty of       *
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the         *
 * GNU General Public License for more details.                         *
 *                                                                      *
 * You should have received a copy of the GNU General Public License    *
 * along with this program. If not, see <http://www.gnu.org/licenses/>. *
 ************************************************************************/

#include <cstdio>
#include <cstring>
#include <ctime>
#include <fstream>
#include <iostream>
#include <vector>
#include "FileEos.hh"
#include "Result.hh"
#include "XrdSfs/XrdSfsInterface.hh"
#include "fst/io/FileIoPlugin.hh"
#include "fst/io/AsyncMetaHandler.hh"
#include "fst/layout/RainMetaLayout.hh"
#include "fst/layout/RaidDpLayout.hh"
#include "fst/layout/ReedSLayout.hh"
#include "common/Timing.hh"
#include "common/LayoutId.hh"

EOSBMKNAMESPACE_BEGIN

//------------------------------------------------------------------------------
// Constructor
//------------------------------------------------------------------------------
FileEos::FileEos(const std::string& filePath,
                 const std::string& bmkInstance,
                 uint64_t fileSize,
                 uint32_t blockSize):
  eos::common::LogId(),
  mFilePath(filePath),
  mBmkInstance(bmkInstance),
  mFileSize(fileSize),
  mBlockSize(blockSize)
{
  // empty
}


//------------------------------------------------------------------------------
// Destructor
//------------------------------------------------------------------------------
FileEos::~FileEos()
{
  // empty
}


//------------------------------------------------------------------------------
// Write file
//------------------------------------------------------------------------------
int
FileEos::Write(Result*& result)
{
  eos_debug("Calling function");
  int retc = 0;
  uint64_t file_size = mFileSize;
  uint32_t block_size = mBlockSize;
  eos::common::Timing wr_timing("write");

  // Fill buffer with random characters
  char* buffer = new char[block_size];
  std::ifstream urandom("/dev/urandom", std::ios::in | std::ios::binary);
  urandom.read(buffer, block_size);
  urandom.close();

  // Open the file for writing and get an XrdFileIo object
  mode_t mode_sfs = S_IRUSR | S_IWUSR | S_IXUSR | S_IRGRP | S_IROTH;
  XrdSfsFileOpenMode flags_sfs = SFS_O_CREAT | SFS_O_RDWR;
  COMMONTIMING("OPEN", &wr_timing);
  std::string full_path = mBmkInstance;
  full_path += "/";
  full_path += mFilePath;
  eos::fst::FileIo* file = eos::fst::FileIoPlugin::GetIoObject(full_path.c_str());
  retc = file->fileOpen(flags_sfs, mode_sfs, "");

  if (retc) {
    eos_err("Error while opening file: %s", full_path.c_str());
    delete file;
    delete[] buffer;
    return retc;
  }

  COMMONTIMING("WRITE", &wr_timing);
  // Do the actual writing
  uint64_t offset = 0;
  uint64_t length = 0;
  int64_t nwrite;

  while (file_size > 0) {
    length = ((file_size > block_size) ? block_size : file_size);
    nwrite = file->fileWriteAsync(offset, buffer, length);

    if (nwrite != (int64_t)length) {
      eos_err("Failed while doing write at offset=%llu", offset);
      retc = -1;
      break;
    }

    offset += nwrite;
    file_size -= nwrite;
  }

  COMMONTIMING("WAIT_ASYNC", &wr_timing);
  // Collect all the write responses
  eos::fst::AsyncMetaHandler* ptr_handler =
    static_cast<eos::fst::AsyncMetaHandler*>(file->fileGetAsyncHandler());

  if (ptr_handler && (!ptr_handler->WaitOK())) {
    eos_err("Error while waiting for write async responses");
    retc = -1;
  }

  COMMONTIMING("CLOSE", &wr_timing);
  retc += file->fileClose();
  COMMONTIMING("END", &wr_timing);
  // Collect statistics for this operation in the result object at job level
  ResultProto& pb_result = result->GetPbResult();
  float transaction_time = wr_timing.GetTagTimelapse("OPEN", "END");
  pb_result.add_timestamp(GetTimestamp());
  pb_result.add_opentime(wr_timing.GetTagTimelapse("OPEN", "WRITE"));
  pb_result.add_readtime(0);
  pb_result.add_readwaitasync(0);
  pb_result.add_writetime(wr_timing.GetTagTimelapse("WRITE", "WAIT_ASYNC"));
  pb_result.add_writewaitasync(wr_timing.GetTagTimelapse("WAIT_ASYNC", "CLOSE"));
  pb_result.add_closetime(wr_timing.GetTagTimelapse("CLOSE", "END"));
  pb_result.add_transactiontime(wr_timing.GetTagTimelapse("OPEN", "END"));
  pb_result.add_readspeed(0);
  pb_result.add_writespeed(Result::GetTransferSpeed((float)offset, transaction_time));
  pb_result.add_readtotal(0);
  pb_result.add_writetotal(offset);
  delete file;
  delete[] buffer;
  return retc;
}

//------------------------------------------------------------------------------
// Read a file in gateway mode
//------------------------------------------------------------------------------
int
FileEos::ReadGw(Result*& result)
{
  eos_debug("Calling function");
  int retc = 0;
  char* buffer;
  std::vector<char*> vect_buff;
  int32_t total_buffs = 64;
  uint64_t file_size = mFileSize;
  uint32_t block_size = mBlockSize;
  eos::common::Timing rd_timing("rdgw");

  // Allocate a pool of read buffers and then do round-robin for reading in them
  // so as to minimise the probability of two requests writing in the same buffer
  // at the same time
  for (int32_t i = 0; i < total_buffs; i++) {
    buffer = new char[block_size];
    vect_buff.push_back(buffer);
  }

  // Open the file for reading and get an XrdFileIo object
  XrdSfsFileOpenMode flags_sfs = SFS_O_RDONLY;
  COMMONTIMING("OPEN", &rd_timing);
  std::string full_path = mBmkInstance;
  full_path += "/";
  full_path += mFilePath;
  eos::fst::FileIo* file = eos::fst::FileIoPlugin::GetIoObject(full_path.c_str());
  retc = file->fileOpen(flags_sfs, 0, "fst.readahead=true");

  if (retc) {
    eos_err("Error while opening file: %s", full_path.c_str());

    // Free allocated memory
    for (uint32_t i = 0; i < vect_buff.size(); i++) {
      delete[] vect_buff[i];
    }

    delete file;
    return retc;
  }

  COMMONTIMING("READ", &rd_timing);
  // Do the actual reading
  int32_t indx_buff = 0;
  uint64_t offset = 0;
  uint64_t length = 0;
  int64_t nread;

  while (file_size > 0) {
    length = ((file_size > block_size) ? block_size : file_size);
    nread = file->fileReadPrefetch(offset, vect_buff[indx_buff], length);
    offset += nread;
    file_size -= nread;
    indx_buff = (indx_buff + 1) % total_buffs;
  }

  COMMONTIMING("WAIT_ASYNC", &rd_timing);
  // Collect all the read responses
  eos::fst::AsyncMetaHandler* ptr_handler =
    static_cast<eos::fst::AsyncMetaHandler*>(file->fileGetAsyncHandler());

  if (ptr_handler && (!ptr_handler->WaitOK())) {
    eos_err("Error while waiting for read async responses");
    retc = -1;
  }

  COMMONTIMING("CLOSE", &rd_timing);
  retc += file->fileClose();
  COMMONTIMING("END", &rd_timing);
  // Collect statistics for this operation in the result object at thread level
  ResultProto& pb_result = result->GetPbResult();
  float transaction_time = rd_timing.GetTagTimelapse("OPEN", "END");
  pb_result.add_timestamp(GetTimestamp());
  pb_result.add_opentime(rd_timing.GetTagTimelapse("OPEN", "READ"));
  pb_result.add_readtime(rd_timing.GetTagTimelapse("READ", "WAIT_ASYNC"));
  pb_result.add_readwaitasync(rd_timing.GetTagTimelapse("WAIT_ASYNC", "CLOSE"));
  pb_result.add_writetime(0);
  pb_result.add_writewaitasync(0);
  pb_result.add_closetime(rd_timing.GetTagTimelapse("CLOSE", "END"));
  pb_result.add_transactiontime(rd_timing.GetTagTimelapse("OPEN", "END"));
  pb_result.add_readspeed(Result::GetTransferSpeed((float)offset, transaction_time));
  pb_result.add_writespeed(0);
  pb_result.add_readtotal(offset);
  pb_result.add_writetotal(0);

  // Free allocated memory
  for (uint32_t i = 0; i < vect_buff.size(); i++) {
    delete[] vect_buff[i];
  }

  vect_buff.clear();
  delete file;
  return retc;
}

//------------------------------------------------------------------------------
// Read a file in parallel IO mode
//------------------------------------------------------------------------------
int
FileEos::ReadPio(Result*& result)
{
  eos_debug("Calling function");
  int retc = 0;
  char* buffer;
  std::vector<char*> vect_buff;
  int32_t total_buffs = 64;
  uint64_t file_size = mFileSize;
  uint32_t block_size = mBlockSize;
  eos::common::Timing rd_timing("rdpio");
  XrdCl::Buffer arg;
  XrdCl::Buffer* response = 0;
  XrdCl::XRootDStatus status;
  eos::fst::RainMetaLayout* file = 0;
  XrdSfsFileOpenMode flags_sfs = SFS_O_RDONLY; // open for read by default

  // Allocate a pool of read buffers and then do round-robin for reading in them
  // so as to minimise the probability of two requests writing in the same buffer
  // at the same time
  for (int32_t i = 0; i < total_buffs; i++) {
    buffer = new char[block_size];
    vect_buff.push_back(buffer);
  }

  // Create an XrdCl::FileSystem object and do PIO request
  COMMONTIMING("OPEN", &rd_timing);
  XrdCl::URL url(mBmkInstance);

  if (!url.IsValid()) {
    std::cerr << "URL is invalid." << std::endl;

    // Free allocated memory
    for (uint32_t i = 0; i < vect_buff.size(); i++) {
      delete[] vect_buff[i];
    }

    return -1;
  }

  XrdCl::FileSystem* fs = new XrdCl::FileSystem(url);
  std::string request = mFilePath;
  request += "?mgm.pcmd=open";
  arg.FromString(request);
  status = fs->Query(XrdCl::QueryCode::OpaqueFile, arg, response);

  if (status.IsOK()) {
    //..........................................................................
    // Parse output
    //..........................................................................
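    // The parsing below expects the MGM response to carry the layout id in
    // "mgm.lid" and one "pio.<i>" key per stripe holding that stripe's
    // <host:port> endpoint, plus the remaining opaque info starting at
    // "&&mgm.logid", which is later handed over to OpenPio. Each stripe URL
    // is rebuilt as "root://<host:port>/<mFilePath>".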
    XrdOucString tag;
    XrdOucString stripePath;
    std::vector<std::string> stripeUrls;
    XrdOucString origResponse = response->GetBuffer();
    XrdOucString stringOpaque = response->GetBuffer();

    while (stringOpaque.replace("?", "&")) {}

    while (stringOpaque.replace("&&", "&")) {}

    XrdOucEnv* openOpaque = new XrdOucEnv(stringOpaque.c_str());
    char* opaqueInfo = (char*) strstr(origResponse.c_str(), "&&mgm.logid");

    if (opaqueInfo) {
      opaqueInfo += 2;
      eos::common::LayoutId::layoutid_t layout = openOpaque->GetInt("mgm.lid");

      for (unsigned int i = 0;
           i <= eos::common::LayoutId::GetStripeNumber(layout); i++) {
        tag = "pio.";
        tag += static_cast<int>(i);
        stripePath = "root://";
        stripePath += openOpaque->Get(tag.c_str());
        stripePath += "/";
        stripePath += mFilePath.c_str();
        stripeUrls.push_back(stripePath.c_str());
      }

      if (eos::common::LayoutId::GetLayoutType(layout) ==
          eos::common::LayoutId::kRaidDP) {
        file = new eos::fst::RaidDpLayout(NULL, layout, NULL, NULL, "");
      } else if ((eos::common::LayoutId::IsRain(layout))) {
        file = new eos::fst::ReedSLayout(NULL, layout, NULL, NULL, "");
      } else {
        eos_err("No such supported layout for PIO");
        file = 0;
      }

      if (file) {
        retc = file->OpenPio(stripeUrls, flags_sfs, 0, opaqueInfo);

        if (retc) {
          eos_err("error=open PIO failed for path=%s", mFilePath.c_str());

          // Free allocated memory
          for (uint32_t i = 0; i < vect_buff.size(); i++) {
            delete[] vect_buff[i];
          }

          delete response;
          delete openOpaque;
          delete file;
          return retc;
        }
      } else {
        // Free allocated memory
        for (uint32_t i = 0; i < vect_buff.size(); i++) {
          delete[] vect_buff[i];
        }

        delete response;
        delete openOpaque;
        delete file;
        std::cout << "Falling back to read gw.(0)" << std::endl;
        return ReadGw(result);
      }
    } else {
      eos_err("error=opaque info not what we expected");

      // Free allocated memory
      for (uint32_t i = 0; i < vect_buff.size(); i++) {
        delete[] vect_buff[i];
      }

      delete response;
      delete openOpaque;
      delete file;
      std::cout << "Falling back to read gw.(1)" << std::endl;
      return ReadGw(result);
    }
  } else {
    eos_warning("Failed to get PIO request, falling back to GW mode");

    // Free allocated memory
    for (uint32_t i = 0; i < vect_buff.size(); i++) {
      delete[] vect_buff[i];
    }

    delete response;
    std::cout << "Falling back to read gw.(2)" << std::endl;
    return ReadGw(result);
  }

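  // At this point the RAIN layout object is open in parallel IO mode over all
  // stripe URLs; the loop below reads the file in block_size chunks through
  // the layout, cycling round-robin over the pre-allocated buffer pool.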
  COMMONTIMING("READ", &rd_timing);
  // Do the actual reading
  int32_t indx_buff = 0;
  uint64_t offset = 0;
  uint64_t length = 0;
  int64_t nread;

  while (file_size > 0) {
    length = ((file_size > block_size) ? block_size : file_size);
    nread = file->Read(offset, vect_buff[indx_buff], length);
    offset += nread;
    file_size -= nread;
    indx_buff = (indx_buff + 1) % total_buffs;
  }

  COMMONTIMING("CLOSE", &rd_timing);
  retc = file->Close();
  COMMONTIMING("END", &rd_timing);
  // Collect statistics for this operation in the result object at thread level
  ResultProto& pb_result = result->GetPbResult();
  float transaction_time = rd_timing.GetTagTimelapse("OPEN", "END");
  pb_result.add_timestamp(GetTimestamp());
  pb_result.add_opentime(rd_timing.GetTagTimelapse("OPEN", "READ"));
  pb_result.add_readtime(rd_timing.GetTagTimelapse("READ", "CLOSE"));
  pb_result.add_readwaitasync(0);
  pb_result.add_writetime(0);
  pb_result.add_writewaitasync(0);
  pb_result.add_closetime(rd_timing.GetTagTimelapse("CLOSE", "END"));
  pb_result.add_transactiontime(rd_timing.GetTagTimelapse("OPEN", "END"));
  pb_result.add_readspeed(Result::GetTransferSpeed((float)offset, transaction_time));
  pb_result.add_writespeed(0);
  pb_result.add_readtotal(offset);
  pb_result.add_writetotal(0);

  // Free allocated memory
  for (uint32_t i = 0; i < vect_buff.size(); i++) {
    delete[] vect_buff[i];
  }

  vect_buff.clear();
  delete file;
  delete response;
  return retc;
}


//------------------------------------------------------------------------------
// Write and read file in gateway mode
//------------------------------------------------------------------------------
int
FileEos::ReadWriteGw(Result*& result)
{
  eos_debug("Calling function");
  int retc = 0;
  retc += Write(result);
  retc += ReadGw(result);
  return retc;
}


//------------------------------------------------------------------------------
// Write and read file in parallel IO mode
//------------------------------------------------------------------------------
int
FileEos::ReadWritePio(Result*& result)
{
  eos_debug("Calling function");
  int retc = 0;
  retc += Write(result);
  retc += ReadPio(result);
  return retc;
}


//------------------------------------------------------------------------------
// Get current timestamp as a string
//------------------------------------------------------------------------------
std::string
FileEos::GetTimestamp()
{
  std::string time_str;
  char time_buff[1024];
  time_t current_time;
  struct tm* tm;
  time(&current_time);
  tm = localtime(&current_time);
  sprintf(time_buff, "%02d/%02d/%04d %02d:%02d:%02d",
          tm->tm_mday, tm->tm_mon + 1, tm->tm_year + 1900,
          tm->tm_hour, tm->tm_min, tm->tm_sec);
  time_str = time_buff;
  return time_str;
}

EOSBMKNAMESPACE_END