// ----------------------------------------------------------------------
// File: StateMachine.hh
// Author: Georgios Bitzes - CERN
// ----------------------------------------------------------------------
/************************************************************************
* quarkdb - a redis-like highly available key-value store *
* Copyright (C) 2016 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see .*
************************************************************************/
#ifndef QUARKDB_STATE_MACHINE_H
#define QUARKDB_STATE_MACHINE_H
#include "Timekeeper.hh"
#include "Common.hh"
#include "RedisRequest.hh"
#include "utils/Macros.hh"
#include "utils/RequestCounter.hh"
#include "storage/KeyDescriptor.hh"
#include "storage/KeyLocators.hh"
#include "storage/KeyConstants.hh"
#include "storage/LeaseInfo.hh"
#include "storage/ExpirationEventCache.hh"
#include "health/HealthIndicator.hh"
#include "storage/WriteStallWarner.hh"
#include
#include
#include
#include
#include
#include
namespace quarkdb {
class ConsistencyScanner;
class ParanoidManifestChecker;
enum class LeaseAcquisitionStatus {
kKeyTypeMismatch,
kAcquired,
kRenewed,
kFailedDueToOtherOwner
};
class StagingArea;
class StateMachine {
public:
StateMachine(std::string_view filename, bool write_ahead_log = true, bool bulkLoad = false);
virtual ~StateMachine();
DISALLOW_COPY_AND_ASSIGN(StateMachine);
void reset();
using IteratorPtr = std::unique_ptr;
//----------------------------------------------------------------------------
// API for transactional writes - in this case, lastApplied increments once
// per batch, not per individual operation.
//----------------------------------------------------------------------------
rocksdb::Status set(StagingArea &stagingArea, std::string_view key, std::string_view value);
rocksdb::Status del(StagingArea &stagingArea, const ReqIterator &start, const ReqIterator &end, int64_t &removed);
rocksdb::Status flushall(StagingArea &stagingArea);
// hashes
rocksdb::Status hset(StagingArea &stagingArea, std::string_view key, std::string_view field, std::string_view value, bool &fieldcreated);
rocksdb::Status hmset(StagingArea &stagingArea, std::string_view key, const ReqIterator &start, const ReqIterator &end);
rocksdb::Status hsetnx(StagingArea &stagingArea, std::string_view key, std::string_view field, std::string_view value, bool &fieldcreated);
rocksdb::Status hincrby(StagingArea &stagingArea, std::string_view key, std::string_view field, std::string_view incrby, int64_t &result);
rocksdb::Status hincrbyfloat(StagingArea &stagingArea, std::string_view key, std::string_view field, std::string_view incrby, double &result);
rocksdb::Status hdel(StagingArea &stagingArea, std::string_view key, const ReqIterator &start, const ReqIterator &end, int64_t &removed);
rocksdb::Status hclone(StagingArea &stagingArea, std::string_view source, std::string_view target);
// locality hashes
rocksdb::Status lhset(StagingArea &stagingArea, std::string_view key, std::string_view field, std::string_view hint, std::string_view value, bool &fieldcreated);
rocksdb::Status lhdel(StagingArea &stagingArea, std::string_view key, const ReqIterator &start, const ReqIterator &end, int64_t &removed);
rocksdb::Status lhlocdel(StagingArea &stagingArea, std::string_view key, std::string_view field, std::string_view hint, int64_t &removed);
rocksdb::Status lhmset(StagingArea &stagingArea, std::string_view key, const ReqIterator &start, const ReqIterator &end);
// sets
rocksdb::Status sadd(StagingArea &stagingArea, std::string_view key, const ReqIterator &start, const ReqIterator &end, int64_t &added);
rocksdb::Status srem(StagingArea &stagingArea, std::string_view key, const ReqIterator &start, const ReqIterator &end, int64_t &removed);
rocksdb::Status smove(StagingArea &stagingArea, std::string_view source, std::string_view destination, std::string_view element, int64_t &outcome);
// deques
rocksdb::Status dequePushFront(StagingArea &stagingArea, std::string_view key, const ReqIterator &start, const ReqIterator &end, int64_t &length);
rocksdb::Status dequePushBack(StagingArea &stagingArea, std::string_view key, const ReqIterator &start, const ReqIterator &end, int64_t &length);
rocksdb::Status dequePopFront(StagingArea &stagingArea, std::string_view key, std::string &item);
rocksdb::Status dequePopBack(StagingArea &stagingArea, std::string_view key, std::string &item);
rocksdb::Status dequeTrimFront(StagingArea &stagingArea, std::string_view key, std::string_view maxToKeep, int64_t &itemsRemoved);
// leases
void advanceClock(StagingArea &stagingArea, ClockValue newValue);
LeaseAcquisitionStatus lease_acquire(StagingArea &stagingArea, std::string_view key, std::string_view value, ClockValue clockUpdate, uint64_t duration, LeaseInfo &info);
rocksdb::Status lease_release(StagingArea &stagingArea, std::string_view key, ClockValue clockValue);
rocksdb::Status lease_get(StagingArea &stagingArea, std::string_view key, ClockValue clockUpdate, LeaseInfo &info);
// versioned hashes
rocksdb::Status vhset(StagingArea &stagingArea, std::string_view key, std::string_view field, std::string_view value, uint64_t &version);
rocksdb::Status vhdel(StagingArea &stagingArea, std::string_view key, const ReqIterator &start, const ReqIterator &end, uint64_t &version);
//----------------------------------------------------------------------------
// API for transactional reads. Can be part of a mixed read-write transaction.
//----------------------------------------------------------------------------
void getClock(StagingArea &stagingArea, ClockValue &value);
void getType(StagingArea &stagingArea, std::string_view key, std::string& keyType);
// strings
rocksdb::Status get(StagingArea &stagingArea, std::string_view key, std::string &value);
// generic
rocksdb::Status scan(StagingArea &stagingArea, std::string_view cursor, std::string_view pattern, size_t count, std::string &newcursor, std::vector &results);
rocksdb::Status rawScan(StagingArea &stagingArea, std::string_view key, size_t count, std::vector &elements);
rocksdb::Status keys(StagingArea &stagingArea, std::string_view pattern, std::vector &result);
rocksdb::Status exists(StagingArea &stagingArea, const ReqIterator &start, const ReqIterator &end, int64_t &count);
rocksdb::Status rawScanTombstones(StagingArea &stagingArea, std::string_view seek, size_t count, std::vector &elements);
rocksdb::Status rawScanMaybeTombstones(StagingArea &stagingArea, std::string_view seek, size_t count, std::vector &elements, bool tombstones);
// hashes
rocksdb::Status hget(StagingArea &stagingArea, std::string_view key, std::string_view field, std::string &value);
rocksdb::Status hexists(StagingArea &stagingArea, std::string_view key, std::string_view field);
rocksdb::Status hkeys(StagingArea &stagingArea, std::string_view key, std::vector &keys);
rocksdb::Status hgetall(StagingArea &stagingArea, std::string_view key, std::vector &res);
rocksdb::Status hlen(StagingArea &stagingArea, std::string_view key, size_t &len);
rocksdb::Status hvals(StagingArea &stagingArea, std::string_view key, std::vector &vals);
rocksdb::Status hscan(StagingArea &stagingArea, std::string_view key, std::string_view cursor, size_t count, std::string &newcursor, std::vector &results);
// locality hashes
rocksdb::Status lhget(StagingArea &stagingArea, std::string_view key, std::string_view field, std::string_view hint, std::string &value);
rocksdb::Status lhlen(StagingArea &stagingArea, std::string_view key, size_t &len);
rocksdb::Status lhscan(StagingArea &stagingArae, std::string_view key, std::string_view cursor, std::string_view matchloc, size_t count, std::string &newcursor, std::vector &results);
// deque
rocksdb::Status dequeLen(StagingArea &stagingArea, std::string_view key, size_t &len);
rocksdb::Status dequeScanBack(StagingArea &stagingArea, std::string_view key, std::string_view cursor, size_t count, std::string &newCursor, std::vector &results);
// sets
rocksdb::Status scard(StagingArea &stagingArea, std::string_view key, size_t &count);
rocksdb::Status sscan(StagingArea &stagingArea, std::string_view key, std::string_view cursor, size_t count, std::string &newCursor, std::vector &res);
rocksdb::Status smembers(StagingArea &stagingArea, std::string_view key, std::vector &members);
rocksdb::Status sismember(StagingArea &stagingArea, std::string_view key, std::string_view element);
// versioned hashes
rocksdb::Status vhkeys(StagingArea &stagingArea, std::string_view key, std::vector &res, uint64_t &version);
rocksdb::Status vhgetall(StagingArea &stagingArea, std::string_view key, std::vector &res, uint64_t &version);
rocksdb::Status vhlen(StagingArea &stagingArea, std::string_view key, size_t &len);
// misc
struct ExpirationEvent {
ExpirationEvent(std::string_view sv, ClockValue cv) : key(sv), deadline(cv) {}
std::string key;
ClockValue deadline;
};
void lease_get_pending_expiration_events(StagingArea &stagingArea, ClockValue &staticClock, ClockValue &dynamicClock, std::vector &events);
//----------------------------------------------------------------------------
// Simple API
//----------------------------------------------------------------------------
rocksdb::Status hget(std::string_view key, std::string_view field, std::string &value);
rocksdb::Status hexists(std::string_view key, std::string_view field);
rocksdb::Status hkeys(std::string_view key, std::vector &keys);
rocksdb::Status hgetall(std::string_view key, std::vector &res);
rocksdb::Status hset(std::string_view key, std::string_view field, std::string_view value, bool &fieldcreated, LogIndex index = 0);
rocksdb::Status hmset(std::string_view key, const ReqIterator &start, const ReqIterator &end, LogIndex index = 0);
rocksdb::Status hsetnx(std::string_view key, std::string_view field, std::string_view value, bool &fieldcreated, LogIndex index = 0);
rocksdb::Status hincrby(std::string_view key, std::string_view field, std::string_view incrby, int64_t &result, LogIndex index = 0);
rocksdb::Status hincrbyfloat(std::string_view key, std::string_view field, std::string_view incrby, double &result, LogIndex index = 0);
rocksdb::Status hdel(std::string_view key, const ReqIterator &start, const ReqIterator &end, int64_t &removed, LogIndex index = 0);
rocksdb::Status hlen(std::string_view key, size_t &len);
rocksdb::Status hscan(std::string_view key, std::string_view cursor, size_t count, std::string &newcursor, std::vector &results);
rocksdb::Status hvals(std::string_view key, std::vector &vals);
rocksdb::Status sadd(std::string_view key, const ReqIterator &start, const ReqIterator &end, int64_t &added, LogIndex index = 0);
rocksdb::Status sismember(std::string_view key, std::string_view element);
rocksdb::Status srem(std::string_view key, const ReqIterator &start, const ReqIterator &end, int64_t &removed, LogIndex index = 0);
rocksdb::Status smembers(std::string_view key, std::vector &members);
rocksdb::Status scard(std::string_view key, size_t &count);
rocksdb::Status sscan(std::string_view key, std::string_view cursor, size_t count, std::string &newCursor, std::vector &res);
rocksdb::Status set(std::string_view key, std::string_view value, LogIndex index = 0);
rocksdb::Status get(std::string_view key, std::string &value);
rocksdb::Status del(const ReqIterator &start, const ReqIterator &end, int64_t &removed, LogIndex index = 0);
rocksdb::Status exists(const ReqIterator &start, const ReqIterator &end, int64_t &count);
rocksdb::Status keys(std::string_view pattern, std::vector &result);
rocksdb::Status scan(std::string_view cursor, std::string_view pattern, size_t count, std::string &newcursor, std::vector &results);
rocksdb::Status flushall(LogIndex index = 0);
rocksdb::Status dequePushFront(std::string_view key, const ReqIterator &start, const ReqIterator &end, int64_t &length, LogIndex index = 0);
rocksdb::Status dequePushBack(std::string_view key, const ReqIterator &start, const ReqIterator &end, int64_t &length, LogIndex index = 0);
rocksdb::Status dequePopFront(std::string_view key, std::string &item, LogIndex index = 0);
rocksdb::Status dequePopBack(std::string_view key, std::string &item, LogIndex index = 0);
rocksdb::Status dequeTrimFront(std::string_view key, std::string_view maxToKeep, int64_t &itemsRemoved, LogIndex index = 0);
rocksdb::Status dequeLen(std::string_view key, size_t &len);
rocksdb::Status lhset(std::string_view key, std::string_view field, std::string_view hint, std::string_view value, bool &fieldcreated, LogIndex index = 0);
rocksdb::Status lhlen(std::string_view key, size_t &len);
rocksdb::Status lhget(std::string_view key, std::string_view field, std::string_view hint, std::string &value);
void advanceClock(ClockValue newValue, LogIndex index = 0);
void getClock(ClockValue &value);
rocksdb::Status rawGetAllVersions(std::string_view key, std::vector &versions);
LeaseAcquisitionStatus lease_acquire(std::string_view key, std::string_view value, ClockValue clockUpdate, uint64_t duration, LeaseInfo &info, LogIndex index = 0);
rocksdb::Status lease_release(std::string_view key, ClockValue clockUpdate, LogIndex index = 0);
rocksdb::Status lease_get(std::string_view key, ClockValue clockUpdate, LeaseInfo &info, LogIndex index = 0);
rocksdb::Status vhset(std::string_view key, std::string_view field, std::string_view value, uint64_t &version, LogIndex index);
rocksdb::Status vhgetall(std::string_view key, std::vector &res, uint64_t &version);
rocksdb::Status rawScanTombstones(std::string_view seek, size_t count, std::vector &keys);
rocksdb::Status artificiallySlowWriteNeverUseThis(StagingArea &stagingArea, std::string_view sleepTime);
//----------------------------------------------------------------------------
// Internal configuration, not exposed to users through 'KEYS' and friends.
// Has completely its own key namespace and does not interact in any way
// with the other redis commands.
//----------------------------------------------------------------------------
rocksdb::Status configGet(std::string_view key, std::string &value);
rocksdb::Status configGet(StagingArea &stagingArea, std::string_view key, std::string &value);
rocksdb::Status configSet(std::string_view key, std::string_view value, LogIndex index = 0);
rocksdb::Status configSet(StagingArea &stagingArea, std::string_view key, std::string_view value);
rocksdb::Status configGetall(std::vector &res);
rocksdb::Status configGetall(StagingArea &stagingArea, std::vector &res);
rocksdb::Status noop(LogIndex index);
LogIndex getLastApplied();
//----------------------------------------------------------------------------
// Checkpoint for online backups
//----------------------------------------------------------------------------
rocksdb::Status checkpoint(const std::string &path);
//----------------------------------------------------------------------------
// Get statistics
//----------------------------------------------------------------------------
std::string statistics();
//----------------------------------------------------------------------------
// Get level stats
//----------------------------------------------------------------------------
std::string levelStats();
//----------------------------------------------------------------------------
// Get compression stats
//----------------------------------------------------------------------------
std::vector compressionStats();
//----------------------------------------------------------------------------
// Get command stats
//----------------------------------------------------------------------------
Statistics commandStats();
bool inBulkLoad() const {
return bulkLoad;
}
//----------------------------------------------------------------------------
// Return health information about the state machine
//----------------------------------------------------------------------------
std::vector getHealthIndicators();
rocksdb::Status manualCompaction();
void finalizeBulkload();
IteratorPtr getRawIterator();
void commitBatch(rocksdb::WriteBatch &batch);
bool waitUntilTargetLastApplied(LogIndex targetLastApplied, std::chrono::milliseconds duration);
rocksdb::Status verifyChecksum();
RequestCounter& getRequestCounter() { return requestCounter; }
ClockValue getDynamicClock();
void hardSynchronizeDynamicClock();
//----------------------------------------------------------------------------
// Extremely dangerous operation, the state-machine should NOT be part
// of an active raft-machinery when this function is called, or even facing
// client traffic at all.
//----------------------------------------------------------------------------
void forceResetLastApplied(LogIndex newLastApplied);
//----------------------------------------------------------------------------
// Get underlying folder where this SM resides
//----------------------------------------------------------------------------
std::string getPhysicalLocation() const;
private:
ClockValue maybeAdvanceClock(StagingArea &stagingArea, ClockValue newValue);
friend class StagingArea;
class Snapshot {
public:
Snapshot(rocksdb::DB *db);
~Snapshot();
rocksdb::ReadOptions& opts();
private:
rocksdb::DB *db;
const rocksdb::Snapshot *snapshot;
rocksdb::ReadOptions options;
DISALLOW_COPY_AND_ASSIGN(Snapshot);
};
void commitTransaction(rocksdb::WriteBatchWithIndex &wb, LogIndex index);
bool assertKeyType(StagingArea &stagingArea, std::string_view key, KeyType keytype);
rocksdb::Status dequePop(StagingArea &stagingArea, Direction direction, std::string_view key, std::string &item);
rocksdb::Status dequePush(StagingArea &stagingArea, Direction direction, std::string_view key, const ReqIterator &start, const ReqIterator &end, int64_t &length);
class WriteOperation {
public:
WriteOperation(StagingArea &stagingArea, std::string_view key, const KeyType &type);
~WriteOperation();
bool valid();
bool keyExists();
bool getField(std::string_view field, std::string &out);
bool getLocalityIndex(std::string_view field, std::string &out);
void cancel();
bool descriptorModifiedAlreadyInWriteBatch();
int64_t keySize();
void assertWritable();
void write(std::string_view value);
void writeField(std::string_view field, std::string_view value);
void writeLocalityField(std::string_view hint, std::string_view field, std::string_view value);
void writeLocalityIndex(std::string_view field, std::string_view hint);
bool fieldExists(std::string_view field);
bool localityFieldExists(std::string_view hint, std::string_view field);
bool deleteField(std::string_view field);
bool deleteLocalityField(std::string_view hint, std::string_view field);
bool getAndDeleteLocalityIndex(std::string_view field, std::string &hint);
rocksdb::Status finalize(int64_t newsize, bool forceUpdate = false);
KeyDescriptor& descriptor() {
return keyinfo;
}
private:
StagingArea &stagingArea;
std::string_view redisKey;
KeyType expectedType;
KeyDescriptor keyinfo;
DescriptorLocator dlocator;
bool redisKeyExists;
bool isValid = false;
bool finalized = false;
};
friend class WriteOperation;
KeyDescriptor getKeyDescriptor(StagingArea &stagingArea, std::string_view redisKey);
KeyDescriptor lockKeyDescriptor(StagingArea &stagingArea, DescriptorLocator &dlocator);
void retrieveLastApplied();
void ensureCompatibleFormat(bool justCreated);
void ensureBulkloadSanity(bool justCreated);
void ensureClockSanity(bool justCreated);
void remove_all_with_prefix(std::string_view prefix, int64_t &removed, StagingArea &stagingArea);
void lhsetInternal(WriteOperation &operation, std::string_view key, std::string_view field, std::string_view hint, std::string_view value, bool &fieldcreated);
std::atomic lastApplied;
std::condition_variable lastAppliedCV;
std::mutex lastAppliedMtx;
std::mutex writeMtx;
std::unique_ptr db;
std::unique_ptr manifestChecker;
std::unique_ptr consistencyScanner;
const std::string filename;
bool writeAheadLog;
bool bulkLoad;
Timekeeper timeKeeper;
RequestCounter requestCounter;
std::shared_ptr writeStallWarner;
ExpirationEventCache mExpirationCache;
std::recursive_mutex mExpirationCacheMutex;
void loadExpirationCache();
//----------------------------------------------------------------------------
// Return health information regarding free space
//----------------------------------------------------------------------------
HealthIndicator getFreeSpaceHealth();
};
}
#endif