XRootD
XrdCl::Stream Class Reference

Stream. More...

#include <XrdClStream.hh>

+ Collaboration diagram for XrdCl::Stream:

Public Types

enum  StreamStatus {
  Disconnected = 0 ,
  Connected = 1 ,
  Connecting = 2 ,
  Error = 3
}
 Status of the stream. More...
 

Public Member Functions

 Stream (const URL *url, const URL &prefer=URL())
 Constructor. More...
 
 ~Stream ()
 Destructor. More...
 
bool CanCollapse (const URL &url)
 
void DisableIfEmpty (uint16_t subStream)
 Disables respective uplink if empty. More...
 
void Disconnect (bool force=false)
 Disconnect the stream. More...
 
XRootDStatus EnableLink (PathID &path)
 
void ForceConnect ()
 Force connection. More...
 
void ForceError (XRootDStatus status, bool hush=false)
 Force error. More...
 
const std::string & GetName () const
 Return stream name. More...
 
const URL * GetURL () const
 Get the URL. More...
 
XRootDStatus Initialize ()
 Initializer. More...
 
uint16_t InspectStatusRsp (uint16_t stream, MsgHandler *&incHandler)
 
MsgHandler * InstallIncHandler (std::shared_ptr< Message > &msg, uint16_t stream)
 
void OnConnect (uint16_t subStream)
 Call back when a substream has been connected. More...
 
void OnConnectError (uint16_t subStream, XRootDStatus status)
 On connect error. More...
 
void OnError (uint16_t subStream, XRootDStatus status)
 On error. More...
 
void OnIncoming (uint16_t subStream, std::shared_ptr< Message > msg, uint32_t bytesReceived)
 Call back when a message has been reconstructed. More...
 
void OnMessageSent (uint16_t subStream, Message *msg, uint32_t bytesSent)
 
bool OnReadTimeout (uint16_t subStream) XRD_WARN_UNUSED_RESULT
 On read timeout. More...
 
std::pair< Message *, MsgHandler * > OnReadyToWrite (uint16_t subStream)
 
bool OnWriteTimeout (uint16_t subStream) XRD_WARN_UNUSED_RESULT
 On write timeout. More...
 
Status Query (uint16_t query, AnyObject &result)
 Query the stream. More...
 
void RegisterEventHandler (ChannelEventHandler *handler)
 Register channel event handler. More...
 
void RemoveEventHandler (ChannelEventHandler *handler)
 Remove a channel event handler. More...
 
XRootDStatus Send (Message *msg, MsgHandler *handler, bool stateful, time_t expires)
 Queue the message for sending. More...
 
void SetChannelData (AnyObject *channelData)
 Set the channel data. More...
 
void SetIncomingQueue (InQueue *incomingQueue)
 Set the incoming queue. More...
 
void SetJobManager (JobManager *jobManager)
 Set job manager. More...
 
void SetOnDataConnectHandler (std::shared_ptr< Job > &onConnJob)
 Set the on-connect handler for data streams. More...
 
void SetPoller (Poller *poller)
 Set the poller. More...
 
void SetTaskManager (TaskManager *taskManager)
 Set task manager. More...
 
void SetTransport (TransportHandler *transport)
 Set the transport. More...
 
void Tick (time_t now)
 

Detailed Description

Stream.

Definition at line 51 of file XrdClStream.hh.

Member Enumeration Documentation

◆ StreamStatus

Status of the stream.

Enumerator
Disconnected 

Not connected.

Connected 

Connected.

Connecting 

In the process of being connected.

Error 

Broken.

Definition at line 57 of file XrdClStream.hh.

58  {
59  Disconnected = 0,
60  Connected = 1,
61  Connecting = 2,
62  Error = 3
63  };
@ Disconnected
Not connected.
Definition: XrdClStream.hh:59
@ Error
Broken.
Definition: XrdClStream.hh:62
@ Connected
Connected.
Definition: XrdClStream.hh:60
@ Connecting
In the process of being connected.
Definition: XrdClStream.hh:61

Constructor & Destructor Documentation

◆ Stream()

XrdCl::Stream::Stream ( const URL * url,
const URL & prefer = URL() 
)

Constructor.

Definition at line 96 of file XrdClStream.cc.

96  :
97  pUrl( url ),
98  pPrefer( prefer ),
99  pTransport( 0 ),
100  pPoller( 0 ),
101  pTaskManager( 0 ),
102  pJobManager( 0 ),
103  pIncomingQueue( 0 ),
104  pChannelData( 0 ),
105  pLastStreamError( 0 ),
106  pConnectionCount( 0 ),
107  pConnectionInitTime( 0 ),
108  pAddressType( Utils::IPAll ),
109  pSessionId( 0 ),
110  pBytesSent( 0 ),
111  pBytesReceived( 0 )
112  {
113  pConnectionStarted.tv_sec = 0; pConnectionStarted.tv_usec = 0;
114  pConnectionDone.tv_sec = 0; pConnectionDone.tv_usec = 0;
115 
116  std::ostringstream o;
117  o << pUrl->GetHostId();
118  pStreamName = o.str();
119 
120  pConnectionWindow = Utils::GetIntParameter( *url, "ConnectionWindow",
121  DefaultConnectionWindow );
122  pConnectionRetry = Utils::GetIntParameter( *url, "ConnectionRetry",
123  DefaultConnectionRetry );
124  pStreamErrorWindow = Utils::GetIntParameter( *url, "StreamErrorWindow",
125  DefaultStreamErrorWindow );
126 
127  std::string netStack = Utils::GetStringParameter( *url, "NetworkStack",
128  DefaultNetworkStack );
129 
130  pAddressType = Utils::String2AddressType( netStack );
131  if( pAddressType == Utils::AddressType::IPAuto )
132  {
133  XrdNetUtils::NetProt stacks = XrdNetUtils::NetConfig( XrdNetUtils::NetType::qryINIF );
134  if( !( stacks & XrdNetUtils::hasIP64 ) )
135  {
136  if( stacks & XrdNetUtils::hasIPv4 )
137  pAddressType = Utils::AddressType::IPv4;
138  else if( stacks & XrdNetUtils::hasIPv6 )
139  pAddressType = Utils::AddressType::IPv6;
140  }
141  }
142 
143  Log *log = DefaultEnv::GetLog();
144  log->Debug( PostMasterMsg, "[%s] Stream parameters: Network Stack: %s, "
145  "Connection Window: %d, ConnectionRetry: %d, Stream Error "
146  "Window: %d", pStreamName.c_str(), netStack.c_str(),
147  pConnectionWindow, pConnectionRetry, pStreamErrorWindow );
148  }
static Log * GetLog()
Get default log.
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition: XrdClURL.hh:99
static AddressType String2AddressType(const std::string &addressType)
Interpret a string as address type, default to IPAll.
Definition: XrdClUtils.cc:123
static int GetIntParameter(const URL &url, const std::string &name, int defaultVal)
Get a parameter either from the environment or URL.
Definition: XrdClUtils.cc:81
static std::string GetStringParameter(const URL &url, const std::string &name, const std::string &defaultVal)
Get a parameter either from the environment or URL.
Definition: XrdClUtils.cc:104
static NetProt NetConfig(NetType netquery=qryINET, const char **eText=0)
Definition: XrdNetUtils.cc:716
const uint64_t PostMasterMsg
const int DefaultStreamErrorWindow
const int DefaultConnectionRetry
const int DefaultConnectionWindow
const char *const DefaultNetworkStack
XrdSysError Log
Definition: XrdConfig.cc:113

