//------------------------------------------------------------------------------
// File: ReplicaParLayout.cc
// Author: Andreas-Joachim Peters - CERN
//------------------------------------------------------------------------------
/************************************************************************
* EOS - the CERN Disk Storage System *
* Copyright (C) 2011 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see <http://www.gnu.org/licenses/>.*
************************************************************************/
#include "fst/layout/ReplicaParLayout.hh"
#include "fst/XrdFstOfs.hh"
#include <future>
#include <list>
#include <memory>
#include <string>
EOSFSTNAMESPACE_BEGIN
//------------------------------------------------------------------------------
// Constructor
//
// All arguments are handed over unchanged to the Layout base class. The number
// of replicas is derived from the layout id, and asynchronous replica writes
// are enabled whenever the EOS_FST_REPLICA_ASYNC_WRITE environment variable is
// defined (its value is not looked at).
//------------------------------------------------------------------------------
ReplicaParLayout::ReplicaParLayout(XrdFstOfsFile* file,
                                   unsigned long lid,
                                   const XrdSecEntity* client,
                                   XrdOucErrInfo* outError,
                                   const char* path,
                                   uint16_t timeout) :
  Layout(file, lid, client, outError, path, timeout),
  // The stripe number kept in the layout id is zero-based
  // (1 replica = 0x0 ... 16 replicas = 0xf), hence the +1
  mNumReplicas(eos::common::LayoutId::GetStripeNumber(lid) + 1),
  mHasWriteErr(false),
  mDoAsyncWrite(getenv("EOS_FST_REPLICA_ASYNC_WRITE") != nullptr)
{
}
//------------------------------------------------------------------------------
// Redirect to new target
//
// Replaces the current file IO object with one created for the given path and
// remembers that path as the local path.
//
// @param path new target path
//------------------------------------------------------------------------------
void ReplicaParLayout::Redirect(const char* path)
{
  auto* io_object = FileIoPlugin::GetIoObject(path, mOfsFile, mSecEntity);
  mFileIO.reset(io_object);
  mLocalPath = path;
}
//------------------------------------------------------------------------------
// Open file
//
// Reads "mgm.replicaindex" and "mgm.replicahead" from the open opaque
// information; both must be present and within [0, 255]. The local replica is
// always opened. If this server is the replication head (index == head) and
// the file is opened for writing, every other replica listed in the capability
// as "mgm.url<i>" is opened as well. All opens are issued asynchronously and
// then awaited before the outcome is evaluated.
//
// @param flags open flags, forwarded to every replica
// @param mode creation mode, forwarded to every replica
// @param opaque opaque information, forwarded to every replica
//
// @return SFS_OK if every open succeeded, otherwise the value returned by
//         Emsg (EINVAL for missing/illegal parameters or a failed allocation,
//         EIO for a failed local open, EREMOTEIO for a failed remote open)
//------------------------------------------------------------------------------
int
ReplicaParLayout::Open(XrdSfsFileOpenMode flags, mode_t mode,
                       const char* opaque)
{
  int replica_index = -1;
  int replica_head = -1;
  const char* index = mOfsFile->mOpenOpaque->Get("mgm.replicaindex");

  if (index) {
    replica_index = atoi(index);

    if ((replica_index < 0) || (replica_index > 255)) {
      eos_err("msg=\"illegal replica index %d\"", replica_index);
      return Emsg("ReplicaPar::Open", *mError, EINVAL, "open replica - "
                  "illegal replica index found", index);
    }
  } else {
    eos_err("%s", "msg=\"replica index missing\"");
    return Emsg("ReplicaPar::Open", *mError, EINVAL, "open replica - "
                "no replica index defined");
  }

  const char* head = mOfsFile->mOpenOpaque->Get("mgm.replicahead");

  if (head) {
    replica_head = atoi(head);

    if ((replica_head < 0) || (replica_head > 255)) {
      eos_err("msg=\"illegal replica head %d\"", replica_head);
      return Emsg("ReplicaParOpen", *mError, EINVAL, "open replica - "
                  "illegal replica head found", head);
    }
  } else {
    eos_err("%s", "msg=\"replica head missing\"");
    return Emsg("ReplicaPar::Open", *mError, EINVAL, "open replica - "
                "no replica head defined");
  }

  // Define the replication head
  eos_debug("replica_head=%i, replica_index=%i", replica_head, replica_index);

  if (replica_index == replica_head) {
    mIsEntryServer = true;
  }

  XrdOucString ns_path = mOfsFile->mOpenOpaque->Get("mgm.path");
  // Local replica is always on the first position in the vector
  mReplicaUrl.push_back(mLocalPath);

  // Only entry server needs to contact others and only for write ops
  if (mIsEntryServer && mOfsFile->mIsRW) {
    for (int i = 0; i < mNumReplicas; ++i) {
      if (i == replica_index) {
        continue;
      }

      const std::string rep_tag = "mgm.url" + std::to_string(i);
      const char* rep = mOfsFile->mCapOpaque->Get(rep_tag.c_str());

      // We only get here for write access, where every replica is mandatory
      if (!rep) {
        eos_err("msg=\"failed to open replica for writing, missing url "
                "for replica %s\"", rep_tag.c_str());
        return Emsg("ReplicaParOpen", *mError, EINVAL, "open stripes - "
                    "missing url for replica ", rep_tag.c_str());
      }

      // Prepare the index for the next target: the remote server receives
      // the same opaque information, only with its own replica index
      XrdOucString oldindex = "mgm.replicaindex=";
      XrdOucString newindex = "mgm.replicaindex=";
      oldindex += index;
      newindex += i;
      int envlen = 0;
      XrdOucString new_opaque = mOfsFile->mOpenOpaque->Env(envlen);
      new_opaque.replace(oldindex.c_str(), newindex.c_str());
      std::string replica_url = rep;
      replica_url += ns_path.c_str();
      replica_url += "?";
      replica_url += new_opaque.c_str();
      mReplicaUrl.push_back(replica_url);
      eos_debug("msg=\"add replica\" replica_url=%s, index=%i",
                replica_url.c_str(), i);
    }
  }

  // NOTE(review): element types below are reconstructed from their usage
  // (fut.get() yielding an object with IsOK(), GetIoObject() result owned by
  // a unique_ptr) - confirm against FileIo.hh
  std::list<std::future<XrdCl::XRootDStatus>> open_futures;

  for (const auto& replica_url : mReplicaUrl) {
    std::unique_ptr<FileIo> file
    {FileIoPlugin::GetIoObject(replica_url, mOfsFile, mSecEntity)};

    if (!file) {
      // Wait and discard any pending replies
      for (auto& fut : open_futures) {
        (void) fut.get();
      }

      eos_err("msg=\"failed to allocate file object\" path=\"%s\"",
              replica_url.c_str());
      return Emsg("ReplicaParOpen", *mError, EINVAL, "open stripes - "
                  "failed to allocate file object");
    }

    open_futures.push_back(file->fileOpenAsync(flags, mode, opaque, mTimeout));
    mReplicaFile.push_back(std::move(file));
    // Populate vector of responses for write ops - to be dropped with eosd.
    // Done together with mReplicaFile so that both containers always have
    // the same size, also when we bail out early.
    mResponses.emplace_back();
  }

  // Wait for all the opens to complete and remember the first failure;
  // futures are in the same order as mReplicaUrl
  bool has_failed = false;
  size_t failed_pos = 0;
  size_t pos = 0;

  for (auto& fut : open_futures) {
    const auto status = fut.get();

    if (!status.IsOK() && !has_failed) {
      has_failed = true;
      failed_pos = pos;
    }

    ++pos;
  }

  if (has_failed) {
    const bool is_local = (failed_pos == 0);
    const bool is_rw = mOfsFile->mIsRW;
    XrdOucString maskUrl = mReplicaUrl[failed_pos].c_str();
    // Mask some opaque parameters to shorten the logging
    eos::common::StringConversion::MaskTag(maskUrl, "cap.sym");
    eos::common::StringConversion::MaskTag(maskUrl, "cap.msg");
    eos::common::StringConversion::MaskTag(maskUrl, "authz");
    eos_err("msg=\"failed %s %s open\" path=\"%s\"",
            (is_local ? "local" : "remote"), (is_rw ? "write" : "read"),
            maskUrl.c_str());
    return Emsg("ReplicaParOpen", *mError, (is_local ? EIO : EREMOTEIO),
                "open stripes - open failed ", maskUrl.c_str());
  }

  return SFS_OK;
}
//------------------------------------------------------------------------------
// Read from file
//
// Replicas are tried in order and the result of the first one that does not
// return SFS_ERROR is handed back. The readahead argument is not used here.
//
// @param offset offset in the file
// @param buffer destination buffer
// @param length number of bytes to read
// @param readahead ignored by this implementation
//
// @return result of the first successful replica read (0 if there is no
//         replica file), otherwise the value returned by Emsg (EREMOTEIO)
//------------------------------------------------------------------------------
int64_t
ReplicaParLayout::Read(XrdSfsFileOffset offset, char* buffer,
                       XrdSfsXferSize length, bool readahead)
{
  int64_t nread = 0;
  const size_t num_replicas = mReplicaFile.size();

  for (size_t idx = 0; idx < num_replicas; ++idx) {
    nread = mReplicaFile[idx]->fileRead(offset, buffer, length, mTimeout);

    if (nread != SFS_ERROR) {
      // Read was successful no need to read from another replica
      return nread;
    }

    XrdOucString maskUrl(mReplicaUrl[idx].c_str());
    // Mask some opaque parameters to shorten the logging
    eos::common::StringConversion::MaskTag(maskUrl, "cap.sym");
    eos::common::StringConversion::MaskTag(maskUrl, "cap.msg");
    eos::common::StringConversion::MaskTag(maskUrl, "authz");
    eos_warning("Failed to read from replica off=%lld, length=%i, mask_url=%s",
                offset, length, maskUrl.c_str());
  }

  if (nread != SFS_ERROR) {
    // No replica file was available to read from
    return nread;
  }

  eos_err("Failed to read from any replica offset=%lld, length=%i",
          offset, length);
  return Emsg("ReplicaParRead", *mError, EREMOTEIO,
              "read replica - read failed");
}
//------------------------------------------------------------------------------
// Vector read
//
// Replicas are tried in order and the result of the first one that does not
// return SFS_ERROR is handed back. The len argument is not used here.
//
// @param chunkList list of chunks to read
// @param len ignored by this implementation
//
// @return result of the first successful replica readv (0 if there is no
//         replica file), otherwise the value returned by Emsg (EREMOTEIO)
//------------------------------------------------------------------------------
int64_t
ReplicaParLayout::ReadV(XrdCl::ChunkList& chunkList, uint32_t len)
{
  int64_t rc = 0;
  // size() is a size_t, so it must be printed with %zu
  eos_debug("msg=\"readv\" count_chunks=%zu", chunkList.size());

  for (unsigned int i = 0; i < mReplicaFile.size(); i++) {
    rc = mReplicaFile[i]->fileReadV(chunkList, mTimeout);

    if (rc == SFS_ERROR) {
      XrdOucString maskUrl = mReplicaUrl[i].c_str() ? mReplicaUrl[i].c_str() : "";
      // Mask some opaque parameters to shorten the logging
      eos::common::StringConversion::MaskTag(maskUrl, "cap.sym");
      eos::common::StringConversion::MaskTag(maskUrl, "cap.msg");
      eos::common::StringConversion::MaskTag(maskUrl, "authz");
      eos_warning("msg=\"failed replica readv \" url=\"%s\"", maskUrl.c_str());
      continue;
    } else {
      // Read was successful no need to read from another replica
      break;
    }
  }

  if (rc == SFS_ERROR) {
    eos_err("%s", "msg=\"failed to readv from any replica\"");
    return Emsg("ReplicaParRead", *mError, EREMOTEIO, "readv replica failed");
  }

  return rc;
}
//------------------------------------------------------------------------------
// Write to file
//
// Delegates to WriteAsync when asynchronous writes are enabled. Otherwise the
// data is written to every replica in turn and the first replica that does
// not accept exactly length bytes aborts the operation.
//
// @param offset offset in the file
// @param buffer data to write
// @param length number of bytes to write
//
// @return length if all replicas were written, otherwise the value returned
//         by Emsg (EIO for the local replica, EREMOTEIO for a remote one)
//------------------------------------------------------------------------------
int64_t
ReplicaParLayout::Write(XrdSfsFileOffset offset, const char* buffer,
                        XrdSfsXferSize length)
{
  if (mDoAsyncWrite) {
    return WriteAsync(offset, buffer, length);
  }

  for (unsigned int i = 0; i < mReplicaFile.size(); ++i) {
    const int64_t rc = mReplicaFile[i]->fileWrite(offset, buffer, length,
                       mTimeout);

    if (rc == length) {
      continue;
    }

    // Replica 0 is the local one, all the others are remote
    const int err = (i == 0) ? EIO : EREMOTEIO;
    XrdOucString maskUrl = mReplicaUrl[i].c_str();
    // Mask some opaque parameters to shorten the logging
    eos::common::StringConversion::MaskTag(maskUrl, "cap.sym");
    eos::common::StringConversion::MaskTag(maskUrl, "cap.msg");
    eos::common::StringConversion::MaskTag(maskUrl, "authz");

    // Show only the first write error as an error to broadcast upstream
    if (mHasWriteErr) {
      eos_err("[NB] Failed to write replica %i - write failed -%llu %s",
              i, offset, maskUrl.c_str());
    } else {
      eos_err("Failed to write replica %i - write failed - %llu %s",
              i, offset, maskUrl.c_str());
    }

    mHasWriteErr = true;
    // Set errno only now: the masking and logging calls above are free to
    // modify it, so it can not be used to carry the error code across them
    errno = err;
    return Emsg("ReplicaWrite", *mError, err, "write replica failed",
                maskUrl.c_str());
  }

  return length;
}
//------------------------------------------------------------------------------
// Write using async requests
//
// For every replica an asynchronous write is submitted and its future is
// handed to the matching response collector. The collector is then polled
// with CheckResponses(false); a negative answer aborts the operation.
//
// @param offset offset in the file
// @param buffer data to write
// @param length number of bytes to write
//
// @return length if no failure was detected, otherwise the value returned by
//         Emsg (EIO for the local replica, EREMOTEIO for a remote one)
//------------------------------------------------------------------------------
int64_t
ReplicaParLayout::WriteAsync(XrdSfsFileOffset offset, const char* buffer,
                             XrdSfsXferSize length)
{
  const unsigned int num_replicas = mReplicaFile.size();

  for (unsigned int idx = 0; idx < num_replicas; ++idx) {
    auto& responses = mResponses[idx];
    responses.CollectFuture(mReplicaFile[idx]->fileWriteAsync(buffer, offset,
                            length));

    // Collect available responses
    if (responses.CheckResponses(false)) {
      continue;
    }

    XrdOucString maskUrl(mReplicaUrl[idx].c_str());
    eos::common::StringConversion::MaskTag(maskUrl, "cap.sym");
    eos::common::StringConversion::MaskTag(maskUrl, "cap.msg");
    eos::common::StringConversion::MaskTag(maskUrl, "authz");

    // Show only the first write error as an error to broadcast upstream
    if (!mHasWriteErr) {
      eos_err("msg=\"write failed for replica %i\" offset=%llu url=%s",
              idx, offset, maskUrl.c_str());
    } else {
      eos_err("msg=\"[NB] write failed for replica %i\" offset=%llu url=%s",
              idx, offset, maskUrl.c_str());
    }

    mHasWriteErr = true;
    // Replica 0 is the local one, all the others are remote
    errno = (idx == 0) ? EIO : EREMOTEIO;
    return Emsg("ReplicaWrite", *mError, errno, "write replica failed",
                maskUrl.c_str());
  }

  return length;
}
//------------------------------------------------------------------------------
// Truncate file
//
// Truncates every replica in turn and stops at the first failure.
//
// @param offset truncate offset
//
// @return SFS_OK if all replicas were truncated, otherwise the value returned
//         by Emsg (EIO for the local replica, EREMOTEIO for a remote one)
//------------------------------------------------------------------------------
int
ReplicaParLayout::Truncate(XrdSfsFileOffset offset)
{
  for (unsigned int i = 0; i < mReplicaFile.size(); i++) {
    if (mReplicaFile[i]->fileTruncate(offset, mTimeout) == SFS_OK) {
      continue;
    }

    // Replica 0 is the local one, all the others are remote
    const int err = (i == 0) ? EIO : EREMOTEIO;
    XrdOucString maskUrl = mReplicaUrl[i].c_str();
    // Mask some opaque parameters to shorten the logging
    eos::common::StringConversion::MaskTag(maskUrl, "cap.sym");
    eos::common::StringConversion::MaskTag(maskUrl, "cap.msg");
    eos::common::StringConversion::MaskTag(maskUrl, "authz");
    eos_err("Failed to truncate replica %i", i);
    // Set errno only now: the masking and logging calls above are free to
    // modify it, so it can not be used to carry the error code across them
    errno = err;
    return Emsg("ReplicaParTruncate", *mError, err, "truncate failed",
                maskUrl.c_str());
  }

  return SFS_OK;
}
//------------------------------------------------------------------------------
// Get stats for file
//
// Replicas are asked in order until one of them answers successfully.
//
// @param buf stat structure to be filled
//
// @return 0 on success (or if there is no replica file), otherwise the return
//         code of the last replica that was tried
//------------------------------------------------------------------------------
int
ReplicaParLayout::Stat(struct stat* buf)
{
  int retc = 0;

  for (const auto& replica : mReplicaFile) {
    retc = replica->fileStat(buf, mTimeout);

    // Stop at the first stat which works
    if (retc == 0) {
      return retc;
    }
  }

  return retc;
}
//------------------------------------------------------------------------------
// Sync file to disk
//
// Syncs every replica in turn and stops at the first failure.
//
// @return SFS_OK if all replicas were synced, otherwise the value returned by
//         Emsg (EIO for the local replica, EREMOTEIO for a remote one)
//------------------------------------------------------------------------------
int
ReplicaParLayout::Sync()
{
  for (unsigned int i = 0; i < mReplicaFile.size(); i++) {
    if (mReplicaFile[i]->fileSync(mTimeout) == SFS_OK) {
      continue;
    }

    // Replica 0 is the local one, all the others are remote
    const int err = (i == 0) ? EIO : EREMOTEIO;
    // The masked url is only needed for error reporting, so build it here
    // instead of once per replica
    XrdOucString maskUrl = mReplicaUrl[i].c_str();
    // Mask some opaque parameters to shorten the logging
    eos::common::StringConversion::MaskTag(maskUrl, "cap.sym");
    eos::common::StringConversion::MaskTag(maskUrl, "cap.msg");
    eos::common::StringConversion::MaskTag(maskUrl, "authz");
    eos_err("error=failed to sync replica %i", i);
    // Set errno only now: the masking and logging calls above are free to
    // modify it, so it can not be used to carry the error code across them
    errno = err;
    return Emsg("ReplicaParSync", *mError, err, "sync failed",
                maskUrl.c_str());
  }

  return SFS_OK;
}
//------------------------------------------------------------------------------
// Remove file and all replicas
//
// Removal is attempted on every replica, also after a failure. The error
// reported is the one of the last replica that failed.
//
// @return SFS_OK if all replicas were removed, otherwise the value returned
//         by Emsg (EIO for the local replica, EREMOTEIO for a remote one)
//------------------------------------------------------------------------------
int
ReplicaParLayout::Remove()
{
  // Error code of the last failed removal, 0 if none failed. Kept in a local
  // variable since errno may be modified by the logging and by the removal
  // calls that follow a failure.
  int err = 0;

  for (unsigned int i = 0; i < mReplicaFile.size(); i++) {
    if (mReplicaFile[i]->fileRemove() != SFS_OK) {
      // Replica 0 is the local one, all the others are remote
      err = (i == 0) ? EIO : EREMOTEIO;
      eos_err("msg=\"failed to remove replica %i\"", i);
    }
  }

  if (err) {
    errno = err;
    return Emsg("ReplicaParRemove", *mError, err, "remove failed");
  }

  return SFS_OK;
}
//------------------------------------------------------------------------------
// Close file
//
// For files opened for writing with asynchronous writes enabled, the
// outstanding write responses of a replica are checked before that replica is
// closed. All replicas are closed, also after a failure.
//
// @return SFS_OK if no async write and no close failed, otherwise the value
//         returned by Emsg (EIO or EREMOTEIO)
//------------------------------------------------------------------------------
int
ReplicaParLayout::Close()
{
  // Error code to report, 0 as long as nothing failed. Failures are tracked
  // here rather than by summing return codes: a positive "async failed"
  // marker and a negative close return code would cancel each other out.
  int err = 0;

  for (unsigned int i = 0; i < mReplicaFile.size(); i++) {
    if (!mReplicaFile[i]) {
      continue;
    }

    // Replica 0 is the local one, all the others are remote
    const int replica_err = (i == 0) ? EIO : EREMOTEIO;

    // Wait for any async requests before closing
    if (mOfsFile->mIsRW && mDoAsyncWrite && (i < mResponses.size())) {
      if (!mResponses[i].CheckResponses(true)) {
        eos_err("msg=\"some async write requests failed for replica %i\"", i);

        // An EIO recorded earlier is never downgraded
        if (err != EIO) {
          err = replica_err;
        }
      }
    }

    const int rc_close = mReplicaFile[i]->fileClose(mTimeout);
    // Read errno right after the call, before logging can modify it
    const int close_errno = errno;

    if (rc_close != SFS_OK) {
      eos_err("msg=\"failed to close replica %i\" url=\"%s\"",
              i, mReplicaUrl[i].c_str());

      // As before, an EIO left in errno by the close is reported as such
      if (close_errno == EIO) {
        err = EIO;
      } else if (err != EIO) {
        err = replica_err;
      }
    }
  }

  if (err) {
    errno = err;
    return Emsg("ReplicaParClose", *mError, err, "close failed", "");
  }

  return SFS_OK;
}
//------------------------------------------------------------------------------
// Execute implementation dependent command
//
// The command is forwarded to every replica. The client argument is not used
// here.
//
// @param cmd command to execute
// @param client ignored by this implementation
//
// @return sum of the return codes of all replicas (SFS_OK if there is none)
//------------------------------------------------------------------------------
int
ReplicaParLayout::Fctl(const std::string& cmd, const XrdSecEntity* client)
{
  int total = SFS_OK;

  for (const auto& replica : mReplicaFile) {
    total += replica->fileFctl(cmd);
  }

  return total;
}
//------------------------------------------------------------------------------
// Reserve space for file
//
// Only the first replica file (the local one) is asked to reserve space.
//
// @param length space to be reserved
//
// @return return code of the fileFallocate call on the first replica file
//------------------------------------------------------------------------------
int
ReplicaParLayout::Fallocate(XrdSfsFileOffset length)
{
  auto& local_replica = mReplicaFile.front();
  return local_replica->fileFallocate(length);
}
//------------------------------------------------------------------------------
// Deallocate reserved space
//
// Only the first replica file (the local one) is asked to release space.
//
// @param fromOffset start of the range, passed on unchanged
// @param toOffset end of the range, passed on unchanged
//
// @return return code of the fileFdeallocate call on the first replica file
//------------------------------------------------------------------------------
int
ReplicaParLayout::Fdeallocate(XrdSfsFileOffset fromOffset,
                              XrdSfsFileOffset toOffset)
{
  auto& local_replica = mReplicaFile.front();
  return local_replica->fileFdeallocate(fromOffset, toOffset);
}
EOSFSTNAMESPACE_END