//------------------------------------------------------------------------------ // File XrdFstOssFile.cc // Author Elvin-Alin Sindrilaru - CERN //------------------------------------------------------------------------------ /************************************************************************ * EOS - the CERN Disk Storage System * * Copyright (C) 2011 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #include "fst/XrdFstOss.hh" #include "fst/XrdFstOssFile.hh" #include "fst/checksum/ChecksumPlugins.hh" #include "common/BufferManager.hh" #include #include #include #include namespace { eos::common::BufferManager gOssBuffMgr(16 * eos::common::MB, 1, 4 * eos::common::KB); } EOSFSTNAMESPACE_BEGIN #ifdef __APPLE__ #define O_LARGEFILE 0 #endif //! Pointer to the current OSS implementation to be used by the oss files extern XrdFstOss* XrdFstSS; //------------------------------------------------------------------------------ // Constuctor //------------------------------------------------------------------------------ XrdFstOssFile::XrdFstOssFile(const char* tid) : XrdOssDF(), eos::common::LogId(), mIsRW(false), mRWLockXs(0), mBlockXs(0), fdDirect(-1) {} //------------------------------------------------------------------------------ // Destructor //------------------------------------------------------------------------------ XrdFstOssFile::~XrdFstOssFile() { if (fd >= 0) { close(fd); } if (fdDirect >= 0) { close(fdDirect); } fd = -1; fdDirect = -1; } //------------------------------------------------------------------------------ // Open function //------------------------------------------------------------------------------ int XrdFstOssFile::Open(const char* path, int flags, mode_t mode, XrdOucEnv& env) { int newfd; const char* val = 0; unsigned long lid = 0; off_t booking_size = 0; eos_debug("path=%s", path); mPath = path; bool directIO = false; if (fd >= 0) { return -EBADF; } if ((val = env.Get("mgm.lid"))) { lid = atol(val); } if ((val = env.Get("mgm.bookingsize"))) { booking_size = strtoull(val, 0, 10); if (errno == ERANGE) { eos_err("error=invalid bookingsize in capability: %s", val); return -EINVAL; } } // add support for IO flags like synchronous or direct IO if ((val = env.Get("mgm.ioflag"))) { if (!strcmp(val, "direct")) { directIO = true; // flags |= O_DIRECT; } else { if (!strcmp(val, "sync")) { // data + meta data flags |= O_SYNC; } else { // data if (!strcmp(val, "dsync")) { flags |= O_DSYNC; } } } } if ((flags & (O_WRONLY | O_RDWR | O_CREAT | O_TRUNC)) != 0) { mIsRW = true; } if ((eos::common::LayoutId::GetBlockChecksum(lid) != eos::common::LayoutId::kNone) && (mPath[0] == '/')) { std::pair pair_value; pair_value = XrdFstSS->GetXsObj(path, mIsRW); mRWLockXs = pair_value.first; mBlockXs = pair_value.second; if (!mBlockXs) { auto xs_ptr = ChecksumPlugins::GetChecksumObject(lid, true); mBlockXs = xs_ptr.get(); // Management of the xs object lifetime is handled by the OSS class xs_ptr.release(); if (mBlockXs) { XrdOucString xs_path = mBlockXs->MakeBlockXSPath(mPath.c_str()); struct stat buf; int retc = XrdFstSS->Stat(mPath.c_str(), &buf); if (!mBlockXs->OpenMap(xs_path.c_str(), (retc ? booking_size : buf.st_size), eos::common::LayoutId::OssXsBlockSize, mIsRW)) { eos_err("error=unable to open blockxs file: %s", xs_path.c_str()); return -EIO; } //...................................................................... mRWLockXs = XrdFstSS->AddMapping(path, mBlockXs, mIsRW); } else { eos_err("error=unable to create the blockxs obj"); return -EIO; } } } do { #if defined(O_CLOEXEC) fd = open(path, flags | O_LARGEFILE | O_CLOEXEC, mode); #else fd = open(path, flags | O_LARGEFILE, mode); #endif } while ((fd < 0) && (errno == EINTR)); if (directIO) { do { #if defined(O_CLOEXEC) fdDirect = open(path, flags | O_DIRECT | O_LARGEFILE | O_CLOEXEC, mode); #else fdDirect = open(path, flags | O_DIRECT | O_LARGEFILE, mode); #endif } while ((fdDirect < 0) && (errno == EINTR)); } if (fd >= 0) { if (fd < XrdFstSS->mFdFence) { #if defined(__linux__) && defined(SOCK_CLOEXEC) && defined(O_CLOEXEC) // Relocate the file descriptor if need be and make sure file is closed // on exec if ((newfd = fcntl(fd, F_DUPFD_CLOEXEC, XrdFstSS->mFdFence)) < 0) { #else if ((newfd = fcntl(fd, F_DUPFD, XrdFstSS->mFdFence)) < 0) { #endif eos_err("error= unable to reloc FD for ", path); } else { close(fd); fd = newfd; } } } if (fdDirect >= 0) { if (fdDirect < XrdFstSS->mFdFence) { #if defined(__linux__) && defined(SOCK_CLOEXEC) && defined(O_CLOEXEC) // Relocate the file descriptor if need be and make sure file is closed // on exec if ((newfd = fcntl(fdDirect, F_DUPFD_CLOEXEC, XrdFstSS->mFdFence)) < 0) { #else if ((newfd = fcntl(fdDirect, F_DUPFD, XrdFstSS->mFdFence)) < 0) { #endif eos_err("error= unable to reloc FD for ", path); } else { close(fdDirect); fdDirect = newfd; } } } eos_debug("fd=%d fdDirect=%d flags=%x", fd, fdDirect, flags); return (fd < 0 ? fd : XrdOssOK); } //------------------------------------------------------------------------------ // Read //------------------------------------------------------------------------------ ssize_t XrdFstOssFile::Read(void* buffer, off_t offset, size_t length) { ssize_t retval = 0; ssize_t nread; std::vector pieces; std::shared_ptr start_piece, end_piece; eos_debug("off=%ji len=%ji", offset, length); if (fd < 0) { return static_cast(-EBADF); } if (!mBlockXs) { // If we don't have blockxs enabled then there is no point in aligning XrdOucIOVec piece = {(long long)offset, (int)length, 0, (char*)buffer}; pieces.push_back(piece); } else { // Align to the block checksum offset by possibly reading two extra // pieces in the beginning and/or at the end of the requested piece pieces = AlignBuffer(buffer, offset, length, start_piece, end_piece); } // Loop through all the pieces and read them in for (auto piece = pieces.begin(); piece != pieces.end(); ++piece) { int rfd = fd; if ((fdDirect >= 0)) { if ( (!(piece->offset % 512)) && (!(piece->size % 512))) { rfd = fdDirect; } else { // we don't want cache data, but we cannot use direct IO posix_fadvise(rfd, piece->offset, piece->size, POSIX_FADV_DONTNEED); } } do { nread = pread(rfd, piece->data, piece->size, piece->offset); } while ((nread < 0) && (errno == EINTR)); if (mBlockXs) { XrdSysRWLockHelper wr_lock(mRWLockXs, 0); if ((nread > 0) && (!mBlockXs->CheckBlockSum(piece->offset, piece->data, nread))) { eos_err("error=read block-xs error offset=%zu, length=%zu", piece->offset, piece->size); retval = -EIO; break; } } if (nread < 0) { eos_err("msg=\"failed read\" offset=%zu length=%zu", piece->offset, piece->size); retval = -EIO; break; } if (nread > 0) { char* ptr_buff, *ptr_piece; off_t off_copy; size_t len_copy; if (piece->offset < offset) { // Copy back begin edge ptr_buff = (char*)buffer; off_copy = offset - piece->offset; len_copy = std::min((size_t)(nread - off_copy), length); ptr_piece = piece->data + off_copy; ptr_buff = (char*)memcpy((void*)ptr_buff, ptr_piece, len_copy); retval += len_copy; } else if ((piece->offset >= offset) && (piece->offset + nread >= (ssize_t)(offset + length))) { // Copy back end edge len_copy = std::min((ssize_t)(offset + length - piece->offset), nread); ptr_buff = (char*)buffer + (piece->offset - offset); if (ptr_buff != piece->data) { ptr_buff = (char*)memcpy((void*)ptr_buff, piece->data, len_copy); } retval += len_copy; } else { retval += nread; } } } // Recycle any buffer used for the blockxs alignment gOssBuffMgr.Recycle(start_piece); gOssBuffMgr.Recycle(end_piece); if (retval > (ssize_t)length) { eos_err("msg=\"read more than requested\" ret=%ji length=%ju", retval, length); return -EIO; } return retval; } //------------------------------------------------------------------------------ // Align request to the blockchecksum offset so that the whole request is // checksummed //------------------------------------------------------------------------------ std::vector XrdFstOssFile::AlignBuffer(void* buffer, off_t offset, size_t length, std::shared_ptr& start_piece, std::shared_ptr& end_piece) { XrdOucIOVec piece; std::vector resp; resp.reserve(3); // worst case uint64_t blk_size = eos::common::LayoutId::OssXsBlockSize; off_t chunk_end = offset + length; off_t align_start = (offset / blk_size) * blk_size; off_t align_end = (chunk_end / blk_size) * blk_size; if (align_start < offset) { // Extra piece at the beginning start_piece = gOssBuffMgr.GetBuffer(eos::common::LayoutId::OssXsBlockSize); if (start_piece == nullptr) { throw std::bad_alloc(); } piece = {(long long) align_start, (int) blk_size, 0, start_piece->GetDataPtr() }; resp.push_back(piece); align_start += blk_size; } // Add rest of pieces if this was not all if (align_start < chunk_end) { if (align_start != align_end) { // Add the main piece char* ptr_buff = (char*)buffer + (align_start - offset); piece = {(long long) align_start, (int)(align_end - align_start), 0, ptr_buff }; resp.push_back(piece); } if (((off_t)align_end < chunk_end) && ((off_t)(align_end + blk_size) > chunk_end)) { // Extra piece at the end end_piece = gOssBuffMgr.GetBuffer(eos::common::LayoutId::OssXsBlockSize); if (end_piece == nullptr) { throw std::bad_alloc(); } piece = {(long long) align_end, (int) blk_size, 0, end_piece->GetDataPtr() }; resp.push_back(piece); } } return resp; } //------------------------------------------------------------------------------ // Read raw //------------------------------------------------------------------------------ ssize_t XrdFstOssFile::ReadRaw(void* buffer, off_t offset, size_t length) { return Read(buffer, offset, length); } //------------------------------------------------------------------------------ // Write //------------------------------------------------------------------------------ ssize_t XrdFstOssFile::ReadV(XrdOucIOVec* readV, int n) { ssize_t rdsz; ssize_t totBytes = 0; #if defined(__linux__) long long begOff, endOff, begLst = -1, endLst = -1; int nPR = n; // Indicate we are in preread state and see if we have exceeded the limit if ((fdDirect == -1) && XrdFstSS->mPrDepth && ((XrdFstSS->mPrActive++) < XrdFstSS->mPrQSize) && (n > 2)) { int faBytes = 0; for (nPR = 0; (nPR < XrdFstSS->mPrDepth) && (faBytes < XrdFstSS->mPrBytes); nPR++) if (readV[nPR].size > 0) { begOff = XrdFstSS->mPrPMask & readV[nPR].offset; endOff = XrdFstSS->mPrPBits | (readV[nPR].offset + readV[nPR].size); rdsz = endOff - begOff + 1; if ((begOff > endLst || endOff < begLst) && (rdsz < XrdFstSS->mPrBytes)) { (void) posix_fadvise(fd, begOff, rdsz, POSIX_FADV_WILLNEED); eos_debug("fadvise fd=%i off=%lli len=%ji", fd, begOff, rdsz); faBytes += rdsz; } begLst = begOff; endLst = endOff; } } #endif // Read in the vector and do a pre-advise if we support that for (int i = 0; i < n; i++) { // Use normal block read since it also does the blockxs and we have the // guarantee that the previous advice was issued for the full block to // be read even with the 4K alignment since fadvice does this on its own rdsz = Read(readV[i].data, readV[i].offset, readV[i].size); if (rdsz < 0 || rdsz != readV[i].size) { totBytes = (rdsz < 0 ? -errno : -ESPIPE); break; } totBytes += rdsz; #if defined(__linux__) if (nPR < n && readV[nPR].size > 0) { begOff = XrdFstSS->mPrPMask & readV[nPR].offset; endOff = XrdFstSS->mPrPBits | (readV[nPR].offset + readV[nPR].size); rdsz = endOff - begOff + 1; if ((begOff > endLst || endOff < begLst) && rdsz <= XrdFstSS->mPrBytes) { posix_fadvise(fd, begOff, rdsz, POSIX_FADV_WILLNEED); eos_debug("fadvise fd=%i off=%lli len=%ji", fd, begOff, rdsz); } begLst = begOff; endLst = endOff; } nPR++; #endif } // All done, return bytes read. #if defined(__linux__) if (XrdFstSS->mPrDepth) { XrdFstSS->mPrActive--; } #endif return totBytes; } //------------------------------------------------------------------------------ // Vector write //------------------------------------------------------------------------------ ssize_t XrdFstOssFile::WriteV(XrdOucIOVec* writeV, int n) { ssize_t nbytes = 0; ssize_t curCount = 0; for (int i = 0; i < n; i++) { curCount = Write((void*)writeV[i].data, (off_t)writeV[i].offset, (size_t)writeV[i].size); if (curCount != writeV[i].size) { if (curCount < 0) { return curCount; } return -ESPIPE; } nbytes += curCount; } return nbytes; } //------------------------------------------------------------------------------ // Chmod function //------------------------------------------------------------------------------ //------------------------------------------------------------------------------ ssize_t XrdFstOssFile::Write(const void* buffer, off_t offset, size_t length) { ssize_t retval; if (fd < 0) { return static_cast(-EBADF); } if ((fdDirect >= 0) && (!(offset % 512)) && (!(length % 512))) { // tell the kernel to drop cache pages for the buffered fd posix_fadvise(fd, 0, 0, POSIX_FADV_DONTNEED); // try direct IO do { retval = pwrite(fdDirect, buffer, length, offset); } while ((retval < 0) && (errno == EINTR)); } else { // buffered IO do { retval = pwrite(fd, buffer, length, offset); } while ((retval < 0) && (errno == EINTR)); if ((retval > 0) && (fdDirect >= 0)) { // force the data flush out of the buffer cache if (fdatasync(fd)) { retval = -1; } } } if ((retval > 0) && mBlockXs) { XrdSysRWLockHelper wr_lock(mRWLockXs, 0); mBlockXs->AddBlockSum(offset, static_cast(buffer), retval); } return (retval >= 0 ? retval : static_cast(-errno)); } //------------------------------------------------------------------------------ // Get file status //------------------------------------------------------------------------------ int XrdFstOssFile::Fchmod(mode_t mode) { return (fchmod(fd, mode) ? -errno : XrdOssOK); } //------------------------------------------------------------------------------ // Sync file to local disk //------------------------------------------------------------------------------ int XrdFstOssFile::Fstat(struct stat* statinfo) { return (fstat(fd, statinfo) ? -errno : XrdOssOK); } //------------------------------------------------------------------------------ // Fsync the file //------------------------------------------------------------------------------ int XrdFstOssFile::Fsync() { return (fsync(fd) ? -errno : XrdOssOK); } //............................................................................ // Note that space adjustment will occur when the file is closed, not here //............................................................................ int XrdFstOssFile::Ftruncate(unsigned long long flen) { off_t newlen = flen; if ((sizeof(newlen) < sizeof(flen)) && (flen >> 31)) { return -EOVERFLOW; } return (ftruncate(fd, newlen) ? -errno : XrdOssOK); } //------------------------------------------------------------------------------ // Get file descriptor //------------------------------------------------------------------------------ int XrdFstOssFile::getFD() { return fd; } //------------------------------------------------------------------------------ // Close function //------------------------------------------------------------------------------ int XrdFstOssFile::Close(long long* retsz) { bool delete_mapping = false; bool unlinked = false; if (fd < 0) { return -EBADF; } //............................................................................ // Code dealing with block checksums //............................................................................ if (mBlockXs) { struct stat statinfo; if ((XrdFstSS->Stat(mPath.c_str(), &statinfo))) { eos_err("error=close - cannot stat unlinked file: %s", mPath.c_str()); unlinked = true; } XrdSysRWLockHelper wr_lock(mRWLockXs, 0); // ---> wrlock xs obj mBlockXs->DecrementRef(mIsRW); if (mBlockXs->GetTotalRef() >= 1) { //........................................................................ // If multiple references //........................................................................ if ((mBlockXs->GetNumRef(true) == 0) && mIsRW) { //...................................................................... // If one last writer and this is the current one //...................................................................... if (! unlinked) { if (!mBlockXs->ChangeMap(statinfo.st_size, true)) { eos_err("error=unable to change block checksum map for file %s", mPath.c_str()); } else { eos_info("info=\"adjusting block XS map\""); } if (!mBlockXs->AddBlockSumHoles(getFD())) { eos_warning("warning=unable to fill holes of block checksum map for file %s", mPath.c_str()); } } } } else { //........................................................................ // Just one reference left (the current one) //........................................................................ if (mIsRW && !unlinked) { if (!mBlockXs->ChangeMap(statinfo.st_size, true)) { eos_err("error=Unable to change block checksum map for file %s", mPath.c_str()); } else { eos_info("info=\"adjusting block XS map\""); } if (!mBlockXs->AddBlockSumHoles(getFD())) { eos_warning("warning=unable to fill holes of block checksum map for file %s", mPath.c_str()); } } if (!mBlockXs->CloseMap()) { eos_err("error=unable to close block checksum map for file %s", mPath.c_str()); } delete_mapping = true; } } //............................................................................ if (delete_mapping) { eos_debug("Delete entry from oss map for file %s", mPath.c_str()); XrdFstSS->DropXs(mPath.c_str()); } else { eos_debug("No delete from oss map for file %s", mPath.c_str()); } if (unlinked) { close(fd); fd = -1; if (fdDirect >= 0) { close(fdDirect); fdDirect = -1; } return -EIO; } //............................................................................ if (close(fd)) { if (fdDirect >= 0) { close(fdDirect); } return -errno; } if (fdDirect >= 0) { if (close(fdDirect)) { return -errno; } } fd = -1; fdDirect = -1; return XrdOssOK; } EOSFSTNAMESPACE_END