References XrdCl::Log::Debug(), XrdCl::DefaultConnectionRetry, XrdCl::DefaultConnectionWindow, XrdCl::DefaultNetworkStack, XrdCl::DefaultStreamErrorWindow, XrdCl::URL::GetHostId(), XrdCl::Utils::GetIntParameter(), XrdCl::DefaultEnv::GetLog(), XrdCl::Utils::GetStringParameter(), XrdNetUtils::hasIP64, XrdNetUtils::hasIPv4, XrdNetUtils::hasIPv6, XrdNetUtils::NetConfig(), XrdCl::PostMasterMsg, and XrdCl::Utils::String2AddressType().

+ Here is the call graph for this function:

◆ ~Stream()

XrdCl::Stream::~Stream ( )

Destructor.

Definition at line 153 of file XrdClStream.cc.

154  {
155  Disconnect( true );
156 
157  Log *log = DefaultEnv::GetLog();
158  log->Debug( PostMasterMsg, "[%s] Destroying stream",
159  pStreamName.c_str() );
160 
161  MonitorDisconnection( XRootDStatus() );
162 
163  SubStreamList::iterator it;
164  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
165  delete *it;
166  }
void Disconnect(bool force=false)
Disconnect the stream.
Definition: XrdClStream.cc:363

References XrdCl::Log::Debug(), Disconnect(), XrdCl::DefaultEnv::GetLog(), and XrdCl::PostMasterMsg.

+ Here is the call graph for this function:

Member Function Documentation

◆ CanCollapse()

bool XrdCl::Stream::CanCollapse ( const URL & url)
Returns
true if this channel can be collapsed using this URL, false otherwise

Definition at line 1159 of file XrdClStream.cc.

1160  {
1161  Log *log = DefaultEnv::GetLog();
1162 
1163  //--------------------------------------------------------------------------
1164  // Resolve all the addresses of the host we're supposed to connect to
1165  //--------------------------------------------------------------------------
1166  std::vector<XrdNetAddr> prefaddrs;
1167  XRootDStatus st = Utils::GetHostAddresses( prefaddrs, url, pAddressType );
1168  if( !st.IsOK() )
1169  {
1170  log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s."
1171  , pStreamName.c_str(), url.GetHostName().c_str() );
1172  return false;
1173  }
1174 
1175  //--------------------------------------------------------------------------
1176  // Resolve all the addresses of the alias
1177  //--------------------------------------------------------------------------
1178  std::vector<XrdNetAddr> aliasaddrs;
1179  st = Utils::GetHostAddresses( aliasaddrs, *pUrl, pAddressType );
1180  if( !st.IsOK() )
1181  {
1182  log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s."
1183  , pStreamName.c_str(), pUrl->GetHostName().c_str() );
1184  return false;
1185  }
1186 
1187  //--------------------------------------------------------------------------
1188  // Now check if the preferred host is part of the alias
1189  //--------------------------------------------------------------------------
1190  auto itr = prefaddrs.begin();
1191  for( ; itr != prefaddrs.end() ; ++itr )
1192  {
1193  auto itr2 = aliasaddrs.begin();
1194  for( ; itr2 != aliasaddrs.end() ; ++itr2 )
1195  if( itr->Same( &*itr2 ) ) return true;
1196  }
1197 
1198  return false;
1199  }
const std::string & GetHostName() const
Get the name of the target host.
Definition: XrdClURL.hh:170
static Status GetHostAddresses(std::vector< XrdNetAddr > &addresses, const URL &url, AddressType type)
Resolve IP addresses.
Definition: XrdClUtils.cc:140

References XrdCl::Log::Error(), XrdCl::Utils::GetHostAddresses(), XrdCl::URL::GetHostName(), XrdCl::DefaultEnv::GetLog(), XrdCl::Status::IsOK(), and XrdCl::PostMasterMsg.

Referenced by XrdCl::Channel::CanCollapse().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ DisableIfEmpty()

void XrdCl::Stream::DisableIfEmpty ( uint16_t  subStream)

Disables respective uplink if empty.

Definition at line 585 of file XrdClStream.cc.

586  {
587  XrdSysMutexHelper scopedLock( pMutex );
588  Log *log = DefaultEnv::GetLog();
589 
590  if( pSubStreams[subStream]->outQueue->IsEmpty() )
591  {
592  log->Dump( PostMasterMsg, "[%s] All messages consumed, disable uplink",
593  pSubStreams[subStream]->socket->GetStreamName().c_str() );
594  pSubStreams[subStream]->socket->DisableUplink();
595  }
596  }

References XrdCl::Log::Dump(), XrdCl::DefaultEnv::GetLog(), and XrdCl::PostMasterMsg.

Referenced by XrdCl::AsyncSocketHandler::OnWrite().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Disconnect()

void XrdCl::Stream::Disconnect ( bool  force = false)

Disconnect the stream.

Definition at line 363 of file XrdClStream.cc.

364  {
365  XrdSysMutexHelper scopedLock( pMutex );
366  SubStreamList::iterator it;
367  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
368  {
369  (*it)->socket->Close();
370  (*it)->status = Socket::Disconnected;
371  }
372  }
@ Disconnected
The socket is disconnected.
Definition: XrdClSocket.hh:50

References XrdCl::Socket::Disconnected.

Referenced by ~Stream().

+ Here is the caller graph for this function:

◆ EnableLink()

XRootDStatus XrdCl::Stream::EnableLink ( PathID & path)

Connect if needed; otherwise make sure that the underlying socket handler gets write readiness events. The path is updated with what has actually been enabled.

Definition at line 187 of file XrdClStream.cc.

188  {
189  XrdSysMutexHelper scopedLock( pMutex );
190 
191  //--------------------------------------------------------------------------
192  // We are in the process of connecting the main stream, so we do nothing
193  // because when the main stream connection is established it will connect
194  // all the other streams
195  //--------------------------------------------------------------------------
196  if( pSubStreams[0]->status == Socket::Connecting )
197  return XRootDStatus();
198 
199  //--------------------------------------------------------------------------
200  // The main stream is connected, so we can verify whether we have
201  // the up and the down stream connected and ready to handle data.
202  // If anything is not right we fall back to stream 0.
203  //--------------------------------------------------------------------------
204  if( pSubStreams[0]->status == Socket::Connected )
205  {
206  if( pSubStreams[path.down]->status != Socket::Connected )
207  path.down = 0;
208 
209  if( pSubStreams[path.up]->status == Socket::Disconnected )
210  {
211  path.up = 0;
212  return pSubStreams[0]->socket->EnableUplink();
213  }
214 
215  if( pSubStreams[path.up]->status == Socket::Connected )
216  return pSubStreams[path.up]->socket->EnableUplink();
217 
218  return XRootDStatus();
219  }
220 
221  //--------------------------------------------------------------------------
222  // The main stream is not connected, we need to check whether enough time
223  // has passed since we last encountered an error (if any) so that we could
224  // re-attempt the connection
225  //--------------------------------------------------------------------------
226  Log *log = DefaultEnv::GetLog();
227  time_t now = ::time(0);
228 
229  if( now-pLastStreamError < pStreamErrorWindow )
230  return pLastFatalError;
231 
232  gettimeofday( &pConnectionStarted, 0 );
233  ++pConnectionCount;
234 
235  //--------------------------------------------------------------------------
236  // Resolve all the addresses of the host we're supposed to connect to
237  //--------------------------------------------------------------------------
238  XRootDStatus st = Utils::GetHostAddresses( pAddresses, *pUrl, pAddressType );
239  if( !st.IsOK() )
240  {
241  log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for "
242  "the host", pStreamName.c_str() );
243  pLastStreamError = now;
244  st.status = stFatal;
245  pLastFatalError = st;
246  return st;
247  }
248 
249  if( pPrefer.IsValid() )
250  {
251  std::vector<XrdNetAddr> addrresses;
252  XRootDStatus st = Utils::GetHostAddresses( addrresses, pPrefer, pAddressType );
253  if( !st.IsOK() )
254  {
255  log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s",
256  pStreamName.c_str(), pPrefer.GetHostName().c_str() );
257  }
258  else
259  {
260  std::vector<XrdNetAddr> tmp;
261  tmp.reserve( pAddresses.size() );
262  // first add all remaining addresses
263  auto itr = pAddresses.begin();
264  for( ; itr != pAddresses.end() ; ++itr )
265  {
266  if( !HasNetAddr( *itr, addrresses ) )
267  tmp.push_back( *itr );
268  }
269  // then copy all 'preferred' addresses
270  std::copy( addrresses.begin(), addrresses.end(), std::back_inserter( tmp ) );
271  // and keep the result
272  pAddresses.swap( tmp );
273  }
274  }
275 
276  Utils::LogHostAddresses( log, PostMasterMsg, pUrl->GetHostId(),
277  pAddresses );
278 
279  while( !pAddresses.empty() )
280  {
281  pSubStreams[0]->socket->SetAddress( pAddresses.back() );
282  pAddresses.pop_back();
283  pConnectionInitTime = ::time( 0 );
284  st = pSubStreams[0]->socket->Connect( pConnectionWindow );
285  if( st.IsOK() )
286  {
287  pSubStreams[0]->status = Socket::Connecting;
288  break;
289  }
290  }
291  return st;
292  }
@ Connected
The socket is connected.
Definition: XrdClSocket.hh:51
@ Connecting
The connection process is in progress.
Definition: XrdClSocket.hh:52
bool IsValid() const
Is the url valid.
Definition: XrdClURL.cc:452
static void LogHostAddresses(Log *log, uint64_t type, const std::string &hostId, std::vector< XrdNetAddr > &addresses)
Log all the addresses on the list.
Definition: XrdClUtils.cc:234
const uint16_t stFatal
Fatal error, it's still an error.
Definition: XrdClStatus.hh:33

