XRootD
XrdCl::PollerBuiltIn Class Reference

A poller implementation using the built-in XRootD poller. More...

#include <XrdClPollerBuiltIn.hh>

+ Inheritance diagram for XrdCl::PollerBuiltIn:
+ Collaboration diagram for XrdCl::PollerBuiltIn:

Public Member Functions

 PollerBuiltIn ()
 Constructor. More...
 
 ~PollerBuiltIn ()
 
virtual bool AddSocket (Socket *socket, SocketHandler *handler)
 
virtual bool EnableReadNotification (Socket *socket, bool notify, uint16_t timeout=60)
 
virtual bool EnableWriteNotification (Socket *socket, bool notify, uint16_t timeout=60)
 
virtual bool Finalize ()
 Finalize the poller. More...
 
virtual bool Initialize ()
 Initialize the poller. More...
 
virtual bool IsRegistered (Socket *socket)
 Check whether the socket is registered with the poller. More...
 
virtual bool IsRunning () const
 Is the event loop running? More...
 
virtual bool RemoveSocket (Socket *socket)
 Remove the socket. More...
 
virtual bool Start ()
 Start polling. More...
 
virtual bool Stop ()
 Stop polling. More...
 
- Public Member Functions inherited from XrdCl::Poller
virtual ~Poller ()
 Destructor. More...
 

Detailed Description

A poller implementation using the built-in XRootD poller.

Definition at line 40 of file XrdClPollerBuiltIn.hh.

Constructor & Destructor Documentation

◆ PollerBuiltIn()

XrdCl::PollerBuiltIn::PollerBuiltIn ( )
inline

Constructor.

Definition at line 46 of file XrdClPollerBuiltIn.hh.

46 : pNbPoller( GetNbPollerInit() ){}

◆ ~PollerBuiltIn()

XrdCl::PollerBuiltIn::~PollerBuiltIn ( )
inline

Definition at line 48 of file XrdClPollerBuiltIn.hh.

48 {}

Member Function Documentation

◆ AddSocket()

bool XrdCl::PollerBuiltIn::AddSocket ( Socket * socket,
SocketHandler * handler 
)
virtual

Add socket to the polling loop

Parameters
socket: the socket
handler: object handling the events

Implements XrdCl::Poller.

Definition at line 255 of file XrdClPollerBuiltIn.cc.

257  {
258  Log *log = DefaultEnv::GetLog();
259  XrdSysMutexHelper scopedLock( pMutex );
260 
261  if( !socket )
262  {
263  log->Error( PollerMsg, "Invalid socket, impossible to poll" );
264  return false;
265  }
266 
267  if( socket->GetStatus() != Socket::Connected &&
268  socket->GetStatus() != Socket::Connecting )
269  {
270  log->Error( PollerMsg, "Socket is not in a state valid for polling" );
271  return false;
272  }
273 
274  log->Debug( PollerMsg, "Adding socket %p to the poller", (void*)socket );
275 
276  //--------------------------------------------------------------------------
277  // Check if the socket is already registered
278  //--------------------------------------------------------------------------
279  SocketMap::const_iterator it = pSocketMap.find( socket );
280  if( it != pSocketMap.end() )
281  {
282  log->Warning( PollerMsg, "%s Already registered with this poller",
283  socket->GetName().c_str() );
284  return false;
285  }
286 
287  //--------------------------------------------------------------------------
288  // Create the socket helper
289  //--------------------------------------------------------------------------
290  XrdSys::IOEvents::Poller* poller = RegisterAndGetPoller( socket );
291 
292  if( !poller )
293  {
294  log->Error( PollerMsg, "No poller available, can not add socket" );
295  return false;
296  }
297 
298  PollerHelper *helper = new PollerHelper();
299  helper->callBack = new ::SocketCallBack( socket, handler );
300 
301  if( poller )
302  {
303  helper->channel = new XrdSys::IOEvents::Channel( poller,
304  socket->GetFD(),
305  helper->callBack );
306  }
307 
308  handler->Initialize( this );
309  pSocketMap[socket] = helper;
310  return true;
311  }
static Log * GetLog()
Get default log.
Handle diagnostics.
Definition: XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition: XrdClLog.cc:231
void Warning(uint64_t topic, const char *format,...)
Report a warning.
Definition: XrdClLog.cc:248
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition: XrdClLog.cc:282
virtual void Initialize(Poller *)
Initializer.
Definition: XrdClPoller.hh:55
@ Connected
The socket is connected.
Definition: XrdClSocket.hh:51
@ Connecting
The connection process is in progress.
Definition: XrdClSocket.hh:52
SocketStatus GetStatus() const
Get the socket status.
Definition: XrdClSocket.hh:125
const uint64_t PollerMsg

