| Index: net/third_party/udt/src/queue.cpp
|
| ===================================================================
|
| --- net/third_party/udt/src/queue.cpp (revision 78992)
|
| +++ net/third_party/udt/src/queue.cpp (working copy)
|
| @@ -1,1164 +0,0 @@
|
| -/*****************************************************************************
|
| -Copyright (c) 2001 - 2011, The Board of Trustees of the University of Illinois.
|
| -All rights reserved.
|
| -
|
| -Redistribution and use in source and binary forms, with or without
|
| -modification, are permitted provided that the following conditions are
|
| -met:
|
| -
|
| -* Redistributions of source code must retain the above
|
| - copyright notice, this list of conditions and the
|
| - following disclaimer.
|
| -
|
| -* Redistributions in binary form must reproduce the
|
| - above copyright notice, this list of conditions
|
| - and the following disclaimer in the documentation
|
| - and/or other materials provided with the distribution.
|
| -
|
| -* Neither the name of the University of Illinois
|
| - nor the names of its contributors may be used to
|
| - endorse or promote products derived from this
|
| - software without specific prior written permission.
|
| -
|
| -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
|
| -IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
|
| -THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
|
| -PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
|
| -CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
|
| -EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
|
| -PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
|
| -PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
|
| -LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
|
| -NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
|
| -SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
| -*****************************************************************************/
|
| -
|
| -/*****************************************************************************
|
| -written by
|
| - Yunhong Gu, last updated 01/02/2011
|
| -*****************************************************************************/
|
| -
|
| -#ifdef WIN32
|
| - #include <winsock2.h>
|
| - #include <ws2tcpip.h>
|
| - #ifdef LEGACY_WIN32
|
| - #include <wspiapi.h>
|
| - #endif
|
| -#endif
|
| -
|
| -#include <cstring>
|
| -#include "common.h"
|
| -#include "queue.h"
|
| -#include "core.h"
|
| -
|
| -using namespace std;
|
| -
|
| -CUnitQueue::CUnitQueue():
|
| -m_pQEntry(NULL),
|
| -m_pCurrQueue(NULL),
|
| -m_pLastQueue(NULL),
|
| -m_iSize(0),
|
| -m_iCount(0),
|
| -m_iMSS(),
|
| -m_iIPversion()
|
| -{
|
| -}
|
| -
|
| -CUnitQueue::~CUnitQueue()
|
| -{
|
| - CQEntry* p = m_pQEntry;
|
| -
|
| - while (p != NULL)
|
| - {
|
| - delete [] p->m_pUnit;
|
| - delete [] p->m_pBuffer;
|
| -
|
| - CQEntry* q = p;
|
| - if (p == m_pLastQueue)
|
| - p = NULL;
|
| - else
|
| - p = p->m_pNext;
|
| - delete q;
|
| - }
|
| -}
|
| -
|
| -int CUnitQueue::init(const int& size, const int& mss, const int& version)
|
| -{
|
| - CQEntry* tempq = NULL;
|
| - CUnit* tempu = NULL;
|
| - char* tempb = NULL;
|
| -
|
| - try
|
| - {
|
| - tempq = new CQEntry;
|
| - tempu = new CUnit [size];
|
| - tempb = new char [size * mss];
|
| - }
|
| - catch (...)
|
| - {
|
| - delete tempq;
|
| - delete [] tempu;
|
| - delete [] tempb;
|
| -
|
| - return -1;
|
| - }
|
| -
|
| - for (int i = 0; i < size; ++ i)
|
| - {
|
| - tempu[i].m_iFlag = 0;
|
| - tempu[i].m_Packet.m_pcData = tempb + i * mss;
|
| - }
|
| - tempq->m_pUnit = tempu;
|
| - tempq->m_pBuffer = tempb;
|
| - tempq->m_iSize = size;
|
| -
|
| - m_pQEntry = m_pCurrQueue = m_pLastQueue = tempq;
|
| - m_pQEntry->m_pNext = m_pQEntry;
|
| -
|
| - m_pAvailUnit = m_pCurrQueue->m_pUnit;
|
| -
|
| - m_iSize = size;
|
| - m_iMSS = mss;
|
| - m_iIPversion = version;
|
| -
|
| - return 0;
|
| -}
|
| -
|
| -int CUnitQueue::increase()
|
| -{
|
| - // adjust/correct m_iCount
|
| - int real_count = 0;
|
| - CQEntry* p = m_pQEntry;
|
| - while (p != NULL)
|
| - {
|
| - CUnit* u = p->m_pUnit;
|
| - for (CUnit* end = u + p->m_iSize; u != end; ++ u)
|
| - if (u->m_iFlag != 0)
|
| - ++ real_count;
|
| -
|
| - if (p == m_pLastQueue)
|
| - p = NULL;
|
| - else
|
| - p = p->m_pNext;
|
| - }
|
| - m_iCount = real_count;
|
| - if (double(m_iCount) / m_iSize < 0.9)
|
| - return -1;
|
| -
|
| - CQEntry* tempq = NULL;
|
| - CUnit* tempu = NULL;
|
| - char* tempb = NULL;
|
| -
|
| - // all queues have the same size
|
| - int size = m_pQEntry->m_iSize;
|
| -
|
| - try
|
| - {
|
| - tempq = new CQEntry;
|
| - tempu = new CUnit [size];
|
| - tempb = new char [size * m_iMSS];
|
| - }
|
| - catch (...)
|
| - {
|
| - delete tempq;
|
| - delete [] tempu;
|
| - delete [] tempb;
|
| -
|
| - return -1;
|
| - }
|
| -
|
| - for (int i = 0; i < size; ++ i)
|
| - {
|
| - tempu[i].m_iFlag = 0;
|
| - tempu[i].m_Packet.m_pcData = tempb + i * m_iMSS;
|
| - }
|
| - tempq->m_pUnit = tempu;
|
| - tempq->m_pBuffer = tempb;
|
| - tempq->m_iSize = size;
|
| -
|
| - m_pLastQueue->m_pNext = tempq;
|
| - m_pLastQueue = tempq;
|
| - m_pLastQueue->m_pNext = m_pQEntry;
|
| -
|
| - m_iSize += size;
|
| -
|
| - return 0;
|
| -}
|
| -
|
| -int CUnitQueue::shrink()
|
| -{
|
| - // currently queue cannot be shrunk.
|
| - return -1;
|
| -}
|
| -
|
| -CUnit* CUnitQueue::getNextAvailUnit()
|
| -{
|
| - if (m_iCount * 10 > m_iSize * 9)
|
| - increase();
|
| -
|
| - if (m_iCount >= m_iSize)
|
| - return NULL;
|
| -
|
| - CQEntry* entrance = m_pCurrQueue;
|
| -
|
| - do
|
| - {
|
| - for (CUnit* sentinel = m_pCurrQueue->m_pUnit + m_pCurrQueue->m_iSize - 1; m_pAvailUnit != sentinel; ++ m_pAvailUnit)
|
| - if (m_pAvailUnit->m_iFlag == 0)
|
| - return m_pAvailUnit;
|
| -
|
| - if (m_pCurrQueue->m_pUnit->m_iFlag == 0)
|
| - {
|
| - m_pAvailUnit = m_pCurrQueue->m_pUnit;
|
| - return m_pAvailUnit;
|
| - }
|
| -
|
| - m_pCurrQueue = m_pCurrQueue->m_pNext;
|
| - m_pAvailUnit = m_pCurrQueue->m_pUnit;
|
| - } while (m_pCurrQueue != entrance);
|
| -
|
| - increase();
|
| -
|
| - return NULL;
|
| -}
|
| -
|
| -
|
| -CSndUList::CSndUList():
|
| -m_pHeap(NULL),
|
| -m_iArrayLength(4096),
|
| -m_iLastEntry(-1),
|
| -m_ListLock(),
|
| -m_pWindowLock(NULL),
|
| -m_pWindowCond(NULL),
|
| -m_pTimer(NULL)
|
| -{
|
| - m_pHeap = new CSNode*[m_iArrayLength];
|
| -
|
| - #ifndef WIN32
|
| - pthread_mutex_init(&m_ListLock, NULL);
|
| - #else
|
| - m_ListLock = CreateMutex(NULL, false, NULL);
|
| - #endif
|
| -}
|
| -
|
| -CSndUList::~CSndUList()
|
| -{
|
| - delete [] m_pHeap;
|
| -
|
| - #ifndef WIN32
|
| - pthread_mutex_destroy(&m_ListLock);
|
| - #else
|
| - CloseHandle(m_ListLock);
|
| - #endif
|
| -}
|
| -
|
| -void CSndUList::insert(const int64_t& ts, const CUDT* u)
|
| -{
|
| - CGuard listguard(m_ListLock);
|
| -
|
| - // increase the heap array size if necessary
|
| - if (m_iLastEntry == m_iArrayLength - 1)
|
| - {
|
| - CSNode** temp = NULL;
|
| -
|
| - try
|
| - {
|
| - temp = new CSNode*[m_iArrayLength * 2];
|
| - }
|
| - catch(...)
|
| - {
|
| - return;
|
| - }
|
| -
|
| - memcpy(temp, m_pHeap, sizeof(CSNode*) * m_iArrayLength);
|
| - m_iArrayLength *= 2;
|
| - delete [] m_pHeap;
|
| - m_pHeap = temp;
|
| - }
|
| -
|
| - insert_(ts, u);
|
| -}
|
| -
|
| -void CSndUList::update(const CUDT* u, const bool& reschedule)
|
| -{
|
| - CGuard listguard(m_ListLock);
|
| -
|
| - CSNode* n = u->m_pSNode;
|
| -
|
| - if (n->m_iHeapLoc >= 0)
|
| - {
|
| - if (!reschedule)
|
| - return;
|
| -
|
| - if (n->m_iHeapLoc == 0)
|
| - {
|
| - n->m_llTimeStamp = 1;
|
| - m_pTimer->interrupt();
|
| - return;
|
| - }
|
| -
|
| - remove_(u);
|
| - }
|
| -
|
| - insert_(1, u);
|
| -}
|
| -
|
| -int CSndUList::pop(sockaddr*& addr, CPacket& pkt)
|
| -{
|
| - CGuard listguard(m_ListLock);
|
| -
|
| - if (-1 == m_iLastEntry)
|
| - return -1;
|
| -
|
| - CUDT* u = m_pHeap[0]->m_pUDT;
|
| - remove_(u);
|
| -
|
| - if (!u->m_bConnected || u->m_bBroken)
|
| - return -1;
|
| -
|
| - // pack a packet from the socket
|
| - uint64_t ts;
|
| - if (u->packData(pkt, ts) <= 0)
|
| - return -1;
|
| -
|
| - addr = u->m_pPeerAddr;
|
| -
|
| - // insert a new entry, ts is the next processing time
|
| - if (ts > 0)
|
| - insert_(ts, u);
|
| -
|
| - return 1;
|
| -}
|
| -
|
| -void CSndUList::remove(const CUDT* u)
|
| -{
|
| - CGuard listguard(m_ListLock);
|
| -
|
| - remove_(u);
|
| -}
|
| -
|
| -uint64_t CSndUList::getNextProcTime()
|
| -{
|
| - CGuard listguard(m_ListLock);
|
| -
|
| - if (-1 == m_iLastEntry)
|
| - return 0;
|
| -
|
| - return m_pHeap[0]->m_llTimeStamp;
|
| -}
|
| -
|
| -void CSndUList::insert_(const int64_t& ts, const CUDT* u)
|
| -{
|
| - CSNode* n = u->m_pSNode;
|
| -
|
| - // do not insert repeated node
|
| - if (n->m_iHeapLoc >= 0)
|
| - return;
|
| -
|
| - m_iLastEntry ++;
|
| - m_pHeap[m_iLastEntry] = n;
|
| - n->m_llTimeStamp = ts;
|
| -
|
| - int q = m_iLastEntry;
|
| - int p = q;
|
| - while (p != 0)
|
| - {
|
| - p = (q - 1) >> 1;
|
| - if (m_pHeap[p]->m_llTimeStamp > m_pHeap[q]->m_llTimeStamp)
|
| - {
|
| - CSNode* t = m_pHeap[p];
|
| - m_pHeap[p] = m_pHeap[q];
|
| - m_pHeap[q] = t;
|
| - t->m_iHeapLoc = q;
|
| - q = p;
|
| - }
|
| - else
|
| - break;
|
| - }
|
| -
|
| - n->m_iHeapLoc = q;
|
| -
|
| - // first entry, activate the sending queue
|
| - if (0 == m_iLastEntry)
|
| - {
|
| - #ifndef WIN32
|
| - pthread_mutex_lock(m_pWindowLock);
|
| - pthread_cond_signal(m_pWindowCond);
|
| - pthread_mutex_unlock(m_pWindowLock);
|
| - #else
|
| - SetEvent(*m_pWindowCond);
|
| - #endif
|
| - }
|
| -}
|
| -
|
| -void CSndUList::remove_(const CUDT* u)
|
| -{
|
| - CSNode* n = u->m_pSNode;
|
| -
|
| - if (n->m_iHeapLoc >= 0)
|
| - {
|
| - // remove the node from heap
|
| - m_pHeap[n->m_iHeapLoc] = m_pHeap[m_iLastEntry];
|
| - m_iLastEntry --;
|
| - m_pHeap[n->m_iHeapLoc]->m_iHeapLoc = n->m_iHeapLoc;
|
| -
|
| - int q = n->m_iHeapLoc;
|
| - int p = q * 2 + 1;
|
| - while (p <= m_iLastEntry)
|
| - {
|
| - if ((p + 1 <= m_iLastEntry) && (m_pHeap[p]->m_llTimeStamp > m_pHeap[p + 1]->m_llTimeStamp))
|
| - p ++;
|
| -
|
| - if (m_pHeap[q]->m_llTimeStamp > m_pHeap[p]->m_llTimeStamp)
|
| - {
|
| - CSNode* t = m_pHeap[p];
|
| - m_pHeap[p] = m_pHeap[q];
|
| - m_pHeap[p]->m_iHeapLoc = p;
|
| - m_pHeap[q] = t;
|
| - m_pHeap[q]->m_iHeapLoc = q;
|
| -
|
| - q = p;
|
| - p = q * 2 + 1;
|
| - }
|
| - else
|
| - break;
|
| - }
|
| -
|
| - n->m_iHeapLoc = -1;
|
| - }
|
| -}
|
| -
|
| -//
|
| -CSndQueue::CSndQueue():
|
| -m_WorkerThread(),
|
| -m_pSndUList(NULL),
|
| -m_pChannel(NULL),
|
| -m_pTimer(NULL),
|
| -m_WindowLock(),
|
| -m_WindowCond(),
|
| -m_bClosing(false),
|
| -m_ExitCond()
|
| -{
|
| - #ifndef WIN32
|
| - pthread_cond_init(&m_WindowCond, NULL);
|
| - pthread_mutex_init(&m_WindowLock, NULL);
|
| - #else
|
| - m_WindowLock = CreateMutex(NULL, false, NULL);
|
| - m_WindowCond = CreateEvent(NULL, false, false, NULL);
|
| - m_ExitCond = CreateEvent(NULL, false, false, NULL);
|
| - #endif
|
| -}
|
| -
|
| -CSndQueue::~CSndQueue()
|
| -{
|
| - m_bClosing = true;
|
| -
|
| - #ifndef WIN32
|
| - pthread_mutex_lock(&m_WindowLock);
|
| - pthread_cond_signal(&m_WindowCond);
|
| - pthread_mutex_unlock(&m_WindowLock);
|
| - if (0 != m_WorkerThread)
|
| - pthread_join(m_WorkerThread, NULL);
|
| - pthread_cond_destroy(&m_WindowCond);
|
| - pthread_mutex_destroy(&m_WindowLock);
|
| - #else
|
| - SetEvent(m_WindowCond);
|
| - if (NULL != m_WorkerThread)
|
| - WaitForSingleObject(m_ExitCond, INFINITE);
|
| - CloseHandle(m_WorkerThread);
|
| - CloseHandle(m_WindowLock);
|
| - CloseHandle(m_WindowCond);
|
| - CloseHandle(m_ExitCond);
|
| - #endif
|
| -
|
| - delete m_pSndUList;
|
| -}
|
| -
|
| -void CSndQueue::init(const CChannel* c, const CTimer* t)
|
| -{
|
| - m_pChannel = (CChannel*)c;
|
| - m_pTimer = (CTimer*)t;
|
| - m_pSndUList = new CSndUList;
|
| - m_pSndUList->m_pWindowLock = &m_WindowLock;
|
| - m_pSndUList->m_pWindowCond = &m_WindowCond;
|
| - m_pSndUList->m_pTimer = m_pTimer;
|
| -
|
| - #ifndef WIN32
|
| - if (0 != pthread_create(&m_WorkerThread, NULL, CSndQueue::worker, this))
|
| - {
|
| - m_WorkerThread = 0;
|
| - throw CUDTException(3, 1);
|
| - }
|
| - #else
|
| - DWORD threadID;
|
| - m_WorkerThread = CreateThread(NULL, 0, CSndQueue::worker, this, 0, &threadID);
|
| - if (NULL == m_WorkerThread)
|
| - throw CUDTException(3, 1);
|
| - #endif
|
| -}
|
| -
|
| -#ifndef WIN32
|
| - void* CSndQueue::worker(void* param)
|
| -#else
|
| - DWORD WINAPI CSndQueue::worker(LPVOID param)
|
| -#endif
|
| -{
|
| - CSndQueue* self = (CSndQueue*)param;
|
| -
|
| - while (!self->m_bClosing)
|
| - {
|
| - uint64_t ts = self->m_pSndUList->getNextProcTime();
|
| -
|
| - if (ts > 0)
|
| - {
|
| - // wait until next processing time of the first socket on the list
|
| - uint64_t currtime;
|
| - CTimer::rdtsc(currtime);
|
| - if (currtime < ts)
|
| - self->m_pTimer->sleepto(ts);
|
| -
|
| - // it is time to send the next pkt
|
| - sockaddr* addr;
|
| - CPacket pkt;
|
| - if (self->m_pSndUList->pop(addr, pkt) < 0)
|
| - continue;
|
| -
|
| - self->m_pChannel->sendto(addr, pkt);
|
| - }
|
| - else
|
| - {
|
| - // wait here if there is no sockets with data to be sent
|
| - #ifndef WIN32
|
| - pthread_mutex_lock(&self->m_WindowLock);
|
| - if (!self->m_bClosing && (self->m_pSndUList->m_iLastEntry < 0))
|
| - pthread_cond_wait(&self->m_WindowCond, &self->m_WindowLock);
|
| - pthread_mutex_unlock(&self->m_WindowLock);
|
| - #else
|
| - WaitForSingleObject(self->m_WindowCond, INFINITE);
|
| - #endif
|
| - }
|
| - }
|
| -
|
| - #ifndef WIN32
|
| - return NULL;
|
| - #else
|
| - SetEvent(self->m_ExitCond);
|
| - return 0;
|
| - #endif
|
| -}
|
| -
|
| -int CSndQueue::sendto(const sockaddr* addr, CPacket& packet)
|
| -{
|
| - // send out the packet immediately (high priority), this is a control packet
|
| - m_pChannel->sendto(addr, packet);
|
| - return packet.getLength();
|
| -}
|
| -
|
| -
|
| -//
|
| -CRcvUList::CRcvUList():
|
| -m_pUList(NULL),
|
| -m_pLast(NULL)
|
| -{
|
| -}
|
| -
|
| -CRcvUList::~CRcvUList()
|
| -{
|
| -}
|
| -
|
| -void CRcvUList::insert(const CUDT* u)
|
| -{
|
| - CRNode* n = u->m_pRNode;
|
| - CTimer::rdtsc(n->m_llTimeStamp);
|
| -
|
| - if (NULL == m_pUList)
|
| - {
|
| - // empty list, insert as the single node
|
| - n->m_pPrev = n->m_pNext = NULL;
|
| - m_pLast = m_pUList = n;
|
| -
|
| - return;
|
| - }
|
| -
|
| - // always insert at the end for RcvUList
|
| - n->m_pPrev = m_pLast;
|
| - n->m_pNext = NULL;
|
| - m_pLast->m_pNext = n;
|
| - m_pLast = n;
|
| -}
|
| -
|
| -void CRcvUList::remove(const CUDT* u)
|
| -{
|
| - CRNode* n = u->m_pRNode;
|
| -
|
| - if (!n->m_bOnList)
|
| - return;
|
| -
|
| - if (NULL == n->m_pPrev)
|
| - {
|
| - // n is the first node
|
| - m_pUList = n->m_pNext;
|
| - if (NULL == m_pUList)
|
| - m_pLast = NULL;
|
| - else
|
| - m_pUList->m_pPrev = NULL;
|
| - }
|
| - else
|
| - {
|
| - n->m_pPrev->m_pNext = n->m_pNext;
|
| - if (NULL == n->m_pNext)
|
| - {
|
| - // n is the last node
|
| - m_pLast = n->m_pPrev;
|
| - }
|
| - else
|
| - n->m_pNext->m_pPrev = n->m_pPrev;
|
| - }
|
| -
|
| - n->m_pNext = n->m_pPrev = NULL;
|
| -}
|
| -
|
| -void CRcvUList::update(const CUDT* u)
|
| -{
|
| - CRNode* n = u->m_pRNode;
|
| -
|
| - if (!n->m_bOnList)
|
| - return;
|
| -
|
| - CTimer::rdtsc(n->m_llTimeStamp);
|
| -
|
| - // if n is the last node, do not need to change
|
| - if (NULL == n->m_pNext)
|
| - return;
|
| -
|
| - if (NULL == n->m_pPrev)
|
| - {
|
| - m_pUList = n->m_pNext;
|
| - m_pUList->m_pPrev = NULL;
|
| - }
|
| - else
|
| - {
|
| - n->m_pPrev->m_pNext = n->m_pNext;
|
| - n->m_pNext->m_pPrev = n->m_pPrev;
|
| - }
|
| -
|
| - n->m_pPrev = m_pLast;
|
| - n->m_pNext = NULL;
|
| - m_pLast->m_pNext = n;
|
| - m_pLast = n;
|
| -}
|
| -
|
| -//
|
| -CHash::CHash():
|
| -m_pBucket(NULL),
|
| -m_iHashSize(0)
|
| -{
|
| -}
|
| -
|
| -CHash::~CHash()
|
| -{
|
| - for (int i = 0; i < m_iHashSize; ++ i)
|
| - {
|
| - CBucket* b = m_pBucket[i];
|
| - while (NULL != b)
|
| - {
|
| - CBucket* n = b->m_pNext;
|
| - delete b;
|
| - b = n;
|
| - }
|
| - }
|
| -
|
| - delete [] m_pBucket;
|
| -}
|
| -
|
| -void CHash::init(const int& size)
|
| -{
|
| - m_pBucket = new CBucket* [size];
|
| -
|
| - for (int i = 0; i < size; ++ i)
|
| - m_pBucket[i] = NULL;
|
| -
|
| - m_iHashSize = size;
|
| -}
|
| -
|
| -CUDT* CHash::lookup(const int32_t& id)
|
| -{
|
| - // simple hash function (% hash table size); suitable for socket descriptors
|
| - CBucket* b = m_pBucket[id % m_iHashSize];
|
| -
|
| - while (NULL != b)
|
| - {
|
| - if (id == b->m_iID)
|
| - return b->m_pUDT;
|
| - b = b->m_pNext;
|
| - }
|
| -
|
| - return NULL;
|
| -}
|
| -
|
| -void CHash::insert(const int32_t& id, const CUDT* u)
|
| -{
|
| - CBucket* b = m_pBucket[id % m_iHashSize];
|
| -
|
| - CBucket* n = new CBucket;
|
| - n->m_iID = id;
|
| - n->m_pUDT = (CUDT*)u;
|
| - n->m_pNext = b;
|
| -
|
| - m_pBucket[id % m_iHashSize] = n;
|
| -}
|
| -
|
| -void CHash::remove(const int32_t& id)
|
| -{
|
| - CBucket* b = m_pBucket[id % m_iHashSize];
|
| - CBucket* p = NULL;
|
| -
|
| - while (NULL != b)
|
| - {
|
| - if (id == b->m_iID)
|
| - {
|
| - if (NULL == p)
|
| - m_pBucket[id % m_iHashSize] = b->m_pNext;
|
| - else
|
| - p->m_pNext = b->m_pNext;
|
| -
|
| - delete b;
|
| -
|
| - return;
|
| - }
|
| -
|
| - p = b;
|
| - b = b->m_pNext;
|
| - }
|
| -}
|
| -
|
| -
|
| -//
|
| -CRendezvousQueue::CRendezvousQueue():
|
| -m_vRendezvousID(),
|
| -m_RIDVectorLock()
|
| -{
|
| - #ifndef WIN32
|
| - pthread_mutex_init(&m_RIDVectorLock, NULL);
|
| - #else
|
| - m_RIDVectorLock = CreateMutex(NULL, false, NULL);
|
| - #endif
|
| -}
|
| -
|
| -CRendezvousQueue::~CRendezvousQueue()
|
| -{
|
| - #ifndef WIN32
|
| - pthread_mutex_destroy(&m_RIDVectorLock);
|
| - #else
|
| - CloseHandle(m_RIDVectorLock);
|
| - #endif
|
| -
|
| - for (vector<CRL>::iterator i = m_vRendezvousID.begin(); i != m_vRendezvousID.end(); ++ i)
|
| - {
|
| - if (AF_INET == i->m_iIPversion)
|
| - delete (sockaddr_in*)i->m_pPeerAddr;
|
| - else
|
| - delete (sockaddr_in6*)i->m_pPeerAddr;
|
| - }
|
| -
|
| - m_vRendezvousID.clear();
|
| -}
|
| -
|
| -void CRendezvousQueue::insert(const UDTSOCKET& id, const int& ipv, const sockaddr* addr)
|
| -{
|
| - CGuard vg(m_RIDVectorLock);
|
| -
|
| - CRL r;
|
| - r.m_iID = id;
|
| - r.m_iIPversion = ipv;
|
| - r.m_pPeerAddr = (AF_INET == ipv) ? (sockaddr*)new sockaddr_in : (sockaddr*)new sockaddr_in6;
|
| - memcpy(r.m_pPeerAddr, addr, (AF_INET == ipv) ? sizeof(sockaddr_in) : sizeof(sockaddr_in6));
|
| -
|
| - m_vRendezvousID.insert(m_vRendezvousID.end(), r);
|
| -}
|
| -
|
| -void CRendezvousQueue::remove(const UDTSOCKET& id)
|
| -{
|
| - CGuard vg(m_RIDVectorLock);
|
| -
|
| - for (vector<CRL>::iterator i = m_vRendezvousID.begin(); i != m_vRendezvousID.end(); ++ i)
|
| - {
|
| - if (i->m_iID == id)
|
| - {
|
| - if (AF_INET == i->m_iIPversion)
|
| - delete (sockaddr_in*)i->m_pPeerAddr;
|
| - else
|
| - delete (sockaddr_in6*)i->m_pPeerAddr;
|
| -
|
| - m_vRendezvousID.erase(i);
|
| -
|
| - return;
|
| - }
|
| - }
|
| -}
|
| -
|
| -bool CRendezvousQueue::retrieve(const sockaddr* addr, UDTSOCKET& id)
|
| -{
|
| - CGuard vg(m_RIDVectorLock);
|
| -
|
| - for (vector<CRL>::iterator i = m_vRendezvousID.begin(); i != m_vRendezvousID.end(); ++ i)
|
| - {
|
| - if (CIPAddress::ipcmp(addr, i->m_pPeerAddr, i->m_iIPversion) && ((0 == id) || (id == i->m_iID)))
|
| - {
|
| - id = i->m_iID;
|
| - return true;
|
| - }
|
| - }
|
| -
|
| - return false;
|
| -}
|
| -
|
| -
|
| -//
|
| -CRcvQueue::CRcvQueue():
|
| -m_WorkerThread(),
|
| -m_UnitQueue(),
|
| -m_pRcvUList(NULL),
|
| -m_pHash(NULL),
|
| -m_pChannel(NULL),
|
| -m_pTimer(NULL),
|
| -m_iPayloadSize(),
|
| -m_bClosing(false),
|
| -m_ExitCond(),
|
| -m_LSLock(),
|
| -m_pListener(NULL),
|
| -m_pRendezvousQueue(NULL),
|
| -m_vNewEntry(),
|
| -m_IDLock(),
|
| -m_mBuffer(),
|
| -m_PassLock(),
|
| -m_PassCond()
|
| -{
|
| - #ifndef WIN32
|
| - pthread_mutex_init(&m_PassLock, NULL);
|
| - pthread_cond_init(&m_PassCond, NULL);
|
| - pthread_mutex_init(&m_LSLock, NULL);
|
| - pthread_mutex_init(&m_IDLock, NULL);
|
| - #else
|
| - m_PassLock = CreateMutex(NULL, false, NULL);
|
| - m_PassCond = CreateEvent(NULL, false, false, NULL);
|
| - m_LSLock = CreateMutex(NULL, false, NULL);
|
| - m_IDLock = CreateMutex(NULL, false, NULL);
|
| - m_ExitCond = CreateEvent(NULL, false, false, NULL);
|
| - #endif
|
| -}
|
| -
|
| -CRcvQueue::~CRcvQueue()
|
| -{
|
| - m_bClosing = true;
|
| -
|
| - #ifndef WIN32
|
| - if (0 != m_WorkerThread)
|
| - pthread_join(m_WorkerThread, NULL);
|
| - pthread_mutex_destroy(&m_PassLock);
|
| - pthread_cond_destroy(&m_PassCond);
|
| - pthread_mutex_destroy(&m_LSLock);
|
| - pthread_mutex_destroy(&m_IDLock);
|
| - #else
|
| - if (NULL != m_WorkerThread)
|
| - WaitForSingleObject(m_ExitCond, INFINITE);
|
| - CloseHandle(m_WorkerThread);
|
| - CloseHandle(m_PassLock);
|
| - CloseHandle(m_PassCond);
|
| - CloseHandle(m_LSLock);
|
| - CloseHandle(m_IDLock);
|
| - CloseHandle(m_ExitCond);
|
| - #endif
|
| -
|
| - delete m_pRcvUList;
|
| - delete m_pHash;
|
| - delete m_pRendezvousQueue;
|
| -
|
| - // remove all queued messages
|
| - for (map<int32_t, queue<CPacket*> >::iterator i = m_mBuffer.begin(); i != m_mBuffer.end(); ++ i)
|
| - {
|
| - while (!i->second.empty())
|
| - {
|
| - CPacket* pkt = i->second.front();
|
| - delete [] pkt->m_pcData;
|
| - delete pkt;
|
| - i->second.pop();
|
| - }
|
| - }
|
| -}
|
| -
|
| -void CRcvQueue::init(const int& qsize, const int& payload, const int& version, const int& hsize, const CChannel* cc, const CTimer* t)
|
| -{
|
| - m_iPayloadSize = payload;
|
| -
|
| - m_UnitQueue.init(qsize, payload, version);
|
| -
|
| - m_pHash = new CHash;
|
| - m_pHash->init(hsize);
|
| -
|
| - m_pChannel = (CChannel*)cc;
|
| - m_pTimer = (CTimer*)t;
|
| -
|
| - m_pRcvUList = new CRcvUList;
|
| - m_pRendezvousQueue = new CRendezvousQueue;
|
| -
|
| - #ifndef WIN32
|
| - if (0 != pthread_create(&m_WorkerThread, NULL, CRcvQueue::worker, this))
|
| - {
|
| - m_WorkerThread = 0;
|
| - throw CUDTException(3, 1);
|
| - }
|
| - #else
|
| - DWORD threadID;
|
| - m_WorkerThread = CreateThread(NULL, 0, CRcvQueue::worker, this, 0, &threadID);
|
| - if (NULL == m_WorkerThread)
|
| - throw CUDTException(3, 1);
|
| - #endif
|
| -}
|
| -
|
| -#ifndef WIN32
|
| - void* CRcvQueue::worker(void* param)
|
| -#else
|
| - DWORD WINAPI CRcvQueue::worker(LPVOID param)
|
| -#endif
|
| -{
|
| - CRcvQueue* self = (CRcvQueue*)param;
|
| -
|
| - sockaddr* addr = (AF_INET == self->m_UnitQueue.m_iIPversion) ? (sockaddr*) new sockaddr_in : (sockaddr*) new sockaddr_in6;
|
| - CUDT* u = NULL;
|
| - int32_t id;
|
| -
|
| - while (!self->m_bClosing)
|
| - {
|
| - #ifdef NO_BUSY_WAITING
|
| - self->m_pTimer->tick();
|
| - #endif
|
| -
|
| - // check waiting list, if new socket, insert it to the list
|
| - while (self->ifNewEntry())
|
| - {
|
| - CUDT* ne = self->getNewEntry();
|
| - if (NULL != ne)
|
| - {
|
| - self->m_pRcvUList->insert(ne);
|
| - self->m_pHash->insert(ne->m_SocketID, ne);
|
| - }
|
| - }
|
| -
|
| - // find next available slot for incoming packet
|
| - CUnit* unit = self->m_UnitQueue.getNextAvailUnit();
|
| - if (NULL == unit)
|
| - {
|
| - // no space, skip this packet
|
| - CPacket temp;
|
| - temp.m_pcData = new char[self->m_iPayloadSize];
|
| - temp.setLength(self->m_iPayloadSize);
|
| - self->m_pChannel->recvfrom(addr, temp);
|
| - delete [] temp.m_pcData;
|
| - goto TIMER_CHECK;
|
| - }
|
| -
|
| - unit->m_Packet.setLength(self->m_iPayloadSize);
|
| -
|
| - // reading next incoming packet, recvfrom returns -1 is nothing has been received
|
| - if (self->m_pChannel->recvfrom(addr, unit->m_Packet) < 0)
|
| - goto TIMER_CHECK;
|
| -
|
| - id = unit->m_Packet.m_iID;
|
| -
|
| - // ID 0 is for connection request, which should be passed to the listening socket or rendezvous sockets
|
| - if (0 == id)
|
| - {
|
| - if (NULL != self->m_pListener)
|
| - ((CUDT*)self->m_pListener)->listen(addr, unit->m_Packet);
|
| - else if (self->m_pRendezvousQueue->retrieve(addr, id))
|
| - self->storePkt(id, unit->m_Packet.clone());
|
| - }
|
| - else if (id > 0)
|
| - {
|
| - if (NULL != (u = self->m_pHash->lookup(id)))
|
| - {
|
| - if (CIPAddress::ipcmp(addr, u->m_pPeerAddr, u->m_iIPversion))
|
| - {
|
| - if (u->m_bConnected && !u->m_bBroken && !u->m_bClosing)
|
| - {
|
| - if (0 == unit->m_Packet.getFlag())
|
| - u->processData(unit);
|
| - else
|
| - u->processCtrl(unit->m_Packet);
|
| -
|
| - u->checkTimers();
|
| - self->m_pRcvUList->update(u);
|
| - }
|
| - }
|
| - }
|
| - else if (self->m_pRendezvousQueue->retrieve(addr, id))
|
| - self->storePkt(id, unit->m_Packet.clone());
|
| - }
|
| -
|
| -TIMER_CHECK:
|
| - // take care of the timing event for all UDT sockets
|
| -
|
| - CRNode* ul = self->m_pRcvUList->m_pUList;
|
| - uint64_t currtime;
|
| - CTimer::rdtsc(currtime);
|
| - uint64_t ctime = currtime - 100000 * CTimer::getCPUFrequency();
|
| -
|
| - while ((NULL != ul) && (ul->m_llTimeStamp < ctime))
|
| - {
|
| - CUDT* u = ul->m_pUDT;
|
| -
|
| - if (u->m_bConnected && !u->m_bBroken && !u->m_bClosing)
|
| - {
|
| - u->checkTimers();
|
| - self->m_pRcvUList->update(u);
|
| - }
|
| - else
|
| - {
|
| - // the socket must be removed from Hash table first, then RcvUList
|
| - self->m_pHash->remove(u->m_SocketID);
|
| - self->m_pRcvUList->remove(u);
|
| - u->m_pRNode->m_bOnList = false;
|
| - }
|
| -
|
| - ul = self->m_pRcvUList->m_pUList;
|
| - }
|
| - }
|
| -
|
| - if (AF_INET == self->m_UnitQueue.m_iIPversion)
|
| - delete (sockaddr_in*)addr;
|
| - else
|
| - delete (sockaddr_in6*)addr;
|
| -
|
| - #ifndef WIN32
|
| - return NULL;
|
| - #else
|
| - SetEvent(self->m_ExitCond);
|
| - return 0;
|
| - #endif
|
| -}
|
| -
|
| -int CRcvQueue::recvfrom(const int32_t& id, CPacket& packet)
|
| -{
|
| - CGuard bufferlock(m_PassLock);
|
| -
|
| - map<int32_t, queue<CPacket*> >::iterator i = m_mBuffer.find(id);
|
| -
|
| - if (i == m_mBuffer.end())
|
| - {
|
| - #ifndef WIN32
|
| - uint64_t now = CTimer::getTime();
|
| - timespec timeout;
|
| -
|
| - timeout.tv_sec = now / 1000000 + 1;
|
| - timeout.tv_nsec = (now % 1000000) * 1000;
|
| -
|
| - pthread_cond_timedwait(&m_PassCond, &m_PassLock, &timeout);
|
| - #else
|
| - ReleaseMutex(m_PassLock);
|
| - WaitForSingleObject(m_PassCond, 1000);
|
| - WaitForSingleObject(m_PassLock, INFINITE);
|
| - #endif
|
| -
|
| - i = m_mBuffer.find(id);
|
| - if (i == m_mBuffer.end())
|
| - {
|
| - packet.setLength(-1);
|
| - return -1;
|
| - }
|
| - }
|
| -
|
| - // retrieve the earliest packet
|
| - CPacket* newpkt = i->second.front();
|
| -
|
| - if (packet.getLength() < newpkt->getLength())
|
| - {
|
| - packet.setLength(-1);
|
| - return -1;
|
| - }
|
| -
|
| - // copy packet content
|
| - memcpy(packet.m_nHeader, newpkt->m_nHeader, CPacket::m_iPktHdrSize);
|
| - memcpy(packet.m_pcData, newpkt->m_pcData, newpkt->getLength());
|
| - packet.setLength(newpkt->getLength());
|
| -
|
| - delete [] newpkt->m_pcData;
|
| - delete newpkt;
|
| -
|
| - // remove this message from queue,
|
| - // if no more messages left for this socket, release its data structure
|
| - i->second.pop();
|
| - if (i->second.empty())
|
| - m_mBuffer.erase(i);
|
| -
|
| - return packet.getLength();
|
| -}
|
| -
|
| -int CRcvQueue::setListener(const CUDT* u)
|
| -{
|
| - CGuard lslock(m_LSLock);
|
| -
|
| - if (NULL != m_pListener)
|
| - return -1;
|
| -
|
| - m_pListener = (CUDT*)u;
|
| - return 1;
|
| -}
|
| -
|
| -void CRcvQueue::removeListener(const CUDT* u)
|
| -{
|
| - CGuard lslock(m_LSLock);
|
| -
|
| - if (u == m_pListener)
|
| - m_pListener = NULL;
|
| -}
|
| -
|
| -void CRcvQueue::setNewEntry(CUDT* u)
|
| -{
|
| - CGuard listguard(m_IDLock);
|
| - m_vNewEntry.insert(m_vNewEntry.end(), u);
|
| -}
|
| -
|
| -bool CRcvQueue::ifNewEntry()
|
| -{
|
| - return !(m_vNewEntry.empty());
|
| -}
|
| -
|
| -CUDT* CRcvQueue::getNewEntry()
|
| -{
|
| - CGuard listguard(m_IDLock);
|
| -
|
| - if (m_vNewEntry.empty())
|
| - return NULL;
|
| -
|
| - CUDT* u = (CUDT*)*(m_vNewEntry.begin());
|
| - m_vNewEntry.erase(m_vNewEntry.begin());
|
| -
|
| - return u;
|
| -}
|
| -
|
| -void CRcvQueue::storePkt(const int32_t& id, CPacket* pkt)
|
| -{
|
| - CGuard bufferlock(m_PassLock);
|
| -
|
| - map<int32_t, queue<CPacket*> >::iterator i = m_mBuffer.find(id);
|
| -
|
| - if (i == m_mBuffer.end())
|
| - {
|
| - m_mBuffer[id].push(pkt);
|
| -
|
| - #ifndef WIN32
|
| - pthread_cond_signal(&m_PassCond);
|
| - #else
|
| - SetEvent(m_PassCond);
|
| - #endif
|
| - }
|
| - else
|
| - {
|
| - //avoid storing too many packets, in case of malfunction or attack
|
| - if (i->second.size() > 16)
|
| - return;
|
| -
|
| - i->second.push(pkt);
|
| - }
|
| -}
|
|
|