XRootD
Loading...
Searching...
No Matches
XrdLinkXeq.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d L i n k X e q . c c */
4/* */
5/* (c) 2018 by the Board of Trustees of the Leland Stanford, Jr., University */
6/* Produced by Andrew Hanushevsky for Stanford University under contract */
7/* DE-AC02-76-SFO0515 with the Department of Energy */
8/* */
9/* This file is part of the XRootD software suite. */
10/* */
11/* XRootD is free software: you can redistribute it and/or modify it under */
12/* the terms of the GNU Lesser General Public License as published by the */
13/* Free Software Foundation, either version 3 of the License, or (at your */
14/* option) any later version. */
15/* */
16/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
17/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
18/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
19/* License for more details. */
20/* */
21/* You should have received a copy of the GNU Lesser General Public License */
22/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
23/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
24/* */
25/* The copyright holder's institutional names and contributor's names may not */
26/* be used to endorse or promote products derived from this software without */
27/* specific prior written permission of the institution or contributor. */
28/******************************************************************************/
29
30#include <limits.h>
31#include <poll.h>
32#include <signal.h>
33#include <cstdio>
34#include <cstring>
35#include <unistd.h>
36#include <sys/types.h>
37#include <sys/uio.h>
38
39#if defined(__linux__) || defined(__GNU__)
40#include <netinet/tcp.h>
41#if !defined(TCP_CORK)
42#undef HAVE_SENDFILE
43#endif
44#endif
45
46#ifdef HAVE_SENDFILE
47
48#if defined(__solaris__) || defined(__linux__) || defined(__GNU__)
49#include <sys/sendfile.h>
50#endif
51
52#endif
53
55#include "XrdSys/XrdSysError.hh"
56#include "XrdSys/XrdSysFD.hh"
58
59#include "Xrd/XrdBuffer.hh"
60#include "Xrd/XrdLink.hh"
61#include "Xrd/XrdLinkCtl.hh"
62#include "Xrd/XrdLinkXeq.hh"
63#include "Xrd/XrdPoll.hh"
64#include "Xrd/XrdScheduler.hh"
65#include "Xrd/XrdSendQ.hh"
66#include "Xrd/XrdTcpMonPin.hh"
67
68#define TRACE_IDENT ID
69#include "Xrd/XrdTrace.hh"
70
71/******************************************************************************/
72/* G l o b a l s */
73/******************************************************************************/
74
// Process-wide objects that are defined elsewhere and only referenced here:
// the message logger (Log), the scheduler consulted by DoIt() (Sched), the
// TLS context setTLS() falls back to when none is passed (tlsCtx) and the
// descriptor that Shutdown() dup2()'s over a link's socket (devNull).
//
75namespace XrdGlobal
76{
77extern XrdSysError Log;
78extern XrdScheduler Sched;
79extern XrdTlsContext *tlsCtx;
81extern int devNull;
83};
84
85using namespace XrdGlobal;
86
87/******************************************************************************/
88/* S t a t i c s */
89/******************************************************************************/
90
// Identifier for this class; presumably consumed by the TRACE macros.
//
91 const char *XrdLinkXeq::TraceID = "LinkXeq";
92
// Class-wide totals, all starting at zero. syncStats() folds each link's
// received byte count into LinkBytesIn and its connect time into LinkConTime.
// NOTE(review): the updates of LinkBytesOut and LinkCountTot are not visible
// in this part of the file -- confirm where they are maintained.
//
93 long long XrdLinkXeq::LinkBytesIn = 0;
94 long long XrdLinkXeq::LinkBytesOut = 0;
95 long long XrdLinkXeq::LinkConTime = 0;
96 long long XrdLinkXeq::LinkCountTot = 0;
103
104/******************************************************************************/
105/* C o n s t r u c t o r */
106/******************************************************************************/
107
109{
111}
112
114{
// Put the link into its pristine, unconnected state.
//
// The default user label is written at the very end of Uname; Client()
// computes the user part as (Lname - ID), so Uname is presumably laid out
// immediately before Lname.
// NOTE(review): the 7-byte label "anon.0@" is stored at offset -7 but ID is
// set to offset -5, so ID starts at "on.0@" -- confirm this is intended.
//
115 memcpy(Uname+sizeof(Uname)-7, "anon.0@", 7);
116 strcpy(Lname, "somewhere");
117 ID = &Uname[sizeof(Uname)-5];
118 Comment = ID;
// No send queue, zeroed per-link counters, no read locking, no shared FD and
// no attached protocols or close callback.
//
119 sendQ = 0;
120 stallCnt = stallCntTot = 0;
121 tardyCnt = tardyCntTot = 0;
122 SfIntr = 0;
123 isIdle = 0;
125 LockReads= false;
126 KeepFD = false;
127 Protocol = 0;
128 ProtoAlt = 0;
129 CloseRequestCb = 0;
130
// Reset the embedded link/poll information and whatever ResetLink() covers
//
131 LinkInfo.Reset();
132 PollInfo.Zorch();
133 ResetLink();
134}
135
136/******************************************************************************/
137/* B a c k l o g */
138/******************************************************************************/
139
141{
143
144// Report the backlog of the send queue; a link that has no send queue
// attached reports a backlog of zero.
145//
146 return (sendQ ? sendQ->Backlog() : 0);
147}
148
149/******************************************************************************/
150/* C l i e n t */
151/******************************************************************************/
152
153int XrdLinkXeq::Client(char *nbuf, int nbsz)
154{
155 int ulen;
156
157// Generate full client name
158//
159 if (nbsz <= 0) return 0;
160 ulen = (Lname - ID);
161 if ((ulen + HNlen) >= nbsz) ulen = 0;
162 else {strncpy(nbuf, ID, ulen);
163 strcpy(nbuf+ulen, HostName);
164 ulen += HNlen;
165 }
166 return ulen;
167}
168
169/******************************************************************************/
170/* C l o s e */
171/******************************************************************************/
172
// Close this link.
//
// defer - when true the connection is only shut down (directly, or through
//         the send queue when one exists) and 0 is returned; the full close
//         happens on a later call. When false the link is fully torn down.
//
// Returns 0 upon success, otherwise the errno value left by a failed close()
// of the file descriptor (which is also logged).
//
173int XrdLinkXeq::Close(bool defer)
174{ XrdSysMutexHelper opHelper(LinkInfo.opMutex);
175 int csec, fd, rc = 0;
176
177// If a defer close is requested, we can close the descriptor but we must
178// keep the slot number to prevent a new client getting the same fd number.
179// Linux is peculiar in that any in-progress operations will remain in that
180// state even after the FD is closed unless there is some activity either on
181// the connection or an event occurs that causes an operation restart. We
182// portably solve this problem by issuing a shutdown() on the socket prior
183// closing it. On most platforms, this informs readers that the connection is
184// gone (though not on old (i.e. <= 2.3) versions of Linux, sigh). Also, if
185// nonblocking mode is enabled, we need to do this in a separate thread as
186// a shutdown may block for a pretty long time if lots of messages are queued.
187// We will ask the SendQ object to schedule the shutdown for us before it
188// commits suicide.
189// Note that we can hold the opMutex while we also get the wrMutex.
190//
191 if (defer)
192 {if (!sendQ) Shutdown(false);
193 else {TRACEI(DEBUG, "Shutdown FD " <<LinkInfo.FD<<" only via SendQ");
194 LinkInfo.InUse++;
195 LinkInfo.FD = -LinkInfo.FD; // Leave poll version untouched!
196 wrMutex.Lock();
197 sendQ->Terminate(this);
198 sendQ = 0;
199 wrMutex.UnLock();
200 }
201 return 0;
202 }
203
204// If we got here then this is not a deferred close so we just need to check
205// if there is a sendq appendage we need to get rid of.
206//
207 if (sendQ)
208 {wrMutex.Lock();
209 sendQ->Terminate();
210 sendQ = 0;
211 wrMutex.UnLock();
212 }
213
214// Multiple protocols may be bound to this link. If it is in use, defer the
215// actual close until the use count drops to one. The opMutex is released
// around Serialize() and re-acquired before the count is looked at again.
216//
217 while(LinkInfo.InUse > 1)
218 {opHelper.UnLock();
219 TRACEI(DEBUG, "Close FD "<<LinkInfo.FD <<" deferred, use count="
220 <<LinkInfo.InUse);
221 Serialize();
222 opHelper.Lock(&LinkInfo.opMutex);
223 }
224 LinkInfo.InUse--;
225 Instance = 0;
226
227// Add up the statistic for this link; csec receives the connect time that
// is handed to the protocols and the TCP monitor below.
228//
229 syncStats(&csec);
230
231// Cleanup TLS if it is active
232//
233 if (isTLS) tlsIO.Shutdown();
234
235// Clean this link up: recycle the attached protocols and release any saved
// error text.
236//
237 if (Protocol) {Protocol->Recycle(this, csec, LinkInfo.Etext); Protocol = 0;}
238 if (ProtoAlt) {ProtoAlt->Recycle(this, csec, LinkInfo.Etext); ProtoAlt = 0;}
239 if (LinkInfo.Etext) {free(LinkInfo.Etext); LinkInfo.Etext = 0;}
240 LinkInfo.InUse = 0;
241
242// At this point we can have no lock conflicts, so if someone is waiting for
243// us to terminate let them know about it. Note that we will get the condvar
244// mutex while we hold the opMutex. This is the required order! We will also
245// zero out the pointer to the condvar while holding the opmutex.
246//
247 if (LinkInfo.KillcvP)
248 {LinkInfo.KillcvP->Lock();
249 LinkInfo.KillcvP->Signal();
250 LinkInfo.KillcvP->UnLock();
251 LinkInfo.KillcvP = 0;
252 }
253
254// Remove ourselves from the poll table and then from the Link table. We may
255// not hold on to the opMutex when we acquire the LTMutex. However, the link
256// table needs to be cleaned up prior to actually closing the socket. So, we
257// do some fancy footwork to prevent multiple closes of this link.
// LinkInfo.FD may have been negated by a deferred close, hence the abs().
258//
259 fd = abs(LinkInfo.FD);
260 if (PollInfo.FD > 0)
261 {if (PollInfo.Poller) {XrdPoll::Detach(PollInfo); PollInfo.Poller = 0;}
262 PollInfo.FD = -1;
263 opHelper.UnLock();
265 } else opHelper.UnLock();
266
267// Invoke the TCP monitor if it was loaded.
268//
269 if (TcpMonPin && fd > 2)
270 {XrdTcpMonPin::LinkInfo lnkInfo;
271 lnkInfo.tident = ID;
272 lnkInfo.fd = fd;
273 lnkInfo.consec = csec;
274 lnkInfo.bytesIn = BytesInTot;
275 lnkInfo.bytesOut = BytesOutTot;
276 TcpMonPin->Monitor(Addr, lnkInfo, sizeof(lnkInfo));
277 }
278
279// Close the file descriptor if it isn't being shared. Do it as the last
280// thing because closes and accepts are not interlocked.
281//
282 if (fd >= 2) {if (KeepFD) rc = 0;
283 else rc = (close(fd) < 0 ? errno : 0);
284 }
285 if (rc) Log.Emsg("Link", rc, "close", ID);
286 return rc;
287}
288
289/******************************************************************************/
290/* D o I t */
291/******************************************************************************/
292
294{
295 int rc;
296
297// The Process() return code tells us what to do:
298// < 0 -> Stop getting requests,
299// -EINPROGRESS leave link disabled but otherwise all is well
300// -n Error, disable and close the link
301// = 0 -> OK, get next request, if allowed, o/w enable the link
302// > 0 -> Slow link, stop getting requests and enable the link
303//
// Requests keep being processed on this thread for as long as Process()
// returns zero and the scheduler allows it (canStick()). A link without a
// protocol is reported and otherwise left alone.
//
304 if (Protocol)
305 do {rc = Protocol->Process(this);} while (!rc && Sched.canStick());
306 else {Log.Emsg("Link", "Dispatch on closed link", ID);
307 return;
308 }
309
310// Either re-enable the link and cycle back waiting for a new request, leave
311// disabled, or terminate the connection. A failed Enable() also leads to
// termination.
312//
313 bool doCl = false;
314 if (rc >= 0)
315 {if (PollInfo.Poller && !PollInfo.Poller->Enable(PollInfo)) doCl = true;}
316 else if (rc != -EINPROGRESS) doCl = true;
317
// When a close-request callback is registered it is asked first; if it
// returns false the close is skipped here.
//
318 if (doCl)
319 {if (CloseRequestCb)
320 {const bool res = CloseRequestCb(CloseRequestCbArg);
321 if (!res) return;
322 }
323 Close();
324 }
325}
326
327/******************************************************************************/
328/* g e t P e e r C e r t s */
329/******************************************************************************/
330
332{
// Certificates are only available from the TLS layer; a link that is not
// using TLS yields a null result.
//
333 return (isTLS ? tlsIO.getCerts(true) : 0);
334}
335
336/******************************************************************************/
337/* P e e k */
338/******************************************************************************/
339
340int XrdLinkXeq::Peek(char *Buff, int Blen, int timeout)
341{
342 XrdSysMutexHelper theMutex;
343 struct pollfd polltab = {PollInfo.FD, POLLIN|POLLRDNORM, 0};
344 ssize_t mlen;
345 int retc;
346
347// Lock the read mutex if we need to, the helper will unlock it upon exit
348//
349 if (LockReads) theMutex.Lock(&rdMutex);
350
351// Wait until we can actually read something
352//
353 isIdle = 0;
354 do {retc = poll(&polltab, 1, timeout);} while(retc < 0 && errno == EINTR);
355 if (retc != 1)
356 {if (retc == 0) return 0;
357 return Log.Emsg("Link", -errno, "poll", ID);
358 }
359
360// Verify it is safe to read now
361//
362 if (!(polltab.revents & (POLLIN|POLLRDNORM)))
363 {Log.Emsg("Link", XrdPoll::Poll2Text(polltab.revents), "polling", ID);
364 return -1;
365 }
366
367// Do the peek.
368//
369 do {mlen = recv(LinkInfo.FD, Buff, Blen, MSG_PEEK);}
370 while(mlen < 0 && errno == EINTR);
371
372// Return the result
373//
374 if (mlen >= 0) return int(mlen);
375 Log.Emsg("Link", errno, "peek on", ID);
376 return -1;
377}
378
379/******************************************************************************/
380/* R e c v */
381/******************************************************************************/
382
383int XrdLinkXeq::Recv(char *Buff, int Blen)
384{
385 ssize_t rlen;
386
387// Note that we will read only as much as is queued. Use Recv() with a
388// timeout to receive as much data as possible.
389//
390 if (LockReads) rdMutex.Lock();
391 isIdle = 0;
392 do {rlen = read(LinkInfo.FD, Buff, Blen);} while(rlen < 0 && errno == EINTR);
393 if (rlen > 0) AtomicAdd(BytesIn, rlen);
394 if (LockReads) rdMutex.UnLock();
395
396 if (rlen >= 0) return int(rlen);
397 if (LinkInfo.FD >= 0) Log.Emsg("Link", errno, "receive from", ID);
398 return -1;
399}
400
401/******************************************************************************/
402
403int XrdLinkXeq::Recv(char *Buff, int Blen, int timeout)
404{
405 XrdSysMutexHelper theMutex;
406 struct pollfd polltab = {PollInfo.FD, POLLIN|POLLRDNORM, 0};
407 ssize_t rlen, totlen = 0;
408 int retc;
409
410// Lock the read mutex if we need to, the helper will unlock it upon exit
411//
412 if (LockReads) theMutex.Lock(&rdMutex);
413
414// Wait up to timeout milliseconds for data to arrive
415//
416 isIdle = 0;
417 while(Blen > 0)
418 {do {retc = poll(&polltab,1,timeout);} while(retc < 0 && errno == EINTR);
419 if (retc != 1)
420 {if (retc == 0)
421 {tardyCnt++;
422 if (totlen)
423 {if ((++stallCnt & 0xff) == 1) TRACEI(DEBUG,"read timed out");
424 AtomicAdd(BytesIn, totlen);
425 }
426 return int(totlen);
427 }
428 return (LinkInfo.FD >= 0 ? Log.Emsg("Link",-errno,"poll",ID) : -1);
429 }
430
431 // Verify it is safe to read now
432 //
433 if (!(polltab.revents & (POLLIN|POLLRDNORM)))
434 {Log.Emsg("Link", XrdPoll::Poll2Text(polltab.revents),
435 "polling", ID);
436 return -1;
437 }
438
439 // Read as much data as you can. Note that we will force an error
440 // if we get a zero-length read after poll said it was OK.
441 //
442 do {rlen = recv(LinkInfo.FD, Buff, Blen, 0);}
443 while(rlen < 0 && errno == EINTR);
444 if (rlen <= 0)
445 {if (!rlen) return -ENOMSG;
446 if (LinkInfo.FD > 0) Log.Emsg("Link", -errno, "receive from", ID);
447 return -1;
448 }
449 totlen += rlen; Blen -= rlen; Buff += rlen;
450 }
451
452 AtomicAdd(BytesIn, totlen);
453 return int(totlen);
454}
455
456/******************************************************************************/
457
458int XrdLinkXeq::Recv(const struct iovec *iov, int iocnt, int timeout)
459{
460 XrdSysMutexHelper theMutex;
461 struct pollfd polltab = {PollInfo.FD, POLLIN|POLLRDNORM, 0};
462 int retc, rlen;
463
464// Lock the read mutex if we need to, the helper will unlock it upon exit
465//
466 if (LockReads) theMutex.Lock(&rdMutex);
467
468// Wait up to timeout milliseconds for data to arrive
469//
470 isIdle = 0;
471 do {retc = poll(&polltab,1,timeout);} while(retc < 0 && errno == EINTR);
472 if (retc != 1)
473 {if (retc == 0)
474 {tardyCnt++;
475 return 0;
476 }
477 return (LinkInfo.FD >= 0 ? Log.Emsg("Link",-errno,"poll",ID) : -1);
478 }
479
480// Verify it is safe to read now
481//
482 if (!(polltab.revents & (POLLIN|POLLRDNORM)))
483 {Log.Emsg("Link", XrdPoll::Poll2Text(polltab.revents), "polling", ID);
484 return -1;
485 }
486
487// If the iocnt is within limits then just go ahead and read once.
488//
489 if (iocnt <= maxIOV)
490 {rlen = RecvIOV(iov, iocnt);
491 if (rlen > 0) {AtomicAdd(BytesIn, rlen);}
492 return rlen;
493 }
494
495// We will have to break this up into allowable segments and we need to add up
496// the bytes in each segment so that we know when to stop reading.
497//
498 int seglen, segcnt = maxIOV, totlen = 0;
499 do {seglen = 0;
500 for (int i = 0; i < segcnt; i++) seglen += iov[i].iov_len;
501 if ((rlen = RecvIOV(iov, segcnt)) < 0) return rlen;
502 totlen += rlen;
503 if (rlen < seglen) break;
504 iov += segcnt;
505 iocnt -= segcnt;
506 if (iocnt <= maxIOV) segcnt = iocnt;
507 } while(iocnt > 0);
508
509// All done
510//
511 AtomicAdd(BytesIn, totlen);
512 return totlen;
513}
514
515/******************************************************************************/
516/* R e c v A l l */
517/******************************************************************************/
518
519int XrdLinkXeq::RecvAll(char *Buff, int Blen, int timeout)
520{
521 struct pollfd polltab = {PollInfo.FD, POLLIN|POLLRDNORM, 0};
522 ssize_t rlen;
523 int retc;
524
525// Check if timeout specified. Notice that the timeout is the max we will
526// for some data. We will wait forever for all the data. Yeah, it's weird.
527//
528 if (timeout >= 0)
529 {do {retc = poll(&polltab,1,timeout);} while(retc < 0 && errno == EINTR);
530 if (retc != 1)
531 {if (!retc) return -ETIMEDOUT;
532 Log.Emsg("Link",errno,"poll",ID);
533 return -1;
534 }
535 if (!(polltab.revents & (POLLIN|POLLRDNORM)))
536 {Log.Emsg("Link",XrdPoll::Poll2Text(polltab.revents),"polling",ID);
537 return -1;
538 }
539 }
540
541// Note that we will block until we receive all he bytes.
542//
543 if (LockReads) rdMutex.Lock();
544 isIdle = 0;
545 do {rlen = recv(LinkInfo.FD, Buff, Blen, MSG_WAITALL);}
546 while(rlen < 0 && errno == EINTR);
547 if (rlen > 0) AtomicAdd(BytesIn, rlen);
548 if (LockReads) rdMutex.UnLock();
549
550 if (int(rlen) == Blen) return Blen;
551 if (!rlen) {TRACEI(DEBUG, "No RecvAll() data; errno=" <<errno);}
552 else if (rlen > 0) Log.Emsg("RecvAll", "Premature end from", ID);
553 else if (LinkInfo.FD >= 0) Log.Emsg("Link", errno, "receive from", ID);
554 return -1;
555}
556
557/******************************************************************************/
558/* Protected: R e c v I O V */
559/******************************************************************************/
560
561int XrdLinkXeq::RecvIOV(const struct iovec *iov, int iocnt)
562{
563 ssize_t retc = 0;
564
565// Read the data in. On some version of Unix (e.g., Linux) a readv() may
566// end at any time without reading all the bytes when directed to a socket.
567// We always return the number bytes read (or an error). The caller needs to
568// restart the read at the appropriate place in the iovec when more data arrives.
569//
570 do {retc = readv(LinkInfo.FD, iov, iocnt);}
571 while(retc < 0 && errno == EINTR);
572
573// Check how we completed
574//
575 if (retc < 0) Log.Emsg("Link", errno, "receive from", ID);
576 return retc;
577}
578
579/******************************************************************************/
580/* R e g i s t e r */
581/******************************************************************************/
582
583bool XrdLinkXeq::Register(const char *hName)
584{
585
586// First see if we can register this name with the address object
587//
588 if (!Addr.Register(hName)) return false;
589
590// Make appropriate changes here
591//
592 if (HostName) free(HostName);
593 HostName = strdup(hName);
594 strlcpy(Lname, hName, sizeof(Lname));
595 return true;
596}
597
598/******************************************************************************/
599/* S e n d */
600/******************************************************************************/
601
602int XrdLinkXeq::Send(const char *Buff, int Blen)
603{
604 ssize_t retc = 0, bytesleft = Blen;
605
606// Get a lock
607//
608 wrMutex.Lock();
609 isIdle = 0;
610 AtomicAdd(BytesOut, Blen);
611
612// Do non-blocking writes if we are setup to do so.
613//
614 if (sendQ)
615 {retc = sendQ->Send(Buff, Blen);
616 wrMutex.UnLock();
617 return retc;
618 }
619
620// Write the data out
621//
622 while(bytesleft)
623 {if ((retc = write(LinkInfo.FD, Buff, bytesleft)) < 0)
624 {if (errno == EINTR) continue;
625 else break;
626 }
627 bytesleft -= retc; Buff += retc;
628 }
629
630// All done
631//
632 wrMutex.UnLock();
633 if (retc >= 0) return Blen;
634 Log.Emsg("Link", errno, "send to", ID);
635 return -1;
636}
637
638/******************************************************************************/
639
640int XrdLinkXeq::Send(const struct iovec *iov, int iocnt, int bytes)
641{
642 int retc;
643
644// Get a lock and assume we will be successful (statistically we are)
645//
646 wrMutex.Lock();
647 isIdle = 0;
648 AtomicAdd(BytesOut, bytes);
649
650// Do non-blocking writes if we are setup to do so.
651//
652 if (sendQ)
653 {retc = sendQ->Send(iov, iocnt, bytes);
654 wrMutex.UnLock();
655 return retc;
656 }
657
658// If the iocnt is within limits then just go ahead and write this out
659//
660 if (iocnt <= maxIOV)
661 {retc = SendIOV(iov, iocnt, bytes);
662 wrMutex.UnLock();
663 return retc;
664 }
665
666// We will have to break this up into allowable segments
667//
668 int seglen, segcnt = maxIOV, iolen = 0;
669 do {seglen = 0;
670 for (int i = 0; i < segcnt; i++) seglen += iov[i].iov_len;
671 if ((retc = SendIOV(iov, segcnt, seglen)) < 0)
672 {wrMutex.UnLock();
673 return retc;
674 }
675 iolen += retc;
676 iov += segcnt;
677 iocnt -= segcnt;
678 if (iocnt <= maxIOV) segcnt = iocnt;
679 } while(iocnt > 0);
680
681// All done
682//
683 wrMutex.UnLock();
684 return iolen;
685}
686
687/******************************************************************************/
688
689int XrdLinkXeq::Send(const sfVec *sfP, int sfN)
690{
691#if !defined(HAVE_SENDFILE)
692
693 return -1;
694
695#elif defined(__solaris__)
696
697 sendfilevec_t vecSF[XrdOucSFVec::sfMax], *vecSFP = vecSF;
698 size_t xframt, totamt, bytes = 0;
699 ssize_t retc;
700 int i = 0;
701
702// Construct the sendfilev() vector
703//
704 for (i = 0; i < sfN; sfP++, i++)
705 {if (sfP->fdnum < 0)
706 {vecSF[i].sfv_fd = SFV_FD_SELF;
707 vecSF[i].sfv_off = (off_t)sfP->buffer;
708 } else {
709 vecSF[i].sfv_fd = sfP->fdnum;
710 vecSF[i].sfv_off = sfP->offset;
711 }
712 vecSF[i].sfv_flag = 0;
713 vecSF[i].sfv_len = sfP->sendsz;
714 bytes += sfP->sendsz;
715 }
716 totamt = bytes;
717
718// Lock the link, issue sendfilev(), and unlock the link. The documentation
719// is very spotty and inconsistent. We can only retry this operation under
720// very limited conditions.
721//
722 wrMutex.Lock();
723 isIdle = 0;
724do{retc = sendfilev(LinkInfo.FD, vecSFP, sfN, &xframt);
725
726// Check if all went well and return if so (usual case)
727//
728 if (xframt == bytes)
729 {AtomicAdd(BytesOut, bytes);
730 wrMutex.UnLock();
731 return totamt;
732 }
733
734// The only one we will recover from is EINTR. We cannot legally get EAGAIN.
735//
736 if (retc < 0 && errno != EINTR) break;
737
738// Try to resume the transfer
739//
740 if (xframt > 0)
741 {AtomicAdd(BytesOut, xframt); bytes -= xframt; SfIntr++;
742 while(xframt > 0 && sfN)
743 {if ((ssize_t)xframt < (ssize_t)vecSFP->sfv_len)
744 {vecSFP->sfv_off += xframt; vecSFP->sfv_len -= xframt; break;}
745 xframt -= vecSFP->sfv_len; vecSFP++; sfN--;
746 }
747 }
748 } while(sfN > 0);
749
750// See if we can recover without destroying the connection
751//
752 retc = (retc < 0 ? errno : ECANCELED);
753 wrMutex.UnLock();
754 Log.Emsg("Link", retc, "send file to", ID);
755 return -1;
756
757#elif defined(__linux__) || defined(__GNU__)
758
759 static const int setON = 1, setOFF = 0;
760 ssize_t retc = 0, bytesleft;
761 off_t myOffset;
762 int i, xfrbytes = 0, uncork = 1, xIntr = 0;
763
764// lock the link
765//
766 wrMutex.Lock();
767 isIdle = 0;
768
769// In linux we need to cork the socket. On permanent errors we do not uncork
770// the socket because it will be closed in short order.
771//
772 if (setsockopt(PollInfo.FD, SOL_TCP, TCP_CORK, &setON, sizeof(setON)) < 0)
773 {Log.Emsg("Link", errno, "cork socket for", ID);
774 uncork = 0; sfOK = 0;
775 }
776
777// Send the header first
778//
779 for (i = 0; i < sfN; sfP++, i++)
780 {if (sfP->fdnum < 0) retc = sendData(sfP->buffer, sfP->sendsz);
781 else {myOffset = sfP->offset; bytesleft = sfP->sendsz;
782 while(bytesleft
783 && (retc=sendfile(LinkInfo.FD,sfP->fdnum,&myOffset,bytesleft)) > 0)
784 {bytesleft -= retc; xIntr++;}
785 }
786 if (retc < 0 && errno == EINTR) continue;
787 if (retc <= 0) break;
788 xfrbytes += sfP->sendsz;
789 }
790
791// Diagnose any sendfile errors
792//
793 if (retc <= 0)
794 {if (retc == 0) errno = ECANCELED;
795 wrMutex.UnLock();
796 Log.Emsg("Link", errno, "send file to", ID);
797 return -1;
798 }
799
800// Now uncork the socket
801//
802 if (uncork
803 && setsockopt(PollInfo.FD, SOL_TCP, TCP_CORK, &setOFF, sizeof(setOFF)) < 0)
804 Log.Emsg("Link", errno, "uncork socket for", ID);
805
806// All done
807//
808 if (xIntr > sfN) SfIntr += (xIntr - sfN);
809 AtomicAdd(BytesOut, xfrbytes);
810 wrMutex.UnLock();
811 return xfrbytes;
812
813#else
814
815 return -1;
816
817#endif
818}
819
820/******************************************************************************/
821/* Protected: s e n d D a t a */
822/******************************************************************************/
823
824int XrdLinkXeq::sendData(const char *Buff, int Blen)
825{
826 ssize_t retc = 0, bytesleft = Blen;
827
828// Write the data out
829//
830 while(bytesleft)
831 {if ((retc = write(LinkInfo.FD, Buff, bytesleft)) < 0)
832 {if (errno == EINTR) continue;
833 else break;
834 }
835 bytesleft -= retc; Buff += retc;
836 }
837
838// All done
839//
840 return retc;
841}
842
843/******************************************************************************/
844/* Protected: S e n d I O V */
845/******************************************************************************/
846
// Write out an I/O vector in full.
//
// iov   - the vector to write.
// iocnt - the number of elements in the vector.
// bytes - the number of bytes to write; presumably the sum of the element
//         lengths (the resume logic below relies on it) -- verify callers.
//
// Returns bytes upon success; otherwise the error is logged and -1 returned.
// No lock is taken here; the callers in this file hold wrMutex.
//
847int XrdLinkXeq::SendIOV(const struct iovec *iov, int iocnt, int bytes)
848{
849 ssize_t bytesleft, n, retc = 0;
850 const char *Buff;
851
852// Write the data out. On some version of Unix (e.g., Linux) a writev() may
853// end at any time without writing all the bytes when directed to a socket.
854// So, we attempt to resume the writev() using a combination of write() and
855// a writev() continuation. This approach slowly converts a writev() to a
856// series of writes if need be. We must do this inline because we must hold
857// the lock until all the bytes are written or an error occurs.
858//
// After a short writev(): skip the elements that were written in full,
// finish the partially written element with plain write() calls and then
// go around again with writev() for the elements that remain.
//
859 bytesleft = static_cast<ssize_t>(bytes);
860 while(bytesleft)
861 {do {retc = writev(LinkInfo.FD, iov, iocnt);}
862 while(retc < 0 && errno == EINTR);
863 if (retc >= bytesleft || retc < 0) break;
864 bytesleft -= retc;
865 while(retc >= (n = static_cast<ssize_t>(iov->iov_len)))
866 {retc -= n; iov++; iocnt--;}
867 Buff = (const char *)iov->iov_base + retc; n -= retc; iov++; iocnt--;
868 while(n) {if ((retc = write(LinkInfo.FD, Buff, n)) < 0)
869 {if (errno == EINTR) continue;
870 else break;
871 }
872 n -= retc; Buff += retc; bytesleft -= retc;
873 }
874 if (retc < 0 || iocnt < 1) break;
875 }
876
877// All done
878//
879 if (retc >= 0) return bytes;
880 Log.Emsg("Link", errno, "send to", ID);
881 return -1;
882}
883
884/******************************************************************************/
885/* s e t I D */
886/******************************************************************************/
887
888void XrdLinkXeq::setID(const char *userid, int procid)
889{
890 char buff[sizeof(Uname)], *bp, *sp;
891 int ulen;
892
893 snprintf(buff, sizeof(buff), "%s.%d:%d", userid, procid, PollInfo.FD);
894 ulen = strlen(buff);
895 sp = buff + ulen - 1;
896 bp = &Uname[sizeof(Uname)-1];
897 if (ulen > (int)sizeof(Uname)) ulen = sizeof(Uname);
898 *bp = '@'; bp--;
899 while(ulen--) {*bp = *sp; bp--; sp--;}
900 ID = bp+1;
901 Comment = (const char *)ID;
902
903// Update the ID in the TLS socket if enabled
904//
905 if (isTLS) tlsIO.SetTraceID(ID);
906}
907
908/******************************************************************************/
909/* s e t N B */
910/******************************************************************************/
911
913{
914// We don't support non-blocking output except for Linux at the moment, so
// every other platform simply reports failure.
915//
916#if !defined(__linux__)
917 return false;
918#else
919// Trace this request
920//
921 TRACEI(DEBUG,"enabling non-blocking output");
922
923// If we don't already have a sendQ object get one. This is a one-time call
924// so to optimize checking if this object exists we also get the opMutex.
// The queue pointer itself is installed while holding the wrMutex, which is
// the mutex the Send() methods hold when they look at it.
925//
926 LinkInfo.opMutex.Lock();
927 if (!sendQ)
928 {wrMutex.Lock();
929 sendQ = new XrdSendQ(*this, wrMutex);
930 wrMutex.UnLock();
931 }
932 LinkInfo.opMutex.UnLock();
933 return true;
934#endif
935}
936
937/******************************************************************************/
938/* s e t P r o t o c o l */
939/******************************************************************************/
940
942{
943
944// Set new protocol. This is done under the opMutex. When push is set the
// current protocol is kept as the alternate one (ProtoAlt). The protocol
// that was in effect before the call is returned to the caller.
945//
946 LinkInfo.opMutex.Lock();
947 XrdProtocol *op = Protocol;
948 if (push) ProtoAlt = Protocol;
949 Protocol = pp;
950 LinkInfo.opMutex.UnLock();
951 return op;
952}
953
954/******************************************************************************/
955/* s e t P r o t N a m e */
956/******************************************************************************/
957
958void XrdLinkXeq::setProtName(const char *name)
959{
960
961// Set the protocol name.
962//
963 LinkInfo.opMutex.Lock();
964 Addr.SetDialect(name);
965 LinkInfo.opMutex.UnLock();
966}
967
968/******************************************************************************/
969/* s e t T L S */
970/******************************************************************************/
971
// Turn TLS on or off for this link.
//
// enable - true to switch the connection to TLS, false to shut TLS down.
// ctx    - the TLS context to use when enabling; when null the global
//          context (tlsCtx) is used.
//
// Returns true when the link ends up in the requested mode (including the
// case where it already was), false when TLS could not be initialized or
// the TLS accept failed.
//
972bool XrdLinkXeq::setTLS(bool enable, XrdTlsContext *ctx)
973{ //???
974// static const XrdTlsConnection::RW_Mode rwMode=XrdTlsConnection::TLS_RNB_WBL;
// NOTE(review): rwMode and hsMode used by the Init() call below are not
// declared in the visible part of this function -- confirm their definitions.
977 const char *eNote;
978 XrdTls::RC rc;
979
980// If we are already in a compatible mode, we are done
981//
982
983 if (isTLS == enable) return true;
984
985// If this is a shutdown, then do it now.
986//
987 if (!enable)
988 {tlsIO.Shutdown();
989 isTLS = enable;
990 Addr.SetTLS(enable);
991 return true;
992 }
993// We want to initialize TLS, do so now.
994//
995 if (!ctx) ctx = tlsCtx;
996 eNote = tlsIO.Init(*ctx, PollInfo.FD, rwMode, hsMode, false, false, ID);
997
998// Check for errors; a non-null note from Init() is the failure reason
999//
1000 if (eNote)
1001 {char buff[1024];
1002 snprintf(buff, sizeof(buff), "Unable to enable tls for %s;", ID);
1003 Log.Emsg("LinkXeq", buff, eNote);
1004 return false;
1005 }
1006
1007// Now we need to accept this TLS connection
1008//
1009 std::string eMsg;
1010 rc = tlsIO.Accept(&eMsg);
1011
1012// Diagnose return state. The link is only marked as using TLS when the
// accept succeeded.
1013//
1014 if (rc != XrdTls::TLS_AOK) Log.Emsg("LinkXeq", eMsg.c_str());
1015 else {isTLS = enable;
1016 Addr.SetTLS(enable);
1017 Log.Emsg("LinkXeq", ID, "connection upgraded to", verTLS());
1018 }
1019 return rc == XrdTls::TLS_AOK;
1020}
1021
1022/******************************************************************************/
1023/* S F E r r o r */
1024/******************************************************************************/
1025
1027{
// Log the failed sendfile attempt using the supplied error code and tell
// the caller that the send failed.
//
1028 Log.Emsg("TLS", rc, "send file to", ID);
1029 return -1;
1030}
1031
1032/******************************************************************************/
1033/* S h u t d o w n */
1034/******************************************************************************/
1035
1036void XrdLinkXeq::Shutdown(bool getLock)
1037{
1038 int temp;
1039
1040// Trace the entry
1041//
1042 TRACEI(DEBUG, (getLock ? "Async" : "Sync") <<" link shutdown in progress");
1043
1044// Get the lock if we need too (external entry via another thread)
1045//
1046 if (getLock) LinkInfo.opMutex.Lock();
1047
1048// If there is something to do, do it now
1049//
1050 temp = Instance; Instance = 0;
1051 if (!KeepFD)
1052 {shutdown(PollInfo.FD, SHUT_RDWR);
1053 if (dup2(devNull, PollInfo.FD) < 0)
1054 {Instance = temp;
1055 Log.Emsg("Link", errno, "shutdown FD for", ID);
1056 }
1057 }
1058
1059// All done
1060//
1061 if (getLock) LinkInfo.opMutex.UnLock();
1062}
1063
1064/******************************************************************************/
1065/* S t a t s */
1066/******************************************************************************/
1067
// Format the class-wide link statistics as an XML fragment.
//
// buff    - where the text is to be placed. When null, nothing is formatted
//           and the buffer size to provide is returned instead.
// blen    - the size of buff.
// do_sync - when true, the counters of all links are synchronized first.
//
// Returns the value produced by snprintf(), or the size estimate described
// above when buff is null.
//
1068int XrdLinkXeq::Stats(char *buff, int blen, bool do_sync)
1069{
1070 static const char statfmt[] = "<stats id=\"link\"><num>%d</num>"
1071 "<maxn>%d</maxn><tot>%lld</tot><in>%lld</in><out>%lld</out>"
1072 "<ctime>%lld</ctime><tmo>%d</tmo><stall>%d</stall>"
1073 "<sfps>%d</sfps></stats>";
1074 int i;
1075
1076// Check if actual length wanted: the size of the format plus an allowance
// of 17*6 bytes for the values that get substituted.
1077//
1078 if (!buff) return sizeof(statfmt)+17*6;
1079
1080// We must synchronize the statistical counters
1081//
1082 if (do_sync) XrdLinkCtl::SyncAll();
1083
1084// Obtain lock on the stats area and format it
1085//
1087 i = snprintf(buff, blen, statfmt, AtomicGet(LinkCount),
1097 return i;
1098}
1099
1100/******************************************************************************/
1101/* s y n c S t a t s */
1102/******************************************************************************/
1103
// Moves this link's counters (BytesIn, tardyCnt, stallCnt, BytesOut, SfIntr)
// into the corresponding totals: each is fetched-and-zeroed with AtomicFAZ and
// the fetched value is added to the Link*/...Tot counters with AtomicAdd.
//
// When ctime is null, opMutex is held for the duration of the call. When it is
// non-null, *ctime receives time(0) - LinkInfo.conTime, that value is added to
// LinkConTime, and LinkCount is decremented (if positive) under statsMutex.
// Finally the attached Protocol, if any, is asked to update its statistics.
//
// NOTE(review): the signature and several statements of this function are
// absent from this listing (the index shows "void syncStats(int *ctime=0)").
// In particular, what is done with the values fetched from tardyCnt and
// BytesOut is not visible here.
1105{
1106 long long tmpLL;
1107 int tmpI4;
1108
1109// If this is dynamic, get the opMutex lock
1110//
1111 if (!ctime) LinkInfo.opMutex.Lock();
1112
1113// Either the caller has the opMutex or this is called out of close. In either
1114// case, we need to get the read and write mutexes; each followed by the stats
1115// mutex. This order is important because we should not hold the stats mutex
1116// for very long and the r/w mutexes may take a long time to acquire. If we
1117// must maintain the link count we need to actually acquire the stats mutex as
1118// we will be doing compound operations. Atomics are still used to keep other
1119// threads from seeing partial results.
1120//
1122
1123 if (ctime)
1124 {*ctime = time(0) - LinkInfo.conTime;
1125 AtomicAdd(LinkConTime, *ctime);
1126 statsMutex.Lock();
1127 if (LinkCount > 0) AtomicDec(LinkCount);
1128 statsMutex.UnLock();
1129 }
1130
1132
1133 tmpLL = AtomicFAZ(BytesIn);
1134 AtomicAdd(LinkBytesIn, tmpLL); AtomicAdd(BytesInTot, tmpLL);
1135 tmpI4 = AtomicFAZ(tardyCnt);
1137 tmpI4 = AtomicFAZ(stallCnt);
1138 AtomicAdd(LinkStalls, tmpI4); AtomicAdd(stallCntTot, tmpI4);
1140
1142 tmpLL = AtomicFAZ(BytesOut);
1144 tmpI4 = AtomicFAZ(SfIntr);
1145 AtomicAdd(LinkSfIntr, tmpI4);
1147
1148// Make sure the protocol updates it's statistics as well
1149//
1150 if (Protocol) Protocol->Stats(0, 0, 1);
1151
1152// All done
1153//
1154 if (!ctime) LinkInfo.opMutex.UnLock();
1155}
1156
1157/******************************************************************************/
1158/* Protected: T L S _ E r r o r */
1159/******************************************************************************/
1160
1161int XrdLinkXeq::TLS_Error(const char *act, XrdTls::RC rc)
1162{
1163 std::string reason = XrdTls::RC2Text(rc);
1164 char msg[512];
1165
1166 snprintf(msg, sizeof(msg), "Unable to %s %s;", act, ID);
1167 Log.Emsg("TLS", msg, reason.c_str());
1168 return -1;
1169}
1170
1171/******************************************************************************/
1172/* T L S _ P e e k */
1173/******************************************************************************/
1174
1175int XrdLinkXeq::TLS_Peek(char *Buff, int Blen, int timeout)
1176{
1177 XrdSysMutexHelper theMutex;
1178 XrdTls::RC retc;
1179 int rc, rlen;
1180
1181// Lock the read mutex if we need to, the helper will unlock it upon exit
1182//
1183 if (LockReads) theMutex.Lock(&rdMutex);
1184
1185// Wait until we can actually read something
1186//
1187 isIdle = 0;
1188 if (timeout)
1189 {rc = Wait4Data(timeout);
1190 if (rc < 1) return rc;
1191 }
1192
1193// Do the peek and if sucessful, the number of bytes available.
1194//
1195 retc = tlsIO.Peek(Buff, Blen, rlen);
1196 if (retc == XrdTls::TLS_AOK) return rlen;
1197
1198// Dianose the TLS error and return failure
1199//
1200 return TLS_Error("peek on", retc);
1201}
1202
1203/******************************************************************************/
1204/* T L S _ R e c v */
1205/******************************************************************************/
1206
1207int XrdLinkXeq::TLS_Recv(char *Buff, int Blen)
1208{
1209 XrdSysMutexHelper theMutex;
1210 XrdTls::RC retc;
1211 int rlen;
1212
1213// Lock the read mutex if we need to, the helper will unlock it upon exit
1214//
1215 if (LockReads) theMutex.Lock(&rdMutex);
1216
1217// Note that we will read only as much as is queued. Use Recv() with a
1218// timeout to receive as much data as possible.
1219//
1220 isIdle = 0;
1221 retc = tlsIO.Read(Buff, Blen, rlen);
1222 if (retc != XrdTls::TLS_AOK) return TLS_Error("receive from", retc);
1223 if (rlen > 0) AtomicAdd(BytesIn, rlen);
1224 return rlen;
1225}
1226
1227/******************************************************************************/
1228
1229int XrdLinkXeq::TLS_Recv(char *Buff, int Blen, int timeout, bool havelock)
1230{
1231 XrdSysMutexHelper theMutex;
1232 XrdTls::RC retc;
1233 int pend, rlen, totlen = 0;
1234
1235// Lock the read mutex if we need to, the helper will unlock it upon exit
1236//
1237 if (LockReads && !havelock) theMutex.Lock(&rdMutex);
1238
1239// Wait up to timeout milliseconds for data to arrive
1240//
1241 isIdle = 0;
1242 while(Blen > 0)
1243 {pend = tlsIO.Pending(true);
1244 if (!pend) pend = Wait4Data(timeout);
1245 if (pend < 1)
1246 {if (pend < 0) return -1;
1247 tardyCnt++;
1248 if (totlen)
1249 {if ((++stallCnt & 0xff) == 1) TRACEI(DEBUG,"read timed out");
1250 AtomicAdd(BytesIn, totlen);
1251 }
1252 return totlen;
1253 }
1254
1255 // Read as much data as you can. Note that we will force an error
1256 // if we get a zero-length read after poll said it was OK. However,
1257 // if we never read anything, then we simply return -ENOMSG to avoid
1258 // generating a "read link error" as clearly there was a hangup.
1259 //
1260 retc = tlsIO.Read(Buff, Blen, rlen);
1261 if (retc != XrdTls::TLS_AOK)
1262 {if (!totlen) return -ENOMSG;
1263 AtomicAdd(BytesIn, totlen);
1264 return TLS_Error("receive from", retc);
1265 }
1266 if (rlen <= 0) break;
1267 totlen += rlen; Blen -= rlen; Buff += rlen;
1268 }
1269
1270 AtomicAdd(BytesIn, totlen);
1271 return totlen;
1272}
1273
1274/******************************************************************************/
1275
1276int XrdLinkXeq::TLS_Recv(const struct iovec *iov, int iocnt, int timeout)
1277{
1278 XrdSysMutexHelper theMutex;
1279 char *Buff;
1280 int Blen, rlen, totlen = 0;
1281
1282// Lock the read mutex if we need to, the helper will unlock it upon exit
1283//
1284 if (LockReads) theMutex.Lock(&rdMutex);
1285
1286// Individually process each element until we can't read any more
1287//
1288 isIdle = 0;
1289 for (int i = 0; i < iocnt; i++)
1290 {Buff = (char *)iov[i].iov_base;
1291 Blen = iov[i].iov_len;
1292 rlen = TLS_Recv(Buff, Blen, timeout, true);
1293 if (rlen <= 0) break;
1294 totlen += rlen;
1295 if (rlen < Blen) break;
1296 }
1297
1298 if (totlen) {AtomicAdd(BytesIn, totlen);}
1299 return totlen;
1300}
1301
1302/******************************************************************************/
1303/* T L S _ R e c v A l l */
1304/******************************************************************************/
1305
1306int XrdLinkXeq::TLS_RecvAll(char *Buff, int Blen, int timeout)
1307{
1308 int retc;
1309
1310// Check if timeout specified. Notice that the timeout is the max we will
1311// wait for some data. We will wait forever for all the data. Yeah, it's weird.
1312//
1313 if (timeout >= 0)
1314 {retc = tlsIO.Pending(true);
1315 if (!retc) retc = Wait4Data(timeout);
1316 if (retc < 1) return (retc ? -1 : -ETIMEDOUT);
1317 }
1318
1319// Note that we will block until we receive all the bytes.
1320//
1321 return TLS_Recv(Buff, Blen, -1);
1322}
1323
1324/******************************************************************************/
1325/* T L S _ S e n d */
1326/******************************************************************************/
1327
1328int XrdLinkXeq::TLS_Send(const char *Buff, int Blen)
1329{
1331 ssize_t bytesleft = Blen;
1332 XrdTls::RC retc;
1333 int byteswritten;
1334
1335// Prepare to send
1336//
1337 isIdle = 0;
1338 AtomicAdd(BytesOut, Blen);
1339
1340// Do non-blocking writes if we are setup to do so.
1341//
1342 if (sendQ) return sendQ->Send(Buff, Blen);
1343
1344// Write the data out
1345//
1346 while(bytesleft)
1347 {retc = tlsIO.Write(Buff, bytesleft, byteswritten);
1348 if (retc != XrdTls::TLS_AOK) return TLS_Error("send to", retc);
1349 bytesleft -= byteswritten; Buff += byteswritten;
1350 }
1351
1352// All done
1353//
1354 return Blen;
1355}
1356
1357/******************************************************************************/
1358
1359int XrdLinkXeq::TLS_Send(const struct iovec *iov, int iocnt, int bytes)
1360{
1362 XrdTls::RC retc;
1363 int byteswritten;
1364
1365// Get a lock and assume we will be successful (statistically we are). Note
1366// that the calling interface gauranteed bytes are not zero.
1367//
1368 isIdle = 0;
1369 AtomicAdd(BytesOut, bytes);
1370
1371// Do non-blocking writes if we are setup to do so.
1372//
1373 if (sendQ) return sendQ->Send(iov, iocnt, bytes);
1374
1375// Write the data out.
1376//
1377 for (int i = 0; i < iocnt; i++)
1378 {ssize_t bytesleft = iov[i].iov_len;
1379 char *Buff = (char *)iov[i].iov_base;
1380 while(bytesleft)
1381 {retc = tlsIO.Write(Buff, bytesleft, byteswritten);
1382 if (retc != XrdTls::TLS_AOK) return TLS_Error("send to", retc);
1383 bytesleft -= byteswritten; Buff += byteswritten;
1384 }
1385 }
1386
1387// All done
1388//
1389 return bytes;
1390}
1391
1392/******************************************************************************/
1393
1394int XrdLinkXeq::TLS_Send(const sfVec *sfP, int sfN)
1395{
1397 int bytes, buffsz, fileFD, retc;
1398 off_t offset;
1399 ssize_t totamt = 0;
1400 char myBuff[65536];
1401
1402// Convert the sendfile to a regular send. The conversion is not particularly
1403// fast and caller are advised to avoid using sendfile on TLS connections.
1404//
1405 isIdle = 0;
1406 for (int i = 0; i < sfN; sfP++, i++)
1407 {if (!(bytes = sfP->sendsz)) continue;
1408 totamt += bytes;
1409 if (sfP->fdnum < 0)
1410 {if (!TLS_Write(sfP->buffer, bytes)) return -1;
1411 continue;
1412 }
1413 offset = sfP->offset;
1414 fileFD = sfP->fdnum;
1415 buffsz = (bytes < (int)sizeof(myBuff) ? bytes : sizeof(myBuff));
1416 do {do {retc = pread(fileFD, myBuff, buffsz, offset);}
1417 while(retc < 0 && errno == EINTR);
1418 if (retc < 0) return SFError(errno);
1419 if (!retc) break;
1420 if (!TLS_Write(myBuff, buffsz)) return -1;
1421 offset += buffsz; bytes -= buffsz; totamt += retc;
1422 } while(bytes > 0);
1423 }
1424
1425// We are done
1426//
1427 AtomicAdd(BytesOut, totamt);
1428 return totamt;
1429}
1430
1431/******************************************************************************/
1432/* Protected: T L S _ W r i t e */
1433/******************************************************************************/
1434
1435bool XrdLinkXeq::TLS_Write(const char *Buff, int Blen)
1436{
1437 XrdTls::RC retc;
1438 int byteswritten;
1439
1440// Write the data out
1441//
1442 while(Blen)
1443 {retc = tlsIO.Write(Buff, Blen, byteswritten);
1444 if (retc != XrdTls::TLS_AOK)
1445 {TLS_Error("write to", retc);
1446 return false;
1447 }
1448 Blen -= byteswritten; Buff += byteswritten;
1449 }
1450
1451// All done
1452//
1453 return true;
1454}
1455
1456/******************************************************************************/
1457/* v e r T L S */
1458/******************************************************************************/
1459
// Returns whatever tlsIO.Version() reports for this link's TLS socket.
// NOTE(review): the signature line is absent from this listing; the index at
// the end of the file shows it as "const char * verTLS()".
1461{
1462 return tlsIO.Version();
1463}
1464
1465/******************************************************************************/
1466/* R e g i s t e r C l o s e R e q u e s t C b */
1467/******************************************************************************/
1468
1470 void* cbarg)
1471{
// Records a close-request callback and its argument for this link.
// The registration is refused (false) unless pp is the protocol currently
// attached to the link; otherwise cb and cbarg are stored and true returned.
// NOTE(review): the first line of the signature is absent from this listing;
// the index at the end of the file shows the full form as
// "bool RegisterCloseRequestCb(XrdProtocol *pp, bool(*cb)(void *), void *cbarg)".
1472 if (pp != Protocol) return false;
1473
1474 CloseRequestCb = cb;
1475 CloseRequestCbArg = cbarg;
1476 return true;
1477}
#define DEBUG(x)
#define close(a)
Definition XrdPosix.hh:48
#define write(a, b, c)
Definition XrdPosix.hh:115
#define writev(a, b, c)
Definition XrdPosix.hh:117
#define readv(a, b, c)
Definition XrdPosix.hh:84
#define read(a, b, c)
Definition XrdPosix.hh:82
#define pread(a, b, c, d)
Definition XrdPosix.hh:80
#define eMsg(x)
#define AtomicFAZ(x)
#define AtomicBeg(Mtx)
#define AtomicDec(x)
#define AtomicGet(x)
#define AtomicEnd(Mtx)
#define AtomicAdd(x, y)
size_t strlcpy(char *dst, const char *src, size_t sz)
#define TRACEI(act, x)
Definition XrdTrace.hh:66
const char * Comment
Definition XrdJob.hh:47
static void SyncAll()
Synchronize statistics for all links.
static void Unhook(int fd)
Unhook a link from the active table of links.
static const char * TraceID
bool(* CloseRequestCb)(void *)
int TLS_Send(const char *Buff, int Blen)
long long BytesOut
int TLS_Error(const char *act, XrdTls::RC rc)
int TLS_Peek(char *Buff, int Blen, int timeout)
int Client(char *buff, int blen)
char Uname[24]
XrdTlsPeerCerts * getPeerCerts()
static int LinkCountMax
XrdLinkInfo LinkInfo
XrdProtocol * ProtoAlt
int Close(bool defer=false)
XrdNetAddr Addr
int TLS_Recv(char *Buff, int Blen)
int sendData(const char *Buff, int Blen)
long long BytesInTot
bool TLS_Write(const char *Buff, int Blen)
int SendIOV(const struct iovec *iov, int iocnt, int bytes)
XrdProtocol * setProtocol(XrdProtocol *pp, bool push)
static long long LinkCountTot
long long BytesOutTot
void Shutdown(bool getLock)
int Peek(char *buff, int blen, int timeout=-1)
static int LinkCount
void Reset()
int Backlog()
XrdSysMutex wrMutex
static int Stats(char *buff, int blen, bool do_sync=false)
XrdSendQ * sendQ
XrdPollInfo PollInfo
void setID(const char *userid, int procid)
int Recv(char *buff, int blen)
static long long LinkBytesIn
void * CloseRequestCbArg
int TLS_RecvAll(char *Buff, int Blen, int timeout)
int SFError(int rc)
long long BytesIn
int Send(const char *buff, int blen)
XrdSysMutex rdMutex
const char * verTLS()
bool setNB()
int RecvIOV(const struct iovec *iov, int iocnt)
char Lname[256]
static long long LinkConTime
static int LinkSfIntr
XrdTlsSocket tlsIO
void DoIt()
int RecvAll(char *buff, int blen, int timeout=-1)
XrdProtocol * Protocol
bool Register(const char *hName)
static XrdSysMutex statsMutex
void setProtName(const char *name)
static int LinkStalls
static long long LinkBytesOut
void syncStats(int *ctime=0)
bool setTLS(bool enable, XrdTlsContext *ctx=0)
static int LinkTimeOuts
bool RegisterCloseRequestCb(XrdProtocol *pp, bool(*cb)(void *), void *cbarg)
static char * Poll2Text(short events)
Definition XrdPoll.cc:272
static void Detach(XrdPollInfo &pInfo)
Definition XrdPoll.cc:177
void Lock(XrdSysMutex *Mutex)
int fd
Socket file descriptor.
long long bytesOut
Bytes written to the socket.
int consec
Seconds connected.
long long bytesIn
Bytes read from the socket.
const char * tident
Pointer to the client's trace identifier.
@ TLS_HS_BLOCK
Always block during handshake.
@ TLS_RBL_WBL
blocking read blocking write
static std::string RC2Text(XrdTls::RC rc, bool dbg=false)
Definition XrdTls.cc:127
@ TLS_AOK
All went well, will always be zero.
Definition XrdTls.hh:40
XrdTlsContext * tlsCtx
Definition XrdGlobals.cc:52
XrdTcpMonPin * TcpMonPin
Definition XrdLinkXeq.cc:80
const int maxIOV
Definition XrdLinkXeq.cc:82
XrdSysError Log
Definition XrdConfig.cc:113
XrdScheduler Sched
Definition XrdLinkCtl.cc:54
int getIovMax()
int fdnum
File descriptor for data.
int sendsz
Length of data at offset.