XRootD
XrdPfcFile.cc
Go to the documentation of this file.
1 //----------------------------------------------------------------------------------
2 // Copyright (c) 2014 by Board of Trustees of the Leland Stanford, Jr., University
3 // Author: Alja Mrak-Tadel, Matevz Tadel
4 //----------------------------------------------------------------------------------
5 // XRootD is free software: you can redistribute it and/or modify
6 // it under the terms of the GNU Lesser General Public License as published by
7 // the Free Software Foundation, either version 3 of the License, or
8 // (at your option) any later version.
9 //
10 // XRootD is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17 //----------------------------------------------------------------------------------
18 
19 
20 #include "XrdPfcFile.hh"
21 #include "XrdPfc.hh"
22 #include "XrdPfcResourceMonitor.hh"
23 #include "XrdPfcIO.hh"
24 #include "XrdPfcTrace.hh"
25 
26 #include "XProtocol/XProtocol.hh"
27 #include "XrdSys/XrdSysTimer.hh"
28 #include "XrdOss/XrdOss.hh"
29 #include "XrdOuc/XrdOucEnv.hh"
31 
32 #include "XrdCl/XrdClURL.hh"
33 
34 #include <cassert>
35 #include <cstdio>
36 #include <sstream>
37 #include <unordered_map>
38 
39 #include <fcntl.h>
40 
41 using namespace XrdPfc;
42 
43 namespace
44 {
45 
46 const int BLOCK_WRITE_MAX_ATTEMPTS = 4;
47 
48 Cache* cache() { return &Cache::GetInstance(); }
49 
50 }
51 
52 const char *File::m_traceID = "File";
53 
54 //------------------------------------------------------------------------------
55 
56 File::File(const std::string& path, long long iOffset, long long iFileSize) :
57  m_ref_cnt(0),
58  m_data_file(0),
59  m_info_file(0),
60  m_cfi(Cache::TheOne().GetTrace(), Cache::TheOne().is_prefetch_enabled()),
61  m_filename(path),
62  m_offset(iOffset),
63  m_file_size(iFileSize),
64  m_current_io(m_io_set.end()),
65  m_ios_in_detach(0),
66  m_non_flushed_cnt(0),
67  m_in_sync(false),
68  m_detach_time_logged(false),
69  m_in_shutdown(false),
70  m_state_cond(0),
71  m_block_size(0),
72  m_num_blocks(0),
73  m_resmon_token(-1),
74  m_prefetch_state(kOff),
75  m_prefetch_bytes(0),
76  m_prefetch_read_cnt(0),
77  m_prefetch_hit_cnt(0),
78  m_prefetch_score(0)
79 {}
80 
81 File::~File()
82 {
83  TRACEF(Debug, "~File() for ");
84 }
85 
86 void File::Close()
87 {
88  // Close is called while nullptr is put into Cache::m_active map, see Cache::dec_ref_count(File*).
89  // A stat is called after close to re-check that m_stat_blocks have been reported correctly
90  // to the resource-monitor. Note that the reporting is already clamped down to m_file_size
91  // in report_and_merge_delta_stats() below.
92  //
93  // XFS can pre-allocate significant amount of blocks (1 GB at 1GB mark, 4 GB above 4GB) and those
94  // get reported in as stat.st_blocks.
95  // The reported number is correct in a stat immediately following a close.
96  // If one starts off by writing the last byte of the file, this pre-allocation does not get
97  // triggered up to that point. But comes back with a vengeance right after.
98  //
99  // To be determined if other FSes do something similar (Ceph, ZFS, ...). Ext4 doesn't.
100 
101  if (m_info_file)
102  {
103  TRACEF(Debug, "Close() closing info-file ");
104  m_info_file->Close();
105  delete m_info_file;
106  m_info_file = nullptr;
107  }
108 
109  if (m_data_file)
110  {
111  TRACEF(Debug, "Close() closing data-file ");
112  m_data_file->Close();
113  delete m_data_file;
114  m_data_file = nullptr;
115  }
116 
117  if (m_resmon_token >= 0)
118  {
119  // Last update of file stats has been sent from the final Sync unless we are in_shutdown --
120  // but in this case the file will get unlinked by the cache and reported as purge event.
121  // We check if the reported st_blocks so far is correct.
122  if (m_stats.m_BytesWritten > 0 && ! m_in_shutdown) {
123  struct stat s;
124  int sr = Cache::GetInstance().GetOss()->Stat(m_filename.c_str(), &s);
125  if (sr == 0 && s.st_blocks != m_st_blocks) {
126  Stats stats;
127  stats.m_StBlocksAdded = s.st_blocks - m_st_blocks;
128  m_st_blocks = s.st_blocks;
129  Cache::ResMon().register_file_update_stats(m_resmon_token, stats);
130  }
131  }
132 
133  Cache::ResMon().register_file_close(m_resmon_token, time(0), m_stats);
134  }
135 
136  TRACEF(Debug, "Close() finished, prefetch score = " << m_prefetch_score);
137 }
138 
139 //------------------------------------------------------------------------------
140 
141 File* File::FileOpen(const std::string &path, long long offset, long long fileSize, XrdOucCacheIO* inputIO)
142 {
143  File *file = new File(path, offset, fileSize);
144  if ( ! file->Open(inputIO))
145  {
146  delete file;
147  file = 0;
148  }
149  return file;
150 }
151 
152 //------------------------------------------------------------------------------
153 
155 {
 // Marks the file as being in shutdown, stops prefetching if it is neither
 // stopped nor complete, pushes pending delta stats and returns m_st_blocks.
 // NOTE(review): the signature line is not present in this listing.
 //
156  // Called from Cache::Unlink() when the file is currently open.
157  // Cache::Unlink is also called on FSync error and when wrong number of bytes
158  // is received from a remote read.
159  //
160  // From this point onward the file will not be written to, cinfo file will
161  // not be updated, and all new read requests will return -ENOENT.
162  //
163  // File's entry in the Cache's active map is set to nullptr and will be
164  // removed from there shortly, in any case, well before this File object
165  // shuts down. Cache::Unlink() also reports the appropriate purge event.
166 
 // report_and_merge_delta_stats() below must be called under m_state_cond.
167  XrdSysCondVarHelper _lck(m_state_cond);
168 
169  m_in_shutdown = true;
170 
171  if (m_prefetch_state != kStopped && m_prefetch_state != kComplete)
172  {
173  m_prefetch_state = kStopped;
174  cache()->DeRegisterPrefetchFile(this);
175  }
176 
 // check_delta_stats() does not report once m_in_shutdown is set, so the
 // accumulated delta stats are pushed explicitly here.
177  report_and_merge_delta_stats();
178 
179  return m_st_blocks;
180 }
181 
182 //------------------------------------------------------------------------------
183 
184 void File::check_delta_stats()
185 {
186  // Called under m_state_cond lock.
187  // BytesWritten indirectly trigger an unconditional merge through periodic Sync().
188  if (m_delta_stats.BytesReadAndWritten() >= m_resmon_report_threshold && ! m_in_shutdown)
189  report_and_merge_delta_stats();
190 }
191 
192 void File::report_and_merge_delta_stats()
193 {
194  // Called under m_state_cond lock.
195  struct stat s;
196  m_data_file->Fstat(&s);
197  // Do not report st_blocks beyond 4kB round-up over m_file_size. Some FSs report
198  // aggressive pre-allocation in this field (XFS, 4GB).
199  long long max_st_blocks_to_report = (m_file_size & 0xfff) ? ((m_file_size >> 12) + 1) << 3
200  : m_file_size >> 9;
201  long long st_blocks_to_report = std::min((long long) s.st_blocks, max_st_blocks_to_report);
202  m_delta_stats.m_StBlocksAdded = st_blocks_to_report - m_st_blocks;
203  m_st_blocks = st_blocks_to_report;
204  Cache::ResMon().register_file_update_stats(m_resmon_token, m_delta_stats);
205  m_stats.AddUp(m_delta_stats);
206  m_delta_stats.Reset();
207 }
208 
209 //------------------------------------------------------------------------------
210 
212 {
 // Drops one reference on block b, under the m_state_cond lock.
 // NOTE(review): the signature line is not present in this listing;
 // b is dereferenced below, so it must not be null.
213  TRACEF(Dump, "BlockRemovedFromWriteQ() block = " << (void*) b << " idx= " << b->m_offset/m_block_size);
214 
215  XrdSysCondVarHelper _lck(m_state_cond);
216  dec_ref_count(b);
217 }
218 
219 void File::BlocksRemovedFromWriteQ(std::list<Block*>& blocks)
220 {
221  TRACEF(Dump, "BlocksRemovedFromWriteQ() n_blocks = " << blocks.size());
222 
223  XrdSysCondVarHelper _lck(m_state_cond);
224 
225  for (std::list<Block*>::iterator i = blocks.begin(); i != blocks.end(); ++i)
226  {
227  dec_ref_count(*i);
228  }
229 }
230 
231 //------------------------------------------------------------------------------
232 
234 {
 // Records the current location of io via insert_remote_location().
 // NOTE(review): the signature line is not present in this listing.
 // The location string is fetched before the lock is taken, so the call into
 // io is not made while holding m_state_cond.
235  std::string loc(io->GetLocation());
236  XrdSysCondVarHelper _lck(m_state_cond);
237  insert_remote_location(loc);
238 }
239 
240 //------------------------------------------------------------------------------
241 
243 {
244  // Returns true if delay is needed.
 // NOTE(review): the signature line is not present in this listing.
 // As a side effect the io is flagged as detaching (m_in_detach) and
 // prefetching through it is disallowed.
245 
246  TRACEF(Debug, "ioActive start for io " << io);
247 
 // Fetched before taking the lock.
248  std::string loc(io->GetLocation());
249 
250  {
251  XrdSysCondVarHelper _lck(m_state_cond);
252 
253  IoSet_i mi = m_io_set.find(io);
254 
255  if (mi != m_io_set.end())
256  {
257  unsigned int n_active_reads = io->m_active_read_reqs;
258 
259  TRACE(Info, "ioActive for io " << io <<
260  ", active_reads " << n_active_reads <<
261  ", active_prefetches " << io->m_active_prefetches <<
262  ", allow_prefetching " << io->m_allow_prefetching <<
263  ", ios_in_detach " << m_ios_in_detach);
264  TRACEF(Info,
265  "\tio_map.size() " << m_io_set.size() <<
266  ", block_map.size() " << m_block_map.size() << ", file");
267 
268  insert_remote_location(loc);
269 
270  io->m_allow_prefetching = false;
271  io->m_in_detach = true;
272 
273  // Check if any IO is still available for prefetching. If not, stop it.
274  if (m_prefetch_state == kOn || m_prefetch_state == kHold)
275  {
276  if ( ! select_current_io_or_disable_prefetching(false) )
277  {
278  TRACEF(Debug, "ioActive stopping prefetching after io " << io << " retreat.");
279  }
280  }
281 
282  // On last IO, consider write queue blocks. Note, this also contains
283  // blocks being prefetched.
 //
 // Decision:
 //  - reads still active on this io          -> active;
 //  - this is the only io not yet in detach  -> active while m_block_map is non-empty;
 //  - other ios remain                       -> active while this io has prefetches in flight.
284 
285  bool io_active_result;
286 
287  if (n_active_reads > 0)
288  {
289  io_active_result = true;
290  }
291  else if (m_io_set.size() - m_ios_in_detach == 1)
292  {
293  io_active_result = ! m_block_map.empty();
294  }
295  else
296  {
297  io_active_result = io->m_active_prefetches > 0;
298  }
299 
 // The io is counted as "in detach" only once it is reported inactive.
300  if ( ! io_active_result)
301  {
302  ++m_ios_in_detach;
303  }
304 
305  TRACEF(Info, "ioActive for io " << io << " returning " << io_active_result << ", file");
306 
307  return io_active_result;
308  }
309  else
310  {
311  TRACEF(Error, "ioActive io " << io <<" not found in IoSet. This should not happen.");
312  return false;
313  }
314  }
315 }
316 
317 //------------------------------------------------------------------------------
318 
320 {
 // Clears m_detach_time_logged under the m_state_cond lock.
 // FinalizeSyncBeforeExit() treats a cleared flag as "detach stats still
 // need to be written" and requests a sync.
 // NOTE(review): the signature line is not present in this listing.
321  XrdSysCondVarHelper _lck(m_state_cond);
322  m_detach_time_logged = false;
323 }
324 
326 {
327  // Returns true if sync is required.
328  // This method is called after corresponding IO is detached from PosixCache.
 // NOTE(review): the signature line is not present in this listing.
329 
330  XrdSysCondVarHelper _lck(m_state_cond);
 // Never request a sync while in shutdown.
331  if ( ! m_in_shutdown)
332  {
 // A sync is needed if writes arrived during a previous sync, if there are
 // non-flushed blocks, or if detach stats have not been logged yet.
333  if ( ! m_writes_during_sync.empty() || m_non_flushed_cnt > 0 || ! m_detach_time_logged)
334  {
335  report_and_merge_delta_stats();
336  m_cfi.WriteIOStatDetach(m_stats);
337  m_detach_time_logged = true;
 // m_in_sync is set here, before returning; presumably the caller then
 // performs the actual sync -- TODO confirm.
338  m_in_sync = true;
339  TRACEF(Debug, "FinalizeSyncBeforeExit requesting sync to write detach stats");
340  return true;
341  }
342  }
343  TRACEF(Debug, "FinalizeSyncBeforeExit sync not required");
344  return false;
345 }
346 
347 //------------------------------------------------------------------------------
348 
349 void File::AddIO(IO *io)
350 {
351  // Called from Cache::GetFile() when a new IO asks for the file.
352 
353  TRACEF(Debug, "AddIO() io = " << (void*)io);
354 
355  time_t now = time(0);
356  std::string loc(io->GetLocation());
357 
358  m_state_cond.Lock();
359 
360  IoSet_i mi = m_io_set.find(io);
361 
362  if (mi == m_io_set.end())
363  {
364  m_io_set.insert(io);
365  io->m_attach_time = now;
366  m_delta_stats.IoAttach();
367 
368  insert_remote_location(loc);
369 
370  if (m_prefetch_state == kStopped)
371  {
372  m_prefetch_state = kOn;
373  cache()->RegisterPrefetchFile(this);
374  }
375  }
376  else
377  {
378  TRACEF(Error, "AddIO() io = " << (void*)io << " already registered.");
379  }
380 
381  m_state_cond.UnLock();
382 }
383 
384 //------------------------------------------------------------------------------
385 
387 {
388  // Called from Cache::ReleaseFile.
 // Unregisters io from m_io_set and accounts for its detach in the delta stats.
 // NOTE(review): the signature line is not present in this listing.
389 
390  TRACEF(Debug, "RemoveIO() io = " << (void*)io);
391 
392  time_t now = time(0);
393 
394  m_state_cond.Lock();
395 
396  IoSet_i mi = m_io_set.find(io);
397 
398  if (mi != m_io_set.end())
399  {
 // Move m_current_io off the element that is about to be erased.
400  if (mi == m_current_io)
401  {
402  ++m_current_io;
403  }
404 
 // The attach duration (now - attach time) is passed to the stats.
405  m_delta_stats.IoDetach(now - io->m_attach_time);
406  m_io_set.erase(mi);
 // NOTE(review): decremented unconditionally, while ioActive() increments
 // only when it reports the io as inactive -- presumably RemoveIO is reached
 // only after that has happened; confirm against the caller.
407  --m_ios_in_detach;
408 
409  if (m_io_set.empty() && m_prefetch_state != kStopped && m_prefetch_state != kComplete)
410  {
411  TRACEF(Error, "RemoveIO() io = " << (void*)io << " Prefetching is not stopped/complete -- it should be by now.");
412  m_prefetch_state = kStopped;
413  cache()->DeRegisterPrefetchFile(this);
414  }
415  }
416  else
417  {
418  TRACEF(Error, "RemoveIO() io = " << (void*)io << " is NOT registered.");
419  }
420 
421  m_state_cond.UnLock();
422 }
423 
424 //------------------------------------------------------------------------------
425 
// Creates (if needed) and opens the data file and its info file, validates or
// (re)initializes the info file contents, and registers the file with the
// resource monitor.
// Returns true on success. On failure returns false with errno set, and any
// file handle opened so far is released again.
426 bool File::Open(XrdOucCacheIO* inputIO)
427 {
428  // Sets errno accordingly.
429 
430  static const char *tpfx = "Open() ";
431 
432  TRACEF(Dump, tpfx << "entered");
433 
434  // Before touching anything, check with ResourceMonitor if a scan is in progress.
435  // This function will wait internally if needed until it is safe to proceed.
436  Cache::ResMon().CrossCheckIfScanIsInProgress(m_filename, m_state_cond);
437 
 // NOTE(review): `conf` is used throughout this function but its declaration
 // is not visible in this listing (a line is missing here).
439 
440  XrdOss &myOss = * Cache::GetInstance().GetOss();
441  const char *myUser = conf.m_username.c_str();
442  XrdOucEnv myEnv;
443  struct stat data_stat, info_stat;
444 
445  std::string ifn = m_filename + Info::s_infoExtension;
446 
 // Remember whether the files were present before Create() is called below.
 // data_stat / info_stat are only meaningful when the respective flag is true.
447  bool data_existed = (myOss.Stat(m_filename.c_str(), &data_stat) == XrdOssOK);
448  bool info_existed = (myOss.Stat(ifn.c_str(), &info_stat) == XrdOssOK);
449 
450  // Create the data file itself.
 // size_str: 32 bytes is sufficient for any "%lld" value (at most 20 characters plus NUL).
451  char size_str[32]; sprintf(size_str, "%lld", m_file_size);
452  myEnv.Put("oss.asize", size_str);
453  myEnv.Put("oss.cgroup", conf.m_data_space.c_str());
454 
455  int res;
456 
457  if ((res = myOss.Create(myUser, m_filename.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
458  {
459  TRACEF(Error, tpfx << "Create failed " << ERRNO_AND_ERRSTR(-res));
460  errno = -res;
461  return false;
462  }
463 
464  m_data_file = myOss.newFile(myUser);
465  if ((res = m_data_file->Open(m_filename.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
466  {
467  TRACEF(Error, tpfx << "Open failed " << ERRNO_AND_ERRSTR(-res));
468  errno = -res;
469  delete m_data_file; m_data_file = 0;
470  return false;
471  }
472 
 // Same sequence for the info file; myEnv is reused with new values.
473  myEnv.Put("oss.asize", "64k"); // Advisory, block-map and access list lengths vary.
474  myEnv.Put("oss.cgroup", conf.m_meta_space.c_str());
475  if ((res = myOss.Create(myUser, ifn.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
476  {
477  TRACE(Error, tpfx << "Create failed for info file " << ifn << ERRNO_AND_ERRSTR(-res));
478  errno = -res;
479  m_data_file->Close(); delete m_data_file; m_data_file = 0;
480  return false;
481  }
482 
483  m_info_file = myOss.newFile(myUser);
484  if ((res = m_info_file->Open(ifn.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
485  {
486  TRACEF(Error, tpfx << "Failed for info file " << ifn << ERRNO_AND_ERRSTR(-res));
487  errno = -res;
488  delete m_info_file; m_info_file = 0;
489  m_data_file->Close(); delete m_data_file; m_data_file = 0;
490  return false;
491  }
492 
 // Stays true unless an existing info file is read successfully and the data
 // file passes the sanity check below.
493  bool initialize_info_file = true;
494 
495  if (info_existed && m_cfi.Read(m_info_file, ifn.c_str()))
496  {
497  TRACEF(Debug, tpfx << "Reading existing info file. (data_existed=" << data_existed <<
498  ", data_size_stat=" << (data_existed ? data_stat.st_size : -1ll) <<
499  ", data_size_from_last_block=" << m_cfi.GetExpectedDataFileSize() <<
500  ", block_size=" << (m_cfi.GetBufferSize() >> 10) << "k)");
501 
502  // Check if data file exists and is of reasonable size.
503  if (data_existed && data_stat.st_size >= m_cfi.GetExpectedDataFileSize())
504  {
505  initialize_info_file = false;
506  } else {
507  TRACEF(Warning, tpfx << "Basic sanity checks on data file failed, resetting info file, truncating data file.");
508  m_cfi.ResetAllAccessStats();
509  m_data_file->Ftruncate(0);
510  // data-file might not have existed at entry -- data_stat is then undefined
511  if (data_existed)
512  Cache::ResMon().register_file_purge(m_filename, data_stat.st_blocks);
513  }
514  }
515 
 // Existing, valid info file whose checksum state differs from the configured one.
516  if ( ! initialize_info_file && m_cfi.GetCkSumState() != conf.get_cs_Chk())
517  {
518  if (conf.does_cschk_have_missing_bits(m_cfi.GetCkSumState()) &&
519  conf.should_uvkeep_purge(time(0) - m_cfi.GetNoCkSumTimeForUVKeep()))
520  {
521  TRACEF(Info, tpfx << "Cksum state of file insufficient, uvkeep test failed, resetting info file, truncating data file.");
522  initialize_info_file = true;
523  m_cfi.ResetAllAccessStats();
524  m_data_file->Ftruncate(0);
525  // data-file is known to exist due to checks in the previous if block
526  Cache::ResMon().register_file_purge(m_filename, data_stat.st_blocks);
527  } else {
528  // TODO: If the file is complete, we don't need to reset net cksums.
529  m_cfi.DowngradeCkSumState(conf.get_cs_Chk());
530  }
531  }
532 
533  // Check if we have pfc url arguments.
 // Defaults come from the configuration; parse_pfc_url_args() may override them.
534  long long pfc_blocksize = conf.m_bufferSize;
535  int pfc_prefetch = conf.m_prefetch_max_blocks;
 // NOTE(review): the line introducing the following braces is not visible in
 // this listing (presumably a condition on inputIO -- confirm).
537  {
538  parse_pfc_url_args(inputIO, pfc_blocksize, pfc_prefetch);
539  }
540 
541  if (initialize_info_file)
542  {
 // Fresh info file: write it out and flush it to disk right away.
543  m_cfi.SetBufferSizeFileSizeAndCreationTime(pfc_blocksize, m_file_size);
544  m_cfi.SetCkSumState(conf.get_cs_Chk());
545  m_cfi.ResetNoCkSumTime();
546  m_cfi.Write(m_info_file, ifn.c_str());
547  m_info_file->Fsync();
548  cache()->WriteFileSizeXAttr(m_info_file->getFD(), m_file_size);
549  TRACEF(Debug, tpfx << "Creating new file info, data size = " << m_file_size << " num blocks = " << m_cfi.GetNBlocks()
550  << " block size = " << pfc_blocksize);
551  }
552  else
553  {
 // futimens() with a NULL times argument sets both timestamps to the current time.
554  if (futimens(m_info_file->getFD(), NULL)) {
555  TRACEF(Error, tpfx << "failed setting modification time " << ERRNO_AND_ERRSTR(errno));
556  }
 // The block size of an existing file comes from its info file (see
 // m_block_size below), so a differing CGI value is only reported.
557  if (pfc_blocksize != conf.m_bufferSize) {
558  TRACEF(Info, tpfx << "URL CGI pfc.blocksize ignored for an already existing file");
559  }
560  }
561 
562  m_cfi.WriteIOStatAttach();
 // The remaining state is set up under the m_state_cond lock.
563  m_state_cond.Lock();
564  m_block_size = m_cfi.GetBufferSize();
565  m_num_blocks = m_cfi.GetNBlocks();
566  m_prefetch_state = (m_cfi.IsComplete()) ? kComplete : kStopped; // Will engage in AddIO().
567  m_prefetch_max_blocks_in_flight = pfc_prefetch;
568  if (pfc_prefetch != conf.m_prefetch_max_blocks)
569  TRACEF(Debug, tpfx << "pfc.prefetch set to " << pfc_prefetch << " via CGI parameter");
570 
 // Baseline block count against which later st_blocks deltas are computed.
571  m_data_file->Fstat(&data_stat);
572  m_st_blocks = data_stat.st_blocks;
573 
574  m_resmon_token = Cache::ResMon().register_file_open(m_filename, time(0), data_existed);
 // Reporting threshold: 5% of the file size, limited to the range [10 MB, 500 MB].
575  constexpr long long MB = 1024 * 1024;
576  m_resmon_report_threshold = std::min(std::max(10 * MB, m_file_size / 20), 500 * MB);
577  // m_resmon_report_threshold_scaler; // something like 10% of original threshold, to adjust
578  // actual threshold based on return values from register_file_update_stats().
579 
580  m_state_cond.UnLock();
581 
582  return true;
583 }
584 
// Looks for the "pfc.blocksize" and "pfc.prefetch" parameters in the URL of
// inputIO and, where the configuration allows it and the value converts
// successfully, overrides the corresponding output argument.
//   inputIO       -- its Path() is parsed as a URL; dereferenced unconditionally
//   pfc_blocksize -- in/out, left untouched unless a valid pfc.blocksize is found
//   pfc_prefetch  -- in/out, left untouched unless a valid pfc.prefetch is found
585 void File::parse_pfc_url_args(XrdOucCacheIO* inputIO, long long &pfc_blocksize, int &pfc_prefetch) const
586 {
587  const Configuration &conf = Cache::TheOne().RefConfiguration();
588 
589  XrdCl::URL url(inputIO->Path());
590  auto const & urlp = url.GetParams();
591 
 // Helper: copies the value of 'key' into 'value' and returns true if the key
 // is present; otherwise clears 'value' and returns false.
592  auto extract = [&](const std::string &key, std::string &value) -> bool {
593  auto it = urlp.find(key);
594  if (it != urlp.end()) {
595  value = it->second;
596  return true;
597  } else {
598  value.clear();
599  return false;
600  }
601  };
602 
603  std::string val;
604  if (conf.m_cgi_blocksize_allowed && extract("pfc.blocksize", val))
605  {
606  const char *tpfx = "File::Open::urlcgi pfc.blocksize ";
607  long long bsize;
 // NOTE(review): the trailing arguments of this call are not visible in this
 // listing (a continuation line is missing).
608  if (Cache::TheOne().blocksize_str2value(tpfx, val.c_str(), bsize,
610  {
611  pfc_blocksize = bsize;
612  } else {
613  TRACEF(Error, tpfx << "Error processing the parameter.");
614  }
615  }
616  if (conf.m_cgi_prefetch_allowed && extract("pfc.prefetch", val))
617  {
618  const char *tpfx = "File::Open::urlcgi pfc.prefetch ";
619  int pref;
 // NOTE(review): the trailing arguments of this call are not visible in this
 // listing (a continuation line is missing).
620  if (Cache::TheOne().prefetch_str2value(tpfx, val.c_str(), pref,
622  {
623  pfc_prefetch = pref;
624  } else {
625  TRACEF(Error, tpfx << "Error processing the parameter.");
626  }
627  }
628 }
629 
630 //------------------------------------------------------------------------------
631 
632 int File::Fstat(struct stat &sbuff)
633 {
634  // Stat on an open file.
635  // Corrects size to actual full size of the file.
636  // Sets atime to 0 if the file is only partially downloaded, in accordance
637  // with pfc.onlyifcached settings.
638  // Called from IO::Fstat() and Cache::Stat() when the file is active.
639  // Returns 0 on success, -errno on error.
640 
641  int res;
642 
643  if ((res = m_data_file->Fstat(&sbuff))) return res;
644 
645  sbuff.st_size = m_file_size;
646 
647  bool is_cached = cache()->DecideIfConsideredCached(m_file_size, sbuff.st_blocks * 512ll);
648  if ( ! is_cached)
649  sbuff.st_atime = 0;
650 
651  return 0;
652 }
653 
654 //==============================================================================
655 // Read and helpers
656 //==============================================================================
657 
658 bool File::overlap(int blk, // block to query
659  long long blk_size, //
660  long long req_off, // offset of user request
661  int req_size, // size of user request
662  // output:
663  long long &off, // offset in user buffer
664  long long &blk_off, // offset in block
665  int &size) // size to copy
666 {
667  const long long beg = blk * blk_size;
668  const long long end = beg + blk_size;
669  const long long req_end = req_off + req_size;
670 
671  if (req_off < end && req_end > beg)
672  {
673  const long long ovlp_beg = std::max(beg, req_off);
674  const long long ovlp_end = std::min(end, req_end);
675 
676  off = ovlp_beg - req_off;
677  blk_off = ovlp_beg - beg;
678  size = (int) (ovlp_end - ovlp_beg);
679 
680  assert(size <= blk_size);
681  return true;
682  }
683  else
684  {
685  return false;
686  }
687 }
688 
689 //------------------------------------------------------------------------------
690 
691 Block* File::PrepareBlockRequest(int i, IO *io, void *req_id, bool prefetch)
692 {
693  // Must be called w/ state_cond locked.
694  // Checks on size etc should be done before.
695  //
696  // Reference count is 0 so increase it in calling function if you want to
697  // catch the block while still in memory.
698 
699  const long long off = i * m_block_size;
700  const int last_block = m_num_blocks - 1;
701  const bool cs_net = cache()->RefConfiguration().is_cschk_net();
702 
703  int blk_size, req_size;
704  if (i == last_block) {
705  blk_size = req_size = m_file_size - off;
706  if (cs_net && req_size & 0xFFF) req_size = (req_size & ~0xFFF) + 0x1000;
707  } else {
708  blk_size = req_size = m_block_size;
709  }
710 
711  Block *b = 0;
712  char *buf = cache()->RequestRAM(req_size);
713 
714  if (buf)
715  {
716  b = new (std::nothrow) Block(this, io, req_id, buf, off, blk_size, req_size, prefetch, cs_net);
717 
718  if (b)
719  {
720  m_block_map[i] = b;
721 
722  // Actual Read request is issued in ProcessBlockRequests().
723 
724  if (m_prefetch_state == kOn && (int) m_block_map.size() >= m_prefetch_max_blocks_in_flight)
725  {
726  m_prefetch_state = kHold;
727  cache()->DeRegisterPrefetchFile(this);
728  }
729  }
730  else
731  {
732  TRACEF(Dump, "PrepareBlockRequest() " << i << " prefetch " << prefetch << ", allocation failed.");
733  }
734  }
735 
736  return b;
737 }
738 
// Issues the remote read for block b through the input object of the block's
// IO: pgRead() when the block requires network checksums, Read() otherwise.
// The response handler brh is passed to the read call.
739 void File::ProcessBlockRequest(Block *b)
740 {
741  // This *must not* be called with block_map locked.
742 
 // NOTE(review): `brh` is used below but its declaration is not visible in
 // this listing (a line is missing here).
744 
 // The description string is formatted only when Dump-level tracing is enabled.
745  if (XRD_TRACE What >= TRACE_Dump) {
746  char buf[256];
747  snprintf(buf, 256, "idx=%lld, block=%p, prefetch=%d, off=%lld, req_size=%d, buff=%p, resp_handler=%p ",
748  b->get_offset()/m_block_size, (void*)b, b->m_prefetch, b->get_offset(), b->get_req_size(), (void*)b->get_buff(), (void*)brh);
749  TRACEF(Dump, "ProcessBlockRequest() " << buf);
750  }
751 
752  if (b->req_cksum_net())
753  {
 // pgRead is asked for get_req_size() bytes, the plain Read below for get_size().
754  b->get_io()->GetInput()->pgRead(*brh, b->get_buff(), b->get_offset(), b->get_req_size(),
755  b->ref_cksum_vec(), 0, b->ptr_n_cksum_errors());
756  } else {
757  b->get_io()->GetInput()-> Read(*brh, b->get_buff(), b->get_offset(), b->get_size());
758  }
759 }
760 
761 void File::ProcessBlockRequests(BlockList_t& blks)
762 {
763  // This *must not* be called with block_map locked.
764 
765  for (BlockList_i bi = blks.begin(); bi != blks.end(); ++bi)
766  {
767  ProcessBlockRequest(*bi);
768  }
769 }
770 
771 //------------------------------------------------------------------------------
772 
773 void File::RequestBlocksDirect(IO *io, ReadRequest *read_req, std::vector<XrdOucIOVec>& ioVec, int expected_size)
774 {
775  int n_chunks = ioVec.size();
776  int n_vec_reads = (n_chunks - 1) / XrdProto::maxRvecsz + 1;
777 
778  TRACEF(DumpXL, "RequestBlocksDirect() issuing ReadV for n_chunks = " << n_chunks <<
779  ", total_size = " << expected_size << ", n_vec_reads = " << n_vec_reads);
780 
781  DirectResponseHandler *handler = new DirectResponseHandler(this, read_req, n_vec_reads);
782 
783  int pos = 0;
784  while (n_chunks > XrdProto::maxRvecsz) {
785  io->GetInput()->ReadV( *handler, ioVec.data() + pos, XrdProto::maxRvecsz);
786  pos += XrdProto::maxRvecsz;
787  n_chunks -= XrdProto::maxRvecsz;
788  }
789  io->GetInput()->ReadV( *handler, ioVec.data() + pos, n_chunks);
790 }
791 
792 //------------------------------------------------------------------------------
793 
794 int File::ReadBlocksFromDisk(std::vector<XrdOucIOVec>& ioVec, int expected_size)
795 {
796  TRACEF(DumpXL, "ReadBlocksFromDisk() issuing ReadV for n_chunks = " << (int) ioVec.size() << ", total_size = " << expected_size);
797 
798  long long rs = m_data_file->ReadV(ioVec.data(), (int) ioVec.size());
799 
800  if (rs < 0)
801  {
802  TRACEF(Error, "ReadBlocksFromDisk neg retval = " << rs);
803  return rs;
804  }
805 
806  if (rs != expected_size)
807  {
808  TRACEF(Error, "ReadBlocksFromDisk incomplete size = " << rs);
809  return -EIO;
810  }
811 
812  return (int) rs;
813 }
814 
815 //------------------------------------------------------------------------------
816 
817 int File::Read(IO *io, char* iUserBuff, long long iUserOff, int iUserSize, ReadReqRH *rh)
818 {
819  // rrc_func is ONLY called from async processing.
820  // If this function returns anything other than -EWOULDBLOCK, rrc_func needs to be called by the caller.
821  // This streamlines implementation of synchronous IO::Read().
822 
823  TRACEF(Dump, "Read() sid: " << Xrd::hex1 << rh->m_seq_id << " size: " << iUserSize);
824 
825  m_state_cond.Lock();
826 
827  if (m_in_shutdown || io->m_in_detach)
828  {
829  m_state_cond.UnLock();
830  return m_in_shutdown ? -ENOENT : -EBADF;
831  }
832 
833  // Shortcut -- file is fully downloaded.
834 
835  if (m_cfi.IsComplete())
836  {
837  m_state_cond.UnLock();
838  int ret = m_data_file->Read(iUserBuff, iUserOff, iUserSize);
839  if (ret > 0) {
840  XrdSysCondVarHelper _lck(m_state_cond);
841  m_delta_stats.AddBytesHit(ret);
842  check_delta_stats();
843  }
844  return ret;
845  }
846 
847  XrdOucIOVec readV( { iUserOff, iUserSize, 0, iUserBuff } );
848 
849  return ReadOpusCoalescere(io, &readV, 1, rh, "Read() ");
850 }
851 
852 //------------------------------------------------------------------------------
853 
854 int File::ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh)
855 {
856  TRACEF(Dump, "ReadV() for " << readVnum << " chunks.");
857 
858  m_state_cond.Lock();
859 
860  if (m_in_shutdown || io->m_in_detach)
861  {
862  m_state_cond.UnLock();
863  return m_in_shutdown ? -ENOENT : -EBADF;
864  }
865 
866  // Shortcut -- file is fully downloaded.
867 
868  if (m_cfi.IsComplete())
869  {
870  m_state_cond.UnLock();
871  int ret = m_data_file->ReadV(const_cast<XrdOucIOVec*>(readV), readVnum);
872  if (ret > 0) {
873  XrdSysCondVarHelper _lck(m_state_cond);
874  m_delta_stats.AddBytesHit(ret);
875  check_delta_stats();
876  }
877  return ret;
878  }
879 
880  return ReadOpusCoalescere(io, readV, readVnum, rh, "ReadV() ");
881 }
882 
883 //------------------------------------------------------------------------------
884 
885 int File::ReadOpusCoalescere(IO *io, const XrdOucIOVec *readV, int readVnum,
886  ReadReqRH *rh, const char *tpfx)
887 {
888  // Non-trivial processing for Read and ReadV.
889  // Entered under lock.
890  //
891  // loop over required blocks:
892  // - if on disk, ok;
893  // - if in ram or incoming, inc ref-count
894  // - otherwise request and inc ref count (unless RAM full => request direct)
895  // unlock
896 
897  int prefetch_cnt = 0;
898 
899  ReadRequest *read_req = nullptr;
900  BlockList_t blks_to_request; // blocks we are issuing a new remote request for
901 
902  std::unordered_map<Block*, std::vector<ChunkRequest>> blks_ready;
903 
904  std::vector<XrdOucIOVec> iovec_disk;
905  std::vector<XrdOucIOVec> iovec_direct;
906  int iovec_disk_total = 0;
907  int iovec_direct_total = 0;
908 
909  for (int iov_idx = 0; iov_idx < readVnum; ++iov_idx)
910  {
911  const XrdOucIOVec &iov = readV[iov_idx];
912  long long iUserOff = iov.offset;
913  int iUserSize = iov.size;
914  char *iUserBuff = iov.data;
915 
916  const int idx_first = iUserOff / m_block_size;
917  const int idx_last = (iUserOff + iUserSize - 1) / m_block_size;
918 
919  TRACEF(DumpXL, tpfx << "sid: " << Xrd::hex1 << rh->m_seq_id << " idx_first: " << idx_first << " idx_last: " << idx_last);
920 
921  enum LastBlock_e { LB_other, LB_disk, LB_direct };
922 
923  LastBlock_e lbe = LB_other;
924 
925  for (int block_idx = idx_first; block_idx <= idx_last; ++block_idx)
926  {
927  TRACEF(DumpXL, tpfx << "sid: " << Xrd::hex1 << rh->m_seq_id << " idx: " << block_idx);
928  BlockMap_i bi = m_block_map.find(block_idx);
929 
930  // overlap and read
931  long long off; // offset in user buffer
932  long long blk_off; // offset in block
933  int size; // size to copy
934 
935  overlap(block_idx, m_block_size, iUserOff, iUserSize, off, blk_off, size);
936 
937  // In RAM or incoming?
938  if (bi != m_block_map.end())
939  {
940  inc_ref_count(bi->second);
941  TRACEF(Dump, tpfx << (void*) iUserBuff << " inc_ref_count for existing block " << bi->second << " idx = " << block_idx);
942 
943  if (bi->second->is_finished())
944  {
945  // note, blocks with error should not be here !!!
946  // they should be either removed or reissued in ProcessBlockResponse()
947  assert(bi->second->is_ok());
948 
949  blks_ready[bi->second].emplace_back( ChunkRequest(nullptr, iUserBuff + off, blk_off, size) );
950 
951  if (bi->second->m_prefetch)
952  ++prefetch_cnt;
953  }
954  else
955  {
956  if ( ! read_req)
957  read_req = new ReadRequest(io, rh);
958 
959  // We have a lock on state_cond --> as we register the request before releasing the lock,
960  // we are sure to get a call-in via the ChunkRequest handling when this block arrives.
961 
962  bi->second->m_chunk_reqs.emplace_back( ChunkRequest(read_req, iUserBuff + off, blk_off, size) );
963  ++read_req->m_n_chunk_reqs;
964  }
965 
966  lbe = LB_other;
967  }
968  // On disk?
969  else if (m_cfi.TestBitWritten(offsetIdx(block_idx)))
970  {
971  TRACEF(DumpXL, tpfx << "read from disk " << (void*)iUserBuff << " idx = " << block_idx);
972 
973  if (lbe == LB_disk)
974  iovec_disk.back().size += size;
975  else
976  iovec_disk.push_back( { block_idx * m_block_size + blk_off, size, 0, iUserBuff + off } );
977  iovec_disk_total += size;
978 
979  if (m_cfi.TestBitPrefetch(offsetIdx(block_idx)))
980  ++prefetch_cnt;
981 
982  lbe = LB_disk;
983  }
984  // Neither ... then we have to go get it ...
985  else
986  {
987  if ( ! read_req)
988  read_req = new ReadRequest(io, rh);
989 
990  // Is there room for one more RAM Block?
991  Block *b = PrepareBlockRequest(block_idx, io, read_req, false);
992  if (b)
993  {
994  TRACEF(Dump, tpfx << "inc_ref_count new " << (void*)iUserBuff << " idx = " << block_idx);
995  inc_ref_count(b);
996  blks_to_request.push_back(b);
997 
998  b->m_chunk_reqs.emplace_back(ChunkRequest(read_req, iUserBuff + off, blk_off, size));
999  ++read_req->m_n_chunk_reqs;
1000 
1001  lbe = LB_other;
1002  }
1003  else // Nope ... read this directly without caching.
1004  {
1005  TRACEF(DumpXL, tpfx << "direct block " << block_idx << ", blk_off " << blk_off << ", size " << size);
1006 
1007  iovec_direct_total += size;
1008  read_req->m_direct_done = false;
1009 
1010  // Make sure we do not issue a ReadV with chunk size above XrdProto::maxRVdsz.
1011  // Number of actual ReadVs issued so as to not exceed the XrdProto::maxRvecsz limit
1012  // is determined in the RequestBlocksDirect().
1013  if (lbe == LB_direct && iovec_direct.back().size + size <= XrdProto::maxRVdsz) {
1014  iovec_direct.back().size += size;
1015  } else {
1016  long long in_offset = block_idx * m_block_size + blk_off;
1017  char *out_pos = iUserBuff + off;
1018  while (size > XrdProto::maxRVdsz) {
1019  iovec_direct.push_back( { in_offset, XrdProto::maxRVdsz, 0, out_pos } );
1020  in_offset += XrdProto::maxRVdsz;
1021  out_pos += XrdProto::maxRVdsz;
1022  size -= XrdProto::maxRVdsz;
1023  }
1024  iovec_direct.push_back( { in_offset, size, 0, out_pos } );
1025  }
1026 
1027  lbe = LB_direct;
1028  }
1029  }
1030  } // end for over blocks in an IOVec
1031  } // end for over readV IOVec
1032 
1033  inc_prefetch_hit_cnt(prefetch_cnt);
1034 
1035  m_state_cond.UnLock();
1036 
1037  // First, send out remote requests for new blocks.
1038  if ( ! blks_to_request.empty())
1039  {
1040  ProcessBlockRequests(blks_to_request);
1041  blks_to_request.clear();
1042  }
1043 
1044  // Second, send out remote direct read requests.
1045  if ( ! iovec_direct.empty())
1046  {
1047  RequestBlocksDirect(io, read_req, iovec_direct, iovec_direct_total);
1048 
1049  TRACEF(Dump, tpfx << "direct read requests sent out, n_chunks = " << (int) iovec_direct.size() << ", total_size = " << iovec_direct_total);
1050  }
1051 
1052  // Begin synchronous part where we process data that is already in RAM or on disk.
1053 
1054  long long bytes_read = 0;
1055  int error_cond = 0; // to be set to -errno
1056 
1057  // Third, process blocks that are available in RAM.
1058  if ( ! blks_ready.empty())
1059  {
1060  for (auto &bvi : blks_ready)
1061  {
1062  for (auto &cr : bvi.second)
1063  {
1064  TRACEF(DumpXL, tpfx << "ub=" << (void*)cr.m_buf << " from pre-finished block " << bvi.first->m_offset/m_block_size << " size " << cr.m_size);
1065  memcpy(cr.m_buf, bvi.first->m_buff + cr.m_off, cr.m_size);
1066  bytes_read += cr.m_size;
1067  }
1068  }
1069  }
1070 
1071  // Fourth, read blocks from disk.
1072  if ( ! iovec_disk.empty())
1073  {
1074  int rc = ReadBlocksFromDisk(iovec_disk, iovec_disk_total);
1075  TRACEF(DumpXL, tpfx << "from disk finished size = " << rc);
1076  if (rc >= 0)
1077  {
1078  bytes_read += rc;
1079  }
1080  else
1081  {
1082  error_cond = rc;
1083  TRACEF(Error, tpfx << "failed read from disk");
1084  }
1085  }
1086 
1087  // End synchronous part -- update with sync stats and determine actual state of this read.
1088  // Note: remote reads might have already finished during disk-read!
1089 
1090  m_state_cond.Lock();
1091 
1092  for (auto &bvi : blks_ready)
1093  dec_ref_count(bvi.first, (int) bvi.second.size());
1094 
1095  if (read_req)
1096  {
1097  read_req->m_bytes_read += bytes_read;
1098  if (error_cond)
1099  read_req->update_error_cond(error_cond);
1100  read_req->m_stats.m_BytesHit += bytes_read;
1101  read_req->m_sync_done = true;
1102 
1103  if (read_req->is_complete())
1104  {
1105  // Almost like FinalizeReadRequest(read_req) -- but no callout!
1106  m_delta_stats.AddReadStats(read_req->m_stats);
1107  check_delta_stats();
1108  m_state_cond.UnLock();
1109 
1110  int ret = read_req->return_value();
1111  delete read_req;
1112  return ret;
1113  }
1114  else
1115  {
1116  m_state_cond.UnLock();
1117  return -EWOULDBLOCK;
1118  }
1119  }
1120  else
1121  {
1122  m_delta_stats.m_BytesHit += bytes_read;
1123  check_delta_stats();
1124  m_state_cond.UnLock();
1125 
1126  // !!! No callout.
1127 
1128  return error_cond ? error_cond : bytes_read;
1129  }
1130 }
1131 
1132 
1133 //==============================================================================
1134 // WriteBlock and Sync
1135 //==============================================================================
1136 
1138 {
1139  // write block buffer into disk file
1140  long long offset = b->m_offset - m_offset;
1141  long long size = b->get_size();
1142  ssize_t retval;
1143 
1144  if (m_cfi.IsCkSumCache())
1145  if (b->has_cksums())
1146  retval = m_data_file->pgWrite(b->get_buff(), offset, size, b->ref_cksum_vec().data(), 0);
1147  else
1148  retval = m_data_file->pgWrite(b->get_buff(), offset, size, 0, 0);
1149  else
1150  retval = m_data_file->Write(b->get_buff(), offset, size);
1151 
1152  if (retval < size)
1153  {
1154  if (retval < 0) {
1155  TRACEF(Error, "WriteToDisk() write error " << retval);
1156  } else {
1157  TRACEF(Error, "WriteToDisk() incomplete block write ret=" << retval << " (should be " << size << ")");
1158  }
1159 
1160  XrdSysCondVarHelper _lck(m_state_cond);
1161 
1162  dec_ref_count(b);
1163 
1164  return;
1165  }
1166 
1167  const int blk_idx = (b->m_offset - m_offset) / m_block_size;
1168 
1169  // Set written bit.
1170  TRACEF(Dump, "WriteToDisk() success set bit for block " << b->m_offset << " size=" << size);
1171 
1172  bool schedule_sync = false;
1173  {
1174  XrdSysCondVarHelper _lck(m_state_cond);
1175 
1176  m_cfi.SetBitWritten(blk_idx);
1177 
1178  if (b->m_prefetch)
1179  {
1180  m_cfi.SetBitPrefetch(blk_idx);
1181  }
1182  if (b->req_cksum_net() && ! b->has_cksums() && m_cfi.IsCkSumNet())
1183  {
1184  m_cfi.ResetCkSumNet();
1185  }
1186 
1187  // Set synced bit or stash block index if in actual sync.
1188  // Synced state is only written out to cinfo file when data file is synced.
1189  if (m_in_sync)
1190  {
1191  m_writes_during_sync.push_back(blk_idx);
1192  }
1193  else
1194  {
1195  m_cfi.SetBitSynced(blk_idx);
1196  ++m_non_flushed_cnt;
1197  if ((m_cfi.IsComplete() || m_non_flushed_cnt >= Cache::GetInstance().RefConfiguration().m_flushCnt) &&
1198  ! m_in_shutdown)
1199  {
1200  schedule_sync = true;
1201  m_in_sync = true;
1202  m_non_flushed_cnt = 0;
1203  }
1204  }
1205  // As soon as the reference count is decreased on the block, the
1206  // file object may be deleted. Thus, to avoid holding both locks at a time,
1207  // we defer the ref count decrease until later if a sync is needed
1208  if (!schedule_sync) {
1209  dec_ref_count(b);
1210  }
1211  }
1212 
1213  if (schedule_sync)
1214  {
1215  cache()->ScheduleFileSync(this);
1216  XrdSysCondVarHelper _lck(m_state_cond);
1217  dec_ref_count(b);
1218  }
1219 }
1220 
1221 //------------------------------------------------------------------------------
1222 
1224 {
1225  TRACEF(Dump, "Sync()");
1226 
1227  int ret = m_data_file->Fsync();
1228  bool errorp = false;
1229  if (ret == XrdOssOK)
1230  {
1231  Stats loc_stats;
1232  {
1233  XrdSysCondVarHelper _lck(&m_state_cond);
1234  report_and_merge_delta_stats();
1235  loc_stats = m_stats;
1236  }
1237  m_cfi.WriteIOStat(loc_stats);
1238  m_cfi.Write(m_info_file, m_filename.c_str());
1239  int cret = m_info_file->Fsync();
1240  if (cret != XrdOssOK)
1241  {
1242  TRACEF(Error, "Sync cinfo file sync error " << cret);
1243  errorp = true;
1244  }
1245  }
1246  else
1247  {
1248  TRACEF(Error, "Sync data file sync error " << ret << ", cinfo file has not been updated");
1249  errorp = true;
1250  }
1251 
1252  if (errorp)
1253  {
1254  TRACEF(Error, "Sync failed, unlinking local files and initiating shutdown of File object");
1255 
1256  // Unlink will also call this->initiate_emergency_shutdown()
1257  Cache::GetInstance().UnlinkFile(m_filename, false);
1258 
1259  XrdSysCondVarHelper _lck(&m_state_cond);
1260 
1261  m_writes_during_sync.clear();
1262  m_in_sync = false;
1263 
1264  return;
1265  }
1266 
1267  int written_while_in_sync;
1268  bool resync = false;
1269  {
1270  XrdSysCondVarHelper _lck(&m_state_cond);
1271  for (std::vector<int>::iterator i = m_writes_during_sync.begin(); i != m_writes_during_sync.end(); ++i)
1272  {
1273  m_cfi.SetBitSynced(*i);
1274  }
1275  written_while_in_sync = m_non_flushed_cnt = (int) m_writes_during_sync.size();
1276  m_writes_during_sync.clear();
1277 
1278  // If there were writes during sync and the file is now complete,
1279  // let us call Sync again without resetting the m_in_sync flag.
1280  if (written_while_in_sync > 0 && m_cfi.IsComplete() && ! m_in_shutdown)
1281  resync = true;
1282  else
1283  m_in_sync = false;
1284  }
1285  TRACEF(Dump, "Sync "<< written_while_in_sync << " blocks written during sync." << (resync ? " File is now complete - resyncing." : ""));
1286 
1287  if (resync)
1288  Sync();
1289 }
1290 
1291 
1292 //==============================================================================
1293 // Block processing
1294 //==============================================================================
1295 
1296 void File::free_block(Block* b)
1297 {
1298  // Method always called under lock.
1299  int i = b->m_offset / m_block_size;
1300  TRACEF(Dump, "free_block block " << b << " idx = " << i);
1301  size_t ret = m_block_map.erase(i);
1302  if (ret != 1)
1303  {
1304  // assert might be a better option than a warning
1305  TRACEF(Error, "free_block did not erase " << i << " from map");
1306  }
1307  else
1308  {
1309  cache()->ReleaseRAM(b->m_buff, b->m_req_size);
1310  delete b;
1311  }
1312 
1313  if (m_prefetch_state == kHold && (int) m_block_map.size() < m_prefetch_max_blocks_in_flight)
1314  {
1315  m_prefetch_state = kOn;
1316  cache()->RegisterPrefetchFile(this);
1317  }
1318 }
1319 
1320 //------------------------------------------------------------------------------
1321 
1322 bool File::select_current_io_or_disable_prefetching(bool skip_current)
1323 {
1324  // Method always called under lock. It also expects prefetch to be active.
1325 
1326  int io_size = (int) m_io_set.size();
1327  bool io_ok = false;
1328 
1329  if (io_size == 1)
1330  {
1331  io_ok = (*m_io_set.begin())->m_allow_prefetching;
1332  if (io_ok)
1333  {
1334  m_current_io = m_io_set.begin();
1335  }
1336  }
1337  else if (io_size > 1)
1338  {
1339  IoSet_i mi = m_current_io;
1340  if (skip_current && mi != m_io_set.end()) ++mi;
1341 
1342  for (int i = 0; i < io_size; ++i)
1343  {
1344  if (mi == m_io_set.end()) mi = m_io_set.begin();
1345 
1346  if ((*mi)->m_allow_prefetching)
1347  {
1348  m_current_io = mi;
1349  io_ok = true;
1350  break;
1351  }
1352  ++mi;
1353  }
1354  }
1355 
1356  if ( ! io_ok)
1357  {
1358  m_current_io = m_io_set.end();
1359  m_prefetch_state = kStopped;
1360  cache()->DeRegisterPrefetchFile(this);
1361  }
1362 
1363  return io_ok;
1364 }
1365 
1366 //------------------------------------------------------------------------------
1367 
1368 void File::ProcessDirectReadFinished(ReadRequest *rreq, int bytes_read, int error_cond)
1369 {
1370  // Called from DirectResponseHandler.
1371  // NOT under lock.
1372 
1373  if (error_cond)
1374  TRACEF(Error, "Read(), direct read finished with error " << -error_cond << " " << XrdSysE2T(-error_cond));
1375 
1376  m_state_cond.Lock();
1377 
1378  if (error_cond)
1379  rreq->update_error_cond(error_cond);
1380  else {
1381  rreq->m_stats.m_BytesBypassed += bytes_read;
1382  rreq->m_bytes_read += bytes_read;
1383  }
1384 
1385  rreq->m_direct_done = true;
1386 
1387  bool rreq_complete = rreq->is_complete();
1388 
1389  m_state_cond.UnLock();
1390 
1391  if (rreq_complete)
1392  FinalizeReadRequest(rreq);
1393 }
1394 
1395 void File::ProcessBlockError(Block *b, ReadRequest *rreq)
1396 {
1397  // Called from ProcessBlockResponse().
1398  // YES under lock -- we have to protect m_block_map for recovery through multiple IOs.
1399  // Does not manage m_read_req.
1400  // Will not complete the request.
1401 
1402  TRACEF(Debug, "ProcessBlockError() io " << b->m_io << ", block "<< b->m_offset/m_block_size <<
1403  " finished with error " << -b->get_error() << " " << XrdSysE2T(-b->get_error()));
1404 
1405  rreq->update_error_cond(b->get_error());
1406  --rreq->m_n_chunk_reqs;
1407 
1408  dec_ref_count(b);
1409 }
1410 
1411 void File::ProcessBlockSuccess(Block *b, ChunkRequest &creq)
1412 {
1413  // Called from ProcessBlockResponse().
1414  // NOT under lock as it does memcopy ofor exisf block data.
1415  // Acquires lock for block, m_read_req and rreq state update.
1416 
1417  ReadRequest *rreq = creq.m_read_req;
1418 
1419  TRACEF(Dump, "ProcessBlockSuccess() ub=" << (void*)creq.m_buf << " from finished block " << b->m_offset/m_block_size << " size " << creq.m_size);
1420  memcpy(creq.m_buf, b->m_buff + creq.m_off, creq.m_size);
1421 
1422  m_state_cond.Lock();
1423 
1424  rreq->m_bytes_read += creq.m_size;
1425 
1426  if (b->get_req_id() == (void*) rreq)
1427  rreq->m_stats.m_BytesMissed += creq.m_size;
1428  else
1429  rreq->m_stats.m_BytesHit += creq.m_size;
1430 
1431  --rreq->m_n_chunk_reqs;
1432 
1433  if (b->m_prefetch)
1434  inc_prefetch_hit_cnt(1);
1435 
1436  dec_ref_count(b);
1437 
1438  bool rreq_complete = rreq->is_complete();
1439 
1440  m_state_cond.UnLock();
1441 
1442  if (rreq_complete)
1443  FinalizeReadRequest(rreq);
1444 }
1445 
1446 void File::FinalizeReadRequest(ReadRequest *rreq)
1447 {
1448  // called from ProcessBlockResponse()
1449  // NOT under lock -- does callout
1450  {
1451  XrdSysCondVarHelper _lck(m_state_cond);
1452  m_delta_stats.AddReadStats(rreq->m_stats);
1453  check_delta_stats();
1454  }
1455 
1456  rreq->m_rh->Done(rreq->return_value());
1457  delete rreq;
1458 }
1459 
1460 void File::ProcessBlockResponse(Block *b, int res)
1461 {
1462  static const char* tpfx = "ProcessBlockResponse ";
1463 
1464  TRACEF(Dump, tpfx << "block=" << b << ", idx=" << b->m_offset/m_block_size << ", off=" << b->m_offset << ", res=" << res);
1465 
1466  if (res >= 0 && res != b->get_size())
1467  {
1468  // Incorrect number of bytes received, apparently size of the file on the remote
1469  // is different than what the cache expects it to be.
1470  TRACEF(Error, tpfx << "Wrong number of bytes received, assuming remote/local file size mismatch, unlinking local files and initiating shutdown of File object");
1471  Cache::GetInstance().UnlinkFile(m_filename, false);
1472  }
1473 
1474  m_state_cond.Lock();
1475 
1476  // Deregister block from IO's prefetch count, if needed.
1477  if (b->m_prefetch)
1478  {
1479  IO *io = b->get_io();
1480  IoSet_i mi = m_io_set.find(io);
1481  if (mi != m_io_set.end())
1482  {
1483  --io->m_active_prefetches;
1484 
1485  // If failed and IO is still prefetching -- disable prefetching on this IO.
1486  if (res < 0 && io->m_allow_prefetching)
1487  {
1488  TRACEF(Debug, tpfx << "after failed prefetch on io " << io << " disabling prefetching on this io.");
1489  io->m_allow_prefetching = false;
1490 
1491  // Check if any IO is still available for prfetching. If not, stop it.
1492  if (m_prefetch_state == kOn || m_prefetch_state == kHold)
1493  {
1494  if ( ! select_current_io_or_disable_prefetching(false) )
1495  {
1496  TRACEF(Debug, tpfx << "stopping prefetching after io " << b->get_io() << " marked as bad.");
1497  }
1498  }
1499  }
1500 
1501  // If failed with no subscribers -- delete the block and exit.
1502  if (b->m_refcnt == 0 && (res < 0 || m_in_shutdown))
1503  {
1504  free_block(b);
1505  m_state_cond.UnLock();
1506  return;
1507  }
1508  m_prefetch_bytes += b->get_size();
1509  }
1510  else
1511  {
1512  TRACEF(Error, tpfx << "io " << b->get_io() << " not found in IoSet.");
1513  }
1514  }
1515 
1516  if (res == b->get_size())
1517  {
1518  b->set_downloaded();
1519  TRACEF(Dump, tpfx << "inc_ref_count idx=" << b->m_offset/m_block_size);
1520  if ( ! m_in_shutdown)
1521  {
1522  // Increase ref-count for the writer.
1523  inc_ref_count(b);
1524  m_delta_stats.AddWriteStats(b->get_size(), b->get_n_cksum_errors());
1525  // No check for writes, report-and-merge forced during Sync().
1526  cache()->AddWriteTask(b, true);
1527  }
1528 
1529  // Swap chunk-reqs vector out of Block, it will be processed outside of lock.
1530  vChunkRequest_t creqs_to_notify;
1531  creqs_to_notify.swap( b->m_chunk_reqs );
1532 
1533  m_state_cond.UnLock();
1534 
1535  for (auto &creq : creqs_to_notify)
1536  {
1537  ProcessBlockSuccess(b, creq);
1538  }
1539  }
1540  else
1541  {
1542  if (res < 0) {
1543  bool new_error = b->get_io()->register_block_error(res);
1544  int tlvl = new_error ? TRACE_Error : TRACE_Debug;
1545  TRACEF_INT(tlvl, tpfx << "block " << b << ", idx=" << b->m_offset/m_block_size << ", off=" << b->m_offset
1546  << ", io=" << b->get_io() << ", error=" << res);
1547  } else {
1548  bool first_p = b->get_io()->register_incomplete_read();
1549  int tlvl = first_p ? TRACE_Error : TRACE_Debug;
1550  TRACEF_INT(tlvl, tpfx << "block " << b << ", idx=" << b->m_offset/m_block_size << ", off=" << b->m_offset
1551  << ", io=" << b->get_io() << " incomplete, got " << res << " expected " << b->get_size());
1552 #if defined(__APPLE__) || defined(__GNU__) || (defined(__FreeBSD_kernel__) && defined(__GLIBC__)) || defined(__FreeBSD__)
1553  res = -EIO;
1554 #else
1555  res = -EREMOTEIO;
1556 #endif
1557  }
1558  b->set_error(res);
1559 
1560  // Loop over Block's chunk-reqs vector, error out ones with the same IO.
1561  // Collect others with a different IO, the first of them will be used to reissue the request.
1562  // This is then done outside of lock.
1563  std::list<ReadRequest*> rreqs_to_complete;
1564  vChunkRequest_t creqs_to_keep;
1565 
1566  for(ChunkRequest &creq : b->m_chunk_reqs)
1567  {
1568  ReadRequest *rreq = creq.m_read_req;
1569 
1570  if (rreq->m_io == b->get_io())
1571  {
1572  ProcessBlockError(b, rreq);
1573  if (rreq->is_complete())
1574  {
1575  rreqs_to_complete.push_back(rreq);
1576  }
1577  }
1578  else
1579  {
1580  creqs_to_keep.push_back(creq);
1581  }
1582  }
1583 
1584  bool reissue = false;
1585  if ( ! creqs_to_keep.empty())
1586  {
1587  ReadRequest *rreq = creqs_to_keep.front().m_read_req;
1588 
1589  TRACEF(Debug, "ProcessBlockResponse() requested block " << (void*)b << " failed with another io " <<
1590  b->get_io() << " - reissuing request with my io " << rreq->m_io);
1591 
1592  b->reset_error_and_set_io(rreq->m_io, rreq);
1593  b->m_chunk_reqs.swap( creqs_to_keep );
1594  reissue = true;
1595  }
1596 
1597  m_state_cond.UnLock();
1598 
1599  for (auto rreq : rreqs_to_complete)
1600  FinalizeReadRequest(rreq);
1601 
1602  if (reissue)
1603  ProcessBlockRequest(b);
1604  }
1605 }
1606 
1607 //------------------------------------------------------------------------------
1608 
1609 const char* File::lPath() const
1610 {
1611  return m_filename.c_str();
1612 }
1613 
1614 //------------------------------------------------------------------------------
1615 
1616 int File::offsetIdx(int iIdx) const
1617 {
1618  return iIdx - m_offset/m_block_size;
1619 }
1620 
1621 
1622 //------------------------------------------------------------------------------
1623 
1625 {
1626  // Check that block is not on disk and not in RAM.
1627  // TODO: Could prefetch several blocks at once!
1628  // blks_max could be an argument
1629 
1630  BlockList_t blks;
1631 
1632  TRACEF(DumpXL, "Prefetch() entering.");
1633  {
1634  XrdSysCondVarHelper _lck(m_state_cond);
1635 
1636  if (m_prefetch_state != kOn)
1637  {
1638  return;
1639  }
1640 
1641  if ( ! select_current_io_or_disable_prefetching(true) )
1642  {
1643  TRACEF(Error, "Prefetch no available IO object found, prefetching stopped. This should not happen, i.e., prefetching should be stopped before.");
1644  return;
1645  }
1646 
1647  // Select block(s) to fetch.
1648  for (int f = 0; f < m_num_blocks; ++f)
1649  {
1650  if ( ! m_cfi.TestBitWritten(f))
1651  {
1652  int f_act = f + m_offset / m_block_size;
1653 
1654  BlockMap_i bi = m_block_map.find(f_act);
1655  if (bi == m_block_map.end())
1656  {
1657  Block *b = PrepareBlockRequest(f_act, *m_current_io, nullptr, true);
1658  if (b)
1659  {
1660  TRACEF(Dump, "Prefetch take block " << f_act);
1661  blks.push_back(b);
1662  // Note: block ref_cnt not increased, it will be when placed into write queue.
1663 
1664  inc_prefetch_read_cnt(1);
1665  }
1666  else
1667  {
1668  // This shouldn't happen as prefetching stops when RAM is 70% full.
1669  TRACEF(Warning, "Prefetch allocation failed for block " << f_act);
1670  }
1671  break;
1672  }
1673  }
1674  }
1675 
1676  if (blks.empty())
1677  {
1678  TRACEF(Debug, "Prefetch file is complete, stopping prefetch.");
1679  m_prefetch_state = kComplete;
1680  cache()->DeRegisterPrefetchFile(this);
1681  }
1682  else
1683  {
1684  (*m_current_io)->m_active_prefetches += (int) blks.size();
1685  }
1686  }
1687 
1688  if ( ! blks.empty())
1689  {
1690  ProcessBlockRequests(blks);
1691  }
1692 }
1693 
1694 
1695 //------------------------------------------------------------------------------
1696 
1698 {
1699  return m_prefetch_score;
1700 }
1701 
1703 {
1704  return Cache::TheOne().GetLog();
1705 }
1706 
1708 {
1709  return Cache::TheOne().GetTrace();
1710 }
1711 
1712 void File::insert_remote_location(const std::string &loc)
1713 {
1714  if ( ! loc.empty())
1715  {
1716  size_t p = loc.find_first_of('@');
1717  m_remote_locations.insert(&loc[p != std::string::npos ? p + 1 : 0]);
1718  }
1719 }
1720 
1721 std::string File::GetRemoteLocations() const
1722 {
1723  std::string s;
1724  if ( ! m_remote_locations.empty())
1725  {
1726  size_t sl = 0;
1727  int nl = 0;
1728  for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++nl)
1729  {
1730  sl += i->size();
1731  }
1732  s.reserve(2 + sl + 2*nl + nl - 1 + 1);
1733  s = '[';
1734  int j = 1;
1735  for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++j)
1736  {
1737  s += '"'; s += *i; s += '"';
1738  if (j < nl) s += ',';
1739  }
1740  s += ']';
1741  }
1742  else
1743  {
1744  s = "[]";
1745  }
1746  return s;
1747 }
1748 
1749 //==============================================================================
1750 //======================= RESPONSE HANDLERS ==============================
1751 //==============================================================================
1752 
1754 {
1755  m_block->m_file->ProcessBlockResponse(m_block, res);
1756  delete this;
1757 }
1758 
1759 //------------------------------------------------------------------------------
1760 
1762 {
1763  m_mutex.Lock();
1764 
1765  int n_left = --m_to_wait;
1766 
1767  if (res < 0) {
1768  if (m_errno == 0) m_errno = res; // store first reported error
1769  } else {
1770  m_bytes_read += res;
1771  }
1772 
1773  m_mutex.UnLock();
1774 
1775  if (n_left == 0)
1776  {
1777  m_file->ProcessDirectReadFinished(m_read_req, m_bytes_read, m_errno);
1778  delete this;
1779  }
1780 }
#define TRACE_Debug
Definition: XrdCmsTrace.hh:37
#define XrdOssOK
Definition: XrdOss.hh:50
#define XRDOSS_mkpath
Definition: XrdOss.hh:466
#define TRACE_Error
Definition: XrdPfcTrace.hh:7
#define TRACE_Dump
Definition: XrdPfcTrace.hh:11
#define TRACEF(act, x)
Definition: XrdPfcTrace.hh:67
#define ERRNO_AND_ERRSTR(err_code)
Definition: XrdPfcTrace.hh:46
#define TRACEF_INT(act, x)
Definition: XrdPfcTrace.hh:71
#define stat(a, b)
Definition: XrdPosix.hh:101
#define XRD_TRACE
Definition: XrdScheduler.cc:48
bool Debug
XrdOucString File
const char * XrdSysE2T(int errcode)
Definition: XrdSysE2T.cc:104
@ Error
#define TRACE(act, x)
Definition: XrdTrace.hh:63
URL representation.
Definition: XrdClURL.hh:31
virtual int Fsync()
Definition: XrdOss.hh:144
virtual int Ftruncate(unsigned long long flen)
Definition: XrdOss.hh:164
virtual int Fstat(struct stat *buf)
Definition: XrdOss.hh:136
virtual int Close(long long *retsz=0)=0
virtual int getFD()
Definition: XrdOss.hh:426
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
Definition: XrdOss.hh:200
virtual ssize_t Read(off_t offset, size_t size)
Definition: XrdOss.hh:281
virtual ssize_t pgWrite(void *buffer, off_t offset, size_t wrlen, uint32_t *csvec, uint64_t opts)
Definition: XrdOss.cc:198
virtual ssize_t ReadV(XrdOucIOVec *readV, int rdvcnt)
Definition: XrdOss.cc:236
virtual ssize_t Write(const void *buffer, off_t offset, size_t size)
Definition: XrdOss.hh:345
virtual int Create(const char *tid, const char *path, mode_t mode, XrdOucEnv &env, int opts=0)=0
virtual XrdOssDF * newFile(const char *tident)=0
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
virtual void Done(int result)=0
virtual const char * Path()=0
virtual int pgRead(char *buff, long long offs, int rdlen, std::vector< uint32_t > &csvec, uint64_t opts=0, int *csfix=0)
Definition: XrdOucCache.cc:39
virtual int ReadV(const XrdOucIOVec *readV, int rnum)
Definition: XrdOucCache.cc:86
void Put(const char *varname, const char *value)
Definition: XrdOucEnv.hh:85
void Done(int result) override
Definition: XrdPfcFile.cc:1753
int * ptr_n_cksum_errors()
Definition: XrdPfcFile.hh:163
int get_size() const
Definition: XrdPfcFile.hh:136
int get_error() const
Definition: XrdPfcFile.hh:150
int get_n_cksum_errors()
Definition: XrdPfcFile.hh:162
void * get_req_id() const
Definition: XrdPfcFile.hh:142
long long get_offset() const
Definition: XrdPfcFile.hh:138
vChunkRequest_t m_chunk_reqs
Definition: XrdPfcFile.hh:125
void set_error(int err)
Definition: XrdPfcFile.hh:149
vCkSum_t & ref_cksum_vec()
Definition: XrdPfcFile.hh:161
char * get_buff() const
Definition: XrdPfcFile.hh:135
IO * get_io() const
Definition: XrdPfcFile.hh:141
void set_downloaded()
Definition: XrdPfcFile.hh:148
bool req_cksum_net() const
Definition: XrdPfcFile.hh:159
bool has_cksums() const
Definition: XrdPfcFile.hh:160
long long m_offset
Definition: XrdPfcFile.hh:114
void reset_error_and_set_io(IO *io, void *rid)
Definition: XrdPfcFile.hh:152
int get_req_size() const
Definition: XrdPfcFile.hh:137
Attaches/creates and detaches/deletes cache-io objects for disk based cache.
Definition: XrdPfc.hh:163
XrdOss * GetOss() const
Definition: XrdPfc.hh:280
bool blocksize_str2value(const char *from, const char *str, long long &val, long long min, long long max) const
const Configuration & RefConfiguration() const
Reference XrdPfc configuration.
Definition: XrdPfc.hh:215
static ResourceMonitor & ResMon()
Definition: XrdPfc.cc:135
static Cache & GetInstance()
Singleton access.
Definition: XrdPfc.cc:132
XrdSysTrace * GetTrace() const
Definition: XrdPfc.hh:295
int UnlinkFile(const std::string &f_name, bool fail_if_open)
Remove cinfo and data files from cache.
Definition: XrdPfc.cc:1188
static const Cache & TheOne()
Definition: XrdPfc.cc:133
XrdSysError * GetLog() const
Definition: XrdPfc.hh:294
void Done(int result) override
Definition: XrdPfcFile.cc:1761
bool FinalizeSyncBeforeExit()
Returns true if any of blocks need sync. Called from Cache::dec_ref_cnt on zero ref cnt.
Definition: XrdPfcFile.cc:325
const char * lPath() const
Log path.
Definition: XrdPfcFile.cc:1609
int ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh)
Vector read.
Definition: XrdPfcFile.cc:854
void WriteBlockToDisk(Block *b)
Definition: XrdPfcFile.cc:1137
float GetPrefetchScore() const
Definition: XrdPfcFile.cc:1697
friend class BlockResponseHandler
Definition: XrdPfcFile.hh:205
void Prefetch()
Definition: XrdPfcFile.cc:1624
std::string GetRemoteLocations() const
Definition: XrdPfcFile.cc:1721
int Fstat(struct stat &sbuff)
Definition: XrdPfcFile.cc:632
void AddIO(IO *io)
Definition: XrdPfcFile.cc:349
static File * FileOpen(const std::string &path, long long offset, long long fileSize, XrdOucCacheIO *inputIO)
Static constructor that also does Open. Returns null ptr if Open fails.
Definition: XrdPfcFile.cc:141
void RequestSyncOfDetachStats()
Flags that detach stats should be written out in final sync. Called from CacheIO upon Detach.
Definition: XrdPfcFile.cc:319
void BlocksRemovedFromWriteQ(std::list< Block * > &)
Handle removal of a set of blocks from Cache's write queue.
Definition: XrdPfcFile.cc:219
friend class DirectResponseHandler
Definition: XrdPfcFile.hh:206
void Sync()
Sync file cache info and output data with disk.
Definition: XrdPfcFile.cc:1223
XrdSysTrace * GetTrace() const
Definition: XrdPfcFile.cc:1707
int Read(IO *io, char *buff, long long offset, int size, ReadReqRH *rh)
Normal read.
Definition: XrdPfcFile.cc:817
void ioUpdated(IO *io)
Notification from IO that it has been updated (remote open).
Definition: XrdPfcFile.cc:233
long long initiate_emergency_shutdown()
Definition: XrdPfcFile.cc:154
void RemoveIO(IO *io)
Definition: XrdPfcFile.cc:386
void BlockRemovedFromWriteQ(Block *)
Handle removal of a block from Cache's write queue.
Definition: XrdPfcFile.cc:211
XrdSysError * GetLog() const
Definition: XrdPfcFile.cc:1702
bool ioActive(IO *io)
Initiate close. Return true if still IO active. Used in XrdPosixXrootd::Close()
Definition: XrdPfcFile.cc:242
Base cache-io class that implements some XrdOucCacheIO abstract methods.
Definition: XrdPfcIO.hh:16
bool register_incomplete_read()
Definition: XrdPfcIO.hh:90
XrdOucCacheIO * GetInput()
Definition: XrdPfcIO.cc:31
bool register_block_error(int res)
Definition: XrdPfcIO.hh:93
RAtomic_int m_active_read_reqs
number of active read requests
Definition: XrdPfcIO.hh:70
const char * GetLocation()
Definition: XrdPfcIO.hh:44
Status of cached file. Can be read from and written into a binary file.
Definition: XrdPfcInfo.hh:41
void SetBitPrefetch(int i)
Mark block as obtained through prefetch.
Definition: XrdPfcInfo.hh:365
static const char * s_infoExtension
Definition: XrdPfcInfo.hh:309
void SetBitSynced(int i)
Mark block as synced to disk.
Definition: XrdPfcInfo.hh:387
time_t GetNoCkSumTimeForUVKeep() const
Definition: XrdPfcInfo.hh:301
CkSumCheck_e GetCkSumState() const
Definition: XrdPfcInfo.hh:286
void WriteIOStatAttach()
Write open time in the last entry of access statistics.
Definition: XrdPfcInfo.cc:420
void ResetCkSumNet()
Definition: XrdPfcInfo.cc:213
bool Write(XrdOssDF *fp, const char *dname, const char *fname=0)
Definition: XrdPfcInfo.cc:266
void DowngradeCkSumState(CkSumCheck_e css_ref)
Definition: XrdPfcInfo.hh:295
bool IsCkSumNet() const
Definition: XrdPfcInfo.hh:290
void ResetAllAccessStats()
Reset IO Stats.
Definition: XrdPfcInfo.cc:359
bool TestBitPrefetch(int i) const
Test if block at the given index has been prefetched.
Definition: XrdPfcInfo.hh:376
bool IsComplete() const
Get complete status.
Definition: XrdPfcInfo.hh:447
bool IsCkSumCache() const
Definition: XrdPfcInfo.hh:289
void SetBitWritten(int i)
Mark block as written to disk.
Definition: XrdPfcInfo.hh:352
long long GetBufferSize() const
Get prefetch buffer size.
Definition: XrdPfcInfo.hh:469
void WriteIOStat(Stats &s)
Write bytes missed, hits, and disk.
Definition: XrdPfcInfo.cc:429
long long GetExpectedDataFileSize() const
Get expected data file size.
Definition: XrdPfcInfo.hh:420
bool TestBitWritten(int i) const
Test if block at the given index is written to disk.
Definition: XrdPfcInfo.hh:343
bool Read(XrdOssDF *fp, const char *dname, const char *fname=0)
Read content of cinfo file into this object.
Definition: XrdPfcInfo.cc:294
void SetCkSumState(CkSumCheck_e css)
Definition: XrdPfcInfo.hh:294
void ResetNoCkSumTime()
Definition: XrdPfcInfo.hh:302
void SetBufferSizeFileSizeAndCreationTime(long long bs, long long fs)
Definition: XrdPfcInfo.cc:161
void WriteIOStatDetach(Stats &s)
Write close time together with bytes missed, hits, and disk.
Definition: XrdPfcInfo.cc:438
int GetNBlocks() const
Get number of blocks represented in download-state bit-vector.
Definition: XrdPfcInfo.hh:437
void CrossCheckIfScanIsInProgress(const std::string &lfn, XrdSysCondVar &cond)
int register_file_open(const std::string &filename, time_t open_timestamp, bool existing_file)
void register_file_purge(DirState *target, long long size_in_st_blocks)
void register_file_update_stats(int token_id, const Stats &stats)
void register_file_close(int token_id, time_t close_timestamp, const Stats &full_stats)
Statistics of cache utilisation by a File object.
Definition: XrdPfcStats.hh:35
void IoAttach()
Definition: XrdPfcStats.hh:85
void AddReadStats(const Stats &s)
Definition: XrdPfcStats.hh:67
long long m_StBlocksAdded
number of 512-byte blocks the file has grown by
Definition: XrdPfcStats.hh:43
long long m_BytesBypassed
number of bytes served directly through XrdCl
Definition: XrdPfcStats.hh:41
void AddUp(const Stats &s)
Definition: XrdPfcStats.hh:119
void AddWriteStats(long long bytes_written, int n_cks_errs)
Definition: XrdPfcStats.hh:79
long long BytesReadAndWritten() const
Definition: XrdPfcStats.hh:102
void AddBytesHit(long long bh)
Definition: XrdPfcStats.hh:74
long long m_BytesHit
number of bytes served from disk
Definition: XrdPfcStats.hh:39
long long m_BytesWritten
number of bytes written to disk
Definition: XrdPfcStats.hh:42
void IoDetach(int duration)
Definition: XrdPfcStats.hh:90
@ Warning
OpenImpl< false > Open(Ctx< File > file, Arg< std::string > url, Arg< OpenFlags::Flags > flags, Arg< Access::Mode > mode=Access::None, uint16_t timeout=0)
Factory for creating OpenImpl objects.
Definition: XrdPfc.hh:41
std::vector< ChunkRequest > vChunkRequest_t
Definition: XrdPfcFile.hh:101
std::list< Block * > BlockList_t
Definition: XrdPfcFile.hh:166
std::list< Block * >::iterator BlockList_i
Definition: XrdPfcFile.hh:167
static const int maxRVdsz
Definition: XProtocol.hh:688
static const int maxRvecsz
Definition: XProtocol.hh:686
@ hex1
Definition: XrdSysTrace.hh:42
long long offset
Definition: XrdOucIOVec.hh:42
char * data
Definition: XrdOucIOVec.hh:45
ReadRequest * m_read_req
Definition: XrdPfcFile.hh:91
Contains parameters configurable from the xrootd config file.
Definition: XrdPfc.hh:64
long long m_flushCnt
number of unsynced blocks on disk before flush is called
Definition: XrdPfc.hh:122
long long m_cgi_max_bufferSize
max buffer size allowed in pfc.blocksize
Definition: XrdPfc.hh:115
int m_cgi_min_prefetch_max_blocks
min prefetch block count allowed in pfc.prefetch
Definition: XrdPfc.hh:116
bool does_cschk_have_missing_bits(CkSumCheck_e cks_on_file) const
Definition: XrdPfc.hh:80
bool m_cgi_prefetch_allowed
allow cgi setting of prefetch
Definition: XrdPfc.hh:119
CkSumCheck_e get_cs_Chk() const
Definition: XrdPfc.hh:73
int m_prefetch_max_blocks
default maximum number of blocks to prefetch per file
Definition: XrdPfc.hh:112
bool should_uvkeep_purge(time_t delta) const
Definition: XrdPfc.hh:82
std::string m_data_space
oss space for data files
Definition: XrdPfc.hh:88
long long m_bufferSize
cache block size, default 128 kB
Definition: XrdPfc.hh:107
long long m_cgi_min_bufferSize
min buffer size allowed in pfc.blocksize
Definition: XrdPfc.hh:114
std::string m_meta_space
oss space for metadata files (cinfo)
Definition: XrdPfc.hh:89
int m_cgi_max_prefetch_max_blocks
max prefetch block count allowed in pfc.prefetch
Definition: XrdPfc.hh:117
std::string m_username
username passed to oss plugin
Definition: XrdPfc.hh:87
bool m_cgi_blocksize_allowed
allow cgi setting of blocksize
Definition: XrdPfc.hh:118
unsigned short m_seq_id
Definition: XrdPfcFile.hh:53
void update_error_cond(int ec)
Definition: XrdPfcFile.hh:81
ReadReqRH * m_rh
Definition: XrdPfcFile.hh:66
bool is_complete() const
Definition: XrdPfcFile.hh:83
int return_value() const
Definition: XrdPfcFile.hh:84
long long m_bytes_read
Definition: XrdPfcFile.hh:68