// ----------------------------------------------------------------------
// File: StateMachine.cc
// Author: Georgios Bitzes - CERN
// ----------------------------------------------------------------------
/************************************************************************
* quarkdb - a redis-like highly available key-value store *
* Copyright (C) 2016 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see <http://www.gnu.org/licenses/>.*
************************************************************************/
#include "StateMachine.hh"
#include "Utils.hh"
#include "../deps/StringMatchLen.h"
#include "utils/ParseUtils.hh"
#include "storage/KeyDescriptor.hh"
#include "storage/KeyLocators.hh"
#include "storage/StagingArea.hh"
#include "storage/KeyDescriptorBuilder.hh"
#include "storage/PatternMatching.hh"
#include "storage/ExpirationEventIterator.hh"
#include "storage/ReverseLocator.hh"
#include "storage/InternalKeyParsing.hh"
#include "storage/ConsistencyScanner.hh"
#include "storage/ParanoidManifestChecker.hh"
#include "utils/IntToBinaryString.hh"
#include "utils/TimeFormatting.hh"
#include
#include
#include
#include
#include
#include
#include
#include
// Evaluate a rocksdb::Status-producing expression exactly once; if the result
// is not OK, return it from the enclosing function (which must therefore
// return rocksdb::Status).
#define RETURN_ON_ERROR(st) { rocksdb::Status st2 = st; if(!st2.ok()) return st2; }
// Evaluate a rocksdb::Status-producing expression exactly once; if the result
// is not OK, call qdb_throw with the status text.
#define THROW_ON_ERROR(st) { rocksdb::Status st2 = st; if(!st2.ok()) qdb_throw(st2.ToString()); }
// Like THROW_ON_ERROR, except that a NotFound status is tolerated.
#define ASSERT_OK_OR_NOTFOUND(st) { rocksdb::Status st2 = st; if(!st2.ok() && !st2.IsNotFound()) qdb_throw(st2.ToString()); }
// NOTE(review): the macros above expand to a bare { ... } block, so a use such
// as `if(c) THROW_ON_ERROR(x); else ...` does not compile. A do { } while(0)
// wrapper would avoid that, provided every call site ends with a semicolon.
using namespace quarkdb;
// Returns true if `path` exists and is a directory. stat() is used, not
// lstat(), so a symlink pointing to a directory also counts.
//
// S_ISDIR inspects the whole file-type field (S_IFMT). Testing
// `st_mode & S_IFDIR` alone is wrong: that bit is also part of S_IFBLK and
// S_IFSOCK, so block devices and sockets would be reported as directories.
static bool directoryExists(const std::string &path) {
  struct stat st;
  return stat(path.c_str(), &st) == 0 && S_ISDIR(st.st_mode);
}
// Build the InvalidArgument status used to reject malformed client input
// (non-numeric increments, bad cursors, ...). `message` becomes the status text.
static rocksdb::Status malformed(std::string_view message) {
  rocksdb::Status invalid = rocksdb::Status::InvalidArgument(message);
  return invalid;
}
//------------------------------------------------------------------------------
// Open (creating it if the directory does not exist yet) the rocksdb instance
// at `f`, then validate and load the state machine bookkeeping keys.
//
// write_ahead_log: when false, the state machine runs without a write-ahead
//                  log; an unclean shutdown then loses data (a warning is logged).
// bulk_load:       tune rocksdb for a one-off bulk import. Only permitted when
//                  `f` does not exist yet; forces writeAheadLog to false.
//
// Calls qdb_throw if bulkload is requested on an existing path, if rocksdb
// cannot be opened, or if any of the ensure*Sanity / format checks fail.
//------------------------------------------------------------------------------
StateMachine::StateMachine(std::string_view f, bool write_ahead_log, bool bulk_load)
  : filename(f), writeAheadLog(write_ahead_log), bulkLoad(bulk_load), timeKeeper(0u),
    requestCounter(std::chrono::seconds(10)) {

  if(writeAheadLog) {
    qdb_info("Opening state machine " << quotes(filename) << ".");
  }
  else {
    qdb_warn("Opening state machine " << quotes(filename) << " *without* write ahead log - an unclean shutdown WILL CAUSE DATA LOSS");
  }

  bool dirExists = directoryExists(filename);
  if(bulkLoad && dirExists) {
    qdb_throw("bulkload only available for newly initialized state machines; path '" << filename << "' already exists");
  }

  rocksdb::Options options;
  rocksdb::BlockBasedTableOptions table_options;
  table_options.filter_policy.reset(rocksdb::NewBloomFilterPolicy(10, false));
  table_options.block_size = 16 * 1024;

  // This option prevents creating bloom filters for the last compaction level.
  // A bloom filter is used to quickly rule out whether an SST may contain a
  // given key or not. Having bloom filters for the last compaction layer is
  // not particularly useful, as it only prevents an extra IO read in cases
  // where a key is not found. Given that the last compaction layer is the
  // biggest, turning on this option reduces total bloom filter size on disk
  // (and associated memory consumption) by ~90%, while only making "not-found"
  // queries slightly more expensive.
  options.optimize_filters_for_hits = true;

  // The default write-stall thresholds are a bit too conservative, causing
  // bulk loading to stall heavily. Pending compaction limits: 256 GiB soft,
  // 512 GiB hard.
  options.max_write_buffer_number = 6;
  options.soft_pending_compaction_bytes_limit = 256 * 1073741824ull;
  options.hard_pending_compaction_bytes_limit = 512 * 1073741824ull;
  options.level0_slowdown_writes_trigger = 50;
  options.level0_stop_writes_trigger = 75;

  // rocksdb replays the MANIFEST file upon startup to detect possible DB
  // corruption. This file grows by the number of SST files updated per run,
  // and is reset after each run.
  // If the DB runs for too long, accumulating too many updates, the next
  // restart will potentially take several minutes.
  // This option limits the max size of MANIFEST to 1MB, taking care to
  // automatically roll-over when necessary, which should alleviate the above.
  if(!bulkLoad) {
    options.max_manifest_file_size = 1024 * 1024;
  }

  options.compression = rocksdb::kLZ4Compression;
  options.bottommost_compression = rocksdb::kZSTD;
  options.create_if_missing = !dirExists;
  options.table_factory.reset(rocksdb::NewBlockBasedTableFactory(table_options));
  options.row_cache = rocksdb::NewLRUCache(1024 * 1024 * 1024, 8);

  // Use multiple threads for compaction and flushing jobs
  options.IncreaseParallelism(std::max(2u, std::thread::hardware_concurrency() / 2));

  // Parallelize compaction, but limit maximum number of subcompactions to 4.
  options.max_subcompactions = std::max(1u, std::thread::hardware_concurrency() / 2);
  if(options.max_subcompactions > 4) {
    options.max_subcompactions = 4;
  }

  // Let rocksdb itself decide the target sizes for each compaction level
  options.level_compaction_dynamic_level_bytes = true;
  options.disable_auto_compactions = false;

  // Warn on write stalls
  writeStallWarner.reset(new WriteStallWarner("state-machine"));
  options.listeners.emplace_back(writeStallWarner);

  // Bulkload overrides some of the settings above; it must come last.
  if(bulkLoad) {
    qdb_warn("Opening state machine in bulkload mode.");
    writeAheadLog = false;
    options.PrepareForBulkLoad();
    options.memtable_factory.reset(new rocksdb::VectorRepFactory());
    options.allow_concurrent_memtable_write = false;
  }

  rocksdb::DB *tmpdb = nullptr;
  rocksdb::Status status = rocksdb::DB::Open(options, filename, &tmpdb);
  if(!status.ok()) qdb_throw("Cannot open " << quotes(filename) << ":" << status.ToString());
  db.reset(tmpdb);

  // A directory that did not exist before Open() is a brand-new DB: the
  // bookkeeping keys must be created rather than validated.
  ensureCompatibleFormat(!dirExists);
  ensureBulkloadSanity(!dirExists);
  ensureClockSanity(!dirExists);
  loadExpirationCache();
  retrieveLastApplied();

  manifestChecker.reset(new ParanoidManifestChecker(filename));
  consistencyScanner.reset(new ConsistencyScanner(*this));
}
void StateMachine::ensureClockSanity(bool justCreated) {
std::string value;
rocksdb::Status st = db->Get(rocksdb::ReadOptions(), KeyConstants::kStateMachine_Clock, &value);
if(justCreated) {
if(!st.IsNotFound()) qdb_throw("Error when reading __clock, which should not exist: " << st.ToString());
THROW_ON_ERROR(db->Put(rocksdb::WriteOptions(), KeyConstants::kStateMachine_Clock, unsignedIntToBinaryString(0u)));
}
else {
if(st.IsNotFound()) {
// Compatibility: When opening old state machines, set expected __clock key.
// TODO: Remove in a couple of releases.
THROW_ON_ERROR(db->Put(rocksdb::WriteOptions(), KeyConstants::kStateMachine_Clock, unsignedIntToBinaryString(0u)));
}
}
st = db->Get(rocksdb::ReadOptions(), KeyConstants::kStateMachine_Clock, &value);
if(!st.ok()) qdb_throw("Error when reading __clock: " << st.ToString());
if(value.size() != 8u) {
qdb_throw("Detected corruption of __clock, received size " << value.size() << ", was expecting 8");
}
// We survived!
timeKeeper.reset(binaryStringToUnsignedInt(value.c_str()));
}
// Destroy the helper objects first (the manifest checker, then the
// consistency scanner, which was constructed with a reference to this
// StateMachine), and only afterwards close the rocksdb handle.
// NOTE(review): presumably the helpers may still use the DB, which is why it
// is released last - confirm before changing this order.
StateMachine::~StateMachine() {
manifestChecker.reset();
consistencyScanner.reset();
if(db) {
// Log only when a DB handle is actually open.
qdb_info("Closing state machine " << quotes(filename));
db.reset();
}
}
//------------------------------------------------------------------------------
// Get the underlying folder where this SM resides, i.e. the path it was
// opened with.
//------------------------------------------------------------------------------
std::string StateMachine::getPhysicalLocation() const {
  const std::string &location = filename;
  return location;
}
void StateMachine::reset() {
IteratorPtr iter(db->NewIterator(rocksdb::ReadOptions()));
for(iter->SeekToFirst(); iter->Valid(); iter->Next()) {
db->Delete(rocksdb::WriteOptions(), iter->key().ToString());
}
ensureCompatibleFormat(true);
ensureBulkloadSanity(true);
ensureClockSanity(true);
retrieveLastApplied();
}
void StateMachine::hardSynchronizeDynamicClock() {
ClockValue syncPoint;
getClock(syncPoint);
timeKeeper.synchronize(syncPoint);
}
// Current dynamic time as reported by timeKeeper.
ClockValue StateMachine::getDynamicClock() {
  ClockValue now = timeKeeper.getDynamicTime();
  return now;
}
//------------------------------------------------------------------------------
// Validate the __in-bulkload marker.
//
// justCreated: the marker must not exist yet; it is initialized from the
// bulkLoad flag. Otherwise the marker must read boolToString(false); any other
// value means a bulkload was never finalized, and opening is refused.
//------------------------------------------------------------------------------
void StateMachine::ensureBulkloadSanity(bool justCreated) {
  std::string inBulkload;
  rocksdb::Status st = db->Get(rocksdb::ReadOptions(), KeyConstants::kStateMachine_InBulkload, &inBulkload);

  if(justCreated) {
    if(!st.IsNotFound()) qdb_throw("Error when reading __in-bulkload, which should not exist: " << st.ToString());
    THROW_ON_ERROR(db->Put(rocksdb::WriteOptions(), KeyConstants::kStateMachine_InBulkload, boolToString(bulkLoad)));
    return;
  }

  if(st.IsNotFound()) {
    // Compatibility: When opening old state machines, set expected __in-bulkload key.
    // TODO: Remove once PPS machines have been updated.
    THROW_ON_ERROR(db->Put(rocksdb::WriteOptions(), KeyConstants::kStateMachine_InBulkload, boolToString(false)));
    st = db->Get(rocksdb::ReadOptions(), KeyConstants::kStateMachine_InBulkload, &inBulkload);
  }

  if(!st.ok()) qdb_throw("Error when reading __in-bulkload: " << st.ToString());

  // st is OK at this point, so its text carries no information. Appending it
  // would only tack a stray "OK" onto the end of the message.
  if(inBulkload != boolToString(false)) {
    qdb_throw("Bulkload mode was NOT finalized! DB is corrupted - you either did not call finalizeBulkload, or you copied live SST files without shutting down the bulkload QDB process first.");
  }
}
void StateMachine::ensureCompatibleFormat(bool justCreated) {
const std::string currentFormat("0");
std::string format;
rocksdb::Status st = db->Get(rocksdb::ReadOptions(), KeyConstants::kStateMachine_Format, &format);
if(justCreated) {
if(!st.IsNotFound()) qdb_throw("Error when reading __format, which should not exist: " << st.ToString());
st = db->Put(rocksdb::WriteOptions(), KeyConstants::kStateMachine_Format, currentFormat);
if(!st.ok()) qdb_throw("error when setting format: " << st.ToString());
}
else {
if(!st.ok()) qdb_throw("Cannot read __format: " << st.ToString());
if(format != currentFormat) qdb_throw("Asked to open a state machine with incompatible format (" << format << "), I can only handle " << currentFormat);
}
}
void StateMachine::retrieveLastApplied() {
std::string tmp;
rocksdb::Status st = db->Get(rocksdb::ReadOptions(), KeyConstants::kStateMachine_LastApplied, &tmp);
if(st.ok()) {
lastApplied = binaryStringToInt(tmp.c_str());
}
else if(st.IsNotFound()) {
lastApplied = 0;
st = db->Put(rocksdb::WriteOptions(), KeyConstants::kStateMachine_LastApplied, intToBinaryString(lastApplied));
if(!st.ok()) qdb_throw("error when setting lastApplied: " << st.ToString());
}
else {
qdb_throw("error when retrieving lastApplied: " << st.ToString());
}
}
// In-memory lastApplied index, as loaded or initialized by retrieveLastApplied().
LogIndex StateMachine::getLastApplied() {
  const LogIndex applied = lastApplied;
  return applied;
}
// Status returned when a command targets a key that holds a different type.
static rocksdb::Status wrong_type() {
  static constexpr char kWrongTypeMessage[] = "WRONGTYPE Operation against a key holding the wrong kind of value";
  return rocksdb::Status::InvalidArgument(kWrongTypeMessage);
}
// Turn the result of a descriptor lookup into a KeyDescriptor:
//  - NotFound -> empty descriptor (the key does not exist),
//  - OK       -> descriptor deserialized from `serialization`,
//  - anything else is fatal. The status text is included so the failure is
//    diagnosable.
static KeyDescriptor constructDescriptor(const rocksdb::Status &st, const std::string &serialization) {
  if(st.IsNotFound()) {
    return KeyDescriptor();
  }

  if(!st.ok()) qdb_throw("unexpected rocksdb status when inspecting key descriptor: " << st.ToString());
  return KeyDescriptor(serialization);
}
// Read, without taking a lock, the descriptor of `redisKey` as seen through
// `stagingArea`. An empty descriptor means the key does not exist.
KeyDescriptor StateMachine::getKeyDescriptor(StagingArea &stagingArea, std::string_view redisKey) {
  DescriptorLocator locator(redisKey);
  std::string serialized;
  rocksdb::Status lookup = stagingArea.get(locator.toView(), serialized);
  return constructDescriptor(lookup, serialized);
}
// Same as getKeyDescriptor, but reads through getForUpdate() on the staging
// area, using an already-built descriptor locator.
KeyDescriptor StateMachine::lockKeyDescriptor(StagingArea &stagingArea, DescriptorLocator &dlocator) {
  std::string serialized;
  rocksdb::Status lookup = stagingArea.getForUpdate(dlocator.toView(), serialized);
  return constructDescriptor(lookup, serialized);
}
// True when `key` does not exist, or exists with type `keytype`.
bool StateMachine::assertKeyType(StagingArea &stagingArea, std::string_view key, KeyType keytype) {
  KeyDescriptor descriptor = getKeyDescriptor(stagingArea, key);
  return descriptor.empty() || descriptor.getKeyType() == keytype;
}
// HGET: fetch `field` of hash `key` into `value`. Returns NotFound if absent,
// WRONGTYPE if `key` is not a hash.
rocksdb::Status StateMachine::hget(StagingArea &stagingArea, std::string_view key, std::string_view field, std::string &value) {
  if(assertKeyType(stagingArea, key, KeyType::kHash)) {
    FieldLocator fieldLocator(KeyType::kHash, key, field);
    return stagingArea.get(fieldLocator.toView(), value);
  }
  return wrong_type();
}
// HEXISTS: same status as hget(); the fetched value is thrown away.
rocksdb::Status StateMachine::hexists(StagingArea &stagingArea, std::string_view key, std::string_view field) {
  std::string discardedValue;
  return hget(stagingArea, key, field, discardedValue);
}
// HKEYS: collect every field name of hash `key` into `keys` (cleared first).
rocksdb::Status StateMachine::hkeys(StagingArea &stagingArea, std::string_view key, std::vector<std::string> &keys) {
  if(!assertKeyType(stagingArea, key, KeyType::kHash)) return wrong_type();

  keys.clear();
  FieldLocator locator(KeyType::kHash, key);
  const size_t prefixSize = locator.getPrefixSize();

  IteratorPtr iter(stagingArea.getIterator());
  for(iter->Seek(locator.getPrefix()); iter->Valid(); iter->Next()) {
    std::string_view rawKey = iter->key().ToStringView();
    // Left the range of this hash: done.
    if(!StringUtils::startsWith(rawKey, locator.toView())) break;
    keys.emplace_back(rawKey.substr(prefixSize));
  }
  return rocksdb::Status::OK();
}
// HGETALL: fill `res` (cleared first) with alternating field, value entries
// for every field of hash `key`.
rocksdb::Status StateMachine::hgetall(StagingArea &stagingArea, std::string_view key, std::vector<std::string> &res) {
  if(!assertKeyType(stagingArea, key, KeyType::kHash)) return wrong_type();

  res.clear();
  FieldLocator locator(KeyType::kHash, key);
  const size_t prefixSize = locator.getPrefixSize();

  IteratorPtr iter(stagingArea.getIterator());
  iter->Seek(locator.getPrefix());
  while(iter->Valid()) {
    std::string_view rawKey = iter->key().ToStringView();
    if(!StringUtils::startsWith(rawKey, locator.toView())) break;
    res.emplace_back(rawKey.substr(prefixSize));
    res.emplace_back(iter->value().ToStringView());
    iter->Next();
  }
  return rocksdb::Status::OK();
}
//------------------------------------------------------------------------------
// Set `field` of a locality hash to `value` under locality hint `hint`:
//  - field already stored under this exact hint: overwrite the value only;
//  - field stored under another hint: drop the old entry, write the new one,
//    and repoint the locality index;
//  - unknown field: write entry + index and report fieldcreated = true.
// The caller adjusts the key size and finalizes `operation`. `key` is not
// used here; `operation` was already opened on it by the caller.
//------------------------------------------------------------------------------
void StateMachine::lhsetInternal(WriteOperation &operation, std::string_view key, std::string_view field, std::string_view hint, std::string_view value, bool &fieldcreated) {
  fieldcreated = false;

  // Fast path: same field, same hint. No index or size changes needed.
  if(operation.localityFieldExists(hint, field)) {
    operation.writeLocalityField(hint, field, value);
    return;
  }

  std::string previousHint;
  const bool indexed = operation.getLocalityIndex(field, previousHint);
  if(indexed) {
    // The hint changed: remove the entry stored under the old one.
    qdb_assert(operation.deleteLocalityField(previousHint, field));
  }
  else {
    // Brand new field.
    fieldcreated = true;
  }

  operation.writeLocalityField(hint, field, value);
  operation.writeLocalityIndex(field, hint);
}
// LHMSET: apply (field, hint, value) triplets from [start, end) to locality
// hash `key`, growing its size by the number of newly created fields.
rocksdb::Status StateMachine::lhmset(StagingArea &stagingArea, std::string_view key, const ReqIterator &start, const ReqIterator &end) {
  if((end - start) % 3 != 0) qdb_throw("lhmset: distance between start and end iterators must be a multiple of three");

  WriteOperation operation(stagingArea, key, KeyType::kLocalityHash);
  if(!operation.valid()) return wrong_type();

  int64_t newFields = 0;
  for(ReqIterator triplet = start; triplet != end; triplet += 3) {
    bool created = false;
    lhsetInternal(operation, key, *triplet, *(triplet + 1), *(triplet + 2), created);
    if(created) newFields++;
  }
  return operation.finalize(operation.keySize() + newFields);
}
// LHSET: set a single locality-hash field; `fieldcreated` reports whether it
// is new.
rocksdb::Status StateMachine::lhset(StagingArea &stagingArea, std::string_view key, std::string_view field, std::string_view hint, std::string_view value, bool &fieldcreated) {
  WriteOperation operation(stagingArea, key, KeyType::kLocalityHash);
  if(!operation.valid()) return wrong_type();

  // lhsetInternal resets fieldcreated itself before deciding.
  lhsetInternal(operation, key, field, hint, value, fieldcreated);
  return operation.finalize(operation.keySize() + (fieldcreated ? 1 : 0));
}
// LHLOCDEL: delete `field` only if it is stored under locality hint `hint`.
// `removed` is 1 on deletion, 0 otherwise.
rocksdb::Status StateMachine::lhlocdel(StagingArea &stagingArea, std::string_view key, std::string_view field, std::string_view hint, int64_t &removed) {
  if(!assertKeyType(stagingArea, key, KeyType::kLocalityHash)) return wrong_type();

  LocalityFieldLocator locator(key, hint, field);
  rocksdb::Status existence = stagingArea.exists(locator.toView());
  ASSERT_OK_OR_NOTFOUND(existence);

  if(!existence.ok()) {
    // NotFound: no such field, or it lives under a different hint.
    removed = 0;
    return rocksdb::Status::OK();
  }

  // Field and hint match: go through the regular deletion path.
  RedisRequest request;
  request.push_back(field);
  rocksdb::Status deletion = lhdel(stagingArea, key, request.begin(), request.end(), removed);
  qdb_assert(removed == 1);
  return deletion;
}
// LHDEL: delete the given fields (whatever their hint) from locality hash
// `key`; `removed` counts the fields that actually existed.
rocksdb::Status StateMachine::lhdel(StagingArea &stagingArea, std::string_view key, const ReqIterator &start, const ReqIterator &end, int64_t &removed) {
  removed = 0;
  WriteOperation operation(stagingArea, key, KeyType::kLocalityHash);
  if(!operation.valid()) return wrong_type();

  for(ReqIterator current = start; current != end; ++current) {
    std::string storedHint;
    // The locality index tells us which hint the field lives under.
    if(!operation.getAndDeleteLocalityIndex(*current, storedHint)) continue;
    removed++;
    qdb_assert(operation.deleteLocalityField(storedHint, *current));
  }

  const int64_t newsize = operation.keySize() - removed;
  return operation.finalize(newsize);
}
//------------------------------------------------------------------------------
// LHGET: fetch `field` of locality hash `key`. A non-empty `hint` is tried
// first (fast path). Otherwise, or on a miss, the locality index supplies the
// real hint. A wrong client hint is logged as a warning, but the value is
// still returned.
//------------------------------------------------------------------------------
rocksdb::Status StateMachine::lhget(StagingArea &stagingArea, std::string_view key, std::string_view field, std::string_view hint, std::string &value) {
  if(!assertKeyType(stagingArea, key, KeyType::kLocalityHash)) return wrong_type();

  const bool hintProvided = !hint.empty();
  if(hintProvided) {
    LocalityFieldLocator hintedLocator(key, hint, field);
    rocksdb::Status fast = stagingArea.get(hintedLocator.toView(), value);
    ASSERT_OK_OR_NOTFOUND(fast);
    if(fast.ok()) return fast;
    // NotFound: either the field is missing, or the hint is wrong.
  }

  std::string correctHint;
  LocalityIndexLocator indexLocator(key, field);
  rocksdb::Status indexLookup = stagingArea.get(indexLocator.toView(), correctHint);
  ASSERT_OK_OR_NOTFOUND(indexLookup);
  if(indexLookup.IsNotFound()) return indexLookup;

  if(hintProvided) {
    // Client is drunk and giving wrong locality hints, warn.
    qdb_assert(hint != correctHint);
    qdb_warn("Received invalid locality hint (" << hint << " vs " << correctHint << ") for locality hash with key " << key << ", targeting field " << field);
  }

  LocalityFieldLocator fieldLocator(key, correctHint, field);
  THROW_ON_ERROR(stagingArea.get(fieldLocator.toView(), value));
  return rocksdb::Status::OK();
}
// HSET: write `field` = `value` in hash `key`; `fieldcreated` is true when
// the field did not exist before.
rocksdb::Status StateMachine::hset(StagingArea &stagingArea, std::string_view key, std::string_view field, std::string_view value, bool &fieldcreated) {
  WriteOperation operation(stagingArea, key, KeyType::kHash);
  if(!operation.valid()) return wrong_type();

  const bool alreadyPresent = operation.fieldExists(field);
  fieldcreated = !alreadyPresent;
  // The new size is computed from keySize() before the write.
  const int64_t newsize = operation.keySize() + (alreadyPresent ? 0 : 1);
  operation.writeField(field, value);
  return operation.finalize(newsize);
}
// HMSET: write the (field, value) pairs in [start, end) into hash `key`,
// growing its size by one for every field that did not exist yet.
rocksdb::Status StateMachine::hmset(StagingArea &stagingArea, std::string_view key, const ReqIterator &start, const ReqIterator &end) {
  if((end - start) % 2 != 0) qdb_throw("hmset: distance between start and end iterators must be an even number");

  WriteOperation operation(stagingArea, key, KeyType::kHash);
  if(!operation.valid()) return wrong_type();

  int64_t newsize = operation.keySize();
  for(ReqIterator pair = start; pair != end; pair += 2) {
    const bool isNew = !operation.fieldExists(*pair);
    operation.writeField(*pair, *(pair + 1));
    if(isNew) newsize++;
  }
  return operation.finalize(newsize);
}
// HSETNX: write `field` only if it does not exist yet; `fieldcreated` tells
// whether the write happened.
rocksdb::Status StateMachine::hsetnx(StagingArea &stagingArea, std::string_view key, std::string_view field, std::string_view value, bool &fieldcreated) {
  WriteOperation operation(stagingArea, key, KeyType::kHash);
  if(!operation.valid()) return wrong_type();

  fieldcreated = !operation.fieldExists(field);
  const int64_t newsize = operation.keySize() + (fieldcreated ? 1 : 0);
  if(!fieldcreated) {
    // Field present: leave it untouched, just close the operation.
    return operation.finalize(newsize);
  }

  operation.writeField(field, value);
  return operation.finalize(newsize);
}
//------------------------------------------------------------------------------
// HINCRBY: add integer `incrby` to hash field `field` and store the result.
// A missing field counts as 0. `result` receives the new value.
//
// Returns InvalidArgument when `incrby` or the stored value is not a 64-bit
// integer, or when the addition would overflow int64_t. Signed overflow is
// undefined behaviour in C++, and Redis reports this case as an error as well.
// Returns WRONGTYPE if `key` is not a hash.
//------------------------------------------------------------------------------
rocksdb::Status StateMachine::hincrby(StagingArea &stagingArea, std::string_view key, std::string_view field, std::string_view incrby, int64_t &result) {
  int64_t incrbyInt64;
  if(!ParseUtils::parseInt64(incrby, incrbyInt64)) {
    return malformed("value is not an integer or out of range");
  }

  WriteOperation operation(stagingArea, key, KeyType::kHash);
  if(!operation.valid()) return wrong_type();

  std::string value;
  bool exists = operation.getField(field, value);

  result = 0;
  if(exists && !ParseUtils::parseInt64(value, result)) {
    operation.finalize(operation.keySize());
    return malformed("hash value is not an integer");
  }

  // Check for overflow before adding.
  const bool overflows =
    (incrbyInt64 > 0 && result > std::numeric_limits<int64_t>::max() - incrbyInt64) ||
    (incrbyInt64 < 0 && result < std::numeric_limits<int64_t>::min() - incrbyInt64);

  if(overflows) {
    // Same as the error path above: close the operation without changes.
    operation.finalize(operation.keySize());
    return malformed("increment or decrement would overflow");
  }

  result += incrbyInt64;
  operation.writeField(field, std::to_string(result));
  return operation.finalize(operation.keySize() + !exists);
}
//------------------------------------------------------------------------------
// HINCRBYFLOAT: add floating-point `incrby` to hash field `field` and store the
// result. A missing field counts as 0. `result` receives the new value.
//
// Returns InvalidArgument when `incrby` or the stored value is not a float,
// or when the sum is NaN or +/-Infinity (matching Redis, which refuses to
// store such values). Returns WRONGTYPE if `key` is not a hash.
//------------------------------------------------------------------------------
rocksdb::Status StateMachine::hincrbyfloat(StagingArea &stagingArea, std::string_view key, std::string_view field, std::string_view incrby, double &result) {
  double incrByDouble;
  if(!my_strtod(incrby, incrByDouble)) {
    return malformed("value is not a float or out of range");
  }

  WriteOperation operation(stagingArea, key, KeyType::kHash);
  if(!operation.valid()) return wrong_type();

  std::string value;
  bool exists = operation.getField(field, value);

  result = 0;
  if(exists && !my_strtod(value, result)) {
    operation.finalize(operation.keySize());
    return malformed("hash value is not a float");
  }

  const double updated = result + incrByDouble;
  const double infinity = std::numeric_limits<double>::infinity();
  // NaN is the only value that compares unequal to itself.
  if(updated != updated || updated == infinity || updated == -infinity) {
    // Close the operation without changes, as on the error path above.
    operation.finalize(operation.keySize());
    return malformed("increment would produce NaN or Infinity");
  }

  result = updated;
  // NOTE(review): std::to_string(double) formats with "%f" (6 decimals), so
  // very small increments are rounded away in the stored value.
  operation.writeField(field, std::to_string(result));
  return operation.finalize(operation.keySize() + !exists);
}
// HDEL: delete the listed fields from hash `key`; `removed` counts the fields
// that actually existed.
rocksdb::Status StateMachine::hdel(StagingArea &stagingArea, std::string_view key, const ReqIterator &start, const ReqIterator &end, int64_t &removed) {
  removed = 0;

  WriteOperation operation(stagingArea, key, KeyType::kHash);
  if(!operation.valid()) return wrong_type();

  for(ReqIterator victim = start; victim != end; ++victim) {
    removed += operation.deleteField(*victim);
  }

  const int64_t remaining = operation.keySize() - removed;
  return operation.finalize(remaining);
}
// True only if the key exists (non-empty descriptor) and holds a type other
// than `keyType`. A missing key is never "the wrong type".
static bool isWrongType(KeyDescriptor &descriptor, KeyType keyType) {
  if(descriptor.empty()) return false;
  return descriptor.getKeyType() != keyType;
}
// HLEN: number of fields of hash `key` (0 if the key does not exist).
rocksdb::Status StateMachine::hlen(StagingArea &stagingArea, std::string_view key, size_t &len) {
  len = 0;
  KeyDescriptor descriptor = getKeyDescriptor(stagingArea, key);
  if(!isWrongType(descriptor, KeyType::kHash)) {
    len = descriptor.getSize();
    return rocksdb::Status::OK();
  }
  return wrong_type();
}
// LHLEN: number of fields of locality hash `key` (0 if the key does not exist).
rocksdb::Status StateMachine::lhlen(StagingArea &stagingArea, std::string_view key, size_t &len) {
  len = 0;
  KeyDescriptor descriptor = getKeyDescriptor(stagingArea, key);
  if(!isWrongType(descriptor, KeyType::kLocalityHash)) {
    len = descriptor.getSize();
    return rocksdb::Status::OK();
  }
  return wrong_type();
}
// Debugging aid: every version of exactly `key` reported by
// rocksdb::GetAllKeyVersions, with no limit on how many are returned.
rocksdb::Status StateMachine::rawGetAllVersions(std::string_view key, std::vector<rocksdb::KeyVersion> &versions) {
  const size_t noLimit = std::numeric_limits<size_t>::max();
  return rocksdb::GetAllKeyVersions(db.get(), key, key, noLimit, &versions);
}
// Dispatch to rawScanTombstones or rawScan depending on `tombstones`.
rocksdb::Status StateMachine::rawScanMaybeTombstones(StagingArea &stagingArea, std::string_view seek, size_t count, std::vector<std::string> &elements, bool tombstones) {
  return tombstones ? rawScanTombstones(stagingArea, seek, count, elements)
                    : rawScan(stagingArea, seek, count, elements);
}
// Debugging aid: starting at `key`, collect up to `count` keys that
// isTombstone() flags, as seen by stagingArea.getIterator(true).
rocksdb::Status StateMachine::rawScanTombstones(StagingArea &stagingArea, std::string_view key, size_t count, std::vector<std::string> &elements) {
  elements.clear();
  IteratorPtr iter(stagingArea.getIterator(true));

  for(iter->Seek(key); iter->Valid() && elements.size() < count; iter->Next()) {
    std::string_view rawKey = iter->key().ToStringView();
    if(!isTombstone(rawKey)) continue;
    // Strip the trailing 8 bytes before reporting the key; presumably the
    // rocksdb internal-key trailer (sequence number + type) - TODO confirm.
    rawKey.remove_suffix(8u);
    elements.emplace_back(rawKey);
  }
  return rocksdb::Status::OK();
}
// Debugging aid: starting at `key`, return up to `count` raw rocksdb entries
// (never more than 1,000,000) as alternating key, value strings.
rocksdb::Status StateMachine::rawScan(StagingArea &stagingArea, std::string_view key, size_t count, std::vector<std::string> &elements) {
  elements.clear();

  const size_t limit = (count < 1000000u) ? count : 1000000u;
  size_t produced = 0;

  IteratorPtr iter(stagingArea.getIterator());
  for(iter->Seek(key); iter->Valid() && produced < limit; iter->Next()) {
    elements.emplace_back(iter->key().ToString());
    elements.emplace_back(iter->value().ToString());
    produced++;
  }
  return rocksdb::Status::OK();
}
//------------------------------------------------------------------------------
// HSCAN: page through hash `key` starting at field `cursor`, returning up to
// `count` (field, value) pairs flattened into `res`. When more fields remain,
// `newCursor` is set to the next field name; otherwise it is empty.
//------------------------------------------------------------------------------
rocksdb::Status StateMachine::hscan(StagingArea &stagingArea, std::string_view key, std::string_view cursor, size_t count, std::string &newCursor, std::vector<std::string> &res) {
  if(!assertKeyType(stagingArea, key, KeyType::kHash)) return wrong_type();

  FieldLocator locator(KeyType::kHash, key, cursor);
  res.clear();
  newCursor = "";

  // Each hit contributes two entries: field and value.
  const size_t maxEntries = count * 2;

  IteratorPtr iter(stagingArea.getIterator());
  for(iter->Seek(locator.toView()); iter->Valid(); iter->Next()) {
    std::string_view rawKey = iter->key().ToStringView();
    if(!StringUtils::startsWith(rawKey, locator.getPrefix())) break;

    std::string fieldname(rawKey.substr(locator.getPrefixSize()));
    if(res.size() >= maxEntries) {
      newCursor = fieldname;
      break;
    }

    res.push_back(fieldname);
    res.push_back(iter->value().ToString());
  }
  return rocksdb::Status::OK();
}
//------------------------------------------------------------------------------
// LHSCAN: iterate over locality hash `key`, appending (locality hint, field,
// value) triplets to `results` (cleared first). Once `count` triplets are
// collected, the remaining position is returned in `newCursor`.
//
// cursor:    empty to start from the beginning. Otherwise a value previously
//            returned in newCursor (a hint+field pair decoded with
//            EscapedPrefixExtractor); unparseable cursors yield
//            InvalidArgument("invalid cursor").
// matchloc:  glob applied to the locality hint only (stringmatchlen); empty
//            or "*" matches everything. Its literal prefix
//            (extractPatternPrefix) narrows the scanned key range.
// newCursor: next hint+field when the page filled up, "" otherwise.
//------------------------------------------------------------------------------
rocksdb::Status StateMachine::lhscan(StagingArea &stagingArea, std::string_view key, std::string_view cursor, std::string_view matchloc, size_t count, std::string &newCursor, std::vector &results) {
if(!assertKeyType(stagingArea, key, KeyType::kLocalityHash)) return wrong_type();
std::string_view cursorHint;
std::string_view cursorField;
LocalityFieldLocator locator(key);
results.clear();
std::string patternPrefix = extractPatternPrefix(matchloc);
bool emptyPattern = (matchloc.empty() || matchloc == "*");
// Any rocksdb keys we touch must have this prefix.
size_t keyPrefixSize = locator.toView().size();
std::string requiredPrefix(locator.toView());
std::string_view scanStart = requiredPrefix;
if(!cursor.empty()) {
// Decompose cursor into hint + field.
EscapedPrefixExtractor extractor;
if(!extractor.parse(cursor)) {
return malformed("invalid cursor");
}
cursorHint = extractor.getOriginalPrefix();
cursorField = extractor.getRawSuffix();
// We start from the given hint + field.
locator.resetHint(cursorHint);
// NOTE(review): with a pattern prefix, requiredPrefix is narrowed to the
// cursor's full hint, not to the pattern prefix as in the branch below.
// If locator.toView() here includes a hint/field separator, hints that
// still match the pattern but differ from cursorHint would end the scan
// early - confirm against LocalityFieldLocator.
if(!patternPrefix.empty()) {
requiredPrefix = locator.toView();
}
locator.resetField(cursorField);
scanStart = locator.toView();
}
else if(!patternPrefix.empty()) {
// No cursor: jump straight to the first hint that starts with the
// pattern's literal prefix.
locator.resetHint(patternPrefix);
requiredPrefix += patternPrefix;
scanStart = locator.toView();
// The pattern continues past its literal prefix (contains wildcards), so
// drop the last two bytes of the locator view. Presumably this is the
// hint terminator appended by resetHint, making the seek cover every hint
// beginning with the prefix - TODO confirm.
if(patternPrefix.size() != matchloc.size()) {
scanStart.remove_suffix(2);
}
}
newCursor = "";
IteratorPtr iter(stagingArea.getIterator());
for(iter->Seek(scanStart); iter->Valid(); iter->Next()) {
std::string_view rocksdbKey = iter->key().ToStringView();
if(!StringUtils::startsWith(rocksdbKey, requiredPrefix)) {
// It's over, we've iterated through the entire locality hash
break;
}
// Split hint + field
std::string_view hintPlusField = rocksdbKey;
hintPlusField.remove_prefix(keyPrefixSize);
EscapedPrefixExtractor splitter;
qdb_assert(splitter.parse(hintPlusField));
// The page-size check happens before pattern filtering, so entries the
// pattern rejects are still walked over, but they never count toward `count`.
if(results.size() >= count*3) {
// We've hit result sizelimit, calculate new cursor and break
newCursor = hintPlusField;
break;
}
std::string_view localityHint = splitter.getOriginalPrefix();
// Populate new entry consisting of three items
if(emptyPattern || stringmatchlen(matchloc.data(), matchloc.length(), localityHint.data(), localityHint.length(), 0)) {
results.emplace_back(splitter.getOriginalPrefix());
results.emplace_back(splitter.getRawSuffix());
results.emplace_back(iter->value().ToStringView());
}
}
return rocksdb::Status::OK();
}
//------------------------------------------------------------------------------
// SSCAN: page through set `key` starting at member `cursor`, returning up to
// `count` members in `res`. When more members remain, `newCursor` is set to
// the next member; otherwise it is empty.
//------------------------------------------------------------------------------
rocksdb::Status StateMachine::sscan(StagingArea &stagingArea, std::string_view key, std::string_view cursor, size_t count, std::string &newCursor, std::vector<std::string> &res) {
  if(!assertKeyType(stagingArea, key, KeyType::kSet)) return wrong_type();

  FieldLocator locator(KeyType::kSet, key, cursor);
  res.clear();
  newCursor = "";

  IteratorPtr iter(stagingArea.getIterator());
  iter->Seek(locator.toView());
  while(iter->Valid()) {
    std::string_view rawKey = iter->key().ToStringView();
    if(!StringUtils::startsWith(rawKey, locator.getPrefix())) break;

    std::string member(rawKey.substr(locator.getPrefixSize()));
    if(res.size() >= count) {
      // Page is full: this member becomes the cursor of the next call.
      newCursor = std::move(member);
      break;
    }

    res.push_back(std::move(member));
    iter->Next();
  }
  return rocksdb::Status::OK();
}
//------------------------------------------------------------------------------
// Scan deque `key` backwards in pages. Each call returns up to `count` items
// whose indices lie below `cursor` (exclusive; empty cursor = end of the
// deque), in ascending index order.
//
// cursor / newCursor are 8-byte binary indices. An empty newCursor means the
// first item of the deque was reached. Returns InvalidArgument for malformed
// cursors and WRONGTYPE for non-deque keys. `res` is cleared first, like the
// other scan functions.
//
// The page start is computed without evaluating `cursorMarker - count`. That
// expression wraps around (uint64) for a large COUNT and produced an empty
// page with a garbage cursor.
//------------------------------------------------------------------------------
rocksdb::Status StateMachine::dequeScanBack(StagingArea &stagingArea, std::string_view key, std::string_view cursor, size_t count, std::string &newCursor, std::vector<std::string> &res) {
  KeyDescriptor keyinfo = getKeyDescriptor(stagingArea, key);
  if(isWrongType(keyinfo, KeyType::kDeque)) return wrong_type();

  res.clear();
  if(keyinfo.getSize() == 0) {
    newCursor = "";
    return rocksdb::Status::OK();
  }

  uint64_t cursorMarker;
  if(cursor.empty()) {
    cursorMarker = keyinfo.getEndIndex();
  }
  else if(cursor.size() == 8u) {
    cursorMarker = binaryStringToUnsignedInt(cursor.data());
    if(cursorMarker > keyinfo.getEndIndex()) {
      cursorMarker = keyinfo.getEndIndex();
    }
  }
  else {
    return malformed("invalid cursor");
  }

  // Items occupy indices [startIndex + 1, endIndex).
  const uint64_t firstIndex = keyinfo.getStartIndex() + 1;

  uint64_t startingMarker;
  if(cursorMarker <= firstIndex || cursorMarker - firstIndex <= count) {
    // This page reaches the front of the deque: no further pages.
    newCursor = "";
    startingMarker = firstIndex;
  }
  else {
    startingMarker = cursorMarker - count;
    newCursor = unsignedIntToBinaryString(startingMarker);
  }

  FieldLocator locator(KeyType::kDeque, key, unsignedIntToBinaryString(startingMarker));
  IteratorPtr iter(stagingArea.getIterator());
  iter->Seek(locator.toView());

  // Items must be stored at consecutive indices; verify each key as we go.
  for(uint64_t i = startingMarker; i < cursorMarker; i++) {
    qdb_assert(iter->Valid());
    locator.resetField(unsignedIntToBinaryString(i));
    qdb_assert(locator.toView() == iter->key().ToStringView());
    res.emplace_back(iter->value().ToStringView());
    iter->Next();
  }
  return rocksdb::Status::OK();
}
// HVALS: collect the values of every field of hash `key` into `vals`
// (cleared first).
rocksdb::Status StateMachine::hvals(StagingArea &stagingArea, std::string_view key, std::vector<std::string> &vals) {
  if(!assertKeyType(stagingArea, key, KeyType::kHash)) return wrong_type();

  FieldLocator locator(KeyType::kHash, key);
  vals.clear();

  IteratorPtr iter(stagingArea.getIterator());
  for(iter->Seek(locator.getPrefix()); iter->Valid(); iter->Next()) {
    if(!StringUtils::startsWith(iter->key().ToStringView(), locator.toView())) break;
    vals.emplace_back(iter->value().ToStringView());
  }
  return rocksdb::Status::OK();
}
// SADD: add the members in [start, end) to set `key`; `added` counts the ones
// that were not already present. Members are stored with the placeholder
// value "1".
rocksdb::Status StateMachine::sadd(StagingArea &stagingArea, std::string_view key, const ReqIterator &start, const ReqIterator &end, int64_t &added) {
  added = 0;

  WriteOperation operation(stagingArea, key, KeyType::kSet);
  if(!operation.valid()) return wrong_type();

  for(ReqIterator member = start; member != end; ++member) {
    if(operation.fieldExists(*member)) continue;
    operation.writeField(*member, "1");
    added++;
  }
  return operation.finalize(operation.keySize() + added);
}
//------------------------------------------------------------------------------
// SISMEMBER: OK if `element` is a member of set `key`, NotFound otherwise,
// WRONGTYPE if `key` is not a set.
//
// The lookup goes through the staging area, like the key-type check above and
// every other accessor in this file, so members staged earlier in the same
// transaction are visible. A direct db->Get on the snapshot bypassed them.
//------------------------------------------------------------------------------
rocksdb::Status StateMachine::sismember(StagingArea &stagingArea, std::string_view key, std::string_view element) {
  if(!assertKeyType(stagingArea, key, KeyType::kSet)) return wrong_type();
  FieldLocator locator(KeyType::kSet, key, element);
  return stagingArea.exists(locator.toView());
}
// SREM: remove the listed members from set `key`; `removed` counts the ones
// that actually existed.
rocksdb::Status StateMachine::srem(StagingArea &stagingArea, std::string_view key, const ReqIterator &start, const ReqIterator &end, int64_t &removed) {
  removed = 0;

  WriteOperation operation(stagingArea, key, KeyType::kSet);
  if(!operation.valid()) return wrong_type();

  for(ReqIterator member = start; member != end; ++member) {
    removed += operation.deleteField(*member);
  }

  const int64_t newSize = operation.keySize() - removed;
  return operation.finalize(newSize);
}
//------------------------------------------------------------------------------
// SMOVE: move `element` from set `source` to set `destination`. `outcome` is
// 1 if the element was present in `source` (and removed from it), 0 otherwise.
// Both write operations are always finalized, including on the WRONGTYPE path.
//------------------------------------------------------------------------------
rocksdb::Status StateMachine::smove(StagingArea &stagingArea, std::string_view source, std::string_view destination, std::string_view element, int64_t &outcome) {
  WriteOperation from(stagingArea, source, KeyType::kSet);
  if(!from.valid()) return wrong_type();

  WriteOperation to(stagingArea, destination, KeyType::kSet);
  if(!to.valid()) {
    // Close the already-opened source operation unchanged before bailing out.
    from.finalize(from.keySize());
    return wrong_type();
  }

  const bool moved = from.deleteField(element);
  outcome = moved ? 1 : 0;

  if(!moved) {
    // No operation performed, item does not exist
    from.finalize(from.keySize());
    to.finalize(to.keySize());
    return rocksdb::Status::OK();
  }

  from.finalize(from.keySize() - 1);
  if(to.fieldExists(element)) {
    // Already a member of the destination: nothing to add.
    to.finalize(to.keySize());
  }
  else {
    to.writeField(element, "1");
    to.finalize(to.keySize() + 1);
  }
  return rocksdb::Status::OK();
}
// Collects every member of the set stored at key into members (cleared
// first). Returns wrong_type() if key holds a non-set value.
rocksdb::Status StateMachine::smembers(StagingArea &stagingArea, std::string_view key, std::vector &members) {
  if(!assertKeyType(stagingArea, key, KeyType::kSet)) return wrong_type();
  members.clear();
  FieldLocator locator(KeyType::kSet, key);
  IteratorPtr iter(stagingArea.getIterator());
  iter->Seek(locator.getPrefix());
  while(iter->Valid()) {
    std::string rawKey = iter->key().ToString();
    if(!StringUtils::startsWith(rawKey, locator.toView())) {
      break; // left this set's field range
    }
    // Strip the internal prefix to recover the member name.
    members.push_back(rawKey.substr(locator.getPrefixSize()));
    iter->Next();
  }
  return rocksdb::Status::OK();
}
// Reports the number of members of the set stored at key (0 if missing).
// Returns wrong_type() if key holds a non-set value.
rocksdb::Status StateMachine::scard(StagingArea &stagingArea, std::string_view key, size_t &count) {
  count = 0;
  KeyDescriptor descriptor = getKeyDescriptor(stagingArea, key);
  if(isWrongType(descriptor, KeyType::kSet)) {
    return wrong_type();
  }
  count = descriptor.getSize();
  return rocksdb::Status::OK();
}
// Reads configuration entry key into value through the given staging area.
// Configuration entries are addressed via ConfigurationLocator and bypass
// key descriptors entirely.
rocksdb::Status StateMachine::configGet(StagingArea &stagingArea, std::string_view key, std::string &value) {
  ConfigurationLocator configLocator(key);
  rocksdb::Status st = stagingArea.get(configLocator.toView(), value);
  return st;
}
// Stores configuration entry key = value, logging the previous value
// (or "N/A" if there was none).
rocksdb::Status StateMachine::configSet(StagingArea &stagingArea, std::string_view key, std::string_view value) {
  // We don't use WriteOperation or key descriptors here,
  // since kConfiguration is special.
  std::string oldvalue = "N/A";
  // Read the previous value through this same staging area, so the log line
  // reflects earlier updates to this key in the current batch. The overload
  // without a StagingArea presumably reads committed state only. The read goes
  // into a separate buffer so a failed lookup can never clobber "N/A".
  std::string previous;
  rocksdb::Status st = configGet(stagingArea, key, previous);
  if(st.ok()) oldvalue = SSTR("'" << previous << "'");
  qdb_info("Applying configuration update: Key " << key << " changes from " << oldvalue << " into '" << value << "'");
  ConfigurationLocator locator(key);
  stagingArea.put(locator.toView(), value);
  return rocksdb::Status::OK();
}
// Returns every configuration entry in res (cleared first) as a flat list of
// alternating key / value strings, with the one-byte internal type prefix
// stripped from each key.
rocksdb::Status StateMachine::configGetall(StagingArea &stagingArea, std::vector &res) {
  const char configTag = char(InternalKeyType::kConfiguration);
  const std::string searchPrefix(1, configTag);
  res.clear();
  IteratorPtr iter(stagingArea.getIterator());
  for(iter->Seek(searchPrefix); iter->Valid(); iter->Next()) {
    const std::string rawKey = iter->key().ToString();
    if(rawKey.empty() || rawKey[0] != configTag) {
      break; // walked past the configuration region
    }
    res.push_back(rawKey.substr(1));
    res.push_back(iter->value().ToString());
  }
  return rocksdb::Status::OK();
}
// A WriteOperation must be closed with finalize() or cancel() before it is
// destroyed; otherwise the key descriptor would never be written back, so
// this is treated as a fatal programming error. Operations that were invalid
// from the start count as finalized (see the constructor).
StateMachine::WriteOperation::~WriteOperation() {
  if(finalized) return;
  std::cerr << "WriteOperation being destroyed without having been finalized" << std::endl;
  std::terminate();
}
// Opens a write on key expecting type "type". The key descriptor is read
// with getForUpdate(). The operation is valid if the key does not exist or
// already has the expected type; an invalid operation starts out finalized,
// so it may be dropped without calling finalize().
StateMachine::WriteOperation::WriteOperation(StagingArea &staging, std::string_view key, const KeyType &type)
: stagingArea(staging), redisKey(key), expectedType(type) {
  dlocator.reset(redisKey);
  std::string serialized;
  rocksdb::Status st = stagingArea.getForUpdate(dlocator.toView(), serialized);
  if(st.ok()) {
    keyinfo = KeyDescriptor(serialized);
  }
  else if(st.IsNotFound()) {
    keyinfo = KeyDescriptor();
  }
  else {
    qdb_throw("unexpected rocksdb status when inspecting KeyType entry " << dlocator.toString() << ": " << st.ToString());
  }
  redisKeyExists = !keyinfo.empty();
  isValid = !redisKeyExists || keyinfo.getKeyType() == type;
  if(!redisKeyExists) {
    // Brand-new key: stamp the descriptor with the type this write creates.
    keyinfo.setKeyType(expectedType);
    if(expectedType == KeyType::kVersionedHash) {
      // Versioned hashes keep their version in the start index; begin at 0.
      keyinfo.setStartIndex(0u);
    }
  }
  finalized = !isValid;
}
bool StateMachine::WriteOperation::descriptorModifiedAlreadyInWriteBatch() {
std::string ignored;
rocksdb::Status st = stagingArea.readFromWriteBatch(dlocator.toView(), ignored);
ASSERT_OK_OR_NOTFOUND(st);
return st.ok();
}
// False when the key already exists with a type other than the expected one.
bool StateMachine::WriteOperation::valid() {
  const bool typeMatches = isValid;
  return typeMatches;
}
// True if the key had a descriptor when this operation was constructed.
bool StateMachine::WriteOperation::keyExists() {
  const bool existed = redisKeyExists;
  return existed;
}
bool StateMachine::WriteOperation::getField(std::string_view field, std::string &out) {
assertWritable();
FieldLocator locator(keyinfo.getKeyType(), redisKey, field);
rocksdb::Status st = stagingArea.get(locator.toView(), out);
ASSERT_OK_OR_NOTFOUND(st);
return st.ok();
}
bool StateMachine::WriteOperation::getLocalityIndex(std::string_view field, std::string &out) {
assertWritable();
qdb_assert(keyinfo.getKeyType() == KeyType::kLocalityHash);
LocalityIndexLocator locator(redisKey, field);
rocksdb::Status st = stagingArea.get(locator.toView(), out);
ASSERT_OK_OR_NOTFOUND(st);
return st.ok();
}
bool StateMachine::WriteOperation::getAndDeleteLocalityIndex(std::string_view field, std::string &out) {
assertWritable();
qdb_assert(keyinfo.getKeyType() == KeyType::kLocalityHash);
LocalityIndexLocator locator(redisKey, field);
rocksdb::Status st = stagingArea.get(locator.toView(), out);
ASSERT_OK_OR_NOTFOUND(st);
if(st.ok()) {
stagingArea.del(locator.toView());
}
return st.ok();
}
// Size recorded in the descriptor (before this operation's finalize()).
int64_t StateMachine::WriteOperation::keySize() {
  const int64_t recorded = keyinfo.getSize();
  return recorded;
}
// Throws unless the operation is valid and not yet finalized. Validity is
// checked first, so a type-mismatched operation (which the constructor marks
// finalized) reports "not valid".
void StateMachine::WriteOperation::assertWritable() {
  if(!isValid) {
    qdb_throw("WriteOperation not valid!");
  }
  if(finalized) {
    qdb_throw("WriteOperation already finalized!");
  }
}
void StateMachine::WriteOperation::write(std::string_view value) {
assertWritable();
if(keyinfo.getKeyType() == KeyType::kString) {
StringLocator locator(redisKey);
stagingArea.put(locator.toView(), value);
}
else if(keyinfo.getKeyType() == KeyType::kLease) {
LeaseLocator locator(redisKey);
stagingArea.put(locator.toView(), value);
}
else {
qdb_throw("writing without a field makes sense only for strings and leases");
}
}
void StateMachine::WriteOperation::writeField(std::string_view field, std::string_view value) {
assertWritable();
if(keyinfo.getKeyType() != KeyType::kHash && keyinfo.getKeyType() != KeyType::kSet && keyinfo.getKeyType() != KeyType::kDeque && keyinfo.getKeyType() != KeyType::kVersionedHash) {
qdb_throw("writing with a field makes sense only for hashes, sets, or lists");
}
FieldLocator locator(keyinfo.getKeyType(), redisKey, field);
stagingArea.put(locator.toView(), value);
}
void StateMachine::WriteOperation::writeLocalityField(std::string_view hint, std::string_view field, std::string_view value) {
assertWritable();
qdb_assert(keyinfo.getKeyType() == KeyType::kLocalityHash);
LocalityFieldLocator locator(redisKey, hint, field);
stagingArea.put(locator.toView(), value);
}
void StateMachine::WriteOperation::writeLocalityIndex(std::string_view field, std::string_view hint) {
assertWritable();
qdb_assert(keyinfo.getKeyType() == KeyType::kLocalityHash);
LocalityIndexLocator locator(redisKey, field);
stagingArea.put(locator.toView(), hint);
}
// Closes the operation without writing the descriptor back.
void StateMachine::WriteOperation::cancel() {
  finalized = true; // satisfies the destructor's check; nothing is written
}
// Closes the operation and records the key's new size in its descriptor.
// A size of 0 deletes the descriptor, except for versioned hashes, which keep
// it (and with it the version stored in the start index). Otherwise the
// descriptor is rewritten only if the size changed or forceUpdate is set.
// Negative sizes throw.
rocksdb::Status StateMachine::WriteOperation::finalize(int64_t newsize, bool forceUpdate) {
  assertWritable();
  if(newsize < 0) qdb_throw("invalid newsize: " << newsize);
  const bool dropDescriptor = (newsize == 0) && (keyinfo.getKeyType() != KeyType::kVersionedHash);
  if(dropDescriptor) {
    stagingArea.del(dlocator.toView());
  }
  else if(forceUpdate || keyinfo.getSize() != newsize) {
    keyinfo.setSize(newsize);
    stagingArea.put(dlocator.toView(), keyinfo.serialize());
  }
  finalized = true;
  return rocksdb::Status::OK(); // OK if return value is ignored
}
bool StateMachine::WriteOperation::fieldExists(std::string_view field) {
assertWritable();
FieldLocator locator(keyinfo.getKeyType(), redisKey, field);
rocksdb::Status st = stagingArea.exists(locator.toView());
ASSERT_OK_OR_NOTFOUND(st);
return st.ok();
}
bool StateMachine::WriteOperation::localityFieldExists(std::string_view hint, std::string_view field) {
assertWritable();
qdb_assert(keyinfo.getKeyType() == KeyType::kLocalityHash);
LocalityFieldLocator locator(redisKey, hint, field);
rocksdb::Status st = stagingArea.exists(locator.toView());
ASSERT_OK_OR_NOTFOUND(st);
return st.ok();
}
bool StateMachine::WriteOperation::deleteField(std::string_view field) {
assertWritable();
std::string tmp;
FieldLocator locator(keyinfo.getKeyType(), redisKey, field);
rocksdb::Status st = stagingArea.get(locator.toView(), tmp);
ASSERT_OK_OR_NOTFOUND(st);
if(st.ok()) stagingArea.del(locator.toView());
return st.ok();
}
bool StateMachine::WriteOperation::deleteLocalityField(std::string_view hint, std::string_view field) {
assertWritable();
qdb_assert(keyinfo.getKeyType() == KeyType::kLocalityHash);
std::string tmp;
LocalityFieldLocator locator(redisKey, hint, field);
rocksdb::Status st = stagingArea.get(locator.toView(), tmp);
ASSERT_OK_OR_NOTFOUND(st);
if(st.ok()) stagingArea.del(locator.toView());
return st.ok();
}
// Stores value as the string at key; the descriptor size records the value's
// length in bytes. Returns wrong_type() if key holds a non-string value.
// NOTE(review): an empty value yields newsize 0, which finalize() turns into
// "delete the descriptor" even though the string entry itself was just
// written -- confirm empty values are rejected upstream.
rocksdb::Status StateMachine::set(StagingArea &stagingArea, std::string_view key, std::string_view value) {
  WriteOperation op(stagingArea, key, KeyType::kString);
  if(!op.valid()) {
    return wrong_type();
  }
  op.write(value);
  return op.finalize(value.size());
}
// Pushes the items in [start, end) onto one end of the deque stored at key,
// creating the deque if needed. On success, length holds the deque's size
// after the push. Returns wrong_type() if key holds a non-deque value.
//
// Each side of the deque has a list index in the key descriptor
// (descriptor.getListIndex(direction)): the slot the next item pushed on that
// side will occupy. (int)direction is the per-item step -- presumably +1 for
// kRight and -1 for kLeft; TODO confirm against the Direction definition.
// The arithmetic is done in uint64_t, so a negative step relies on
// well-defined unsigned wrap-around.
rocksdb::Status StateMachine::dequePush(StagingArea &stagingArea, Direction direction, std::string_view key, const ReqIterator &start, const ReqIterator &end, int64_t &length) {
WriteOperation operation(stagingArea, key, KeyType::kDeque);
if(!operation.valid()) return wrong_type();
KeyDescriptor &descriptor = operation.descriptor();
uint64_t listIndex = descriptor.getListIndex(direction);
uint64_t itemsAdded = 0;
// Item i goes to slot listIndex + i*step, moving further towards this end.
for(ReqIterator it = start; it != end; it++) {
operation.writeField(unsignedIntToBinaryString(listIndex + (itemsAdded*(int)direction)), *it);
itemsAdded++;
}
// This side's next free slot is now one step past the last item written.
descriptor.setListIndex(direction, listIndex + (itemsAdded*(int)direction));
length = operation.keySize() + itemsAdded;
// The deque was empty before this push: put the opposite side's next free
// slot one step behind the first item written, so the two indices bracket
// exactly the items just added.
if(operation.keySize() == 0) {
descriptor.setListIndex(flipDirection(direction), listIndex + ((int)direction*-1));
}
return operation.finalize(length);
}
// Pushes [start, end) onto the front of the deque; front is Direction::kLeft.
rocksdb::Status StateMachine::dequePushFront(StagingArea &stagingArea, std::string_view key, const ReqIterator &start, const ReqIterator &end, int64_t &length) {
  const Direction front = Direction::kLeft;
  return dequePush(stagingArea, front, key, start, end, length);
}
// Pushes [start, end) onto the back of the deque; back is Direction::kRight.
rocksdb::Status StateMachine::dequePushBack(StagingArea &stagingArea, std::string_view key, const ReqIterator &start, const ReqIterator &end, int64_t &length) {
  const Direction back = Direction::kRight;
  return dequePush(stagingArea, back, key, start, end, length);
}
// Pops one item from the front (Direction::kLeft) of the deque into item.
rocksdb::Status StateMachine::dequePopFront(StagingArea &stagingArea, std::string_view key, std::string &item) {
  const Direction front = Direction::kLeft;
  return dequePop(stagingArea, front, key, item);
}
// Pops one item from the back (Direction::kRight) of the deque into item.
rocksdb::Status StateMachine::dequePopBack(StagingArea &stagingArea, std::string_view key, std::string &item) {
  const Direction back = Direction::kRight;
  return dequePop(stagingArea, back, key, item);
}
// Trims the front of the deque at key so that at most maxToKeep items remain,
// deleting the excess from the start-index side. itemsRemoved receives the
// number of items deleted (0 when nothing needed trimming); it is left
// untouched on error. Returns malformed() if maxToKeepStr is not a
// non-negative integer, and wrong_type() if key holds a non-deque value.
rocksdb::Status StateMachine::dequeTrimFront(StagingArea &stagingArea, std::string_view key, std::string_view maxToKeepStr, int64_t &itemsRemoved) {
int64_t maxToKeep;
if(!ParseUtils::parseInt64(maxToKeepStr, maxToKeep) || maxToKeep < 0) {
return malformed("value is not an integer or out of range");
}
WriteOperation operation(stagingArea, key, KeyType::kDeque);
if(!operation.valid()) return wrong_type();
KeyDescriptor &descriptor = operation.descriptor();
int64_t toRemove = descriptor.getSize() - maxToKeep;
// Already small enough (or missing): close the operation without writing.
if(toRemove <= 0) {
operation.cancel();
itemsRemoved = 0;
return rocksdb::Status::OK();
}
// Items occupy the slots strictly between startIndex and endIndex (see the
// size check below), so the front-most toRemove items are at
// startIndex+1 .. startIndex+toRemove.
int64_t eliminated = 0;
for(uint64_t nextToEliminate = descriptor.getStartIndex()+1; nextToEliminate <=
descriptor.getStartIndex() + toRemove; nextToEliminate++) {
eliminated++;
// NOTE(review): the deletion happens inside qdb_assert(); this relies on
// qdb_assert always evaluating its argument -- confirm it is never compiled out.
qdb_assert(operation.deleteField(unsignedIntToBinaryString(nextToEliminate)));
}
qdb_assert(eliminated == toRemove);
itemsRemoved = toRemove;
// Moving startIndex forward turns the deleted slots into free space.
descriptor.setStartIndex(descriptor.getStartIndex() + toRemove);
// Sanity check: exactly maxToKeep items must remain between the two indices.
qdb_assert(descriptor.getEndIndex() - descriptor.getStartIndex() - 1 == (uint64_t) maxToKeep);
return operation.finalize(descriptor.getEndIndex() - descriptor.getStartIndex() - 1);
}
void StateMachine::loadExpirationCache() {
StagingArea stagingArea(*this);
std::scoped_lock lock(mExpirationCacheMutex);
ExpirationEventIterator iter(stagingArea);
while(iter.valid()) {
mExpirationCache.insert(iter.getDeadline(), std::string(iter.getRedisKey()));
iter.next();
}
}
// Moves the state machine clock forward to newValue within stagingArea,
// first releasing every lease whose deadline is <= newValue. Moving the
// clock backwards throws.
void StateMachine::advanceClock(StagingArea &stagingArea, ClockValue newValue) {
  std::scoped_lock lock(mExpirationCacheMutex);
  ClockValue currentValue;
  getClock(stagingArea, currentValue);
  if(newValue < currentValue) {
    qdb_throw("Attempted to set state machine clock in the past: " << currentValue << " ==> " << newValue);
  }
  // lease_release() removes the lease's entry from mExpirationCache, which is
  // what lets this loop make progress. It is passed clock 0 so it does not
  // try to advance the clock again. lease_release() re-locks
  // mExpirationCacheMutex, which is presumably a recursive mutex.
  while(!mExpirationCache.empty() && mExpirationCache.getFrontDeadline() <= newValue) {
    const std::string expiring(mExpirationCache.getFrontLease());
    qdb_assert(lease_release(stagingArea, expiring, ClockValue(0)).ok());
  }
  stagingArea.put(KeyConstants::kStateMachine_Clock, unsignedIntToBinaryString(newValue));
}
// Looks up lease key after advancing the clock to clockUpdate (which clears
// leases expired by then). On success, info holds the holder and the
// descriptor's start/end timestamps. Returns NotFound if there is no such
// key, and wrong_type() if key is not a lease.
rocksdb::Status StateMachine::lease_get(StagingArea &stagingArea, std::string_view key, ClockValue clockUpdate, LeaseInfo &info) {
  std::scoped_lock lock(mExpirationCacheMutex);
  maybeAdvanceClock(stagingArea, clockUpdate);
  KeyDescriptor descriptor = getKeyDescriptor(stagingArea, key);
  if(descriptor.empty()) return rocksdb::Status::NotFound();
  if(descriptor.getKeyType() != KeyType::kLease) return wrong_type();
  LeaseLocator leaseLocator(key);
  std::string holder;
  THROW_ON_ERROR(stagingArea.get(leaseLocator.toView(), holder));
  info = LeaseInfo(holder, descriptor.getStartIndex(), descriptor.getEndIndex());
  return rocksdb::Status::OK();
}
// Blocks the calling thread for sleepStr milliseconds (presumably a
// testing aid, as the name suggests). Returns malformed() if sleepStr is not
// a non-negative integer.
rocksdb::Status StateMachine::artificiallySlowWriteNeverUseThis(StagingArea &stagingArea, std::string_view sleepStr) {
  int64_t durationMs;
  const bool parsed = ParseUtils::parseInt64(sleepStr, durationMs);
  if(!parsed || durationMs < 0) {
    return malformed("value is not an integer or out of range");
  }
  std::this_thread::sleep_for(std::chrono::milliseconds(durationMs));
  return rocksdb::Status::OK();
}
// Sets field = value in the versioned hash stored at key, creating the hash
// if needed. version receives the hash's version after this write.
// The version is kept in the descriptor's start index and is bumped at most
// once per write batch. The update is also recorded in the staging area's
// revision tracker. Returns wrong_type() if key holds another type.
rocksdb::Status StateMachine::vhset(StagingArea &stagingArea, std::string_view key, std::string_view field, std::string_view value, uint64_t &version) {
WriteOperation operation(stagingArea, key, KeyType::kVersionedHash);
if(!operation.valid()) return wrong_type();
// The size only grows when field is new; overwrites keep it unchanged.
bool fieldcreated = !operation.fieldExists(field);
int64_t newsize = operation.keySize() + fieldcreated;
operation.writeField(field, value);
// Have we modified this key in the same write batch already?
// If yes:
// - We have already incremented the version, nothing to do. Each transaction
// towards the state machine counts as a single version.
// If not:
// - We need to increment the version by one.
KeyDescriptor &descriptor = operation.descriptor();
version = descriptor.getStartIndex();
if(!operation.descriptorModifiedAlreadyInWriteBatch()) {
version++;
descriptor.setStartIndex(version);
}
VersionedHashRevision &revision = stagingArea.getRevisionTracker().forKey(key);
revision.setRevisionNumber(version);
revision.addUpdate(field, value);
// forceUpdate: the descriptor must be rewritten even when the size is
// unchanged, because the version (start index) may have changed.
return operation.finalize(newsize, true);
}
// Removes the fields in [start, end) from the versioned hash stored at key.
// version receives the hash's version afterwards; it is bumped (at most once
// per write batch) only if at least one field was actually removed.
// Returns wrong_type() if key holds another type.
// NOTE(review): every requested field is recorded in the revision tracker as
// an update with an empty value, including fields that did not exist.
// NOTE(review): finalize(newsize, true) always rewrites the descriptor, and
// WriteOperation::finalize never drops a versioned hash at size 0, so vhdel
// on a missing key appears to create an empty versioned-hash descriptor --
// confirm this is intended.
rocksdb::Status StateMachine::vhdel(StagingArea &stagingArea, std::string_view key, const ReqIterator &start, const ReqIterator &end, uint64_t &version) {
int64_t removed = 0;
WriteOperation operation(stagingArea, key, KeyType::kVersionedHash);
if(!operation.valid()) return wrong_type();
VersionedHashRevision &revision = stagingArea.getRevisionTracker().forKey(key);
for(ReqIterator it = start; it != end; it++) {
removed += operation.deleteField(*it);
revision.addUpdate(*it, "");
}
// Have we modified this key in the same write batch already?
// If yes:
// - We have already incremented the version, nothing to do. Each transaction
// towards the state machine counts as a single version.
// If not:
// - We need to increment the version by one.
KeyDescriptor &descriptor = operation.descriptor();
version = descriptor.getStartIndex();
if(removed != 0 && !operation.descriptorModifiedAlreadyInWriteBatch()) {
version++;
descriptor.setStartIndex(version);
}
revision.setRevisionNumber(version);
int64_t newsize = operation.keySize() - removed;
return operation.finalize(newsize, true);
}
// Lists the field names of the versioned hash at key into res and reports
// its version. A missing key yields an empty res and version 0. On a type
// mismatch, wrong_type() is returned and res/version are left untouched.
rocksdb::Status
StateMachine::vhkeys(StagingArea &stagingArea, std::string_view key,
  std::vector &res, uint64_t &version)
{
  KeyDescriptor keyinfo = getKeyDescriptor(stagingArea, key);
  if(!keyinfo.empty() && keyinfo.getKeyType() != KeyType::kVersionedHash) {
    return wrong_type();
  }
  res.clear();
  if(keyinfo.empty()) {
    version = 0u;
    return rocksdb::Status::OK();
  }
  FieldLocator fieldLocator(KeyType::kVersionedHash, key);
  IteratorPtr iter(stagingArea.getIterator());
  iter->Seek(fieldLocator.getPrefix());
  while(iter->Valid()) {
    std::string rawKey = iter->key().ToString();
    if(!StringUtils::startsWith(rawKey, fieldLocator.toView())) {
      break;
    }
    res.push_back(rawKey.substr(fieldLocator.getPrefixSize()));
    iter->Next();
  }
  version = keyinfo.getStartIndex();
  return rocksdb::Status::OK();
}
// Lists the versioned hash at key into res as alternating field / value
// strings and reports its version. A missing key yields an empty res and
// version 0. On a type mismatch, wrong_type() is returned and res/version
// are left untouched.
rocksdb::Status
StateMachine::vhgetall(StagingArea &stagingArea, std::string_view key,
  std::vector &res, uint64_t &version)
{
  KeyDescriptor keyinfo = getKeyDescriptor(stagingArea, key);
  if(!keyinfo.empty() && keyinfo.getKeyType() != KeyType::kVersionedHash) {
    return wrong_type();
  }
  res.clear();
  if(keyinfo.empty()) {
    version = 0u;
    return rocksdb::Status::OK();
  }
  FieldLocator fieldLocator(KeyType::kVersionedHash, key);
  IteratorPtr iter(stagingArea.getIterator());
  iter->Seek(fieldLocator.getPrefix());
  while(iter->Valid()) {
    std::string rawKey = iter->key().ToString();
    if(!StringUtils::startsWith(rawKey, fieldLocator.toView())) {
      break;
    }
    res.push_back(rawKey.substr(fieldLocator.getPrefixSize()));
    res.push_back(iter->value().ToString());
    iter->Next();
  }
  version = keyinfo.getStartIndex();
  return rocksdb::Status::OK();
}
// Reports the number of fields in the versioned hash at key (0 if missing).
// Returns wrong_type() if key holds another type.
rocksdb::Status StateMachine::vhlen(StagingArea &stagingArea, std::string_view key, size_t &len) {
  len = 0;
  KeyDescriptor descriptor = getKeyDescriptor(stagingArea, key);
  if(isWrongType(descriptor, KeyType::kVersionedHash)) {
    return wrong_type();
  }
  len = descriptor.getSize();
  return rocksdb::Status::OK();
}
// Copies every field of the hash at source into a new hash at target.
// Fails with InvalidArgument if target already exists, and with wrong_type()
// if target or source hold a non-hash value. A missing source is a no-op.
rocksdb::Status StateMachine::hclone(StagingArea &stagingArea, std::string_view source, std::string_view target) {
  WriteOperation operation(stagingArea, target, KeyType::kHash);
  if(!operation.valid()) return wrong_type();
  if(operation.keyExists()) {
    operation.cancel();
    return rocksdb::Status::InvalidArgument("ERR target key already exists, will not overwrite");
  }
  KeyDescriptor sourceKeyInfo = getKeyDescriptor(stagingArea, source);
  const bool sourceMissing = sourceKeyInfo.empty();
  if(sourceMissing || sourceKeyInfo.getKeyType() != KeyType::kHash) {
    // Either way nothing is written to target.
    operation.cancel();
    if(sourceMissing) return rocksdb::Status::OK(); // source key is empty, do nothing
    return wrong_type();
  }
  FieldLocator sourceFields(KeyType::kHash, source);
  int64_t copied = 0;
  IteratorPtr iter(stagingArea.getIterator());
  for(iter->Seek(sourceFields.getPrefix()); iter->Valid(); iter->Next()) {
    std::string rawKey = iter->key().ToString();
    if(!StringUtils::startsWith(rawKey, sourceFields.toView())) break;
    std::string field(rawKey.begin() + sourceFields.getPrefixSize(), rawKey.end());
    operation.writeField(field, iter->value().ToString());
    copied++;
  }
  // The number of fields copied must match the source descriptor.
  qdb_assert(copied == sourceKeyInfo.getSize());
  return operation.finalize(copied);
}
// Advances the clock to newValue in a fresh StagingArea and commits it at
// log index "index".
void StateMachine::advanceClock(ClockValue newValue, LogIndex index) {
  StagingArea staging(*this);
  std::scoped_lock lock(mExpirationCacheMutex);
  advanceClock(staging, newValue);
  staging.commit(index);
}
// Advances the clock to clockUpdate if that is ahead of the stored clock,
// and returns the resulting clock value. Normally clockUpdate is ahead. In
// the rare race where the stored clock is already ahead, the stored value is
// kept and returned instead.
ClockValue StateMachine::maybeAdvanceClock(StagingArea &stagingArea, ClockValue clockUpdate) {
  std::scoped_lock lock(mExpirationCacheMutex);
  ClockValue storedClock;
  getClock(stagingArea, storedClock);
  if(!(storedClock < clockUpdate)) {
    return storedClock;
  }
  advanceClock(stagingArea, clockUpdate);
  return clockUpdate;
}
// Reads the state machine clock from stagingArea. It is stored as an 8-byte
// binary integer; any other length is treated as corruption and throws.
void StateMachine::getClock(StagingArea &stagingArea, ClockValue &value) {
  std::string encoded;
  THROW_ON_ERROR(stagingArea.get(KeyConstants::kStateMachine_Clock, encoded));
  if(encoded.size() != 8u) {
    qdb_throw("Clock corruption, expected exactly 8 bytes, got " << encoded.size());
  }
  value = binaryStringToUnsignedInt(encoded.c_str());
}
// Reports the stored (static) clock, the dynamic clock, and every stored
// lease expiration event as (key, deadline) pairs in events (cleared first).
void StateMachine::lease_get_pending_expiration_events(StagingArea &stagingArea, ClockValue &staticClock, ClockValue &dynamicClock, std::vector &events) {
  std::scoped_lock lock(mExpirationCacheMutex);
  events.clear();
  getClock(stagingArea, staticClock);
  dynamicClock = getDynamicClock();
  for(ExpirationEventIterator pending(stagingArea); pending.valid(); pending.next()) {
    events.emplace_back(pending.getRedisKey(), pending.getDeadline());
  }
}
// Writes the textual type of key (per keyTypeAsString) into keyType.
void StateMachine::getType(StagingArea &stagingArea, std::string_view key, std::string &keyType) {
  const KeyDescriptor descriptor = getKeyDescriptor(stagingArea, key);
  keyType = keyTypeAsString(descriptor.getKeyType());
}
// Convenience overload: reads the clock through a fresh StagingArea
// constructed with `true` (presumably read-only -- confirm in StagingArea).
void StateMachine::getClock(ClockValue &value) {
  StagingArea readArea(*this, true);
  std::scoped_lock lock(mExpirationCacheMutex);
  getClock(readArea, value);
}
// Acquires or renews lease key for holder value, lasting duration clock units
// from the (possibly advanced) clock. The descriptor's start/end indices hold
// the acquisition and expiration timestamps. On success, and when another
// holder owns the lease, info is filled in. Returns:
//   kKeyTypeMismatch       - key exists and is not a lease
//   kFailedDueToOtherOwner - someone else holds it (info describes them)
//   kRenewed               - same holder, lease extended
//   kAcquired              - lease newly created
// NOTE(review): mExpirationCache is modified immediately, while the storage
// changes only land when stagingArea is committed; confirm a staging area is
// never discarded after this call.
LeaseAcquisitionStatus StateMachine::lease_acquire(StagingArea &stagingArea, std::string_view key, std::string_view value, ClockValue clockUpdate, uint64_t duration, LeaseInfo &info) {
std::scoped_lock lock(mExpirationCacheMutex);
// An empty holder would make finalize(value.size()) below see size 0 and
// delete the descriptor, presumably the reason this is forbidden.
qdb_assert(!value.empty());
// First, some timekeeping, update clock time if necessary.
clockUpdate = maybeAdvanceClock(stagingArea, clockUpdate);
// Ensure the key pointed to is either a lease, or non-existent.
WriteOperation operation(stagingArea, key, KeyType::kLease);
if(!operation.valid()) return LeaseAcquisitionStatus::kKeyTypeMismatch;
// Quick check that no-one else holds the lease right now.
// Could it be that the lease has actually expired? Not at this point.
// advanceClock() should have taken care of removing expired leases.
LeaseLocator locator(key);
std::string oldLeaseHolder;
rocksdb::Status st = stagingArea.get(locator.toView(), oldLeaseHolder);
ASSERT_OK_OR_NOTFOUND(st);
if(st.ok()) {
if(oldLeaseHolder != value) {
KeyDescriptor &descriptor = operation.descriptor();
info = LeaseInfo(oldLeaseHolder, descriptor.getStartIndex(), descriptor.getEndIndex());
operation.cancel();
return LeaseAcquisitionStatus::kFailedDueToOtherOwner;
}
}
// Looks good.. Either the lease is held by the same holder, and this is
// simply an extension request, or this is a new lease altogether.
KeyDescriptor &descriptor = operation.descriptor();
bool extended = operation.keyExists();
if(operation.keyExists()) {
// Lease extension.. need to wipe out old pending expiration event
// (from both the in-memory cache and storage; the event must exist).
ExpirationEventLocator oldEvent(descriptor.getEndIndex(), key);
mExpirationCache.remove(descriptor.getEndIndex(), std::string(key));
THROW_ON_ERROR(stagingArea.exists(oldEvent.toView()));
stagingArea.singleDelete(oldEvent.toView());
}
// Anchor expiration timestamp based on clockUpdate.
ClockValue expirationTimestamp = clockUpdate + duration;
descriptor.setStartIndex(clockUpdate);
descriptor.setEndIndex(expirationTimestamp);
// Store expiration event.
ExpirationEventLocator newEvent(expirationTimestamp, key);
mExpirationCache.insert(expirationTimestamp, std::string(key));
stagingArea.put(newEvent.toView(), "1");
// Update lease value.
operation.write(value);
info = LeaseInfo(value, descriptor.getStartIndex(), descriptor.getEndIndex());
// forceUpdate: timestamps changed even if the holder's length did not.
operation.finalize(value.size(), true);
if(extended) return LeaseAcquisitionStatus::kRenewed;
return LeaseAcquisitionStatus::kAcquired;
}
// Releases lease key: removes its pending expiration event (cache and
// storage), its value, and its descriptor. clockUpdate == 0 means "do not
// advance the clock". Returns NotFound if the key does not exist, and
// wrong_type() if it is not a lease.
rocksdb::Status StateMachine::lease_release(StagingArea &stagingArea, std::string_view key, ClockValue clockUpdate) {
std::scoped_lock lock(mExpirationCacheMutex);
// First, some timekeeping, update clock time if necessary.
if(clockUpdate != 0u) {
// maybeAdvanceClock will also call this function.. Avoid infinite loop
// by supplying clockUpdate == 0u.
maybeAdvanceClock(stagingArea, clockUpdate);
}
WriteOperation operation(stagingArea, key, KeyType::kLease);
if(!operation.valid()) return wrong_type();
if(!operation.keyExists()) {
// Close the operation; with size 0 this just deletes the absent descriptor.
operation.finalize(0u);
return rocksdb::Status::NotFound();
}
KeyDescriptor &descriptor = operation.descriptor();
// The pending expiration event is keyed by the lease's end index; it must exist.
ExpirationEventLocator event(descriptor.getEndIndex(), key);
THROW_ON_ERROR(stagingArea.exists(event.toView()));
mExpirationCache.remove(descriptor.getEndIndex(), std::string(key));
stagingArea.singleDelete(event.toView());
LeaseLocator leaseLocator(key);
THROW_ON_ERROR(stagingArea.exists(leaseLocator.toView()));
stagingArea.del(leaseLocator.toView());
// Size 0 deletes the lease's descriptor.
return operation.finalize(0u);
}
// Reports the number of items in the deque at key (0 if missing).
// Returns wrong_type() if key holds another type.
rocksdb::Status StateMachine::dequeLen(StagingArea &stagingArea, std::string_view key, size_t &len) {
  len = 0;
  KeyDescriptor descriptor = getKeyDescriptor(stagingArea, key);
  if(isWrongType(descriptor, KeyType::kDeque)) {
    return wrong_type();
  }
  len = descriptor.getSize();
  return rocksdb::Status::OK();
}
// Pops one item from the given end of the deque at key into item.
// An empty or missing deque yields NotFound with item set to "".
// Returns wrong_type() if key holds another type.
rocksdb::Status StateMachine::dequePop(StagingArea &stagingArea, Direction direction, std::string_view key, std::string &item) {
  WriteOperation operation(stagingArea, key, KeyType::kDeque);
  if(!operation.valid()) {
    return wrong_type();
  }
  if(operation.keySize() == 0) {
    item.clear();
    operation.finalize(0);
    return rocksdb::Status::NotFound();
  }
  KeyDescriptor &descriptor = operation.descriptor();
  // The list index is the next free slot on this side; the outermost item
  // sits one step back towards the centre.
  uint64_t nextFree = descriptor.getListIndex(direction);
  uint64_t outermost = nextFree + (int)direction*(-1);
  const std::string slot = unsignedIntToBinaryString(outermost);
  qdb_assert(operation.getField(slot, item));
  qdb_assert(operation.deleteField(slot));
  // The vacated slot becomes this side's next free slot.
  descriptor.setListIndex(direction, outermost);
  return operation.finalize(operation.keySize() - 1);
}
// Takes a RocksDB snapshot of db_ and pins the stored ReadOptions to it.
// Throws if RocksDB returns no snapshot.
StateMachine::Snapshot::Snapshot(rocksdb::DB *db_) {
  db = db_;
  snapshot = db->GetSnapshot();
  if(snapshot == nullptr) {
    qdb_throw("unable to take db snapshot");
  }
  options.snapshot = snapshot;
}
// Hands the snapshot taken in the constructor back to RocksDB.
StateMachine::Snapshot::~Snapshot() {
  db->ReleaseSnapshot(snapshot);
}
// ReadOptions whose snapshot field was set to this snapshot by the constructor.
rocksdb::ReadOptions& StateMachine::Snapshot::opts() {
  rocksdb::ReadOptions &pinned = options;
  return pinned;
}
// Reads the string stored at key into value.
// Returns wrong_type() if key holds a non-string value.
rocksdb::Status StateMachine::get(StagingArea &stagingArea, std::string_view key, std::string &value) {
  if(!assertKeyType(stagingArea, key, KeyType::kString)) {
    return wrong_type();
  }
  StringLocator stringLocator(key);
  rocksdb::Status st = stagingArea.get(stringLocator.toView(), value);
  return st;
}
// Deletes every raw entry whose key starts with prefix, except internal
// (kInternal) and configuration (kConfiguration) entries, which are always
// skipped. removed receives the number of entries deleted.
void StateMachine::remove_all_with_prefix(std::string_view prefix, int64_t &removed, StagingArea &stagingArea) {
  removed = 0;
  IteratorPtr iter(stagingArea.getIterator());
  for(iter->Seek(prefix); iter->Valid(); iter->Next()) {
    // iter->key() may get deleted from under our feet, better keep a copy
    std::string key = iter->key().ToString();
    if(!StringUtils::startsWith(key, prefix)) break;
    // Internal bookkeeping and configuration survive even a prefix-less
    // wipe (flushall passes an empty prefix).
    if(key.size() > 0 && (key[0] == char(InternalKeyType::kInternal) || key[0] == char(InternalKeyType::kConfiguration))) continue;
    stagingArea.del(key);
    removed++;
  }
}
// Deletes every key in [start, end) together with all of its internal
// entries (value, fields, locality indexes, or lease state). removed receives
// the number of keys that existed. Throws if a key's stored entries disagree
// with its descriptor.
rocksdb::Status StateMachine::del(StagingArea &stagingArea, const ReqIterator &start, const ReqIterator &end, int64_t &removed) {
removed = 0;
for(ReqIterator it = start; it != end; it++) {
DescriptorLocator dlocator(*it);
// Presumably reads the descriptor with a lock for update -- confirm in lockKeyDescriptor.
KeyDescriptor keyInfo = lockKeyDescriptor(stagingArea, dlocator);
if(keyInfo.empty()) continue;
std::string tmp;
if(keyInfo.getKeyType() == KeyType::kString) {
StringLocator slocator(*it);
// The string value must exist if its descriptor does.
THROW_ON_ERROR(stagingArea.get(slocator.toView(), tmp));
stagingArea.del(slocator.toView());
}
else if(keyInfo.getKeyType() == KeyType::kHash || keyInfo.getKeyType() == KeyType::kSet || keyInfo.getKeyType() == KeyType::kDeque || keyInfo.getKeyType() == KeyType::kVersionedHash) {
FieldLocator locator(keyInfo.getKeyType(), *it);
int64_t count = 0;
remove_all_with_prefix(locator.toView(), count, stagingArea);
if(count != keyInfo.getSize()) qdb_throw("mismatch between keyInfo counter and number of elements deleted by remove_all_with_prefix: " << count << " vs " << keyInfo.getSize());
}
else if(keyInfo.getKeyType() == KeyType::kLocalityHash) {
// Locality hashes keep two parallel ranges, fields and indexes, each
// expected to hold exactly getSize() entries (one index per field).
// wipe out fields
LocalityFieldLocator fieldLocator(*it);
int64_t count = 0;
remove_all_with_prefix(fieldLocator.toView(), count, stagingArea);
if(count != keyInfo.getSize()) qdb_throw("mismatch between keyInfo counter and number of elements deleted by remove_all_with_prefix: " << count << " vs " << keyInfo.getSize());
// wipe out indexes
LocalityIndexLocator indexLocator(*it);
count = 0;
remove_all_with_prefix(indexLocator.toView(), count, stagingArea);
if(count != keyInfo.getSize()) qdb_throw("mismatch between keyInfo counter and number of elements deleted by remove_all_with_prefix: " << count << " vs " << keyInfo.getSize());
}
else if(keyInfo.getKeyType() == KeyType::kLease) {
// Clock 0: release without advancing the clock. lease_release also
// deletes the descriptor, so the del() below repeats that deletion.
THROW_ON_ERROR(lease_release(stagingArea, it->sv(), 0u));
}
else {
qdb_throw("DEL called on unknown keytype - should never happen");
}
removed++;
stagingArea.del(dlocator.toView());
}
return rocksdb::Status::OK();
}
// Counts how many keys in [start, end) have a descriptor; a key listed
// several times is counted each time.
rocksdb::Status StateMachine::exists(StagingArea &stagingArea, const ReqIterator &start, const ReqIterator &end, int64_t &count) {
  count = 0;
  for(auto cursor = start; cursor != end; cursor++) {
    if(getKeyDescriptor(stagingArea, *cursor).empty()) {
      continue;
    }
    count++;
  }
  return rocksdb::Status::OK();
}
// Collects every key name matching pattern into result (cleared first) by
// calling scan() repeatedly until it reports no further cursor.
rocksdb::Status StateMachine::keys(StagingArea &stagingArea, std::string_view pattern, std::vector &result) {
  result.clear();
  std::string cursor;
  std::string nextCursor;
  do {
    rocksdb::Status st = scan(stagingArea, cursor, pattern, std::numeric_limits::max()-1, nextCursor, result);
    if(!st.ok()) {
      return st;
    }
    cursor = nextCursor;
  } while(!cursor.empty());
  return rocksdb::Status::OK();
}
// Scans key descriptors, appending the names of keys that match pattern to
// results (results is NOT cleared). Scanning starts at cursor, or at the
// pattern's literal prefix when cursor is empty. At most count descriptors
// are examined. If more remain, newcursor is set to the key name to resume
// from; otherwise newcursor is cleared.
// NOTE(review): with count == 0 this returns at once with newcursor set to
// the first candidate and no results, so a caller looping until newcursor is
// empty would make no progress -- confirm callers reject count == 0.
rocksdb::Status StateMachine::scan(StagingArea &stagingArea, std::string_view cursor, std::string_view pattern, size_t count, std::string &newcursor, std::vector &results) {
// Any hits *must* start with patternPrefix. This will allow us in many
// circumstances to eliminate checking large parts of the keyspace, without
// having to call stringmatchlen.
// Best-case pattern is "sometext*", where there are no wasted iterations.
std::string patternPrefix = extractPatternPrefix(pattern);
DescriptorLocator locator;
if(cursor.empty()) {
locator.reset(patternPrefix);
}
else {
locator.reset(cursor);
}
size_t iterations = 0;
bool emptyPattern = (pattern.empty() || pattern == "*");
IteratorPtr iter(stagingArea.getIterator());
for(iter->Seek(locator.toView()); iter->Valid(); iter->Next()) {
iterations++;
std::string rkey = iter->key().ToString();
// Check if we should terminate the search
// (left the descriptor region, whose entries start with kDescriptor).
if(rkey.size() == 0 || rkey[0] != char(InternalKeyType::kDescriptor)) break;
if(!StringUtils::isPrefix(patternPrefix, rkey.c_str()+1, rkey.size()-1)) {
// Take a shortcut and break scanning early,
// since no more matches can possibly exist.
break;
}
// Budget exhausted: this unexamined key becomes the resume cursor.
if(iterations > count) {
newcursor = rkey.substr(1);
return rocksdb::Status::OK();
}
if(emptyPattern || stringmatchlen(pattern.data(), pattern.length(), rkey.data()+1, rkey.length()-1, 0)) {
results.push_back(rkey.substr(1));
}
}
newcursor.clear();
return rocksdb::Status::OK();
}
// Deletes every user-visible entry and empties the in-memory lease
// expiration cache. remove_all_with_prefix() skips internal and
// configuration entries, so those survive.
rocksdb::Status StateMachine::flushall(StagingArea &stagingArea) {
  std::scoped_lock lock(mExpirationCacheMutex);
  int64_t wiped;
  remove_all_with_prefix("", wiped, stagingArea);
  mExpirationCache.clear();
  return rocksdb::Status::OK();
}
// Creates a RocksDB checkpoint (an openable copy of the database at this
// point in time) in directory path. Returns the first failing RocksDB status.
rocksdb::Status StateMachine::checkpoint(const std::string &path) {
  rocksdb::Checkpoint *rawCheckpoint = nullptr;
  RETURN_ON_ERROR(rocksdb::Checkpoint::Create(db.get(), &rawCheckpoint));
  // Take ownership immediately so the Checkpoint object is released on every
  // exit path, including an exception escaping CreateCheckpoint().
  std::unique_ptr<rocksdb::Checkpoint> owner(rawCheckpoint);
  return owner->CreateCheckpoint(path);
}
// Returns the text of RocksDB's "rocksdb.stats" property (empty if unavailable).
std::string StateMachine::statistics() {
  std::string report;
  db->GetProperty("rocksdb.stats", &report);
  return report;
}
//------------------------------------------------------------------------------
// Get level stats
//------------------------------------------------------------------------------
std::string StateMachine::levelStats() {
  // Text of RocksDB's per-level statistics property (empty if unavailable).
  std::string report;
  db->GetProperty(rocksdb::DB::Properties::kLevelStats, &report);
  return report;
}
//------------------------------------------------------------------------------
// Get compression stats
//------------------------------------------------------------------------------
std::vector StateMachine::compressionStats() {
  // One compression-ratio string per level, for levels 0 through 6 inclusive.
  std::vector results;
  for(size_t level = 0; level < 7; level++) {
    std::string ratio;
    db->GetProperty(SSTR(rocksdb::DB::Properties::kCompressionRatioAtLevelPrefix << level), &ratio);
    results.emplace_back(ratio);
  }
  return results;
}
//------------------------------------------------------------------------------
// Get command stats: the aggregate statistics kept by requestCounter.
//------------------------------------------------------------------------------
Statistics StateMachine::commandStats() {
  Statistics overall = requestCounter.getOverallStats();
  return overall;
}
//------------------------------------------------------------------------------
// Commit an empty staging area at `index`, advancing the applied index
// without modifying any user data.
//------------------------------------------------------------------------------
rocksdb::Status StateMachine::noop(LogIndex index) {
  StagingArea emptyArea(*this);
  rocksdb::Status commitStatus = emptyArea.commit(index);
  return commitStatus;
}
//------------------------------------------------------------------------------
// Return health information regarding free space on the filesystem holding
// the state machine (`filename`), as reported by statfs(2).
//
// The resulting status is the worst of two checks:
//   - absolute free space: red if <= 1 GiB, yellow if <= 5 GiB
//   - relative free space: red if <= 3%,    yellow if <= 10%
// A failing statfs call produces a red indicator.
//------------------------------------------------------------------------------
HealthIndicator StateMachine::getFreeSpaceHealth() {
  std::string description = "SM-FREE-SPACE";
  struct statfs out;
  if(statfs(filename.c_str(), &out) != 0) {
    return { HealthIndicator(HealthStatus::kRed, description, SSTR("Could not statfs '" << filename << "'")) };
  }

  HealthStatus status = HealthStatus::kGreen;
  constexpr int64_t bytesInGiB = 1024 * 1024 * 1024;

  // Widen both operands before multiplying: where the statfs block fields are
  // 32 bits wide, multiplying them directly can wrap around.
  int64_t freeBytes = static_cast<int64_t>(out.f_bavail) * static_cast<int64_t>(out.f_bsize);

  // Percentage free relative to (used + available-to-unprivileged) blocks.
  // A zero denominator (a filesystem reporting no usable blocks) would yield
  // NaN, which compares false against every threshold and would silently
  // report green; treat it as 0% free instead.
  const uint64_t usedBlocks = static_cast<uint64_t>(out.f_blocks) - static_cast<uint64_t>(out.f_bfree);
  const uint64_t usableBlocks = usedBlocks + static_cast<uint64_t>(out.f_bavail);
  double percentFree = 0;
  if(usableBlocks != 0) {
    percentFree = 100 - (100.0 * static_cast<double>(usedBlocks) / static_cast<double>(usableBlocks));
  }

  // Red if at most 1 GiB free, yellow if at most 5 GiB, green otherwise
  if(freeBytes <= bytesInGiB) {
    status = chooseWorstHealth(status, HealthStatus::kRed);
  }
  else if(freeBytes <= 5 * bytesInGiB) {
    status = chooseWorstHealth(status, HealthStatus::kYellow);
  }

  // Red if at most 3% of total capacity is free, yellow if at most 10%,
  // green otherwise
  if(percentFree <= 3) {
    status = chooseWorstHealth(status, HealthStatus::kRed);
  }
  else if(percentFree <= 10) {
    status = chooseWorstHealth(status, HealthStatus::kYellow);
  }

  return HealthIndicator(status, description, SSTR(freeBytes << " bytes (" << percentFree << "%)"));
}
//------------------------------------------------------------------------------
// Return health information about the state machine
//------------------------------------------------------------------------------
std::vector StateMachine::getHealthIndicators() {
std::string description = "SM-MANIFEST-TIMEDIFF";
HealthStatus healthStatus = HealthStatus::kGreen;
Status status = manifestChecker->getLastStatus();
if(!status.ok()) {
healthStatus = HealthStatus::kRed;
}
return { getFreeSpaceHealth(), HealthIndicator(healthStatus, description, status.getMsg()) };
}
//------------------------------------------------------------------------------
// Compact the whole key range, forcing bottommost-level compaction as well.
//
// Auto-compactions are switched off for the duration as a workaround
// recommended by the RocksDB developers: issuing a manual compaction makes
// pending-compaction bytes jump to the full DB size, which would otherwise
// stall (or completely stop) writes for a long time, depending on DB size.
// Disabling auto-compactions also disables that write-stalling.
//
// Failing to toggle the option throws; the compaction result itself is logged
// and returned to the caller.
//------------------------------------------------------------------------------
rocksdb::Status StateMachine::manualCompaction() {
  qdb_event("Triggering manual state machine compaction.. auto-compaction will be disabled while the manual one is running.");

  THROW_ON_ERROR(db->SetOptions( { {"disable_auto_compactions", "true"} } ));

  rocksdb::CompactRangeOptions compactOpts;
  compactOpts.bottommost_level_compaction = rocksdb::BottommostLevelCompaction::kForce;

  // nullptr begin/end: the entire key space.
  rocksdb::Status compactionStatus = db->CompactRange(compactOpts, nullptr, nullptr);

  THROW_ON_ERROR(db->SetOptions( { {"disable_auto_compactions", "false"} } ));

  qdb_event("Manual state machine compaction has completed with status " << compactionStatus.ToString());
  return compactionStatus;
}
void StateMachine::finalizeBulkload() {
qdb_event("Finalizing bulkload, issuing manual compaction...");
THROW_ON_ERROR(manualCompaction());
qdb_event("Manual compaction was successful. Building key descriptors...");
KeyDescriptorBuilder builder(*this);
THROW_ON_ERROR(db->Put(rocksdb::WriteOptions(), KeyConstants::kStateMachine_InBulkload, boolToString(false)));
qdb_event("All done, bulkload is over. Restart quarkdb in standalone mode.");
}
//------------------------------------------------------------------------------
// Return an owning iterator over the raw RocksDB key space.
// total_order_seek is enabled so iteration follows the full key order rather
// than being restricted by any configured prefix extractor.
//------------------------------------------------------------------------------
StateMachine::IteratorPtr StateMachine::getRawIterator() {
  rocksdb::ReadOptions options;
  options.total_order_seek = true;

  rocksdb::Iterator *rawIterator = db->NewIterator(options);
  return IteratorPtr(rawIterator);
}
//------------------------------------------------------------------------------
// Write `batch` to the database, skipping the WAL when writeAheadLog is
// disabled. Throws on any write error.
//------------------------------------------------------------------------------
void StateMachine::commitBatch(rocksdb::WriteBatch &batch) {
  rocksdb::WriteOptions options;
  options.disableWAL = !writeAheadLog;

  rocksdb::Status writeStatus = db->Write(options, &batch);
  THROW_ON_ERROR(writeStatus);
}
//------------------------------------------------------------------------------
// Run RocksDB's full checksum verification over the state machine, log the
// outcome together with the elapsed time (on success), and return the
// verification status.
//------------------------------------------------------------------------------
rocksdb::Status StateMachine::verifyChecksum() {
  qdb_info("Initiating a full checksum scan of the state machine.");

  const auto scanStart = std::chrono::steady_clock::now();
  rocksdb::Status status = db->VerifyChecksum();
  const std::chrono::seconds duration =
    std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now() - scanStart);

  if(!status.ok()) {
    qdb_critical("State machine corruption, checksum verification failed: " << status.ToString());
    return status;
  }

  qdb_info("State machine checksum scan successful! (took " << formatTime(duration) << ")");
  return status;
}
//------------------------------------------------------------------------------
// Block until lastApplied reaches `targetLastApplied`, or until `duration`
// elapses. Returns true iff the target was reached.
//
// The predicate overload of wait_for is used so that spurious wakeups, and
// notifications for indices still below the target, do not end the wait
// early; the condition is re-checked after every wakeup.
//------------------------------------------------------------------------------
bool StateMachine::waitUntilTargetLastApplied(LogIndex targetLastApplied, std::chrono::milliseconds duration) {
  std::unique_lock lock(lastAppliedMtx);
  return lastAppliedCV.wait_for(lock, duration, [&] {
    return targetLastApplied <= lastApplied;
  });
}
//------------------------------------------------------------------------------
// Extremely dangerous operation, the state-machine should NOT be part
// of an active raft-machinery when this function is called, or even facing
// client traffic at all.
//
// Persists `newLastApplied` under kStateMachine_LastApplied (throwing on write
// failure) and then updates the in-memory lastApplied, all while holding
// lastAppliedMtx. No consistency check against the previous value is made.
//------------------------------------------------------------------------------
void StateMachine::forceResetLastApplied(LogIndex newLastApplied) {
  std::scoped_lock guard(lastAppliedMtx);
  qdb_info("Resetting lastApplied for state-machine stored in '" << filename << "': " << lastApplied << " => " << newLastApplied);

  rocksdb::WriteOptions writeOpts;
  THROW_ON_ERROR(db->Put(writeOpts, KeyConstants::kStateMachine_LastApplied, intToBinaryString(newLastApplied)));

  lastApplied = newLastApplied;
}
//------------------------------------------------------------------------------
// Atomically write `wb`, and — for a positive `index` — also persist and adopt
// `index` as the new lastApplied.
//
// Rules enforced under lastAppliedMtx (all violations throw):
//   - a non-positive index is rejected once lastApplied > 0,
//   - a positive index must be exactly lastApplied + 1.
// On a successful write waiters on lastAppliedCV are notified.
//------------------------------------------------------------------------------
void StateMachine::commitTransaction(rocksdb::WriteBatchWithIndex &wb, LogIndex index) {
  std::scoped_lock guard(lastAppliedMtx);
  const bool tracksIndex = (index > 0);

  if(!tracksIndex && lastApplied > 0) {
    qdb_throw("provided invalid index for version-tracked database: " << index << ", current last applied: " << lastApplied);
  }

  if(tracksIndex) {
    if(index != lastApplied+1) {
      qdb_throw("attempted to perform illegal lastApplied update: " << lastApplied << " ==> " << index);
    }
    // Record the new index inside the same batch, so data and index land together.
    THROW_ON_ERROR(wb.Put(KeyConstants::kStateMachine_LastApplied, intToBinaryString(index)));
  }

  rocksdb::WriteOptions writeOpts;
  writeOpts.disableWAL = !writeAheadLog;

  rocksdb::Status writeStatus = db->Write(writeOpts, wb.GetWriteBatch());
  if(!writeStatus.ok()) {
    qdb_throw("unable to commit transaction with index " << index << ": " << writeStatus.ToString());
  }

  if(tracksIndex) {
    lastApplied = index;
  }

  // Notify that last applied has changed
  lastAppliedCV.notify_all();
}
// Simple API for writes - chain to pipelined API using a single operation per batch
//
// CHAIN(index, func, ...): expands to a function body that builds a fresh
// StagingArea over *this, calls the StagingArea-taking overload
// this->func(stagingArea, ...), commits the staging area at `index`, and
// returns the result of func. The `, ## __VA_ARGS__` GNU extension drops the
// comma when no extra arguments are passed (e.g. CHAIN(index, flushall)).
// NOTE(review): the value returned by stagingArea.commit(index) is discarded
// here — confirm commit reports failures by throwing rather than via status.
#define CHAIN(index, func, ...) { \
StagingArea stagingArea(*this); \
auto st = this->func(stagingArea, ## __VA_ARGS__); \
stagingArea.commit(index); \
return st; \
}
// CHAIN_READ(func, ...): expands to a function body that builds a
// StagingArea with `true` as its second constructor argument (used for the
// read wrappers below) and returns this->func(stagingArea, ...) directly,
// with no commit.
#define CHAIN_READ(func, ...) { \
StagingArea stagingArea(*this, true); \
return this->func(stagingArea, ## __VA_ARGS__); \
}
//------------------------------------------------------------------------------
// Convenience functions, without having to manually instantiate a staging area.
// Reads: each wrapper constructs a StagingArea with `true` as its second
// argument and forwards to the same-named overload taking that StagingArea
// as its first parameter. Nothing is committed.
//------------------------------------------------------------------------------
rocksdb::Status StateMachine::get(std::string_view key, std::string &value) {
  StagingArea readArea(*this, true);
  return this->get(readArea, key, value);
}
rocksdb::Status StateMachine::exists(const ReqIterator &start, const ReqIterator &end, int64_t &count) {
  StagingArea readArea(*this, true);
  return this->exists(readArea, start, end, count);
}
rocksdb::Status StateMachine::keys(std::string_view pattern, std::vector<std::string> &result) {
  StagingArea readArea(*this, true);
  return this->keys(readArea, pattern, result);
}
rocksdb::Status StateMachine::scan(std::string_view cursor, std::string_view pattern, size_t count, std::string &newcursor, std::vector<std::string> &results) {
  StagingArea readArea(*this, true);
  return this->scan(readArea, cursor, pattern, count, newcursor, results);
}
rocksdb::Status StateMachine::hget(std::string_view key, std::string_view field, std::string &value) {
  StagingArea readArea(*this, true);
  return this->hget(readArea, key, field, value);
}
rocksdb::Status StateMachine::hexists(std::string_view key, std::string_view field) {
  StagingArea readArea(*this, true);
  return this->hexists(readArea, key, field);
}
rocksdb::Status StateMachine::hkeys(std::string_view key, std::vector<std::string> &keys) {
  StagingArea readArea(*this, true);
  return this->hkeys(readArea, key, keys);
}
rocksdb::Status StateMachine::hgetall(std::string_view key, std::vector<std::string> &res) {
  StagingArea readArea(*this, true);
  return this->hgetall(readArea, key, res);
}
rocksdb::Status StateMachine::hlen(std::string_view key, size_t &len) {
  StagingArea readArea(*this, true);
  return this->hlen(readArea, key, len);
}
rocksdb::Status StateMachine::hvals(std::string_view key, std::vector<std::string> &vals) {
  StagingArea readArea(*this, true);
  return this->hvals(readArea, key, vals);
}
rocksdb::Status StateMachine::hscan(std::string_view key, std::string_view cursor, size_t count, std::string &newcursor, std::vector<std::string> &results) {
  StagingArea readArea(*this, true);
  return this->hscan(readArea, key, cursor, count, newcursor, results);
}
rocksdb::Status StateMachine::sismember(std::string_view key, std::string_view element) {
  StagingArea readArea(*this, true);
  return this->sismember(readArea, key, element);
}
rocksdb::Status StateMachine::smembers(std::string_view key, std::vector<std::string> &members) {
  StagingArea readArea(*this, true);
  return this->smembers(readArea, key, members);
}
rocksdb::Status StateMachine::scard(std::string_view key, size_t &count) {
  StagingArea readArea(*this, true);
  return this->scard(readArea, key, count);
}
rocksdb::Status StateMachine::sscan(std::string_view key, std::string_view cursor, size_t count, std::string &newCursor, std::vector<std::string> &res) {
  StagingArea readArea(*this, true);
  return this->sscan(readArea, key, cursor, count, newCursor, res);
}
rocksdb::Status StateMachine::dequeLen(std::string_view key, size_t &len) {
  StagingArea readArea(*this, true);
  return this->dequeLen(readArea, key, len);
}
rocksdb::Status StateMachine::configGet(std::string_view key, std::string &value) {
  StagingArea readArea(*this, true);
  return this->configGet(readArea, key, value);
}
rocksdb::Status StateMachine::configGetall(std::vector<std::string> &res) {
  StagingArea readArea(*this, true);
  return this->configGetall(readArea, res);
}
rocksdb::Status StateMachine::lhlen(std::string_view key, size_t &len) {
  StagingArea readArea(*this, true);
  return this->lhlen(readArea, key, len);
}
rocksdb::Status StateMachine::lhget(std::string_view key, std::string_view field, std::string_view hint, std::string &value) {
  StagingArea readArea(*this, true);
  return this->lhget(readArea, key, field, hint, value);
}
rocksdb::Status StateMachine::vhgetall(std::string_view key, std::vector<std::string> &res, uint64_t &version) {
  StagingArea readArea(*this, true);
  return this->vhgetall(readArea, key, res, version);
}
rocksdb::Status StateMachine::rawScanTombstones(std::string_view seek, size_t count, std::vector<std::string> &keys) {
  StagingArea readArea(*this, true);
  return this->rawScanTombstones(readArea, seek, count, keys);
}
//------------------------------------------------------------------------------
// Writes: each wrapper constructs a fresh StagingArea, forwards to the
// same-named overload taking that StagingArea as its first parameter, commits
// the staging area at `index`, and returns the forwarded call's result.
//------------------------------------------------------------------------------
rocksdb::Status StateMachine::hset(std::string_view key, std::string_view field, std::string_view value, bool &fieldcreated, LogIndex index) {
  StagingArea writeArea(*this);
  auto outcome = this->hset(writeArea, key, field, value, fieldcreated);
  writeArea.commit(index);
  return outcome;
}
rocksdb::Status StateMachine::hmset(std::string_view key, const ReqIterator &start, const ReqIterator &end, LogIndex index) {
  StagingArea writeArea(*this);
  auto outcome = this->hmset(writeArea, key, start, end);
  writeArea.commit(index);
  return outcome;
}
rocksdb::Status StateMachine::hsetnx(std::string_view key, std::string_view field, std::string_view value, bool &fieldcreated, LogIndex index) {
  StagingArea writeArea(*this);
  auto outcome = this->hsetnx(writeArea, key, field, value, fieldcreated);
  writeArea.commit(index);
  return outcome;
}
rocksdb::Status StateMachine::hincrby(std::string_view key, std::string_view field, std::string_view incrby, int64_t &result, LogIndex index) {
  StagingArea writeArea(*this);
  auto outcome = this->hincrby(writeArea, key, field, incrby, result);
  writeArea.commit(index);
  return outcome;
}
rocksdb::Status StateMachine::hincrbyfloat(std::string_view key, std::string_view field, std::string_view incrby, double &result, LogIndex index) {
  StagingArea writeArea(*this);
  auto outcome = this->hincrbyfloat(writeArea, key, field, incrby, result);
  writeArea.commit(index);
  return outcome;
}
rocksdb::Status StateMachine::hdel(std::string_view key, const ReqIterator &start, const ReqIterator &end, int64_t &removed, LogIndex index) {
  StagingArea writeArea(*this);
  auto outcome = this->hdel(writeArea, key, start, end, removed);
  writeArea.commit(index);
  return outcome;
}
rocksdb::Status StateMachine::sadd(std::string_view key, const ReqIterator &start, const ReqIterator &end, int64_t &added, LogIndex index) {
  StagingArea writeArea(*this);
  auto outcome = this->sadd(writeArea, key, start, end, added);
  writeArea.commit(index);
  return outcome;
}
rocksdb::Status StateMachine::srem(std::string_view key, const ReqIterator &start, const ReqIterator &end, int64_t &removed, LogIndex index) {
  StagingArea writeArea(*this);
  auto outcome = this->srem(writeArea, key, start, end, removed);
  writeArea.commit(index);
  return outcome;
}
rocksdb::Status StateMachine::set(std::string_view key, std::string_view value, LogIndex index) {
  StagingArea writeArea(*this);
  auto outcome = this->set(writeArea, key, value);
  writeArea.commit(index);
  return outcome;
}
rocksdb::Status StateMachine::del(const ReqIterator &start, const ReqIterator &end, int64_t &removed, LogIndex index) {
  StagingArea writeArea(*this);
  auto outcome = this->del(writeArea, start, end, removed);
  writeArea.commit(index);
  return outcome;
}
rocksdb::Status StateMachine::flushall(LogIndex index) {
  StagingArea writeArea(*this);
  auto outcome = this->flushall(writeArea);
  writeArea.commit(index);
  return outcome;
}
rocksdb::Status StateMachine::dequePopFront(std::string_view key, std::string &item, LogIndex index) {
  StagingArea writeArea(*this);
  auto outcome = this->dequePopFront(writeArea, key, item);
  writeArea.commit(index);
  return outcome;
}
rocksdb::Status StateMachine::dequePopBack(std::string_view key, std::string &item, LogIndex index) {
  StagingArea writeArea(*this);
  auto outcome = this->dequePopBack(writeArea, key, item);
  writeArea.commit(index);
  return outcome;
}
rocksdb::Status StateMachine::dequePushFront(std::string_view key, const ReqIterator &start, const ReqIterator &end, int64_t &length, LogIndex index) {
  StagingArea writeArea(*this);
  auto outcome = this->dequePushFront(writeArea, key, start, end, length);
  writeArea.commit(index);
  return outcome;
}
rocksdb::Status StateMachine::dequePushBack(std::string_view key, const ReqIterator &start, const ReqIterator &end, int64_t &length, LogIndex index) {
  StagingArea writeArea(*this);
  auto outcome = this->dequePushBack(writeArea, key, start, end, length);
  writeArea.commit(index);
  return outcome;
}
rocksdb::Status StateMachine::configSet(std::string_view key, std::string_view value, LogIndex index) {
  StagingArea writeArea(*this);
  auto outcome = this->configSet(writeArea, key, value);
  writeArea.commit(index);
  return outcome;
}
rocksdb::Status StateMachine::lhset(std::string_view key, std::string_view field, std::string_view hint, std::string_view value, bool &fieldcreated, LogIndex index) {
  StagingArea writeArea(*this);
  auto outcome = this->lhset(writeArea, key, field, hint, value, fieldcreated);
  writeArea.commit(index);
  return outcome;
}
LeaseAcquisitionStatus StateMachine::lease_acquire(std::string_view key, std::string_view value, ClockValue clockUpdate, uint64_t duration, LeaseInfo &info, LogIndex index) {
  StagingArea writeArea(*this);
  auto outcome = this->lease_acquire(writeArea, key, value, clockUpdate, duration, info);
  writeArea.commit(index);
  return outcome;
}
rocksdb::Status StateMachine::lease_get(std::string_view key, ClockValue clockUpdate, LeaseInfo &info, LogIndex index) {
  StagingArea writeArea(*this);
  auto outcome = this->lease_get(writeArea, key, clockUpdate, info);
  writeArea.commit(index);
  return outcome;
}
rocksdb::Status StateMachine::lease_release(std::string_view key, ClockValue clockUpdate, LogIndex index) {
  StagingArea writeArea(*this);
  auto outcome = this->lease_release(writeArea, key, clockUpdate);
  writeArea.commit(index);
  return outcome;
}
rocksdb::Status StateMachine::dequeTrimFront(std::string_view key, std::string_view maxToKeep, int64_t &itemsRemoved, LogIndex index) {
  StagingArea writeArea(*this);
  auto outcome = this->dequeTrimFront(writeArea, key, maxToKeep, itemsRemoved);
  writeArea.commit(index);
  return outcome;
}
rocksdb::Status StateMachine::vhset(std::string_view key, std::string_view field, std::string_view value, uint64_t &version, LogIndex index) {
  StagingArea writeArea(*this);
  auto outcome = this->vhset(writeArea, key, field, value, version);
  writeArea.commit(index);
  return outcome;
}