XRootD
XrdXrootdTransit.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d X r o o t d T r a n s i t . c c */
4 /* */
5 /* (c) 2012 by the Board of Trustees of the Leland Stanford, Jr., University */
6 /* All Rights Reserved */
7 /* Produced by Andrew Hanushevsky for Stanford University under contract */
8 /* DE-AC02-76-SFO0515 with the Department of Energy */
9 /* */
10 /* This file is part of the XRootD software suite. */
11 /* */
12 /* XRootD is free software: you can redistribute it and/or modify it under */
13 /* the terms of the GNU Lesser General Public License as published by the */
14 /* Free Software Foundation, either version 3 of the License, or (at your */
15 /* option) any later version. */
16 /* */
17 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20 /* License for more details. */
21 /* */
22 /* You should have received a copy of the GNU Lesser General Public License */
23 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25 /* */
26 /* The copyright holder's institutional names and contributor's names may not */
27 /* be used to endorse or promote products derived from this software without */
28 /* specific prior written permission of the institution or contributor. */
29 /******************************************************************************/
30 
31 #include <cstring>
32 #include <unistd.h>
33 #include <sys/uio.h>
34 
35 #include "XProtocol/XProtocol.hh"
36 
37 #include "XrdSec/XrdSecEntity.hh"
38 
39 #include "Xrd/XrdBuffer.hh"
40 #include "Xrd/XrdLink.hh"
41 #include "XrdOuc/XrdOucErrInfo.hh"
42 #include "XrdOuc/XrdOucUtils.hh"
44 #include "XrdSys/XrdSysAtomics.hh"
50 
51 /******************************************************************************/
52 /* G l o b a l S y m b o l s */
53 /******************************************************************************/
54 
56 
// Redefine TRACELINK as the Link member.
// NOTE(review): presumably consumed by the TRACEP macro used throughout this
// file -- confirm against the trace header.
57 #undef TRACELINK
58 #define TRACELINK Link
59 
// XRD_GETNUM(x): reinterpret pointer x as a pointer to an unsigned int, load
// the value and convert it with ntohl().
// NOTE(review): the value is loaded through a casted pointer, so x presumably
// has to be suitably aligned for unsigned int -- confirm against callers.
60 #define XRD_GETNUM(x)\
61  ntohl(*(static_cast<unsigned int *>(static_cast<void *>(x))))
62 
63 /******************************************************************************/
64 /* S t a t i c M e m b e r s */
65 /******************************************************************************/
66 
// reqTab: request lookup table built by ReqTable(); Run() rejects any request
// whose table entry is zero.
67 const char *XrdXrootdTransit::reqTab = XrdXrootdTransit::ReqTable();
68 
// TranStack: queue of reusable transit objects. Alloc() pops from it while
// Disc() and Recycle() push objects back onto it.
70  XrdXrootdTransit::TranStack("TranStack",
71  "transit protocol anchor");
72 
73 /******************************************************************************/
74 /* A l l o c */
75 /******************************************************************************/
76 
78  XrdLink *linkP,
79  XrdSecEntity *seceP,
80  const char *nameP,
81  const char *protP
82  )
83 {
// Obtain a transit object and initialize it for the given link; the object is
// returned to the caller after Init() has been applied to it.
84  XrdXrootdTransit *xp;
85 
86 // Simply return a new transit object masquerading as a bridge. A previously
87 // used object is taken from TranStack when one is available; otherwise a
// fresh object is allocated.
//
88  if (!(xp = TranStack.Pop())) xp = new XrdXrootdTransit();
89  xp->Init(rsltP, linkP, seceP, nameP, protP);
90  return xp;
91 }
92 
93 /******************************************************************************/
94 /* A t t n */
95 /******************************************************************************/
96 
97 int XrdXrootdTransit::Attn(XrdLink *lP, short *theSID, int rcode,
98  const struct iovec *ioV, int ioN, int ioL)
99 {
100  XrdXrootdTransPend *tP;
101 
102 // Find the request
103 //
104  if (!(tP = XrdXrootdTransPend::Remove(lP, *theSID)))
105  {TRACE(REQ, "Unable to find request for " <<lP->ID <<" sid=" <<*theSID);
106  return 0;
107  }
108 
109 // Resume the request as we have been waiting for the response.
110 //
111  return tP->bridge->AttnCont(tP, rcode, ioV, ioN, ioL);
112 }
113 
114 /******************************************************************************/
115 /* A t t n C o n t */
116 /******************************************************************************/
117 
// Continue a request for which a pending entry (tP) was queued, now that its
// response (rcode plus the ioV/ioN/ioL payload) has arrived; called by Attn().
// Returns 0 when the request is rescheduled, a negative value when Send()
// fails, and otherwise the value returned by Send(). tP is deleted here.
118 int XrdXrootdTransit::AttnCont(XrdXrootdTransPend *tP, int rcode,
119  const struct iovec *ioV, int ioN, int ioL)
120 {
121  int rc;
122 
123 // we ensure waitPend has been cleared. This is to allow the process or
124 // redrive loops to have taken the correct action before we zero the
125 // runWait value.
126 //
127  {
128  XrdSysCondVarHelper clk(waitCnd);
129  while(waitPend) waitCnd.Wait();
130  }
131 
132 // Refresh the request structure from the copy saved in the pending entry;
133 // the entry is not referenced again after the delete.
//
134  memcpy(&Request, &(tP->Pend.Request), sizeof(Request));
135  delete tP;
136  runWait = 0;
137 
138 // Reissue the request if it's a wait 0 response. A kXR_wait without any
139 // payload (ioN == 0) is handled the same way as an explicit zero wait.
//
140  if (rcode==kXR_wait
141  && (!ioN || XRD_GETNUM(ioV[0].iov_base) == 0))
142  {// we don't set waitPend as the job can start straight away
143  Sched->Schedule((XrdJob *)&waitJob);
144  return 0;
145  }
146 
147 // Send off the deferred response
148 //
149  rc = Send(rcode, ioV, ioN, ioL);
150 
151 // Handle end based on current state
152 //
153  if (rc < 0) return rc;
154 
// runWait still zero: Send() did not start a new wait. Clear the active status
// when the result was final, then either schedule respJob (reInvoke set) or
// enable the link again. Otherwise a wait was started, so clear waitPend and
// wake anything blocked on it.
155  if (!runWait)
156  {if (runDone) runStatus.store(0, std::memory_order_release);
157  if (reInvoke) Sched->Schedule((XrdJob *)&respJob);
158  else Link->Enable();
159  }
160  else
161  {XrdSysCondVarHelper clk(waitCnd);
162  waitPend = false;
163  waitCnd.Broadcast();
164  }
165 
166 // All done
167 //
168  return rc;
169 }
170 
171 /******************************************************************************/
172 /* D i s c */
173 /******************************************************************************/
174 
176 {
177  char buff[128];
178 
179 // We do not allow disconnection while we are active
180 //
181  if (runStatus.fetch_add(1, std::memory_order_acq_rel)) return false;
182 
183 // Reconnect original protocol to the link
184 //
185  Link->setProtocol(realProt);
186 
187 // Now we need to recycle our xrootd part
188 //
189  sprintf(buff, "%s disconnection", pName);
190  XrdXrootdProtocol::Recycle(Link, time(0)-cTime, buff);
191 
192 // Make sure that any pending wait jobs can exit
193 //
194  {
195  XrdSysCondVarHelper clk(waitCnd);
196  waitPend = false;
197  runWait = 0;
198  waitCnd.Broadcast();
199  }
200 
201 // Now just free up our object.
202 //
203  TranStack.Push(&TranLink);
204  return true;
205 }
206 
207 /******************************************************************************/
208 /* Private: F a i l */
209 /******************************************************************************/
210 
211 bool XrdXrootdTransit::Fail(int ecode, const char *etext)
212 {
213  runError = ecode;
214  runEText = etext;
215  return true;
216 }
217 
218 /******************************************************************************/
219 /* F a t a l */
220 /******************************************************************************/
221 
// Report the error recorded by Fail() (runError/runEText) through the result
// object. Returns rc when the Error() callback returns true and -1 otherwise.
222 int XrdXrootdTransit::Fatal(int rc)
223 {
// NOTE(review): rInfo is not declared in the lines visible here; presumably a
// Bridge::Context is constructed just above this statement -- confirm.
226 
227  return (respObj->Error(rInfo, runError, runEText) ? rc : -1);
228 }
229 
230 /******************************************************************************/
231 /* I n i t */
232 /******************************************************************************/
233 
234 void XrdXrootdTransit::Init(XrdScheduler *schedP, int qMax, int qTTL)
235 {
236  TranStack.Set(schedP, &XrdXrootdTrace, TRACE_MEM);
237  TranStack.Set(qMax, qTTL);
238 }
239 
240 /******************************************************************************/
241 
243  XrdLink *linkP,
244  XrdSecEntity *seceP,
245  const char *nameP,
246  const char *protP
247  )
248 {
// Per-object initialization used by Alloc(): resets the run state, binds this
// object to the link in place of the link's current protocol, sets up the
// client identity and reports the connection to the file system.
249  XrdNetAddrInfo *addrP;
250  const char *who;
251  char uname[sizeof(Request.login.username)+1];
252 
253 // Set standard stuff
254 //
255  runArgs = 0;
256  runALen = 0;
257  runABsz = 0;
258  runError = 0;
259  runStatus.store(0, std::memory_order_release);
260  runWait = 0;
261  runWTot = 0;
262  runWMax = 3600;
263  runWCall = false;
264  runDone = false;
265  reInvoke = false;
266  waitPend = false;
267  wBuff = 0;
268  wBLen = 0;
269  respObj = respP;
270  pName = protP;
271  mySID = getSID();
272 
273 // Bind the protocol to the link
274 //
275  SI->Bump(SI->Count);
276  Link = linkP;
277  Response.Set(linkP);
278  Response.Set(this);
279  strcpy(Entity.prot, "host");
280  Entity.host = (char *)linkP->Host();
281 
282 // Develop a trace identifier. The name is truncated to the size of the login
283 // username field and always null terminated before it is sanitized.
//
284  strncpy(uname, nameP, sizeof(uname)-1);
285  uname[sizeof(uname)-1] = 0;
286  XrdOucUtils::Sanitize(uname);
287  linkP->setID(uname, mySID);
288 
289 // Place trace identifier everywhere it should be located
290 
291 // Indicate that this bridge supports asynchronous responses
292 //
294 
295 // Mark the client as IPv4 if they came in as IPv4 or mapped IPv4. Note
296 // there is no way we can figure out if this is a dual-stack client.
297 //
298  addrP = Link->AddrInfo();
299  if (addrP->isIPType(XrdNetAddrInfo::IPv4) || addrP->isMapped())
// NOTE(review): the statement controlled by the condition above is not present
// in this listing -- check the original source before editing here.
301 
302 // Mark the client as being on a private net if the address is private
303 //
304  if (addrP->isPrivate()) {clientPV |= XrdOucEI::uPrip; rdType = 1;}
305  else rdType = 0;
306 
307 // Now tie the security information. When the caller supplies no entity the
308 // local Entity (protocol "host") is used instead.
//
309  Client = (seceP ? seceP : &Entity);
310  Client->ueid = mySID;
311  Client->tident = Client->pident = Link->ID;
312  Client->addrInfo = addrP;
313 
314 // Allocate a monitoring object, if needed for this connection and record login
315 //
316  if (Monitor.Ready())
317  {Monitor.Register(linkP->ID, linkP->Host(), protP);
318  if (Monitor.Logins())
319  {if (Monitor.Auths() && seceP) MonAuth();
320  else Monitor.Report(Monitor.Auths() ? "" : 0);
321  }
322  }
323 
324 // Complete the request ID object
325 //
326  ReqID.setID(Request.header.streamid, linkP->FDnum(), linkP->Inst());
327 
328 // Substitute our protocol for the existing one. The previous protocol is
329 // kept in realProt; Disc() later puts it back on the link.
//
330  realProt = linkP->setProtocol(this);
331  linkP->setProtName(protP);
332  linkP->armBridge();
333 
334 // Document this login
335 //
336  who = (seceP && seceP->name ? seceP->name : "nobody");
337  eDest.Log(SYS_LOG_01, "Bridge", Link->ID, "login as", who);
338 
339 // All done, indicate we are logged in
340 //
342  cTime = time(0);
343 
344 // Propagate a connect through the whole system
345 //
346  osFS->Connect(Client);
347 }
348 
349 /******************************************************************************/
350 /* P r o c e e d */
351 /******************************************************************************/
352 
354 {
// Resume link handling: run Process() again when a reinvoke was pending, then
// enable or close the link depending on the outcome.
355  int rc;
356 
357 // If we were interrupted in a reinvoke state, resume that state.
358 //
359  if (reInvoke) rc = Process(Link);
360  else rc = 0;
361 
362 // Handle ending status: non-negative enables the link, -EINPROGRESS leaves the
363 // link untouched and any other negative value closes it.
//
364  if (rc >= 0) Link->Enable();
365  else if (rc != -EINPROGRESS) Link->Close();
366 }
367 
368 /******************************************************************************/
369 /* P r o c e s s */
370 /******************************************************************************/
371 
373 {
374  int rc;
375 
376 // This entry is serialized via link processing and data is now available.
377 // One of the following will be returned.
378 //
379 // < 0 -> Stop getting requests,
380 // -EINPROGRESS leave link disabled but otherwise all is well
381 // -n Error, disable and close the link
382 // = 0 -> OK, get next request, if allowed, o/w enable the link
383 // > 0 -> Slow link, stop getting requests and enable the link
384 //
385 
386 // Reflect data is present to the underlying protocol and if Run() has been
387 // called we need to dispatch that request. This may be iterative.
388 //
// Each pass calls the real protocol (with a null link when re-invoking). A
// non-zero runStatus afterwards means Run() accepted a request: a recorded
// error is reported via Fatal(), otherwise the request is executed. A pending
// wait ends the call with -EINPROGRESS after releasing waitPend, and a result
// that is not yet final returns immediately. The loop repeats only while the
// real protocol returned 0 and no error occurred.
389 do{rc = realProt->Process((reInvoke ? 0 : lp));
390  if (rc >= 0 && runStatus.load(std::memory_order_acquire))
391  {reInvoke = (rc == 0);
392  if (runError) rc = Fatal(rc);
393  else {runDone = false;
394  rc = (Resume ? XrdXrootdProtocol::Process(lp) : Process2());
395  if (rc >= 0)
396  {if (runDone) runStatus.store(0, std::memory_order_release);
397  if (runWait)
398  {XrdSysCondVarHelper clk(waitCnd);
399  waitPend = false;
400  waitCnd.Broadcast();
401  return -EINPROGRESS;
402  }
403  if (!runDone) return rc;
404  }
405  }
406  } else reInvoke = false;
407  } while(rc >= 0 && reInvoke);
408 
409 // Make sure that we indicate that we are no longer active
410 //
411  runStatus.store(0, std::memory_order_release);
412 
413 // All done. A zero result is mapped to 1 so that zero is never returned from
414 // this point.
//
415  return (rc ? rc : 1);
416 }
417 
418 /******************************************************************************/
419 /* R e c y c l e */
420 /******************************************************************************/
421 
// Tear down this bridge object: cancel a scheduled wait job, recycle the real
// protocol and the xrootd part, free the saved argument buffer, release any
// waiter blocked on waitPend and push the object back onto TranStack.
// lp, consec and reason are passed through unchanged to both Recycle() calls.
422 void XrdXrootdTransit::Recycle(XrdLink *lp, int consec, const char *reason)
423 {
424 
425 // Set ourselves as active so we can't get more requests
426 //
427  runStatus.fetch_add(1, std::memory_order_acq_rel);
428 
429 // If we were active then we will need to quiesce before dismantling ourselves.
430 // Note that Recycle() can only be called if the link is enabled. So, this bit
431 // of code is improbable but we check it anyway.
432 //
433  if (runWait > 0) {
434  TRACEP(EMSG, "WARNING: Recycle is canceling wait job; the wait job might already be running during recycle.");
435  Sched->Cancel(&waitJob);
436  }
437 
438 // First we need to recycle the real protocol
439 //
440  if (realProt) realProt->Recycle(lp, consec, reason);
441 
442 // Now we need to recycle our xrootd part
443 //
444  XrdXrootdProtocol::Recycle(lp, consec, reason);
445 
446 // Release the argument buffer
447 //
448  if (runArgs) {free(runArgs); runArgs = 0;}
449 
450 // Delete all pending requests
451 //
// NOTE(review): the statement belonging to the comment above is not present in
// this listing -- check the original source before editing here.
453 
454 // Make sure that any pending wait jobs can exit
455 //
456  {
457  XrdSysCondVarHelper clk(waitCnd);
458  waitPend = false;
459  runWait = 0;
460  waitCnd.Broadcast();
461  }
462 
463 // Now just free up our object.
464 //
465  TranStack.Push(&TranLink);
466 }
467 
468 /******************************************************************************/
469 /* R e d r i v e */
470 /******************************************************************************/
471 
473 {
// Preformatted kXR_NoMemory error response, sent when the saved request
// arguments cannot be copied back into the argument buffer.
474  static int eCode = htonl(kXR_NoMemory);
475  static char eText[] = "Insufficent memory to re-issue request";
476  static struct iovec ioV[] = {{(char *)&eCode,sizeof(eCode)},
477  {(char *)&eText,sizeof(eText)}};
478  int rc;
479 
480 // we ensure waitPend has been cleared. This is to allow the process or
481 // redrive loops to have taken the correct action before we zero the
482 // runWait value.
483 //
484  {
485  XrdSysCondVarHelper clk(waitCnd);
486  while(waitPend) waitCnd.Wait();
487  }
488 
489 // Do some tracing
490 //
491  TRACEP(REQ, "Bridge redrive runStatus="<<runStatus.load(std::memory_order_acquire)
492  <<" runError="<<runError
493  <<" runWait="<<runWait<<" runWTot="<<runWTot);
494 
495 // Update wait statistics
496 //
497  runWTot += runWait;
498  runWait = 0;
499 
500 // While we are running asynchronously, there is no way that this object can
501 // be deleted while a timer is outstanding as the link has been disabled. So,
502 // we can reissue the request with little worry.
503 //
504 // This is a bit tricky here as a redriven request may result in a wait. If
505 // this happens we cannot hand the result off to the real protocol until we
506 // wait and successfully redrive. The wait handling occurs asynchronously
507 // so all we need to do is honor it here.
508 //
// The saved arguments (if any) are first copied back via RunCopy(). The loop
// then re-executes the request and, while the result is final, no wait is
// pending and a reinvoke is wanted, calls back into the real protocol.
509  if (!runALen || RunCopy(runArgs, runALen)) {
510  do{runDone = false;
511  rc = Process2();
512  TRACEP(REQ, "Bridge redrive Process2 rc="<<rc
513  <<" runError="<<runError<<" runWait="<<runWait);
514  if (rc < 0) break;
515  if (runDone) runStatus.store(0, std::memory_order_release);
516  if (runWait || !runDone || !reInvoke) break;
517  rc = realProt->Process(NULL);
518  TRACEP(REQ, "Bridge redrive callback rc="<<rc
519  <<" runStatus="<<runStatus.load(std::memory_order_acquire));
520  if (rc < 0 || !runStatus.load(std::memory_order_acquire))
521  {reInvoke = false;
522  break;
523  }
524  reInvoke = (rc == 0);
525  if (runError) rc = Fatal(rc);
526  } while((rc >= 0) && !runError && !runWait);
527  }
528  else rc = Send(kXR_error, ioV, 2, 0);
529 
530 // Defer the request if need be: a new wait was started, so release waitPend
531 // and return while leaving the link and the active status as they are.
//
532  if (rc >= 0 && runWait)
533  {XrdSysCondVarHelper clk(waitCnd);
534  waitPend = false;
535  waitCnd.Broadcast();
536  return;
537  }
538  runWTot = 0;
539 
540 // Indicate we are no longer active
541 //
542  runStatus.store(0, std::memory_order_release);
543 
544 // If the link needs to be terminated, terminate the link. Otherwise, we can
545 // enable the link for new requests at this point.
546 //
547  if (rc < 0) Link->Close();
548  else Link->Enable();
549 }
550 
551 /******************************************************************************/
552 /* R e q T a b l e */
553 /******************************************************************************/
554 
// Map a request code onto its zero-based slot in the request table (offset
// from kXR_auth). Argument and result are parenthesized so the macro also
// expands correctly inside larger expressions.
#define KXR_INDEX(x) ((x)-kXR_auth)
556 
558 {
// One entry per request code in the range kXR_auth through kXR_truncate.
559  static char rTab[kXR_truncate-kXR_auth+1];
560 
561 // Initialize the table. Entries left at zero mark requests that Run() refuses.
562 //
// NOTE(review): read, readv and write are set to 2 while every other accepted
// request is set to 1; the code visible here only tests for non-zero, so the
// meaning of the value 2 needs to be confirmed elsewhere.
563  memset(rTab, 0, sizeof(rTab));
564  rTab[KXR_INDEX(kXR_chmod)] = 1;
565  rTab[KXR_INDEX(kXR_close)] = 1;
566  rTab[KXR_INDEX(kXR_dirlist)] = 1;
567  rTab[KXR_INDEX(kXR_locate)] = 1;
568  rTab[KXR_INDEX(kXR_mkdir)] = 1;
569  rTab[KXR_INDEX(kXR_mv)] = 1;
570  rTab[KXR_INDEX(kXR_open)] = 1;
571  rTab[KXR_INDEX(kXR_prepare)] = 1;
572  rTab[KXR_INDEX(kXR_protocol)] = 1;
573  rTab[KXR_INDEX(kXR_query)] = 1;
574  rTab[KXR_INDEX(kXR_read)] = 2;
575  rTab[KXR_INDEX(kXR_readv)] = 2;
576  rTab[KXR_INDEX(kXR_rm)] = 1;
577  rTab[KXR_INDEX(kXR_rmdir)] = 1;
578  rTab[KXR_INDEX(kXR_set)] = 1;
579  rTab[KXR_INDEX(kXR_stat)] = 1;
580  rTab[KXR_INDEX(kXR_statx)] = 1;
581  rTab[KXR_INDEX(kXR_sync)] = 1;
582  rTab[KXR_INDEX(kXR_truncate)] = 1;
583  rTab[KXR_INDEX(kXR_write)] = 2;
584 
585 // Now return the address of the function-local static table
586 //
587  return rTab;
588 }
589 
590 /******************************************************************************/
591 /* Private: R e q W r i t e */
592 /******************************************************************************/
593 
// Set up a kXR_write request accepted by Run(). xdataP/xdataL describe the
// write data the caller already holds. Always returns true.
594 bool XrdXrootdTransit::ReqWrite(char *xdataP, int xdataL)
595 {
596 
597 // Make sure we always transit to the resume point
598 //
599  myBlen = 0;
600 
601 // If nothing was read, then this is a straight-up write. The caller's buffer
602 // is remembered in wBuff/wBLen; Send() passes it to the result object's Free().
//
603  if (!xdataL || !xdataP || !Request.header.dlen)
604  {Resume = 0; wBuff = xdataP; wBLen = xdataL;
605  return true;
606  }
607 
608 // Partial data was read, we may have to split this between a direct write
609 // and a network read/write -- somewhat complicated.
610 //
611  myBuff = wBuff = xdataP;
612  myBlast = wBLen = xdataL;
// NOTE(review): one statement between the assignments above and the return is
// not present in this listing -- check the original source.
614  return true;
615 }
616 
617 /******************************************************************************/
618 /* R u n */
619 /******************************************************************************/
620 
// Accept one xrootd request from the bridge user. xreqP points to the request
// header; xdataP/xdataL describe any request data already in hand.
// Returns false only when another request is still active. All other paths
// return true, including validation failures, which are recorded via Fail().
621 bool XrdXrootdTransit::Run(const char *xreqP, char *xdataP, int xdataL)
622 {
623  int movLen;
624 
625 // We do not allow re-entry if we are currently processing a request.
626 // It will be reset, as needed, when a response is effected.
627 //
628 
629  if (runStatus.fetch_add(1, std::memory_order_acq_rel))
630  {TRACEP(REQ, "Bridge request failed due to re-entry");
631  return false;
632  }
633 
634 // Copy the request header
635 //
636  memcpy((void *)&Request, (void *)xreqP, sizeof(Request));
637 
638 // Validate that we can actually handle this request
639 //
// NOTE(review): only an upper bound is checked before reqTab is indexed with
// (requestid - kXR_auth); a request id below kXR_auth would give a negative
// index. Confirm whether a lower-bound check exists on the line missing from
// this listing, and add one if it does not.
641  if (Request.header.requestid & 0x8000
642  || Request.header.requestid > static_cast<kXR_unt16>(kXR_truncate)
643  || !reqTab[Request.header.requestid - kXR_auth])
644  {TRACEP(REQ, "Unsupported bridge request");
645  return Fail(kXR_Unsupported, "Unsupported bridge request");
646  }
647 
648 // Validate the data length
649 //
651  if (Request.header.dlen < 0)
652  {TRACEP(REQ, "Invalid request data length");
653  return Fail(kXR_ArgInvalid, "Invalid request data length");
654  }
655 
656 // Copy the stream id and trace this request
657 //
659  TRACEP(REQ, "Bridge req=" <<Request.header.requestid
660  <<" dlen=" <<Request.header.dlen <<" blen=" <<xdataL);
661 
662 // If this is a write request, we will need to do a lot more
663 //
664  if (Request.header.requestid == kXR_write) return ReqWrite(xdataP, xdataL);
665 
666 // Obtain any needed buffer and handle any existing data arguments. Also, we
667 // need to keep a shadow copy of the request arguments should we get a wait
668 // and will need to re-issue the request (the server mangles the args).
669 //
// movLen is the smaller of the supplied length and the declared length. The
// shadow buffer runArgs is reallocated only when it is missing or too small.
670  if (Request.header.dlen)
671  {movLen = (xdataL < Request.header.dlen ? xdataL : Request.header.dlen);
672  if (!RunCopy(xdataP, movLen)) return true;
673  if (!runArgs || movLen > runABsz)
674  {if (runArgs) free(runArgs);
675  if (!(runArgs = (char *)malloc(movLen)))
676  {TRACEP(REQ, "Failed to allocate memory");
677  return Fail(kXR_NoMemory, "Insufficient memory");
678  }
679  runABsz = movLen;
680  }
681  memcpy(runArgs, xdataP, movLen); runALen = movLen;
// A non-zero remainder means part of the declared data was not supplied;
// myBuff then points just past the portion already copied.
682  if ((myBlen = Request.header.dlen - movLen))
683  {myBuff = argp->buff + movLen;
685  return true;
686  }
687  } else runALen = 0;
688 
689 // If we have all the data, indicate request accepted.
690 //
691  runError = 0;
692  Resume = 0;
693  return true;
694 }
695 
696 /******************************************************************************/
697 /* Private: R u n C o p y */
698 /******************************************************************************/
699 
700 bool XrdXrootdTransit::RunCopy(char *buffP, int buffL)
701 {
702 
703 // Allocate a buffer if we do not have one or it is too small
704 //
705  if (!argp || Request.header.dlen+1 > argp->bsize)
706  {if (argp) BPool->Release(argp);
707  if (!(argp = BPool->Obtain(Request.header.dlen+1)))
708  {Fail(kXR_ArgTooLong, "Request argument too long"); return false;}
709  hcNow = hcPrev; halfBSize = argp->bsize >> 1;
710  }
711 
712 // Copy the arguments to the buffer
713 //
714  memcpy(argp->buff, buffP, buffL);
715  argp->buff[buffL] = 0;
716  return true;
717 }
718 
719 /******************************************************************************/
720 /* S e n d */
721 /******************************************************************************/
722 
// Deliver a response for the current request to the bridge result object.
// rcode selects the callback; ioV/ioN/ioL describe the response payload.
// Returns 0 when the callback returns true and -1 otherwise; kXR_wait and
// kXR_waitresp return whatever Wait() and WaitResp() return.
723 int XrdXrootdTransit::Send(int rcode, const struct iovec *ioV, int ioN, int ioL)
724 {
// NOTE(review): rInfo is not declared in the lines visible here; presumably a
// Bridge::Context is constructed just above -- confirm in the original source.
727  const char *eMsg;
728  int rc;
729  bool aOK;
730 
731 // Invoke the result object (we initially assume this is the final result)
732 //
// Every case other than oksofar, wait and waitresp first hands a held write
// buffer (wBuff) to the result object's Free().
// NOTE(review): the error and redirect cases read ioV[0] (and redirect also
// ioV[1]) without checking ioN -- confirm callers always supply them.
733  runDone = true;
734  switch(rcode)
735  {case kXR_error:
736  rc = XRD_GETNUM(ioV[0].iov_base);
737  eMsg = (ioN < 2 ? "" : (const char *)ioV[1].iov_base);
738  if (wBuff) respObj->Free(rInfo, wBuff, wBLen);
739  aOK = respObj->Error(rInfo, rc, eMsg);
740  break;
741  case kXR_ok:
742  if (wBuff) respObj->Free(rInfo, wBuff, wBLen);
743  aOK = (ioN ? respObj->Data(rInfo, ioV, ioN, ioL, true)
744  : respObj->Done(rInfo));
745  break;
746  case kXR_oksofar:
747  aOK = respObj->Data(rInfo, ioV, ioN, ioL, false);
748  runDone = false;
749  break;
750  case kXR_redirect:
751  if (wBuff) respObj->Free(rInfo, wBuff, wBLen);
752  rc = XRD_GETNUM(ioV[0].iov_base);
753  aOK = respObj->Redir(rInfo,rc,(const char *)ioV[1].iov_base);
754  break;
755  case kXR_wait:
756  return Wait(rInfo, ioV, ioN, ioL);
757  break;
758  case kXR_waitresp:
759  runDone = false;
760  return WaitResp(rInfo, ioV, ioN, ioL);
761  break;
762  default: if (wBuff) respObj->Free(rInfo, wBuff, wBLen);
763  aOK = respObj->Error(rInfo, kXR_ServerError,
764  "internal logic error");
765  break;
766  };
767 
768 // All done
769 //
770  return (aOK ? 0 : -1);
771 }
772 
773 /******************************************************************************/
774 
// Response variant taking a file descriptor, offset and length. Marks the
// request as done and returns 0 when the result object's File() callback
// returns non-zero, otherwise -1.
775 int XrdXrootdTransit::Send(long long offset, int dlen, int fdnum)
776 {
// NOTE(review): the start of the statement that ends on the next line (where
// sfInfo is presumably declared) is not present in this listing.
779  offset, dlen, fdnum);
780 
781 // Effect callback (this is always a final result)
782 //
783  runDone = true;
784  return (respObj->File(sfInfo, dlen) ? 0 : -1);
785 }
786 
787 /******************************************************************************/
788 
// Response variant taking a vector of sfvnum XrdOucSFVec elements and a total
// length. Marks the request as done and returns 0 when the result object's
// File() callback returns non-zero, otherwise -1.
789 int XrdXrootdTransit::Send(XrdOucSFVec *sfvec, int sfvnum, int dlen)
790 {
// NOTE(review): the start of the statement that ends on the next line (where
// sfInfo is presumably declared) is not present in this listing.
793  sfvec, sfvnum, dlen);
794 
795 // Effect callback (this is always a final result)
796 //
797  runDone = true;
798  return (respObj->File(sfInfo, dlen) ? 0 : -1);
799 }
800 
801 /******************************************************************************/
802 /* Private: W a i t */
803 /******************************************************************************/
804 
805 int XrdXrootdTransit::Wait(XrdXrootd::Bridge::Context &rInfo,
806  const struct iovec *ioV, int ioN, int ioL)
807 {
808  const char *eMsg;
809 
810 // Trace this request if need be
811 //
812  runWait = XRD_GETNUM(ioV[0].iov_base);
813  eMsg = (ioN < 2 ? "reason unknown" : (const char *)ioV[1].iov_base);
814 
815 // Check if the protocol wants to handle all waits
816 //
817  if (runWMax <= 0)
818  {int wtime = runWait;
819  runWait = 0;
820  return (respObj->Wait(rInfo, wtime, eMsg) ? 0 : -1);
821  }
822 
823 // Check if we have exceeded the maximum wait time
824 //
825  if (runWTot >= runWMax)
826  {runDone = true;
827  runWait = 0;
828  return (respObj->Error(rInfo, kXR_Cancelled, eMsg) ? 0 : -1);
829  }
830 
831 // Readjust wait time
832 //
833  if (runWait > runWMax) runWait = runWMax;
834 
835 // Check if the protocol wants a wait notification
836 //
837  if (runWCall && !(respObj->Wait(rInfo, runWait, eMsg))) return -1;
838 
839 // The process, redrive & recycle rely on a non-zero or positive
840 // runWait to indicate wait is scheduled, so make sure that is true.
841 //
842  if (runWait <= 0) runWait = 1;
843 
844  TRACEP(REQ, "Bridge delaying request " <<runWait <<" sec (" <<eMsg <<")");
845 
846 // Delay processing (and thus clearing runWait) until the process or redrive
847 // loops can detect that we are waiting
848 //
849  waitPend = true;
850 
851 // All done, schedule the wait
852 //
853  Sched->Schedule((XrdJob *)&waitJob, time(0)+runWait);
854 
855  return 0;
856 }
857 
858 /******************************************************************************/
859 /* Private: W a i t R e s p */
860 /******************************************************************************/
861 
862 int XrdXrootdTransit::WaitResp(XrdXrootd::Bridge::Context &rInfo,
863  const struct iovec *ioV, int ioN, int ioL)
864 {
865  XrdXrootdTransPend *trP;
866  const char *eMsg;
867  int wTime;
868 
869 // Trace this request if need be
870 //
871  wTime = XRD_GETNUM(ioV[0].iov_base);
872  eMsg = (ioN < 2 ? "reason unknown" : (const char *)ioV[1].iov_base);
873  TRACEP(REQ, "Bridge waiting for resp; sid=" <<rInfo.sID.num
874  <<" wt=" <<wTime <<" (" <<eMsg <<")");
875 
876 // We would issue callback to see how we should handle this. However, we can't
877 // predictably handle a waitresp. So that means we will just wait for a resp.
878 //
879 // XrdXrootd::Bridge::Result *newCBP = respObj->WaitResp(rInfo, runWait, eMsg);
880 
881 // Save the current state
882 //
883  trP = new XrdXrootdTransPend(Link, this, &Request);
884  trP->Queue();
885 
886 // Effect a wait
887 //
888  runWait = -1;
889 
890 // Delay processing (and thus clearing runWait) until the process or redrive
891 // loops can detect that we are waiting
892 //
893  waitPend = true;
894 
895  return 0;
896 }
@ kXR_ArgInvalid
Definition: XProtocol.hh:990
@ kXR_Unsupported
Definition: XProtocol.hh:1003
@ kXR_Cancelled
Definition: XProtocol.hh:1007
@ kXR_ServerError
Definition: XProtocol.hh:1002
@ kXR_ArgTooLong
Definition: XProtocol.hh:992
@ kXR_NoMemory
Definition: XProtocol.hh:998
kXR_char streamid[2]
Definition: XProtocol.hh:156
kXR_char username[8]
Definition: XProtocol.hh:396
@ kXR_waitresp
Definition: XProtocol.hh:906
@ kXR_redirect
Definition: XProtocol.hh:904
@ kXR_oksofar
Definition: XProtocol.hh:900
@ kXR_ok
Definition: XProtocol.hh:899
@ kXR_wait
Definition: XProtocol.hh:905
@ kXR_error
Definition: XProtocol.hh:903
struct ClientRequestHdr header
Definition: XProtocol.hh:846
struct ClientLoginRequest login
Definition: XProtocol.hh:857
kXR_unt16 requestid
Definition: XProtocol.hh:157
@ kXR_read
Definition: XProtocol.hh:125
@ kXR_open
Definition: XProtocol.hh:122
@ kXR_readv
Definition: XProtocol.hh:137
@ kXR_mkdir
Definition: XProtocol.hh:120
@ kXR_sync
Definition: XProtocol.hh:128
@ kXR_chmod
Definition: XProtocol.hh:114
@ kXR_dirlist
Definition: XProtocol.hh:116
@ kXR_rm
Definition: XProtocol.hh:126
@ kXR_query
Definition: XProtocol.hh:113
@ kXR_write
Definition: XProtocol.hh:131
@ kXR_auth
Definition: XProtocol.hh:112
@ kXR_set
Definition: XProtocol.hh:130
@ kXR_rmdir
Definition: XProtocol.hh:127
@ kXR_statx
Definition: XProtocol.hh:134
@ kXR_truncate
Definition: XProtocol.hh:140
@ kXR_protocol
Definition: XProtocol.hh:118
@ kXR_mv
Definition: XProtocol.hh:121
@ kXR_stat
Definition: XProtocol.hh:129
@ kXR_locate
Definition: XProtocol.hh:139
@ kXR_close
Definition: XProtocol.hh:115
@ kXR_prepare
Definition: XProtocol.hh:133
@ kXR_asyncap
Definition: XProtocol.hh:378
@ kXR_ver002
Definition: XProtocol.hh:386
kXR_int32 dlen
Definition: XProtocol.hh:159
unsigned short kXR_unt16
Definition: XPtypes.hh:67
unsigned char kXR_char
Definition: XPtypes.hh:65
#define EMSG(x)
Definition: XrdCpConfig.cc:55
void Fatal(const char *op, const char *target)
Definition: XrdCrc32c.cc:58
#define eMsg(x)
const int SYS_LOG_01
Definition: XrdSysError.hh:72
#define TRACE_MEM
Definition: XrdTrace.hh:38
#define TRACE(act, x)
Definition: XrdTrace.hh:63
#define XRD_LOGGEDIN
#define TRACEP(act, x)
XrdSysTrace XrdXrootdTrace
#define XRD_GETNUM(x)
#define KXR_INDEX(x)
void Release(XrdBuffer *bp)
Definition: XrdBuffer.cc:221
XrdBuffer * Obtain(int bsz)
Definition: XrdBuffer.cc:140
int bsize
Definition: XrdBuffer.hh:46
char * buff
Definition: XrdBuffer.hh:45
Definition: XrdJob.hh:43
bool isMapped() const
bool isIPType(IPType ipType) const
void Set(int inQMax, time_t agemax=1800)
Definition: XrdObject.icc:90
void Push(XrdObject< T > *Node)
Definition: XrdObject.hh:101
T * Pop()
Definition: XrdObject.hh:93
void Bump(int &val)
Definition: XrdOucStats.hh:47
static void Sanitize(char *instr, char subc='_')
virtual void Recycle(XrdLink *lp=0, int consec=0, const char *reason=0)=0
virtual int Process(XrdLink *lp)=0
void Schedule(XrdJob *jp)
void Cancel(XrdJob *jp)
const char * pident
Trace identifier (originator)
Definition: XrdSecEntity.hh:82
XrdNetAddrInfo * addrInfo
Entity's connection details.
Definition: XrdSecEntity.hh:80
const char * tident
Trace identifier always preset.
Definition: XrdSecEntity.hh:81
char prot[XrdSecPROTOIDSIZE]
Auth protocol used (e.g. krb5)
Definition: XrdSecEntity.hh:67
char * name
Entity's name.
Definition: XrdSecEntity.hh:69
unsigned int ueid
Unique ID of entity instance.
Definition: XrdSecEntity.hh:79
char * host
Entity's host name dnr dependent.
Definition: XrdSecEntity.hh:70
virtual void Connect(const XrdSecEntity *client=0)
void Log(int mask, const char *esfx, const char *text1, const char *text2=0, const char *text3=0)
Definition: XrdSysError.hh:133
void Register(const char *Uname, const char *Hname, const char *Pname, unsigned int xSID=0)
void Report(const char *Info)
static XrdXrootdStats * SI
XrdSecEntity * Client
static XrdSysError & eDest
static unsigned int getSID()
XrdXrootdMonitor::User Monitor
int(XrdXrootdProtocol::* Resume)()
static XrdScheduler * Sched
int Process(XrdLink *lp) override
void Recycle(XrdLink *lp, int consec, const char *reason) override
XrdXrootdResponse Response
static XrdBuffManager * BPool
XrdXrootdReqID ReqID
static XrdSfsFileSystem * osFS
void setID(unsigned long long id)
void Set(XrdLink *lp)
union XrdXrootdTransPend::@194 Pend
static XrdXrootdTransPend * Remove(XrdLink *lP, short sid)
XrdXrootdTransit * bridge
static void Clear(XrdXrootdTransit *trP)
bool Run(const char *xreqP, char *xdataP=0, int xdataL=0)
Inject an xrootd request into the protocol stack.
static const char * ReqTable()
Initialize the valid request table.
void Redrive()
Redrive a request after a wait.
int Send(int rcode, const struct iovec *ioVec, int ioNum, int ioLen)
Handle request data response.
void Recycle(XrdLink *lp, int consec, const char *reason)
Handle link shutdown.
static void Init(XrdScheduler *schedP, int qMax, int qTTL)
Perform one-time initialization.
static XrdXrootdTransit * Alloc(XrdXrootd::Bridge::Result *respP, XrdLink *linkP, XrdSecEntity *seceP, const char *nameP, const char *protP)
Get a new transit object.
static int Attn(XrdLink *lP, short *theSID, int rcode, const struct iovec *ioVec, int ioNum, int ioLen)
Handle attention response (i.e. async response)
XrdXrootdTransit()
Constructor & Destructor.
void Proceed()
Resume processing after a waitresp completion.
bool Disc()
Handle dismantlement.
int Process(XrdLink *lp)
Handle link activation (replaces parent activation).
union XrdXrootd::Bridge::Context::@167 sID
associated request stream ID
virtual int File(Bridge::Context &info, int dlen)=0
virtual bool Data(Bridge::Context &info, const struct iovec *iovP, int iovN, int iovL, bool final)=0
virtual bool Error(Bridge::Context &info, int ecode, const char *etext)=0
virtual bool Done(Bridge::Context &info)=0
the result context
virtual bool Redir(Bridge::Context &info, int port, const char *hname)=0
virtual bool Wait(Bridge::Context &info, int wtime, const char *wtext)
virtual void Free(Bridge::Context &info, char *buffP, int buffL)
XrdScheduler * schedP
static const int uIPv4
ucap: Supports read redirects
static const int uPrip