XRootD
XrdClPollerBuiltIn.cc
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3 // Author: Lukasz Janyst <ljanyst@cern.ch>
4 //------------------------------------------------------------------------------
5 // This file is part of the XRootD software suite.
6 //
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19 //
20 // In applying this licence, CERN does not waive the privileges and immunities
21 // granted to it by virtue of its status as an Intergovernmental Organization
22 // or submit itself to any jurisdiction.
23 //------------------------------------------------------------------------------
24 
26 #include "XrdCl/XrdClLog.hh"
27 #include "XrdCl/XrdClDefaultEnv.hh"
28 #include "XrdCl/XrdClConstants.hh"
29 #include "XrdCl/XrdClSocket.hh"
30 #include "XrdCl/XrdClOptimizers.hh"
31 #include "XrdSys/XrdSysE2T.hh"
32 #include "XrdSys/XrdSysIOEvents.hh"
33 
34 namespace
35 {
36  //----------------------------------------------------------------------------
37  // A helper struct passed to the callback as a custom arg
38  //----------------------------------------------------------------------------
39  struct PollerHelper
40  {
41  PollerHelper():
42  channel(0), callBack(0), readEnabled(false), writeEnabled(false),
43  readTimeout(0), writeTimeout(0)
44  {}
47  bool readEnabled;
48  bool writeEnabled;
49  uint16_t readTimeout;
50  uint16_t writeTimeout;
51  };
52 
53  //----------------------------------------------------------------------------
54  // Call back implementation
55  //----------------------------------------------------------------------------
56  class SocketCallBack: public XrdSys::IOEvents::CallBack
57  {
58  public:
59  SocketCallBack( XrdCl::Socket *sock, XrdCl::SocketHandler *sh ):
60  pSocket( sock ), pHandler( sh ) {}
61  virtual ~SocketCallBack() {};
62 
63  virtual bool Event( XrdSys::IOEvents::Channel *chP,
64  void *cbArg,
65  int evFlags )
66  {
67  using namespace XrdCl;
68  uint8_t ev = 0;
69 
70  if( evFlags & ReadyToRead ) ev |= SocketHandler::ReadyToRead;
71  if( evFlags & ReadTimeOut ) ev |= SocketHandler::ReadTimeOut;
72  if( evFlags & ReadyToWrite ) ev |= SocketHandler::ReadyToWrite;
73  if( evFlags & WriteTimeOut ) ev |= SocketHandler::WriteTimeOut;
74 
75  Log *log = DefaultEnv::GetLog();
76  if( unlikely(log->GetLevel() >= Log::DumpMsg) )
77  {
78  log->Dump( PollerMsg, "%s Got an event: %s",
79  pSocket->GetName().c_str(),
80  SocketHandler::EventTypeToString( ev ).c_str() );
81  }
82 
83  pHandler->Event( ev, pSocket );
84  return true;
85  }
86  private:
87  XrdCl::Socket *pSocket;
88  XrdCl::SocketHandler *pHandler;
89  };
90 }
91 
92 
93 namespace XrdCl
94 {
95  //----------------------------------------------------------------------------
96  // Initialize the poller
97  //----------------------------------------------------------------------------
99  {
100  return true;
101  }
102 
103  //----------------------------------------------------------------------------
104  // Finalize the poller
105  //----------------------------------------------------------------------------
107  {
108  //--------------------------------------------------------------------------
109  // Clean up the channels
110  //--------------------------------------------------------------------------
111  SocketMap::iterator it;
112  for( it = pSocketMap.begin(); it != pSocketMap.end(); ++it )
113  {
114  PollerHelper *helper = (PollerHelper*)it->second;
115  if( helper->channel ) helper->channel->Delete();
116  delete helper->callBack;
117  delete helper;
118  }
119  pSocketMap.clear();
120 
121  return true;
122  }
123 
124  //------------------------------------------------------------------------
125  // Start polling
126  //------------------------------------------------------------------------
128  {
129  //--------------------------------------------------------------------------
130  // Start the poller
131  //--------------------------------------------------------------------------
132  using namespace XrdSys;
133 
134  Log *log = DefaultEnv::GetLog();
135  log->Debug( PollerMsg, "Creating and starting the built-in poller..." );
136  XrdSysMutexHelper scopedLock( pMutex );
137  int errNum = 0;
138  const char *errMsg = 0;
139 
140  for( int i = 0; i < pNbPoller; ++i )
141  {
142  XrdSys::IOEvents::Poller* poller = IOEvents::Poller::Create( errNum, &errMsg );
143  if( !poller )
144  {
145  log->Error( PollerMsg, "Unable to create the internal poller object: "
146  "%s (%s)", XrdSysE2T( errno ), errMsg );
147  return false;
148  }
149  pPollerPool.push_back( poller );
150  }
151 
152  pNext = pPollerPool.begin();
153 
154  log->Debug( PollerMsg, "Using %d poller threads", pNbPoller );
155 
156  //--------------------------------------------------------------------------
157  // Check if we have any descriptors to reinsert from the last time we
158  // were started
159  //--------------------------------------------------------------------------
160  SocketMap::iterator it;
161  for( it = pSocketMap.begin(); it != pSocketMap.end(); ++it )
162  {
163  PollerHelper *helper = (PollerHelper*)it->second;
164  Socket *socket = it->first;
165  helper->channel = new IOEvents::Channel( RegisterAndGetPoller( socket ), socket->GetFD(),
166  helper->callBack );
167  if( helper->readEnabled )
168  {
169  bool status = helper->channel->Enable( IOEvents::Channel::readEvents,
170  helper->readTimeout, &errMsg );
171  if( !status )
172  {
173  log->Error( PollerMsg, "Unable to enable read notifications "
174  "while re-starting %s (%s)", XrdSysE2T( errno ), errMsg );
175 
176  return false;
177  }
178  }
179 
180  if( helper->writeEnabled )
181  {
182  bool status = helper->channel->Enable( IOEvents::Channel::writeEvents,
183  helper->writeTimeout, &errMsg );
184  if( !status )
185  {
186  log->Error( PollerMsg, "Unable to enable write notifications "
187  "while re-starting %s (%s)", XrdSysE2T( errno ), errMsg );
188 
189  return false;
190  }
191  }
192  }
193  return true;
194  }
195 
196  //------------------------------------------------------------------------
197  // Stop polling
198  //------------------------------------------------------------------------
200  {
201  using namespace XrdSys::IOEvents;
202 
203  Log *log = DefaultEnv::GetLog();
204  log->Debug( PollerMsg, "Stopping the poller..." );
205 
206  XrdSysMutexHelper scopedLock( pMutex );
207 
208  if( pPollerPool.empty() )
209  {
210  log->Debug( PollerMsg, "Stopping a poller that has not been started" );
211  return true;
212  }
213 
214  while( !pPollerPool.empty() )
215  {
216  XrdSys::IOEvents::Poller *poller = pPollerPool.back();
217  if( *pNext == poller )
218  pNext = pPollerPool.begin();
219  pPollerPool.pop_back();
220 
221  if( !poller ) continue;
222 
223  scopedLock.UnLock();
224  poller->Stop();
225  delete poller;
226  scopedLock.Lock( &pMutex );
227  }
228  pNext = pPollerPool.end();
229  pPollerMap.clear();
230 
231  SocketMap::iterator it;
232  const char *errMsg = 0;
233 
234  for( it = pSocketMap.begin(); it != pSocketMap.end(); ++it )
235  {
236  PollerHelper *helper = (PollerHelper*)it->second;
237  if( !helper->channel ) continue;
238  bool status = helper->channel->Disable( Channel::allEvents, &errMsg );
239  if( !status )
240  {
241  Socket *socket = it->first;
242  log->Error( PollerMsg, "%s Unable to disable write notifications: %s",
243  socket->GetName().c_str(), errMsg );
244  }
245  helper->channel->Delete();
246  helper->channel = 0;
247  }
248 
249  return true;
250  }
251 
252  //------------------------------------------------------------------------
253  // Add socket to the polling queue
254  //------------------------------------------------------------------------
256  SocketHandler *handler )
257  {
258  Log *log = DefaultEnv::GetLog();
259  XrdSysMutexHelper scopedLock( pMutex );
260 
261  if( !socket )
262  {
263  log->Error( PollerMsg, "Invalid socket, impossible to poll" );
264  return false;
265  }
266 
267  if( socket->GetStatus() != Socket::Connected &&
268  socket->GetStatus() != Socket::Connecting )
269  {
270  log->Error( PollerMsg, "Socket is not in a state valid for polling" );
271  return false;
272  }
273 
274  log->Debug( PollerMsg, "Adding socket %p to the poller", (void*)socket );
275 
276  //--------------------------------------------------------------------------
277  // Check if the socket is already registered
278  //--------------------------------------------------------------------------
279  SocketMap::const_iterator it = pSocketMap.find( socket );
280  if( it != pSocketMap.end() )
281  {
282  log->Warning( PollerMsg, "%s Already registered with this poller",
283  socket->GetName().c_str() );
284  return false;
285  }
286 
287  //--------------------------------------------------------------------------
288  // Create the socket helper
289  //--------------------------------------------------------------------------
290  XrdSys::IOEvents::Poller* poller = RegisterAndGetPoller( socket );
291 
292  if( !poller )
293  {
294  log->Error( PollerMsg, "No poller available, can not add socket" );
295  return false;
296  }
297 
298  PollerHelper *helper = new PollerHelper();
299  helper->callBack = new ::SocketCallBack( socket, handler );
300 
301  if( poller )
302  {
303  helper->channel = new XrdSys::IOEvents::Channel( poller,
304  socket->GetFD(),
305  helper->callBack );
306  }
307 
308  handler->Initialize( this );
309  pSocketMap[socket] = helper;
310  return true;
311  }
312 
313  //------------------------------------------------------------------------
314  // Remove the socket
315  //------------------------------------------------------------------------
317  {
318  using namespace XrdSys::IOEvents;
319  Log *log = DefaultEnv::GetLog();
320 
321  //--------------------------------------------------------------------------
322  // Find the right socket
323  //--------------------------------------------------------------------------
324  XrdSysMutexHelper scopedLock( pMutex );
325  SocketMap::iterator it = pSocketMap.find( socket );
326  if( it == pSocketMap.end() )
327  return true;
328 
329  log->Debug( PollerMsg, "%s Removing socket from the poller",
330  socket->GetName().c_str() );
331 
332  // unregister from the poller it's currently associated with
333  UnregisterFromPoller( socket );
334 
335  //--------------------------------------------------------------------------
336  // Remove the socket
337  //--------------------------------------------------------------------------
338  PollerHelper *helper = (PollerHelper*)it->second;
339  pSocketMap.erase( it );
340  scopedLock.UnLock();
341 
342  if( helper->channel )
343  {
344  const char *errMsg;
345  bool status = helper->channel->Disable( Channel::allEvents, &errMsg );
346  if( !status )
347  {
348  log->Error( PollerMsg, "%s Unable to disable write notifications: %s",
349  socket->GetName().c_str(), errMsg );
350  return false;
351  }
352  helper->channel->Delete();
353  }
354  delete helper->callBack;
355  delete helper;
356  return true;
357  }
358 
359  //----------------------------------------------------------------------------
360  // Notify the handler about read events
361  //----------------------------------------------------------------------------
363  bool notify,
364  uint16_t timeout )
365  {
366  using namespace XrdSys::IOEvents;
367  Log *log = DefaultEnv::GetLog();
368 
369  if( !socket )
370  {
371  log->Error( PollerMsg, "Invalid socket, read events unavailable" );
372  return false;
373  }
374 
375  //--------------------------------------------------------------------------
376  // Check if the socket is registered
377  //--------------------------------------------------------------------------
378  XrdSysMutexHelper scopedLock( pMutex );
379  SocketMap::const_iterator it = pSocketMap.find( socket );
380  if( it == pSocketMap.end() )
381  {
382  log->Warning( PollerMsg, "%s Socket is not registered",
383  socket->GetName().c_str() );
384  return false;
385  }
386 
387  PollerHelper *helper = (PollerHelper*)it->second;
388  XrdSys::IOEvents::Poller *poller = GetPoller( socket );
389 
390  //--------------------------------------------------------------------------
391  // Enable read notifications
392  //--------------------------------------------------------------------------
393  if( notify )
394  {
395  if( helper->readEnabled )
396  return true;
397  helper->readTimeout = timeout;
398 
399  log->Dump( PollerMsg, "%s Enable read notifications, timeout: %d",
400  socket->GetName().c_str(), timeout );
401 
402  if( poller )
403  {
404  const char *errMsg;
405  bool status = helper->channel->Enable( Channel::readEvents, timeout,
406  &errMsg );
407  if( !status )
408  {
409  log->Error( PollerMsg, "%s Unable to enable read notifications: %s",
410  socket->GetName().c_str(), errMsg );
411  return false;
412  }
413  }
414  helper->readEnabled = true;
415  }
416 
417  //--------------------------------------------------------------------------
418  // Disable read notifications
419  //--------------------------------------------------------------------------
420  else
421  {
422  if( !helper->readEnabled )
423  return true;
424 
425  log->Dump( PollerMsg, "%s Disable read notifications",
426  socket->GetName().c_str() );
427 
428  if( poller )
429  {
430  const char *errMsg;
431  bool status = helper->channel->Disable( Channel::readEvents, &errMsg );
432  if( !status )
433  {
434  log->Error( PollerMsg, "%s Unable to disable read notifications: %s",
435  socket->GetName().c_str(), errMsg );
436  return false;
437  }
438  }
439  helper->readEnabled = false;
440  }
441  return true;
442  }
443 
444  //----------------------------------------------------------------------------
445  // Notify the handler about write events
446  //----------------------------------------------------------------------------
448  bool notify,
449  uint16_t timeout )
450  {
451  using namespace XrdSys::IOEvents;
452  Log *log = DefaultEnv::GetLog();
453 
454  if( !socket )
455  {
456  log->Error( PollerMsg, "Invalid socket, write events unavailable" );
457  return false;
458  }
459 
460  //--------------------------------------------------------------------------
461  // Check if the socket is registered
462  //--------------------------------------------------------------------------
463  XrdSysMutexHelper scopedLock( pMutex );
464  SocketMap::const_iterator it = pSocketMap.find( socket );
465  if( it == pSocketMap.end() )
466  {
467  log->Warning( PollerMsg, "%s Socket is not registered",
468  socket->GetName().c_str() );
469  return false;
470  }
471 
472  PollerHelper *helper = (PollerHelper*)it->second;
473  XrdSys::IOEvents::Poller *poller = GetPoller( socket );
474 
475  //--------------------------------------------------------------------------
476  // Enable write notifications
477  //--------------------------------------------------------------------------
478  if( notify )
479  {
480  if( helper->writeEnabled )
481  return true;
482 
483  helper->writeTimeout = timeout;
484 
485  log->Dump( PollerMsg, "%s Enable write notifications, timeout: %d",
486  socket->GetName().c_str(), timeout );
487 
488  if( poller )
489  {
490  const char *errMsg;
491  bool status = helper->channel->Enable( Channel::writeEvents, timeout,
492  &errMsg );
493  if( !status )
494  {
495  log->Error( PollerMsg, "%s Unable to enable write notifications: %s",
496  socket->GetName().c_str(), errMsg );
497  return false;
498  }
499  }
500  helper->writeEnabled = true;
501  }
502 
503  //--------------------------------------------------------------------------
504  // Disable read notifications
505  //--------------------------------------------------------------------------
506  else
507  {
508  if( !helper->writeEnabled )
509  return true;
510 
511  log->Dump( PollerMsg, "%s Disable write notifications",
512  socket->GetName().c_str() );
513  if( poller )
514  {
515  const char *errMsg;
516  bool status = helper->channel->Disable( Channel::writeEvents, &errMsg );
517  if( !status )
518  {
519  log->Error( PollerMsg, "%s Unable to disable write notifications: %s",
520  socket->GetName().c_str(), errMsg );
521  return false;
522  }
523  }
524  helper->writeEnabled = false;
525  }
526  return true;
527  }
528 
529  //----------------------------------------------------------------------------
530  // Check whether the socket is registered with the poller
531  //----------------------------------------------------------------------------
533  {
534  XrdSysMutexHelper scopedLock( pMutex );
535  SocketMap::iterator it = pSocketMap.find( socket );
536  return it != pSocketMap.end();
537  }
538 
539  //----------------------------------------------------------------------------
540  // Return poller threads in round-robin fashion
541  //----------------------------------------------------------------------------
542  XrdSys::IOEvents::Poller* PollerBuiltIn::GetNextPoller()
543  {
544  if( pPollerPool.empty() ) return 0;
545 
546  PollerPool::iterator ret = pNext;
547  ++pNext;
548  if( pNext == pPollerPool.end() )
549  pNext = pPollerPool.begin();
550  return *ret;
551  }
552 
553  //----------------------------------------------------------------------------
554  // Return the poller associated with the respective channel
555  //----------------------------------------------------------------------------
556  XrdSys::IOEvents::Poller* PollerBuiltIn::RegisterAndGetPoller(const Socket * socket)
557  {
558  PollerMap::iterator itr = pPollerMap.find( socket->GetFD() );
559 
560  if( itr == pPollerMap.end() )
561  {
562  XrdSys::IOEvents::Poller* poller = GetNextPoller();
563  if( poller )
564  pPollerMap[socket->GetFD()] = poller;
565  return poller;
566  }
567 
568  return itr->second;
569  }
570 
571  void PollerBuiltIn::UnregisterFromPoller( const Socket *socket )
572  {
573  PollerMap::iterator itr = pPollerMap.find( socket->GetFD() );
574  if( itr == pPollerMap.end() ) return;
575  pPollerMap.erase( itr );
576  }
577 
578  XrdSys::IOEvents::Poller* PollerBuiltIn::GetPoller(const Socket * socket)
579  {
580  PollerMap::iterator itr = pPollerMap.find( socket->GetFD() );
581  if( itr == pPollerMap.end() ) return 0;
582  return itr->second;
583  }
584 
585  //----------------------------------------------------------------------------
586  // Get the initial value for pNbPoller
587  //----------------------------------------------------------------------------
588  int PollerBuiltIn::GetNbPollerInit()
589  {
590  Env * env = DefaultEnv::GetEnv();
592  env->GetInt("ParallelEvtLoop", ret);
593  return ret;
594  }
595 }
#define unlikely(x)
bool Create
const char * XrdSysE2T(int errcode)
Definition: XrdSysE2T.cc:104
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
bool GetInt(const std::string &key, int &value)
Definition: XrdClEnv.cc:89
Handle diagnostics.
Definition: XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition: XrdClLog.cc:231
LogLevel GetLevel() const
Get the log level.
Definition: XrdClLog.hh:258
void Warning(uint64_t topic, const char *format,...)
Report a warning.
Definition: XrdClLog.cc:248
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Definition: XrdClLog.cc:299
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition: XrdClLog.cc:282
virtual bool EnableWriteNotification(Socket *socket, bool notify, uint16_t timeout=60)
virtual bool AddSocket(Socket *socket, SocketHandler *handler)
virtual bool RemoveSocket(Socket *socket)
Remove the socket.
virtual bool EnableReadNotification(Socket *socket, bool notify, uint16_t timeout=60)
virtual bool Stop()
Stop polling.
virtual bool IsRegistered(Socket *socket)
Check whether the socket is registered with the poller.
virtual bool Finalize()
Finalize the poller.
virtual bool Initialize()
Initialize the poller.
virtual bool Start()
Start polling.
virtual void Initialize(Poller *)
Initializer.
Definition: XrdClPoller.hh:55
A network socket.
Definition: XrdClSocket.hh:43
std::string GetName() const
Get the string representation of the socket.
Definition: XrdClSocket.cc:672
@ Connected
The socket is connected.
Definition: XrdClSocket.hh:51
@ Connecting
The connection process is in progress.
Definition: XrdClSocket.hh:52
int GetFD() const
Get the file descriptor.
Definition: XrdClSocket.hh:214
SocketStatus GetStatus() const
Get the socket status.
Definition: XrdClSocket.hh:125
void Lock(XrdSysMutex *Mutex)
virtual bool Event(Channel *chP, void *cbArg, int evFlags)=0
bool Enable(int events, int timeout=0, const char **eText=0)
const uint64_t PollerMsg
const int DefaultParallelEvtLoop