References XrdCl::Socket::Connected, XrdCl::Socket::Connecting, XrdCl::Log::Debug(), XrdCl::Log::Error(), XrdCl::Socket::GetFD(), XrdCl::DefaultEnv::GetLog(), XrdCl::Socket::GetName(), XrdCl::Socket::GetStatus(), XrdCl::SocketHandler::Initialize(), XrdCl::PollerMsg, and XrdCl::Log::Warning().

+ Here is the call graph for this function:

◆ EnableReadNotification()

bool XrdCl::PollerBuiltIn::EnableReadNotification ( Socket * socket,
bool  notify,
uint16_t  timeout = 60 
)
virtual

Notify the handler about read events

Parameters
socket: the socket
notify: specify if the handler should be notified
timeout: if no read event occurred after this time a timeout event will be generated

Implements XrdCl::Poller.

Definition at line 362 of file XrdClPollerBuiltIn.cc.

365  {
366  using namespace XrdSys::IOEvents;
367  Log *log = DefaultEnv::GetLog();
368 
369  if( !socket )
370  {
371  log->Error( PollerMsg, "Invalid socket, read events unavailable" );
372  return false;
373  }
374 
375  //--------------------------------------------------------------------------
376  // Check if the socket is registered
377  //--------------------------------------------------------------------------
378  XrdSysMutexHelper scopedLock( pMutex );
379  SocketMap::const_iterator it = pSocketMap.find( socket );
380  if( it == pSocketMap.end() )
381  {
382  log->Warning( PollerMsg, "%s Socket is not registered",
383  socket->GetName().c_str() );
384  return false;
385  }
386 
387  PollerHelper *helper = (PollerHelper*)it->second;
388  XrdSys::IOEvents::Poller *poller = GetPoller( socket );
389 
390  //--------------------------------------------------------------------------
391  // Enable read notifications
392  //--------------------------------------------------------------------------
393  if( notify )
394  {
395  if( helper->readEnabled )
396  return true;
397  helper->readTimeout = timeout;
398 
399  log->Dump( PollerMsg, "%s Enable read notifications, timeout: %d",
400  socket->GetName().c_str(), timeout );
401 
402  if( poller )
403  {
404  const char *errMsg;
405  bool status = helper->channel->Enable( Channel::readEvents, timeout,
406  &errMsg );
407  if( !status )
408  {
409  log->Error( PollerMsg, "%s Unable to enable read notifications: %s",
410  socket->GetName().c_str(), errMsg );
411  return false;
412  }
413  }
414  helper->readEnabled = true;
415  }
416 
417  //--------------------------------------------------------------------------
418  // Disable read notifications
419  //--------------------------------------------------------------------------
420  else
421  {
422  if( !helper->readEnabled )
423  return true;
424 
425  log->Dump( PollerMsg, "%s Disable read notifications",
426  socket->GetName().c_str() );
427 
428  if( poller )
429  {
430  const char *errMsg;
431  bool status = helper->channel->Disable( Channel::readEvents, &errMsg );
432  if( !status )
433  {
434  log->Error( PollerMsg, "%s Unable to disable read notifications: %s",
435  socket->GetName().c_str(), errMsg );
436  return false;
437  }
438  }
439  helper->readEnabled = false;
440  }
441  return true;
442  }
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Definition: XrdClLog.cc:299
std::string GetName() const
Get the string representation of the socket.
Definition: XrdClSocket.cc:672

