XRootD
XrdOssThrottleFile.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* (c) 2025 by the Morgridge Institute for Research */
4 /* */
5 /* This file is part of the XRootD software suite. */
6 /* */
7 /* XRootD is free software: you can redistribute it and/or modify it under */
8 /* the terms of the GNU Lesser General Public License as published by the */
9 /* Free Software Foundation, either version 3 of the License, or (at your */
10 /* option) any later version. */
11 /* */
12 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
13 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
14 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
15 /* License for more details. */
16 /* */
17 /* You should have received a copy of the GNU Lesser General Public License */
18 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
19 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
20 /* */
21 /* The copyright holder's institutional names and contributor's names may not */
22 /* be used to endorse or promote products derived from this software without */
23 /* specific prior written permission of the institution or contributor. */
24 /******************************************************************************/
25 
26 #include "XrdOuc/XrdOucEnv.hh"
28 #include "XrdOss/XrdOss.hh"
29 #include "XrdOss/XrdOssWrapper.hh"
30 #include "XrdSfs/XrdSfsAio.hh"
31 #include "XrdSys/XrdSysLogger.hh"
35 #include "XrdVersion.hh"
36 
37 #include <functional>
38 
39 namespace {
40 
41 class File final : public XrdOssWrapDF {
42 public:
43  File(std::unique_ptr<XrdOssDF> wrapDF, XrdThrottleManager &throttle, XrdSysError *lP, XrdOucTrace *tP)
44  : XrdOssWrapDF(*wrapDF), m_log(lP), m_throttle(throttle), m_trace(tP), m_wrapped(std::move(wrapDF)) {}
45 
46 virtual ~File() {}
47 
48 virtual int Open(const char *path, int Oflag, mode_t Mode,
49  XrdOucEnv &env) override {
50 
51  std::tie(m_user, m_uid) = m_throttle.GetUserInfo(env.secEnv());
52 
53  std::string open_error_message;
54  if (!m_throttle.OpenFile(m_user, open_error_message)) {
55  TRACE(DEBUG, open_error_message);
56  return -EMFILE;
57  }
58 
59  auto rval = wrapDF.Open(path, Oflag, Mode, env);
60 
61  if (rval < 0) {
62  m_throttle.CloseFile(m_user);
63  }
64 
65  return rval;
66 }
67 
68 virtual int Close(long long *retsz) override {
69  m_throttle.CloseFile(m_user);
70  return wrapDF.Close(retsz);
71 }
72 
73 virtual int getFD() override {return -1;}
74 
75 virtual off_t getMmap(void **addr) override {*addr = 0; return 0;}
76 
77 virtual ssize_t pgRead (void* buffer, off_t offset, size_t rdlen,
78  uint32_t* csvec, uint64_t opts) override {
79 
80  return DoThrottle(rdlen, 1,
81  static_cast<ssize_t (XrdOssDF::*)(void*, off_t, size_t, uint32_t*, uint64_t)>(&XrdOssDF::pgRead),
82  buffer, offset, rdlen, csvec, opts);
83 }
84 
85 virtual int pgRead(XrdSfsAio *aioparm, uint64_t opts) override
86 { // We disable all AIO-based reads.
87  aioparm->Result = pgRead((char *)aioparm->sfsAio.aio_buf,
88  aioparm->sfsAio.aio_offset,
89  aioparm->sfsAio.aio_nbytes,
90  aioparm->cksVec, opts);
91  aioparm->doneRead();
92  return 0;
93 }
94 
95 virtual ssize_t pgWrite(void* buffer, off_t offset, size_t wrlen,
96  uint32_t* csvec, uint64_t opts) override {
97 
98  return DoThrottle(wrlen, 1,
99  static_cast<ssize_t (XrdOssDF::*)(void*, off_t, size_t, uint32_t*, uint64_t)>(&XrdOssDF::pgWrite),
100  buffer, offset, wrlen, csvec, opts);
101 }
102 
103 virtual int pgWrite(XrdSfsAio *aioparm, uint64_t opts) override
104 { // We disable all AIO-based writes.
105  aioparm->Result = this->pgWrite((char *)aioparm->sfsAio.aio_buf,
106  aioparm->sfsAio.aio_offset,
107  aioparm->sfsAio.aio_nbytes,
108  aioparm->cksVec, opts);
109  aioparm->doneWrite();
110  return 0;
111 }
112 
113 virtual ssize_t Read(off_t offset, size_t size) override {
114  return DoThrottle(size, 1,
115  static_cast<ssize_t (XrdOssDF::*)(off_t, size_t)>(&XrdOssDF::Read),
116  offset, size);
117 }
118 virtual ssize_t Read(void* buffer, off_t offset, size_t size) override {
119  return DoThrottle(size, 1,
120  static_cast<ssize_t (XrdOssDF::*)(void*, off_t, size_t)>(&XrdOssDF::Read),
121  buffer, offset, size);
122 }
123 
124 virtual int Read(XrdSfsAio *aiop) override {
125  aiop->Result = this->Read((char *)aiop->sfsAio.aio_buf,
126  aiop->sfsAio.aio_offset,
127  aiop->sfsAio.aio_nbytes);
128  aiop->doneRead();
129  return 0;
130 }
131 
132 virtual ssize_t ReadV(XrdOucIOVec *readV, int rdvcnt) override {
133  off_t sum = 0;
134  for (int i = 0; i < rdvcnt; ++i) {
135  sum += readV[i].size;
136  }
137  return DoThrottle(sum, rdvcnt, &XrdOssDF::ReadV, readV, rdvcnt);
138 }
139 
140 
141 virtual ssize_t Write(const void* buffer, off_t offset, size_t size) override {
142  return DoThrottle(size, 1,
143  static_cast<ssize_t (XrdOssDF::*)(const void*, off_t, size_t)>(&XrdOssDF::Write),
144  buffer, offset, size);
145 }
146 
147 virtual int Write(XrdSfsAio *aiop) override {
148  aiop->Result = this->Write((char *)aiop->sfsAio.aio_buf,
149  aiop->sfsAio.aio_offset,
150  aiop->sfsAio.aio_nbytes);
151  aiop->doneWrite();
152  return 0;
153 }
154 
155 private:
156 
157  template <class Fn, class... Args>
158  int DoThrottle(size_t rdlen, size_t ops, Fn &&fn, Args &&... args) {
159  m_throttle.Apply(rdlen, ops, m_uid);
160  bool ok = true;
161  XrdThrottleTimer timer = m_throttle.StartIOTimer(m_uid, ok);
162  if (!ok) {
163  TRACE(DEBUG, "Throttling in progress");
164  return -EMFILE;
165  }
166  return std::invoke(fn, wrapDF, std::forward<Args>(args)...);
167  }
168 
169  XrdSysError *m_log{nullptr};
170  XrdThrottleManager &m_throttle;
171  XrdOucTrace *m_trace{nullptr};
172  std::unique_ptr<XrdOssDF> m_wrapped;
173  std::string m_user;
174  uint16_t m_uid;
175 
176  static constexpr char TraceID[] = "XrdThrottleFile";
177 };
178 
179 class FileSystem final : public XrdOssWrapper {
180 public:
181  FileSystem(XrdOss *oss, XrdSysLogger *log, XrdOucEnv *envP)
182  : XrdOssWrapper(*oss),
183  m_env(envP),
184  m_oss(oss),
185  m_log(new XrdSysError(log)),
186  m_trace(new XrdOucTrace(m_log.get())),
187  m_throttle(m_log.get(), m_trace.get())
188  {
189 
190  m_throttle.Init();
191  if (envP)
192  {
193  auto gstream = reinterpret_cast<XrdXrootdGStream*>(envP->GetPtr("Throttle.gStream*"));
194  m_log->Say("Config", "Throttle g-stream has", gstream ? "" : " NOT", " been configured via xrootd.mongstream directive");
195  m_throttle.SetMonitor(gstream);
196  }
197  }
198 
199  int Configure(const std::string &config_filename) {
200  XrdThrottle::Configuration config(*m_log, m_env);
201  if (config.Configure(config_filename)) {
202  m_log->Emsg("Config", "Unable to load configuration file", config_filename.c_str());
203  return 1;
204  }
205  m_throttle.FromConfig(config);
206  return 0;
207  }
208 
209  virtual ~FileSystem() {}
210 
211  virtual XrdOssDF *newFile(const char *user = 0) override {
212  std::unique_ptr<XrdOssDF> wrapped(wrapPI.newFile(user));
213  return new File(std::move(wrapped), m_throttle, m_log.get(), m_trace.get());
214  }
215 
216 private:
217  XrdOucEnv *m_env{nullptr};
218  std::unique_ptr<XrdOss> m_oss;
219  std::unique_ptr<XrdSysError> m_log{nullptr};
220  std::unique_ptr<XrdOucTrace> m_trace{nullptr};
221  XrdThrottleManager m_throttle;
222 };
223 
224 } // namespace
225 
226 extern "C" {
227 
229  const char *config_fn, const char *parms,
230  XrdOucEnv *envP) {
231  std::unique_ptr<FileSystem> fs(new FileSystem(curr_oss, logger, envP));
232  if (fs->Configure(config_fn)) {
233  XrdSysError(logger, "XrdThrottle").Say("Config", "Unable to load configuration file", config_fn);
234  return nullptr;
235  }
236  // Note the throttle is set up as an OSS.
237  // This will prevent the throttle from being layered on top of the OFS; to keep backward
238  // compatibility with old configurations, we do not cause the server to fail.
239  //
240  // Originally, XrdThrottle was used as an OFS because the loadshed code required the ability
241  // to redirect the client to a different server. This is rarely (never?) used in practice.
242  // By putting the throttle in the OSS, we benefit from the fact the OFS has first run the
243  // authorization code and has made a user name available for fairshare of the throttle.
244  envP->PutInt("XrdOssThrottle", 1);
245  return fs.release();
246 }
247 
249 
250 } // extern "C"
251 
#define DEBUG(x)
Definition: XrdBwmTrace.hh:54
XrdOss * XrdOssAddStorageSystem2(XrdOss *curr_oss, XrdSysLogger *logger, const char *config_fn, const char *parms, XrdOucEnv *envP)
XrdVERSIONINFO(XrdOssAddStorageSystem2, throttle)
int Mode
XrdOucString File
struct myOpts opts
off_t aio_offset
Definition: XrdSfsAio.hh:49
size_t aio_nbytes
Definition: XrdSfsAio.hh:48
void * aio_buf
Definition: XrdSfsAio.hh:47
#define TRACE(act, x)
Definition: XrdTrace.hh:63
virtual ssize_t Read(off_t offset, size_t size)
Definition: XrdOss.hh:281
virtual ssize_t pgWrite(void *buffer, off_t offset, size_t wrlen, uint32_t *csvec, uint64_t opts)
Definition: XrdOss.cc:198
virtual ssize_t pgRead(void *buffer, off_t offset, size_t rdlen, uint32_t *csvec, uint64_t opts)
Definition: XrdOss.cc:160
virtual ssize_t ReadV(XrdOucIOVec *readV, int rdvcnt)
Definition: XrdOss.cc:236
virtual ssize_t Write(const void *buffer, off_t offset, size_t size)
Definition: XrdOss.hh:345
virtual int getFD()
virtual ssize_t Read(off_t offset, size_t size)
virtual ssize_t Write(const void *buffer, off_t offset, size_t size)
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
virtual int Close(long long *retsz=0)
virtual ssize_t pgRead(void *buffer, off_t offset, size_t rdlen, uint32_t *csvec, uint64_t opts)
virtual ssize_t pgWrite(void *buffer, off_t offset, size_t wrlen, uint32_t *csvec, uint64_t opts)
virtual ssize_t ReadV(XrdOucIOVec *readV, int rdvcnt)
virtual off_t getMmap(void **addr)
virtual int Init(XrdSysLogger *lp, const char *cfn)
virtual XrdOssDF * newFile(const char *tident)
void PutInt(const char *varname, long value)
Definition: XrdOucEnv.cc:250
void * GetPtr(const char *varname)
Definition: XrdOucEnv.cc:263
const XrdSecEntity * secEnv() const
Definition: XrdOucEnv.hh:107
uint32_t * cksVec
Definition: XrdSfsAio.hh:63
ssize_t Result
Definition: XrdSfsAio.hh:65
virtual void doneRead()=0
struct aiocb sfsAio
Definition: XrdSfsAio.hh:62
virtual void doneWrite()=0
void Say(const char *text1, const char *text2=0, const char *txt3=0, const char *text4=0, const char *text5=0, const char *txt6=0)
Definition: XrdSysError.cc:141
XrdOucEnv * envP
Definition: XrdPss.cc:109