XRootD
XrdLinkXeq.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d L i n k X e q . c c */
4 /* */
5 /* (c) 2018 by the Board of Trustees of the Leland Stanford, Jr., University */
6 /* Produced by Andrew Hanushevsky for Stanford University under contract */
7 /* DE-AC02-76-SFO0515 with the Department of Energy */
8 /* */
9 /* This file is part of the XRootD software suite. */
10 /* */
11 /* XRootD is free software: you can redistribute it and/or modify it under */
12 /* the terms of the GNU Lesser General Public License as published by the */
13 /* Free Software Foundation, either version 3 of the License, or (at your */
14 /* option) any later version. */
15 /* */
16 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
17 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
18 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
19 /* License for more details. */
20 /* */
21 /* You should have received a copy of the GNU Lesser General Public License */
22 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
23 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
24 /* */
25 /* The copyright holder's institutional names and contributor's names may not */
26 /* be used to endorse or promote products derived from this software without */
27 /* specific prior written permission of the institution or contributor. */
28 /******************************************************************************/
29 
30 #include <limits.h>
31 #include <poll.h>
32 #include <signal.h>
33 #include <cstdio>
34 #include <cstring>
35 #include <unistd.h>
36 #include <sys/types.h>
37 #include <sys/uio.h>
38 
39 #if defined(__linux__) || defined(__GNU__)
40 #include <netinet/tcp.h>
41 #if !defined(TCP_CORK)
42 #undef HAVE_SENDFILE
43 #endif
44 #endif
45 
46 #ifdef HAVE_SENDFILE
47 
48 #if defined(__solaris__) || defined(__linux__) || defined(__GNU__)
49 #include <sys/sendfile.h>
50 #endif
51 
52 #endif
53 
54 #include "XrdSys/XrdSysAtomics.hh"
55 #include "XrdSys/XrdSysError.hh"
56 #include "XrdSys/XrdSysFD.hh"
57 #include "XrdSys/XrdSysPlatform.hh"
58 
59 #include "Xrd/XrdBuffer.hh"
60 #include "Xrd/XrdLink.hh"
61 #include "Xrd/XrdLinkCtl.hh"
62 #include "Xrd/XrdLinkXeq.hh"
63 #include "Xrd/XrdPoll.hh"
64 #include "Xrd/XrdScheduler.hh"
65 #include "Xrd/XrdSendQ.hh"
66 #include "Xrd/XrdTcpMonPin.hh"
67 
68 #define TRACE_IDENT ID
69 #include "Xrd/XrdTrace.hh"
70 
71 /******************************************************************************/
72 /* G l o b a l s */
73 /******************************************************************************/
74 
75 namespace XrdGlobal
76 {
77 extern XrdSysError Log;
78 extern XrdScheduler Sched;
79 extern XrdTlsContext *tlsCtx;
81 extern int devNull;
82  const int maxIOV = XrdSys::getIovMax();
83 };
84 
85 using namespace XrdGlobal;
86 
87 /******************************************************************************/
88 /* S t a t i c s */
89 /******************************************************************************/
90 
91  const char *XrdLinkXeq::TraceID = "LinkXeq";
92 
93  long long XrdLinkXeq::LinkBytesIn = 0;
94  long long XrdLinkXeq::LinkBytesOut = 0;
95  long long XrdLinkXeq::LinkConTime = 0;
96  long long XrdLinkXeq::LinkCountTot = 0;
97  int XrdLinkXeq::LinkCount = 0;
100  int XrdLinkXeq::LinkStalls = 0;
101  int XrdLinkXeq::LinkSfIntr = 0;
103 
104 /******************************************************************************/
105 /* C o n s t r u c t o r */
106 /******************************************************************************/
107 
108 XrdLinkXeq::XrdLinkXeq() : XrdLink(*this), PollInfo((XrdLink &)*this)
109 {
111 }
112 
114 {
115  memcpy(Uname+sizeof(Uname)-7, "anon.0@", 7);
116  strcpy(Lname, "somewhere");
117  ID = &Uname[sizeof(Uname)-5];
118  Comment = ID;
119  sendQ = 0;
120  stallCnt = stallCntTot = 0;
121  tardyCnt = tardyCntTot = 0;
122  SfIntr = 0;
123  isIdle = 0;
125  LockReads= false;
126  KeepFD = false;
127  Protocol = 0;
128  ProtoAlt = 0;
129  CloseRequestCb = 0;
130 
131  LinkInfo.Reset();
132  PollInfo.Zorch();
133  ResetLink();
134 }
135 
136 /******************************************************************************/
137 /* B a c k l o g */
138 /******************************************************************************/
139 
141 {
143 
144 // Return backlog information
145 //
146  return (sendQ ? sendQ->Backlog() : 0);
147 }
148 
149 /******************************************************************************/
150 /* C l i e n t */
151 /******************************************************************************/
152 
153 int XrdLinkXeq::Client(char *nbuf, int nbsz)
154 {
155  int ulen;
156 
157 // Generate full client name
158 //
159  if (nbsz <= 0) return 0;
160  ulen = (Lname - ID);
161  if ((ulen + HNlen) >= nbsz) ulen = 0;
162  else {strncpy(nbuf, ID, ulen);
163  strcpy(nbuf+ulen, HostName);
164  ulen += HNlen;
165  }
166  return ulen;
167 }
168 
169 /******************************************************************************/
170 /* C l o s e */
171 /******************************************************************************/
172 
173 int XrdLinkXeq::Close(bool defer)
175  int csec, fd, rc = 0;
176 
177 // If a defer close is requested, we can close the descriptor but we must
178 // keep the slot number to prevent a new client getting the same fd number.
179 // Linux is peculiar in that any in-progress operations will remain in that
180 // state even after the FD is closed unless there is some activity either on
181 // the connection or an event occurs that causes an operation restart. We
182 // portably solve this problem by issuing a shutdown() on the socket prior
183 // closing it. On most platforms, this informs readers that the connection is
184 // gone (though not on old (i.e. <= 2.3) versions of Linux, sigh). Also, if
185 // nonblocking mode is enabled, we need to do this in a separate thread as
186 // a shutdown may block for a pretty long time if lots\ of messages are queued.
187 // We will ask the SendQ object to schedule the shutdown for us before it
188 // commits suicide.
189 // Note that we can hold the opMutex while we also get the wrMutex.
190 //
191  if (defer)
192  {if (!sendQ) Shutdown(false);
193  else {TRACEI(DEBUG, "Shutdown FD " <<LinkInfo.FD<<" only via SendQ");
194  LinkInfo.InUse++;
195  LinkInfo.FD = -LinkInfo.FD; // Leave poll version untouched!
196  wrMutex.Lock();
197  sendQ->Terminate(this);
198  sendQ = 0;
199  wrMutex.UnLock();
200  }
201  return 0;
202  }
203 
204 // If we got here then this is not a deferred close so we just need to check
205 // if there is a sendq appendage we need to get rid of.
206 //
207  if (sendQ)
208  {wrMutex.Lock();
209  sendQ->Terminate();
210  sendQ = 0;
211  wrMutex.UnLock();
212  }
213 
214 // Multiple protocols may be bound to this link. If it is in use, defer the
215 // actual close until the use count drops to one.
216 //
217  while(LinkInfo.InUse > 1)
218  {opHelper.UnLock();
219  TRACEI(DEBUG, "Close FD "<<LinkInfo.FD <<" deferred, use count="
220  <<LinkInfo.InUse);
221  Serialize();
222  opHelper.Lock(&LinkInfo.opMutex);
223  }
224  LinkInfo.InUse--;
225  Instance = 0;
226 
227 // Add up the statistic for this link
228 //
229  syncStats(&csec);
230 
231 // Cleanup TLS if it is active
232 //
233  if (isTLS) tlsIO.Shutdown();
234 
235 // Clean this link up
236 //
237  if (Protocol) {Protocol->Recycle(this, csec, LinkInfo.Etext); Protocol = 0;}
238  if (ProtoAlt) {ProtoAlt->Recycle(this, csec, LinkInfo.Etext); ProtoAlt = 0;}
239  if (LinkInfo.Etext) {free(LinkInfo.Etext); LinkInfo.Etext = 0;}
240  LinkInfo.InUse = 0;
241 
242 // At this point we can have no lock conflicts, so if someone is waiting for
243 // us to terminate let them know about it. Note that we will get the condvar
244 // mutex while we hold the opMutex. This is the required order! We will also
245 // zero out the pointer to the condvar while holding the opmutex.
246 //
247  if (LinkInfo.KillcvP)
248  {LinkInfo.KillcvP->Lock();
251  LinkInfo.KillcvP = 0;
252  }
253 
254 // Remove ourselves from the poll table and then from the Link table. We may
255 // not hold on to the opMutex when we acquire the LTMutex. However, the link
256 // table needs to be cleaned up prior to actually closing the socket. So, we
257 // do some fancy footwork to prevent multiple closes of this link.
258 //
259  fd = abs(LinkInfo.FD);
260  if (PollInfo.FD > 0)
262  PollInfo.FD = -1;
263  opHelper.UnLock();
264  XrdLinkCtl::Unhook(fd);
265  } else opHelper.UnLock();
266 
267 // Invoke the TCP monitor if it was loaded.
268 //
269  if (TcpMonPin && fd > 2)
270  {XrdTcpMonPin::LinkInfo lnkInfo;
271  lnkInfo.tident = ID;
272  lnkInfo.fd = fd;
273  lnkInfo.consec = csec;
274  lnkInfo.bytesIn = BytesInTot;
275  lnkInfo.bytesOut = BytesOutTot;
276  TcpMonPin->Monitor(Addr, lnkInfo, sizeof(lnkInfo));
277  }
278 
279 // Close the file descriptor if it isn't being shared. Do it as the last
280 // thing because closes and accepts and not interlocked.
281 //
282  if (fd >= 2) {if (KeepFD) rc = 0;
283  else rc = (close(fd) < 0 ? errno : 0);
284  }
285  if (rc) Log.Emsg("Link", rc, "close", ID);
286  return rc;
287 }
288 
289 /******************************************************************************/
290 /* D o I t */
291 /******************************************************************************/
292 
294 {
295  int rc;
296 
297 // The Process() return code tells us what to do:
298 // < 0 -> Stop getting requests,
299 // -EINPROGRESS leave link disabled but otherwise all is well
300 // -n Error, disable and close the link
301 // = 0 -> OK, get next request, if allowed, o/w enable the link
302 // > 0 -> Slow link, stop getting requests and enable the link
303 //
304  if (Protocol)
305  do {rc = Protocol->Process(this);} while (!rc && Sched.canStick());
306  else {Log.Emsg("Link", "Dispatch on closed link", ID);
307  return;
308  }
309 
310 // Either re-enable the link and cycle back waiting for a new request, leave
311 // disabled, or terminate the connection.
312 //
313  bool doCl = false;
314  if (rc >= 0)
315  {if (PollInfo.Poller && !PollInfo.Poller->Enable(PollInfo)) doCl = true;}
316  else if (rc != -EINPROGRESS) doCl = true;
317 
318  if (doCl)
319  {if (CloseRequestCb)
320  {const bool res = CloseRequestCb(CloseRequestCbArg);
321  if (!res) return;
322  }
323  Close();
324  }
325 }
326 
327 /******************************************************************************/
328 /* g e t P e e r C e r t s */
329 /******************************************************************************/
330 
332 {
333  return (isTLS ? tlsIO.getCerts(true) : 0);
334 }
335 
336 /******************************************************************************/
337 /* P e e k */
338 /******************************************************************************/
339 
340 int XrdLinkXeq::Peek(char *Buff, int Blen, int timeout)
341 {
342  XrdSysMutexHelper theMutex;
343  struct pollfd polltab = {PollInfo.FD, POLLIN|POLLRDNORM, 0};
344  ssize_t mlen;
345  int retc;
346 
347 // Lock the read mutex if we need to, the helper will unlock it upon exit
348 //
349  if (LockReads) theMutex.Lock(&rdMutex);
350 
351 // Wait until we can actually read something
352 //
353  isIdle = 0;
354  do {retc = poll(&polltab, 1, timeout);} while(retc < 0 && errno == EINTR);
355  if (retc != 1)
356  {if (retc == 0) return 0;
357  return Log.Emsg("Link", -errno, "poll", ID);
358  }
359 
360 // Verify it is safe to read now
361 //
362  if (!(polltab.revents & (POLLIN|POLLRDNORM)))
363  {Log.Emsg("Link", XrdPoll::Poll2Text(polltab.revents), "polling", ID);
364  return -1;
365  }
366 
367 // Do the peek.
368 //
369  do {mlen = recv(LinkInfo.FD, Buff, Blen, MSG_PEEK);}
370  while(mlen < 0 && errno == EINTR);
371 
372 // Return the result
373 //
374  if (mlen >= 0) return int(mlen);
375  Log.Emsg("Link", errno, "peek on", ID);
376  return -1;
377 }
378 
379 /******************************************************************************/
380 /* R e c v */
381 /******************************************************************************/
382 
383 int XrdLinkXeq::Recv(char *Buff, int Blen)
384 {
385  ssize_t rlen;
386 
387 // Note that we will read only as much as is queued. Use Recv() with a
388 // timeout to receive as much data as possible.
389 //
390  if (LockReads) rdMutex.Lock();
391  isIdle = 0;
392  do {rlen = read(LinkInfo.FD, Buff, Blen);} while(rlen < 0 && errno == EINTR);
393  if (rlen > 0) AtomicAdd(BytesIn, rlen);
394  if (LockReads) rdMutex.UnLock();
395 
396  if (rlen >= 0) return int(rlen);
397  if (LinkInfo.FD >= 0) Log.Emsg("Link", errno, "receive from", ID);
398  return -1;
399 }
400 
401 /******************************************************************************/
402 
403 int XrdLinkXeq::Recv(char *Buff, int Blen, int timeout)
404 {
405  XrdSysMutexHelper theMutex;
406  struct pollfd polltab = {PollInfo.FD, POLLIN|POLLRDNORM, 0};
407  ssize_t rlen, totlen = 0;
408  int retc;
409 
410 // Lock the read mutex if we need to, the helper will unlock it upon exit
411 //
412  if (LockReads) theMutex.Lock(&rdMutex);
413 
414 // Wait up to timeout milliseconds for data to arrive
415 //
416  isIdle = 0;
417  while(Blen > 0)
418  {do {retc = poll(&polltab,1,timeout);} while(retc < 0 && errno == EINTR);
419  if (retc != 1)
420  {if (retc == 0)
421  {tardyCnt++;
422  if (totlen)
423  {if ((++stallCnt & 0xff) == 1) TRACEI(DEBUG,"read timed out");
424  AtomicAdd(BytesIn, totlen);
425  }
426  return int(totlen);
427  }
428  return (LinkInfo.FD >= 0 ? Log.Emsg("Link",-errno,"poll",ID) : -1);
429  }
430 
431  // Verify it is safe to read now
432  //
433  if (!(polltab.revents & (POLLIN|POLLRDNORM)))
434  {Log.Emsg("Link", XrdPoll::Poll2Text(polltab.revents),
435  "polling", ID);
436  return -1;
437  }
438 
439  // Read as much data as you can. Note that we will force an error
440  // if we get a zero-length read after poll said it was OK.
441  //
442  do {rlen = recv(LinkInfo.FD, Buff, Blen, 0);}
443  while(rlen < 0 && errno == EINTR);
444  if (rlen <= 0)
445  {if (!rlen) return -ENOMSG;
446  if (LinkInfo.FD > 0) Log.Emsg("Link", -errno, "receive from", ID);
447  return -1;
448  }
449  totlen += rlen; Blen -= rlen; Buff += rlen;
450  }
451 
452  AtomicAdd(BytesIn, totlen);
453  return int(totlen);
454 }
455 
456 /******************************************************************************/
457 
458 int XrdLinkXeq::Recv(const struct iovec *iov, int iocnt, int timeout)
459 {
460  XrdSysMutexHelper theMutex;
461  struct pollfd polltab = {PollInfo.FD, POLLIN|POLLRDNORM, 0};
462  int retc, rlen;
463 
464 // Lock the read mutex if we need to, the helper will unlock it upon exit
465 //
466  if (LockReads) theMutex.Lock(&rdMutex);
467 
468 // Wait up to timeout milliseconds for data to arrive
469 //
470  isIdle = 0;
471  do {retc = poll(&polltab,1,timeout);} while(retc < 0 && errno == EINTR);
472  if (retc != 1)
473  {if (retc == 0)
474  {tardyCnt++;
475  return 0;
476  }
477  return (LinkInfo.FD >= 0 ? Log.Emsg("Link",-errno,"poll",ID) : -1);
478  }
479 
480 // Verify it is safe to read now
481 //
482  if (!(polltab.revents & (POLLIN|POLLRDNORM)))
483  {Log.Emsg("Link", XrdPoll::Poll2Text(polltab.revents), "polling", ID);
484  return -1;
485  }
486 
487 // If the iocnt is within limits then just go ahead and read once.
488 //
489  if (iocnt <= maxIOV)
490  {rlen = RecvIOV(iov, iocnt);
491  if (rlen > 0) {AtomicAdd(BytesIn, rlen);}
492  return rlen;
493  }
494 
495 // We will have to break this up into allowable segments and we need to add up
496 // the bytes in each segment so that we know when to stop reading.
497 //
498  int seglen, segcnt = maxIOV, totlen = 0;
499  do {seglen = 0;
500  for (int i = 0; i < segcnt; i++) seglen += iov[i].iov_len;
501  if ((rlen = RecvIOV(iov, segcnt)) < 0) return rlen;
502  totlen += rlen;
503  if (rlen < seglen) break;
504  iov += segcnt;
505  iocnt -= segcnt;
506  if (iocnt <= maxIOV) segcnt = iocnt;
507  } while(iocnt > 0);
508 
509 // All done
510 //
511  AtomicAdd(BytesIn, totlen);
512  return totlen;
513 }
514 
515 /******************************************************************************/
516 /* R e c v A l l */
517 /******************************************************************************/
518 
519 int XrdLinkXeq::RecvAll(char *Buff, int Blen, int timeout)
520 {
521  struct pollfd polltab = {PollInfo.FD, POLLIN|POLLRDNORM, 0};
522  ssize_t rlen;
523  int retc;
524 
525 // Check if timeout specified. Notice that the timeout is the max we will
526 // for some data. We will wait forever for all the data. Yeah, it's weird.
527 //
528  if (timeout >= 0)
529  {do {retc = poll(&polltab,1,timeout);} while(retc < 0 && errno == EINTR);
530  if (retc != 1)
531  {if (!retc) return -ETIMEDOUT;
532  Log.Emsg("Link",errno,"poll",ID);
533  return -1;
534  }
535  if (!(polltab.revents & (POLLIN|POLLRDNORM)))
536  {Log.Emsg("Link",XrdPoll::Poll2Text(polltab.revents),"polling",ID);
537  return -1;
538  }
539  }
540 
541 // Note that we will block until we receive all he bytes.
542 //
543  if (LockReads) rdMutex.Lock();
544  isIdle = 0;
545  do {rlen = recv(LinkInfo.FD, Buff, Blen, MSG_WAITALL);}
546  while(rlen < 0 && errno == EINTR);
547  if (rlen > 0) AtomicAdd(BytesIn, rlen);
548  if (LockReads) rdMutex.UnLock();
549 
550  if (int(rlen) == Blen) return Blen;
551  if (!rlen) {TRACEI(DEBUG, "No RecvAll() data; errno=" <<errno);}
552  else if (rlen > 0) Log.Emsg("RecvAll", "Premature end from", ID);
553  else if (LinkInfo.FD >= 0) Log.Emsg("Link", errno, "receive from", ID);
554  return -1;
555 }
556 
557 /******************************************************************************/
558 /* Protected: R e c v I O V */
559 /******************************************************************************/
560 
561 int XrdLinkXeq::RecvIOV(const struct iovec *iov, int iocnt)
562 {
563  ssize_t retc = 0;
564 
565 // Read the data in. On some version of Unix (e.g., Linux) a readv() may
566 // end at any time without reading all the bytes when directed to a socket.
567 // We always return the number bytes read (or an error). The caller needs to
568 // restart the read at the appropriate place in the iovec when more data arrives.
569 //
570  do {retc = readv(LinkInfo.FD, iov, iocnt);}
571  while(retc < 0 && errno == EINTR);
572 
573 // Check how we completed
574 //
575  if (retc < 0) Log.Emsg("Link", errno, "receive from", ID);
576  return retc;
577 }
578 
579 /******************************************************************************/
580 /* R e g i s t e r */
581 /******************************************************************************/
582 
583 bool XrdLinkXeq::Register(const char *hName)
584 {
585 
586 // First see if we can register this name with the address object
587 //
588  if (!Addr.Register(hName)) return false;
589 
590 // Make appropriate changes here
591 //
592  if (HostName) free(HostName);
593  HostName = strdup(hName);
594  strlcpy(Lname, hName, sizeof(Lname));
595  return true;
596 }
597 
598 /******************************************************************************/
599 /* S e n d */
600 /******************************************************************************/
601 
602 int XrdLinkXeq::Send(const char *Buff, int Blen)
603 {
604  ssize_t retc = 0, bytesleft = Blen;
605 
606 // Get a lock
607 //
608  wrMutex.Lock();
609  isIdle = 0;
610  AtomicAdd(BytesOut, Blen);
611 
612 // Do non-blocking writes if we are setup to do so.
613 //
614  if (sendQ)
615  {retc = sendQ->Send(Buff, Blen);
616  wrMutex.UnLock();
617  return retc;
618  }
619 
620 // Write the data out
621 //
622  while(bytesleft)
623  {if ((retc = write(LinkInfo.FD, Buff, bytesleft)) < 0)
624  {if (errno == EINTR) continue;
625  else break;
626  }
627  bytesleft -= retc; Buff += retc;
628  }
629 
630 // All done
631 //
632  wrMutex.UnLock();
633  if (retc >= 0) return Blen;
634  Log.Emsg("Link", errno, "send to", ID);
635  return -1;
636 }
637 
638 /******************************************************************************/
639 
640 int XrdLinkXeq::Send(const struct iovec *iov, int iocnt, int bytes)
641 {
642  int retc;
643 
644 // Get a lock and assume we will be successful (statistically we are)
645 //
646  wrMutex.Lock();
647  isIdle = 0;
648  AtomicAdd(BytesOut, bytes);
649 
650 // Do non-blocking writes if we are setup to do so.
651 //
652  if (sendQ)
653  {retc = sendQ->Send(iov, iocnt, bytes);
654  wrMutex.UnLock();
655  return retc;
656  }
657 
658 // If the iocnt is within limits then just go ahead and write this out
659 //
660  if (iocnt <= maxIOV)
661  {retc = SendIOV(iov, iocnt, bytes);
662  wrMutex.UnLock();
663  return retc;
664  }
665 
666 // We will have to break this up into allowable segments
667 //
668  int seglen, segcnt = maxIOV, iolen = 0;
669  do {seglen = 0;
670  for (int i = 0; i < segcnt; i++) seglen += iov[i].iov_len;
671  if ((retc = SendIOV(iov, segcnt, seglen)) < 0)
672  {wrMutex.UnLock();
673  return retc;
674  }
675  iolen += retc;
676  iov += segcnt;
677  iocnt -= segcnt;
678  if (iocnt <= maxIOV) segcnt = iocnt;
679  } while(iocnt > 0);
680 
681 // All done
682 //
683  wrMutex.UnLock();
684  return iolen;
685 }
686 
687 /******************************************************************************/
688 
689 int XrdLinkXeq::Send(const sfVec *sfP, int sfN)
690 {
691 #if !defined(HAVE_SENDFILE)
692 
693  return -1;
694 
695 #elif defined(__solaris__)
696 
697  sendfilevec_t vecSF[XrdOucSFVec::sfMax], *vecSFP = vecSF;
698  size_t xframt, totamt, bytes = 0;
699  ssize_t retc;
700  int i = 0;
701 
702 // Construct the sendfilev() vector
703 //
704  for (i = 0; i < sfN; sfP++, i++)
705  {if (sfP->fdnum < 0)
706  {vecSF[i].sfv_fd = SFV_FD_SELF;
707  vecSF[i].sfv_off = (off_t)sfP->buffer;
708  } else {
709  vecSF[i].sfv_fd = sfP->fdnum;
710  vecSF[i].sfv_off = sfP->offset;
711  }
712  vecSF[i].sfv_flag = 0;
713  vecSF[i].sfv_len = sfP->sendsz;
714  bytes += sfP->sendsz;
715  }
716  totamt = bytes;
717 
718 // Lock the link, issue sendfilev(), and unlock the link. The documentation
719 // is very spotty and inconsistent. We can only retry this operation under
720 // very limited conditions.
721 //
722  wrMutex.Lock();
723  isIdle = 0;
724 do{retc = sendfilev(LinkInfo.FD, vecSFP, sfN, &xframt);
725 
726 // Check if all went well and return if so (usual case)
727 //
728  if (xframt == bytes)
729  {AtomicAdd(BytesOut, bytes);
730  wrMutex.UnLock();
731  return totamt;
732  }
733 
734 // The only one we will recover from is EINTR. We cannot legally get EAGAIN.
735 //
736  if (retc < 0 && errno != EINTR) break;
737 
738 // Try to resume the transfer
739 //
740  if (xframt > 0)
741  {AtomicAdd(BytesOut, xframt); bytes -= xframt; SfIntr++;
742  while(xframt > 0 && sfN)
743  {if ((ssize_t)xframt < (ssize_t)vecSFP->sfv_len)
744  {vecSFP->sfv_off += xframt; vecSFP->sfv_len -= xframt; break;}
745  xframt -= vecSFP->sfv_len; vecSFP++; sfN--;
746  }
747  }
748  } while(sfN > 0);
749 
750 // See if we can recover without destroying the connection
751 //
752  retc = (retc < 0 ? errno : ECANCELED);
753  wrMutex.UnLock();
754  Log.Emsg("Link", retc, "send file to", ID);
755  return -1;
756 
757 #elif defined(__linux__) || defined(__GNU__)
758 
759  static const int setON = 1, setOFF = 0;
760  ssize_t retc = 0, bytesleft;
761  off_t myOffset;
762  int i, xfrbytes = 0, uncork = 1, xIntr = 0;
763 
764 // lock the link
765 //
766  wrMutex.Lock();
767  isIdle = 0;
768 
769 // In linux we need to cork the socket. On permanent errors we do not uncork
770 // the socket because it will be closed in short order.
771 //
772  if (setsockopt(PollInfo.FD, SOL_TCP, TCP_CORK, &setON, sizeof(setON)) < 0)
773  {Log.Emsg("Link", errno, "cork socket for", ID);
774  uncork = 0; sfOK = 0;
775  }
776 
777 // Send the header first
778 //
779  for (i = 0; i < sfN; sfP++, i++)
780  {if (sfP->fdnum < 0) retc = sendData(sfP->buffer, sfP->sendsz);
781  else {myOffset = sfP->offset; bytesleft = sfP->sendsz;
782  while(bytesleft
783  && (retc=sendfile(LinkInfo.FD,sfP->fdnum,&myOffset,bytesleft)) > 0)
784  {bytesleft -= retc; xIntr++;}
785  }
786  if (retc < 0 && errno == EINTR) continue;
787  if (retc <= 0) break;
788  xfrbytes += sfP->sendsz;
789  }
790 
791 // Diagnose any sendfile errors
792 //
793  if (retc <= 0)
794  {if (retc == 0) errno = ECANCELED;
795  wrMutex.UnLock();
796  Log.Emsg("Link", errno, "send file to", ID);
797  return -1;
798  }
799 
800 // Now uncork the socket
801 //
802  if (uncork
803  && setsockopt(PollInfo.FD, SOL_TCP, TCP_CORK, &setOFF, sizeof(setOFF)) < 0)
804  Log.Emsg("Link", errno, "uncork socket for", ID);
805 
806 // All done
807 //
808  if (xIntr > sfN) SfIntr += (xIntr - sfN);
809  AtomicAdd(BytesOut, xfrbytes);
810  wrMutex.UnLock();
811  return xfrbytes;
812 
813 #else
814 
815  return -1;
816 
817 #endif
818 }
819 
820 /******************************************************************************/
821 /* Protected: s e n d D a t a */
822 /******************************************************************************/
823 
824 int XrdLinkXeq::sendData(const char *Buff, int Blen)
825 {
826  ssize_t retc = 0, bytesleft = Blen;
827 
828 // Write the data out
829 //
830  while(bytesleft)
831  {if ((retc = write(LinkInfo.FD, Buff, bytesleft)) < 0)
832  {if (errno == EINTR) continue;
833  else break;
834  }
835  bytesleft -= retc; Buff += retc;
836  }
837 
838 // All done
839 //
840  return retc;
841 }
842 
843 /******************************************************************************/
844 /* Protected: S e n d I O V */
845 /******************************************************************************/
846 
847 int XrdLinkXeq::SendIOV(const struct iovec *iov, int iocnt, int bytes)
848 {
849  ssize_t bytesleft, n, retc = 0;
850  const char *Buff;
851 
852 // Write the data out. On some version of Unix (e.g., Linux) a writev() may
853 // end at any time without writing all the bytes when directed to a socket.
854 // So, we attempt to resume the writev() using a combination of write() and
855 // a writev() continuation. This approach slowly converts a writev() to a
856 // series of writes if need be. We must do this inline because we must hold
857 // the lock until all the bytes are written or an error occurs.
858 //
859  bytesleft = static_cast<ssize_t>(bytes);
860  while(bytesleft)
861  {do {retc = writev(LinkInfo.FD, iov, iocnt);}
862  while(retc < 0 && errno == EINTR);
863  if (retc >= bytesleft || retc < 0) break;
864  bytesleft -= retc;
865  while(retc >= (n = static_cast<ssize_t>(iov->iov_len)))
866  {retc -= n; iov++; iocnt--;}
867  Buff = (const char *)iov->iov_base + retc; n -= retc; iov++; iocnt--;
868  while(n) {if ((retc = write(LinkInfo.FD, Buff, n)) < 0)
869  {if (errno == EINTR) continue;
870  else break;
871  }
872  n -= retc; Buff += retc; bytesleft -= retc;
873  }
874  if (retc < 0 || iocnt < 1) break;
875  }
876 
877 // All done
878 //
879  if (retc >= 0) return bytes;
880  Log.Emsg("Link", errno, "send to", ID);
881  return -1;
882 }
883 
884 /******************************************************************************/
885 /* s e t I D */
886 /******************************************************************************/
887 
888 void XrdLinkXeq::setID(const char *userid, int procid)
889 {
890  char buff[sizeof(Uname)], *bp, *sp;
891  int ulen;
892 
893  snprintf(buff, sizeof(buff), "%s.%d:%d", userid, procid, PollInfo.FD);
894  ulen = strlen(buff);
895  sp = buff + ulen - 1;
896  bp = &Uname[sizeof(Uname)-1];
897  if (ulen > (int)sizeof(Uname)) ulen = sizeof(Uname);
898  *bp = '@'; bp--;
899  while(ulen--) {*bp = *sp; bp--; sp--;}
900  ID = bp+1;
901  Comment = (const char *)ID;
902 
903 // Update the ID in the TLS socket if enabled
904 //
905  if (isTLS) tlsIO.SetTraceID(ID);
906 }
907 
908 /******************************************************************************/
909 /* s e t N B */
910 /******************************************************************************/
911 
913 {
914 // We don't support non-blocking output except for Linux at the moment
915 //
916 #if !defined(__linux__)
917  return false;
918 #else
919 // Trace this request
920 //
921  TRACEI(DEBUG,"enabling non-blocking output");
922 
923 // If we don't already have a sendQ object get one. This is a one-time call
924 // so to optimize checking if this object exists we also get the opMutex.'
925 //
927  if (!sendQ)
928  {wrMutex.Lock();
929  sendQ = new XrdSendQ(*this, wrMutex);
930  wrMutex.UnLock();
931  }
933  return true;
934 #endif
935 }
936 
937 /******************************************************************************/
938 /* s e t P r o t o c o l */
939 /******************************************************************************/
940 
942 {
943 
944 // Set new protocol.
945 //
947  XrdProtocol *op = Protocol;
948  if (push) ProtoAlt = Protocol;
949  Protocol = pp;
951  return op;
952 }
953 
954 /******************************************************************************/
955 /* s e t P r o t N a m e */
956 /******************************************************************************/
957 
958 void XrdLinkXeq::setProtName(const char *name)
959 {
960 
961 // Set the protocol name.
962 //
964  Addr.SetDialect(name);
966 }
967 
968 /******************************************************************************/
969 /* s e t T L S */
970 /******************************************************************************/
971 
972 bool XrdLinkXeq::setTLS(bool enable, XrdTlsContext *ctx)
973 { //???
974 // static const XrdTlsConnection::RW_Mode rwMode=XrdTlsConnection::TLS_RNB_WBL;
977  const char *eNote;
978  XrdTls::RC rc;
979 
980 // If we are already in a compatible mode, we are done
981 //
982 
983  if (isTLS == enable) return true;
984 
985 // If this is a shutdown, then do it now.
986 //
987  if (!enable)
988  {tlsIO.Shutdown();
989  isTLS = enable;
990  Addr.SetTLS(enable);
991  return true;
992  }
993 // We want to initialize TLS, do so now.
994 //
995  if (!ctx) ctx = tlsCtx;
996  eNote = tlsIO.Init(*ctx, PollInfo.FD, rwMode, hsMode, false, false, ID);
997 
998 // Check for errors
999 //
1000  if (eNote)
1001  {char buff[1024];
1002  snprintf(buff, sizeof(buff), "Unable to enable tls for %s;", ID);
1003  Log.Emsg("LinkXeq", buff, eNote);
1004  return false;
1005  }
1006 
1007 // Now we need to accept this TLS connection
1008 //
1009  std::string eMsg;
1010  rc = tlsIO.Accept(&eMsg);
1011 
1012 // Diagnose return state
1013 //
1014  if (rc != XrdTls::TLS_AOK) Log.Emsg("LinkXeq", eMsg.c_str());
1015  else {isTLS = enable;
1016  Addr.SetTLS(enable);
1017  Log.Emsg("LinkXeq", ID, "connection upgraded to", verTLS());
1018  }
1019  return rc == XrdTls::TLS_AOK;
1020 }
1021 
1022 /******************************************************************************/
1023 /* S F E r r o r */
1024 /******************************************************************************/
1025 
1027 {
1028  Log.Emsg("TLS", rc, "send file to", ID);
1029  return -1;
1030 }
1031 
1032 /******************************************************************************/
1033 /* S h u t d o w n */
1034 /******************************************************************************/
1035 
1036 void XrdLinkXeq::Shutdown(bool getLock)
1037 {
1038  int temp;
1039 
1040 // Trace the entry
1041 //
1042  TRACEI(DEBUG, (getLock ? "Async" : "Sync") <<" link shutdown in progress");
1043 
1044 // Get the lock if we need too (external entry via another thread)
1045 //
1046  if (getLock) LinkInfo.opMutex.Lock();
1047 
1048 // If there is something to do, do it now
1049 //
1050  temp = Instance; Instance = 0;
1051  if (!KeepFD)
1052  {shutdown(PollInfo.FD, SHUT_RDWR);
1053  if (dup2(devNull, PollInfo.FD) < 0)
1054  {Instance = temp;
1055  Log.Emsg("Link", errno, "shutdown FD for", ID);
1056  }
1057  }
1058 
1059 // All done
1060 //
1061  if (getLock) LinkInfo.opMutex.UnLock();
1062 }
1063 
1064 /******************************************************************************/
1065 /* S t a t s */
1066 /******************************************************************************/
1067 
1068 int XrdLinkXeq::Stats(char *buff, int blen, bool do_sync)
1069 {
1070  static const char statfmt[] = "<stats id=\"link\"><num>%d</num>"
1071  "<maxn>%d</maxn><tot>%lld</tot><in>%lld</in><out>%lld</out>"
1072  "<ctime>%lld</ctime><tmo>%d</tmo><stall>%d</stall>"
1073  "<sfps>%d</sfps></stats>";
1074  int i;
1075 
1076 // Check if actual length wanted
1077 //
1078  if (!buff) return sizeof(statfmt)+17*6;
1079 
1080 // We must synchronize the statistical counters
1081 //
1082  if (do_sync) XrdLinkCtl::SyncAll();
1083 
1084 // Obtain lock on the stats area and format it
1085 //
1087  i = snprintf(buff, blen, statfmt, AtomicGet(LinkCount),
1097  return i;
1098 }
1099 
1100 /******************************************************************************/
1101 /* s y n c S t a t s */
1102 /******************************************************************************/
1103 
1104 void XrdLinkXeq::syncStats(int *ctime)
1105 {
1106  long long tmpLL;
1107  int tmpI4;
1108 
1109 // If this is dynamic, get the opMutex lock
1110 //
1111  if (!ctime) LinkInfo.opMutex.Lock();
1112 
1113 // Either the caller has the opMutex or this is called out of close. In either
1114 // case, we need to get the read and write mutexes; each followed by the stats
1115 // mutex. This order is important because we should not hold the stats mutex
1116 // for very long and the r/w mutexes may take a long time to acquire. If we
1117 // must maintain the link count we need to actually acquire the stats mutex as
1118 // we will be doing compound operations. Atomics are still used to keep other
1119 // threads from seeing partial results.
1120 //
1121  AtomicBeg(rdMutex);
1122 
1123  if (ctime)
1124  {*ctime = time(0) - LinkInfo.conTime;
1125  AtomicAdd(LinkConTime, *ctime);
1126  statsMutex.Lock();
1127  if (LinkCount > 0) AtomicDec(LinkCount);
1128  statsMutex.UnLock();
1129  }
1130 
1132 
1133  tmpLL = AtomicFAZ(BytesIn);
1134  AtomicAdd(LinkBytesIn, tmpLL); AtomicAdd(BytesInTot, tmpLL);
1135  tmpI4 = AtomicFAZ(tardyCnt);
1136  AtomicAdd(LinkTimeOuts, tmpI4); AtomicAdd(tardyCntTot, tmpI4);
1137  tmpI4 = AtomicFAZ(stallCnt);
1138  AtomicAdd(LinkStalls, tmpI4); AtomicAdd(stallCntTot, tmpI4);
1140 
1142  tmpLL = AtomicFAZ(BytesOut);
1143  AtomicAdd(LinkBytesOut, tmpLL); AtomicAdd(BytesOutTot, tmpLL);
1144  tmpI4 = AtomicFAZ(SfIntr);
1145  AtomicAdd(LinkSfIntr, tmpI4);
1147 
1148 // Make sure the protocol updates it's statistics as well
1149 //
1150  if (Protocol) Protocol->Stats(0, 0, 1);
1151 
1152 // All done
1153 //
1154  if (!ctime) LinkInfo.opMutex.UnLock();
1155 }
1156 
1157 /******************************************************************************/
1158 /* Protected: T L S _ E r r o r */
1159 /******************************************************************************/
1160 
1161 int XrdLinkXeq::TLS_Error(const char *act, XrdTls::RC rc)
1162 {
1163  std::string reason = XrdTls::RC2Text(rc);
1164  char msg[512];
1165 
1166  snprintf(msg, sizeof(msg), "Unable to %s %s;", act, ID);
1167  Log.Emsg("TLS", msg, reason.c_str());
1168  return -1;
1169 }
1170 
1171 /******************************************************************************/
1172 /* T L S _ P e e k */
1173 /******************************************************************************/
1174 
1175 int XrdLinkXeq::TLS_Peek(char *Buff, int Blen, int timeout)
1176 {
1177  XrdSysMutexHelper theMutex;
1178  XrdTls::RC retc;
1179  int rc, rlen;
1180 
1181 // Lock the read mutex if we need to, the helper will unlock it upon exit
1182 //
1183  if (LockReads) theMutex.Lock(&rdMutex);
1184 
1185 // Wait until we can actually read something
1186 //
1187  isIdle = 0;
1188  if (timeout)
1189  {rc = Wait4Data(timeout);
1190  if (rc < 1) return rc;
1191  }
1192 
1193 // Do the peek and if sucessful, the number of bytes available.
1194 //
1195  retc = tlsIO.Peek(Buff, Blen, rlen);
1196  if (retc == XrdTls::TLS_AOK) return rlen;
1197 
1198 // Dianose the TLS error and return failure
1199 //
1200  return TLS_Error("peek on", retc);
1201 }
1202 
1203 /******************************************************************************/
1204 /* T L S _ R e c v */
1205 /******************************************************************************/
1206 
1207 int XrdLinkXeq::TLS_Recv(char *Buff, int Blen)
1208 {
1209  XrdSysMutexHelper theMutex;
1210  XrdTls::RC retc;
1211  int rlen;
1212 
1213 // Lock the read mutex if we need to, the helper will unlock it upon exit
1214 //
1215  if (LockReads) theMutex.Lock(&rdMutex);
1216 
1217 // Note that we will read only as much as is queued. Use Recv() with a
1218 // timeout to receive as much data as possible.
1219 //
1220  isIdle = 0;
1221  retc = tlsIO.Read(Buff, Blen, rlen);
1222  if (retc != XrdTls::TLS_AOK) return TLS_Error("receive from", retc);
1223  if (rlen > 0) AtomicAdd(BytesIn, rlen);
1224  return rlen;
1225 }
1226 
1227 /******************************************************************************/
1228 
1229 int XrdLinkXeq::TLS_Recv(char *Buff, int Blen, int timeout, bool havelock)
1230 {
1231  XrdSysMutexHelper theMutex;
1232  XrdTls::RC retc;
1233  int pend, rlen, totlen = 0;
1234 
1235 // Lock the read mutex if we need to, the helper will unlock it upon exit
1236 //
1237  if (LockReads && !havelock) theMutex.Lock(&rdMutex);
1238 
1239 // Wait up to timeout milliseconds for data to arrive
1240 //
1241  isIdle = 0;
1242  while(Blen > 0)
1243  {pend = tlsIO.Pending(true);
1244  if (!pend) pend = Wait4Data(timeout);
1245  if (pend < 1)
1246  {if (pend < 0) return -1;
1247  tardyCnt++;
1248  if (totlen)
1249  {if ((++stallCnt & 0xff) == 1) TRACEI(DEBUG,"read timed out");
1250  AtomicAdd(BytesIn, totlen);
1251  }
1252  return totlen;
1253  }
1254 
1255  // Read as much data as you can. Note that we will force an error
1256  // if we get a zero-length read after poll said it was OK. However,
1257  // if we never read anything, then we simply return -ENOMSG to avoid
1258  // generating a "read link error" as clearly there was a hangup.
1259  //
1260  retc = tlsIO.Read(Buff, Blen, rlen);
1261  if (retc != XrdTls::TLS_AOK)
1262  {if (!totlen) return -ENOMSG;
1263  AtomicAdd(BytesIn, totlen);
1264  return TLS_Error("receive from", retc);
1265  }
1266  if (rlen <= 0) break;
1267  totlen += rlen; Blen -= rlen; Buff += rlen;
1268  }
1269 
1270  AtomicAdd(BytesIn, totlen);
1271  return totlen;
1272 }
1273 
1274 /******************************************************************************/
1275 
1276 int XrdLinkXeq::TLS_Recv(const struct iovec *iov, int iocnt, int timeout)
1277 {
1278  XrdSysMutexHelper theMutex;
1279  char *Buff;
1280  int Blen, rlen, totlen = 0;
1281 
1282 // Lock the read mutex if we need to, the helper will unlock it upon exit
1283 //
1284  if (LockReads) theMutex.Lock(&rdMutex);
1285 
1286 // Individually process each element until we can't read any more
1287 //
1288  isIdle = 0;
1289  for (int i = 0; i < iocnt; i++)
1290  {Buff = (char *)iov[i].iov_base;
1291  Blen = iov[i].iov_len;
1292  rlen = TLS_Recv(Buff, Blen, timeout, true);
1293  if (rlen <= 0) break;
1294  totlen += rlen;
1295  if (rlen < Blen) break;
1296  }
1297 
1298  if (totlen) {AtomicAdd(BytesIn, totlen);}
1299  return totlen;
1300 }
1301 
1302 /******************************************************************************/
1303 /* T L S _ R e c v A l l */
1304 /******************************************************************************/
1305 
1306 int XrdLinkXeq::TLS_RecvAll(char *Buff, int Blen, int timeout)
1307 {
1308  int retc;
1309 
1310 // Check if timeout specified. Notice that the timeout is the max we will
1311 // wait for some data. We will wait forever for all the data. Yeah, it's weird.
1312 //
1313  if (timeout >= 0)
1314  {retc = tlsIO.Pending(true);
1315  if (!retc) retc = Wait4Data(timeout);
1316  if (retc < 1) return (retc ? -1 : -ETIMEDOUT);
1317  }
1318 
1319 // Note that we will block until we receive all the bytes.
1320 //
1321  return TLS_Recv(Buff, Blen, -1);
1322 }
1323 
1324 /******************************************************************************/
1325 /* T L S _ S e n d */
1326 /******************************************************************************/
1327 
1328 int XrdLinkXeq::TLS_Send(const char *Buff, int Blen)
1329 {
1331  ssize_t bytesleft = Blen;
1332  XrdTls::RC retc;
1333  int byteswritten;
1334 
1335 // Prepare to send
1336 //
1337  isIdle = 0;
1338  AtomicAdd(BytesOut, Blen);
1339 
1340 // Do non-blocking writes if we are setup to do so.
1341 //
1342  if (sendQ) return sendQ->Send(Buff, Blen);
1343 
1344 // Write the data out
1345 //
1346  while(bytesleft)
1347  {retc = tlsIO.Write(Buff, bytesleft, byteswritten);
1348  if (retc != XrdTls::TLS_AOK) return TLS_Error("send to", retc);
1349  bytesleft -= byteswritten; Buff += byteswritten;
1350  }
1351 
1352 // All done
1353 //
1354  return Blen;
1355 }
1356 
1357 /******************************************************************************/
1358 
1359 int XrdLinkXeq::TLS_Send(const struct iovec *iov, int iocnt, int bytes)
1360 {
1362  XrdTls::RC retc;
1363  int byteswritten;
1364 
1365 // Get a lock and assume we will be successful (statistically we are). Note
1366 // that the calling interface gauranteed bytes are not zero.
1367 //
1368  isIdle = 0;
1369  AtomicAdd(BytesOut, bytes);
1370 
1371 // Do non-blocking writes if we are setup to do so.
1372 //
1373  if (sendQ) return sendQ->Send(iov, iocnt, bytes);
1374 
1375 // Write the data out.
1376 //
1377  for (int i = 0; i < iocnt; i++)
1378  {ssize_t bytesleft = iov[i].iov_len;
1379  char *Buff = (char *)iov[i].iov_base;
1380  while(bytesleft)
1381  {retc = tlsIO.Write(Buff, bytesleft, byteswritten);
1382  if (retc != XrdTls::TLS_AOK) return TLS_Error("send to", retc);
1383  bytesleft -= byteswritten; Buff += byteswritten;
1384  }
1385  }
1386 
1387 // All done
1388 //
1389  return bytes;
1390 }
1391 
1392 /******************************************************************************/
1393 
1394 int XrdLinkXeq::TLS_Send(const sfVec *sfP, int sfN)
1395 {
1397  int bytes, buffsz, fileFD, retc;
1398  off_t offset;
1399  ssize_t totamt = 0;
1400  char myBuff[65536];
1401 
1402 // Convert the sendfile to a regular send. The conversion is not particularly
1403 // fast and caller are advised to avoid using sendfile on TLS connections.
1404 //
1405  isIdle = 0;
1406  for (int i = 0; i < sfN; sfP++, i++)
1407  {if (!(bytes = sfP->sendsz)) continue;
1408  totamt += bytes;
1409  if (sfP->fdnum < 0)
1410  {if (!TLS_Write(sfP->buffer, bytes)) return -1;
1411  continue;
1412  }
1413  offset = sfP->offset;
1414  fileFD = sfP->fdnum;
1415  buffsz = (bytes < (int)sizeof(myBuff) ? bytes : sizeof(myBuff));
1416  do {do {retc = pread(fileFD, myBuff, buffsz, offset);}
1417  while(retc < 0 && errno == EINTR);
1418  if (retc < 0) return SFError(errno);
1419  if (!retc) break;
1420  if (!TLS_Write(myBuff, buffsz)) return -1;
1421  offset += buffsz; bytes -= buffsz; totamt += retc;
1422  } while(bytes > 0);
1423  }
1424 
1425 // We are done
1426 //
1427  AtomicAdd(BytesOut, totamt);
1428  return totamt;
1429 }
1430 
1431 /******************************************************************************/
1432 /* Protected: T L S _ W r i t e */
1433 /******************************************************************************/
1434 
1435 bool XrdLinkXeq::TLS_Write(const char *Buff, int Blen)
1436 {
1437  XrdTls::RC retc;
1438  int byteswritten;
1439 
1440 // Write the data out
1441 //
1442  while(Blen)
1443  {retc = tlsIO.Write(Buff, Blen, byteswritten);
1444  if (retc != XrdTls::TLS_AOK)
1445  {TLS_Error("write to", retc);
1446  return false;
1447  }
1448  Blen -= byteswritten; Buff += byteswritten;
1449  }
1450 
1451 // All done
1452 //
1453  return true;
1454 }
1455 
1456 /******************************************************************************/
1457 /* v e r T L S */
1458 /******************************************************************************/
1459 
1460 const char *XrdLinkXeq::verTLS()
1461 {
1462  return tlsIO.Version();
1463 }
1464 
1465 /******************************************************************************/
1466 /* R e g i s t e r C l o s e R e q u e s t C b */
1467 /******************************************************************************/
1468 
1470  void* cbarg)
1471 {
1472  if (pp != Protocol) return false;
1473 
1474  CloseRequestCb = cb;
1475  CloseRequestCbArg = cbarg;
1476  return true;
1477 }
#define DEBUG(x)
Definition: XrdBwmTrace.hh:54
ssize_t readv(int fildes, const struct iovec *iov, int iovcnt)
ssize_t write(int fildes, const void *buf, size_t nbyte)
ssize_t writev(int fildes, const struct iovec *iov, int iovcnt)
ssize_t read(int fildes, void *buf, size_t nbyte)
#define close(a)
Definition: XrdPosix.hh:48
#define pread(a, b, c, d)
Definition: XrdPosix.hh:80
#define eMsg(x)
#define AtomicFAZ(x)
#define AtomicBeg(Mtx)
#define AtomicDec(x)
#define AtomicGet(x)
#define AtomicEnd(Mtx)
#define AtomicAdd(x, y)
size_t strlcpy(char *dst, const char *src, size_t sz)
#define TRACEI(act, x)
Definition: XrdTrace.hh:66
const char * Comment
Definition: XrdJob.hh:47
static void SyncAll()
Synchronize statustics for ll links.
Definition: XrdLinkCtl.cc:374
static void Unhook(int fd)
Unhook a link from the active table of links.
Definition: XrdLinkCtl.cc:392
time_t conTime
Definition: XrdLinkInfo.hh:44
void Reset()
Definition: XrdLinkInfo.hh:52
char * Etext
Definition: XrdLinkInfo.hh:45
XrdSysRecMutex opMutex
Definition: XrdLinkInfo.hh:46
XrdSysCondVar * KillcvP
Definition: XrdLinkInfo.hh:42
static const char * TraceID
Definition: XrdLinkXeq.hh:160
bool(* CloseRequestCb)(void *)
Definition: XrdLinkXeq.hh:188
int TLS_Send(const char *Buff, int Blen)
Definition: XrdLinkXeq.cc:1328
long long BytesOut
Definition: XrdLinkXeq.hh:175
int TLS_Error(const char *act, XrdTls::RC rc)
Definition: XrdLinkXeq.cc:1161
int TLS_Peek(char *Buff, int Blen, int timeout)
Definition: XrdLinkXeq.cc:1175
int stallCntTot
Definition: XrdLinkXeq.hh:178
int Client(char *buff, int blen)
Definition: XrdLinkXeq.cc:153
char Uname[24]
Definition: XrdLinkXeq.hh:205
XrdTlsPeerCerts * getPeerCerts()
Definition: XrdLinkXeq.cc:331
static int LinkCountMax
Definition: XrdLinkXeq.hh:169
XrdLinkInfo LinkInfo
Definition: XrdLinkXeq.hh:147
XrdProtocol * ProtoAlt
Definition: XrdLinkXeq.hh:187
int Close(bool defer=false)
Definition: XrdLinkXeq.cc:173
XrdNetAddr Addr
Definition: XrdLinkXeq.hh:197
int TLS_Recv(char *Buff, int Blen)
Definition: XrdLinkXeq.cc:1207
int sendData(const char *Buff, int Blen)
Definition: XrdLinkXeq.cc:824
long long BytesInTot
Definition: XrdLinkXeq.hh:174
bool TLS_Write(const char *Buff, int Blen)
Definition: XrdLinkXeq.cc:1435
int SendIOV(const struct iovec *iov, int iocnt, int bytes)
Definition: XrdLinkXeq.cc:847
XrdProtocol * setProtocol(XrdProtocol *pp, bool push)
Definition: XrdLinkXeq.cc:941
static long long LinkCountTot
Definition: XrdLinkXeq.hh:167
long long BytesOutTot
Definition: XrdLinkXeq.hh:176
void Shutdown(bool getLock)
Definition: XrdLinkXeq.cc:1036
int Peek(char *buff, int blen, int timeout=-1)
Definition: XrdLinkXeq.cc:340
static int LinkCount
Definition: XrdLinkXeq.hh:168
void Reset()
Definition: XrdLinkXeq.cc:113
int Backlog()
Definition: XrdLinkXeq.cc:140
XrdSysMutex wrMutex
Definition: XrdLinkXeq.hh:199
static int Stats(char *buff, int blen, bool do_sync=false)
Definition: XrdLinkXeq.cc:1068
XrdSendQ * sendQ
Definition: XrdLinkXeq.hh:200
XrdPollInfo PollInfo
Definition: XrdLinkXeq.hh:148
void setID(const char *userid, int procid)
Definition: XrdLinkXeq.cc:888
bool LockReads
Definition: XrdLinkXeq.hh:202
int Recv(char *buff, int blen)
Definition: XrdLinkXeq.cc:383
static long long LinkBytesIn
Definition: XrdLinkXeq.hh:164
void * CloseRequestCbArg
Definition: XrdLinkXeq.hh:189
int TLS_RecvAll(char *Buff, int Blen, int timeout)
Definition: XrdLinkXeq.cc:1306
int SFError(int rc)
Definition: XrdLinkXeq.cc:1026
long long BytesIn
Definition: XrdLinkXeq.hh:173
int tardyCntTot
Definition: XrdLinkXeq.hh:180
int Send(const char *buff, int blen)
Definition: XrdLinkXeq.cc:602
XrdSysMutex rdMutex
Definition: XrdLinkXeq.hh:198
const char * verTLS()
Definition: XrdLinkXeq.cc:1460
bool setNB()
Definition: XrdLinkXeq.cc:912
int RecvIOV(const struct iovec *iov, int iocnt)
Definition: XrdLinkXeq.cc:561
char Lname[256]
Definition: XrdLinkXeq.hh:206
static long long LinkConTime
Definition: XrdLinkXeq.hh:166
static int LinkSfIntr
Definition: XrdLinkXeq.hh:172
XrdTlsSocket tlsIO
Definition: XrdLinkXeq.hh:193
void DoIt()
Definition: XrdLinkXeq.cc:293
int RecvAll(char *buff, int blen, int timeout=-1)
Definition: XrdLinkXeq.cc:519
XrdProtocol * Protocol
Definition: XrdLinkXeq.hh:186
bool Register(const char *hName)
Definition: XrdLinkXeq.cc:583
static XrdSysMutex statsMutex
Definition: XrdLinkXeq.hh:182
void setProtName(const char *name)
Definition: XrdLinkXeq.cc:958
static int LinkStalls
Definition: XrdLinkXeq.hh:171
static long long LinkBytesOut
Definition: XrdLinkXeq.hh:165
void syncStats(int *ctime=0)
Definition: XrdLinkXeq.cc:1104
bool setTLS(bool enable, XrdTlsContext *ctx=0)
Definition: XrdLinkXeq.cc:972
static int LinkTimeOuts
Definition: XrdLinkXeq.hh:170
bool RegisterCloseRequestCb(XrdProtocol *pp, bool(*cb)(void *), void *cbarg)
Definition: XrdLinkXeq.cc:1469
void SetDialect(const char *dP)
Definition: XrdNetAddr.hh:205
bool Register(const char *hName)
Definition: XrdNetAddr.cc:180
void SetTLS(bool val)
Definition: XrdNetAddr.cc:590
void Zorch()
Definition: XrdPollInfo.hh:49
XrdPoll * Poller
Definition: XrdPollInfo.hh:43
virtual int Enable(XrdPollInfo &pInfo)=0
static char * Poll2Text(short events)
Definition: XrdPoll.cc:272
static void Detach(XrdPollInfo &pInfo)
Definition: XrdPoll.cc:177
virtual void Recycle(XrdLink *lp=0, int consec=0, const char *reason=0)=0
virtual int Stats(char *buff, int blen, int do_sync=0)=0
virtual int Process(XrdLink *lp)=0
void Terminate(XrdLink *lP=0)
Definition: XrdSendQ.cc:396
int Send(const char *buff, int blen)
Definition: XrdSendQ.cc:230
unsigned int Backlog()
Definition: XrdSendQ.hh:46
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
void Lock(XrdSysMutex *Mutex)
int fd
Socket file descriptor.
Definition: XrdTcpMonPin.hh:61
long long bytesOut
Bytes written to the socket.
Definition: XrdTcpMonPin.hh:64
int consec
Seconds connected.
Definition: XrdTcpMonPin.hh:62
virtual void Monitor(XrdNetAddrInfo &netInfo, LinkInfo &lnkInfo, int liLen)=0
long long bytesIn
Bytes read from the socket.
Definition: XrdTcpMonPin.hh:63
const char * tident
Pointer to the client's trace identifier.
Definition: XrdTcpMonPin.hh:60
@ TLS_HS_BLOCK
Always block during handshake.
Definition: XrdTlsSocket.hh:53
XrdTls::RC Accept(std::string *eMsg=0)
void Shutdown(SDType=sdImmed)
@ TLS_RBL_WBL
blocking read blocking write
Definition: XrdTlsSocket.hh:48
XrdTls::RC Write(const char *buffer, size_t size, int &bytesOut)
const char * Version()
XrdTls::RC Read(char *buffer, size_t size, int &bytesRead)
Read from the TLS connection. If necessary, a handshake will be done.
const char * Init(XrdTlsContext &ctx, int sfd, RW_Mode rwm, HS_Mode hsm, bool isClient, bool serial=true, const char *tid="")
void SetTraceID(const char *tid)
int Pending(bool any=true)
XrdTls::RC Peek(char *buffer, size_t size, int &bytesPeek)
XrdTlsPeerCerts * getCerts(bool ver=true)
static std::string RC2Text(XrdTls::RC rc, bool dbg=false)
Definition: XrdTls.cc:127
@ TLS_AOK
All went well, will always be zero.
Definition: XrdTls.hh:40
XrdTlsContext * tlsCtx
Definition: XrdGlobals.cc:52
XrdTcpMonPin * TcpMonPin
Definition: XrdLinkXeq.cc:80
const int maxIOV
Definition: XrdLinkXeq.cc:82
XrdSysError Log
Definition: XrdConfig.cc:113
XrdScheduler Sched
Definition: XrdLinkCtl.cc:54
int devNull
Definition: XrdGlobals.cc:55
int getIovMax()
int fdnum
File descriptor for data.
Definition: XrdOucSFVec.hh:47
int sendsz
Length of data at offset.
Definition: XrdOucSFVec.hh:46