References XrdCl::Log::Dump(), XrdCl::Log::Error(), XrdCl::DefaultEnv::GetLog(), XrdCl::Socket::GetName(), XrdCl::PollerMsg, and XrdCl::Log::Warning().

+ Here is the call graph for this function:

◆ EnableWriteNotification()

bool XrdCl::PollerBuiltIn::EnableWriteNotification ( Socket * socket,
bool  notify,
uint16_t  timeout = 60 
)
virtual

Notify the handler about write events

Parameters
socket: the socket
notify: specify if the handler should be notified
timeout: if no write event occurred after this time a timeout event will be generated

Implements XrdCl::Poller.

Definition at line 447 of file XrdClPollerBuiltIn.cc.

450  {
451  using namespace XrdSys::IOEvents;
452  Log *log = DefaultEnv::GetLog();
453 
454  if( !socket )
455  {
456  log->Error( PollerMsg, "Invalid socket, write events unavailable" );
457  return false;
458  }
459 
460  //--------------------------------------------------------------------------
461  // Check if the socket is registered
462  //--------------------------------------------------------------------------
463  XrdSysMutexHelper scopedLock( pMutex );
464  SocketMap::const_iterator it = pSocketMap.find( socket );
465  if( it == pSocketMap.end() )
466  {
467  log->Warning( PollerMsg, "%s Socket is not registered",
468  socket->GetName().c_str() );
469  return false;
470  }
471 
472  PollerHelper *helper = (PollerHelper*)it->second;
473  XrdSys::IOEvents::Poller *poller = GetPoller( socket );
474 
475  //--------------------------------------------------------------------------
476  // Enable write notifications
477  //--------------------------------------------------------------------------
478  if( notify )
479  {
480  if( helper->writeEnabled )
481  return true;
482 
483  helper->writeTimeout = timeout;
484 
485  log->Dump( PollerMsg, "%s Enable write notifications, timeout: %d",
486  socket->GetName().c_str(), timeout );
487 
488  if( poller )
489  {
490  const char *errMsg;
491  bool status = helper->channel->Enable( Channel::writeEvents, timeout,
492  &errMsg );
493  if( !status )
494  {
495  log->Error( PollerMsg, "%s Unable to enable write notifications: %s",
496  socket->GetName().c_str(), errMsg );
497  return false;
498  }
499  }
500  helper->writeEnabled = true;
501  }
502 
503  //--------------------------------------------------------------------------
504  // Disable read notifications
505  //--------------------------------------------------------------------------
506  else
507  {
508  if( !helper->writeEnabled )
509  return true;
510 
511  log->Dump( PollerMsg, "%s Disable write notifications",
512  socket->GetName().c_str() );
513  if( poller )
514  {
515  const char *errMsg;
516  bool status = helper->channel->Disable( Channel::writeEvents, &errMsg );
517  if( !status )
518  {
519  log->Error( PollerMsg, "%s Unable to disable write notifications: %s",
520  socket->GetName().c_str(), errMsg );
521  return false;
522  }
523  }
524  helper->writeEnabled = false;
525  }
526  return true;
527  }

References XrdCl::Log::Dump(), XrdCl::Log::Error(), XrdCl::DefaultEnv::GetLog(), XrdCl::Socket::GetName(), XrdCl::PollerMsg, and XrdCl::Log::Warning().

+ Here is the call graph for this function:

◆ Finalize()

bool XrdCl::PollerBuiltIn::Finalize ( )
virtual

Finalize the poller.

Implements XrdCl::Poller.

Definition at line 106 of file XrdClPollerBuiltIn.cc.

107  {
108  //--------------------------------------------------------------------------
109  // Clean up the channels
110  //--------------------------------------------------------------------------
111  SocketMap::iterator it;
112  for( it = pSocketMap.begin(); it != pSocketMap.end(); ++it )
113  {
114  PollerHelper *helper = (PollerHelper*)it->second;
115  if( helper->channel ) helper->channel->Delete();
116  delete helper->callBack;
117  delete helper;
118  }
119  pSocketMap.clear();
120 
121  return true;
122  }

◆ Initialize()

