XRootD
XrdOssStatsFileSystem.cc
Go to the documentation of this file.
1 
3 #include "XrdOssStatsConfig.hh"
5 #include "XrdOssStatsFile.hh"
9 
10 #include <cinttypes>
11 #include <stdexcept>
12 #include <thread>
13 
14 using namespace XrdOssStats;
15 using namespace XrdOssStats::detail;
16 
17 FileSystem::FileSystem(XrdOss *oss, XrdSysLogger *lp, const char *configfn, XrdOucEnv *envP) :
18  XrdOssWrapper(*oss),
19  m_oss(oss),
20  m_env(envP),
21  m_log(lp, "fsstat_"),
22  m_slow_duration(std::chrono::seconds(1))
23 {
24  m_log.Say("------ Initializing the storage statistics plugin.");
25  if (!Config(configfn)) {
26  m_failure = "Failed to configure the storage statistics plugin.";
27  return;
28  }
29 
30  // While the plugin _does_ print its activity to the debugging facility (if enabled), its relatively useless
31  // unless the g-stream is available. Hence, if it's _not_ available, we stop the OSS initialization but do
32  // not cause the server startup to fail.
33  if (envP) {
34  m_gstream = reinterpret_cast<XrdXrootdGStream*>(envP->GetPtr("oss.gStream*"));
35  if (m_gstream) {
36  m_log.Say("Config", "Stats monitoring has been configured via xrootd.mongstream directive");
37  } else {
38  m_log.Say("Config", "XrdOssStats plugin is loaded but it requires the oss monitoring g-stream to also be enabled to be useful; try adding `xrootd.mongstream oss ...` to your configuration");
39  return;
40  }
41  } else {
42  m_failure = "XrdOssStats plugin invoked without a configured environment; likely an internal error";
43  return;
44  }
45 
46  // Parse the 'runmode' of the OSS, if applicable
47  auto runmode = envP->Get("oss.runmode");
48  if (runmode && runmode[0]) {
49  m_runmode = runmode;
50  }
51 
52  pthread_t tid;
53  int rc;
54  if ((rc = XrdSysThread::Run(&tid, FileSystem::AggregateBootstrap, static_cast<void *>(this), 0, "FS Stats Compute Thread"))) {
55  m_log.Emsg("FileSystem", rc, "create stats compute thread");
56  m_failure = "Failed to create the statistics computing thread.";
57  return;
58  }
59 
60  m_ready = true;
61 }
62 
64 
65 bool
66 FileSystem::InitSuccessful(std::string &errMsg) {
67  if (m_ready) return true;
68 
69  errMsg = m_failure;
70  if (errMsg.empty()) {
71  m_oss.release();
72  }
73  return false;
74 }
75 
76 void *
77 FileSystem::AggregateBootstrap(void *me) {
78  auto myself = static_cast<FileSystem*>(me);
79  while (1) {
80  std::this_thread::sleep_for(std::chrono::seconds(1));
81  myself->AggregateStats();
82  }
83  return nullptr;
84 }
85 
86 bool
87 FileSystem::Config(const char *configfn)
88 {
90 
91  XrdOucGatherConf statsConf("fsstats.trace fsstats.slowop", &m_log);
92  int result;
93  if ((result = statsConf.Gather(configfn, XrdOucGatherConf::trim_lines)) < 0) {
94  m_log.Emsg("Config", -result, "parsing config file", configfn);
95  return false;
96  }
97 
98  char *val;
99  while (statsConf.GetLine()) {
100  val = statsConf.GetToken(); // Ignore -- we asked for a single value
101  if (!strcmp(val, "trace")) {
102  m_log.setMsgMask(0);
103  if (!(val = statsConf.GetToken())) {
104  m_log.Emsg("Config", "fsstats.trace requires an argument. Usage: fsstats.trace [all|err|warning|info|debug|none]");
105  return false;
106  }
107  do {
108  if (!strcmp(val, "all")) {m_log.setMsgMask(m_log.getMsgMask() | LogMask::All);}
109  else if (!strcmp(val, "error")) {m_log.setMsgMask(m_log.getMsgMask() | LogMask::Error);}
110  else if (!strcmp(val, "warning")) {m_log.setMsgMask(m_log.getMsgMask() | LogMask::Error | LogMask::Warning);}
111  else if (!strcmp(val, "info")) {m_log.setMsgMask(m_log.getMsgMask() | LogMask::Error | LogMask::Warning | LogMask::Info);}
112  else if (!strcmp(val, "debug")) {m_log.setMsgMask(m_log.getMsgMask() | LogMask::Error | LogMask::Warning | LogMask::Info | LogMask::Debug);}
113  else if (!strcmp(val, "none")) {m_log.setMsgMask(0);}
114  } while ((val = statsConf.GetToken()));
115  } else if (!strcmp(val, "slowop")) {
116  if (!(val = statsConf.GetToken())) {
117  m_log.Emsg("Config", "fsstats.slowop requires an argument. Usage: fsstats.slowop [duration]");
118  return false;
119  }
120  std::string errmsg;
121  if (!ParseDuration(val, m_slow_duration, errmsg)) {
122  m_log.Emsg("Config", "fsstats.slowop couldn't parse duration", val, errmsg.c_str());
123  return false;
124  }
125  }
126  }
127  m_log.Emsg("Config", "Logging levels enabled", LogMaskToString(m_log.getMsgMask()).c_str());
128 
129  return true;
130 }
131 
132 XrdOssDF *FileSystem::newDir(const char *user)
133 {
134  // Call the underlying OSS newDir
135  std::unique_ptr<XrdOssDF> wrapped(wrapPI.newDir(user));
136  return new Directory(std::move(wrapped), m_log, *this);
137 }
138 
139 XrdOssDF *FileSystem::newFile(const char *user)
140 {
141  // Call the underlying OSS newFile
142  std::unique_ptr<XrdOssDF> wrapped(wrapPI.newFile(user));
143  return new File(std::move(wrapped), m_log, *this);
144 }
145 
146 int FileSystem::Chmod(const char * path, mode_t mode, XrdOucEnv *env)
147 {
148  OpTimer op(m_ops.m_chmod_ops, m_slow_ops.m_chmod_ops, m_times.m_chmod, m_slow_times.m_chmod, m_slow_duration);
149  return wrapPI.Chmod(path, mode, env);
150 }
151 
152 int FileSystem::Rename(const char *oPath, const char *nPath,
153  XrdOucEnv *oEnvP, XrdOucEnv *nEnvP)
154 {
155  OpTimer op(m_ops.m_rename_ops, m_slow_ops.m_rename_ops, m_times.m_rename, m_slow_times.m_rename, m_slow_duration);
156  return wrapPI.Rename(oPath, nPath, oEnvP, nEnvP);
157 }
158 
159 int FileSystem::Stat(const char *path, struct stat *buff,
160  int opts, XrdOucEnv *env)
161 {
162  OpTimer op(m_ops.m_stat_ops, m_slow_ops.m_stat_ops, m_times.m_stat, m_slow_times.m_stat, m_slow_duration);
163  return wrapPI.Stat(path, buff, opts, env);
164 }
165 
166 int FileSystem::StatFS(const char *path, char *buff, int &blen,
167  XrdOucEnv *env)
168 {
169  OpTimer op(m_ops.m_stat_ops, m_slow_ops.m_stat_ops, m_times.m_stat, m_slow_times.m_stat, m_slow_duration);
170  return wrapPI.StatFS(path, buff, blen, env);
171 }
172 
173 int FileSystem::StatLS(XrdOucEnv &env, const char *path,
174  char *buff, int &blen)
175 {
176  OpTimer op(m_ops.m_stat_ops, m_slow_ops.m_stat_ops, m_times.m_stat, m_slow_times.m_stat, m_slow_duration);
177  return wrapPI.StatLS(env, path, buff, blen);
178 }
179 
180 int FileSystem::StatPF(const char *path, struct stat *buff, int opts)
181 {
182  OpTimer op(m_ops.m_stat_ops, m_slow_ops.m_stat_ops, m_times.m_stat, m_slow_times.m_stat, m_slow_duration);
183  return wrapPI.StatPF(path, buff, opts);
184 }
185 
186 int FileSystem::StatPF(const char *path, struct stat *buff)
187 {
188  OpTimer op(m_ops.m_stat_ops, m_slow_ops.m_stat_ops, m_times.m_stat, m_slow_times.m_stat, m_slow_duration);
189  return wrapPI.StatPF(path, buff, 0);
190 }
191 
192 int FileSystem::StatVS(XrdOssVSInfo *vsP, const char *sname, int updt)
193 {
194  OpTimer op(m_ops.m_stat_ops, m_slow_ops.m_stat_ops, m_times.m_stat, m_slow_times.m_stat, m_slow_duration);
195  return wrapPI.StatVS(vsP, sname, updt);
196 }
197 
198 int FileSystem::StatXA(const char *path, char *buff, int &blen,
199  XrdOucEnv *env)
200 {
201  OpTimer op(m_ops.m_stat_ops, m_slow_ops.m_stat_ops, m_times.m_stat, m_slow_times.m_stat, m_slow_duration);
202  return wrapPI.StatXA(path, buff, blen, env);
203 }
204 
205 int FileSystem::StatXP(const char *path, unsigned long long &attr,
206  XrdOucEnv *env)
207 {
208  OpTimer op(m_ops.m_stat_ops, m_slow_ops.m_stat_ops, m_times.m_stat, m_slow_times.m_stat, m_slow_duration);
209  return wrapPI.StatXP(path, attr, env);
210 }
211 
212 int FileSystem::Truncate(const char *path, unsigned long long fsize,
213  XrdOucEnv *env)
214 {
215  OpTimer op(m_ops.m_truncate_ops, m_slow_ops.m_truncate_ops, m_times.m_truncate, m_slow_times.m_truncate, m_slow_duration);
216  return wrapPI.Truncate(path, fsize, env);
217 }
218 
219 int FileSystem::Unlink(const char *path, int Opts, XrdOucEnv *env)
220 {
221  OpTimer op(m_ops.m_unlink_ops, m_slow_ops.m_unlink_ops, m_times.m_unlink, m_slow_times.m_unlink, m_slow_duration);
222  return wrapPI.Unlink(path, Opts, env);
223 }
224 
225 void FileSystem::AggregateStats()
226 {
227  char buf[1500];
228  auto len = snprintf(buf, 1500,
229  "{"
230  "\"event\":\"oss_stats%s\"," \
231  "\"reads\":%" PRIu64 ",\"writes\":%" PRIu64 ",\"stats\":%" PRIu64 "," \
232  "\"pgreads\":%" PRIu64 ",\"pgwrites\":%" PRIu64 ",\"readvs\":%" PRIu64 "," \
233  "\"readv_segs\":%" PRIu64 ",\"dirlists\":%" PRIu64 ",\"dirlist_ents\":%" PRIu64 ","
234  "\"truncates\":%" PRIu64 ",\"unlinks\":%" PRIu64 ",\"chmods\":%" PRIu64 ","
235  "\"opens\":%" PRIu64 ",\"renames\":%" PRIu64 ","
236  "\"slow_reads\":%" PRIu64 ",\"slow_writes\":%" PRIu64 ",\"slow_stats\":%" PRIu64 ","
237  "\"slow_pgreads\":%" PRIu64 ",\"slow_pgwrites\":%" PRIu64 ",\"slow_readvs\":%" PRIu64 ","
238  "\"slow_readv_segs\":%" PRIu64 ",\"slow_dirlists\":%" PRIu64 ",\"slow_dirlist_ents\":%" PRIu64 ","
239  "\"slow_truncates\":%" PRIu64 ",\"slow_unlinks\":%" PRIu64 ",\"slow_chmods\":%" PRIu64 ","
240  "\"slow_opens\":%" PRIu64 ",\"slow_renames\":%" PRIu64 ","
241  "\"open_t\":%.4f,\"read_t\":%.4f,\"readv_t\":%.4f,"
242  "\"pgread_t\":%.4f,\"write_t\":%.4f,\"pgwrite_t\":%.4f,"
243  "\"dirlist_t\":%.4f,\"stat_t\":%.4f,\"truncate_t\":%.4f,"
244  "\"unlink_t\":%.4f,\"rename_t\":%.4f,\"chmod_t\":%.4f,"
245  "\"slow_open_t\":%.4f,\"slow_read_t\":%.4f,\"slow_readv_t\":%.4f,"
246  "\"slow_pgread_t\":%.4f,\"slow_write_t\":%.4f,\"slow_pgwrite_t\":%.4f,"
247  "\"slow_dirlist_t\":%.4f,\"slow_stat_t\":%.4f,\"slow_truncate_t\":%.4f,"
248  "\"slow_unlink_t\":%.4f,\"slow_rename_t\":%.4f,\"slow_chmod_t\":%.4f"
249  "}",
250  m_runmode.empty() ? "" : ("_" + m_runmode).c_str(),
251  static_cast<uint64_t>(m_ops.m_read_ops), static_cast<uint64_t>(m_ops.m_write_ops), static_cast<uint64_t>(m_ops.m_stat_ops),
252  static_cast<uint64_t>(m_ops.m_pgread_ops), static_cast<uint64_t>(m_ops.m_pgwrite_ops), static_cast<uint64_t>(m_ops.m_readv_ops),
253  static_cast<uint64_t>(m_ops.m_readv_segs), static_cast<uint64_t>(m_ops.m_dirlist_ops), static_cast<uint64_t>(m_ops.m_dirlist_entries),
254  static_cast<uint64_t>(m_ops.m_truncate_ops), static_cast<uint64_t>(m_ops.m_unlink_ops), static_cast<uint64_t>(m_ops.m_chmod_ops),
255  static_cast<uint64_t>(m_ops.m_open_ops), static_cast<uint64_t>(m_ops.m_rename_ops),
256  static_cast<uint64_t>(m_slow_ops.m_read_ops), static_cast<uint64_t>(m_slow_ops.m_write_ops), static_cast<uint64_t>(m_slow_ops.m_stat_ops),
257  static_cast<uint64_t>(m_slow_ops.m_pgread_ops), static_cast<uint64_t>(m_slow_ops.m_pgwrite_ops), static_cast<uint64_t>(m_slow_ops.m_readv_ops),
258  static_cast<uint64_t>(m_slow_ops.m_readv_segs), static_cast<uint64_t>(m_slow_ops.m_dirlist_ops), static_cast<uint64_t>(m_slow_ops.m_dirlist_entries),
259  static_cast<uint64_t>(m_slow_ops.m_truncate_ops), static_cast<uint64_t>(m_slow_ops.m_unlink_ops), static_cast<uint64_t>(m_slow_ops.m_chmod_ops),
260  static_cast<uint64_t>(m_slow_ops.m_open_ops), static_cast<uint64_t>(m_slow_ops.m_rename_ops),
261  static_cast<float>(m_times.m_open)/1e9, static_cast<float>(m_times.m_read)/1e9, static_cast<float>(m_times.m_readv)/1e9,
262  static_cast<float>(m_times.m_pgread)/1e9, static_cast<float>(m_times.m_write)/1e9, static_cast<float>(m_times.m_pgwrite)/1e9,
263  static_cast<float>(m_times.m_dirlist)/1e9, static_cast<float>(m_times.m_stat)/1e9, static_cast<float>(m_times.m_truncate)/1e9,
264  static_cast<float>(m_times.m_unlink)/1e9, static_cast<float>(m_times.m_rename)/1e9, static_cast<float>(m_times.m_chmod)/1e9,
265  static_cast<float>(m_slow_times.m_open)/1e9, static_cast<float>(m_slow_times.m_read)/1e9, static_cast<float>(m_slow_times.m_readv)/1e9,
266  static_cast<float>(m_slow_times.m_pgread)/1e9, static_cast<float>(m_slow_times.m_write)/1e9, static_cast<float>(m_slow_times.m_pgwrite)/1e9,
267  static_cast<float>(m_slow_times.m_dirlist)/1e9, static_cast<float>(m_slow_times.m_stat)/1e9, static_cast<float>(m_slow_times.m_truncate)/1e9,
268  static_cast<float>(m_slow_times.m_unlink)/1e9, static_cast<float>(m_slow_times.m_rename)/1e9, static_cast<float>(m_slow_times.m_chmod)/1e9
269 
270  );
271  if (len >= 1500) {
272  m_log.Log(LogMask::Error, "Aggregate", "Failed to generate g-stream statistics packet");
273  return;
274  }
275  m_log.Log(LogMask::Debug, "Aggregate", buf);
276  if (m_gstream && !m_gstream->Insert(buf, len + 1)) {
277  m_log.Log(LogMask::Error, "Aggregate", "Failed to send g-stream statistics packet");
278  return;
279  }
280 }
281 
282 FileSystem::OpTimer::OpTimer(RAtomic_uint64_t &op_count, RAtomic_uint64_t &slow_op_count, RAtomic_uint64_t &timing, RAtomic_uint64_t &slow_timing, std::chrono::steady_clock::duration duration)
283  : m_op_count(op_count),
284  m_slow_op_count(slow_op_count),
285  m_timing(timing),
286  m_slow_timing(slow_timing),
287  m_start(std::chrono::steady_clock::now()),
288  m_slow_duration(duration)
289 {}
290 
291 FileSystem::OpTimer::~OpTimer()
292 {
293  auto dur = std::chrono::steady_clock::now() - m_start;
294  m_op_count++;
295  m_timing += std::chrono::nanoseconds(dur).count();
296  if (dur > m_slow_duration) {
297  m_slow_op_count++;
298  m_slow_timing += std::chrono::nanoseconds(dur).count();
299  }
300 }
#define stat(a, b)
Definition: XrdPosix.hh:101
bool Debug
struct myOpts opts
@ Error
int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *env=0) override
FileSystem(XrdOss *oss, XrdSysLogger *log, const char *configName, XrdOucEnv *envP)
int StatVS(XrdOssVSInfo *vsP, const char *sname=0, int updt=0) override
int StatPF(const char *path, struct stat *buff, int opts) override
int Rename(const char *oPath, const char *nPath, XrdOucEnv *oEnvP=0, XrdOucEnv *nEnvP=0) override
XrdOssDF * newDir(const char *user=0) override
int StatXA(const char *path, char *buff, int &blen, XrdOucEnv *env=0) override
XrdOssDF * newFile(const char *user=0) override
int Chmod(const char *path, mode_t mode, XrdOucEnv *env=0) override
int Unlink(const char *path, int Opts=0, XrdOucEnv *env=0) override
bool InitSuccessful(std::string &errMsg)
int StatFS(const char *path, char *buff, int &blen, XrdOucEnv *env=0) override
int Truncate(const char *path, unsigned long long fsize, XrdOucEnv *env=0) override
bool Config(const char *configfn)
int StatXP(const char *path, unsigned long long &attr, XrdOucEnv *env=0) override
int StatLS(XrdOucEnv &env, const char *path, char *buff, int &blen) override
virtual int StatLS(XrdOucEnv &env, const char *path, char *buff, int &blen)
Definition: XrdOss.cc:97
virtual int StatXA(const char *path, char *buff, int &blen, XrdOucEnv *envP=0)
Definition: XrdOss.cc:127
virtual int StatXP(const char *path, unsigned long long &attr, XrdOucEnv *envP=0)
Definition: XrdOss.cc:137
virtual XrdOssDF * newDir(const char *tident)=0
virtual int Chmod(const char *path, mode_t mode, XrdOucEnv *envP=0)=0
virtual int StatPF(const char *path, struct stat *buff, int opts)
Definition: XrdOss.cc:107
virtual int StatVS(XrdOssVSInfo *vsP, const char *sname=0, int updt=0)
Definition: XrdOss.cc:117
virtual int StatFS(const char *path, char *buff, int &blen, XrdOucEnv *envP=0)
Definition: XrdOss.cc:87
virtual int Rename(const char *oPath, const char *nPath, XrdOucEnv *oEnvP=0, XrdOucEnv *nEnvP=0)=0
virtual XrdOssDF * newFile(const char *tident)=0
virtual int Truncate(const char *path, unsigned long long fsize, XrdOucEnv *envP=0)=0
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
virtual int Unlink(const char *path, int Opts=0, XrdOucEnv *envP=0)=0
void * GetPtr(const char *varname)
Definition: XrdOucEnv.cc:263
char * Get(const char *varname)
Definition: XrdOucEnv.hh:69
char * GetToken(char **rest=0, int lowcase=0)
int Gather(const char *cfname, Level lvl, const char *parms=0)
@ trim_lines
Prefix trimmed lines.
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
void Say(const char *text1, const char *text2=0, const char *txt3=0, const char *text4=0, const char *text5=0, const char *txt6=0)
Definition: XrdSysError.cc:141
void setMsgMask(int mask)
Definition: XrdSysError.hh:154
int getMsgMask()
Definition: XrdSysError.hh:156
void Log(int mask, const char *esfx, const char *text1, const char *text2=0, const char *text3=0)
Definition: XrdSysError.hh:133
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
bool Insert(const char *data, int dlen)
@ Warning
int Opts
Definition: XrdMpxStats.cc:58
bool ParseDuration(const std::string &duration, std::chrono::steady_clock::duration &result, std::string &errmsg)
std::string LogMaskToString(int mask)
XrdOucEnv * envP
Definition: XrdPss.cc:109