xrootd
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
XrdClStream.hh
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2012 by European Organization for Nuclear Research (CERN)
3 // Author: Lukasz Janyst <ljanyst@cern.ch>
4 //------------------------------------------------------------------------------
5 // XRootD is free software: you can redistribute it and/or modify
6 // it under the terms of the GNU Lesser General Public License as published by
7 // the Free Software Foundation, either version 3 of the License, or
8 // (at your option) any later version.
9 //
10 // XRootD is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17 //------------------------------------------------------------------------------
18 
19 #ifndef __XRD_CL_STREAM_HH__
20 #define __XRD_CL_STREAM_HH__
21 
22 #include "XrdCl/XrdClPoller.hh"
23 #include "XrdCl/XrdClStatus.hh"
24 #include "XrdCl/XrdClURL.hh"
25 #include "XrdCl/XrdClPostMasterInterfaces.hh"
26 #include "XrdCl/XrdClChannelHandlerList.hh"
27 #include "XrdCl/XrdClJobManager.hh"
28 #include "XrdCl/XrdClInQueue.hh"
29 #include "XrdCl/XrdClUtils.hh"
30 
31 #include "XrdSys/XrdSysPthread.hh"
32 #include "XrdNet/XrdNetAddr.hh"
33 #include <list>
34 #include <vector>
35 
36 namespace XrdCl
37 {
38  class Message;
39  class Channel;
40  class TransportHandler;
41  class TaskManager;
42  struct SubStreamData;
43 
44  //----------------------------------------------------------------------------
46  //----------------------------------------------------------------------------
47  class Stream
48  {
49  public:
50  //------------------------------------------------------------------------
52  //------------------------------------------------------------------------
54  {
56  Connected = 1,
57  Connecting = 2,
58  Error = 3
59  };
60 
61  //------------------------------------------------------------------------
63  //------------------------------------------------------------------------
64  Stream( const URL *url, uint16_t streamNum );
65 
66  //------------------------------------------------------------------------
68  //------------------------------------------------------------------------
69  ~Stream();
70 
71  //------------------------------------------------------------------------
73  //------------------------------------------------------------------------
75 
76  //------------------------------------------------------------------------
78  //------------------------------------------------------------------------
79  Status Send( Message *msg,
80  OutgoingMsgHandler *handler,
81  bool stateful,
82  time_t expires );
83 
84  //------------------------------------------------------------------------
86  //------------------------------------------------------------------------
87  void SetTransport( TransportHandler *transport )
88  {
89  pTransport = transport;
90  }
91 
92  //------------------------------------------------------------------------
94  //------------------------------------------------------------------------
95  void SetPoller( Poller *poller )
96  {
97  pPoller = poller;
98  }
99 
100  //------------------------------------------------------------------------
102  //------------------------------------------------------------------------
103  void SetIncomingQueue( InQueue *incomingQueue )
104  {
105  pIncomingQueue = incomingQueue;
106  delete pQueueIncMsgJob;
107  pQueueIncMsgJob = new QueueIncMsgJob( incomingQueue );
108  }
109 
110  //------------------------------------------------------------------------
112  //------------------------------------------------------------------------
113  void SetChannelData( AnyObject *channelData )
114  {
115  pChannelData = channelData;
116  }
117 
118  //------------------------------------------------------------------------
120  //------------------------------------------------------------------------
121  void SetTaskManager( TaskManager *taskManager )
122  {
123  pTaskManager = taskManager;
124  }
125 
126  //------------------------------------------------------------------------
128  //------------------------------------------------------------------------
129  void SetJobManager( JobManager *jobManager )
130  {
131  pJobManager = jobManager;
132  }
133 
134  //------------------------------------------------------------------------
138  //------------------------------------------------------------------------
139  Status EnableLink( PathID &path );
140 
141  //------------------------------------------------------------------------
143  //------------------------------------------------------------------------
144  void Disconnect( bool force = false );
145 
146  //------------------------------------------------------------------------
149  //------------------------------------------------------------------------
150  void Tick( time_t now );
151 
152  //------------------------------------------------------------------------
154  //------------------------------------------------------------------------
155  const URL *GetURL() const
156  {
157  return pUrl;
158  }
159 
160  //------------------------------------------------------------------------
162  //------------------------------------------------------------------------
163  uint16_t GetStreamNumber() const
164  {
165  return pStreamNum;
166  }
167 
168  //------------------------------------------------------------------------
170  //------------------------------------------------------------------------
171  void ForceConnect();
172 
173  //------------------------------------------------------------------------
175  //------------------------------------------------------------------------
176  const std::string &GetName() const
177  {
178  return pStreamName;
179  }
180 
181  //------------------------------------------------------------------------
183  //------------------------------------------------------------------------
184  void DisableIfEmpty( uint16_t subStream );
185 
186  //------------------------------------------------------------------------
188  //------------------------------------------------------------------------
189  void OnIncoming( uint16_t subStream,
190  Message *msg,
191  uint32_t bytesReceived );
192 
193  //------------------------------------------------------------------------
194  // Call when one of the sockets is ready to accept a new message
195  //------------------------------------------------------------------------
196  std::pair<Message *, OutgoingMsgHandler *>
197  OnReadyToWrite( uint16_t subStream );
198 
199  //------------------------------------------------------------------------
200  // Call when a message is written to the socket
201  //------------------------------------------------------------------------
202  void OnMessageSent( uint16_t subStream,
203  Message *msg,
204  uint32_t bytesSent );
205 
206  //------------------------------------------------------------------------
208  //------------------------------------------------------------------------
209  void OnConnect( uint16_t subStream );
210 
211  //------------------------------------------------------------------------
213  //------------------------------------------------------------------------
214  void OnConnectError( uint16_t subStream, Status status );
215 
216  //------------------------------------------------------------------------
218  //------------------------------------------------------------------------
219  void OnError( uint16_t subStream, Status status );
220 
221  //------------------------------------------------------------------------
223  //------------------------------------------------------------------------
224  void ForceError( Status status );
225 
226  //------------------------------------------------------------------------
228  //------------------------------------------------------------------------
229  void OnReadTimeout( uint16_t subStream, bool &isBroken );
230 
231  //------------------------------------------------------------------------
233  //------------------------------------------------------------------------
234  void OnWriteTimeout( uint16_t subStream );
235 
236  //------------------------------------------------------------------------
238  //------------------------------------------------------------------------
239  void RegisterEventHandler( ChannelEventHandler *handler );
240 
241  //------------------------------------------------------------------------
243  //------------------------------------------------------------------------
244  void RemoveEventHandler( ChannelEventHandler *handler );
245 
246  //------------------------------------------------------------------------
255  //------------------------------------------------------------------------
256  std::pair<IncomingMsgHandler *, bool>
257  InstallIncHandler( Message *msg, uint16_t stream );
258 
259  private:
260 
261  //------------------------------------------------------------------------
262  // Job queuing the incoming messages
263  //------------------------------------------------------------------------
264  class QueueIncMsgJob: public Job
265  {
266  public:
267  QueueIncMsgJob( InQueue *queue ): pQueue( queue ) {};
268  virtual ~QueueIncMsgJob() {};
269  virtual void Run( void *arg )
270  {
271  Message *msg = (Message *)arg;
272  pQueue->AddMessage( msg );
273  }
274  private:
276  };
277 
278  //------------------------------------------------------------------------
279  // Job handling the incoming messages
280  //------------------------------------------------------------------------
281  class HandleIncMsgJob: public Job
282  {
283  public:
284  HandleIncMsgJob( IncomingMsgHandler *handler ): pHandler( handler ) {};
285  virtual ~HandleIncMsgJob() {};
286  virtual void Run( void *arg )
287  {
288  Message *msg = (Message *)arg;
289  pHandler->Process( msg );
290  delete this;
291  }
292  private:
294  };
295 
296  //------------------------------------------------------------------------
298  //------------------------------------------------------------------------
299  void OnFatalError( uint16_t subStream,
300  Status status,
301  XrdSysMutexHelper &lock );
302 
303  //------------------------------------------------------------------------
305  //------------------------------------------------------------------------
306  void MonitorDisconnection( Status status );
307 
308  //------------------------------------------------------------------------
310  //------------------------------------------------------------------------
311  Status RequestClose( Message *resp );
312 
313  typedef std::vector<SubStreamData*> SubStreamList;
314 
315  //------------------------------------------------------------------------
316  // Data members
317  //------------------------------------------------------------------------
318  const URL *pUrl;
319  uint16_t pStreamNum;
320  std::string pStreamName;
336  std::vector<XrdNetAddr> pAddresses;
339  uint64_t pSessionId;
340 
341  //------------------------------------------------------------------------
342  // Jobs
343  //------------------------------------------------------------------------
345 
346  //------------------------------------------------------------------------
347  // Monitoring info
348  //------------------------------------------------------------------------
351  uint64_t pBytesSent;
352  uint64_t pBytesReceived;
353  };
354 }
355 
356 #endif // __XRD_CL_STREAM_HH__
std::pair< Message *, OutgoingMsgHandler * > OnReadyToWrite(uint16_t subStream)
A synchronized queue.
Definition: XrdClJobManager.hh:50
Definition: XrdClStream.hh:281
Definition: XrdClAnyObject.hh:32
uint32_t pLastStreamError
Definition: XrdClStream.hh:328
void OnConnect(uint16_t subStream)
Call back when the given sub-stream has been connected.
std::vector< XrdNetAddr > pAddresses
Definition: XrdClStream.hh:336
Status Initialize()
Initializer.
void OnReadTimeout(uint16_t subStream, bool &isBroken)
On read timeout.
Definition: XrdSysPthread.hh:239
Interface for socket pollers.
Definition: XrdClPoller.hh:86
Definition: XrdClStream.hh:264
virtual void Run(void *arg)
The job logic.
Definition: XrdClStream.hh:286
void DisableIfEmpty(uint16_t subStream)
Disables respective uplink if empty.
void OnWriteTimeout(uint16_t subStream)
On write timeout.
The message representation used throughout the system.
Definition: XrdClMessage.hh:29
void OnError(uint16_t subStream, Status status)
On error.
void OnMessageSent(uint16_t subStream, Message *msg, uint32_t bytesSent)
timeval pConnectionDone
Definition: XrdClStream.hh:350
IncomingMsgHandler * pHandler
Definition: XrdClStream.hh:293
uint16_t GetStreamNumber() const
Get the stream number.
Definition: XrdClStream.hh:163
QueueIncMsgJob(InQueue *queue)
Definition: XrdClStream.hh:267
Definition: XrdClPostMasterInterfaces.hh:282
InQueue * pIncomingQueue
Definition: XrdClStream.hh:326
Status RequestClose(Message *resp)
Send close after an open request timed out.
In the process of being connected.
Definition: XrdClStream.hh:57
void SetIncomingQueue(InQueue *incomingQueue)
Set the incoming queue.
Definition: XrdClStream.hh:103
ChannelHandlerList pChannelEvHandlers
Definition: XrdClStream.hh:338
void SetPoller(Poller *poller)
Set the poller.
Definition: XrdClStream.hh:95
Stream(const URL *url, uint16_t streamNum)
Constructor.
A helper for handling channel event handlers.
Definition: XrdClChannelHandlerList.hh:33
std::vector< SubStreamData * > SubStreamList
Definition: XrdClStream.hh:313
void ForceError(Status status)
Force error.
Procedure execution status.
Definition: XrdClStatus.hh:109
void SetTaskManager(TaskManager *taskManager)
Set task manager.
Definition: XrdClStream.hh:121
uint64_t pSessionId
Definition: XrdClStream.hh:339
uint16_t pConnectionWindow
Definition: XrdClStream.hh:334
void SetTransport(TransportHandler *transport)
Set the transport.
Definition: XrdClStream.hh:87
AddressType
Address type.
Definition: XrdClUtils.hh:93
uint16_t pConnectionCount
Definition: XrdClStream.hh:331
Broken.
Definition: XrdClStream.hh:58
AnyObject * pChannelData
Definition: XrdClStream.hh:327
void MonitorDisconnection(Status status)
Inform the monitoring about disconnection.
Status pLastFatalError
Definition: XrdClStream.hh:329
HandleIncMsgJob(IncomingMsgHandler *handler)
Definition: XrdClStream.hh:284
TransportHandler * pTransport
Definition: XrdClStream.hh:321
TaskManager * pTaskManager
Definition: XrdClStream.hh:323
uint16_t pConnectionRetry
Definition: XrdClStream.hh:332
timeval pConnectionStarted
Definition: XrdClStream.hh:349
uint16_t pStreamNum
Definition: XrdClStream.hh:319
XrdSysRecMutex pMutex
Definition: XrdClStream.hh:325
void OnIncoming(uint16_t subStream, Message *msg, uint32_t bytesReceived)
Call back when a message has been reconstructed.
QueueIncMsgJob * pQueueIncMsgJob
Definition: XrdClStream.hh:344
Perform the handshake and the authentication for each physical stream.
Definition: XrdClPostMasterInterfaces.hh:302
virtual ~HandleIncMsgJob()
Definition: XrdClStream.hh:285
JobManager * pJobManager
Definition: XrdClStream.hh:324
void SetJobManager(JobManager *jobManager)
Set job manager.
Definition: XrdClStream.hh:129
Status Send(Message *msg, OutgoingMsgHandler *handler, bool stateful, time_t expires)
Queue the message for sending.
Channel event handler.
Definition: XrdClPostMasterInterfaces.hh:220
Poller * pPoller
Definition: XrdClStream.hh:322
Message handler.
Definition: XrdClPostMasterInterfaces.hh:68
uint64_t pBytesSent
Definition: XrdClStream.hh:351
A synchronized queue for incoming data.
Definition: XrdClInQueue.hh:35
Utils::AddressType pAddressType
Definition: XrdClStream.hh:337
~Stream()
Destructor.
void Disconnect(bool force=false)
Disconnect the stream.
Connected.
Definition: XrdClStream.hh:56
std::pair< IncomingMsgHandler *, bool > InstallIncHandler(Message *msg, uint16_t stream)
void ForceConnect()
Force connection.
void SetChannelData(AnyObject *channelData)
Set the channel data.
Definition: XrdClStream.hh:113
uint64_t pBytesReceived
Definition: XrdClStream.hh:352
SubStreamList pSubStreams
Definition: XrdClStream.hh:335
void OnConnectError(uint16_t subStream, Status status)
On connect error.
URL representation.
Definition: XrdClURL.hh:30
bool AddMessage(Message *msg)
Add a fully reconstructed message to the queue.
virtual void Process(Message *msg)
Definition: XrdClPostMasterInterfaces.hh:126
Status EnableLink(PathID &path)
std::string pStreamName
Definition: XrdClStream.hh:320
void RegisterEventHandler(ChannelEventHandler *handler)
Register channel event handler.
virtual ~QueueIncMsgJob()
Definition: XrdClStream.hh:268
Message status handler.
Definition: XrdClPostMasterInterfaces.hh:167
void RemoveEventHandler(ChannelEventHandler *handler)
Remove a channel event handler.
Stream.
Definition: XrdClStream.hh:47
const URL * GetURL() const
Get the URL.
Definition: XrdClStream.hh:155
void Tick(time_t now)
InQueue * pQueue
Definition: XrdClStream.hh:275
const std::string & GetName() const
Return stream name.
Definition: XrdClStream.hh:176
Interface for a job to be run by the job manager.
Definition: XrdClJobManager.hh:33
const URL * pUrl
Definition: XrdClStream.hh:318
void OnFatalError(uint16_t subStream, Status status, XrdSysMutexHelper &lock)
On fatal error - unlocks the stream.
time_t pConnectionInitTime
Definition: XrdClStream.hh:333
virtual void Run(void *arg)
The job logic.
Definition: XrdClStream.hh:269
Definition: XrdClTaskManager.hh:75
Definition: XrdSysPthread.hh:260
StreamStatus
Status of the stream.
Definition: XrdClStream.hh:53
uint16_t pStreamErrorWindow
Definition: XrdClStream.hh:330
Not connected.
Definition: XrdClStream.hh:55