References XrdCl::Socket::Connected, XrdCl::Socket::Connecting, XrdCl::Socket::Disconnected, XrdCl::PathID::down, XrdCl::Log::Error(), XrdCl::Utils::GetHostAddresses(), XrdCl::URL::GetHostId(), XrdCl::URL::GetHostName(), XrdCl::DefaultEnv::GetLog(), XrdCl::Status::IsOK(), XrdCl::URL::IsValid(), XrdCl::Utils::LogHostAddresses(), XrdCl::PostMasterMsg, XrdCl::Status::status, XrdCl::stFatal, and XrdCl::PathID::up.

Referenced by ForceConnect(), OnConnectError(), OnError(), and Send().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ ForceConnect()

void XrdCl::Stream::ForceConnect ( )

Force connection.

Definition at line 347 of file XrdClStream.cc.

348  {
349  XrdSysMutexHelper scopedLock( pMutex );
350  if( pSubStreams[0]->status == Socket::Connecting )
351  {
352  pSubStreams[0]->status = Socket::Disconnected;
353  XrdCl::PathID path( 0, 0 );
354  XrdCl::XRootDStatus st = EnableLink( path );
355  if( !st.IsOK() )
356  OnConnectError( 0, st );
357  }
358  }
XRootDStatus EnableLink(PathID &path)
Definition: XrdClStream.cc:187
void OnConnectError(uint16_t subStream, XRootDStatus status)
On connect error.
Definition: XrdClStream.cc:728
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:124

References XrdCl::Socket::Connecting, XrdCl::Socket::Disconnected, EnableLink(), XrdCl::Status::IsOK(), and OnConnectError().

Referenced by XrdCl::Channel::ForceReconnect().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ ForceError()

void XrdCl::Stream::ForceError ( XRootDStatus  status,
bool  hush = false 
)

Force error.

Definition at line 914 of file XrdClStream.cc.

915  {
916  XrdSysMutexHelper scopedLock( pMutex );
917  Log *log = DefaultEnv::GetLog();
918  for( size_t substream = 0; substream < pSubStreams.size(); ++substream )
919  {
920  if( pSubStreams[substream]->status != Socket::Connected ) continue;
921  pSubStreams[substream]->socket->Close();
922  pSubStreams[substream]->status = Socket::Disconnected;
923 
924  if( !hush )
925  log->Debug( PostMasterMsg, "[%s] Forcing error on disconnect: %s.",
926  pStreamName.c_str(), status.ToString().c_str() );
927 
928  //--------------------------------------------------------------------
929  // Reinsert the stuff that we have failed to sent
930  //--------------------------------------------------------------------
931  Reinsert( substream );
932  }
933 
934  pConnectionCount = 0;
935 
936  //------------------------------------------------------------------------
937  // We're done here, unlock the stream mutex to avoid deadlocks and
938  // report the disconnection event to the handlers
939  //------------------------------------------------------------------------
940  log->Debug( PostMasterMsg, "[%s] Reporting disconnection to queued "
941  "message handlers.", pStreamName.c_str() );
942 
943  SubStreamList::iterator it;
944  OutQueue q;
945  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
946  q.GrabItems( *(*it)->outQueue );
947  scopedLock.UnLock();
948 
949  q.Report( status );
950 
951  pIncomingQueue->ReportStreamEvent( MsgHandler::Broken, status );
952  pChannelEvHandlers.ReportEvent( ChannelEventHandler::StreamBroken, status );
953  }
void ReportEvent(ChannelEventHandler::ChannelEvent event, Status status)
Report an event to the channel event handlers.
void ReportStreamEvent(MsgHandler::StreamEvent event, XRootDStatus status)
Report an event to the handlers.
@ Broken
The stream is broken.

References XrdCl::MsgHandler::Broken, XrdCl::Socket::Connected, XrdCl::Log::Debug(), XrdCl::Socket::Disconnected, XrdCl::DefaultEnv::GetLog(), XrdCl::OutQueue::GrabItems(), XrdCl::PostMasterMsg, XrdCl::OutQueue::Report(), XrdCl::ChannelHandlerList::ReportEvent(), XrdCl::InQueue::ReportStreamEvent(), XrdCl::ChannelEventHandler::StreamBroken, XrdCl::Status::ToString(), and XrdSysMutexHelper::UnLock().

Referenced by XrdCl::Channel::ForceDisconnect(), and XrdCl::AsyncSocketHandler::OnHeaderCorruption().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ GetName()

const std::string& XrdCl::Stream::GetName ( ) const
inline

Return stream name.

Definition at line 170 of file XrdClStream.hh.

171  {
172  return pStreamName;
173  }

◆ GetURL()

const URL* XrdCl::Stream::GetURL ( ) const
inline

Get the URL.

Definition at line 157 of file XrdClStream.hh.

158  {
159  return pUrl;
160  }

Referenced by XrdCl::AsyncSocketHandler::OnConnectionReturn().

+ Here is the caller graph for this function:

◆ Initialize()

XRootDStatus XrdCl::Stream::Initialize ( )

Initializer.

Definition at line 171 of file XrdClStream.cc.

172  {
173  if( !pTransport || !pPoller || !pChannelData )
174  return XRootDStatus( stError, errUninitialized );
175 
176  AsyncSocketHandler *s = new AsyncSocketHandler( *pUrl, pPoller, pTransport,
177  pChannelData, 0, this );
178  pSubStreams.push_back( new SubStreamData() );
179  pSubStreams[0]->socket = s;
180  return XRootDStatus();
181  }
const uint16_t errUninitialized
Definition: XrdClStatus.hh:60
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32

References XrdCl::errUninitialized, and XrdCl::stError.

Referenced by XrdCl::Channel::Channel().

+ Here is the caller graph for this function:

