Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1790)

Side by Side Diff: net/third_party/udt/src/queue.cpp

Issue 6708091: Remove UDT. (Closed) Base URL: svn://chrome-svn/chrome/trunk/src/
Patch Set: Created 9 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « net/third_party/udt/src/queue.h ('k') | net/third_party/udt/src/udt.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 /*****************************************************************************
2 Copyright (c) 2001 - 2011, The Board of Trustees of the University of Illinois.
3 All rights reserved.
4
5 Redistribution and use in source and binary forms, with or without
6 modification, are permitted provided that the following conditions are
7 met:
8
9 * Redistributions of source code must retain the above
10 copyright notice, this list of conditions and the
11 following disclaimer.
12
13 * Redistributions in binary form must reproduce the
14 above copyright notice, this list of conditions
15 and the following disclaimer in the documentation
16 and/or other materials provided with the distribution.
17
18 * Neither the name of the University of Illinois
19 nor the names of its contributors may be used to
20 endorse or promote products derived from this
21 software without specific prior written permission.
22
23 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
24 IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
25 THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
26 PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
27 CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
28 EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
29 PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
30 PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
31 LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
32 NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
33 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
34 *****************************************************************************/
35
36 /*****************************************************************************
37 written by
38 Yunhong Gu, last updated 01/02/2011
39 *****************************************************************************/
40
41 #ifdef WIN32
42 #include <winsock2.h>
43 #include <ws2tcpip.h>
44 #ifdef LEGACY_WIN32
45 #include <wspiapi.h>
46 #endif
47 #endif
48
49 #include <cstring>
50 #include "common.h"
51 #include "queue.h"
52 #include "core.h"
53
54 using namespace std;
55
56 CUnitQueue::CUnitQueue():
57 m_pQEntry(NULL),
58 m_pCurrQueue(NULL),
59 m_pLastQueue(NULL),
60 m_iSize(0),
61 m_iCount(0),
62 m_iMSS(),
63 m_iIPversion()
64 {
65 }
66
67 CUnitQueue::~CUnitQueue()
68 {
69 CQEntry* p = m_pQEntry;
70
71 while (p != NULL)
72 {
73 delete [] p->m_pUnit;
74 delete [] p->m_pBuffer;
75
76 CQEntry* q = p;
77 if (p == m_pLastQueue)
78 p = NULL;
79 else
80 p = p->m_pNext;
81 delete q;
82 }
83 }
84
85 int CUnitQueue::init(const int& size, const int& mss, const int& version)
86 {
87 CQEntry* tempq = NULL;
88 CUnit* tempu = NULL;
89 char* tempb = NULL;
90
91 try
92 {
93 tempq = new CQEntry;
94 tempu = new CUnit [size];
95 tempb = new char [size * mss];
96 }
97 catch (...)
98 {
99 delete tempq;
100 delete [] tempu;
101 delete [] tempb;
102
103 return -1;
104 }
105
106 for (int i = 0; i < size; ++ i)
107 {
108 tempu[i].m_iFlag = 0;
109 tempu[i].m_Packet.m_pcData = tempb + i * mss;
110 }
111 tempq->m_pUnit = tempu;
112 tempq->m_pBuffer = tempb;
113 tempq->m_iSize = size;
114
115 m_pQEntry = m_pCurrQueue = m_pLastQueue = tempq;
116 m_pQEntry->m_pNext = m_pQEntry;
117
118 m_pAvailUnit = m_pCurrQueue->m_pUnit;
119
120 m_iSize = size;
121 m_iMSS = mss;
122 m_iIPversion = version;
123
124 return 0;
125 }
126
127 int CUnitQueue::increase()
128 {
129 // adjust/correct m_iCount
130 int real_count = 0;
131 CQEntry* p = m_pQEntry;
132 while (p != NULL)
133 {
134 CUnit* u = p->m_pUnit;
135 for (CUnit* end = u + p->m_iSize; u != end; ++ u)
136 if (u->m_iFlag != 0)
137 ++ real_count;
138
139 if (p == m_pLastQueue)
140 p = NULL;
141 else
142 p = p->m_pNext;
143 }
144 m_iCount = real_count;
145 if (double(m_iCount) / m_iSize < 0.9)
146 return -1;
147
148 CQEntry* tempq = NULL;
149 CUnit* tempu = NULL;
150 char* tempb = NULL;
151
152 // all queues have the same size
153 int size = m_pQEntry->m_iSize;
154
155 try
156 {
157 tempq = new CQEntry;
158 tempu = new CUnit [size];
159 tempb = new char [size * m_iMSS];
160 }
161 catch (...)
162 {
163 delete tempq;
164 delete [] tempu;
165 delete [] tempb;
166
167 return -1;
168 }
169
170 for (int i = 0; i < size; ++ i)
171 {
172 tempu[i].m_iFlag = 0;
173 tempu[i].m_Packet.m_pcData = tempb + i * m_iMSS;
174 }
175 tempq->m_pUnit = tempu;
176 tempq->m_pBuffer = tempb;
177 tempq->m_iSize = size;
178
179 m_pLastQueue->m_pNext = tempq;
180 m_pLastQueue = tempq;
181 m_pLastQueue->m_pNext = m_pQEntry;
182
183 m_iSize += size;
184
185 return 0;
186 }
187
188 int CUnitQueue::shrink()
189 {
190 // currently queue cannot be shrunk.
191 return -1;
192 }
193
194 CUnit* CUnitQueue::getNextAvailUnit()
195 {
196 if (m_iCount * 10 > m_iSize * 9)
197 increase();
198
199 if (m_iCount >= m_iSize)
200 return NULL;
201
202 CQEntry* entrance = m_pCurrQueue;
203
204 do
205 {
206 for (CUnit* sentinel = m_pCurrQueue->m_pUnit + m_pCurrQueue->m_iSize - 1; m_pAvailUnit != sentinel; ++ m_pAvailUnit)
207 if (m_pAvailUnit->m_iFlag == 0)
208 return m_pAvailUnit;
209
210 if (m_pCurrQueue->m_pUnit->m_iFlag == 0)
211 {
212 m_pAvailUnit = m_pCurrQueue->m_pUnit;
213 return m_pAvailUnit;
214 }
215
216 m_pCurrQueue = m_pCurrQueue->m_pNext;
217 m_pAvailUnit = m_pCurrQueue->m_pUnit;
218 } while (m_pCurrQueue != entrance);
219
220 increase();
221
222 return NULL;
223 }
224
225
226 CSndUList::CSndUList():
227 m_pHeap(NULL),
228 m_iArrayLength(4096),
229 m_iLastEntry(-1),
230 m_ListLock(),
231 m_pWindowLock(NULL),
232 m_pWindowCond(NULL),
233 m_pTimer(NULL)
234 {
235 m_pHeap = new CSNode*[m_iArrayLength];
236
237 #ifndef WIN32
238 pthread_mutex_init(&m_ListLock, NULL);
239 #else
240 m_ListLock = CreateMutex(NULL, false, NULL);
241 #endif
242 }
243
244 CSndUList::~CSndUList()
245 {
246 delete [] m_pHeap;
247
248 #ifndef WIN32
249 pthread_mutex_destroy(&m_ListLock);
250 #else
251 CloseHandle(m_ListLock);
252 #endif
253 }
254
255 void CSndUList::insert(const int64_t& ts, const CUDT* u)
256 {
257 CGuard listguard(m_ListLock);
258
259 // increase the heap array size if necessary
260 if (m_iLastEntry == m_iArrayLength - 1)
261 {
262 CSNode** temp = NULL;
263
264 try
265 {
266 temp = new CSNode*[m_iArrayLength * 2];
267 }
268 catch(...)
269 {
270 return;
271 }
272
273 memcpy(temp, m_pHeap, sizeof(CSNode*) * m_iArrayLength);
274 m_iArrayLength *= 2;
275 delete [] m_pHeap;
276 m_pHeap = temp;
277 }
278
279 insert_(ts, u);
280 }
281
282 void CSndUList::update(const CUDT* u, const bool& reschedule)
283 {
284 CGuard listguard(m_ListLock);
285
286 CSNode* n = u->m_pSNode;
287
288 if (n->m_iHeapLoc >= 0)
289 {
290 if (!reschedule)
291 return;
292
293 if (n->m_iHeapLoc == 0)
294 {
295 n->m_llTimeStamp = 1;
296 m_pTimer->interrupt();
297 return;
298 }
299
300 remove_(u);
301 }
302
303 insert_(1, u);
304 }
305
306 int CSndUList::pop(sockaddr*& addr, CPacket& pkt)
307 {
308 CGuard listguard(m_ListLock);
309
310 if (-1 == m_iLastEntry)
311 return -1;
312
313 CUDT* u = m_pHeap[0]->m_pUDT;
314 remove_(u);
315
316 if (!u->m_bConnected || u->m_bBroken)
317 return -1;
318
319 // pack a packet from the socket
320 uint64_t ts;
321 if (u->packData(pkt, ts) <= 0)
322 return -1;
323
324 addr = u->m_pPeerAddr;
325
326 // insert a new entry, ts is the next processing time
327 if (ts > 0)
328 insert_(ts, u);
329
330 return 1;
331 }
332
333 void CSndUList::remove(const CUDT* u)
334 {
335 CGuard listguard(m_ListLock);
336
337 remove_(u);
338 }
339
340 uint64_t CSndUList::getNextProcTime()
341 {
342 CGuard listguard(m_ListLock);
343
344 if (-1 == m_iLastEntry)
345 return 0;
346
347 return m_pHeap[0]->m_llTimeStamp;
348 }
349
350 void CSndUList::insert_(const int64_t& ts, const CUDT* u)
351 {
352 CSNode* n = u->m_pSNode;
353
354 // do not insert repeated node
355 if (n->m_iHeapLoc >= 0)
356 return;
357
358 m_iLastEntry ++;
359 m_pHeap[m_iLastEntry] = n;
360 n->m_llTimeStamp = ts;
361
362 int q = m_iLastEntry;
363 int p = q;
364 while (p != 0)
365 {
366 p = (q - 1) >> 1;
367 if (m_pHeap[p]->m_llTimeStamp > m_pHeap[q]->m_llTimeStamp)
368 {
369 CSNode* t = m_pHeap[p];
370 m_pHeap[p] = m_pHeap[q];
371 m_pHeap[q] = t;
372 t->m_iHeapLoc = q;
373 q = p;
374 }
375 else
376 break;
377 }
378
379 n->m_iHeapLoc = q;
380
381 // first entry, activate the sending queue
382 if (0 == m_iLastEntry)
383 {
384 #ifndef WIN32
385 pthread_mutex_lock(m_pWindowLock);
386 pthread_cond_signal(m_pWindowCond);
387 pthread_mutex_unlock(m_pWindowLock);
388 #else
389 SetEvent(*m_pWindowCond);
390 #endif
391 }
392 }
393
394 void CSndUList::remove_(const CUDT* u)
395 {
396 CSNode* n = u->m_pSNode;
397
398 if (n->m_iHeapLoc >= 0)
399 {
400 // remove the node from heap
401 m_pHeap[n->m_iHeapLoc] = m_pHeap[m_iLastEntry];
402 m_iLastEntry --;
403 m_pHeap[n->m_iHeapLoc]->m_iHeapLoc = n->m_iHeapLoc;
404
405 int q = n->m_iHeapLoc;
406 int p = q * 2 + 1;
407 while (p <= m_iLastEntry)
408 {
409 if ((p + 1 <= m_iLastEntry) && (m_pHeap[p]->m_llTimeStamp > m_pHeap[p + 1]->m_llTimeStamp))
410 p ++;
411
412 if (m_pHeap[q]->m_llTimeStamp > m_pHeap[p]->m_llTimeStamp)
413 {
414 CSNode* t = m_pHeap[p];
415 m_pHeap[p] = m_pHeap[q];
416 m_pHeap[p]->m_iHeapLoc = p;
417 m_pHeap[q] = t;
418 m_pHeap[q]->m_iHeapLoc = q;
419
420 q = p;
421 p = q * 2 + 1;
422 }
423 else
424 break;
425 }
426
427 n->m_iHeapLoc = -1;
428 }
429 }
430
431 //
432 CSndQueue::CSndQueue():
433 m_WorkerThread(),
434 m_pSndUList(NULL),
435 m_pChannel(NULL),
436 m_pTimer(NULL),
437 m_WindowLock(),
438 m_WindowCond(),
439 m_bClosing(false),
440 m_ExitCond()
441 {
442 #ifndef WIN32
443 pthread_cond_init(&m_WindowCond, NULL);
444 pthread_mutex_init(&m_WindowLock, NULL);
445 #else
446 m_WindowLock = CreateMutex(NULL, false, NULL);
447 m_WindowCond = CreateEvent(NULL, false, false, NULL);
448 m_ExitCond = CreateEvent(NULL, false, false, NULL);
449 #endif
450 }
451
452 CSndQueue::~CSndQueue()
453 {
454 m_bClosing = true;
455
456 #ifndef WIN32
457 pthread_mutex_lock(&m_WindowLock);
458 pthread_cond_signal(&m_WindowCond);
459 pthread_mutex_unlock(&m_WindowLock);
460 if (0 != m_WorkerThread)
461 pthread_join(m_WorkerThread, NULL);
462 pthread_cond_destroy(&m_WindowCond);
463 pthread_mutex_destroy(&m_WindowLock);
464 #else
465 SetEvent(m_WindowCond);
466 if (NULL != m_WorkerThread)
467 WaitForSingleObject(m_ExitCond, INFINITE);
468 CloseHandle(m_WorkerThread);
469 CloseHandle(m_WindowLock);
470 CloseHandle(m_WindowCond);
471 CloseHandle(m_ExitCond);
472 #endif
473
474 delete m_pSndUList;
475 }
476
477 void CSndQueue::init(const CChannel* c, const CTimer* t)
478 {
479 m_pChannel = (CChannel*)c;
480 m_pTimer = (CTimer*)t;
481 m_pSndUList = new CSndUList;
482 m_pSndUList->m_pWindowLock = &m_WindowLock;
483 m_pSndUList->m_pWindowCond = &m_WindowCond;
484 m_pSndUList->m_pTimer = m_pTimer;
485
486 #ifndef WIN32
487 if (0 != pthread_create(&m_WorkerThread, NULL, CSndQueue::worker, this))
488 {
489 m_WorkerThread = 0;
490 throw CUDTException(3, 1);
491 }
492 #else
493 DWORD threadID;
494 m_WorkerThread = CreateThread(NULL, 0, CSndQueue::worker, this, 0, &thread ID);
495 if (NULL == m_WorkerThread)
496 throw CUDTException(3, 1);
497 #endif
498 }
499
500 #ifndef WIN32
501 void* CSndQueue::worker(void* param)
502 #else
503 DWORD WINAPI CSndQueue::worker(LPVOID param)
504 #endif
505 {
506 CSndQueue* self = (CSndQueue*)param;
507
508 while (!self->m_bClosing)
509 {
510 uint64_t ts = self->m_pSndUList->getNextProcTime();
511
512 if (ts > 0)
513 {
514 // wait until next processing time of the first socket on the list
515 uint64_t currtime;
516 CTimer::rdtsc(currtime);
517 if (currtime < ts)
518 self->m_pTimer->sleepto(ts);
519
520 // it is time to send the next pkt
521 sockaddr* addr;
522 CPacket pkt;
523 if (self->m_pSndUList->pop(addr, pkt) < 0)
524 continue;
525
526 self->m_pChannel->sendto(addr, pkt);
527 }
528 else
529 {
530 // wait here if there is no sockets with data to be sent
531 #ifndef WIN32
532 pthread_mutex_lock(&self->m_WindowLock);
533 if (!self->m_bClosing && (self->m_pSndUList->m_iLastEntry < 0))
534 pthread_cond_wait(&self->m_WindowCond, &self->m_WindowLock);
535 pthread_mutex_unlock(&self->m_WindowLock);
536 #else
537 WaitForSingleObject(self->m_WindowCond, INFINITE);
538 #endif
539 }
540 }
541
542 #ifndef WIN32
543 return NULL;
544 #else
545 SetEvent(self->m_ExitCond);
546 return 0;
547 #endif
548 }
549
550 int CSndQueue::sendto(const sockaddr* addr, CPacket& packet)
551 {
552 // send out the packet immediately (high priority), this is a control packet
553 m_pChannel->sendto(addr, packet);
554 return packet.getLength();
555 }
556
557
558 //
559 CRcvUList::CRcvUList():
560 m_pUList(NULL),
561 m_pLast(NULL)
562 {
563 }
564
565 CRcvUList::~CRcvUList()
566 {
567 }
568
569 void CRcvUList::insert(const CUDT* u)
570 {
571 CRNode* n = u->m_pRNode;
572 CTimer::rdtsc(n->m_llTimeStamp);
573
574 if (NULL == m_pUList)
575 {
576 // empty list, insert as the single node
577 n->m_pPrev = n->m_pNext = NULL;
578 m_pLast = m_pUList = n;
579
580 return;
581 }
582
583 // always insert at the end for RcvUList
584 n->m_pPrev = m_pLast;
585 n->m_pNext = NULL;
586 m_pLast->m_pNext = n;
587 m_pLast = n;
588 }
589
590 void CRcvUList::remove(const CUDT* u)
591 {
592 CRNode* n = u->m_pRNode;
593
594 if (!n->m_bOnList)
595 return;
596
597 if (NULL == n->m_pPrev)
598 {
599 // n is the first node
600 m_pUList = n->m_pNext;
601 if (NULL == m_pUList)
602 m_pLast = NULL;
603 else
604 m_pUList->m_pPrev = NULL;
605 }
606 else
607 {
608 n->m_pPrev->m_pNext = n->m_pNext;
609 if (NULL == n->m_pNext)
610 {
611 // n is the last node
612 m_pLast = n->m_pPrev;
613 }
614 else
615 n->m_pNext->m_pPrev = n->m_pPrev;
616 }
617
618 n->m_pNext = n->m_pPrev = NULL;
619 }
620
621 void CRcvUList::update(const CUDT* u)
622 {
623 CRNode* n = u->m_pRNode;
624
625 if (!n->m_bOnList)
626 return;
627
628 CTimer::rdtsc(n->m_llTimeStamp);
629
630 // if n is the last node, do not need to change
631 if (NULL == n->m_pNext)
632 return;
633
634 if (NULL == n->m_pPrev)
635 {
636 m_pUList = n->m_pNext;
637 m_pUList->m_pPrev = NULL;
638 }
639 else
640 {
641 n->m_pPrev->m_pNext = n->m_pNext;
642 n->m_pNext->m_pPrev = n->m_pPrev;
643 }
644
645 n->m_pPrev = m_pLast;
646 n->m_pNext = NULL;
647 m_pLast->m_pNext = n;
648 m_pLast = n;
649 }
650
651 //
652 CHash::CHash():
653 m_pBucket(NULL),
654 m_iHashSize(0)
655 {
656 }
657
658 CHash::~CHash()
659 {
660 for (int i = 0; i < m_iHashSize; ++ i)
661 {
662 CBucket* b = m_pBucket[i];
663 while (NULL != b)
664 {
665 CBucket* n = b->m_pNext;
666 delete b;
667 b = n;
668 }
669 }
670
671 delete [] m_pBucket;
672 }
673
674 void CHash::init(const int& size)
675 {
676 m_pBucket = new CBucket* [size];
677
678 for (int i = 0; i < size; ++ i)
679 m_pBucket[i] = NULL;
680
681 m_iHashSize = size;
682 }
683
684 CUDT* CHash::lookup(const int32_t& id)
685 {
686 // simple hash function (% hash table size); suitable for socket descriptors
687 CBucket* b = m_pBucket[id % m_iHashSize];
688
689 while (NULL != b)
690 {
691 if (id == b->m_iID)
692 return b->m_pUDT;
693 b = b->m_pNext;
694 }
695
696 return NULL;
697 }
698
699 void CHash::insert(const int32_t& id, const CUDT* u)
700 {
701 CBucket* b = m_pBucket[id % m_iHashSize];
702
703 CBucket* n = new CBucket;
704 n->m_iID = id;
705 n->m_pUDT = (CUDT*)u;
706 n->m_pNext = b;
707
708 m_pBucket[id % m_iHashSize] = n;
709 }
710
711 void CHash::remove(const int32_t& id)
712 {
713 CBucket* b = m_pBucket[id % m_iHashSize];
714 CBucket* p = NULL;
715
716 while (NULL != b)
717 {
718 if (id == b->m_iID)
719 {
720 if (NULL == p)
721 m_pBucket[id % m_iHashSize] = b->m_pNext;
722 else
723 p->m_pNext = b->m_pNext;
724
725 delete b;
726
727 return;
728 }
729
730 p = b;
731 b = b->m_pNext;
732 }
733 }
734
735
736 //
737 CRendezvousQueue::CRendezvousQueue():
738 m_vRendezvousID(),
739 m_RIDVectorLock()
740 {
741 #ifndef WIN32
742 pthread_mutex_init(&m_RIDVectorLock, NULL);
743 #else
744 m_RIDVectorLock = CreateMutex(NULL, false, NULL);
745 #endif
746 }
747
748 CRendezvousQueue::~CRendezvousQueue()
749 {
750 #ifndef WIN32
751 pthread_mutex_destroy(&m_RIDVectorLock);
752 #else
753 CloseHandle(m_RIDVectorLock);
754 #endif
755
756 for (vector<CRL>::iterator i = m_vRendezvousID.begin(); i != m_vRendezvousID. end(); ++ i)
757 {
758 if (AF_INET == i->m_iIPversion)
759 delete (sockaddr_in*)i->m_pPeerAddr;
760 else
761 delete (sockaddr_in6*)i->m_pPeerAddr;
762 }
763
764 m_vRendezvousID.clear();
765 }
766
767 void CRendezvousQueue::insert(const UDTSOCKET& id, const int& ipv, const sockadd r* addr)
768 {
769 CGuard vg(m_RIDVectorLock);
770
771 CRL r;
772 r.m_iID = id;
773 r.m_iIPversion = ipv;
774 r.m_pPeerAddr = (AF_INET == ipv) ? (sockaddr*)new sockaddr_in : (sockaddr*)ne w sockaddr_in6;
775 memcpy(r.m_pPeerAddr, addr, (AF_INET == ipv) ? sizeof(sockaddr_in) : sizeof(s ockaddr_in6));
776
777 m_vRendezvousID.insert(m_vRendezvousID.end(), r);
778 }
779
780 void CRendezvousQueue::remove(const UDTSOCKET& id)
781 {
782 CGuard vg(m_RIDVectorLock);
783
784 for (vector<CRL>::iterator i = m_vRendezvousID.begin(); i != m_vRendezvousID. end(); ++ i)
785 {
786 if (i->m_iID == id)
787 {
788 if (AF_INET == i->m_iIPversion)
789 delete (sockaddr_in*)i->m_pPeerAddr;
790 else
791 delete (sockaddr_in6*)i->m_pPeerAddr;
792
793 m_vRendezvousID.erase(i);
794
795 return;
796 }
797 }
798 }
799
800 bool CRendezvousQueue::retrieve(const sockaddr* addr, UDTSOCKET& id)
801 {
802 CGuard vg(m_RIDVectorLock);
803
804 for (vector<CRL>::iterator i = m_vRendezvousID.begin(); i != m_vRendezvousID. end(); ++ i)
805 {
806 if (CIPAddress::ipcmp(addr, i->m_pPeerAddr, i->m_iIPversion) && ((0 == id) || (id == i->m_iID)))
807 {
808 id = i->m_iID;
809 return true;
810 }
811 }
812
813 return false;
814 }
815
816
817 //
818 CRcvQueue::CRcvQueue():
819 m_WorkerThread(),
820 m_UnitQueue(),
821 m_pRcvUList(NULL),
822 m_pHash(NULL),
823 m_pChannel(NULL),
824 m_pTimer(NULL),
825 m_iPayloadSize(),
826 m_bClosing(false),
827 m_ExitCond(),
828 m_LSLock(),
829 m_pListener(NULL),
830 m_pRendezvousQueue(NULL),
831 m_vNewEntry(),
832 m_IDLock(),
833 m_mBuffer(),
834 m_PassLock(),
835 m_PassCond()
836 {
837 #ifndef WIN32
838 pthread_mutex_init(&m_PassLock, NULL);
839 pthread_cond_init(&m_PassCond, NULL);
840 pthread_mutex_init(&m_LSLock, NULL);
841 pthread_mutex_init(&m_IDLock, NULL);
842 #else
843 m_PassLock = CreateMutex(NULL, false, NULL);
844 m_PassCond = CreateEvent(NULL, false, false, NULL);
845 m_LSLock = CreateMutex(NULL, false, NULL);
846 m_IDLock = CreateMutex(NULL, false, NULL);
847 m_ExitCond = CreateEvent(NULL, false, false, NULL);
848 #endif
849 }
850
851 CRcvQueue::~CRcvQueue()
852 {
853 m_bClosing = true;
854
855 #ifndef WIN32
856 if (0 != m_WorkerThread)
857 pthread_join(m_WorkerThread, NULL);
858 pthread_mutex_destroy(&m_PassLock);
859 pthread_cond_destroy(&m_PassCond);
860 pthread_mutex_destroy(&m_LSLock);
861 pthread_mutex_destroy(&m_IDLock);
862 #else
863 if (NULL != m_WorkerThread)
864 WaitForSingleObject(m_ExitCond, INFINITE);
865 CloseHandle(m_WorkerThread);
866 CloseHandle(m_PassLock);
867 CloseHandle(m_PassCond);
868 CloseHandle(m_LSLock);
869 CloseHandle(m_IDLock);
870 CloseHandle(m_ExitCond);
871 #endif
872
873 delete m_pRcvUList;
874 delete m_pHash;
875 delete m_pRendezvousQueue;
876
877 // remove all queued messages
878 for (map<int32_t, queue<CPacket*> >::iterator i = m_mBuffer.begin(); i != m_m Buffer.end(); ++ i)
879 {
880 while (!i->second.empty())
881 {
882 CPacket* pkt = i->second.front();
883 delete [] pkt->m_pcData;
884 delete pkt;
885 i->second.pop();
886 }
887 }
888 }
889
890 void CRcvQueue::init(const int& qsize, const int& payload, const int& version, c onst int& hsize, const CChannel* cc, const CTimer* t)
891 {
892 m_iPayloadSize = payload;
893
894 m_UnitQueue.init(qsize, payload, version);
895
896 m_pHash = new CHash;
897 m_pHash->init(hsize);
898
899 m_pChannel = (CChannel*)cc;
900 m_pTimer = (CTimer*)t;
901
902 m_pRcvUList = new CRcvUList;
903 m_pRendezvousQueue = new CRendezvousQueue;
904
905 #ifndef WIN32
906 if (0 != pthread_create(&m_WorkerThread, NULL, CRcvQueue::worker, this))
907 {
908 m_WorkerThread = 0;
909 throw CUDTException(3, 1);
910 }
911 #else
912 DWORD threadID;
913 m_WorkerThread = CreateThread(NULL, 0, CRcvQueue::worker, this, 0, &thread ID);
914 if (NULL == m_WorkerThread)
915 throw CUDTException(3, 1);
916 #endif
917 }
918
919 #ifndef WIN32
920 void* CRcvQueue::worker(void* param)
921 #else
922 DWORD WINAPI CRcvQueue::worker(LPVOID param)
923 #endif
924 {
925 CRcvQueue* self = (CRcvQueue*)param;
926
927 sockaddr* addr = (AF_INET == self->m_UnitQueue.m_iIPversion) ? (sockaddr*) ne w sockaddr_in : (sockaddr*) new sockaddr_in6;
928 CUDT* u = NULL;
929 int32_t id;
930
931 while (!self->m_bClosing)
932 {
933 #ifdef NO_BUSY_WAITING
934 self->m_pTimer->tick();
935 #endif
936
937 // check waiting list, if new socket, insert it to the list
938 while (self->ifNewEntry())
939 {
940 CUDT* ne = self->getNewEntry();
941 if (NULL != ne)
942 {
943 self->m_pRcvUList->insert(ne);
944 self->m_pHash->insert(ne->m_SocketID, ne);
945 }
946 }
947
948 // find next available slot for incoming packet
949 CUnit* unit = self->m_UnitQueue.getNextAvailUnit();
950 if (NULL == unit)
951 {
952 // no space, skip this packet
953 CPacket temp;
954 temp.m_pcData = new char[self->m_iPayloadSize];
955 temp.setLength(self->m_iPayloadSize);
956 self->m_pChannel->recvfrom(addr, temp);
957 delete [] temp.m_pcData;
958 goto TIMER_CHECK;
959 }
960
961 unit->m_Packet.setLength(self->m_iPayloadSize);
962
963 // reading next incoming packet, recvfrom returns -1 is nothing has been r eceived
964 if (self->m_pChannel->recvfrom(addr, unit->m_Packet) < 0)
965 goto TIMER_CHECK;
966
967 id = unit->m_Packet.m_iID;
968
969 // ID 0 is for connection request, which should be passed to the listening socket or rendezvous sockets
970 if (0 == id)
971 {
972 if (NULL != self->m_pListener)
973 ((CUDT*)self->m_pListener)->listen(addr, unit->m_Packet);
974 else if (self->m_pRendezvousQueue->retrieve(addr, id))
975 self->storePkt(id, unit->m_Packet.clone());
976 }
977 else if (id > 0)
978 {
979 if (NULL != (u = self->m_pHash->lookup(id)))
980 {
981 if (CIPAddress::ipcmp(addr, u->m_pPeerAddr, u->m_iIPversion))
982 {
983 if (u->m_bConnected && !u->m_bBroken && !u->m_bClosing)
984 {
985 if (0 == unit->m_Packet.getFlag())
986 u->processData(unit);
987 else
988 u->processCtrl(unit->m_Packet);
989
990 u->checkTimers();
991 self->m_pRcvUList->update(u);
992 }
993 }
994 }
995 else if (self->m_pRendezvousQueue->retrieve(addr, id))
996 self->storePkt(id, unit->m_Packet.clone());
997 }
998
999 TIMER_CHECK:
1000 // take care of the timing event for all UDT sockets
1001
1002 CRNode* ul = self->m_pRcvUList->m_pUList;
1003 uint64_t currtime;
1004 CTimer::rdtsc(currtime);
1005 uint64_t ctime = currtime - 100000 * CTimer::getCPUFrequency();
1006
1007 while ((NULL != ul) && (ul->m_llTimeStamp < ctime))
1008 {
1009 CUDT* u = ul->m_pUDT;
1010
1011 if (u->m_bConnected && !u->m_bBroken && !u->m_bClosing)
1012 {
1013 u->checkTimers();
1014 self->m_pRcvUList->update(u);
1015 }
1016 else
1017 {
1018 // the socket must be removed from Hash table first, then RcvUList
1019 self->m_pHash->remove(u->m_SocketID);
1020 self->m_pRcvUList->remove(u);
1021 u->m_pRNode->m_bOnList = false;
1022 }
1023
1024 ul = self->m_pRcvUList->m_pUList;
1025 }
1026 }
1027
1028 if (AF_INET == self->m_UnitQueue.m_iIPversion)
1029 delete (sockaddr_in*)addr;
1030 else
1031 delete (sockaddr_in6*)addr;
1032
1033 #ifndef WIN32
1034 return NULL;
1035 #else
1036 SetEvent(self->m_ExitCond);
1037 return 0;
1038 #endif
1039 }
1040
1041 int CRcvQueue::recvfrom(const int32_t& id, CPacket& packet)
1042 {
1043 CGuard bufferlock(m_PassLock);
1044
1045 map<int32_t, queue<CPacket*> >::iterator i = m_mBuffer.find(id);
1046
1047 if (i == m_mBuffer.end())
1048 {
1049 #ifndef WIN32
1050 uint64_t now = CTimer::getTime();
1051 timespec timeout;
1052
1053 timeout.tv_sec = now / 1000000 + 1;
1054 timeout.tv_nsec = (now % 1000000) * 1000;
1055
1056 pthread_cond_timedwait(&m_PassCond, &m_PassLock, &timeout);
1057 #else
1058 ReleaseMutex(m_PassLock);
1059 WaitForSingleObject(m_PassCond, 1000);
1060 WaitForSingleObject(m_PassLock, INFINITE);
1061 #endif
1062
1063 i = m_mBuffer.find(id);
1064 if (i == m_mBuffer.end())
1065 {
1066 packet.setLength(-1);
1067 return -1;
1068 }
1069 }
1070
1071 // retrieve the earliest packet
1072 CPacket* newpkt = i->second.front();
1073
1074 if (packet.getLength() < newpkt->getLength())
1075 {
1076 packet.setLength(-1);
1077 return -1;
1078 }
1079
1080 // copy packet content
1081 memcpy(packet.m_nHeader, newpkt->m_nHeader, CPacket::m_iPktHdrSize);
1082 memcpy(packet.m_pcData, newpkt->m_pcData, newpkt->getLength());
1083 packet.setLength(newpkt->getLength());
1084
1085 delete [] newpkt->m_pcData;
1086 delete newpkt;
1087
1088 // remove this message from queue,
1089 // if no more messages left for this socket, release its data structure
1090 i->second.pop();
1091 if (i->second.empty())
1092 m_mBuffer.erase(i);
1093
1094 return packet.getLength();
1095 }
1096
1097 int CRcvQueue::setListener(const CUDT* u)
1098 {
1099 CGuard lslock(m_LSLock);
1100
1101 if (NULL != m_pListener)
1102 return -1;
1103
1104 m_pListener = (CUDT*)u;
1105 return 1;
1106 }
1107
1108 void CRcvQueue::removeListener(const CUDT* u)
1109 {
1110 CGuard lslock(m_LSLock);
1111
1112 if (u == m_pListener)
1113 m_pListener = NULL;
1114 }
1115
1116 void CRcvQueue::setNewEntry(CUDT* u)
1117 {
1118 CGuard listguard(m_IDLock);
1119 m_vNewEntry.insert(m_vNewEntry.end(), u);
1120 }
1121
1122 bool CRcvQueue::ifNewEntry()
1123 {
1124 return !(m_vNewEntry.empty());
1125 }
1126
1127 CUDT* CRcvQueue::getNewEntry()
1128 {
1129 CGuard listguard(m_IDLock);
1130
1131 if (m_vNewEntry.empty())
1132 return NULL;
1133
1134 CUDT* u = (CUDT*)*(m_vNewEntry.begin());
1135 m_vNewEntry.erase(m_vNewEntry.begin());
1136
1137 return u;
1138 }
1139
1140 void CRcvQueue::storePkt(const int32_t& id, CPacket* pkt)
1141 {
1142 CGuard bufferlock(m_PassLock);
1143
1144 map<int32_t, queue<CPacket*> >::iterator i = m_mBuffer.find(id);
1145
1146 if (i == m_mBuffer.end())
1147 {
1148 m_mBuffer[id].push(pkt);
1149
1150 #ifndef WIN32
1151 pthread_cond_signal(&m_PassCond);
1152 #else
1153 SetEvent(m_PassCond);
1154 #endif
1155 }
1156 else
1157 {
1158 //avoid storing too many packets, in case of malfunction or attack
1159 if (i->second.size() > 16)
1160 return;
1161
1162 i->second.push(pkt);
1163 }
1164 }
OLDNEW
« no previous file with comments | « net/third_party/udt/src/queue.h ('k') | net/third_party/udt/src/udt.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698