/************************************************************************ * EOS - the CERN Disk Storage System * * Copyright (C) 2019 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ //------------------------------------------------------------------------------ //! @author Georgios Bitzes //! @brief Inspect / change locality hashes //------------------------------------------------------------------------------ #include "qclient/structures/QLocalityHash.hh" #include "qclient/QClient.hh" #define SSTR(message) static_cast(std::ostringstream().flush() << message).str() #define DBG(message) std::cerr << __FILE__ << ":" << __LINE__ << " -- " << #message << " = " << message << std::endl namespace qclient { //------------------------------------------------------------------------------ // Constructor //------------------------------------------------------------------------------ QLocalityHash::Iterator::Iterator(QClient *qcl, const std::string &key, size_t count, const std::string &startCursor) : mQcl(qcl), mKey(key), mCount(count), mCursor(startCursor) { fillFromBackend(); } //------------------------------------------------------------------------------ // Check if an unexpected error occurred (network issue, data type mismatch) //------------------------------------------------------------------------------ bool QLocalityHash::Iterator::hasError(std::string &err) const { if(mError.empty()) { return false; } err = mError; return true; } //------------------------------------------------------------------------------ // Report a malformed response //------------------------------------------------------------------------------ void QLocalityHash::Iterator::malformed(redisReplyPtr reply) { mError = SSTR("malformed server response to LHSCAN: " << qclient::describeRedisReply(reply)); return; } //------------------------------------------------------------------------------ // Fill internal buffer with contents from remote server //------------------------------------------------------------------------------ void QLocalityHash::Iterator::fillFromBackend() { while(mError.empty() && mResults.empty() && !mReachedEnd) { mReqs++; redisReplyPtr reply = mQcl->exec("LHSCAN", mKey, mCursor, "COUNT", std::to_string(mCount)).get(); if(!reply) { mError = "unable to contact backend - network error"; return; } if(reply->type != REDIS_REPLY_ARRAY || reply->elements != 2) { return malformed(reply); } redisReply *nextCursor = reply->element[0]; if(!nextCursor || nextCursor->type != REDIS_REPLY_STRING) { return malformed(reply); } mCursor = std::string(nextCursor->str, nextCursor->len); if(mCursor == "0") mReachedEnd = true; redisReply *subArray = reply->element[1]; if(!subArray || subArray->type != REDIS_REPLY_ARRAY || (subArray->elements % 3) != 0) { return malformed(reply); } for(size_t i = 0; i < subArray->elements; i++) { redisReply *item = subArray->element[i]; if(!item || item->type != REDIS_REPLY_STRING) { return malformed(reply); } mResults.emplace_back(item->str, item->len); } } } //------------------------------------------------------------------------------ // Check if the iterator points to a valid entry. // getKey / getLocalityHint / getValue are possible to call only if // valid() == true //------------------------------------------------------------------------------ bool QLocalityHash::Iterator::valid() const { return mError.empty() && !mResults.empty(); } //------------------------------------------------------------------------------ // Fetch current element being pointed to //------------------------------------------------------------------------------ std::string QLocalityHash::Iterator::getKey() const { return mResults[1]; } std::string QLocalityHash::Iterator::getLocalityHint() const { return mResults[0]; } std::string QLocalityHash::Iterator::getValue() const { return mResults[2]; } //------------------------------------------------------------------------------ // Get total number of network requests this object has issued so far //------------------------------------------------------------------------------ size_t QLocalityHash::Iterator::requestsSoFar() const { return mReqs; } //------------------------------------------------------------------------------ // Advance iterator - may result in network requests and block //------------------------------------------------------------------------------ void QLocalityHash::Iterator::next() { if(!mResults.empty()) { mResults.erase(mResults.begin(), mResults.begin()+3); } fillFromBackend(); } }