bool XrdCl::PollerBuiltIn::Initialize ( )
virtual

Initialize the poller.

Implements XrdCl::Poller.

Definition at line 98 of file XrdClPollerBuiltIn.cc.

99  {
100  return true;
101  }

◆ IsRegistered()

bool XrdCl::PollerBuiltIn::IsRegistered ( Socket * socket)
virtual

Check whether the socket is registered with the poller.

Implements XrdCl::Poller.

Definition at line 532 of file XrdClPollerBuiltIn.cc.

533  {
534  XrdSysMutexHelper scopedLock( pMutex );
535  SocketMap::iterator it = pSocketMap.find( socket );
536  return it != pSocketMap.end();
537  }

◆ IsRunning()

virtual bool XrdCl::PollerBuiltIn::IsRunning ( ) const
inline virtual

Is the event loop running?

Implements XrdCl::Poller.

Definition at line 117 of file XrdClPollerBuiltIn.hh.

118  {
119  return !pPollerPool.empty();
120  }

◆ RemoveSocket()

bool XrdCl::PollerBuiltIn::RemoveSocket ( Socket * socket)
virtual

Remove the socket.

Implements XrdCl::Poller.

Definition at line 316 of file XrdClPollerBuiltIn.cc.

317  {
318  using namespace XrdSys::IOEvents;
319  Log *log = DefaultEnv::GetLog();
320 
321  //--------------------------------------------------------------------------
322  // Find the right socket
323  //--------------------------------------------------------------------------
324  XrdSysMutexHelper scopedLock( pMutex );
325  SocketMap::iterator it = pSocketMap.find( socket );
326  if( it == pSocketMap.end() )
327  return true;
328 
329  log->Debug( PollerMsg, "%s Removing socket from the poller",
330  socket->GetName().c_str() );
331 
332  // unregister from the poller it's currently associated with
333  UnregisterFromPoller( socket );
334 
335  //--------------------------------------------------------------------------
336  // Remove the socket
337  //--------------------------------------------------------------------------
338  PollerHelper *helper = (PollerHelper*)it->second;
339  pSocketMap.erase( it );
340  scopedLock.UnLock();
341 
342  if( helper->channel )
343  {
344  const char *errMsg;
345  bool status = helper->channel->Disable( Channel::allEvents, &errMsg );
346  if( !status )
347  {
348  log->Error( PollerMsg, "%s Unable to disable write notifications: %s",
349  socket->GetName().c_str(), errMsg );
350  return false;
351  }
352  helper->channel->Delete();
353  }
354  delete helper->callBack;
355  delete helper;
356  return true;
357  }

References XrdCl::Log::Debug(), XrdCl::Log::Error(), XrdCl::DefaultEnv::GetLog(), XrdCl::Socket::GetName(), XrdCl::PollerMsg, and XrdSysMutexHelper::UnLock().

+ Here is the call graph for this function:

◆ Start()

bool XrdCl::PollerBuiltIn::Start ( )
virtual

Start polling.

Implements XrdCl::Poller.

Definition at line 127 of file XrdClPollerBuiltIn.cc.

