42 channel(0), callBack(0), readEnabled(false), writeEnabled(false),
43 readTimeout(0), writeTimeout(0)
50 uint16_t writeTimeout;
60 pSocket( sock ), pHandler( sh ) {}
61 virtual ~SocketCallBack() {};
67 using namespace XrdCl;
70 if( evFlags & ReadyToRead ) ev |= SocketHandler::ReadyToRead;
71 if( evFlags & ReadTimeOut ) ev |= SocketHandler::ReadTimeOut;
72 if( evFlags & ReadyToWrite ) ev |= SocketHandler::ReadyToWrite;
73 if( evFlags & WriteTimeOut ) ev |= SocketHandler::WriteTimeOut;
75 Log *log = DefaultEnv::GetLog();
79 pSocket->GetName().c_str(),
80 SocketHandler::EventTypeToString( ev ).c_str() );
83 pHandler->Event( ev, pSocket );
111 SocketMap::iterator it;
112 for( it = pSocketMap.begin(); it != pSocketMap.end(); ++it )
114 PollerHelper *helper = (PollerHelper*)it->second;
115 if( helper->channel ) helper->channel->Delete();
116 delete helper->callBack;
135 log->
Debug(
PollerMsg,
"Creating and starting the built-in poller..." );
138 const char *errMsg = 0;
140 for(
int i = 0; i < pNbPoller; ++i )
145 log->
Error(
PollerMsg,
"Unable to create the internal poller object: "
149 pPollerPool.push_back( poller );
152 pNext = pPollerPool.begin();
160 SocketMap::iterator it;
161 for( it = pSocketMap.begin(); it != pSocketMap.end(); ++it )
163 PollerHelper *helper = (PollerHelper*)it->second;
164 Socket *socket = it->first;
165 helper->channel =
new IOEvents::Channel( RegisterAndGetPoller( socket ), socket->GetFD(),
167 if( helper->readEnabled )
169 bool status = helper->channel->
Enable( IOEvents::Channel::readEvents,
170 helper->readTimeout, &errMsg );
174 "while re-starting %s (%s)",
XrdSysE2T( errno ), errMsg );
180 if( helper->writeEnabled )
182 bool status = helper->channel->Enable( IOEvents::Channel::writeEvents,
183 helper->writeTimeout, &errMsg );
187 "while re-starting %s (%s)",
XrdSysE2T( errno ), errMsg );
208 if( pPollerPool.empty() )
210 log->
Debug(
PollerMsg,
"Stopping a poller that has not been started" );
214 while( !pPollerPool.empty() )
217 if( *pNext == poller )
218 pNext = pPollerPool.begin();
219 pPollerPool.pop_back();
221 if( !poller )
continue;
226 scopedLock.
Lock( &pMutex );
228 pNext = pPollerPool.end();
231 SocketMap::iterator it;
232 const char *errMsg = 0;
234 for( it = pSocketMap.begin(); it != pSocketMap.end(); ++it )
236 PollerHelper *helper = (PollerHelper*)it->second;
237 if( !helper->channel )
continue;
238 bool status = helper->channel->Disable( Channel::allEvents, &errMsg );
241 Socket *socket = it->first;
242 log->
Error(
PollerMsg,
"%s Unable to disable write notifications: %s",
243 socket->
GetName().c_str(), errMsg );
245 helper->channel->Delete();
270 log->
Error(
PollerMsg,
"Socket is not in a state valid for polling" );
274 log->
Debug(
PollerMsg,
"Adding socket %p to the poller", (
void*)socket );
279 SocketMap::const_iterator it = pSocketMap.find( socket );
280 if( it != pSocketMap.end() )
294 log->
Error(
PollerMsg,
"No poller available, can not add socket" );
298 PollerHelper *helper =
new PollerHelper();
299 helper->callBack = new ::SocketCallBack( socket, handler );
309 pSocketMap[socket] = helper;
325 SocketMap::iterator it = pSocketMap.find( socket );
326 if( it == pSocketMap.end() )
333 UnregisterFromPoller( socket );
338 PollerHelper *helper = (PollerHelper*)it->second;
339 pSocketMap.erase( it );
342 if( helper->channel )
345 bool status = helper->channel->Disable( Channel::allEvents, &errMsg );
348 log->
Error(
PollerMsg,
"%s Unable to disable write notifications: %s",
349 socket->
GetName().c_str(), errMsg );
352 helper->channel->Delete();
354 delete helper->callBack;
371 log->
Error(
PollerMsg,
"Invalid socket, read events unavailable" );
379 SocketMap::const_iterator it = pSocketMap.find( socket );
380 if( it == pSocketMap.end() )
387 PollerHelper *helper = (PollerHelper*)it->second;
395 if( helper->readEnabled )
397 helper->readTimeout = timeout;
399 log->
Dump(
PollerMsg,
"%s Enable read notifications, timeout: %d",
400 socket->
GetName().c_str(), timeout );
405 bool status = helper->channel->Enable( Channel::readEvents, timeout,
409 log->
Error(
PollerMsg,
"%s Unable to enable read notifications: %s",
410 socket->
GetName().c_str(), errMsg );
414 helper->readEnabled =
true;
422 if( !helper->readEnabled )
431 bool status = helper->channel->Disable( Channel::readEvents, &errMsg );
434 log->
Error(
PollerMsg,
"%s Unable to disable read notifications: %s",
435 socket->
GetName().c_str(), errMsg );
439 helper->readEnabled =
false;
456 log->
Error(
PollerMsg,
"Invalid socket, write events unavailable" );
464 SocketMap::const_iterator it = pSocketMap.find( socket );
465 if( it == pSocketMap.end() )
472 PollerHelper *helper = (PollerHelper*)it->second;
480 if( helper->writeEnabled )
483 helper->writeTimeout = timeout;
485 log->
Dump(
PollerMsg,
"%s Enable write notifications, timeout: %d",
486 socket->
GetName().c_str(), timeout );
491 bool status = helper->channel->Enable( Channel::writeEvents, timeout,
495 log->
Error(
PollerMsg,
"%s Unable to enable write notifications: %s",
496 socket->
GetName().c_str(), errMsg );
500 helper->writeEnabled =
true;
508 if( !helper->writeEnabled )
516 bool status = helper->channel->Disable( Channel::writeEvents, &errMsg );
519 log->
Error(
PollerMsg,
"%s Unable to disable write notifications: %s",
520 socket->
GetName().c_str(), errMsg );
524 helper->writeEnabled =
false;
535 SocketMap::iterator it = pSocketMap.find( socket );
536 return it != pSocketMap.end();
544 if( pPollerPool.empty() )
return 0;
546 PollerPool::iterator ret = pNext;
548 if( pNext == pPollerPool.end() )
549 pNext = pPollerPool.begin();
558 PollerMap::iterator itr = pPollerMap.find( socket->
GetFD() );
560 if( itr == pPollerMap.end() )
564 pPollerMap[socket->
GetFD()] = poller;
571 void PollerBuiltIn::UnregisterFromPoller(
const Socket *socket )
573 PollerMap::iterator itr = pPollerMap.find( socket->
GetFD() );
574 if( itr == pPollerMap.end() )
return;
575 pPollerMap.erase( itr );
580 PollerMap::iterator itr = pPollerMap.find( socket->
GetFD() );
581 if( itr == pPollerMap.end() )
return 0;
588 int PollerBuiltIn::GetNbPollerInit()
592 env->
GetInt(
"ParallelEvtLoop", ret);
const char * XrdSysE2T(int errcode)
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
bool GetInt(const std::string &key, int &value)
void Error(uint64_t topic, const char *format,...)
Report an error.
LogLevel GetLevel() const
Get the log level.
void Warning(uint64_t topic, const char *format,...)
Report a warning.
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
virtual bool EnableWriteNotification(Socket *socket, bool notify, uint16_t timeout=60)
virtual bool AddSocket(Socket *socket, SocketHandler *handler)
virtual bool RemoveSocket(Socket *socket)
Remove the socket.
virtual bool EnableReadNotification(Socket *socket, bool notify, uint16_t timeout=60)
virtual bool Stop()
Stop polling.
virtual bool IsRegistered(Socket *socket)
Check whether the socket is registered with the poller.
virtual bool Finalize()
Finalize the poller.
virtual bool Initialize()
Initialize the poller.
virtual bool Start()
Start polling.
virtual void Initialize(Poller *)
Initializer.
std::string GetName() const
Get the string representation of the socket.
@ Connected
The socket is connected.
@ Connecting
The connection process is in progress.
int GetFD() const
Get the file descriptor.
SocketStatus GetStatus() const
Get the socket status.
void Lock(XrdSysMutex *Mutex)
virtual bool Event(Channel *chP, void *cbArg, int evFlags)=0
bool Enable(int events, int timeout=0, const char **eText=0)
const int DefaultParallelEvtLoop