Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(780)

Unified Diff: net/third_party/udt/src/queue.cpp

Issue 6708091: Remove UDT. (Closed) Base URL: svn://chrome-svn/chrome/trunk/src/
Patch Set: Created 9 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « net/third_party/udt/src/queue.h ('k') | net/third_party/udt/src/udt.h » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: net/third_party/udt/src/queue.cpp
===================================================================
--- net/third_party/udt/src/queue.cpp (revision 78992)
+++ net/third_party/udt/src/queue.cpp (working copy)
@@ -1,1164 +0,0 @@
-/*****************************************************************************
-Copyright (c) 2001 - 2011, The Board of Trustees of the University of Illinois.
-All rights reserved.
-
-Redistribution and use in source and binary forms, with or without
-modification, are permitted provided that the following conditions are
-met:
-
-* Redistributions of source code must retain the above
- copyright notice, this list of conditions and the
- following disclaimer.
-
-* Redistributions in binary form must reproduce the
- above copyright notice, this list of conditions
- and the following disclaimer in the documentation
- and/or other materials provided with the distribution.
-
-* Neither the name of the University of Illinois
- nor the names of its contributors may be used to
- endorse or promote products derived from this
- software without specific prior written permission.
-
-THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
-IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
-THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
-PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
-CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
-EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
-PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
-PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
-LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
-NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
-SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-*****************************************************************************/
-
-/*****************************************************************************
-written by
- Yunhong Gu, last updated 01/02/2011
-*****************************************************************************/
-
-#ifdef WIN32
- #include <winsock2.h>
- #include <ws2tcpip.h>
- #ifdef LEGACY_WIN32
- #include <wspiapi.h>
- #endif
-#endif
-
-#include <cstring>
-#include "common.h"
-#include "queue.h"
-#include "core.h"
-
-using namespace std;
-
-CUnitQueue::CUnitQueue():
-m_pQEntry(NULL),
-m_pCurrQueue(NULL),
-m_pLastQueue(NULL),
-m_iSize(0),
-m_iCount(0),
-m_iMSS(),
-m_iIPversion()
-{
-}
-
-CUnitQueue::~CUnitQueue()
-{
- CQEntry* p = m_pQEntry;
-
- while (p != NULL)
- {
- delete [] p->m_pUnit;
- delete [] p->m_pBuffer;
-
- CQEntry* q = p;
- if (p == m_pLastQueue)
- p = NULL;
- else
- p = p->m_pNext;
- delete q;
- }
-}
-
-int CUnitQueue::init(const int& size, const int& mss, const int& version)
-{
- CQEntry* tempq = NULL;
- CUnit* tempu = NULL;
- char* tempb = NULL;
-
- try
- {
- tempq = new CQEntry;
- tempu = new CUnit [size];
- tempb = new char [size * mss];
- }
- catch (...)
- {
- delete tempq;
- delete [] tempu;
- delete [] tempb;
-
- return -1;
- }
-
- for (int i = 0; i < size; ++ i)
- {
- tempu[i].m_iFlag = 0;
- tempu[i].m_Packet.m_pcData = tempb + i * mss;
- }
- tempq->m_pUnit = tempu;
- tempq->m_pBuffer = tempb;
- tempq->m_iSize = size;
-
- m_pQEntry = m_pCurrQueue = m_pLastQueue = tempq;
- m_pQEntry->m_pNext = m_pQEntry;
-
- m_pAvailUnit = m_pCurrQueue->m_pUnit;
-
- m_iSize = size;
- m_iMSS = mss;
- m_iIPversion = version;
-
- return 0;
-}
-
-int CUnitQueue::increase()
-{
- // adjust/correct m_iCount
- int real_count = 0;
- CQEntry* p = m_pQEntry;
- while (p != NULL)
- {
- CUnit* u = p->m_pUnit;
- for (CUnit* end = u + p->m_iSize; u != end; ++ u)
- if (u->m_iFlag != 0)
- ++ real_count;
-
- if (p == m_pLastQueue)
- p = NULL;
- else
- p = p->m_pNext;
- }
- m_iCount = real_count;
- if (double(m_iCount) / m_iSize < 0.9)
- return -1;
-
- CQEntry* tempq = NULL;
- CUnit* tempu = NULL;
- char* tempb = NULL;
-
- // all queues have the same size
- int size = m_pQEntry->m_iSize;
-
- try
- {
- tempq = new CQEntry;
- tempu = new CUnit [size];
- tempb = new char [size * m_iMSS];
- }
- catch (...)
- {
- delete tempq;
- delete [] tempu;
- delete [] tempb;
-
- return -1;
- }
-
- for (int i = 0; i < size; ++ i)
- {
- tempu[i].m_iFlag = 0;
- tempu[i].m_Packet.m_pcData = tempb + i * m_iMSS;
- }
- tempq->m_pUnit = tempu;
- tempq->m_pBuffer = tempb;
- tempq->m_iSize = size;
-
- m_pLastQueue->m_pNext = tempq;
- m_pLastQueue = tempq;
- m_pLastQueue->m_pNext = m_pQEntry;
-
- m_iSize += size;
-
- return 0;
-}
-
-int CUnitQueue::shrink()
-{
- // currently queue cannot be shrunk.
- return -1;
-}
-
-CUnit* CUnitQueue::getNextAvailUnit()
-{
- if (m_iCount * 10 > m_iSize * 9)
- increase();
-
- if (m_iCount >= m_iSize)
- return NULL;
-
- CQEntry* entrance = m_pCurrQueue;
-
- do
- {
- for (CUnit* sentinel = m_pCurrQueue->m_pUnit + m_pCurrQueue->m_iSize - 1; m_pAvailUnit != sentinel; ++ m_pAvailUnit)
- if (m_pAvailUnit->m_iFlag == 0)
- return m_pAvailUnit;
-
- if (m_pCurrQueue->m_pUnit->m_iFlag == 0)
- {
- m_pAvailUnit = m_pCurrQueue->m_pUnit;
- return m_pAvailUnit;
- }
-
- m_pCurrQueue = m_pCurrQueue->m_pNext;
- m_pAvailUnit = m_pCurrQueue->m_pUnit;
- } while (m_pCurrQueue != entrance);
-
- increase();
-
- return NULL;
-}
-
-
-CSndUList::CSndUList():
-m_pHeap(NULL),
-m_iArrayLength(4096),
-m_iLastEntry(-1),
-m_ListLock(),
-m_pWindowLock(NULL),
-m_pWindowCond(NULL),
-m_pTimer(NULL)
-{
- m_pHeap = new CSNode*[m_iArrayLength];
-
- #ifndef WIN32
- pthread_mutex_init(&m_ListLock, NULL);
- #else
- m_ListLock = CreateMutex(NULL, false, NULL);
- #endif
-}
-
-CSndUList::~CSndUList()
-{
- delete [] m_pHeap;
-
- #ifndef WIN32
- pthread_mutex_destroy(&m_ListLock);
- #else
- CloseHandle(m_ListLock);
- #endif
-}
-
-void CSndUList::insert(const int64_t& ts, const CUDT* u)
-{
- CGuard listguard(m_ListLock);
-
- // increase the heap array size if necessary
- if (m_iLastEntry == m_iArrayLength - 1)
- {
- CSNode** temp = NULL;
-
- try
- {
- temp = new CSNode*[m_iArrayLength * 2];
- }
- catch(...)
- {
- return;
- }
-
- memcpy(temp, m_pHeap, sizeof(CSNode*) * m_iArrayLength);
- m_iArrayLength *= 2;
- delete [] m_pHeap;
- m_pHeap = temp;
- }
-
- insert_(ts, u);
-}
-
-void CSndUList::update(const CUDT* u, const bool& reschedule)
-{
- CGuard listguard(m_ListLock);
-
- CSNode* n = u->m_pSNode;
-
- if (n->m_iHeapLoc >= 0)
- {
- if (!reschedule)
- return;
-
- if (n->m_iHeapLoc == 0)
- {
- n->m_llTimeStamp = 1;
- m_pTimer->interrupt();
- return;
- }
-
- remove_(u);
- }
-
- insert_(1, u);
-}
-
-int CSndUList::pop(sockaddr*& addr, CPacket& pkt)
-{
- CGuard listguard(m_ListLock);
-
- if (-1 == m_iLastEntry)
- return -1;
-
- CUDT* u = m_pHeap[0]->m_pUDT;
- remove_(u);
-
- if (!u->m_bConnected || u->m_bBroken)
- return -1;
-
- // pack a packet from the socket
- uint64_t ts;
- if (u->packData(pkt, ts) <= 0)
- return -1;
-
- addr = u->m_pPeerAddr;
-
- // insert a new entry, ts is the next processing time
- if (ts > 0)
- insert_(ts, u);
-
- return 1;
-}
-
-void CSndUList::remove(const CUDT* u)
-{
- CGuard listguard(m_ListLock);
-
- remove_(u);
-}
-
-uint64_t CSndUList::getNextProcTime()
-{
- CGuard listguard(m_ListLock);
-
- if (-1 == m_iLastEntry)
- return 0;
-
- return m_pHeap[0]->m_llTimeStamp;
-}
-
-void CSndUList::insert_(const int64_t& ts, const CUDT* u)
-{
- CSNode* n = u->m_pSNode;
-
- // do not insert repeated node
- if (n->m_iHeapLoc >= 0)
- return;
-
- m_iLastEntry ++;
- m_pHeap[m_iLastEntry] = n;
- n->m_llTimeStamp = ts;
-
- int q = m_iLastEntry;
- int p = q;
- while (p != 0)
- {
- p = (q - 1) >> 1;
- if (m_pHeap[p]->m_llTimeStamp > m_pHeap[q]->m_llTimeStamp)
- {
- CSNode* t = m_pHeap[p];
- m_pHeap[p] = m_pHeap[q];
- m_pHeap[q] = t;
- t->m_iHeapLoc = q;
- q = p;
- }
- else
- break;
- }
-
- n->m_iHeapLoc = q;
-
- // first entry, activate the sending queue
- if (0 == m_iLastEntry)
- {
- #ifndef WIN32
- pthread_mutex_lock(m_pWindowLock);
- pthread_cond_signal(m_pWindowCond);
- pthread_mutex_unlock(m_pWindowLock);
- #else
- SetEvent(*m_pWindowCond);
- #endif
- }
-}
-
-void CSndUList::remove_(const CUDT* u)
-{
- CSNode* n = u->m_pSNode;
-
- if (n->m_iHeapLoc >= 0)
- {
- // remove the node from heap
- m_pHeap[n->m_iHeapLoc] = m_pHeap[m_iLastEntry];
- m_iLastEntry --;
- m_pHeap[n->m_iHeapLoc]->m_iHeapLoc = n->m_iHeapLoc;
-
- int q = n->m_iHeapLoc;
- int p = q * 2 + 1;
- while (p <= m_iLastEntry)
- {
- if ((p + 1 <= m_iLastEntry) && (m_pHeap[p]->m_llTimeStamp > m_pHeap[p + 1]->m_llTimeStamp))
- p ++;
-
- if (m_pHeap[q]->m_llTimeStamp > m_pHeap[p]->m_llTimeStamp)
- {
- CSNode* t = m_pHeap[p];
- m_pHeap[p] = m_pHeap[q];
- m_pHeap[p]->m_iHeapLoc = p;
- m_pHeap[q] = t;
- m_pHeap[q]->m_iHeapLoc = q;
-
- q = p;
- p = q * 2 + 1;
- }
- else
- break;
- }
-
- n->m_iHeapLoc = -1;
- }
-}
-
-//
-CSndQueue::CSndQueue():
-m_WorkerThread(),
-m_pSndUList(NULL),
-m_pChannel(NULL),
-m_pTimer(NULL),
-m_WindowLock(),
-m_WindowCond(),
-m_bClosing(false),
-m_ExitCond()
-{
- #ifndef WIN32
- pthread_cond_init(&m_WindowCond, NULL);
- pthread_mutex_init(&m_WindowLock, NULL);
- #else
- m_WindowLock = CreateMutex(NULL, false, NULL);
- m_WindowCond = CreateEvent(NULL, false, false, NULL);
- m_ExitCond = CreateEvent(NULL, false, false, NULL);
- #endif
-}
-
-CSndQueue::~CSndQueue()
-{
- m_bClosing = true;
-
- #ifndef WIN32
- pthread_mutex_lock(&m_WindowLock);
- pthread_cond_signal(&m_WindowCond);
- pthread_mutex_unlock(&m_WindowLock);
- if (0 != m_WorkerThread)
- pthread_join(m_WorkerThread, NULL);
- pthread_cond_destroy(&m_WindowCond);
- pthread_mutex_destroy(&m_WindowLock);
- #else
- SetEvent(m_WindowCond);
- if (NULL != m_WorkerThread)
- WaitForSingleObject(m_ExitCond, INFINITE);
- CloseHandle(m_WorkerThread);
- CloseHandle(m_WindowLock);
- CloseHandle(m_WindowCond);
- CloseHandle(m_ExitCond);
- #endif
-
- delete m_pSndUList;
-}
-
-void CSndQueue::init(const CChannel* c, const CTimer* t)
-{
- m_pChannel = (CChannel*)c;
- m_pTimer = (CTimer*)t;
- m_pSndUList = new CSndUList;
- m_pSndUList->m_pWindowLock = &m_WindowLock;
- m_pSndUList->m_pWindowCond = &m_WindowCond;
- m_pSndUList->m_pTimer = m_pTimer;
-
- #ifndef WIN32
- if (0 != pthread_create(&m_WorkerThread, NULL, CSndQueue::worker, this))
- {
- m_WorkerThread = 0;
- throw CUDTException(3, 1);
- }
- #else
- DWORD threadID;
- m_WorkerThread = CreateThread(NULL, 0, CSndQueue::worker, this, 0, &threadID);
- if (NULL == m_WorkerThread)
- throw CUDTException(3, 1);
- #endif
-}
-
-#ifndef WIN32
- void* CSndQueue::worker(void* param)
-#else
- DWORD WINAPI CSndQueue::worker(LPVOID param)
-#endif
-{
- CSndQueue* self = (CSndQueue*)param;
-
- while (!self->m_bClosing)
- {
- uint64_t ts = self->m_pSndUList->getNextProcTime();
-
- if (ts > 0)
- {
- // wait until next processing time of the first socket on the list
- uint64_t currtime;
- CTimer::rdtsc(currtime);
- if (currtime < ts)
- self->m_pTimer->sleepto(ts);
-
- // it is time to send the next pkt
- sockaddr* addr;
- CPacket pkt;
- if (self->m_pSndUList->pop(addr, pkt) < 0)
- continue;
-
- self->m_pChannel->sendto(addr, pkt);
- }
- else
- {
- // wait here if there is no sockets with data to be sent
- #ifndef WIN32
- pthread_mutex_lock(&self->m_WindowLock);
- if (!self->m_bClosing && (self->m_pSndUList->m_iLastEntry < 0))
- pthread_cond_wait(&self->m_WindowCond, &self->m_WindowLock);
- pthread_mutex_unlock(&self->m_WindowLock);
- #else
- WaitForSingleObject(self->m_WindowCond, INFINITE);
- #endif
- }
- }
-
- #ifndef WIN32
- return NULL;
- #else
- SetEvent(self->m_ExitCond);
- return 0;
- #endif
-}
-
-int CSndQueue::sendto(const sockaddr* addr, CPacket& packet)
-{
- // send out the packet immediately (high priority), this is a control packet
- m_pChannel->sendto(addr, packet);
- return packet.getLength();
-}
-
-
-//
-CRcvUList::CRcvUList():
-m_pUList(NULL),
-m_pLast(NULL)
-{
-}
-
-CRcvUList::~CRcvUList()
-{
-}
-
-void CRcvUList::insert(const CUDT* u)
-{
- CRNode* n = u->m_pRNode;
- CTimer::rdtsc(n->m_llTimeStamp);
-
- if (NULL == m_pUList)
- {
- // empty list, insert as the single node
- n->m_pPrev = n->m_pNext = NULL;
- m_pLast = m_pUList = n;
-
- return;
- }
-
- // always insert at the end for RcvUList
- n->m_pPrev = m_pLast;
- n->m_pNext = NULL;
- m_pLast->m_pNext = n;
- m_pLast = n;
-}
-
-void CRcvUList::remove(const CUDT* u)
-{
- CRNode* n = u->m_pRNode;
-
- if (!n->m_bOnList)
- return;
-
- if (NULL == n->m_pPrev)
- {
- // n is the first node
- m_pUList = n->m_pNext;
- if (NULL == m_pUList)
- m_pLast = NULL;
- else
- m_pUList->m_pPrev = NULL;
- }
- else
- {
- n->m_pPrev->m_pNext = n->m_pNext;
- if (NULL == n->m_pNext)
- {
- // n is the last node
- m_pLast = n->m_pPrev;
- }
- else
- n->m_pNext->m_pPrev = n->m_pPrev;
- }
-
- n->m_pNext = n->m_pPrev = NULL;
-}
-
-void CRcvUList::update(const CUDT* u)
-{
- CRNode* n = u->m_pRNode;
-
- if (!n->m_bOnList)
- return;
-
- CTimer::rdtsc(n->m_llTimeStamp);
-
- // if n is the last node, do not need to change
- if (NULL == n->m_pNext)
- return;
-
- if (NULL == n->m_pPrev)
- {
- m_pUList = n->m_pNext;
- m_pUList->m_pPrev = NULL;
- }
- else
- {
- n->m_pPrev->m_pNext = n->m_pNext;
- n->m_pNext->m_pPrev = n->m_pPrev;
- }
-
- n->m_pPrev = m_pLast;
- n->m_pNext = NULL;
- m_pLast->m_pNext = n;
- m_pLast = n;
-}
-
-//
-CHash::CHash():
-m_pBucket(NULL),
-m_iHashSize(0)
-{
-}
-
-CHash::~CHash()
-{
- for (int i = 0; i < m_iHashSize; ++ i)
- {
- CBucket* b = m_pBucket[i];
- while (NULL != b)
- {
- CBucket* n = b->m_pNext;
- delete b;
- b = n;
- }
- }
-
- delete [] m_pBucket;
-}
-
-void CHash::init(const int& size)
-{
- m_pBucket = new CBucket* [size];
-
- for (int i = 0; i < size; ++ i)
- m_pBucket[i] = NULL;
-
- m_iHashSize = size;
-}
-
-CUDT* CHash::lookup(const int32_t& id)
-{
- // simple hash function (% hash table size); suitable for socket descriptors
- CBucket* b = m_pBucket[id % m_iHashSize];
-
- while (NULL != b)
- {
- if (id == b->m_iID)
- return b->m_pUDT;
- b = b->m_pNext;
- }
-
- return NULL;
-}
-
-void CHash::insert(const int32_t& id, const CUDT* u)
-{
- CBucket* b = m_pBucket[id % m_iHashSize];
-
- CBucket* n = new CBucket;
- n->m_iID = id;
- n->m_pUDT = (CUDT*)u;
- n->m_pNext = b;
-
- m_pBucket[id % m_iHashSize] = n;
-}
-
-void CHash::remove(const int32_t& id)
-{
- CBucket* b = m_pBucket[id % m_iHashSize];
- CBucket* p = NULL;
-
- while (NULL != b)
- {
- if (id == b->m_iID)
- {
- if (NULL == p)
- m_pBucket[id % m_iHashSize] = b->m_pNext;
- else
- p->m_pNext = b->m_pNext;
-
- delete b;
-
- return;
- }
-
- p = b;
- b = b->m_pNext;
- }
-}
-
-
-//
-CRendezvousQueue::CRendezvousQueue():
-m_vRendezvousID(),
-m_RIDVectorLock()
-{
- #ifndef WIN32
- pthread_mutex_init(&m_RIDVectorLock, NULL);
- #else
- m_RIDVectorLock = CreateMutex(NULL, false, NULL);
- #endif
-}
-
-CRendezvousQueue::~CRendezvousQueue()
-{
- #ifndef WIN32
- pthread_mutex_destroy(&m_RIDVectorLock);
- #else
- CloseHandle(m_RIDVectorLock);
- #endif
-
- for (vector<CRL>::iterator i = m_vRendezvousID.begin(); i != m_vRendezvousID.end(); ++ i)
- {
- if (AF_INET == i->m_iIPversion)
- delete (sockaddr_in*)i->m_pPeerAddr;
- else
- delete (sockaddr_in6*)i->m_pPeerAddr;
- }
-
- m_vRendezvousID.clear();
-}
-
-void CRendezvousQueue::insert(const UDTSOCKET& id, const int& ipv, const sockaddr* addr)
-{
- CGuard vg(m_RIDVectorLock);
-
- CRL r;
- r.m_iID = id;
- r.m_iIPversion = ipv;
- r.m_pPeerAddr = (AF_INET == ipv) ? (sockaddr*)new sockaddr_in : (sockaddr*)new sockaddr_in6;
- memcpy(r.m_pPeerAddr, addr, (AF_INET == ipv) ? sizeof(sockaddr_in) : sizeof(sockaddr_in6));
-
- m_vRendezvousID.insert(m_vRendezvousID.end(), r);
-}
-
-void CRendezvousQueue::remove(const UDTSOCKET& id)
-{
- CGuard vg(m_RIDVectorLock);
-
- for (vector<CRL>::iterator i = m_vRendezvousID.begin(); i != m_vRendezvousID.end(); ++ i)
- {
- if (i->m_iID == id)
- {
- if (AF_INET == i->m_iIPversion)
- delete (sockaddr_in*)i->m_pPeerAddr;
- else
- delete (sockaddr_in6*)i->m_pPeerAddr;
-
- m_vRendezvousID.erase(i);
-
- return;
- }
- }
-}
-
-bool CRendezvousQueue::retrieve(const sockaddr* addr, UDTSOCKET& id)
-{
- CGuard vg(m_RIDVectorLock);
-
- for (vector<CRL>::iterator i = m_vRendezvousID.begin(); i != m_vRendezvousID.end(); ++ i)
- {
- if (CIPAddress::ipcmp(addr, i->m_pPeerAddr, i->m_iIPversion) && ((0 == id) || (id == i->m_iID)))
- {
- id = i->m_iID;
- return true;
- }
- }
-
- return false;
-}
-
-
-//
-CRcvQueue::CRcvQueue():
-m_WorkerThread(),
-m_UnitQueue(),
-m_pRcvUList(NULL),
-m_pHash(NULL),
-m_pChannel(NULL),
-m_pTimer(NULL),
-m_iPayloadSize(),
-m_bClosing(false),
-m_ExitCond(),
-m_LSLock(),
-m_pListener(NULL),
-m_pRendezvousQueue(NULL),
-m_vNewEntry(),
-m_IDLock(),
-m_mBuffer(),
-m_PassLock(),
-m_PassCond()
-{
- #ifndef WIN32
- pthread_mutex_init(&m_PassLock, NULL);
- pthread_cond_init(&m_PassCond, NULL);
- pthread_mutex_init(&m_LSLock, NULL);
- pthread_mutex_init(&m_IDLock, NULL);
- #else
- m_PassLock = CreateMutex(NULL, false, NULL);
- m_PassCond = CreateEvent(NULL, false, false, NULL);
- m_LSLock = CreateMutex(NULL, false, NULL);
- m_IDLock = CreateMutex(NULL, false, NULL);
- m_ExitCond = CreateEvent(NULL, false, false, NULL);
- #endif
-}
-
-CRcvQueue::~CRcvQueue()
-{
- m_bClosing = true;
-
- #ifndef WIN32
- if (0 != m_WorkerThread)
- pthread_join(m_WorkerThread, NULL);
- pthread_mutex_destroy(&m_PassLock);
- pthread_cond_destroy(&m_PassCond);
- pthread_mutex_destroy(&m_LSLock);
- pthread_mutex_destroy(&m_IDLock);
- #else
- if (NULL != m_WorkerThread)
- WaitForSingleObject(m_ExitCond, INFINITE);
- CloseHandle(m_WorkerThread);
- CloseHandle(m_PassLock);
- CloseHandle(m_PassCond);
- CloseHandle(m_LSLock);
- CloseHandle(m_IDLock);
- CloseHandle(m_ExitCond);
- #endif
-
- delete m_pRcvUList;
- delete m_pHash;
- delete m_pRendezvousQueue;
-
- // remove all queued messages
- for (map<int32_t, queue<CPacket*> >::iterator i = m_mBuffer.begin(); i != m_mBuffer.end(); ++ i)
- {
- while (!i->second.empty())
- {
- CPacket* pkt = i->second.front();
- delete [] pkt->m_pcData;
- delete pkt;
- i->second.pop();
- }
- }
-}
-
-void CRcvQueue::init(const int& qsize, const int& payload, const int& version, const int& hsize, const CChannel* cc, const CTimer* t)
-{
- m_iPayloadSize = payload;
-
- m_UnitQueue.init(qsize, payload, version);
-
- m_pHash = new CHash;
- m_pHash->init(hsize);
-
- m_pChannel = (CChannel*)cc;
- m_pTimer = (CTimer*)t;
-
- m_pRcvUList = new CRcvUList;
- m_pRendezvousQueue = new CRendezvousQueue;
-
- #ifndef WIN32
- if (0 != pthread_create(&m_WorkerThread, NULL, CRcvQueue::worker, this))
- {
- m_WorkerThread = 0;
- throw CUDTException(3, 1);
- }
- #else
- DWORD threadID;
- m_WorkerThread = CreateThread(NULL, 0, CRcvQueue::worker, this, 0, &threadID);
- if (NULL == m_WorkerThread)
- throw CUDTException(3, 1);
- #endif
-}
-
-#ifndef WIN32
- void* CRcvQueue::worker(void* param)
-#else
- DWORD WINAPI CRcvQueue::worker(LPVOID param)
-#endif
-{
- CRcvQueue* self = (CRcvQueue*)param;
-
- sockaddr* addr = (AF_INET == self->m_UnitQueue.m_iIPversion) ? (sockaddr*) new sockaddr_in : (sockaddr*) new sockaddr_in6;
- CUDT* u = NULL;
- int32_t id;
-
- while (!self->m_bClosing)
- {
- #ifdef NO_BUSY_WAITING
- self->m_pTimer->tick();
- #endif
-
- // check waiting list, if new socket, insert it to the list
- while (self->ifNewEntry())
- {
- CUDT* ne = self->getNewEntry();
- if (NULL != ne)
- {
- self->m_pRcvUList->insert(ne);
- self->m_pHash->insert(ne->m_SocketID, ne);
- }
- }
-
- // find next available slot for incoming packet
- CUnit* unit = self->m_UnitQueue.getNextAvailUnit();
- if (NULL == unit)
- {
- // no space, skip this packet
- CPacket temp;
- temp.m_pcData = new char[self->m_iPayloadSize];
- temp.setLength(self->m_iPayloadSize);
- self->m_pChannel->recvfrom(addr, temp);
- delete [] temp.m_pcData;
- goto TIMER_CHECK;
- }
-
- unit->m_Packet.setLength(self->m_iPayloadSize);
-
- // reading next incoming packet, recvfrom returns -1 is nothing has been received
- if (self->m_pChannel->recvfrom(addr, unit->m_Packet) < 0)
- goto TIMER_CHECK;
-
- id = unit->m_Packet.m_iID;
-
- // ID 0 is for connection request, which should be passed to the listening socket or rendezvous sockets
- if (0 == id)
- {
- if (NULL != self->m_pListener)
- ((CUDT*)self->m_pListener)->listen(addr, unit->m_Packet);
- else if (self->m_pRendezvousQueue->retrieve(addr, id))
- self->storePkt(id, unit->m_Packet.clone());
- }
- else if (id > 0)
- {
- if (NULL != (u = self->m_pHash->lookup(id)))
- {
- if (CIPAddress::ipcmp(addr, u->m_pPeerAddr, u->m_iIPversion))
- {
- if (u->m_bConnected && !u->m_bBroken && !u->m_bClosing)
- {
- if (0 == unit->m_Packet.getFlag())
- u->processData(unit);
- else
- u->processCtrl(unit->m_Packet);
-
- u->checkTimers();
- self->m_pRcvUList->update(u);
- }
- }
- }
- else if (self->m_pRendezvousQueue->retrieve(addr, id))
- self->storePkt(id, unit->m_Packet.clone());
- }
-
-TIMER_CHECK:
- // take care of the timing event for all UDT sockets
-
- CRNode* ul = self->m_pRcvUList->m_pUList;
- uint64_t currtime;
- CTimer::rdtsc(currtime);
- uint64_t ctime = currtime - 100000 * CTimer::getCPUFrequency();
-
- while ((NULL != ul) && (ul->m_llTimeStamp < ctime))
- {
- CUDT* u = ul->m_pUDT;
-
- if (u->m_bConnected && !u->m_bBroken && !u->m_bClosing)
- {
- u->checkTimers();
- self->m_pRcvUList->update(u);
- }
- else
- {
- // the socket must be removed from Hash table first, then RcvUList
- self->m_pHash->remove(u->m_SocketID);
- self->m_pRcvUList->remove(u);
- u->m_pRNode->m_bOnList = false;
- }
-
- ul = self->m_pRcvUList->m_pUList;
- }
- }
-
- if (AF_INET == self->m_UnitQueue.m_iIPversion)
- delete (sockaddr_in*)addr;
- else
- delete (sockaddr_in6*)addr;
-
- #ifndef WIN32
- return NULL;
- #else
- SetEvent(self->m_ExitCond);
- return 0;
- #endif
-}
-
-int CRcvQueue::recvfrom(const int32_t& id, CPacket& packet)
-{
- CGuard bufferlock(m_PassLock);
-
- map<int32_t, queue<CPacket*> >::iterator i = m_mBuffer.find(id);
-
- if (i == m_mBuffer.end())
- {
- #ifndef WIN32
- uint64_t now = CTimer::getTime();
- timespec timeout;
-
- timeout.tv_sec = now / 1000000 + 1;
- timeout.tv_nsec = (now % 1000000) * 1000;
-
- pthread_cond_timedwait(&m_PassCond, &m_PassLock, &timeout);
- #else
- ReleaseMutex(m_PassLock);
- WaitForSingleObject(m_PassCond, 1000);
- WaitForSingleObject(m_PassLock, INFINITE);
- #endif
-
- i = m_mBuffer.find(id);
- if (i == m_mBuffer.end())
- {
- packet.setLength(-1);
- return -1;
- }
- }
-
- // retrieve the earliest packet
- CPacket* newpkt = i->second.front();
-
- if (packet.getLength() < newpkt->getLength())
- {
- packet.setLength(-1);
- return -1;
- }
-
- // copy packet content
- memcpy(packet.m_nHeader, newpkt->m_nHeader, CPacket::m_iPktHdrSize);
- memcpy(packet.m_pcData, newpkt->m_pcData, newpkt->getLength());
- packet.setLength(newpkt->getLength());
-
- delete [] newpkt->m_pcData;
- delete newpkt;
-
- // remove this message from queue,
- // if no more messages left for this socket, release its data structure
- i->second.pop();
- if (i->second.empty())
- m_mBuffer.erase(i);
-
- return packet.getLength();
-}
-
-int CRcvQueue::setListener(const CUDT* u)
-{
- CGuard lslock(m_LSLock);
-
- if (NULL != m_pListener)
- return -1;
-
- m_pListener = (CUDT*)u;
- return 1;
-}
-
-void CRcvQueue::removeListener(const CUDT* u)
-{
- CGuard lslock(m_LSLock);
-
- if (u == m_pListener)
- m_pListener = NULL;
-}
-
-void CRcvQueue::setNewEntry(CUDT* u)
-{
- CGuard listguard(m_IDLock);
- m_vNewEntry.insert(m_vNewEntry.end(), u);
-}
-
-bool CRcvQueue::ifNewEntry()
-{
- return !(m_vNewEntry.empty());
-}
-
-CUDT* CRcvQueue::getNewEntry()
-{
- CGuard listguard(m_IDLock);
-
- if (m_vNewEntry.empty())
- return NULL;
-
- CUDT* u = (CUDT*)*(m_vNewEntry.begin());
- m_vNewEntry.erase(m_vNewEntry.begin());
-
- return u;
-}
-
-void CRcvQueue::storePkt(const int32_t& id, CPacket* pkt)
-{
- CGuard bufferlock(m_PassLock);
-
- map<int32_t, queue<CPacket*> >::iterator i = m_mBuffer.find(id);
-
- if (i == m_mBuffer.end())
- {
- m_mBuffer[id].push(pkt);
-
- #ifndef WIN32
- pthread_cond_signal(&m_PassCond);
- #else
- SetEvent(m_PassCond);
- #endif
- }
- else
- {
- //avoid storing too many packets, in case of malfunction or attack
- if (i->second.size() > 16)
- return;
-
- i->second.push(pkt);
- }
-}
« no previous file with comments | « net/third_party/udt/src/queue.h ('k') | net/third_party/udt/src/udt.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698