OLD | NEW |
| (Empty) |
1 /***************************************************************************** | |
2 Copyright (c) 2001 - 2011, The Board of Trustees of the University of Illinois. | |
3 All rights reserved. | |
4 | |
5 Redistribution and use in source and binary forms, with or without | |
6 modification, are permitted provided that the following conditions are | |
7 met: | |
8 | |
9 * Redistributions of source code must retain the above | |
10 copyright notice, this list of conditions and the | |
11 following disclaimer. | |
12 | |
13 * Redistributions in binary form must reproduce the | |
14 above copyright notice, this list of conditions | |
15 and the following disclaimer in the documentation | |
16 and/or other materials provided with the distribution. | |
17 | |
18 * Neither the name of the University of Illinois | |
19 nor the names of its contributors may be used to | |
20 endorse or promote products derived from this | |
21 software without specific prior written permission. | |
22 | |
23 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS | |
24 IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, | |
25 THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR | |
26 PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR | |
27 CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, | |
28 EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, | |
29 PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR | |
30 PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF | |
31 LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING | |
32 NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS | |
33 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. | |
34 *****************************************************************************/ | |
35 | |
36 /***************************************************************************** | |
37 written by | |
38 Yunhong Gu, last updated 01/22/2011 | |
39 *****************************************************************************/ | |
40 | |
41 #ifndef WIN32 | |
42 #include <unistd.h> | |
43 #include <netdb.h> | |
44 #include <arpa/inet.h> | |
45 #include <cerrno> | |
46 #include <cstring> | |
47 #include <cstdlib> | |
48 #else | |
49 #include <winsock2.h> | |
50 #include <ws2tcpip.h> | |
51 #ifdef LEGACY_WIN32 | |
52 #include <wspiapi.h> | |
53 #endif | |
54 #endif | |
55 #include <cmath> | |
56 #include "queue.h" | |
57 #include "core.h" | |
58 | |
59 using namespace std; | |
60 | |
61 | |
62 CUDTUnited CUDT::s_UDTUnited; | |
63 | |
64 const UDTSOCKET CUDT::INVALID_SOCK = -1; | |
65 const int CUDT::ERROR = -1; | |
66 | |
67 const UDTSOCKET UDT::INVALID_SOCK = CUDT::INVALID_SOCK; | |
68 const int UDT::ERROR = CUDT::ERROR; | |
69 | |
70 const int32_t CSeqNo::m_iSeqNoTH = 0x3FFFFFFF; | |
71 const int32_t CSeqNo::m_iMaxSeqNo = 0x7FFFFFFF; | |
72 const int32_t CAckNo::m_iMaxAckSeqNo = 0x7FFFFFFF; | |
73 const int32_t CMsgNo::m_iMsgNoTH = 0xFFFFFFF; | |
74 const int32_t CMsgNo::m_iMaxMsgNo = 0x1FFFFFFF; | |
75 | |
76 const int CUDT::m_iVersion = 4; | |
77 const int CUDT::m_iSYNInterval = 10000; | |
78 const int CUDT::m_iSelfClockInterval = 64; | |
79 | |
80 | |
81 CUDT::CUDT() | |
82 { | |
83 m_pSndBuffer = NULL; | |
84 m_pRcvBuffer = NULL; | |
85 m_pSndLossList = NULL; | |
86 m_pRcvLossList = NULL; | |
87 m_pACKWindow = NULL; | |
88 m_pSndTimeWindow = NULL; | |
89 m_pRcvTimeWindow = NULL; | |
90 | |
91 m_pSndQueue = NULL; | |
92 m_pRcvQueue = NULL; | |
93 m_pPeerAddr = NULL; | |
94 m_pSNode = NULL; | |
95 m_pRNode = NULL; | |
96 | |
97 // Initilize mutex and condition variables | |
98 initSynch(); | |
99 | |
100 // Default UDT configurations | |
101 m_iMSS = 1500; | |
102 m_bSynSending = true; | |
103 m_bSynRecving = true; | |
104 m_iFlightFlagSize = 25600; | |
105 m_iSndBufSize = 8192; | |
106 m_iRcvBufSize = 8192; //Rcv buffer MUST NOT be bigger than Flight Flag size | |
107 m_Linger.l_onoff = 1; | |
108 m_Linger.l_linger = 180; | |
109 m_iUDPSndBufSize = 65536; | |
110 m_iUDPRcvBufSize = m_iRcvBufSize * m_iMSS; | |
111 m_iSockType = UDT_STREAM; | |
112 m_iIPversion = AF_INET; | |
113 m_bRendezvous = false; | |
114 m_iSndTimeOut = -1; | |
115 m_iRcvTimeOut = -1; | |
116 m_bReuseAddr = true; | |
117 m_llMaxBW = -1; | |
118 | |
119 m_pCCFactory = new CCCFactory<CUDTCC>; | |
120 m_pCC = NULL; | |
121 m_pCache = NULL; | |
122 | |
123 // Initial status | |
124 m_bOpened = false; | |
125 m_bListening = false; | |
126 m_bConnected = false; | |
127 m_bClosing = false; | |
128 m_bShutdown = false; | |
129 m_bBroken = false; | |
130 m_bPeerHealth = true; | |
131 m_ullLingerExpiration = 0; | |
132 } | |
133 | |
134 CUDT::CUDT(const CUDT& ancestor) | |
135 { | |
136 m_pSndBuffer = NULL; | |
137 m_pRcvBuffer = NULL; | |
138 m_pSndLossList = NULL; | |
139 m_pRcvLossList = NULL; | |
140 m_pACKWindow = NULL; | |
141 m_pSndTimeWindow = NULL; | |
142 m_pRcvTimeWindow = NULL; | |
143 | |
144 m_pSndQueue = NULL; | |
145 m_pRcvQueue = NULL; | |
146 m_pPeerAddr = NULL; | |
147 m_pSNode = NULL; | |
148 m_pRNode = NULL; | |
149 | |
150 // Initilize mutex and condition variables | |
151 initSynch(); | |
152 | |
153 // Default UDT configurations | |
154 m_iMSS = ancestor.m_iMSS; | |
155 m_bSynSending = ancestor.m_bSynSending; | |
156 m_bSynRecving = ancestor.m_bSynRecving; | |
157 m_iFlightFlagSize = ancestor.m_iFlightFlagSize; | |
158 m_iSndBufSize = ancestor.m_iSndBufSize; | |
159 m_iRcvBufSize = ancestor.m_iRcvBufSize; | |
160 m_Linger = ancestor.m_Linger; | |
161 m_iUDPSndBufSize = ancestor.m_iUDPSndBufSize; | |
162 m_iUDPRcvBufSize = ancestor.m_iUDPRcvBufSize; | |
163 m_iSockType = ancestor.m_iSockType; | |
164 m_iIPversion = ancestor.m_iIPversion; | |
165 m_bRendezvous = ancestor.m_bRendezvous; | |
166 m_iSndTimeOut = ancestor.m_iSndTimeOut; | |
167 m_iRcvTimeOut = ancestor.m_iRcvTimeOut; | |
168 m_bReuseAddr = true; // this must be true, because all accepted sockets share
d the same port with the listener | |
169 m_llMaxBW = ancestor.m_llMaxBW; | |
170 | |
171 m_pCCFactory = ancestor.m_pCCFactory->clone(); | |
172 m_pCC = NULL; | |
173 m_pCache = ancestor.m_pCache; | |
174 | |
175 // Initial status | |
176 m_bOpened = false; | |
177 m_bListening = false; | |
178 m_bConnected = false; | |
179 m_bClosing = false; | |
180 m_bShutdown = false; | |
181 m_bBroken = false; | |
182 m_bPeerHealth = true; | |
183 m_ullLingerExpiration = 0; | |
184 } | |
185 | |
186 CUDT::~CUDT() | |
187 { | |
188 // release mutex/condtion variables | |
189 destroySynch(); | |
190 | |
191 // destroy the data structures | |
192 delete m_pSndBuffer; | |
193 delete m_pRcvBuffer; | |
194 delete m_pSndLossList; | |
195 delete m_pRcvLossList; | |
196 delete m_pACKWindow; | |
197 delete m_pSndTimeWindow; | |
198 delete m_pRcvTimeWindow; | |
199 delete m_pCCFactory; | |
200 delete m_pCC; | |
201 delete m_pPeerAddr; | |
202 delete m_pSNode; | |
203 delete m_pRNode; | |
204 } | |
205 | |
206 void CUDT::setOpt(UDTOpt optName, const void* optval, const int&) | |
207 { | |
208 if (m_bBroken || m_bClosing) | |
209 throw CUDTException(2, 1, 0); | |
210 | |
211 CGuard cg(m_ConnectionLock); | |
212 CGuard sendguard(m_SendLock); | |
213 CGuard recvguard(m_RecvLock); | |
214 | |
215 switch (optName) | |
216 { | |
217 case UDT_MSS: | |
218 if (m_bOpened) | |
219 throw CUDTException(5, 1, 0); | |
220 | |
221 if (*(int*)optval < int(28 + CHandShake::m_iContentSize)) | |
222 throw CUDTException(5, 3, 0); | |
223 | |
224 m_iMSS = *(int*)optval; | |
225 | |
226 // Packet size cannot be greater than UDP buffer size | |
227 if (m_iMSS > m_iUDPSndBufSize) | |
228 m_iMSS = m_iUDPSndBufSize; | |
229 if (m_iMSS > m_iUDPRcvBufSize) | |
230 m_iMSS = m_iUDPRcvBufSize; | |
231 | |
232 break; | |
233 | |
234 case UDT_SNDSYN: | |
235 m_bSynSending = *(bool *)optval; | |
236 break; | |
237 | |
238 case UDT_RCVSYN: | |
239 m_bSynRecving = *(bool *)optval; | |
240 break; | |
241 | |
242 case UDT_CC: | |
243 if (m_bConnected) | |
244 throw CUDTException(5, 1, 0); | |
245 if (NULL != m_pCCFactory) | |
246 delete m_pCCFactory; | |
247 m_pCCFactory = ((CCCVirtualFactory *)optval)->clone(); | |
248 | |
249 break; | |
250 | |
251 case UDT_FC: | |
252 if (m_bConnected) | |
253 throw CUDTException(5, 2, 0); | |
254 | |
255 if (*(int*)optval < 1) | |
256 throw CUDTException(5, 3); | |
257 | |
258 // Mimimum recv flight flag size is 32 packets | |
259 if (*(int*)optval > 32) | |
260 m_iFlightFlagSize = *(int*)optval; | |
261 else | |
262 m_iFlightFlagSize = 32; | |
263 | |
264 break; | |
265 | |
266 case UDT_SNDBUF: | |
267 if (m_bOpened) | |
268 throw CUDTException(5, 1, 0); | |
269 | |
270 if (*(int*)optval <= 0) | |
271 throw CUDTException(5, 3, 0); | |
272 | |
273 m_iSndBufSize = *(int*)optval / (m_iMSS - 28); | |
274 | |
275 break; | |
276 | |
277 case UDT_RCVBUF: | |
278 if (m_bOpened) | |
279 throw CUDTException(5, 1, 0); | |
280 | |
281 if (*(int*)optval <= 0) | |
282 throw CUDTException(5, 3, 0); | |
283 | |
284 // Mimimum recv buffer size is 32 packets | |
285 if (*(int*)optval > (m_iMSS - 28) * 32) | |
286 m_iRcvBufSize = *(int*)optval / (m_iMSS - 28); | |
287 else | |
288 m_iRcvBufSize = 32; | |
289 | |
290 // recv buffer MUST not be greater than FC size | |
291 if (m_iRcvBufSize > m_iFlightFlagSize) | |
292 m_iRcvBufSize = m_iFlightFlagSize; | |
293 | |
294 break; | |
295 | |
296 case UDT_LINGER: | |
297 m_Linger = *(linger*)optval; | |
298 break; | |
299 | |
300 case UDP_SNDBUF: | |
301 if (m_bOpened) | |
302 throw CUDTException(5, 1, 0); | |
303 | |
304 m_iUDPSndBufSize = *(int*)optval; | |
305 | |
306 if (m_iUDPSndBufSize < m_iMSS) | |
307 m_iUDPSndBufSize = m_iMSS; | |
308 | |
309 break; | |
310 | |
311 case UDP_RCVBUF: | |
312 if (m_bOpened) | |
313 throw CUDTException(5, 1, 0); | |
314 | |
315 m_iUDPRcvBufSize = *(int*)optval; | |
316 | |
317 if (m_iUDPRcvBufSize < m_iMSS) | |
318 m_iUDPRcvBufSize = m_iMSS; | |
319 | |
320 break; | |
321 | |
322 case UDT_RENDEZVOUS: | |
323 if (m_bConnected) | |
324 throw CUDTException(5, 1, 0); | |
325 m_bRendezvous = *(bool *)optval; | |
326 break; | |
327 | |
328 case UDT_SNDTIMEO: | |
329 m_iSndTimeOut = *(int*)optval; | |
330 break; | |
331 | |
332 case UDT_RCVTIMEO: | |
333 m_iRcvTimeOut = *(int*)optval; | |
334 break; | |
335 | |
336 case UDT_REUSEADDR: | |
337 if (m_bOpened) | |
338 throw CUDTException(5, 1, 0); | |
339 m_bReuseAddr = *(bool*)optval; | |
340 break; | |
341 | |
342 case UDT_MAXBW: | |
343 if (m_bConnected) | |
344 throw CUDTException(5, 1, 0); | |
345 m_llMaxBW = *(int64_t*)optval; | |
346 break; | |
347 | |
348 default: | |
349 throw CUDTException(5, 0, 0); | |
350 } | |
351 } | |
352 | |
353 void CUDT::getOpt(UDTOpt optName, void* optval, int& optlen) | |
354 { | |
355 CGuard cg(m_ConnectionLock); | |
356 | |
357 switch (optName) | |
358 { | |
359 case UDT_MSS: | |
360 *(int*)optval = m_iMSS; | |
361 optlen = sizeof(int); | |
362 break; | |
363 | |
364 case UDT_SNDSYN: | |
365 *(bool*)optval = m_bSynSending; | |
366 optlen = sizeof(bool); | |
367 break; | |
368 | |
369 case UDT_RCVSYN: | |
370 *(bool*)optval = m_bSynRecving; | |
371 optlen = sizeof(bool); | |
372 break; | |
373 | |
374 case UDT_CC: | |
375 if (!m_bOpened) | |
376 throw CUDTException(5, 5, 0); | |
377 *(CCC**)optval = m_pCC; | |
378 optlen = sizeof(CCC*); | |
379 | |
380 break; | |
381 | |
382 case UDT_FC: | |
383 *(int*)optval = m_iFlightFlagSize; | |
384 optlen = sizeof(int); | |
385 break; | |
386 | |
387 case UDT_SNDBUF: | |
388 *(int*)optval = m_iSndBufSize * (m_iMSS - 28); | |
389 optlen = sizeof(int); | |
390 break; | |
391 | |
392 case UDT_RCVBUF: | |
393 *(int*)optval = m_iRcvBufSize * (m_iMSS - 28); | |
394 optlen = sizeof(int); | |
395 break; | |
396 | |
397 case UDT_LINGER: | |
398 if (optlen < (int)(sizeof(linger))) | |
399 throw CUDTException(5, 3, 0); | |
400 | |
401 *(linger*)optval = m_Linger; | |
402 optlen = sizeof(linger); | |
403 break; | |
404 | |
405 case UDP_SNDBUF: | |
406 *(int*)optval = m_iUDPSndBufSize; | |
407 optlen = sizeof(int); | |
408 break; | |
409 | |
410 case UDP_RCVBUF: | |
411 *(int*)optval = m_iUDPRcvBufSize; | |
412 optlen = sizeof(int); | |
413 break; | |
414 | |
415 case UDT_RENDEZVOUS: | |
416 *(bool *)optval = m_bRendezvous; | |
417 optlen = sizeof(bool); | |
418 break; | |
419 | |
420 case UDT_SNDTIMEO: | |
421 *(int*)optval = m_iSndTimeOut; | |
422 optlen = sizeof(int); | |
423 break; | |
424 | |
425 case UDT_RCVTIMEO: | |
426 *(int*)optval = m_iRcvTimeOut; | |
427 optlen = sizeof(int); | |
428 break; | |
429 | |
430 case UDT_REUSEADDR: | |
431 *(bool *)optval = m_bReuseAddr; | |
432 optlen = sizeof(bool); | |
433 break; | |
434 | |
435 case UDT_MAXBW: | |
436 *(int64_t*)optval = m_llMaxBW; | |
437 break; | |
438 | |
439 default: | |
440 throw CUDTException(5, 0, 0); | |
441 } | |
442 } | |
443 | |
444 void CUDT::open() | |
445 { | |
446 CGuard cg(m_ConnectionLock); | |
447 | |
448 // Initial sequence number, loss, acknowledgement, etc. | |
449 m_iPktSize = m_iMSS - 28; | |
450 m_iPayloadSize = m_iPktSize - CPacket::m_iPktHdrSize; | |
451 | |
452 m_iEXPCount = 1; | |
453 m_iBandwidth = 1; | |
454 m_iDeliveryRate = 16; | |
455 m_iAckSeqNo = 0; | |
456 m_ullLastAckTime = 0; | |
457 | |
458 // trace information | |
459 m_StartTime = CTimer::getTime(); | |
460 m_llSentTotal = m_llRecvTotal = m_iSndLossTotal = m_iRcvLossTotal = m_iRetran
sTotal = m_iSentACKTotal = m_iRecvACKTotal = m_iSentNAKTotal = m_iRecvNAKTotal =
0; | |
461 m_LastSampleTime = CTimer::getTime(); | |
462 m_llTraceSent = m_llTraceRecv = m_iTraceSndLoss = m_iTraceRcvLoss = m_iTraceR
etrans = m_iSentACK = m_iRecvACK = m_iSentNAK = m_iRecvNAK = 0; | |
463 m_llSndDuration = m_llSndDurationTotal = 0; | |
464 | |
465 // structures for queue | |
466 if (NULL == m_pSNode) | |
467 m_pSNode = new CSNode; | |
468 m_pSNode->m_pUDT = this; | |
469 m_pSNode->m_llTimeStamp = 1; | |
470 m_pSNode->m_iHeapLoc = -1; | |
471 | |
472 if (NULL == m_pRNode) | |
473 m_pRNode = new CRNode; | |
474 m_pRNode->m_pUDT = this; | |
475 m_pRNode->m_llTimeStamp = 1; | |
476 m_pRNode->m_pPrev = m_pRNode->m_pNext = NULL; | |
477 m_pRNode->m_bOnList = false; | |
478 | |
479 m_iRTT = 10 * m_iSYNInterval; | |
480 m_iRTTVar = m_iRTT >> 1; | |
481 m_ullCPUFrequency = CTimer::getCPUFrequency(); | |
482 | |
483 // set up the timers | |
484 m_ullSYNInt = m_iSYNInterval * m_ullCPUFrequency; | |
485 | |
486 // set minimum NAK and EXP timeout to 100ms | |
487 m_ullMinNakInt = 300000 * m_ullCPUFrequency; | |
488 m_ullMinExpInt = 300000 * m_ullCPUFrequency; | |
489 | |
490 m_ullACKInt = m_ullSYNInt; | |
491 m_ullNAKInt = m_ullMinNakInt; | |
492 m_ullEXPInt = m_ullMinExpInt; | |
493 m_llLastRspTime = CTimer::getTime(); | |
494 | |
495 CTimer::rdtsc(m_ullNextACKTime); | |
496 m_ullNextACKTime += m_ullSYNInt; | |
497 CTimer::rdtsc(m_ullNextNAKTime); | |
498 m_ullNextNAKTime += m_ullNAKInt; | |
499 CTimer::rdtsc(m_ullNextEXPTime); | |
500 m_ullNextEXPTime += m_ullEXPInt; | |
501 | |
502 m_iPktCount = 0; | |
503 m_iLightACKCount = 1; | |
504 | |
505 m_ullTargetTime = 0; | |
506 m_ullTimeDiff = 0; | |
507 | |
508 // Now UDT is opened. | |
509 m_bOpened = true; | |
510 } | |
511 | |
512 void CUDT::listen() | |
513 { | |
514 CGuard cg(m_ConnectionLock); | |
515 | |
516 if (!m_bOpened) | |
517 throw CUDTException(5, 0, 0); | |
518 | |
519 if (m_bConnected) | |
520 throw CUDTException(5, 2, 0); | |
521 | |
522 // listen can be called more than once | |
523 if (m_bListening) | |
524 return; | |
525 | |
526 // if there is already another socket listening on the same port | |
527 if (m_pRcvQueue->setListener(this) < 0) | |
528 throw CUDTException(5, 11, 0); | |
529 | |
530 m_bListening = true; | |
531 } | |
532 | |
533 void CUDT::connect(const sockaddr* serv_addr) | |
534 { | |
535 CGuard cg(m_ConnectionLock); | |
536 | |
537 if (!m_bOpened) | |
538 throw CUDTException(5, 0, 0); | |
539 | |
540 if (m_bListening) | |
541 throw CUDTException(5, 2, 0); | |
542 | |
543 if (m_bConnected) | |
544 throw CUDTException(5, 2, 0); | |
545 | |
546 // register this socket in the rendezvous queue | |
547 // RendezevousQueue is used to temporarily store incoming handshake, non-rend
ezvous connections also require this function | |
548 m_pRcvQueue->m_pRendezvousQueue->insert(m_SocketID, m_iIPversion, serv_addr); | |
549 | |
550 CPacket request; | |
551 char* reqdata = new char [m_iPayloadSize]; | |
552 CHandShake req; | |
553 | |
554 CPacket response; | |
555 char* resdata = new char [m_iPayloadSize]; | |
556 CHandShake res; | |
557 | |
558 // This is my current configurations | |
559 req.m_iVersion = m_iVersion; | |
560 req.m_iType = m_iSockType; | |
561 req.m_iMSS = m_iMSS; | |
562 req.m_iFlightFlagSize = (m_iRcvBufSize < m_iFlightFlagSize)? m_iRcvBufSize :
m_iFlightFlagSize; | |
563 req.m_iReqType = (!m_bRendezvous) ? 1 : 0; | |
564 req.m_iID = m_SocketID; | |
565 CIPAddress::ntop(serv_addr, req.m_piPeerIP, m_iIPversion); | |
566 | |
567 // Random Initial Sequence Number | |
568 srand((unsigned int)CTimer::getTime()); | |
569 m_iISN = req.m_iISN = (int32_t)(CSeqNo::m_iMaxSeqNo * (double(rand()) / RAND_
MAX)); | |
570 | |
571 m_iLastDecSeq = req.m_iISN - 1; | |
572 m_iSndLastAck = req.m_iISN; | |
573 m_iSndLastDataAck = req.m_iISN; | |
574 m_iSndCurrSeqNo = req.m_iISN - 1; | |
575 m_iSndLastAck2 = req.m_iISN; | |
576 m_ullSndLastAck2Time = CTimer::getTime(); | |
577 | |
578 // Inform the server my configurations. | |
579 request.pack(0, NULL, reqdata, m_iPayloadSize); | |
580 // ID = 0, connection request | |
581 request.m_iID = 0; | |
582 | |
583 // Wait for the negotiated configurations from the peer side. | |
584 response.pack(0, NULL, resdata, m_iPayloadSize); | |
585 | |
586 uint64_t timeo = 3000000; | |
587 if (m_bRendezvous) | |
588 timeo *= 10; | |
589 uint64_t entertime = CTimer::getTime(); | |
590 uint64_t last_req_time = 0; | |
591 | |
592 CUDTException e(0, 0); | |
593 char* tmp = NULL; | |
594 | |
595 while (!m_bClosing) | |
596 { | |
597 // avoid sending too many requests, at most 1 request per 250ms | |
598 if (CTimer::getTime() - last_req_time > 250000) | |
599 { | |
600 req.serialize(reqdata, m_iPayloadSize); | |
601 request.setLength(CHandShake::m_iContentSize); | |
602 m_pSndQueue->sendto(serv_addr, request); | |
603 | |
604 last_req_time = CTimer::getTime(); | |
605 } | |
606 | |
607 response.setLength(m_iPayloadSize); | |
608 if (m_pRcvQueue->recvfrom(m_SocketID, response) > 0) | |
609 { | |
610 if (m_bRendezvous && ((0 == response.getFlag()) || (1 == response.getTy
pe())) && (NULL != tmp)) | |
611 { | |
612 // a data packet or a keep-alive packet comes, which means the peer
side is already connected | |
613 // in this situation, a previously recorded response (tmp) will be u
sed | |
614 res.deserialize(tmp, CHandShake::m_iContentSize); | |
615 memcpy(m_piSelfIP, res.m_piPeerIP, 16); | |
616 break; | |
617 } | |
618 | |
619 if ((1 != response.getFlag()) || (0 != response.getType())) | |
620 response.setLength(-1); | |
621 else | |
622 { | |
623 res.deserialize(response.m_pcData, response.getLength()); | |
624 | |
625 if (m_bRendezvous) | |
626 { | |
627 // regular connect should NOT communicate with rendezvous connect | |
628 // rendezvous connect require 3-way handshake | |
629 if (1 == res.m_iReqType) | |
630 response.setLength(-1); | |
631 else if ((0 == res.m_iReqType) || (0 == req.m_iReqType)) | |
632 { | |
633 if (NULL == tmp) | |
634 tmp = new char [m_iPayloadSize]; | |
635 res.serialize(tmp, m_iPayloadSize); | |
636 | |
637 req.m_iReqType = -1; | |
638 request.m_iID = res.m_iID; | |
639 response.setLength(-1); | |
640 } | |
641 } | |
642 else | |
643 { | |
644 // set cookie | |
645 if (1 == res.m_iReqType) | |
646 { | |
647 req.m_iReqType = -1; | |
648 req.m_iCookie = res.m_iCookie; | |
649 response.setLength(-1); | |
650 } | |
651 } | |
652 } | |
653 | |
654 // new request/response should be sent out immediately on receving a re
sponse | |
655 last_req_time = 0; | |
656 } | |
657 | |
658 if (response.getLength() > 0) | |
659 { | |
660 memcpy(m_piSelfIP, res.m_piPeerIP, 16); | |
661 break; | |
662 } | |
663 | |
664 if (CTimer::getTime() > entertime + timeo) | |
665 { | |
666 // timeout | |
667 e = CUDTException(1, 1, 0); | |
668 break; | |
669 } | |
670 } | |
671 | |
672 delete [] tmp; | |
673 delete [] reqdata; | |
674 delete [] resdata; | |
675 | |
676 // remove from rendezvous queue | |
677 m_pRcvQueue->m_pRendezvousQueue->remove(m_SocketID); | |
678 | |
679 if (e.getErrorCode() == 0) | |
680 { | |
681 if (m_bClosing) // if the socket
is closed before connection... | |
682 e = CUDTException(1); | |
683 else if (1002 == res.m_iReqType) // connection re
quest rejected | |
684 e = CUDTException(1, 2, 0); | |
685 else if ((!m_bRendezvous) && (m_iISN != res.m_iISN)) // secuity check | |
686 e = CUDTException(1, 4, 0); | |
687 } | |
688 | |
689 if (e.getErrorCode() != 0) | |
690 { | |
691 throw e; | |
692 } | |
693 | |
694 // Got it. Re-configure according to the negotiated values. | |
695 m_iMSS = res.m_iMSS; | |
696 m_iFlowWindowSize = res.m_iFlightFlagSize; | |
697 m_iPktSize = m_iMSS - 28; | |
698 m_iPayloadSize = m_iPktSize - CPacket::m_iPktHdrSize; | |
699 m_iPeerISN = res.m_iISN; | |
700 m_iRcvLastAck = res.m_iISN; | |
701 m_iRcvLastAckAck = res.m_iISN; | |
702 m_iRcvCurrSeqNo = res.m_iISN - 1; | |
703 m_PeerID = res.m_iID; | |
704 | |
705 // Prepare all data structures | |
706 try | |
707 { | |
708 m_pSndBuffer = new CSndBuffer(32, m_iPayloadSize); | |
709 m_pRcvBuffer = new CRcvBuffer(&(m_pRcvQueue->m_UnitQueue), m_iRcvBufSize); | |
710 // after introducing lite ACK, the sndlosslist may not be cleared in time,
so it requires twice space. | |
711 m_pSndLossList = new CSndLossList(m_iFlowWindowSize * 2); | |
712 m_pRcvLossList = new CRcvLossList(m_iFlightFlagSize); | |
713 m_pACKWindow = new CACKWindow(1024); | |
714 m_pRcvTimeWindow = new CPktTimeWindow(16, 64); | |
715 m_pSndTimeWindow = new CPktTimeWindow(); | |
716 } | |
717 catch (...) | |
718 { | |
719 throw CUDTException(3, 2, 0); | |
720 } | |
721 | |
722 m_pCC = m_pCCFactory->create(); | |
723 m_pCC->m_UDT = m_SocketID; | |
724 m_ullInterval = (uint64_t)(m_pCC->m_dPktSndPeriod * m_ullCPUFrequency); | |
725 m_dCongestionWindow = m_pCC->m_dCWndSize; | |
726 | |
727 CInfoBlock ib; | |
728 ib.m_iIPversion = m_iIPversion; | |
729 CInfoBlock::convert(serv_addr, m_iIPversion, ib.m_piIP); | |
730 if (m_pCache->lookup(&ib) >= 0) | |
731 { | |
732 m_iRTT = ib.m_iRTT; | |
733 m_iBandwidth = ib.m_iBandwidth; | |
734 } | |
735 | |
736 m_pCC->setMSS(m_iMSS); | |
737 m_pCC->setMaxCWndSize((int&)m_iFlowWindowSize); | |
738 m_pCC->setSndCurrSeqNo((int32_t&)m_iSndCurrSeqNo); | |
739 m_pCC->setRcvRate(m_iDeliveryRate); | |
740 m_pCC->setRTT(m_iRTT); | |
741 m_pCC->setBandwidth(m_iBandwidth); | |
742 if (m_llMaxBW > 0) m_pCC->setUserParam((char*)&(m_llMaxBW), 8); | |
743 m_pCC->init(); | |
744 | |
745 m_pPeerAddr = (AF_INET == m_iIPversion) ? (sockaddr*)new sockaddr_in : (socka
ddr*)new sockaddr_in6; | |
746 memcpy(m_pPeerAddr, serv_addr, (AF_INET == m_iIPversion) ? sizeof(sockaddr_in
) : sizeof(sockaddr_in6)); | |
747 | |
748 // And, I am connected too. | |
749 m_bConnected = true; | |
750 | |
751 // register this socket for receiving data packets | |
752 m_pRNode->m_bOnList = true; | |
753 m_pRcvQueue->setNewEntry(this); | |
754 | |
755 // acknowledde any waiting epolls to read/write | |
756 s_UDTUnited.m_EPoll.enable_read(m_SocketID, m_sPollID); | |
757 s_UDTUnited.m_EPoll.enable_write(m_SocketID, m_sPollID); | |
758 } | |
759 | |
760 void CUDT::connect(const sockaddr* peer, CHandShake* hs) | |
761 { | |
762 CGuard cg(m_ConnectionLock); | |
763 | |
764 // Uses the smaller MSS between the peers | |
765 if (hs->m_iMSS > m_iMSS) | |
766 hs->m_iMSS = m_iMSS; | |
767 else | |
768 m_iMSS = hs->m_iMSS; | |
769 | |
770 // exchange info for maximum flow window size | |
771 m_iFlowWindowSize = hs->m_iFlightFlagSize; | |
772 hs->m_iFlightFlagSize = (m_iRcvBufSize < m_iFlightFlagSize)? m_iRcvBufSize :
m_iFlightFlagSize; | |
773 | |
774 m_iPeerISN = hs->m_iISN; | |
775 | |
776 m_iRcvLastAck = hs->m_iISN; | |
777 m_iRcvLastAckAck = hs->m_iISN; | |
778 m_iRcvCurrSeqNo = hs->m_iISN - 1; | |
779 | |
780 m_PeerID = hs->m_iID; | |
781 hs->m_iID = m_SocketID; | |
782 | |
783 // use peer's ISN and send it back for security check | |
784 m_iISN = hs->m_iISN; | |
785 | |
786 m_iLastDecSeq = m_iISN - 1; | |
787 m_iSndLastAck = m_iISN; | |
788 m_iSndLastDataAck = m_iISN; | |
789 m_iSndCurrSeqNo = m_iISN - 1; | |
790 m_iSndLastAck2 = m_iISN; | |
791 m_ullSndLastAck2Time = CTimer::getTime(); | |
792 | |
793 // this is a reponse handshake | |
794 hs->m_iReqType = -1; | |
795 | |
796 // get local IP address and send the peer its IP address (because UDP cannot
get local IP address) | |
797 memcpy(m_piSelfIP, hs->m_piPeerIP, 16); | |
798 CIPAddress::ntop(peer, hs->m_piPeerIP, m_iIPversion); | |
799 | |
800 m_iPktSize = m_iMSS - 28; | |
801 m_iPayloadSize = m_iPktSize - CPacket::m_iPktHdrSize; | |
802 | |
803 // Prepare all structures | |
804 try | |
805 { | |
806 m_pSndBuffer = new CSndBuffer(32, m_iPayloadSize); | |
807 m_pRcvBuffer = new CRcvBuffer(&(m_pRcvQueue->m_UnitQueue), m_iRcvBufSize); | |
808 m_pSndLossList = new CSndLossList(m_iFlowWindowSize * 2); | |
809 m_pRcvLossList = new CRcvLossList(m_iFlightFlagSize); | |
810 m_pACKWindow = new CACKWindow(1024); | |
811 m_pRcvTimeWindow = new CPktTimeWindow(16, 64); | |
812 m_pSndTimeWindow = new CPktTimeWindow(); | |
813 } | |
814 catch (...) | |
815 { | |
816 throw CUDTException(3, 2, 0); | |
817 } | |
818 | |
819 m_pCC = m_pCCFactory->create(); | |
820 m_pCC->m_UDT = m_SocketID; | |
821 m_ullInterval = (uint64_t)(m_pCC->m_dPktSndPeriod * m_ullCPUFrequency); | |
822 m_dCongestionWindow = m_pCC->m_dCWndSize; | |
823 | |
824 CInfoBlock ib; | |
825 ib.m_iIPversion = m_iIPversion; | |
826 CInfoBlock::convert(peer, m_iIPversion, ib.m_piIP); | |
827 if (m_pCache->lookup(&ib) >= 0) | |
828 { | |
829 m_iRTT = ib.m_iRTT; | |
830 m_iBandwidth = ib.m_iBandwidth; | |
831 } | |
832 | |
833 m_pCC->setMSS(m_iMSS); | |
834 m_pCC->setMaxCWndSize((int&)m_iFlowWindowSize); | |
835 m_pCC->setSndCurrSeqNo((int32_t&)m_iSndCurrSeqNo); | |
836 m_pCC->setRcvRate(m_iDeliveryRate); | |
837 m_pCC->setRTT(m_iRTT); | |
838 m_pCC->setBandwidth(m_iBandwidth); | |
839 if (m_llMaxBW > 0) m_pCC->setUserParam((char*)&(m_llMaxBW), 8); | |
840 m_pCC->init(); | |
841 | |
842 m_pPeerAddr = (AF_INET == m_iIPversion) ? (sockaddr*)new sockaddr_in : (socka
ddr*)new sockaddr_in6; | |
843 memcpy(m_pPeerAddr, peer, (AF_INET == m_iIPversion) ? sizeof(sockaddr_in) : s
izeof(sockaddr_in6)); | |
844 | |
845 // And of course, it is connected. | |
846 m_bConnected = true; | |
847 | |
848 // register this socket for receiving data packets | |
849 m_pRNode->m_bOnList = true; | |
850 m_pRcvQueue->setNewEntry(this); | |
851 | |
852 //send the response to the peer, see listen() for more discussions about this | |
853 CPacket response; | |
854 int size = CHandShake::m_iContentSize; | |
855 char* buffer = new char[size]; | |
856 hs->serialize(buffer, size); | |
857 response.pack(0, NULL, buffer, size); | |
858 response.m_iID = m_PeerID; | |
859 m_pSndQueue->sendto(peer, response); | |
860 delete [] buffer; | |
861 } | |
862 | |
863 void CUDT::close() | |
864 { | |
865 if (!m_bOpened) | |
866 return; | |
867 | |
868 if (0 != m_Linger.l_onoff) | |
869 { | |
870 uint64_t entertime = CTimer::getTime(); | |
871 | |
872 while (!m_bBroken && m_bConnected && (m_pSndBuffer->getCurrBufSize() > 0)
&& (CTimer::getTime() - entertime < m_Linger.l_linger * 1000000ULL)) | |
873 { | |
874 // linger has been checked by previous close() call and has expired | |
875 if (m_ullLingerExpiration >= entertime) | |
876 break; | |
877 | |
878 if (!m_bSynSending) | |
879 { | |
880 // if this socket enables asynchronous sending, return immediately a
nd let GC to close it later | |
881 if (0 == m_ullLingerExpiration) | |
882 m_ullLingerExpiration = entertime + m_Linger.l_linger * 1000000UL
L; | |
883 | |
884 return; | |
885 } | |
886 | |
887 #ifndef WIN32 | |
888 timespec ts; | |
889 ts.tv_sec = 0; | |
890 ts.tv_nsec = 1000000; | |
891 nanosleep(&ts, NULL); | |
892 #else | |
893 Sleep(1); | |
894 #endif | |
895 } | |
896 } | |
897 | |
898 // remove this socket from the snd queue | |
899 if (m_bConnected) | |
900 m_pSndQueue->m_pSndUList->remove(this); | |
901 | |
902 // remove itself from all epoll monitoring | |
903 for (set<int>::iterator i = m_sPollID.begin(); i != m_sPollID.end(); ++ i) | |
904 s_UDTUnited.m_EPoll.remove_usock(*i, m_SocketID); | |
905 | |
906 CGuard cg(m_ConnectionLock); | |
907 | |
908 if (!m_bOpened) | |
909 return; | |
910 | |
911 // Inform the threads handler to stop. | |
912 m_bClosing = true; | |
913 | |
914 // Signal the sender and recver if they are waiting for data. | |
915 releaseSynch(); | |
916 | |
917 if (m_bListening) | |
918 { | |
919 m_bListening = false; | |
920 m_pRcvQueue->removeListener(this); | |
921 } | |
922 if (m_bConnected) | |
923 { | |
924 if (!m_bShutdown) | |
925 sendCtrl(5); | |
926 | |
927 m_pCC->close(); | |
928 | |
929 CInfoBlock ib; | |
930 ib.m_iIPversion = m_iIPversion; | |
931 CInfoBlock::convert(m_pPeerAddr, m_iIPversion, ib.m_piIP); | |
932 ib.m_iRTT = m_iRTT; | |
933 ib.m_iBandwidth = m_iBandwidth; | |
934 m_pCache->update(&ib); | |
935 | |
936 m_bConnected = false; | |
937 } | |
938 | |
939 // waiting all send and recv calls to stop | |
940 CGuard sendguard(m_SendLock); | |
941 CGuard recvguard(m_RecvLock); | |
942 | |
943 // CLOSED. | |
944 m_bOpened = false; | |
945 } | |
946 | |
947 int CUDT::send(const char* data, const int& len) | |
948 { | |
949 if (UDT_DGRAM == m_iSockType) | |
950 throw CUDTException(5, 10, 0); | |
951 | |
952 // throw an exception if not connected | |
953 if (m_bBroken || m_bClosing) | |
954 throw CUDTException(2, 1, 0); | |
955 else if (!m_bConnected) | |
956 throw CUDTException(2, 2, 0); | |
957 | |
958 if (len <= 0) | |
959 return 0; | |
960 | |
961 CGuard sendguard(m_SendLock); | |
962 | |
963 if (m_pSndBuffer->getCurrBufSize() == 0) | |
964 { | |
965 // delay the EXP timer to avoid mis-fired timeout | |
966 uint64_t currtime; | |
967 CTimer::rdtsc(currtime); | |
968 m_ullNextEXPTime = currtime + m_ullEXPInt; | |
969 } | |
970 | |
971 if (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize()) | |
972 { | |
973 if (!m_bSynSending) | |
974 throw CUDTException(6, 1, 0); | |
975 else | |
976 { | |
977 // wait here during a blocking sending | |
978 #ifndef WIN32 | |
979 pthread_mutex_lock(&m_SendBlockLock); | |
980 if (m_iSndTimeOut < 0) | |
981 { | |
982 while (!m_bBroken && m_bConnected && !m_bClosing && (m_iSndBufSiz
e <= m_pSndBuffer->getCurrBufSize()) && m_bPeerHealth) | |
983 pthread_cond_wait(&m_SendBlockCond, &m_SendBlockLock); | |
984 } | |
985 else | |
986 { | |
987 uint64_t exptime = CTimer::getTime() + m_iSndTimeOut * 1000ULL; | |
988 timespec locktime; | |
989 | |
990 locktime.tv_sec = exptime / 1000000; | |
991 locktime.tv_nsec = (exptime % 1000000) * 1000; | |
992 | |
993 while (!m_bBroken && m_bConnected && !m_bClosing && (m_iSndBufSiz
e <= m_pSndBuffer->getCurrBufSize()) && m_bPeerHealth && (CTimer::getTime() < ex
ptime)) | |
994 pthread_cond_timedwait(&m_SendBlockCond, &m_SendBlockLock, &lo
cktime); | |
995 } | |
996 pthread_mutex_unlock(&m_SendBlockLock); | |
997 #else | |
998 if (m_iSndTimeOut < 0) | |
999 { | |
1000 while (!m_bBroken && m_bConnected && !m_bClosing && (m_iSndBufSiz
e <= m_pSndBuffer->getCurrBufSize()) && m_bPeerHealth) | |
1001 WaitForSingleObject(m_SendBlockCond, INFINITE); | |
1002 } | |
1003 else | |
1004 { | |
1005 uint64_t exptime = CTimer::getTime() + m_iSndTimeOut * 1000ULL; | |
1006 | |
1007 while (!m_bBroken && m_bConnected && !m_bClosing && (m_iSndBufSiz
e <= m_pSndBuffer->getCurrBufSize()) && m_bPeerHealth && (CTimer::getTime() < ex
ptime)) | |
1008 WaitForSingleObject(m_SendBlockCond, DWORD((exptime - CTimer::
getTime()) / 1000)); | |
1009 } | |
1010 #endif | |
1011 | |
1012 // check the connection status | |
1013 if (m_bBroken || m_bClosing) | |
1014 throw CUDTException(2, 1, 0); | |
1015 else if (!m_bConnected) | |
1016 throw CUDTException(2, 2, 0); | |
1017 else if (!m_bPeerHealth) | |
1018 { | |
1019 m_bPeerHealth = true; | |
1020 throw CUDTException(7); | |
1021 } | |
1022 } | |
1023 } | |
1024 | |
1025 if (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize()) | |
1026 { | |
1027 if (m_iSndTimeOut >= 0) | |
1028 throw CUDTException(6, 1, 0); | |
1029 | |
1030 return 0; | |
1031 } | |
1032 | |
1033 int size = (m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize; | |
1034 if (size > len) | |
1035 size = len; | |
1036 | |
1037 // record total time used for sending | |
1038 if (0 == m_pSndBuffer->getCurrBufSize()) | |
1039 m_llSndDurationCounter = CTimer::getTime(); | |
1040 | |
1041 // insert the user buffer into the sening list | |
1042 m_pSndBuffer->addBuffer(data, size); | |
1043 | |
1044 // insert this socket to snd list if it is not on the list yet | |
1045 m_pSndQueue->m_pSndUList->update(this, false); | |
1046 | |
1047 if (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize()) | |
1048 { | |
1049 // write is not available any more | |
1050 s_UDTUnited.m_EPoll.disable_write(m_SocketID, m_sPollID); | |
1051 } | |
1052 | |
1053 return size; | |
1054 } | |
1055 | |
1056 int CUDT::recv(char* data, const int& len) | |
1057 { | |
1058 if (UDT_DGRAM == m_iSockType) | |
1059 throw CUDTException(5, 10, 0); | |
1060 | |
1061 // throw an exception if not connected | |
1062 if (!m_bConnected) | |
1063 throw CUDTException(2, 2, 0); | |
1064 else if ((m_bBroken || m_bClosing) && (0 == m_pRcvBuffer->getRcvDataSize())) | |
1065 throw CUDTException(2, 1, 0); | |
1066 | |
1067 if (len <= 0) | |
1068 return 0; | |
1069 | |
1070 CGuard recvguard(m_RecvLock); | |
1071 | |
1072 if (0 == m_pRcvBuffer->getRcvDataSize()) | |
1073 { | |
1074 if (!m_bSynRecving) | |
1075 throw CUDTException(6, 2, 0); | |
1076 else | |
1077 { | |
1078 #ifndef WIN32 | |
1079 pthread_mutex_lock(&m_RecvDataLock); | |
1080 if (m_iRcvTimeOut < 0) | |
1081 { | |
1082 while (!m_bBroken && m_bConnected && !m_bClosing && (0 == m_pRcvB
uffer->getRcvDataSize())) | |
1083 pthread_cond_wait(&m_RecvDataCond, &m_RecvDataLock); | |
1084 } | |
1085 else | |
1086 { | |
1087 uint64_t exptime = CTimer::getTime() + m_iRcvTimeOut * 1000ULL; | |
1088 timespec locktime; | |
1089 | |
1090 locktime.tv_sec = exptime / 1000000; | |
1091 locktime.tv_nsec = (exptime % 1000000) * 1000; | |
1092 | |
1093 while (!m_bBroken && m_bConnected && !m_bClosing && (0 == m_pRcvB
uffer->getRcvDataSize())) | |
1094 { | |
1095 pthread_cond_timedwait(&m_RecvDataCond, &m_RecvDataLock, &lock
time); | |
1096 if (CTimer::getTime() >= exptime) | |
1097 break; | |
1098 } | |
1099 } | |
1100 pthread_mutex_unlock(&m_RecvDataLock); | |
1101 #else | |
1102 if (m_iRcvTimeOut < 0) | |
1103 { | |
1104 while (!m_bBroken && m_bConnected && !m_bClosing && (0 == m_pRcvB
uffer->getRcvDataSize())) | |
1105 WaitForSingleObject(m_RecvDataCond, INFINITE); | |
1106 } | |
1107 else | |
1108 { | |
1109 uint64_t enter_time = CTimer::getTime(); | |
1110 | |
1111 while (!m_bBroken && m_bConnected && !m_bClosing && (0 == m_pRcvB
uffer->getRcvDataSize())) | |
1112 { | |
1113 int diff = int(CTimer::getTime() - enter_time) / 1000; | |
1114 if (diff >= m_iRcvTimeOut) | |
1115 break; | |
1116 WaitForSingleObject(m_RecvDataCond, DWORD(m_iRcvTimeOut - diff
)); | |
1117 } | |
1118 } | |
1119 #endif | |
1120 } | |
1121 } | |
1122 | |
1123 // throw an exception if not connected | |
1124 if (!m_bConnected) | |
1125 throw CUDTException(2, 2, 0); | |
1126 else if ((m_bBroken || m_bClosing) && (0 == m_pRcvBuffer->getRcvDataSize())) | |
1127 throw CUDTException(2, 1, 0); | |
1128 | |
1129 int res = m_pRcvBuffer->readBuffer(data, len); | |
1130 | |
1131 if (m_pRcvBuffer->getRcvDataSize() <= 0) | |
1132 { | |
1133 // read is not available any more | |
1134 s_UDTUnited.m_EPoll.disable_read(m_SocketID, m_sPollID); | |
1135 } | |
1136 | |
1137 if ((res <= 0) && (m_iRcvTimeOut >= 0)) | |
1138 throw CUDTException(6, 2, 0); | |
1139 | |
1140 return res; | |
1141 } | |
1142 | |
1143 int CUDT::sendmsg(const char* data, const int& len, const int& msttl, const bool
& inorder) | |
1144 { | |
1145 if (UDT_STREAM == m_iSockType) | |
1146 throw CUDTException(5, 9, 0); | |
1147 | |
1148 // throw an exception if not connected | |
1149 if (m_bBroken || m_bClosing) | |
1150 throw CUDTException(2, 1, 0); | |
1151 else if (!m_bConnected) | |
1152 throw CUDTException(2, 2, 0); | |
1153 | |
1154 if (len <= 0) | |
1155 return 0; | |
1156 | |
1157 if (len > m_iSndBufSize * m_iPayloadSize) | |
1158 throw CUDTException(5, 12, 0); | |
1159 | |
1160 CGuard sendguard(m_SendLock); | |
1161 | |
1162 if (m_pSndBuffer->getCurrBufSize() == 0) | |
1163 { | |
1164 // delay the EXP timer to avoid mis-fired timeout | |
1165 uint64_t currtime; | |
1166 CTimer::rdtsc(currtime); | |
1167 m_ullNextEXPTime = currtime + m_ullEXPInt; | |
1168 } | |
1169 | |
1170 if ((m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < len) | |
1171 { | |
1172 if (!m_bSynSending) | |
1173 throw CUDTException(6, 1, 0); | |
1174 else | |
1175 { | |
1176 // wait here during a blocking sending | |
1177 #ifndef WIN32 | |
1178 pthread_mutex_lock(&m_SendBlockLock); | |
1179 if (m_iSndTimeOut < 0) | |
1180 { | |
1181 while (!m_bBroken && m_bConnected && !m_bClosing && ((m_iSndBufSi
ze - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < len)) | |
1182 pthread_cond_wait(&m_SendBlockCond, &m_SendBlockLock); | |
1183 } | |
1184 else | |
1185 { | |
1186 uint64_t exptime = CTimer::getTime() + m_iSndTimeOut * 1000ULL; | |
1187 timespec locktime; | |
1188 | |
1189 locktime.tv_sec = exptime / 1000000; | |
1190 locktime.tv_nsec = (exptime % 1000000) * 1000; | |
1191 | |
1192 while (!m_bBroken && m_bConnected && !m_bClosing && ((m_iSndBufSi
ze - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < len) && (CTimer::getTime
() < exptime)) | |
1193 pthread_cond_timedwait(&m_SendBlockCond, &m_SendBlockLock, &lo
cktime); | |
1194 } | |
1195 pthread_mutex_unlock(&m_SendBlockLock); | |
1196 #else | |
1197 if (m_iSndTimeOut < 0) | |
1198 { | |
1199 while (!m_bBroken && m_bConnected && !m_bClosing && ((m_iSndBufSi
ze - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < len)) | |
1200 WaitForSingleObject(m_SendBlockCond, INFINITE); | |
1201 } | |
1202 else | |
1203 { | |
1204 uint64_t exptime = CTimer::getTime() + m_iSndTimeOut * 1000ULL; | |
1205 | |
1206 while (!m_bBroken && m_bConnected && !m_bClosing && ((m_iSndBufSi
ze - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < len) && (CTimer::getTime
() < exptime)) | |
1207 WaitForSingleObject(m_SendBlockCond, DWORD((exptime - CTimer::
getTime()) / 1000)); | |
1208 } | |
1209 #endif | |
1210 | |
1211 // check the connection status | |
1212 if (m_bBroken || m_bClosing) | |
1213 throw CUDTException(2, 1, 0); | |
1214 else if (!m_bConnected) | |
1215 throw CUDTException(2, 2, 0); | |
1216 } | |
1217 } | |
1218 | |
1219 if ((m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < len) | |
1220 { | |
1221 if (m_iSndTimeOut >= 0) | |
1222 throw CUDTException(6, 1, 0); | |
1223 | |
1224 return 0; | |
1225 } | |
1226 | |
1227 // record total time used for sending | |
1228 if (0 == m_pSndBuffer->getCurrBufSize()) | |
1229 m_llSndDurationCounter = CTimer::getTime(); | |
1230 | |
1231 // insert the user buffer into the sening list | |
1232 m_pSndBuffer->addBuffer(data, len, msttl, inorder); | |
1233 | |
1234 // insert this socket to the snd list if it is not on the list yet | |
1235 m_pSndQueue->m_pSndUList->update(this, false); | |
1236 | |
1237 if (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize()) | |
1238 { | |
1239 // write is not available any more | |
1240 s_UDTUnited.m_EPoll.disable_write(m_SocketID, m_sPollID); | |
1241 } | |
1242 | |
1243 return len; | |
1244 } | |
1245 | |
1246 int CUDT::recvmsg(char* data, const int& len) | |
1247 { | |
1248 if (UDT_STREAM == m_iSockType) | |
1249 throw CUDTException(5, 9, 0); | |
1250 | |
1251 // throw an exception if not connected | |
1252 if (!m_bConnected) | |
1253 throw CUDTException(2, 2, 0); | |
1254 | |
1255 if (len <= 0) | |
1256 return 0; | |
1257 | |
1258 CGuard recvguard(m_RecvLock); | |
1259 | |
1260 if (m_bBroken || m_bClosing) | |
1261 { | |
1262 int res = m_pRcvBuffer->readMsg(data, len); | |
1263 if (0 == res) | |
1264 { | |
1265 // read is not available | |
1266 s_UDTUnited.m_EPoll.disable_read(m_SocketID, m_sPollID); | |
1267 | |
1268 throw CUDTException(2, 1, 0); | |
1269 } | |
1270 else | |
1271 return res; | |
1272 } | |
1273 | |
1274 if (!m_bSynRecving) | |
1275 { | |
1276 int res = m_pRcvBuffer->readMsg(data, len); | |
1277 if (0 == res) | |
1278 throw CUDTException(6, 2, 0); | |
1279 else | |
1280 return res; | |
1281 } | |
1282 | |
1283 int res = 0; | |
1284 bool timeout = false; | |
1285 | |
1286 do | |
1287 { | |
1288 #ifndef WIN32 | |
1289 pthread_mutex_lock(&m_RecvDataLock); | |
1290 | |
1291 if (m_iRcvTimeOut < 0) | |
1292 { | |
1293 while (!m_bBroken && m_bConnected && !m_bClosing && (0 == (res = m_p
RcvBuffer->readMsg(data, len)))) | |
1294 pthread_cond_wait(&m_RecvDataCond, &m_RecvDataLock); | |
1295 } | |
1296 else | |
1297 { | |
1298 uint64_t exptime = CTimer::getTime() + m_iRcvTimeOut * 1000ULL; | |
1299 timespec locktime; | |
1300 | |
1301 locktime.tv_sec = exptime / 1000000; | |
1302 locktime.tv_nsec = (exptime % 1000000) * 1000; | |
1303 | |
1304 if (pthread_cond_timedwait(&m_RecvDataCond, &m_RecvDataLock, &lockti
me) == ETIMEDOUT) | |
1305 timeout = true; | |
1306 | |
1307 res = m_pRcvBuffer->readMsg(data, len); | |
1308 } | |
1309 pthread_mutex_unlock(&m_RecvDataLock); | |
1310 #else | |
1311 if (m_iRcvTimeOut < 0) | |
1312 { | |
1313 while (!m_bBroken && m_bConnected && !m_bClosing && (0 == (res = m_p
RcvBuffer->readMsg(data, len)))) | |
1314 WaitForSingleObject(m_RecvDataCond, INFINITE); | |
1315 } | |
1316 else | |
1317 { | |
1318 if (WaitForSingleObject(m_RecvDataCond, DWORD(m_iRcvTimeOut)) == WAI
T_TIMEOUT) | |
1319 timeout = true; | |
1320 | |
1321 res = m_pRcvBuffer->readMsg(data, len); | |
1322 } | |
1323 #endif | |
1324 | |
1325 if (m_bBroken || m_bClosing) | |
1326 throw CUDTException(2, 1, 0); | |
1327 else if (!m_bConnected) | |
1328 throw CUDTException(2, 2, 0); | |
1329 } while ((0 == res) && !timeout); | |
1330 | |
1331 if (m_pRcvBuffer->getRcvMsgNum() <= 0) | |
1332 { | |
1333 // read is not available any more | |
1334 s_UDTUnited.m_EPoll.disable_read(m_SocketID, m_sPollID); | |
1335 } | |
1336 | |
1337 if ((res <= 0) && (m_iRcvTimeOut >= 0)) | |
1338 throw CUDTException(6, 2, 0); | |
1339 | |
1340 return res; | |
1341 } | |
1342 | |
1343 int64_t CUDT::sendfile(fstream& ifs, int64_t& offset, const int64_t& size, const
int& block) | |
1344 { | |
1345 if (UDT_DGRAM == m_iSockType) | |
1346 throw CUDTException(5, 10, 0); | |
1347 | |
1348 if (m_bBroken || m_bClosing) | |
1349 throw CUDTException(2, 1, 0); | |
1350 else if (!m_bConnected) | |
1351 throw CUDTException(2, 2, 0); | |
1352 | |
1353 if (size <= 0) | |
1354 return 0; | |
1355 | |
1356 CGuard sendguard(m_SendLock); | |
1357 | |
1358 if (m_pSndBuffer->getCurrBufSize() == 0) | |
1359 { | |
1360 // delay the EXP timer to avoid mis-fired timeout | |
1361 uint64_t currtime; | |
1362 CTimer::rdtsc(currtime); | |
1363 m_ullNextEXPTime = currtime + m_ullEXPInt; | |
1364 } | |
1365 | |
1366 int64_t tosend = size; | |
1367 int unitsize; | |
1368 | |
1369 // positioning... | |
1370 try | |
1371 { | |
1372 ifs.seekg((streamoff)offset); | |
1373 } | |
1374 catch (...) | |
1375 { | |
1376 throw CUDTException(4, 1); | |
1377 } | |
1378 | |
1379 // sending block by block | |
1380 while (tosend > 0) | |
1381 { | |
1382 if (ifs.fail()) | |
1383 throw CUDTException(4, 4); | |
1384 | |
1385 if (ifs.eof()) | |
1386 break; | |
1387 | |
1388 unitsize = int((tosend >= block) ? block : tosend); | |
1389 | |
1390 #ifndef WIN32 | |
1391 pthread_mutex_lock(&m_SendBlockLock); | |
1392 while (!m_bBroken && m_bConnected && !m_bClosing && (m_iSndBufSize <= m
_pSndBuffer->getCurrBufSize()) && m_bPeerHealth) | |
1393 pthread_cond_wait(&m_SendBlockCond, &m_SendBlockLock); | |
1394 pthread_mutex_unlock(&m_SendBlockLock); | |
1395 #else | |
1396 while (!m_bBroken && m_bConnected && !m_bClosing && (m_iSndBufSize <= m
_pSndBuffer->getCurrBufSize()) && m_bPeerHealth) | |
1397 WaitForSingleObject(m_SendBlockCond, INFINITE); | |
1398 #endif | |
1399 | |
1400 if (m_bBroken || m_bClosing) | |
1401 throw CUDTException(2, 1, 0); | |
1402 else if (!m_bConnected) | |
1403 throw CUDTException(2, 2, 0); | |
1404 else if (!m_bPeerHealth) | |
1405 { | |
1406 // reset peer health status, once this error returns, the app should ha
ndle the situation at the peer side | |
1407 m_bPeerHealth = true; | |
1408 throw CUDTException(7); | |
1409 } | |
1410 | |
1411 // record total time used for sending | |
1412 if (0 == m_pSndBuffer->getCurrBufSize()) | |
1413 m_llSndDurationCounter = CTimer::getTime(); | |
1414 | |
1415 int64_t sentsize = m_pSndBuffer->addBufferFromFile(ifs, unitsize); | |
1416 | |
1417 if (sentsize > 0) | |
1418 { | |
1419 tosend -= sentsize; | |
1420 offset += sentsize; | |
1421 } | |
1422 | |
1423 // insert this socket to snd list if it is not on the list yet | |
1424 m_pSndQueue->m_pSndUList->update(this, false); | |
1425 } | |
1426 | |
1427 if (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize()) | |
1428 { | |
1429 // write is not available any more | |
1430 s_UDTUnited.m_EPoll.disable_write(m_SocketID, m_sPollID); | |
1431 } | |
1432 | |
1433 return size - tosend; | |
1434 } | |
1435 | |
1436 int64_t CUDT::recvfile(fstream& ofs, int64_t& offset, const int64_t& size, const
int& block) | |
1437 { | |
1438 if (UDT_DGRAM == m_iSockType) | |
1439 throw CUDTException(5, 10, 0); | |
1440 | |
1441 if (!m_bConnected) | |
1442 throw CUDTException(2, 2, 0); | |
1443 else if ((m_bBroken || m_bClosing) && (0 == m_pRcvBuffer->getRcvDataSize())) | |
1444 throw CUDTException(2, 1, 0); | |
1445 | |
1446 if (size <= 0) | |
1447 return 0; | |
1448 | |
1449 CGuard recvguard(m_RecvLock); | |
1450 | |
1451 int64_t torecv = size; | |
1452 int unitsize = block; | |
1453 int recvsize; | |
1454 | |
1455 // positioning... | |
1456 try | |
1457 { | |
1458 ofs.seekp((streamoff)offset); | |
1459 } | |
1460 catch (...) | |
1461 { | |
1462 throw CUDTException(4, 3); | |
1463 } | |
1464 | |
1465 // receiving... "recvfile" is always blocking | |
1466 while (torecv > 0) | |
1467 { | |
1468 if (ofs.fail()) | |
1469 { | |
1470 // send the sender a signal so it will not be blocked forever | |
1471 int32_t err_code = CUDTException::EFILE; | |
1472 sendCtrl(8, &err_code); | |
1473 | |
1474 throw CUDTException(4, 4); | |
1475 } | |
1476 | |
1477 #ifndef WIN32 | |
1478 pthread_mutex_lock(&m_RecvDataLock); | |
1479 while (!m_bBroken && m_bConnected && !m_bClosing && (0 == m_pRcvBuffer-
>getRcvDataSize())) | |
1480 pthread_cond_wait(&m_RecvDataCond, &m_RecvDataLock); | |
1481 pthread_mutex_unlock(&m_RecvDataLock); | |
1482 #else | |
1483 while (!m_bBroken && m_bConnected && !m_bClosing && (0 == m_pRcvBuffer-
>getRcvDataSize())) | |
1484 WaitForSingleObject(m_RecvDataCond, INFINITE); | |
1485 #endif | |
1486 | |
1487 if (!m_bConnected) | |
1488 throw CUDTException(2, 2, 0); | |
1489 else if ((m_bBroken || m_bClosing) && (0 == m_pRcvBuffer->getRcvDataSize()
)) | |
1490 throw CUDTException(2, 1, 0); | |
1491 | |
1492 unitsize = int((torecv >= block) ? block : torecv); | |
1493 recvsize = m_pRcvBuffer->readBufferToFile(ofs, unitsize); | |
1494 | |
1495 if (recvsize > 0) | |
1496 { | |
1497 torecv -= recvsize; | |
1498 offset += recvsize; | |
1499 } | |
1500 } | |
1501 | |
1502 if (m_pRcvBuffer->getRcvDataSize() <= 0) | |
1503 { | |
1504 // read is not available any more | |
1505 s_UDTUnited.m_EPoll.disable_read(m_SocketID, m_sPollID); | |
1506 } | |
1507 | |
1508 return size - torecv; | |
1509 } | |
1510 | |
1511 void CUDT::sample(CPerfMon* perf, bool clear) | |
1512 { | |
1513 if (!m_bConnected) | |
1514 throw CUDTException(2, 2, 0); | |
1515 if (m_bBroken || m_bClosing) | |
1516 throw CUDTException(2, 1, 0); | |
1517 | |
1518 uint64_t currtime = CTimer::getTime(); | |
1519 perf->msTimeStamp = (currtime - m_StartTime) / 1000; | |
1520 | |
1521 perf->pktSent = m_llTraceSent; | |
1522 perf->pktRecv = m_llTraceRecv; | |
1523 perf->pktSndLoss = m_iTraceSndLoss; | |
1524 perf->pktRcvLoss = m_iTraceRcvLoss; | |
1525 perf->pktRetrans = m_iTraceRetrans; | |
1526 perf->pktSentACK = m_iSentACK; | |
1527 perf->pktRecvACK = m_iRecvACK; | |
1528 perf->pktSentNAK = m_iSentNAK; | |
1529 perf->pktRecvNAK = m_iRecvNAK; | |
1530 perf->usSndDuration = m_llSndDuration; | |
1531 | |
1532 perf->pktSentTotal = m_llSentTotal; | |
1533 perf->pktRecvTotal = m_llRecvTotal; | |
1534 perf->pktSndLossTotal = m_iSndLossTotal; | |
1535 perf->pktRcvLossTotal = m_iRcvLossTotal; | |
1536 perf->pktRetransTotal = m_iRetransTotal; | |
1537 perf->pktSentACKTotal = m_iSentACKTotal; | |
1538 perf->pktRecvACKTotal = m_iRecvACKTotal; | |
1539 perf->pktSentNAKTotal = m_iSentNAKTotal; | |
1540 perf->pktRecvNAKTotal = m_iRecvNAKTotal; | |
1541 perf->usSndDurationTotal = m_llSndDurationTotal; | |
1542 | |
1543 double interval = double(currtime - m_LastSampleTime); | |
1544 | |
1545 perf->mbpsSendRate = double(m_llTraceSent) * m_iPayloadSize * 8.0 / interval; | |
1546 perf->mbpsRecvRate = double(m_llTraceRecv) * m_iPayloadSize * 8.0 / interval; | |
1547 | |
1548 perf->usPktSndPeriod = m_ullInterval / double(m_ullCPUFrequency); | |
1549 perf->pktFlowWindow = m_iFlowWindowSize; | |
1550 perf->pktCongestionWindow = (int)m_dCongestionWindow; | |
1551 perf->pktFlightSize = CSeqNo::seqlen(const_cast<int32_t&>(m_iSndLastAck), CSe
qNo::incseq(m_iSndCurrSeqNo)) - 1; | |
1552 perf->msRTT = m_iRTT/1000.0; | |
1553 perf->mbpsBandwidth = m_iBandwidth * m_iPayloadSize * 8.0 / 1000000.0; | |
1554 | |
1555 #ifndef WIN32 | |
1556 if (0 == pthread_mutex_trylock(&m_ConnectionLock)) | |
1557 #else | |
1558 if (WAIT_OBJECT_0 == WaitForSingleObject(m_ConnectionLock, 0)) | |
1559 #endif | |
1560 { | |
1561 perf->byteAvailSndBuf = (NULL == m_pSndBuffer) ? 0 : (m_iSndBufSize - m_pS
ndBuffer->getCurrBufSize()) * m_iMSS; | |
1562 perf->byteAvailRcvBuf = (NULL == m_pRcvBuffer) ? 0 : m_pRcvBuffer->getAvai
lBufSize() * m_iMSS; | |
1563 | |
1564 #ifndef WIN32 | |
1565 pthread_mutex_unlock(&m_ConnectionLock); | |
1566 #else | |
1567 ReleaseMutex(m_ConnectionLock); | |
1568 #endif | |
1569 } | |
1570 else | |
1571 { | |
1572 perf->byteAvailSndBuf = 0; | |
1573 perf->byteAvailRcvBuf = 0; | |
1574 } | |
1575 | |
1576 if (clear) | |
1577 { | |
1578 m_llTraceSent = m_llTraceRecv = m_iTraceSndLoss = m_iTraceRcvLoss = m_iTra
ceRetrans = m_iSentACK = m_iRecvACK = m_iSentNAK = m_iRecvNAK = 0; | |
1579 m_llSndDuration = 0; | |
1580 m_LastSampleTime = currtime; | |
1581 } | |
1582 } | |
1583 | |
1584 void CUDT::initSynch() | |
1585 { | |
1586 #ifndef WIN32 | |
1587 pthread_mutex_init(&m_SendBlockLock, NULL); | |
1588 pthread_cond_init(&m_SendBlockCond, NULL); | |
1589 pthread_mutex_init(&m_RecvDataLock, NULL); | |
1590 pthread_cond_init(&m_RecvDataCond, NULL); | |
1591 pthread_mutex_init(&m_SendLock, NULL); | |
1592 pthread_mutex_init(&m_RecvLock, NULL); | |
1593 pthread_mutex_init(&m_AckLock, NULL); | |
1594 pthread_mutex_init(&m_ConnectionLock, NULL); | |
1595 #else | |
1596 m_SendBlockLock = CreateMutex(NULL, false, NULL); | |
1597 m_SendBlockCond = CreateEvent(NULL, false, false, NULL); | |
1598 m_RecvDataLock = CreateMutex(NULL, false, NULL); | |
1599 m_RecvDataCond = CreateEvent(NULL, false, false, NULL); | |
1600 m_SendLock = CreateMutex(NULL, false, NULL); | |
1601 m_RecvLock = CreateMutex(NULL, false, NULL); | |
1602 m_AckLock = CreateMutex(NULL, false, NULL); | |
1603 m_ConnectionLock = CreateMutex(NULL, false, NULL); | |
1604 #endif | |
1605 } | |
1606 | |
1607 void CUDT::destroySynch() | |
1608 { | |
1609 #ifndef WIN32 | |
1610 pthread_mutex_destroy(&m_SendBlockLock); | |
1611 pthread_cond_destroy(&m_SendBlockCond); | |
1612 pthread_mutex_destroy(&m_RecvDataLock); | |
1613 pthread_cond_destroy(&m_RecvDataCond); | |
1614 pthread_mutex_destroy(&m_SendLock); | |
1615 pthread_mutex_destroy(&m_RecvLock); | |
1616 pthread_mutex_destroy(&m_AckLock); | |
1617 pthread_mutex_destroy(&m_ConnectionLock); | |
1618 #else | |
1619 CloseHandle(m_SendBlockLock); | |
1620 CloseHandle(m_SendBlockCond); | |
1621 CloseHandle(m_RecvDataLock); | |
1622 CloseHandle(m_RecvDataCond); | |
1623 CloseHandle(m_SendLock); | |
1624 CloseHandle(m_RecvLock); | |
1625 CloseHandle(m_AckLock); | |
1626 CloseHandle(m_ConnectionLock); | |
1627 #endif | |
1628 } | |
1629 | |
1630 void CUDT::releaseSynch() | |
1631 { | |
1632 #ifndef WIN32 | |
1633 // wake up user calls | |
1634 pthread_mutex_lock(&m_SendBlockLock); | |
1635 pthread_cond_signal(&m_SendBlockCond); | |
1636 pthread_mutex_unlock(&m_SendBlockLock); | |
1637 | |
1638 pthread_mutex_lock(&m_SendLock); | |
1639 pthread_mutex_unlock(&m_SendLock); | |
1640 | |
1641 pthread_mutex_lock(&m_RecvDataLock); | |
1642 pthread_cond_signal(&m_RecvDataCond); | |
1643 pthread_mutex_unlock(&m_RecvDataLock); | |
1644 | |
1645 pthread_mutex_lock(&m_RecvLock); | |
1646 pthread_mutex_unlock(&m_RecvLock); | |
1647 #else | |
1648 SetEvent(m_SendBlockCond); | |
1649 WaitForSingleObject(m_SendLock, INFINITE); | |
1650 ReleaseMutex(m_SendLock); | |
1651 SetEvent(m_RecvDataCond); | |
1652 WaitForSingleObject(m_RecvLock, INFINITE); | |
1653 ReleaseMutex(m_RecvLock); | |
1654 #endif | |
1655 } | |
1656 | |
1657 void CUDT::sendCtrl(const int& pkttype, void* lparam, void* rparam, const int& s
ize) | |
1658 { | |
1659 CPacket ctrlpkt; | |
1660 | |
1661 switch (pkttype) | |
1662 { | |
1663 case 2: //010 - Acknowledgement | |
1664 { | |
1665 int32_t ack; | |
1666 | |
1667 // If there is no loss, the ACK is the current largest sequence number plu
s 1; | |
1668 // Otherwise it is the smallest sequence number in the receiver loss list. | |
1669 if (0 == m_pRcvLossList->getLossLength()) | |
1670 ack = CSeqNo::incseq(m_iRcvCurrSeqNo); | |
1671 else | |
1672 ack = m_pRcvLossList->getFirstLostSeq(); | |
1673 | |
1674 if (ack == m_iRcvLastAckAck) | |
1675 break; | |
1676 | |
1677 // send out a lite ACK | |
1678 // to save time on buffer processing and bandwidth/AS measurement, a lite
ACK only feeds back an ACK number | |
1679 if (4 == size) | |
1680 { | |
1681 ctrlpkt.pack(pkttype, NULL, &ack, size); | |
1682 ctrlpkt.m_iID = m_PeerID; | |
1683 m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt); | |
1684 | |
1685 break; | |
1686 } | |
1687 | |
1688 uint64_t currtime; | |
1689 CTimer::rdtsc(currtime); | |
1690 | |
1691 // There are new received packets to acknowledge, update related informati
on. | |
1692 if (CSeqNo::seqcmp(ack, m_iRcvLastAck) > 0) | |
1693 { | |
1694 int acksize = CSeqNo::seqoff(m_iRcvLastAck, ack); | |
1695 | |
1696 m_iRcvLastAck = ack; | |
1697 | |
1698 m_pRcvBuffer->ackData(acksize); | |
1699 | |
1700 // signal a waiting "recv" call if there is any data available | |
1701 #ifndef WIN32 | |
1702 pthread_mutex_lock(&m_RecvDataLock); | |
1703 if (m_bSynRecving) | |
1704 pthread_cond_signal(&m_RecvDataCond); | |
1705 pthread_mutex_unlock(&m_RecvDataLock); | |
1706 #else | |
1707 if (m_bSynRecving) | |
1708 SetEvent(m_RecvDataCond); | |
1709 #endif | |
1710 | |
1711 // acknowledge any waiting epolls to read | |
1712 s_UDTUnited.m_EPoll.enable_read(m_SocketID, m_sPollID); | |
1713 } | |
1714 else if (ack == m_iRcvLastAck) | |
1715 { | |
1716 if ((currtime - m_ullLastAckTime) < ((m_iRTT + 4 * m_iRTTVar) * m_ullCP
UFrequency)) | |
1717 break; | |
1718 } | |
1719 else | |
1720 break; | |
1721 | |
1722 // Send out the ACK only if has not been received by the sender before | |
1723 if (CSeqNo::seqcmp(m_iRcvLastAck, m_iRcvLastAckAck) > 0) | |
1724 { | |
1725 int32_t data[6]; | |
1726 | |
1727 m_iAckSeqNo = CAckNo::incack(m_iAckSeqNo); | |
1728 data[0] = m_iRcvLastAck; | |
1729 data[1] = m_iRTT; | |
1730 data[2] = m_iRTTVar; | |
1731 data[3] = m_pRcvBuffer->getAvailBufSize(); | |
1732 // a minimum flow window of 2 is used, even if buffer is full, to break
potential deadlock | |
1733 if (data[3] < 2) | |
1734 data[3] = 2; | |
1735 | |
1736 if (currtime - m_ullLastAckTime > m_ullSYNInt) | |
1737 { | |
1738 data[4] = m_pRcvTimeWindow->getPktRcvSpeed(); | |
1739 data[5] = m_pRcvTimeWindow->getBandwidth(); | |
1740 ctrlpkt.pack(pkttype, &m_iAckSeqNo, data, 24); | |
1741 | |
1742 CTimer::rdtsc(m_ullLastAckTime); | |
1743 } | |
1744 else | |
1745 { | |
1746 ctrlpkt.pack(pkttype, &m_iAckSeqNo, data, 16); | |
1747 } | |
1748 | |
1749 ctrlpkt.m_iID = m_PeerID; | |
1750 m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt); | |
1751 | |
1752 m_pACKWindow->store(m_iAckSeqNo, m_iRcvLastAck); | |
1753 | |
1754 ++ m_iSentACK; | |
1755 ++ m_iSentACKTotal; | |
1756 } | |
1757 | |
1758 break; | |
1759 } | |
1760 | |
1761 case 6: //110 - Acknowledgement of Acknowledgement | |
1762 ctrlpkt.pack(pkttype, lparam); | |
1763 ctrlpkt.m_iID = m_PeerID; | |
1764 m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt); | |
1765 | |
1766 break; | |
1767 | |
1768 case 3: //011 - Loss Report | |
1769 { | |
1770 if (NULL != rparam) | |
1771 { | |
1772 if (1 == size) | |
1773 { | |
1774 // only 1 loss packet | |
1775 ctrlpkt.pack(pkttype, NULL, (int32_t *)rparam + 1, 4); | |
1776 } | |
1777 else | |
1778 { | |
1779 // more than 1 loss packets | |
1780 ctrlpkt.pack(pkttype, NULL, rparam, 8); | |
1781 } | |
1782 | |
1783 ctrlpkt.m_iID = m_PeerID; | |
1784 m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt); | |
1785 | |
1786 ++ m_iSentNAK; | |
1787 ++ m_iSentNAKTotal; | |
1788 } | |
1789 else if (m_pRcvLossList->getLossLength() > 0) | |
1790 { | |
1791 // this is periodically NAK report; make sure NAK cannot be sent back t
oo often | |
1792 | |
1793 // read loss list from the local receiver loss list | |
1794 int32_t* data = new int32_t[m_iPayloadSize / 4]; | |
1795 int losslen; | |
1796 m_pRcvLossList->getLossArray(data, losslen, m_iPayloadSize / 4); | |
1797 | |
1798 if (0 < losslen) | |
1799 { | |
1800 ctrlpkt.pack(pkttype, NULL, data, losslen * 4); | |
1801 ctrlpkt.m_iID = m_PeerID; | |
1802 m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt); | |
1803 | |
1804 ++ m_iSentNAK; | |
1805 ++ m_iSentNAKTotal; | |
1806 } | |
1807 | |
1808 delete [] data; | |
1809 } | |
1810 | |
1811 // update next NAK time, which should wait enough time for the retansmissi
on, but not too long | |
1812 m_ullNAKInt = (m_iRTT + 4 * m_iRTTVar) * m_ullCPUFrequency; | |
1813 int rcv_speed = m_pRcvTimeWindow->getPktRcvSpeed(); | |
1814 if (rcv_speed > 0) | |
1815 m_ullNAKInt += (m_pRcvLossList->getLossLength() * 1000000ULL / rcv_spee
d) * m_ullCPUFrequency; | |
1816 if (m_ullNAKInt < m_ullMinNakInt) | |
1817 m_ullNAKInt = m_ullMinNakInt; | |
1818 | |
1819 break; | |
1820 } | |
1821 | |
1822 case 4: //100 - Congestion Warning | |
1823 ctrlpkt.pack(pkttype); | |
1824 ctrlpkt.m_iID = m_PeerID; | |
1825 m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt); | |
1826 | |
1827 CTimer::rdtsc(m_ullLastWarningTime); | |
1828 | |
1829 break; | |
1830 | |
1831 case 1: //001 - Keep-alive | |
1832 ctrlpkt.pack(pkttype); | |
1833 ctrlpkt.m_iID = m_PeerID; | |
1834 m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt); | |
1835 | |
1836 break; | |
1837 | |
1838 case 0: //000 - Handshake | |
1839 ctrlpkt.pack(pkttype, NULL, rparam, sizeof(CHandShake)); | |
1840 ctrlpkt.m_iID = m_PeerID; | |
1841 m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt); | |
1842 | |
1843 break; | |
1844 | |
1845 case 5: //101 - Shutdown | |
1846 ctrlpkt.pack(pkttype); | |
1847 ctrlpkt.m_iID = m_PeerID; | |
1848 m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt); | |
1849 | |
1850 break; | |
1851 | |
1852 case 7: //111 - Msg drop request | |
1853 ctrlpkt.pack(pkttype, lparam, rparam, 8); | |
1854 ctrlpkt.m_iID = m_PeerID; | |
1855 m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt); | |
1856 | |
1857 break; | |
1858 | |
1859 case 8: //1000 - acknowledge the peer side a special error | |
1860 ctrlpkt.pack(pkttype, lparam); | |
1861 ctrlpkt.m_iID = m_PeerID; | |
1862 m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt); | |
1863 | |
1864 break; | |
1865 | |
1866 case 32767: //0x7FFF - Resevered for future use | |
1867 break; | |
1868 | |
1869 default: | |
1870 break; | |
1871 } | |
1872 } | |
1873 | |
1874 void CUDT::processCtrl(CPacket& ctrlpkt) | |
1875 { | |
1876 // Just heard from the peer, reset the expiration count. | |
1877 m_iEXPCount = 1; | |
1878 m_llLastRspTime = CTimer::getTime(); | |
1879 | |
1880 if ((CSeqNo::incseq(m_iSndCurrSeqNo) == m_iSndLastAck) || (2 == ctrlpkt.getTy
pe()) || (3 == ctrlpkt.getType())) | |
1881 { | |
1882 uint64_t currtime; | |
1883 CTimer::rdtsc(currtime); | |
1884 if (!m_pCC->m_bUserDefinedRTO) | |
1885 m_ullNextEXPTime = currtime + m_ullEXPInt; | |
1886 else | |
1887 m_ullNextEXPTime = currtime + m_pCC->m_iRTO * m_ullCPUFrequency; | |
1888 } | |
1889 | |
1890 switch (ctrlpkt.getType()) | |
1891 { | |
1892 case 2: //010 - Acknowledgement | |
1893 { | |
1894 int32_t ack; | |
1895 | |
1896 // process a lite ACK | |
1897 if (4 == ctrlpkt.getLength()) | |
1898 { | |
1899 ack = *(int32_t *)ctrlpkt.m_pcData; | |
1900 if (CSeqNo::seqcmp(ack, const_cast<int32_t&>(m_iSndLastAck)) >= 0) | |
1901 { | |
1902 m_iFlowWindowSize -= CSeqNo::seqoff(const_cast<int32_t&>(m_iSndLastA
ck), ack); | |
1903 m_iSndLastAck = ack; | |
1904 } | |
1905 | |
1906 break; | |
1907 } | |
1908 | |
1909 // read ACK seq. no. | |
1910 ack = ctrlpkt.getAckSeqNo(); | |
1911 | |
1912 // send ACK acknowledgement | |
1913 // number of ACK2 can be much less than number of ACK | |
1914 uint64_t currtime = CTimer::getTime(); | |
1915 if ((currtime - m_ullSndLastAck2Time > (uint64_t)m_iSYNInterval) || (ack =
= m_iSndLastAck2)) | |
1916 { | |
1917 sendCtrl(6, &ack); | |
1918 m_iSndLastAck2 = ack; | |
1919 m_ullSndLastAck2Time = currtime; | |
1920 } | |
1921 | |
1922 // Got data ACK | |
1923 ack = *(int32_t *)ctrlpkt.m_pcData; | |
1924 | |
1925 // check the validation of the ack | |
1926 if (CSeqNo::seqcmp(ack, CSeqNo::incseq(m_iSndCurrSeqNo)) > 0) | |
1927 { | |
1928 //this should not happen: attack or bug | |
1929 m_bBroken = true; | |
1930 m_iBrokenCounter = 0; | |
1931 break; | |
1932 } | |
1933 | |
1934 if (CSeqNo::seqcmp(ack, const_cast<int32_t&>(m_iSndLastAck)) >= 0) | |
1935 { | |
1936 // Update Flow Window Size, must update before and together with m_iSnd
LastAck | |
1937 m_iFlowWindowSize = *((int32_t *)ctrlpkt.m_pcData + 3); | |
1938 m_iSndLastAck = ack; | |
1939 } | |
1940 | |
1941 // protect packet retransmission | |
1942 CGuard::enterCS(m_AckLock); | |
1943 | |
1944 int offset = CSeqNo::seqoff((int32_t&)m_iSndLastDataAck, ack); | |
1945 if (offset <= 0) | |
1946 { | |
1947 // discard it if it is a repeated ACK | |
1948 CGuard::leaveCS(m_AckLock); | |
1949 break; | |
1950 } | |
1951 | |
1952 // acknowledge the sending buffer | |
1953 m_pSndBuffer->ackData(offset); | |
1954 | |
1955 // record total time used for sending | |
1956 m_llSndDuration += currtime - m_llSndDurationCounter; | |
1957 m_llSndDurationTotal += currtime - m_llSndDurationCounter; | |
1958 m_llSndDurationCounter = currtime; | |
1959 | |
1960 // update sending variables | |
1961 m_iSndLastDataAck = ack; | |
1962 m_pSndLossList->remove(CSeqNo::decseq((int32_t&)m_iSndLastDataAck)); | |
1963 | |
1964 CGuard::leaveCS(m_AckLock); | |
1965 | |
1966 #ifndef WIN32 | |
1967 pthread_mutex_lock(&m_SendBlockLock); | |
1968 if (m_bSynSending) | |
1969 pthread_cond_signal(&m_SendBlockCond); | |
1970 pthread_mutex_unlock(&m_SendBlockLock); | |
1971 #else | |
1972 if (m_bSynSending) | |
1973 SetEvent(m_SendBlockCond); | |
1974 #endif | |
1975 | |
1976 // acknowledde any waiting epolls to write | |
1977 s_UDTUnited.m_EPoll.enable_write(m_SocketID, m_sPollID); | |
1978 | |
1979 // insert this socket to snd list if it is not on the list yet | |
1980 m_pSndQueue->m_pSndUList->update(this, false); | |
1981 | |
1982 // Update RTT | |
1983 //m_iRTT = *((int32_t *)ctrlpkt.m_pcData + 1); | |
1984 //m_iRTTVar = *((int32_t *)ctrlpkt.m_pcData + 2); | |
1985 int rtt = *((int32_t *)ctrlpkt.m_pcData + 1); | |
1986 m_iRTTVar = (m_iRTTVar * 3 + abs(rtt - m_iRTT)) >> 2; | |
1987 m_iRTT = (m_iRTT * 7 + rtt) >> 3; | |
1988 | |
1989 m_pCC->setRTT(m_iRTT); | |
1990 | |
1991 m_ullEXPInt = (m_iRTT + 4 * m_iRTTVar) * m_ullCPUFrequency + m_ullSYNInt; | |
1992 if (m_ullEXPInt < m_ullMinExpInt) | |
1993 m_ullEXPInt = m_ullMinExpInt; | |
1994 | |
1995 if (ctrlpkt.getLength() > 16) | |
1996 { | |
1997 // Update Estimated Bandwidth and packet delivery rate | |
1998 if (*((int32_t *)ctrlpkt.m_pcData + 4) > 0) | |
1999 m_iDeliveryRate = (m_iDeliveryRate * 7 + *((int32_t *)ctrlpkt.m_pcDa
ta + 4)) >> 3; | |
2000 | |
2001 if (*((int32_t *)ctrlpkt.m_pcData + 5) > 0) | |
2002 m_iBandwidth = (m_iBandwidth * 7 + *((int32_t *)ctrlpkt.m_pcData + 5
)) >> 3; | |
2003 | |
2004 m_pCC->setRcvRate(m_iDeliveryRate); | |
2005 m_pCC->setBandwidth(m_iBandwidth); | |
2006 } | |
2007 | |
2008 m_pCC->onACK(ack); | |
2009 // update CC parameters | |
2010 m_ullInterval = (uint64_t)(m_pCC->m_dPktSndPeriod * m_ullCPUFrequency); | |
2011 m_dCongestionWindow = m_pCC->m_dCWndSize; | |
2012 | |
2013 ++ m_iRecvACK; | |
2014 ++ m_iRecvACKTotal; | |
2015 | |
2016 break; | |
2017 } | |
2018 | |
2019 case 6: //110 - Acknowledgement of Acknowledgement | |
2020 { | |
2021 int32_t ack; | |
2022 int rtt = -1; | |
2023 | |
2024 // update RTT | |
2025 rtt = m_pACKWindow->acknowledge(ctrlpkt.getAckSeqNo(), ack); | |
2026 if (rtt <= 0) | |
2027 break; | |
2028 | |
2029 //if increasing delay detected... | |
2030 // sendCtrl(4); | |
2031 | |
2032 // RTT EWMA | |
2033 m_iRTTVar = (m_iRTTVar * 3 + abs(rtt - m_iRTT)) >> 2; | |
2034 m_iRTT = (m_iRTT * 7 + rtt) >> 3; | |
2035 | |
2036 m_pCC->setRTT(m_iRTT); | |
2037 | |
2038 m_ullEXPInt = (m_iRTT + 4 * m_iRTTVar) * m_ullCPUFrequency + m_ullSYNInt; | |
2039 if (m_ullEXPInt < m_ullMinExpInt) | |
2040 m_ullEXPInt = m_ullMinExpInt; | |
2041 | |
2042 // update last ACK that has been received by the sender | |
2043 if (CSeqNo::seqcmp(ack, m_iRcvLastAckAck) > 0) | |
2044 m_iRcvLastAckAck = ack; | |
2045 | |
2046 break; | |
2047 } | |
2048 | |
2049 case 3: //011 - Loss Report | |
2050 { | |
2051 int32_t* losslist = (int32_t *)(ctrlpkt.m_pcData); | |
2052 | |
2053 m_pCC->onLoss(losslist, ctrlpkt.getLength() / 4); | |
2054 // update CC parameters | |
2055 m_ullInterval = (uint64_t)(m_pCC->m_dPktSndPeriod * m_ullCPUFrequency); | |
2056 m_dCongestionWindow = m_pCC->m_dCWndSize; | |
2057 | |
2058 bool secure = true; | |
2059 | |
2060 // decode loss list message and insert loss into the sender loss list | |
2061 for (int i = 0, n = (int)(ctrlpkt.getLength() / 4); i < n; ++ i) | |
2062 { | |
2063 if (0 != (losslist[i] & 0x80000000)) | |
2064 { | |
2065 if ((CSeqNo::seqcmp(losslist[i] & 0x7FFFFFFF, losslist[i + 1]) > 0)
|| (CSeqNo::seqcmp(losslist[i + 1], const_cast<int32_t&>(m_iSndCurrSeqNo)) > 0)) | |
2066 { | |
2067 // seq_a must not be greater than seq_b; seq_b must not be greate
r than the most recent sent seq | |
2068 secure = false; | |
2069 break; | |
2070 } | |
2071 | |
2072 int num = 0; | |
2073 if (CSeqNo::seqcmp(losslist[i] & 0x7FFFFFFF, const_cast<int32_t&>(m_
iSndLastAck)) >= 0) | |
2074 num = m_pSndLossList->insert(losslist[i] & 0x7FFFFFFF, losslist[i
+ 1]); | |
2075 else if (CSeqNo::seqcmp(losslist[i + 1], const_cast<int32_t&>(m_iSnd
LastAck)) >= 0) | |
2076 num = m_pSndLossList->insert(const_cast<int32_t&>(m_iSndLastAck),
losslist[i + 1]); | |
2077 | |
2078 m_iTraceSndLoss += num; | |
2079 m_iSndLossTotal += num; | |
2080 | |
2081 ++ i; | |
2082 } | |
2083 else if (CSeqNo::seqcmp(losslist[i], const_cast<int32_t&>(m_iSndLastAck
)) >= 0) | |
2084 { | |
2085 if (CSeqNo::seqcmp(losslist[i], const_cast<int32_t&>(m_iSndCurrSeqNo
)) > 0) | |
2086 { | |
2087 //seq_a must not be greater than the most recent sent seq | |
2088 secure = false; | |
2089 break; | |
2090 } | |
2091 | |
2092 int num = m_pSndLossList->insert(losslist[i], losslist[i]); | |
2093 | |
2094 m_iTraceSndLoss += num; | |
2095 m_iSndLossTotal += num; | |
2096 } | |
2097 } | |
2098 | |
2099 if (!secure) | |
2100 { | |
2101 //this should not happen: attack or bug | |
2102 m_bBroken = true; | |
2103 m_iBrokenCounter = 0; | |
2104 break; | |
2105 } | |
2106 | |
2107 // the lost packet (retransmission) should be sent out immediately | |
2108 m_pSndQueue->m_pSndUList->update(this); | |
2109 | |
2110 ++ m_iRecvNAK; | |
2111 ++ m_iRecvNAKTotal; | |
2112 | |
2113 break; | |
2114 } | |
2115 | |
2116 case 4: //100 - Delay Warning | |
2117 // One way packet delay is increasing, so decrease the sending rate | |
2118 m_ullInterval = (uint64_t)ceil(m_ullInterval * 1.125); | |
2119 m_iLastDecSeq = m_iSndCurrSeqNo; | |
2120 | |
2121 break; | |
2122 | |
2123 case 1: //001 - Keep-alive | |
2124 // The only purpose of keep-alive packet is to tell that the peer is still
alive | |
2125 // nothing needs to be done. | |
2126 | |
2127 break; | |
2128 | |
2129 case 0: //000 - Handshake | |
2130 if ((((CHandShake*)(ctrlpkt.m_pcData))->m_iReqType > 0) || (m_bRendezvous
&& (((CHandShake*)(ctrlpkt.m_pcData))->m_iReqType != -2))) | |
2131 { | |
2132 // The peer side has not received the handshake message, so it keeps qu
erying | |
2133 // resend the handshake packet | |
2134 | |
2135 CHandShake initdata; | |
2136 initdata.m_iISN = m_iISN; | |
2137 initdata.m_iMSS = m_iMSS; | |
2138 initdata.m_iFlightFlagSize = m_iFlightFlagSize; | |
2139 initdata.m_iReqType = (!m_bRendezvous) ? -1 : -2; | |
2140 initdata.m_iID = m_SocketID; | |
2141 | |
2142 char buffer[1500]; | |
2143 int size = 1500; | |
2144 initdata.serialize(buffer, size); | |
2145 sendCtrl(0, NULL, buffer, size); | |
2146 } | |
2147 | |
2148 break; | |
2149 | |
2150 case 5: //101 - Shutdown | |
2151 m_bShutdown = true; | |
2152 m_bClosing = true; | |
2153 m_bBroken = true; | |
2154 m_iBrokenCounter = 60; | |
2155 | |
2156 // Signal the sender and recver if they are waiting for data. | |
2157 releaseSynch(); | |
2158 | |
2159 CTimer::triggerEvent(); | |
2160 | |
2161 break; | |
2162 | |
2163 case 7: //111 - Msg drop request | |
2164 m_pRcvBuffer->dropMsg(ctrlpkt.getMsgSeq()); | |
2165 m_pRcvLossList->remove(*(int32_t*)ctrlpkt.m_pcData, *(int32_t*)(ctrlpkt.m_
pcData + 4)); | |
2166 | |
2167 // move forward with current recv seq no. | |
2168 if ((CSeqNo::seqcmp(*(int32_t*)ctrlpkt.m_pcData, CSeqNo::incseq(m_iRcvCurr
SeqNo)) <= 0) | |
2169 && (CSeqNo::seqcmp(*(int32_t*)(ctrlpkt.m_pcData + 4), m_iRcvCurrSeqNo)
> 0)) | |
2170 { | |
2171 m_iRcvCurrSeqNo = *(int32_t*)(ctrlpkt.m_pcData + 4); | |
2172 } | |
2173 | |
2174 break; | |
2175 | |
2176 case 8: // 1000 - An error has happened to the peer side | |
2177 //int err_type = packet.getAddInfo(); | |
2178 | |
2179 // currently only this error is signalled from the peer side | |
2180 // if recvfile() failes (e.g., due to disk fail), blcoked sendfile/send sh
ould return immediately | |
2181 // giving the app a chance to fix the issue | |
2182 | |
2183 m_bPeerHealth = false; | |
2184 | |
2185 break; | |
2186 | |
2187 case 32767: //0x7FFF - reserved and user defined messages | |
2188 m_pCC->processCustomMsg(&ctrlpkt); | |
2189 // update CC parameters | |
2190 m_ullInterval = (uint64_t)(m_pCC->m_dPktSndPeriod * m_ullCPUFrequency); | |
2191 m_dCongestionWindow = m_pCC->m_dCWndSize; | |
2192 | |
2193 break; | |
2194 | |
2195 default: | |
2196 break; | |
2197 } | |
2198 } | |
2199 | |
2200 int CUDT::packData(CPacket& packet, uint64_t& ts) | |
2201 { | |
2202 int payload = 0; | |
2203 bool probe = false; | |
2204 | |
2205 uint64_t entertime; | |
2206 CTimer::rdtsc(entertime); | |
2207 | |
2208 if ((0 != m_ullTargetTime) && (entertime > m_ullTargetTime)) | |
2209 m_ullTimeDiff += entertime - m_ullTargetTime; | |
2210 | |
2211 // Loss retransmission always has higher priority. | |
2212 if ((packet.m_iSeqNo = m_pSndLossList->getLostSeq()) >= 0) | |
2213 { | |
2214 // protect m_iSndLastDataAck from updating by ACK processing | |
2215 CGuard ackguard(m_AckLock); | |
2216 | |
2217 int offset = CSeqNo::seqoff((int32_t&)m_iSndLastDataAck, packet.m_iSeqNo); | |
2218 if (offset < 0) | |
2219 return 0; | |
2220 | |
2221 int msglen; | |
2222 | |
2223 payload = m_pSndBuffer->readData(&(packet.m_pcData), offset, packet.m_iMsg
No, msglen); | |
2224 | |
2225 if (-1 == payload) | |
2226 { | |
2227 int32_t seqpair[2]; | |
2228 seqpair[0] = packet.m_iSeqNo; | |
2229 seqpair[1] = CSeqNo::incseq(seqpair[0], msglen); | |
2230 sendCtrl(7, &packet.m_iMsgNo, seqpair, 8); | |
2231 | |
2232 // only one msg drop request is necessary | |
2233 m_pSndLossList->remove(seqpair[1]); | |
2234 | |
2235 // skip all dropped packets | |
2236 if (CSeqNo::seqcmp(const_cast<int32_t&>(m_iSndCurrSeqNo), CSeqNo::incse
q(seqpair[1])) < 0) | |
2237 m_iSndCurrSeqNo = CSeqNo::incseq(seqpair[1]); | |
2238 | |
2239 return 0; | |
2240 } | |
2241 else if (0 == payload) | |
2242 return 0; | |
2243 | |
2244 ++ m_iTraceRetrans; | |
2245 ++ m_iRetransTotal; | |
2246 } | |
2247 else | |
2248 { | |
2249 // If no loss, pack a new packet. | |
2250 | |
2251 // check congestion/flow window limit | |
2252 int cwnd = (m_iFlowWindowSize < (int)m_dCongestionWindow) ? m_iFlowWindowS
ize : (int)m_dCongestionWindow; | |
2253 if (cwnd >= CSeqNo::seqlen(const_cast<int32_t&>(m_iSndLastAck), CSeqNo::in
cseq(m_iSndCurrSeqNo))) | |
2254 { | |
2255 if (0 != (payload = m_pSndBuffer->readData(&(packet.m_pcData), packet.m
_iMsgNo))) | |
2256 { | |
2257 m_iSndCurrSeqNo = CSeqNo::incseq(m_iSndCurrSeqNo); | |
2258 m_pCC->setSndCurrSeqNo((int32_t&)m_iSndCurrSeqNo); | |
2259 | |
2260 packet.m_iSeqNo = m_iSndCurrSeqNo; | |
2261 | |
2262 // every 16 (0xF) packets, a packet pair is sent | |
2263 if (0 == (packet.m_iSeqNo & 0xF)) | |
2264 probe = true; | |
2265 } | |
2266 else | |
2267 { | |
2268 m_ullTargetTime = 0; | |
2269 m_ullTimeDiff = 0; | |
2270 ts = 0; | |
2271 return 0; | |
2272 } | |
2273 } | |
2274 else | |
2275 { | |
2276 m_ullTargetTime = 0; | |
2277 m_ullTimeDiff = 0; | |
2278 ts = 0; | |
2279 return 0; | |
2280 } | |
2281 } | |
2282 | |
2283 packet.m_iTimeStamp = int(CTimer::getTime() - m_StartTime); | |
2284 packet.m_iID = m_PeerID; | |
2285 packet.setLength(payload); | |
2286 | |
2287 m_pCC->onPktSent(&packet); | |
2288 //m_pSndTimeWindow->onPktSent(packet.m_iTimeStamp); | |
2289 | |
2290 ++ m_llTraceSent; | |
2291 ++ m_llSentTotal; | |
2292 | |
2293 if (probe) | |
2294 { | |
2295 // sends out probing packet pair | |
2296 ts = entertime; | |
2297 probe = false; | |
2298 } | |
2299 else | |
2300 { | |
2301 #ifndef NO_BUSY_WAITING | |
2302 ts = entertime + m_ullInterval; | |
2303 #else | |
2304 if (m_ullTimeDiff >= m_ullInterval) | |
2305 { | |
2306 ts = entertime; | |
2307 m_ullTimeDiff -= m_ullInterval; | |
2308 } | |
2309 else | |
2310 { | |
2311 ts = entertime + m_ullInterval - m_ullTimeDiff; | |
2312 m_ullTimeDiff = 0; | |
2313 } | |
2314 #endif | |
2315 } | |
2316 | |
2317 m_ullTargetTime = ts; | |
2318 | |
2319 return payload; | |
2320 } | |
2321 | |
2322 int CUDT::processData(CUnit* unit) | |
2323 { | |
2324 CPacket& packet = unit->m_Packet; | |
2325 | |
2326 // Just heard from the peer, reset the expiration count. | |
2327 m_iEXPCount = 1; | |
2328 m_llLastRspTime = CTimer::getTime(); | |
2329 | |
2330 if (CSeqNo::incseq(m_iSndCurrSeqNo) == m_iSndLastAck) | |
2331 { | |
2332 uint64_t currtime; | |
2333 CTimer::rdtsc(currtime); | |
2334 if (!m_pCC->m_bUserDefinedRTO) | |
2335 m_ullNextEXPTime = currtime + m_ullEXPInt; | |
2336 else | |
2337 m_ullNextEXPTime = currtime + m_pCC->m_iRTO * m_ullCPUFrequency; | |
2338 } | |
2339 | |
2340 m_pCC->onPktReceived(&packet); | |
2341 | |
2342 ++ m_iPktCount; | |
2343 | |
2344 // update time information | |
2345 m_pRcvTimeWindow->onPktArrival(); | |
2346 | |
2347 // check if it is probing packet pair | |
2348 if (0 == (packet.m_iSeqNo & 0xF)) | |
2349 m_pRcvTimeWindow->probe1Arrival(); | |
2350 else if (1 == (packet.m_iSeqNo & 0xF)) | |
2351 m_pRcvTimeWindow->probe2Arrival(); | |
2352 | |
2353 ++ m_llTraceRecv; | |
2354 ++ m_llRecvTotal; | |
2355 | |
2356 int32_t offset = CSeqNo::seqoff(m_iRcvLastAck, packet.m_iSeqNo); | |
2357 if ((offset < 0) || (offset >= m_pRcvBuffer->getAvailBufSize())) | |
2358 return -1; | |
2359 | |
2360 if (m_pRcvBuffer->addData(unit, offset) < 0) | |
2361 return -1; | |
2362 | |
2363 // Loss detection. | |
2364 if (CSeqNo::seqcmp(packet.m_iSeqNo, CSeqNo::incseq(m_iRcvCurrSeqNo)) > 0) | |
2365 { | |
2366 // If loss found, insert them to the receiver loss list | |
2367 m_pRcvLossList->insert(CSeqNo::incseq(m_iRcvCurrSeqNo), CSeqNo::decseq(pac
ket.m_iSeqNo)); | |
2368 | |
2369 // pack loss list for NAK | |
2370 int32_t lossdata[2]; | |
2371 lossdata[0] = CSeqNo::incseq(m_iRcvCurrSeqNo) | 0x80000000; | |
2372 lossdata[1] = CSeqNo::decseq(packet.m_iSeqNo); | |
2373 | |
2374 // Generate loss report immediately. | |
2375 sendCtrl(3, NULL, lossdata, (CSeqNo::incseq(m_iRcvCurrSeqNo) == CSeqNo::de
cseq(packet.m_iSeqNo)) ? 1 : 2); | |
2376 | |
2377 int loss = CSeqNo::seqlen(m_iRcvCurrSeqNo, packet.m_iSeqNo) - 2; | |
2378 m_iTraceRcvLoss += loss; | |
2379 m_iRcvLossTotal += loss; | |
2380 } | |
2381 | |
2382 // This is not a regular fixed size packet... | |
2383 //an irregular sized packet usually indicates the end of a message, so send a
n ACK immediately | |
2384 if (packet.getLength() != m_iPayloadSize) | |
2385 CTimer::rdtsc(m_ullNextACKTime); | |
2386 | |
2387 // Update the current largest sequence number that has been received. | |
2388 // Or it is a retransmitted packet, remove it from receiver loss list. | |
2389 if (CSeqNo::seqcmp(packet.m_iSeqNo, m_iRcvCurrSeqNo) > 0) | |
2390 m_iRcvCurrSeqNo = packet.m_iSeqNo; | |
2391 else | |
2392 m_pRcvLossList->remove(packet.m_iSeqNo); | |
2393 | |
2394 return 0; | |
2395 } | |
2396 | |
2397 int CUDT::listen(sockaddr* addr, CPacket& packet) | |
2398 { | |
2399 if (m_bClosing) | |
2400 return 1002; | |
2401 | |
2402 if (packet.getLength() != CHandShake::m_iContentSize) | |
2403 return 1004; | |
2404 | |
2405 CHandShake hs; | |
2406 hs.deserialize(packet.m_pcData, packet.getLength()); | |
2407 | |
2408 // SYN cookie | |
2409 char clienthost[NI_MAXHOST]; | |
2410 char clientport[NI_MAXSERV]; | |
2411 getnameinfo(addr, (AF_INET == m_iVersion) ? sizeof(sockaddr_in) : sizeof(sock
addr_in6), clienthost, sizeof(clienthost), clientport, sizeof(clientport), NI_NU
MERICHOST|NI_NUMERICSERV); | |
2412 int64_t timestamp = (CTimer::getTime() - m_StartTime) / 60000000; // secret c
hanges every one minute | |
2413 char cookiestr[1024]; | |
2414 sprintf(cookiestr, "%s:%s:%lld", clienthost, clientport, (long long int)times
tamp); | |
2415 unsigned char cookie[16]; | |
2416 CMD5::compute(cookiestr, cookie); | |
2417 | |
2418 if (1 == hs.m_iReqType) | |
2419 { | |
2420 hs.m_iCookie = *(int*)cookie; | |
2421 packet.m_iID = hs.m_iID; | |
2422 int size = packet.getLength(); | |
2423 hs.serialize(packet.m_pcData, size); | |
2424 m_pSndQueue->sendto(addr, packet); | |
2425 return 0; | |
2426 } | |
2427 else | |
2428 { | |
2429 if (hs.m_iCookie != *(int*)cookie) | |
2430 { | |
2431 timestamp --; | |
2432 sprintf(cookiestr, "%s:%s:%lld", clienthost, clientport, (long long int
)timestamp); | |
2433 CMD5::compute(cookiestr, cookie); | |
2434 | |
2435 if (hs.m_iCookie != *(int*)cookie) | |
2436 return -1; | |
2437 } | |
2438 } | |
2439 | |
2440 int32_t id = hs.m_iID; | |
2441 | |
2442 // When a peer side connects in... | |
2443 if ((1 == packet.getFlag()) && (0 == packet.getType())) | |
2444 { | |
2445 if ((hs.m_iVersion != m_iVersion) || (hs.m_iType != m_iSockType)) | |
2446 { | |
2447 // mismatch, reject the request | |
2448 hs.m_iReqType = 1002; | |
2449 int size = CHandShake::m_iContentSize; | |
2450 hs.serialize(packet.m_pcData, size); | |
2451 packet.m_iID = id; | |
2452 m_pSndQueue->sendto(addr, packet); | |
2453 } | |
2454 else | |
2455 { | |
2456 int result = s_UDTUnited.newConnection(m_SocketID, addr, &hs); | |
2457 if (result == -1) | |
2458 hs.m_iReqType = 1002; | |
2459 | |
2460 // send back a response if connection failed or connection already exis
ted | |
2461 // new connection response should be sent in connect() | |
2462 if (result != 1) | |
2463 { | |
2464 int size = CHandShake::m_iContentSize; | |
2465 hs.serialize(packet.m_pcData, size); | |
2466 packet.m_iID = id; | |
2467 m_pSndQueue->sendto(addr, packet); | |
2468 } | |
2469 else | |
2470 { | |
2471 // a mew connection has been created, enable epoll for read | |
2472 s_UDTUnited.m_EPoll.enable_read(m_SocketID, m_sPollID); | |
2473 } | |
2474 } | |
2475 } | |
2476 | |
2477 return hs.m_iReqType; | |
2478 } | |
2479 | |
2480 void CUDT::checkTimers() | |
2481 { | |
2482 // update CC parameters | |
2483 m_ullInterval = (uint64_t)(m_pCC->m_dPktSndPeriod * m_ullCPUFrequency); | |
2484 m_dCongestionWindow = m_pCC->m_dCWndSize; | |
2485 //uint64_t minint = (uint64_t)(m_ullCPUFrequency * m_pSndTimeWindow->getMinPk
tSndInt() * 0.9); | |
2486 //if (m_ullInterval < minint) | |
2487 // m_ullInterval = minint; | |
2488 | |
2489 uint64_t currtime; | |
2490 CTimer::rdtsc(currtime); | |
2491 | |
2492 if ((currtime > m_ullNextACKTime) || ((m_pCC->m_iACKInterval > 0) && (m_pCC->
m_iACKInterval <= m_iPktCount))) | |
2493 { | |
2494 // ACK timer expired or ACK interval is reached | |
2495 | |
2496 sendCtrl(2); | |
2497 CTimer::rdtsc(currtime); | |
2498 if (m_pCC->m_iACKPeriod > 0) | |
2499 m_ullNextACKTime = currtime + m_pCC->m_iACKPeriod * m_ullCPUFrequency; | |
2500 else | |
2501 m_ullNextACKTime = currtime + m_ullACKInt; | |
2502 | |
2503 m_iPktCount = 0; | |
2504 m_iLightACKCount = 1; | |
2505 } | |
2506 else if (m_iSelfClockInterval * m_iLightACKCount <= m_iPktCount) | |
2507 { | |
2508 //send a "light" ACK | |
2509 sendCtrl(2, NULL, NULL, 4); | |
2510 ++ m_iLightACKCount; | |
2511 } | |
2512 | |
2513 // we are not sending back repeated NAK anymore and rely on the sender's EXP
for retransmission | |
2514 //if ((m_pRcvLossList->getLossLength() > 0) && (currtime > m_ullNextNAKTime)) | |
2515 //{ | |
2516 // // NAK timer expired, and there is loss to be reported. | |
2517 // sendCtrl(3); | |
2518 // | |
2519 // CTimer::rdtsc(currtime); | |
2520 // m_ullNextNAKTime = currtime + m_ullNAKInt; | |
2521 //} | |
2522 | |
2523 if (currtime > m_ullNextEXPTime) | |
2524 { | |
2525 // Haven't receive any information from the peer, is it dead?! | |
2526 // timeout: at least 16 expirations and must be greater than 10 seconds | |
2527 if ((m_iEXPCount > 16) && (CTimer::getTime() - m_llLastRspTime > 10000000)
) | |
2528 { | |
2529 // | |
2530 // Connection is broken. | |
2531 // UDT does not signal any information about this instead of to stop qu
ietly. | |
2532 // Apllication will detect this when it calls any UDT methods next time
. | |
2533 // | |
2534 m_bClosing = true; | |
2535 m_bBroken = true; | |
2536 m_iBrokenCounter = 30; | |
2537 | |
2538 // update snd U list to remove this socket | |
2539 m_pSndQueue->m_pSndUList->update(this); | |
2540 | |
2541 releaseSynch(); | |
2542 | |
2543 // a broken socket can be "write" to learn the error | |
2544 s_UDTUnited.m_EPoll.enable_write(m_SocketID, m_sPollID); | |
2545 | |
2546 CTimer::triggerEvent(); | |
2547 | |
2548 return; | |
2549 } | |
2550 | |
2551 // sender: Insert all the packets sent after last received acknowledgement
into the sender loss list. | |
2552 // recver: Send out a keep-alive packet | |
2553 if (m_pSndBuffer->getCurrBufSize() > 0) | |
2554 { | |
2555 if ((CSeqNo::incseq(m_iSndCurrSeqNo) != m_iSndLastAck) && (m_pSndLossLi
st->getLossLength() == 0)) | |
2556 { | |
2557 // resend all unacknowledged packets on timeout, but only if there i
s no packet in the loss list | |
2558 int32_t csn = m_iSndCurrSeqNo; | |
2559 int num = m_pSndLossList->insert(const_cast<int32_t&>(m_iSndLastAck)
, csn); | |
2560 m_iTraceSndLoss += num; | |
2561 m_iSndLossTotal += num; | |
2562 } | |
2563 | |
2564 m_pCC->onTimeout(); | |
2565 // update CC parameters | |
2566 m_ullInterval = (uint64_t)(m_pCC->m_dPktSndPeriod * m_ullCPUFrequency); | |
2567 m_dCongestionWindow = m_pCC->m_dCWndSize; | |
2568 | |
2569 // immediately restart transmission | |
2570 m_pSndQueue->m_pSndUList->update(this); | |
2571 } | |
2572 else | |
2573 { | |
2574 sendCtrl(1); | |
2575 } | |
2576 | |
2577 ++ m_iEXPCount; | |
2578 m_ullEXPInt = (m_iEXPCount * (m_iRTT + 4 * m_iRTTVar) + m_iSYNInterval) *
m_ullCPUFrequency; | |
2579 if (m_ullEXPInt < m_iEXPCount * m_ullMinExpInt) | |
2580 m_ullEXPInt = m_iEXPCount * m_ullMinExpInt; | |
2581 CTimer::rdtsc(m_ullNextEXPTime); | |
2582 m_ullNextEXPTime += m_ullEXPInt; | |
2583 } | |
2584 } | |
2585 | |
2586 void CUDT::addEPoll(const int eid) | |
2587 { | |
2588 CGuard::enterCS(s_UDTUnited.m_EPoll.m_EPollLock); | |
2589 m_sPollID.insert(eid); | |
2590 CGuard::leaveCS(s_UDTUnited.m_EPoll.m_EPollLock); | |
2591 | |
2592 if (!m_bConnected || m_bBroken || m_bClosing) | |
2593 return; | |
2594 | |
2595 if ((UDT_STREAM == m_iSockType) && (m_pRcvBuffer->getRcvDataSize() > 0)) | |
2596 s_UDTUnited.m_EPoll.enable_read(m_SocketID, m_sPollID); | |
2597 else if ((UDT_DGRAM == m_iSockType) && (m_pRcvBuffer->getRcvMsgNum() > 0)) | |
2598 s_UDTUnited.m_EPoll.enable_read(m_SocketID, m_sPollID); | |
2599 | |
2600 if (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize()) | |
2601 s_UDTUnited.m_EPoll.enable_write(m_SocketID, m_sPollID); | |
2602 } | |
2603 | |
2604 void CUDT::removeEPoll(const int eid) | |
2605 { | |
2606 s_UDTUnited.m_EPoll.disable_read(m_SocketID, m_sPollID); | |
2607 s_UDTUnited.m_EPoll.disable_write(m_SocketID, m_sPollID); | |
2608 | |
2609 CGuard::enterCS(s_UDTUnited.m_EPoll.m_EPollLock); | |
2610 m_sPollID.erase(eid); | |
2611 CGuard::leaveCS(s_UDTUnited.m_EPoll.m_EPollLock); | |
2612 } | |
OLD | NEW |