OLD | NEW |
| (Empty) |
1 /***************************************************************************** | |
2 Copyright (c) 2001 - 2011, The Board of Trustees of the University of Illinois. | |
3 All rights reserved. | |
4 | |
5 Redistribution and use in source and binary forms, with or without | |
6 modification, are permitted provided that the following conditions are | |
7 met: | |
8 | |
9 * Redistributions of source code must retain the above | |
10 copyright notice, this list of conditions and the | |
11 following disclaimer. | |
12 | |
13 * Redistributions in binary form must reproduce the | |
14 above copyright notice, this list of conditions | |
15 and the following disclaimer in the documentation | |
16 and/or other materials provided with the distribution. | |
17 | |
18 * Neither the name of the University of Illinois | |
19 nor the names of its contributors may be used to | |
20 endorse or promote products derived from this | |
21 software without specific prior written permission. | |
22 | |
23 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS | |
24 IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, | |
25 THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR | |
26 PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR | |
27 CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, | |
28 EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, | |
29 PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR | |
30 PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF | |
31 LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING | |
32 NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS | |
33 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. | |
34 *****************************************************************************/ | |
35 | |
36 /***************************************************************************** | |
37 written by | |
38 Yunhong Gu, last updated 01/02/2011 | |
39 *****************************************************************************/ | |
40 | |
41 #ifdef WIN32 | |
42 #include <winsock2.h> | |
43 #include <ws2tcpip.h> | |
44 #ifdef LEGACY_WIN32 | |
45 #include <wspiapi.h> | |
46 #endif | |
47 #endif | |
48 | |
49 #include <cstring> | |
50 #include "common.h" | |
51 #include "queue.h" | |
52 #include "core.h" | |
53 | |
54 using namespace std; | |
55 | |
56 CUnitQueue::CUnitQueue(): | |
57 m_pQEntry(NULL), | |
58 m_pCurrQueue(NULL), | |
59 m_pLastQueue(NULL), | |
60 m_iSize(0), | |
61 m_iCount(0), | |
62 m_iMSS(), | |
63 m_iIPversion() | |
64 { | |
65 } | |
66 | |
67 CUnitQueue::~CUnitQueue() | |
68 { | |
69 CQEntry* p = m_pQEntry; | |
70 | |
71 while (p != NULL) | |
72 { | |
73 delete [] p->m_pUnit; | |
74 delete [] p->m_pBuffer; | |
75 | |
76 CQEntry* q = p; | |
77 if (p == m_pLastQueue) | |
78 p = NULL; | |
79 else | |
80 p = p->m_pNext; | |
81 delete q; | |
82 } | |
83 } | |
84 | |
85 int CUnitQueue::init(const int& size, const int& mss, const int& version) | |
86 { | |
87 CQEntry* tempq = NULL; | |
88 CUnit* tempu = NULL; | |
89 char* tempb = NULL; | |
90 | |
91 try | |
92 { | |
93 tempq = new CQEntry; | |
94 tempu = new CUnit [size]; | |
95 tempb = new char [size * mss]; | |
96 } | |
97 catch (...) | |
98 { | |
99 delete tempq; | |
100 delete [] tempu; | |
101 delete [] tempb; | |
102 | |
103 return -1; | |
104 } | |
105 | |
106 for (int i = 0; i < size; ++ i) | |
107 { | |
108 tempu[i].m_iFlag = 0; | |
109 tempu[i].m_Packet.m_pcData = tempb + i * mss; | |
110 } | |
111 tempq->m_pUnit = tempu; | |
112 tempq->m_pBuffer = tempb; | |
113 tempq->m_iSize = size; | |
114 | |
115 m_pQEntry = m_pCurrQueue = m_pLastQueue = tempq; | |
116 m_pQEntry->m_pNext = m_pQEntry; | |
117 | |
118 m_pAvailUnit = m_pCurrQueue->m_pUnit; | |
119 | |
120 m_iSize = size; | |
121 m_iMSS = mss; | |
122 m_iIPversion = version; | |
123 | |
124 return 0; | |
125 } | |
126 | |
127 int CUnitQueue::increase() | |
128 { | |
129 // adjust/correct m_iCount | |
130 int real_count = 0; | |
131 CQEntry* p = m_pQEntry; | |
132 while (p != NULL) | |
133 { | |
134 CUnit* u = p->m_pUnit; | |
135 for (CUnit* end = u + p->m_iSize; u != end; ++ u) | |
136 if (u->m_iFlag != 0) | |
137 ++ real_count; | |
138 | |
139 if (p == m_pLastQueue) | |
140 p = NULL; | |
141 else | |
142 p = p->m_pNext; | |
143 } | |
144 m_iCount = real_count; | |
145 if (double(m_iCount) / m_iSize < 0.9) | |
146 return -1; | |
147 | |
148 CQEntry* tempq = NULL; | |
149 CUnit* tempu = NULL; | |
150 char* tempb = NULL; | |
151 | |
152 // all queues have the same size | |
153 int size = m_pQEntry->m_iSize; | |
154 | |
155 try | |
156 { | |
157 tempq = new CQEntry; | |
158 tempu = new CUnit [size]; | |
159 tempb = new char [size * m_iMSS]; | |
160 } | |
161 catch (...) | |
162 { | |
163 delete tempq; | |
164 delete [] tempu; | |
165 delete [] tempb; | |
166 | |
167 return -1; | |
168 } | |
169 | |
170 for (int i = 0; i < size; ++ i) | |
171 { | |
172 tempu[i].m_iFlag = 0; | |
173 tempu[i].m_Packet.m_pcData = tempb + i * m_iMSS; | |
174 } | |
175 tempq->m_pUnit = tempu; | |
176 tempq->m_pBuffer = tempb; | |
177 tempq->m_iSize = size; | |
178 | |
179 m_pLastQueue->m_pNext = tempq; | |
180 m_pLastQueue = tempq; | |
181 m_pLastQueue->m_pNext = m_pQEntry; | |
182 | |
183 m_iSize += size; | |
184 | |
185 return 0; | |
186 } | |
187 | |
188 int CUnitQueue::shrink() | |
189 { | |
190 // currently queue cannot be shrunk. | |
191 return -1; | |
192 } | |
193 | |
194 CUnit* CUnitQueue::getNextAvailUnit() | |
195 { | |
196 if (m_iCount * 10 > m_iSize * 9) | |
197 increase(); | |
198 | |
199 if (m_iCount >= m_iSize) | |
200 return NULL; | |
201 | |
202 CQEntry* entrance = m_pCurrQueue; | |
203 | |
204 do | |
205 { | |
206 for (CUnit* sentinel = m_pCurrQueue->m_pUnit + m_pCurrQueue->m_iSize - 1;
m_pAvailUnit != sentinel; ++ m_pAvailUnit) | |
207 if (m_pAvailUnit->m_iFlag == 0) | |
208 return m_pAvailUnit; | |
209 | |
210 if (m_pCurrQueue->m_pUnit->m_iFlag == 0) | |
211 { | |
212 m_pAvailUnit = m_pCurrQueue->m_pUnit; | |
213 return m_pAvailUnit; | |
214 } | |
215 | |
216 m_pCurrQueue = m_pCurrQueue->m_pNext; | |
217 m_pAvailUnit = m_pCurrQueue->m_pUnit; | |
218 } while (m_pCurrQueue != entrance); | |
219 | |
220 increase(); | |
221 | |
222 return NULL; | |
223 } | |
224 | |
225 | |
226 CSndUList::CSndUList(): | |
227 m_pHeap(NULL), | |
228 m_iArrayLength(4096), | |
229 m_iLastEntry(-1), | |
230 m_ListLock(), | |
231 m_pWindowLock(NULL), | |
232 m_pWindowCond(NULL), | |
233 m_pTimer(NULL) | |
234 { | |
235 m_pHeap = new CSNode*[m_iArrayLength]; | |
236 | |
237 #ifndef WIN32 | |
238 pthread_mutex_init(&m_ListLock, NULL); | |
239 #else | |
240 m_ListLock = CreateMutex(NULL, false, NULL); | |
241 #endif | |
242 } | |
243 | |
244 CSndUList::~CSndUList() | |
245 { | |
246 delete [] m_pHeap; | |
247 | |
248 #ifndef WIN32 | |
249 pthread_mutex_destroy(&m_ListLock); | |
250 #else | |
251 CloseHandle(m_ListLock); | |
252 #endif | |
253 } | |
254 | |
255 void CSndUList::insert(const int64_t& ts, const CUDT* u) | |
256 { | |
257 CGuard listguard(m_ListLock); | |
258 | |
259 // increase the heap array size if necessary | |
260 if (m_iLastEntry == m_iArrayLength - 1) | |
261 { | |
262 CSNode** temp = NULL; | |
263 | |
264 try | |
265 { | |
266 temp = new CSNode*[m_iArrayLength * 2]; | |
267 } | |
268 catch(...) | |
269 { | |
270 return; | |
271 } | |
272 | |
273 memcpy(temp, m_pHeap, sizeof(CSNode*) * m_iArrayLength); | |
274 m_iArrayLength *= 2; | |
275 delete [] m_pHeap; | |
276 m_pHeap = temp; | |
277 } | |
278 | |
279 insert_(ts, u); | |
280 } | |
281 | |
282 void CSndUList::update(const CUDT* u, const bool& reschedule) | |
283 { | |
284 CGuard listguard(m_ListLock); | |
285 | |
286 CSNode* n = u->m_pSNode; | |
287 | |
288 if (n->m_iHeapLoc >= 0) | |
289 { | |
290 if (!reschedule) | |
291 return; | |
292 | |
293 if (n->m_iHeapLoc == 0) | |
294 { | |
295 n->m_llTimeStamp = 1; | |
296 m_pTimer->interrupt(); | |
297 return; | |
298 } | |
299 | |
300 remove_(u); | |
301 } | |
302 | |
303 insert_(1, u); | |
304 } | |
305 | |
306 int CSndUList::pop(sockaddr*& addr, CPacket& pkt) | |
307 { | |
308 CGuard listguard(m_ListLock); | |
309 | |
310 if (-1 == m_iLastEntry) | |
311 return -1; | |
312 | |
313 CUDT* u = m_pHeap[0]->m_pUDT; | |
314 remove_(u); | |
315 | |
316 if (!u->m_bConnected || u->m_bBroken) | |
317 return -1; | |
318 | |
319 // pack a packet from the socket | |
320 uint64_t ts; | |
321 if (u->packData(pkt, ts) <= 0) | |
322 return -1; | |
323 | |
324 addr = u->m_pPeerAddr; | |
325 | |
326 // insert a new entry, ts is the next processing time | |
327 if (ts > 0) | |
328 insert_(ts, u); | |
329 | |
330 return 1; | |
331 } | |
332 | |
333 void CSndUList::remove(const CUDT* u) | |
334 { | |
335 CGuard listguard(m_ListLock); | |
336 | |
337 remove_(u); | |
338 } | |
339 | |
340 uint64_t CSndUList::getNextProcTime() | |
341 { | |
342 CGuard listguard(m_ListLock); | |
343 | |
344 if (-1 == m_iLastEntry) | |
345 return 0; | |
346 | |
347 return m_pHeap[0]->m_llTimeStamp; | |
348 } | |
349 | |
350 void CSndUList::insert_(const int64_t& ts, const CUDT* u) | |
351 { | |
352 CSNode* n = u->m_pSNode; | |
353 | |
354 // do not insert repeated node | |
355 if (n->m_iHeapLoc >= 0) | |
356 return; | |
357 | |
358 m_iLastEntry ++; | |
359 m_pHeap[m_iLastEntry] = n; | |
360 n->m_llTimeStamp = ts; | |
361 | |
362 int q = m_iLastEntry; | |
363 int p = q; | |
364 while (p != 0) | |
365 { | |
366 p = (q - 1) >> 1; | |
367 if (m_pHeap[p]->m_llTimeStamp > m_pHeap[q]->m_llTimeStamp) | |
368 { | |
369 CSNode* t = m_pHeap[p]; | |
370 m_pHeap[p] = m_pHeap[q]; | |
371 m_pHeap[q] = t; | |
372 t->m_iHeapLoc = q; | |
373 q = p; | |
374 } | |
375 else | |
376 break; | |
377 } | |
378 | |
379 n->m_iHeapLoc = q; | |
380 | |
381 // first entry, activate the sending queue | |
382 if (0 == m_iLastEntry) | |
383 { | |
384 #ifndef WIN32 | |
385 pthread_mutex_lock(m_pWindowLock); | |
386 pthread_cond_signal(m_pWindowCond); | |
387 pthread_mutex_unlock(m_pWindowLock); | |
388 #else | |
389 SetEvent(*m_pWindowCond); | |
390 #endif | |
391 } | |
392 } | |
393 | |
394 void CSndUList::remove_(const CUDT* u) | |
395 { | |
396 CSNode* n = u->m_pSNode; | |
397 | |
398 if (n->m_iHeapLoc >= 0) | |
399 { | |
400 // remove the node from heap | |
401 m_pHeap[n->m_iHeapLoc] = m_pHeap[m_iLastEntry]; | |
402 m_iLastEntry --; | |
403 m_pHeap[n->m_iHeapLoc]->m_iHeapLoc = n->m_iHeapLoc; | |
404 | |
405 int q = n->m_iHeapLoc; | |
406 int p = q * 2 + 1; | |
407 while (p <= m_iLastEntry) | |
408 { | |
409 if ((p + 1 <= m_iLastEntry) && (m_pHeap[p]->m_llTimeStamp > m_pHeap[p +
1]->m_llTimeStamp)) | |
410 p ++; | |
411 | |
412 if (m_pHeap[q]->m_llTimeStamp > m_pHeap[p]->m_llTimeStamp) | |
413 { | |
414 CSNode* t = m_pHeap[p]; | |
415 m_pHeap[p] = m_pHeap[q]; | |
416 m_pHeap[p]->m_iHeapLoc = p; | |
417 m_pHeap[q] = t; | |
418 m_pHeap[q]->m_iHeapLoc = q; | |
419 | |
420 q = p; | |
421 p = q * 2 + 1; | |
422 } | |
423 else | |
424 break; | |
425 } | |
426 | |
427 n->m_iHeapLoc = -1; | |
428 } | |
429 } | |
430 | |
431 // | |
432 CSndQueue::CSndQueue(): | |
433 m_WorkerThread(), | |
434 m_pSndUList(NULL), | |
435 m_pChannel(NULL), | |
436 m_pTimer(NULL), | |
437 m_WindowLock(), | |
438 m_WindowCond(), | |
439 m_bClosing(false), | |
440 m_ExitCond() | |
441 { | |
442 #ifndef WIN32 | |
443 pthread_cond_init(&m_WindowCond, NULL); | |
444 pthread_mutex_init(&m_WindowLock, NULL); | |
445 #else | |
446 m_WindowLock = CreateMutex(NULL, false, NULL); | |
447 m_WindowCond = CreateEvent(NULL, false, false, NULL); | |
448 m_ExitCond = CreateEvent(NULL, false, false, NULL); | |
449 #endif | |
450 } | |
451 | |
452 CSndQueue::~CSndQueue() | |
453 { | |
454 m_bClosing = true; | |
455 | |
456 #ifndef WIN32 | |
457 pthread_mutex_lock(&m_WindowLock); | |
458 pthread_cond_signal(&m_WindowCond); | |
459 pthread_mutex_unlock(&m_WindowLock); | |
460 if (0 != m_WorkerThread) | |
461 pthread_join(m_WorkerThread, NULL); | |
462 pthread_cond_destroy(&m_WindowCond); | |
463 pthread_mutex_destroy(&m_WindowLock); | |
464 #else | |
465 SetEvent(m_WindowCond); | |
466 if (NULL != m_WorkerThread) | |
467 WaitForSingleObject(m_ExitCond, INFINITE); | |
468 CloseHandle(m_WorkerThread); | |
469 CloseHandle(m_WindowLock); | |
470 CloseHandle(m_WindowCond); | |
471 CloseHandle(m_ExitCond); | |
472 #endif | |
473 | |
474 delete m_pSndUList; | |
475 } | |
476 | |
477 void CSndQueue::init(const CChannel* c, const CTimer* t) | |
478 { | |
479 m_pChannel = (CChannel*)c; | |
480 m_pTimer = (CTimer*)t; | |
481 m_pSndUList = new CSndUList; | |
482 m_pSndUList->m_pWindowLock = &m_WindowLock; | |
483 m_pSndUList->m_pWindowCond = &m_WindowCond; | |
484 m_pSndUList->m_pTimer = m_pTimer; | |
485 | |
486 #ifndef WIN32 | |
487 if (0 != pthread_create(&m_WorkerThread, NULL, CSndQueue::worker, this)) | |
488 { | |
489 m_WorkerThread = 0; | |
490 throw CUDTException(3, 1); | |
491 } | |
492 #else | |
493 DWORD threadID; | |
494 m_WorkerThread = CreateThread(NULL, 0, CSndQueue::worker, this, 0, &thread
ID); | |
495 if (NULL == m_WorkerThread) | |
496 throw CUDTException(3, 1); | |
497 #endif | |
498 } | |
499 | |
500 #ifndef WIN32 | |
501 void* CSndQueue::worker(void* param) | |
502 #else | |
503 DWORD WINAPI CSndQueue::worker(LPVOID param) | |
504 #endif | |
505 { | |
506 CSndQueue* self = (CSndQueue*)param; | |
507 | |
508 while (!self->m_bClosing) | |
509 { | |
510 uint64_t ts = self->m_pSndUList->getNextProcTime(); | |
511 | |
512 if (ts > 0) | |
513 { | |
514 // wait until next processing time of the first socket on the list | |
515 uint64_t currtime; | |
516 CTimer::rdtsc(currtime); | |
517 if (currtime < ts) | |
518 self->m_pTimer->sleepto(ts); | |
519 | |
520 // it is time to send the next pkt | |
521 sockaddr* addr; | |
522 CPacket pkt; | |
523 if (self->m_pSndUList->pop(addr, pkt) < 0) | |
524 continue; | |
525 | |
526 self->m_pChannel->sendto(addr, pkt); | |
527 } | |
528 else | |
529 { | |
530 // wait here if there is no sockets with data to be sent | |
531 #ifndef WIN32 | |
532 pthread_mutex_lock(&self->m_WindowLock); | |
533 if (!self->m_bClosing && (self->m_pSndUList->m_iLastEntry < 0)) | |
534 pthread_cond_wait(&self->m_WindowCond, &self->m_WindowLock); | |
535 pthread_mutex_unlock(&self->m_WindowLock); | |
536 #else | |
537 WaitForSingleObject(self->m_WindowCond, INFINITE); | |
538 #endif | |
539 } | |
540 } | |
541 | |
542 #ifndef WIN32 | |
543 return NULL; | |
544 #else | |
545 SetEvent(self->m_ExitCond); | |
546 return 0; | |
547 #endif | |
548 } | |
549 | |
550 int CSndQueue::sendto(const sockaddr* addr, CPacket& packet) | |
551 { | |
552 // send out the packet immediately (high priority), this is a control packet | |
553 m_pChannel->sendto(addr, packet); | |
554 return packet.getLength(); | |
555 } | |
556 | |
557 | |
558 // | |
559 CRcvUList::CRcvUList(): | |
560 m_pUList(NULL), | |
561 m_pLast(NULL) | |
562 { | |
563 } | |
564 | |
565 CRcvUList::~CRcvUList() | |
566 { | |
567 } | |
568 | |
569 void CRcvUList::insert(const CUDT* u) | |
570 { | |
571 CRNode* n = u->m_pRNode; | |
572 CTimer::rdtsc(n->m_llTimeStamp); | |
573 | |
574 if (NULL == m_pUList) | |
575 { | |
576 // empty list, insert as the single node | |
577 n->m_pPrev = n->m_pNext = NULL; | |
578 m_pLast = m_pUList = n; | |
579 | |
580 return; | |
581 } | |
582 | |
583 // always insert at the end for RcvUList | |
584 n->m_pPrev = m_pLast; | |
585 n->m_pNext = NULL; | |
586 m_pLast->m_pNext = n; | |
587 m_pLast = n; | |
588 } | |
589 | |
590 void CRcvUList::remove(const CUDT* u) | |
591 { | |
592 CRNode* n = u->m_pRNode; | |
593 | |
594 if (!n->m_bOnList) | |
595 return; | |
596 | |
597 if (NULL == n->m_pPrev) | |
598 { | |
599 // n is the first node | |
600 m_pUList = n->m_pNext; | |
601 if (NULL == m_pUList) | |
602 m_pLast = NULL; | |
603 else | |
604 m_pUList->m_pPrev = NULL; | |
605 } | |
606 else | |
607 { | |
608 n->m_pPrev->m_pNext = n->m_pNext; | |
609 if (NULL == n->m_pNext) | |
610 { | |
611 // n is the last node | |
612 m_pLast = n->m_pPrev; | |
613 } | |
614 else | |
615 n->m_pNext->m_pPrev = n->m_pPrev; | |
616 } | |
617 | |
618 n->m_pNext = n->m_pPrev = NULL; | |
619 } | |
620 | |
621 void CRcvUList::update(const CUDT* u) | |
622 { | |
623 CRNode* n = u->m_pRNode; | |
624 | |
625 if (!n->m_bOnList) | |
626 return; | |
627 | |
628 CTimer::rdtsc(n->m_llTimeStamp); | |
629 | |
630 // if n is the last node, do not need to change | |
631 if (NULL == n->m_pNext) | |
632 return; | |
633 | |
634 if (NULL == n->m_pPrev) | |
635 { | |
636 m_pUList = n->m_pNext; | |
637 m_pUList->m_pPrev = NULL; | |
638 } | |
639 else | |
640 { | |
641 n->m_pPrev->m_pNext = n->m_pNext; | |
642 n->m_pNext->m_pPrev = n->m_pPrev; | |
643 } | |
644 | |
645 n->m_pPrev = m_pLast; | |
646 n->m_pNext = NULL; | |
647 m_pLast->m_pNext = n; | |
648 m_pLast = n; | |
649 } | |
650 | |
651 // | |
652 CHash::CHash(): | |
653 m_pBucket(NULL), | |
654 m_iHashSize(0) | |
655 { | |
656 } | |
657 | |
658 CHash::~CHash() | |
659 { | |
660 for (int i = 0; i < m_iHashSize; ++ i) | |
661 { | |
662 CBucket* b = m_pBucket[i]; | |
663 while (NULL != b) | |
664 { | |
665 CBucket* n = b->m_pNext; | |
666 delete b; | |
667 b = n; | |
668 } | |
669 } | |
670 | |
671 delete [] m_pBucket; | |
672 } | |
673 | |
674 void CHash::init(const int& size) | |
675 { | |
676 m_pBucket = new CBucket* [size]; | |
677 | |
678 for (int i = 0; i < size; ++ i) | |
679 m_pBucket[i] = NULL; | |
680 | |
681 m_iHashSize = size; | |
682 } | |
683 | |
684 CUDT* CHash::lookup(const int32_t& id) | |
685 { | |
686 // simple hash function (% hash table size); suitable for socket descriptors | |
687 CBucket* b = m_pBucket[id % m_iHashSize]; | |
688 | |
689 while (NULL != b) | |
690 { | |
691 if (id == b->m_iID) | |
692 return b->m_pUDT; | |
693 b = b->m_pNext; | |
694 } | |
695 | |
696 return NULL; | |
697 } | |
698 | |
699 void CHash::insert(const int32_t& id, const CUDT* u) | |
700 { | |
701 CBucket* b = m_pBucket[id % m_iHashSize]; | |
702 | |
703 CBucket* n = new CBucket; | |
704 n->m_iID = id; | |
705 n->m_pUDT = (CUDT*)u; | |
706 n->m_pNext = b; | |
707 | |
708 m_pBucket[id % m_iHashSize] = n; | |
709 } | |
710 | |
711 void CHash::remove(const int32_t& id) | |
712 { | |
713 CBucket* b = m_pBucket[id % m_iHashSize]; | |
714 CBucket* p = NULL; | |
715 | |
716 while (NULL != b) | |
717 { | |
718 if (id == b->m_iID) | |
719 { | |
720 if (NULL == p) | |
721 m_pBucket[id % m_iHashSize] = b->m_pNext; | |
722 else | |
723 p->m_pNext = b->m_pNext; | |
724 | |
725 delete b; | |
726 | |
727 return; | |
728 } | |
729 | |
730 p = b; | |
731 b = b->m_pNext; | |
732 } | |
733 } | |
734 | |
735 | |
736 // | |
737 CRendezvousQueue::CRendezvousQueue(): | |
738 m_vRendezvousID(), | |
739 m_RIDVectorLock() | |
740 { | |
741 #ifndef WIN32 | |
742 pthread_mutex_init(&m_RIDVectorLock, NULL); | |
743 #else | |
744 m_RIDVectorLock = CreateMutex(NULL, false, NULL); | |
745 #endif | |
746 } | |
747 | |
748 CRendezvousQueue::~CRendezvousQueue() | |
749 { | |
750 #ifndef WIN32 | |
751 pthread_mutex_destroy(&m_RIDVectorLock); | |
752 #else | |
753 CloseHandle(m_RIDVectorLock); | |
754 #endif | |
755 | |
756 for (vector<CRL>::iterator i = m_vRendezvousID.begin(); i != m_vRendezvousID.
end(); ++ i) | |
757 { | |
758 if (AF_INET == i->m_iIPversion) | |
759 delete (sockaddr_in*)i->m_pPeerAddr; | |
760 else | |
761 delete (sockaddr_in6*)i->m_pPeerAddr; | |
762 } | |
763 | |
764 m_vRendezvousID.clear(); | |
765 } | |
766 | |
767 void CRendezvousQueue::insert(const UDTSOCKET& id, const int& ipv, const sockadd
r* addr) | |
768 { | |
769 CGuard vg(m_RIDVectorLock); | |
770 | |
771 CRL r; | |
772 r.m_iID = id; | |
773 r.m_iIPversion = ipv; | |
774 r.m_pPeerAddr = (AF_INET == ipv) ? (sockaddr*)new sockaddr_in : (sockaddr*)ne
w sockaddr_in6; | |
775 memcpy(r.m_pPeerAddr, addr, (AF_INET == ipv) ? sizeof(sockaddr_in) : sizeof(s
ockaddr_in6)); | |
776 | |
777 m_vRendezvousID.insert(m_vRendezvousID.end(), r); | |
778 } | |
779 | |
780 void CRendezvousQueue::remove(const UDTSOCKET& id) | |
781 { | |
782 CGuard vg(m_RIDVectorLock); | |
783 | |
784 for (vector<CRL>::iterator i = m_vRendezvousID.begin(); i != m_vRendezvousID.
end(); ++ i) | |
785 { | |
786 if (i->m_iID == id) | |
787 { | |
788 if (AF_INET == i->m_iIPversion) | |
789 delete (sockaddr_in*)i->m_pPeerAddr; | |
790 else | |
791 delete (sockaddr_in6*)i->m_pPeerAddr; | |
792 | |
793 m_vRendezvousID.erase(i); | |
794 | |
795 return; | |
796 } | |
797 } | |
798 } | |
799 | |
800 bool CRendezvousQueue::retrieve(const sockaddr* addr, UDTSOCKET& id) | |
801 { | |
802 CGuard vg(m_RIDVectorLock); | |
803 | |
804 for (vector<CRL>::iterator i = m_vRendezvousID.begin(); i != m_vRendezvousID.
end(); ++ i) | |
805 { | |
806 if (CIPAddress::ipcmp(addr, i->m_pPeerAddr, i->m_iIPversion) && ((0 == id)
|| (id == i->m_iID))) | |
807 { | |
808 id = i->m_iID; | |
809 return true; | |
810 } | |
811 } | |
812 | |
813 return false; | |
814 } | |
815 | |
816 | |
817 // | |
818 CRcvQueue::CRcvQueue(): | |
819 m_WorkerThread(), | |
820 m_UnitQueue(), | |
821 m_pRcvUList(NULL), | |
822 m_pHash(NULL), | |
823 m_pChannel(NULL), | |
824 m_pTimer(NULL), | |
825 m_iPayloadSize(), | |
826 m_bClosing(false), | |
827 m_ExitCond(), | |
828 m_LSLock(), | |
829 m_pListener(NULL), | |
830 m_pRendezvousQueue(NULL), | |
831 m_vNewEntry(), | |
832 m_IDLock(), | |
833 m_mBuffer(), | |
834 m_PassLock(), | |
835 m_PassCond() | |
836 { | |
837 #ifndef WIN32 | |
838 pthread_mutex_init(&m_PassLock, NULL); | |
839 pthread_cond_init(&m_PassCond, NULL); | |
840 pthread_mutex_init(&m_LSLock, NULL); | |
841 pthread_mutex_init(&m_IDLock, NULL); | |
842 #else | |
843 m_PassLock = CreateMutex(NULL, false, NULL); | |
844 m_PassCond = CreateEvent(NULL, false, false, NULL); | |
845 m_LSLock = CreateMutex(NULL, false, NULL); | |
846 m_IDLock = CreateMutex(NULL, false, NULL); | |
847 m_ExitCond = CreateEvent(NULL, false, false, NULL); | |
848 #endif | |
849 } | |
850 | |
851 CRcvQueue::~CRcvQueue() | |
852 { | |
853 m_bClosing = true; | |
854 | |
855 #ifndef WIN32 | |
856 if (0 != m_WorkerThread) | |
857 pthread_join(m_WorkerThread, NULL); | |
858 pthread_mutex_destroy(&m_PassLock); | |
859 pthread_cond_destroy(&m_PassCond); | |
860 pthread_mutex_destroy(&m_LSLock); | |
861 pthread_mutex_destroy(&m_IDLock); | |
862 #else | |
863 if (NULL != m_WorkerThread) | |
864 WaitForSingleObject(m_ExitCond, INFINITE); | |
865 CloseHandle(m_WorkerThread); | |
866 CloseHandle(m_PassLock); | |
867 CloseHandle(m_PassCond); | |
868 CloseHandle(m_LSLock); | |
869 CloseHandle(m_IDLock); | |
870 CloseHandle(m_ExitCond); | |
871 #endif | |
872 | |
873 delete m_pRcvUList; | |
874 delete m_pHash; | |
875 delete m_pRendezvousQueue; | |
876 | |
877 // remove all queued messages | |
878 for (map<int32_t, queue<CPacket*> >::iterator i = m_mBuffer.begin(); i != m_m
Buffer.end(); ++ i) | |
879 { | |
880 while (!i->second.empty()) | |
881 { | |
882 CPacket* pkt = i->second.front(); | |
883 delete [] pkt->m_pcData; | |
884 delete pkt; | |
885 i->second.pop(); | |
886 } | |
887 } | |
888 } | |
889 | |
890 void CRcvQueue::init(const int& qsize, const int& payload, const int& version, c
onst int& hsize, const CChannel* cc, const CTimer* t) | |
891 { | |
892 m_iPayloadSize = payload; | |
893 | |
894 m_UnitQueue.init(qsize, payload, version); | |
895 | |
896 m_pHash = new CHash; | |
897 m_pHash->init(hsize); | |
898 | |
899 m_pChannel = (CChannel*)cc; | |
900 m_pTimer = (CTimer*)t; | |
901 | |
902 m_pRcvUList = new CRcvUList; | |
903 m_pRendezvousQueue = new CRendezvousQueue; | |
904 | |
905 #ifndef WIN32 | |
906 if (0 != pthread_create(&m_WorkerThread, NULL, CRcvQueue::worker, this)) | |
907 { | |
908 m_WorkerThread = 0; | |
909 throw CUDTException(3, 1); | |
910 } | |
911 #else | |
912 DWORD threadID; | |
913 m_WorkerThread = CreateThread(NULL, 0, CRcvQueue::worker, this, 0, &thread
ID); | |
914 if (NULL == m_WorkerThread) | |
915 throw CUDTException(3, 1); | |
916 #endif | |
917 } | |
918 | |
919 #ifndef WIN32 | |
920 void* CRcvQueue::worker(void* param) | |
921 #else | |
922 DWORD WINAPI CRcvQueue::worker(LPVOID param) | |
923 #endif | |
924 { | |
925 CRcvQueue* self = (CRcvQueue*)param; | |
926 | |
927 sockaddr* addr = (AF_INET == self->m_UnitQueue.m_iIPversion) ? (sockaddr*) ne
w sockaddr_in : (sockaddr*) new sockaddr_in6; | |
928 CUDT* u = NULL; | |
929 int32_t id; | |
930 | |
931 while (!self->m_bClosing) | |
932 { | |
933 #ifdef NO_BUSY_WAITING | |
934 self->m_pTimer->tick(); | |
935 #endif | |
936 | |
937 // check waiting list, if new socket, insert it to the list | |
938 while (self->ifNewEntry()) | |
939 { | |
940 CUDT* ne = self->getNewEntry(); | |
941 if (NULL != ne) | |
942 { | |
943 self->m_pRcvUList->insert(ne); | |
944 self->m_pHash->insert(ne->m_SocketID, ne); | |
945 } | |
946 } | |
947 | |
948 // find next available slot for incoming packet | |
949 CUnit* unit = self->m_UnitQueue.getNextAvailUnit(); | |
950 if (NULL == unit) | |
951 { | |
952 // no space, skip this packet | |
953 CPacket temp; | |
954 temp.m_pcData = new char[self->m_iPayloadSize]; | |
955 temp.setLength(self->m_iPayloadSize); | |
956 self->m_pChannel->recvfrom(addr, temp); | |
957 delete [] temp.m_pcData; | |
958 goto TIMER_CHECK; | |
959 } | |
960 | |
961 unit->m_Packet.setLength(self->m_iPayloadSize); | |
962 | |
963 // reading next incoming packet, recvfrom returns -1 is nothing has been r
eceived | |
964 if (self->m_pChannel->recvfrom(addr, unit->m_Packet) < 0) | |
965 goto TIMER_CHECK; | |
966 | |
967 id = unit->m_Packet.m_iID; | |
968 | |
969 // ID 0 is for connection request, which should be passed to the listening
socket or rendezvous sockets | |
970 if (0 == id) | |
971 { | |
972 if (NULL != self->m_pListener) | |
973 ((CUDT*)self->m_pListener)->listen(addr, unit->m_Packet); | |
974 else if (self->m_pRendezvousQueue->retrieve(addr, id)) | |
975 self->storePkt(id, unit->m_Packet.clone()); | |
976 } | |
977 else if (id > 0) | |
978 { | |
979 if (NULL != (u = self->m_pHash->lookup(id))) | |
980 { | |
981 if (CIPAddress::ipcmp(addr, u->m_pPeerAddr, u->m_iIPversion)) | |
982 { | |
983 if (u->m_bConnected && !u->m_bBroken && !u->m_bClosing) | |
984 { | |
985 if (0 == unit->m_Packet.getFlag()) | |
986 u->processData(unit); | |
987 else | |
988 u->processCtrl(unit->m_Packet); | |
989 | |
990 u->checkTimers(); | |
991 self->m_pRcvUList->update(u); | |
992 } | |
993 } | |
994 } | |
995 else if (self->m_pRendezvousQueue->retrieve(addr, id)) | |
996 self->storePkt(id, unit->m_Packet.clone()); | |
997 } | |
998 | |
999 TIMER_CHECK: | |
1000 // take care of the timing event for all UDT sockets | |
1001 | |
1002 CRNode* ul = self->m_pRcvUList->m_pUList; | |
1003 uint64_t currtime; | |
1004 CTimer::rdtsc(currtime); | |
1005 uint64_t ctime = currtime - 100000 * CTimer::getCPUFrequency(); | |
1006 | |
1007 while ((NULL != ul) && (ul->m_llTimeStamp < ctime)) | |
1008 { | |
1009 CUDT* u = ul->m_pUDT; | |
1010 | |
1011 if (u->m_bConnected && !u->m_bBroken && !u->m_bClosing) | |
1012 { | |
1013 u->checkTimers(); | |
1014 self->m_pRcvUList->update(u); | |
1015 } | |
1016 else | |
1017 { | |
1018 // the socket must be removed from Hash table first, then RcvUList | |
1019 self->m_pHash->remove(u->m_SocketID); | |
1020 self->m_pRcvUList->remove(u); | |
1021 u->m_pRNode->m_bOnList = false; | |
1022 } | |
1023 | |
1024 ul = self->m_pRcvUList->m_pUList; | |
1025 } | |
1026 } | |
1027 | |
1028 if (AF_INET == self->m_UnitQueue.m_iIPversion) | |
1029 delete (sockaddr_in*)addr; | |
1030 else | |
1031 delete (sockaddr_in6*)addr; | |
1032 | |
1033 #ifndef WIN32 | |
1034 return NULL; | |
1035 #else | |
1036 SetEvent(self->m_ExitCond); | |
1037 return 0; | |
1038 #endif | |
1039 } | |
1040 | |
1041 int CRcvQueue::recvfrom(const int32_t& id, CPacket& packet) | |
1042 { | |
1043 CGuard bufferlock(m_PassLock); | |
1044 | |
1045 map<int32_t, queue<CPacket*> >::iterator i = m_mBuffer.find(id); | |
1046 | |
1047 if (i == m_mBuffer.end()) | |
1048 { | |
1049 #ifndef WIN32 | |
1050 uint64_t now = CTimer::getTime(); | |
1051 timespec timeout; | |
1052 | |
1053 timeout.tv_sec = now / 1000000 + 1; | |
1054 timeout.tv_nsec = (now % 1000000) * 1000; | |
1055 | |
1056 pthread_cond_timedwait(&m_PassCond, &m_PassLock, &timeout); | |
1057 #else | |
1058 ReleaseMutex(m_PassLock); | |
1059 WaitForSingleObject(m_PassCond, 1000); | |
1060 WaitForSingleObject(m_PassLock, INFINITE); | |
1061 #endif | |
1062 | |
1063 i = m_mBuffer.find(id); | |
1064 if (i == m_mBuffer.end()) | |
1065 { | |
1066 packet.setLength(-1); | |
1067 return -1; | |
1068 } | |
1069 } | |
1070 | |
1071 // retrieve the earliest packet | |
1072 CPacket* newpkt = i->second.front(); | |
1073 | |
1074 if (packet.getLength() < newpkt->getLength()) | |
1075 { | |
1076 packet.setLength(-1); | |
1077 return -1; | |
1078 } | |
1079 | |
1080 // copy packet content | |
1081 memcpy(packet.m_nHeader, newpkt->m_nHeader, CPacket::m_iPktHdrSize); | |
1082 memcpy(packet.m_pcData, newpkt->m_pcData, newpkt->getLength()); | |
1083 packet.setLength(newpkt->getLength()); | |
1084 | |
1085 delete [] newpkt->m_pcData; | |
1086 delete newpkt; | |
1087 | |
1088 // remove this message from queue, | |
1089 // if no more messages left for this socket, release its data structure | |
1090 i->second.pop(); | |
1091 if (i->second.empty()) | |
1092 m_mBuffer.erase(i); | |
1093 | |
1094 return packet.getLength(); | |
1095 } | |
1096 | |
1097 int CRcvQueue::setListener(const CUDT* u) | |
1098 { | |
1099 CGuard lslock(m_LSLock); | |
1100 | |
1101 if (NULL != m_pListener) | |
1102 return -1; | |
1103 | |
1104 m_pListener = (CUDT*)u; | |
1105 return 1; | |
1106 } | |
1107 | |
1108 void CRcvQueue::removeListener(const CUDT* u) | |
1109 { | |
1110 CGuard lslock(m_LSLock); | |
1111 | |
1112 if (u == m_pListener) | |
1113 m_pListener = NULL; | |
1114 } | |
1115 | |
1116 void CRcvQueue::setNewEntry(CUDT* u) | |
1117 { | |
1118 CGuard listguard(m_IDLock); | |
1119 m_vNewEntry.insert(m_vNewEntry.end(), u); | |
1120 } | |
1121 | |
1122 bool CRcvQueue::ifNewEntry() | |
1123 { | |
1124 return !(m_vNewEntry.empty()); | |
1125 } | |
1126 | |
1127 CUDT* CRcvQueue::getNewEntry() | |
1128 { | |
1129 CGuard listguard(m_IDLock); | |
1130 | |
1131 if (m_vNewEntry.empty()) | |
1132 return NULL; | |
1133 | |
1134 CUDT* u = (CUDT*)*(m_vNewEntry.begin()); | |
1135 m_vNewEntry.erase(m_vNewEntry.begin()); | |
1136 | |
1137 return u; | |
1138 } | |
1139 | |
1140 void CRcvQueue::storePkt(const int32_t& id, CPacket* pkt) | |
1141 { | |
1142 CGuard bufferlock(m_PassLock); | |
1143 | |
1144 map<int32_t, queue<CPacket*> >::iterator i = m_mBuffer.find(id); | |
1145 | |
1146 if (i == m_mBuffer.end()) | |
1147 { | |
1148 m_mBuffer[id].push(pkt); | |
1149 | |
1150 #ifndef WIN32 | |
1151 pthread_cond_signal(&m_PassCond); | |
1152 #else | |
1153 SetEvent(m_PassCond); | |
1154 #endif | |
1155 } | |
1156 else | |
1157 { | |
1158 //avoid storing too many packets, in case of malfunction or attack | |
1159 if (i->second.size() > 16) | |
1160 return; | |
1161 | |
1162 i->second.push(pkt); | |
1163 } | |
1164 } | |
OLD | NEW |