//------------------------------------------------------------------------------ // File: XrdMqClient.hh // Author: Andreas-Joachim Peters - CERN //------------------------------------------------------------------------------ /************************************************************************ * EOS - the CERN Disk Storage System * * Copyright (C) 2011 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #pragma once #define ENOTBLK 15 #include "XrdOuc/XrdOucString.hh" #include "XrdCl/XrdClFile.hh" #include "XrdCl/XrdClFileSystem.hh" #include "common/AssistedThread.hh" #include "common/Logging.hh" #include "mq/XrdMqMessage.hh" //------------------------------------------------------------------------------ //! Class XrdMqClient //------------------------------------------------------------------------------ class XrdMqClient: public eos::common::LogId { public: //---------------------------------------------------------------------------- //! Constructor //---------------------------------------------------------------------------- XrdMqClient(const char* clientid = 0, const char* brokerurl = 0, const char* defaultreceiverid = 0); //---------------------------------------------------------------------------- //! Destructor //---------------------------------------------------------------------------- ~XrdMqClient(); //---------------------------------------------------------------------------- //! Add broker to the list available to the current mq client //! //! @param broker_url root://host:port//path/?optional_opaque info //! @param advisorystatus mark advisory status //! @param advisoryquery mark advisory query //! @param advisoryflusbacklog mark advisory flush backlog //! //! @return true if successful, otherwise false //---------------------------------------------------------------------------- bool AddBroker(const std::string& broker_url, bool advisorystatus = false, bool advisoryquery = false, bool advisoryflushbacklog = false); //---------------------------------------------------------------------------- //! Send message //! //! @param msg //! @param receiverid //! @param sign //! @param encrypt //! @param asynchronous //! //! @return true if message sent, otherwise false //---------------------------------------------------------------------------- bool SendMessage(XrdMqMessage& msg, const char* receiverid = 0, bool sign = false, bool encrypt = false, bool asynchronous = false); //---------------------------------------------------------------------------- //! Try reading a message from the attached broker //! //! @param assistant thread assistant //! //! @return newly read message or nullptr //---------------------------------------------------------------------------- XrdMqMessage* RecvMessage(ThreadAssistant* assistant = nullptr); //---------------------------------------------------------------------------- //! Receive message from internal buffer //! //! @return message object or nullptr //---------------------------------------------------------------------------- XrdMqMessage* RecvFromInternalBuffer(); //---------------------------------------------------------------------------- //! Reply to a particular message //! //! @param replymsg //! @param inmsg //! @param sign //! @param encrypt //! //! @return true if message sent, otherwise false //---------------------------------------------------------------------------- bool ReplyMessage(XrdMqMessage& replymsg, XrdMqMessage& inmsg, bool sign = false, bool encrypt = false); //---------------------------------------------------------------------------- //! Set the default receiver queue //! //! @param defqueue queue name //---------------------------------------------------------------------------- inline void SetDefaultReceiverQueue(const char* defqueue) { kDefaultReceiverQueue = defqueue; } //---------------------------------------------------------------------------- //! Set client id //! //! @param clientid client id to set //---------------------------------------------------------------------------- inline void SetClientId(const char* clientid) { kClientId = clientid; } //---------------------------------------------------------------------------- //! Get client id //---------------------------------------------------------------------------- inline const char* GetClientId() const { return kClientId.c_str(); } //---------------------------------------------------------------------------- //! Check if initialization (construction) was successful //! //! @return true if successful, otherwise false //---------------------------------------------------------------------------- inline bool IsInitOK() const { return kInitOK; } //---------------------------------------------------------------------------- //! Get and reset the new mq broker flag //! //! @return true if the client was redirected to a new broker, otherwise false //---------------------------------------------------------------------------- inline bool GetAndResetNewMqFlag() { return std::atomic_exchange(&mNewMqBroker, false); } //---------------------------------------------------------------------------- //! Convenience operator to send a message //---------------------------------------------------------------------------- bool operator << (XrdMqMessage& msg) { return (*this).SendMessage(msg); } //---------------------------------------------------------------------------- //! Subscribe to all the brokers //! //! @param take_lock //---------------------------------------------------------------------------- void Subscribe(bool take_lock = true); private: //! Map of broker urls to channel objects i.e XrdCl::File object for receiving //! messages and XrdCl::FileSystem for sending messages std::map, std::shared_ptr>> mMapBrokerToChannels; mutable eos::common::RWMutex mMutexMap; XrdOucString kMessageBuffer; XrdOucString kClientId; XrdOucString kDefaultReceiverQueue; char* kRecvBuffer; int kRecvBufferAlloc; size_t kInternalBufferPosition; bool kInitOK; std::atomic mNewMqBroker {true}; std::string mDefaultBrokerUrl; //---------------------------------------------------------------------------- //! Refresh the in/out-bound channels to all the brokers even if we don't //! get any redirect //---------------------------------------------------------------------------- void RefreshBrokersEndpoints(); //---------------------------------------------------------------------------- //! Extract hostname and port from XrdCl hostid info //! //! @param hostid information in the form :@... //! @param hostname info //! @param port info, 1097 by default //---------------------------------------------------------------------------- void ParseXrdClHostId(const std::string& hostid, std::string& hostname, int& port); };