// ----------------------------------------------------------------------
// File: FileSystem.cc
// Author: Andreas-Joachim Peters - CERN
// ----------------------------------------------------------------------
/************************************************************************
* EOS - the CERN Disk Storage System *
* Copyright (C) 2011 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see .*
************************************************************************/
#include "mgm/FileSystem.hh"
#include "mq/MessagingRealm.hh"
#include "mq/FsChangeListener.hh"
#include "mgm/FsView.hh"
#include "mgm/XrdMgmOfs.hh"
#include "qclient/shared/SharedHashSubscription.hh"
EOSMGMNAMESPACE_BEGIN
const std::string FileSystem::sNumBalanceTxTag = "local.balancer.running";
const std::string FileSystem::sGeotagTag = "stat.geotag";
const std::string FileSystem::sErrcTag = "stat.errc";
//------------------------------------------------------------------------------
// Constructor
//------------------------------------------------------------------------------
FileSystem::FileSystem(const common::FileSystemLocator& locator,
mq::MessagingRealm* msr) :
eos::common::FileSystem(locator, msr)
{
eos_static_info("msg=\"create file system\" queue_path=%s",
locator.getQueuePath().c_str());
if (mRealm->haveQDB()) {
// Register with FsChangeListeners interested in key updates related
// to this file system object
RegisterWithExistingListeners();
// Subscribe to the underlying SharedHash object to get updates
mSubscription = mq::SharedHashWrapper(mRealm, mHashLocator).subscribe();
if (mSubscription) {
using namespace std::placeholders;
mSubscription->attachCallback(std::bind(&FileSystem::ProcessUpdateCb,
this, _1));
}
}
}
//------------------------------------------------------------------------------
// Destructor
//------------------------------------------------------------------------------
FileSystem::~FileSystem()
{
// Make sure we wait for any ongoing callbacks
if (mSubscription) {
mSubscription->detachCallback();
}
UnregisterFromListeners();
}
//------------------------------------------------------------------------------
// Register with interested listeners - called when a new object is created
// and there are already existing FS listeners in the system
//------------------------------------------------------------------------------
void
FileSystem::RegisterWithExistingListeners()
{
auto map_interests = mRealm->GetInterestedListeners(mLocator.getQueuePath());
for (auto& elem : map_interests) {
auto& fs_listener = elem.first;
const auto& set_interests = elem.second;
eos_static_info("msg=\"register with existing fs listener\" listener=%s "
"fs_queue_path=%s", fs_listener->GetName().c_str(),
mLocator.getQueuePath().c_str());
eos::common::RWMutexWriteLock wr_lock(mRWMutex);
for (const auto& interest : set_interests) {
mMapListeners[interest].insert(fs_listener);
}
}
}
//------------------------------------------------------------------------------
// Unregister from all the listeners
//------------------------------------------------------------------------------
void
FileSystem::UnregisterFromListeners()
{
eos::common::RWMutexWriteLock wr_lock(mRWMutex);
for (const auto& elem : mMapListeners) {
for (auto& listener : elem.second) {
eos_static_info("msg=\"unsubscribe and detach from listener\" "
"listener_name=%s fs_queue_path=%s",
listener->GetName().c_str(),
mLocator.getQueuePath().c_str());
listener->unsubscribe(mLocator.getQueuePath(), {elem.first});
}
}
mMapListeners.clear();
}
//------------------------------------------------------------------------------
// Attach file system change listener
//------------------------------------------------------------------------------
bool
FileSystem::AttachFsListener(std::shared_ptr
fs_listener,
const std::set& interests)
{
if ((fs_listener == nullptr) || interests.empty()) {
return false;
}
eos_static_info("msg=\"attaching fs listener\" listener_name=%s "
"fs_queue_path=%s", fs_listener->GetName().c_str(),
mLocator.getQueuePath().c_str());
// Update the listener
fs_listener->subscribe(mLocator.getQueuePath(), interests);
eos::common::RWMutexWriteLock wr_lock(mRWMutex);
for (const auto& interest : interests) {
mMapListeners[interest].insert(fs_listener);
}
return true;
}
//------------------------------------------------------------------------------
// Detach file system change listener
//------------------------------------------------------------------------------
bool
FileSystem::DetachFsListener(std::shared_ptr
fs_listener,
const std::set& interests)
{
if ((fs_listener == nullptr) || interests.empty()) {
return false;
}
eos_static_info("msg=\"detaching fs listener\" listener_name=%s "
"fs_queue_path=%s", fs_listener->GetName().c_str(),
mLocator.getQueuePath().c_str());
// Update the listener
(void) fs_listener->unsubscribe(mLocator.getQueuePath(), interests);
eos::common::RWMutexWriteLock wr_lock(mRWMutex);
for (const auto& interest : interests) {
auto it = mMapListeners.find(interest);
// Erase listener
if (it != mMapListeners.end()) {
it->second.erase(fs_listener);
}
}
return true;
}
//------------------------------------------------------------------------------
// Notify file system change listener interested in the given update
//------------------------------------------------------------------------------
void
FileSystem::NotifyFsListener(qclient::SharedHashUpdate&& upd)
{
eos::common::RWMutexReadLock rd_lock(mRWMutex);
auto it = mMapListeners.find(upd.key);
if (it != mMapListeners.end()) {
eos::mq::FsChangeListener::Event event;
event.fileSystemQueue = GetQueuePath();
event.key = upd.key;
event.deletion = upd.value.empty();
for (auto& listener : it->second) {
listener->NotifyEvent(event);
}
}
}
//------------------------------------------------------------------------------
// Process shared hash update
//------------------------------------------------------------------------------
void
FileSystem::ProcessUpdateCb(qclient::SharedHashUpdate&& upd)
{
NotifyFsListener(std::move(upd));
}
//----------------------------------------------------------------------------
// Set the configuration status of a file system
//----------------------------------------------------------------------------
bool
FileSystem::SetConfigStatus(eos::common::ConfigStatus new_status)
{
using eos::mgm::FsView;
using eos::common::DrainStatus;
eos::common::ConfigStatus old_status = GetConfigStatus();
int drain_tx = IsDrainTransition(old_status, new_status);
// Only master drains
if (ShouldBroadCast()) {
std::string out_msg;
if (drain_tx > 0) {
if (!gOFS->mDrainEngine.StartFsDrain(this, 0, out_msg)) {
eos_static_err("%s", out_msg.c_str());
return false;
}
} else {
if (!gOFS->mDrainEngine.StopFsDrain(this, out_msg)) {
eos_static_debug("%s", out_msg.c_str());
// Drain already stopped make sure we also update the drain status
// if this was a finished drain ie. has status drained or failed
DrainStatus st = GetDrainStatus();
if ((st == DrainStatus::kDrained) ||
(st == DrainStatus::kDrainFailed) ||
(st == DrainStatus::kDrainExpired)) {
SetDrainStatus(eos::common::DrainStatus::kNoDrain);
}
}
}
}
std::string val = eos::common::FileSystem::GetConfigStatusAsString(new_status);
return eos::common::FileSystem::SetString("configstatus", val.c_str());
}
//------------------------------------------------------------------------------
// Set a 'key' describing the filesystem
//------------------------------------------------------------------------------
bool
FileSystem::SetString(const char* key, const char* str, bool broadcast)
{
std::string skey = key;
if (skey == "configstatus") {
return SetConfigStatus(GetConfigStatusFromString(str));
}
return eos::common::FileSystem::SetString(key, str, broadcast);
}
//------------------------------------------------------------------------------
// Check if this is a drain transition i.e. enables or disabled draining
//------------------------------------------------------------------------------
int
FileSystem::IsDrainTransition(const eos::common::ConfigStatus old,
const eos::common::ConfigStatus status)
{
using namespace eos::common;
// Enable draining
if (((old != ConfigStatus::kDrain) &&
(old != ConfigStatus::kDrainDead) &&
((status == ConfigStatus::kDrain) ||
(status == ConfigStatus::kDrainDead))) ||
(((old == ConfigStatus::kDrain) ||
(old == ConfigStatus::kDrainDead)) &&
(status == old))) {
return 1;
}
// Stop draining
if (((old == common::ConfigStatus::kDrain) ||
(old == common::ConfigStatus::kDrainDead)) &&
((status != common::ConfigStatus::kDrain) &&
(status != common::ConfigStatus::kDrainDead))) {
return -1;
}
// Not a drain transition
return 0;
}
//------------------------------------------------------------------------------
// Get the current broadcasting setting
//------------------------------------------------------------------------------
bool
FileSystem::ShouldBroadCast()
{
if (mRealm) {
if (mRealm->getSom()) {
return mRealm->getSom()->ShouldBroadCast();
} else {
// @note (esindril) to review when active-passive is actually enabled
return true;
}
} else {
return false;
}
}
//------------------------------------------------------------------------------
// Increment number of running balancing transfers
//------------------------------------------------------------------------------
void
FileSystem::IncrementBalanceTx()
{
++mNumBalanceTx;
SetLongLongLocal(sNumBalanceTxTag, (int64_t)mNumBalanceTx);
}
//------------------------------------------------------------------------------
// Decrement number of running balancing transfers
//------------------------------------------------------------------------------
void
FileSystem::DecrementBalanceTx()
{
--mNumBalanceTx;
SetLongLongLocal(sNumBalanceTxTag, (int64_t)mNumBalanceTx);
}
EOSMGMNAMESPACE_END