00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #ifndef __XRD_CL_XROOTD_MSG_HANDLER_HH__
00026 #define __XRD_CL_XROOTD_MSG_HANDLER_HH__
00027
00028 #include "XrdCl/XrdClPostMasterInterfaces.hh"
00029 #include "XrdCl/XrdClXRootDResponses.hh"
00030 #include "XrdCl/XrdClDefaultEnv.hh"
00031 #include "XrdCl/XrdClMessage.hh"
00032 #include "XProtocol/XProtocol.hh"
00033 #include "XrdCl/XrdClLog.hh"
00034 #include "XrdCl/XrdClConstants.hh"
00035
00036 #include <sys/uio.h>
00037
00038 #include <list>
00039 #include <memory>
00040
00041 namespace XrdCl
00042 {
00043 class PostMaster;
00044 class SIDManager;
00045 class URL;
00046 class LocalFileHandler;
00047
00048
00049
00050
00051 struct RedirectEntry
00052 {
00053 RedirectEntry( const URL &from, const URL &to ) : from( from ), to( to )
00054 {
00055
00056 }
00057
00058 URL from;
00059 URL to;
00060 XRootDStatus status;
00061
00062 std::string ToString( bool prevok = true )
00063 {
00064 const std::string tostr = to.GetLocation();
00065 const std::string fromstr = from.GetLocation();
00066
00067 if( prevok )
00068 {
00069 if( tostr == fromstr )
00070 return "Retrying: " + tostr;
00071 return "Redirected from: " + fromstr + " to: " + tostr;
00072 }
00073 return "Failed at: " + fromstr + ", retrying at: " + tostr;
00074 }
00075 };
00076
00077
00079
00080 class XRootDMsgHandler: public IncomingMsgHandler,
00081 public OutgoingMsgHandler
00082 {
00083 friend class HandleRspJob;
00084
00085 public:
00086
00095
00096 XRootDMsgHandler( Message *msg,
00097 ResponseHandler *respHandler,
00098 const URL *url,
00099 SIDManager *sidMgr,
00100 LocalFileHandler *lFileHandler):
00101 pRequest( msg ),
00102 pResponse( 0 ),
00103 pResponseHandler( respHandler ),
00104 pUrl( *url ),
00105 pSidMgr( sidMgr ),
00106 pLFileHandler( lFileHandler ),
00107 pExpiration( 0 ),
00108 pRedirectAsAnswer( false ),
00109 pHosts( 0 ),
00110 pHasLoadBalancer( false ),
00111 pHasSessionId( false ),
00112 pChunkList( 0 ),
00113 pRedirectCounter( 0 ),
00114
00115 pAsyncOffset( 0 ),
00116 pAsyncReadSize( 0 ),
00117 pAsyncReadBuffer( 0 ),
00118 pAsyncMsgSize( 0 ),
00119
00120 pReadRawStarted( false ),
00121 pReadRawCurrentOffset( 0 ),
00122
00123 pReadVRawMsgOffset( 0 ),
00124 pReadVRawChunkHeaderDone( false ),
00125 pReadVRawChunkHeaderStarted( false ),
00126 pReadVRawSizeError( false ),
00127 pReadVRawChunkIndex( 0 ),
00128 pReadVRawMsgDiscard( false ),
00129
00130 pOtherRawStarted( false ),
00131
00132 pFollowMetalink( false ),
00133
00134 pStateful( false ),
00135
00136 pAggregatedWaitTime( 0 ),
00137
00138 pMsgInFly( false )
00139
00140 {
00141 pPostMaster = DefaultEnv::GetPostMaster();
00142 if( msg->GetSessionId() )
00143 pHasSessionId = true;
00144 memset( &pReadVRawChunkHeader, 0, sizeof( readahead_list ) );
00145
00146 Log *log = DefaultEnv::GetLog();
00147 log->Debug( ExDbgMsg, "[%s] MsgHandler created: 0x%x (message: %s ).",
00148 pUrl.GetHostId().c_str(), this,
00149 pRequest->GetDescription().c_str() );
00150 }
00151
00152
00154
00155 ~XRootDMsgHandler()
00156 {
00157 DumpRedirectTraceBack();
00158
00159 if( !pHasSessionId )
00160 delete pRequest;
00161 delete pResponse;
00162 std::vector<Message *>::iterator it;
00163 for( it = pPartialResps.begin(); it != pPartialResps.end(); ++it )
00164 delete *it;
00165
00166 pRequest = reinterpret_cast<Message*>( 0xDEADBEEF );
00167 pResponse = reinterpret_cast<Message*>( 0xDEADBEEF );
00168 pResponseHandler = reinterpret_cast<ResponseHandler*>( 0xDEADBEEF );
00169 pPostMaster = reinterpret_cast<PostMaster*>( 0xDEADBEEF );
00170 pSidMgr = reinterpret_cast<SIDManager*>( 0xDEADBEEF );
00171 pLFileHandler = reinterpret_cast<LocalFileHandler*>( 0xDEADBEEF );
00172 pHosts = reinterpret_cast<HostList*>( 0xDEADBEEF );
00173 pChunkList = reinterpret_cast<ChunkList*>( 0xDEADBEEF );
00174
00175 Log *log = DefaultEnv::GetLog();
00176 log->Debug( ExDbgMsg, "[%s] Destroying MsgHandler: 0x%x.",
00177 pUrl.GetHostId().c_str(), this );
00178 }
00179
00180
00186
00187 virtual uint16_t Examine( Message *msg );
00188
00189
00193
00194 virtual uint16_t GetSid() const;
00195
00196
00200
00201 virtual void Process( Message *msg );
00202
00203
00213
00214 virtual Status ReadMessageBody( Message *msg,
00215 int socket,
00216 uint32_t &bytesRead );
00217
00218
00224
00225 virtual uint8_t OnStreamEvent( StreamEvent event,
00226 uint16_t streamNum,
00227 Status status );
00228
00229
00231
00232 virtual void OnStatusReady( const Message *message,
00233 Status status );
00234
00235
00237
00238 virtual bool IsRaw() const;
00239
00240
00249
00250 Status WriteMessageBody( int socket,
00251 uint32_t &bytesRead );
00252
00253
00258
00259 ChunkList* GetMessageBody( uint32_t *&asyncOffset )
00260 {
00261 asyncOffset = &pAsyncOffset;
00262 return pChunkList;
00263 }
00264
00265
00269
00270 void WaitDone( time_t now );
00271
00272
00274
00275 void SetExpiration( time_t expiration )
00276 {
00277 pExpiration = expiration;
00278 }
00279
00280
00283
00284 void SetRedirectAsAnswer( bool redirectAsAnswer )
00285 {
00286 pRedirectAsAnswer = redirectAsAnswer;
00287 }
00288
00289
00291
00292 const Message *GetRequest() const
00293 {
00294 return pRequest;
00295 }
00296
00297
00299
00300 void SetLoadBalancer( const HostInfo &loadBalancer )
00301 {
00302 if( !loadBalancer.url.IsValid() )
00303 return;
00304 pLoadBalancer = loadBalancer;
00305 pHasLoadBalancer = true;
00306 }
00307
00308
00310
00311 void SetHostList( HostList *hostList )
00312 {
00313 delete pHosts;
00314 pHosts = hostList;
00315 }
00316
00317
00319
00320 void SetChunkList( ChunkList *chunkList )
00321 {
00322 pChunkList = chunkList;
00323 if( chunkList )
00324 pChunkStatus.resize( chunkList->size() );
00325 else
00326 pChunkStatus.clear();
00327 }
00328
00329
00331
00332 void SetRedirectCounter( uint16_t redirectCounter )
00333 {
00334 pRedirectCounter = redirectCounter;
00335 }
00336
00337 void SetFollowMetalink( bool followMetalink )
00338 {
00339 pFollowMetalink = followMetalink;
00340 }
00341
00342 void SetStateful( bool stateful )
00343 {
00344 pStateful = stateful;
00345 }
00346
00347 private:
00348
00350
00351 Status ReadRawRead( Message *msg,
00352 int socket,
00353 uint32_t &bytesRead );
00354
00355
00357
00358 Status ReadRawReadV( Message *msg,
00359 int socket,
00360 uint32_t &bytesRead );
00361
00362
00364
00365 Status ReadRawOther( Message *msg,
00366 int socket,
00367 uint32_t &bytesRead );
00368
00369
00372
00373 Status ReadAsync( int socket, uint32_t &btesRead );
00374
00375
00377
00378 void HandleError( Status status, Message *msg = 0 );
00379
00380
00382
00383 Status RetryAtServer( const URL &url );
00384
00385
00387
00388 void HandleResponse();
00389
00390
00392
00393 XRootDStatus *ProcessStatus();
00394
00395
00398
00399 Status ParseResponse( AnyObject *&response );
00400
00401
00404
00405 Status RewriteRequestRedirect( const URL &newUrl );
00406
00407
00409
00410 Status RewriteRequestWait();
00411
00412
00414
00415 Status PostProcessReadV( VectorReadInfo *vReadInfo );
00416
00417
00419
00420 Status UnPackReadVResponse( Message *msg );
00421
00422
00424
00425 void UpdateTriedCGI(uint32_t errNo=0);
00426
00427
00429
00430 void SwitchOnRefreshFlag();
00431
00432
00435
00436 void HandleRspOrQueue();
00437
00438
00440
00441 void HandleLocalRedirect( URL *url );
00442
00443
00448
00449 bool IsRetryable( Message *request );
00450
00451
00458
00459 bool OmitWait( Message *request, const URL &url );
00460
00461
00463
00464 void DumpRedirectTraceBack();
00465
00466
00467
00468
00469 struct ChunkStatus
00470 {
00471 ChunkStatus(): sizeError( false ), done( false ) {}
00472 bool sizeError;
00473 bool done;
00474 };
00475
00476 typedef std::list<std::unique_ptr<RedirectEntry>> RedirectTraceBack;
00477
00478 Message *pRequest;
00479 Message *pResponse;
00480 std::vector<Message *> pPartialResps;
00481 ResponseHandler *pResponseHandler;
00482 URL pUrl;
00483 PostMaster *pPostMaster;
00484 SIDManager *pSidMgr;
00485 LocalFileHandler *pLFileHandler;
00486 Status pStatus;
00487 Status pLastError;
00488 time_t pExpiration;
00489 bool pRedirectAsAnswer;
00490 HostList *pHosts;
00491 bool pHasLoadBalancer;
00492 HostInfo pLoadBalancer;
00493 bool pHasSessionId;
00494 std::string pRedirectUrl;
00495 ChunkList *pChunkList;
00496 std::vector<ChunkStatus> pChunkStatus;
00497 uint16_t pRedirectCounter;
00498
00499 uint32_t pAsyncOffset;
00500 uint32_t pAsyncReadSize;
00501 char* pAsyncReadBuffer;
00502 uint32_t pAsyncMsgSize;
00503
00504 bool pReadRawStarted;
00505 uint32_t pReadRawCurrentOffset;
00506
00507 uint32_t pReadVRawMsgOffset;
00508 bool pReadVRawChunkHeaderDone;
00509 bool pReadVRawChunkHeaderStarted;
00510 bool pReadVRawSizeError;
00511 int32_t pReadVRawChunkIndex;
00512 readahead_list pReadVRawChunkHeader;
00513 bool pReadVRawMsgDiscard;
00514
00515 bool pOtherRawStarted;
00516
00517 bool pFollowMetalink;
00518
00519 bool pStateful;
00520 int pAggregatedWaitTime;
00521
00522 std::unique_ptr<RedirectEntry> pRdirEntry;
00523 RedirectTraceBack pRedirectTraceBack;
00524
00525 bool pMsgInFly;
00526 };
00527 }
00528
00529 #endif // __XRD_CL_XROOTD_MSG_HANDLER_HH__