◆ InspectStatusRsp()

uint16_t XrdCl::Stream::InspectStatusRsp ( uint16_t  stream,
MsgHandler *&  incHandler 
)

In case the message is a kXR_status response it needs further attention

Returns
an action code (MsgHandler::Raw, Corrupted, More or None); a MsgHandler is passed back through incHandler in case we need to read out raw data

Definition at line 1128 of file XrdClStream.cc.

1130  {
1131  InMessageHelper &mh = pSubStreams[stream]->inMsgHelper;
1132  if( !mh.handler )
1133  return MsgHandler::RemoveHandler;
1134 
1135  uint16_t action = mh.handler->InspectStatusRsp();
1136  mh.action |= action;
1137 
1138  if( action & MsgHandler::RemoveHandler )
1139  pIncomingQueue->RemoveMessageHandler( mh.handler );
1140 
1141  if( action & MsgHandler::Raw )
1142  {
1143  incHandler = mh.handler;
1144  return MsgHandler::Raw;
1145  }
1146 
1147  if( action & MsgHandler::Corrupted )
1148  return MsgHandler::Corrupted;
1149 
1150  if( action & MsgHandler::More )
1151  return MsgHandler::More;
1152 
1153  return MsgHandler::None;
1154  }
void RemoveMessageHandler(MsgHandler *handler)
Remove a listener.
@ More
there are more (non-raw) data to be read

References XrdCl::InMessageHelper::action, XrdCl::MsgHandler::Corrupted, XrdCl::InMessageHelper::handler, XrdCl::MsgHandler::InspectStatusRsp(), XrdCl::MsgHandler::More, XrdCl::MsgHandler::None, XrdCl::MsgHandler::Raw, XrdCl::MsgHandler::RemoveHandler, and XrdCl::InQueue::RemoveMessageHandler().

Referenced by XrdCl::AsyncMsgReader::Read().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ InstallIncHandler()

MsgHandler * XrdCl::Stream::InstallIncHandler ( std::shared_ptr< Message > &  msg,
uint16_t  stream 
)

Install a message handler for the given message if there is one available. If the handler wants to be called in the raw mode it will be returned; the message ownership flag is returned in any case.

Parameters
msg: message header
stream: stream concerned
Returns
a pair containing the handler and ownership flag

Definition at line 1107 of file XrdClStream.cc.

1108  {
1109  InMessageHelper &mh = pSubStreams[stream]->inMsgHelper;
1110  if( !mh.handler )
1111  mh.handler = pIncomingQueue->GetHandlerForMessage( msg,
1112  mh.expires,
1113  mh.action );
1114 
1115  if( !mh.handler )
1116  return nullptr;
1117 
1118  if( mh.action & MsgHandler::Raw )
1119  return mh.handler;
1120  return nullptr;
1121  }
MsgHandler * GetHandlerForMessage(std::shared_ptr< Message > &msg, time_t &expires, uint16_t &action)
Definition: XrdClInQueue.cc:66

References XrdCl::InMessageHelper::action, XrdCl::InMessageHelper::expires, XrdCl::InQueue::GetHandlerForMessage(), XrdCl::InMessageHelper::handler, and XrdCl::MsgHandler::Raw.

Referenced by XrdCl::AsyncMsgReader::Read().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnConnect()

void XrdCl::Stream::OnConnect ( uint16_t  subStream)

Call back when a substream has been connected.

Definition at line 623 of file XrdClStream.cc.

624  {
625  XrdSysMutexHelper scopedLock( pMutex );
626  pSubStreams[subStream]->status = Socket::Connected;
627 
628  std::string ipstack( pSubStreams[0]->socket->GetIpStack() );
629  Log *log = DefaultEnv::GetLog();
630  log->Debug( PostMasterMsg, "[%s] Stream %d connected (%s).", pStreamName.c_str(),
631  subStream, ipstack.c_str() );
632 
633  if( subStream == 0 )
634  {
635  pLastStreamError = 0;
636  pLastFatalError = XRootDStatus();
637  pConnectionCount = 0;
638  uint16_t numSub = pTransport->SubStreamNumber( *pChannelData );
639  pSessionId = ++sSessCntGen;
640 
641  //------------------------------------------------------------------------
642  // Create the streams if they don't exist yet
643  //------------------------------------------------------------------------
644  if( pSubStreams.size() == 1 && numSub > 1 )
645  {
646  for( uint16_t i = 1; i < numSub; ++i )
647  {
648  URL url = pTransport->GetBindPreference( *pUrl, *pChannelData );
649  AsyncSocketHandler *s = new AsyncSocketHandler( url, pPoller, pTransport,
650  pChannelData, i, this );
651  pSubStreams.push_back( new SubStreamData() );
652  pSubStreams[i]->socket = s;
653  }
654  }
655 
656  //------------------------------------------------------------------------
657  // Connect the extra streams, if we fail we move all the outgoing items
658  // to stream 0, we don't need to enable the uplink here, because it
659  // should be already enabled after the handshaking process is completed.
660  //------------------------------------------------------------------------
661  if( pSubStreams.size() > 1 )
662  {
663  log->Debug( PostMasterMsg, "[%s] Attempting to connect %zu additional streams.",
664  pStreamName.c_str(), pSubStreams.size() - 1 );
665  for( size_t i = 1; i < pSubStreams.size(); ++i )
666  {
667  pSubStreams[i]->socket->SetAddress( pSubStreams[0]->socket->GetAddress() );
668  XRootDStatus st = pSubStreams[i]->socket->Connect( pConnectionWindow );
669  if( !st.IsOK() )
670  {
671  pSubStreams[0]->outQueue->GrabItems( *pSubStreams[i]->outQueue );
672  pSubStreams[i]->socket->Close();
673  }
674  else
675  {
676  pSubStreams[i]->status = Socket::Connecting;
677  }
678  }
679  }
680 
681  //------------------------------------------------------------------------
682  // Inform monitoring
683  //------------------------------------------------------------------------
684  pBytesSent = 0;
685  pBytesReceived = 0;
686  gettimeofday( &pConnectionDone, 0 );
687  Monitor *mon = DefaultEnv::GetMonitor();
688  if( mon )
689  {
690  Monitor::ConnectInfo i;
691  i.server = pUrl->GetHostId();
692  i.sTOD = pConnectionStarted;
693  i.eTOD = pConnectionDone;
694  i.streams = pSubStreams.size();
695 
696  AnyObject qryResult;
697  std::string *qryResponse = nullptr;
698  pTransport->Query( TransportQuery::Auth, qryResult, *pChannelData );
699  qryResult.Get( qryResponse );
700 
701  if (qryResponse) {
702  i.auth = *qryResponse;
703  delete qryResponse;
704  } else {
705  i.auth = "";
706  }
707 
708  mon->Event( Monitor::EvConnect, &i );
709  }
710 
711  //------------------------------------------------------------------------
712  // For every connected control-stream call the global on-connect handler
713  //------------------------------------------------------------------------
714  DefaultEnv::GetPostMaster()->NotifyConnectHandler( *pUrl );
715  }
716  else if( pOnDataConnJob )
717  {
718  //------------------------------------------------------------------------
719  // For every connected data-stream call the on-connect handler
720  //------------------------------------------------------------------------
721  pJobManager->QueueJob( pOnDataConnJob.get(), 0 );
722  }
723  }
static Monitor * GetMonitor()
Get the monitor object.
static PostMaster * GetPostMaster()
Get default post master.
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
@ EvConnect
ConnectInfo: Login into a server.
void NotifyConnectHandler(const URL &url)
Notify the global on-connect handler.
virtual uint16_t SubStreamNumber(AnyObject &channelData)=0
Return a number of substreams per stream that should be created.
virtual Status Query(uint16_t query, AnyObject &result, AnyObject &channelData)=0
Query the channel.
virtual URL GetBindPreference(const URL &url, AnyObject &channelData)=0
Get bind preference for the next data stream.
static const uint16_t Auth
Transport name, returns std::string *.

