XRootD
XrdClXCpCtx.hh
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2017 by European Organization for Nuclear Research (CERN)
3 // Author: Michal Simon <michal.simon@cern.ch>
4 //------------------------------------------------------------------------------
5 // This file is part of the XRootD software suite.
6 //
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19 //
20 // In applying this licence, CERN does not waive the privileges and immunities
21 // granted to it by virtue of its status as an Intergovernmental Organization
22 // or submit itself to any jurisdiction.
23 //------------------------------------------------------------------------------
24 
25 #ifndef SRC_XRDCL_XRDCLXCPCTX_HH_
26 #define SRC_XRDCL_XRDCLXCPCTX_HH_
27 
28 #include "XrdCl/XrdClSyncQueue.hh"
30 #include "XrdSys/XrdSysPthread.hh"
31 
32 #include <cstdint>
33 #include <iostream>
34 
35 namespace XrdCl
36 {
37 
38 class XCpSrc;
39 
40 class XCpCtx
41 {
42  public:
43 
56  XCpCtx( const std::vector<std::string> &urls, uint64_t blockSize, uint8_t parallelSrc, uint64_t chunkSize, uint64_t parallelChunks, int64_t fileSize );
57 
62  void Delete()
63  {
64  XrdSysMutexHelper lckmtx( pMtx );
65  --pRefCount;
66  if( !pRefCount )
67  {
68  lckmtx.UnLock();
69  delete this;
70  return;
71  }
72  lckmtx.UnLock();
73 
74  XrdSysCondVarHelper lckcv( pDoneCV );
75  pDone = true;
76  pDoneCV.Broadcast();
77  lckcv.UnLock();
78 
79  lckcv.Lock( &pDeleteCV );
80  while( !pDelete ) pDeleteCV.Wait();
81  lckcv.UnLock();
82  delete this;
83  }
84 
88  void Release()
89  {
90  XrdSysMutexHelper lck( pMtx );
91  --pRefCount;
92  if( !pRefCount )
93  {
94  XrdSysCondVarHelper lckcv( pDeleteCV );
95  pDelete = true;
96  pDeleteCV.Broadcast();
97  }
98  }
99 
106  {
107  XrdSysMutexHelper lck( pMtx );
108  ++pRefCount;
109  return this;
110  }
111 
119  bool GetNextUrl( std::string & url );
120 
128  XCpSrc* WeakestLink( XCpSrc *exclude );
129 
135  void PutChunk( PageInfo* chunk );
136 
142  std::pair<uint64_t, uint64_t> GetBlock();
143 
151  void SetFileSize( int64_t size );
152 
157  int64_t GetSize()
158  {
159  XrdSysCondVarHelper lck( pFileSizeCV );
160  while( pFileSize < 0 && GetRunning() > 0 ) pFileSizeCV.Wait();
161  return pFileSize;
162  }
163 
173 
189 
195  void RemoveSrc( XCpSrc *src )
196  {
197  XrdSysMutexHelper lck( pMtx );
198  pSources.remove( src );
199  }
200 
208  void NotifyIdleSrc();
209 
218  bool AllDone();
219 
226  {
227  pFileSizeCV.Broadcast();
228  }
229 
230 
231  private:
232 
238  size_t GetRunning();
239 
245  virtual ~XCpCtx();
246 
251  std::queue<std::string> pUrls;
252 
256  uint64_t pBlockSize;
257 
261  uint8_t pParallelSrc;
262 
266  uint32_t pChunkSize;
267 
271  uint8_t pParallelChunks;
272 
278  uint64_t pOffset;
279 
283  int64_t pFileSize;
284 
290  XrdSysCondVar pFileSizeCV;
291 
296  std::list<XCpSrc*> pSources;
297 
302  SyncQueue<PageInfo*> pSink;
303 
307  uint64_t pDataReceived;
308 
313  bool pDone;
314 
319  XrdSysCondVar pDoneCV;
320 
324  XrdSysMutex pMtx;
325 
329  size_t pRefCount;
330 
336  XrdSysCondVar pDeleteCV;
337 
341  bool pDelete;
342 };
343 
344 } /* namespace XrdCl */
345 
346 #endif /* SRC_XRDCL_XRDCLXCPCTX_HH_ */
A synchronized queue.
void NotifyInitExpectant()
Definition: XrdClXCpCtx.hh:225
void Release()
Definition: XrdClXCpCtx.hh:88
void NotifyIdleSrc()
Definition: XrdClXCpCtx.cc:188
bool GetNextUrl(std::string &url)
Definition: XrdClXCpCtx.cc:57
void RemoveSrc(XCpSrc *src)
Definition: XrdClXCpCtx.hh:195
XCpSrc * WeakestLink(XCpSrc *exclude)
Definition: XrdClXCpCtx.cc:66
XCpCtx * Self()
Definition: XrdClXCpCtx.hh:105
void PutChunk(PageInfo *chunk)
Definition: XrdClXCpCtx.cc:90
XCpCtx(const std::vector< std::string > &urls, uint64_t blockSize, uint8_t parallelSrc, uint64_t chunkSize, uint64_t parallelChunks, int64_t fileSize)
Definition: XrdClXCpCtx.cc:36
int64_t GetSize()
Definition: XrdClXCpCtx.hh:157
void SetFileSize(int64_t size)
Definition: XrdClXCpCtx.cc:107
std::pair< uint64_t, uint64_t > GetBlock()
Definition: XrdClXCpCtx.cc:95
XRootDStatus Initialize()
Definition: XrdClXCpCtx.cc:124
XRootDStatus GetChunk(XrdCl::PageInfo &ci)
Definition: XrdClXCpCtx.cc:156
void Lock(XrdSysCondVar *CndVar)