| OLD | NEW |
| (Empty) |
| 1 /***************************************************************************** | |
| 2 Copyright (c) 2001 - 2011, The Board of Trustees of the University of Illinois. | |
| 3 All rights reserved. | |
| 4 | |
| 5 Redistribution and use in source and binary forms, with or without | |
| 6 modification, are permitted provided that the following conditions are | |
| 7 met: | |
| 8 | |
| 9 * Redistributions of source code must retain the above | |
| 10 copyright notice, this list of conditions and the | |
| 11 following disclaimer. | |
| 12 | |
| 13 * Redistributions in binary form must reproduce the | |
| 14 above copyright notice, this list of conditions | |
| 15 and the following disclaimer in the documentation | |
| 16 and/or other materials provided with the distribution. | |
| 17 | |
| 18 * Neither the name of the University of Illinois | |
| 19 nor the names of its contributors may be used to | |
| 20 endorse or promote products derived from this | |
| 21 software without specific prior written permission. | |
| 22 | |
| 23 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS | |
| 24 IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, | |
| 25 THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR | |
| 26 PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR | |
| 27 CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, | |
| 28 EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, | |
| 29 PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR | |
| 30 PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF | |
| 31 LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING | |
| 32 NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS | |
| 33 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. | |
| 34 *****************************************************************************/ | |
| 35 | |
| 36 /***************************************************************************** | |
| 37 written by | |
| 38 Yunhong Gu, last updated 01/02/2011 | |
| 39 *****************************************************************************/ | |
| 40 | |
| 41 #ifdef WIN32 | |
| 42 #include <winsock2.h> | |
| 43 #include <ws2tcpip.h> | |
| 44 #ifdef LEGACY_WIN32 | |
| 45 #include <wspiapi.h> | |
| 46 #endif | |
| 47 #endif | |
| 48 | |
| 49 #include <cstring> | |
| 50 #include "common.h" | |
| 51 #include "queue.h" | |
| 52 #include "core.h" | |
| 53 | |
| 54 using namespace std; | |
| 55 | |
| 56 CUnitQueue::CUnitQueue(): | |
| 57 m_pQEntry(NULL), | |
| 58 m_pCurrQueue(NULL), | |
| 59 m_pLastQueue(NULL), | |
| 60 m_iSize(0), | |
| 61 m_iCount(0), | |
| 62 m_iMSS(), | |
| 63 m_iIPversion() | |
| 64 { | |
| 65 } | |
| 66 | |
| 67 CUnitQueue::~CUnitQueue() | |
| 68 { | |
| 69 CQEntry* p = m_pQEntry; | |
| 70 | |
| 71 while (p != NULL) | |
| 72 { | |
| 73 delete [] p->m_pUnit; | |
| 74 delete [] p->m_pBuffer; | |
| 75 | |
| 76 CQEntry* q = p; | |
| 77 if (p == m_pLastQueue) | |
| 78 p = NULL; | |
| 79 else | |
| 80 p = p->m_pNext; | |
| 81 delete q; | |
| 82 } | |
| 83 } | |
| 84 | |
| 85 int CUnitQueue::init(const int& size, const int& mss, const int& version) | |
| 86 { | |
| 87 CQEntry* tempq = NULL; | |
| 88 CUnit* tempu = NULL; | |
| 89 char* tempb = NULL; | |
| 90 | |
| 91 try | |
| 92 { | |
| 93 tempq = new CQEntry; | |
| 94 tempu = new CUnit [size]; | |
| 95 tempb = new char [size * mss]; | |
| 96 } | |
| 97 catch (...) | |
| 98 { | |
| 99 delete tempq; | |
| 100 delete [] tempu; | |
| 101 delete [] tempb; | |
| 102 | |
| 103 return -1; | |
| 104 } | |
| 105 | |
| 106 for (int i = 0; i < size; ++ i) | |
| 107 { | |
| 108 tempu[i].m_iFlag = 0; | |
| 109 tempu[i].m_Packet.m_pcData = tempb + i * mss; | |
| 110 } | |
| 111 tempq->m_pUnit = tempu; | |
| 112 tempq->m_pBuffer = tempb; | |
| 113 tempq->m_iSize = size; | |
| 114 | |
| 115 m_pQEntry = m_pCurrQueue = m_pLastQueue = tempq; | |
| 116 m_pQEntry->m_pNext = m_pQEntry; | |
| 117 | |
| 118 m_pAvailUnit = m_pCurrQueue->m_pUnit; | |
| 119 | |
| 120 m_iSize = size; | |
| 121 m_iMSS = mss; | |
| 122 m_iIPversion = version; | |
| 123 | |
| 124 return 0; | |
| 125 } | |
| 126 | |
| 127 int CUnitQueue::increase() | |
| 128 { | |
| 129 // adjust/correct m_iCount | |
| 130 int real_count = 0; | |
| 131 CQEntry* p = m_pQEntry; | |
| 132 while (p != NULL) | |
| 133 { | |
| 134 CUnit* u = p->m_pUnit; | |
| 135 for (CUnit* end = u + p->m_iSize; u != end; ++ u) | |
| 136 if (u->m_iFlag != 0) | |
| 137 ++ real_count; | |
| 138 | |
| 139 if (p == m_pLastQueue) | |
| 140 p = NULL; | |
| 141 else | |
| 142 p = p->m_pNext; | |
| 143 } | |
| 144 m_iCount = real_count; | |
| 145 if (double(m_iCount) / m_iSize < 0.9) | |
| 146 return -1; | |
| 147 | |
| 148 CQEntry* tempq = NULL; | |
| 149 CUnit* tempu = NULL; | |
| 150 char* tempb = NULL; | |
| 151 | |
| 152 // all queues have the same size | |
| 153 int size = m_pQEntry->m_iSize; | |
| 154 | |
| 155 try | |
| 156 { | |
| 157 tempq = new CQEntry; | |
| 158 tempu = new CUnit [size]; | |
| 159 tempb = new char [size * m_iMSS]; | |
| 160 } | |
| 161 catch (...) | |
| 162 { | |
| 163 delete tempq; | |
| 164 delete [] tempu; | |
| 165 delete [] tempb; | |
| 166 | |
| 167 return -1; | |
| 168 } | |
| 169 | |
| 170 for (int i = 0; i < size; ++ i) | |
| 171 { | |
| 172 tempu[i].m_iFlag = 0; | |
| 173 tempu[i].m_Packet.m_pcData = tempb + i * m_iMSS; | |
| 174 } | |
| 175 tempq->m_pUnit = tempu; | |
| 176 tempq->m_pBuffer = tempb; | |
| 177 tempq->m_iSize = size; | |
| 178 | |
| 179 m_pLastQueue->m_pNext = tempq; | |
| 180 m_pLastQueue = tempq; | |
| 181 m_pLastQueue->m_pNext = m_pQEntry; | |
| 182 | |
| 183 m_iSize += size; | |
| 184 | |
| 185 return 0; | |
| 186 } | |
| 187 | |
| 188 int CUnitQueue::shrink() | |
| 189 { | |
| 190 // currently queue cannot be shrunk. | |
| 191 return -1; | |
| 192 } | |
| 193 | |
| 194 CUnit* CUnitQueue::getNextAvailUnit() | |
| 195 { | |
| 196 if (m_iCount * 10 > m_iSize * 9) | |
| 197 increase(); | |
| 198 | |
| 199 if (m_iCount >= m_iSize) | |
| 200 return NULL; | |
| 201 | |
| 202 CQEntry* entrance = m_pCurrQueue; | |
| 203 | |
| 204 do | |
| 205 { | |
| 206 for (CUnit* sentinel = m_pCurrQueue->m_pUnit + m_pCurrQueue->m_iSize - 1;
m_pAvailUnit != sentinel; ++ m_pAvailUnit) | |
| 207 if (m_pAvailUnit->m_iFlag == 0) | |
| 208 return m_pAvailUnit; | |
| 209 | |
| 210 if (m_pCurrQueue->m_pUnit->m_iFlag == 0) | |
| 211 { | |
| 212 m_pAvailUnit = m_pCurrQueue->m_pUnit; | |
| 213 return m_pAvailUnit; | |
| 214 } | |
| 215 | |
| 216 m_pCurrQueue = m_pCurrQueue->m_pNext; | |
| 217 m_pAvailUnit = m_pCurrQueue->m_pUnit; | |
| 218 } while (m_pCurrQueue != entrance); | |
| 219 | |
| 220 increase(); | |
| 221 | |
| 222 return NULL; | |
| 223 } | |
| 224 | |
| 225 | |
| 226 CSndUList::CSndUList(): | |
| 227 m_pHeap(NULL), | |
| 228 m_iArrayLength(4096), | |
| 229 m_iLastEntry(-1), | |
| 230 m_ListLock(), | |
| 231 m_pWindowLock(NULL), | |
| 232 m_pWindowCond(NULL), | |
| 233 m_pTimer(NULL) | |
| 234 { | |
| 235 m_pHeap = new CSNode*[m_iArrayLength]; | |
| 236 | |
| 237 #ifndef WIN32 | |
| 238 pthread_mutex_init(&m_ListLock, NULL); | |
| 239 #else | |
| 240 m_ListLock = CreateMutex(NULL, false, NULL); | |
| 241 #endif | |
| 242 } | |
| 243 | |
| 244 CSndUList::~CSndUList() | |
| 245 { | |
| 246 delete [] m_pHeap; | |
| 247 | |
| 248 #ifndef WIN32 | |
| 249 pthread_mutex_destroy(&m_ListLock); | |
| 250 #else | |
| 251 CloseHandle(m_ListLock); | |
| 252 #endif | |
| 253 } | |
| 254 | |
| 255 void CSndUList::insert(const int64_t& ts, const CUDT* u) | |
| 256 { | |
| 257 CGuard listguard(m_ListLock); | |
| 258 | |
| 259 // increase the heap array size if necessary | |
| 260 if (m_iLastEntry == m_iArrayLength - 1) | |
| 261 { | |
| 262 CSNode** temp = NULL; | |
| 263 | |
| 264 try | |
| 265 { | |
| 266 temp = new CSNode*[m_iArrayLength * 2]; | |
| 267 } | |
| 268 catch(...) | |
| 269 { | |
| 270 return; | |
| 271 } | |
| 272 | |
| 273 memcpy(temp, m_pHeap, sizeof(CSNode*) * m_iArrayLength); | |
| 274 m_iArrayLength *= 2; | |
| 275 delete [] m_pHeap; | |
| 276 m_pHeap = temp; | |
| 277 } | |
| 278 | |
| 279 insert_(ts, u); | |
| 280 } | |
| 281 | |
| 282 void CSndUList::update(const CUDT* u, const bool& reschedule) | |
| 283 { | |
| 284 CGuard listguard(m_ListLock); | |
| 285 | |
| 286 CSNode* n = u->m_pSNode; | |
| 287 | |
| 288 if (n->m_iHeapLoc >= 0) | |
| 289 { | |
| 290 if (!reschedule) | |
| 291 return; | |
| 292 | |
| 293 if (n->m_iHeapLoc == 0) | |
| 294 { | |
| 295 n->m_llTimeStamp = 1; | |
| 296 m_pTimer->interrupt(); | |
| 297 return; | |
| 298 } | |
| 299 | |
| 300 remove_(u); | |
| 301 } | |
| 302 | |
| 303 insert_(1, u); | |
| 304 } | |
| 305 | |
| 306 int CSndUList::pop(sockaddr*& addr, CPacket& pkt) | |
| 307 { | |
| 308 CGuard listguard(m_ListLock); | |
| 309 | |
| 310 if (-1 == m_iLastEntry) | |
| 311 return -1; | |
| 312 | |
| 313 CUDT* u = m_pHeap[0]->m_pUDT; | |
| 314 remove_(u); | |
| 315 | |
| 316 if (!u->m_bConnected || u->m_bBroken) | |
| 317 return -1; | |
| 318 | |
| 319 // pack a packet from the socket | |
| 320 uint64_t ts; | |
| 321 if (u->packData(pkt, ts) <= 0) | |
| 322 return -1; | |
| 323 | |
| 324 addr = u->m_pPeerAddr; | |
| 325 | |
| 326 // insert a new entry, ts is the next processing time | |
| 327 if (ts > 0) | |
| 328 insert_(ts, u); | |
| 329 | |
| 330 return 1; | |
| 331 } | |
| 332 | |
| 333 void CSndUList::remove(const CUDT* u) | |
| 334 { | |
| 335 CGuard listguard(m_ListLock); | |
| 336 | |
| 337 remove_(u); | |
| 338 } | |
| 339 | |
| 340 uint64_t CSndUList::getNextProcTime() | |
| 341 { | |
| 342 CGuard listguard(m_ListLock); | |
| 343 | |
| 344 if (-1 == m_iLastEntry) | |
| 345 return 0; | |
| 346 | |
| 347 return m_pHeap[0]->m_llTimeStamp; | |
| 348 } | |
| 349 | |
| 350 void CSndUList::insert_(const int64_t& ts, const CUDT* u) | |
| 351 { | |
| 352 CSNode* n = u->m_pSNode; | |
| 353 | |
| 354 // do not insert repeated node | |
| 355 if (n->m_iHeapLoc >= 0) | |
| 356 return; | |
| 357 | |
| 358 m_iLastEntry ++; | |
| 359 m_pHeap[m_iLastEntry] = n; | |
| 360 n->m_llTimeStamp = ts; | |
| 361 | |
| 362 int q = m_iLastEntry; | |
| 363 int p = q; | |
| 364 while (p != 0) | |
| 365 { | |
| 366 p = (q - 1) >> 1; | |
| 367 if (m_pHeap[p]->m_llTimeStamp > m_pHeap[q]->m_llTimeStamp) | |
| 368 { | |
| 369 CSNode* t = m_pHeap[p]; | |
| 370 m_pHeap[p] = m_pHeap[q]; | |
| 371 m_pHeap[q] = t; | |
| 372 t->m_iHeapLoc = q; | |
| 373 q = p; | |
| 374 } | |
| 375 else | |
| 376 break; | |
| 377 } | |
| 378 | |
| 379 n->m_iHeapLoc = q; | |
| 380 | |
| 381 // first entry, activate the sending queue | |
| 382 if (0 == m_iLastEntry) | |
| 383 { | |
| 384 #ifndef WIN32 | |
| 385 pthread_mutex_lock(m_pWindowLock); | |
| 386 pthread_cond_signal(m_pWindowCond); | |
| 387 pthread_mutex_unlock(m_pWindowLock); | |
| 388 #else | |
| 389 SetEvent(*m_pWindowCond); | |
| 390 #endif | |
| 391 } | |
| 392 } | |
| 393 | |
| 394 void CSndUList::remove_(const CUDT* u) | |
| 395 { | |
| 396 CSNode* n = u->m_pSNode; | |
| 397 | |
| 398 if (n->m_iHeapLoc >= 0) | |
| 399 { | |
| 400 // remove the node from heap | |
| 401 m_pHeap[n->m_iHeapLoc] = m_pHeap[m_iLastEntry]; | |
| 402 m_iLastEntry --; | |
| 403 m_pHeap[n->m_iHeapLoc]->m_iHeapLoc = n->m_iHeapLoc; | |
| 404 | |
| 405 int q = n->m_iHeapLoc; | |
| 406 int p = q * 2 + 1; | |
| 407 while (p <= m_iLastEntry) | |
| 408 { | |
| 409 if ((p + 1 <= m_iLastEntry) && (m_pHeap[p]->m_llTimeStamp > m_pHeap[p +
1]->m_llTimeStamp)) | |
| 410 p ++; | |
| 411 | |
| 412 if (m_pHeap[q]->m_llTimeStamp > m_pHeap[p]->m_llTimeStamp) | |
| 413 { | |
| 414 CSNode* t = m_pHeap[p]; | |
| 415 m_pHeap[p] = m_pHeap[q]; | |
| 416 m_pHeap[p]->m_iHeapLoc = p; | |
| 417 m_pHeap[q] = t; | |
| 418 m_pHeap[q]->m_iHeapLoc = q; | |
| 419 | |
| 420 q = p; | |
| 421 p = q * 2 + 1; | |
| 422 } | |
| 423 else | |
| 424 break; | |
| 425 } | |
| 426 | |
| 427 n->m_iHeapLoc = -1; | |
| 428 } | |
| 429 } | |
| 430 | |
| 431 // | |
| 432 CSndQueue::CSndQueue(): | |
| 433 m_WorkerThread(), | |
| 434 m_pSndUList(NULL), | |
| 435 m_pChannel(NULL), | |
| 436 m_pTimer(NULL), | |
| 437 m_WindowLock(), | |
| 438 m_WindowCond(), | |
| 439 m_bClosing(false), | |
| 440 m_ExitCond() | |
| 441 { | |
| 442 #ifndef WIN32 | |
| 443 pthread_cond_init(&m_WindowCond, NULL); | |
| 444 pthread_mutex_init(&m_WindowLock, NULL); | |
| 445 #else | |
| 446 m_WindowLock = CreateMutex(NULL, false, NULL); | |
| 447 m_WindowCond = CreateEvent(NULL, false, false, NULL); | |
| 448 m_ExitCond = CreateEvent(NULL, false, false, NULL); | |
| 449 #endif | |
| 450 } | |
| 451 | |
| 452 CSndQueue::~CSndQueue() | |
| 453 { | |
| 454 m_bClosing = true; | |
| 455 | |
| 456 #ifndef WIN32 | |
| 457 pthread_mutex_lock(&m_WindowLock); | |
| 458 pthread_cond_signal(&m_WindowCond); | |
| 459 pthread_mutex_unlock(&m_WindowLock); | |
| 460 if (0 != m_WorkerThread) | |
| 461 pthread_join(m_WorkerThread, NULL); | |
| 462 pthread_cond_destroy(&m_WindowCond); | |
| 463 pthread_mutex_destroy(&m_WindowLock); | |
| 464 #else | |
| 465 SetEvent(m_WindowCond); | |
| 466 if (NULL != m_WorkerThread) | |
| 467 WaitForSingleObject(m_ExitCond, INFINITE); | |
| 468 CloseHandle(m_WorkerThread); | |
| 469 CloseHandle(m_WindowLock); | |
| 470 CloseHandle(m_WindowCond); | |
| 471 CloseHandle(m_ExitCond); | |
| 472 #endif | |
| 473 | |
| 474 delete m_pSndUList; | |
| 475 } | |
| 476 | |
| 477 void CSndQueue::init(const CChannel* c, const CTimer* t) | |
| 478 { | |
| 479 m_pChannel = (CChannel*)c; | |
| 480 m_pTimer = (CTimer*)t; | |
| 481 m_pSndUList = new CSndUList; | |
| 482 m_pSndUList->m_pWindowLock = &m_WindowLock; | |
| 483 m_pSndUList->m_pWindowCond = &m_WindowCond; | |
| 484 m_pSndUList->m_pTimer = m_pTimer; | |
| 485 | |
| 486 #ifndef WIN32 | |
| 487 if (0 != pthread_create(&m_WorkerThread, NULL, CSndQueue::worker, this)) | |
| 488 { | |
| 489 m_WorkerThread = 0; | |
| 490 throw CUDTException(3, 1); | |
| 491 } | |
| 492 #else | |
| 493 DWORD threadID; | |
| 494 m_WorkerThread = CreateThread(NULL, 0, CSndQueue::worker, this, 0, &thread
ID); | |
| 495 if (NULL == m_WorkerThread) | |
| 496 throw CUDTException(3, 1); | |
| 497 #endif | |
| 498 } | |
| 499 | |
| 500 #ifndef WIN32 | |
| 501 void* CSndQueue::worker(void* param) | |
| 502 #else | |
| 503 DWORD WINAPI CSndQueue::worker(LPVOID param) | |
| 504 #endif | |
| 505 { | |
| 506 CSndQueue* self = (CSndQueue*)param; | |
| 507 | |
| 508 while (!self->m_bClosing) | |
| 509 { | |
| 510 uint64_t ts = self->m_pSndUList->getNextProcTime(); | |
| 511 | |
| 512 if (ts > 0) | |
| 513 { | |
| 514 // wait until next processing time of the first socket on the list | |
| 515 uint64_t currtime; | |
| 516 CTimer::rdtsc(currtime); | |
| 517 if (currtime < ts) | |
| 518 self->m_pTimer->sleepto(ts); | |
| 519 | |
| 520 // it is time to send the next pkt | |
| 521 sockaddr* addr; | |
| 522 CPacket pkt; | |
| 523 if (self->m_pSndUList->pop(addr, pkt) < 0) | |
| 524 continue; | |
| 525 | |
| 526 self->m_pChannel->sendto(addr, pkt); | |
| 527 } | |
| 528 else | |
| 529 { | |
| 530 // wait here if there is no sockets with data to be sent | |
| 531 #ifndef WIN32 | |
| 532 pthread_mutex_lock(&self->m_WindowLock); | |
| 533 if (!self->m_bClosing && (self->m_pSndUList->m_iLastEntry < 0)) | |
| 534 pthread_cond_wait(&self->m_WindowCond, &self->m_WindowLock); | |
| 535 pthread_mutex_unlock(&self->m_WindowLock); | |
| 536 #else | |
| 537 WaitForSingleObject(self->m_WindowCond, INFINITE); | |
| 538 #endif | |
| 539 } | |
| 540 } | |
| 541 | |
| 542 #ifndef WIN32 | |
| 543 return NULL; | |
| 544 #else | |
| 545 SetEvent(self->m_ExitCond); | |
| 546 return 0; | |
| 547 #endif | |
| 548 } | |
| 549 | |
| 550 int CSndQueue::sendto(const sockaddr* addr, CPacket& packet) | |
| 551 { | |
| 552 // send out the packet immediately (high priority), this is a control packet | |
| 553 m_pChannel->sendto(addr, packet); | |
| 554 return packet.getLength(); | |
| 555 } | |
| 556 | |
| 557 | |
| 558 // | |
| 559 CRcvUList::CRcvUList(): | |
| 560 m_pUList(NULL), | |
| 561 m_pLast(NULL) | |
| 562 { | |
| 563 } | |
| 564 | |
| 565 CRcvUList::~CRcvUList() | |
| 566 { | |
| 567 } | |
| 568 | |
| 569 void CRcvUList::insert(const CUDT* u) | |
| 570 { | |
| 571 CRNode* n = u->m_pRNode; | |
| 572 CTimer::rdtsc(n->m_llTimeStamp); | |
| 573 | |
| 574 if (NULL == m_pUList) | |
| 575 { | |
| 576 // empty list, insert as the single node | |
| 577 n->m_pPrev = n->m_pNext = NULL; | |
| 578 m_pLast = m_pUList = n; | |
| 579 | |
| 580 return; | |
| 581 } | |
| 582 | |
| 583 // always insert at the end for RcvUList | |
| 584 n->m_pPrev = m_pLast; | |
| 585 n->m_pNext = NULL; | |
| 586 m_pLast->m_pNext = n; | |
| 587 m_pLast = n; | |
| 588 } | |
| 589 | |
| 590 void CRcvUList::remove(const CUDT* u) | |
| 591 { | |
| 592 CRNode* n = u->m_pRNode; | |
| 593 | |
| 594 if (!n->m_bOnList) | |
| 595 return; | |
| 596 | |
| 597 if (NULL == n->m_pPrev) | |
| 598 { | |
| 599 // n is the first node | |
| 600 m_pUList = n->m_pNext; | |
| 601 if (NULL == m_pUList) | |
| 602 m_pLast = NULL; | |
| 603 else | |
| 604 m_pUList->m_pPrev = NULL; | |
| 605 } | |
| 606 else | |
| 607 { | |
| 608 n->m_pPrev->m_pNext = n->m_pNext; | |
| 609 if (NULL == n->m_pNext) | |
| 610 { | |
| 611 // n is the last node | |
| 612 m_pLast = n->m_pPrev; | |
| 613 } | |
| 614 else | |
| 615 n->m_pNext->m_pPrev = n->m_pPrev; | |
| 616 } | |
| 617 | |
| 618 n->m_pNext = n->m_pPrev = NULL; | |
| 619 } | |
| 620 | |
| 621 void CRcvUList::update(const CUDT* u) | |
| 622 { | |
| 623 CRNode* n = u->m_pRNode; | |
| 624 | |
| 625 if (!n->m_bOnList) | |
| 626 return; | |
| 627 | |
| 628 CTimer::rdtsc(n->m_llTimeStamp); | |
| 629 | |
| 630 // if n is the last node, do not need to change | |
| 631 if (NULL == n->m_pNext) | |
| 632 return; | |
| 633 | |
| 634 if (NULL == n->m_pPrev) | |
| 635 { | |
| 636 m_pUList = n->m_pNext; | |
| 637 m_pUList->m_pPrev = NULL; | |
| 638 } | |
| 639 else | |
| 640 { | |
| 641 n->m_pPrev->m_pNext = n->m_pNext; | |
| 642 n->m_pNext->m_pPrev = n->m_pPrev; | |
| 643 } | |
| 644 | |
| 645 n->m_pPrev = m_pLast; | |
| 646 n->m_pNext = NULL; | |
| 647 m_pLast->m_pNext = n; | |
| 648 m_pLast = n; | |
| 649 } | |
| 650 | |
| 651 // | |
| 652 CHash::CHash(): | |
| 653 m_pBucket(NULL), | |
| 654 m_iHashSize(0) | |
| 655 { | |
| 656 } | |
| 657 | |
| 658 CHash::~CHash() | |
| 659 { | |
| 660 for (int i = 0; i < m_iHashSize; ++ i) | |
| 661 { | |
| 662 CBucket* b = m_pBucket[i]; | |
| 663 while (NULL != b) | |
| 664 { | |
| 665 CBucket* n = b->m_pNext; | |
| 666 delete b; | |
| 667 b = n; | |
| 668 } | |
| 669 } | |
| 670 | |
| 671 delete [] m_pBucket; | |
| 672 } | |
| 673 | |
| 674 void CHash::init(const int& size) | |
| 675 { | |
| 676 m_pBucket = new CBucket* [size]; | |
| 677 | |
| 678 for (int i = 0; i < size; ++ i) | |
| 679 m_pBucket[i] = NULL; | |
| 680 | |
| 681 m_iHashSize = size; | |
| 682 } | |
| 683 | |
| 684 CUDT* CHash::lookup(const int32_t& id) | |
| 685 { | |
| 686 // simple hash function (% hash table size); suitable for socket descriptors | |
| 687 CBucket* b = m_pBucket[id % m_iHashSize]; | |
| 688 | |
| 689 while (NULL != b) | |
| 690 { | |
| 691 if (id == b->m_iID) | |
| 692 return b->m_pUDT; | |
| 693 b = b->m_pNext; | |
| 694 } | |
| 695 | |
| 696 return NULL; | |
| 697 } | |
| 698 | |
| 699 void CHash::insert(const int32_t& id, const CUDT* u) | |
| 700 { | |
| 701 CBucket* b = m_pBucket[id % m_iHashSize]; | |
| 702 | |
| 703 CBucket* n = new CBucket; | |
| 704 n->m_iID = id; | |
| 705 n->m_pUDT = (CUDT*)u; | |
| 706 n->m_pNext = b; | |
| 707 | |
| 708 m_pBucket[id % m_iHashSize] = n; | |
| 709 } | |
| 710 | |
| 711 void CHash::remove(const int32_t& id) | |
| 712 { | |
| 713 CBucket* b = m_pBucket[id % m_iHashSize]; | |
| 714 CBucket* p = NULL; | |
| 715 | |
| 716 while (NULL != b) | |
| 717 { | |
| 718 if (id == b->m_iID) | |
| 719 { | |
| 720 if (NULL == p) | |
| 721 m_pBucket[id % m_iHashSize] = b->m_pNext; | |
| 722 else | |
| 723 p->m_pNext = b->m_pNext; | |
| 724 | |
| 725 delete b; | |
| 726 | |
| 727 return; | |
| 728 } | |
| 729 | |
| 730 p = b; | |
| 731 b = b->m_pNext; | |
| 732 } | |
| 733 } | |
| 734 | |
| 735 | |
| 736 // | |
| 737 CRendezvousQueue::CRendezvousQueue(): | |
| 738 m_vRendezvousID(), | |
| 739 m_RIDVectorLock() | |
| 740 { | |
| 741 #ifndef WIN32 | |
| 742 pthread_mutex_init(&m_RIDVectorLock, NULL); | |
| 743 #else | |
| 744 m_RIDVectorLock = CreateMutex(NULL, false, NULL); | |
| 745 #endif | |
| 746 } | |
| 747 | |
| 748 CRendezvousQueue::~CRendezvousQueue() | |
| 749 { | |
| 750 #ifndef WIN32 | |
| 751 pthread_mutex_destroy(&m_RIDVectorLock); | |
| 752 #else | |
| 753 CloseHandle(m_RIDVectorLock); | |
| 754 #endif | |
| 755 | |
| 756 for (vector<CRL>::iterator i = m_vRendezvousID.begin(); i != m_vRendezvousID.
end(); ++ i) | |
| 757 { | |
| 758 if (AF_INET == i->m_iIPversion) | |
| 759 delete (sockaddr_in*)i->m_pPeerAddr; | |
| 760 else | |
| 761 delete (sockaddr_in6*)i->m_pPeerAddr; | |
| 762 } | |
| 763 | |
| 764 m_vRendezvousID.clear(); | |
| 765 } | |
| 766 | |
| 767 void CRendezvousQueue::insert(const UDTSOCKET& id, const int& ipv, const sockadd
r* addr) | |
| 768 { | |
| 769 CGuard vg(m_RIDVectorLock); | |
| 770 | |
| 771 CRL r; | |
| 772 r.m_iID = id; | |
| 773 r.m_iIPversion = ipv; | |
| 774 r.m_pPeerAddr = (AF_INET == ipv) ? (sockaddr*)new sockaddr_in : (sockaddr*)ne
w sockaddr_in6; | |
| 775 memcpy(r.m_pPeerAddr, addr, (AF_INET == ipv) ? sizeof(sockaddr_in) : sizeof(s
ockaddr_in6)); | |
| 776 | |
| 777 m_vRendezvousID.insert(m_vRendezvousID.end(), r); | |
| 778 } | |
| 779 | |
| 780 void CRendezvousQueue::remove(const UDTSOCKET& id) | |
| 781 { | |
| 782 CGuard vg(m_RIDVectorLock); | |
| 783 | |
| 784 for (vector<CRL>::iterator i = m_vRendezvousID.begin(); i != m_vRendezvousID.
end(); ++ i) | |
| 785 { | |
| 786 if (i->m_iID == id) | |
| 787 { | |
| 788 if (AF_INET == i->m_iIPversion) | |
| 789 delete (sockaddr_in*)i->m_pPeerAddr; | |
| 790 else | |
| 791 delete (sockaddr_in6*)i->m_pPeerAddr; | |
| 792 | |
| 793 m_vRendezvousID.erase(i); | |
| 794 | |
| 795 return; | |
| 796 } | |
| 797 } | |
| 798 } | |
| 799 | |
| 800 bool CRendezvousQueue::retrieve(const sockaddr* addr, UDTSOCKET& id) | |
| 801 { | |
| 802 CGuard vg(m_RIDVectorLock); | |
| 803 | |
| 804 for (vector<CRL>::iterator i = m_vRendezvousID.begin(); i != m_vRendezvousID.
end(); ++ i) | |
| 805 { | |
| 806 if (CIPAddress::ipcmp(addr, i->m_pPeerAddr, i->m_iIPversion) && ((0 == id)
|| (id == i->m_iID))) | |
| 807 { | |
| 808 id = i->m_iID; | |
| 809 return true; | |
| 810 } | |
| 811 } | |
| 812 | |
| 813 return false; | |
| 814 } | |
| 815 | |
| 816 | |
| 817 // | |
| 818 CRcvQueue::CRcvQueue(): | |
| 819 m_WorkerThread(), | |
| 820 m_UnitQueue(), | |
| 821 m_pRcvUList(NULL), | |
| 822 m_pHash(NULL), | |
| 823 m_pChannel(NULL), | |
| 824 m_pTimer(NULL), | |
| 825 m_iPayloadSize(), | |
| 826 m_bClosing(false), | |
| 827 m_ExitCond(), | |
| 828 m_LSLock(), | |
| 829 m_pListener(NULL), | |
| 830 m_pRendezvousQueue(NULL), | |
| 831 m_vNewEntry(), | |
| 832 m_IDLock(), | |
| 833 m_mBuffer(), | |
| 834 m_PassLock(), | |
| 835 m_PassCond() | |
| 836 { | |
| 837 #ifndef WIN32 | |
| 838 pthread_mutex_init(&m_PassLock, NULL); | |
| 839 pthread_cond_init(&m_PassCond, NULL); | |
| 840 pthread_mutex_init(&m_LSLock, NULL); | |
| 841 pthread_mutex_init(&m_IDLock, NULL); | |
| 842 #else | |
| 843 m_PassLock = CreateMutex(NULL, false, NULL); | |
| 844 m_PassCond = CreateEvent(NULL, false, false, NULL); | |
| 845 m_LSLock = CreateMutex(NULL, false, NULL); | |
| 846 m_IDLock = CreateMutex(NULL, false, NULL); | |
| 847 m_ExitCond = CreateEvent(NULL, false, false, NULL); | |
| 848 #endif | |
| 849 } | |
| 850 | |
| 851 CRcvQueue::~CRcvQueue() | |
| 852 { | |
| 853 m_bClosing = true; | |
| 854 | |
| 855 #ifndef WIN32 | |
| 856 if (0 != m_WorkerThread) | |
| 857 pthread_join(m_WorkerThread, NULL); | |
| 858 pthread_mutex_destroy(&m_PassLock); | |
| 859 pthread_cond_destroy(&m_PassCond); | |
| 860 pthread_mutex_destroy(&m_LSLock); | |
| 861 pthread_mutex_destroy(&m_IDLock); | |
| 862 #else | |
| 863 if (NULL != m_WorkerThread) | |
| 864 WaitForSingleObject(m_ExitCond, INFINITE); | |
| 865 CloseHandle(m_WorkerThread); | |
| 866 CloseHandle(m_PassLock); | |
| 867 CloseHandle(m_PassCond); | |
| 868 CloseHandle(m_LSLock); | |
| 869 CloseHandle(m_IDLock); | |
| 870 CloseHandle(m_ExitCond); | |
| 871 #endif | |
| 872 | |
| 873 delete m_pRcvUList; | |
| 874 delete m_pHash; | |
| 875 delete m_pRendezvousQueue; | |
| 876 | |
| 877 // remove all queued messages | |
| 878 for (map<int32_t, queue<CPacket*> >::iterator i = m_mBuffer.begin(); i != m_m
Buffer.end(); ++ i) | |
| 879 { | |
| 880 while (!i->second.empty()) | |
| 881 { | |
| 882 CPacket* pkt = i->second.front(); | |
| 883 delete [] pkt->m_pcData; | |
| 884 delete pkt; | |
| 885 i->second.pop(); | |
| 886 } | |
| 887 } | |
| 888 } | |
| 889 | |
| 890 void CRcvQueue::init(const int& qsize, const int& payload, const int& version, c
onst int& hsize, const CChannel* cc, const CTimer* t) | |
| 891 { | |
| 892 m_iPayloadSize = payload; | |
| 893 | |
| 894 m_UnitQueue.init(qsize, payload, version); | |
| 895 | |
| 896 m_pHash = new CHash; | |
| 897 m_pHash->init(hsize); | |
| 898 | |
| 899 m_pChannel = (CChannel*)cc; | |
| 900 m_pTimer = (CTimer*)t; | |
| 901 | |
| 902 m_pRcvUList = new CRcvUList; | |
| 903 m_pRendezvousQueue = new CRendezvousQueue; | |
| 904 | |
| 905 #ifndef WIN32 | |
| 906 if (0 != pthread_create(&m_WorkerThread, NULL, CRcvQueue::worker, this)) | |
| 907 { | |
| 908 m_WorkerThread = 0; | |
| 909 throw CUDTException(3, 1); | |
| 910 } | |
| 911 #else | |
| 912 DWORD threadID; | |
| 913 m_WorkerThread = CreateThread(NULL, 0, CRcvQueue::worker, this, 0, &thread
ID); | |
| 914 if (NULL == m_WorkerThread) | |
| 915 throw CUDTException(3, 1); | |
| 916 #endif | |
| 917 } | |
| 918 | |
| 919 #ifndef WIN32 | |
| 920 void* CRcvQueue::worker(void* param) | |
| 921 #else | |
| 922 DWORD WINAPI CRcvQueue::worker(LPVOID param) | |
| 923 #endif | |
| 924 { | |
| 925 CRcvQueue* self = (CRcvQueue*)param; | |
| 926 | |
| 927 sockaddr* addr = (AF_INET == self->m_UnitQueue.m_iIPversion) ? (sockaddr*) ne
w sockaddr_in : (sockaddr*) new sockaddr_in6; | |
| 928 CUDT* u = NULL; | |
| 929 int32_t id; | |
| 930 | |
| 931 while (!self->m_bClosing) | |
| 932 { | |
| 933 #ifdef NO_BUSY_WAITING | |
| 934 self->m_pTimer->tick(); | |
| 935 #endif | |
| 936 | |
| 937 // check waiting list, if new socket, insert it to the list | |
| 938 while (self->ifNewEntry()) | |
| 939 { | |
| 940 CUDT* ne = self->getNewEntry(); | |
| 941 if (NULL != ne) | |
| 942 { | |
| 943 self->m_pRcvUList->insert(ne); | |
| 944 self->m_pHash->insert(ne->m_SocketID, ne); | |
| 945 } | |
| 946 } | |
| 947 | |
| 948 // find next available slot for incoming packet | |
| 949 CUnit* unit = self->m_UnitQueue.getNextAvailUnit(); | |
| 950 if (NULL == unit) | |
| 951 { | |
| 952 // no space, skip this packet | |
| 953 CPacket temp; | |
| 954 temp.m_pcData = new char[self->m_iPayloadSize]; | |
| 955 temp.setLength(self->m_iPayloadSize); | |
| 956 self->m_pChannel->recvfrom(addr, temp); | |
| 957 delete [] temp.m_pcData; | |
| 958 goto TIMER_CHECK; | |
| 959 } | |
| 960 | |
| 961 unit->m_Packet.setLength(self->m_iPayloadSize); | |
| 962 | |
| 963 // reading next incoming packet, recvfrom returns -1 is nothing has been r
eceived | |
| 964 if (self->m_pChannel->recvfrom(addr, unit->m_Packet) < 0) | |
| 965 goto TIMER_CHECK; | |
| 966 | |
| 967 id = unit->m_Packet.m_iID; | |
| 968 | |
| 969 // ID 0 is for connection request, which should be passed to the listening
socket or rendezvous sockets | |
| 970 if (0 == id) | |
| 971 { | |
| 972 if (NULL != self->m_pListener) | |
| 973 ((CUDT*)self->m_pListener)->listen(addr, unit->m_Packet); | |
| 974 else if (self->m_pRendezvousQueue->retrieve(addr, id)) | |
| 975 self->storePkt(id, unit->m_Packet.clone()); | |
| 976 } | |
| 977 else if (id > 0) | |
| 978 { | |
| 979 if (NULL != (u = self->m_pHash->lookup(id))) | |
| 980 { | |
| 981 if (CIPAddress::ipcmp(addr, u->m_pPeerAddr, u->m_iIPversion)) | |
| 982 { | |
| 983 if (u->m_bConnected && !u->m_bBroken && !u->m_bClosing) | |
| 984 { | |
| 985 if (0 == unit->m_Packet.getFlag()) | |
| 986 u->processData(unit); | |
| 987 else | |
| 988 u->processCtrl(unit->m_Packet); | |
| 989 | |
| 990 u->checkTimers(); | |
| 991 self->m_pRcvUList->update(u); | |
| 992 } | |
| 993 } | |
| 994 } | |
| 995 else if (self->m_pRendezvousQueue->retrieve(addr, id)) | |
| 996 self->storePkt(id, unit->m_Packet.clone()); | |
| 997 } | |
| 998 | |
| 999 TIMER_CHECK: | |
| 1000 // take care of the timing event for all UDT sockets | |
| 1001 | |
| 1002 CRNode* ul = self->m_pRcvUList->m_pUList; | |
| 1003 uint64_t currtime; | |
| 1004 CTimer::rdtsc(currtime); | |
| 1005 uint64_t ctime = currtime - 100000 * CTimer::getCPUFrequency(); | |
| 1006 | |
| 1007 while ((NULL != ul) && (ul->m_llTimeStamp < ctime)) | |
| 1008 { | |
| 1009 CUDT* u = ul->m_pUDT; | |
| 1010 | |
| 1011 if (u->m_bConnected && !u->m_bBroken && !u->m_bClosing) | |
| 1012 { | |
| 1013 u->checkTimers(); | |
| 1014 self->m_pRcvUList->update(u); | |
| 1015 } | |
| 1016 else | |
| 1017 { | |
| 1018 // the socket must be removed from Hash table first, then RcvUList | |
| 1019 self->m_pHash->remove(u->m_SocketID); | |
| 1020 self->m_pRcvUList->remove(u); | |
| 1021 u->m_pRNode->m_bOnList = false; | |
| 1022 } | |
| 1023 | |
| 1024 ul = self->m_pRcvUList->m_pUList; | |
| 1025 } | |
| 1026 } | |
| 1027 | |
| 1028 if (AF_INET == self->m_UnitQueue.m_iIPversion) | |
| 1029 delete (sockaddr_in*)addr; | |
| 1030 else | |
| 1031 delete (sockaddr_in6*)addr; | |
| 1032 | |
| 1033 #ifndef WIN32 | |
| 1034 return NULL; | |
| 1035 #else | |
| 1036 SetEvent(self->m_ExitCond); | |
| 1037 return 0; | |
| 1038 #endif | |
| 1039 } | |
| 1040 | |
| 1041 int CRcvQueue::recvfrom(const int32_t& id, CPacket& packet) | |
| 1042 { | |
| 1043 CGuard bufferlock(m_PassLock); | |
| 1044 | |
| 1045 map<int32_t, queue<CPacket*> >::iterator i = m_mBuffer.find(id); | |
| 1046 | |
| 1047 if (i == m_mBuffer.end()) | |
| 1048 { | |
| 1049 #ifndef WIN32 | |
| 1050 uint64_t now = CTimer::getTime(); | |
| 1051 timespec timeout; | |
| 1052 | |
| 1053 timeout.tv_sec = now / 1000000 + 1; | |
| 1054 timeout.tv_nsec = (now % 1000000) * 1000; | |
| 1055 | |
| 1056 pthread_cond_timedwait(&m_PassCond, &m_PassLock, &timeout); | |
| 1057 #else | |
| 1058 ReleaseMutex(m_PassLock); | |
| 1059 WaitForSingleObject(m_PassCond, 1000); | |
| 1060 WaitForSingleObject(m_PassLock, INFINITE); | |
| 1061 #endif | |
| 1062 | |
| 1063 i = m_mBuffer.find(id); | |
| 1064 if (i == m_mBuffer.end()) | |
| 1065 { | |
| 1066 packet.setLength(-1); | |
| 1067 return -1; | |
| 1068 } | |
| 1069 } | |
| 1070 | |
| 1071 // retrieve the earliest packet | |
| 1072 CPacket* newpkt = i->second.front(); | |
| 1073 | |
| 1074 if (packet.getLength() < newpkt->getLength()) | |
| 1075 { | |
| 1076 packet.setLength(-1); | |
| 1077 return -1; | |
| 1078 } | |
| 1079 | |
| 1080 // copy packet content | |
| 1081 memcpy(packet.m_nHeader, newpkt->m_nHeader, CPacket::m_iPktHdrSize); | |
| 1082 memcpy(packet.m_pcData, newpkt->m_pcData, newpkt->getLength()); | |
| 1083 packet.setLength(newpkt->getLength()); | |
| 1084 | |
| 1085 delete [] newpkt->m_pcData; | |
| 1086 delete newpkt; | |
| 1087 | |
| 1088 // remove this message from queue, | |
| 1089 // if no more messages left for this socket, release its data structure | |
| 1090 i->second.pop(); | |
| 1091 if (i->second.empty()) | |
| 1092 m_mBuffer.erase(i); | |
| 1093 | |
| 1094 return packet.getLength(); | |
| 1095 } | |
| 1096 | |
| 1097 int CRcvQueue::setListener(const CUDT* u) | |
| 1098 { | |
| 1099 CGuard lslock(m_LSLock); | |
| 1100 | |
| 1101 if (NULL != m_pListener) | |
| 1102 return -1; | |
| 1103 | |
| 1104 m_pListener = (CUDT*)u; | |
| 1105 return 1; | |
| 1106 } | |
| 1107 | |
| 1108 void CRcvQueue::removeListener(const CUDT* u) | |
| 1109 { | |
| 1110 CGuard lslock(m_LSLock); | |
| 1111 | |
| 1112 if (u == m_pListener) | |
| 1113 m_pListener = NULL; | |
| 1114 } | |
| 1115 | |
| 1116 void CRcvQueue::setNewEntry(CUDT* u) | |
| 1117 { | |
| 1118 CGuard listguard(m_IDLock); | |
| 1119 m_vNewEntry.insert(m_vNewEntry.end(), u); | |
| 1120 } | |
| 1121 | |
| 1122 bool CRcvQueue::ifNewEntry() | |
| 1123 { | |
| 1124 return !(m_vNewEntry.empty()); | |
| 1125 } | |
| 1126 | |
| 1127 CUDT* CRcvQueue::getNewEntry() | |
| 1128 { | |
| 1129 CGuard listguard(m_IDLock); | |
| 1130 | |
| 1131 if (m_vNewEntry.empty()) | |
| 1132 return NULL; | |
| 1133 | |
| 1134 CUDT* u = (CUDT*)*(m_vNewEntry.begin()); | |
| 1135 m_vNewEntry.erase(m_vNewEntry.begin()); | |
| 1136 | |
| 1137 return u; | |
| 1138 } | |
| 1139 | |
| 1140 void CRcvQueue::storePkt(const int32_t& id, CPacket* pkt) | |
| 1141 { | |
| 1142 CGuard bufferlock(m_PassLock); | |
| 1143 | |
| 1144 map<int32_t, queue<CPacket*> >::iterator i = m_mBuffer.find(id); | |
| 1145 | |
| 1146 if (i == m_mBuffer.end()) | |
| 1147 { | |
| 1148 m_mBuffer[id].push(pkt); | |
| 1149 | |
| 1150 #ifndef WIN32 | |
| 1151 pthread_cond_signal(&m_PassCond); | |
| 1152 #else | |
| 1153 SetEvent(m_PassCond); | |
| 1154 #endif | |
| 1155 } | |
| 1156 else | |
| 1157 { | |
| 1158 //avoid storing too many packets, in case of malfunction or attack | |
| 1159 if (i->second.size() > 16) | |
| 1160 return; | |
| 1161 | |
| 1162 i->second.push(pkt); | |
| 1163 } | |
| 1164 } | |
| OLD | NEW |