/*
* journalcache.cc
*
* Created on: Mar 15, 2017
* Author: Michal Simon
*
 ************************************************************************
 * EOS - the CERN Disk Storage System                                   *
 * Copyright (C) 2016 CERN/Switzerland                                  *
 *                                                                      *
 * This program is free software: you can redistribute it and/or modify *
 * it under the terms of the GNU General Public License as published by *
 * the Free Software Foundation, either version 3 of the License, or    *
 * (at your option) any later version.                                  *
 *                                                                      *
 * This program is distributed in the hope that it will be useful,      *
 * but WITHOUT ANY WARRANTY; without even the implied warranty of       *
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the        *
 * GNU General Public License for more details.                         *
 *                                                                      *
 * You should have received a copy of the GNU General Public License    *
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.*
 ************************************************************************/
#include "journalcache.hh"
#include "dircleaner.hh"
#include "io.hh"
#include "common/Path.hh"
#include "common/Logging.hh"
#ifdef __APPLE__
#include "XrdSys/XrdSysPlatform.hh"
#endif
#include <algorithm>
#include <cstring>
constexpr size_t journalcache::sDefaultMaxSize;
std::string journalcache::sLocation;
size_t journalcache::sMaxSize = journalcache::sDefaultMaxSize;
std::shared_ptr<dircleaner> journalcache::jDirCleaner;
journalcache::journalcache(fuse_ino_t ino) : ino(ino), cachesize(0),
truncatesize(-1), max_offset(0), fd(-1), nbAttached(0), nbFlushed(0)
{
memset(&attachstat, 0, sizeof(attachstat));
memset(&detachstat, 0, sizeof(detachstat));
}
journalcache::~journalcache()
{
if (fd > 0) {
eos_static_debug("closing fd=%d\n", fd);
detachstat.st_size = 0;
fstat(fd, &detachstat);
int rc = close(fd);
if (rc) {
eos_static_crit("%s", "msg=\"journalcache::~journalcache fd close failed\"");
std::abort();
}
if (jDirCleaner) {
jDirCleaner->get_external_tree().change(detachstat.st_size - attachstat.st_size,
0);
}
if (!(flags & O_CACHE)) {
// only clean write caches
journal.clear();
unlink();
}
fd = -1;
}
}
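// compose the path of the journal file below sLocation, spreading the files
// over 4096 hash subdirectories derived from the inode number; with mkpath
// set, the parent directory is created if it is missing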
int journalcache::location(std::string& path, bool mkpath)
{
char cache_path[1024 + 20];
snprintf(cache_path, sizeof(cache_path), "%s/%03lX/%08lX.jc",
sLocation.c_str(),
(ino > 0x0fffffff) ? (ino >> 28) % 4096 : ino % 4096, ino);
if (mkpath) {
eos::common::Path cPath(cache_path);
if (!cPath.MakeParentPath(S_IRWXU)) {
return -errno;
}
}
path = cache_path;
return 0;
}
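// rebuild the in-memory interval tree from the on-disk journal: every record
// is a header_t (logical offset and payload size) followed by the payload
// bytes, and the tree maps the logical range to the record's position inside
// the journal file; returns the size of the valid journal content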
int journalcache::read_journal()
{
journal.clear();
const size_t bufsize = 1024;
char buffer[bufsize];
ssize_t bytesRead = 0, totalBytesRead = 0;
int64_t pos = 0;
ssize_t entrySize = 0;
while (true) {
bytesRead = ::pread(fd, buffer, bufsize, totalBytesRead);
if (bytesRead <= 0) {
break;
}
pos = 0;
do {
if (entrySize == 0) {
if ((pos + sizeof(header_t)) > (long unsigned int) bytesRead) {
// no complete header is left, we have to re-align the next read to get a full header
bytesRead = pos;
break;
}
header_t* header = reinterpret_cast<header_t*>(buffer + pos);
journal.insert(header->offset, header->offset + header->size,
totalBytesRead + pos);
entrySize = header->size;
pos += sizeof(header_t);
}
size_t shift = entrySize > bytesRead - pos ? bytesRead - pos : entrySize;
pos += shift;
entrySize -= shift;
} while (pos < bytesRead);
totalBytesRead += bytesRead;
}
if (bytesRead < 0) {
return errno;
}
return totalBytesRead;
}
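// attach a client to the journal; the first attach resolves the journal path,
// accounts a newly created file to the dir cleaner, opens the backing file
// (re-creating the directory structure if needed) and reloads any existing
// journal entries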
int journalcache::attach(fuse_req_t req, std::string& cookie, int _flags)
{
XrdSysMutexHelper lck(mtx);
flags = _flags;
if ((nbAttached == 0) && (fd == -1)) {
std::string path;
int rc = location(path);
if (rc) {
return rc;
}
if (stat(path.c_str(), &attachstat)) {
// a new file
if (jDirCleaner) {
jDirCleaner->get_external_tree().change(0, 1);
}
}
// need to open the file
size_t tries = 0;
do {
fd = open(path.c_str(), O_CREAT | O_RDWR, S_IRWXU);
if (fd < 0) {
if (errno == ENOENT) {
tries++;
// re-create the directory structure
rc = location(path);
if (rc) {
return rc;
}
if (tries < 10) {
continue;
} else {
return -errno;
}
}
return -errno;
}
break;
} while (1);
cachesize = read_journal();
}
nbAttached++;
return 0;
}
int journalcache::detach(std::string& cookie)
{
XrdSysMutexHelper lck(mtx);
nbAttached--;
return 0;
}
int journalcache::unlink()
{
std::string path;
int rc = location(path);
if (!rc) {
struct stat buf;
rc = stat(path.c_str(), &buf);
if (!rc) {
rc = ::unlink(path.c_str());
if (!rc) {
// a deleted file
if (jDirCleaner) {
jDirCleaner->get_external_tree().change(-buf.st_size, -1);
}
}
}
}
return rc;
}
int journalcache::rescue(std::string& rescue_location)
{
std::string path;
int rc = location(path);
if (!rescue_location.length()) {
rescue_location = path;
rescue_location += ".recover";
}
if (!rc) {
return ::rename(path.c_str(), rescue_location.c_str());
} else {
return rc;
}
}
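// serve a read from the journal: only data of cached intervals overlapping
// [offset, offset+count) is copied into 'buf', and the truncation mark is
// honoured; returns the number of bytes taken from the journal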
ssize_t journalcache::pread(void* buf, size_t count, off_t offset)
{
read_lock lck(clck);
auto result = journal.query(offset, offset + count);
// no cached interval overlaps the requested range
if (result.empty()) {
return 0;
}
char* buffer = reinterpret_cast<char*>(buf);
uint64_t off = offset;
uint64_t bytesRead = 0;
for (auto& itr : result) {
if (itr->low <= off && off < itr->high) {
// read from cache
uint64_t cacheoff = itr->value + sizeof(header_t) + (off - itr->low);
int64_t intervalsize = itr->high - off;
int64_t bytesLeft = count - bytesRead;
int64_t bufsize = intervalsize < bytesLeft ? intervalsize : bytesLeft;
ssize_t ret = ::pread(fd, buffer, bufsize, cacheoff);
if (ret < 0) {
return -1;
}
bytesRead += ret;
off += ret;
buffer += ret;
if (bytesRead >= count) {
break;
}
}
}
if ((truncatesize != -1) && ((ssize_t) offset >= truncatesize)) {
// offset after truncation mark
return 0;
}
if ((truncatesize != -1) && ((ssize_t)(offset + bytesRead) > truncatesize)) {
// read over truncation size
return (truncatesize - offset);
}
return bytesRead;
}
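// intersect a pending write (kept in 'to_write') with an existing journal
// entry 'itr': the overlapping part becomes an in-place update of the
// existing record, while the non-overlapping left/right remainders are
// re-inserted into 'to_write' to be appended later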
void journalcache::process_intersection(interval_tree<uint64_t, const void*>&
to_write, interval_tree<uint64_t, uint64_t>::iterator itr,
std::vector<chunk_t>& updates)
{
auto result = to_write.query(itr->low, itr->high);
if (result.empty()) {
return;
}
if (result.size() > 1) {
throw std::logic_error("journalcache: overlapping journal entries");
}
const interval_tree<uint64_t, const void*>::iterator to_wrt = *result.begin();
// the intersection
uint64_t low = std::max(to_wrt->low, itr->low);
uint64_t high = std::min(to_wrt->high, itr->high);
// update
chunk_t update;
update.offset = offset_for_update(itr->value, low - itr->low);
update.size = high - low;
update.buff = static_cast<const char*>(to_wrt->value) + (low - to_wrt->low);
updates.push_back(std::move(update));
// update the 'to write' intervals
uint64_t wrtlow = to_wrt->low;
uint64_t wrthigh = to_wrt->high;
const void* wrtbuff = to_wrt->value;
to_write.erase(wrtlow, wrthigh);
// the intersection overlaps with the given
// interval so there is nothing more to do
if (low == wrtlow && high == wrthigh) {
return;
}
if (high < wrthigh) {
// the remaining right-hand-side interval
const char* buff = static_cast<const char*>(wrtbuff) + (high - wrtlow);
to_write.insert(high, wrthigh, buff);
}
if (low > wrtlow) {
// the remaining left-hand-side interval
to_write.insert(wrtlow, low, wrtbuff);
}
}
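// apply the in-place updates collected by process_intersection to the
// journal file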
int journalcache::update_cache(std::vector<chunk_t>& updates)
{
// make sure we are updating the cache in ascending order
std::sort(updates.begin(), updates.end());
int rc = 0;
for (auto& u : updates) {
rc = ::pwrite(fd, u.buff, u.size,
u.offset); // TODO is it safe to assume it will write it all
if (rc <= 0) {
return errno;
}
}
return 0;
}
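// write into the journal: waits while the journal exceeds sMaxSize, updates
// regions overlapping existing records in place and appends the remaining
// ranges as new header+payload records, then advances the truncation mark
// and the maximum offset seen so far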
ssize_t journalcache::pwrite(const void* buf, size_t count, off_t offset)
{
if (count <= 0) {
return 0;
}
write_lock lck(clck);
while (sMaxSize <= cachesize) {
clck.write_wait();
}
interval_tree<uint64_t, const void*> to_write;
std::vector<chunk_t> updates;
to_write.insert(offset, offset + count, buf);
auto res = journal.query(offset, offset + count);
for (auto itr : res) {
process_intersection(to_write, itr, updates);
}
int rc = update_cache(updates);
if (rc) {
return -1;
}
interval_tree<uint64_t, const void*>::iterator itr;
// TODO this could be replaced with a single pwritev
for (itr = to_write.begin(); itr != to_write.end(); ++itr) {
uint64_t size = itr->high - itr->low;
header_t header;
header.offset = itr->low;
header.size = size;
iovec iov[2];
iov[0].iov_base = &header;
iov[0].iov_len = sizeof(header_t);
iov[1].iov_base = const_cast<void*>(itr->value);
iov[1].iov_len = size;
// @todo: fix this properly for the mac if such support exists
rc = ::pwrite(fd, iov[0].iov_base, iov[0].iov_len, cachesize);
rc += ::pwrite(fd, iov[1].iov_base, iov[1].iov_len,
cachesize + iov[0].iov_len);
// rc = ::pwritev( fd, iov, 2, cachesize ); // TODO is it safe to assume it will write it all
if (rc <= 0) {
return -1;
}
journal.insert(itr->low, itr->high, cachesize);
cachesize += sizeof(header_t) + size;
}
if ((truncatesize != -1) && ((ssize_t)(offset + count) > truncatesize)) {
// journal written after last truncation size
truncatesize = offset + count;
}
if ((ssize_t)(offset + count) > max_offset) {
max_offset = offset + count;
}
return count;
}
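// record a truncation: a non-zero offset only records the new truncation mark
// and maximum offset, while truncation to zero clears the journal and the
// backing file; 'invalidate' distinguishes a cache invalidation from a real
// truncate to 0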
int journalcache::truncate(off_t offset, bool invalidate)
{
int rc = 0;
write_lock lck(clck);
fstat(fd, &detachstat);
if (offset) {
truncatesize = offset;
max_offset = offset;
} else {
// distinguish cache invalidation from 0 truncation
if (invalidate) {
truncatesize = -1;
} else {
truncatesize = 0;
}
max_offset = 0;
journal.clear();
cachesize = 0;
if (!::ftruncate(fd, 0)) {
if (jDirCleaner) {
jDirCleaner->get_external_tree().change(detachstat.st_size - attachstat.st_size,
0);
}
attachstat.st_size = offset;
}
}
return rc;
}
int journalcache::sync()
{
return ::fdatasync(fd);
}
size_t journalcache::size()
{
return cachesize;
}
off_t journalcache::get_max_offset()
{
read_lock lck(clck);
return max_offset;
}
int journalcache::init(const cacheconfig& config)
{
if (::access(config.location.c_str(), W_OK)) {
return errno;
}
sLocation = config.journal;
if (config.per_file_journal_max_size) {
journalcache::sMaxSize = config.per_file_journal_max_size;
}
eos_static_info("journalcache location %s", sLocation.c_str());
return 0;
}
int journalcache::init_daemonized(const cacheconfig& config)
{
jDirCleaner = std::make_shared<dircleaner>(config.location,
"jc",
config.total_file_journal_size,
config.total_file_journal_inodes,
config.clean_threshold
);
jDirCleaner->set_trim_suffix(".jc");
if (config.clean_on_startup) {
eos_static_info("cleaning journal path=%s", config.location.c_str());
if (jDirCleaner->cleanall(".jc")) {
eos_static_err("journal cleanup failed");
return -1;
}
}
return 0;
}
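// replay the journal synchronously to the remote file through the given
// syncer; on success the local journal and its backing file are emptied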
int journalcache::remote_sync(cachesyncer& syncer)
{
write_lock lck(clck);
int ret = syncer.sync(fd, journal, sizeof(header_t), truncatesize);
if (!ret) {
journal.clear();
eos_static_debug("ret=%d truncatesize=%ld\n", ret, truncatesize);
ret |= ::ftruncate(fd, 0);
eos_static_debug("ret=%d errno=%d\n", ret, errno);
}
clck.broadcast();
return ret;
}
int journalcache::remote_sync_async(XrdCl::shared_proxy proxy)
{
// sends all the journal content as asynchronous write requests
int ret = 0;
if (!proxy) {
return -1;
}
off_t offshift = sizeof(header_t);
write_lock lck(clck);
for (auto itr = journal.begin(); itr != journal.end(); ++itr) {
off_t cacheoff = itr->value + offshift;
size_t size = itr->high - itr->low;
// prepare async buffer
XrdCl::Proxy::write_handler handler =
proxy->WriteAsyncPrepare(proxy, size, itr->low, 0);
int bytesRead = ::pread(fd, (void*) handler->buffer(), size, cacheoff);
if (bytesRead < 0) {
// TODO handle error
clck.broadcast();
return -1;
}
if (bytesRead < (int) size) {
// TODO handle error - still we continue
}
XrdCl::XRootDStatus st = proxy->ScheduleWriteAsync(0, handler);
if (!st.IsOK()) {
eos_static_err("failed to issue async-write");
clck.broadcast();
return -1;
}
}
// there might be a truncate call after the writes to be applied
if (truncatesize != -1) {
XrdCl::XRootDStatus st = proxy->Truncate(truncatesize);
if (!st.IsOK()) {
eos_static_err("failed to truncate");
clck.broadcast();
return -1;
}
truncatesize = -1;
}
journal.clear();
eos_static_debug("ret=%d truncatesize=%ld\n", ret, truncatesize);
errno = 0;
ret |= ::ftruncate(fd, 0);
eos_static_debug("ret=%d errno=%d\n", ret, errno);
clck.broadcast();
return ret;
}
int journalcache::reset()
{
write_lock lck(clck);
journal.clear();
int retc = (fd > 0) ? ::ftruncate(fd, 0) : 0;
cachesize = 0;
max_offset = 0;
truncatesize = -1;
clck.broadcast();
return retc;
}
std::string journalcache::dump()
{
std::string out;
out += "fd=";
out += std::to_string(fd);
out += " truncatesize=";
out += std::to_string(get_truncatesize());
out += " maxoffset=";
out += std::to_string(get_max_offset());
out += " nbattached=";
out += std::to_string(nbAttached);
out += " nbflushed=";
out += std::to_string(nbFlushed);
return out;
}
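// return copies of all cached data overlapping [offset, offset+size);
// each returned chunk owns its buffer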
std::vector<journalcache::chunk_t> journalcache::get_chunks(off_t offset,
size_t size)
{
read_lock lck(clck);
auto result = journal.query(offset, offset + size);
std::vector<chunk_t> ret;
for (auto& itr : result) {
uint64_t off = (off_t) itr->low < (off_t) offset ? offset : itr->low;
uint64_t count = itr->high < offset + size ? itr->high - off : offset + size -
off;
uint64_t cacheoff = itr->value + sizeof(header_t) + (off - itr->low);
std::unique_ptr<char[]> buffer(new char[count]);
ssize_t rc = ::pread(fd, buffer.get(), count, cacheoff);
if (rc < 0) {
return ret;
}
ret.push_back(chunk_t(off, count, std::move(buffer)));
}
return ret;
}