References XrdCl::Monitor::ConnectInfo::auth, XrdCl::TransportQuery::Auth, XrdCl::Socket::Connected, XrdCl::Socket::Connecting, XrdCl::Log::Debug(), XrdCl::Monitor::ConnectInfo::eTOD, XrdCl::Monitor::EvConnect, XrdCl::Monitor::Event(), XrdCl::AnyObject::Get(), XrdCl::TransportHandler::GetBindPreference(), XrdCl::URL::GetHostId(), XrdCl::DefaultEnv::GetLog(), XrdCl::DefaultEnv::GetMonitor(), XrdCl::DefaultEnv::GetPostMaster(), XrdCl::Status::IsOK(), XrdCl::PostMaster::NotifyConnectHandler(), XrdCl::PostMasterMsg, XrdCl::TransportHandler::Query(), XrdCl::JobManager::QueueJob(), XrdCl::Monitor::ConnectInfo::server, XrdCl::Monitor::ConnectInfo::sTOD, XrdCl::Monitor::ConnectInfo::streams, and XrdCl::TransportHandler::SubStreamNumber().

Referenced by XrdCl::AsyncSocketHandler::HandShakeNextStep().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnConnectError()

void XrdCl::Stream::OnConnectError ( uint16_t  subStream,
XRootDStatus  status 
)

On connect error.

Definition at line 728 of file XrdClStream.cc.

729  {
730  XrdSysMutexHelper scopedLock( pMutex );
731  Log *log = DefaultEnv::GetLog();
732  pSubStreams[subStream]->socket->Close();
733  time_t now = ::time(0);
734 
735  //--------------------------------------------------------------------------
736  // For every connection error call the global connection error handler
737  //--------------------------------------------------------------------------
738  DefaultEnv::GetPostMaster()->NotifyConnErrHandler( *pUrl, status );
739 
740  //--------------------------------------------------------------------------
741  // If we connected subStream == 0 and cannot connect >0 then we just give
742  // up and move the outgoing messages to another queue
743  //--------------------------------------------------------------------------
744  if( subStream > 0 )
745  {
746  pSubStreams[subStream]->status = Socket::Disconnected;
747  pSubStreams[0]->outQueue->GrabItems( *pSubStreams[subStream]->outQueue );
748  if( pSubStreams[0]->status == Socket::Connected )
749  {
750  XRootDStatus st = pSubStreams[0]->socket->EnableUplink();
751  if( !st.IsOK() )
752  OnFatalError( 0, st, scopedLock );
753  return;
754  }
755 
756  if( pSubStreams[0]->status == Socket::Connecting )
757  return;
758 
759  OnFatalError( subStream, status, scopedLock );
760  return;
761  }
762 
763  //--------------------------------------------------------------------------
764  // Check if we still have time to try and do something in the current window
765  //--------------------------------------------------------------------------
766  time_t elapsed = now-pConnectionInitTime;
767  log->Error( PostMasterMsg, "[%s] elapsed = %lld, pConnectionWindow = %d seconds.",
768  pStreamName.c_str(), (long long) elapsed, pConnectionWindow );
769 
770  //------------------------------------------------------------------------
771  // If we have some IP addresses left we try them
772  //------------------------------------------------------------------------
773  if( !pAddresses.empty() )
774  {
775  XRootDStatus st;
776  do
777  {
778  pSubStreams[0]->socket->SetAddress( pAddresses.back() );
779  pAddresses.pop_back();
780  pConnectionInitTime = ::time( 0 );
781  st = pSubStreams[0]->socket->Connect( pConnectionWindow );
782  }
783  while( !pAddresses.empty() && !st.IsOK() );
784 
785  if( !st.IsOK() )
786  OnFatalError( subStream, st, scopedLock );
787 
788  return;
789  }
790  //------------------------------------------------------------------------
791  // If we still can retry with the same host name, we sleep until the end
792  // of the connection window and try
793  //------------------------------------------------------------------------
794  else if( elapsed < pConnectionWindow && pConnectionCount < pConnectionRetry
795  && !status.IsFatal() )
796  {
797  log->Info( PostMasterMsg, "[%s] Attempting reconnection in %lld seconds.",
798  pStreamName.c_str(), (long long) (pConnectionWindow - elapsed) );
799 
800  Task *task = new ::StreamConnectorTask( *pUrl, pStreamName );
801  pTaskManager->RegisterTask( task, pConnectionInitTime+pConnectionWindow );
802  return;
803  }
804  //--------------------------------------------------------------------------
805  // We are out of the connection window, the only thing we can do here
806  // is re-resolving the host name and retrying if we still can
807  //--------------------------------------------------------------------------
808  else if( pConnectionCount < pConnectionRetry && !status.IsFatal() )
809  {
810  pAddresses.clear();
811  pSubStreams[0]->status = Socket::Disconnected;
812  PathID path( 0, 0 );
813  XRootDStatus st = EnableLink( path );
814  if( !st.IsOK() )
815  OnFatalError( subStream, st, scopedLock );
816  return;
817  }
818 
819  //--------------------------------------------------------------------------
820  // Else, we fail
821  //--------------------------------------------------------------------------
822  OnFatalError( subStream, status, scopedLock );
823  }
void NotifyConnErrHandler(const URL &url, const XRootDStatus &status)
Notify the global error connection handler.
void RegisterTask(Task *task, time_t time, bool own=true)

References XrdCl::Socket::Connected, XrdCl::Socket::Connecting, XrdCl::Socket::Disconnected, EnableLink(), XrdCl::Log::Error(), XrdCl::DefaultEnv::GetLog(), XrdCl::DefaultEnv::GetPostMaster(), XrdCl::Log::Info(), XrdCl::Status::IsFatal(), XrdCl::Status::IsOK(), XrdCl::PostMaster::NotifyConnErrHandler(), XrdCl::PostMasterMsg, and XrdCl::TaskManager::RegisterTask().

Referenced by ForceConnect(), XrdCl::AsyncSocketHandler::OnConnectionReturn(), and XrdCl::AsyncSocketHandler::OnFaultWhileHandshaking().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnError()

void XrdCl::Stream::OnError ( uint16_t  subStream,
XRootDStatus  status 
)

On error.

Definition at line 828 of file XrdClStream.cc.

