// ---------------------------------------------------------------------- // File: ZMQ.cc // Author: Andreas-Joachim Peters - CERN // ---------------------------------------------------------------------- /************************************************************************ * EOS - the CERN Disk Storage System * * Copyright (C) 2011 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #include #include "common/Logging.hh" #include #include "mgm/fusex.pb.h" #include "mgm/FuseServer/Server.hh" #include "mgm/ZMQ.hh" EOSMGMNAMESPACE_BEGIN //int ZMQ::Task::sMaxThreads = 16; FuseServer::Server ZMQ::gFuseServer; //------------------------------------------------------------------------------ // Start thread handling fuse server proxying //------------------------------------------------------------------------------ void ZMQ::ServeFuse() { mTask.reset(new Task(mBindUrl)); std::thread t1(&Task::run, mTask.get()); t1.detach(); } //------------------------------------------------------------------------------ // Task destructor //------------------------------------------------------------------------------ ZMQ::Task::~Task() { // Closing the ZMQ context will cause an execption to be thrown in the worker // thread with ETERM as the error number mZmqCtx.close(); for (const auto& th : mWorkerThreads) { delete th; } mWorkerThreads.clear(); } //------------------------------------------------------------------------------ // Start proxy service //------------------------------------------------------------------------------ void ZMQ::Task::run() noexcept { int enable_ipv6 = 1; mFrontend.set(zmq::sockopt::ipv6, enable_ipv6); { // set keepalive options int32_t keep_alive = 1; int32_t keep_alive_idle = 30; int32_t keep_alive_cnt = 2; int32_t keep_alive_intvl = 30; mFrontend.set(zmq::sockopt::tcp_keepalive, keep_alive); mFrontend.set(zmq::sockopt::tcp_keepalive_idle,keep_alive_idle); mFrontend.set(zmq::sockopt::tcp_keepalive_cnt, keep_alive_cnt); mFrontend.set(zmq::sockopt::tcp_keepalive_intvl, keep_alive_intvl); } mFrontend.bind(mBindUrl.c_str()); mBackend.bind("inproc://backend"); mInjector.connect("inproc://backend"); for (int i = 0; i < sMaxThreads; ++i) { mWorkerThreads.push_back(new std::thread(&Worker::work, new Worker(mZmqCtx, ZMQ_DEALER))); mWorkerThreads.back()->detach(); } try { zmq::proxy(mFrontend, mBackend); } catch (const zmq::error_t& e) { if (e.num() == ETERM) { // Shutdown return; } } } //------------------------------------------------------------------------------ // Reply to a client identifier with a piece of data //------------------------------------------------------------------------------ void ZMQ::Task::reply(const std::string& id, const std::string& data) { static XrdSysMutex sMutex; XrdSysMutexHelper lLock(sMutex); zmq::message_t id_msg(id.c_str(), id.size()); zmq::message_t data_msg(data.c_str(), data.size()); zmq::send_flags sfm = zmq::send_flags::sndmore; zmq::send_flags sf = zmq::send_flags::none; try { mInjector.send(id_msg, sfm); mInjector.send(data_msg, sf); } catch (const zmq::error_t& e) { if (e.num() == ETERM) { return; } } } //------------------------------------------------------------------------------ // //------------------------------------------------------------------------------ void ZMQ::Worker::work() { worker_.connect("inproc://backend"); eos::fusex::container hb; try { while (true) { zmq::message_t identity; zmq::message_t msg; zmq::message_t copied_id; zmq::message_t copied_msg; auto rc = worker_.recv(identity); auto more = worker_.get(zmq::sockopt::rcvmore); if (!more) { eos_static_warning("discarding illegal message"); continue; } rc = worker_.recv(msg); std::string id(static_cast(identity.data()), identity.size()); std::string s(static_cast(msg.data()), msg.size()); hb.Clear(); if (hb.ParseFromString(s)) { switch (hb.type()) { case eos::fusex::container::HEARTBEAT: { struct timespec tsnow {}; eos::common::Timing::GetTimeSpec(tsnow); hb.mutable_heartbeat_()->set_delta(tsnow.tv_sec - hb.heartbeat_().clock() + (((int64_t) tsnow.tv_nsec - (int64_t) hb.heartbeat_().clock_ns()) * 1.0 / 1000000000.0)); if (gFuseServer.Client().Dispatch(id, *(hb.mutable_heartbeat_()))) { if (EOS_LOGS_DEBUG) { eos_static_debug("msg=\"received new heartbeat\" identity=%s type=%d", (id.length() < 256) ? id.c_str() : "-illegal-", hb.type()); } } else { if (EOS_LOGS_DEBUG) { eos_static_debug("msg=\"received heartbeat\" identity=%s type=%d", (id.length() < 256) ? id.c_str() : "-illegal-", hb.type()); } } if (hb.statistics_().vsize_mb() != 0.0f) { gFuseServer.Client().HandleStatistics(id, hb.statistics_()); } } break; default: eos_static_err("%s", "msg=\"message type unknown"); } } else { eos_static_debug("msg=\"unable to parse message\": " "id.c_str()=%s, id.length()=%d, id:hex=%s, s.c_str()=%s, s.length()=%d, s:hex=%s", id.c_str(), id.length(), eos::common::stringToHex(id).c_str(), s.c_str(), s.length(), eos::common::stringToHex(s).c_str()); } } } catch (const zmq::error_t& e) { // Shutdown if (e.num() == ETERM) { eos_static_debug("%s", "msg=\"shutdown ZMQ worker ...\""); delete this; } } } EOSMGMNAMESPACE_END