//------------------------------------------------------------------------------ // File: XrdMqClient.cc // Author: Andreas-Joachim Peters - CERN //------------------------------------------------------------------------------ /************************************************************************ * EOS - the CERN Disk Storage System * * Copyright (C) 2011 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #include "mq/XrdMqClient.hh" #include #include #include #include #include #include #include #include //------------------------------------------------------------------------------ // Signal Handler for SIGBUS //------------------------------------------------------------------------------ static sigjmp_buf xrdmqclient_sj_env; static void xrdmqclient_sigbus_hdl(int sig, siginfo_t* siginfo, void* ptr) { siglongjmp(xrdmqclient_sj_env, 1); } //------------------------------------------------------------------------------ // Constructor //------------------------------------------------------------------------------ XrdMqClient::XrdMqClient(const char* clientid, const char* brokerurl, const char* defaultreceiverid) { kInitOK = true; kMessageBuffer = ""; kRecvBuffer = nullptr; kRecvBufferAlloc = 0; kInternalBufferPosition = 0; // Install sigbus signal handler struct sigaction act; memset(&act, 0, sizeof(act)); act.sa_sigaction = xrdmqclient_sigbus_hdl; act.sa_flags = SA_SIGINFO; if (sigaction(SIGBUS, &act, 0)) { fprintf(stderr, "error: [XrdMqClient] cannot install SIGBUS handler\n"); } // Set short timeout resolution, connection window, connection retry and // stream error window. XrdCl::DefaultEnv::GetEnv()->PutInt("TimeoutResolution", 1); XrdCl::DefaultEnv::GetEnv()->PutInt("ConnectionWindow", 5); XrdCl::DefaultEnv::GetEnv()->PutInt("ConnectionRetry", 1); XrdCl::DefaultEnv::GetEnv()->PutInt("StreamErrorWindow", 0); if (brokerurl && !AddBroker(brokerurl)) { fprintf(stderr, "error: [XrdMqClient] cannot add broker %s\n", brokerurl); } if (defaultreceiverid) { kDefaultReceiverQueue = defaultreceiverid; } else { // Default receiver is always a master kDefaultReceiverQueue = "/xmessage/*/master/*"; } if (clientid) { kClientId = clientid; if (kClientId.beginswith("root://")) { // Truncate the URL away int pos = kClientId.find("//", 7); if (pos != STR_NPOS) { kClientId.erase(0, pos + 1); } } } else { // By default create the client id as /xmesssage/// int ppos = 0; char* cfull_name = XrdNetUtils::MyHostName(0); if (!cfull_name) { kInitOK = false; } XrdOucString FullName = cfull_name; XrdOucString HostName = FullName; XrdOucString Domain = FullName; if ((ppos = FullName.find(".")) != STR_NPOS) { HostName.assign(FullName, 0, ppos - 1); Domain.assign(FullName, ppos + 1); } else { Domain = "unknown"; } kClientId = "/xmessage/"; kClientId += HostName; kClientId += "/"; kClientId += Domain; free(cfull_name); } } //------------------------------------------------------------------------------ // Destructor //------------------------------------------------------------------------------ XrdMqClient::~XrdMqClient() { eos::common::RWMutexWriteLock wr_lock(mMutexMap); for (const auto& broker : mMapBrokerToChannels) { auto st = broker.second.first->Close(1); if (!st.IsOK()) { eos_static_info("XrdMqClient error closing url=\"%s\" (%p) err=\"%s\"", broker.first.c_str(), (void*)broker.second.first.get(), st.ToString().c_str()); } } mMapBrokerToChannels.clear(); if (kRecvBuffer) { free(kRecvBuffer); kRecvBuffer = nullptr; } } //------------------------------------------------------------------------------ // AddBroker //------------------------------------------------------------------------------ bool XrdMqClient::AddBroker(const std::string& broker_url, bool advisorystatus, bool advisoryquery, bool advisoryflushbacklog) { if (broker_url.empty()) { eos_static_err("%s", "msg=\"cannot add empty broker url\""); return false; } std::ostringstream oss; oss << broker_url; if (broker_url.find("?") == std::string::npos) { oss << "?"; } else { oss << "&"; } oss << XMQCADVISORYSTATUS << "=" << advisorystatus << "&" << XMQCADVISORYQUERY << "=" << advisoryquery << "&" << XMQCADVISORYFLUSHBACKLOG << "=" << advisoryflushbacklog; std::string new_url = oss.str(); mDefaultBrokerUrl = new_url; // Check validity of the new broker url XrdCl::URL xrd_url(new_url); if (!xrd_url.IsValid()) { eos_static_err("msg=\"invalid url\" url=\"%s\"", new_url.c_str()); return false; } eos_static_info("msg=\"add broker\" url=\"%s\"", new_url.c_str()); eos::common::RWMutexWriteLock wr_lock(mMutexMap); if (mMapBrokerToChannels.find(new_url) != mMapBrokerToChannels.end()) { eos_static_err("msg=\"broker already exists\" url=\"%s\"", new_url.c_str()); return false; } auto ret = mMapBrokerToChannels.emplace (new_url, std::make_pair(std::make_shared(), std::make_shared(xrd_url))); if (!ret.second) { eos_static_err("msg=\"failed to create broker channels\" url=\"%s\"", new_url.c_str()); return false; } eos_static_info("XrdMqClient new file and filesystem objects for url=\"%s\" " "are (%p,%p)", new_url.c_str(), (void*)ret.first->second.first.get(), (void*)ret.first->second.second.get()); return true; } //------------------------------------------------------------------------------ // SendMessage //------------------------------------------------------------------------------ bool XrdMqClient::SendMessage(XrdMqMessage& msg, const char* receiverid, bool sign, bool encrypt, bool asynchronous) { bool rc = false; // Only one send message at a time static std::mutex s_mutex_send; std::unique_lock lock(s_mutex_send); // Tag the sender msg.kMessageHeader.kSenderId = kClientId; // Tag the send time XrdMqMessageHeader::GetTime(msg.kMessageHeader.kSenderTime_sec, msg.kMessageHeader.kSenderTime_nsec); // Tag the receiver queue if (!receiverid) { msg.kMessageHeader.kReceiverQueue = kDefaultReceiverQueue; } else { msg.kMessageHeader.kReceiverQueue = receiverid; } if (encrypt) { msg.Sign(true); } else { if (sign) { msg.Sign(false); } else { msg.Encode(); } } XrdOucString message = msg.kMessageHeader.kReceiverQueue; message += "?"; message += msg.GetMessageBuffer(); if (message.length() > (2 * 1000 * 1000)) { fprintf(stderr, "XrdMqClient::SendMessage: error => trying to send message " "with size %d [limit is 2M]\n", message.length()); XrdMqMessage::Eroute.Emsg("SendMessage", E2BIG, "The message exceeds the maximum size of 2M!"); return false; } int all_ok = true; { eos::common::RWMutexReadLock rd_lock(mMutexMap); for (const auto& broker : mMapBrokerToChannels) { XrdCl::Buffer arg; XrdCl::XRootDStatus status; uint16_t timeout = (getenv("EOS_FST_OP_TIMEOUT") ? atoi(getenv("EOS_FST_OP_TIMEOUT")) : 0); XrdCl::Buffer* response_raw {nullptr}; std::unique_ptr response {nullptr}; auto send_channel = broker.second.second; arg.FromString(message.c_str()); if (asynchronous) { // Don't wait for responses if not required const std::string surl = broker.first; auto discard_handler = XrdCl::ResponseHandler::Wrap ([ = ](XrdCl::XRootDStatus & s, XrdCl::AnyObject & r) mutable { if (!s.IsOK()) { eos_static_err("XrdMqClient error on query async-result url=\"%s\" " "(%p) err=\"%s\"", surl.c_str(), (void*)send_channel.get(), s.ToString().c_str()); } // Make sure we extend the lifetime of the XrdCl::FileSystem // object until this handler is called, otherwise if we delete // the FS object it could happen that an async response for it // will trigger a crash. E.g. AssignLBHandler send_channel.reset(); }); status = send_channel->Query(XrdCl::QueryCode::OpaqueFile, arg, discard_handler, timeout); if (!status.IsOK()) { delete discard_handler; } } else { status = send_channel->Query(XrdCl::QueryCode::OpaqueFile, arg, response_raw, timeout); response.reset(response_raw); response_raw = nullptr; } rc = status.IsOK(); if (!rc) { eos_static_err("XrdMqClient error querying async=%d url=\"%s\" " "(%p) err=\"%s\"", asynchronous ? 1 : 0, broker.first.c_str(), (void*)send_channel.get(), status.ToString().c_str()); } // We continue until any of the brokers accepts the message if (!rc) { all_ok = false; eos_err("msg=\"failed to send message\" dst=\"%s\" msg=\"%s\"", broker.first.c_str(), message.c_str()); XrdMqMessage::Eroute.Emsg("SendMessage", status.errNo, status.GetErrorMessage().c_str()); } } } if (!all_ok) { RefreshBrokersEndpoints(); } return rc; } //------------------------------------------------------------------------------ // Reply to a particular message //------------------------------------------------------------------------------ bool XrdMqClient::ReplyMessage(XrdMqMessage& replymsg, XrdMqMessage& inmsg, bool sign, bool encrypt) { replymsg.SetReply(inmsg); return SendMessage(replymsg, inmsg.kMessageHeader.kSenderId.c_str(), sign, encrypt); } //------------------------------------------------------------------------------ // Receive message //------------------------------------------------------------------------------ XrdMqMessage* XrdMqClient::RecvMessage(ThreadAssistant* assistant) { std::shared_ptr recv_channel; eos::common::RWMutexReadLock rd_lock(mMutexMap); if (mMapBrokerToChannels.size() != 1) { eos_static_err("msg=\"no support for multi-broker setup or no broker " "registered\" map_size=%i", mMapBrokerToChannels.size()); return nullptr; } // Single broker case - check if there is still a buffered message XrdMqMessage* message; message = RecvFromInternalBuffer(); if (message) { return message; } uint16_t timeout = (getenv("EOS_FST_OP_TIMEOUT") ? atoi(getenv("EOS_FST_OP_TIMEOUT")) : 0); XrdCl::StatInfo* stinfo = nullptr; recv_channel = mMapBrokerToChannels.begin()->second.first; XrdCl::XRootDStatus status; while (!(status=recv_channel->Stat(true, stinfo, timeout)).IsOK()) { // Any error on stat requires a refresh of the broker endpoints eos_static_err("XrdMqClient error stating url=\"%s\" (%p) err=\"%s\"", mMapBrokerToChannels.begin()->first.c_str(), (void*)recv_channel.get(), status.ToString().c_str()); rd_lock.Release(); RefreshBrokersEndpoints(); rd_lock.Grab(mMutexMap); if (mMapBrokerToChannels.empty()) { eos_static_err("%s", "msg=\"no broker registered\""); return nullptr; } recv_channel = mMapBrokerToChannels.begin()->second.first; if (assistant) { assistant->wait_for(std::chrono::seconds(2)); if (assistant->terminationRequested()) { return nullptr; } } else { std::this_thread::sleep_for(std::chrono::seconds(2)); } } if (stinfo->GetSize() == 0) { delete stinfo; return nullptr; } // Mantain a receiver buffer which fits the need if (kRecvBufferAlloc < (int) stinfo->GetSize()) { uint64_t allocsize = 1024 * 1024; if (stinfo->GetSize() > allocsize) { allocsize = stinfo->GetSize() + 1; } kRecvBuffer = static_cast(realloc(kRecvBuffer, allocsize)); if (!kRecvBuffer) { // Fatal - we exit! exit(-1); } kRecvBufferAlloc = allocsize; } // Read all messages uint32_t nread = 0; status = recv_channel->Read(0, stinfo->GetSize(), kRecvBuffer, nread); if (!status.IsOK()) { eos_static_err("XrdMqClient error reading url=\"%s\" (%p) err=\"%s\"", mMapBrokerToChannels.begin()->first.c_str(), (void*)recv_channel.get(), status.ToString().c_str()); } if (status.IsOK() && (nread > 0)) { kRecvBuffer[nread] = 0; // Add to the internal message buffer kInternalBufferPosition = 0; kMessageBuffer = kRecvBuffer; } delete stinfo; return RecvFromInternalBuffer(); } //------------------------------------------------------------------------------ // Receive message from internal buffer //------------------------------------------------------------------------------ XrdMqMessage* XrdMqClient::RecvFromInternalBuffer() { if ((kMessageBuffer.length() - kInternalBufferPosition) > 0) { // fprintf( stderr,"Message Buffer %ld\n", kMessageBuffer.length()); // there is still a message in the buffer int nextmessage; int firstmessage; // fprintf( stderr,"#### %ld Entering at position %ld %ld\n", time(NULL), // kInternalBufferPosition, kMessageBuffer.length() ); firstmessage = kMessageBuffer.find(XMQHEADER, kInternalBufferPosition); if (firstmessage == STR_NPOS) { return 0; } else { if ((firstmessage > 0) && ((size_t) firstmessage > kInternalBufferPosition)) { kMessageBuffer.erase(0, firstmessage); kInternalBufferPosition = 0; } } if ((kMessageBuffer.length() + kInternalBufferPosition) < (int) strlen(XMQHEADER)) { return 0; } nextmessage = kMessageBuffer.find(XMQHEADER, kInternalBufferPosition + strlen(XMQHEADER)); char savec = 0; if (nextmessage != STR_NPOS) { savec = kMessageBuffer.c_str()[nextmessage]; ((char*) kMessageBuffer.c_str())[nextmessage] = 0; } XrdMqMessage* message = XrdMqMessage::Create(kMessageBuffer.c_str() + kInternalBufferPosition); if (!message) { fprintf(stderr, "couldn't get any message\n"); return 0; } XrdMqMessageHeader::GetTime(message->kMessageHeader.kReceiverTime_sec, message->kMessageHeader.kReceiverTime_nsec); if (nextmessage != STR_NPOS) { ((char*) kMessageBuffer.c_str())[nextmessage] = savec; } if (nextmessage == STR_NPOS) { // Last message kMessageBuffer = ""; kInternalBufferPosition = 0; } else { // Move forward //kMessageBuffer.erase(0,nextmessage); kInternalBufferPosition = nextmessage; } return message; } else { kMessageBuffer = ""; kInternalBufferPosition = 0; } return 0; } //------------------------------------------------------------------------------ // Refresh the in/out-bound channels to all the brokers //------------------------------------------------------------------------------ void XrdMqClient::RefreshBrokersEndpoints() { // Only one refresh at a time static std::mutex s_mutex_refresh; std::unique_lock lock(s_mutex_refresh); std::map endpoint_replacements; { // Collect broker endpoints that need to be updated eos::common::RWMutexReadLock rd_lock(mMutexMap); for (const auto& broker : mMapBrokerToChannels) { XrdCl::File file; std::string new_hostid; // Create a new dummy url since the current one might be actually working // and check if we get redirected XrdCl::URL tmp_url(broker.first); tmp_url.SetPath(tmp_url.GetPath() + "_mq_test"); if (!tmp_url.IsValid()) { eos_static_err("msg=\"invalid url\" url=\"%s\"", tmp_url.GetURL().c_str()); std::abort(); } XrdCl::XRootDStatus st = file.Open(tmp_url.GetURL(), XrdCl::OpenFlags::Read); if (!st.IsOK()) { eos_static_err("XrdMqClient error opening url=\"%s\" (%p) err=\"%s\"", tmp_url.GetURL().c_str(), (void*)&file, st.ToString().c_str()); } // Skip if we can't contact or we couldn't get the property if (!st.IsOK() || !file.GetProperty("DataServer", new_hostid)) { eos_static_err("msg=\"failed to contact broker\" url=\"%s\"", tmp_url.GetURL().c_str()); st = file.Close(1); if (!st.IsOK()) { eos_static_info("XrdMqClient error closing url=\"%s\" (%p) err=\"%s\"", tmp_url.GetURL().c_str(), (void*)&file, st.ToString().c_str()); } if (mDefaultBrokerUrl != broker.first) { eos_static_info("msg=\"refresh broker endpoint\" old_url=\"%s\" " "default_url=\"%s\"", broker.first.c_str(), mDefaultBrokerUrl.c_str()); endpoint_replacements.emplace(broker.first, mDefaultBrokerUrl); } break; } st = file.Close(1); if (!st.IsOK()) { eos_static_info("XrdMqClient error closing url=\"%s\" (%p) err=\"%s\"", tmp_url.GetURL().c_str(), (void*)&file, st.ToString().c_str()); } // Extract hostname and port from new_hostid int new_port; std::string new_hostname; ParseXrdClHostId(new_hostid, new_hostname, new_port); XrdCl::URL new_url(broker.first); const std::string old_host_id {new_url.GetHostId()}; // Update the new url endpoint new_url.SetHostPort(new_hostname, new_port); if (!new_url.IsValid()) { eos_static_err("msg=\"skip adding invalid new broker url\", " "new_url=\"%s\"", new_url.GetURL().c_str()); continue; } if (new_url.GetHostId() == old_host_id) { // The new endpoint is the same as the old one, therefore we trigger // a reconnection only if stat of the endpoint fails - this can happen // when the MQ is restarted without an MGM restart. auto recv_channel = broker.second.first; XrdCl::StatInfo* stinfo {nullptr}; auto st = recv_channel->Stat(true, stinfo); delete stinfo; if (st.IsOK()) { continue; } eos_static_err("XrdMqClient error stating url=\"%s\" (%p) err=\"%s\"", broker.first.c_str(), (void*)recv_channel.get(), st.ToString().c_str()); } eos_static_info("msg=\"refresh broker endpoint\" old_url=\"%s\" " "new_url=\"%s\"", broker.first.c_str(), new_url.GetURL().c_str()); endpoint_replacements.emplace(broker.first, new_url.GetURL()); } } if (endpoint_replacements.empty()) { return; } eos::common::RWMutexWriteLock wr_lock(mMutexMap); for (const auto& replace : endpoint_replacements) { auto it_old = mMapBrokerToChannels.find(replace.first); if (it_old == mMapBrokerToChannels.end()) { continue; } // Close old receive channel with small timeout to avoid any hangs auto recv_channel = it_old->second.first; auto tmp_stat = recv_channel->Close(1); if (!tmp_stat.IsOK()) { eos_static_info("XrdMqClient error closing url=\"%s\" (%p) err=\"%s\"", it_old->first.c_str(), (void*)recv_channel.get(), tmp_stat.ToString().c_str()); } const XrdCl::File *old_fp = it_old->second.first.get(); const XrdCl::FileSystem *old_fsp = it_old->second.second.get(); mMapBrokerToChannels.erase(it_old); XrdCl::URL xrd_url(replace.second); auto ret = mMapBrokerToChannels.emplace (replace.second, std::make_pair(std::make_shared(), std::make_shared(xrd_url))); if (!ret.second) { eos_static_err("msg=\"failed to create broker channels\" url=\"%s\"", replace.second.c_str()); continue; } else { eos_static_info("msg=\"successfully added new broker\" url=\"%s\"", xrd_url.GetURL().c_str()); eos_static_info("XrdMqClient created replacement objects for url=\"%s\" " "(%p,%p -> %p,%p)", xrd_url.GetURL().c_str(),old_fp,old_fsp, (void*)ret.first->second.first.get(), (void*)ret.first->second.second.get()); } } // Subscribe receiving channel to the new broker Subscribe(false); mNewMqBroker = true; } //------------------------------------------------------------------------------ // Subscribe //------------------------------------------------------------------------------ void XrdMqClient::Subscribe(bool take_lock) { eos::common::RWMutexReadLock rd_lock; if (take_lock) { rd_lock.Grab(mMutexMap); } for (const auto& broker : mMapBrokerToChannels) { std::string surl = broker.first; auto recv_channel = broker.second.first; auto st = recv_channel->Open(surl.c_str(), XrdCl::OpenFlags::Read); if (!st.IsOK()) { eos_static_err("XrdMqClient error opening url=\"%s\" (%p) err=\"%s\"", surl.c_str(), (void*)recv_channel.get(), st.ToString().c_str()); } if (st.IsOK()) { eos_static_info("msg=\"successfully subscribed to broker\" url=\"%s\"", surl.c_str()); // Check if we were redirected to another MQ std::string new_hostid; if (!recv_channel->GetProperty("DataServer", new_hostid)) { eos_static_err("msg=\"failed to get DataServer for file\" url=\"%s\"", surl.c_str()); continue; } int new_port; std::string new_hostname; ParseXrdClHostId(new_hostid, new_hostname, new_port); XrdCl::URL old_url(surl); // We got redirected to a new mq if (old_url.GetHostId() != new_hostid) { eos_static_info("msg=\"got redirection to new MQ\" host_id=%s", new_hostid.c_str()); XrdCl::URL new_url(surl); new_url.SetHostPort(new_hostname, new_port); const XrdCl::File *old_fp = broker.second.first.get(); const XrdCl::FileSystem *old_fsp = broker.second.second.get(); recv_channel = std::make_shared(); st = recv_channel->Open(new_url.GetURL(), XrdCl::OpenFlags::Read); if (!st.IsOK()) { eos_static_err("XrdMqClient error opening url=\"%s\" (%p) err=\"%s\"", new_url.GetURL().c_str(), (void*)recv_channel.get(), st.ToString().c_str()); eos_static_err("msg=\"failed opening file to new MQ\" url=\"%s\"", new_url.GetURL().c_str()); continue; } // Delete old broker and add a new one mMapBrokerToChannels.erase(surl); auto ret = mMapBrokerToChannels.emplace(new_url.GetURL(), std::make_pair(recv_channel, std::make_shared(new_url))); if (ret.second) { eos_static_info("XrdMqClient created replacement objects for old_url=\"%s\" " "new_url=\"%s\" (%p,%p -> %p,%p)", surl.c_str(), new_url.GetURL().c_str(),old_fp,old_fsp, (void*)ret.first->second.first.get(), (void*)ret.first->second.second.get()); } break; } } else { eos_static_err("msg=\"failed to subscribe to broker\" url=\"%s\"", surl.c_str()); if (mDefaultBrokerUrl != surl) { eos_static_info("msg=\"put back default broker url\" url=\%s\"", mDefaultBrokerUrl.c_str()); const XrdCl::File *old_fp = broker.second.first.get(); const XrdCl::FileSystem *old_fsp = broker.second.second.get(); mMapBrokerToChannels.erase(surl); recv_channel = std::make_shared(); XrdCl::URL default_url(mDefaultBrokerUrl); auto ret = mMapBrokerToChannels.emplace(mDefaultBrokerUrl, std::make_pair(recv_channel, std::make_shared(default_url))); if (ret.second) { eos_static_info("XrdMqClient created replacement objects for old_url=\"%s\" " "new_url=\"%s\" (%p,%p -> %p,%p)", surl.c_str(), mDefaultBrokerUrl.c_str(),old_fp,old_fsp, (void*)ret.first->second.first.get(), (void*)ret.first->second.second.get()); } } break; } } } //------------------------------------------------------------------------------ // Extract hostname and port from XrdCl hostid info //------------------------------------------------------------------------------ void XrdMqClient::ParseXrdClHostId(const std::string& hostid, std::string& hostname, int& port) { port = 1097; // by default hostname = hostid; size_t pos = hostname.find('@'); if (pos != std::string::npos) { hostname = hostname.substr(pos + 1); } pos = hostname.find(':'); // Extract hostname and port if (pos != std::string::npos) { try { port = std::stoi(hostname.substr(pos + 1)); } catch (...) { // ignore any conversion errors } hostname = hostname.substr(0, pos); } }