829  {
830  XrdSysMutexHelper scopedLock( pMutex );
831  Log *log = DefaultEnv::GetLog();
832  pSubStreams[subStream]->socket->Close();
833  pSubStreams[subStream]->status = Socket::Disconnected;
834 
835  log->Debug( PostMasterMsg, "[%s] Recovering error for stream #%d: %s.",
836  pStreamName.c_str(), subStream, status.ToString().c_str() );
837 
838  //--------------------------------------------------------------------------
839  // Reinsert the stuff that we have failed to send
840  //--------------------------------------------------------------------------
841  Reinsert( subStream );
842 
843  //--------------------------------------------------------------------------
844  // We are dealing with an error of a peripheral stream. If we don't have
845  // anything to send don't bother recovering. Otherwise move the requests
846  // to stream 0 if possible.
847  //--------------------------------------------------------------------------
848  if( subStream > 0 )
849  {
850  if( pSubStreams[subStream]->outQueue->IsEmpty() )
851  return;
852 
853  if( pSubStreams[0]->status != Socket::Disconnected )
854  {
855  pSubStreams[0]->outQueue->GrabItems( *pSubStreams[subStream]->outQueue );
856  if( pSubStreams[0]->status == Socket::Connected )
857  {
858  XRootDStatus st = pSubStreams[0]->socket->EnableUplink();
859  if( !st.IsOK() )
860  OnFatalError( 0, st, scopedLock );
861  return;
862  }
863  }
864  OnFatalError( subStream, status, scopedLock );
865  return;
866  }
867 
868  //--------------------------------------------------------------------------
869  // If we lost the stream 0 we have lost the session, we re-enable the
870  // stream if we still have things in one of the outgoing queues, otherwise
871  // there is no point in recovering at this point.
872  //--------------------------------------------------------------------------
873  if( subStream == 0 )
874  {
875  MonitorDisconnection( status );
876 
877  SubStreamList::iterator it;
878  size_t outstanding = 0;
879  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
880  outstanding += (*it)->outQueue->GetSizeStateless();
881 
882  if( outstanding )
883  {
884  PathID path( 0, 0 );
885  XRootDStatus st = EnableLink( path );
886  if( !st.IsOK() )
887  {
888  OnFatalError( 0, st, scopedLock );
889  return;
890  }
891  }
892 
893  //------------------------------------------------------------------------
894  // We're done here, unlock the stream mutex to avoid deadlocks and
895  // report the disconnection event to the handlers
896  //------------------------------------------------------------------------
897  log->Debug( PostMasterMsg, "[%s] Reporting disconnection to queued "
898  "message handlers.", pStreamName.c_str() );
899  OutQueue q;
900  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
901  q.GrabStateful( *(*it)->outQueue );
902  scopedLock.UnLock();
903 
904  q.Report( status );
905  pIncomingQueue->ReportStreamEvent( MsgHandler::Broken, status );
906  pChannelEvHandlers.ReportEvent( ChannelEventHandler::StreamBroken, status );
907  return;
908  }
909  }

References XrdCl::MsgHandler::Broken, XrdCl::Socket::Connected, XrdCl::Log::Debug(), XrdCl::Socket::Disconnected, EnableLink(), XrdCl::DefaultEnv::GetLog(), XrdCl::OutQueue::GrabStateful(), XrdCl::Status::IsOK(), XrdCl::PostMasterMsg, XrdCl::OutQueue::Report(), XrdCl::ChannelHandlerList::ReportEvent(), XrdCl::InQueue::ReportStreamEvent(), XrdCl::ChannelEventHandler::StreamBroken, XrdCl::Status::ToString(), and XrdSysMutexHelper::UnLock().

Referenced by XrdCl::AsyncSocketHandler::OnFault(), and OnReadTimeout().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnIncoming()

void XrdCl::Stream::OnIncoming ( uint16_t  subStream,
std::shared_ptr< Message >  msg,
uint32_t  bytesReceived 
)

Call back when a message has been reconstructed.

Definition at line 471 of file XrdClStream.cc.

474  {
475  msg->SetSessionId( pSessionId );
476  pBytesReceived += bytesReceived;
477 
478  MsgHandler *handler = nullptr;
479  uint16_t action = 0;
480  {
481  InMessageHelper &mh = pSubStreams[subStream]->inMsgHelper;
482  handler = mh.handler;
483  action = mh.action;
484  mh.Reset();
485  }
486 
487  if( !IsPartial( *msg ) )
488  {
489  uint32_t streamAction = pTransport->MessageReceived( *msg, subStream,
490  *pChannelData );
491  if( streamAction & TransportHandler::DigestMsg )
492  return;
493 
494  if( streamAction & TransportHandler::RequestClose )
495  {
496  RequestClose( *msg );
497  return;
498  }
499  }
500 
501  Log *log = DefaultEnv::GetLog();
502 
503  //--------------------------------------------------------------------------
504  // No handler, we discard the message ...
505  //--------------------------------------------------------------------------
506  if( !handler )
507  {
508  ServerResponse *rsp = (ServerResponse*)msg->GetBuffer();
509  log->Warning( PostMasterMsg, "[%s] Discarding received message: %p "
510  "(status=%d, SID=[%d,%d]), no MsgHandler found.",
511  pStreamName.c_str(), (void*)msg.get(), rsp->hdr.status,
512  rsp->hdr.streamid[0], rsp->hdr.streamid[1] );
513  return;
514  }
515 
516  //--------------------------------------------------------------------------
517  // We have a handler, so we call the callback
518  //--------------------------------------------------------------------------
519  log->Dump( PostMasterMsg, "[%s] Handling received message: %p.",
520  pStreamName.c_str(), (void*)msg.get() );
521 
522  if( action & (MsgHandler::NoProcess|MsgHandler::Ignore) )
523  {
524  log->Dump( PostMasterMsg, "[%s] Ignoring the processing handler for: %s.",
525  pStreamName.c_str(), msg->GetObfuscatedDescription().c_str() );
526 
527  // if we are handling partial response we have to take down the timeout fence
528  if( IsPartial( *msg ) )
529  {
530  XRootDMsgHandler *xrdHandler = dynamic_cast<XRootDMsgHandler*>( handler );
531  if( xrdHandler ) xrdHandler->PartialReceived();
532  }
533 
534  return;
535  }
536 
537  Job *job = new HandleIncMsgJob( handler );
538  pJobManager->QueueJob( job );
539  }
kXR_char streamid[2]
Definition: XProtocol.hh:914
ServerResponseHeader hdr
Definition: XProtocol.hh:1288
@ Ignore
Ignore the message.
@ RequestClose
Send a close request.
virtual uint32_t MessageReceived(Message &msg, uint16_t subStream, AnyObject &channelData)=0
Check if the message invokes a stream action.

References XrdCl::InMessageHelper::action, XrdCl::TransportHandler::DigestMsg, XrdCl::Log::Dump(), XrdCl::DefaultEnv::GetLog(), XrdCl::InMessageHelper::handler, ServerResponse::hdr, XrdCl::MsgHandler::Ignore, XrdCl::TransportHandler::MessageReceived(), XrdCl::MsgHandler::NoProcess, XrdCl::XRootDMsgHandler::PartialReceived(), XrdCl::PostMasterMsg, XrdCl::JobManager::QueueJob(), XrdCl::TransportHandler::RequestClose, XrdCl::InMessageHelper::Reset(), ServerResponseHeader::status, ServerResponseHeader::streamid, and XrdCl::Log::Warning().

Referenced by XrdCl::AsyncMsgReader::Read().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnMessageSent()

void XrdCl::Stream::OnMessageSent ( uint16_t  subStream,
Message * msg,
uint32_t  bytesSent 
)

Definition at line 601 of file XrdClStream.cc.

604  {
605  pTransport->MessageSent( msg, subStream, bytesSent,
606  *pChannelData );
607  OutQueue::MsgHelper &h = pSubStreams[subStream]->outMsgHelper;
608  pBytesSent += bytesSent;
609  if( h.handler )
610  {
611  // ensure expiration time is assigned if still in queue
612  pIncomingQueue->AssignTimeout( h.handler );
613  // OnStatusReady may cause the handler to delete itself, in
614  // which case the handler or the user callback may also delete msg
615  h.handler->OnStatusReady( msg, XRootDStatus() );
616  }
617  pSubStreams[subStream]->outMsgHelper.Reset();
618  }
void AssignTimeout(MsgHandler *handler)
virtual void MessageSent(Message *msg, uint16_t subStream, uint32_t bytesSent, AnyObject &channelData)=0
Notify the transport about a message having been sent.

References XrdCl::InQueue::AssignTimeout(), XrdCl::OutQueue::MsgHelper::handler, XrdCl::TransportHandler::MessageSent(), and XrdCl::MsgHandler::OnStatusReady().

