//------------------------------------------------------------------------------
//! @file RocksKV.cc
//! @author Georgios Bitzes CERN
//! @brief kv persistency class based on rocksdb
//------------------------------------------------------------------------------
/************************************************************************
* EOS - the CERN Disk Storage System *
* Copyright (C) 2016 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see .*
************************************************************************/
#ifdef HAVE_ROCKSDB
#include
#include
#include "kv/kv.hh"
#include "kv/RocksKV.hh"
#include "eosfuse.hh"
#include "misc/MacOSXHelper.hh"
#include "common/Logging.hh"
#include "misc/longstring.hh"
#include
#include
#include
#include
/* -------------------------------------------------------------------------- */
static bool
/* -------------------------------------------------------------------------- */
safe_strtoll(const std::string& str, uint64_t& ret)
/* -------------------------------------------------------------------------- */
{
char* endptr = NULL;
ret = strtoull(str.c_str(), &endptr, 10);
if (endptr != str.c_str() + str.size() || ret == ULONG_LONG_MAX) {
return false;
}
return true;
}
/* -------------------------------------------------------------------------- */
RocksKV::RocksKV()
/* -------------------------------------------------------------------------- */
{
}
/* -------------------------------------------------------------------------- */
RocksKV::~RocksKV()
/* -------------------------------------------------------------------------- */
{
}
/* -------------------------------------------------------------------------- */
int
/* -------------------------------------------------------------------------- */
RocksKV::connect(const std::string& prefix, const std::string& path)
/* -------------------------------------------------------------------------- */
{
eos_static_info("Opening RocksKV store at local path %s", path.c_str());
table_options.filter_policy.reset(rocksdb::NewBloomFilterPolicy(10, false));
table_options.block_size = 1024;
// table_options.block_cache = rocksdb::NewLRUCache(4*1024);
// table_options.cache_index_and_filter_blocks = false;
options.optimize_filters_for_hits = true;
options.statistics = rocksdb::CreateDBStatistics();
// options.compression = rocksdb::kZSTD;
// options.bottommost_compression = rocksdb::kZSTD;
options.table_factory.reset(rocksdb::NewBlockBasedTableFactory(table_options));
options.create_if_missing = true;
options.row_cache = rocksdb::NewLRUCache(4 * 1024 * 1024);
options.level_compaction_dynamic_level_bytes = true;
options.max_subcompactions = 4;
options.disable_auto_compactions = false;
options.write_buffer_size = 1 * 1024 * 1024;
rocksdb::TransactionDBOptions txopts;
txopts.transaction_lock_timeout = -1;
txopts.default_lock_timeout = -1;
rocksdb::TransactionDB* mydb;
rocksdb::Status st = rocksdb::TransactionDB::Open(options, txopts, path, &mydb);
if (!st.ok()) {
eos_static_crit("Could not open RocksKV store, error: %s",
st.ToString().c_str());
return -1;
}
mPrefix = prefix;
transactionDB.reset(mydb);
db = transactionDB->GetBaseDB();
return 0;
}
static int badStatus(const rocksdb::Status& st)
{
eos_static_crit("Unexpected rocksdb status: %s", st.ToString().c_str());
return -1;
}
/* -------------------------------------------------------------------------- */
int
/* -------------------------------------------------------------------------- */
RocksKV::get(const std::string& key, std::string& value)
/* -------------------------------------------------------------------------- */
{
rocksdb::Status st = db->Get(rocksdb::ReadOptions(), prefix(key), &value);
if (st.IsNotFound()) {
return 1;
} else if (!st.ok()) {
return badStatus(st);
}
return 0;
}
/* -------------------------------------------------------------------------- */
int
/* -------------------------------------------------------------------------- */
RocksKV::get(const std::string& key, uint64_t& value)
/* -------------------------------------------------------------------------- */
{
std::string tmp;
int ret = this->get(key, tmp);
if (ret != 0) {
return ret;
}
if (!safe_strtoll(tmp.c_str(), value)) {
eos_static_crit("Expected to find an integer on key %s, instead found %s",
key.c_str(), tmp.c_str());
return -1;
}
return 0;
}
/* -------------------------------------------------------------------------- */
int
/* -------------------------------------------------------------------------- */
RocksKV::put(const std::string& key, const std::string& value)
/* -------------------------------------------------------------------------- */
{
rocksdb::Status st = db->Put(rocksdb::WriteOptions(), prefix(key), value);
if (!st.ok()) {
return badStatus(st);
}
return 0;
}
/* -------------------------------------------------------------------------- */
int
/* -------------------------------------------------------------------------- */
RocksKV::put(const std::string& key, uint64_t value)
/* -------------------------------------------------------------------------- */
{
return this->put(key, std::to_string(value));
}
/* -------------------------------------------------------------------------- */
int
/* -------------------------------------------------------------------------- */
RocksKV::inc(const std::string& key, uint64_t& value)
/* -------------------------------------------------------------------------- */
{
TransactionPtr tx = startTransaction();
std::string tmp;
uint64_t initialValue = 0;
rocksdb::Status st = tx->GetForUpdate(rocksdb::ReadOptions(), prefix(key),
&tmp);
if (!st.ok() && !st.IsNotFound()) {
return badStatus(st);
}
if (!safe_strtoll(tmp, initialValue)) {
eos_static_crit("Attemted to increase a non-numeric value on key %s: %s",
key.c_str(), tmp.c_str());
return -1;
}
st = tx->Put(prefix(key), std::to_string(initialValue + value));
if (!st.ok()) {
return badStatus(st);
}
st = tx->Commit();
if (!st.ok()) {
return badStatus(st);
}
value += initialValue;
return 0;
}
/* -------------------------------------------------------------------------- */
int
/* -------------------------------------------------------------------------- */
RocksKV::erase(const std::string& key)
/* -------------------------------------------------------------------------- */
{
rocksdb::Status st = db->Delete(rocksdb::WriteOptions(), prefix(key));
if (!st.ok()) {
// deleting a non-existent key is not an error!
return badStatus(st);
}
return 0;
}
/* -------------------------------------------------------------------------- */
int
/* -------------------------------------------------------------------------- */
RocksKV::get(uint64_t key, std::string& value, const std::string& name_space)
/* -------------------------------------------------------------------------- */
{
return this->get(buildKey(key, name_space), value);
}
/* -------------------------------------------------------------------------- */
int
/* -------------------------------------------------------------------------- */
RocksKV::put(uint64_t key, const std::string& value,
const std::string& name_space)
/* -------------------------------------------------------------------------- */
{
return this->put(buildKey(key, name_space), value);
}
/* -------------------------------------------------------------------------- */
int
/* -------------------------------------------------------------------------- */
RocksKV::get(uint64_t key, uint64_t& value, const std::string& name_space)
/* -------------------------------------------------------------------------- */
{
return this->get(buildKey(key, name_space), value);
}
/* -------------------------------------------------------------------------- */
int
/* -------------------------------------------------------------------------- */
RocksKV::put(uint64_t key, uint64_t value, const std::string& name_space)
/* -------------------------------------------------------------------------- */
{
return this->put(buildKey(key, name_space), value);
}
/* -------------------------------------------------------------------------- */
int
/* -------------------------------------------------------------------------- */
RocksKV::erase(uint64_t key, const std::string& name_space)
{
return this->erase(buildKey(key, name_space));
}
/* -------------------------------------------------------------------------- */
int
/* -------------------------------------------------------------------------- */
RocksKV::clean_stores(const std::string& storedir, const std::string& newdb)
{
DIR* dir;
struct dirent* ent;
if ((dir = ::opendir(storedir.c_str())) != NULL) {
while ((ent = ::readdir(dir)) != NULL) {
std::string entry = ent->d_name;
if ((entry == ".") || (entry == "..")) {
continue;
}
if (entry == newdb) {
continue;
}
if (!entry.length()) {
continue;
}
struct stat buf;
std::string dbdir = storedir;
dbdir += "/";
dbdir += entry;
if (!stat(dbdir.c_str(), &buf)) {
// check if this is a directory
if (S_ISDIR(buf.st_mode)) {
// cleanup this old directory
std::string rmline = "rm -rf ";
rmline += dbdir;
(void) !system(rmline.c_str());
fprintf(stderr, "###### cleaning stale cache directory '%s'\n",
dbdir.c_str());
}
}
}
::closedir(dir);
}
return 0;
}
#endif // HAVE_ROCKSDB