XRootD
Loading...
Searching...
No Matches
XrdClPollerBuiltIn.cc
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3// Author: Lukasz Janyst <ljanyst@cern.ch>
4//------------------------------------------------------------------------------
5// This file is part of the XRootD software suite.
6//
7// XRootD is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// XRootD is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19//
20// In applying this licence, CERN does not waive the privileges and immunities
21// granted to it by virtue of its status as an Intergovernmental Organization
22// or submit itself to any jurisdiction.
23//------------------------------------------------------------------------------
24
26#include "XrdCl/XrdClLog.hh"
29#include "XrdCl/XrdClSocket.hh"
31#include "XrdSys/XrdSysE2T.hh"
33
34namespace
35{
36 //----------------------------------------------------------------------------
37 // A helper struct passed to the callback as a custom arg
38 //----------------------------------------------------------------------------
39 struct PollerHelper
40 {
41 PollerHelper():
42 channel(0), callBack(0), readEnabled(false), writeEnabled(false),
43 readTimeout(0), writeTimeout(0)
44 {}
45 XrdSys::IOEvents::Channel *channel;
46 XrdSys::IOEvents::CallBack *callBack;
47 bool readEnabled;
48 bool writeEnabled;
49 uint16_t readTimeout;
50 uint16_t writeTimeout;
51 };
52
53 //----------------------------------------------------------------------------
54 // Call back implementation
55 //----------------------------------------------------------------------------
56 class SocketCallBack: public XrdSys::IOEvents::CallBack
57 {
58 public:
59 SocketCallBack( XrdCl::Socket *sock, XrdCl::SocketHandler *sh ):
60 pSocket( sock ), pHandler( sh ) {}
61 virtual ~SocketCallBack() {};
62
63 virtual bool Event( XrdSys::IOEvents::Channel *chP,
64 void *cbArg,
65 int evFlags )
66 {
67 using namespace XrdCl;
68 uint8_t ev = 0;
69
70 if( evFlags & ReadyToRead ) ev |= SocketHandler::ReadyToRead;
71 if( evFlags & ReadTimeOut ) ev |= SocketHandler::ReadTimeOut;
72 if( evFlags & ReadyToWrite ) ev |= SocketHandler::ReadyToWrite;
73 if( evFlags & WriteTimeOut ) ev |= SocketHandler::WriteTimeOut;
74
75 Log *log = DefaultEnv::GetLog();
76 if( unlikely(log->GetLevel() >= Log::DumpMsg) )
77 {
78 log->Dump( PollerMsg, "%s Got an event: %s",
79 pSocket->GetName().c_str(),
80 SocketHandler::EventTypeToString( ev ).c_str() );
81 }
82
83 pHandler->Event( ev, pSocket );
84 return true;
85 }
86 private:
87 XrdCl::Socket *pSocket;
88 XrdCl::SocketHandler *pHandler;
89 };
90}
91
92
93namespace XrdCl
94{
95 //----------------------------------------------------------------------------
96 // Initialize the poller
97 //----------------------------------------------------------------------------
99 {
100 return true;
101 }
102
103 //----------------------------------------------------------------------------
104 // Finalize the poller
105 //----------------------------------------------------------------------------
107 {
108 //--------------------------------------------------------------------------
109 // Clean up the channels
110 //--------------------------------------------------------------------------
111 SocketMap::iterator it;
112 for( it = pSocketMap.begin(); it != pSocketMap.end(); ++it )
113 {
114 PollerHelper *helper = (PollerHelper*)it->second;
115 if( helper->channel ) helper->channel->Delete();
116 delete helper->callBack;
117 delete helper;
118 }
119 pSocketMap.clear();
120
121 return true;
122 }
123
124 //------------------------------------------------------------------------
125 // Start polling
126 //------------------------------------------------------------------------
128 {
129 //--------------------------------------------------------------------------
130 // Start the poller
131 //--------------------------------------------------------------------------
132 using namespace XrdSys;
133
134 Log *log = DefaultEnv::GetLog();
135 log->Debug( PollerMsg, "Creating and starting the built-in poller..." );
136 XrdSysMutexHelper scopedLock( pMutex );
137 int errNum = 0;
138 const char *errMsg = 0;
139
140 for( int i = 0; i < pNbPoller; ++i )
141 {
142 XrdSys::IOEvents::Poller* poller = IOEvents::Poller::Create( errNum, &errMsg );
143 if( !poller )
144 {
145 log->Error( PollerMsg, "Unable to create the internal poller object: "
146 "%s (%s)", XrdSysE2T( errno ), errMsg );
147 return false;
148 }
149 pPollerPool.push_back( poller );
150 }
151
152 pNext = pPollerPool.begin();
153
154 log->Debug( PollerMsg, "Using %d poller threads", pNbPoller );
155
156 //--------------------------------------------------------------------------
157 // Check if we have any descriptors to reinsert from the last time we
158 // were started
159 //--------------------------------------------------------------------------
160 SocketMap::iterator it;
161 for( it = pSocketMap.begin(); it != pSocketMap.end(); ++it )
162 {
163 PollerHelper *helper = (PollerHelper*)it->second;
164 Socket *socket = it->first;
165 helper->channel = new IOEvents::Channel( RegisterAndGetPoller( socket ), socket->GetFD(),
166 helper->callBack );
167 if( helper->readEnabled )
168 {
169 bool status = helper->channel->Enable( IOEvents::Channel::readEvents,
170 helper->readTimeout, &errMsg );
171 if( !status )
172 {
173 log->Error( PollerMsg, "Unable to enable read notifications "
174 "while re-starting %s (%s)", XrdSysE2T( errno ), errMsg );
175
176 return false;
177 }
178 }
179
180 if( helper->writeEnabled )
181 {
182 bool status = helper->channel->Enable( IOEvents::Channel::writeEvents,
183 helper->writeTimeout, &errMsg );
184 if( !status )
185 {
186 log->Error( PollerMsg, "Unable to enable write notifications "
187 "while re-starting %s (%s)", XrdSysE2T( errno ), errMsg );
188
189 return false;
190 }
191 }
192 }
193 return true;
194 }
195
196 //------------------------------------------------------------------------
197 // Stop polling
198 //------------------------------------------------------------------------
200 {
201 using namespace XrdSys::IOEvents;
202
203 Log *log = DefaultEnv::GetLog();
204 log->Debug( PollerMsg, "Stopping the poller..." );
205
206 XrdSysMutexHelper scopedLock( pMutex );
207
208 if( pPollerPool.empty() )
209 {
210 log->Debug( PollerMsg, "Stopping a poller that has not been started" );
211 return true;
212 }
213
214 while( !pPollerPool.empty() )
215 {
216 XrdSys::IOEvents::Poller *poller = pPollerPool.back();
217 if( *pNext == poller )
218 pNext = pPollerPool.begin();
219 pPollerPool.pop_back();
220
221 if( !poller ) continue;
222
223 scopedLock.UnLock();
224 poller->Stop();
225 delete poller;
226 scopedLock.Lock( &pMutex );
227 }
228 pNext = pPollerPool.end();
229 pPollerMap.clear();
230
231 SocketMap::iterator it;
232 const char *errMsg = 0;
233
234 for( it = pSocketMap.begin(); it != pSocketMap.end(); ++it )
235 {
236 PollerHelper *helper = (PollerHelper*)it->second;
237 if( !helper->channel ) continue;
238 bool status = helper->channel->Disable( Channel::allEvents, &errMsg );
239 if( !status )
240 {
241 Socket *socket = it->first;
242 log->Error( PollerMsg, "%s Unable to disable write notifications: %s",
243 socket->GetName().c_str(), errMsg );
244 }
245 helper->channel->Delete();
246 helper->channel = 0;
247 }
248
249 return true;
250 }
251
252 //------------------------------------------------------------------------
253 // Add socket to the polling queue
254 //------------------------------------------------------------------------
256 SocketHandler *handler )
257 {
258 Log *log = DefaultEnv::GetLog();
259 XrdSysMutexHelper scopedLock( pMutex );
260
261 if( !socket )
262 {
263 log->Error( PollerMsg, "Invalid socket, impossible to poll" );
264 return false;
265 }
266
267 if( socket->GetStatus() != Socket::Connected &&
268 socket->GetStatus() != Socket::Connecting )
269 {
270 log->Error( PollerMsg, "Socket is not in a state valid for polling" );
271 return false;
272 }
273
274 log->Debug( PollerMsg, "Adding socket %p to the poller", (void*)socket );
275
276 //--------------------------------------------------------------------------
277 // Check if the socket is already registered
278 //--------------------------------------------------------------------------
279 SocketMap::const_iterator it = pSocketMap.find( socket );
280 if( it != pSocketMap.end() )
281 {
282 log->Warning( PollerMsg, "%s Already registered with this poller",
283 socket->GetName().c_str() );
284 return false;
285 }
286
287 //--------------------------------------------------------------------------
288 // Create the socket helper
289 //--------------------------------------------------------------------------
290 XrdSys::IOEvents::Poller* poller = RegisterAndGetPoller( socket );
291
292 if( !poller )
293 {
294 log->Error( PollerMsg, "No poller available, can not add socket" );
295 return false;
296 }
297
298 PollerHelper *helper = new PollerHelper();
299 helper->callBack = new ::SocketCallBack( socket, handler );
300
301 if( poller )
302 {
303 helper->channel = new XrdSys::IOEvents::Channel( poller,
304 socket->GetFD(),
305 helper->callBack );
306 }
307
308 handler->Initialize( this );
309 pSocketMap[socket] = helper;
310 return true;
311 }
312
313 //------------------------------------------------------------------------
314 // Remove the socket
315 //------------------------------------------------------------------------
317 {
318 using namespace XrdSys::IOEvents;
319 Log *log = DefaultEnv::GetLog();
320
321 //--------------------------------------------------------------------------
322 // Find the right socket
323 //--------------------------------------------------------------------------
324 XrdSysMutexHelper scopedLock( pMutex );
325 SocketMap::iterator it = pSocketMap.find( socket );
326 if( it == pSocketMap.end() )
327 return true;
328
329 log->Debug( PollerMsg, "%s Removing socket from the poller",
330 socket->GetName().c_str() );
331
332 // unregister from the poller it's currently associated with
333 UnregisterFromPoller( socket );
334
335 //--------------------------------------------------------------------------
336 // Remove the socket
337 //--------------------------------------------------------------------------
338 PollerHelper *helper = (PollerHelper*)it->second;
339 pSocketMap.erase( it );
340 scopedLock.UnLock();
341
342 if( helper->channel )
343 {
344 const char *errMsg;
345 bool status = helper->channel->Disable( Channel::allEvents, &errMsg );
346 if( !status )
347 {
348 log->Error( PollerMsg, "%s Unable to disable write notifications: %s",
349 socket->GetName().c_str(), errMsg );
350 return false;
351 }
352 helper->channel->Delete();
353 }
354 delete helper->callBack;
355 delete helper;
356 return true;
357 }
358
359 //----------------------------------------------------------------------------
360 // Notify the handler about read events
361 //----------------------------------------------------------------------------
363 bool notify,
364 uint16_t timeout )
365 {
366 using namespace XrdSys::IOEvents;
367 Log *log = DefaultEnv::GetLog();
368
369 if( !socket )
370 {
371 log->Error( PollerMsg, "Invalid socket, read events unavailable" );
372 return false;
373 }
374
375 //--------------------------------------------------------------------------
376 // Check if the socket is registered
377 //--------------------------------------------------------------------------
378 XrdSysMutexHelper scopedLock( pMutex );
379 SocketMap::const_iterator it = pSocketMap.find( socket );
380 if( it == pSocketMap.end() )
381 {
382 log->Warning( PollerMsg, "%s Socket is not registered",
383 socket->GetName().c_str() );
384 return false;
385 }
386
387 PollerHelper *helper = (PollerHelper*)it->second;
388 XrdSys::IOEvents::Poller *poller = GetPoller( socket );
389
390 //--------------------------------------------------------------------------
391 // Enable read notifications
392 //--------------------------------------------------------------------------
393 if( notify )
394 {
395 if( helper->readEnabled )
396 return true;
397 helper->readTimeout = timeout;
398
399 log->Dump( PollerMsg, "%s Enable read notifications, timeout: %d",
400 socket->GetName().c_str(), timeout );
401
402 if( poller )
403 {
404 const char *errMsg;
405 bool status = helper->channel->Enable( Channel::readEvents, timeout,
406 &errMsg );
407 if( !status )
408 {
409 log->Error( PollerMsg, "%s Unable to enable read notifications: %s",
410 socket->GetName().c_str(), errMsg );
411 return false;
412 }
413 }
414 helper->readEnabled = true;
415 }
416
417 //--------------------------------------------------------------------------
418 // Disable read notifications
419 //--------------------------------------------------------------------------
420 else
421 {
422 if( !helper->readEnabled )
423 return true;
424
425 log->Dump( PollerMsg, "%s Disable read notifications",
426 socket->GetName().c_str() );
427
428 if( poller )
429 {
430 const char *errMsg;
431 bool status = helper->channel->Disable( Channel::readEvents, &errMsg );
432 if( !status )
433 {
434 log->Error( PollerMsg, "%s Unable to disable read notifications: %s",
435 socket->GetName().c_str(), errMsg );
436 return false;
437 }
438 }
439 helper->readEnabled = false;
440 }
441 return true;
442 }
443
444 //----------------------------------------------------------------------------
445 // Notify the handler about write events
446 //----------------------------------------------------------------------------
448 bool notify,
449 uint16_t timeout )
450 {
451 using namespace XrdSys::IOEvents;
452 Log *log = DefaultEnv::GetLog();
453
454 if( !socket )
455 {
456 log->Error( PollerMsg, "Invalid socket, write events unavailable" );
457 return false;
458 }
459
460 //--------------------------------------------------------------------------
461 // Check if the socket is registered
462 //--------------------------------------------------------------------------
463 XrdSysMutexHelper scopedLock( pMutex );
464 SocketMap::const_iterator it = pSocketMap.find( socket );
465 if( it == pSocketMap.end() )
466 {
467 log->Warning( PollerMsg, "%s Socket is not registered",
468 socket->GetName().c_str() );
469 return false;
470 }
471
472 PollerHelper *helper = (PollerHelper*)it->second;
473 XrdSys::IOEvents::Poller *poller = GetPoller( socket );
474
475 //--------------------------------------------------------------------------
476 // Enable write notifications
477 //--------------------------------------------------------------------------
478 if( notify )
479 {
480 if( helper->writeEnabled )
481 return true;
482
483 helper->writeTimeout = timeout;
484
485 log->Dump( PollerMsg, "%s Enable write notifications, timeout: %d",
486 socket->GetName().c_str(), timeout );
487
488 if( poller )
489 {
490 const char *errMsg;
491 bool status = helper->channel->Enable( Channel::writeEvents, timeout,
492 &errMsg );
493 if( !status )
494 {
495 log->Error( PollerMsg, "%s Unable to enable write notifications: %s",
496 socket->GetName().c_str(), errMsg );
497 return false;
498 }
499 }
500 helper->writeEnabled = true;
501 }
502
503 //--------------------------------------------------------------------------
504 // Disable read notifications
505 //--------------------------------------------------------------------------
506 else
507 {
508 if( !helper->writeEnabled )
509 return true;
510
511 log->Dump( PollerMsg, "%s Disable write notifications",
512 socket->GetName().c_str() );
513 if( poller )
514 {
515 const char *errMsg;
516 bool status = helper->channel->Disable( Channel::writeEvents, &errMsg );
517 if( !status )
518 {
519 log->Error( PollerMsg, "%s Unable to disable write notifications: %s",
520 socket->GetName().c_str(), errMsg );
521 return false;
522 }
523 }
524 helper->writeEnabled = false;
525 }
526 return true;
527 }
528
529 //----------------------------------------------------------------------------
530 // Check whether the socket is registered with the poller
531 //----------------------------------------------------------------------------
533 {
534 XrdSysMutexHelper scopedLock( pMutex );
535 SocketMap::iterator it = pSocketMap.find( socket );
536 return it != pSocketMap.end();
537 }
538
539 //----------------------------------------------------------------------------
540 // Return poller threads in round-robin fashion
541 //----------------------------------------------------------------------------
542 XrdSys::IOEvents::Poller* PollerBuiltIn::GetNextPoller()
543 {
544 if( pPollerPool.empty() ) return 0;
545
546 PollerPool::iterator ret = pNext;
547 ++pNext;
548 if( pNext == pPollerPool.end() )
549 pNext = pPollerPool.begin();
550 return *ret;
551 }
552
553 //----------------------------------------------------------------------------
554 // Return the poller associated with the respective channel
555 //----------------------------------------------------------------------------
556 XrdSys::IOEvents::Poller* PollerBuiltIn::RegisterAndGetPoller(const Socket * socket)
557 {
558 PollerMap::iterator itr = pPollerMap.find( socket->GetFD() );
559
560 if( itr == pPollerMap.end() )
561 {
562 XrdSys::IOEvents::Poller* poller = GetNextPoller();
563 if( poller )
564 pPollerMap[socket->GetFD()] = poller;
565 return poller;
566 }
567
568 return itr->second;
569 }
570
571 void PollerBuiltIn::UnregisterFromPoller( const Socket *socket )
572 {
573 PollerMap::iterator itr = pPollerMap.find( socket->GetFD() );
574 if( itr == pPollerMap.end() ) return;
575 pPollerMap.erase( itr );
576 }
577
578 XrdSys::IOEvents::Poller* PollerBuiltIn::GetPoller(const Socket * socket)
579 {
580 PollerMap::iterator itr = pPollerMap.find( socket->GetFD() );
581 if( itr == pPollerMap.end() ) return 0;
582 return itr->second;
583 }
584
585 //----------------------------------------------------------------------------
586 // Get the initial value for pNbPoller
587 //----------------------------------------------------------------------------
588 int PollerBuiltIn::GetNbPollerInit()
589 {
590 Env * env = DefaultEnv::GetEnv();
592 env->GetInt("ParallelEvtLoop", ret);
593 return ret;
594 }
595}
#define unlikely(x)
const char * XrdSysE2T(int errcode)
Definition XrdSysE2T.cc:104
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
bool GetInt(const std::string &key, int &value)
Definition XrdClEnv.cc:89
Handle diagnostics.
Definition XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition XrdClLog.cc:231
LogLevel GetLevel() const
Get the log level.
Definition XrdClLog.hh:258
void Warning(uint64_t topic, const char *format,...)
Report a warning.
Definition XrdClLog.cc:248
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Definition XrdClLog.cc:299
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition XrdClLog.cc:282
virtual bool EnableWriteNotification(Socket *socket, bool notify, uint16_t timeout=60)
virtual bool AddSocket(Socket *socket, SocketHandler *handler)
virtual bool RemoveSocket(Socket *socket)
Remove the socket.
virtual bool EnableReadNotification(Socket *socket, bool notify, uint16_t timeout=60)
virtual bool Stop()
Stop polling.
virtual bool IsRegistered(Socket *socket)
Check whether the socket is registered with the poller.
virtual bool Finalize()
Finalize the poller.
virtual bool Initialize()
Initialize the poller.
virtual bool Start()
Start polling.
virtual void Initialize(Poller *)
Initializer.
A network socket.
std::string GetName() const
Get the string representation of the socket.
@ Connected
The socket is connected.
@ Connecting
The connection process is in progress.
int GetFD() const
Get the file descriptor.
SocketStatus GetStatus() const
Get the socket status.
void Lock(XrdSysMutex *Mutex)
@ allEvents
All of the above.
@ writeEvents
Write and Write Timeouts.
@ readEvents
Read and Read Timeouts.
bool Enable(int events, int timeout=0, const char **eText=0)
bool Disable(int events, const char **eText=0)
static Poller * Create(int &eNum, const char **eTxt=0, int crOpts=0)
const uint64_t PollerMsg
const int DefaultParallelEvtLoop
XrdSysError Log
Definition XrdConfig.cc:113