Referenced by XrdCl::AsyncMsgWriter::Write().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnReadTimeout()

bool XrdCl::Stream::OnReadTimeout ( uint16_t  subStream)

On read timeout.

Definition at line 1012 of file XrdClStream.cc.

1013  {
1014  //--------------------------------------------------------------------------
1015  // We only take the main stream into account
1016  //--------------------------------------------------------------------------
1017  if( substream != 0 )
1018  return true;
1019 
1020  //--------------------------------------------------------------------------
1021  // Check if there are no outgoing messages and if the stream TTL has elapsed.
1022  // It is assumed that the underlying transport makes sure that there are no
1023  // pending requests that are not answered, i.e. all possible virtual streams
1024  // are de-allocated
1025  //--------------------------------------------------------------------------
1026  Log *log = DefaultEnv::GetLog();
1027  SubStreamList::iterator it;
1028  time_t now = time(0);
1029 
1030  XrdSysMutexHelper scopedLock( pMutex );
1031  uint32_t outgoingMessages = 0;
1032  time_t lastActivity = 0;
1033  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1034  {
1035  outgoingMessages += (*it)->outQueue->GetSize();
1036  time_t sockLastActivity = (*it)->socket->GetLastActivity();
1037  if( lastActivity < sockLastActivity )
1038  lastActivity = sockLastActivity;
1039  }
1040 
1041  if( !outgoingMessages )
1042  {
1043  bool disconnect = pTransport->IsStreamTTLElapsed( now-lastActivity,
1044  *pChannelData );
1045  if( disconnect )
1046  {
1047  log->Debug( PostMasterMsg, "[%s] Stream TTL elapsed, disconnecting...",
1048  pStreamName.c_str() );
1049  scopedLock.UnLock();
1050  //----------------------------------------------------------------------
1051  // Important note!
1052  //
1053  // This destroys the Stream object itself, the underlying
1054  // AsyncSocketHandler object (that called this method) and the Channel
1055  // object that aggregates this Stream.
1056  //
1057  // Additionally &(*pUrl) is used by ForceDisconnect to check if we are
1058  // in a Channel that was previously collapsed in a redirect.
1059  //----------------------------------------------------------------------
1060  DefaultEnv::GetPostMaster()->ForceDisconnect( *pUrl );
1061  return false;
1062  }
1063  }
1064 
1065  //--------------------------------------------------------------------------
1066  // Check if the stream is broken
1067  //--------------------------------------------------------------------------
1068  XRootDStatus st = pTransport->IsStreamBroken( now-lastActivity,
1069  *pChannelData );
1070  if( !st.IsOK() )
1071  {
1072  scopedLock.UnLock();
1073  OnError( substream, st );
1074  return false;
1075  }
1076  return true;
1077  }
Status ForceDisconnect(const URL &url)
Shut down a channel.
void OnError(uint16_t subStream, XRootDStatus status)
On error.
Definition: XrdClStream.cc:828
virtual bool IsStreamTTLElapsed(time_t inactiveTime, AnyObject &channelData)=0
Check if the stream should be disconnected.
virtual Status IsStreamBroken(time_t inactiveTime, AnyObject &channelData)=0

References XrdCl::Log::Debug(), XrdCl::PostMaster::ForceDisconnect(), XrdCl::DefaultEnv::GetLog(), XrdCl::DefaultEnv::GetPostMaster(), XrdCl::Status::IsOK(), XrdCl::TransportHandler::IsStreamBroken(), XrdCl::TransportHandler::IsStreamTTLElapsed(), OnError(), XrdCl::PostMasterMsg, and XrdSysMutexHelper::UnLock().

Referenced by XrdCl::AsyncSocketHandler::OnReadTimeout().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnReadyToWrite()

std::pair< Message *, MsgHandler * > XrdCl::Stream::OnReadyToWrite ( uint16_t  subStream)

Definition at line 545 of file XrdClStream.cc.

546  {
547  XrdSysMutexHelper scopedLock( pMutex );
548  Log *log = DefaultEnv::GetLog();
549  if( pSubStreams[subStream]->outQueue->IsEmpty() )
550  {
551  log->Dump( PostMasterMsg, "[%s] Nothing to write, disable uplink",
552  pSubStreams[subStream]->socket->GetStreamName().c_str() );
553 
554  pSubStreams[subStream]->socket->DisableUplink();
555  return std::make_pair( (Message *)0, (MsgHandler *)0 );
556  }
557 
558  OutQueue::MsgHelper &h = pSubStreams[subStream]->outMsgHelper;
559  h.msg = pSubStreams[subStream]->outQueue->PopMessage( h.handler,
560  h.expires,
561  h.stateful );
562 
563  log->Debug( PostMasterMsg, "[%s] Duplicating MsgHandler: %p (message: %s) "
564  "from out-queue to in-queue, starting to send outgoing.",
565  pUrl->GetHostId().c_str(), (void*)h.handler,
566  h.msg->GetObfuscatedDescription().c_str() );
567 
568  scopedLock.UnLock();
569 
570  if( h.handler )
571  {
572  bool rmMsg = false;
573  pIncomingQueue->AddMessageHandler( h.handler, rmMsg );
574  if( rmMsg )
575  {
576  Log *log = DefaultEnv::GetLog();
577  log->Warning( PostMasterMsg, "[%s] Removed a leftover msg from the in-queue.",
578  pStreamName.c_str() );
579  }
580  h.handler->OnReadyToSend( h.msg );
581  }
582  return std::make_pair( h.msg, h.handler );
583  }
void AddMessageHandler(MsgHandler *handler, bool &rmMsg)
Definition: XrdClInQueue.cc:54

References XrdCl::InQueue::AddMessageHandler(), XrdCl::Log::Debug(), XrdCl::Log::Dump(), XrdCl::OutQueue::MsgHelper::expires, XrdCl::URL::GetHostId(), XrdCl::DefaultEnv::GetLog(), XrdCl::Message::GetObfuscatedDescription(), XrdCl::OutQueue::MsgHelper::handler, XrdCl::OutQueue::MsgHelper::msg, XrdCl::MsgHandler::OnReadyToSend(), XrdCl::PostMasterMsg, XrdCl::OutQueue::MsgHelper::stateful, XrdSysMutexHelper::UnLock(), and XrdCl::Log::Warning().

Referenced by XrdCl::AsyncMsgWriter::Write().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnWriteTimeout()

bool XrdCl::Stream::OnWriteTimeout ( uint16_t  subStream)

On write timeout.

Definition at line 1082 of file XrdClStream.cc.

1083  {
1084  return true;
1085  }

Referenced by XrdCl::AsyncSocketHandler::OnWriteTimeout().

+ Here is the caller graph for this function:

◆ Query()

Status XrdCl::Stream::Query ( uint16_t  query,
AnyObject & result 
)

Query the stream.

Definition at line 1204 of file XrdClStream.cc.

1205  {
1206  switch( query )
1207  {
1208  case StreamQuery::IpAddr:
1209  {
1210  result.Set( new std::string( pSubStreams[0]->socket->GetIpAddr() ), false );
1211  return Status();
1212  }
1213 
1214  case StreamQuery::IpStack:
1215  {
1216  result.Set( new std::string( pSubStreams[0]->socket->GetIpStack() ), false );
1217  return Status();
1218  }
1219 
1220  case StreamQuery::HostName:
1221  {
1222  result.Set( new std::string( pSubStreams[0]->socket->GetHostName() ), false );
1223  return Status();
1224  }
1225 
1226  default:
1227  return Status( stError, errQueryNotSupported );
1228  }
1229  }
const uint16_t errQueryNotSupported
Definition: XrdClStatus.hh:89
static const uint16_t IpAddr
static const uint16_t HostName
static const uint16_t IpStack

