//------------------------------------------------------------------------------
//! @file xrdclproxy.cc
//! @author Andreas-Joachim Peters CERN
//! @brief XrdCl proxy class
//------------------------------------------------------------------------------
/************************************************************************
* EOS - the CERN Disk Storage System *
* Copyright (C) 2016 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see .*
************************************************************************/
#include "xrdclproxy.hh"
#include "common/Logging.hh"
#include "common/Path.hh"
#include "XrdCl/XrdClXRootDResponses.hh"
using namespace XrdCl;
ssize_t XrdCl::Proxy::sChunkTimeout = 300;
XrdCl::BufferManager XrdCl::Proxy::sWrBufferManager;
XrdCl::BufferManager XrdCl::Proxy::sRaBufferManager;
std::atomic XrdCl::Proxy::sProxy;
std::mutex XrdCl::Proxy::WriteAsyncHandler::gBuffReferenceMutex;
std::map
XrdCl::Proxy::WriteAsyncHandler::gBufferReference;
std::mutex XrdCl::Proxy::ReadAsyncHandler::gExpiredChunksMutex;
std::vector
XrdCl::Proxy::ReadAsyncHandler::gExpiredChunks;
XrdCl::shared_proxy XrdCl::Proxy::Factory()
{
shared_proxy sp = std::make_shared();
return sp;
}
XrdCl::Proxy::ProxyStat XrdCl::Proxy::ProxyStatHandle::sProxyStats;
std::shared_ptr
XrdCl::Proxy::ProxyStatHandle::Get()
{
return std::make_shared();
}
/* -------------------------------------------------------------------------- */
XRootDStatus
/* -------------------------------------------------------------------------- */
XrdCl::Proxy::Write(uint64_t offset,
uint32_t size,
const void* buffer,
ResponseHandler* handler,
uint16_t timeout)
/* -------------------------------------------------------------------------- */
{
eos_debug("offset=%lu size=%u", offset, size);
XRootDStatus status = WaitOpen();
if (!status.IsOK()) {
return status;
}
return File::Write(offset, size, buffer, handler, timeout);
}
/* -------------------------------------------------------------------------- */
XRootDStatus
XrdCl::Proxy::Read(XrdCl::shared_proxy proxy,
uint64_t offset,
uint32_t size,
void* buffer,
uint32_t& bytesRead,
uint16_t timeout)
/* -------------------------------------------------------------------------- */
{
eos_debug("offset=%lu size=%u", offset, size);
XRootDStatus status = WaitOpen();
bytesRead = 0;
if (!status.IsOK()) {
return status;
}
eos_debug("----: read: offset=%lu size=%u", offset, size);
int readahead_window_hit = 0;
uint64_t current_offset = offset;
uint32_t current_size = size;
bool isEOF = false;
bool request_next = true;
std::set delete_chunk;
std::set expired_chunk;
void* pbuffer = buffer;
if ((off_t)offset == (off_t)mPosition) {
mSeqDistance += size;
} else {
if (!XReadAheadDisabled) {
// Offset if unsigned so abs might not work properly in case of underflow
off_t seek_distance = (offset >= (uint64_t)mPosition ? offset - mPosition :
mPosition -
offset);
double sparse_ratio = 1.0 * mSeqDistance / (seek_distance + mSeqDistance);
if (EOS_LOGS_DEBUG) {
eos_debug("sparse ratio:= %.02f seq-distance=%lu seek-distance=%lu",
sparse_ratio, (off_t)mSeqDistance, (off_t)seek_distance);
}
mSeqDistance = size;
if (sparse_ratio && (sparse_ratio < XReadAheadSparseRatio)) {
eos_notice("sparse ratio:= %.02f seq-distance=%lu seek-distance=%lu - disabling readahead permanently url:'%s'",
sparse_ratio, (off_t)mSeqDistance, (off_t)seek_distance,
sparse_ratio, mUrl.c_str());
XReadAheadDisabled = true;
}
}
}
if ((XReadAheadStrategy != NONE)) {
ReadCondVar().Lock();
XReadAheadBlocksIs = 0;
if (ChunkRMap().size()) {
auto last_chunk_before_match = ChunkRMap().begin();
for (auto it = ChunkRMap().begin(); it != ChunkRMap().end(); ++it) {
// extract all possible data from the read-ahead map
off_t match_offset;
uint32_t match_size;
XrdSysCondVarHelper lLock(it->second->ReadCondVar());
if (EOS_LOGS_DEBUG) {
eos_debug("----: eval offset=%lu chunk-offset=%lu rah-position=%lu", offset,
it->second->offset(), mReadAheadPosition);
}
if (it->second->matches(current_offset, current_size, match_offset,
match_size)) {
readahead_window_hit++;
size_t cnt = 0;
while (!it->second->done()) {
it->second->ReadCondVar().WaitMS(25);
cnt++;
if (!(cnt % 2400)) {
// every 60 seconds ...
if (it->second->expired()) {
eos_crit("read-ahead request expired after %u cycles - now: %lu ctime: %lu",
cnt, time(NULL), it->second->creationtime());
continue;
}
}
}
status = it->second->Status();
if (it->second->Status().IsOK()) {
// the match result can change after the read actually returned
if (!it->second->matches(current_offset, current_size, match_offset,
match_size)) {
continue;
}
if (EOS_LOGS_DEBUG) {
eos_debug("----: prefetched offset=%lu m-offset=%lu current-size=%u m-size=%u dim=%ld",
current_offset, match_offset, current_size, match_size,
(char*) buffer - (char*) pbuffer);
eos_debug("----: out-buffer=%lx in-buffer=%lx in-buffer-size=%lu",
(long unsigned int) buffer, (long unsigned int) it->second->buffer(),
it->second->vbuffer().size());
}
// just copy what we have
memcpy(buffer, it->second->buffer() + match_offset - it->second->offset(),
match_size);
bytesRead += match_size;
mTotalReadAheadHitBytes += match_size;
buffer = (char*) buffer + match_size;
current_offset = match_offset + match_size;
current_size -= match_size;
isEOF = it->second->eof();
if (isEOF) {
eos_info("got EOF in matching chunk %lu (%lu)", it->second->offset(),
mPosition);
request_next = false;
XReadAheadNom = 0;
XReadAheadBlocksNom = XReadAheadBlocksMin;
mReadAheadPosition = 0;
XReadAheadReenableHits = 0;
break;
}
}
} else {
if (!readahead_window_hit) {
last_chunk_before_match = it;
} else {
XReadAheadBlocksIs++;
}
isEOF = it->second->eof();
if (isEOF) {
eos_info("got EOF in matching chunk %lu (%lu)", it->second->offset(),
mPosition);
request_next = false;
XReadAheadNom = 0;
XReadAheadBlocksNom = XReadAheadBlocksMin;
mReadAheadPosition = 0;
XReadAheadReenableHits = 0;
}
}
}
if (readahead_window_hit) {
// check if we can remove previous prefetched chunks, we keep one block before the current read position
for (auto it = ChunkRMap().begin(); it != last_chunk_before_match; ++it) {
XrdSysCondVarHelper lLock(it->second->ReadCondVar());
if (it->second->expired()) {
expired_chunk.insert(it->first);
} else {
if (it->second->done()) {
if (EOS_LOGS_DEBUG) {
eos_debug("----: dropping chunk offset=%lu chunk-offset=%lu", offset,
it->second->offset());
}
delete_chunk.insert(it->first);
}
}
}
} else {
// clean-up all chunks in the read-ahead map
for (auto it = ChunkRMap().begin(); it != ChunkRMap().end(); ++it) {
XrdSysCondVarHelper lLock(it->second->ReadCondVar());
size_t cnt = 0;
while (!it->second->done()) {
it->second->ReadCondVar().WaitMS(25);
cnt++;
if (!(cnt % 2400)) {
// every 60 seconds ...
if (it->second->expired()) {
eos_crit("read-ahead request expired after %u cycles - now: %lu ctime: %lu",
cnt, time(NULL), it->second->creationtime());
break;
}
}
}
if (it->second->expired()) {
expired_chunk.insert(it->first);
} else {
delete_chunk.insert(it->first);
}
}
}
for (auto it = delete_chunk.begin(); it != delete_chunk.end(); ++it) {
ChunkRMap().erase(*it);
}
for (auto it = expired_chunk.begin(); it != expired_chunk.end(); ++it) {
auto chunk = ChunkRMap()[*it];
{
// put on a garbage stack
std::lock_guard lock(
XrdCl::Proxy::ReadAsyncHandler::gExpiredChunksMutex);
XrdCl::Proxy::ReadAsyncHandler::gExpiredChunks.push_back(chunk);
}
{
// delete form the read-ahead map
ChunkRMap().erase(*it);
}
}
} else {
if ((off_t) offset == mPosition) {
XReadAheadReenableHits++;
if ((!XReadAheadDisabled) && (XReadAheadReenableHits > 2)) {
eos_info("re-enabling read-ahead at %lu (%lu)", offset, mPosition);
// re-enable read-ahead if sequential reading occurs
request_next = true;
if (!mReadAheadPosition) {
set_readahead_position(offset + size);
// tune the read-ahead size with the read-pattern
if (size > XReadAheadNom) {
XReadAheadNom = size;
}
if (XReadAheadNom > XReadAheadMax) {
XReadAheadNom = XReadAheadMax;
}
}
}
} else {
XReadAheadReenableHits = 0;
eos_info("disabling read-ahead at %lu (%lu)", offset, mPosition);
request_next = false;
XReadAheadNom = 0;
;
XReadAheadBlocksNom = XReadAheadBlocksMin;
set_readahead_position(0);
}
}
if (request_next) {
// dynamic window scaling
if (readahead_window_hit) {
if (XReadAheadStrategy == DYNAMIC) {
// increase the read-ahead window
XReadAheadNom *= 2;
if (XReadAheadNom > XReadAheadMax) {
XReadAheadNom = XReadAheadMax;
}
// increase the number of pre-fetched blocks
XReadAheadBlocksNom *= 2;
if (XReadAheadBlocksNom > XReadAheadBlocksMax) {
XReadAheadBlocksNom = XReadAheadBlocksMax;
}
}
}
if (EOS_LOGS_DEBUG) {
eos_debug("hit:%d chunks:%d pre-blocks:%d to-fetch:%d", readahead_window_hit,
ChunkRMap().size(), XReadAheadBlocksNom,
XReadAheadBlocksNom - XReadAheadBlocksIs);
}
// pre-fetch missing read-ahead blocks, if there is a window !=0
size_t blocks_to_fetch = XReadAheadNom ?
((XReadAheadBlocksNom > XReadAheadBlocksIs) ? ((XReadAheadBlocksNom -
XReadAheadBlocksIs)) : 0)
: 0;
for (size_t n_fetch = 0; n_fetch < blocks_to_fetch; n_fetch++) {
if (EOS_LOGS_DEBUG)
eos_debug("----: pre-fetch window=%lu pf-offset=%lu block(%d/%d)",
XReadAheadNom,
(unsigned long) mReadAheadPosition,
n_fetch, blocks_to_fetch
);
if (mReadAheadPosition > get_readahead_maximum_position()) {
eos_debug("----: pre-fetch skipped max-readahead-position=%lu",
get_readahead_maximum_position());
}
if (!ChunkRMap().count(mReadAheadPosition)) {
ReadCondVar().UnLock();
XrdCl::Proxy::read_handler rahread = ReadAsyncPrepare(proxy, mReadAheadPosition,
XReadAheadNom, false);
if (!rahread->valid()) {
ReadCondVar().Lock();
// no buffer available
break;
}
XRootDStatus rstatus = PreReadAsync(mReadAheadPosition, XReadAheadNom,
rahread, timeout);
if (rstatus.IsOK()) {
mReadAheadPosition += XReadAheadNom;
mTotalReadAheadBytes += XReadAheadNom;
}
ReadCondVar().Lock();
}
}
ReadCondVar().UnLock();
} else {
ReadCondVar().UnLock();
}
}
if (current_size) {
// do a synchronous read for missing pieces
uint32_t rbytes_read = 0;
status = File::Read(current_offset,
current_size,
buffer, rbytes_read, timeout);
if (status.IsOK()) {
if (rbytes_read) {
if (EOS_LOGS_DEBUG) {
eos_debug("----: postfetched offset=%lu size=%u rbytes=%d", current_offset,
current_size, rbytes_read);
}
}
bytesRead += rbytes_read;
}
}
set_readstate(&status);
if (status.IsOK()) {
mPosition = offset + size;
mTotalBytes += bytesRead;
}
return status;
}
/* -------------------------------------------------------------------------- */
XRootDStatus
/* -------------------------------------------------------------------------- */
XrdCl::Proxy::OpenAsync(XrdCl::shared_proxy proxy,
const std::string& url,
OpenFlags::Flags flags,
Access::Mode mode,
uint16_t timeout)
/* -------------------------------------------------------------------------- */
{
eos_debug("url=%s flags=%x mode=%x", url.c_str(), (int) flags, (int) mode);
XrdSysCondVarHelper lLock(OpenCondVar());
int in_state = state();
mUrl = url;
mFlags = flags;
mMode = mode;
mTimeout = timeout;
if ((state() == OPENING) ||
(state() == WAITWRITE)) {
XRootDStatus status(XrdCl::stError,
suAlreadyDone,
XrdCl::errInProgress,
"in progress"
);
return status;
}
if (state() == OPENED) {
XRootDStatus status(XrdCl::stOK,
0,
0,
"opened"
);
return status;
}
if (state() == FAILED) {
eos_err("url=%s flags=%x mode=%x state=failed", url.c_str(), (int) flags,
(int) mode);
return XOpenState;
}
// Disable recovery on read and write
#if kXR_PROTOCOLVERSION == 0x00000297
((XrdCl::File*)(this))->EnableReadRecovery(false);
((XrdCl::File*)(this))->EnableWriteRecovery(false);
#else
SetProperty("ReadRecovery", "false");
SetProperty("WriteRecovery", "false");
#endif
if (EOS_LOGS_DEBUG) {
eos_debug("this=%x url=%s in-state %d state %d\n", this, url.c_str(), in_state,
state());
}
XrdCl::XRootDStatus status;
status = fuzzing().OpenAsyncSubmitFuzz();
if (!status.IsOK()) {
} else {
XOpenAsyncHandler.SetProxy(proxy);
status = Open(url.c_str(),
flags,
mode,
&XOpenAsyncHandler,
timeout);
}
if (status.IsOK()) {
set_state(OPENING);
} else {
eos_err("url=%s flags=%x mode=%x state=failed errmsg=%s", url.c_str(),
(int) flags, (int) mode, status.ToString().c_str());
XOpenAsyncHandler.SetProxy(0);
set_state(FAILED);
}
return XOpenState;
}
/* -------------------------------------------------------------------------- */
void
/* -------------------------------------------------------------------------- */
XrdCl::Proxy::OpenAsyncHandler::HandleResponseWithHosts(
XrdCl::XRootDStatus* status,
XrdCl::AnyObject* response,
XrdCl::HostList* hostList)
/* -------------------------------------------------------------------------- */
{
eos_static_debug("");
{
XrdSysCondVarHelper openLock(proxy()->OpenCondVar());
XRootDStatus fuzzingstatus = proxy()->fuzzing().OpenAsyncResponseFuzz();
if (!fuzzingstatus.IsOK()) {
eos_static_debug("fuzzing open response");
*status = fuzzingstatus;
}
if (status->IsOK()) {
proxy()->set_state(OPENED);
proxy()->set_lasturl();
openLock.UnLock();
XrdSysCondVarHelper writeLock(proxy()->WriteCondVar());
while (proxy()->WriteQueue().size()) {
write_handler handler = proxy()->WriteQueue().front();
XRootDStatus status;
eos_static_debug("sending scheduled write request: off=%ld size=%lu timeout=%hu",
handler->offset(),
handler->vbuffer().size(),
handler->timeout());
writeLock.UnLock();
status = proxy()->WriteAsync((uint64_t) handler->offset(),
(uint32_t)(handler->vbuffer().size()),
0,
handler,
handler->timeout()
);
writeLock.Lock(&proxy()->WriteCondVar());
proxy()->WriteQueue().pop_front();
if (!status.IsOK()) {
proxy()->set_writestate(&status);
}
}
writeLock.UnLock();
openLock.Lock(&proxy()->OpenCondVar());
} else {
eos_static_err("state=failed async open returned errmsg=%s",
status->ToString().c_str());
XrdSysCondVarHelper writeLock(proxy()->WriteCondVar());
if (proxy()->WriteQueue().size()) {
// if an open failes we clean the write queue
proxy()->CleanWriteQueue();
}
writeLock.UnLock();
proxy()->set_state(FAILED, status);
}
proxy()->OpenCondVar().Signal();
delete hostList;
delete status;
if (response) {
delete response;
}
}
mProxy = 0;
}
/* -------------------------------------------------------------------------- */
XRootDStatus
/* -------------------------------------------------------------------------- */
XrdCl::Proxy::ReOpenAsync(XrdCl::shared_proxy proxy)
/* -------------------------------------------------------------------------- */
{
if (mUrl.length()) {
set_state_TS(CLOSED);
return OpenAsync(proxy, mUrl, mFlags, mMode, mTimeout);
} else {
XRootDStatus status(XrdCl::stError,
suRetry,
XrdCl::errUninitialized,
"never opened before"
);
eos_err("state=failed reopenasync errmsg=%s", status.ToString().c_str());
set_state_TS(FAILED, &status);
return status;
}
}
/* -------------------------------------------------------------------------- */
XRootDStatus
/* -------------------------------------------------------------------------- */
XrdCl::Proxy::CloseAsync(XrdCl::shared_proxy proxy, uint16_t timeout)
/* -------------------------------------------------------------------------- */
{
eos_debug("");
// don't close files attached by several clients
if (mAttached > 1) {
eos_debug("still attached");
return XRootDStatus();
}
WaitOpen();
DropReadAhead();
XrdSysCondVarHelper lLock(OpenCondVar());
// only an opened file requires a close, otherwise we return the last known state
if ((state() == OPENED) ||
(state() == WAITWRITE)) {
XCloseAsyncHandler.SetProxy(proxy);
XrdCl::XRootDStatus status = XrdCl::File::Close(&XCloseAsyncHandler,
timeout);
if (!status.IsOK()) {
eos_err("state=failed closeasync errms=%s", status.ToString().c_str());
set_state(FAILED, &status);
XCloseAsyncHandler.SetProxy(0);
} else {
set_state(CLOSING, &status);
}
} else {
eos_crit("%x closing an unopened file state=%d url=%s\n", this, state(),
mUrl.c_str());
}
return XOpenState;
}
/* -------------------------------------------------------------------------- */
XRootDStatus
/* -------------------------------------------------------------------------- */
XrdCl::Proxy::ScheduleCloseAsync(XrdCl::shared_proxy proxy, uint16_t timeout)
/* -------------------------------------------------------------------------- */
{
eos_debug("");
if (mAttached > 1) {
eos_debug("still attached");
return XRootDStatus();
}
{
bool no_chunks_left = true;
if ((state() == OPENING) ||
(state() == OPENED)) {
{
DropReadAhead();
XrdSysCondVarHelper lLock(WriteCondVar());
// either we have submitted chunks
if (ChunkMap().size()) {
no_chunks_left = false;
}
// or we have chunks still to be submitted
if (WriteQueue().size()) {
no_chunks_left = false;
}
if (!no_chunks_left) {
// indicate to close this file when the last write-callback arrived
eos_debug("indicating close-after-write");
XCloseAfterWrite = true;
XCloseAfterWriteTimeout = timeout;
}
}
if (no_chunks_left) {
return CloseAsync(proxy, timeout);
} else {
return XOpenState;
}
}
}
XRootDStatus status(XrdCl::stError,
suAlreadyDone,
XrdCl::errInvalidOp,
"file not open"
);
return status;
}
/* -------------------------------------------------------------------------- */
XRootDStatus
/* -------------------------------------------------------------------------- */
XrdCl::Proxy::Close(uint16_t timeout)
/* -------------------------------------------------------------------------- */
{
eos_debug("");
// don't close files attached by several clients
if (mAttached > 1) {
return XRootDStatus();
}
WaitOpen();
if (IsOpen()) {
Collect();
}
DropReadAhead();
XrdSysCondVarHelper lLock(OpenCondVar());
XrdCl::XRootDStatus status = XrdCl::File::Close(timeout);
set_state(CLOSED, &status);
return status;
}
/* -------------------------------------------------------------------------- */
void
/* -------------------------------------------------------------------------- */
XrdCl::Proxy::CloseAsyncHandler::HandleResponse(XrdCl::XRootDStatus* status,
XrdCl::AnyObject* response)
/* -------------------------------------------------------------------------- */
{
eos_static_debug("");
{
XrdSysCondVarHelper lLock(mProxy->OpenCondVar());
if (!status->IsOK()) {
// if the open failed before, we leave the open failed state here
if (!mProxy->isDeleted()) {
if (mProxy->state() != XrdCl::Proxy::FAILED) {
eos_static_crit("%x current status = %d - setting CLOSEFAILED - msg=%s url=%s\n",
mProxy.get(), mProxy->state(), status->ToString().c_str(),
mProxy->url().c_str());
mProxy->set_state(XrdCl::Proxy::CLOSEFAILED, status);
}
} else {
eos_static_info("%x current status = %d - silencing CLOSEFAILED - msg=%s url=%s\n",
mProxy.get(), mProxy->state(), status->ToString().c_str(),
mProxy->url().c_str());
// an unlinked file can have a close failure response
XRootDStatus okstatus;
mProxy->set_state(XrdCl::Proxy::CLOSED, &okstatus);
}
} else {
mProxy->set_state(XrdCl::Proxy::CLOSED, status);
}
mProxy->OpenCondVar().Signal();
delete response;
delete status;
}
mProxy = 0;
}
/* -------------------------------------------------------------------------- */
XRootDStatus
/* -------------------------------------------------------------------------- */
XrdCl::Proxy::WaitClose()
/* -------------------------------------------------------------------------- */
{
eos_debug("");
if (IsOpen()) {
Collect();
}
XrdSysCondVarHelper lLock(OpenCondVar());
while (state() == CLOSING) {
OpenCondVar().WaitMS(25);
}
return XOpenState;
}
/* -------------------------------------------------------------------------- */
XRootDStatus
/* -------------------------------------------------------------------------- */
XrdCl::Proxy::WaitOpen()
/* -------------------------------------------------------------------------- */
{
eos_debug("");
XrdSysCondVarHelper lLock(OpenCondVar());
while (state() == OPENING) {
OpenCondVar().WaitMS(25);
}
return XOpenState;
}
/* -------------------------------------------------------------------------- */
int
/* -------------------------------------------------------------------------- */
XrdCl::Proxy::WaitOpen(fuse_req_t req)
/* -------------------------------------------------------------------------- */
{
eos_debug("");
XrdSysCondVarHelper lLock(OpenCondVar());
while (state() == OPENING) {
if (req && fuse_req_interrupted(req)) {
return EINTR;
}
OpenCondVar().WaitMS(25);
}
return 0;
}
/* -------------------------------------------------------------------------- */
bool
/* -------------------------------------------------------------------------- */
XrdCl::Proxy::IsOpening()
/* -------------------------------------------------------------------------- */
{
XrdSysCondVarHelper lLock(OpenCondVar());
eos_debug("state=%d", state());
return (state() == OPENING) ? true : false;
}
/* -------------------------------------------------------------------------- */
bool
/* -------------------------------------------------------------------------- */
XrdCl::Proxy::IsClosing()
/* -------------------------------------------------------------------------- */
{
XrdSysCondVarHelper lLock(OpenCondVar());
eos_debug("state=%d", state());
return (state() == CLOSING) ? true : false;
}
/* -------------------------------------------------------------------------- */
bool
/* -------------------------------------------------------------------------- */
XrdCl::Proxy::IsOpen()
/* -------------------------------------------------------------------------- */
{
XrdSysCondVarHelper lLock(OpenCondVar());
eos_debug("state=%d", state());
return (state() == OPENED) ? true : false;
}
/* -------------------------------------------------------------------------- */
bool
/* -------------------------------------------------------------------------- */
XrdCl::Proxy::IsClosed()
/* -------------------------------------------------------------------------- */
{
XrdSysCondVarHelper lLock(OpenCondVar());
eos_debug("state=%d", state());
return ((state() == CLOSED) || (state() == CLOSEFAILED) ||
(state() == FAILED)) ? true : false;
}
bool
XrdCl::Proxy::IsWaitWrite()
{
XrdSysCondVarHelper lLock(OpenCondVar());
eos_debug("state=%d", state());
return (state() == WAITWRITE) ? true : false;
}
bool
XrdCl::Proxy::HadFailures(std::string& message)
{
bool ok = true;
XrdSysCondVarHelper lLock(OpenCondVar());
if (state() == CLOSEFAILED) {
message = "file close failed";
ok = false;
}
if (state() == FAILED) {
message = "file open failed";
ok = false;
}
if (!write_state().IsOK()) {
message = "file writing failed";
ok = false;
}
eos_debug("state=%d had-failures=%d", state(), !ok);
return !ok;
}
/* -------------------------------------------------------------------------- */
void
/* -------------------------------------------------------------------------- */
XrdCl::Proxy::WriteAsyncHandler::HandleResponse(XrdCl::XRootDStatus* status,
XrdCl::AnyObject* response)
/* -------------------------------------------------------------------------- */
{
eos_static_debug("ino=%llx", mProxy->id());
bool no_chunks_left = true;
write_handler myself;
{
{
XrdSysCondVarHelper lLock(mProxy->WriteCondVar());
{
if (!status->IsOK()) {
mProxy->set_writestate(status);
eos_static_crit("write error '%s'", status->ToString().c_str());
}
mProxy->WriteCondVar().Signal();
delete response;
delete status;
}
}
}
{
XrdSysCondVarHelper lLock(mProxy->WriteCondVar());
myself = mProxy->ChunkMap()[(int64_t)this];
mProxy->ChunkMap().erase((uint64_t)this);
}
if (no_chunks_left) {
if (mProxy->close_after_write()) {
eos_static_debug("sending close-after-write");
// send an asynchronous close now
XrdCl::XRootDStatus status = mProxy->CloseAsync(mProxy,
mProxy->close_after_write_timeout());
}
}
}
/* -------------------------------------------------------------------------- */
void
/* -------------------------------------------------------------------------- */
XrdCl::Proxy::WriteAsyncHandler::DumpReferences(std::string& out)
{
std::lock_guard lock(gBuffReferenceMutex);
for (auto it = gBufferReference.begin(); it != gBufferReference.end(); ++it) {
out += "ref:";
out += it->first;
out += " := ";
out += std::to_string(it->second);
out += "\n";
}
return;
}
/* -------------------------------------------------------------------------- */
XrdCl::Proxy::write_handler
/* -------------------------------------------------------------------------- */
XrdCl::Proxy::WriteAsyncPrepare(XrdCl::shared_proxy proxy, uint32_t size,
uint64_t offset,
uint16_t timeout)
/* -------------------------------------------------------------------------- */
{
eos_debug("");
write_handler dst = std::make_shared(proxy, size, offset,
timeout);
XrdSysCondVarHelper lLock(WriteCondVar());
ChunkMap()[(uint64_t) dst.get()] = dst;
return dst;
}
/* -------------------------------------------------------------------------- */
XRootDStatus
/* -------------------------------------------------------------------------- */
XrdCl::Proxy::WriteAsync(uint64_t offset,
uint32_t size,
const void* buffer,
XrdCl::Proxy::write_handler handler,
uint16_t timeout)
/* -------------------------------------------------------------------------- */
{
eos_debug("");
// a buffer indicates, the handler buffer is already filled
if (buffer) {
handler->copy(buffer, size);
}
XRootDStatus status = Write(static_cast(offset),
static_cast(size),
handler->buffer(), handler.get(), timeout);
if (!status.IsOK()) {
// remove failing requests
XrdSysCondVarHelper lLock(WriteCondVar());
ChunkMap().erase((uint64_t) handler.get());
}
return status;
}
/* -------------------------------------------------------------------------- */
XRootDStatus
/* -------------------------------------------------------------------------- */
XrdCl::Proxy::ScheduleWriteAsync(
const void* buffer,
write_handler handler
)
/* -------------------------------------------------------------------------- */
{
eos_debug("");
if (buffer) {
handler->copy(buffer, handler->vbuffer().size());
}
XrdSysCondVarHelper openLock(OpenCondVar());
if (state() == OPENED) {
openLock.UnLock();
eos_debug("direct");
inc_write_queue_direct_submissions();
// we can send off the write request
return WriteAsync(handler->offset(),
(size_t) handler->vbuffer().size(),
0,
handler,
handler->timeout());
}
if (state() == OPENING) {
inc_write_queue_scheduled_submissions();
eos_debug("scheduled");
// we add this write to the list to be submitted when the open call back arrives
XrdSysCondVarHelper lLock(WriteCondVar());
WriteQueue().push_back(handler);
// we can only say status OK in that case
XRootDStatus status(XrdCl::stOK,
0,
XrdCl::errInProgress,
"in progress"
);
return status;
} else {
// remove requests with failed open
XrdSysCondVarHelper lLock(WriteCondVar());
ChunkMap().erase((uint64_t) handler.get());
}
return XOpenState;
}
/* -------------------------------------------------------------------------- */
XRootDStatus
/* -------------------------------------------------------------------------- */
XrdCl::Proxy::WaitWrite()
/* -------------------------------------------------------------------------- */
{
eos_debug("");
WaitOpen();
if (state() == WAITWRITE) {
XrdSysCondVarHelper openLock(OpenCondVar());
return XOpenState;
}
// check if the open failed
if (state() != OPENED) {
XrdSysCondVarHelper openLock(OpenCondVar());
return XOpenState;
}
{
time_t wait_start = time(NULL);
XrdSysCondVarHelper lLock(WriteCondVar());
while (ChunkMap().size()) {
eos_debug(" [..] map-size=%lu", ChunkMap().size());
WriteCondVar().WaitMS(1000);
time_t wait_stop = time(NULL);
if (ChunkMap().size() && ((wait_stop - wait_start) > sChunkTimeout)) {
eos_err("discarding %d chunks in-flight for writing", ChunkMap().size());
for (auto it = ChunkMap().begin(); it != ChunkMap().end(); ++it) {
it->second->disable();
}
ChunkMap().clear();
return XRootDStatus(XrdCl::stFatal,
suDone,
XrdCl::errSocketTimeout,
"request timeout"
);
}
}
eos_debug(" [..] map-size=%lu", ChunkMap().size());
}
{
XrdSysCondVarHelper writeLock(WriteCondVar());
return XWriteState;
}
}
/* -------------------------------------------------------------------------- */
int
/* -------------------------------------------------------------------------- */
XrdCl::Proxy::WaitWrite(fuse_req_t req)
/* -------------------------------------------------------------------------- */
{
// this waits for all writes to come back and checks for interrupts inbetween
// this assumes a file is in OPENED state
{
XrdSysCondVarHelper lLock(WriteCondVar());
while (ChunkMap().size()) {
if (req && fuse_req_interrupted(req)) {
return EINTR;
}
eos_debug(" [..] map-size=%lu", ChunkMap().size());
WriteCondVar().WaitMS(1000);
}
eos_debug(" [..] map-size=%lu", ChunkMap().size());
}
return 0;
}
/* -------------------------------------------------------------------------- */
XRootDStatus
/* -------------------------------------------------------------------------- */
XrdCl::Proxy::CollectWrites()
/* -------------------------------------------------------------------------- */
{
// this waits for all writes to come back and checks for interrupts inbetween
// this assumes a file is in OPENED state
{
XrdSysCondVarHelper lLock(WriteCondVar());
while (ChunkMap().size()) {
eos_debug(" [..] map-size=%lu", ChunkMap().size());
WriteCondVar().WaitMS(1000);
}
eos_debug(" [..] map-size=%lu", ChunkMap().size());
}
return XWriteState;
}
/* -------------------------------------------------------------------------- */
bool
/* -------------------------------------------------------------------------- */
XrdCl::Proxy::OutstandingWrites()
/* -------------------------------------------------------------------------- */
{
eos_debug("");
XrdSysCondVarHelper lLock(WriteCondVar());
return ChunkMap().size() ? true : false;
}
void
/* -------------------------------------------------------------------------- */
XrdCl::Proxy::ReadAsyncHandler::HandleResponse(XrdCl::XRootDStatus* status,
XrdCl::AnyObject* response)
/* -------------------------------------------------------------------------- */
{
eos_static_debug("");
{
XrdSysCondVarHelper lLock(ReadCondVar());
mStatus = *status;
bool fuzzing = proxy()->fuzzing().ReadAsyncResponseFuzz();
if (!fuzzing && status->IsOK()) {
XrdCl::ChunkInfo* chunk = 0;
if (response) {
response->Get(chunk);
if (valid()) {
if (chunk->length < mBuffer->size()) {
if (EOS_LOGS_DEBUG) {
eos_static_debug("handler %x received %lu instead of %lu\n", this,
chunk->length, mBuffer->size());
}
mBuffer->resize(chunk->length);
}
}
if (!chunk->length) {
mEOF = true;
}
delete response;
} else {
mBuffer->resize(0);
}
} else {
if (status->IsOK()) {
if (valid()) {
mBuffer->resize(0);
}
if (response) {
delete response;
}
}
// we free the buffer, so it get's back to the buffer handler;
release_buffer();
}
mDone = true;
delete status;
mProxy->dec_read_chunks_in_flight();
ReadCondVar().Signal();
}
}
/* -------------------------------------------------------------------------- */
XrdCl::Proxy::read_handler
/* -------------------------------------------------------------------------- */
XrdCl::Proxy::ReadAsyncPrepare(XrdCl::shared_proxy proxy, off_t offset,
uint32_t size, bool blocking)
/* -------------------------------------------------------------------------- */
{
eos_debug("");
read_handler src = std::make_shared(proxy, offset, size,
blocking);
if (!src->valid()) {
// check if an IO buffer was allocated
return src;
}
if (EOS_LOGS_DEBUG) {
eos_static_debug("handler %x request %lu/%lu non-blocking\n", &(*src), offset,
size);
}
XrdSysCondVarHelper lLock(ReadCondVar());
if (!ChunkRMap().count(src->offset())) {
inc_read_chunks_in_flight();
}
ChunkRMap()[(uint64_t) src->offset()] = src;
ReadCondVar().Signal();
return src;
}
/* -------------------------------------------------------------------------- */
XRootDStatus
/* -------------------------------------------------------------------------- */
XrdCl::Proxy::PreReadAsync(uint64_t offset,
uint32_t size,
read_handler handler,
uint16_t timeout)
/* -------------------------------------------------------------------------- */
{
eos_debug("");
XRootDStatus status = WaitOpen();
if (!status.IsOK()) {
// remove the allocated chunk buffer
XrdSysCondVarHelper lLock(ReadCondVar());
ChunkRMap().erase(offset);
dec_read_chunks_in_flight();
return status;
}
XRootDStatus rstatus = File::Read(static_cast(offset),
static_cast(size),
(void*) handler->buffer(), handler.get(), timeout);
if (!rstatus.IsOK()) {
// remove the allocated chunk buffer
XrdSysCondVarHelper lLock(ReadCondVar());
ChunkRMap().erase(offset);
dec_read_chunks_in_flight();
}
return rstatus;
}
/* -------------------------------------------------------------------------- */
XRootDStatus
/* -------------------------------------------------------------------------- */
XrdCl::Proxy::WaitRead(read_handler handler)
/* -------------------------------------------------------------------------- */
{
eos_debug("");
XrdSysCondVarHelper lLock(handler->ReadCondVar());
time_t wait_start = time(NULL);
while (!handler->done()) {
handler->ReadCondVar().WaitMS(1000);
time_t wait_stop = time(NULL);
if (((wait_stop - wait_start) > sChunkTimeout)) {
eos_err("discarding %d chunks in-flight for reading", ChunkMap().size());
for (auto it = ChunkRMap().begin(); it != ChunkRMap().end(); ++it) {
it->second->disable();
}
clear_read_chunks_in_flight();
ChunkRMap().clear();
return XRootDStatus(XrdCl::stFatal,
suDone,
XrdCl::errSocketTimeout,
"request timeout"
);
}
}
if (handler->valid()) {
eos_debug(" [..] read-size=%lu", handler->vbuffer().size());
}
return handler->Status();
}
/* -------------------------------------------------------------------------- */
XRootDStatus
/* -------------------------------------------------------------------------- */
XrdCl::Proxy::ReadAsync(read_handler handler,
uint32_t size,
void* buffer,
uint32_t& bytesRead)
/* -------------------------------------------------------------------------- */
{
eos_debug("");
XRootDStatus status = WaitRead(handler);
if (!status.IsOK()) {
return status;
}
bytesRead = (size < handler->vbuffer().size()) ? size :
handler->vbuffer().size();
memcpy(buffer, handler->buffer(), bytesRead);
return status;
}
/* -------------------------------------------------------------------------- */
bool
/* -------------------------------------------------------------------------- */
XrdCl::Proxy::DoneAsync(read_handler handler)
{
eos_debug("");
XRootDStatus status = WaitRead(handler);
XrdSysCondVarHelper lLock(ReadCondVar());
ChunkRMap().erase(handler->offset());
return true;
}
/* -------------------------------------------------------------------------- */
XRootDStatus
/* -------------------------------------------------------------------------- */
XrdCl::Proxy::Sync(uint16_t timeout)
{
eos_debug("");
return File::Sync(timeout);
}
/* -------------------------------------------------------------------------- */
void
/* -------------------------------------------------------------------------- */
XrdCl::Proxy::attach()
/* -------------------------------------------------------------------------- */
{
XrdSysMutexHelper lLock(mAttachedMutex);
mAttached++;
eos_debug("attached=%u", mAttached);
return;
}
/* -------------------------------------------------------------------------- */
size_t
/* -------------------------------------------------------------------------- */
XrdCl::Proxy::detach()
/* -------------------------------------------------------------------------- */
{
XrdSysMutexHelper lLock(mAttachedMutex);
mAttached--;
eos_debug("attached=%u", mAttached);
return mAttached;
}
/* -------------------------------------------------------------------------- */
bool
/* -------------------------------------------------------------------------- */
XrdCl::Proxy::attached()
/* -------------------------------------------------------------------------- */
{
XrdSysMutexHelper lLock(mAttachedMutex);
return mAttached ? true : false;
}
/* -------------------------------------------------------------------------- */
size_t
/* -------------------------------------------------------------------------- */
XrdCl::Proxy::get_attached()
/* -------------------------------------------------------------------------- */
{
XrdSysMutexHelper lLock(mAttachedMutex);
return mAttached;
}
/* -------------------------------------------------------------------------- */
int XrdCl::Fuzzing::errors[22] = { 101, 102, 103, 104, 105, 106, 107, 108, 109,
201, 202, 203, 204, 205, 206, 207,
301, 302, 303, 304, 305, 306
};
/* -------------------------------------------------------------------------- */
size_t XrdCl::Fuzzing::non_fatal_errors = 9;
/* -------------------------------------------------------------------------- */
size_t XrdCl::Fuzzing::fatal_errors = 13;
/* -------------------------------------------------------------------------- */
size_t XrdCl::Fuzzing::open_async_submit_scaler = 0;
size_t XrdCl::Fuzzing::open_async_submit_counter = 0;
size_t XrdCl::Fuzzing::open_async_return_scaler = 0;
size_t XrdCl::Fuzzing::open_async_return_counter = 0;
size_t XrdCl::Fuzzing::read_async_return_scaler = 0;
size_t XrdCl::Fuzzing::read_async_return_counter = 0;
bool XrdCl::Fuzzing::open_async_submit_fatal = false;
bool XrdCl::Fuzzing::open_async_return_fatal = false;
/* -------------------------------------------------------------------------- */
XrdCl::XRootDStatus
/* -------------------------------------------------------------------------- */
XrdCl::Fuzzing::OpenAsyncSubmitFuzz()
{
if (open_async_submit_scaler) {
if (!(open_async_submit_counter++ % open_async_submit_scaler)) {
size_t random_error = rand() % (non_fatal_errors + (open_async_submit_fatal ?
fatal_errors : 0));
eos_static_debug("fuzzing error %d", errors[random_error]);
if (random_error < non_fatal_errors) {
XrdCl::XRootDStatus status(XrdCl::stError, errors[random_error], 0);
return status;
} else {
XrdCl::XRootDStatus status(XrdCl::stFatal, errors[random_error], 0);
return status;
}
}
}
// size_t open_async_submit_counter;
XRootDStatus status(XrdCl::stOK,
0,
0,
"open submitted"
);
return status;
}
/* -------------------------------------------------------------------------- */
XrdCl::XRootDStatus
/* -------------------------------------------------------------------------- */
XrdCl::Fuzzing::OpenAsyncResponseFuzz()
{
if (open_async_return_scaler) {
if (!(open_async_return_counter++ % open_async_return_scaler)) {
size_t random_error = rand() % (non_fatal_errors + (open_async_return_fatal ?
fatal_errors : 0));
eos_static_debug("fuzzing error %d", errors[random_error]);
if (random_error < non_fatal_errors) {
XrdCl::XRootDStatus status(XrdCl::stError, errors[random_error], 0);
return status;
} else {
XrdCl::XRootDStatus status(XrdCl::stFatal, errors[random_error], 0);
return status;
}
}
}
eos_static_debug("fuzzing OK");
// size_t open_async_return_counter
XRootDStatus status(XrdCl::stOK,
0,
0,
"open successful"
);
return status;
}
/* -------------------------------------------------------------------------- */
bool
/* -------------------------------------------------------------------------- */
XrdCl::Fuzzing::ReadAsyncResponseFuzz()
{
if (read_async_return_scaler) {
if (!(read_async_return_counter++ % read_async_return_scaler)) {
eos_static_debug("fuzzing error");
return true;
}
}
eos_static_debug("fuzzing OK");
return false;
}
/* -------------------------------------------------------------------------- */
const char*
/* -------------------------------------------------------------------------- */
XrdCl::Proxy::Dump(std::string& out)
{
mProtocol.Dump(out);
return out.c_str();
}
/* -------------------------------------------------------------------------- */
void
/* -------------------------------------------------------------------------- */
XrdCl::Proxy::Protocol::Add(std::string s)
{
XrdSysMutexHelper lock(mMutex);
mMessages.push_back(std::string("---- " + s + "\n"));
}
/* -------------------------------------------------------------------------- */
const char*
/* -------------------------------------------------------------------------- */
XrdCl::Proxy::Protocol::Dump(std::string& out)
{
XrdSysMutexHelper lock(mMutex);
for (auto item = mMessages.begin(); item != mMessages.end(); ++item) {
out += *item;
}
return out.c_str();
}