XRootD
XrdSsiFileReq.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d S s i F i l e R e q . c c */
4 /* */
5 /* (c) 2013 by the Board of Trustees of the Leland Stanford, Jr., University */
6 /* Produced by Andrew Hanushevsky for Stanford University under contract */
7 /* DE-AC02-76-SFO0515 with the Department of Energy */
8 /* */
9 /* This file is part of the XRootD software suite. */
10 /* */
11 /* XRootD is free software: you can redistribute it and/or modify it under */
12 /* the terms of the GNU Lesser General Public License as published by the */
13 /* Free Software Foundation, either version 3 of the License, or (at your */
14 /* option) any later version. */
15 /* */
16 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
17 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
18 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
19 /* License for more details. */
20 /* */
21 /* You should have received a copy of the GNU Lesser General Public License */
22 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
23 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
24 /* */
25 /* The copyright holder's institutional names and contributor's names may not */
26 /* be used to endorse or promote products derived from this software without */
27 /* specific prior written permission of the institution or contributor. */
28 /******************************************************************************/
29 
30 #include <cstdio>
31 #include <cstring>
32 #include <arpa/inet.h>
33 #include <sys/types.h>
34 
35 #include "XrdOuc/XrdOucBuffer.hh"
36 #include "XrdOuc/XrdOucERoute.hh"
37 #include "XrdOuc/XrdOucErrInfo.hh"
38 #include "XrdSfs/XrdSfsDio.hh"
39 #include "XrdSsi/XrdSsiAlert.hh"
40 #include "XrdSsi/XrdSsiFileReq.hh"
42 #include "XrdSsi/XrdSsiFileSess.hh"
43 #include "XrdSsi/XrdSsiRRAgent.hh"
44 #include "XrdSsi/XrdSsiService.hh"
45 #include "XrdSsi/XrdSsiSfs.hh"
46 #include "XrdSsi/XrdSsiStream.hh"
47 #include "XrdSsi/XrdSsiStats.hh"
48 #include "XrdSsi/XrdSsiTrace.hh"
49 #include "XrdSsi/XrdSsiUtils.hh"
50 #include "XrdSys/XrdSysError.hh"
51 
52 /******************************************************************************/
53 /* L o c a l M a c r o s */
54 /******************************************************************************/
55 
// Emit a debug trace whose text is prefixed with rID, sessN and the bracketed
// responder/request state tags looked up from urState and myState. Those four
// names must therefore be in scope wherever the macro is used.
56 #define DEBUGXQ(x) DEBUG(rID<<sessN<<rspstID[urState]<<reqstID[myState]<<x)
57 
// Stream the result of XrdSsiUtils::b2x(x,y,...) followed by dotBuff. The use
// site must declare char arrays named hexBuff and dotBuff.
58 #define DUMPIT(x,y) XrdSsiUtils::b2x(x,y,hexBuff,sizeof(hexBuff),dotBuff)<<dotBuff
59 
60 /******************************************************************************/
61 /* G l o b a l s */
62 /******************************************************************************/
63 
// Objects defined in another translation unit and used by this file.
64 namespace XrdSsi
65 {
// Scheduler used to queue this request and deferred finalization jobs.
67 extern XrdScheduler *Sched;
68 extern XrdSsiService *Service;
70 };
71 
72 using namespace XrdSsi;
73 
74 /******************************************************************************/
75 /* S t a t i c L o c a l s */
76 /******************************************************************************/
77 
78 namespace
79 {
80 const char *rspstID[XrdSsiFileReq::isMax] =
81  {" [new", " [begun", " [bound",
82  " [abort", " [done"
83  };
84 
85 const char *reqstID[XrdSsiFileReq::rsEnd] =
86  {" wtReq] ", " xqReq] ", " wtRsp] ",
87  " doRsp] ", " odRsp] ", " erRsp] "
88  };
89 };
90 
91 /******************************************************************************/
92 /* L o c a l C l a s s e s */
93 /******************************************************************************/
94 
95 namespace
96 {
97 class FinalizeJob : public XrdJob
98 {
99 public:
100 
101 void DoIt() {reqP->Finalize();
102  fileP->DeferredFinalizeDone(reqP, reqID);
103  delete this;
104  }
105 
106  FinalizeJob(XrdSsiFileReq *rP, XrdSsiFileSess *fP, unsigned int id) :
107  reqP(rP), fileP(fP), reqID(id) {}
108  ~FinalizeJob() {}
109 
110 private:
111 XrdSsiFileReq *reqP;
112 XrdSsiFileSess *fileP;
113 unsigned int reqID;
114 };
115 }
116 
117 /******************************************************************************/
118 /* S t a t i c M e m b e r s */
119 /******************************************************************************/
120 
121 XrdSysMutex XrdSsiFileReq::aqMutex;
122 XrdSsiFileReq *XrdSsiFileReq::freeReq = 0;
123 int XrdSsiFileReq::freeCnt = 0;
124 int XrdSsiFileReq::freeMax = 256;
125 
126 /******************************************************************************/
127 /* A c t i v a t e */
128 /******************************************************************************/
129 
131 {
// Activate: record the request buffer (oP or bR) and its size rSz, update the
// request statistics, and schedule this object so the request gets processed.
// NOTE(review): the signature line is not visible in this listing -- confirm
// the parameter types against the class header.
132  EPNAME("Activate");
133 
134 // Do some debugging
135 //
136  DEBUGXQ((oP ? "oucbuff" : "sfsbuff") <<" rqsz=" <<rSz);
137 
138 // Do statistics: count the request, add its bytes, and track the largest
139 // request size seen so far.
141  Stats.ReqCount++;
142  Stats.ReqBytes += rSz;
143  if (rSz > Stats.ReqMaxsz) Stats.ReqMaxsz = rSz;
145 
146 // Set request buffer pointers
147 //
148  oucBuff = oP;
149  sfsBref = bR;
150  reqSize = rSz;
151 
152 // Now schedule ourselves to process this request. The state is new.
153 //
154  Sched->Schedule((XrdJob *)this);
155 }
156 
157 /******************************************************************************/
158 /* A l e r t */
159 /******************************************************************************/
160 
162 {
// Alert: accept an alert message (aMsg) from the service. The alert is
// recycled without being queued when its length is not positive, a response
// has already been posted, or the request is ending. Otherwise it is either
// sent right away (client waiting) or chained onto the pending queue.
163  EPNAME("Alert");
164  XrdSsiAlert *aP;
165  int msgLen;
166 
167 // Do some debugging
168 //
169  aMsg.GetMsg(msgLen);
170  DEBUGXQ(msgLen <<" byte alert presented wtr=" <<respWait);
171 
172 // Add up statistics
173 //
175 
176 // Lock this object
177 //
178  frqMutex.Lock();
179 
180 // Validate the length and whether this call is allowed
181 //
182  if (msgLen <= 0 || haveResp || isEnding)
183  {frqMutex.UnLock();
184  aMsg.RecycleMsg();
185  return;
186  }
187 
188 // Allocate an alert object and chain it into the pending queue
189 //
190  aP = XrdSsiAlert::Alloc(aMsg);
191 
192 // Alerts must be sent in the order they are presented. So, check if we need
193 // to chain this and try to send the first in the chain. This only really
194 // matters if we can send the alert now because the client is waiting.
195 //
196  if (respWait)
197  {if (alrtPend)
// Older alerts are queued: append the new one and send the oldest instead.
198  {alrtLast->next = aP;
199  alrtLast = aP;
200  aP = alrtPend;
201  alrtPend = alrtPend->next;
202  }
203  WakeUp(aP);
204  } else {
// Nobody is waiting: append to the tail, or start the queue if it is empty.
205  if (alrtLast) alrtLast->next = aP;
206  else alrtPend = aP;
207  alrtLast = aP;
208  }
209 
210 // All done
211 //
212  frqMutex.UnLock();
213 }
214 
215 /******************************************************************************/
216 /* A l l o c */
217 /******************************************************************************/
218 
220  XrdSsiFileResource *rP,
221  XrdSsiFileSess *fP,
222  const char *sID,
223  const char *cID,
224  unsigned int rnum)
225 {
// Alloc: obtain a request object, reusing one from the free list when one is
// available and allocating a new one otherwise, then fill in the session
// name, resource, session, callback info and request id.
// NOTE(review): the first signature line (which declares eiP) is not visible
// in this listing.
226  XrdSsiFileReq *nP;
227 
228 // Check if we can grab this from our queue
229 //
230  aqMutex.Lock();
231  if ((nP = freeReq))
232  {freeCnt--;
233  freeReq = nP->nextReq;
234  aqMutex.UnLock();
235  nP->Init(cID);
236  } else {
237  aqMutex.UnLock();
238  nP = new XrdSsiFileReq(cID);
239  }
240 
241 // Initialize for processing. rID holds the request number followed by a
242 // colon.
243  if (nP)
244  {nP->sessN = sID;
245  nP->fileR = rP;
246  nP->fileP = fP;
247  nP->cbInfo = eiP;
248  nP->reqID = rnum;
249  snprintf(nP->rID, sizeof(nP->rID), "%u:", rnum);
250  }
251 
252 // Return the pointer
253 //
254  return nP;
255 }
256 
257 /******************************************************************************/
258 /* Private: B i n d D o n e */
259 /******************************************************************************/
260 
261 // This is called with frqMutex locked!
262 
// Record that the service has bound this request. A begun request becomes
// bound; a request already marked done is scheduled (once) so that it gets
// finished; any other state is logged as invalid.
263 void XrdSsiFileReq::BindDone()
264 {
265  EPNAME("BindDone");
266 
267 // Do some debugging
268 //
269  DEBUGXQ("Bind called; for request " <<reqID);
270 
271 // Collect statistics
272 //
274 
275 // Processing depends on the current state. Only listed states are valid.
276 // When the state is done, a finished event occurred between the time the
277 // request was handed off to the service but before the service bound it.
278 //
279  switch(urState)
280  {case isBegun: urState = isBound;
// Intentional fall-through: after promoting to isBound simply return.
281  case isBound: return;
282  break;
// schedDone guards against scheduling this object more than once.
283  case isDone: if (!schedDone)
284  {schedDone = true;
285  Sched->Schedule((XrdJob *)this);
286  }
287  return;
288  break;
289  default: break;
290  }
291 
292 // If we get here then we have an invalid state. Report it but otherwise we
293 // can't really do anything else. This means some memory may be lost.
294 //
295  Log.Emsg(epname, tident, "Invalid req/rsp state; giving up on object!");
296 }
297 
298 /******************************************************************************/
299 /* D e f e r r e d F i n a l i z e */
300 /******************************************************************************/
301 
303 {
// Queue a FinalizeJob for this request; the job runs Finalize() on a
// scheduler thread, notifies fileP, and deletes itself.
304  Sched->Schedule(new FinalizeJob(this, fileP, reqID));
305 }
306 
307 /******************************************************************************/
308 /* D i s p o s e */
309 /******************************************************************************/
310 
311 void XrdSsiFileReq::Dispose()
312 {
313  EPNAME("Dispose");
314 
315 // Do some debugging
316 //
317  DEBUGXQ("Recycling request...");
318 
319 // Collect statistics
320 //
321  Stats.Bump(Stats.ReqBound, -1);
322 
323 // Simply recycle the object
324 //
325  Recycle();
326 }
327 
328 /******************************************************************************/
329 /* D o I t */
330 /******************************************************************************/
331 
333 {
// DoIt: scheduler entry point. What happens depends on urState:
//   isNew   - mark the request as executing/begun and hand it to the service
//   isAbort - the request was cancelled before it ran; just recycle it
//   isDone  - wake any waiters and call Finished()
// NOTE(review): the signature and some statements (listing lines 345-346,
// 352, 361) are not visible in this listing.
334  EPNAME("DoIt");
335  bool cancel;
336 
337 // Processing is determined by the responder's state. Only listed states are
338 // valid. Others should never occur in this context.
339 //
340  frqMutex.Lock();
341  switch(urState)
342  {case isNew: myState = xqReq; urState = isBegun;
343  DEBUGXQ("Calling service processor");
344  frqMutex.UnLock();
347  (XrdSsiFileResource &)*fileR);
348  return;
349  break;
350  case isAbort: DEBUGXQ("Skipped calling service processor");
351  frqMutex.UnLock();
353  Recycle();
354  return;
355  break;
// A finish is treated as a cancel unless the whole response was delivered.
356  case isDone: cancel = (myState != odRsp);
357  DEBUGXQ("Calling Finished(" <<cancel <<')');
358  if (respWait) WakeUp();
359  if (finWait) finWait->Post();
360  frqMutex.UnLock();
362  if (cancel) Stats.Bump(Stats.ReqCancels);
363  Finished(cancel); // This object may be deleted!
364  return;
365  break;
366  default: break;
367  }
368 
369 // If we get here then we have an invalid state. Report it but otherwise we
370 // can't really do anything else. This means some memory may be lost.
371 //
372  frqMutex.UnLock();
373  Log.Emsg(epname, tident, "Invalid req/rsp state; giving up on object!");
374 }
375 
376 /******************************************************************************/
377 /* D o n e */
378 /******************************************************************************/
379 
380 // Gets invoked only after query() waitresp signal was sent
381 
382 void XrdSsiFileReq::Done(int &retc, XrdOucErrInfo *eiP, const char *name)
383 {
384  EPNAME("Done");
385  XrdSsiMutexMon mHelper(frqMutex);
386 
387 // We may need to delete the errinfo object if this callback was async. Note
388 // that the following test is valid even if the file object has been deleted.
389 //
390  if (eiP != fileP->errInfo()) delete eiP;
391 
392 // Check if we should finalize this request. This will be the case if the
393 // complete response was sent.
394 //
395  if (myState == odRsp)
396  {DEBUGXQ("resp sent; no additional data remains");
397  if (!fileP->DeferFinalize(this,reqID)) Finalize();
398  return;
399  }
400 
401 // Do some debugging
402 //
403  DEBUGXQ("wtrsp sent; resp " <<(haveResp ? "here" : "pend"));
404 
405 // We are invoked when sync() waitresp has been sent, check if a response was
406 // posted while this was going on. If so, make sure to send a wakeup. Note
407 // that the respWait flag is at this moment false as this is called in the
408 // sync response path for fctl() and the response may have been posted.
409 //
410  if (!haveResp) respWait = true;
411  else WakeUp();
412 }
413 
414 /******************************************************************************/
415 /* Private: E m s g */
416 /******************************************************************************/
417 
// Format an errno-style error, log it, and store it in cbInfo when one is
// set. A negative ecode is converted to its positive value first.
// Always returns SFS_ERROR.
418 int XrdSsiFileReq::Emsg(const char *pfx, // Message prefix value
419  int ecode, // The error code
420  const char *op) // Operation being performed
421 {
422  char buffer[2048];
423 
424 // Count errors
425 //
427 
428 // Get correct error code
429 //
430  if (ecode < 0) ecode = -ecode;
431 
432 // Format the error message
433 //
434  XrdOucERoute::Format(buffer, sizeof(buffer), ecode, op, sessN);
435 
436 // Put the message in the log
437 //
438  Log.Emsg(pfx, tident, buffer);
439 
440 // Place the error message in the error object and return
441 //
442  if (cbInfo) cbInfo->setErrInfo(ecode, buffer);
443  return SFS_ERROR;
444 }
445 
446 /******************************************************************************/
447 
// Format the error held in eObj as "Unable to <op> <session>; <reason>", log
// it, and store it in cbInfo when one is set. A non-positive error number is
// replaced by EFAULT and an empty message by "reason unknown".
// Always returns SFS_ERROR.
448 int XrdSsiFileReq::Emsg(const char *pfx, // Message prefix value
449  XrdSsiErrInfo &eObj, // The error description
450  const char *op) // Operation being performed
451 {
452  const char *eMsg;
453  char buffer[2048];
454  int eNum;
455 
456 // Count errors
457 //
459 
460 // Get correct error code and message
461 //
462  eMsg = eObj.Get(eNum).c_str();
463  if (eNum <= 0) eNum = EFAULT;
464  if (!eMsg || !(*eMsg)) eMsg = "reason unknown";
465 
466 // Format the error message
467 //
468  snprintf(buffer, sizeof(buffer), "Unable to %s %s; %s", op, sessN, eMsg);
469 
470 // Put the message in the log
471 //
472  Log.Emsg(pfx, tident, buffer);
473 
474 // Place the error message in the error object and return
475 //
476  if (cbInfo) cbInfo->setErrInfo(eNum, buffer);
477  return SFS_ERROR;
478 }
479 
480 /******************************************************************************/
481 /* F i n a l i z e */
482 /******************************************************************************/
483 
485 {
// Finalize: end this request. Unsent alerts are recycled first, then the
// action taken depends on urState (see the per-case comments below).
// NOTE(review): the signature and listing lines 510 and 534 are not visible
// in this listing.
486  EPNAME("Finalize");
487  XrdSsiMutexMon mHelper(frqMutex);
// Anything short of a fully delivered response counts as a cancellation.
488  bool cancel = (myState != odRsp);
489 
490 // Release any unsent alerts (prevent any new alerts from being accepted)
491 //
492  isEnding = true;
493  if (alrtSent || alrtPend)
494  {XrdSsiAlert *dP, *aP = alrtSent;
495  if (aP) aP->next = alrtPend;
496  else aP = alrtPend;
// The alerts are recycled with the request mutex released.
497  mHelper.UnLock();
498  while((dP = aP)) {aP = aP->next; dP->Recycle();}
499  mHelper.Lock(frqMutex);
500  }
501 
502 // Processing is determined by the responder's state
503 //
504  switch(urState)
505  // Request is being scheduled, so we can simply abort it.
506  //
507  {case isNew: urState = isAbort;
508  cbInfo = 0;
509  sessN = "???";
511  DEBUGXQ("Aborting request processing");
512  return;
513  break;
514 
515  // Request already handed off but not yet bound. Defer until bound.
516  // We need to wait until this occurs to sequence Unprovision().
517  //
518  case isBegun: urState = isDone;
// Block on a local semaphore; it is posted via finWait in DoIt().
519  {XrdSysSemaphore wt4fin(0);
520  finWait = &wt4fin;
521  mHelper.UnLock();
522  wt4fin.Wait();
523  }
524  sessN = "n/a";
525  return;
526 
527  // Request is bound so we can finish right off.
528  //
529  case isBound: urState = isDone;
530  if (strBuff) {strBuff->Recycle(); strBuff = 0;}
531  DEBUGXQ("Calling Finished(" <<cancel <<')');
532  if (respWait) WakeUp();
533  mHelper.UnLock();
535  if (cancel) Stats.Bump(Stats.ReqCancels);
536  Finished(cancel); // This object may be deleted!
537  sessN = "n/a";
538  return;
539  break;
540 
541  // The following two cases may happen but it's safe to ignore them.
542  //
543  case isAbort:
544  case isDone: sessN = "bad";
545  return;
546  break;
547  default: break;
548  }
549 
550 // If we get here then we have an invalid state. Report it but otherwise we
551 // can't really do anything else. This means some memory may be lost.
552 //
553  Log.Emsg(epname, tident, "Invalid req/rsp state; giving up on object!");
554 }
555 
556 /******************************************************************************/
557 /* G e t R e q u e s t */
558 /******************************************************************************/
559 
561 {
// GetRequest: return a pointer to the request data and store its size in
// rLen. The data comes from oucBuff when one is set, otherwise from the
// buffer behind sfsBref.
562  EPNAME("GetRequest");
563 
564 // Do some debugging
565 //
566  DEBUGXQ("sz=" <<reqSize);
568 
569 // The request may come from a ouc buffer or an sfs buffer
570 //
571  rLen = reqSize;
572  if (oucBuff) return oucBuff->Data();
573  return XrdSfsXio::Buffer(sfsBref);
574 }
575 
576 /******************************************************************************/
577 /* Private: I n i t */
578 /******************************************************************************/
579 
// Put the object into its initial state: all pointers, counters and flags are
// cleared, the states become wtReq/isNew, and tident receives a strdup'd copy
// of cID (or of "???" when cID is null). Recycle() frees that copy.
580 void XrdSsiFileReq::Init(const char *cID)
581 {
582  tident = (cID ? strdup(cID) : strdup("???"));
583  finWait = 0;
584  nextReq = 0;
585  cbInfo = 0;
586  respCB = 0;
587  respCBarg = 0;
588  alrtSent = 0;
589  alrtPend = 0;
590  alrtLast = 0;
591  sessN = "???";
592  oucBuff = 0;
593  sfsBref = 0;
594  strBuff = 0;
595  reqSize = 0;
596  respBuf = 0;
597  respOff = 0;
598  fileSz = 0; // Also does respLen = 0;
599  myState = wtReq;
600  urState = isNew;
// rID becomes the empty string until a request number is formatted into it.
601  *rID = 0;
602  schedDone = false;
603  haveResp = false;
604  respWait = false;
605  strmEOF = false;
606  isEnding = false;
608  XrdSsiRRAgent::SetMutex(this, &frqMutex);
609 }
610 
611 /******************************************************************************/
612 /* Protected: P r o c e s s R e s p o n s e */
613 /******************************************************************************/
614 
615 // This is called via the responder with the responder and request locks held.
616 
618  const XrdSsiRespInfo &Resp)
619 {
// ProcessResponse: note that the service has posted a response. Returns
// false when the request is no longer begun/bound or the response type is
// not recognized; otherwise records the response size, marks the response
// as present and wakes the client if it is waiting.
// NOTE(review): the first signature line and the case labels of the switch
// are not visible in this listing.
620  EPNAME("ProcessResponse");
621 
622 // Do some debugging
623 //
624  DEBUGXQ("Response presented wtr=" <<respWait);
625 
626 // Make sure we are still in execute state
627 //
628  if (urState != isBegun && urState != isBound) return false;
629  myState = doRsp;
630  respOff = 0;
631 
632 // Handle the response
633 //
634  switch(Resp.rType)
636  DEBUGXQ("Resp data sz="<<Resp.blen);
637  respLen = Resp.blen;
639  break;
641  DEBUGXQ("Resp err rc="<<Resp.eNum<<" msg="<<Resp.eMsg);
642  respLen = 0;
644  break;
646  DEBUGXQ("Resp file fd="<<Resp.fdnum<<" sz="<<Resp.fsize);
647  fileSz = Resp.fsize;
648  respOff = 0;
650  break;
652  DEBUGXQ("Resp strm");
653  respLen = 0;
655  break;
656  default:
657  DEBUGXQ("Resp invalid!!!!");
658  return false;
660  break;
661  }
662 
663 // If the client is waiting for the response, wake up the client to get it.
664 //
665  haveResp = true;
666  if (respWait) WakeUp();
667  return true;
668 }
669 
670 /******************************************************************************/
671 /* R e a d */
672 /******************************************************************************/
673 
675  char *buff, // Out
676  XrdSfsXferSize blen) // In
677 /*
678  Function: Read `blen' bytes at `offset' into 'buff' and return the actual
679  number of bytes read.
680 
681  Input: buff - Address of the buffer in which to place the data.
682  blen - The size of the buffer. This is the maximum number
683  of bytes that will be returned.
684 
685  Output: Returns the number of bytes read upon success and SFS_ERROR o/w.
686 */
687 {
// `done' is set to true whenever this call ends the response: the last data
// was returned, there is nothing to return, or an error is being reported.
// NOTE(review): the first signature line (which declares `done') and the
// case labels of the switch are not visible in this listing; there is no
// `offset' parameter, the position is tracked in respOff.
688  static const char *epname = "read";
689  XrdSfsXferSize nbytes;
690  XrdSsiRespInfo const *Resp = XrdSsiRRAgent::RespP(this);
691 
692 // A read should never be issued unless a response has been set. If the whole
693 // response was already delivered, zero is returned; otherwise ENOMSG.
694  if (myState != doRsp)
695  {done = true;
696  return (myState == odRsp ? 0 : Emsg(epname, ENOMSG, "read"));
697  }
698 
699 // Fan out based on the kind of response we have
700 //
701  switch(Resp->rType)
// In-memory data: copy out up to blen bytes and advance respOff.
703  if (respLen <= 0) {done = true; myState = odRsp; return 0;}
704  if (blen >= respLen)
705  {memcpy(buff, Resp->buff+respOff, respLen);
706  blen = respLen; myState = odRsp; done = true;
707  } else {
708  memcpy(buff, Resp->buff+respOff, blen);
709  respLen -= blen; respOff += blen;
710  }
711  return blen;
712  break;
// Error response: pass the service's error number and text to the caller.
714  cbInfo->setErrInfo(Resp->eNum, Resp->eMsg);
715  myState = odRsp; done = true;
716  return SFS_ERROR;
717  break;
// File response: read from the descriptor at the current offset.
719  if (fileSz <= 0) {done = true; myState = odRsp; return 0;}
720  nbytes = pread(Resp->fdnum, buff, blen, respOff);
721  if (nbytes <= 0)
722  {done = true;
723  if (!nbytes) {myState = odRsp; return 0;}
724  myState = erRsp;
725  return Emsg(epname, errno, "read");
726  }
727  respOff += nbytes; fileSz -= nbytes;
728  return nbytes;
729  break;
// Stream response: delegate to the active or passive stream reader.
731  nbytes = (Resp->strmP->Type() == XrdSsiStream::isActive ?
732  readStrmA(Resp->strmP, buff, blen)
733  : readStrmP(Resp->strmP, buff, blen));
734  done = strmEOF && strBuff == 0;
735  return nbytes;
736  break;
737  default: break;
738  };
739 
740 // We should never get here
741 //
742  myState = erRsp;
743  done = true;
744  return Emsg(epname, EFAULT, "read");
745 }
746 
747 /******************************************************************************/
748 /* Private: r e a d S t r m A */
749 /******************************************************************************/
750 
751 XrdSfsXferSize XrdSsiFileReq::readStrmA(XrdSsiStream *strmP,
752  char *buff, XrdSfsXferSize blen)
753 {
754  static const char *epname = "readStrmA";
755  XrdSsiErrInfo eObj;
756  XrdSfsXferSize xlen = 0;
757 
758 
759 // Copy out data from the stream to fill the buffer
760 //
761 do{if (strBuff)
762  {if (respLen > blen)
763  {memcpy(buff, strBuff->data+respOff, blen);
764  respLen -= blen; respOff += blen;
765  return xlen+blen;
766  }
767  memcpy(buff, strBuff->data+respOff, respLen);
768  xlen += respLen;
769  strBuff->Recycle(); strBuff = 0;
770  blen -= respLen; buff += respLen;
771  }
772 
773  if (!strmEOF && blen)
774  {respLen = blen; respOff = 0;
775  strBuff = strmP->GetBuff(eObj, respLen, strmEOF);
776  }
777  } while(strBuff);
778 
779 // Check if we have data to return
780 //
781  if (strmEOF) {myState = odRsp; return xlen;}
782  else if (!blen) return xlen;
783 
784 // Report the error
785 //
786  myState = erRsp; strmEOF = true;
787  return Emsg(epname, eObj, "read stream");
788 }
789 
790 /******************************************************************************/
791 /* Private: r e a d S t r m P */
792 /******************************************************************************/
793 
794 XrdSfsXferSize XrdSsiFileReq::readStrmP(XrdSsiStream *strmP,
795  char *buff, XrdSfsXferSize blen)
796 {
797  static const char *epname = "readStrmP";
798  XrdSsiErrInfo eObj;
799  XrdSfsXferSize xlen = 0;
800  int dlen = 0;
801 
802 // Copy out data from the stream to fill the buffer
803 //
804  while(!strmEOF && (dlen = strmP->SetBuff(eObj, buff, blen, strmEOF)) > 0)
805  {xlen += dlen;
806  if (dlen == blen) return xlen;
807  if (dlen > blen) {eObj.Set(0,EOVERFLOW); break;}
808  buff += dlen; blen -= dlen;
809  }
810 
811 // Check if we ended with an zero length read
812 //
813  if (strmEOF || !dlen) {myState = odRsp; strmEOF = true; return xlen;}
814 
815 // Return an error
816 //
817  myState = erRsp; strmEOF = true;
818  return Emsg(epname, eObj, "read stream");
819 }
820 
821 /******************************************************************************/
822 /* Private: R e c y c l e */
823 /******************************************************************************/
824 
825 void XrdSsiFileReq::Recycle()
826 {
827 
828 // If we have an oucbuffer then we need to recycle it, otherwise if we have
829 // and sfs buffer, put it on the deferred release queue.
830 //
831  if (oucBuff) {oucBuff->Recycle(); oucBuff = 0;}
832  else if (sfsBref) {XrdSfsXio::Reclaim(sfsBref); sfsBref = 0;}
833  reqSize = 0;
834 
835 // Add to queue unless we have too many of these. If we add it back to the
836 // queue; make sure it's a cleaned up object!
837 //
838  aqMutex.Lock();
839  if (tident) {free(tident); tident = 0;}
840  if (freeCnt >= freeMax) {aqMutex.UnLock(); delete this;}
841  else {XrdSsiRRAgent::CleanUp(*this);
842  nextReq = freeReq;
843  freeReq = this;
844  freeCnt++;
845  aqMutex.UnLock();
846  }
847 }
848 
849 /******************************************************************************/
850 /* R e l R e q u e s t B u f f e r */
851 /******************************************************************************/
852 
854 {
// RelRequestBuffer: release the request buffer (whichever of oucBuff or
// sfsBref is set) while holding the request mutex, and zero the request size.
855  EPNAME("RelReqBuff");
856  XrdSsiMutexMon mHelper(frqMutex);
857 
858 // Do some debugging
859 //
860  DEBUGXQ("called");
862 
863 // Release buffers
864 //
865  if (oucBuff) {oucBuff->Recycle(); oucBuff = 0;}
866  else if (sfsBref) {XrdSfsXio::Reclaim(sfsBref); sfsBref = 0;}
867  reqSize = 0;
868 }
869 
870 /******************************************************************************/
871 /* S e n d */
872 /******************************************************************************/
873 
875 {
// Send: deliver up to blen bytes of the response through sfDio->SendFile().
// As the code below shows, the return value is 1 when the caller should
// continue (more data remains, or Read() must be used instead), 0 when the
// response is complete, and SFS_ERROR (via Emsg) on failure.
// NOTE(review): the signature and the case labels of the switch are not
// visible in this listing. Only sfVec[1] is filled in here; sfVec[0] is
// presumably reserved for the callee -- confirm against XrdSfsDio.
876  static const char *epname = "send";
877  XrdSsiRespInfo const *Resp = XrdSsiRRAgent::RespP(this);
878  XrdOucSFVec sfVec[2];
879  int rc;
880 
881 // A send should never be issued unless a response has been set. Return a
882 // continuation which will cause Read() to be called to return the error.
883 //
884  if (myState != doRsp) return 1;
885 
886 // Fan out based on the kind of response we have
887 //
888  switch(Resp->rType)
// In-memory data: send from the response buffer at the current offset.
890  if (blen > 0)
891  {sfVec[1].buffer = (char *)Resp->buff+respOff;
892  sfVec[1].fdnum = -1;
893  if (blen > respLen)
894  {blen = respLen; myState = odRsp;
895  } else {
896  respLen -= blen; respOff += blen;
897  }
898  } else blen = 0;
899  break;
901  return 1; // Causes error to be returned via Read()
902  break;
// File response: send from the file descriptor at the current offset.
904  if (fileSz > 0)
905  {sfVec[1].offset = respOff; sfVec[1].fdnum = Resp->fdnum;
906  if (blen > fileSz)
907  {blen = fileSz; myState = odRsp;}
908  respOff += blen; fileSz -= blen;
909  } else blen = 0;
910  break;
// Stream response: passive streams are served through Read() instead.
912  if (Resp->strmP->Type() == XrdSsiStream::isPassive) return 1;
913  return sendStrmA(Resp->strmP, sfDio, blen);
914  break;
915  default: myState = erRsp;
916  return Emsg(epname, EFAULT, "send");
917  break;
918  };
919 
920 // Send off the data. A zero-length send points at rID and ends the response.
921 //
922  if (!blen) {sfVec[1].buffer = rID; myState = odRsp;}
923  sfVec[1].sendsz = blen;
924  rc = sfDio->SendFile(sfVec, 2);
925 
926 // If send succeeded, indicate the action to be taken
927 //
928  if (!rc) return myState != odRsp;
929 
930 // The send failed, diagnose the problem
931 //
932  rc = (rc < 0 ? EIO : EFAULT);
933  myState = erRsp;
934  return Emsg(epname, rc, "send");
935 }
936 
937 /******************************************************************************/
938 /* Private: s e n d S t r m A */
939 /******************************************************************************/
940 
941 int XrdSsiFileReq::sendStrmA(XrdSsiStream *strmP,
942  XrdSfsDio *sfDio, XrdSfsXferSize blen)
943 {
944  static const char *epname = "sendStrmA";
945  XrdSsiErrInfo eObj;
946  XrdOucSFVec sfVec[2];
947  int rc;
948 
949 // Check if we need a buffer
950 //
951  if (!strBuff)
952  {respLen = blen;
953  if (strmEOF || !(strBuff = strmP->GetBuff(eObj, respLen, strmEOF)))
954  {myState = odRsp; strmEOF = true;
955  if (!strmEOF) Emsg(epname, eObj, "read stream");
956  return 1;
957  }
958  respOff = 0;
959  }
960 
961 // Complete the sendfile vector
962 //
963  sfVec[1].buffer = strBuff->data+respOff;
964  sfVec[1].fdnum = -1;
965  if (respLen > blen)
966  {sfVec[1].sendsz = blen;
967  respLen -= blen; respOff += blen;
968  } else {
969  sfVec[1].sendsz = respLen;
970  respLen = 0;
971  }
972 
973 // Send off the data
974 //
975  rc = sfDio->SendFile(sfVec, 2);
976 
977 // Release any completed buffer
978 //
979  if (strBuff && !respLen) {strBuff->Recycle(); strBuff = 0;}
980 
981 // If send succeeded, indicate the action to be taken
982 //
983  if (!rc) return myState != odRsp;
984 
985 // The send failed, diagnose the problem
986 //
987  rc = (rc < 0 ? EIO : EFAULT);
988  myState = erRsp; strmEOF = true;
989  return Emsg(epname, rc, "send");
990 }
991 
992 /******************************************************************************/
993 /* W a n t R e s p o n s e */
994 /******************************************************************************/
995 
997 {
// WantResponse: called when the client asks for the response. Returns true
// when eInfo has been filled in with something to send now (a pending alert
// or the response itself) and false when the request must be deferred until
// a response arrives.
// NOTE(review): the signature line is not visible in this listing.
998  EPNAME("WantResp");
999  XrdSsiMutexMon frqMon;
1000  const XrdSsiRespInfo *rspP;
1001 
1002 // Check if we have a previous alert that was sent (we need to recycle it). We
1003 // don't need a lock for this as it's fully serialized via serial fsctl calls.
1004 //
1005  if (alrtSent) {alrtSent->Recycle(); alrtSent = 0;}
1006 
1007 // Serialize the remainder of this code
1008 //
1009  frqMon.Lock(frqMutex);
1010  rspP = XrdSsiRRAgent::RespP(this);
1011 
1012 // If we have a pending alert then we need to send it now. Suppress the callback
1013 // as we will recycle the alert on the next call (there should be one).
1014 //
1015  if (alrtPend)
1016  {char hexBuff[16], binBuff[8], dotBuff[4];
1017  alrtSent = alrtPend;
// Clear the tail pointer as well once the pending queue becomes empty.
1018  if (!(alrtPend = alrtPend->next)) alrtLast = 0;
1019  int n = alrtSent->SetInfo(eInfo, binBuff, sizeof(binBuff));
1020  eInfo.setErrCB((XrdOucEICB *)0);
1021  DEBUGXQ(n <<" byte alert (0x" <<DUMPIT(binBuff, n) <<") sent; "
1022  <<(alrtPend ? "" : "no ") <<"more pending");
1023  return true;
1024  }
1025 
1026 // Check if a response is here (well, ProcessResponse was called)
1027 //
1028 // if (rspP->rType)
1029  if (haveResp)
1030  {respCBarg = 0;
1031  if (fileP->AttnInfo(eInfo, rspP, reqID)) myState = odRsp;
1032  else eInfo.setErrCB((XrdOucEICB *)0);
1033  return true;
1034  }
1035 
1036 // Defer this and record the callback arguments. We defer setting respWait
1037 // to true until we know the deferral request has been sent (i.e. when Done()
1038 // is called). This forces ProcessResponse() to not prematurely wakeup the
1039 // client. This is necessitated by the fact that we must release the request
1040 // lock upon return; allowing a response to come in while the deferral request
1041 // is still in transit.
1042 //
1043  respCB = eInfo.getErrCB(respCBarg);
1044  respWait = false;
1045  return false;
1046 }
1047 
1048 /******************************************************************************/
1049 /* Private: W a k e U p */
1050 /******************************************************************************/
1051 
// Notify the waiting client through the recorded callback (respCB). When aP
// is given the notification carries that alert; otherwise it carries the
// response information produced by fileP->AttnInfo(). A new XrdOucErrInfo is
// allocated here and handed to the callback.
1052 void XrdSsiFileReq::WakeUp(XrdSsiAlert *aP) // Called with frqMutex locked!
1053 {
1054  EPNAME("WakeUp");
1055  XrdOucErrInfo *wuInfo =
1056  new XrdOucErrInfo(tident,(XrdOucEICB *)0,respCBarg);
1057  const XrdSsiRespInfo *rspP = XrdSsiRRAgent::RespP(this);
1058  int respCode = SFS_DATAVEC;
1059 
1060 // Do some debugging
1061 //
1062  DEBUGXQ("respCBarg=" <<Xrd::hex <<respCBarg <<Xrd::dec);
1063 
1064 // Setup the wakeup data. This may be for an alert or for an actual response.
1065 // If this is an alert or the complete response, then make sure we get a
1066 // callback to do the finalization. Otherwise, we don't need a callback
1067 // and the callback handler will simply delete the error object for us.
1068 //
1069  if (aP)
1070  {char hexBuff[16], binBuff[8], dotBuff[4];
1071  int n = aP->SetInfo(*wuInfo, binBuff, sizeof(binBuff));
// The alert object itself is installed as the callback for this notification.
1072  wuInfo->setErrCB((XrdOucEICB *)aP, respCBarg);
1073  DEBUGXQ(n <<" byte alert (0x" <<DUMPIT(binBuff, n) <<") sent; "
1074  <<(alrtPend ? "" : "no ") <<"more pending");
1075  } else {
// A true return from AttnInfo() moves the request to the odRsp state.
1076  if (fileP->AttnInfo(*wuInfo, rspP, reqID))
1077  {wuInfo->setErrArg(respCBarg); myState = odRsp;}
1078  }
1079 
1080 // Tell the client to issue a read now or handle the alert or full response.
1081 //
1082  respWait = false;
1083  respCB->Done(respCode, wuInfo, sessN);
1085 }
int DoIt(int argpnt, int argc, char **argv, bool singleshot)
Definition: XrdAccTest.cc:262
#define tident
#define EPNAME(x)
Definition: XrdBwmTrace.hh:56
#define pread(a, b, c, d)
Definition: XrdPosix.hh:80
#define eMsg(x)
#define SFS_DATAVEC
#define SFS_ERROR
int XrdSfsXferSize
class XrdBuffer * XrdSfsXioHandle
Definition: XrdSfsXio.hh:46
#define DEBUGXQ(x)
#define DUMPIT(x, y)
if(Avsz)
Definition: XrdJob.hh:43
static int Format(char *buff, int blen, int ecode, const char *etxt1, const char *etxt2=0)
Definition: XrdOucERoute.cc:44
XrdOucEICB * getErrCB()
void setErrArg(unsigned long long cbarg=0)
void setErrCB(XrdOucEICB *cb, unsigned long long cbarg=0)
XrdSysMutex statsMutex
Definition: XrdOucStats.hh:55
void Bump(int &val)
Definition: XrdOucStats.hh:47
void Schedule(XrdJob *jp)
virtual int SendFile(int fildes)=0
static void Reclaim(XrdSfsXioHandle theHand)
Definition: XrdSfsXio.cc:70
static char * Buffer(XrdSfsXioHandle theHand, int *buffsz=0)
Definition: XrdSfsXio.cc:61
void Recycle()
Definition: XrdSsiAlert.cc:98
static XrdSsiAlert * Alloc(XrdSsiRespInfoMsg &aMsg)
Definition: XrdSsiAlert.cc:52
int SetInfo(XrdOucErrInfo &eInfo, char *aMsg, int aLen)
Definition: XrdSsiAlert.cc:117
XrdSsiAlert * next
Definition: XrdSsiAlert.hh:41
void Set(const char *eMsg=0, int eNum=0, int eArg=0)
const std::string & Get(int &eNum) const
void Alert(XrdSsiRespInfoMsg &aMsg)
Send or receive a server generated alert.
bool WantResponse(XrdOucErrInfo &eInfo)
XrdSfsXferSize Read(bool &done, char *buffer, XrdSfsXferSize blen)
char * GetRequest(int &rLen)
void RelRequestBuffer()
bool ProcessResponse(const XrdSsiErrInfo &eInfo, const XrdSsiRespInfo &resp)
void DeferredFinalize()
int Send(XrdSfsDio *sfDio, XrdSfsXferSize size)
static XrdSsiFileReq * Alloc(XrdOucErrInfo *eP, XrdSsiFileResource *rP, XrdSsiFileSess *fP, const char *sn, const char *id, unsigned int rnum)
void Activate(XrdOucBuffer *oP, XrdSfsXioHandle bR, int rSz)
void Done(int &Result, XrdOucErrInfo *cbInfo, const char *path=0)
void Lock(XrdSsiMutex *mutex)
static void SetMutex(XrdSsiRequest *rP, XrdSsiMutex *mP)
static XrdSsiRespInfo * RespP(XrdSsiRequest *rP)
static void onServer(XrdSsiRequest *rP)
static void CleanUp(XrdSsiRequest &reqR)
char * GetMsg(int &mlen)
virtual void RecycleMsg(bool sent=true)=0
virtual void ProcessRequest(XrdSsiRequest &reqRef, XrdSsiResource &resRef)=0
Process a request; client-side or server-side.
long long ReqMaxsz
Definition: XrdSsiStats.hh:41
long long ReqBytes
Definition: XrdSsiStats.hh:40
virtual bool SetBuff(XrdSsiErrInfo &eRef, char *buff, int blen)
virtual Buffer * GetBuff(XrdSsiErrInfo &eRef, int &dlen, bool &last)
Definition: XrdSsiStream.hh:93
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
XrdSsiStats Stats
XrdSsiService * Service
XrdScheduler * Sched
XrdSysError Log
@ dec
Definition: XrdSysTrace.hh:42
@ hex
Definition: XrdSysTrace.hh:42
int fdnum
File descriptor for data.
Definition: XrdOucSFVec.hh:47
int sendsz
Length of data at offset.
Definition: XrdOucSFVec.hh:46