//------------------------------------------------------------------------------
//! @file data.cc
//! @author Andreas-Joachim Peters CERN
//! @brief data handling class
//------------------------------------------------------------------------------
/************************************************************************
* EOS - the CERN Disk Storage System *
* Copyright (C) 2016 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see .*
************************************************************************/
#include "data/data.hh"
#include "kv/kv.hh"
#include "eosfuse.hh"
#include "data/cachesyncer.hh"
#include "data/journalcache.hh"
#include "data/xrdclproxy.hh"
#include "misc/MacOSXHelper.hh"
#include "misc/fusexrdlogin.hh"
#include "misc/filename.hh"
#include "common/Logging.hh"
#include "common/SymKeys.hh"
#include
#include
bufferllmanager data::datax::sBufferManager;
std::string data::datax::kInlineAttribute = "sys.file.buffer";
std::string data::datax::kInlineMaxSize = "sys.file.inline.maxsize";
std::string data::datax::kInlineCompressor = "sys.file.inline.compressor";
/* -------------------------------------------------------------------------- */
data::data()
/* -------------------------------------------------------------------------- */
{
}
/* -------------------------------------------------------------------------- */
data::~data()
/* -------------------------------------------------------------------------- */
{
}
/* -------------------------------------------------------------------------- */
void
/* -------------------------------------------------------------------------- */
data::init()
/* -------------------------------------------------------------------------- */
{
// configure the ra,rd,wr buffer sizes
XrdCl::Proxy::sRaBufferManager.configure(16,
cachehandler::instance().get_config().default_read_ahead_size,
cachehandler::instance().get_config().max_inflight_read_ahead_buffer_size);
XrdCl::Proxy::sWrBufferManager.configure(128,
128 * 1024,
cachehandler::instance().get_config().max_inflight_write_buffer_size);
datamap.run();
}
/* -------------------------------------------------------------------------- */
void
/* -------------------------------------------------------------------------- */
data::terminate(uint64_t seconds)
/* -------------------------------------------------------------------------- */
{
if (datamap.waitflush(seconds)) {
datamap.join();
}
}
/* -------------------------------------------------------------------------- */
data::shared_data
/* -------------------------------------------------------------------------- */
data::get(fuse_req_t req,
fuse_ino_t ino,
metad::shared_md md)
/* -------------------------------------------------------------------------- */
{
XrdSysMutexHelper mLock(datamap);
if (datamap.count(ino)) {
shared_data io = datamap[ino];
io->attach(); // client ref counting
return io;
} else {
// protect against running out of file descriptors
size_t openfiles = datamap.size();
size_t openlimit = (EosFuse::Instance().Config().options.fdlimit - 128) / 2;
while ((openfiles = datamap.size()) > openlimit) {
datamap.UnLock();
eos_static_warning("open-files=%lu limit=%lu - waiting for release of file descriptors",
openfiles, openlimit);
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
datamap.Lock();
}
if (datamap.count(ino)) {
// might have been created in the meanwhile
shared_data io = datamap[ino];
io->attach(); // client ref counting
return io;
} else {
shared_data io = std::make_shared(md);
io->set_id(ino, req);
datamap[(fuse_ino_t) io->id()] = io;
io->attach();
return io;
}
}
}
/* -------------------------------------------------------------------------- */
bool
/* -------------------------------------------------------------------------- */
data::has(fuse_ino_t ino, bool checkwriteopen)
/* -------------------------------------------------------------------------- */
{
XrdSysMutexHelper mLock(datamap);
if (datamap.count(ino)) {
if (checkwriteopen) {
if (datamap[ino]->flags() & (O_RDWR | O_WRONLY)) {
return true;
} else {
return false;
}
} else {
return true;
}
} else {
return false;
}
}
/* -------------------------------------------------------------------------- */
std::string
/* -------------------------------------------------------------------------- */
data::url(fuse_ino_t ino)
/* -------------------------------------------------------------------------- */
{
std::string p;
XrdSysMutexHelper mLock(datamap);
if (datamap.count(ino)) {
p = datamap[ino]->fullpath();
while (p.find("//") != std::string::npos) {
p.replace(p.find("//"), p.size(), "/");
}
p += " [";
p += datamap[ino]->url(true);
p += " ]";
}
return p;
}
/* -------------------------------------------------------------------------- */
metad::shared_md
data::retrieve_wr_md(fuse_ino_t ino)
/* -------------------------------------------------------------------------- */
{
// return the shared_md boject if this is a writer
XrdSysMutexHelper mLock(datamap);
if (datamap.count(ino)) {
if (datamap[ino]->flags() & (O_RDWR | O_WRONLY)) {
return datamap[ino]->md();
}
}
return nullptr;
}
/* -------------------------------------------------------------------------- */
void
/* -------------------------------------------------------------------------- */
data::release(fuse_req_t req,
fuse_ino_t ino)
/* -------------------------------------------------------------------------- */
{
XrdSysMutexHelper mLock(datamap);
if (datamap.count(ino)) {
shared_data io = datamap[ino];
io->detach();
// the object is cleaned by the flush thread
}
if (datamap.count(ino + 0xffffffff)) {
// in case this is an unlinked object
shared_data io = datamap[ino + 0xffffffff];
io->detach();
}
}
void
/* -------------------------------------------------------------------------- */
data::update_cookie(uint64_t ino, std::string& cookie)
/* -------------------------------------------------------------------------- */
{
XrdSysMutexHelper mLock(datamap);
if (datamap.count(ino)) {
shared_data io = datamap[ino];
io->attach(); // client ref counting
io->store_cookie(cookie);
io->detach();
}
}
/* -------------------------------------------------------------------------- */
void
/* -------------------------------------------------------------------------- */
data::invalidate_cache(fuse_ino_t ino)
/* -------------------------------------------------------------------------- */
{
XrdSysMutexHelper mLock(datamap);
if (datamap.count(ino)) {
shared_data io = datamap[ino];
io->attach(); // client ref counting
io->cache_invalidate();
io->detach();
}
}
/* -------------------------------------------------------------------------- */
void
/* -------------------------------------------------------------------------- */
data::unlink(fuse_req_t req, fuse_ino_t ino)
/* -------------------------------------------------------------------------- */
{
bool has_data = false;
shared_data datap;
{
XrdSysMutexHelper mLock(datamap);
has_data = datamap.count(ino);
if (has_data) {
datap = datamap[ino];
}
}
if (has_data) {
{
XrdSysMutexHelper helper(datap->Locker());
// wait for open in flight to be done
datap->WaitOpen();
datap->unlink(req);
}
// put the unlinked inode in a high bucket, will be removed by the flush thread
{
XrdSysMutexHelper mLock(datamap);
if (datamap.count(ino)) {
datamap[ino + 0xffffffff] = datamap[ino];
datamap.erase(ino);
eos_static_info("datacache::unlink size=%lu", datamap.size());
}
}
} else {
shared_data io = std::make_shared();
io->set_id(ino, req);
io->unlink(req);
}
}
/* -------------------------------------------------------------------------- */
int
/* -------------------------------------------------------------------------- */
data::datax::flush(fuse_req_t req)
/* -------------------------------------------------------------------------- */
{
eos_info("");
XrdSysMutexHelper lLock(mLock);
bool flush_wait_open = false;
if (mFlags & O_CREAT) {
flush_wait_open = (EosFuse::Instance().Config().options.flush_wait_open ==
EosFuse::Instance().Config().options.kWAIT_FLUSH_ON_CREATE) ? true : false;
if ((!flush_wait_open) &&
((*mMd)()->size() >=
EosFuse::Instance().Config().options.flush_wait_open_size)) {
flush_wait_open = true;
}
if (EosFuse::Instance().Config().options.nowait_flush_executables.size()) {
if (!filename::matches_suffix(fusexrdlogin::executable(req),
EosFuse::Instance().Config().options.nowait_flush_executables)) {
eos_notice("flush-wait-open: forced for exec=%s",
fusexrdlogin::executable(req).c_str());
flush_wait_open = true;
}
}
} else {
flush_wait_open = (EosFuse::Instance().Config().options.flush_wait_open !=
EosFuse::Instance().Config().options.kWAIT_FLUSH_NEVER) ? true : false;
}
if (EOS_LOGS_DEBUG) {
eos_notice("flush-wait-open: %d size=%lu exec=%s\n", flush_wait_open,
(*mMd)()->size(), fusexrdlogin::executable(req).c_str());
}
return flush_nolock(req, flush_wait_open, false);
}
/* -------------------------------------------------------------------------- */
int
/* -------------------------------------------------------------------------- */
data::datax::flush_nolock(fuse_req_t req, bool wait_open, bool wait_writes)
/* -------------------------------------------------------------------------- */
{
eos_info("");
set_shared_url();
bool journal_recovery = false;
errno = 0;
if (mFile->journal() && mFile->has_xrdiorw(req)) {
eos_info("flushing journal");
ssize_t truncate_size = mFile->journal()->get_truncatesize();
if (wait_open) {
// wait atleast that we could open that file
mFile->xrdiorw(req)->WaitOpen();
// set again the shared url now, since we know where we are
set_shared_url();
}
if ((truncate_size != -1)
|| (wait_writes && mFile->journal()->size())) {
// if there is a truncate to be done, we have to wait for the writes and truncate
// if we are asked to wait for writes (when pwrite sees a journal full) we free the journal
for (auto it = mFile->get_xrdiorw().begin();
it != mFile->get_xrdiorw().end(); ++it) {
XrdCl::XRootDStatus status = it->second->WaitOpen();
if (!status.IsOK()) {
if (status.errNo == kXR_overQuota) {
eos_crit("flush error errno=%d", XrdCl::Proxy::status2errno(status));
return XrdCl::Proxy::status2errno(status);
}
journal_recovery = true;
eos_err("file not open");
}
status = it->second->WaitWrite();
if (!status.IsOK()) {
mRecoveryStack.push_back(eos_log(LOG_SILENT,
"status='%s' hint='will TryRecovery'",
status.ToString().c_str()));
journal_recovery = true;
eos_err("write error error=%s", status.ToStr().c_str());
}
}
ssize_t truncate_size = mFile->journal()->get_truncatesize();
if (!journal_recovery && (truncate_size != -1)) {
// the journal might have a truncation size indicated, so we need to run a sync truncate in the end
XrdCl::XRootDStatus status = mFile->xrdiorw(req)->Truncate(truncate_size);
if (!status.IsOK()) {
mRecoveryStack.push_back(eos_log(LOG_SILENT,
"status='%s' hint='will TryRecovery'",
status.ToString().c_str()));
journal_recovery = true;
eos_err("truncation failed");
}
}
if (simulate_write_error_in_flush()) {
// force a 'fake' repair now for testing purposes
journal_recovery = true;
}
if (journal_recovery) {
eos_debug("try recovery");
int rc = 0;
if ((rc = TryRecovery(req, true))) {
mRecoveryStack.push_back(eos_log(LOG_SILENT,
"errno='%d' hint='failed TryRecovery'",
rc));
eos_err("journal-flushing recovery failed rc=%d", rc);
return rc;
} else {
mRecoveryStack.push_back(eos_log(LOG_SILENT, "hint='success TryRecovery'"));
if ((rc = journalflush(req))) {
mRecoveryStack.push_back(eos_log(LOG_SILENT,
"errno='%d' hint='failed journalflush'",
rc));
eos_err("journal-flushing failed rc=%d", rc);
return rc;
} else {
mRecoveryStack.push_back(eos_log(LOG_SILENT, "hint='success journalflush'"));
}
}
}
// truncate the journal
if (mFile->journal()->reset()) {
char msg[1024];
snprintf(msg, sizeof(msg), "journal reset failed - ino=%#lx errno=%d %s", id(),
errno, mFile->journal()->dump().c_str());
eos_crit("%s", msg);
throw std::runtime_error(msg);
}
mFile->journal()->done_flush();
}
}
// check if the open failed
XrdCl::shared_proxy proxy = mFile->has_xrdiorw(req) ? mFile->xrdiorw(
req) : nullptr;
if (proxy) {
if (proxy->state() == XrdCl::Proxy::FAILED) {
int rc = 0;
eos_debug("try recovery");
mRecoveryStack.push_back(eos_log(LOG_SILENT,
"status='XrdCl::Proxy::FAILED' hint='will TryRecovery'"));
if ((rc = TryRecovery(req, true))) {
mRecoveryStack.push_back(eos_log(LOG_SILENT,
"errno='%d' hint='failed TryRecovery'",
rc));
eos_err("remote open failed - returning %d", rc);
return rc;
} else {
mRecoveryStack.push_back(eos_log(LOG_SILENT, "hint='success TryRecovery'"));
if ((rc = journalflush(req))) {
mRecoveryStack.push_back(eos_log(LOG_SILENT,
"errno='%d' hint='failed journalflush'",
rc));
eos_err("journal-flushing failed");
return rc;
} else {
mRecoveryStack.push_back(eos_log(LOG_SILENT, "hint='success journalflush'"));
// truncate the journal
if (mFile->journal()->reset()) {
char msg[1024];
snprintf(msg, sizeof(msg), "journal reset failed - ino=%#lx errno=%d %s", id(),
errno, mFile->journal()->dump().c_str());
eos_crit("%s", msg);
throw std::runtime_error(msg);
}
mFile->journal()->done_flush();
}
}
}
}
eos_info("retc=0");
return 0;
}
/* -------------------------------------------------------------------------- */
int
/* -------------------------------------------------------------------------- */
data::datax::journalflush(fuse_req_t req)
/* -------------------------------------------------------------------------- */
{
// call this with a mLock locked
eos_info("");
XrdCl::XRootDStatus status = mFile->xrdiorw(req)->WaitOpen();
// we have to push the journal now
if (!status.IsOK()) {
eos_err("async journal-cache-wait-open failed - ino=%#lx", id());
errno = XrdCl::Proxy::status2errno(status);
return errno;
}
eos_info("syncing cache");
cachesyncer cachesync(*((XrdCl::File*)mFile->xrdiorw(req).get()));
if (!mFile->journal()) {
// no journal this has to fail
errno = EOPNOTSUPP;
return errno;
}
if ((mFile->journal())->remote_sync(cachesync)) {
eos_err("async journal-cache-sync failed - ino=%#lx", id());
return EREMOTEIO;
}
eos_info("retc=0");
return 0;
}
/* -------------------------------------------------------------------------- */
int
/* -------------------------------------------------------------------------- */
data::datax::journalflush(std::string cid)
/* -------------------------------------------------------------------------- */
{
// call this with a mLock locked
eos_info("");
XrdCl::XRootDStatus status = mFile->xrdiorw(cid)->WaitOpen();
// we have to push the journal now
if (!status.IsOK()) {
eos_err("async journal-cache-wait-open failed - ino=%#lx", id());
errno = XrdCl::Proxy::status2errno(status);
return errno;
}
if (mFile->journal()) {
eos_info("syncing cache");
cachesyncer cachesync(*((XrdCl::File*)mFile->xrdiorw(cid).get()));
if ((mFile->journal())->remote_sync(cachesync)) {
eos_err("async journal-cache-sync failed - ino=%#lx", id());
return EREMOTEIO;
}
}
eos_info("retc=0");
return 0;
}
/* -------------------------------------------------------------------------- */
bool
/* -------------------------------------------------------------------------- */
data::datax::is_wopen(fuse_req_t req)
/* -------------------------------------------------------------------------- */
{
XrdSysMutexHelper lLock(mLock);
return mFile->xrdiorw(req)->IsOpen();
}
/* -------------------------------------------------------------------------- */
int
/* -------------------------------------------------------------------------- */
data::datax::journalflush_async(std::string cid)
/* -------------------------------------------------------------------------- */
{
// call this with a mLock locked
eos_info("");
// we have to push the journal now
if (!mFile->xrdiorw(cid)->WaitOpen().IsOK()) {
eos_err("async journal-cache-wait-open failed - ino=%#lx", id());
return -1;
}
if (mFile->journal()) {
eos_info("syncing cache asynchronously");
if ((mFile->journal())->remote_sync_async(mFile->xrdiorw(cid))) {
eos_err("async journal-cache-sync-async failed - ino=%#lx", id());
return -1;
}
}
eos_info("retc=0");
return 0;
}
/* -------------------------------------------------------------------------- */
void
/* -------------------------------------------------------------------------- */
data::datax::set_id(uint64_t ino, fuse_req_t req)
/* -------------------------------------------------------------------------- */
{
mIno = ino;
mReq = req;
mFile = cachehandler::instance().get(ino);
char lid[64];
snprintf(lid, sizeof(lid), "logid:ino:%016lx", ino);
SetLogId(lid);
}
/* -------------------------------------------------------------------------- */
int
/* -------------------------------------------------------------------------- */
data::datax::attach(fuse_req_t freq, std::string& cookie, int flags)
/* -------------------------------------------------------------------------- */
{
XrdSysMutexHelper lLock(mLock);
bool isRW = false;
bool add_O_SYNC = false;
bool add_O_CREAT = false;
if (mFlags & O_SYNC) {
// preserve the sync flag
add_O_SYNC = true;
}
if (mFlags & O_CREAT) {
// preserve the creat flag
add_O_CREAT = true;
}
mFlags = flags;
if (add_O_SYNC) {
mFlags |= O_SYNC;
}
if (add_O_CREAT) {
mFlags |= O_CREAT;
}
if (mFlags & O_CREAT) {
mFlags |= O_RDWR;
}
// check for file inlining only for the first attach call
if ((!inline_buffer) && (EosFuse::Instance().Config().inliner.max_size ||
mMd->inlinesize())) {
if (mMd->inlinesize()) {
mInlineMaxSize = mMd->inlinesize();
} else {
mInlineMaxSize = EosFuse::Instance().Config().inliner.max_size;
}
auto attrMap = (*mMd)()->attr();
if (attrMap.count(kInlineMaxSize)) {
mInlineMaxSize = strtoull(attrMap[kInlineMaxSize].c_str(), 0, 10);
}
if (attrMap.count(kInlineCompressor)) {
mInlineCompressor = attrMap[kInlineCompressor];
} else {
mInlineCompressor = EosFuse::Instance().Config().inliner.default_compressor;
}
eos_debug("inline-size=%llu inline-compressor=%s", mInlineMaxSize,
mInlineCompressor.c_str());
// reserve buffer for inlining
inline_buffer = std::make_shared(mInlineMaxSize,
mInlineMaxSize);
mIsInlined = true;
if (attrMap.count(kInlineAttribute)) {
std::string base64_string(attrMap[kInlineAttribute].c_str(),
attrMap[kInlineAttribute].size());
std::string raw_string;
bool decoding = false;
if (base64_string.substr(0, 8) == "zbase64:") {
SymKey::ZDeBase64(base64_string, raw_string);
decoding = true;
} else if (base64_string.substr(0, 7) == "base64:") {
SymKey::DeBase64(base64_string, raw_string);
decoding = true;
}
if (decoding) {
// decode attribute to buffer
inline_buffer->writeData(raw_string.c_str(), 0, raw_string.size());
// in case there is any inconsistency between size and attribute buffer, just ignore this one
if (raw_string.size() != (*mMd)()->size()) {
inline_buffer = 0;
// delete the inline buffer
((*mMd)()->mutable_attr())->erase(kInlineAttribute);
mIsInlined = false;
}
} else {
mIsInlined = false;
}
} else {
if ((*mMd)()->size()) {
mIsInlined = false;
}
}
}
if (flags & (O_CREAT | O_RDWR | O_WRONLY)) {
isRW = true;
}
eos_info("cookie=%s flags=%o isrw=%d md-size=%d %s", cookie.c_str(), flags,
isRW, (*mMd)()->size(),
isRW ? mRemoteUrlRW.c_str() : mRemoteUrlRO.c_str());
// store the currently known size here
mSize = (*mMd)()->size();
// set write error simulation flags
if ((*mMd)()->name().find("#err_sim_flush#") != std::string::npos) {
eos_crit("enabling error simulation on flush");
mSimulateWriteErrorInFlush = true;
} else if ((*mMd)()->name().find("#err_sim_flusher#") != std::string::npos) {
eos_crit("enabling error simulation on flusher");
mSimulateWriteErrorInFlusher = true;
}
if ((flags & O_SYNC) ||
((time(NULL) - (*mMd)()->bc_time()) <
EosFuse::Instance().Config().options.nocache_graceperiod)) {
mFile->disable_caches();
}
int bcache = mFile->file() ? mFile->file()->attach(freq, cookie, isRW) : 0;
int jcache = mFile->journal() ? ((isRW ||
(mFlags & O_CACHE)) ? mFile->journal()->attach(freq, cookie,
flags) : 0) : 0;
if (bcache < 0) {
char msg[1024];
snprintf(msg, sizeof(msg), "attach to cache failed - ino=%#lx errno=%d", id(),
errno);
eos_crit("%s", msg);
throw std::runtime_error(msg);
}
if (jcache < 0) {
char msg[1024];
snprintf(msg, sizeof(msg), "attach to journal failed - ino=%#lx errno=%d", id(),
errno);
eos_crit("%s", msg);
throw std::runtime_error(msg);
}
if (isRW) {
if (!mFile->has_xrdiorw(freq) || mFile->xrdiorw(freq)->IsClosing() ||
mFile->xrdiorw(freq)->IsClosed()) {
if (mFile->has_xrdiorw(freq) && (mFile->xrdiorw(freq)->IsClosing() ||
mFile->xrdiorw(freq)->IsClosed())) {
mFile->xrdiorw(freq)->WaitClose();
mFile->xrdiorw(freq)->attach();
} else {
// attach an rw io object
mFile->set_xrdiorw(freq, XrdCl::Proxy::Factory());
mFile->xrdiorw(freq)->attach();
mFile->xrdiorw(freq)->set_id(id(), req());
}
XrdCl::OpenFlags::Flags targetFlags = XrdCl::OpenFlags::Update;
XrdCl::Access::Mode mode = XrdCl::Access::UR | XrdCl::Access::UW |
XrdCl::Access::UX;
mFile->xrdiorw(freq)->OpenAsync(mFile->xrdiorw(freq), mRemoteUrlRW.c_str(),
targetFlags, mode, 0);
} else {
if (mFile->xrdiorw(freq)->IsWaitWrite()) {
// re-open the file in the state machine
mFile->xrdiorw(freq)->set_state_TS(XrdCl::Proxy::OPENED);
}
mFile->xrdiorw(freq)->attach();
}
// when someone attaches a writer, we have to drop all the read-ahead buffers because we might get stale information in readers
for (auto fit = mFile->get_xrdioro().begin();
fit != mFile->get_xrdioro().end(); ++fit) {
if (fit->second->IsOpen()) {
fit->second->DropReadAhead();
}
}
} else {
if (!mFile->has_xrdioro(freq) || mFile->xrdioro(freq)->IsClosing() ||
mFile->xrdioro(freq)->IsClosed()) {
if (mFile->has_xrdioro(freq) && (mFile->xrdioro(freq)->IsClosing() ||
mFile->xrdioro(freq)->IsClosed())) {
mFile->xrdioro(freq)->WaitClose();
mFile->xrdioro(freq)->attach();
} else {
mFile->set_xrdioro(freq, XrdCl::Proxy::Factory());
mFile->xrdioro(freq)->attach();
mFile->xrdioro(freq)->set_id(id(), req());
if (!(flags & O_SYNC)) {
if (EOS_LOGS_DEBUG)
eos_debug("readhead: strategy=%s nom:%lu max:%lu sparse-ratio:%.01f",
cachehandler::instance().get_config().read_ahead_strategy.c_str(),
cachehandler::instance().get_config().default_read_ahead_size,
cachehandler::instance().get_config().max_read_ahead_size,
cachehandler::instance().get_config().read_ahead_sparse_ratio);
mFile->xrdioro(freq)->set_readahead_strategy(
XrdCl::Proxy::readahead_strategy_from_string(
cachehandler::instance().get_config().read_ahead_strategy),
4096,
cachehandler::instance().get_config().default_read_ahead_size,
cachehandler::instance().get_config().max_read_ahead_size,
cachehandler::instance().get_config().max_read_ahead_blocks,
cachehandler::instance().get_config().read_ahead_sparse_ratio
);
mFile->xrdioro(freq)->set_readahead_maximum_position(mSize);
}
}
XrdCl::OpenFlags::Flags targetFlags = XrdCl::OpenFlags::Read;
XrdCl::Access::Mode mode = XrdCl::Access::UR | XrdCl::Access::UX;
// we might need to wait for a creation to go through
WaitOpen();
mFile->xrdioro(freq)->OpenAsync(mFile->xrdioro(freq), mRemoteUrlRO.c_str(),
targetFlags, mode, 0);
} else {
if (mFile->has_xrdiorw(freq)) {
// we have to drop all existing read-ahead buffers to avoid reading outdated buffers
mFile->xrdioro(freq)->DropReadAhead();
}
mFile->xrdioro(freq)->attach();
}
}
return bcache | jcache;
}
/* -------------------------------------------------------------------------- */
bool
/* -------------------------------------------------------------------------- */
data::datax::inline_file(ssize_t size)
{
XrdSysMutexHelper lLock(mLock);
if (size == -1) {
size = (*mMd)()->size();
}
if (inlined() && inline_buffer) {
if ((size_t) size <= mInlineMaxSize) {
// rewrite the extended attribute
std::string raw_string(inline_buffer->ptr(), size);
std::string base64_string;
if (mInlineCompressor == "zlib") {
SymKey::ZBase64(raw_string, base64_string);
} else {
SymKey::Base64(raw_string, base64_string);
}
(*((*mMd)()->mutable_attr()))[kInlineAttribute] = base64_string;
(*((*mMd)()->mutable_attr()))[kInlineMaxSize] = std::to_string(mInlineMaxSize);
(*((*mMd)()->mutable_attr()))[kInlineCompressor] = mInlineCompressor;
return true;
} else {
// remove the extended attribute
((*mMd)()->mutable_attr())->erase(kInlineAttribute);
mIsInlined = false;
return false;
}
}
return false;
}
/* -------------------------------------------------------------------------- */
bool
/* -------------------------------------------------------------------------- */
data::datax::prefetch(fuse_req_t req, bool lock)
/* -------------------------------------------------------------------------- */
{
size_t file_size = (*mMd)()->size();
eos_info("handler=%d file=%lx size=%lu md-size=%lu", mPrefetchHandler ? 1 : 0,
mFile ? mFile->file() : 0,
mFile ? mFile->file() ? mFile->file()->size() : 0 : 0,
file_size);
if (mFile && mFile->has_xrdiorw(req)) {
// never prefetch on a wr open file
return true;
}
if (inlined()) {
// never prefetch an inlined file
return true;
}
if (lock) {
mLock.Lock();
}
if (!mPrefetchHandler && mFile->file() && !mFile->file()->size() && file_size) {
XrdCl::shared_proxy proxy = mFile->has_xrdioro(req) ? mFile->xrdioro(
req) : mFile->xrdiorw(req);
if (proxy) {
XrdCl::XRootDStatus status;
size_t prefetch_size = std::min((size_t) file_size,
(size_t) mFile->file()->prefetch_size());
// try to send an async read request
mPrefetchHandler = proxy->ReadAsyncPrepare(proxy, 0, prefetch_size, false);
bool nobuffer = false;
if (mPrefetchHandler->valid()) {
status = proxy->PreReadAsync(0, prefetch_size, mPrefetchHandler, 0);
} else {
// no free IO buffer
XrdCl::XRootDStatus newstatus(XrdCl::stFatal,
0,
XrdCl::errOSError,
"no free read-ahead buffer"
);
status = newstatus;
nobuffer = true;
}
if (!status.IsOK()) {
if (!nobuffer) {
eos_err("pre-fetch failed error=%s", status.ToStr().c_str());
}
mPrefetchHandler = 0;
} else {
// instruct the read-ahead handler where to start
proxy->set_readahead_position(prefetch_size);
}
}
}
if (lock) {
mLock.UnLock();
}
return mPrefetchHandler ? true : false;
}
/* -------------------------------------------------------------------------- */
void
/* -------------------------------------------------------------------------- */
data::datax::WaitPrefetch(fuse_req_t req, bool lock)
/* -------------------------------------------------------------------------- */
{
eos_info("");
if (lock) {
mLock.Lock();
}
size_t file_size = (*mMd)()->size();
if (mPrefetchHandler && mFile->file()) {
XrdCl::shared_proxy proxy = mFile->has_xrdioro(req) ? mFile->xrdioro(
req) : mFile->xrdiorw(req);
if (mPrefetchHandler && proxy) {
XrdCl::XRootDStatus status = proxy->WaitRead(mPrefetchHandler);
if (status.IsOK()) {
eos_info("pre-read done with size=%lu md-size=%lu",
mPrefetchHandler->vbuffer().size(), file_size);
if ((mPrefetchHandler->vbuffer().size() == file_size) && mFile->file()) {
ssize_t nwrite = mFile->file()->pwrite(mPrefetchHandler->buffer(),
mPrefetchHandler->vbuffer().size(), 0);
eos_debug("nwb=%lu to local cache", nwrite);
}
} else {
eos_err("pre-read failed error=%s", status.ToStr().c_str());
}
}
}
if (lock) {
mLock.UnLock();
}
}
/* -------------------------------------------------------------------------- */
void
/* -------------------------------------------------------------------------- */
data::datax::WaitOpen()
{
// make sure there is no pending remote open.
// this has to be done to avoid opening a file for read, which is not yet
// created.
for (auto fit = mFile->get_xrdiorw().begin();
fit != mFile->get_xrdiorw().end(); ++fit) {
if (fit->second->IsOpening()) {
eos_info("status=pending url=%s", fit->second->url().c_str());
fit->second->WaitOpen();
eos_info("status=final url=%s", fit->second->url().c_str());
} else {
eos_info("status=final url=%s", fit->second->url().c_str());
}
}
}
/* -------------------------------------------------------------------------- */
void
/* -------------------------------------------------------------------------- */
data::datax::FlagDeleted()
{
for (auto fit = mFile->get_xrdiorw().begin();
fit != mFile->get_xrdiorw().end(); ++fit) {
fit->second->setDeleted();
}
for (auto fit = mFile->get_xrdioro().begin();
fit != mFile->get_xrdioro().end(); ++fit) {
fit->second->setDeleted();
}
}
/* -------------------------------------------------------------------------- */
int
/* -------------------------------------------------------------------------- */
data::datax::TryRecovery(fuse_req_t req, bool iswrite)
{
eos_debug("");
if (req && fuse_req_interrupted(req)) {
eos_warning("request interrupted");
// clean-up pending in-memory requests
if (iswrite) {
XrdCl::shared_proxy proxy = mFile->xrdiorw(req);
if (proxy) {
proxy->CleanWriteQueue();
}
}
return EINTR;
}
{
XrdCl::Proxy::ProxyStatHandle::Get()->Stats()["recover:n"]++;
}
if (mReadErrorStack.size() > 128) {
std::string stack_dump;
// we give up recovery if to many recoveries took place
for (auto it = mReadErrorStack.begin(); it != mReadErrorStack.end(); ++it) {
stack_dump += "\n";
stack_dump += *it;
}
{
XrdCl::Proxy::ProxyStatHandle::Get()->Stats()["recover:read:exceeded"]++;
}
eos_err("giving up recovery - error-stack:%s", stack_dump.c_str());
return EREMOTEIO;
}
if (iswrite) {
{
XrdCl::Proxy::ProxyStatHandle::Get()->Stats()["recover:write:disabled"]++;
}
// recover write failures
if (!EosFuse::Instance().Config().recovery.write) { // might be disabled
eos_warning("write recovery disabled");
return EREMOTEIO;
}
if (!mFile->has_xrdiorw(req)) {
{
XrdCl::Proxy::ProxyStatHandle::Get()->Stats()["recover:write:noproxy"]++;
}
eos_crit("no proxy object");
return EFAULT;
}
XrdCl::shared_proxy proxy = mFile->xrdiorw(req);
if (proxy->opening_state().IsError() &&
! proxy->opening_state_should_retry()) {
{
XrdCl::Proxy::ProxyStatHandle::Get()->Stats()["recover:write:unrecoverable"]++;
}
eos_err("unrecoverable error - code=%d errNo=%d",
proxy->opening_state().code,
proxy->opening_state().errNo);
proxy->CleanWriteQueue();
return XrdCl::Proxy::status2errno(proxy->opening_state());
}
switch (proxy->state()) {
case XrdCl::Proxy::FAILED:
case XrdCl::Proxy::OPENED:
case XrdCl::Proxy::WAITWRITE:
default: {
XrdCl::Proxy::ProxyStatHandle::Get()->Stats()["recover:write:n"]++;
}
eos_crit("triggering write recovery state = %d", proxy->state());
return recover_write(req);
eos_crit("default action");
}
} else {
{
XrdCl::Proxy::ProxyStatHandle::Get()->Stats()["recover:read:disabled"]++;
}
// recover read failures
if (!EosFuse::Instance().Config().recovery.read) { // might be disabled
return EREMOTEIO;
}
// no way to recover
if (!mFile->has_xrdioro(req)) {
{
XrdCl::Proxy::ProxyStatHandle::Get()->Stats()["recover:read:noproxy"]++;
}
eos_crit("no proxy object");
return EFAULT;
}
XrdCl::shared_proxy proxy = mFile->xrdioro(req);
if (proxy->opening_state().IsError() &&
! proxy->opening_state_should_retry()) {
{
XrdCl::Proxy::ProxyStatHandle::Get()->Stats()["recover:read:unrecoverable"]++;
}
eos_err("unrecoverable error - code=%d errNo=%d",
proxy->opening_state().code,
proxy->opening_state().errNo);
proxy->CleanWriteQueue();
return XrdCl::Proxy::status2errno(proxy->opening_state());
}
switch (proxy->state()) {
case XrdCl::Proxy::FAILED: {
XrdCl::Proxy::ProxyStatHandle::Get()->Stats()["recover:read:reopen:n"]++;
}
mReadErrorStack.push_back("open-failed");
return recover_ropen(req);
case XrdCl::Proxy::OPENED: {
XrdCl::Proxy::ProxyStatHandle::Get()->Stats()["recover:read:reread:n"]++;
}
mReadErrorStack.push_back("read-failed");
return recover_read(req);
default:
break;
}
}
return EREMOTEIO;
}
/* -------------------------------------------------------------------------- */
int
/* -------------------------------------------------------------------------- */
data::datax::recover_ropen(fuse_req_t req)
/* -------------------------------------------------------------------------- */
{
  //! @brief recover a failed read-open by re-opening the remote file
  //! @param req fuse request used for interruption checks (may be 0)
  //! @return 0 on success, EINTR if interrupted, ENETUNREACH if no server is
  //!         available and no-server recovery is disabled, errno for fatal
  //!         open errors, EREMOTEIO when recovery is exhausted/disabled
  //
  // The host of the last failed attempt is excluded via the 'tried=' CGI tag;
  // on kXR_noserver the open is retried in 5s steps within the configured
  // retry window.
  XrdCl::shared_proxy proxy = 0;
  struct timespec ts;
  // remember the start time to bound the no-server retry window
  eos::common::Timing::GetTimeSpec(ts, true);

  while (1) {
    proxy = mFile->xrdioro(req);
    mRecoveryStack.push_back(eos_log(LOG_SILENT, "hint='recover read-open'"));
    eos_warning("recover read-open [%d]",
                EosFuse::Instance().Config().recovery.read_open);

    if (!EosFuse::Instance().Config().recovery.read_open) { // might be disabled
      {
        XrdCl::Proxy::ProxyStatHandle::Get()->Stats()["recover:read:reopen:disabled"]++;
      }
      break;
    }

    XrdCl::XRootDStatus status = proxy->opening_state();

    if (status.errNo == kXR_noserver) {
      eos_crit("recover read-open-noserver [%d]",
               EosFuse::Instance().Config().recovery.read_open_noserver);

      // there is no server to read that file
      if (!EosFuse::Instance().Config().recovery.read_open_noserver) { // might be disabled
        {
          XrdCl::Proxy::ProxyStatHandle::Get()->Stats()["recover:read:reopen:noserver:disabled"]++;
        }
        return ENETUNREACH;
      }
    }

    if (status.IsFatal()) {
      // error useless to retry
      eos_crit("recover-ropen failed errno=%d", XrdCl::Proxy::status2errno(status));
      {
        XrdCl::Proxy::ProxyStatHandle::Get()->Stats()["recover:read:reopen:failed"]++;
      }
      return XrdCl::Proxy::status2errno(status);
    }

    eos_warning("recover reopening file for read");
    XrdCl::OpenFlags::Flags targetFlags = XrdCl::OpenFlags::Read;
    XrdCl::Access::Mode mode = XrdCl::Access::UR | XrdCl::Access::UX;
    // retrieve the 'tried' information to apply this for the file-reopening
    // to exclude already known 'bad' locations
    std::string slasturl;
    proxy->GetProperty("LastURL", slasturl);
    XrdCl::URL lasturl(slasturl);
    XrdCl::URL newurl(mRemoteUrlRO);
    XrdCl::URL::ParamsMap last_cgi = lasturl.GetParams();
    XrdCl::URL::ParamsMap new_cgi = newurl.GetParams();
    std::string last_host = lasturl.GetHostName();

    // fix: exclude the last host if host OR port differ - this was '&&',
    // which skipped the exclusion list whenever the ports matched; '||' makes
    // the condition consistent with try_ropen
    if ((lasturl.GetHostName() != newurl.GetHostName()) ||
        (lasturl.GetPort() != newurl.GetPort())) {
      eos_warning("applying exclusion list: tried=%s,%s", last_host.c_str(),
                  new_cgi["tried"].c_str());
      new_cgi["tried"] = last_host.c_str() + std::string(",") + last_cgi["tried"];
      new_cgi["triedrc"] = "EIO";
      new_cgi["eos.repairread"] = "1";
      newurl.SetParams(new_cgi);
      mRemoteUrlRO = newurl.GetURL();
    } else {
      // same endpoint - reset the exclusion list
      new_cgi.erase("tried");
      new_cgi.erase("triedrc");
      new_cgi["eos.repairread"] = "1";
      newurl.SetParams(new_cgi);
      mRemoteUrlRO = newurl.GetURL();
    }

    // issue a new open
    XrdCl::shared_proxy newproxy = XrdCl::Proxy::Factory();
    newproxy->OpenAsync(newproxy, mRemoteUrlRO.c_str(), targetFlags, mode, 0);

    // wait this time for completion
    if ((req && fuse_req_interrupted(req)) || (newproxy->WaitOpen(req) == EINTR)) {
      eos_warning("request interrupted");
      return EINTR;
    }

    newproxy->inherit_attached(proxy);
    newproxy->inherit_protocol(proxy);
    // replace the proxy object
    mFile->set_xrdioro(req, newproxy);
    proxy->detach();
    // save the error status of the previous proxy object
    status = proxy->opening_state();

    if (newproxy->state() == XrdCl::Proxy::OPENED) { // that worked !
      eos_warning("recover reopened file successfully");
      {
        XrdCl::Proxy::ProxyStatHandle::Get()->Stats()["recover:read:reopen:success"]++;
      }
      return 0;
    }

    // that failed again ...
    if (status.errNo == kXR_noserver) {
      double retry_time_sec = 1.0 * eos::common::Timing::GetCoarseAgeInNs(&ts,
                              0) / 1000000000.0;
      eos_warning("recover no server retry window [ %.02f/%lu ]",
                  retry_time_sec,
                  EosFuse::Instance().Config().recovery.read_open_noserver_retrywindow
                 );
      {
        XrdCl::Proxy::ProxyStatHandle::Get()->Stats()["recover:read:reopen:noserver:retry"]++;
      }

      // check how long we are supposed to retry
      if (retry_time_sec <
          EosFuse::Instance().Config().recovery.read_open_noserver_retrywindow) {
        eos_warning("recover no server retry in 5 seconds");

        // sleep 50 x 100ms = 5s, checking for interruption every 100ms
        for (auto i = 0; i < 50; ++i) {
          std::this_thread::sleep_for(std::chrono::milliseconds(100));

          if (req && fuse_req_interrupted(req)) {
            eos_warning("request interrupted");
            return EINTR;
          }
        }

        // empty the tried= CGI tag
        new_cgi.erase("tried");
        new_cgi.erase("triedrc");
        newurl.SetParams(new_cgi);
        mRemoteUrlRO = newurl.GetURL();
        continue;
      }

      break;
    }

    if (status.IsFatal()) {
      {
        XrdCl::Proxy::ProxyStatHandle::Get()->Stats()["recover:read:reopen:noserver:fatal"]++;
      }
      // error useless to retry
      eos_crit("recover-ropen failed errno=%d", XrdCl::Proxy::status2errno(status));
      return XrdCl::Proxy::status2errno(status);
    }

    break;
  }

  return EREMOTEIO;
}
/* -------------------------------------------------------------------------- */
int
/* -------------------------------------------------------------------------- */
data::datax::try_ropen(fuse_req_t req, XrdCl::shared_proxy proxy,
                       std::string open_url)
{
  //! @brief try to open 'open_url' for reading, retrying recoverable errors
  //! @param req fuse request used for interruption checks (may be 0)
  //! @param proxy proxy object for the first open attempt; on retries it is
  //!        replaced by freshly factory-created proxies
  //! @param open_url remote URL to open; the 'tried='/'triedrc=' CGI tags are
  //!        extended on retries to exclude already failed hosts
  //! @return 0 on success, EINTR if interrupted, ENETUNREACH when no server is
  //!         available and no-server recovery is disabled, errno for fatal or
  //!         quota errors, EREMOTEIO when retries are exhausted/disabled
  mRecoveryStack.push_back(eos_log(LOG_SILENT, "hint='try read-open'"));
  struct timespec ts;
  // remember the start time to bound the no-server retry window
  eos::common::Timing::GetTimeSpec(ts, true);
  XrdCl::OpenFlags::Flags targetFlags = XrdCl::OpenFlags::Read;
  XrdCl::Access::Mode mode = XrdCl::Access::UR | XrdCl::Access::UX;
  // first attempt on the caller-supplied proxy
  proxy->OpenAsync(proxy, open_url, targetFlags, mode, 0);

  // wait this time for completion
  if ((req && fuse_req_interrupted(req)) || (proxy->WaitOpen(req) == EINTR)) {
    eos_warning("request interrupted");
    return EINTR;
  }

  if (proxy->state() == XrdCl::Proxy::OPENED) { // that worked !
    eos_warning("recover read-open succesfull");
    return 0;
  }

  // retry loop: every iteration inspects the last opening state and issues a
  // new open on a fresh proxy object
  while (1) {
    eos_warning("recover read-open [%d]",
                EosFuse::Instance().Config().recovery.read_open);

    if (!EosFuse::Instance().Config().recovery.read_open) { // might be disabled
      break;
    }

    XrdCl::XRootDStatus status = proxy->opening_state();

    if (status.errNo == kXR_noserver) {
      eos_crit("recover read-open-noserver [%d]",
               EosFuse::Instance().Config().recovery.read_open_noserver);

      // there is no server to read that file
      if (!EosFuse::Instance().Config().recovery.read_open_noserver) { // might be disabled
        return ENETUNREACH;
      }
    }

    if (status.IsFatal()) {
      // error useless to retry
      eos_crit("recover read-open errno=%d", XrdCl::Proxy::status2errno(status));
      return XrdCl::Proxy::status2errno(status);
    }

    if ((status.errNo == kXR_overQuota) ||
        (status.errNo == kXR_NoSpace)) {
      // error useless to retry - this can happen if the open tries to reattach a file without locations and the user is out of quota
      eos_crit("recover read-open errno=%d", XrdCl::Proxy::status2errno(status));
      return XrdCl::Proxy::status2errno(status);
    }

    eos_warning("recover reopening file for read");
    // retrieve the 'tried' information to apply this for the file-reopening to exclude already knowns 'bad' locations
    std::string slasturl;
    proxy->GetProperty("LastURL", slasturl);
    XrdCl::URL lasturl(slasturl);
    XrdCl::URL newurl(open_url);
    XrdCl::URL::ParamsMap last_cgi = lasturl.GetParams();
    XrdCl::URL::ParamsMap new_cgi = newurl.GetParams();
    std::string last_host = lasturl.GetHostName();

    // if the last attempt was redirected to a different endpoint (host or
    // port differ), append that host to the exclusion list
    if ((lasturl.GetHostName() != newurl.GetHostName()) ||
        (lasturl.GetPort() != newurl.GetPort())) {
      eos_warning("applying exclusion list: tried=%s,%s", last_host.c_str(),
                  new_cgi["tried"].c_str());
      new_cgi["tried"] = last_host.c_str() + std::string(",") + last_cgi["tried"];
      new_cgi["triedrc"] = "EIO" + std::string(",") + last_cgi["triedrc"];
      newurl.SetParams(new_cgi);
      open_url = newurl.GetURL();
    } else {
      // same endpoint - reset the exclusion list
      new_cgi.erase("tried");
      new_cgi.erase("triedrc");
      newurl.SetParams(new_cgi);
      open_url = newurl.GetURL();
    }

    // issue a new open
    XrdCl::shared_proxy newproxy = XrdCl::Proxy::Factory();
    newproxy->OpenAsync(newproxy, open_url.c_str(), targetFlags, mode, 0);

    // wait this time for completion
    if ((req && fuse_req_interrupted(req)) || (newproxy->WaitOpen(req) == EINTR)) {
      eos_warning("request interrupted");
      return EINTR;
    }

    // carry over attachment counts and protocol settings, then swap proxies
    newproxy->inherit_attached(proxy);
    newproxy->inherit_protocol(proxy);
    proxy->detach();
    // replace the proxy object
    proxy = newproxy;

    if (newproxy->state() == XrdCl::Proxy::OPENED) { // that worked !
      eos_warning("recover reopened file successfully");
      return 0;
    }

    // that failed again ...
    status = proxy->opening_state();

    if (status.errNo == kXR_noserver) {
      double retry_time_sec = 1.0 * eos::common::Timing::GetCoarseAgeInNs(&ts,
                              0) / 1000000000.0;
      eos_warning("recover no server retry window [ %.02f/%lu ]",
                  retry_time_sec,
                  EosFuse::Instance().Config().recovery.read_open_noserver_retrywindow
                 );

      // check how long we are supposed to retry
      if (retry_time_sec <
          EosFuse::Instance().Config().recovery.read_open_noserver_retrywindow) {
        eos_warning("recover no server retry in 5 seconds");

        // 50 x 100ms = 5s total, checking for interruption every 100ms
        for (auto i = 0; i < 50; ++i) {
          // sleep for 5s and then try again
          std::this_thread::sleep_for(std::chrono::milliseconds(100));

          if (req && fuse_req_interrupted(req)) {
            eos_warning("request interrupted");
            return EINTR;
          }
        }

        // empty the tried= CGI tag
        // NOTE(review): unlike recover_ropen, 'triedrc' is NOT erased here -
        // confirm whether that asymmetry is intended
        new_cgi.erase("tried");
        newurl.SetParams(new_cgi);
        open_url = newurl.GetURL();
        continue;
      }

      break;
    }

    break;
  }

  eos_warning("recover failed try_ropen");
  return EREMOTEIO;
}
/* -------------------------------------------------------------------------- */
int
/* -------------------------------------------------------------------------- */
data::datax::try_wopen(fuse_req_t req, XrdCl::shared_proxy proxy,
                       std::string open_url)
{
  //! @brief try to open 'open_url' for update/write, retrying recoverable
  //!        errors; the caller's write queue is inherited by replacement
  //!        proxies so pending writes survive the re-open
  //! @param req fuse request used for interruption checks (may be 0)
  //! @param proxy proxy object for the first open attempt; replaced by fresh
  //!        factory-created proxies on retries
  //! @param open_url remote URL to open for writing
  //! @return 0 on success, EINTR if interrupted (write queue cleaned),
  //!         ENETUNREACH when no server is available and no-server recovery is
  //!         disabled, errno for fatal/quota errors, EREMOTEIO otherwise
  mRecoveryStack.push_back(eos_log(LOG_SILENT, "hint='try write-open'"));
  struct timespec ts;
  {
    XrdCl::Proxy::ProxyStatHandle::Get()->Stats()["recover:write:reopen:n"]++;
  }
  // remember the start time to bound the no-server retry window
  eos::common::Timing::GetTimeSpec(ts, true);
  // try to open this file for writing
  XrdCl::OpenFlags::Flags targetFlags = XrdCl::OpenFlags::Update;
  XrdCl::Access::Mode mode = XrdCl::Access::UR | XrdCl::Access::UW |
                             XrdCl::Access::UX;
  // try to open
  XrdCl::XRootDStatus status = proxy->OpenAsync(proxy, open_url.c_str(),
                               targetFlags,
                               mode, 0);

  if (proxy->WaitOpen(req) == EINTR) {
    eos_warning("request interrupted");
    proxy->CleanWriteQueue();
    return EINTR;
  }

  // if that worked we are already fine, otherwise we enter a timebased logic for retries
  if (proxy->state() == XrdCl::Proxy::OPENED) { // that worked !
    {
      XrdCl::Proxy::ProxyStatHandle::Get()->Stats()["recover:write:reopen:success"]++;
    }
    eos_warning("re-opening for write succeeded");
    return 0;
  }

  while (1) {
    eos_warning("recover write-open [%d]",
                EosFuse::Instance().Config().recovery.write_open);

    if (!EosFuse::Instance().Config().recovery.write_open) { // might be disabled
      {
        XrdCl::Proxy::ProxyStatHandle::Get()->Stats()["recover:write:reopen:disabled"]++;
      }
      break;
    }

    // NOTE(review): this declaration shadows the outer 'status' from the
    // first attempt; all code below uses the loop-local one
    XrdCl::XRootDStatus status = proxy->opening_state();

    if (status.errNo == kXR_noserver) {
      eos_crit("recover write-open-noserver [%d]",
               EosFuse::Instance().Config().recovery.write_open_noserver);
      {
        // NOTE(review): stat key uses a double colon ("noserver::retry") -
        // kept as-is since these strings are externally visible counters
        XrdCl::Proxy::ProxyStatHandle::Get()->Stats()["recover:write:reopen:noserver::retry"]++;
      }

      // there is no server to read that file
      if (!EosFuse::Instance().Config().recovery.write_open_noserver) { // might be disable
        {
          XrdCl::Proxy::ProxyStatHandle::Get()->Stats()["recover:write:reopen:noserver::disabled"]++;
        }
        return ENETUNREACH;
      }
    }

    if (status.IsFatal()) {
      // error useless to retry
      eos_crit("recover write-open-fatal queue=%d errno=%d",
               proxy->WriteQueue().size(), XrdCl::Proxy::status2errno(status));
      {
        XrdCl::Proxy::ProxyStatHandle::Get()->Stats()["recover:write:reopen:unrecoverable"]++;
      }
      return XrdCl::Proxy::status2errno(status);
    }

    if (status.errNo == kXR_overQuota) {
      // error useless to retry - no quota anymore
      {
        XrdCl::Proxy::ProxyStatHandle::Get()->Stats()["recover:write:reopen:overquota"]++;
      }
      eos_crit("recover write-open errno=%d", XrdCl::Proxy::status2errno(status));
      return XrdCl::Proxy::status2errno(status);
    }

    eos_warning("recover reopening file for writing");
    // issue a new open
    XrdCl::shared_proxy newproxy = XrdCl::Proxy::Factory();
    newproxy->OpenAsync(newproxy, open_url.c_str(), targetFlags, mode, 0);

    // wait this time for completion
    if ((req && fuse_req_interrupted(req)) || (newproxy->WaitOpen(req) == EINTR)) {
      eos_warning("request interrupted");
      proxy->CleanWriteQueue();
      return EINTR;
    }

    // carry over attachment counts, protocol settings and the pending write
    // queue, then swap proxies
    newproxy->inherit_attached(proxy);
    newproxy->inherit_protocol(proxy);
    newproxy->inherit_writequeue(newproxy, proxy);
    // replace the proxy object
    proxy = newproxy;

    if (newproxy->state() == XrdCl::Proxy::OPENED) { // that worked !
      eos_warning("recover reopened file successfully");
      {
        XrdCl::Proxy::ProxyStatHandle::Get()->Stats()["recover:write:reopen:success"]++;
      }
      return 0;
    }

    // that failed again ...
    status = proxy->opening_state();

    if (status.errNo == kXR_noserver) {
      double retry_time_sec = 1.0 * eos::common::Timing::GetCoarseAgeInNs(&ts,
                              0) / 1000000000.0;
      // NOTE(review): the WRITE recovery path uses the READ no-server retry
      // window here and below - confirm whether a write_open_* window exists
      eos_warning("recover no server retry window [ %.02f/%.02f ]",
                  retry_time_sec,
                  EosFuse::Instance().Config().recovery.read_open_noserver_retrywindow
                 );
      {
        // NOTE(review): "nosever" typo kept - externally visible counter key
        XrdCl::Proxy::ProxyStatHandle::Get()->Stats()["recover:write:reopen:nosever"]++;
      }

      // check how long we are supposed to retry
      if (retry_time_sec <
          EosFuse::Instance().Config().recovery.read_open_noserver_retrywindow) {
        eos_warning("recover no server retry in 5 seconds");

        // 50 x 100ms = 5s total, checking for interruption every 100ms
        for (auto i = 0; i < 50; ++i) {
          // sleep for 5s and then try again
          std::this_thread::sleep_for(std::chrono::milliseconds(100));

          if (req && fuse_req_interrupted(req)) {
            eos_warning("request interrupted");
            proxy->CleanWriteQueue();
            return EINTR;
          }
        }

        continue;
      }

      break;
    }

    if (status.IsFatal()) {
      {
        XrdCl::Proxy::ProxyStatHandle::Get()->Stats()["recover:write:reopen:noserver:failed"]++;
      }
      // error useless to retry
      eos_crit("recover write-open-fatal errno=%d",
               XrdCl::Proxy::status2errno(status));
      return XrdCl::Proxy::status2errno(status);
    }

    break;
  }

  return EREMOTEIO;
}
/* -------------------------------------------------------------------------- */
int
/* -------------------------------------------------------------------------- */
data::datax::recover_read(fuse_req_t req)
/* -------------------------------------------------------------------------- */
{
  //! @brief recover a failed pread by re-opening the file for reading
  //! @param req fuse request used for interruption checks (may be 0)
  //! @return 0 on success, EINTR if interrupted, otherwise the errno returned
  //!         by recover_ropen
  //
  // note: a previously fetched proxy/read_state() snapshot was removed here -
  // the value was never used and the accessors have no side effects
  mRecoveryStack.push_back(eos_log(LOG_SILENT, "hint='recover read'"));
  {
    XrdCl::Proxy::ProxyStatHandle::Get()->Stats()["recover:read:n"]++;
  }

  if (req && fuse_req_interrupted(req)) {
    eos_warning("request interrupted");
    return EINTR;
  }

  // re-open the file
  int reopen = recover_ropen(req);

  if (!reopen) {
    {
      XrdCl::Proxy::ProxyStatHandle::Get()->Stats()["recover:read:success"]++;
    }
    eos_warning("recover reopened file successfully to re-read");
    // let's try again
    return 0;
  }

  {
    XrdCl::Proxy::ProxyStatHandle::Get()->Stats()["recover:read:failed"]++;
  }
  return reopen;
}
/* -------------------------------------------------------------------------- */
int
/* -------------------------------------------------------------------------- */
data::datax::recover_write(fuse_req_t req)
/* -------------------------------------------------------------------------- */
{
  //! @brief recover a failed write by re-staging the file content (from the
  //!        local start cache, the journal or the remote replica), re-opening
  //!        the inode with the repair flag, uploading the data and replaying
  //!        the journal
  //! @param req fuse request used for interruption and flush signalling
  //! @return 0 on success, errno on failure
  //
  // Fixes applied in this revision:
  //  - mRemoteUrlRW restore: both '&eos.repair=1' and '&eos.bookingsize=0'
  //    are appended before try_wopen, but only the length of the first was
  //    erased afterwards, leaving the repair flag plus '&eos.' garbage in the
  //    URL permanently
  //  - double sBufferManager.put_buffer() and double end_flush() in the
  //    local-stagefile read-error branch (copy-paste duplication)
  //  - duplicated mRecoveryStack entries for the same failure
  mRecoveryStack.push_back(eos_log(LOG_SILENT, "hint='recover write'"));
  eos_debug("");
  XrdCl::shared_proxy proxy = mFile->xrdiorw(req);
  // check if we have a problem with the open
  XrdCl::XRootDStatus status = proxy->WaitOpen();
  {
    XrdCl::Proxy::ProxyStatHandle::Get()->Stats()["recover:write:n"]++;
  }

  if (status.IsFatal() ||
      (proxy->opening_state().IsError() &&
       ! proxy->opening_state_should_retry())
     ) {
    // error useless to retry
    {
      XrdCl::Proxy::ProxyStatHandle::Get()->Stats()["recover:write:unrecoverable"]++;
    }
    proxy->CleanWriteQueue();
    proxy->ChunkMap().clear();
    eos_crit("recover write-open-fatal queue=%d errno=%d",
             proxy->WriteQueue().size(), XrdCl::Proxy::status2errno(status));
    return XrdCl::Proxy::status2errno(status);
  }

  // try to open this file for reading
  bool recover_from_file_cache = false;
  bool recover_truncate = false;

  // check if the file has been created here and is still complete in the local caches
  if ((mFlags & O_CREAT) && mFile->file() &&
      (((mSize <= mFile->file()->prefetch_size()) &&
        (mSize == (ssize_t) mFile->file()->size())) ||
       (mFile->journal() &&
        mFile->journal()->first_flush()))) { // if the journal was not flushed yet and it is a creation, we have still all the data in the journal
    eos_debug("recover from file cache");
    // this file can be recovered from the file start cache
    recover_from_file_cache = true;
    mRecoveryStack.push_back(eos_log(LOG_SILENT, "hint='recover from file cache'"));
    {
      XrdCl::Proxy::ProxyStatHandle::Get()->Stats()["recover:write:fromcache"]++;
    }
  } else {
    // we have to recover this from remote
    eos_debug("recover from remote file");
    recover_from_file_cache = false;
    ssize_t truncate_size = mFile->journal() ? mFile->journal()->get_truncatesize()
                            : -1;

    // a truncate(0) in the journal means no remote download is needed
    if (truncate_size == 0) {
      recover_truncate = true;
    }

    {
      XrdCl::Proxy::ProxyStatHandle::Get()->Stats()["recover:write:fromremote"]++;
    }
    mRecoveryStack.push_back(eos_log(LOG_SILENT,
                                     "hint='recover from remote file'"));
  }

  XrdCl::shared_proxy newproxy = XrdCl::Proxy::Factory();

  if (!recover_from_file_cache && !recover_truncate) {
    // we need to open this file because it is not complete locally
    int rc = try_ropen(req, newproxy,
                       mRemoteUrlRW + "&eos.checksum=ignore&eos.repairread=1");

    if (rc) {
      mRecoveryStack.push_back(eos_log(LOG_SILENT,
                                       "hint='read-open failed with rc=%d'", rc));
      proxy->CleanWriteQueue();
      {
        XrdCl::Proxy::ProxyStatHandle::Get()->Stats()["recover:write:fromcache:failed"]++;
      }
      return rc;
    }
  }

  if (mFile->file() || recover_truncate) {
    void* buf = 0;
    std::string stagefile;
    int fd = -1;
    off_t off = 0;
    uint32_t size = 1 * 1024 * 1024;
    bufferllmanager::shared_buffer buffer;
    // RAII guard closing the stage file descriptor on every return path
    struct fd_guard {
      fd_guard(int& fd) : fd_(fd) { }
      ~fd_guard()
      {
        if (fd_ >= 0) {
          ::close(fd_);
          fd_ = -1;
        }
      }
      int& fd_;
    } fdg(fd);

    if (!recover_truncate) {
      mFile->file()->recovery_location(stagefile);
      buffer = sBufferManager.get_buffer(size);
      buf = (void*) buffer->ptr();
      // open local stagefile; unlink immediately so it vanishes on close
      fd = ::open(stagefile.c_str(), O_CREAT | O_RDWR, S_IRWXU);
      ::unlink(stagefile.c_str());

      if (fd < 0) {
        sBufferManager.put_buffer(buffer);
        eos_crit("failed to open local stagefile %s", stagefile.c_str());
        proxy->CleanWriteQueue();
        {
          XrdCl::Proxy::ProxyStatHandle::Get()->Stats()["recover:write:fromremote:local:failed"]++;
        }
        mRecoveryStack.push_back(eos_log(LOG_SILENT, "hint='failed to open local stagefile'"));
        return EREMOTEIO;
      }
    }

    // flag an ongoing flush centrally while we re-stage
    if (req && begin_flush(req)) {
      eos_warning("failed to signal begin-flush");
    }

    if (recover_from_file_cache) {
      eos_debug("recovering from local start cache into stage file %s",
                stagefile.c_str());
      // make sure the buffer size fits
      buffer->resize(mFile->file()->size());
      buffer->reserve(mFile->file()->size());
      buf = (void*) buffer->ptr();

      // recover file from the local start cache
      if (mFile->file()->pread(buf, mFile->file()->size(), 0) < 0) {
        if (!recover_truncate) {
          sBufferManager.put_buffer(buffer);
        }

        eos_crit("unable to read file for recovery from local file cache");

        if (req && end_flush(req)) {
          eos_warning("failed to signal end-flush");
        }

        proxy->CleanWriteQueue();
        {
          XrdCl::Proxy::ProxyStatHandle::Get()->Stats()["recover:write:fromcache:read:failed"]++;
        }
        mRecoveryStack.push_back(eos_log(LOG_SILENT, "hint='unable to read file for recovery from local cache file'"));
        return EIO;
      }
    } else {
      eos_debug("recovering from remote file into stage file %s", stagefile.c_str());

      if (!recover_truncate) {
        // download all into local stagefile, we don't need to do this, if there is a truncate request
        uint32_t bytesRead = 0;

        do {
          status = newproxy->Read(newproxy, off, size, buf, bytesRead);
          eos_debug("off=%lu bytesread=%u", off, bytesRead);

          if (!status.IsOK()) {
            sBufferManager.put_buffer(buffer);
            eos_warning("failed to read remote file for recovery msg='%s'",
                        status.ToString().c_str());
            mRecoveryStack.push_back(eos_log(LOG_SILENT,
                                             "status='%s' hint='failed to read remote file for recovery'",
                                             status.ToString().c_str()));

            if (req && end_flush(req)) {
              eos_warning("failed to signal end-flush");
            }

            proxy->CleanWriteQueue();
            {
              XrdCl::Proxy::ProxyStatHandle::Get()->Stats()["recover:write:fromremote:read:failed"]++;
            }
            return EREMOTEIO;
          } else {
            off += bytesRead;
          }

          ssize_t wr = ::write(fd, buf, bytesRead);

          if (wr != bytesRead) {
            sBufferManager.put_buffer(buffer);
            eos_crit("failed to write to local stage file %s", stagefile.c_str());

            if (req && end_flush(req)) {
              eos_warning("failed to signal end-flush");
            }

            proxy->CleanWriteQueue();
            {
              XrdCl::Proxy::ProxyStatHandle::Get()->Stats()["recover:write:fromremote:localwrite:failed"]++;
            }
            mRecoveryStack.push_back(eos_log(LOG_SILENT, "hint='failed to write to local stage file'"));
            return EREMOTEIO;
          }
        } while (bytesRead > 0);
      }
    }

    // upload into identical inode using the drop & replace option (repair flag)
    XrdCl::shared_proxy uploadproxy = XrdCl::Proxy::Factory();
    uploadproxy->inherit_attached(proxy);
    uploadproxy->inherit_writequeue(uploadproxy, proxy);

    // we have to remove the flush otherwise we cannot open this file even ourselfs
    if (req && end_flush(req)) {
      eos_warning("failed to signal begin-flush");
    }

    // add the repair flag to drop existing locations and select new ones and
    // request enough space for this recovery upload; both tags are stripped
    // again after the open
    const std::string repair_cgi = "&eos.repair=1";
    const std::string booking_cgi = "&eos.bookingsize=0";
    mRemoteUrlRW += repair_cgi;
    mRemoteUrlRW += booking_cgi;
    eos_warning("re-opening with repair flag for recovery %s",
                mRemoteUrlRW.c_str());
    int rc = try_wopen(req, uploadproxy, mRemoteUrlRW);
    // fix: strip BOTH appended CGI tags (previously only the length of
    // '&eos.repair=1' was removed, corrupting mRemoteUrlRW)
    mRemoteUrlRW.erase(mRemoteUrlRW.length() - repair_cgi.length() -
                       booking_cgi.length());

    // put back the flush indicator
    if (req && begin_flush(req)) {
      eos_warning("failed to signal begin-flush");
    }

    if (rc) {
      // write-open for the upload failed
      if (!recover_truncate) {
        sBufferManager.put_buffer(buffer);
      }

      if (req && end_flush(req)) {
        eos_warning("failed to signal end-flush");
      }

      proxy->CleanWriteQueue();
      {
        // NOTE(review): key says 'beginflush' although this is the write-open
        // failure path - kept as-is since counter keys are externally visible
        XrdCl::Proxy::ProxyStatHandle::Get()->Stats()["recover:write:fromremote:beginflush:failed"]++;
      }
      mRecoveryStack.push_back(eos_log(LOG_SILENT, "hint='failed to signal endflush rc=%d'", rc));
      return rc;
    }

    off_t upload_offset = 0;

    if (!recover_truncate) {
      ssize_t nr = 0;

      do {
        nr = ::pread(fd, buf, size, upload_offset);

        if (nr < 0) {
          // the local stage file could not be read back
          // (fix: buffer was previously returned to the pool twice and
          // end_flush was signalled twice in this branch)
          sBufferManager.put_buffer(buffer);
          eos_crit("failed to read from local stagefile");
          // collect any writes already scheduled before signalling end-flush
          {
            uploadproxy->WaitWrite(req);
          }

          if (req && end_flush(req)) {
            eos_warning("failed to signal end-flush");
          }

          proxy->CleanWriteQueue();
          {
            XrdCl::Proxy::ProxyStatHandle::Get()->Stats()["recover:write:fromremote:endflush:failed"]++;
          }
          mRecoveryStack.push_back(eos_log(LOG_SILENT, "hint='failed to read from local stagefile errno=%d'", errno));
          return EREMOTEIO;
        }

        if (nr) {
          // send asynchronous upstream writes
          XrdCl::Proxy::write_handler handler = uploadproxy->WriteAsyncPrepare(
                                                  uploadproxy, nr,
                                                  upload_offset, 60);
          uploadproxy->ScheduleWriteAsync(buf, handler);
          upload_offset += nr;
        }
      } while (nr > 0);

      // collect the writes to verify everything is alright now
      uploadproxy->WaitWrite(req);

      if (!uploadproxy->write_state().IsOK()) {
        sBufferManager.put_buffer(buffer);
        eos_crit("got failure when collecting outstanding writes from the upload proxy");

        if (req && end_flush(req)) {
          eos_warning("failed to signal end-flush");
        }

        proxy->CleanWriteQueue();
        {
          XrdCl::Proxy::ProxyStatHandle::Get()->Stats()["recover:write:fromremote:write:failed"]++;
        }
        return EREMOTEIO;
      }

      mRecoveryStack.push_back(eos_log(LOG_SILENT, "uploaded-bytes=%lu",
                                       upload_offset));
      sBufferManager.put_buffer(buffer);
    }

    eos_notice("finished write recovery successfully");
    // replace the proxy object
    mFile->set_xrdiorw(req, uploadproxy);
    proxy->CleanWriteQueue();
    proxy->detach();

    // replay the journal
    if (mFile->journal()) {
      if ((rc = journalflush(req))) {
        mRecoveryStack.push_back(eos_log(LOG_SILENT,
                                         "errno='%d' hint='failed journalflush'",
                                         rc));
        eos_err("journal-flushing failed rc=%d", rc);
        {
          XrdCl::Proxy::ProxyStatHandle::Get()->Stats()["recover:write:journalflush:failed"]++;
        }
        return rc;
      } else {
        mRecoveryStack.push_back(eos_log(LOG_SILENT, "hint='success journalflush'"));
        {
          XrdCl::Proxy::ProxyStatHandle::Get()->Stats()["recover:write:journalflush:success"]++;
        }
      }
    }

    // re-open the file centrally for access
    if (req && end_flush(req)) {
      eos_warning("failed to signal end-flush");
    }
  } else {
    eos_crit("no local cache data for recovery");
    proxy->CleanWriteQueue();
    {
      XrdCl::Proxy::ProxyStatHandle::Get()->Stats()["recover:write:nocache:failed"]++;
    }
    mRecoveryStack.push_back(eos_log(LOG_SILENT, "hint='no local cache data for recovery'"));
    return EREMOTEIO;
  }

  return 0;
}
/* -------------------------------------------------------------------------- */
int
data::datax::begin_flush(fuse_req_t req)
/* -------------------------------------------------------------------------- */
{
  // flag an ongoing flush centrally on the MGM, tagged as 'repair'
  const std::string tag("repair");
  return EosFuse::Instance().mds.begin_flush(req, mMd, tag);
}
/* -------------------------------------------------------------------------- */
int
data::datax::end_flush(fuse_req_t req)
/* -------------------------------------------------------------------------- */
{
  // unflag an ongoing flush centrally on the MGM, tagged as 'repair'
  const std::string tag("repair");
  return EosFuse::Instance().mds.end_flush(req, mMd, tag);
}
/* -------------------------------------------------------------------------- */
int
data::datax::detach(fuse_req_t req, std::string& cookie, int flags)
/* -------------------------------------------------------------------------- */
{
  // detach the local caches and the remote IO proxy matching the open mode;
  // returns the or-ed return codes of the cache detach operations
  const bool isRW = (flags & (O_RDWR | O_WRONLY)) != 0;
  eos_info("cookie=%s flags=%o isrw=%d", cookie.c_str(), flags, isRW);
  XrdSysMutexHelper lLock(mLock);
  int bcache = 0;

  if (mFile->file()) {
    bcache = mFile->file()->detach(cookie);
  }

  int jcache = 0;

  // the journal is only detached for writers or cached opens
  if (mFile->journal() && (isRW || (mFlags & O_CACHE))) {
    jcache = mFile->journal()->detach(cookie);
  }

  // detach the proxy object matching the access mode
  if (isRW) {
    if (mFile->has_xrdiorw(req)) {
      mFile->xrdiorw(req)->detach();
    }
  } else {
    if (mFile->has_xrdioro(req)) {
      mFile->xrdioro(req)->detach();
    }
  }

  return bcache | jcache;
}
/* -------------------------------------------------------------------------- */
int
/* -------------------------------------------------------------------------- */
data::datax::store_cookie(std::string& cookie)
/* -------------------------------------------------------------------------- */
{
  // persist the given cookie into the file-start cache and the journal cache;
  // returns the or-ed return codes of the set_cookie calls
  eos_info("cookie=%s", cookie.c_str());
  int rc = 0;

  if (mFile->file()) {
    rc |= mFile->file()->set_cookie(cookie);
  }

  if (mFile->journal()) {
    rc |= mFile->journal()->set_cookie(cookie);
  }

  return rc;
}
/* -------------------------------------------------------------------------- */
int
/* -------------------------------------------------------------------------- */
data::datax::unlink(fuse_req_t req)
/* -------------------------------------------------------------------------- */
{
  // drop this inode from the cache handler, remove the local cache files and
  // mark the object as deleted; returns the or-ed unlink return codes
  eos_info("");
  cachehandler::instance().rm(mIno);
  int rc = 0;

  if (mFile->file()) {
    rc |= mFile->file()->unlink();
  }

  if (mFile->journal()) {
    rc |= mFile->journal()->unlink();
  }

  mIsUnlinked = true;
  FlagDeleted();
  return rc;
}
// IO bridge interface
/* -------------------------------------------------------------------------- */
ssize_t
/* -------------------------------------------------------------------------- */
data::datax::pread(fuse_req_t req, void* buf, size_t count, off_t offset)
/* -------------------------------------------------------------------------- */
{
  //! @brief read 'count' bytes at 'offset', serving data in order from the
  //!        inline buffer, the file-start cache, the journal cache and
  //!        finally the remote file (with journal contents overlaid)
  //! @return number of bytes read, or -1 with errno set (EREMOTEIO on remote
  //!         read failure, EFAULT if no proxy object exists)
  //
  // Fixes applied in this revision: removed an unreachable trailing return
  // after the if/else that already returns on both branches, merged a
  // redundant nested journal check, and restored the (extraction-garbled)
  // chunk container declaration via 'auto'.
  eos_info("offset=%llu count=%lu", offset, count);
  mLock.Lock();

  if (mFile->journal()) {
    ssize_t jts = ((mFile->journal()))->get_truncatesize();

    if (jts >= 0) {
      // reduce reads in case of truncation stored in the journal
      if ((ssize_t) offset > jts) {
        offset = 0;
        count = 0;
      } else {
        if ((ssize_t)(offset + count) > jts) {
          count = jts - offset;
        }
      }
    }
  }

  if (inline_buffer && inlined() &&
      (count + offset) < mInlineMaxSize) {
    // possibly return data from an inlined buffer
    ssize_t avail_bytes = 0;

    if (((size_t) offset < (*mMd)()->size())) {
      if ((offset + count) > (*mMd)()->size()) {
        // clip to the end of file as known by the metadata
        avail_bytes = (*mMd)()->size() - offset;
      } else {
        avail_bytes = count;
      }
    } else {
      avail_bytes = 0;
    }

    memcpy(buf, inline_buffer->ptr() + offset, avail_bytes);
    mLock.UnLock();
    return avail_bytes;
  }

  ssize_t br = 0;

  if (mFile->file()) {
    // read from file start cache
    br = mFile->file()->pread(buf, count, offset);
  }

  if (br < 0) {
    mLock.UnLock();
    return br;
  }

  if (br == (ssize_t) count) {
    mLock.UnLock();
    return br;
  }

  if (mFile->file() && (offset < mFile->file()->prefetch_size())) {
    mLock.UnLock();

    if (prefetch(req)) {
      WaitPrefetch(req);
      mLock.Lock();
      // NOTE(review): this shadows the outer 'br' on purpose - a partial read
      // after prefetch is discarded and the pre-prefetch 'br' is used below;
      // confirm this is the intended semantics
      ssize_t br = mFile->file()->pread(buf, count, offset);

      if (br < 0) {
        mLock.UnLock();
        return br;
      }

      if (br == (ssize_t) count) {
        mLock.UnLock();
        return br;
      }
    } else {
      mLock.Lock();
    }
  }

  ssize_t jr = 0;

  if (mFile->journal()) {
    // read the remainder from the journal cache
    jr = mFile->journal()->pread((char*) buf + br, count - br, offset + br);
  }

  if (jr < 0) {
    mLock.UnLock();
    return jr;
  }

  if ((br + jr) == (ssize_t) count) {
    mLock.UnLock();
    return (br + jr);
  }

  // read the missing part remote
  XrdCl::shared_proxy proxy = mFile->has_xrdioro(req) ? mFile->xrdioro(
                                req) : mFile->xrdiorw(req);
  XrdCl::XRootDStatus status;

  if (proxy) {
    if (proxy->IsOpening()) {
      status = proxy->WaitOpen();
    }

    if (!mFile->is_caching()) {
      // if caching is disabled, we wait for outstanding writes
      status = proxy->WaitWrite();
      // it is not obvious what we should do if there was a write error,
      // we just proceed
    }

    uint32_t bytesRead = 0;

    if (proxy->Read(proxy,
                    offset + br,
                    count - br,
                    (char*) buf + br,
                    bytesRead).IsOK()) {
      mLock.UnLock();

      if (mFile->journal()) {
        // retrieve all journal chunks matching our range and overlay them
        // over the remote contents
        auto chunks = ((mFile->journal()))->get_chunks(offset + br, count - br);

        for (auto it = chunks.begin(); it != chunks.end(); ++it) {
          eos_err("offset=%ld count=%lu overlay-chunk offset=%ld size=%lu\n", offset,
                  count, it->offset, it->size);
          // overlay journal contents again over the remote contents
          ssize_t ljr = mFile->journal()->pread((char*) buf + br +
                                                (it->offset - offset - br), it->size, it->offset);

          if (ljr >= 0) {
            // check if the journal contents extends the remote read
            ssize_t chunkread = it->offset + it->size - offset - br;

            if (chunkread > bytesRead) {
              bytesRead = chunkread;
            }
          }
        }

        eos_info("offset=%ld count=%lu journal-max=%ld\n", offset, count,
                 mFile->journal()->get_max_offset());

        // check if there is a chunk in the journal which extends the file
        // size, so we have to extend the read
        if (mFile->journal()->get_max_offset() > (off_t)(offset + br + bytesRead)) {
          if (mFile->journal()->get_max_offset() > (off_t)(offset + count)) {
            // the last journal entry extends over the requested range, we got all bytes
            bytesRead = count;
          } else {
            // this should not be required, because logically we cannot get here
            eos_err("consistency error : max-journal=%ld offset=%ld count=%lu br=%lu bytesread=%lu",
                    mFile->journal()->get_max_offset(), offset, count, br, bytesRead);
            // we don't set bytesread here!
          }
        }
      }

      eos_info("count=%lu read-bytes=%lu", count, br + bytesRead);

      // never report more than the requested byte count
      if ((size_t)(br + bytesRead) > count) {
        return count;
      } else {
        return (br + bytesRead);
      }
    } else {
      mLock.UnLock();
      errno = EREMOTEIO;
      // IO error
      return -1;
    }
  }

  mLock.UnLock();
  errno = EFAULT;
  return -1;
}
/* -------------------------------------------------------------------------- */
ssize_t
/* -------------------------------------------------------------------------- */
data::datax::pwrite(fuse_req_t req, const void* buf, size_t count, off_t offset)
/* -------------------------------------------------------------------------- */
//! @brief write 'count' bytes from 'buf' at 'offset'
//!
//! The data is fanned out to up to four sinks:
//!  1. the inline buffer (small files kept in an extended attribute)
//!  2. the local file start-cache
//!  3. the write journal (with a flush if the journal cannot absorb 'count')
//!  4. an asynchronous remote write on the read-write proxy
//! Under write pressure (too many writes in flight) the caller is throttled
//! ('xoff'); with O_SYNC the remote write is awaited synchronously and
//! recovery/resend is attempted on failure.
//! @return bytes written locally, or -1 with errno set
{
XrdSysMutexHelper lLock(mLock);
eos_info("offset=%llu count=%lu", offset, count);
ssize_t dw = 0;
// inlined-files
if (inline_buffer) {
if ((count + offset) < mInlineMaxSize) {
// copy into file inline buffer
inline_buffer->writeData(buf, offset, count);
}
}
if (mFile->file()) {
if (mFile->file()->size() || (mFlags & O_CREAT)) {
// don't write into the file start cache, if it is currently empty and it is not a newly created file
dw = mFile->file()->pwrite(buf, count, offset);
}
}
if (dw < 0) {
return dw;
} else {
// mirror the write into the journal; flush first if it cannot absorb 'count' bytes
if (mFile->journal()) {
if (!mFile->journal()->fits(count)) {
int rc = flush_nolock(req, true, true);
if (rc) {
eos_warning("flush failed with errno=%d", rc);
errno = rc;
return -1;
}
}
// now there is space to write for us
ssize_t jw = mFile->journal()->pwrite(buf, count, offset);
if (jw < 0) {
return jw;
}
dw = jw;
}
{
// stop sending more writes in case of unrecoverable errors
XrdCl::shared_proxy proxy = mFile->xrdiorw(req);
// block writes on read-only fds
if (!proxy) {
errno = EROFS;
return -1;
}
// a failed open that will not be retried makes any further write pointless
if (proxy->opening_state().IsError() &&
! proxy->opening_state_should_retry()) {
eos_err("unrecoverable error - code=%d errNo=%d",
proxy->opening_state().code,
proxy->opening_state().errNo);
proxy->CleanWriteQueue();
errno = XrdCl::Proxy::status2errno(proxy->opening_state());
return -1;
}
}
// send an asynchronous upstream write, which does not wait for the file open to be done
XrdCl::Proxy::write_handler handler =
mFile->xrdiorw(req)->WriteAsyncPrepare(mFile->xrdiorw(req), count, offset, 60);
XrdCl::XRootDStatus status =
mFile->xrdiorw(req)->ScheduleWriteAsync(buf, handler);
// test if we switch to xoff mode, where we only write into the journal
size_t cnt = 0;
// back-pressure loop: block the writer in 5ms slices until the in-flight
// write queue drains or a failure switches us to synchronous mode
while (mFile->xrdiorw(req)->HasTooManyWritesInFlight()) {
if (!(cnt % 1000)) {
eos_debug("doing XOFF");
}
EosFuse::instance().datas.set_xoff();
mXoff = true;
std::string msg;
if (mFile->xrdiorw(req)->HadFailures(msg)) {
eos_err("file state failure during xoff - switching to sync mode msg='%s'",
msg.c_str());
// if we had failures we change into synchronous mode to be able to trigger appropriate recovery
mFlags |= O_SYNC;
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(5));
cnt++;
}
mXoff = false;
// without write recovery enabled a failed async schedule is fatal right away
if ((!status.IsOK()) && (!EosFuse::Instance().Config().recovery.write)) {
errno = XrdCl::Proxy::status2errno(status);
eos_err("async remote-io failed msg=\"%s\"", status.ToString().c_str());
return -1;
}
if (mFlags & O_SYNC) {
eos_debug("O_SYNC");
// make sure the file gets opened
XrdCl::XRootDStatus status = mFile->xrdiorw(req)->WaitOpen();
if (!status.IsOK()) {
mRecoveryStack.push_back(eos_log(LOG_SILENT,
"status='%s' hint='will TryRecovery'",
status.ToString().c_str()));
int tret = 0;
if ((tret = TryRecovery(req, true))) {
errno = XrdCl::Proxy::status2errno(status);
eos_err("pseudo-sync remote-io failed msg=\"%s\"", status.ToString().c_str());
mRecoveryStack.push_back(eos_log(LOG_SILENT,
"status='%s' errno='%d' hint='failed TryRecovery'",
status.ToString().c_str(), tret));
return -1;
} else {
mRecoveryStack.push_back(eos_log(LOG_SILENT,
"triggering-status='%s' hint='success TryRecovery'",
status.ToString().c_str()));
// re-send the write again
// NOTE(review): the shadowed 'status' of this re-sent write is ignored
// here - presumably the WaitWrite() below is meant to pick up the
// result; confirm
XrdCl::Proxy::write_handler handler =
mFile->xrdiorw(req)->WriteAsyncPrepare(mFile->xrdiorw(req), count, offset, 60);
XrdCl::XRootDStatus status =
mFile->xrdiorw(req)->ScheduleWriteAsync(buf, handler);
}
}
// make sure all writes were successful
status = mFile->xrdiorw(req)->WaitWrite();
if (!status.IsOK()) {
mRecoveryStack.push_back(eos_log(LOG_SILENT,
"status='%s' hint='will TryRecovery'",
status.ToString().c_str()));
int tret = 0;
if ((tret = TryRecovery(req, true))) {
errno = XrdCl::Proxy::status2errno(status);
eos_err("pseudo-sync remote-io failed msg=\"%s\"", status.ToString().c_str());
mRecoveryStack.push_back(eos_log(LOG_SILENT,
"status='%s' errno='%d' hint='failed TryRecovery'",
status.ToString().c_str(), tret));
return -1;
} else {
mRecoveryStack.push_back(eos_log(LOG_SILENT,
"triggering-status='%s' hint='success TryRecovery'",
status.ToString().c_str()));
// re-send the write again
XrdCl::Proxy::write_handler handler =
mFile->xrdiorw(req)->WriteAsyncPrepare(mFile->xrdiorw(req), count, offset, 60);
XrdCl::XRootDStatus status =
mFile->xrdiorw(req)->ScheduleWriteAsync(buf, handler);
status = mFile->xrdiorw(req)->WaitWrite();
if (!status.IsOK()) {
errno = XrdCl::Proxy::status2errno(status);
eos_err("pseudo-sync remote-io failed msg=\"%s\"", status.ToString().c_str());
mRecoveryStack.push_back(eos_log(LOG_SILENT,
"status='%s' hint='failed resending writes after successful recovery'",
status.ToString().c_str()));
return -1;
}
}
}
}
// track the logical file size locally so size() can answer without the MGM
if ((off_t)(offset + count) > mSize) {
mSize = count + offset;
}
eos_info("offset=%llu count=%lu result=%d", offset, count, dw);
return dw;
}
/* -------------------------------------------------------------------------- */
ssize_t
/* -------------------------------------------------------------------------- */
data::datax::peek_pread(fuse_req_t req, char*& buf, size_t count, off_t offset)
/* -------------------------------------------------------------------------- */
//! @brief zero-copy read: returns a pointer to an internal buffer via 'buf'
//!
//! Read sources are tried in order of increasing cost:
//!  1. the inline buffer (fully inlined small files)
//!  2. the local file start-cache (optionally after a prefetch)
//!  3. the journal
//!  4. the remote replica (with read recovery), with journal contents
//!     overlaid afterwards since the journal holds the newest data
//! Takes mLock and keeps it held on return - the caller MUST call
//! release_pread() to return the buffer and drop the lock.
//! @return number of bytes available in 'buf', or -1 with errno set
{
size_t md_size = 0;
{
XrdSysMutexHelper lLock(mMd->Locker());
md_size = (*mMd)()->size();
}
mLock.Lock();
eos_info("offset=%llu count=%lu size=%lu", offset, count, md_size);
if (mFile->journal()) {
ssize_t jts = ((mFile->journal()))->get_truncatesize();
if (jts >= 0) {
// reduce reads in case of truncation stored in the journal
if ((ssize_t) offset > jts) {
// reading entirely behind a journal-recorded truncation: nothing to read
offset = 0;
count = 0;
} else {
if ((ssize_t)(offset + count) > jts) {
count = jts - offset;
}
}
}
}
// the caller reads directly out of this pooled buffer
buffer = sBufferManager.get_buffer(count);
buf = buffer->ptr();
if (inline_buffer && inlined()) {
// possibly return data from an inlined buffer
ssize_t avail_bytes = 0;
if (((size_t) offset < md_size)) {
if ((offset + count) > md_size) {
// clamp to the end of the file
avail_bytes = md_size - offset;
} else {
avail_bytes = count;
}
} else {
avail_bytes = 0;
}
// only serve from the inline buffer if it covers the whole file
if (md_size <= (unsigned long long) inline_buffer->getSize()) {
memcpy(buf, inline_buffer->ptr() + offset, avail_bytes);
eos_debug("inline-read byte=%lld inline-buffer-size=%lld", avail_bytes,
inline_buffer->getSize());
return avail_bytes;
}
}
ssize_t br = 0;
if (mFile->file()) {
// try the local file start-cache first
br = mFile->file()->pread(buf, count, offset);
if (EOS_LOGS_DEBUG) {
eos_debug("disk-read:%ld", br);
}
if (br < 0) {
return br;
}
// fully satisfied (or the whole file was read) - done
if ((br == (ssize_t) count) || (br == (ssize_t)md_size)) {
return br;
}
}
if (mFile->file() && (offset < mFile->file()->prefetch_size())) {
// the range falls into the prefetch window - trigger/await a prefetch
// and retry the cache read
if (prefetch(req, false)) {
WaitPrefetch(req, false);
ssize_t br = mFile->file()->pread(buf, count, offset);
if (br < 0) {
return br;
}
if (br == (ssize_t) count) {
if (mFile->journal() && (mFlags & O_CACHE)) {
// optionally populate the read journal cache
mFile->journal()->pwrite(buf, count, offset);
}
return br;
}
}
}
ssize_t jr = 0;
if (mFile->journal()) {
// try to complete the partial read from the journal
jr = mFile->journal() ? mFile->journal()->pread(buf + br, count - br,
offset + br) : 0;
if (jr < 0) {
return jr;
}
if ((br + jr) == (ssize_t) count) {
return (br + jr);
}
}
// read the missing part remote
// prefer the read-only proxy; fall back to the read-write proxy
XrdCl::shared_proxy proxy = mFile->has_xrdioro(req) ? mFile->xrdioro(
req) : mFile->xrdiorw(req);
XrdCl::XRootDStatus status;
eos_debug("ro=%d offset=%llu count=%lu br=%lu jr=%lu", mFile->has_xrdioro(req),
offset, count, br, jr);
if (proxy) {
if (proxy->IsOpening()) {
status = proxy->WaitOpen();
}
if (mFile->has_xrdiorw(req)) {
// a failed open on a writable file triggers open recovery
if (!status.IsOK()) {
mRecoveryStack.push_back(eos_log(LOG_SILENT,
"status='%s' hint='will TryRecovery'",
status.ToString().c_str()));
int tret = 0;
// call recovery for an open
if ((tret = TryRecovery(req, false))) {
mRecoveryStack.push_back(eos_log(LOG_SILENT,
"status='%s' errno='%d' hint='failed TryRecovery'",
status.ToString().c_str(), tret));
errno = XrdCl::Proxy::status2errno(status);
eos_err("sync remote-io failed msg=\"%s\"", status.ToString().c_str());
return -1;
} else {
mRecoveryStack.push_back(eos_log(LOG_SILENT,
"triggering-status='%s' hint='success TryRecovery'",
status.ToString().c_str()));
// get the new proxy object, the recovery might exchange the file object
proxy = mFile->has_xrdioro(req) ? mFile->xrdioro(req) : mFile->xrdiorw(
req); // recovery might change the proxy object
}
}
}
if (mFile->has_xrdiorw(req)) {
// drain pending remote writes so the remote read sees our own data
XrdCl::shared_proxy wproxy = mFile->xrdiorw(req);
if (wproxy->OutstandingWrites()) {
status = wproxy->WaitWrite();
}
if (!status.IsOK()) {
errno = XrdCl::Proxy::status2errno(status);
eos_err("sync remote-io failed msg=\"%s\"", status.ToString().c_str());
return -1;
}
}
uint32_t bytesRead = 0;
int recovery = 0;
// read loop: on failure try recovery once per iteration until it either
// succeeds or recovery itself fails
while (1) {
// if the recovery failed already once, we continue to silently return this error
if (!can_recover_read()) {
errno = XrdCl::Proxy::status2errno(status);
if (EOS_LOGS_DEBUG) {
eos_debug("sync remote-io failed msg=\"%s\" previously - recovery disabled",
status.ToString().c_str());
}
return -1;
}
proxy = mFile->has_xrdioro(req) ? mFile->xrdioro(req) : mFile->xrdiorw(
req); // recovery might change the proxy object
status = proxy->Read(proxy,
offset + br + jr,
count - br - jr,
(char*) buf + br + jr,
bytesRead);
if (!status.IsOK()) {
// read failed
mRecoveryStack.push_back(eos_log(LOG_SILENT,
"status='%s' hint='will TryRecovery'",
status.ToString().c_str()));
recovery = TryRecovery(req, false);
if (recovery) {
// recovery failed
mRecoveryStack.push_back(eos_log(LOG_SILENT,
"status='%s' errno='%d' hint='failed TryRecovery'",
status.ToString().c_str(), recovery));
break;
} else {
// recovery succeeded
mRecoveryStack.push_back(eos_log(LOG_SILENT,
"triggering-status='%s' hint='success TryRecovery'",
status.ToString().c_str()));
}
} else {
// read succeeded
break;
}
}
if (recovery) {
errno = recovery;
// remember the failure so subsequent reads fail fast
disable_read_recovery();
eos_err("sync remote-io recovery failed errno=%d", errno);
return -1;
}
if (status. IsOK()) {
// NOTE(review): the element type of this vector appears lost in
// extraction - it should be the journal chunk descriptor type
std::vector chunks;
if (mFile->journal()) {
// retrieve all journal chunks matching our range
chunks = ((mFile->journal()))->get_chunks(offset + br , count - br);
for (auto it = chunks.begin(); it != chunks.end(); ++it) {
eos_info("offset=%ld count=%lu overlay-chunk offset=%ld size=%lu", offset,
count, it->offset, it->size);
// overlay journal contents again over the remote contents
ssize_t ljr = mFile->journal()->pread((char*) buf + br +
(it->offset - offset - br), it->size, it->offset);
if (ljr >= 0) {
// check if the journal contents extends the remote read
ssize_t chunkread = it->offset + it->size - offset - br;
if (chunkread > bytesRead) {
bytesRead = chunkread;
}
}
}
eos_info("offset=%ld count=%lu bytes-read=%lu journal-max=%ld\n", offset, count,
bytesRead, mFile->journal()->get_max_offset());
// check if there is a chunk in the journal which extends the file size,
// so we have to extend the read
if (mFile->journal()->get_max_offset() > (off_t)(offset + br + bytesRead)) {
if (mFile->journal()->get_max_offset() > (off_t)(offset + count)) {
// the last journal entry extends over the requested range, we got all bytes
bytesRead = count;
} else {
// this should not be required, because logically we cannot get here
// NOTE(review): unlike the clamp above this does not subtract 'br' -
// presumably br==0 whenever this branch is reached; confirm
bytesRead = mFile->journal()->get_max_offset() - offset;
}
}
}
eos_info("count=%lu read-bytes=%lu", count, br + bytesRead);
if (mFile->journal() && (mFlags & O_CACHE)) {
// optionally populate the read journal cache
mFile->journal()->pwrite(buf, br + bytesRead, offset);
}
// never report more than the requested byte count
if ((size_t)(br + bytesRead) > count) {
return count;
} else {
return (br + bytesRead);
}
} else {
errno = XrdCl::Proxy::status2errno(status);
eos_err("sync remote-io failed msg=\"%s\"", status.ToString().c_str());
return -1;
}
}
// no proxy available at all
return -1;
}
/* -------------------------------------------------------------------------- */
void
/* -------------------------------------------------------------------------- */
data::datax::release_pread()
/* -------------------------------------------------------------------------- */
{
  // Counterpart of peek_pread(): hand the scratch buffer back to the shared
  // buffer pool and release the data lock acquired by peek_pread(). The two
  // calls always bracket a zero-copy read.
  eos_info("");
  // return the pooled buffer before giving up our reference to it
  sBufferManager.put_buffer(buffer);
  buffer.reset();
  mLock.UnLock();
}
/* -------------------------------------------------------------------------- */
int
/* -------------------------------------------------------------------------- */
data::datax::truncate(fuse_req_t req, off_t offset)
/* -------------------------------------------------------------------------- */
//! @brief truncate the file to 'offset' bytes
//!
//! Adjusts the inline buffer, invalidates the local file start-cache when
//! affected, records the truncation in the journal and - only when no
//! journal is attached - truncates the remote replica synchronously.
//! @return 0 on success, non-zero local error code, or -1 with errno set
//!         on remote failure
{
  XrdSysMutexHelper lLock(mLock);
  eos_info("offset=%llu size=%llu", offset, mSize);
  int dt = 0;

  if (inline_buffer) {
    if (inlined()) {
      if (((size_t) offset) < mInlineMaxSize) {
        // truncate file inline buffer
        inline_buffer->truncateData(offset);
      }
    } else {
      if (offset == 0) {
        // we can re-enable the inlining for such a file
        inline_buffer->truncateData(0);
        mIsInlined = true;
      }
    }
  }

  if (mFile->file()) {
    if (offset <= mFile->file()->prefetch_size()) {
      // if the truncate falls into the file cache size, we have to disable it
      // because subsequent writes could punch a hole into the file cache
      dt = mFile->file()->truncate(0);
      remove_file_cache();
    }
  }

  // if we have a journal it tracks the truncation size
  int jt = 0;

  if (mFile->journal()) {
    jt = mFile->journal()->truncate(offset);
  }

  eos_info("dt=%d jt=%d", dt, jt);

  if (!mFile->journal()) {
    if (mFile->has_xrdiorw(req)) {
      if (mFile->xrdiorw(req)->IsOpening()) {
        mFile->xrdiorw(req)->WaitOpen();
      }

      // drain outstanding writes before truncating remotely
      mFile->xrdiorw(req)->WaitWrite();
      // the journal keeps track of truncation, otherwise or for O_SYNC we do it here
      XrdCl::XRootDStatus status = mFile->xrdiorw(req)->Truncate(offset);
      errno = XrdCl::Proxy::status2errno(status);

      if (!status.IsOK()) {
        return -1;
      }
    } else {
      // no journal and no writable proxy - nothing can record the truncation
      errno = EFAULT;
      return -1;
    }
  }

  if (!(dt | jt)) {
    // only adopt the new logical size if all truncations succeeded
    mSize = offset;
  }

  return dt | jt;
}
/* -------------------------------------------------------------------------- */
int
/* -------------------------------------------------------------------------- */
data::datax::sync()
/* -------------------------------------------------------------------------- */
//! @brief synchronize all layers of this file
//!
//! Syncs the local file start-cache and the journal, then waits for all
//! outstanding remote writes on every attached read-write proxy and issues
//! a remote Sync().
//! @return or-ed local sync results on success, -1 with errno set if any
//!         remote wait/sync failed
{
  eos_info("");
  int ds = 0;

  if (mFile->file()) {
    ds = mFile->file()->sync();
  }

  int js = 0;

  if (mFile->journal()) {
    js = mFile->journal()->sync();
  }

  bool journal_recovery = false;

  for (auto& rw : mFile->get_xrdiorw()) {
    if (rw.second->IsOpening()) {
      rw.second->WaitOpen();
    }

    // wait for all in-flight writes before syncing remotely
    XrdCl::XRootDStatus status = rw.second->WaitWrite();

    if (!status.IsOK()) {
      errno = XrdCl::Proxy::status2errno(status);
      journal_recovery = true;
    } else {
      status = rw.second->Sync();

      if (!status.IsOK()) {
        errno = XrdCl::Proxy::status2errno(status);
        journal_recovery = true;
      }
    }
  }

  if (journal_recovery) {
    eos_err("syncing failed");
    return -1;
  }

  return ds | js;
}
/* -------------------------------------------------------------------------- */
size_t
/* -------------------------------------------------------------------------- */
data::datax::size()
/* -------------------------------------------------------------------------- */
{
  // Report the logical file size: the larger of the locally tracked size
  // (mSize, advanced by pwrite/truncate) and the local file start-cache size.
  eos_info("");
  off_t cached_size = 0;

  if (mFile->file()) {
    cached_size = mFile->file()->size();
  }

  return (mSize > cached_size) ? mSize : cached_size;
}
/* -------------------------------------------------------------------------- */
int
/* -------------------------------------------------------------------------- */
data::datax::cache_invalidate()
/* -------------------------------------------------------------------------- */
{
  // Drop every locally cached byte of this file: empty the file start-cache,
  // force-truncate the journal, release the inline buffer and flush the
  // read-ahead of all open read-only proxies.
  eos_info("");
  XrdSysMutexHelper lLock(mLock);
  int rc_file = 0;
  int rc_journal = 0;

  if (mFile->file()) {
    rc_file = mFile->file()->truncate(0);
  }

  if (mFile->journal()) {
    rc_journal = mFile->journal()->truncate(0, true);
  }

  inline_buffer = nullptr;

  // invalidate read-ahead buffers on every open read-only proxy
  for (auto& ro : mFile->get_xrdioro()) {
    if (ro.second->IsOpen()) {
      ro.second->DropReadAhead();
    }
  }

  return rc_file | rc_journal;
}
/* -------------------------------------------------------------------------- */
void
/* -------------------------------------------------------------------------- */
data::datax::set_remote(const std::string& hostport,
                        const std::string& basename,
                        const uint64_t md_ino,
                        const uint64_t md_pino,
                        fuse_req_t req,
                        bool isRW)
/* -------------------------------------------------------------------------- */
//! @brief compose and store the remote open URL for this file
//!
//! The file is addressed by its inode when known (md_ino != 0), otherwise
//! by parent inode plus basename. Authentication CGI is added via
//! fusexrdlogin. The result is stored in mRemoteUrlRW or mRemoteUrlRO
//! depending on 'isRW'.
{
  eos_info("");
  std::string remoteurl;
  remoteurl = "root://";
  remoteurl += hostport;
  remoteurl += "//fusex-open";
  remoteurl += "?eos.lfn=";

  if (md_ino) {
    remoteurl += "ino:";
    char sino[128];
    // use %llx with an explicit cast: uint64_t is not 'unsigned long' on
    // every ABI, so "%lx" would pass a mismatched vararg there
    snprintf(sino, sizeof(sino), "%llx", (unsigned long long) md_ino);
    remoteurl += sino;
  } else {
    // inode not yet known - address via parent inode + basename
    remoteurl += "pino:";
    char pino[128];
    snprintf(pino, sizeof(pino), "%llx", (unsigned long long) md_pino);
    remoteurl += pino;
    remoteurl += "/";
    remoteurl += basename;
  }

  std::string appname;

  // older MGMs don't support a custom application name
  if (EosFuse::Instance().mds.supports_appname()) {
    appname = EosFuse::Instance().Config().appname;
  } else {
    appname = "fuse";
  }

  remoteurl += "&eos.app=";
  remoteurl += appname;
  remoteurl += "&mgm.mtime=0&mgm.fusex=1&eos.bookingsize=0";

  if (!isRW) {
    // we don't check checksums in read, because we might read a file which is open and it does not have
    // a final checksum when we read over the end
    remoteurl += "&eos.checksum=ignore";
  }

  XrdCl::URL url(remoteurl);
  XrdCl::URL::ParamsMap query = url.GetParams();
  // append the identity/authentication CGI for this fuse request
  fusexrdlogin::loginurl(url, query, req, md_ino);
  url.SetParams(query);
  remoteurl = url.GetURL();

  if (isRW) {
    mRemoteUrlRW = remoteurl;
  } else {
    mRemoteUrlRO = remoteurl;
  }
}
/* -------------------------------------------------------------------------- */
void
data::datax::dump_recovery_stack()
/* -------------------------------------------------------------------------- */
{
  // Print the recorded recovery history of this file to stderr; silent when
  // no recovery action was ever logged.
  if (mRecoveryStack.size()) {
    std::stringstream report;
    report << "# -------------------" << std::endl;
    report << "# - recovery record -" << std::endl;
    report << "# -------------------" << std::endl;
    report << "# path := '" << fullpath() << "'" << std::endl;
    report << "# fid := " << fid() << std::endl;
    size_t index = 0;
    char label[8];

    for (const auto& entry : mRecoveryStack) {
      // zero-padded three-digit entry counter
      snprintf(label, sizeof(label), "%03lu", index);
      report << "# -[ " << label << " ] " << entry << std::endl;
      ++index;
    }

    fprintf(stderr, "%s\n", report.str().c_str());
    fflush(stderr);
  }
}
/* -------------------------------------------------------------------------- */
const char*
data::datax::Dump(std::string& out)
/* -------------------------------------------------------------------------- */
{
  // Append the state of every attached read-only and read-write proxy to
  // 'out' and return the accumulated C string.
  for (auto& ro : mFile->get_xrdioro()) {
    ro.second->Dump(out);
  }

  for (auto& rw : mFile->get_xrdiorw()) {
    rw.second->Dump(out);
  }

  return out.c_str();
}
void
data::datax::set_shared_url() {
  // this call comes from already locked datax environments
  // Cache the currently used remote URL in mUrl so that url() can answer
  // without taking the data lock. Prefers the last URL reported by the
  // read-write proxy, then the read-only proxy, falling back to the
  // configured remote URL when the proxy has not reported one yet.
  std::string p;

  if (mFile->has_xrdiorw(mReq)) {
    p = mFile->xrdiorw(mReq)->getLastUrl();

    if (p.empty()) {
      p = mRemoteUrlRW;
    }
  } else {
    if (mFile->has_xrdioro(mReq)) {
      p = mFile->xrdioro(mReq)->getLastUrl();

      if (p.empty()) {
        p = mRemoteUrlRO;
      }
    }
  }

  // make_shared needs the explicit element type - it cannot be deduced
  mUrl = std::make_shared<std::string>(p);
}
/* -------------------------------------------------------------------------- */
std::string
data::datax::url(bool nonblocking)
/* -------------------------------------------------------------------------- */
{
  // Return a printable description of the remote URL in use. A URL cached
  // by set_shared_url() short-circuits all locking; otherwise the data lock
  // is taken (conditionally when 'nonblocking' is set) to query the proxies.
  if (mUrl) {
    return *mUrl;
  }

  std::string lasturl;
  {
    if (nonblocking) {
      // don't stall the caller when the data lock is currently busy
      if (!mLock.CondLock()) {
        return "url:unresolved";
      }
    } else {
      mLock.Lock();
    }

    if (mFile->has_xrdiorw(mReq)) {
      lasturl = mFile->xrdiorw(mReq)->getLastUrl();
    } else if (mFile->has_xrdioro(mReq)) {
      lasturl = mFile->xrdioro(mReq)->getLastUrl();
    }

    mLock.UnLock();
  }
  // beautify: strip the CGI part between '/fusex-open' and 'eos.app' and
  // turn '&' separators into blanks
  size_t open_pos = lasturl.find("/fusex-open");
  size_t app_pos = lasturl.find("eos.app");

  if (open_pos != std::string::npos) {
    if (app_pos != std::string::npos) {
      lasturl.erase(open_pos, app_pos - open_pos);
    } else {
      lasturl.erase(open_pos);
    }

    lasturl.insert(open_pos, " : ");
    std::replace(lasturl.begin(), lasturl.end(), '&', ' ');
  }

  return lasturl;
}
/* -------------------------------------------------------------------------- */
bool
/* -------------------------------------------------------------------------- */
data::dmap::waitflush(uint64_t seconds)
{
  // wait that all pending data is flushed for 'seconds'
  // if all is flushed, it returns true, otherwise false
  for (uint64_t i = 0; i < seconds; ++i) {
    size_t nattached = 0;
    {
      // only hold the map lock while sampling the number of attached objects
      XrdSysMutexHelper mLock(this);
      nattached = this->size();
    }

    if (nattached) {
      // use 64-bit-safe format specifiers - '%d' with size_t/uint64_t
      // varargs is a type mismatch on LP64 platforms
      eos_static_warning("[ waiting data to be flushed for %03lu io objects] [ %lu of %lu seconds ]",
                         (unsigned long) nattached, (unsigned long) i,
                         (unsigned long) seconds);
      std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    } else {
      eos_static_warning("[ all data flushed ]");
      return true;
    }
  }

  eos_static_warning("[ data flush timed out after %lu seconds ]",
                     (unsigned long) seconds);
  return false;
}
/* -------------------------------------------------------------------------- */
void
/* -------------------------------------------------------------------------- */
data::dmap::ioflush(ThreadAssistant& assistant)
/* -------------------------------------------------------------------------- */
//! @brief background flusher thread for all attached data objects
//!
//! Runs every 128ms until termination is requested. For every data object
//! that is no longer attached to a client it: closes idle read-only proxies,
//! flushes and closes read-write proxies (triggering journal flush and
//! recovery where needed), retries timed-out opens, rescues unrecoverable
//! cached data to a safe location, and finally drops the object from the map
//! once no proxies remain.
{
ThreadAssistant::setSelfThreadName("data::ioflush");
while (!assistant.terminationRequested()) {
{
//eos_static_debug("");
// NOTE(review): the element type of this vector appears lost in
// extraction - it should be the shared data object handle type
std::vector data;
{
// avoid mutex contention
XrdSysMutexHelper mLock(this);
for (auto it = this->begin(); it != this->end(); ++it) {
if (it->second) {
data.push_back(it->second);
}
}
}
// first pass: log every snapshotted object under its own lock
for (auto it = data.begin(); it != data.end(); ++it) {
XrdSysMutexHelper lLock((*it)->Locker());
eos_static_info("dbmap-in %#lx => %lx", (*it)->id(), &(*it));
}
// second pass: handle proxy lifecycle per object
for (auto it = data.begin(); it != data.end(); ++it) {
{
XrdSysMutexHelper lLock((*it)->Locker());
eos_static_info("dbmap-in => ino:%16lx %lx attached=%d", (*it)->id(), &(*it),
(*it)->attached_nolock());
if (!(*it)->attached_nolock()) {
// files which are detached might need an upstream sync
bool repeat = true;
while (repeat) {
// close all readers in async fashion
// NOTE(review): map template arguments appear lost in extraction -
// this is the name -> read-only proxy map
std::map& rmap = (*it)->file()->get_xrdioro();
for (auto fit = rmap.begin();
fit != rmap.end();) {
if (!fit->second) {
fit++;
continue;
}
if (fit->second->IsOpening() || fit->second->IsClosing()) {
eos_static_info("skipping xrdclproxyrw state=%d %d", fit->second->state(),
fit->second->IsClosed());
// skip files which are opening or closing
fit++;
continue;
}
if (fit->second->IsOpen()) {
// close read-only file if longer than 1s open
if ((fit->second->state_age() > 1.0)) {
if (fit->second->HasReadsInFlight()) {
// don't close files if there is still something in flight from read-ahead
// TODO: in EOS5 (Xrootd5) we can use SetProperty( "BundledClose", "true" ) and end the close with outstanding reads
eos_static_info("still have reads in flight ino:%16lx",(*it)->id());
fit++;
continue;
}
// closing read-only file
fit->second->CloseAsync(fit->second);
eos_static_info("closing reader");
fit++;
continue;
} else {
eos_static_info("age still too young ino:%16lx",(*it)->id());
}
}
if (fit->second->IsOpening() || fit->second->IsClosing()) {
// skip if its neither opened nor closed
fit++;
continue;
}
// fully closed readers are erased once their read-ahead is drained;
// erase() returns the next valid iterator
if (fit->second->IsClosed()) {
if (fit->second->DoneReadAhead()) {
fit = (*it)->file()->get_xrdioro().erase(fit);
eos_static_info("deleting reader");
continue;
}
}
fit++;
}
// now walk the read-write proxies
std::map& map = (*it)->file()->get_xrdiorw();
for (auto fit = map.begin();
fit != map.end(); ++fit) {
if (!fit->second) {
continue;
}
if (fit->second->IsOpening() || fit->second->IsClosing()) {
eos_static_info("skipping xrdclproxyrw state=%d %d", fit->second->state(),
fit->second->IsClosed());
// skip files which are opening or closing
break;
}
if (fit->second->IsOpen()) {
eos_static_info("skip flushing journal for req=%s id=%#lx", fit->first.c_str(),
(*it)->id());
// flush the journal using an asynchronous thread pool
// skipped: (*it)->journalflush_async(fit->first);
fit->second->set_state_TS(XrdCl::Proxy::WAITWRITE);
eos_static_info("changing to wait write state");
}
if (fit->second->IsWaitWrite()) {
// only close once writes are drained, the state is at least 1s old
// and the MDS holds no pending flush for this inode
if (!fit->second->OutstandingWrites()) {
if ((fit->second->state_age() > 1.0) &&
!EosFuse::Instance().mds.has_flush((*it)->id())) {
std::string msg;
// check if we need to run a recovery action
if ((fit->second->HadFailures(msg) ||
((*it)->simulate_write_error_in_flusher()))) {
(*it)->recoverystack().push_back
(eos_static_log(LOG_SILENT, "status='%s' hint='will TryRecovery'",
msg.c_str()));
int tret = 0;
if (!(tret = (*it)->TryRecovery(0, true))) {
(*it)->recoverystack().push_back
(eos_static_log(LOG_SILENT, "hint='success TryRecovery'"));
int jret = 0;
if ((jret = (*it)->journalflush(fit->first))) {
eos_static_err("ino:%16lx recovery failed", (*it)->id());
(*it)->recoverystack().push_back
(eos_static_log(LOG_SILENT, "errno='%d' hint='failed journalflush'", jret));
} else {
(*it)->recoverystack().push_back
(eos_static_log(LOG_SILENT, "hint='success journalflush'"));
}
} else {
(*it)->recoverystack().push_back
(eos_static_log(LOG_SILENT, "errno='%d' hint='failed TryRecovery", tret));
}
}
eos_static_info("changing to close async state - age = %f ino:%16lx has-flush=%s",
fit->second->state_age(), (*it)->id(),
EosFuse::Instance().mds.has_flush((*it)->id()) ? "true" : "false");
fit->second->CloseAsync(fit->second);
break;
} else {
if (fit->second->state_age() < 1.0) {
eos_static_info("waiting for right age before async close - age = %f ino:%16lx has-flush=%s",
fit->second->state_age(), (*it)->id(),
EosFuse::Instance().mds.has_flush((*it)->id()) ? "true" : "false");
} else {
eos_static_info("waiting for flush before async close - age = %f ino:%16lx has-flush=%s",
fit->second->state_age(), (*it)->id(),
EosFuse::Instance().mds.has_flush((*it)->id()) ? "true" : "false");
}
break;
}
}
}
if (!fit->second->IsClosed()) {
break;
}
{
std::string msg;
if ((!(*it)->unlinked()) && fit->second->HadFailures(msg)) {
// let's see if the initial OpenAsync got a timeout, this we should retry always
XrdCl::XRootDStatus status = fit->second->opening_state();
bool rescue = true;
if (
(status.code == XrdCl::errConnectionError) ||
(status.code == XrdCl::errSocketTimeout) ||
(status.code == XrdCl::errOperationExpired) ||
(status.code == XrdCl::errSocketDisconnected)) {
// retry the open
eos_static_warning("re-issuing OpenAsync request after timeout - ino:%16lx err-code:%d",
(*it)->id(), status.code);
// to recover this errors XRootD requires new XrdCl::File object ... sigh ...
XrdCl::shared_proxy newproxy = XrdCl::Proxy::Factory();
newproxy->OpenAsync(newproxy, fit->second->url(), fit->second->flags(),
fit->second->mode(), 0);
// carry attachment counts and protocol settings over to the new proxy
newproxy->inherit_attached(fit->second);
newproxy->inherit_protocol(fit->second);
map[fit->first] = newproxy;
continue;
} else {
eos_static_warning("OpenAsync failed - trying recovery - ino:%16lx err-code:%d",
(*it)->id(), status.code);
if (status.errNo == kXR_noserver) {
int tret = 0;
if (!(tret = (*it)->TryRecovery(0, true))) {
(*it)->recoverystack().push_back
(eos_static_log(LOG_SILENT, "hint='success TryRecovery'"));
int jret = 0;
if ((jret = (*it)->journalflush(fit->first))) {
eos_static_err("ino:%16lx recovery failed", (*it)->id());
(*it)->recoverystack().push_back
(eos_static_log(LOG_SILENT, "errno='%d' hint='failed journalflush'", jret));
} else {
(*it)->recoverystack().push_back
(eos_static_log(LOG_SILENT, "hint='success journalflush'"));
continue;
}
} else {
(*it)->recoverystack().push_back
(eos_static_log(LOG_SILENT, "errno='%d' hint='failed TryRecovery", tret));
}
}
eos_static_warning("giving up OpenAsync request - ino:%16lx err-code:%d",
(*it)->id(), status.code);
if (status.errNo == kXR_overQuota) {
// don't preserve these files, they got an application error beforehand
rescue = false;
}
}
// ---------------------------------------------------------
// we really have to avoid this to happen, but
// we can put everything we have cached in a save place for
// manual recovery and tag the error message
// ---------------------------------------------------------
if (rescue) {
std::string file_rescue_location;
std::string journal_rescue_location;
int dt = (*it)->file()->file() ? (*it)->file()->file()->rescue(
file_rescue_location) : 0;
int jt = (*it)->file()->journal() ? (*it)->file()->journal()->rescue(
journal_rescue_location) : 0;
if (!dt || !jt) {
const char* cmsg =
eos_static_log(LOG_CRIT,
"ino:%16lx msg=%s file-recovery=%s journal-recovery=%s",
(*it)->id(),
msg.c_str(),
(!dt) ? file_rescue_location.c_str() : "",
(!jt) ? journal_rescue_location.c_str() : "");
(*it)->recoverystack().push_back(cmsg);
}
}
}
// drop the closed rw proxy and restart the scan of this object
eos_static_info("deleting xrdclproxyrw state=%d %d", fit->second->state(),
fit->second->IsClosed());
(*it)->file()->get_xrdiorw().erase(fit);
break;
}
}
repeat = false;
}
}
}
// final pass under map + object lock: drop fully idle objects
XrdSysMutexHelper mLock(this);
XrdSysMutexHelper lLock((*it)->Locker());
// re-check that nobody is attached
if (!(*it)->attached_nolock() && !(*it)->file()->get_xrdiorw().size() &&
!(*it)->file()->get_xrdioro().size()) {
eos_static_info("dropping one");
// here we make the data object unreachable for new clients
(*it)->detach_nolock();
cachehandler::instance().rm((*it)->id());
this->erase((*it)->id());
this->erase((*it)->id() + 0xffffffff);
}
}
assistant.wait_for(std::chrono::milliseconds(128));
}
}
}