//------------------------------------------------------------------------------ //! @file IProcCommand.hh //! @author Elvin Sindrilaru - CERN //------------------------------------------------------------------------------ /************************************************************************ * EOS - the CERN Disk Storage System * * Copyright (C) 2017 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #pragma once #include "mgm/Namespace.hh" #include "common/Mapping.hh" #include "common/Logging.hh" #include "proto/ConsoleReply.pb.h" #include "proto/ConsoleRequest.pb.h" #include "XrdSfs/XrdSfsInterface.hh" #include #include //! Forward declarations class XrdOucErrInfo; namespace Json { class Value; } EOSMGMNAMESPACE_BEGIN //------------------------------------------------------------------------------ //! Class IProcCommand - interface that needs to be implemented by all types //! of commands executed by the MGM. //------------------------------------------------------------------------------ class IProcCommand: public eos::common::LogId { public: //---------------------------------------------------------------------------- //! Constructor //---------------------------------------------------------------------------- IProcCommand(): mHasSlot(false), mExecRequest(false), mReqProto(), mDoAsync(false), mForceKill(false), mVid(), mComment(), mRoutingInfo(), stdOut(""), stdErr(""), stdJson(""), retc(0), mTmpResp() { mTimestamp = time(NULL); } //---------------------------------------------------------------------------- //! Constructor //! //! @param req client protobuf request //! @param vid client virtual identity //! @param async if true then use thread pool to execute the command //---------------------------------------------------------------------------- IProcCommand(eos::console::RequestProto&& req, eos::common::VirtualIdentity& vid, bool async): IProcCommand() { mReqProto = req; mDoAsync = async; mVid = vid; mComment = req.comment().c_str(); } //---------------------------------------------------------------------------- //! Destructor //---------------------------------------------------------------------------- virtual ~IProcCommand() { mForceKill.store(true); if (ofstdoutStream.is_open()) { ofstdoutStream.close(); } unlink(ofstdoutStreamFilename.c_str()); if (ofstderrStream.is_open()) { ofstderrStream.close(); } unlink(ofstderrStreamFilename.c_str()); if (mHasSlot) { std::unique_lock lock(mMapCmdsMutex); --mCmdsExecuting[mReqProto.command_case()]; } } //---------------------------------------------------------------------------- //! Open a proc command e.g. call the appropriate user or admin command and //! store the output in a resultstream of in case of find in temporary output //! files. //! //! @param inpath path indicating user or admin command //! @param info CGI describing the proc command //! @param vid_in virtual identity of the user requesting a command //! @param error object to store errors //! //! @return SFS_OK in any case //---------------------------------------------------------------------------- virtual int open(const char* path, const char* info, eos::common::VirtualIdentity& vid, XrdOucErrInfo* error); //---------------------------------------------------------------------------- //! Read a part of the result stream created during open //! //! @param boff offset where to start //! @param buff buffer to store stream //! @param blen len to return //! //! @return number of bytes read //---------------------------------------------------------------------------- virtual size_t read(XrdSfsFileOffset offset, char* buff, XrdSfsXferSize blen); //---------------------------------------------------------------------------- //! Get the size of the result stream //! //! @param buf stat structure to fill //! //! @return SFS_OK in any case //---------------------------------------------------------------------------- virtual int stat(struct stat* buf) { off_t size = 0; if (readStdOutStream) { ifstdoutStream.seekg(0, ifstdoutStream.end); size += ifstdoutStream.tellg(); ifstdoutStream.seekg(0, ifstdoutStream.beg); ifstderrStream.seekg(0, ifstderrStream.end); size += ifstderrStream.tellg(); ifstderrStream.seekg(0, ifstderrStream.beg); iretcStream.seekg(0, iretcStream.end); size += iretcStream.tellg(); iretcStream.seekg(0, iretcStream.beg); } else { size = mTmpResp.length(); } memset(buf, 0, sizeof(struct stat)); buf->st_size = size; return SFS_OK; } //---------------------------------------------------------------------------- //! Close the proc stream //! //! @return 0 if comment has been successfully stored otherwise != 0 //---------------------------------------------------------------------------- virtual int close() { if (ifstdoutStream.is_open()) { ifstdoutStream.close(); } if (ifstderrStream.is_open()) { ifstderrStream.close(); } return SFS_OK; } virtual std::string GetCmd(const char* cgi = 0) { return "proto"; } //---------------------------------------------------------------------------- //! Method implementing the specific behavior of the command executed //---------------------------------------------------------------------------- virtual eos::console::ReplyProto ProcessRequest() noexcept = 0; //---------------------------------------------------------------------------- //! Launch command asynchronously, creating the corresponding promise and //! future //---------------------------------------------------------------------------- virtual void LaunchJob() final; //---------------------------------------------------------------------------- //! Check if we can safely delete the current object as there is no async //! thread executing the ProcessResponse method //! //! @return true if deletion if safe, otherwise false //---------------------------------------------------------------------------- virtual bool KillJob() final; //---------------------------------------------------------------------------- //! Return the result //---------------------------------------------------------------------------- virtual const char* GetResult(size_t& size) const { return "bla"; } virtual void SetError(XrdOucErrInfo* error) {}; protected: virtual bool OpenTemporaryOutputFiles(); virtual bool CloseTemporaryOutputFiles(); //---------------------------------------------------------------------------- //! Retrieve the file's full path given its numeric id. //! This method executes under the NamespaceView lock. //! //! @param path full path of the file //! @param fid file numeric id //! @param err_msg_prefix error message to be displayed in case of exception //! @return retc return code //---------------------------------------------------------------------------- void GetPathFromFid(XrdOucString& path, unsigned long long fid, const std::string& err_msg_prefix); // drop when we drop non-proto commands using it int GetPathFromFid(std::string& path, unsigned long long fid, std::string& err_msg); //---------------------------------------------------------------------------- //! Retrieve the container's full path given its numeric id. //! This method executes under the NamespaceView lock. //! //! @param path full path of the container //! @param cid container numeric id //! @param err_msg_prefix error message to be displayed in case of exception //---------------------------------------------------------------------------- void GetPathFromCid(XrdOucString& path, unsigned long long cid, const std::string& err_msg_prefix); // drop when we drop non-proto commands using it int GetPathFromCid(std::string& path, unsigned long long cid, std::string& err_msg); //---------------------------------------------------------------------------- //! Format console output string as json. //! //! @note //! This will work only if the given output follows = format. //! Also, provided values must follow a proper JSON hierarchy ! //! //! Although the function tries to correct some values, //! the correction is not exhaustive. //! //! Valid example: //! stat.drain.status= / stat.drain.otherkey= //! //! Invalid example: //! stat.drain= / stat.drain.status= (will throw an exception) //! //! @param stdOut console output string //! //! @return jsonOut json formatted output //---------------------------------------------------------------------------- static Json::Value ConvertOutputToJsonFormat(const std::string& stdOut); //---------------------------------------------------------------------------- //! Create a JSON string from the command output, error and return code. //! //! @param out console output string //! @param err console error string //! @param retc console return code //! //! @return jsonOut json response string containing output, error //! and return code //---------------------------------------------------------------------------- std::string ResponseToJsonString(const std::string& out, const std::string& err = "", int rc = 0); //---------------------------------------------------------------------------- //! Indicate whether output should be in JSON format //---------------------------------------------------------------------------- inline bool WantsJsonOutput() { return mReqProto.format() == eos::console::RequestProto::JSON; } //---------------------------------------------------------------------------- //! Check if operation forbidden //! //! @param path path of the request //! @param vid client virtual identity //! @param err_check output error message //! @param errno_check output errno in case of errors //! //! @return true if operation forbidden, false otherwise //---------------------------------------------------------------------------- bool IsOperationForbidden(const std::string& path, const eos::common::VirtualIdentity& vid, std::string& err_check, int& errno_check) const; //---------------------------------------------------------------------------- //! Check if a routing redirect should happen. //! //! @note //! In case routing is needed, fills the routing info object //! and sets the reply return code to SFS_REDIRECT. //! //! @param path path to route //! @param reply the reply proto object //! //! @return true if should route, false otherwise //---------------------------------------------------------------------------- bool ShouldRoute(const std::string& path, eos::console::ReplyProto& reply); //---------------------------------------------------------------------------- //! Check if there is still an available slot for the current type of command //! in the queue served by the thread pool //! //! @return true if command can be queued, otherwise false //---------------------------------------------------------------------------- bool HasSlot(); //---------------------------------------------------------------------------- //! Store routing information //---------------------------------------------------------------------------- struct RoutingInfo { std::string path; std::string host; int port; int stall_timeout; }; static std::atomic_uint_least64_t uuid; static std::mutex mMapCmdsMutex; ///< Mutex protecting the cmds map //! Map of command types to number of commands actually queued static std::map mCmdsExecuting; //! Indicate if current command has taken a slot in the queue std::atomic mHasSlot; bool mExecRequest; ///< Indicate if request is launched asynchronously eos::console::RequestProto mReqProto; ///< Client request protobuf object std::future mFuture; ///< Response future bool mDoAsync; ///< If true use thread pool to do the work std::atomic mForceKill; ///< Flag to notify worker thread eos::common::VirtualIdentity mVid; ///< Copy of original vid time_t mTimestamp; ///< Timestamp of the proc command XrdOucString mComment; ///< Comment issued by the user for the proc command RoutingInfo mRoutingInfo; ///< Routing information of the proc command XrdOucString stdOut; ///< stdOut returned by proc command XrdOucString stdErr; ///< stdErr returned by proc command XrdOucString stdJson; ///< JSON output returned by proc command int retc; ///< Return code from the proc command std::string mTmpResp; ///< String used for streaming the response std::ofstream ofstdoutStream; std::ofstream ofstderrStream; std::string ofstdoutStreamFilename; std::string ofstderrStreamFilename; std::ifstream ifstdoutStream; std::ifstream ifstderrStream; std::istringstream iretcStream; bool readStdOutStream {false}; bool readStdErrStream {false}; bool readRetcStream {false}; }; EOSMGMNAMESPACE_END