// ---------------------------------------------------------------------- // File: ZMQ.hh // Author: Andreas-Joachim Peters - CERN // ---------------------------------------------------------------------- /************************************************************************ * EOS - the CERN Disk Storage System * * Copyright (C) 2011 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #ifndef __EOSMGM_ZMQ__HH__ #define __EOSMGM_ZMQ__HH__ #include "mgm/Namespace.hh" #include "mgm/FuseServer/Server.hh" #include #include #include #include EOSMGMNAMESPACE_BEGIN //------------------------------------------------------------------------------ //! Class ZMQ //------------------------------------------------------------------------------ class ZMQ { public: class Task; static FuseServer::Server gFuseServer; ///< Fuse server object //---------------------------------------------------------------------------- //! Constructor //---------------------------------------------------------------------------- explicit ZMQ(const char* URL): mBindUrl(URL) {} //---------------------------------------------------------------------------- //! Destructor //---------------------------------------------------------------------------- ~ZMQ() { std::cerr << __FUNCTION__ << ":: end of destructor\n"; } //---------------------------------------------------------------------------- //! Start thread handling fuse server proxying //---------------------------------------------------------------------------- void ServeFuse(); std::unique_ptr mTask; ///< Task associated to the ZMQ object //---------------------------------------------------------------------------- //! Class Worker //---------------------------------------------------------------------------- class Worker { public: Worker(zmq::context_t& ctx, int sock_type) : mZmqCtx(ctx), worker_(mZmqCtx, sock_type) {} void work(); private: zmq::context_t& mZmqCtx; zmq::socket_t worker_; }; //---------------------------------------------------------------------------- //! Class Task //---------------------------------------------------------------------------- class Task { public: const static int sMaxThreads = 16; ///< Max number of worker threads //---------------------------------------------------------------------------- //! Constructor //---------------------------------------------------------------------------- explicit Task(std::string& url) : mZmqCtx(1), mFrontend(mZmqCtx, ZMQ_ROUTER), mBackend(mZmqCtx, ZMQ_DEALER), mInjector(mZmqCtx, ZMQ_DEALER), mBindUrl(url) {} //---------------------------------------------------------------------------- //! Destructor //---------------------------------------------------------------------------- ~Task(); //---------------------------------------------------------------------------- //! Start proxy service //---------------------------------------------------------------------------- void run() noexcept; //---------------------------------------------------------------------------- //! Reply to a client identifier which a pice of data //! //! @param id cilent idnetifier //! @param data data buffer //---------------------------------------------------------------------------- void reply(const std::string& id, const std::string& data); private: zmq::context_t mZmqCtx; ///< ZMQ context for task zmq::socket_t mFrontend; ///< Frontend socket zmq::socket_t mBackend; ///< Backend socket zmq::socket_t mInjector; ///< Injector socket connected to the backedn std::string mBindUrl; ///< URL std::list mWorkerThreads; ///< List of worker threads }; private: std::string mBindUrl; ///< URL }; EOSMGMNAMESPACE_END #endif