Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(543)

Side by Side Diff: net/third_party/udt/src/core.cpp

Issue 6708091: Remove UDT. (Closed) Base URL: svn://chrome-svn/chrome/trunk/src/
Patch Set: Created 9 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « net/third_party/udt/src/core.h ('k') | net/third_party/udt/src/epoll.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 /*****************************************************************************
2 Copyright (c) 2001 - 2011, The Board of Trustees of the University of Illinois.
3 All rights reserved.
4
5 Redistribution and use in source and binary forms, with or without
6 modification, are permitted provided that the following conditions are
7 met:
8
9 * Redistributions of source code must retain the above
10 copyright notice, this list of conditions and the
11 following disclaimer.
12
13 * Redistributions in binary form must reproduce the
14 above copyright notice, this list of conditions
15 and the following disclaimer in the documentation
16 and/or other materials provided with the distribution.
17
18 * Neither the name of the University of Illinois
19 nor the names of its contributors may be used to
20 endorse or promote products derived from this
21 software without specific prior written permission.
22
23 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
24 IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
25 THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
26 PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
27 CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
28 EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
29 PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
30 PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
31 LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
32 NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
33 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
34 *****************************************************************************/
35
36 /*****************************************************************************
37 written by
38 Yunhong Gu, last updated 01/22/2011
39 *****************************************************************************/
40
41 #ifndef WIN32
42 #include <unistd.h>
43 #include <netdb.h>
44 #include <arpa/inet.h>
45 #include <cerrno>
46 #include <cstring>
47 #include <cstdlib>
48 #else
49 #include <winsock2.h>
50 #include <ws2tcpip.h>
51 #ifdef LEGACY_WIN32
52 #include <wspiapi.h>
53 #endif
54 #endif
55 #include <cmath>
56 #include "queue.h"
57 #include "core.h"
58
59 using namespace std;
60
61
62 CUDTUnited CUDT::s_UDTUnited;
63
64 const UDTSOCKET CUDT::INVALID_SOCK = -1;
65 const int CUDT::ERROR = -1;
66
67 const UDTSOCKET UDT::INVALID_SOCK = CUDT::INVALID_SOCK;
68 const int UDT::ERROR = CUDT::ERROR;
69
70 const int32_t CSeqNo::m_iSeqNoTH = 0x3FFFFFFF;
71 const int32_t CSeqNo::m_iMaxSeqNo = 0x7FFFFFFF;
72 const int32_t CAckNo::m_iMaxAckSeqNo = 0x7FFFFFFF;
73 const int32_t CMsgNo::m_iMsgNoTH = 0xFFFFFFF;
74 const int32_t CMsgNo::m_iMaxMsgNo = 0x1FFFFFFF;
75
76 const int CUDT::m_iVersion = 4;
77 const int CUDT::m_iSYNInterval = 10000;
78 const int CUDT::m_iSelfClockInterval = 64;
79
80
81 CUDT::CUDT()
82 {
83 m_pSndBuffer = NULL;
84 m_pRcvBuffer = NULL;
85 m_pSndLossList = NULL;
86 m_pRcvLossList = NULL;
87 m_pACKWindow = NULL;
88 m_pSndTimeWindow = NULL;
89 m_pRcvTimeWindow = NULL;
90
91 m_pSndQueue = NULL;
92 m_pRcvQueue = NULL;
93 m_pPeerAddr = NULL;
94 m_pSNode = NULL;
95 m_pRNode = NULL;
96
97 // Initilize mutex and condition variables
98 initSynch();
99
100 // Default UDT configurations
101 m_iMSS = 1500;
102 m_bSynSending = true;
103 m_bSynRecving = true;
104 m_iFlightFlagSize = 25600;
105 m_iSndBufSize = 8192;
106 m_iRcvBufSize = 8192; //Rcv buffer MUST NOT be bigger than Flight Flag size
107 m_Linger.l_onoff = 1;
108 m_Linger.l_linger = 180;
109 m_iUDPSndBufSize = 65536;
110 m_iUDPRcvBufSize = m_iRcvBufSize * m_iMSS;
111 m_iSockType = UDT_STREAM;
112 m_iIPversion = AF_INET;
113 m_bRendezvous = false;
114 m_iSndTimeOut = -1;
115 m_iRcvTimeOut = -1;
116 m_bReuseAddr = true;
117 m_llMaxBW = -1;
118
119 m_pCCFactory = new CCCFactory<CUDTCC>;
120 m_pCC = NULL;
121 m_pCache = NULL;
122
123 // Initial status
124 m_bOpened = false;
125 m_bListening = false;
126 m_bConnected = false;
127 m_bClosing = false;
128 m_bShutdown = false;
129 m_bBroken = false;
130 m_bPeerHealth = true;
131 m_ullLingerExpiration = 0;
132 }
133
134 CUDT::CUDT(const CUDT& ancestor)
135 {
136 m_pSndBuffer = NULL;
137 m_pRcvBuffer = NULL;
138 m_pSndLossList = NULL;
139 m_pRcvLossList = NULL;
140 m_pACKWindow = NULL;
141 m_pSndTimeWindow = NULL;
142 m_pRcvTimeWindow = NULL;
143
144 m_pSndQueue = NULL;
145 m_pRcvQueue = NULL;
146 m_pPeerAddr = NULL;
147 m_pSNode = NULL;
148 m_pRNode = NULL;
149
150 // Initilize mutex and condition variables
151 initSynch();
152
153 // Default UDT configurations
154 m_iMSS = ancestor.m_iMSS;
155 m_bSynSending = ancestor.m_bSynSending;
156 m_bSynRecving = ancestor.m_bSynRecving;
157 m_iFlightFlagSize = ancestor.m_iFlightFlagSize;
158 m_iSndBufSize = ancestor.m_iSndBufSize;
159 m_iRcvBufSize = ancestor.m_iRcvBufSize;
160 m_Linger = ancestor.m_Linger;
161 m_iUDPSndBufSize = ancestor.m_iUDPSndBufSize;
162 m_iUDPRcvBufSize = ancestor.m_iUDPRcvBufSize;
163 m_iSockType = ancestor.m_iSockType;
164 m_iIPversion = ancestor.m_iIPversion;
165 m_bRendezvous = ancestor.m_bRendezvous;
166 m_iSndTimeOut = ancestor.m_iSndTimeOut;
167 m_iRcvTimeOut = ancestor.m_iRcvTimeOut;
168 m_bReuseAddr = true; // this must be true, because all accepted sockets share d the same port with the listener
169 m_llMaxBW = ancestor.m_llMaxBW;
170
171 m_pCCFactory = ancestor.m_pCCFactory->clone();
172 m_pCC = NULL;
173 m_pCache = ancestor.m_pCache;
174
175 // Initial status
176 m_bOpened = false;
177 m_bListening = false;
178 m_bConnected = false;
179 m_bClosing = false;
180 m_bShutdown = false;
181 m_bBroken = false;
182 m_bPeerHealth = true;
183 m_ullLingerExpiration = 0;
184 }
185
186 CUDT::~CUDT()
187 {
188 // release mutex/condtion variables
189 destroySynch();
190
191 // destroy the data structures
192 delete m_pSndBuffer;
193 delete m_pRcvBuffer;
194 delete m_pSndLossList;
195 delete m_pRcvLossList;
196 delete m_pACKWindow;
197 delete m_pSndTimeWindow;
198 delete m_pRcvTimeWindow;
199 delete m_pCCFactory;
200 delete m_pCC;
201 delete m_pPeerAddr;
202 delete m_pSNode;
203 delete m_pRNode;
204 }
205
206 void CUDT::setOpt(UDTOpt optName, const void* optval, const int&)
207 {
208 if (m_bBroken || m_bClosing)
209 throw CUDTException(2, 1, 0);
210
211 CGuard cg(m_ConnectionLock);
212 CGuard sendguard(m_SendLock);
213 CGuard recvguard(m_RecvLock);
214
215 switch (optName)
216 {
217 case UDT_MSS:
218 if (m_bOpened)
219 throw CUDTException(5, 1, 0);
220
221 if (*(int*)optval < int(28 + CHandShake::m_iContentSize))
222 throw CUDTException(5, 3, 0);
223
224 m_iMSS = *(int*)optval;
225
226 // Packet size cannot be greater than UDP buffer size
227 if (m_iMSS > m_iUDPSndBufSize)
228 m_iMSS = m_iUDPSndBufSize;
229 if (m_iMSS > m_iUDPRcvBufSize)
230 m_iMSS = m_iUDPRcvBufSize;
231
232 break;
233
234 case UDT_SNDSYN:
235 m_bSynSending = *(bool *)optval;
236 break;
237
238 case UDT_RCVSYN:
239 m_bSynRecving = *(bool *)optval;
240 break;
241
242 case UDT_CC:
243 if (m_bConnected)
244 throw CUDTException(5, 1, 0);
245 if (NULL != m_pCCFactory)
246 delete m_pCCFactory;
247 m_pCCFactory = ((CCCVirtualFactory *)optval)->clone();
248
249 break;
250
251 case UDT_FC:
252 if (m_bConnected)
253 throw CUDTException(5, 2, 0);
254
255 if (*(int*)optval < 1)
256 throw CUDTException(5, 3);
257
258 // Mimimum recv flight flag size is 32 packets
259 if (*(int*)optval > 32)
260 m_iFlightFlagSize = *(int*)optval;
261 else
262 m_iFlightFlagSize = 32;
263
264 break;
265
266 case UDT_SNDBUF:
267 if (m_bOpened)
268 throw CUDTException(5, 1, 0);
269
270 if (*(int*)optval <= 0)
271 throw CUDTException(5, 3, 0);
272
273 m_iSndBufSize = *(int*)optval / (m_iMSS - 28);
274
275 break;
276
277 case UDT_RCVBUF:
278 if (m_bOpened)
279 throw CUDTException(5, 1, 0);
280
281 if (*(int*)optval <= 0)
282 throw CUDTException(5, 3, 0);
283
284 // Mimimum recv buffer size is 32 packets
285 if (*(int*)optval > (m_iMSS - 28) * 32)
286 m_iRcvBufSize = *(int*)optval / (m_iMSS - 28);
287 else
288 m_iRcvBufSize = 32;
289
290 // recv buffer MUST not be greater than FC size
291 if (m_iRcvBufSize > m_iFlightFlagSize)
292 m_iRcvBufSize = m_iFlightFlagSize;
293
294 break;
295
296 case UDT_LINGER:
297 m_Linger = *(linger*)optval;
298 break;
299
300 case UDP_SNDBUF:
301 if (m_bOpened)
302 throw CUDTException(5, 1, 0);
303
304 m_iUDPSndBufSize = *(int*)optval;
305
306 if (m_iUDPSndBufSize < m_iMSS)
307 m_iUDPSndBufSize = m_iMSS;
308
309 break;
310
311 case UDP_RCVBUF:
312 if (m_bOpened)
313 throw CUDTException(5, 1, 0);
314
315 m_iUDPRcvBufSize = *(int*)optval;
316
317 if (m_iUDPRcvBufSize < m_iMSS)
318 m_iUDPRcvBufSize = m_iMSS;
319
320 break;
321
322 case UDT_RENDEZVOUS:
323 if (m_bConnected)
324 throw CUDTException(5, 1, 0);
325 m_bRendezvous = *(bool *)optval;
326 break;
327
328 case UDT_SNDTIMEO:
329 m_iSndTimeOut = *(int*)optval;
330 break;
331
332 case UDT_RCVTIMEO:
333 m_iRcvTimeOut = *(int*)optval;
334 break;
335
336 case UDT_REUSEADDR:
337 if (m_bOpened)
338 throw CUDTException(5, 1, 0);
339 m_bReuseAddr = *(bool*)optval;
340 break;
341
342 case UDT_MAXBW:
343 if (m_bConnected)
344 throw CUDTException(5, 1, 0);
345 m_llMaxBW = *(int64_t*)optval;
346 break;
347
348 default:
349 throw CUDTException(5, 0, 0);
350 }
351 }
352
353 void CUDT::getOpt(UDTOpt optName, void* optval, int& optlen)
354 {
355 CGuard cg(m_ConnectionLock);
356
357 switch (optName)
358 {
359 case UDT_MSS:
360 *(int*)optval = m_iMSS;
361 optlen = sizeof(int);
362 break;
363
364 case UDT_SNDSYN:
365 *(bool*)optval = m_bSynSending;
366 optlen = sizeof(bool);
367 break;
368
369 case UDT_RCVSYN:
370 *(bool*)optval = m_bSynRecving;
371 optlen = sizeof(bool);
372 break;
373
374 case UDT_CC:
375 if (!m_bOpened)
376 throw CUDTException(5, 5, 0);
377 *(CCC**)optval = m_pCC;
378 optlen = sizeof(CCC*);
379
380 break;
381
382 case UDT_FC:
383 *(int*)optval = m_iFlightFlagSize;
384 optlen = sizeof(int);
385 break;
386
387 case UDT_SNDBUF:
388 *(int*)optval = m_iSndBufSize * (m_iMSS - 28);
389 optlen = sizeof(int);
390 break;
391
392 case UDT_RCVBUF:
393 *(int*)optval = m_iRcvBufSize * (m_iMSS - 28);
394 optlen = sizeof(int);
395 break;
396
397 case UDT_LINGER:
398 if (optlen < (int)(sizeof(linger)))
399 throw CUDTException(5, 3, 0);
400
401 *(linger*)optval = m_Linger;
402 optlen = sizeof(linger);
403 break;
404
405 case UDP_SNDBUF:
406 *(int*)optval = m_iUDPSndBufSize;
407 optlen = sizeof(int);
408 break;
409
410 case UDP_RCVBUF:
411 *(int*)optval = m_iUDPRcvBufSize;
412 optlen = sizeof(int);
413 break;
414
415 case UDT_RENDEZVOUS:
416 *(bool *)optval = m_bRendezvous;
417 optlen = sizeof(bool);
418 break;
419
420 case UDT_SNDTIMEO:
421 *(int*)optval = m_iSndTimeOut;
422 optlen = sizeof(int);
423 break;
424
425 case UDT_RCVTIMEO:
426 *(int*)optval = m_iRcvTimeOut;
427 optlen = sizeof(int);
428 break;
429
430 case UDT_REUSEADDR:
431 *(bool *)optval = m_bReuseAddr;
432 optlen = sizeof(bool);
433 break;
434
435 case UDT_MAXBW:
436 *(int64_t*)optval = m_llMaxBW;
437 break;
438
439 default:
440 throw CUDTException(5, 0, 0);
441 }
442 }
443
444 void CUDT::open()
445 {
446 CGuard cg(m_ConnectionLock);
447
448 // Initial sequence number, loss, acknowledgement, etc.
449 m_iPktSize = m_iMSS - 28;
450 m_iPayloadSize = m_iPktSize - CPacket::m_iPktHdrSize;
451
452 m_iEXPCount = 1;
453 m_iBandwidth = 1;
454 m_iDeliveryRate = 16;
455 m_iAckSeqNo = 0;
456 m_ullLastAckTime = 0;
457
458 // trace information
459 m_StartTime = CTimer::getTime();
460 m_llSentTotal = m_llRecvTotal = m_iSndLossTotal = m_iRcvLossTotal = m_iRetran sTotal = m_iSentACKTotal = m_iRecvACKTotal = m_iSentNAKTotal = m_iRecvNAKTotal = 0;
461 m_LastSampleTime = CTimer::getTime();
462 m_llTraceSent = m_llTraceRecv = m_iTraceSndLoss = m_iTraceRcvLoss = m_iTraceR etrans = m_iSentACK = m_iRecvACK = m_iSentNAK = m_iRecvNAK = 0;
463 m_llSndDuration = m_llSndDurationTotal = 0;
464
465 // structures for queue
466 if (NULL == m_pSNode)
467 m_pSNode = new CSNode;
468 m_pSNode->m_pUDT = this;
469 m_pSNode->m_llTimeStamp = 1;
470 m_pSNode->m_iHeapLoc = -1;
471
472 if (NULL == m_pRNode)
473 m_pRNode = new CRNode;
474 m_pRNode->m_pUDT = this;
475 m_pRNode->m_llTimeStamp = 1;
476 m_pRNode->m_pPrev = m_pRNode->m_pNext = NULL;
477 m_pRNode->m_bOnList = false;
478
479 m_iRTT = 10 * m_iSYNInterval;
480 m_iRTTVar = m_iRTT >> 1;
481 m_ullCPUFrequency = CTimer::getCPUFrequency();
482
483 // set up the timers
484 m_ullSYNInt = m_iSYNInterval * m_ullCPUFrequency;
485
486 // set minimum NAK and EXP timeout to 100ms
487 m_ullMinNakInt = 300000 * m_ullCPUFrequency;
488 m_ullMinExpInt = 300000 * m_ullCPUFrequency;
489
490 m_ullACKInt = m_ullSYNInt;
491 m_ullNAKInt = m_ullMinNakInt;
492 m_ullEXPInt = m_ullMinExpInt;
493 m_llLastRspTime = CTimer::getTime();
494
495 CTimer::rdtsc(m_ullNextACKTime);
496 m_ullNextACKTime += m_ullSYNInt;
497 CTimer::rdtsc(m_ullNextNAKTime);
498 m_ullNextNAKTime += m_ullNAKInt;
499 CTimer::rdtsc(m_ullNextEXPTime);
500 m_ullNextEXPTime += m_ullEXPInt;
501
502 m_iPktCount = 0;
503 m_iLightACKCount = 1;
504
505 m_ullTargetTime = 0;
506 m_ullTimeDiff = 0;
507
508 // Now UDT is opened.
509 m_bOpened = true;
510 }
511
512 void CUDT::listen()
513 {
514 CGuard cg(m_ConnectionLock);
515
516 if (!m_bOpened)
517 throw CUDTException(5, 0, 0);
518
519 if (m_bConnected)
520 throw CUDTException(5, 2, 0);
521
522 // listen can be called more than once
523 if (m_bListening)
524 return;
525
526 // if there is already another socket listening on the same port
527 if (m_pRcvQueue->setListener(this) < 0)
528 throw CUDTException(5, 11, 0);
529
530 m_bListening = true;
531 }
532
533 void CUDT::connect(const sockaddr* serv_addr)
534 {
535 CGuard cg(m_ConnectionLock);
536
537 if (!m_bOpened)
538 throw CUDTException(5, 0, 0);
539
540 if (m_bListening)
541 throw CUDTException(5, 2, 0);
542
543 if (m_bConnected)
544 throw CUDTException(5, 2, 0);
545
546 // register this socket in the rendezvous queue
547 // RendezevousQueue is used to temporarily store incoming handshake, non-rend ezvous connections also require this function
548 m_pRcvQueue->m_pRendezvousQueue->insert(m_SocketID, m_iIPversion, serv_addr);
549
550 CPacket request;
551 char* reqdata = new char [m_iPayloadSize];
552 CHandShake req;
553
554 CPacket response;
555 char* resdata = new char [m_iPayloadSize];
556 CHandShake res;
557
558 // This is my current configurations
559 req.m_iVersion = m_iVersion;
560 req.m_iType = m_iSockType;
561 req.m_iMSS = m_iMSS;
562 req.m_iFlightFlagSize = (m_iRcvBufSize < m_iFlightFlagSize)? m_iRcvBufSize : m_iFlightFlagSize;
563 req.m_iReqType = (!m_bRendezvous) ? 1 : 0;
564 req.m_iID = m_SocketID;
565 CIPAddress::ntop(serv_addr, req.m_piPeerIP, m_iIPversion);
566
567 // Random Initial Sequence Number
568 srand((unsigned int)CTimer::getTime());
569 m_iISN = req.m_iISN = (int32_t)(CSeqNo::m_iMaxSeqNo * (double(rand()) / RAND_ MAX));
570
571 m_iLastDecSeq = req.m_iISN - 1;
572 m_iSndLastAck = req.m_iISN;
573 m_iSndLastDataAck = req.m_iISN;
574 m_iSndCurrSeqNo = req.m_iISN - 1;
575 m_iSndLastAck2 = req.m_iISN;
576 m_ullSndLastAck2Time = CTimer::getTime();
577
578 // Inform the server my configurations.
579 request.pack(0, NULL, reqdata, m_iPayloadSize);
580 // ID = 0, connection request
581 request.m_iID = 0;
582
583 // Wait for the negotiated configurations from the peer side.
584 response.pack(0, NULL, resdata, m_iPayloadSize);
585
586 uint64_t timeo = 3000000;
587 if (m_bRendezvous)
588 timeo *= 10;
589 uint64_t entertime = CTimer::getTime();
590 uint64_t last_req_time = 0;
591
592 CUDTException e(0, 0);
593 char* tmp = NULL;
594
595 while (!m_bClosing)
596 {
597 // avoid sending too many requests, at most 1 request per 250ms
598 if (CTimer::getTime() - last_req_time > 250000)
599 {
600 req.serialize(reqdata, m_iPayloadSize);
601 request.setLength(CHandShake::m_iContentSize);
602 m_pSndQueue->sendto(serv_addr, request);
603
604 last_req_time = CTimer::getTime();
605 }
606
607 response.setLength(m_iPayloadSize);
608 if (m_pRcvQueue->recvfrom(m_SocketID, response) > 0)
609 {
610 if (m_bRendezvous && ((0 == response.getFlag()) || (1 == response.getTy pe())) && (NULL != tmp))
611 {
612 // a data packet or a keep-alive packet comes, which means the peer side is already connected
613 // in this situation, a previously recorded response (tmp) will be u sed
614 res.deserialize(tmp, CHandShake::m_iContentSize);
615 memcpy(m_piSelfIP, res.m_piPeerIP, 16);
616 break;
617 }
618
619 if ((1 != response.getFlag()) || (0 != response.getType()))
620 response.setLength(-1);
621 else
622 {
623 res.deserialize(response.m_pcData, response.getLength());
624
625 if (m_bRendezvous)
626 {
627 // regular connect should NOT communicate with rendezvous connect
628 // rendezvous connect require 3-way handshake
629 if (1 == res.m_iReqType)
630 response.setLength(-1);
631 else if ((0 == res.m_iReqType) || (0 == req.m_iReqType))
632 {
633 if (NULL == tmp)
634 tmp = new char [m_iPayloadSize];
635 res.serialize(tmp, m_iPayloadSize);
636
637 req.m_iReqType = -1;
638 request.m_iID = res.m_iID;
639 response.setLength(-1);
640 }
641 }
642 else
643 {
644 // set cookie
645 if (1 == res.m_iReqType)
646 {
647 req.m_iReqType = -1;
648 req.m_iCookie = res.m_iCookie;
649 response.setLength(-1);
650 }
651 }
652 }
653
654 // new request/response should be sent out immediately on receving a re sponse
655 last_req_time = 0;
656 }
657
658 if (response.getLength() > 0)
659 {
660 memcpy(m_piSelfIP, res.m_piPeerIP, 16);
661 break;
662 }
663
664 if (CTimer::getTime() > entertime + timeo)
665 {
666 // timeout
667 e = CUDTException(1, 1, 0);
668 break;
669 }
670 }
671
672 delete [] tmp;
673 delete [] reqdata;
674 delete [] resdata;
675
676 // remove from rendezvous queue
677 m_pRcvQueue->m_pRendezvousQueue->remove(m_SocketID);
678
679 if (e.getErrorCode() == 0)
680 {
681 if (m_bClosing) // if the socket is closed before connection...
682 e = CUDTException(1);
683 else if (1002 == res.m_iReqType) // connection re quest rejected
684 e = CUDTException(1, 2, 0);
685 else if ((!m_bRendezvous) && (m_iISN != res.m_iISN)) // secuity check
686 e = CUDTException(1, 4, 0);
687 }
688
689 if (e.getErrorCode() != 0)
690 {
691 throw e;
692 }
693
694 // Got it. Re-configure according to the negotiated values.
695 m_iMSS = res.m_iMSS;
696 m_iFlowWindowSize = res.m_iFlightFlagSize;
697 m_iPktSize = m_iMSS - 28;
698 m_iPayloadSize = m_iPktSize - CPacket::m_iPktHdrSize;
699 m_iPeerISN = res.m_iISN;
700 m_iRcvLastAck = res.m_iISN;
701 m_iRcvLastAckAck = res.m_iISN;
702 m_iRcvCurrSeqNo = res.m_iISN - 1;
703 m_PeerID = res.m_iID;
704
705 // Prepare all data structures
706 try
707 {
708 m_pSndBuffer = new CSndBuffer(32, m_iPayloadSize);
709 m_pRcvBuffer = new CRcvBuffer(&(m_pRcvQueue->m_UnitQueue), m_iRcvBufSize);
710 // after introducing lite ACK, the sndlosslist may not be cleared in time, so it requires twice space.
711 m_pSndLossList = new CSndLossList(m_iFlowWindowSize * 2);
712 m_pRcvLossList = new CRcvLossList(m_iFlightFlagSize);
713 m_pACKWindow = new CACKWindow(1024);
714 m_pRcvTimeWindow = new CPktTimeWindow(16, 64);
715 m_pSndTimeWindow = new CPktTimeWindow();
716 }
717 catch (...)
718 {
719 throw CUDTException(3, 2, 0);
720 }
721
722 m_pCC = m_pCCFactory->create();
723 m_pCC->m_UDT = m_SocketID;
724 m_ullInterval = (uint64_t)(m_pCC->m_dPktSndPeriod * m_ullCPUFrequency);
725 m_dCongestionWindow = m_pCC->m_dCWndSize;
726
727 CInfoBlock ib;
728 ib.m_iIPversion = m_iIPversion;
729 CInfoBlock::convert(serv_addr, m_iIPversion, ib.m_piIP);
730 if (m_pCache->lookup(&ib) >= 0)
731 {
732 m_iRTT = ib.m_iRTT;
733 m_iBandwidth = ib.m_iBandwidth;
734 }
735
736 m_pCC->setMSS(m_iMSS);
737 m_pCC->setMaxCWndSize((int&)m_iFlowWindowSize);
738 m_pCC->setSndCurrSeqNo((int32_t&)m_iSndCurrSeqNo);
739 m_pCC->setRcvRate(m_iDeliveryRate);
740 m_pCC->setRTT(m_iRTT);
741 m_pCC->setBandwidth(m_iBandwidth);
742 if (m_llMaxBW > 0) m_pCC->setUserParam((char*)&(m_llMaxBW), 8);
743 m_pCC->init();
744
745 m_pPeerAddr = (AF_INET == m_iIPversion) ? (sockaddr*)new sockaddr_in : (socka ddr*)new sockaddr_in6;
746 memcpy(m_pPeerAddr, serv_addr, (AF_INET == m_iIPversion) ? sizeof(sockaddr_in ) : sizeof(sockaddr_in6));
747
748 // And, I am connected too.
749 m_bConnected = true;
750
751 // register this socket for receiving data packets
752 m_pRNode->m_bOnList = true;
753 m_pRcvQueue->setNewEntry(this);
754
755 // acknowledde any waiting epolls to read/write
756 s_UDTUnited.m_EPoll.enable_read(m_SocketID, m_sPollID);
757 s_UDTUnited.m_EPoll.enable_write(m_SocketID, m_sPollID);
758 }
759
760 void CUDT::connect(const sockaddr* peer, CHandShake* hs)
761 {
762 CGuard cg(m_ConnectionLock);
763
764 // Uses the smaller MSS between the peers
765 if (hs->m_iMSS > m_iMSS)
766 hs->m_iMSS = m_iMSS;
767 else
768 m_iMSS = hs->m_iMSS;
769
770 // exchange info for maximum flow window size
771 m_iFlowWindowSize = hs->m_iFlightFlagSize;
772 hs->m_iFlightFlagSize = (m_iRcvBufSize < m_iFlightFlagSize)? m_iRcvBufSize : m_iFlightFlagSize;
773
774 m_iPeerISN = hs->m_iISN;
775
776 m_iRcvLastAck = hs->m_iISN;
777 m_iRcvLastAckAck = hs->m_iISN;
778 m_iRcvCurrSeqNo = hs->m_iISN - 1;
779
780 m_PeerID = hs->m_iID;
781 hs->m_iID = m_SocketID;
782
783 // use peer's ISN and send it back for security check
784 m_iISN = hs->m_iISN;
785
786 m_iLastDecSeq = m_iISN - 1;
787 m_iSndLastAck = m_iISN;
788 m_iSndLastDataAck = m_iISN;
789 m_iSndCurrSeqNo = m_iISN - 1;
790 m_iSndLastAck2 = m_iISN;
791 m_ullSndLastAck2Time = CTimer::getTime();
792
793 // this is a reponse handshake
794 hs->m_iReqType = -1;
795
796 // get local IP address and send the peer its IP address (because UDP cannot get local IP address)
797 memcpy(m_piSelfIP, hs->m_piPeerIP, 16);
798 CIPAddress::ntop(peer, hs->m_piPeerIP, m_iIPversion);
799
800 m_iPktSize = m_iMSS - 28;
801 m_iPayloadSize = m_iPktSize - CPacket::m_iPktHdrSize;
802
803 // Prepare all structures
804 try
805 {
806 m_pSndBuffer = new CSndBuffer(32, m_iPayloadSize);
807 m_pRcvBuffer = new CRcvBuffer(&(m_pRcvQueue->m_UnitQueue), m_iRcvBufSize);
808 m_pSndLossList = new CSndLossList(m_iFlowWindowSize * 2);
809 m_pRcvLossList = new CRcvLossList(m_iFlightFlagSize);
810 m_pACKWindow = new CACKWindow(1024);
811 m_pRcvTimeWindow = new CPktTimeWindow(16, 64);
812 m_pSndTimeWindow = new CPktTimeWindow();
813 }
814 catch (...)
815 {
816 throw CUDTException(3, 2, 0);
817 }
818
819 m_pCC = m_pCCFactory->create();
820 m_pCC->m_UDT = m_SocketID;
821 m_ullInterval = (uint64_t)(m_pCC->m_dPktSndPeriod * m_ullCPUFrequency);
822 m_dCongestionWindow = m_pCC->m_dCWndSize;
823
824 CInfoBlock ib;
825 ib.m_iIPversion = m_iIPversion;
826 CInfoBlock::convert(peer, m_iIPversion, ib.m_piIP);
827 if (m_pCache->lookup(&ib) >= 0)
828 {
829 m_iRTT = ib.m_iRTT;
830 m_iBandwidth = ib.m_iBandwidth;
831 }
832
833 m_pCC->setMSS(m_iMSS);
834 m_pCC->setMaxCWndSize((int&)m_iFlowWindowSize);
835 m_pCC->setSndCurrSeqNo((int32_t&)m_iSndCurrSeqNo);
836 m_pCC->setRcvRate(m_iDeliveryRate);
837 m_pCC->setRTT(m_iRTT);
838 m_pCC->setBandwidth(m_iBandwidth);
839 if (m_llMaxBW > 0) m_pCC->setUserParam((char*)&(m_llMaxBW), 8);
840 m_pCC->init();
841
842 m_pPeerAddr = (AF_INET == m_iIPversion) ? (sockaddr*)new sockaddr_in : (socka ddr*)new sockaddr_in6;
843 memcpy(m_pPeerAddr, peer, (AF_INET == m_iIPversion) ? sizeof(sockaddr_in) : s izeof(sockaddr_in6));
844
845 // And of course, it is connected.
846 m_bConnected = true;
847
848 // register this socket for receiving data packets
849 m_pRNode->m_bOnList = true;
850 m_pRcvQueue->setNewEntry(this);
851
852 //send the response to the peer, see listen() for more discussions about this
853 CPacket response;
854 int size = CHandShake::m_iContentSize;
855 char* buffer = new char[size];
856 hs->serialize(buffer, size);
857 response.pack(0, NULL, buffer, size);
858 response.m_iID = m_PeerID;
859 m_pSndQueue->sendto(peer, response);
860 delete [] buffer;
861 }
862
863 void CUDT::close()
864 {
865 if (!m_bOpened)
866 return;
867
868 if (0 != m_Linger.l_onoff)
869 {
870 uint64_t entertime = CTimer::getTime();
871
872 while (!m_bBroken && m_bConnected && (m_pSndBuffer->getCurrBufSize() > 0) && (CTimer::getTime() - entertime < m_Linger.l_linger * 1000000ULL))
873 {
874 // linger has been checked by previous close() call and has expired
875 if (m_ullLingerExpiration >= entertime)
876 break;
877
878 if (!m_bSynSending)
879 {
880 // if this socket enables asynchronous sending, return immediately a nd let GC to close it later
881 if (0 == m_ullLingerExpiration)
882 m_ullLingerExpiration = entertime + m_Linger.l_linger * 1000000UL L;
883
884 return;
885 }
886
887 #ifndef WIN32
888 timespec ts;
889 ts.tv_sec = 0;
890 ts.tv_nsec = 1000000;
891 nanosleep(&ts, NULL);
892 #else
893 Sleep(1);
894 #endif
895 }
896 }
897
898 // remove this socket from the snd queue
899 if (m_bConnected)
900 m_pSndQueue->m_pSndUList->remove(this);
901
902 // remove itself from all epoll monitoring
903 for (set<int>::iterator i = m_sPollID.begin(); i != m_sPollID.end(); ++ i)
904 s_UDTUnited.m_EPoll.remove_usock(*i, m_SocketID);
905
906 CGuard cg(m_ConnectionLock);
907
908 if (!m_bOpened)
909 return;
910
911 // Inform the threads handler to stop.
912 m_bClosing = true;
913
914 // Signal the sender and recver if they are waiting for data.
915 releaseSynch();
916
917 if (m_bListening)
918 {
919 m_bListening = false;
920 m_pRcvQueue->removeListener(this);
921 }
922 if (m_bConnected)
923 {
924 if (!m_bShutdown)
925 sendCtrl(5);
926
927 m_pCC->close();
928
929 CInfoBlock ib;
930 ib.m_iIPversion = m_iIPversion;
931 CInfoBlock::convert(m_pPeerAddr, m_iIPversion, ib.m_piIP);
932 ib.m_iRTT = m_iRTT;
933 ib.m_iBandwidth = m_iBandwidth;
934 m_pCache->update(&ib);
935
936 m_bConnected = false;
937 }
938
939 // waiting all send and recv calls to stop
940 CGuard sendguard(m_SendLock);
941 CGuard recvguard(m_RecvLock);
942
943 // CLOSED.
944 m_bOpened = false;
945 }
946
947 int CUDT::send(const char* data, const int& len)
948 {
949 if (UDT_DGRAM == m_iSockType)
950 throw CUDTException(5, 10, 0);
951
952 // throw an exception if not connected
953 if (m_bBroken || m_bClosing)
954 throw CUDTException(2, 1, 0);
955 else if (!m_bConnected)
956 throw CUDTException(2, 2, 0);
957
958 if (len <= 0)
959 return 0;
960
961 CGuard sendguard(m_SendLock);
962
963 if (m_pSndBuffer->getCurrBufSize() == 0)
964 {
965 // delay the EXP timer to avoid mis-fired timeout
966 uint64_t currtime;
967 CTimer::rdtsc(currtime);
968 m_ullNextEXPTime = currtime + m_ullEXPInt;
969 }
970
971 if (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize())
972 {
973 if (!m_bSynSending)
974 throw CUDTException(6, 1, 0);
975 else
976 {
977 // wait here during a blocking sending
978 #ifndef WIN32
979 pthread_mutex_lock(&m_SendBlockLock);
980 if (m_iSndTimeOut < 0)
981 {
982 while (!m_bBroken && m_bConnected && !m_bClosing && (m_iSndBufSiz e <= m_pSndBuffer->getCurrBufSize()) && m_bPeerHealth)
983 pthread_cond_wait(&m_SendBlockCond, &m_SendBlockLock);
984 }
985 else
986 {
987 uint64_t exptime = CTimer::getTime() + m_iSndTimeOut * 1000ULL;
988 timespec locktime;
989
990 locktime.tv_sec = exptime / 1000000;
991 locktime.tv_nsec = (exptime % 1000000) * 1000;
992
993 while (!m_bBroken && m_bConnected && !m_bClosing && (m_iSndBufSiz e <= m_pSndBuffer->getCurrBufSize()) && m_bPeerHealth && (CTimer::getTime() < ex ptime))
994 pthread_cond_timedwait(&m_SendBlockCond, &m_SendBlockLock, &lo cktime);
995 }
996 pthread_mutex_unlock(&m_SendBlockLock);
997 #else
998 if (m_iSndTimeOut < 0)
999 {
1000 while (!m_bBroken && m_bConnected && !m_bClosing && (m_iSndBufSiz e <= m_pSndBuffer->getCurrBufSize()) && m_bPeerHealth)
1001 WaitForSingleObject(m_SendBlockCond, INFINITE);
1002 }
1003 else
1004 {
1005 uint64_t exptime = CTimer::getTime() + m_iSndTimeOut * 1000ULL;
1006
1007 while (!m_bBroken && m_bConnected && !m_bClosing && (m_iSndBufSiz e <= m_pSndBuffer->getCurrBufSize()) && m_bPeerHealth && (CTimer::getTime() < ex ptime))
1008 WaitForSingleObject(m_SendBlockCond, DWORD((exptime - CTimer:: getTime()) / 1000));
1009 }
1010 #endif
1011
1012 // check the connection status
1013 if (m_bBroken || m_bClosing)
1014 throw CUDTException(2, 1, 0);
1015 else if (!m_bConnected)
1016 throw CUDTException(2, 2, 0);
1017 else if (!m_bPeerHealth)
1018 {
1019 m_bPeerHealth = true;
1020 throw CUDTException(7);
1021 }
1022 }
1023 }
1024
1025 if (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize())
1026 {
1027 if (m_iSndTimeOut >= 0)
1028 throw CUDTException(6, 1, 0);
1029
1030 return 0;
1031 }
1032
1033 int size = (m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize;
1034 if (size > len)
1035 size = len;
1036
1037 // record total time used for sending
1038 if (0 == m_pSndBuffer->getCurrBufSize())
1039 m_llSndDurationCounter = CTimer::getTime();
1040
1041 // insert the user buffer into the sening list
1042 m_pSndBuffer->addBuffer(data, size);
1043
1044 // insert this socket to snd list if it is not on the list yet
1045 m_pSndQueue->m_pSndUList->update(this, false);
1046
1047 if (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize())
1048 {
1049 // write is not available any more
1050 s_UDTUnited.m_EPoll.disable_write(m_SocketID, m_sPollID);
1051 }
1052
1053 return size;
1054 }
1055
1056 int CUDT::recv(char* data, const int& len)
1057 {
1058 if (UDT_DGRAM == m_iSockType)
1059 throw CUDTException(5, 10, 0);
1060
1061 // throw an exception if not connected
1062 if (!m_bConnected)
1063 throw CUDTException(2, 2, 0);
1064 else if ((m_bBroken || m_bClosing) && (0 == m_pRcvBuffer->getRcvDataSize()))
1065 throw CUDTException(2, 1, 0);
1066
1067 if (len <= 0)
1068 return 0;
1069
1070 CGuard recvguard(m_RecvLock);
1071
1072 if (0 == m_pRcvBuffer->getRcvDataSize())
1073 {
1074 if (!m_bSynRecving)
1075 throw CUDTException(6, 2, 0);
1076 else
1077 {
1078 #ifndef WIN32
1079 pthread_mutex_lock(&m_RecvDataLock);
1080 if (m_iRcvTimeOut < 0)
1081 {
1082 while (!m_bBroken && m_bConnected && !m_bClosing && (0 == m_pRcvB uffer->getRcvDataSize()))
1083 pthread_cond_wait(&m_RecvDataCond, &m_RecvDataLock);
1084 }
1085 else
1086 {
1087 uint64_t exptime = CTimer::getTime() + m_iRcvTimeOut * 1000ULL;
1088 timespec locktime;
1089
1090 locktime.tv_sec = exptime / 1000000;
1091 locktime.tv_nsec = (exptime % 1000000) * 1000;
1092
1093 while (!m_bBroken && m_bConnected && !m_bClosing && (0 == m_pRcvB uffer->getRcvDataSize()))
1094 {
1095 pthread_cond_timedwait(&m_RecvDataCond, &m_RecvDataLock, &lock time);
1096 if (CTimer::getTime() >= exptime)
1097 break;
1098 }
1099 }
1100 pthread_mutex_unlock(&m_RecvDataLock);
1101 #else
1102 if (m_iRcvTimeOut < 0)
1103 {
1104 while (!m_bBroken && m_bConnected && !m_bClosing && (0 == m_pRcvB uffer->getRcvDataSize()))
1105 WaitForSingleObject(m_RecvDataCond, INFINITE);
1106 }
1107 else
1108 {
1109 uint64_t enter_time = CTimer::getTime();
1110
1111 while (!m_bBroken && m_bConnected && !m_bClosing && (0 == m_pRcvB uffer->getRcvDataSize()))
1112 {
1113 int diff = int(CTimer::getTime() - enter_time) / 1000;
1114 if (diff >= m_iRcvTimeOut)
1115 break;
1116 WaitForSingleObject(m_RecvDataCond, DWORD(m_iRcvTimeOut - diff ));
1117 }
1118 }
1119 #endif
1120 }
1121 }
1122
1123 // throw an exception if not connected
1124 if (!m_bConnected)
1125 throw CUDTException(2, 2, 0);
1126 else if ((m_bBroken || m_bClosing) && (0 == m_pRcvBuffer->getRcvDataSize()))
1127 throw CUDTException(2, 1, 0);
1128
1129 int res = m_pRcvBuffer->readBuffer(data, len);
1130
1131 if (m_pRcvBuffer->getRcvDataSize() <= 0)
1132 {
1133 // read is not available any more
1134 s_UDTUnited.m_EPoll.disable_read(m_SocketID, m_sPollID);
1135 }
1136
1137 if ((res <= 0) && (m_iRcvTimeOut >= 0))
1138 throw CUDTException(6, 2, 0);
1139
1140 return res;
1141 }
1142
1143 int CUDT::sendmsg(const char* data, const int& len, const int& msttl, const bool & inorder)
1144 {
1145 if (UDT_STREAM == m_iSockType)
1146 throw CUDTException(5, 9, 0);
1147
1148 // throw an exception if not connected
1149 if (m_bBroken || m_bClosing)
1150 throw CUDTException(2, 1, 0);
1151 else if (!m_bConnected)
1152 throw CUDTException(2, 2, 0);
1153
1154 if (len <= 0)
1155 return 0;
1156
1157 if (len > m_iSndBufSize * m_iPayloadSize)
1158 throw CUDTException(5, 12, 0);
1159
1160 CGuard sendguard(m_SendLock);
1161
1162 if (m_pSndBuffer->getCurrBufSize() == 0)
1163 {
1164 // delay the EXP timer to avoid mis-fired timeout
1165 uint64_t currtime;
1166 CTimer::rdtsc(currtime);
1167 m_ullNextEXPTime = currtime + m_ullEXPInt;
1168 }
1169
1170 if ((m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < len)
1171 {
1172 if (!m_bSynSending)
1173 throw CUDTException(6, 1, 0);
1174 else
1175 {
1176 // wait here during a blocking sending
1177 #ifndef WIN32
1178 pthread_mutex_lock(&m_SendBlockLock);
1179 if (m_iSndTimeOut < 0)
1180 {
1181 while (!m_bBroken && m_bConnected && !m_bClosing && ((m_iSndBufSi ze - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < len))
1182 pthread_cond_wait(&m_SendBlockCond, &m_SendBlockLock);
1183 }
1184 else
1185 {
1186 uint64_t exptime = CTimer::getTime() + m_iSndTimeOut * 1000ULL;
1187 timespec locktime;
1188
1189 locktime.tv_sec = exptime / 1000000;
1190 locktime.tv_nsec = (exptime % 1000000) * 1000;
1191
1192 while (!m_bBroken && m_bConnected && !m_bClosing && ((m_iSndBufSi ze - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < len) && (CTimer::getTime () < exptime))
1193 pthread_cond_timedwait(&m_SendBlockCond, &m_SendBlockLock, &lo cktime);
1194 }
1195 pthread_mutex_unlock(&m_SendBlockLock);
1196 #else
1197 if (m_iSndTimeOut < 0)
1198 {
1199 while (!m_bBroken && m_bConnected && !m_bClosing && ((m_iSndBufSi ze - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < len))
1200 WaitForSingleObject(m_SendBlockCond, INFINITE);
1201 }
1202 else
1203 {
1204 uint64_t exptime = CTimer::getTime() + m_iSndTimeOut * 1000ULL;
1205
1206 while (!m_bBroken && m_bConnected && !m_bClosing && ((m_iSndBufSi ze - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < len) && (CTimer::getTime () < exptime))
1207 WaitForSingleObject(m_SendBlockCond, DWORD((exptime - CTimer:: getTime()) / 1000));
1208 }
1209 #endif
1210
1211 // check the connection status
1212 if (m_bBroken || m_bClosing)
1213 throw CUDTException(2, 1, 0);
1214 else if (!m_bConnected)
1215 throw CUDTException(2, 2, 0);
1216 }
1217 }
1218
1219 if ((m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < len)
1220 {
1221 if (m_iSndTimeOut >= 0)
1222 throw CUDTException(6, 1, 0);
1223
1224 return 0;
1225 }
1226
1227 // record total time used for sending
1228 if (0 == m_pSndBuffer->getCurrBufSize())
1229 m_llSndDurationCounter = CTimer::getTime();
1230
1231 // insert the user buffer into the sening list
1232 m_pSndBuffer->addBuffer(data, len, msttl, inorder);
1233
1234 // insert this socket to the snd list if it is not on the list yet
1235 m_pSndQueue->m_pSndUList->update(this, false);
1236
1237 if (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize())
1238 {
1239 // write is not available any more
1240 s_UDTUnited.m_EPoll.disable_write(m_SocketID, m_sPollID);
1241 }
1242
1243 return len;
1244 }
1245
1246 int CUDT::recvmsg(char* data, const int& len)
1247 {
1248 if (UDT_STREAM == m_iSockType)
1249 throw CUDTException(5, 9, 0);
1250
1251 // throw an exception if not connected
1252 if (!m_bConnected)
1253 throw CUDTException(2, 2, 0);
1254
1255 if (len <= 0)
1256 return 0;
1257
1258 CGuard recvguard(m_RecvLock);
1259
1260 if (m_bBroken || m_bClosing)
1261 {
1262 int res = m_pRcvBuffer->readMsg(data, len);
1263 if (0 == res)
1264 {
1265 // read is not available
1266 s_UDTUnited.m_EPoll.disable_read(m_SocketID, m_sPollID);
1267
1268 throw CUDTException(2, 1, 0);
1269 }
1270 else
1271 return res;
1272 }
1273
1274 if (!m_bSynRecving)
1275 {
1276 int res = m_pRcvBuffer->readMsg(data, len);
1277 if (0 == res)
1278 throw CUDTException(6, 2, 0);
1279 else
1280 return res;
1281 }
1282
1283 int res = 0;
1284 bool timeout = false;
1285
1286 do
1287 {
1288 #ifndef WIN32
1289 pthread_mutex_lock(&m_RecvDataLock);
1290
1291 if (m_iRcvTimeOut < 0)
1292 {
1293 while (!m_bBroken && m_bConnected && !m_bClosing && (0 == (res = m_p RcvBuffer->readMsg(data, len))))
1294 pthread_cond_wait(&m_RecvDataCond, &m_RecvDataLock);
1295 }
1296 else
1297 {
1298 uint64_t exptime = CTimer::getTime() + m_iRcvTimeOut * 1000ULL;
1299 timespec locktime;
1300
1301 locktime.tv_sec = exptime / 1000000;
1302 locktime.tv_nsec = (exptime % 1000000) * 1000;
1303
1304 if (pthread_cond_timedwait(&m_RecvDataCond, &m_RecvDataLock, &lockti me) == ETIMEDOUT)
1305 timeout = true;
1306
1307 res = m_pRcvBuffer->readMsg(data, len);
1308 }
1309 pthread_mutex_unlock(&m_RecvDataLock);
1310 #else
1311 if (m_iRcvTimeOut < 0)
1312 {
1313 while (!m_bBroken && m_bConnected && !m_bClosing && (0 == (res = m_p RcvBuffer->readMsg(data, len))))
1314 WaitForSingleObject(m_RecvDataCond, INFINITE);
1315 }
1316 else
1317 {
1318 if (WaitForSingleObject(m_RecvDataCond, DWORD(m_iRcvTimeOut)) == WAI T_TIMEOUT)
1319 timeout = true;
1320
1321 res = m_pRcvBuffer->readMsg(data, len);
1322 }
1323 #endif
1324
1325 if (m_bBroken || m_bClosing)
1326 throw CUDTException(2, 1, 0);
1327 else if (!m_bConnected)
1328 throw CUDTException(2, 2, 0);
1329 } while ((0 == res) && !timeout);
1330
1331 if (m_pRcvBuffer->getRcvMsgNum() <= 0)
1332 {
1333 // read is not available any more
1334 s_UDTUnited.m_EPoll.disable_read(m_SocketID, m_sPollID);
1335 }
1336
1337 if ((res <= 0) && (m_iRcvTimeOut >= 0))
1338 throw CUDTException(6, 2, 0);
1339
1340 return res;
1341 }
1342
1343 int64_t CUDT::sendfile(fstream& ifs, int64_t& offset, const int64_t& size, const int& block)
1344 {
1345 if (UDT_DGRAM == m_iSockType)
1346 throw CUDTException(5, 10, 0);
1347
1348 if (m_bBroken || m_bClosing)
1349 throw CUDTException(2, 1, 0);
1350 else if (!m_bConnected)
1351 throw CUDTException(2, 2, 0);
1352
1353 if (size <= 0)
1354 return 0;
1355
1356 CGuard sendguard(m_SendLock);
1357
1358 if (m_pSndBuffer->getCurrBufSize() == 0)
1359 {
1360 // delay the EXP timer to avoid mis-fired timeout
1361 uint64_t currtime;
1362 CTimer::rdtsc(currtime);
1363 m_ullNextEXPTime = currtime + m_ullEXPInt;
1364 }
1365
1366 int64_t tosend = size;
1367 int unitsize;
1368
1369 // positioning...
1370 try
1371 {
1372 ifs.seekg((streamoff)offset);
1373 }
1374 catch (...)
1375 {
1376 throw CUDTException(4, 1);
1377 }
1378
1379 // sending block by block
1380 while (tosend > 0)
1381 {
1382 if (ifs.fail())
1383 throw CUDTException(4, 4);
1384
1385 if (ifs.eof())
1386 break;
1387
1388 unitsize = int((tosend >= block) ? block : tosend);
1389
1390 #ifndef WIN32
1391 pthread_mutex_lock(&m_SendBlockLock);
1392 while (!m_bBroken && m_bConnected && !m_bClosing && (m_iSndBufSize <= m _pSndBuffer->getCurrBufSize()) && m_bPeerHealth)
1393 pthread_cond_wait(&m_SendBlockCond, &m_SendBlockLock);
1394 pthread_mutex_unlock(&m_SendBlockLock);
1395 #else
1396 while (!m_bBroken && m_bConnected && !m_bClosing && (m_iSndBufSize <= m _pSndBuffer->getCurrBufSize()) && m_bPeerHealth)
1397 WaitForSingleObject(m_SendBlockCond, INFINITE);
1398 #endif
1399
1400 if (m_bBroken || m_bClosing)
1401 throw CUDTException(2, 1, 0);
1402 else if (!m_bConnected)
1403 throw CUDTException(2, 2, 0);
1404 else if (!m_bPeerHealth)
1405 {
1406 // reset peer health status, once this error returns, the app should ha ndle the situation at the peer side
1407 m_bPeerHealth = true;
1408 throw CUDTException(7);
1409 }
1410
1411 // record total time used for sending
1412 if (0 == m_pSndBuffer->getCurrBufSize())
1413 m_llSndDurationCounter = CTimer::getTime();
1414
1415 int64_t sentsize = m_pSndBuffer->addBufferFromFile(ifs, unitsize);
1416
1417 if (sentsize > 0)
1418 {
1419 tosend -= sentsize;
1420 offset += sentsize;
1421 }
1422
1423 // insert this socket to snd list if it is not on the list yet
1424 m_pSndQueue->m_pSndUList->update(this, false);
1425 }
1426
1427 if (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize())
1428 {
1429 // write is not available any more
1430 s_UDTUnited.m_EPoll.disable_write(m_SocketID, m_sPollID);
1431 }
1432
1433 return size - tosend;
1434 }
1435
1436 int64_t CUDT::recvfile(fstream& ofs, int64_t& offset, const int64_t& size, const int& block)
1437 {
1438 if (UDT_DGRAM == m_iSockType)
1439 throw CUDTException(5, 10, 0);
1440
1441 if (!m_bConnected)
1442 throw CUDTException(2, 2, 0);
1443 else if ((m_bBroken || m_bClosing) && (0 == m_pRcvBuffer->getRcvDataSize()))
1444 throw CUDTException(2, 1, 0);
1445
1446 if (size <= 0)
1447 return 0;
1448
1449 CGuard recvguard(m_RecvLock);
1450
1451 int64_t torecv = size;
1452 int unitsize = block;
1453 int recvsize;
1454
1455 // positioning...
1456 try
1457 {
1458 ofs.seekp((streamoff)offset);
1459 }
1460 catch (...)
1461 {
1462 throw CUDTException(4, 3);
1463 }
1464
1465 // receiving... "recvfile" is always blocking
1466 while (torecv > 0)
1467 {
1468 if (ofs.fail())
1469 {
1470 // send the sender a signal so it will not be blocked forever
1471 int32_t err_code = CUDTException::EFILE;
1472 sendCtrl(8, &err_code);
1473
1474 throw CUDTException(4, 4);
1475 }
1476
1477 #ifndef WIN32
1478 pthread_mutex_lock(&m_RecvDataLock);
1479 while (!m_bBroken && m_bConnected && !m_bClosing && (0 == m_pRcvBuffer- >getRcvDataSize()))
1480 pthread_cond_wait(&m_RecvDataCond, &m_RecvDataLock);
1481 pthread_mutex_unlock(&m_RecvDataLock);
1482 #else
1483 while (!m_bBroken && m_bConnected && !m_bClosing && (0 == m_pRcvBuffer- >getRcvDataSize()))
1484 WaitForSingleObject(m_RecvDataCond, INFINITE);
1485 #endif
1486
1487 if (!m_bConnected)
1488 throw CUDTException(2, 2, 0);
1489 else if ((m_bBroken || m_bClosing) && (0 == m_pRcvBuffer->getRcvDataSize() ))
1490 throw CUDTException(2, 1, 0);
1491
1492 unitsize = int((torecv >= block) ? block : torecv);
1493 recvsize = m_pRcvBuffer->readBufferToFile(ofs, unitsize);
1494
1495 if (recvsize > 0)
1496 {
1497 torecv -= recvsize;
1498 offset += recvsize;
1499 }
1500 }
1501
1502 if (m_pRcvBuffer->getRcvDataSize() <= 0)
1503 {
1504 // read is not available any more
1505 s_UDTUnited.m_EPoll.disable_read(m_SocketID, m_sPollID);
1506 }
1507
1508 return size - torecv;
1509 }
1510
1511 void CUDT::sample(CPerfMon* perf, bool clear)
1512 {
1513 if (!m_bConnected)
1514 throw CUDTException(2, 2, 0);
1515 if (m_bBroken || m_bClosing)
1516 throw CUDTException(2, 1, 0);
1517
1518 uint64_t currtime = CTimer::getTime();
1519 perf->msTimeStamp = (currtime - m_StartTime) / 1000;
1520
1521 perf->pktSent = m_llTraceSent;
1522 perf->pktRecv = m_llTraceRecv;
1523 perf->pktSndLoss = m_iTraceSndLoss;
1524 perf->pktRcvLoss = m_iTraceRcvLoss;
1525 perf->pktRetrans = m_iTraceRetrans;
1526 perf->pktSentACK = m_iSentACK;
1527 perf->pktRecvACK = m_iRecvACK;
1528 perf->pktSentNAK = m_iSentNAK;
1529 perf->pktRecvNAK = m_iRecvNAK;
1530 perf->usSndDuration = m_llSndDuration;
1531
1532 perf->pktSentTotal = m_llSentTotal;
1533 perf->pktRecvTotal = m_llRecvTotal;
1534 perf->pktSndLossTotal = m_iSndLossTotal;
1535 perf->pktRcvLossTotal = m_iRcvLossTotal;
1536 perf->pktRetransTotal = m_iRetransTotal;
1537 perf->pktSentACKTotal = m_iSentACKTotal;
1538 perf->pktRecvACKTotal = m_iRecvACKTotal;
1539 perf->pktSentNAKTotal = m_iSentNAKTotal;
1540 perf->pktRecvNAKTotal = m_iRecvNAKTotal;
1541 perf->usSndDurationTotal = m_llSndDurationTotal;
1542
1543 double interval = double(currtime - m_LastSampleTime);
1544
1545 perf->mbpsSendRate = double(m_llTraceSent) * m_iPayloadSize * 8.0 / interval;
1546 perf->mbpsRecvRate = double(m_llTraceRecv) * m_iPayloadSize * 8.0 / interval;
1547
1548 perf->usPktSndPeriod = m_ullInterval / double(m_ullCPUFrequency);
1549 perf->pktFlowWindow = m_iFlowWindowSize;
1550 perf->pktCongestionWindow = (int)m_dCongestionWindow;
1551 perf->pktFlightSize = CSeqNo::seqlen(const_cast<int32_t&>(m_iSndLastAck), CSe qNo::incseq(m_iSndCurrSeqNo)) - 1;
1552 perf->msRTT = m_iRTT/1000.0;
1553 perf->mbpsBandwidth = m_iBandwidth * m_iPayloadSize * 8.0 / 1000000.0;
1554
1555 #ifndef WIN32
1556 if (0 == pthread_mutex_trylock(&m_ConnectionLock))
1557 #else
1558 if (WAIT_OBJECT_0 == WaitForSingleObject(m_ConnectionLock, 0))
1559 #endif
1560 {
1561 perf->byteAvailSndBuf = (NULL == m_pSndBuffer) ? 0 : (m_iSndBufSize - m_pS ndBuffer->getCurrBufSize()) * m_iMSS;
1562 perf->byteAvailRcvBuf = (NULL == m_pRcvBuffer) ? 0 : m_pRcvBuffer->getAvai lBufSize() * m_iMSS;
1563
1564 #ifndef WIN32
1565 pthread_mutex_unlock(&m_ConnectionLock);
1566 #else
1567 ReleaseMutex(m_ConnectionLock);
1568 #endif
1569 }
1570 else
1571 {
1572 perf->byteAvailSndBuf = 0;
1573 perf->byteAvailRcvBuf = 0;
1574 }
1575
1576 if (clear)
1577 {
1578 m_llTraceSent = m_llTraceRecv = m_iTraceSndLoss = m_iTraceRcvLoss = m_iTra ceRetrans = m_iSentACK = m_iRecvACK = m_iSentNAK = m_iRecvNAK = 0;
1579 m_llSndDuration = 0;
1580 m_LastSampleTime = currtime;
1581 }
1582 }
1583
1584 void CUDT::initSynch()
1585 {
1586 #ifndef WIN32
1587 pthread_mutex_init(&m_SendBlockLock, NULL);
1588 pthread_cond_init(&m_SendBlockCond, NULL);
1589 pthread_mutex_init(&m_RecvDataLock, NULL);
1590 pthread_cond_init(&m_RecvDataCond, NULL);
1591 pthread_mutex_init(&m_SendLock, NULL);
1592 pthread_mutex_init(&m_RecvLock, NULL);
1593 pthread_mutex_init(&m_AckLock, NULL);
1594 pthread_mutex_init(&m_ConnectionLock, NULL);
1595 #else
1596 m_SendBlockLock = CreateMutex(NULL, false, NULL);
1597 m_SendBlockCond = CreateEvent(NULL, false, false, NULL);
1598 m_RecvDataLock = CreateMutex(NULL, false, NULL);
1599 m_RecvDataCond = CreateEvent(NULL, false, false, NULL);
1600 m_SendLock = CreateMutex(NULL, false, NULL);
1601 m_RecvLock = CreateMutex(NULL, false, NULL);
1602 m_AckLock = CreateMutex(NULL, false, NULL);
1603 m_ConnectionLock = CreateMutex(NULL, false, NULL);
1604 #endif
1605 }
1606
1607 void CUDT::destroySynch()
1608 {
1609 #ifndef WIN32
1610 pthread_mutex_destroy(&m_SendBlockLock);
1611 pthread_cond_destroy(&m_SendBlockCond);
1612 pthread_mutex_destroy(&m_RecvDataLock);
1613 pthread_cond_destroy(&m_RecvDataCond);
1614 pthread_mutex_destroy(&m_SendLock);
1615 pthread_mutex_destroy(&m_RecvLock);
1616 pthread_mutex_destroy(&m_AckLock);
1617 pthread_mutex_destroy(&m_ConnectionLock);
1618 #else
1619 CloseHandle(m_SendBlockLock);
1620 CloseHandle(m_SendBlockCond);
1621 CloseHandle(m_RecvDataLock);
1622 CloseHandle(m_RecvDataCond);
1623 CloseHandle(m_SendLock);
1624 CloseHandle(m_RecvLock);
1625 CloseHandle(m_AckLock);
1626 CloseHandle(m_ConnectionLock);
1627 #endif
1628 }
1629
1630 void CUDT::releaseSynch()
1631 {
1632 #ifndef WIN32
1633 // wake up user calls
1634 pthread_mutex_lock(&m_SendBlockLock);
1635 pthread_cond_signal(&m_SendBlockCond);
1636 pthread_mutex_unlock(&m_SendBlockLock);
1637
1638 pthread_mutex_lock(&m_SendLock);
1639 pthread_mutex_unlock(&m_SendLock);
1640
1641 pthread_mutex_lock(&m_RecvDataLock);
1642 pthread_cond_signal(&m_RecvDataCond);
1643 pthread_mutex_unlock(&m_RecvDataLock);
1644
1645 pthread_mutex_lock(&m_RecvLock);
1646 pthread_mutex_unlock(&m_RecvLock);
1647 #else
1648 SetEvent(m_SendBlockCond);
1649 WaitForSingleObject(m_SendLock, INFINITE);
1650 ReleaseMutex(m_SendLock);
1651 SetEvent(m_RecvDataCond);
1652 WaitForSingleObject(m_RecvLock, INFINITE);
1653 ReleaseMutex(m_RecvLock);
1654 #endif
1655 }
1656
1657 void CUDT::sendCtrl(const int& pkttype, void* lparam, void* rparam, const int& s ize)
1658 {
1659 CPacket ctrlpkt;
1660
1661 switch (pkttype)
1662 {
1663 case 2: //010 - Acknowledgement
1664 {
1665 int32_t ack;
1666
1667 // If there is no loss, the ACK is the current largest sequence number plu s 1;
1668 // Otherwise it is the smallest sequence number in the receiver loss list.
1669 if (0 == m_pRcvLossList->getLossLength())
1670 ack = CSeqNo::incseq(m_iRcvCurrSeqNo);
1671 else
1672 ack = m_pRcvLossList->getFirstLostSeq();
1673
1674 if (ack == m_iRcvLastAckAck)
1675 break;
1676
1677 // send out a lite ACK
1678 // to save time on buffer processing and bandwidth/AS measurement, a lite ACK only feeds back an ACK number
1679 if (4 == size)
1680 {
1681 ctrlpkt.pack(pkttype, NULL, &ack, size);
1682 ctrlpkt.m_iID = m_PeerID;
1683 m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt);
1684
1685 break;
1686 }
1687
1688 uint64_t currtime;
1689 CTimer::rdtsc(currtime);
1690
1691 // There are new received packets to acknowledge, update related informati on.
1692 if (CSeqNo::seqcmp(ack, m_iRcvLastAck) > 0)
1693 {
1694 int acksize = CSeqNo::seqoff(m_iRcvLastAck, ack);
1695
1696 m_iRcvLastAck = ack;
1697
1698 m_pRcvBuffer->ackData(acksize);
1699
1700 // signal a waiting "recv" call if there is any data available
1701 #ifndef WIN32
1702 pthread_mutex_lock(&m_RecvDataLock);
1703 if (m_bSynRecving)
1704 pthread_cond_signal(&m_RecvDataCond);
1705 pthread_mutex_unlock(&m_RecvDataLock);
1706 #else
1707 if (m_bSynRecving)
1708 SetEvent(m_RecvDataCond);
1709 #endif
1710
1711 // acknowledge any waiting epolls to read
1712 s_UDTUnited.m_EPoll.enable_read(m_SocketID, m_sPollID);
1713 }
1714 else if (ack == m_iRcvLastAck)
1715 {
1716 if ((currtime - m_ullLastAckTime) < ((m_iRTT + 4 * m_iRTTVar) * m_ullCP UFrequency))
1717 break;
1718 }
1719 else
1720 break;
1721
1722 // Send out the ACK only if has not been received by the sender before
1723 if (CSeqNo::seqcmp(m_iRcvLastAck, m_iRcvLastAckAck) > 0)
1724 {
1725 int32_t data[6];
1726
1727 m_iAckSeqNo = CAckNo::incack(m_iAckSeqNo);
1728 data[0] = m_iRcvLastAck;
1729 data[1] = m_iRTT;
1730 data[2] = m_iRTTVar;
1731 data[3] = m_pRcvBuffer->getAvailBufSize();
1732 // a minimum flow window of 2 is used, even if buffer is full, to break potential deadlock
1733 if (data[3] < 2)
1734 data[3] = 2;
1735
1736 if (currtime - m_ullLastAckTime > m_ullSYNInt)
1737 {
1738 data[4] = m_pRcvTimeWindow->getPktRcvSpeed();
1739 data[5] = m_pRcvTimeWindow->getBandwidth();
1740 ctrlpkt.pack(pkttype, &m_iAckSeqNo, data, 24);
1741
1742 CTimer::rdtsc(m_ullLastAckTime);
1743 }
1744 else
1745 {
1746 ctrlpkt.pack(pkttype, &m_iAckSeqNo, data, 16);
1747 }
1748
1749 ctrlpkt.m_iID = m_PeerID;
1750 m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt);
1751
1752 m_pACKWindow->store(m_iAckSeqNo, m_iRcvLastAck);
1753
1754 ++ m_iSentACK;
1755 ++ m_iSentACKTotal;
1756 }
1757
1758 break;
1759 }
1760
1761 case 6: //110 - Acknowledgement of Acknowledgement
1762 ctrlpkt.pack(pkttype, lparam);
1763 ctrlpkt.m_iID = m_PeerID;
1764 m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt);
1765
1766 break;
1767
1768 case 3: //011 - Loss Report
1769 {
1770 if (NULL != rparam)
1771 {
1772 if (1 == size)
1773 {
1774 // only 1 loss packet
1775 ctrlpkt.pack(pkttype, NULL, (int32_t *)rparam + 1, 4);
1776 }
1777 else
1778 {
1779 // more than 1 loss packets
1780 ctrlpkt.pack(pkttype, NULL, rparam, 8);
1781 }
1782
1783 ctrlpkt.m_iID = m_PeerID;
1784 m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt);
1785
1786 ++ m_iSentNAK;
1787 ++ m_iSentNAKTotal;
1788 }
1789 else if (m_pRcvLossList->getLossLength() > 0)
1790 {
1791 // this is periodically NAK report; make sure NAK cannot be sent back t oo often
1792
1793 // read loss list from the local receiver loss list
1794 int32_t* data = new int32_t[m_iPayloadSize / 4];
1795 int losslen;
1796 m_pRcvLossList->getLossArray(data, losslen, m_iPayloadSize / 4);
1797
1798 if (0 < losslen)
1799 {
1800 ctrlpkt.pack(pkttype, NULL, data, losslen * 4);
1801 ctrlpkt.m_iID = m_PeerID;
1802 m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt);
1803
1804 ++ m_iSentNAK;
1805 ++ m_iSentNAKTotal;
1806 }
1807
1808 delete [] data;
1809 }
1810
1811 // update next NAK time, which should wait enough time for the retansmissi on, but not too long
1812 m_ullNAKInt = (m_iRTT + 4 * m_iRTTVar) * m_ullCPUFrequency;
1813 int rcv_speed = m_pRcvTimeWindow->getPktRcvSpeed();
1814 if (rcv_speed > 0)
1815 m_ullNAKInt += (m_pRcvLossList->getLossLength() * 1000000ULL / rcv_spee d) * m_ullCPUFrequency;
1816 if (m_ullNAKInt < m_ullMinNakInt)
1817 m_ullNAKInt = m_ullMinNakInt;
1818
1819 break;
1820 }
1821
1822 case 4: //100 - Congestion Warning
1823 ctrlpkt.pack(pkttype);
1824 ctrlpkt.m_iID = m_PeerID;
1825 m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt);
1826
1827 CTimer::rdtsc(m_ullLastWarningTime);
1828
1829 break;
1830
1831 case 1: //001 - Keep-alive
1832 ctrlpkt.pack(pkttype);
1833 ctrlpkt.m_iID = m_PeerID;
1834 m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt);
1835
1836 break;
1837
1838 case 0: //000 - Handshake
1839 ctrlpkt.pack(pkttype, NULL, rparam, sizeof(CHandShake));
1840 ctrlpkt.m_iID = m_PeerID;
1841 m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt);
1842
1843 break;
1844
1845 case 5: //101 - Shutdown
1846 ctrlpkt.pack(pkttype);
1847 ctrlpkt.m_iID = m_PeerID;
1848 m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt);
1849
1850 break;
1851
1852 case 7: //111 - Msg drop request
1853 ctrlpkt.pack(pkttype, lparam, rparam, 8);
1854 ctrlpkt.m_iID = m_PeerID;
1855 m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt);
1856
1857 break;
1858
1859 case 8: //1000 - acknowledge the peer side a special error
1860 ctrlpkt.pack(pkttype, lparam);
1861 ctrlpkt.m_iID = m_PeerID;
1862 m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt);
1863
1864 break;
1865
1866 case 32767: //0x7FFF - Resevered for future use
1867 break;
1868
1869 default:
1870 break;
1871 }
1872 }
1873
1874 void CUDT::processCtrl(CPacket& ctrlpkt)
1875 {
1876 // Just heard from the peer, reset the expiration count.
1877 m_iEXPCount = 1;
1878 m_llLastRspTime = CTimer::getTime();
1879
1880 if ((CSeqNo::incseq(m_iSndCurrSeqNo) == m_iSndLastAck) || (2 == ctrlpkt.getTy pe()) || (3 == ctrlpkt.getType()))
1881 {
1882 uint64_t currtime;
1883 CTimer::rdtsc(currtime);
1884 if (!m_pCC->m_bUserDefinedRTO)
1885 m_ullNextEXPTime = currtime + m_ullEXPInt;
1886 else
1887 m_ullNextEXPTime = currtime + m_pCC->m_iRTO * m_ullCPUFrequency;
1888 }
1889
1890 switch (ctrlpkt.getType())
1891 {
1892 case 2: //010 - Acknowledgement
1893 {
1894 int32_t ack;
1895
1896 // process a lite ACK
1897 if (4 == ctrlpkt.getLength())
1898 {
1899 ack = *(int32_t *)ctrlpkt.m_pcData;
1900 if (CSeqNo::seqcmp(ack, const_cast<int32_t&>(m_iSndLastAck)) >= 0)
1901 {
1902 m_iFlowWindowSize -= CSeqNo::seqoff(const_cast<int32_t&>(m_iSndLastA ck), ack);
1903 m_iSndLastAck = ack;
1904 }
1905
1906 break;
1907 }
1908
1909 // read ACK seq. no.
1910 ack = ctrlpkt.getAckSeqNo();
1911
1912 // send ACK acknowledgement
1913 // number of ACK2 can be much less than number of ACK
1914 uint64_t currtime = CTimer::getTime();
1915 if ((currtime - m_ullSndLastAck2Time > (uint64_t)m_iSYNInterval) || (ack = = m_iSndLastAck2))
1916 {
1917 sendCtrl(6, &ack);
1918 m_iSndLastAck2 = ack;
1919 m_ullSndLastAck2Time = currtime;
1920 }
1921
1922 // Got data ACK
1923 ack = *(int32_t *)ctrlpkt.m_pcData;
1924
1925 // check the validation of the ack
1926 if (CSeqNo::seqcmp(ack, CSeqNo::incseq(m_iSndCurrSeqNo)) > 0)
1927 {
1928 //this should not happen: attack or bug
1929 m_bBroken = true;
1930 m_iBrokenCounter = 0;
1931 break;
1932 }
1933
1934 if (CSeqNo::seqcmp(ack, const_cast<int32_t&>(m_iSndLastAck)) >= 0)
1935 {
1936 // Update Flow Window Size, must update before and together with m_iSnd LastAck
1937 m_iFlowWindowSize = *((int32_t *)ctrlpkt.m_pcData + 3);
1938 m_iSndLastAck = ack;
1939 }
1940
1941 // protect packet retransmission
1942 CGuard::enterCS(m_AckLock);
1943
1944 int offset = CSeqNo::seqoff((int32_t&)m_iSndLastDataAck, ack);
1945 if (offset <= 0)
1946 {
1947 // discard it if it is a repeated ACK
1948 CGuard::leaveCS(m_AckLock);
1949 break;
1950 }
1951
1952 // acknowledge the sending buffer
1953 m_pSndBuffer->ackData(offset);
1954
1955 // record total time used for sending
1956 m_llSndDuration += currtime - m_llSndDurationCounter;
1957 m_llSndDurationTotal += currtime - m_llSndDurationCounter;
1958 m_llSndDurationCounter = currtime;
1959
1960 // update sending variables
1961 m_iSndLastDataAck = ack;
1962 m_pSndLossList->remove(CSeqNo::decseq((int32_t&)m_iSndLastDataAck));
1963
1964 CGuard::leaveCS(m_AckLock);
1965
1966 #ifndef WIN32
1967 pthread_mutex_lock(&m_SendBlockLock);
1968 if (m_bSynSending)
1969 pthread_cond_signal(&m_SendBlockCond);
1970 pthread_mutex_unlock(&m_SendBlockLock);
1971 #else
1972 if (m_bSynSending)
1973 SetEvent(m_SendBlockCond);
1974 #endif
1975
1976 // acknowledde any waiting epolls to write
1977 s_UDTUnited.m_EPoll.enable_write(m_SocketID, m_sPollID);
1978
1979 // insert this socket to snd list if it is not on the list yet
1980 m_pSndQueue->m_pSndUList->update(this, false);
1981
1982 // Update RTT
1983 //m_iRTT = *((int32_t *)ctrlpkt.m_pcData + 1);
1984 //m_iRTTVar = *((int32_t *)ctrlpkt.m_pcData + 2);
1985 int rtt = *((int32_t *)ctrlpkt.m_pcData + 1);
1986 m_iRTTVar = (m_iRTTVar * 3 + abs(rtt - m_iRTT)) >> 2;
1987 m_iRTT = (m_iRTT * 7 + rtt) >> 3;
1988
1989 m_pCC->setRTT(m_iRTT);
1990
1991 m_ullEXPInt = (m_iRTT + 4 * m_iRTTVar) * m_ullCPUFrequency + m_ullSYNInt;
1992 if (m_ullEXPInt < m_ullMinExpInt)
1993 m_ullEXPInt = m_ullMinExpInt;
1994
1995 if (ctrlpkt.getLength() > 16)
1996 {
1997 // Update Estimated Bandwidth and packet delivery rate
1998 if (*((int32_t *)ctrlpkt.m_pcData + 4) > 0)
1999 m_iDeliveryRate = (m_iDeliveryRate * 7 + *((int32_t *)ctrlpkt.m_pcDa ta + 4)) >> 3;
2000
2001 if (*((int32_t *)ctrlpkt.m_pcData + 5) > 0)
2002 m_iBandwidth = (m_iBandwidth * 7 + *((int32_t *)ctrlpkt.m_pcData + 5 )) >> 3;
2003
2004 m_pCC->setRcvRate(m_iDeliveryRate);
2005 m_pCC->setBandwidth(m_iBandwidth);
2006 }
2007
2008 m_pCC->onACK(ack);
2009 // update CC parameters
2010 m_ullInterval = (uint64_t)(m_pCC->m_dPktSndPeriod * m_ullCPUFrequency);
2011 m_dCongestionWindow = m_pCC->m_dCWndSize;
2012
2013 ++ m_iRecvACK;
2014 ++ m_iRecvACKTotal;
2015
2016 break;
2017 }
2018
2019 case 6: //110 - Acknowledgement of Acknowledgement
2020 {
2021 int32_t ack;
2022 int rtt = -1;
2023
2024 // update RTT
2025 rtt = m_pACKWindow->acknowledge(ctrlpkt.getAckSeqNo(), ack);
2026 if (rtt <= 0)
2027 break;
2028
2029 //if increasing delay detected...
2030 // sendCtrl(4);
2031
2032 // RTT EWMA
2033 m_iRTTVar = (m_iRTTVar * 3 + abs(rtt - m_iRTT)) >> 2;
2034 m_iRTT = (m_iRTT * 7 + rtt) >> 3;
2035
2036 m_pCC->setRTT(m_iRTT);
2037
2038 m_ullEXPInt = (m_iRTT + 4 * m_iRTTVar) * m_ullCPUFrequency + m_ullSYNInt;
2039 if (m_ullEXPInt < m_ullMinExpInt)
2040 m_ullEXPInt = m_ullMinExpInt;
2041
2042 // update last ACK that has been received by the sender
2043 if (CSeqNo::seqcmp(ack, m_iRcvLastAckAck) > 0)
2044 m_iRcvLastAckAck = ack;
2045
2046 break;
2047 }
2048
2049 case 3: //011 - Loss Report
2050 {
2051 int32_t* losslist = (int32_t *)(ctrlpkt.m_pcData);
2052
2053 m_pCC->onLoss(losslist, ctrlpkt.getLength() / 4);
2054 // update CC parameters
2055 m_ullInterval = (uint64_t)(m_pCC->m_dPktSndPeriod * m_ullCPUFrequency);
2056 m_dCongestionWindow = m_pCC->m_dCWndSize;
2057
2058 bool secure = true;
2059
2060 // decode loss list message and insert loss into the sender loss list
2061 for (int i = 0, n = (int)(ctrlpkt.getLength() / 4); i < n; ++ i)
2062 {
2063 if (0 != (losslist[i] & 0x80000000))
2064 {
2065 if ((CSeqNo::seqcmp(losslist[i] & 0x7FFFFFFF, losslist[i + 1]) > 0) || (CSeqNo::seqcmp(losslist[i + 1], const_cast<int32_t&>(m_iSndCurrSeqNo)) > 0))
2066 {
2067 // seq_a must not be greater than seq_b; seq_b must not be greate r than the most recent sent seq
2068 secure = false;
2069 break;
2070 }
2071
2072 int num = 0;
2073 if (CSeqNo::seqcmp(losslist[i] & 0x7FFFFFFF, const_cast<int32_t&>(m_ iSndLastAck)) >= 0)
2074 num = m_pSndLossList->insert(losslist[i] & 0x7FFFFFFF, losslist[i + 1]);
2075 else if (CSeqNo::seqcmp(losslist[i + 1], const_cast<int32_t&>(m_iSnd LastAck)) >= 0)
2076 num = m_pSndLossList->insert(const_cast<int32_t&>(m_iSndLastAck), losslist[i + 1]);
2077
2078 m_iTraceSndLoss += num;
2079 m_iSndLossTotal += num;
2080
2081 ++ i;
2082 }
2083 else if (CSeqNo::seqcmp(losslist[i], const_cast<int32_t&>(m_iSndLastAck )) >= 0)
2084 {
2085 if (CSeqNo::seqcmp(losslist[i], const_cast<int32_t&>(m_iSndCurrSeqNo )) > 0)
2086 {
2087 //seq_a must not be greater than the most recent sent seq
2088 secure = false;
2089 break;
2090 }
2091
2092 int num = m_pSndLossList->insert(losslist[i], losslist[i]);
2093
2094 m_iTraceSndLoss += num;
2095 m_iSndLossTotal += num;
2096 }
2097 }
2098
2099 if (!secure)
2100 {
2101 //this should not happen: attack or bug
2102 m_bBroken = true;
2103 m_iBrokenCounter = 0;
2104 break;
2105 }
2106
2107 // the lost packet (retransmission) should be sent out immediately
2108 m_pSndQueue->m_pSndUList->update(this);
2109
2110 ++ m_iRecvNAK;
2111 ++ m_iRecvNAKTotal;
2112
2113 break;
2114 }
2115
2116 case 4: //100 - Delay Warning
2117 // One way packet delay is increasing, so decrease the sending rate
2118 m_ullInterval = (uint64_t)ceil(m_ullInterval * 1.125);
2119 m_iLastDecSeq = m_iSndCurrSeqNo;
2120
2121 break;
2122
2123 case 1: //001 - Keep-alive
2124 // The only purpose of keep-alive packet is to tell that the peer is still alive
2125 // nothing needs to be done.
2126
2127 break;
2128
2129 case 0: //000 - Handshake
2130 if ((((CHandShake*)(ctrlpkt.m_pcData))->m_iReqType > 0) || (m_bRendezvous && (((CHandShake*)(ctrlpkt.m_pcData))->m_iReqType != -2)))
2131 {
2132 // The peer side has not received the handshake message, so it keeps qu erying
2133 // resend the handshake packet
2134
2135 CHandShake initdata;
2136 initdata.m_iISN = m_iISN;
2137 initdata.m_iMSS = m_iMSS;
2138 initdata.m_iFlightFlagSize = m_iFlightFlagSize;
2139 initdata.m_iReqType = (!m_bRendezvous) ? -1 : -2;
2140 initdata.m_iID = m_SocketID;
2141
2142 char buffer[1500];
2143 int size = 1500;
2144 initdata.serialize(buffer, size);
2145 sendCtrl(0, NULL, buffer, size);
2146 }
2147
2148 break;
2149
2150 case 5: //101 - Shutdown
2151 m_bShutdown = true;
2152 m_bClosing = true;
2153 m_bBroken = true;
2154 m_iBrokenCounter = 60;
2155
2156 // Signal the sender and recver if they are waiting for data.
2157 releaseSynch();
2158
2159 CTimer::triggerEvent();
2160
2161 break;
2162
2163 case 7: //111 - Msg drop request
2164 m_pRcvBuffer->dropMsg(ctrlpkt.getMsgSeq());
2165 m_pRcvLossList->remove(*(int32_t*)ctrlpkt.m_pcData, *(int32_t*)(ctrlpkt.m_ pcData + 4));
2166
2167 // move forward with current recv seq no.
2168 if ((CSeqNo::seqcmp(*(int32_t*)ctrlpkt.m_pcData, CSeqNo::incseq(m_iRcvCurr SeqNo)) <= 0)
2169 && (CSeqNo::seqcmp(*(int32_t*)(ctrlpkt.m_pcData + 4), m_iRcvCurrSeqNo) > 0))
2170 {
2171 m_iRcvCurrSeqNo = *(int32_t*)(ctrlpkt.m_pcData + 4);
2172 }
2173
2174 break;
2175
2176 case 8: // 1000 - An error has happened to the peer side
2177 //int err_type = packet.getAddInfo();
2178
2179 // currently only this error is signalled from the peer side
2180 // if recvfile() failes (e.g., due to disk fail), blcoked sendfile/send sh ould return immediately
2181 // giving the app a chance to fix the issue
2182
2183 m_bPeerHealth = false;
2184
2185 break;
2186
2187 case 32767: //0x7FFF - reserved and user defined messages
2188 m_pCC->processCustomMsg(&ctrlpkt);
2189 // update CC parameters
2190 m_ullInterval = (uint64_t)(m_pCC->m_dPktSndPeriod * m_ullCPUFrequency);
2191 m_dCongestionWindow = m_pCC->m_dCWndSize;
2192
2193 break;
2194
2195 default:
2196 break;
2197 }
2198 }
2199
2200 int CUDT::packData(CPacket& packet, uint64_t& ts)
2201 {
2202 int payload = 0;
2203 bool probe = false;
2204
2205 uint64_t entertime;
2206 CTimer::rdtsc(entertime);
2207
2208 if ((0 != m_ullTargetTime) && (entertime > m_ullTargetTime))
2209 m_ullTimeDiff += entertime - m_ullTargetTime;
2210
2211 // Loss retransmission always has higher priority.
2212 if ((packet.m_iSeqNo = m_pSndLossList->getLostSeq()) >= 0)
2213 {
2214 // protect m_iSndLastDataAck from updating by ACK processing
2215 CGuard ackguard(m_AckLock);
2216
2217 int offset = CSeqNo::seqoff((int32_t&)m_iSndLastDataAck, packet.m_iSeqNo);
2218 if (offset < 0)
2219 return 0;
2220
2221 int msglen;
2222
2223 payload = m_pSndBuffer->readData(&(packet.m_pcData), offset, packet.m_iMsg No, msglen);
2224
2225 if (-1 == payload)
2226 {
2227 int32_t seqpair[2];
2228 seqpair[0] = packet.m_iSeqNo;
2229 seqpair[1] = CSeqNo::incseq(seqpair[0], msglen);
2230 sendCtrl(7, &packet.m_iMsgNo, seqpair, 8);
2231
2232 // only one msg drop request is necessary
2233 m_pSndLossList->remove(seqpair[1]);
2234
2235 // skip all dropped packets
2236 if (CSeqNo::seqcmp(const_cast<int32_t&>(m_iSndCurrSeqNo), CSeqNo::incse q(seqpair[1])) < 0)
2237 m_iSndCurrSeqNo = CSeqNo::incseq(seqpair[1]);
2238
2239 return 0;
2240 }
2241 else if (0 == payload)
2242 return 0;
2243
2244 ++ m_iTraceRetrans;
2245 ++ m_iRetransTotal;
2246 }
2247 else
2248 {
2249 // If no loss, pack a new packet.
2250
2251 // check congestion/flow window limit
2252 int cwnd = (m_iFlowWindowSize < (int)m_dCongestionWindow) ? m_iFlowWindowS ize : (int)m_dCongestionWindow;
2253 if (cwnd >= CSeqNo::seqlen(const_cast<int32_t&>(m_iSndLastAck), CSeqNo::in cseq(m_iSndCurrSeqNo)))
2254 {
2255 if (0 != (payload = m_pSndBuffer->readData(&(packet.m_pcData), packet.m _iMsgNo)))
2256 {
2257 m_iSndCurrSeqNo = CSeqNo::incseq(m_iSndCurrSeqNo);
2258 m_pCC->setSndCurrSeqNo((int32_t&)m_iSndCurrSeqNo);
2259
2260 packet.m_iSeqNo = m_iSndCurrSeqNo;
2261
2262 // every 16 (0xF) packets, a packet pair is sent
2263 if (0 == (packet.m_iSeqNo & 0xF))
2264 probe = true;
2265 }
2266 else
2267 {
2268 m_ullTargetTime = 0;
2269 m_ullTimeDiff = 0;
2270 ts = 0;
2271 return 0;
2272 }
2273 }
2274 else
2275 {
2276 m_ullTargetTime = 0;
2277 m_ullTimeDiff = 0;
2278 ts = 0;
2279 return 0;
2280 }
2281 }
2282
2283 packet.m_iTimeStamp = int(CTimer::getTime() - m_StartTime);
2284 packet.m_iID = m_PeerID;
2285 packet.setLength(payload);
2286
2287 m_pCC->onPktSent(&packet);
2288 //m_pSndTimeWindow->onPktSent(packet.m_iTimeStamp);
2289
2290 ++ m_llTraceSent;
2291 ++ m_llSentTotal;
2292
2293 if (probe)
2294 {
2295 // sends out probing packet pair
2296 ts = entertime;
2297 probe = false;
2298 }
2299 else
2300 {
2301 #ifndef NO_BUSY_WAITING
2302 ts = entertime + m_ullInterval;
2303 #else
2304 if (m_ullTimeDiff >= m_ullInterval)
2305 {
2306 ts = entertime;
2307 m_ullTimeDiff -= m_ullInterval;
2308 }
2309 else
2310 {
2311 ts = entertime + m_ullInterval - m_ullTimeDiff;
2312 m_ullTimeDiff = 0;
2313 }
2314 #endif
2315 }
2316
2317 m_ullTargetTime = ts;
2318
2319 return payload;
2320 }
2321
2322 int CUDT::processData(CUnit* unit)
2323 {
2324 CPacket& packet = unit->m_Packet;
2325
2326 // Just heard from the peer, reset the expiration count.
2327 m_iEXPCount = 1;
2328 m_llLastRspTime = CTimer::getTime();
2329
2330 if (CSeqNo::incseq(m_iSndCurrSeqNo) == m_iSndLastAck)
2331 {
2332 uint64_t currtime;
2333 CTimer::rdtsc(currtime);
2334 if (!m_pCC->m_bUserDefinedRTO)
2335 m_ullNextEXPTime = currtime + m_ullEXPInt;
2336 else
2337 m_ullNextEXPTime = currtime + m_pCC->m_iRTO * m_ullCPUFrequency;
2338 }
2339
2340 m_pCC->onPktReceived(&packet);
2341
2342 ++ m_iPktCount;
2343
2344 // update time information
2345 m_pRcvTimeWindow->onPktArrival();
2346
2347 // check if it is probing packet pair
2348 if (0 == (packet.m_iSeqNo & 0xF))
2349 m_pRcvTimeWindow->probe1Arrival();
2350 else if (1 == (packet.m_iSeqNo & 0xF))
2351 m_pRcvTimeWindow->probe2Arrival();
2352
2353 ++ m_llTraceRecv;
2354 ++ m_llRecvTotal;
2355
2356 int32_t offset = CSeqNo::seqoff(m_iRcvLastAck, packet.m_iSeqNo);
2357 if ((offset < 0) || (offset >= m_pRcvBuffer->getAvailBufSize()))
2358 return -1;
2359
2360 if (m_pRcvBuffer->addData(unit, offset) < 0)
2361 return -1;
2362
2363 // Loss detection.
2364 if (CSeqNo::seqcmp(packet.m_iSeqNo, CSeqNo::incseq(m_iRcvCurrSeqNo)) > 0)
2365 {
2366 // If loss found, insert them to the receiver loss list
2367 m_pRcvLossList->insert(CSeqNo::incseq(m_iRcvCurrSeqNo), CSeqNo::decseq(pac ket.m_iSeqNo));
2368
2369 // pack loss list for NAK
2370 int32_t lossdata[2];
2371 lossdata[0] = CSeqNo::incseq(m_iRcvCurrSeqNo) | 0x80000000;
2372 lossdata[1] = CSeqNo::decseq(packet.m_iSeqNo);
2373
2374 // Generate loss report immediately.
2375 sendCtrl(3, NULL, lossdata, (CSeqNo::incseq(m_iRcvCurrSeqNo) == CSeqNo::de cseq(packet.m_iSeqNo)) ? 1 : 2);
2376
2377 int loss = CSeqNo::seqlen(m_iRcvCurrSeqNo, packet.m_iSeqNo) - 2;
2378 m_iTraceRcvLoss += loss;
2379 m_iRcvLossTotal += loss;
2380 }
2381
2382 // This is not a regular fixed size packet...
2383 //an irregular sized packet usually indicates the end of a message, so send a n ACK immediately
2384 if (packet.getLength() != m_iPayloadSize)
2385 CTimer::rdtsc(m_ullNextACKTime);
2386
2387 // Update the current largest sequence number that has been received.
2388 // Or it is a retransmitted packet, remove it from receiver loss list.
2389 if (CSeqNo::seqcmp(packet.m_iSeqNo, m_iRcvCurrSeqNo) > 0)
2390 m_iRcvCurrSeqNo = packet.m_iSeqNo;
2391 else
2392 m_pRcvLossList->remove(packet.m_iSeqNo);
2393
2394 return 0;
2395 }
2396
2397 int CUDT::listen(sockaddr* addr, CPacket& packet)
2398 {
2399 if (m_bClosing)
2400 return 1002;
2401
2402 if (packet.getLength() != CHandShake::m_iContentSize)
2403 return 1004;
2404
2405 CHandShake hs;
2406 hs.deserialize(packet.m_pcData, packet.getLength());
2407
2408 // SYN cookie
2409 char clienthost[NI_MAXHOST];
2410 char clientport[NI_MAXSERV];
2411 getnameinfo(addr, (AF_INET == m_iVersion) ? sizeof(sockaddr_in) : sizeof(sock addr_in6), clienthost, sizeof(clienthost), clientport, sizeof(clientport), NI_NU MERICHOST|NI_NUMERICSERV);
2412 int64_t timestamp = (CTimer::getTime() - m_StartTime) / 60000000; // secret c hanges every one minute
2413 char cookiestr[1024];
2414 sprintf(cookiestr, "%s:%s:%lld", clienthost, clientport, (long long int)times tamp);
2415 unsigned char cookie[16];
2416 CMD5::compute(cookiestr, cookie);
2417
2418 if (1 == hs.m_iReqType)
2419 {
2420 hs.m_iCookie = *(int*)cookie;
2421 packet.m_iID = hs.m_iID;
2422 int size = packet.getLength();
2423 hs.serialize(packet.m_pcData, size);
2424 m_pSndQueue->sendto(addr, packet);
2425 return 0;
2426 }
2427 else
2428 {
2429 if (hs.m_iCookie != *(int*)cookie)
2430 {
2431 timestamp --;
2432 sprintf(cookiestr, "%s:%s:%lld", clienthost, clientport, (long long int )timestamp);
2433 CMD5::compute(cookiestr, cookie);
2434
2435 if (hs.m_iCookie != *(int*)cookie)
2436 return -1;
2437 }
2438 }
2439
2440 int32_t id = hs.m_iID;
2441
2442 // When a peer side connects in...
2443 if ((1 == packet.getFlag()) && (0 == packet.getType()))
2444 {
2445 if ((hs.m_iVersion != m_iVersion) || (hs.m_iType != m_iSockType))
2446 {
2447 // mismatch, reject the request
2448 hs.m_iReqType = 1002;
2449 int size = CHandShake::m_iContentSize;
2450 hs.serialize(packet.m_pcData, size);
2451 packet.m_iID = id;
2452 m_pSndQueue->sendto(addr, packet);
2453 }
2454 else
2455 {
2456 int result = s_UDTUnited.newConnection(m_SocketID, addr, &hs);
2457 if (result == -1)
2458 hs.m_iReqType = 1002;
2459
2460 // send back a response if connection failed or connection already exis ted
2461 // new connection response should be sent in connect()
2462 if (result != 1)
2463 {
2464 int size = CHandShake::m_iContentSize;
2465 hs.serialize(packet.m_pcData, size);
2466 packet.m_iID = id;
2467 m_pSndQueue->sendto(addr, packet);
2468 }
2469 else
2470 {
2471 // a mew connection has been created, enable epoll for read
2472 s_UDTUnited.m_EPoll.enable_read(m_SocketID, m_sPollID);
2473 }
2474 }
2475 }
2476
2477 return hs.m_iReqType;
2478 }
2479
2480 void CUDT::checkTimers()
2481 {
2482 // update CC parameters
2483 m_ullInterval = (uint64_t)(m_pCC->m_dPktSndPeriod * m_ullCPUFrequency);
2484 m_dCongestionWindow = m_pCC->m_dCWndSize;
2485 //uint64_t minint = (uint64_t)(m_ullCPUFrequency * m_pSndTimeWindow->getMinPk tSndInt() * 0.9);
2486 //if (m_ullInterval < minint)
2487 // m_ullInterval = minint;
2488
2489 uint64_t currtime;
2490 CTimer::rdtsc(currtime);
2491
2492 if ((currtime > m_ullNextACKTime) || ((m_pCC->m_iACKInterval > 0) && (m_pCC-> m_iACKInterval <= m_iPktCount)))
2493 {
2494 // ACK timer expired or ACK interval is reached
2495
2496 sendCtrl(2);
2497 CTimer::rdtsc(currtime);
2498 if (m_pCC->m_iACKPeriod > 0)
2499 m_ullNextACKTime = currtime + m_pCC->m_iACKPeriod * m_ullCPUFrequency;
2500 else
2501 m_ullNextACKTime = currtime + m_ullACKInt;
2502
2503 m_iPktCount = 0;
2504 m_iLightACKCount = 1;
2505 }
2506 else if (m_iSelfClockInterval * m_iLightACKCount <= m_iPktCount)
2507 {
2508 //send a "light" ACK
2509 sendCtrl(2, NULL, NULL, 4);
2510 ++ m_iLightACKCount;
2511 }
2512
2513 // we are not sending back repeated NAK anymore and rely on the sender's EXP for retransmission
2514 //if ((m_pRcvLossList->getLossLength() > 0) && (currtime > m_ullNextNAKTime))
2515 //{
2516 // // NAK timer expired, and there is loss to be reported.
2517 // sendCtrl(3);
2518 //
2519 // CTimer::rdtsc(currtime);
2520 // m_ullNextNAKTime = currtime + m_ullNAKInt;
2521 //}
2522
2523 if (currtime > m_ullNextEXPTime)
2524 {
2525 // Haven't receive any information from the peer, is it dead?!
2526 // timeout: at least 16 expirations and must be greater than 10 seconds
2527 if ((m_iEXPCount > 16) && (CTimer::getTime() - m_llLastRspTime > 10000000) )
2528 {
2529 //
2530 // Connection is broken.
2531 // UDT does not signal any information about this instead of to stop qu ietly.
2532 // Apllication will detect this when it calls any UDT methods next time .
2533 //
2534 m_bClosing = true;
2535 m_bBroken = true;
2536 m_iBrokenCounter = 30;
2537
2538 // update snd U list to remove this socket
2539 m_pSndQueue->m_pSndUList->update(this);
2540
2541 releaseSynch();
2542
2543 // a broken socket can be "write" to learn the error
2544 s_UDTUnited.m_EPoll.enable_write(m_SocketID, m_sPollID);
2545
2546 CTimer::triggerEvent();
2547
2548 return;
2549 }
2550
2551 // sender: Insert all the packets sent after last received acknowledgement into the sender loss list.
2552 // recver: Send out a keep-alive packet
2553 if (m_pSndBuffer->getCurrBufSize() > 0)
2554 {
2555 if ((CSeqNo::incseq(m_iSndCurrSeqNo) != m_iSndLastAck) && (m_pSndLossLi st->getLossLength() == 0))
2556 {
2557 // resend all unacknowledged packets on timeout, but only if there i s no packet in the loss list
2558 int32_t csn = m_iSndCurrSeqNo;
2559 int num = m_pSndLossList->insert(const_cast<int32_t&>(m_iSndLastAck) , csn);
2560 m_iTraceSndLoss += num;
2561 m_iSndLossTotal += num;
2562 }
2563
2564 m_pCC->onTimeout();
2565 // update CC parameters
2566 m_ullInterval = (uint64_t)(m_pCC->m_dPktSndPeriod * m_ullCPUFrequency);
2567 m_dCongestionWindow = m_pCC->m_dCWndSize;
2568
2569 // immediately restart transmission
2570 m_pSndQueue->m_pSndUList->update(this);
2571 }
2572 else
2573 {
2574 sendCtrl(1);
2575 }
2576
2577 ++ m_iEXPCount;
2578 m_ullEXPInt = (m_iEXPCount * (m_iRTT + 4 * m_iRTTVar) + m_iSYNInterval) * m_ullCPUFrequency;
2579 if (m_ullEXPInt < m_iEXPCount * m_ullMinExpInt)
2580 m_ullEXPInt = m_iEXPCount * m_ullMinExpInt;
2581 CTimer::rdtsc(m_ullNextEXPTime);
2582 m_ullNextEXPTime += m_ullEXPInt;
2583 }
2584 }
2585
2586 void CUDT::addEPoll(const int eid)
2587 {
2588 CGuard::enterCS(s_UDTUnited.m_EPoll.m_EPollLock);
2589 m_sPollID.insert(eid);
2590 CGuard::leaveCS(s_UDTUnited.m_EPoll.m_EPollLock);
2591
2592 if (!m_bConnected || m_bBroken || m_bClosing)
2593 return;
2594
2595 if ((UDT_STREAM == m_iSockType) && (m_pRcvBuffer->getRcvDataSize() > 0))
2596 s_UDTUnited.m_EPoll.enable_read(m_SocketID, m_sPollID);
2597 else if ((UDT_DGRAM == m_iSockType) && (m_pRcvBuffer->getRcvMsgNum() > 0))
2598 s_UDTUnited.m_EPoll.enable_read(m_SocketID, m_sPollID);
2599
2600 if (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize())
2601 s_UDTUnited.m_EPoll.enable_write(m_SocketID, m_sPollID);
2602 }
2603
2604 void CUDT::removeEPoll(const int eid)
2605 {
2606 s_UDTUnited.m_EPoll.disable_read(m_SocketID, m_sPollID);
2607 s_UDTUnited.m_EPoll.disable_write(m_SocketID, m_sPollID);
2608
2609 CGuard::enterCS(s_UDTUnited.m_EPoll.m_EPollLock);
2610 m_sPollID.erase(eid);
2611 CGuard::leaveCS(s_UDTUnited.m_EPoll.m_EPollLock);
2612 }
OLDNEW
« no previous file with comments | « net/third_party/udt/src/core.h ('k') | net/third_party/udt/src/epoll.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698