Index: net/third_party/udt/src/core.cpp |
=================================================================== |
--- net/third_party/udt/src/core.cpp (revision 78992) |
+++ net/third_party/udt/src/core.cpp (working copy) |
@@ -1,2612 +0,0 @@ |
-/***************************************************************************** |
-Copyright (c) 2001 - 2011, The Board of Trustees of the University of Illinois. |
-All rights reserved. |
- |
-Redistribution and use in source and binary forms, with or without |
-modification, are permitted provided that the following conditions are |
-met: |
- |
-* Redistributions of source code must retain the above |
- copyright notice, this list of conditions and the |
- following disclaimer. |
- |
-* Redistributions in binary form must reproduce the |
- above copyright notice, this list of conditions |
- and the following disclaimer in the documentation |
- and/or other materials provided with the distribution. |
- |
-* Neither the name of the University of Illinois |
- nor the names of its contributors may be used to |
- endorse or promote products derived from this |
- software without specific prior written permission. |
- |
-THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS |
-IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, |
-THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR |
-PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR |
-CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, |
-EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, |
-PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR |
-PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF |
-LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING |
-NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS |
-SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
-*****************************************************************************/ |
- |
-/***************************************************************************** |
-written by |
- Yunhong Gu, last updated 01/22/2011 |
-*****************************************************************************/ |
- |
-#ifndef WIN32 |
- #include <unistd.h> |
- #include <netdb.h> |
- #include <arpa/inet.h> |
- #include <cerrno> |
- #include <cstring> |
- #include <cstdlib> |
-#else |
- #include <winsock2.h> |
- #include <ws2tcpip.h> |
- #ifdef LEGACY_WIN32 |
- #include <wspiapi.h> |
- #endif |
-#endif |
-#include <cmath> |
-#include "queue.h" |
-#include "core.h" |
- |
-using namespace std; |
- |
- |
-CUDTUnited CUDT::s_UDTUnited; |
- |
-const UDTSOCKET CUDT::INVALID_SOCK = -1; |
-const int CUDT::ERROR = -1; |
- |
-const UDTSOCKET UDT::INVALID_SOCK = CUDT::INVALID_SOCK; |
-const int UDT::ERROR = CUDT::ERROR; |
- |
-const int32_t CSeqNo::m_iSeqNoTH = 0x3FFFFFFF; |
-const int32_t CSeqNo::m_iMaxSeqNo = 0x7FFFFFFF; |
-const int32_t CAckNo::m_iMaxAckSeqNo = 0x7FFFFFFF; |
-const int32_t CMsgNo::m_iMsgNoTH = 0xFFFFFFF; |
-const int32_t CMsgNo::m_iMaxMsgNo = 0x1FFFFFFF; |
- |
-const int CUDT::m_iVersion = 4; |
-const int CUDT::m_iSYNInterval = 10000; |
-const int CUDT::m_iSelfClockInterval = 64; |
- |
- |
-CUDT::CUDT() |
-{ |
- m_pSndBuffer = NULL; |
- m_pRcvBuffer = NULL; |
- m_pSndLossList = NULL; |
- m_pRcvLossList = NULL; |
- m_pACKWindow = NULL; |
- m_pSndTimeWindow = NULL; |
- m_pRcvTimeWindow = NULL; |
- |
- m_pSndQueue = NULL; |
- m_pRcvQueue = NULL; |
- m_pPeerAddr = NULL; |
- m_pSNode = NULL; |
- m_pRNode = NULL; |
- |
- // Initilize mutex and condition variables |
- initSynch(); |
- |
- // Default UDT configurations |
- m_iMSS = 1500; |
- m_bSynSending = true; |
- m_bSynRecving = true; |
- m_iFlightFlagSize = 25600; |
- m_iSndBufSize = 8192; |
- m_iRcvBufSize = 8192; //Rcv buffer MUST NOT be bigger than Flight Flag size |
- m_Linger.l_onoff = 1; |
- m_Linger.l_linger = 180; |
- m_iUDPSndBufSize = 65536; |
- m_iUDPRcvBufSize = m_iRcvBufSize * m_iMSS; |
- m_iSockType = UDT_STREAM; |
- m_iIPversion = AF_INET; |
- m_bRendezvous = false; |
- m_iSndTimeOut = -1; |
- m_iRcvTimeOut = -1; |
- m_bReuseAddr = true; |
- m_llMaxBW = -1; |
- |
- m_pCCFactory = new CCCFactory<CUDTCC>; |
- m_pCC = NULL; |
- m_pCache = NULL; |
- |
- // Initial status |
- m_bOpened = false; |
- m_bListening = false; |
- m_bConnected = false; |
- m_bClosing = false; |
- m_bShutdown = false; |
- m_bBroken = false; |
- m_bPeerHealth = true; |
- m_ullLingerExpiration = 0; |
-} |
- |
-CUDT::CUDT(const CUDT& ancestor) |
-{ |
- m_pSndBuffer = NULL; |
- m_pRcvBuffer = NULL; |
- m_pSndLossList = NULL; |
- m_pRcvLossList = NULL; |
- m_pACKWindow = NULL; |
- m_pSndTimeWindow = NULL; |
- m_pRcvTimeWindow = NULL; |
- |
- m_pSndQueue = NULL; |
- m_pRcvQueue = NULL; |
- m_pPeerAddr = NULL; |
- m_pSNode = NULL; |
- m_pRNode = NULL; |
- |
- // Initilize mutex and condition variables |
- initSynch(); |
- |
- // Default UDT configurations |
- m_iMSS = ancestor.m_iMSS; |
- m_bSynSending = ancestor.m_bSynSending; |
- m_bSynRecving = ancestor.m_bSynRecving; |
- m_iFlightFlagSize = ancestor.m_iFlightFlagSize; |
- m_iSndBufSize = ancestor.m_iSndBufSize; |
- m_iRcvBufSize = ancestor.m_iRcvBufSize; |
- m_Linger = ancestor.m_Linger; |
- m_iUDPSndBufSize = ancestor.m_iUDPSndBufSize; |
- m_iUDPRcvBufSize = ancestor.m_iUDPRcvBufSize; |
- m_iSockType = ancestor.m_iSockType; |
- m_iIPversion = ancestor.m_iIPversion; |
- m_bRendezvous = ancestor.m_bRendezvous; |
- m_iSndTimeOut = ancestor.m_iSndTimeOut; |
- m_iRcvTimeOut = ancestor.m_iRcvTimeOut; |
- m_bReuseAddr = true; // this must be true, because all accepted sockets shared the same port with the listener |
- m_llMaxBW = ancestor.m_llMaxBW; |
- |
- m_pCCFactory = ancestor.m_pCCFactory->clone(); |
- m_pCC = NULL; |
- m_pCache = ancestor.m_pCache; |
- |
- // Initial status |
- m_bOpened = false; |
- m_bListening = false; |
- m_bConnected = false; |
- m_bClosing = false; |
- m_bShutdown = false; |
- m_bBroken = false; |
- m_bPeerHealth = true; |
- m_ullLingerExpiration = 0; |
-} |
- |
-CUDT::~CUDT() |
-{ |
- // release mutex/condtion variables |
- destroySynch(); |
- |
- // destroy the data structures |
- delete m_pSndBuffer; |
- delete m_pRcvBuffer; |
- delete m_pSndLossList; |
- delete m_pRcvLossList; |
- delete m_pACKWindow; |
- delete m_pSndTimeWindow; |
- delete m_pRcvTimeWindow; |
- delete m_pCCFactory; |
- delete m_pCC; |
- delete m_pPeerAddr; |
- delete m_pSNode; |
- delete m_pRNode; |
-} |
- |
-void CUDT::setOpt(UDTOpt optName, const void* optval, const int&) |
-{ |
- if (m_bBroken || m_bClosing) |
- throw CUDTException(2, 1, 0); |
- |
- CGuard cg(m_ConnectionLock); |
- CGuard sendguard(m_SendLock); |
- CGuard recvguard(m_RecvLock); |
- |
- switch (optName) |
- { |
- case UDT_MSS: |
- if (m_bOpened) |
- throw CUDTException(5, 1, 0); |
- |
- if (*(int*)optval < int(28 + CHandShake::m_iContentSize)) |
- throw CUDTException(5, 3, 0); |
- |
- m_iMSS = *(int*)optval; |
- |
- // Packet size cannot be greater than UDP buffer size |
- if (m_iMSS > m_iUDPSndBufSize) |
- m_iMSS = m_iUDPSndBufSize; |
- if (m_iMSS > m_iUDPRcvBufSize) |
- m_iMSS = m_iUDPRcvBufSize; |
- |
- break; |
- |
- case UDT_SNDSYN: |
- m_bSynSending = *(bool *)optval; |
- break; |
- |
- case UDT_RCVSYN: |
- m_bSynRecving = *(bool *)optval; |
- break; |
- |
- case UDT_CC: |
- if (m_bConnected) |
- throw CUDTException(5, 1, 0); |
- if (NULL != m_pCCFactory) |
- delete m_pCCFactory; |
- m_pCCFactory = ((CCCVirtualFactory *)optval)->clone(); |
- |
- break; |
- |
- case UDT_FC: |
- if (m_bConnected) |
- throw CUDTException(5, 2, 0); |
- |
- if (*(int*)optval < 1) |
- throw CUDTException(5, 3); |
- |
- // Mimimum recv flight flag size is 32 packets |
- if (*(int*)optval > 32) |
- m_iFlightFlagSize = *(int*)optval; |
- else |
- m_iFlightFlagSize = 32; |
- |
- break; |
- |
- case UDT_SNDBUF: |
- if (m_bOpened) |
- throw CUDTException(5, 1, 0); |
- |
- if (*(int*)optval <= 0) |
- throw CUDTException(5, 3, 0); |
- |
- m_iSndBufSize = *(int*)optval / (m_iMSS - 28); |
- |
- break; |
- |
- case UDT_RCVBUF: |
- if (m_bOpened) |
- throw CUDTException(5, 1, 0); |
- |
- if (*(int*)optval <= 0) |
- throw CUDTException(5, 3, 0); |
- |
- // Mimimum recv buffer size is 32 packets |
- if (*(int*)optval > (m_iMSS - 28) * 32) |
- m_iRcvBufSize = *(int*)optval / (m_iMSS - 28); |
- else |
- m_iRcvBufSize = 32; |
- |
- // recv buffer MUST not be greater than FC size |
- if (m_iRcvBufSize > m_iFlightFlagSize) |
- m_iRcvBufSize = m_iFlightFlagSize; |
- |
- break; |
- |
- case UDT_LINGER: |
- m_Linger = *(linger*)optval; |
- break; |
- |
- case UDP_SNDBUF: |
- if (m_bOpened) |
- throw CUDTException(5, 1, 0); |
- |
- m_iUDPSndBufSize = *(int*)optval; |
- |
- if (m_iUDPSndBufSize < m_iMSS) |
- m_iUDPSndBufSize = m_iMSS; |
- |
- break; |
- |
- case UDP_RCVBUF: |
- if (m_bOpened) |
- throw CUDTException(5, 1, 0); |
- |
- m_iUDPRcvBufSize = *(int*)optval; |
- |
- if (m_iUDPRcvBufSize < m_iMSS) |
- m_iUDPRcvBufSize = m_iMSS; |
- |
- break; |
- |
- case UDT_RENDEZVOUS: |
- if (m_bConnected) |
- throw CUDTException(5, 1, 0); |
- m_bRendezvous = *(bool *)optval; |
- break; |
- |
- case UDT_SNDTIMEO: |
- m_iSndTimeOut = *(int*)optval; |
- break; |
- |
- case UDT_RCVTIMEO: |
- m_iRcvTimeOut = *(int*)optval; |
- break; |
- |
- case UDT_REUSEADDR: |
- if (m_bOpened) |
- throw CUDTException(5, 1, 0); |
- m_bReuseAddr = *(bool*)optval; |
- break; |
- |
- case UDT_MAXBW: |
- if (m_bConnected) |
- throw CUDTException(5, 1, 0); |
- m_llMaxBW = *(int64_t*)optval; |
- break; |
- |
- default: |
- throw CUDTException(5, 0, 0); |
- } |
-} |
- |
-void CUDT::getOpt(UDTOpt optName, void* optval, int& optlen) |
-{ |
- CGuard cg(m_ConnectionLock); |
- |
- switch (optName) |
- { |
- case UDT_MSS: |
- *(int*)optval = m_iMSS; |
- optlen = sizeof(int); |
- break; |
- |
- case UDT_SNDSYN: |
- *(bool*)optval = m_bSynSending; |
- optlen = sizeof(bool); |
- break; |
- |
- case UDT_RCVSYN: |
- *(bool*)optval = m_bSynRecving; |
- optlen = sizeof(bool); |
- break; |
- |
- case UDT_CC: |
- if (!m_bOpened) |
- throw CUDTException(5, 5, 0); |
- *(CCC**)optval = m_pCC; |
- optlen = sizeof(CCC*); |
- |
- break; |
- |
- case UDT_FC: |
- *(int*)optval = m_iFlightFlagSize; |
- optlen = sizeof(int); |
- break; |
- |
- case UDT_SNDBUF: |
- *(int*)optval = m_iSndBufSize * (m_iMSS - 28); |
- optlen = sizeof(int); |
- break; |
- |
- case UDT_RCVBUF: |
- *(int*)optval = m_iRcvBufSize * (m_iMSS - 28); |
- optlen = sizeof(int); |
- break; |
- |
- case UDT_LINGER: |
- if (optlen < (int)(sizeof(linger))) |
- throw CUDTException(5, 3, 0); |
- |
- *(linger*)optval = m_Linger; |
- optlen = sizeof(linger); |
- break; |
- |
- case UDP_SNDBUF: |
- *(int*)optval = m_iUDPSndBufSize; |
- optlen = sizeof(int); |
- break; |
- |
- case UDP_RCVBUF: |
- *(int*)optval = m_iUDPRcvBufSize; |
- optlen = sizeof(int); |
- break; |
- |
- case UDT_RENDEZVOUS: |
- *(bool *)optval = m_bRendezvous; |
- optlen = sizeof(bool); |
- break; |
- |
- case UDT_SNDTIMEO: |
- *(int*)optval = m_iSndTimeOut; |
- optlen = sizeof(int); |
- break; |
- |
- case UDT_RCVTIMEO: |
- *(int*)optval = m_iRcvTimeOut; |
- optlen = sizeof(int); |
- break; |
- |
- case UDT_REUSEADDR: |
- *(bool *)optval = m_bReuseAddr; |
- optlen = sizeof(bool); |
- break; |
- |
- case UDT_MAXBW: |
- *(int64_t*)optval = m_llMaxBW; |
- break; |
- |
- default: |
- throw CUDTException(5, 0, 0); |
- } |
-} |
- |
-void CUDT::open() |
-{ |
- CGuard cg(m_ConnectionLock); |
- |
- // Initial sequence number, loss, acknowledgement, etc. |
- m_iPktSize = m_iMSS - 28; |
- m_iPayloadSize = m_iPktSize - CPacket::m_iPktHdrSize; |
- |
- m_iEXPCount = 1; |
- m_iBandwidth = 1; |
- m_iDeliveryRate = 16; |
- m_iAckSeqNo = 0; |
- m_ullLastAckTime = 0; |
- |
- // trace information |
- m_StartTime = CTimer::getTime(); |
- m_llSentTotal = m_llRecvTotal = m_iSndLossTotal = m_iRcvLossTotal = m_iRetransTotal = m_iSentACKTotal = m_iRecvACKTotal = m_iSentNAKTotal = m_iRecvNAKTotal = 0; |
- m_LastSampleTime = CTimer::getTime(); |
- m_llTraceSent = m_llTraceRecv = m_iTraceSndLoss = m_iTraceRcvLoss = m_iTraceRetrans = m_iSentACK = m_iRecvACK = m_iSentNAK = m_iRecvNAK = 0; |
- m_llSndDuration = m_llSndDurationTotal = 0; |
- |
- // structures for queue |
- if (NULL == m_pSNode) |
- m_pSNode = new CSNode; |
- m_pSNode->m_pUDT = this; |
- m_pSNode->m_llTimeStamp = 1; |
- m_pSNode->m_iHeapLoc = -1; |
- |
- if (NULL == m_pRNode) |
- m_pRNode = new CRNode; |
- m_pRNode->m_pUDT = this; |
- m_pRNode->m_llTimeStamp = 1; |
- m_pRNode->m_pPrev = m_pRNode->m_pNext = NULL; |
- m_pRNode->m_bOnList = false; |
- |
- m_iRTT = 10 * m_iSYNInterval; |
- m_iRTTVar = m_iRTT >> 1; |
- m_ullCPUFrequency = CTimer::getCPUFrequency(); |
- |
- // set up the timers |
- m_ullSYNInt = m_iSYNInterval * m_ullCPUFrequency; |
- |
- // set minimum NAK and EXP timeout to 100ms |
- m_ullMinNakInt = 300000 * m_ullCPUFrequency; |
- m_ullMinExpInt = 300000 * m_ullCPUFrequency; |
- |
- m_ullACKInt = m_ullSYNInt; |
- m_ullNAKInt = m_ullMinNakInt; |
- m_ullEXPInt = m_ullMinExpInt; |
- m_llLastRspTime = CTimer::getTime(); |
- |
- CTimer::rdtsc(m_ullNextACKTime); |
- m_ullNextACKTime += m_ullSYNInt; |
- CTimer::rdtsc(m_ullNextNAKTime); |
- m_ullNextNAKTime += m_ullNAKInt; |
- CTimer::rdtsc(m_ullNextEXPTime); |
- m_ullNextEXPTime += m_ullEXPInt; |
- |
- m_iPktCount = 0; |
- m_iLightACKCount = 1; |
- |
- m_ullTargetTime = 0; |
- m_ullTimeDiff = 0; |
- |
- // Now UDT is opened. |
- m_bOpened = true; |
-} |
- |
-void CUDT::listen() |
-{ |
- CGuard cg(m_ConnectionLock); |
- |
- if (!m_bOpened) |
- throw CUDTException(5, 0, 0); |
- |
- if (m_bConnected) |
- throw CUDTException(5, 2, 0); |
- |
- // listen can be called more than once |
- if (m_bListening) |
- return; |
- |
- // if there is already another socket listening on the same port |
- if (m_pRcvQueue->setListener(this) < 0) |
- throw CUDTException(5, 11, 0); |
- |
- m_bListening = true; |
-} |
- |
-void CUDT::connect(const sockaddr* serv_addr) |
-{ |
- CGuard cg(m_ConnectionLock); |
- |
- if (!m_bOpened) |
- throw CUDTException(5, 0, 0); |
- |
- if (m_bListening) |
- throw CUDTException(5, 2, 0); |
- |
- if (m_bConnected) |
- throw CUDTException(5, 2, 0); |
- |
- // register this socket in the rendezvous queue |
- // RendezevousQueue is used to temporarily store incoming handshake, non-rendezvous connections also require this function |
- m_pRcvQueue->m_pRendezvousQueue->insert(m_SocketID, m_iIPversion, serv_addr); |
- |
- CPacket request; |
- char* reqdata = new char [m_iPayloadSize]; |
- CHandShake req; |
- |
- CPacket response; |
- char* resdata = new char [m_iPayloadSize]; |
- CHandShake res; |
- |
- // This is my current configurations |
- req.m_iVersion = m_iVersion; |
- req.m_iType = m_iSockType; |
- req.m_iMSS = m_iMSS; |
- req.m_iFlightFlagSize = (m_iRcvBufSize < m_iFlightFlagSize)? m_iRcvBufSize : m_iFlightFlagSize; |
- req.m_iReqType = (!m_bRendezvous) ? 1 : 0; |
- req.m_iID = m_SocketID; |
- CIPAddress::ntop(serv_addr, req.m_piPeerIP, m_iIPversion); |
- |
- // Random Initial Sequence Number |
- srand((unsigned int)CTimer::getTime()); |
- m_iISN = req.m_iISN = (int32_t)(CSeqNo::m_iMaxSeqNo * (double(rand()) / RAND_MAX)); |
- |
- m_iLastDecSeq = req.m_iISN - 1; |
- m_iSndLastAck = req.m_iISN; |
- m_iSndLastDataAck = req.m_iISN; |
- m_iSndCurrSeqNo = req.m_iISN - 1; |
- m_iSndLastAck2 = req.m_iISN; |
- m_ullSndLastAck2Time = CTimer::getTime(); |
- |
- // Inform the server my configurations. |
- request.pack(0, NULL, reqdata, m_iPayloadSize); |
- // ID = 0, connection request |
- request.m_iID = 0; |
- |
- // Wait for the negotiated configurations from the peer side. |
- response.pack(0, NULL, resdata, m_iPayloadSize); |
- |
- uint64_t timeo = 3000000; |
- if (m_bRendezvous) |
- timeo *= 10; |
- uint64_t entertime = CTimer::getTime(); |
- uint64_t last_req_time = 0; |
- |
- CUDTException e(0, 0); |
- char* tmp = NULL; |
- |
- while (!m_bClosing) |
- { |
- // avoid sending too many requests, at most 1 request per 250ms |
- if (CTimer::getTime() - last_req_time > 250000) |
- { |
- req.serialize(reqdata, m_iPayloadSize); |
- request.setLength(CHandShake::m_iContentSize); |
- m_pSndQueue->sendto(serv_addr, request); |
- |
- last_req_time = CTimer::getTime(); |
- } |
- |
- response.setLength(m_iPayloadSize); |
- if (m_pRcvQueue->recvfrom(m_SocketID, response) > 0) |
- { |
- if (m_bRendezvous && ((0 == response.getFlag()) || (1 == response.getType())) && (NULL != tmp)) |
- { |
- // a data packet or a keep-alive packet comes, which means the peer side is already connected |
- // in this situation, a previously recorded response (tmp) will be used |
- res.deserialize(tmp, CHandShake::m_iContentSize); |
- memcpy(m_piSelfIP, res.m_piPeerIP, 16); |
- break; |
- } |
- |
- if ((1 != response.getFlag()) || (0 != response.getType())) |
- response.setLength(-1); |
- else |
- { |
- res.deserialize(response.m_pcData, response.getLength()); |
- |
- if (m_bRendezvous) |
- { |
- // regular connect should NOT communicate with rendezvous connect |
- // rendezvous connect require 3-way handshake |
- if (1 == res.m_iReqType) |
- response.setLength(-1); |
- else if ((0 == res.m_iReqType) || (0 == req.m_iReqType)) |
- { |
- if (NULL == tmp) |
- tmp = new char [m_iPayloadSize]; |
- res.serialize(tmp, m_iPayloadSize); |
- |
- req.m_iReqType = -1; |
- request.m_iID = res.m_iID; |
- response.setLength(-1); |
- } |
- } |
- else |
- { |
- // set cookie |
- if (1 == res.m_iReqType) |
- { |
- req.m_iReqType = -1; |
- req.m_iCookie = res.m_iCookie; |
- response.setLength(-1); |
- } |
- } |
- } |
- |
- // new request/response should be sent out immediately on receving a response |
- last_req_time = 0; |
- } |
- |
- if (response.getLength() > 0) |
- { |
- memcpy(m_piSelfIP, res.m_piPeerIP, 16); |
- break; |
- } |
- |
- if (CTimer::getTime() > entertime + timeo) |
- { |
- // timeout |
- e = CUDTException(1, 1, 0); |
- break; |
- } |
- } |
- |
- delete [] tmp; |
- delete [] reqdata; |
- delete [] resdata; |
- |
- // remove from rendezvous queue |
- m_pRcvQueue->m_pRendezvousQueue->remove(m_SocketID); |
- |
- if (e.getErrorCode() == 0) |
- { |
- if (m_bClosing) // if the socket is closed before connection... |
- e = CUDTException(1); |
- else if (1002 == res.m_iReqType) // connection request rejected |
- e = CUDTException(1, 2, 0); |
- else if ((!m_bRendezvous) && (m_iISN != res.m_iISN)) // secuity check |
- e = CUDTException(1, 4, 0); |
- } |
- |
- if (e.getErrorCode() != 0) |
- { |
- throw e; |
- } |
- |
- // Got it. Re-configure according to the negotiated values. |
- m_iMSS = res.m_iMSS; |
- m_iFlowWindowSize = res.m_iFlightFlagSize; |
- m_iPktSize = m_iMSS - 28; |
- m_iPayloadSize = m_iPktSize - CPacket::m_iPktHdrSize; |
- m_iPeerISN = res.m_iISN; |
- m_iRcvLastAck = res.m_iISN; |
- m_iRcvLastAckAck = res.m_iISN; |
- m_iRcvCurrSeqNo = res.m_iISN - 1; |
- m_PeerID = res.m_iID; |
- |
- // Prepare all data structures |
- try |
- { |
- m_pSndBuffer = new CSndBuffer(32, m_iPayloadSize); |
- m_pRcvBuffer = new CRcvBuffer(&(m_pRcvQueue->m_UnitQueue), m_iRcvBufSize); |
- // after introducing lite ACK, the sndlosslist may not be cleared in time, so it requires twice space. |
- m_pSndLossList = new CSndLossList(m_iFlowWindowSize * 2); |
- m_pRcvLossList = new CRcvLossList(m_iFlightFlagSize); |
- m_pACKWindow = new CACKWindow(1024); |
- m_pRcvTimeWindow = new CPktTimeWindow(16, 64); |
- m_pSndTimeWindow = new CPktTimeWindow(); |
- } |
- catch (...) |
- { |
- throw CUDTException(3, 2, 0); |
- } |
- |
- m_pCC = m_pCCFactory->create(); |
- m_pCC->m_UDT = m_SocketID; |
- m_ullInterval = (uint64_t)(m_pCC->m_dPktSndPeriod * m_ullCPUFrequency); |
- m_dCongestionWindow = m_pCC->m_dCWndSize; |
- |
- CInfoBlock ib; |
- ib.m_iIPversion = m_iIPversion; |
- CInfoBlock::convert(serv_addr, m_iIPversion, ib.m_piIP); |
- if (m_pCache->lookup(&ib) >= 0) |
- { |
- m_iRTT = ib.m_iRTT; |
- m_iBandwidth = ib.m_iBandwidth; |
- } |
- |
- m_pCC->setMSS(m_iMSS); |
- m_pCC->setMaxCWndSize((int&)m_iFlowWindowSize); |
- m_pCC->setSndCurrSeqNo((int32_t&)m_iSndCurrSeqNo); |
- m_pCC->setRcvRate(m_iDeliveryRate); |
- m_pCC->setRTT(m_iRTT); |
- m_pCC->setBandwidth(m_iBandwidth); |
- if (m_llMaxBW > 0) m_pCC->setUserParam((char*)&(m_llMaxBW), 8); |
- m_pCC->init(); |
- |
- m_pPeerAddr = (AF_INET == m_iIPversion) ? (sockaddr*)new sockaddr_in : (sockaddr*)new sockaddr_in6; |
- memcpy(m_pPeerAddr, serv_addr, (AF_INET == m_iIPversion) ? sizeof(sockaddr_in) : sizeof(sockaddr_in6)); |
- |
- // And, I am connected too. |
- m_bConnected = true; |
- |
- // register this socket for receiving data packets |
- m_pRNode->m_bOnList = true; |
- m_pRcvQueue->setNewEntry(this); |
- |
- // acknowledde any waiting epolls to read/write |
- s_UDTUnited.m_EPoll.enable_read(m_SocketID, m_sPollID); |
- s_UDTUnited.m_EPoll.enable_write(m_SocketID, m_sPollID); |
-} |
- |
-void CUDT::connect(const sockaddr* peer, CHandShake* hs) |
-{ |
- CGuard cg(m_ConnectionLock); |
- |
- // Uses the smaller MSS between the peers |
- if (hs->m_iMSS > m_iMSS) |
- hs->m_iMSS = m_iMSS; |
- else |
- m_iMSS = hs->m_iMSS; |
- |
- // exchange info for maximum flow window size |
- m_iFlowWindowSize = hs->m_iFlightFlagSize; |
- hs->m_iFlightFlagSize = (m_iRcvBufSize < m_iFlightFlagSize)? m_iRcvBufSize : m_iFlightFlagSize; |
- |
- m_iPeerISN = hs->m_iISN; |
- |
- m_iRcvLastAck = hs->m_iISN; |
- m_iRcvLastAckAck = hs->m_iISN; |
- m_iRcvCurrSeqNo = hs->m_iISN - 1; |
- |
- m_PeerID = hs->m_iID; |
- hs->m_iID = m_SocketID; |
- |
- // use peer's ISN and send it back for security check |
- m_iISN = hs->m_iISN; |
- |
- m_iLastDecSeq = m_iISN - 1; |
- m_iSndLastAck = m_iISN; |
- m_iSndLastDataAck = m_iISN; |
- m_iSndCurrSeqNo = m_iISN - 1; |
- m_iSndLastAck2 = m_iISN; |
- m_ullSndLastAck2Time = CTimer::getTime(); |
- |
- // this is a reponse handshake |
- hs->m_iReqType = -1; |
- |
- // get local IP address and send the peer its IP address (because UDP cannot get local IP address) |
- memcpy(m_piSelfIP, hs->m_piPeerIP, 16); |
- CIPAddress::ntop(peer, hs->m_piPeerIP, m_iIPversion); |
- |
- m_iPktSize = m_iMSS - 28; |
- m_iPayloadSize = m_iPktSize - CPacket::m_iPktHdrSize; |
- |
- // Prepare all structures |
- try |
- { |
- m_pSndBuffer = new CSndBuffer(32, m_iPayloadSize); |
- m_pRcvBuffer = new CRcvBuffer(&(m_pRcvQueue->m_UnitQueue), m_iRcvBufSize); |
- m_pSndLossList = new CSndLossList(m_iFlowWindowSize * 2); |
- m_pRcvLossList = new CRcvLossList(m_iFlightFlagSize); |
- m_pACKWindow = new CACKWindow(1024); |
- m_pRcvTimeWindow = new CPktTimeWindow(16, 64); |
- m_pSndTimeWindow = new CPktTimeWindow(); |
- } |
- catch (...) |
- { |
- throw CUDTException(3, 2, 0); |
- } |
- |
- m_pCC = m_pCCFactory->create(); |
- m_pCC->m_UDT = m_SocketID; |
- m_ullInterval = (uint64_t)(m_pCC->m_dPktSndPeriod * m_ullCPUFrequency); |
- m_dCongestionWindow = m_pCC->m_dCWndSize; |
- |
- CInfoBlock ib; |
- ib.m_iIPversion = m_iIPversion; |
- CInfoBlock::convert(peer, m_iIPversion, ib.m_piIP); |
- if (m_pCache->lookup(&ib) >= 0) |
- { |
- m_iRTT = ib.m_iRTT; |
- m_iBandwidth = ib.m_iBandwidth; |
- } |
- |
- m_pCC->setMSS(m_iMSS); |
- m_pCC->setMaxCWndSize((int&)m_iFlowWindowSize); |
- m_pCC->setSndCurrSeqNo((int32_t&)m_iSndCurrSeqNo); |
- m_pCC->setRcvRate(m_iDeliveryRate); |
- m_pCC->setRTT(m_iRTT); |
- m_pCC->setBandwidth(m_iBandwidth); |
- if (m_llMaxBW > 0) m_pCC->setUserParam((char*)&(m_llMaxBW), 8); |
- m_pCC->init(); |
- |
- m_pPeerAddr = (AF_INET == m_iIPversion) ? (sockaddr*)new sockaddr_in : (sockaddr*)new sockaddr_in6; |
- memcpy(m_pPeerAddr, peer, (AF_INET == m_iIPversion) ? sizeof(sockaddr_in) : sizeof(sockaddr_in6)); |
- |
- // And of course, it is connected. |
- m_bConnected = true; |
- |
- // register this socket for receiving data packets |
- m_pRNode->m_bOnList = true; |
- m_pRcvQueue->setNewEntry(this); |
- |
- //send the response to the peer, see listen() for more discussions about this |
- CPacket response; |
- int size = CHandShake::m_iContentSize; |
- char* buffer = new char[size]; |
- hs->serialize(buffer, size); |
- response.pack(0, NULL, buffer, size); |
- response.m_iID = m_PeerID; |
- m_pSndQueue->sendto(peer, response); |
- delete [] buffer; |
-} |
- |
-void CUDT::close() |
-{ |
- if (!m_bOpened) |
- return; |
- |
- if (0 != m_Linger.l_onoff) |
- { |
- uint64_t entertime = CTimer::getTime(); |
- |
- while (!m_bBroken && m_bConnected && (m_pSndBuffer->getCurrBufSize() > 0) && (CTimer::getTime() - entertime < m_Linger.l_linger * 1000000ULL)) |
- { |
- // linger has been checked by previous close() call and has expired |
- if (m_ullLingerExpiration >= entertime) |
- break; |
- |
- if (!m_bSynSending) |
- { |
- // if this socket enables asynchronous sending, return immediately and let GC to close it later |
- if (0 == m_ullLingerExpiration) |
- m_ullLingerExpiration = entertime + m_Linger.l_linger * 1000000ULL; |
- |
- return; |
- } |
- |
- #ifndef WIN32 |
- timespec ts; |
- ts.tv_sec = 0; |
- ts.tv_nsec = 1000000; |
- nanosleep(&ts, NULL); |
- #else |
- Sleep(1); |
- #endif |
- } |
- } |
- |
- // remove this socket from the snd queue |
- if (m_bConnected) |
- m_pSndQueue->m_pSndUList->remove(this); |
- |
- // remove itself from all epoll monitoring |
- for (set<int>::iterator i = m_sPollID.begin(); i != m_sPollID.end(); ++ i) |
- s_UDTUnited.m_EPoll.remove_usock(*i, m_SocketID); |
- |
- CGuard cg(m_ConnectionLock); |
- |
- if (!m_bOpened) |
- return; |
- |
- // Inform the threads handler to stop. |
- m_bClosing = true; |
- |
- // Signal the sender and recver if they are waiting for data. |
- releaseSynch(); |
- |
- if (m_bListening) |
- { |
- m_bListening = false; |
- m_pRcvQueue->removeListener(this); |
- } |
- if (m_bConnected) |
- { |
- if (!m_bShutdown) |
- sendCtrl(5); |
- |
- m_pCC->close(); |
- |
- CInfoBlock ib; |
- ib.m_iIPversion = m_iIPversion; |
- CInfoBlock::convert(m_pPeerAddr, m_iIPversion, ib.m_piIP); |
- ib.m_iRTT = m_iRTT; |
- ib.m_iBandwidth = m_iBandwidth; |
- m_pCache->update(&ib); |
- |
- m_bConnected = false; |
- } |
- |
- // waiting all send and recv calls to stop |
- CGuard sendguard(m_SendLock); |
- CGuard recvguard(m_RecvLock); |
- |
- // CLOSED. |
- m_bOpened = false; |
-} |
- |
-int CUDT::send(const char* data, const int& len) |
-{ |
- if (UDT_DGRAM == m_iSockType) |
- throw CUDTException(5, 10, 0); |
- |
- // throw an exception if not connected |
- if (m_bBroken || m_bClosing) |
- throw CUDTException(2, 1, 0); |
- else if (!m_bConnected) |
- throw CUDTException(2, 2, 0); |
- |
- if (len <= 0) |
- return 0; |
- |
- CGuard sendguard(m_SendLock); |
- |
- if (m_pSndBuffer->getCurrBufSize() == 0) |
- { |
- // delay the EXP timer to avoid mis-fired timeout |
- uint64_t currtime; |
- CTimer::rdtsc(currtime); |
- m_ullNextEXPTime = currtime + m_ullEXPInt; |
- } |
- |
- if (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize()) |
- { |
- if (!m_bSynSending) |
- throw CUDTException(6, 1, 0); |
- else |
- { |
- // wait here during a blocking sending |
- #ifndef WIN32 |
- pthread_mutex_lock(&m_SendBlockLock); |
- if (m_iSndTimeOut < 0) |
- { |
- while (!m_bBroken && m_bConnected && !m_bClosing && (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize()) && m_bPeerHealth) |
- pthread_cond_wait(&m_SendBlockCond, &m_SendBlockLock); |
- } |
- else |
- { |
- uint64_t exptime = CTimer::getTime() + m_iSndTimeOut * 1000ULL; |
- timespec locktime; |
- |
- locktime.tv_sec = exptime / 1000000; |
- locktime.tv_nsec = (exptime % 1000000) * 1000; |
- |
- while (!m_bBroken && m_bConnected && !m_bClosing && (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize()) && m_bPeerHealth && (CTimer::getTime() < exptime)) |
- pthread_cond_timedwait(&m_SendBlockCond, &m_SendBlockLock, &locktime); |
- } |
- pthread_mutex_unlock(&m_SendBlockLock); |
- #else |
- if (m_iSndTimeOut < 0) |
- { |
- while (!m_bBroken && m_bConnected && !m_bClosing && (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize()) && m_bPeerHealth) |
- WaitForSingleObject(m_SendBlockCond, INFINITE); |
- } |
- else |
- { |
- uint64_t exptime = CTimer::getTime() + m_iSndTimeOut * 1000ULL; |
- |
- while (!m_bBroken && m_bConnected && !m_bClosing && (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize()) && m_bPeerHealth && (CTimer::getTime() < exptime)) |
- WaitForSingleObject(m_SendBlockCond, DWORD((exptime - CTimer::getTime()) / 1000)); |
- } |
- #endif |
- |
- // check the connection status |
- if (m_bBroken || m_bClosing) |
- throw CUDTException(2, 1, 0); |
- else if (!m_bConnected) |
- throw CUDTException(2, 2, 0); |
- else if (!m_bPeerHealth) |
- { |
- m_bPeerHealth = true; |
- throw CUDTException(7); |
- } |
- } |
- } |
- |
- if (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize()) |
- { |
- if (m_iSndTimeOut >= 0) |
- throw CUDTException(6, 1, 0); |
- |
- return 0; |
- } |
- |
- int size = (m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize; |
- if (size > len) |
- size = len; |
- |
- // record total time used for sending |
- if (0 == m_pSndBuffer->getCurrBufSize()) |
- m_llSndDurationCounter = CTimer::getTime(); |
- |
- // insert the user buffer into the sening list |
- m_pSndBuffer->addBuffer(data, size); |
- |
- // insert this socket to snd list if it is not on the list yet |
- m_pSndQueue->m_pSndUList->update(this, false); |
- |
- if (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize()) |
- { |
- // write is not available any more |
- s_UDTUnited.m_EPoll.disable_write(m_SocketID, m_sPollID); |
- } |
- |
- return size; |
-} |
- |
-int CUDT::recv(char* data, const int& len) |
-{ |
- if (UDT_DGRAM == m_iSockType) |
- throw CUDTException(5, 10, 0); |
- |
- // throw an exception if not connected |
- if (!m_bConnected) |
- throw CUDTException(2, 2, 0); |
- else if ((m_bBroken || m_bClosing) && (0 == m_pRcvBuffer->getRcvDataSize())) |
- throw CUDTException(2, 1, 0); |
- |
- if (len <= 0) |
- return 0; |
- |
- CGuard recvguard(m_RecvLock); |
- |
- if (0 == m_pRcvBuffer->getRcvDataSize()) |
- { |
- if (!m_bSynRecving) |
- throw CUDTException(6, 2, 0); |
- else |
- { |
- #ifndef WIN32 |
- pthread_mutex_lock(&m_RecvDataLock); |
- if (m_iRcvTimeOut < 0) |
- { |
- while (!m_bBroken && m_bConnected && !m_bClosing && (0 == m_pRcvBuffer->getRcvDataSize())) |
- pthread_cond_wait(&m_RecvDataCond, &m_RecvDataLock); |
- } |
- else |
- { |
- uint64_t exptime = CTimer::getTime() + m_iRcvTimeOut * 1000ULL; |
- timespec locktime; |
- |
- locktime.tv_sec = exptime / 1000000; |
- locktime.tv_nsec = (exptime % 1000000) * 1000; |
- |
- while (!m_bBroken && m_bConnected && !m_bClosing && (0 == m_pRcvBuffer->getRcvDataSize())) |
- { |
- pthread_cond_timedwait(&m_RecvDataCond, &m_RecvDataLock, &locktime); |
- if (CTimer::getTime() >= exptime) |
- break; |
- } |
- } |
- pthread_mutex_unlock(&m_RecvDataLock); |
- #else |
- if (m_iRcvTimeOut < 0) |
- { |
- while (!m_bBroken && m_bConnected && !m_bClosing && (0 == m_pRcvBuffer->getRcvDataSize())) |
- WaitForSingleObject(m_RecvDataCond, INFINITE); |
- } |
- else |
- { |
- uint64_t enter_time = CTimer::getTime(); |
- |
- while (!m_bBroken && m_bConnected && !m_bClosing && (0 == m_pRcvBuffer->getRcvDataSize())) |
- { |
- int diff = int(CTimer::getTime() - enter_time) / 1000; |
- if (diff >= m_iRcvTimeOut) |
- break; |
- WaitForSingleObject(m_RecvDataCond, DWORD(m_iRcvTimeOut - diff )); |
- } |
- } |
- #endif |
- } |
- } |
- |
- // throw an exception if not connected |
- if (!m_bConnected) |
- throw CUDTException(2, 2, 0); |
- else if ((m_bBroken || m_bClosing) && (0 == m_pRcvBuffer->getRcvDataSize())) |
- throw CUDTException(2, 1, 0); |
- |
- int res = m_pRcvBuffer->readBuffer(data, len); |
- |
- if (m_pRcvBuffer->getRcvDataSize() <= 0) |
- { |
- // read is not available any more |
- s_UDTUnited.m_EPoll.disable_read(m_SocketID, m_sPollID); |
- } |
- |
- if ((res <= 0) && (m_iRcvTimeOut >= 0)) |
- throw CUDTException(6, 2, 0); |
- |
- return res; |
-} |
- |
-int CUDT::sendmsg(const char* data, const int& len, const int& msttl, const bool& inorder) |
-{ |
- if (UDT_STREAM == m_iSockType) |
- throw CUDTException(5, 9, 0); |
- |
- // throw an exception if not connected |
- if (m_bBroken || m_bClosing) |
- throw CUDTException(2, 1, 0); |
- else if (!m_bConnected) |
- throw CUDTException(2, 2, 0); |
- |
- if (len <= 0) |
- return 0; |
- |
- if (len > m_iSndBufSize * m_iPayloadSize) |
- throw CUDTException(5, 12, 0); |
- |
- CGuard sendguard(m_SendLock); |
- |
- if (m_pSndBuffer->getCurrBufSize() == 0) |
- { |
- // delay the EXP timer to avoid mis-fired timeout |
- uint64_t currtime; |
- CTimer::rdtsc(currtime); |
- m_ullNextEXPTime = currtime + m_ullEXPInt; |
- } |
- |
- if ((m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < len) |
- { |
- if (!m_bSynSending) |
- throw CUDTException(6, 1, 0); |
- else |
- { |
- // wait here during a blocking sending |
- #ifndef WIN32 |
- pthread_mutex_lock(&m_SendBlockLock); |
- if (m_iSndTimeOut < 0) |
- { |
- while (!m_bBroken && m_bConnected && !m_bClosing && ((m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < len)) |
- pthread_cond_wait(&m_SendBlockCond, &m_SendBlockLock); |
- } |
- else |
- { |
- uint64_t exptime = CTimer::getTime() + m_iSndTimeOut * 1000ULL; |
- timespec locktime; |
- |
- locktime.tv_sec = exptime / 1000000; |
- locktime.tv_nsec = (exptime % 1000000) * 1000; |
- |
- while (!m_bBroken && m_bConnected && !m_bClosing && ((m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < len) && (CTimer::getTime() < exptime)) |
- pthread_cond_timedwait(&m_SendBlockCond, &m_SendBlockLock, &locktime); |
- } |
- pthread_mutex_unlock(&m_SendBlockLock); |
- #else |
- if (m_iSndTimeOut < 0) |
- { |
- while (!m_bBroken && m_bConnected && !m_bClosing && ((m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < len)) |
- WaitForSingleObject(m_SendBlockCond, INFINITE); |
- } |
- else |
- { |
- uint64_t exptime = CTimer::getTime() + m_iSndTimeOut * 1000ULL; |
- |
- while (!m_bBroken && m_bConnected && !m_bClosing && ((m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < len) && (CTimer::getTime() < exptime)) |
- WaitForSingleObject(m_SendBlockCond, DWORD((exptime - CTimer::getTime()) / 1000)); |
- } |
- #endif |
- |
- // check the connection status |
- if (m_bBroken || m_bClosing) |
- throw CUDTException(2, 1, 0); |
- else if (!m_bConnected) |
- throw CUDTException(2, 2, 0); |
- } |
- } |
- |
- if ((m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < len) |
- { |
- if (m_iSndTimeOut >= 0) |
- throw CUDTException(6, 1, 0); |
- |
- return 0; |
- } |
- |
- // record total time used for sending |
- if (0 == m_pSndBuffer->getCurrBufSize()) |
- m_llSndDurationCounter = CTimer::getTime(); |
- |
- // insert the user buffer into the sening list |
- m_pSndBuffer->addBuffer(data, len, msttl, inorder); |
- |
- // insert this socket to the snd list if it is not on the list yet |
- m_pSndQueue->m_pSndUList->update(this, false); |
- |
- if (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize()) |
- { |
- // write is not available any more |
- s_UDTUnited.m_EPoll.disable_write(m_SocketID, m_sPollID); |
- } |
- |
- return len; |
-} |
- |
-int CUDT::recvmsg(char* data, const int& len) |
-{ |
- if (UDT_STREAM == m_iSockType) |
- throw CUDTException(5, 9, 0); |
- |
- // throw an exception if not connected |
- if (!m_bConnected) |
- throw CUDTException(2, 2, 0); |
- |
- if (len <= 0) |
- return 0; |
- |
- CGuard recvguard(m_RecvLock); |
- |
- if (m_bBroken || m_bClosing) |
- { |
- int res = m_pRcvBuffer->readMsg(data, len); |
- if (0 == res) |
- { |
- // read is not available |
- s_UDTUnited.m_EPoll.disable_read(m_SocketID, m_sPollID); |
- |
- throw CUDTException(2, 1, 0); |
- } |
- else |
- return res; |
- } |
- |
- if (!m_bSynRecving) |
- { |
- int res = m_pRcvBuffer->readMsg(data, len); |
- if (0 == res) |
- throw CUDTException(6, 2, 0); |
- else |
- return res; |
- } |
- |
- int res = 0; |
- bool timeout = false; |
- |
- do |
- { |
- #ifndef WIN32 |
- pthread_mutex_lock(&m_RecvDataLock); |
- |
- if (m_iRcvTimeOut < 0) |
- { |
- while (!m_bBroken && m_bConnected && !m_bClosing && (0 == (res = m_pRcvBuffer->readMsg(data, len)))) |
- pthread_cond_wait(&m_RecvDataCond, &m_RecvDataLock); |
- } |
- else |
- { |
- uint64_t exptime = CTimer::getTime() + m_iRcvTimeOut * 1000ULL; |
- timespec locktime; |
- |
- locktime.tv_sec = exptime / 1000000; |
- locktime.tv_nsec = (exptime % 1000000) * 1000; |
- |
- if (pthread_cond_timedwait(&m_RecvDataCond, &m_RecvDataLock, &locktime) == ETIMEDOUT) |
- timeout = true; |
- |
- res = m_pRcvBuffer->readMsg(data, len); |
- } |
- pthread_mutex_unlock(&m_RecvDataLock); |
- #else |
- if (m_iRcvTimeOut < 0) |
- { |
- while (!m_bBroken && m_bConnected && !m_bClosing && (0 == (res = m_pRcvBuffer->readMsg(data, len)))) |
- WaitForSingleObject(m_RecvDataCond, INFINITE); |
- } |
- else |
- { |
- if (WaitForSingleObject(m_RecvDataCond, DWORD(m_iRcvTimeOut)) == WAIT_TIMEOUT) |
- timeout = true; |
- |
- res = m_pRcvBuffer->readMsg(data, len); |
- } |
- #endif |
- |
- if (m_bBroken || m_bClosing) |
- throw CUDTException(2, 1, 0); |
- else if (!m_bConnected) |
- throw CUDTException(2, 2, 0); |
- } while ((0 == res) && !timeout); |
- |
- if (m_pRcvBuffer->getRcvMsgNum() <= 0) |
- { |
- // read is not available any more |
- s_UDTUnited.m_EPoll.disable_read(m_SocketID, m_sPollID); |
- } |
- |
- if ((res <= 0) && (m_iRcvTimeOut >= 0)) |
- throw CUDTException(6, 2, 0); |
- |
- return res; |
-} |
- |
-int64_t CUDT::sendfile(fstream& ifs, int64_t& offset, const int64_t& size, const int& block) |
-{ |
- if (UDT_DGRAM == m_iSockType) |
- throw CUDTException(5, 10, 0); |
- |
- if (m_bBroken || m_bClosing) |
- throw CUDTException(2, 1, 0); |
- else if (!m_bConnected) |
- throw CUDTException(2, 2, 0); |
- |
- if (size <= 0) |
- return 0; |
- |
- CGuard sendguard(m_SendLock); |
- |
- if (m_pSndBuffer->getCurrBufSize() == 0) |
- { |
- // delay the EXP timer to avoid mis-fired timeout |
- uint64_t currtime; |
- CTimer::rdtsc(currtime); |
- m_ullNextEXPTime = currtime + m_ullEXPInt; |
- } |
- |
- int64_t tosend = size; |
- int unitsize; |
- |
- // positioning... |
- try |
- { |
- ifs.seekg((streamoff)offset); |
- } |
- catch (...) |
- { |
- throw CUDTException(4, 1); |
- } |
- |
- // sending block by block |
- while (tosend > 0) |
- { |
- if (ifs.fail()) |
- throw CUDTException(4, 4); |
- |
- if (ifs.eof()) |
- break; |
- |
- unitsize = int((tosend >= block) ? block : tosend); |
- |
- #ifndef WIN32 |
- pthread_mutex_lock(&m_SendBlockLock); |
- while (!m_bBroken && m_bConnected && !m_bClosing && (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize()) && m_bPeerHealth) |
- pthread_cond_wait(&m_SendBlockCond, &m_SendBlockLock); |
- pthread_mutex_unlock(&m_SendBlockLock); |
- #else |
- while (!m_bBroken && m_bConnected && !m_bClosing && (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize()) && m_bPeerHealth) |
- WaitForSingleObject(m_SendBlockCond, INFINITE); |
- #endif |
- |
- if (m_bBroken || m_bClosing) |
- throw CUDTException(2, 1, 0); |
- else if (!m_bConnected) |
- throw CUDTException(2, 2, 0); |
- else if (!m_bPeerHealth) |
- { |
- // reset peer health status, once this error returns, the app should handle the situation at the peer side |
- m_bPeerHealth = true; |
- throw CUDTException(7); |
- } |
- |
- // record total time used for sending |
- if (0 == m_pSndBuffer->getCurrBufSize()) |
- m_llSndDurationCounter = CTimer::getTime(); |
- |
- int64_t sentsize = m_pSndBuffer->addBufferFromFile(ifs, unitsize); |
- |
- if (sentsize > 0) |
- { |
- tosend -= sentsize; |
- offset += sentsize; |
- } |
- |
- // insert this socket to snd list if it is not on the list yet |
- m_pSndQueue->m_pSndUList->update(this, false); |
- } |
- |
- if (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize()) |
- { |
- // write is not available any more |
- s_UDTUnited.m_EPoll.disable_write(m_SocketID, m_sPollID); |
- } |
- |
- return size - tosend; |
-} |
- |
-int64_t CUDT::recvfile(fstream& ofs, int64_t& offset, const int64_t& size, const int& block) |
-{ |
- if (UDT_DGRAM == m_iSockType) |
- throw CUDTException(5, 10, 0); |
- |
- if (!m_bConnected) |
- throw CUDTException(2, 2, 0); |
- else if ((m_bBroken || m_bClosing) && (0 == m_pRcvBuffer->getRcvDataSize())) |
- throw CUDTException(2, 1, 0); |
- |
- if (size <= 0) |
- return 0; |
- |
- CGuard recvguard(m_RecvLock); |
- |
- int64_t torecv = size; |
- int unitsize = block; |
- int recvsize; |
- |
- // positioning... |
- try |
- { |
- ofs.seekp((streamoff)offset); |
- } |
- catch (...) |
- { |
- throw CUDTException(4, 3); |
- } |
- |
- // receiving... "recvfile" is always blocking |
- while (torecv > 0) |
- { |
- if (ofs.fail()) |
- { |
- // send the sender a signal so it will not be blocked forever |
- int32_t err_code = CUDTException::EFILE; |
- sendCtrl(8, &err_code); |
- |
- throw CUDTException(4, 4); |
- } |
- |
- #ifndef WIN32 |
- pthread_mutex_lock(&m_RecvDataLock); |
- while (!m_bBroken && m_bConnected && !m_bClosing && (0 == m_pRcvBuffer->getRcvDataSize())) |
- pthread_cond_wait(&m_RecvDataCond, &m_RecvDataLock); |
- pthread_mutex_unlock(&m_RecvDataLock); |
- #else |
- while (!m_bBroken && m_bConnected && !m_bClosing && (0 == m_pRcvBuffer->getRcvDataSize())) |
- WaitForSingleObject(m_RecvDataCond, INFINITE); |
- #endif |
- |
- if (!m_bConnected) |
- throw CUDTException(2, 2, 0); |
- else if ((m_bBroken || m_bClosing) && (0 == m_pRcvBuffer->getRcvDataSize())) |
- throw CUDTException(2, 1, 0); |
- |
- unitsize = int((torecv >= block) ? block : torecv); |
- recvsize = m_pRcvBuffer->readBufferToFile(ofs, unitsize); |
- |
- if (recvsize > 0) |
- { |
- torecv -= recvsize; |
- offset += recvsize; |
- } |
- } |
- |
- if (m_pRcvBuffer->getRcvDataSize() <= 0) |
- { |
- // read is not available any more |
- s_UDTUnited.m_EPoll.disable_read(m_SocketID, m_sPollID); |
- } |
- |
- return size - torecv; |
-} |
- |
-void CUDT::sample(CPerfMon* perf, bool clear) |
-{ |
- if (!m_bConnected) |
- throw CUDTException(2, 2, 0); |
- if (m_bBroken || m_bClosing) |
- throw CUDTException(2, 1, 0); |
- |
- uint64_t currtime = CTimer::getTime(); |
- perf->msTimeStamp = (currtime - m_StartTime) / 1000; |
- |
- perf->pktSent = m_llTraceSent; |
- perf->pktRecv = m_llTraceRecv; |
- perf->pktSndLoss = m_iTraceSndLoss; |
- perf->pktRcvLoss = m_iTraceRcvLoss; |
- perf->pktRetrans = m_iTraceRetrans; |
- perf->pktSentACK = m_iSentACK; |
- perf->pktRecvACK = m_iRecvACK; |
- perf->pktSentNAK = m_iSentNAK; |
- perf->pktRecvNAK = m_iRecvNAK; |
- perf->usSndDuration = m_llSndDuration; |
- |
- perf->pktSentTotal = m_llSentTotal; |
- perf->pktRecvTotal = m_llRecvTotal; |
- perf->pktSndLossTotal = m_iSndLossTotal; |
- perf->pktRcvLossTotal = m_iRcvLossTotal; |
- perf->pktRetransTotal = m_iRetransTotal; |
- perf->pktSentACKTotal = m_iSentACKTotal; |
- perf->pktRecvACKTotal = m_iRecvACKTotal; |
- perf->pktSentNAKTotal = m_iSentNAKTotal; |
- perf->pktRecvNAKTotal = m_iRecvNAKTotal; |
- perf->usSndDurationTotal = m_llSndDurationTotal; |
- |
- double interval = double(currtime - m_LastSampleTime); |
- |
- perf->mbpsSendRate = double(m_llTraceSent) * m_iPayloadSize * 8.0 / interval; |
- perf->mbpsRecvRate = double(m_llTraceRecv) * m_iPayloadSize * 8.0 / interval; |
- |
- perf->usPktSndPeriod = m_ullInterval / double(m_ullCPUFrequency); |
- perf->pktFlowWindow = m_iFlowWindowSize; |
- perf->pktCongestionWindow = (int)m_dCongestionWindow; |
- perf->pktFlightSize = CSeqNo::seqlen(const_cast<int32_t&>(m_iSndLastAck), CSeqNo::incseq(m_iSndCurrSeqNo)) - 1; |
- perf->msRTT = m_iRTT/1000.0; |
- perf->mbpsBandwidth = m_iBandwidth * m_iPayloadSize * 8.0 / 1000000.0; |
- |
- #ifndef WIN32 |
- if (0 == pthread_mutex_trylock(&m_ConnectionLock)) |
- #else |
- if (WAIT_OBJECT_0 == WaitForSingleObject(m_ConnectionLock, 0)) |
- #endif |
- { |
- perf->byteAvailSndBuf = (NULL == m_pSndBuffer) ? 0 : (m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iMSS; |
- perf->byteAvailRcvBuf = (NULL == m_pRcvBuffer) ? 0 : m_pRcvBuffer->getAvailBufSize() * m_iMSS; |
- |
- #ifndef WIN32 |
- pthread_mutex_unlock(&m_ConnectionLock); |
- #else |
- ReleaseMutex(m_ConnectionLock); |
- #endif |
- } |
- else |
- { |
- perf->byteAvailSndBuf = 0; |
- perf->byteAvailRcvBuf = 0; |
- } |
- |
- if (clear) |
- { |
- m_llTraceSent = m_llTraceRecv = m_iTraceSndLoss = m_iTraceRcvLoss = m_iTraceRetrans = m_iSentACK = m_iRecvACK = m_iSentNAK = m_iRecvNAK = 0; |
- m_llSndDuration = 0; |
- m_LastSampleTime = currtime; |
- } |
-} |
- |
-void CUDT::initSynch() |
-{ |
- #ifndef WIN32 |
- pthread_mutex_init(&m_SendBlockLock, NULL); |
- pthread_cond_init(&m_SendBlockCond, NULL); |
- pthread_mutex_init(&m_RecvDataLock, NULL); |
- pthread_cond_init(&m_RecvDataCond, NULL); |
- pthread_mutex_init(&m_SendLock, NULL); |
- pthread_mutex_init(&m_RecvLock, NULL); |
- pthread_mutex_init(&m_AckLock, NULL); |
- pthread_mutex_init(&m_ConnectionLock, NULL); |
- #else |
- m_SendBlockLock = CreateMutex(NULL, false, NULL); |
- m_SendBlockCond = CreateEvent(NULL, false, false, NULL); |
- m_RecvDataLock = CreateMutex(NULL, false, NULL); |
- m_RecvDataCond = CreateEvent(NULL, false, false, NULL); |
- m_SendLock = CreateMutex(NULL, false, NULL); |
- m_RecvLock = CreateMutex(NULL, false, NULL); |
- m_AckLock = CreateMutex(NULL, false, NULL); |
- m_ConnectionLock = CreateMutex(NULL, false, NULL); |
- #endif |
-} |
- |
-void CUDT::destroySynch() |
-{ |
- #ifndef WIN32 |
- pthread_mutex_destroy(&m_SendBlockLock); |
- pthread_cond_destroy(&m_SendBlockCond); |
- pthread_mutex_destroy(&m_RecvDataLock); |
- pthread_cond_destroy(&m_RecvDataCond); |
- pthread_mutex_destroy(&m_SendLock); |
- pthread_mutex_destroy(&m_RecvLock); |
- pthread_mutex_destroy(&m_AckLock); |
- pthread_mutex_destroy(&m_ConnectionLock); |
- #else |
- CloseHandle(m_SendBlockLock); |
- CloseHandle(m_SendBlockCond); |
- CloseHandle(m_RecvDataLock); |
- CloseHandle(m_RecvDataCond); |
- CloseHandle(m_SendLock); |
- CloseHandle(m_RecvLock); |
- CloseHandle(m_AckLock); |
- CloseHandle(m_ConnectionLock); |
- #endif |
-} |
- |
-void CUDT::releaseSynch() |
-{ |
- #ifndef WIN32 |
- // wake up user calls |
- pthread_mutex_lock(&m_SendBlockLock); |
- pthread_cond_signal(&m_SendBlockCond); |
- pthread_mutex_unlock(&m_SendBlockLock); |
- |
- pthread_mutex_lock(&m_SendLock); |
- pthread_mutex_unlock(&m_SendLock); |
- |
- pthread_mutex_lock(&m_RecvDataLock); |
- pthread_cond_signal(&m_RecvDataCond); |
- pthread_mutex_unlock(&m_RecvDataLock); |
- |
- pthread_mutex_lock(&m_RecvLock); |
- pthread_mutex_unlock(&m_RecvLock); |
- #else |
- SetEvent(m_SendBlockCond); |
- WaitForSingleObject(m_SendLock, INFINITE); |
- ReleaseMutex(m_SendLock); |
- SetEvent(m_RecvDataCond); |
- WaitForSingleObject(m_RecvLock, INFINITE); |
- ReleaseMutex(m_RecvLock); |
- #endif |
-} |
- |
-void CUDT::sendCtrl(const int& pkttype, void* lparam, void* rparam, const int& size) |
-{ |
- CPacket ctrlpkt; |
- |
- switch (pkttype) |
- { |
- case 2: //010 - Acknowledgement |
- { |
- int32_t ack; |
- |
- // If there is no loss, the ACK is the current largest sequence number plus 1; |
- // Otherwise it is the smallest sequence number in the receiver loss list. |
- if (0 == m_pRcvLossList->getLossLength()) |
- ack = CSeqNo::incseq(m_iRcvCurrSeqNo); |
- else |
- ack = m_pRcvLossList->getFirstLostSeq(); |
- |
- if (ack == m_iRcvLastAckAck) |
- break; |
- |
- // send out a lite ACK |
- // to save time on buffer processing and bandwidth/AS measurement, a lite ACK only feeds back an ACK number |
- if (4 == size) |
- { |
- ctrlpkt.pack(pkttype, NULL, &ack, size); |
- ctrlpkt.m_iID = m_PeerID; |
- m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt); |
- |
- break; |
- } |
- |
- uint64_t currtime; |
- CTimer::rdtsc(currtime); |
- |
- // There are new received packets to acknowledge, update related information. |
- if (CSeqNo::seqcmp(ack, m_iRcvLastAck) > 0) |
- { |
- int acksize = CSeqNo::seqoff(m_iRcvLastAck, ack); |
- |
- m_iRcvLastAck = ack; |
- |
- m_pRcvBuffer->ackData(acksize); |
- |
- // signal a waiting "recv" call if there is any data available |
- #ifndef WIN32 |
- pthread_mutex_lock(&m_RecvDataLock); |
- if (m_bSynRecving) |
- pthread_cond_signal(&m_RecvDataCond); |
- pthread_mutex_unlock(&m_RecvDataLock); |
- #else |
- if (m_bSynRecving) |
- SetEvent(m_RecvDataCond); |
- #endif |
- |
- // acknowledge any waiting epolls to read |
- s_UDTUnited.m_EPoll.enable_read(m_SocketID, m_sPollID); |
- } |
- else if (ack == m_iRcvLastAck) |
- { |
- if ((currtime - m_ullLastAckTime) < ((m_iRTT + 4 * m_iRTTVar) * m_ullCPUFrequency)) |
- break; |
- } |
- else |
- break; |
- |
- // Send out the ACK only if has not been received by the sender before |
- if (CSeqNo::seqcmp(m_iRcvLastAck, m_iRcvLastAckAck) > 0) |
- { |
- int32_t data[6]; |
- |
- m_iAckSeqNo = CAckNo::incack(m_iAckSeqNo); |
- data[0] = m_iRcvLastAck; |
- data[1] = m_iRTT; |
- data[2] = m_iRTTVar; |
- data[3] = m_pRcvBuffer->getAvailBufSize(); |
- // a minimum flow window of 2 is used, even if buffer is full, to break potential deadlock |
- if (data[3] < 2) |
- data[3] = 2; |
- |
- if (currtime - m_ullLastAckTime > m_ullSYNInt) |
- { |
- data[4] = m_pRcvTimeWindow->getPktRcvSpeed(); |
- data[5] = m_pRcvTimeWindow->getBandwidth(); |
- ctrlpkt.pack(pkttype, &m_iAckSeqNo, data, 24); |
- |
- CTimer::rdtsc(m_ullLastAckTime); |
- } |
- else |
- { |
- ctrlpkt.pack(pkttype, &m_iAckSeqNo, data, 16); |
- } |
- |
- ctrlpkt.m_iID = m_PeerID; |
- m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt); |
- |
- m_pACKWindow->store(m_iAckSeqNo, m_iRcvLastAck); |
- |
- ++ m_iSentACK; |
- ++ m_iSentACKTotal; |
- } |
- |
- break; |
- } |
- |
- case 6: //110 - Acknowledgement of Acknowledgement |
- ctrlpkt.pack(pkttype, lparam); |
- ctrlpkt.m_iID = m_PeerID; |
- m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt); |
- |
- break; |
- |
- case 3: //011 - Loss Report |
- { |
- if (NULL != rparam) |
- { |
- if (1 == size) |
- { |
- // only 1 loss packet |
- ctrlpkt.pack(pkttype, NULL, (int32_t *)rparam + 1, 4); |
- } |
- else |
- { |
- // more than 1 loss packets |
- ctrlpkt.pack(pkttype, NULL, rparam, 8); |
- } |
- |
- ctrlpkt.m_iID = m_PeerID; |
- m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt); |
- |
- ++ m_iSentNAK; |
- ++ m_iSentNAKTotal; |
- } |
- else if (m_pRcvLossList->getLossLength() > 0) |
- { |
- // this is periodically NAK report; make sure NAK cannot be sent back too often |
- |
- // read loss list from the local receiver loss list |
- int32_t* data = new int32_t[m_iPayloadSize / 4]; |
- int losslen; |
- m_pRcvLossList->getLossArray(data, losslen, m_iPayloadSize / 4); |
- |
- if (0 < losslen) |
- { |
- ctrlpkt.pack(pkttype, NULL, data, losslen * 4); |
- ctrlpkt.m_iID = m_PeerID; |
- m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt); |
- |
- ++ m_iSentNAK; |
- ++ m_iSentNAKTotal; |
- } |
- |
- delete [] data; |
- } |
- |
- // update next NAK time, which should wait enough time for the retansmission, but not too long |
- m_ullNAKInt = (m_iRTT + 4 * m_iRTTVar) * m_ullCPUFrequency; |
- int rcv_speed = m_pRcvTimeWindow->getPktRcvSpeed(); |
- if (rcv_speed > 0) |
- m_ullNAKInt += (m_pRcvLossList->getLossLength() * 1000000ULL / rcv_speed) * m_ullCPUFrequency; |
- if (m_ullNAKInt < m_ullMinNakInt) |
- m_ullNAKInt = m_ullMinNakInt; |
- |
- break; |
- } |
- |
- case 4: //100 - Congestion Warning |
- ctrlpkt.pack(pkttype); |
- ctrlpkt.m_iID = m_PeerID; |
- m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt); |
- |
- CTimer::rdtsc(m_ullLastWarningTime); |
- |
- break; |
- |
- case 1: //001 - Keep-alive |
- ctrlpkt.pack(pkttype); |
- ctrlpkt.m_iID = m_PeerID; |
- m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt); |
- |
- break; |
- |
- case 0: //000 - Handshake |
- ctrlpkt.pack(pkttype, NULL, rparam, sizeof(CHandShake)); |
- ctrlpkt.m_iID = m_PeerID; |
- m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt); |
- |
- break; |
- |
- case 5: //101 - Shutdown |
- ctrlpkt.pack(pkttype); |
- ctrlpkt.m_iID = m_PeerID; |
- m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt); |
- |
- break; |
- |
- case 7: //111 - Msg drop request |
- ctrlpkt.pack(pkttype, lparam, rparam, 8); |
- ctrlpkt.m_iID = m_PeerID; |
- m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt); |
- |
- break; |
- |
- case 8: //1000 - acknowledge the peer side a special error |
- ctrlpkt.pack(pkttype, lparam); |
- ctrlpkt.m_iID = m_PeerID; |
- m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt); |
- |
- break; |
- |
- case 32767: //0x7FFF - Resevered for future use |
- break; |
- |
- default: |
- break; |
- } |
-} |
- |
-void CUDT::processCtrl(CPacket& ctrlpkt) |
-{ |
- // Just heard from the peer, reset the expiration count. |
- m_iEXPCount = 1; |
- m_llLastRspTime = CTimer::getTime(); |
- |
- if ((CSeqNo::incseq(m_iSndCurrSeqNo) == m_iSndLastAck) || (2 == ctrlpkt.getType()) || (3 == ctrlpkt.getType())) |
- { |
- uint64_t currtime; |
- CTimer::rdtsc(currtime); |
- if (!m_pCC->m_bUserDefinedRTO) |
- m_ullNextEXPTime = currtime + m_ullEXPInt; |
- else |
- m_ullNextEXPTime = currtime + m_pCC->m_iRTO * m_ullCPUFrequency; |
- } |
- |
- switch (ctrlpkt.getType()) |
- { |
- case 2: //010 - Acknowledgement |
- { |
- int32_t ack; |
- |
- // process a lite ACK |
- if (4 == ctrlpkt.getLength()) |
- { |
- ack = *(int32_t *)ctrlpkt.m_pcData; |
- if (CSeqNo::seqcmp(ack, const_cast<int32_t&>(m_iSndLastAck)) >= 0) |
- { |
- m_iFlowWindowSize -= CSeqNo::seqoff(const_cast<int32_t&>(m_iSndLastAck), ack); |
- m_iSndLastAck = ack; |
- } |
- |
- break; |
- } |
- |
- // read ACK seq. no. |
- ack = ctrlpkt.getAckSeqNo(); |
- |
- // send ACK acknowledgement |
- // number of ACK2 can be much less than number of ACK |
- uint64_t currtime = CTimer::getTime(); |
- if ((currtime - m_ullSndLastAck2Time > (uint64_t)m_iSYNInterval) || (ack == m_iSndLastAck2)) |
- { |
- sendCtrl(6, &ack); |
- m_iSndLastAck2 = ack; |
- m_ullSndLastAck2Time = currtime; |
- } |
- |
- // Got data ACK |
- ack = *(int32_t *)ctrlpkt.m_pcData; |
- |
- // check the validation of the ack |
- if (CSeqNo::seqcmp(ack, CSeqNo::incseq(m_iSndCurrSeqNo)) > 0) |
- { |
- //this should not happen: attack or bug |
- m_bBroken = true; |
- m_iBrokenCounter = 0; |
- break; |
- } |
- |
- if (CSeqNo::seqcmp(ack, const_cast<int32_t&>(m_iSndLastAck)) >= 0) |
- { |
- // Update Flow Window Size, must update before and together with m_iSndLastAck |
- m_iFlowWindowSize = *((int32_t *)ctrlpkt.m_pcData + 3); |
- m_iSndLastAck = ack; |
- } |
- |
- // protect packet retransmission |
- CGuard::enterCS(m_AckLock); |
- |
- int offset = CSeqNo::seqoff((int32_t&)m_iSndLastDataAck, ack); |
- if (offset <= 0) |
- { |
- // discard it if it is a repeated ACK |
- CGuard::leaveCS(m_AckLock); |
- break; |
- } |
- |
- // acknowledge the sending buffer |
- m_pSndBuffer->ackData(offset); |
- |
- // record total time used for sending |
- m_llSndDuration += currtime - m_llSndDurationCounter; |
- m_llSndDurationTotal += currtime - m_llSndDurationCounter; |
- m_llSndDurationCounter = currtime; |
- |
- // update sending variables |
- m_iSndLastDataAck = ack; |
- m_pSndLossList->remove(CSeqNo::decseq((int32_t&)m_iSndLastDataAck)); |
- |
- CGuard::leaveCS(m_AckLock); |
- |
- #ifndef WIN32 |
- pthread_mutex_lock(&m_SendBlockLock); |
- if (m_bSynSending) |
- pthread_cond_signal(&m_SendBlockCond); |
- pthread_mutex_unlock(&m_SendBlockLock); |
- #else |
- if (m_bSynSending) |
- SetEvent(m_SendBlockCond); |
- #endif |
- |
- // acknowledde any waiting epolls to write |
- s_UDTUnited.m_EPoll.enable_write(m_SocketID, m_sPollID); |
- |
- // insert this socket to snd list if it is not on the list yet |
- m_pSndQueue->m_pSndUList->update(this, false); |
- |
- // Update RTT |
- //m_iRTT = *((int32_t *)ctrlpkt.m_pcData + 1); |
- //m_iRTTVar = *((int32_t *)ctrlpkt.m_pcData + 2); |
- int rtt = *((int32_t *)ctrlpkt.m_pcData + 1); |
- m_iRTTVar = (m_iRTTVar * 3 + abs(rtt - m_iRTT)) >> 2; |
- m_iRTT = (m_iRTT * 7 + rtt) >> 3; |
- |
- m_pCC->setRTT(m_iRTT); |
- |
- m_ullEXPInt = (m_iRTT + 4 * m_iRTTVar) * m_ullCPUFrequency + m_ullSYNInt; |
- if (m_ullEXPInt < m_ullMinExpInt) |
- m_ullEXPInt = m_ullMinExpInt; |
- |
- if (ctrlpkt.getLength() > 16) |
- { |
- // Update Estimated Bandwidth and packet delivery rate |
- if (*((int32_t *)ctrlpkt.m_pcData + 4) > 0) |
- m_iDeliveryRate = (m_iDeliveryRate * 7 + *((int32_t *)ctrlpkt.m_pcData + 4)) >> 3; |
- |
- if (*((int32_t *)ctrlpkt.m_pcData + 5) > 0) |
- m_iBandwidth = (m_iBandwidth * 7 + *((int32_t *)ctrlpkt.m_pcData + 5)) >> 3; |
- |
- m_pCC->setRcvRate(m_iDeliveryRate); |
- m_pCC->setBandwidth(m_iBandwidth); |
- } |
- |
- m_pCC->onACK(ack); |
- // update CC parameters |
- m_ullInterval = (uint64_t)(m_pCC->m_dPktSndPeriod * m_ullCPUFrequency); |
- m_dCongestionWindow = m_pCC->m_dCWndSize; |
- |
- ++ m_iRecvACK; |
- ++ m_iRecvACKTotal; |
- |
- break; |
- } |
- |
- case 6: //110 - Acknowledgement of Acknowledgement |
- { |
- int32_t ack; |
- int rtt = -1; |
- |
- // update RTT |
- rtt = m_pACKWindow->acknowledge(ctrlpkt.getAckSeqNo(), ack); |
- if (rtt <= 0) |
- break; |
- |
- //if increasing delay detected... |
- // sendCtrl(4); |
- |
- // RTT EWMA |
- m_iRTTVar = (m_iRTTVar * 3 + abs(rtt - m_iRTT)) >> 2; |
- m_iRTT = (m_iRTT * 7 + rtt) >> 3; |
- |
- m_pCC->setRTT(m_iRTT); |
- |
- m_ullEXPInt = (m_iRTT + 4 * m_iRTTVar) * m_ullCPUFrequency + m_ullSYNInt; |
- if (m_ullEXPInt < m_ullMinExpInt) |
- m_ullEXPInt = m_ullMinExpInt; |
- |
- // update last ACK that has been received by the sender |
- if (CSeqNo::seqcmp(ack, m_iRcvLastAckAck) > 0) |
- m_iRcvLastAckAck = ack; |
- |
- break; |
- } |
- |
- case 3: //011 - Loss Report |
- { |
- int32_t* losslist = (int32_t *)(ctrlpkt.m_pcData); |
- |
- m_pCC->onLoss(losslist, ctrlpkt.getLength() / 4); |
- // update CC parameters |
- m_ullInterval = (uint64_t)(m_pCC->m_dPktSndPeriod * m_ullCPUFrequency); |
- m_dCongestionWindow = m_pCC->m_dCWndSize; |
- |
- bool secure = true; |
- |
- // decode loss list message and insert loss into the sender loss list |
- for (int i = 0, n = (int)(ctrlpkt.getLength() / 4); i < n; ++ i) |
- { |
- if (0 != (losslist[i] & 0x80000000)) |
- { |
- if ((CSeqNo::seqcmp(losslist[i] & 0x7FFFFFFF, losslist[i + 1]) > 0) || (CSeqNo::seqcmp(losslist[i + 1], const_cast<int32_t&>(m_iSndCurrSeqNo)) > 0)) |
- { |
- // seq_a must not be greater than seq_b; seq_b must not be greater than the most recent sent seq |
- secure = false; |
- break; |
- } |
- |
- int num = 0; |
- if (CSeqNo::seqcmp(losslist[i] & 0x7FFFFFFF, const_cast<int32_t&>(m_iSndLastAck)) >= 0) |
- num = m_pSndLossList->insert(losslist[i] & 0x7FFFFFFF, losslist[i + 1]); |
- else if (CSeqNo::seqcmp(losslist[i + 1], const_cast<int32_t&>(m_iSndLastAck)) >= 0) |
- num = m_pSndLossList->insert(const_cast<int32_t&>(m_iSndLastAck), losslist[i + 1]); |
- |
- m_iTraceSndLoss += num; |
- m_iSndLossTotal += num; |
- |
- ++ i; |
- } |
- else if (CSeqNo::seqcmp(losslist[i], const_cast<int32_t&>(m_iSndLastAck)) >= 0) |
- { |
- if (CSeqNo::seqcmp(losslist[i], const_cast<int32_t&>(m_iSndCurrSeqNo)) > 0) |
- { |
- //seq_a must not be greater than the most recent sent seq |
- secure = false; |
- break; |
- } |
- |
- int num = m_pSndLossList->insert(losslist[i], losslist[i]); |
- |
- m_iTraceSndLoss += num; |
- m_iSndLossTotal += num; |
- } |
- } |
- |
- if (!secure) |
- { |
- //this should not happen: attack or bug |
- m_bBroken = true; |
- m_iBrokenCounter = 0; |
- break; |
- } |
- |
- // the lost packet (retransmission) should be sent out immediately |
- m_pSndQueue->m_pSndUList->update(this); |
- |
- ++ m_iRecvNAK; |
- ++ m_iRecvNAKTotal; |
- |
- break; |
- } |
- |
- case 4: //100 - Delay Warning |
- // One way packet delay is increasing, so decrease the sending rate |
- m_ullInterval = (uint64_t)ceil(m_ullInterval * 1.125); |
- m_iLastDecSeq = m_iSndCurrSeqNo; |
- |
- break; |
- |
- case 1: //001 - Keep-alive |
- // The only purpose of keep-alive packet is to tell that the peer is still alive |
- // nothing needs to be done. |
- |
- break; |
- |
- case 0: //000 - Handshake |
- if ((((CHandShake*)(ctrlpkt.m_pcData))->m_iReqType > 0) || (m_bRendezvous && (((CHandShake*)(ctrlpkt.m_pcData))->m_iReqType != -2))) |
- { |
- // The peer side has not received the handshake message, so it keeps querying |
- // resend the handshake packet |
- |
- CHandShake initdata; |
- initdata.m_iISN = m_iISN; |
- initdata.m_iMSS = m_iMSS; |
- initdata.m_iFlightFlagSize = m_iFlightFlagSize; |
- initdata.m_iReqType = (!m_bRendezvous) ? -1 : -2; |
- initdata.m_iID = m_SocketID; |
- |
- char buffer[1500]; |
- int size = 1500; |
- initdata.serialize(buffer, size); |
- sendCtrl(0, NULL, buffer, size); |
- } |
- |
- break; |
- |
- case 5: //101 - Shutdown |
- m_bShutdown = true; |
- m_bClosing = true; |
- m_bBroken = true; |
- m_iBrokenCounter = 60; |
- |
- // Signal the sender and recver if they are waiting for data. |
- releaseSynch(); |
- |
- CTimer::triggerEvent(); |
- |
- break; |
- |
- case 7: //111 - Msg drop request |
- m_pRcvBuffer->dropMsg(ctrlpkt.getMsgSeq()); |
- m_pRcvLossList->remove(*(int32_t*)ctrlpkt.m_pcData, *(int32_t*)(ctrlpkt.m_pcData + 4)); |
- |
- // move forward with current recv seq no. |
- if ((CSeqNo::seqcmp(*(int32_t*)ctrlpkt.m_pcData, CSeqNo::incseq(m_iRcvCurrSeqNo)) <= 0) |
- && (CSeqNo::seqcmp(*(int32_t*)(ctrlpkt.m_pcData + 4), m_iRcvCurrSeqNo) > 0)) |
- { |
- m_iRcvCurrSeqNo = *(int32_t*)(ctrlpkt.m_pcData + 4); |
- } |
- |
- break; |
- |
- case 8: // 1000 - An error has happened to the peer side |
- //int err_type = packet.getAddInfo(); |
- |
- // currently only this error is signalled from the peer side |
- // if recvfile() failes (e.g., due to disk fail), blcoked sendfile/send should return immediately |
- // giving the app a chance to fix the issue |
- |
- m_bPeerHealth = false; |
- |
- break; |
- |
- case 32767: //0x7FFF - reserved and user defined messages |
- m_pCC->processCustomMsg(&ctrlpkt); |
- // update CC parameters |
- m_ullInterval = (uint64_t)(m_pCC->m_dPktSndPeriod * m_ullCPUFrequency); |
- m_dCongestionWindow = m_pCC->m_dCWndSize; |
- |
- break; |
- |
- default: |
- break; |
- } |
-} |
- |
-int CUDT::packData(CPacket& packet, uint64_t& ts) |
-{ |
- int payload = 0; |
- bool probe = false; |
- |
- uint64_t entertime; |
- CTimer::rdtsc(entertime); |
- |
- if ((0 != m_ullTargetTime) && (entertime > m_ullTargetTime)) |
- m_ullTimeDiff += entertime - m_ullTargetTime; |
- |
- // Loss retransmission always has higher priority. |
- if ((packet.m_iSeqNo = m_pSndLossList->getLostSeq()) >= 0) |
- { |
- // protect m_iSndLastDataAck from updating by ACK processing |
- CGuard ackguard(m_AckLock); |
- |
- int offset = CSeqNo::seqoff((int32_t&)m_iSndLastDataAck, packet.m_iSeqNo); |
- if (offset < 0) |
- return 0; |
- |
- int msglen; |
- |
- payload = m_pSndBuffer->readData(&(packet.m_pcData), offset, packet.m_iMsgNo, msglen); |
- |
- if (-1 == payload) |
- { |
- int32_t seqpair[2]; |
- seqpair[0] = packet.m_iSeqNo; |
- seqpair[1] = CSeqNo::incseq(seqpair[0], msglen); |
- sendCtrl(7, &packet.m_iMsgNo, seqpair, 8); |
- |
- // only one msg drop request is necessary |
- m_pSndLossList->remove(seqpair[1]); |
- |
- // skip all dropped packets |
- if (CSeqNo::seqcmp(const_cast<int32_t&>(m_iSndCurrSeqNo), CSeqNo::incseq(seqpair[1])) < 0) |
- m_iSndCurrSeqNo = CSeqNo::incseq(seqpair[1]); |
- |
- return 0; |
- } |
- else if (0 == payload) |
- return 0; |
- |
- ++ m_iTraceRetrans; |
- ++ m_iRetransTotal; |
- } |
- else |
- { |
- // If no loss, pack a new packet. |
- |
- // check congestion/flow window limit |
- int cwnd = (m_iFlowWindowSize < (int)m_dCongestionWindow) ? m_iFlowWindowSize : (int)m_dCongestionWindow; |
- if (cwnd >= CSeqNo::seqlen(const_cast<int32_t&>(m_iSndLastAck), CSeqNo::incseq(m_iSndCurrSeqNo))) |
- { |
- if (0 != (payload = m_pSndBuffer->readData(&(packet.m_pcData), packet.m_iMsgNo))) |
- { |
- m_iSndCurrSeqNo = CSeqNo::incseq(m_iSndCurrSeqNo); |
- m_pCC->setSndCurrSeqNo((int32_t&)m_iSndCurrSeqNo); |
- |
- packet.m_iSeqNo = m_iSndCurrSeqNo; |
- |
- // every 16 (0xF) packets, a packet pair is sent |
- if (0 == (packet.m_iSeqNo & 0xF)) |
- probe = true; |
- } |
- else |
- { |
- m_ullTargetTime = 0; |
- m_ullTimeDiff = 0; |
- ts = 0; |
- return 0; |
- } |
- } |
- else |
- { |
- m_ullTargetTime = 0; |
- m_ullTimeDiff = 0; |
- ts = 0; |
- return 0; |
- } |
- } |
- |
- packet.m_iTimeStamp = int(CTimer::getTime() - m_StartTime); |
- packet.m_iID = m_PeerID; |
- packet.setLength(payload); |
- |
- m_pCC->onPktSent(&packet); |
- //m_pSndTimeWindow->onPktSent(packet.m_iTimeStamp); |
- |
- ++ m_llTraceSent; |
- ++ m_llSentTotal; |
- |
- if (probe) |
- { |
- // sends out probing packet pair |
- ts = entertime; |
- probe = false; |
- } |
- else |
- { |
- #ifndef NO_BUSY_WAITING |
- ts = entertime + m_ullInterval; |
- #else |
- if (m_ullTimeDiff >= m_ullInterval) |
- { |
- ts = entertime; |
- m_ullTimeDiff -= m_ullInterval; |
- } |
- else |
- { |
- ts = entertime + m_ullInterval - m_ullTimeDiff; |
- m_ullTimeDiff = 0; |
- } |
- #endif |
- } |
- |
- m_ullTargetTime = ts; |
- |
- return payload; |
-} |
- |
-int CUDT::processData(CUnit* unit) |
-{ |
- CPacket& packet = unit->m_Packet; |
- |
- // Just heard from the peer, reset the expiration count. |
- m_iEXPCount = 1; |
- m_llLastRspTime = CTimer::getTime(); |
- |
- if (CSeqNo::incseq(m_iSndCurrSeqNo) == m_iSndLastAck) |
- { |
- uint64_t currtime; |
- CTimer::rdtsc(currtime); |
- if (!m_pCC->m_bUserDefinedRTO) |
- m_ullNextEXPTime = currtime + m_ullEXPInt; |
- else |
- m_ullNextEXPTime = currtime + m_pCC->m_iRTO * m_ullCPUFrequency; |
- } |
- |
- m_pCC->onPktReceived(&packet); |
- |
- ++ m_iPktCount; |
- |
- // update time information |
- m_pRcvTimeWindow->onPktArrival(); |
- |
- // check if it is probing packet pair |
- if (0 == (packet.m_iSeqNo & 0xF)) |
- m_pRcvTimeWindow->probe1Arrival(); |
- else if (1 == (packet.m_iSeqNo & 0xF)) |
- m_pRcvTimeWindow->probe2Arrival(); |
- |
- ++ m_llTraceRecv; |
- ++ m_llRecvTotal; |
- |
- int32_t offset = CSeqNo::seqoff(m_iRcvLastAck, packet.m_iSeqNo); |
- if ((offset < 0) || (offset >= m_pRcvBuffer->getAvailBufSize())) |
- return -1; |
- |
- if (m_pRcvBuffer->addData(unit, offset) < 0) |
- return -1; |
- |
- // Loss detection. |
- if (CSeqNo::seqcmp(packet.m_iSeqNo, CSeqNo::incseq(m_iRcvCurrSeqNo)) > 0) |
- { |
- // If loss found, insert them to the receiver loss list |
- m_pRcvLossList->insert(CSeqNo::incseq(m_iRcvCurrSeqNo), CSeqNo::decseq(packet.m_iSeqNo)); |
- |
- // pack loss list for NAK |
- int32_t lossdata[2]; |
- lossdata[0] = CSeqNo::incseq(m_iRcvCurrSeqNo) | 0x80000000; |
- lossdata[1] = CSeqNo::decseq(packet.m_iSeqNo); |
- |
- // Generate loss report immediately. |
- sendCtrl(3, NULL, lossdata, (CSeqNo::incseq(m_iRcvCurrSeqNo) == CSeqNo::decseq(packet.m_iSeqNo)) ? 1 : 2); |
- |
- int loss = CSeqNo::seqlen(m_iRcvCurrSeqNo, packet.m_iSeqNo) - 2; |
- m_iTraceRcvLoss += loss; |
- m_iRcvLossTotal += loss; |
- } |
- |
- // This is not a regular fixed size packet... |
- //an irregular sized packet usually indicates the end of a message, so send an ACK immediately |
- if (packet.getLength() != m_iPayloadSize) |
- CTimer::rdtsc(m_ullNextACKTime); |
- |
- // Update the current largest sequence number that has been received. |
- // Or it is a retransmitted packet, remove it from receiver loss list. |
- if (CSeqNo::seqcmp(packet.m_iSeqNo, m_iRcvCurrSeqNo) > 0) |
- m_iRcvCurrSeqNo = packet.m_iSeqNo; |
- else |
- m_pRcvLossList->remove(packet.m_iSeqNo); |
- |
- return 0; |
-} |
- |
-int CUDT::listen(sockaddr* addr, CPacket& packet) |
-{ |
- if (m_bClosing) |
- return 1002; |
- |
- if (packet.getLength() != CHandShake::m_iContentSize) |
- return 1004; |
- |
- CHandShake hs; |
- hs.deserialize(packet.m_pcData, packet.getLength()); |
- |
- // SYN cookie |
- char clienthost[NI_MAXHOST]; |
- char clientport[NI_MAXSERV]; |
- getnameinfo(addr, (AF_INET == m_iVersion) ? sizeof(sockaddr_in) : sizeof(sockaddr_in6), clienthost, sizeof(clienthost), clientport, sizeof(clientport), NI_NUMERICHOST|NI_NUMERICSERV); |
- int64_t timestamp = (CTimer::getTime() - m_StartTime) / 60000000; // secret changes every one minute |
- char cookiestr[1024]; |
- sprintf(cookiestr, "%s:%s:%lld", clienthost, clientport, (long long int)timestamp); |
- unsigned char cookie[16]; |
- CMD5::compute(cookiestr, cookie); |
- |
- if (1 == hs.m_iReqType) |
- { |
- hs.m_iCookie = *(int*)cookie; |
- packet.m_iID = hs.m_iID; |
- int size = packet.getLength(); |
- hs.serialize(packet.m_pcData, size); |
- m_pSndQueue->sendto(addr, packet); |
- return 0; |
- } |
- else |
- { |
- if (hs.m_iCookie != *(int*)cookie) |
- { |
- timestamp --; |
- sprintf(cookiestr, "%s:%s:%lld", clienthost, clientport, (long long int)timestamp); |
- CMD5::compute(cookiestr, cookie); |
- |
- if (hs.m_iCookie != *(int*)cookie) |
- return -1; |
- } |
- } |
- |
- int32_t id = hs.m_iID; |
- |
- // When a peer side connects in... |
- if ((1 == packet.getFlag()) && (0 == packet.getType())) |
- { |
- if ((hs.m_iVersion != m_iVersion) || (hs.m_iType != m_iSockType)) |
- { |
- // mismatch, reject the request |
- hs.m_iReqType = 1002; |
- int size = CHandShake::m_iContentSize; |
- hs.serialize(packet.m_pcData, size); |
- packet.m_iID = id; |
- m_pSndQueue->sendto(addr, packet); |
- } |
- else |
- { |
- int result = s_UDTUnited.newConnection(m_SocketID, addr, &hs); |
- if (result == -1) |
- hs.m_iReqType = 1002; |
- |
- // send back a response if connection failed or connection already existed |
- // new connection response should be sent in connect() |
- if (result != 1) |
- { |
- int size = CHandShake::m_iContentSize; |
- hs.serialize(packet.m_pcData, size); |
- packet.m_iID = id; |
- m_pSndQueue->sendto(addr, packet); |
- } |
- else |
- { |
- // a mew connection has been created, enable epoll for read |
- s_UDTUnited.m_EPoll.enable_read(m_SocketID, m_sPollID); |
- } |
- } |
- } |
- |
- return hs.m_iReqType; |
-} |
- |
-void CUDT::checkTimers() |
-{ |
- // update CC parameters |
- m_ullInterval = (uint64_t)(m_pCC->m_dPktSndPeriod * m_ullCPUFrequency); |
- m_dCongestionWindow = m_pCC->m_dCWndSize; |
- //uint64_t minint = (uint64_t)(m_ullCPUFrequency * m_pSndTimeWindow->getMinPktSndInt() * 0.9); |
- //if (m_ullInterval < minint) |
- // m_ullInterval = minint; |
- |
- uint64_t currtime; |
- CTimer::rdtsc(currtime); |
- |
- if ((currtime > m_ullNextACKTime) || ((m_pCC->m_iACKInterval > 0) && (m_pCC->m_iACKInterval <= m_iPktCount))) |
- { |
- // ACK timer expired or ACK interval is reached |
- |
- sendCtrl(2); |
- CTimer::rdtsc(currtime); |
- if (m_pCC->m_iACKPeriod > 0) |
- m_ullNextACKTime = currtime + m_pCC->m_iACKPeriod * m_ullCPUFrequency; |
- else |
- m_ullNextACKTime = currtime + m_ullACKInt; |
- |
- m_iPktCount = 0; |
- m_iLightACKCount = 1; |
- } |
- else if (m_iSelfClockInterval * m_iLightACKCount <= m_iPktCount) |
- { |
- //send a "light" ACK |
- sendCtrl(2, NULL, NULL, 4); |
- ++ m_iLightACKCount; |
- } |
- |
- // we are not sending back repeated NAK anymore and rely on the sender's EXP for retransmission |
- //if ((m_pRcvLossList->getLossLength() > 0) && (currtime > m_ullNextNAKTime)) |
- //{ |
- // // NAK timer expired, and there is loss to be reported. |
- // sendCtrl(3); |
- // |
- // CTimer::rdtsc(currtime); |
- // m_ullNextNAKTime = currtime + m_ullNAKInt; |
- //} |
- |
- if (currtime > m_ullNextEXPTime) |
- { |
- // Haven't receive any information from the peer, is it dead?! |
- // timeout: at least 16 expirations and must be greater than 10 seconds |
- if ((m_iEXPCount > 16) && (CTimer::getTime() - m_llLastRspTime > 10000000)) |
- { |
- // |
- // Connection is broken. |
- // UDT does not signal any information about this instead of to stop quietly. |
- // Apllication will detect this when it calls any UDT methods next time. |
- // |
- m_bClosing = true; |
- m_bBroken = true; |
- m_iBrokenCounter = 30; |
- |
- // update snd U list to remove this socket |
- m_pSndQueue->m_pSndUList->update(this); |
- |
- releaseSynch(); |
- |
- // a broken socket can be "write" to learn the error |
- s_UDTUnited.m_EPoll.enable_write(m_SocketID, m_sPollID); |
- |
- CTimer::triggerEvent(); |
- |
- return; |
- } |
- |
- // sender: Insert all the packets sent after last received acknowledgement into the sender loss list. |
- // recver: Send out a keep-alive packet |
- if (m_pSndBuffer->getCurrBufSize() > 0) |
- { |
- if ((CSeqNo::incseq(m_iSndCurrSeqNo) != m_iSndLastAck) && (m_pSndLossList->getLossLength() == 0)) |
- { |
- // resend all unacknowledged packets on timeout, but only if there is no packet in the loss list |
- int32_t csn = m_iSndCurrSeqNo; |
- int num = m_pSndLossList->insert(const_cast<int32_t&>(m_iSndLastAck), csn); |
- m_iTraceSndLoss += num; |
- m_iSndLossTotal += num; |
- } |
- |
- m_pCC->onTimeout(); |
- // update CC parameters |
- m_ullInterval = (uint64_t)(m_pCC->m_dPktSndPeriod * m_ullCPUFrequency); |
- m_dCongestionWindow = m_pCC->m_dCWndSize; |
- |
- // immediately restart transmission |
- m_pSndQueue->m_pSndUList->update(this); |
- } |
- else |
- { |
- sendCtrl(1); |
- } |
- |
- ++ m_iEXPCount; |
- m_ullEXPInt = (m_iEXPCount * (m_iRTT + 4 * m_iRTTVar) + m_iSYNInterval) * m_ullCPUFrequency; |
- if (m_ullEXPInt < m_iEXPCount * m_ullMinExpInt) |
- m_ullEXPInt = m_iEXPCount * m_ullMinExpInt; |
- CTimer::rdtsc(m_ullNextEXPTime); |
- m_ullNextEXPTime += m_ullEXPInt; |
- } |
-} |
- |
-void CUDT::addEPoll(const int eid) |
-{ |
- CGuard::enterCS(s_UDTUnited.m_EPoll.m_EPollLock); |
- m_sPollID.insert(eid); |
- CGuard::leaveCS(s_UDTUnited.m_EPoll.m_EPollLock); |
- |
- if (!m_bConnected || m_bBroken || m_bClosing) |
- return; |
- |
- if ((UDT_STREAM == m_iSockType) && (m_pRcvBuffer->getRcvDataSize() > 0)) |
- s_UDTUnited.m_EPoll.enable_read(m_SocketID, m_sPollID); |
- else if ((UDT_DGRAM == m_iSockType) && (m_pRcvBuffer->getRcvMsgNum() > 0)) |
- s_UDTUnited.m_EPoll.enable_read(m_SocketID, m_sPollID); |
- |
- if (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize()) |
- s_UDTUnited.m_EPoll.enable_write(m_SocketID, m_sPollID); |
-} |
- |
-void CUDT::removeEPoll(const int eid) |
-{ |
- s_UDTUnited.m_EPoll.disable_read(m_SocketID, m_sPollID); |
- s_UDTUnited.m_EPoll.disable_write(m_SocketID, m_sPollID); |
- |
- CGuard::enterCS(s_UDTUnited.m_EPoll.m_EPollLock); |
- m_sPollID.erase(eid); |
- CGuard::leaveCS(s_UDTUnited.m_EPoll.m_EPollLock); |
-} |