128  {
129  //--------------------------------------------------------------------------
130  // Start the poller
131  //--------------------------------------------------------------------------
132  using namespace XrdSys;
133 
134  Log *log = DefaultEnv::GetLog();
135  log->Debug( PollerMsg, "Creating and starting the built-in poller..." );
136  XrdSysMutexHelper scopedLock( pMutex );
137  int errNum = 0;
138  const char *errMsg = 0;
139 
140  for( int i = 0; i < pNbPoller; ++i )
141  {
142  XrdSys::IOEvents::Poller* poller = IOEvents::Poller::Create( errNum, &errMsg );
143  if( !poller )
144  {
145  log->Error( PollerMsg, "Unable to create the internal poller object: "
146  "%s (%s)", XrdSysE2T( errno ), errMsg );
147  return false;
148  }
149  pPollerPool.push_back( poller );
150  }
151 
152  pNext = pPollerPool.begin();
153 
154  log->Debug( PollerMsg, "Using %d poller threads", pNbPoller );
155 
156  //--------------------------------------------------------------------------
157  // Check if we have any descriptors to reinsert from the last time we
158  // were started
159  //--------------------------------------------------------------------------
160  SocketMap::iterator it;
161  for( it = pSocketMap.begin(); it != pSocketMap.end(); ++it )
162  {
163  PollerHelper *helper = (PollerHelper*)it->second;
164  Socket *socket = it->first;
165  helper->channel = new IOEvents::Channel( RegisterAndGetPoller( socket ), socket->GetFD(),
166  helper->callBack );
167  if( helper->readEnabled )
168  {
169  bool status = helper->channel->Enable( IOEvents::Channel::readEvents,
170  helper->readTimeout, &errMsg );
171  if( !status )
172  {
173  log->Error( PollerMsg, "Unable to enable read notifications "
174  "while re-starting %s (%s)", XrdSysE2T( errno ), errMsg );
175 
176  return false;
177  }
178  }
179 
180  if( helper->writeEnabled )
181  {
182  bool status = helper->channel->Enable( IOEvents::Channel::writeEvents,
183  helper->writeTimeout, &errMsg );
184  if( !status )
185  {
186  log->Error( PollerMsg, "Unable to enable write notifications "
187  "while re-starting %s (%s)", XrdSysE2T( errno ), errMsg );
188 
189  return false;
190  }
191  }
192  }
193  return true;
194  }
bool Create
const char * XrdSysE2T(int errcode)
Definition: XrdSysE2T.cc:104
A network socket.
Definition: XrdClSocket.hh:43
bool Enable(int events, int timeout=0, const char **eText=0)

References Create, XrdCl::Log::Debug(), XrdSys::IOEvents::Channel::Enable(), XrdCl::Log::Error(), XrdCl::DefaultEnv::GetLog(), XrdCl::PollerMsg, and XrdSysE2T().

+ Here is the call graph for this function:

◆ Stop()

bool XrdCl::PollerBuiltIn::Stop ( )
virtual

Stop polling.

Implements XrdCl::Poller.

Definition at line 199 of file XrdClPollerBuiltIn.cc.

200  {
201  using namespace XrdSys::IOEvents;
202 
203  Log *log = DefaultEnv::GetLog();
204  log->Debug( PollerMsg, "Stopping the poller..." );
205 
206  XrdSysMutexHelper scopedLock( pMutex );
207 
208  if( pPollerPool.empty() )
209  {
210  log->Debug( PollerMsg, "Stopping a poller that has not been started" );
211  return true;
212  }
213 
214  while( !pPollerPool.empty() )
215  {
216  XrdSys::IOEvents::Poller *poller = pPollerPool.back();
217  if( *pNext == poller )
218  pNext = pPollerPool.begin();
219  pPollerPool.pop_back();
220 
221  if( !poller ) continue;
222 
223  scopedLock.UnLock();
224  poller->Stop();
225  delete poller;
226  scopedLock.Lock( &pMutex );
227  }
228  pNext = pPollerPool.end();
229  pPollerMap.clear();
230 
231  SocketMap::iterator it;
232  const char *errMsg = 0;
233 
234  for( it = pSocketMap.begin(); it != pSocketMap.end(); ++it )
235  {
236  PollerHelper *helper = (PollerHelper*)it->second;
237  if( !helper->channel ) continue;
238  bool status = helper->channel->Disable( Channel::allEvents, &errMsg );
239  if( !status )
240  {
241  Socket *socket = it->first;
242  log->Error( PollerMsg, "%s Unable to disable write notifications: %s",
243  socket->GetName().c_str(), errMsg );
244  }
245  helper->channel->Delete();
246  helper->channel = 0;
247  }
248 
249  return true;
250  }

References XrdCl::Log::Debug(), XrdCl::Log::Error(), XrdCl::DefaultEnv::GetLog(), XrdCl::Socket::GetName(), XrdSysMutexHelper::Lock(), XrdCl::PollerMsg, XrdSys::IOEvents::Poller::Stop(), and XrdSysMutexHelper::UnLock().

+ Here is the call graph for this function:

The documentation for this class was generated from the following files: