XRootD
XrdClXCpSrc.hh
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2017 by European Organization for Nuclear Research (CERN)
3 // Author: Michal Simon <michal.simon@cern.ch>
4 //------------------------------------------------------------------------------
5 // This file is part of the XRootD software suite.
6 //
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19 //
20 // In applying this licence, CERN does not waive the privileges and immunities
21 // granted to it by virtue of its status as an Intergovernmental Organization
22 // or submit itself to any jurisdiction.
23 //------------------------------------------------------------------------------
24 
25 #ifndef SRC_XRDCL_XRDCLXCPSRC_HH_
26 #define SRC_XRDCL_XRDCLXCPSRC_HH_
27 
28 #include "XrdCl/XrdClFile.hh"
29 #include "XrdCl/XrdClSyncQueue.hh"
30 #include "XrdSys/XrdSysPthread.hh"
31 
32 #include <atomic>
33 
34 namespace XrdCl
35 {
36 
37 class XCpCtx;
38 
39 class XCpSrc
40 {
41  friend class ChunkHandler;
42 
43  public:
44 
55  XCpSrc( uint32_t chunkSize, uint8_t parallel, int64_t fileSize, XCpCtx *ctx );
56 
60  int Start();
61 
65  void Stop()
66  {
67  pRunning = false;
68  }
69 
73  void Delete()
74  {
75  XrdSysMutexHelper lck( pMtx );
76  --pRefCount;
77  if( !pRefCount )
78  {
79  lck.UnLock();
80  delete this;
81  }
82  }
83 
90  {
91  XrdSysMutexHelper lck( pMtx );
92  // if Ctx is trying to increase our ref count it is possible
93  // we are already in our destrutor, waiting for RemoveSrc().
94  if( !pRefCount ) return nullptr;
95  ++pRefCount;
96  return this;
97  }
98 
102  bool IsRunning()
103  {
104  return pRunning;
105  }
106 
111  bool HasData()
112  {
113  XrdSysMutexHelper lck( pMtx );
114  return pCurrentOffset < pBlkEnd || !pRecovered.empty() || !pOngoing.empty();
115  }
116 
117 
118 
124  uint64_t TransferRate();
125 
131  static void DeleteChunk( PageInfo *&chunk )
132  {
133  if( chunk )
134  {
135  delete[] static_cast<char*>( chunk->GetBuffer() );
136  delete chunk;
137  chunk = 0;
138  }
139  }
140 
141  private:
142 
148  virtual ~XCpSrc();
149 
153  static void* Run( void* arg );
154 
159  void StartDownloading();
160 
171  XRootDStatus Initialize();
172 
180  XRootDStatus Recover();
181 
194  XRootDStatus ReadChunks();
195 
210  void Steal( XCpSrc *src );
211 
220  XRootDStatus GetWork();
221 
230  void ReportResponse( XRootDStatus *status, PageInfo *chunk, File *handle );
231 
235  template<typename T>
236  static void DeletePtr( T *&obj )
237  {
238  delete obj;
239  obj = 0;
240  }
241 
248  static bool FilesEqual( File *f1, File *f2 )
249  {
250  if( !f1 || !f2 ) return false;
251 
252  const std::string lastURL = "LastURL";
253  std::string url1, url2;
254 
255  f1->GetProperty( lastURL, url1 );
256  f2->GetProperty( lastURL, url2 );
257 
258  // remove cgi information
259  size_t pos = url1.find( '?' );
260  if( pos != std::string::npos )
261  url1 = url1.substr( 0 , pos );
262  pos = url2.find( '?' );
263  if( pos != std::string::npos )
264  url2 = url2.substr( 0 , pos );
265 
266  return url1 == url2;
267  }
268 
272  uint32_t pChunkSize;
273 
277  uint8_t pParallel;
278 
282  int64_t pFileSize;
283 
287  pthread_t pThread;
288 
292  XCpCtx *pCtx;
293 
297  std::string pUrl;
298 
302  File *pFile;
303 
304  std::map<File*, uint8_t> pFailed;
305 
309  uint64_t pCurrentOffset;
310 
314  uint64_t pBlkEnd;
315 
319  std::atomic<uint64_t> pDataTransfered;
320 
325  std::map<uint64_t, uint64_t> pOngoing;
326 
331  std::map<uint64_t, uint64_t> pRecovered;
332 
339  SyncQueue<XRootDStatus*> pReports;
340 
344  XrdSysRecMutex pMtx;
345 
349  size_t pRefCount;
350 
356  std::atomic<bool> pRunning;
357 
361  time_t pStartTime;
362 
367  time_t pTransferTime;
368 
373  bool pUsePgRead;
374 };
375 
376 } /* namespace XrdCl */
377 
378 #endif /* SRC_XRDCL_XRDCLXCPSRC_HH_ */
XrdOucString File
A file.
Definition: XrdClFile.hh:46
bool IsRunning()
Definition: XrdClXCpSrc.hh:102
static void DeleteChunk(PageInfo *&chunk)
Definition: XrdClXCpSrc.hh:131
XCpSrc(uint32_t chunkSize, uint8_t parallel, int64_t fileSize, XCpCtx *ctx)
Definition: XrdClXCpSrc.cc:110
XCpSrc * Self()
Definition: XrdClXCpSrc.hh:89
uint64_t TransferRate()
Definition: XrdClXCpSrc.cc:596
void * GetBuffer()
Get the buffer.