Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(115)

Side by Side Diff: net/third_party/udt/src/buffer.cpp

Issue 6708091: Remove UDT. (Closed) Base URL: svn://chrome-svn/chrome/trunk/src/
Patch Set: Created 9 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « net/third_party/udt/src/buffer.h ('k') | net/third_party/udt/src/cache.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 /*****************************************************************************
2 Copyright (c) 2001 - 2011, The Board of Trustees of the University of Illinois.
3 All rights reserved.
4
5 Redistribution and use in source and binary forms, with or without
6 modification, are permitted provided that the following conditions are
7 met:
8
9 * Redistributions of source code must retain the above
10 copyright notice, this list of conditions and the
11 following disclaimer.
12
13 * Redistributions in binary form must reproduce the
14 above copyright notice, this list of conditions
15 and the following disclaimer in the documentation
16 and/or other materials provided with the distribution.
17
18 * Neither the name of the University of Illinois
19 nor the names of its contributors may be used to
20 endorse or promote products derived from this
21 software without specific prior written permission.
22
23 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
24 IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
25 THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
26 PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
27 CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
28 EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
29 PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
30 PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
31 LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
32 NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
33 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
34 *****************************************************************************/
35
36 /*****************************************************************************
37 written by
38 Yunhong Gu, last updated 02/03/2011
39 *****************************************************************************/
40
41 #include <cstring>
42 #include <cmath>
43 #include "buffer.h"
44
45 using namespace std;
46
47 CSndBuffer::CSndBuffer(const int& size, const int& mss):
48 m_BufLock(),
49 m_pBlock(NULL),
50 m_pFirstBlock(NULL),
51 m_pCurrBlock(NULL),
52 m_pLastBlock(NULL),
53 m_pBuffer(NULL),
54 m_iNextMsgNo(1),
55 m_iSize(size),
56 m_iMSS(mss),
57 m_iCount(0)
58 {
59 // initial physical buffer of "size"
60 m_pBuffer = new Buffer;
61 m_pBuffer->m_pcData = new char [m_iSize * m_iMSS];
62 m_pBuffer->m_iSize = m_iSize;
63 m_pBuffer->m_pNext = NULL;
64
65 // circular linked list for out bound packets
66 m_pBlock = new Block;
67 Block* pb = m_pBlock;
68 for (int i = 1; i < m_iSize; ++ i)
69 {
70 pb->m_pNext = new Block;
71 pb->m_iMsgNo = 0;
72 pb = pb->m_pNext;
73 }
74 pb->m_pNext = m_pBlock;
75
76 pb = m_pBlock;
77 char* pc = m_pBuffer->m_pcData;
78 for (int i = 0; i < m_iSize; ++ i)
79 {
80 pb->m_pcData = pc;
81 pb = pb->m_pNext;
82 pc += m_iMSS;
83 }
84
85 m_pFirstBlock = m_pCurrBlock = m_pLastBlock = m_pBlock;
86
87 #ifndef WIN32
88 pthread_mutex_init(&m_BufLock, NULL);
89 #else
90 m_BufLock = CreateMutex(NULL, false, NULL);
91 #endif
92 }
93
94 CSndBuffer::~CSndBuffer()
95 {
96 Block* pb = m_pBlock->m_pNext;
97 while (pb != m_pBlock)
98 {
99 Block* temp = pb;
100 pb = pb->m_pNext;
101 delete temp;
102 }
103 delete m_pBlock;
104
105 while (m_pBuffer != NULL)
106 {
107 Buffer* temp = m_pBuffer;
108 m_pBuffer = m_pBuffer->m_pNext;
109 delete [] temp->m_pcData;
110 delete temp;
111 }
112
113 #ifndef WIN32
114 pthread_mutex_destroy(&m_BufLock);
115 #else
116 CloseHandle(m_BufLock);
117 #endif
118 }
119
120 void CSndBuffer::addBuffer(const char* data, const int& len, const int& ttl, con st bool& order)
121 {
122 int size = len / m_iMSS;
123 if ((len % m_iMSS) != 0)
124 size ++;
125
126 // dynamically increase sender buffer
127 while (size + m_iCount >= m_iSize)
128 increase();
129
130 uint64_t time = CTimer::getTime();
131 int32_t inorder = order;
132 inorder <<= 29;
133
134 Block* s = m_pLastBlock;
135 for (int i = 0; i < size; ++ i)
136 {
137 int pktlen = len - i * m_iMSS;
138 if (pktlen > m_iMSS)
139 pktlen = m_iMSS;
140
141 memcpy(s->m_pcData, data + i * m_iMSS, pktlen);
142 s->m_iLength = pktlen;
143
144 s->m_iMsgNo = m_iNextMsgNo | inorder;
145 if (i == 0)
146 s->m_iMsgNo |= 0x80000000;
147 if (i == size - 1)
148 s->m_iMsgNo |= 0x40000000;
149
150 s->m_OriginTime = time;
151 s->m_iTTL = ttl;
152
153 s = s->m_pNext;
154 }
155 m_pLastBlock = s;
156
157 CGuard::enterCS(m_BufLock);
158 m_iCount += size;
159 CGuard::leaveCS(m_BufLock);
160
161 m_iNextMsgNo ++;
162 if (m_iNextMsgNo == CMsgNo::m_iMaxMsgNo)
163 m_iNextMsgNo = 1;
164 }
165
166 int CSndBuffer::addBufferFromFile(fstream& ifs, const int& len)
167 {
168 int size = len / m_iMSS;
169 if ((len % m_iMSS) != 0)
170 size ++;
171
172 // dynamically increase sender buffer
173 while (size + m_iCount >= m_iSize)
174 increase();
175
176 Block* s = m_pLastBlock;
177 int total = 0;
178 for (int i = 0; i < size; ++ i)
179 {
180 if (ifs.bad() || ifs.fail() || ifs.eof())
181 break;
182
183 int pktlen = len - i * m_iMSS;
184 if (pktlen > m_iMSS)
185 pktlen = m_iMSS;
186
187 ifs.read(s->m_pcData, pktlen);
188 if ((pktlen = ifs.gcount()) <= 0)
189 break;
190
191 // currently file transfer is only available in streaming mode, message is always in order, ttl = infinite
192 s->m_iMsgNo = m_iNextMsgNo | 0x20000000;
193 if (i == 0)
194 s->m_iMsgNo |= 0x80000000;
195 if (i == size - 1)
196 s->m_iMsgNo |= 0x40000000;
197
198 s->m_iLength = pktlen;
199 s->m_iTTL = -1;
200 s = s->m_pNext;
201
202 total += pktlen;
203 }
204 m_pLastBlock = s;
205
206 CGuard::enterCS(m_BufLock);
207 m_iCount += size;
208 CGuard::leaveCS(m_BufLock);
209
210 m_iNextMsgNo ++;
211 if (m_iNextMsgNo == CMsgNo::m_iMaxMsgNo)
212 m_iNextMsgNo = 1;
213
214 return total;
215 }
216
217 int CSndBuffer::readData(char** data, int32_t& msgno)
218 {
219 // No data to read
220 if (m_pCurrBlock == m_pLastBlock)
221 return 0;
222
223 *data = m_pCurrBlock->m_pcData;
224 int readlen = m_pCurrBlock->m_iLength;
225 msgno = m_pCurrBlock->m_iMsgNo;
226
227 m_pCurrBlock = m_pCurrBlock->m_pNext;
228
229 return readlen;
230 }
231
232 int CSndBuffer::readData(char** data, const int offset, int32_t& msgno, int& msg len)
233 {
234 CGuard bufferguard(m_BufLock);
235
236 Block* p = m_pFirstBlock;
237
238 for (int i = 0; i < offset; ++ i)
239 p = p->m_pNext;
240
241 if ((p->m_iTTL >= 0) && ((CTimer::getTime() - p->m_OriginTime) / 1000 > (uint 64_t)p->m_iTTL))
242 {
243 msgno = p->m_iMsgNo & 0x1FFFFFFF;
244
245 msglen = 1;
246 p = p->m_pNext;
247 bool move = false;
248 while (msgno == (p->m_iMsgNo & 0x1FFFFFFF))
249 {
250 if (p == m_pCurrBlock)
251 move = true;
252 p = p->m_pNext;
253 if (move)
254 m_pCurrBlock = p;
255 msglen ++;
256 }
257
258 return -1;
259 }
260
261 *data = p->m_pcData;
262 int readlen = p->m_iLength;
263 msgno = p->m_iMsgNo;
264
265 return readlen;
266 }
267
268 void CSndBuffer::ackData(const int& offset)
269 {
270 CGuard bufferguard(m_BufLock);
271
272 for (int i = 0; i < offset; ++ i)
273 m_pFirstBlock = m_pFirstBlock->m_pNext;
274
275 m_iCount -= offset;
276
277 CTimer::triggerEvent();
278 }
279
280 int CSndBuffer::getCurrBufSize() const
281 {
282 return m_iCount;
283 }
284
285 void CSndBuffer::increase()
286 {
287 int unitsize = m_pBuffer->m_iSize;
288
289 // new physical buffer
290 Buffer* nbuf = NULL;
291 try
292 {
293 nbuf = new Buffer;
294 nbuf->m_pcData = new char [unitsize * m_iMSS];
295 }
296 catch (...)
297 {
298 delete nbuf;
299 throw CUDTException(3, 2, 0);
300 }
301 nbuf->m_iSize = unitsize;
302 nbuf->m_pNext = NULL;
303
304 // insert the buffer at the end of the buffer list
305 Buffer* p = m_pBuffer;
306 while (NULL != p->m_pNext)
307 p = p->m_pNext;
308 p->m_pNext = nbuf;
309
310 // new packet blocks
311 Block* nblk = NULL;
312 try
313 {
314 nblk = new Block;
315 }
316 catch (...)
317 {
318 delete nblk;
319 throw CUDTException(3, 2, 0);
320 }
321 Block* pb = nblk;
322 for (int i = 1; i < unitsize; ++ i)
323 {
324 pb->m_pNext = new Block;
325 pb = pb->m_pNext;
326 }
327
328 // insert the new blocks onto the existing one
329 pb->m_pNext = m_pLastBlock->m_pNext;
330 m_pLastBlock->m_pNext = nblk;
331
332 pb = nblk;
333 char* pc = nbuf->m_pcData;
334 for (int i = 0; i < unitsize; ++ i)
335 {
336 pb->m_pcData = pc;
337 pb = pb->m_pNext;
338 pc += m_iMSS;
339 }
340
341 m_iSize += unitsize;
342 }
343
344 ////////////////////////////////////////////////////////////////////////////////
345
346 CRcvBuffer::CRcvBuffer(CUnitQueue* queue, const int& bufsize):
347 m_pUnit(NULL),
348 m_iSize(bufsize),
349 m_pUnitQueue(queue),
350 m_iStartPos(0),
351 m_iLastAckPos(0),
352 m_iMaxPos(-1),
353 m_iNotch(0)
354 {
355 m_pUnit = new CUnit* [m_iSize];
356 for (int i = 0; i < m_iSize; ++ i)
357 m_pUnit[i] = NULL;
358 }
359
360 CRcvBuffer::~CRcvBuffer()
361 {
362 for (int i = 0; i < m_iSize; ++ i)
363 {
364 if (NULL != m_pUnit[i])
365 {
366 m_pUnit[i]->m_iFlag = 0;
367 -- m_pUnitQueue->m_iCount;
368 }
369 }
370
371 delete [] m_pUnit;
372 }
373
374 int CRcvBuffer::addData(CUnit* unit, int offset)
375 {
376 int pos = (m_iLastAckPos + offset) % m_iSize;
377 if (offset > m_iMaxPos)
378 m_iMaxPos = offset;
379
380 if (NULL != m_pUnit[pos])
381 return -1;
382
383 m_pUnit[pos] = unit;
384
385 unit->m_iFlag = 1;
386 ++ m_pUnitQueue->m_iCount;
387
388 return 0;
389 }
390
391 int CRcvBuffer::readBuffer(char* data, const int& len)
392 {
393 int p = m_iStartPos;
394 int lastack = m_iLastAckPos;
395 int rs = len;
396
397 while ((p != lastack) && (rs > 0))
398 {
399 int unitsize = m_pUnit[p]->m_Packet.getLength() - m_iNotch;
400 if (unitsize > rs)
401 unitsize = rs;
402
403 memcpy(data, m_pUnit[p]->m_Packet.m_pcData + m_iNotch, unitsize);
404 data += unitsize;
405
406 if ((rs > unitsize) || (rs == m_pUnit[p]->m_Packet.getLength() - m_iNotch) )
407 {
408 CUnit* tmp = m_pUnit[p];
409 m_pUnit[p] = NULL;
410 tmp->m_iFlag = 0;
411 -- m_pUnitQueue->m_iCount;
412
413 if (++ p == m_iSize)
414 p = 0;
415
416 m_iNotch = 0;
417 }
418 else
419 m_iNotch += rs;
420
421 rs -= unitsize;
422 }
423
424 m_iStartPos = p;
425 return len - rs;
426 }
427
428 int CRcvBuffer::readBufferToFile(fstream& ofs, const int& len)
429 {
430 int p = m_iStartPos;
431 int lastack = m_iLastAckPos;
432 int rs = len;
433
434 while ((p != lastack) && (rs > 0))
435 {
436 int unitsize = m_pUnit[p]->m_Packet.getLength() - m_iNotch;
437 if (unitsize > rs)
438 unitsize = rs;
439
440 ofs.write(m_pUnit[p]->m_Packet.m_pcData + m_iNotch, unitsize);
441 if (ofs.fail())
442 break;
443
444 if ((rs > unitsize) || (rs == m_pUnit[p]->m_Packet.getLength() - m_iNotch) )
445 {
446 CUnit* tmp = m_pUnit[p];
447 m_pUnit[p] = NULL;
448 tmp->m_iFlag = 0;
449 -- m_pUnitQueue->m_iCount;
450
451 if (++ p == m_iSize)
452 p = 0;
453
454 m_iNotch = 0;
455 }
456 else
457 m_iNotch += rs;
458
459 rs -= unitsize;
460 }
461
462 m_iStartPos = p;
463
464 return len - rs;
465 }
466
467 void CRcvBuffer::ackData(const int& len)
468 {
469 m_iLastAckPos = (m_iLastAckPos + len) % m_iSize;
470 m_iMaxPos -= len;
471 if (m_iMaxPos < 0)
472 m_iMaxPos = 0;
473
474 CTimer::triggerEvent();
475 }
476
477 int CRcvBuffer::getAvailBufSize() const
478 {
479 // One slot must be empty in order to tell the difference between "empty buff er" and "full buffer"
480 return m_iSize - getRcvDataSize() - 1;
481 }
482
483 int CRcvBuffer::getRcvDataSize() const
484 {
485 if (m_iLastAckPos >= m_iStartPos)
486 return m_iLastAckPos - m_iStartPos;
487
488 return m_iSize + m_iLastAckPos - m_iStartPos;
489 }
490
491 void CRcvBuffer::dropMsg(const int32_t& msgno)
492 {
493 for (int i = m_iStartPos, n = (m_iLastAckPos + m_iMaxPos) % m_iSize; i != n; i = (i + 1) % m_iSize)
494 if ((NULL != m_pUnit[i]) && (msgno == m_pUnit[i]->m_Packet.m_iMsgNo))
495 m_pUnit[i]->m_iFlag = 3;
496 }
497
498 int CRcvBuffer::readMsg(char* data, const int& len)
499 {
500 int p, q;
501 bool passack;
502 if (!scanMsg(p, q, passack))
503 return 0;
504
505 int rs = len;
506 while (p != (q + 1) % m_iSize)
507 {
508 int unitsize = m_pUnit[p]->m_Packet.getLength();
509 if ((rs >= 0) && (unitsize > rs))
510 unitsize = rs;
511
512 if (unitsize > 0)
513 {
514 memcpy(data, m_pUnit[p]->m_Packet.m_pcData, unitsize);
515 data += unitsize;
516 rs -= unitsize;
517 }
518
519 if (!passack)
520 {
521 CUnit* tmp = m_pUnit[p];
522 m_pUnit[p] = NULL;
523 tmp->m_iFlag = 0;
524 -- m_pUnitQueue->m_iCount;
525 }
526 else
527 m_pUnit[p]->m_iFlag = 2;
528
529 if (++ p == m_iSize)
530 p = 0;
531 }
532
533 if (!passack)
534 m_iStartPos = (q + 1) % m_iSize;
535
536 return len - rs;
537 }
538
539 int CRcvBuffer::getRcvMsgNum()
540 {
541 int p, q;
542 bool passack;
543 return scanMsg(p, q, passack) ? 1 : 0;
544 }
545
546 bool CRcvBuffer::scanMsg(int& p, int& q, bool& passack)
547 {
548 // empty buffer
549 if ((m_iStartPos == m_iLastAckPos) && (m_iMaxPos <= 0))
550 return false;
551
552 //skip all bad msgs at the beginning
553 while (m_iStartPos != m_iLastAckPos)
554 {
555 if (NULL == m_pUnit[m_iStartPos])
556 {
557 if (++ m_iStartPos == m_iSize)
558 m_iStartPos = 0;
559 continue;
560 }
561
562 if ((1 == m_pUnit[m_iStartPos]->m_iFlag) && (m_pUnit[m_iStartPos]->m_Packe t.getMsgBoundary() > 1))
563 {
564 bool good = true;
565
566 // look ahead for the whole message
567 for (int i = m_iStartPos; i != m_iLastAckPos;)
568 {
569 if ((NULL == m_pUnit[i]) || (1 != m_pUnit[i]->m_iFlag))
570 {
571 good = false;
572 break;
573 }
574
575 if ((m_pUnit[i]->m_Packet.getMsgBoundary() == 1) || (m_pUnit[i]->m_P acket.getMsgBoundary() == 3))
576 break;
577
578 if (++ i == m_iSize)
579 i = 0;
580 }
581
582 if (good)
583 break;
584 }
585
586 CUnit* tmp = m_pUnit[m_iStartPos];
587 m_pUnit[m_iStartPos] = NULL;
588 tmp->m_iFlag = 0;
589 -- m_pUnitQueue->m_iCount;
590
591 if (++ m_iStartPos == m_iSize)
592 m_iStartPos = 0;
593 }
594
595 p = -1; // message head
596 q = m_iStartPos; // message tail
597 passack = m_iStartPos == m_iLastAckPos;
598 bool found = false;
599
600 // looking for the first message
601 for (int i = 0, n = m_iMaxPos + getRcvDataSize(); i <= n; ++ i)
602 {
603 if ((NULL != m_pUnit[q]) && (1 == m_pUnit[q]->m_iFlag))
604 {
605 switch (m_pUnit[q]->m_Packet.getMsgBoundary())
606 {
607 case 3: // 11
608 p = q;
609 found = true;
610 break;
611
612 case 2: // 10
613 p = q;
614 break;
615
616 case 1: // 01
617 if (p != -1)
618 found = true;
619 }
620 }
621 else
622 {
623 // a hole in this message, not valid, restart search
624 p = -1;
625 }
626
627 if (found)
628 {
629 // the msg has to be ack'ed or it is allowed to read out of order, and was not read before
630 if (!passack || !m_pUnit[q]->m_Packet.getMsgOrderFlag())
631 break;
632
633 found = false;
634 }
635
636 if (++ q == m_iSize)
637 q = 0;
638
639 if (q == m_iLastAckPos)
640 passack = true;
641 }
642
643 // no msg found
644 if (!found)
645 {
646 // if the message is larger than the receiver buffer, return part of the m essage
647 if ((p != -1) && ((q + 1) % m_iSize == p))
648 found = true;
649 }
650
651 return found;
652 }
OLDNEW
« no previous file with comments | « net/third_party/udt/src/buffer.h ('k') | net/third_party/udt/src/cache.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698