43 pSrc( src->Self() ), pOffset( offset ), pSize( size ), pBuffer( buffer ), pHandle( handle ), pUsePgRead( usepgrd )
58 ToPgInfo( response, chunk );
62 if( !chunk && status->
IsOK() )
79 pSrc->ReportResponse( status, chunk, pHandle );
90 response->
Get( chunk );
91 response->
Set( (
int* )0 );
95 ChunkInfo *rsp =
nullptr;
97 chunk =
new PageInfo( rsp->offset, rsp->length, rsp->buffer );
111 pChunkSize( chunkSize ), pParallel( parallel ), pFileSize( fileSize ), pThread(),
112 pCtx( ctx->Self() ), pFile( 0 ), pCurrentOffset( 0 ), pBlkEnd( 0 ), pDataTransfered( 0 ), pRefCount( 1 ),
113 pRunning( false ), pStartTime( 0 ), pTransferTime( 0 ), pUsePgRead( false )
128 int rc = pthread_create( &pThread, 0, Run,
this );
129 if( rc ) pRunning =
false;
133 void* XCpSrc::Run(
void* arg )
136 me->StartDownloading();
141 void XCpSrc::StartDownloading()
143 XRootDStatus st = Initialize();
159 pStartTime = time( 0 );
168 if( GetWork().IsOK() )
continue;
170 else if( st.IsOK() && st.code ==
suDone )
174 if( GetWork().IsOK() )
continue;
176 pTransferTime += time( 0 ) - pStartTime;
187 pStartTime = time( 0 );
196 XRootDStatus *status = pReports.Get();
197 if( !status->IsOK() )
200 std::string myHost = URL( pUrl ).GetHostName();
201 log->Error(
UtilityMsg,
"Failed to read chunk from %s: %s", myHost.c_str(), status->GetErrorMessage().c_str() );
203 if( !Recover().IsOK() )
225 XRootDStatus XCpSrc::Initialize()
234 log->Error(
UtilityMsg,
"Failed to initialize XCp source, no more replicas to try" );
235 return XRootDStatus(
stError );
238 log->Debug(
UtilityMsg,
"Opening %s for reading", pUrl.c_str() );
249 log->Warning(
UtilityMsg,
"Failed to open %s for reading: %s", pUrl.c_str(), st.GetErrorMessage().c_str() );
255 if( ( !url.IsLocalFile() && !pFile->
IsSecure() ) ||
256 ( url.IsLocalFile() && url.IsMetalink() ) )
270 StatInfo *statInfo = 0;
271 st = pFile->
Stat(
false, statInfo );
274 log->Warning(
UtilityMsg,
"Failed to stat %s: %s", pUrl.c_str(), st.GetErrorMessage().c_str() );
278 pFileSize = statInfo->GetSize();
285 std::pair<uint64_t, uint64_t> p = pCtx->
GetBlock();
286 pCurrentOffset = p.first;
287 pBlkEnd = p.second + p.first;
292 XRootDStatus XCpSrc::Recover()
301 log->Error(
UtilityMsg,
"Failed to initialize XCp source, no more replicas to try" );
302 return XRootDStatus(
stError );
305 log->Debug(
UtilityMsg,
"Opening %s for reading", pUrl.c_str() );
317 log->Warning(
UtilityMsg,
"Failed to open %s for reading: %s", pUrl.c_str(), st.GetErrorMessage().c_str() );
// Same test as in XCpSrc::Initialize(): a remote file whose connection is
// NOT encrypted, or a local metalink. The negation on IsSecure() was missing
// here, so recovery took the opposite decision from the initial open.
321 if( ( !url.IsLocalFile() && !pFile->
IsSecure() ) ||
322 ( url.IsLocalFile() && url.IsMetalink() ) )
336 pRecovered.insert( pOngoing.begin(), pOngoing.end() );
342 pStartTime = time( 0 );
348 XRootDStatus XCpSrc::ReadChunks()
352 while( pOngoing.size() < pParallel && !pRecovered.empty() )
354 std::pair<uint64_t, uint64_t> p;
355 std::map<uint64_t, uint64_t>::iterator itr = pRecovered.begin();
357 pOngoing.insert( p );
358 pRecovered.erase( itr );
360 char *buffer =
new char[p.second];
362 XRootDStatus st = pUsePgRead
363 ? pFile->
PgRead( p.first, p.second, buffer, handler )
364 : pFile->
Read( p.first, p.second, buffer, handler );
369 ReportResponse(
new XRootDStatus( st ), 0, pFile );
374 while( pOngoing.size() < pParallel && pCurrentOffset < pBlkEnd )
376 uint64_t chunkSize = pChunkSize;
377 if( pCurrentOffset + chunkSize > pBlkEnd )
378 chunkSize = pBlkEnd - pCurrentOffset;
379 pOngoing[pCurrentOffset] = chunkSize;
380 char *buffer =
new char[chunkSize];
382 XRootDStatus st = pUsePgRead
383 ? pFile->
PgRead( pCurrentOffset, chunkSize, buffer, handler )
384 : pFile->
Read( pCurrentOffset, chunkSize, buffer, handler );
385 pCurrentOffset += chunkSize;
390 ReportResponse(
new XRootDStatus( st ), 0, pFile );
395 if( pOngoing.empty() )
return XRootDStatus(
stOK,
suDone );
397 if( pRecovered.empty() && pCurrentOffset >= pBlkEnd )
return XRootDStatus(
stOK,
suPartial );
402 void XCpSrc::ReportResponse( XRootDStatus *status, PageInfo *chunk,
File *handle )
414 ignore = !pOngoing.erase( chunk->GetOffset() );
416 else if( FilesEqual( pFile, handle ) )
423 pFailed[pFile] = pOngoing.size();
429 if( !FilesEqual( pFile, handle ) )
436 if( pFailed[handle] == 0 )
440 pFailed.erase( handle );
441 XRootDStatus st = handle->Close();
448 if( status ) pReports.Put( status );
458 pDataTransfered += chunk->GetLength();
463 void XCpSrc::Steal( XCpSrc *src )
470 if ( std::less{}(&pMtx, &src->pMtx) )
472 lck2.
Lock( &src->pMtx );
478 lck2.
Lock( &src->pMtx );
482 std::string myHost = URL( pUrl ).GetHostName(), srcHost = URL( src->pUrl ).GetHostName();
488 pRecovered.insert( src->pOngoing.begin(), src->pOngoing.end() );
489 pRecovered.insert( src->pRecovered.begin(), src->pRecovered.end() );
490 pCurrentOffset = src->pCurrentOffset;
491 pBlkEnd = src->pBlkEnd;
493 src->pOngoing.clear();
494 src->pRecovered.clear();
495 src->pCurrentOffset = 0;
503 log->Debug(
UtilityMsg,
"%s: Stealing everything from %s", myHost.c_str(), srcHost.c_str() );
510 uint64_t myTransferRate =
TransferRate(), srcTransferRate = src->TransferRate();
511 if( myTransferRate == 0 )
return;
512 double fraction = double( myTransferRate ) / double( myTransferRate + srcTransferRate );
514 if( src->pCurrentOffset < src->pBlkEnd )
517 uint64_t blkSize = src->pBlkEnd - src->pCurrentOffset;
518 uint64_t steal =
static_cast<uint64_t
>( round( fraction * blkSize ) );
521 if( blkSize - steal <= pChunkSize )
524 pCurrentOffset = src->pBlkEnd - steal;
525 pBlkEnd = src->pBlkEnd;
526 src->pBlkEnd -= steal;
528 log->Debug(
UtilityMsg,
"%s: Stealing fraction (%f) of block from %s", myHost.c_str(), fraction, srcHost.c_str() );
533 if( !src->pRecovered.empty() )
535 size_t count =
static_cast<size_t>( round( fraction * src->pRecovered.size() ) );
538 std::map<uint64_t, uint64_t>::iterator itr = src->pRecovered.begin();
539 pRecovered.insert( *itr );
540 src->pRecovered.erase( itr );
543 log->Debug(
UtilityMsg,
"%s: Stealing fraction (%f) of recovered chunks from %s", myHost.c_str(), fraction, srcHost.c_str() );
554 if( !src->pOngoing.empty() && fraction > 0.7 )
556 size_t count =
static_cast<size_t>( round( fraction * src->pOngoing.size() ) );
559 std::map<uint64_t, uint64_t>::iterator itr = src->pOngoing.begin();
560 pRecovered.insert( *itr );
561 src->pOngoing.erase( itr );
564 log->Debug(
UtilityMsg,
"%s: Stealing fraction (%f) of ongoing chunks from %s", myHost.c_str(), fraction, srcHost.c_str() );
568 XRootDStatus XCpSrc::GetWork()
570 std::pair<uint64_t, uint64_t> p = pCtx->
GetBlock();
575 pCurrentOffset = p.first;
576 pBlkEnd = p.first + p.second;
579 std::string myHost = URL( pUrl ).GetHostName();
580 log->Debug(
UtilityMsg,
"%s got next block", myHost.c_str() );
582 return XRootDStatus();
588 if( wLink ) wLink->Delete();
591 if( pCurrentOffset < pBlkEnd || !pRecovered.empty() )
return XRootDStatus();
598 time_t duration = pTransferTime + time( 0 ) - pStartTime;
599 return pDataTransfered / ( duration + 1 );
void Set(Type object, bool own=true)
void Get(Type &object)
Retrieve the object being held.
virtual void HandleResponse(XRootDStatus *status, AnyObject *response)
ChunkHandler(XCpSrc *src, uint64_t offset, uint64_t size, char *buffer, File *handle, bool usepgrd)
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
bool GetString(const std::string &key, std::string &value)
bool GetInt(const std::string &key, int &value)
XRootDStatus Read(uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
bool IsSecure() const
Check if the file is using an encrypted connection.
XRootDStatus Open(const std::string &url, OpenFlags::Flags flags, Access::Mode mode, ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
bool GetProperty(const std::string &name, std::string &value) const
XRootDStatus PgRead(uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
XRootDStatus Stat(bool force, ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
bool SetProperty(const std::string &name, const std::string &value)
Handle an async response.
static bool HasPgRW(const XrdCl::URL &url)
void NotifyInitExpectant()
bool GetNextUrl(std::string &url)
void RemoveSrc(XCpSrc *src)
XCpSrc * WeakestLink(XCpSrc *exclude)
void PutChunk(PageInfo *chunk)
void SetFileSize(int64_t size)
std::pair< uint64_t, uint64_t > GetBlock()
friend class ChunkHandler
static void DeleteChunk(PageInfo *&chunk)
XCpSrc(uint32_t chunkSize, uint8_t parallel, int64_t fileSize, XCpCtx *ctx)
void Lock(XrdSysMutex *Mutex)
const uint16_t stError
An error occurred that could potentially be retried.
const uint16_t errDataError
data is corrupted
const uint16_t errInternal
Internal error.
const uint16_t stOK
Everything went OK.
const uint16_t errInvalidOp
const int DefaultCpUsePgWrtRd
const uint64_t UtilityMsg
const uint16_t suContinue
@ Read
Open only for reading.
uint32_t GetLength() const
Get the data length.
bool IsOK() const
We're fine.