00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #ifndef __XRD_CL_XROOTD_MSG_HANDLER_HH__
00026 #define __XRD_CL_XROOTD_MSG_HANDLER_HH__
00027
00028 #include "XrdCl/XrdClPostMasterInterfaces.hh"
00029 #include "XrdCl/XrdClXRootDResponses.hh"
00030 #include "XrdCl/XrdClDefaultEnv.hh"
00031 #include "XrdCl/XrdClMessage.hh"
00032 #include "XProtocol/XProtocol.hh"
00033 #include "XrdCl/XrdClLog.hh"
00034 #include "XrdCl/XrdClConstants.hh"
00035
00036 #include "XrdSys/XrdSysPthread.hh"
00037
00038 #include <sys/uio.h>
00039
00040 #include <list>
00041 #include <memory>
00042
00043 #if __cplusplus >= 201103L
00044 #include <atomic>
00045 #endif
00046
00047 namespace XrdCl
00048 {
00049 class PostMaster;
00050 class SIDManager;
00051 class URL;
00052 class LocalFileHandler;
00053
00054
00055
00056
00057 struct RedirectEntry
00058 {
00059 enum Type
00060 {
00061 EntryRedirect,
00062 EntryRedirectOnWait,
00063 EntryRetry,
00064 EntryWait
00065 };
00066
00067 RedirectEntry( const URL &from, const URL &to, Type type ) :
00068 from( from ), to( to ), type( type )
00069 {
00070
00071 }
00072
00073 URL from;
00074 URL to;
00075 Type type;
00076 XRootDStatus status;
00077
00078 std::string ToString( bool prevok = true )
00079 {
00080 const std::string tostr = to.GetLocation();
00081 const std::string fromstr = from.GetLocation();
00082
00083 if( prevok )
00084 {
00085 switch( type )
00086 {
00087 case EntryRedirect: return "Redirected from: " + fromstr + " to: "
00088 + tostr;
00089
00090 case EntryRedirectOnWait: return "Server responded with wait. "
00091 "Falling back to virtual redirector: " + tostr;
00092
00093 case EntryRetry: return "Retrying: " + tostr;
00094
00095 case EntryWait: return "Waited at server request. Resending: "
00096 + tostr;
00097 }
00098 }
00099 return "Failed at: " + fromstr + ", retrying at: " + tostr;
00100 }
00101 };
00102
00103
00105
00106 class XRootDMsgHandler: public IncomingMsgHandler,
00107 public OutgoingMsgHandler
00108 {
00109 friend class HandleRspJob;
00110
00111 public:
00112
00121
00122 XRootDMsgHandler( Message *msg,
00123 ResponseHandler *respHandler,
00124 const URL *url,
00125 std::shared_ptr<SIDManager> sidMgr,
00126 LocalFileHandler *lFileHandler):
00127 pRequest( msg ),
00128 pResponse( 0 ),
00129 pResponseHandler( respHandler ),
00130 pUrl( *url ),
00131 pEffectiveDataServerUrl( 0 ),
00132 pSidMgr( sidMgr ),
00133 pLFileHandler( lFileHandler ),
00134 pExpiration( 0 ),
00135 pRedirectAsAnswer( false ),
00136 pOksofarAsAnswer( false ),
00137 pHosts( 0 ),
00138 pHasLoadBalancer( false ),
00139 pHasSessionId( false ),
00140 pChunkList( 0 ),
00141 pRedirectCounter( 0 ),
00142 pNotAuthorizedCounter( 0 ),
00143
00144 pAsyncOffset( 0 ),
00145 pAsyncReadSize( 0 ),
00146 pAsyncReadBuffer( 0 ),
00147 pAsyncMsgSize( 0 ),
00148
00149 pReadRawStarted( false ),
00150 pReadRawCurrentOffset( 0 ),
00151
00152 pReadVRawMsgOffset( 0 ),
00153 pReadVRawChunkHeaderDone( false ),
00154 pReadVRawChunkHeaderStarted( false ),
00155 pReadVRawSizeError( false ),
00156 pReadVRawChunkIndex( 0 ),
00157 pReadVRawMsgDiscard( false ),
00158
00159 pOtherRawStarted( false ),
00160
00161 pFollowMetalink( false ),
00162
00163 pStateful( false ),
00164
00165 pAggregatedWaitTime( 0 ),
00166
00167 pMsgInFly( false ),
00168
00169 pTimeoutFence( false ),
00170
00171 pDirListStarted( false ),
00172 pDirListWithStat( false ),
00173
00174 pCV( 0 )
00175
00176 {
00177 pPostMaster = DefaultEnv::GetPostMaster();
00178 if( msg->GetSessionId() )
00179 pHasSessionId = true;
00180 memset( &pReadVRawChunkHeader, 0, sizeof( readahead_list ) );
00181
00182 Log *log = DefaultEnv::GetLog();
00183 log->Debug( ExDbgMsg, "[%s] MsgHandler created: 0x%x (message: %s ).",
00184 pUrl.GetHostId().c_str(), this,
00185 pRequest->GetDescription().c_str() );
00186 }
00187
00188
00190
00191 ~XRootDMsgHandler()
00192 {
00193 DumpRedirectTraceBack();
00194
00195 if( !pHasSessionId )
00196 delete pRequest;
00197 delete pResponse;
00198 std::vector<Message *>::iterator it;
00199 for( it = pPartialResps.begin(); it != pPartialResps.end(); ++it )
00200 delete *it;
00201
00202 delete pEffectiveDataServerUrl;
00203
00204 pRequest = reinterpret_cast<Message*>( 0xDEADBEEF );
00205 pResponse = reinterpret_cast<Message*>( 0xDEADBEEF );
00206 pResponseHandler = reinterpret_cast<ResponseHandler*>( 0xDEADBEEF );
00207 pPostMaster = reinterpret_cast<PostMaster*>( 0xDEADBEEF );
00208 pLFileHandler = reinterpret_cast<LocalFileHandler*>( 0xDEADBEEF );
00209 pHosts = reinterpret_cast<HostList*>( 0xDEADBEEF );
00210 pChunkList = reinterpret_cast<ChunkList*>( 0xDEADBEEF );
00211 pEffectiveDataServerUrl = reinterpret_cast<URL*>( 0xDEADBEEF );
00212
00213 Log *log = DefaultEnv::GetLog();
00214 log->Debug( ExDbgMsg, "[%s] Destroying MsgHandler: 0x%x.",
00215 pUrl.GetHostId().c_str(), this );
00216 }
00217
00218
00224
00225 virtual uint16_t Examine( Message *msg );
00226
00227
00231
00232 virtual uint16_t GetSid() const;
00233
00234
00238
00239 virtual void Process( Message *msg );
00240
00241
00251
00252 virtual Status ReadMessageBody( Message *msg,
00253 int socket,
00254 uint32_t &bytesRead );
00255
00256
00262
00263 virtual uint8_t OnStreamEvent( StreamEvent event,
00264 uint16_t streamNum,
00265 Status status );
00266
00267
00269
00270 virtual void OnStatusReady( const Message *message,
00271 Status status );
00272
00273
00275
00276 virtual bool IsRaw() const;
00277
00278
00287
00288 Status WriteMessageBody( int socket,
00289 uint32_t &bytesRead );
00290
00291
00296
00297 ChunkList* GetMessageBody( uint32_t *&asyncOffset )
00298 {
00299 asyncOffset = &pAsyncOffset;
00300 return pChunkList;
00301 }
00302
00303
00307
00308 void WaitDone( time_t now );
00309
00310
00312
00313 void SetExpiration( time_t expiration )
00314 {
00315 pExpiration = expiration;
00316 }
00317
00318
00321
00322 void SetRedirectAsAnswer( bool redirectAsAnswer )
00323 {
00324 pRedirectAsAnswer = redirectAsAnswer;
00325 }
00326
00327
00330
00331 void SetOksofarAsAnswer( bool oksofarAsAnswer )
00332 {
00333 pOksofarAsAnswer = oksofarAsAnswer;
00334 }
00335
00336
00338
00339 const Message *GetRequest() const
00340 {
00341 return pRequest;
00342 }
00343
00344
00346
00347 void SetLoadBalancer( const HostInfo &loadBalancer )
00348 {
00349 if( !loadBalancer.url.IsValid() )
00350 return;
00351 pLoadBalancer = loadBalancer;
00352 pHasLoadBalancer = true;
00353 }
00354
00355
00357
00358 void SetHostList( HostList *hostList )
00359 {
00360 delete pHosts;
00361 pHosts = hostList;
00362 }
00363
00364
00366
00367 void SetChunkList( ChunkList *chunkList )
00368 {
00369 pChunkList = chunkList;
00370 if( chunkList )
00371 pChunkStatus.resize( chunkList->size() );
00372 else
00373 pChunkStatus.clear();
00374 }
00375
00376
00378
00379 void SetRedirectCounter( uint16_t redirectCounter )
00380 {
00381 pRedirectCounter = redirectCounter;
00382 }
00383
00384 void SetFollowMetalink( bool followMetalink )
00385 {
00386 pFollowMetalink = followMetalink;
00387 }
00388
00389 void SetStateful( bool stateful )
00390 {
00391 pStateful = stateful;
00392 }
00393
00394
00396
00397 void TakeDownTimeoutFence();
00398
00399 private:
00400
00402
00403 Status ReadRawRead( Message *msg,
00404 int socket,
00405 uint32_t &bytesRead );
00406
00407
00409
00410 Status ReadRawReadV( Message *msg,
00411 int socket,
00412 uint32_t &bytesRead );
00413
00414
00416
00417 Status ReadRawOther( Message *msg,
00418 int socket,
00419 uint32_t &bytesRead );
00420
00421
00424
00425 Status ReadAsync( int socket, uint32_t &btesRead );
00426
00427
00429
00430 void HandleError( Status status, Message *msg = 0 );
00431
00432
00434
00435 Status RetryAtServer( const URL &url, RedirectEntry::Type entryType );
00436
00437
00439
00440 void HandleResponse();
00441
00442
00444
00445 XRootDStatus *ProcessStatus();
00446
00447
00450
00451 Status ParseResponse( AnyObject *&response );
00452
00453
00456
00457 Status RewriteRequestRedirect( const URL &newUrl );
00458
00459
00461
00462 Status RewriteRequestWait();
00463
00464
00466
00467 Status PostProcessReadV( VectorReadInfo *vReadInfo );
00468
00469
00471
00472 Status UnPackReadVResponse( Message *msg );
00473
00474
00476
00477 void UpdateTriedCGI(uint32_t errNo=0);
00478
00479
00481
00482 void SwitchOnRefreshFlag();
00483
00484
00487
00488 void HandleRspOrQueue();
00489
00490
00492
00493 void HandleLocalRedirect( URL *url );
00494
00495
00500
00501 bool IsRetriable( Message *request );
00502
00503
00510
00511 bool OmitWait( Message *request, const URL &url );
00512
00513
00519
00520 bool RetriableErrorResponse( const Status &status );
00521
00522
00524
00525 void DumpRedirectTraceBack();
00526
00527
00528
00529
00530 struct ChunkStatus
00531 {
00532 ChunkStatus(): sizeError( false ), done( false ) {}
00533 bool sizeError;
00534 bool done;
00535 };
00536
00537 typedef std::list<std::unique_ptr<RedirectEntry>> RedirectTraceBack;
00538
00539 Message *pRequest;
00540 Message *pResponse;
00541 std::vector<Message *> pPartialResps;
00542 ResponseHandler *pResponseHandler;
00543 URL pUrl;
00544 URL *pEffectiveDataServerUrl;
00545 PostMaster *pPostMaster;
00546 std::shared_ptr<SIDManager> pSidMgr;
00547 LocalFileHandler *pLFileHandler;
00548 Status pStatus;
00549 Status pLastError;
00550 time_t pExpiration;
00551 bool pRedirectAsAnswer;
00552 bool pOksofarAsAnswer;
00553 HostList *pHosts;
00554 bool pHasLoadBalancer;
00555 HostInfo pLoadBalancer;
00556 bool pHasSessionId;
00557 std::string pRedirectUrl;
00558 ChunkList *pChunkList;
00559 std::vector<ChunkStatus> pChunkStatus;
00560 uint16_t pRedirectCounter;
00561 uint16_t pNotAuthorizedCounter;
00562
00563 uint32_t pAsyncOffset;
00564 uint32_t pAsyncReadSize;
00565 char* pAsyncReadBuffer;
00566 uint32_t pAsyncMsgSize;
00567
00568 bool pReadRawStarted;
00569 uint32_t pReadRawCurrentOffset;
00570
00571 uint32_t pReadVRawMsgOffset;
00572 bool pReadVRawChunkHeaderDone;
00573 bool pReadVRawChunkHeaderStarted;
00574 bool pReadVRawSizeError;
00575 int32_t pReadVRawChunkIndex;
00576 readahead_list pReadVRawChunkHeader;
00577 bool pReadVRawMsgDiscard;
00578
00579 bool pOtherRawStarted;
00580
00581 bool pFollowMetalink;
00582
00583 bool pStateful;
00584 int pAggregatedWaitTime;
00585
00586 std::unique_ptr<RedirectEntry> pRdirEntry;
00587 RedirectTraceBack pRedirectTraceBack;
00588
00589 bool pMsgInFly;
00590
00591
00592
00593
00594
00595
00596 #if __cplusplus >= 201103L
00597 std::atomic<bool> pTimeoutFence;
00598 #else
00599 bool pTimeoutFence;
00600 #endif
00601
00602
00603
00604
00605
00606
00607 bool pDirListStarted;
00608 bool pDirListWithStat;
00609
00610
00611
00612
00613
00614 XrdSysCondVar pCV;
00615 };
00616 }
00617
00618 #endif // __XRD_CL_XROOTD_MSG_HANDLER_HH__