References XrdCl::errQueryNotSupported, XrdCl::StreamQuery::HostName, XrdCl::StreamQuery::IpAddr, XrdCl::StreamQuery::IpStack, XrdCl::AnyObject::Set(), and XrdCl::stError.

Referenced by XrdCl::Channel::QueryTransport().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ RegisterEventHandler()

void XrdCl::Stream::RegisterEventHandler ( ChannelEventHandler * handler)

Register channel event handler.

Definition at line 1090 of file XrdClStream.cc.

1091  {
1092  pChannelEvHandlers.AddHandler( handler );
1093  }
void AddHandler(ChannelEventHandler *handler)
Add a channel event handler.

References XrdCl::ChannelHandlerList::AddHandler().

Referenced by XrdCl::Channel::RegisterEventHandler().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ RemoveEventHandler()

void XrdCl::Stream::RemoveEventHandler ( ChannelEventHandler * handler)

Remove a channel event handler.

Definition at line 1098 of file XrdClStream.cc.

1099  {
1100  pChannelEvHandlers.RemoveHandler( handler );
1101  }
void RemoveHandler(ChannelEventHandler *handler)
Remove the channel event handler.

References XrdCl::ChannelHandlerList::RemoveHandler().

Referenced by XrdCl::Channel::RemoveEventHandler().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Send()

XRootDStatus XrdCl::Stream::Send ( Message * msg,
MsgHandler * handler,
bool  stateful,
time_t  expires 
)

Queue the message for sending.

Definition at line 297 of file XrdClStream.cc.

301  {
302  XrdSysMutexHelper scopedLock( pMutex );
303  Log *log = DefaultEnv::GetLog();
304 
305  //--------------------------------------------------------------------------
306  // Check the session ID and bounce if needed
307  //--------------------------------------------------------------------------
308  if( msg->GetSessionId() &&
309  (pSubStreams[0]->status != Socket::Connected ||
310  pSessionId != msg->GetSessionId()) )
311  return XRootDStatus( stError, errInvalidSession );
312 
313  //--------------------------------------------------------------------------
314  // Decide on the path to send the message
315  //--------------------------------------------------------------------------
316  PathID path = pTransport->MultiplexSubStream( msg, *pChannelData );
317  if( pSubStreams.size() <= path.up )
318  {
319  log->Warning( PostMasterMsg, "[%s] Unable to send message %s through "
320  "substream %d, using 0 instead", pStreamName.c_str(),
321  msg->GetObfuscatedDescription().c_str(), path.up );
322  path.up = 0;
323  }
324 
325  log->Dump( PostMasterMsg, "[%s] Sending message %s (%p) through "
326  "substream %d expecting answer at %d", pStreamName.c_str(),
327  msg->GetObfuscatedDescription().c_str(), (void*)msg, path.up, path.down );
328 
329  //--------------------------------------------------------------------------
330  // Enable *a* path and insert the message to the right queue
331  //--------------------------------------------------------------------------
332  XRootDStatus st = EnableLink( path );
333  if( st.IsOK() )
334  {
335  pTransport->MultiplexSubStream( msg, *pChannelData, &path );
336  pSubStreams[path.up]->outQueue->PushBack( msg, handler,
337  expires, stateful );
338  }
339  else
340  st.status = stFatal;
341  return st;
342  }
virtual PathID MultiplexSubStream(Message *msg, AnyObject &channelData, PathID *hint=0)=0
const uint16_t errInvalidSession
Definition: XrdClStatus.hh:79

References XrdCl::Socket::Connected, XrdCl::PathID::down, XrdCl::Log::Dump(), EnableLink(), XrdCl::errInvalidSession, XrdCl::DefaultEnv::GetLog(), XrdCl::Message::GetObfuscatedDescription(), XrdCl::Message::GetSessionId(), XrdCl::Status::IsOK(), XrdCl::TransportHandler::MultiplexSubStream(), XrdCl::PostMasterMsg, XrdCl::Status::status, XrdCl::stError, XrdCl::stFatal, XrdCl::PathID::up, and XrdCl::Log::Warning().

Referenced by XrdCl::Channel::Send().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ SetChannelData()

void XrdCl::Stream::SetChannelData ( AnyObject * channelData)
inline

Set the channel data.

Definition at line 115 of file XrdClStream.hh.

116  {
117  pChannelData = channelData;
118  }

Referenced by XrdCl::Channel::Channel().

+ Here is the caller graph for this function:

◆ SetIncomingQueue()

void XrdCl::Stream::SetIncomingQueue ( InQueue * incomingQueue)
inline

Set the incoming queue.

Definition at line 107 of file XrdClStream.hh.

108  {
109  pIncomingQueue = incomingQueue;
110  }

Referenced by XrdCl::Channel::Channel().

+ Here is the caller graph for this function:

◆ SetJobManager()

void XrdCl::Stream::SetJobManager ( JobManager * jobManager)
inline

Set job manager.

Definition at line 131 of file XrdClStream.hh.

132  {
133  pJobManager = jobManager;
134  }

Referenced by XrdCl::Channel::Channel().

+ Here is the caller graph for this function:

◆ SetOnDataConnectHandler()

void XrdCl::Stream::SetOnDataConnectHandler ( std::shared_ptr< Job > &  onConnJob)
inline

Set the on-connect handler for data streams.

Definition at line 263 of file XrdClStream.hh.

264  {
265  XrdSysMutexHelper scopedLock( pMutex );
266  pOnDataConnJob = onConnJob;
267  }

Referenced by XrdCl::Channel::SetOnDataConnectHandler().

+ Here is the caller graph for this function:

◆ SetPoller()

void XrdCl::Stream::SetPoller ( Poller * poller)
inline

Set the poller.

Definition at line 99 of file XrdClStream.hh.

100  {
101  pPoller = poller;
102  }

Referenced by XrdCl::Channel::Channel().

+ Here is the caller graph for this function:

◆ SetTaskManager()

void XrdCl::Stream::SetTaskManager ( TaskManager * taskManager)
inline

Set task manager.

Definition at line 123 of file XrdClStream.hh.

124  {
125  pTaskManager = taskManager;
126  }

Referenced by XrdCl::Channel::Channel().

+ Here is the caller graph for this function:

◆ SetTransport()

void XrdCl::Stream::SetTransport ( TransportHandler * transport)
inline

Set the transport.

Definition at line 91 of file XrdClStream.hh.

92  {
93  pTransport = transport;
94  }

Referenced by XrdCl::Channel::Channel().

+ Here is the caller graph for this function:

◆ Tick()

void XrdCl::Stream::Tick ( time_t  now)

Handle a clock event generated either by a socket timeout or by a task manager event.

Definition at line 377 of file XrdClStream.cc.

378  {
379  //--------------------------------------------------------------------------
380  // Check for timed-out requests and incoming handlers
381  //--------------------------------------------------------------------------
382  pMutex.Lock();
383  OutQueue q;
384  SubStreamList::iterator it;
385  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
386  q.GrabExpired( *(*it)->outQueue, now );
387  pMutex.UnLock();
388 
389  q.Report( XRootDStatus( stError, errOperationExpired ) );
390  pIncomingQueue->ReportTimeout( now );
391  }
void ReportTimeout(time_t now=0)
Timeout handlers.
const uint16_t errOperationExpired
Definition: XrdClStatus.hh:90

References XrdCl::errOperationExpired, XrdCl::OutQueue::GrabExpired(), XrdSysMutex::Lock(), XrdCl::OutQueue::Report(), XrdCl::InQueue::ReportTimeout(), XrdCl::stError, and XrdSysMutex::UnLock().

Referenced by XrdCl::Channel::Tick().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

The documentation for this class was generated from the following files: