/************************************************************************
* EOS - the CERN Disk Storage System *
* Copyright (C) 2016 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see .*
************************************************************************/
//------------------------------------------------------------------------------
// @author Elvin-Alin Sindrilaru
// @brief User quota accounting
//------------------------------------------------------------------------------
#include "namespace/ns_quarkdb/accounting/QuotaStats.hh"
#include "namespace/ns_quarkdb/flusher/MetadataFlusher.hh"
#include "namespace/ns_quarkdb/QdbContactDetails.hh"
#include "namespace/ns_quarkdb/ConfigurationParser.hh"
#include "namespace/ns_quarkdb/Constants.hh"
#include "qclient/structures/QScanner.hh"
#include "qclient/structures/QHash.hh"
#include "common/StringTokenizer.hh"
EOSNSNAMESPACE_BEGIN
//------------------------------------------------------------------------------
// *** Class QuotaNode implementaion ***
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
// Constructor
//------------------------------------------------------------------------------
QuarkQuotaNode::QuarkQuotaNode(IQuotaStats* quota_stats,
IContainerMD::id_t node_id)
: IQuotaNode(quota_stats, node_id)
{
std::string snode_id = std::to_string(node_id);
pQcl = static_cast(quota_stats)->pQcl;
pFlusher = static_cast(quota_stats)->pFlusher;
pQuotaUidKey = QuarkQuotaStats::KeyQuotaUidMap(snode_id);
pQuotaGidKey = QuarkQuotaStats::KeyQuotaGidMap(snode_id);
}
//------------------------------------------------------------------------------
// Account a new file, adjust the size using the size mapping function
//------------------------------------------------------------------------------
void
QuarkQuotaNode::addFile(const IFileMD* file)
{
const std::string suid = std::to_string(file->getCUid());
const std::string sgid = std::to_string(file->getCGid());
const int64_t size = pQuotaStats->getPhysicalSize(file);
const std::string physicalSize = std::to_string(size);
const std::string logicalSize = std::to_string(file->getSize());
pFlusher->exec("HINCRBYMULTI",
pQuotaUidKey, suid + quota::sPhysicalSize, physicalSize,
pQuotaGidKey, sgid + quota::sPhysicalSize, physicalSize,
pQuotaUidKey, suid + quota::sLogicalSize, logicalSize,
pQuotaGidKey, sgid + quota::sLogicalSize, logicalSize,
pQuotaUidKey, suid + quota::sNumFiles, "1",
pQuotaGidKey, sgid + quota::sNumFiles, "1"
);
// Update the cached information
pCore.addFile(
file->getCUid(),
file->getCGid(),
file->getSize(),
size
);
}
//------------------------------------------------------------------------------
// Remove a file, adjust the size using the size mapping function
//------------------------------------------------------------------------------
void
QuarkQuotaNode::removeFile(const IFileMD* file)
{
const std::string suid = std::to_string(file->getCUid());
const std::string sgid = std::to_string(file->getCGid());
const int64_t size = pQuotaStats->getPhysicalSize(file);
const int64_t logicalSizeInt = file->getSize();
const std::string minusPhysicalSize = std::to_string(-size);
const std::string minusLogicalSize = std::to_string(-logicalSizeInt);
pFlusher->exec("HINCRBYMULTI",
pQuotaUidKey, suid + quota::sPhysicalSize, minusPhysicalSize,
pQuotaGidKey, sgid + quota::sPhysicalSize, minusPhysicalSize,
pQuotaUidKey, suid + quota::sLogicalSize, minusLogicalSize,
pQuotaGidKey, sgid + quota::sLogicalSize, minusLogicalSize,
pQuotaUidKey, suid + quota::sNumFiles, "-1",
pQuotaGidKey, sgid + quota::sNumFiles, "-1"
);
// Update the cached information
pCore.removeFile(
file->getCUid(),
file->getCGid(),
file->getSize(),
size
);
}
//------------------------------------------------------------------------------
// Meld in another quota node
//------------------------------------------------------------------------------
void
QuarkQuotaNode::meld(const IQuotaNode* node)
{
const QuarkQuotaNode* impl_node = static_cast(node);
// Meld in the uid map info
qclient::QHash hmap(*pQcl,
QuarkQuotaStats::KeyQuotaUidMap(std::to_string(impl_node->getId())));
std::pair> reply;
std::string cursor = "0";
constexpr int64_t count = 2000000;
do {
reply = hmap.hscan(cursor, count);
cursor = reply.first;
for (const auto& elem : reply.second) {
pFlusher->hincrby(pQuotaUidKey, elem.first, std::stoll(elem.second));
}
} while (cursor != "0");
// Meld in the gid map info
hmap.setKey(QuarkQuotaStats::KeyQuotaGidMap(std::to_string(
impl_node->getId())));
cursor = "0";
do {
reply = hmap.hscan(cursor, count);
cursor = reply.first;
for (const auto& elem : reply.second) {
pFlusher->hincrby(pQuotaGidKey, elem.first, std::stoll(elem.second));
}
} while (cursor != "0");
// Update the cached information
pCore.meld(node->getCore());
}
//------------------------------------------------------------------------------
// Update with information from the backend
//------------------------------------------------------------------------------
void QuarkQuotaNode::updateFromBackend()
{
std::string cursor = "0";
constexpr int64_t count = 2000000;
std::pair> reply;
qclient::QHash uid_map(*pQcl, pQuotaUidKey);
qclient::QHash gid_map(*pQcl, pQuotaGidKey);
std::set to_delete;
do {
reply = uid_map.hscan(cursor, count);
cursor = reply.first;
for (const auto& elem : reply.second) {
size_t pos = elem.first.find(':');
uint64_t uid = std::stoull(elem.first.substr(0, pos));
std::string type = elem.first.substr(pos + 1);
auto it_uid = pCore.mUserInfo.find(uid);
if (it_uid == pCore.mUserInfo.end()) {
auto pair = pCore.mUserInfo.emplace(uid, QuotaNodeCore::UsageInfo());
it_uid = pair.first;
}
QuotaNodeCore::UsageInfo& uinfo = it_uid->second;
if (type == "logical_size") {
uinfo.space = std::stoull(elem.second);
} else if (type == "physical_size") {
uinfo.physicalSpace = std::stoull(elem.second);
} else if (type == "files") {
uinfo.files = std::stoull(elem.second);
}
// If nothing is used we can drop the entry from the map
if ((uinfo.space == 0ull) && (uinfo.physicalSpace == 0ull) &&
(uinfo.files == 0ull)) {
to_delete.insert(elem.first);
pCore.mUserInfo.erase(it_uid);
}
}
} while (cursor != "0");
for (const auto& key_del : to_delete) {
uid_map.hdel(key_del);
}
to_delete.clear();
cursor = "0";
do {
reply = gid_map.hscan(cursor, count);
cursor = reply.first;
for (const auto& elem : reply.second) {
size_t pos = elem.first.find(':');
uint64_t uid = std::stoull(elem.first.substr(0, pos));
std::string type = elem.first.substr(pos + 1);
auto it_gid = pCore.mGroupInfo.find(uid);
if (it_gid == pCore.mGroupInfo.end()) {
auto pair = pCore.mGroupInfo.emplace(uid, QuotaNodeCore::UsageInfo());
it_gid = pair.first;
}
QuotaNodeCore::UsageInfo& ginfo = it_gid->second;
if (type == "logical_size") {
ginfo.space = std::stoull(elem.second);
} else if (type == "physical_size") {
ginfo.physicalSpace = std::stoull(elem.second);
} else if (type == "files") {
ginfo.files = std::stoull(elem.second);
}
// If nothing is used we can drop the entry from the map
if ((ginfo.space == 0ull) && (ginfo.physicalSpace == 0ull) &&
(ginfo.files == 0ull)) {
to_delete.insert(elem.first);
pCore.mGroupInfo.erase(it_gid);
}
}
} while (cursor != "0");
for (const auto& key_del : to_delete) {
gid_map.hdel(key_del);
}
}
//------------------------------------------------------------------------------
// Replace underlying QuotaNodeCore object.
//------------------------------------------------------------------------------
void
QuarkQuotaNode::replaceCore(const QuotaNodeCore& updated)
{
pCore = updated;
pFlusher->exec("DEL", pQuotaUidKey);
pFlusher->exec("DEL", pQuotaGidKey);
for (auto it = pCore.mUserInfo.begin(); it != pCore.mUserInfo.end(); it++) {
std::string suid = std::to_string(it->first);
pFlusher->exec("HSET", pQuotaUidKey,
suid + quota::sPhysicalSize,
std::to_string(it->second.physicalSpace)
);
pFlusher->exec("HSET", pQuotaUidKey,
suid + quota::sLogicalSize,
std::to_string(it->second.space)
);
pFlusher->exec("HSET", pQuotaUidKey,
suid + quota::sNumFiles,
std::to_string(it->second.files)
);
}
for (auto it = pCore.mGroupInfo.begin(); it != pCore.mGroupInfo.end(); it++) {
std::string sgid = std::to_string(it->first);
pFlusher->exec("HSET", pQuotaGidKey,
sgid + quota::sPhysicalSize,
std::to_string(it->second.physicalSpace)
);
pFlusher->exec("HSET", pQuotaGidKey,
sgid + quota::sLogicalSize,
std::to_string(it->second.space)
);
pFlusher->exec("HSET", pQuotaGidKey,
sgid + quota::sNumFiles,
std::to_string(it->second.files)
);
}
}
//------------------------------------------------------------------------------
// Update underlying QuotaNodeCore object.
//------------------------------------------------------------------------------
void
QuarkQuotaNode::updateCore(const QuotaNodeCore& updated)
{
// replace all existing entries from updated and flush them
pCore << updated;
for (auto it = updated.mUserInfo.begin(); it != updated.mUserInfo.end(); it++) {
std::string suid = std::to_string(it->first);
pFlusher->exec("HSET", pQuotaUidKey,
suid + quota::sPhysicalSize,
std::to_string(it->second.physicalSpace)
);
pFlusher->exec("HSET", pQuotaUidKey,
suid + quota::sLogicalSize,
std::to_string(it->second.space)
);
pFlusher->exec("HSET", pQuotaUidKey,
suid + quota::sNumFiles,
std::to_string(it->second.files)
);
}
for (auto it = updated.mGroupInfo.begin(); it != updated.mGroupInfo.end(); it++) {
std::string sgid = std::to_string(it->first);
pFlusher->exec("HSET", pQuotaGidKey,
sgid + quota::sPhysicalSize,
std::to_string(it->second.physicalSpace)
);
pFlusher->exec("HSET", pQuotaGidKey,
sgid + quota::sLogicalSize,
std::to_string(it->second.space)
);
pFlusher->exec("HSET", pQuotaGidKey,
sgid + quota::sNumFiles,
std::to_string(it->second.files)
);
}
}
//------------------------------------------------------------------------------
// *** Class QuotaStats implementaion ***
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
// Constructor
//------------------------------------------------------------------------------
QuarkQuotaStats::QuarkQuotaStats(qclient::QClient* qcl,
MetadataFlusher* flusher):
pQcl(qcl), pFlusher(flusher) {}
//------------------------------------------------------------------------------
// Destructor
//------------------------------------------------------------------------------
QuarkQuotaStats::~QuarkQuotaStats()
{
pNodeMap.clear();
}
//------------------------------------------------------------------------------
// Configure the quota service
//------------------------------------------------------------------------------
void
QuarkQuotaStats::configure(const std::map& config)
{
// Nothing to do, dependencies are passed through the constructor
}
//------------------------------------------------------------------------------
// Get a quota node associated to the container id
//------------------------------------------------------------------------------
IQuotaNode*
QuarkQuotaStats::getQuotaNode(IContainerMD::id_t node_id)
{
auto it = pNodeMap.find(node_id);
if (it != pNodeMap.end()) {
return it->second.get();
}
std::string snode_id = std::to_string(node_id);
if ((pQcl->exists(KeyQuotaUidMap(snode_id)) == 1) ||
(pQcl->exists(KeyQuotaGidMap(snode_id)) == 1)) {
QuarkQuotaNode* ptr = new QuarkQuotaNode(this, node_id);
ptr->updateFromBackend();
pNodeMap[node_id].reset(ptr);
return ptr;
}
return nullptr;
}
//------------------------------------------------------------------------------
// Register a new quota node
//------------------------------------------------------------------------------
IQuotaNode*
QuarkQuotaStats::registerNewNode(IContainerMD::id_t node_id)
{
std::string snode_id = std::to_string(node_id);
if (pNodeMap.count(node_id) ||
(pQcl->exists(KeyQuotaUidMap(snode_id)) == 1) ||
(pQcl->exists(KeyQuotaGidMap(snode_id)) == 1)) {
MDException e;
e.getMessage() << "Quota node already exist: " << snode_id;
throw e;
}
IQuotaNode* ptr = new QuarkQuotaNode(this, node_id);
pNodeMap[node_id].reset(ptr);
return ptr;
}
//------------------------------------------------------------------------------
// Remove quota node
//------------------------------------------------------------------------------
void
QuarkQuotaStats::removeNode(IContainerMD::id_t node_id)
{
auto it = pNodeMap.find(node_id);
if (it != pNodeMap.end()) {
pNodeMap.erase(it);
}
std::string snode_id = std::to_string(node_id);
pFlusher->del(KeyQuotaUidMap(snode_id));
pFlusher->del(KeyQuotaGidMap(snode_id));
}
//------------------------------------------------------------------------------
// Get the set of all quota node ids. The quota node id corresponds to the
// container id.
//------------------------------------------------------------------------------
std::unordered_set
QuarkQuotaStats::getAllIds()
{
std::unordered_set quota_ids;
qclient::QScanner quota_set(*pQcl, quota::sPrefix + "*:*");
for (; quota_set.valid(); quota_set.next()) {
// Extract quota node id
IContainerMD::id_t id = 0;
if (ParseQuotaId(quota_set.getValue(), id)) {
quota_ids.insert(id);
}
}
return quota_ids;
}
//------------------------------------------------------------------------------
// Get quota node uid map key
//------------------------------------------------------------------------------
std::string
QuarkQuotaStats::KeyQuotaUidMap(const std::string& sid)
{
return quota::sPrefix + sid + ":" + quota::sUidsSuffix;
}
//------------------------------------------------------------------------------
// Get quota node gid map key
//------------------------------------------------------------------------------
std::string
QuarkQuotaStats::KeyQuotaGidMap(const std::string& sid)
{
return quota::sPrefix + sid + ":" + quota::sGidsSuffix;
}
//------------------------------------------------------------------------------
// Parse quota id from string
//------------------------------------------------------------------------------
bool
QuarkQuotaStats::ParseQuotaId(const std::string& input, IContainerMD::id_t& id)
{
std::vector parts =
eos::common::StringTokenizer::split< std::vector >(input, ':');
if (parts.size() != 3) {
return false;
}
if (parts[0] + ":" != quota::sPrefix) {
return false;
}
if ((parts[2] != quota::sUidsSuffix) && (parts[2] != quota::sGidsSuffix)) {
return false;
}
id = std::stoull(parts[1]);
return true;
}
EOSNSNAMESPACE_END