OLD | NEW |
| (Empty) |
1 /***************************************************************************** | |
2 Copyright (c) 2001 - 2011, The Board of Trustees of the University of Illinois. | |
3 All rights reserved. | |
4 | |
5 Redistribution and use in source and binary forms, with or without | |
6 modification, are permitted provided that the following conditions are | |
7 met: | |
8 | |
9 * Redistributions of source code must retain the above | |
10 copyright notice, this list of conditions and the | |
11 following disclaimer. | |
12 | |
13 * Redistributions in binary form must reproduce the | |
14 above copyright notice, this list of conditions | |
15 and the following disclaimer in the documentation | |
16 and/or other materials provided with the distribution. | |
17 | |
18 * Neither the name of the University of Illinois | |
19 nor the names of its contributors may be used to | |
20 endorse or promote products derived from this | |
21 software without specific prior written permission. | |
22 | |
23 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS | |
24 IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, | |
25 THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR | |
26 PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR | |
27 CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, | |
28 EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, | |
29 PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR | |
30 PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF | |
31 LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING | |
32 NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS | |
33 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. | |
34 *****************************************************************************/ | |
35 | |
36 /***************************************************************************** | |
37 written by | |
38 Yunhong Gu, last updated 02/03/2011 | |
39 *****************************************************************************/ | |
40 | |
41 #include <cstring> | |
42 #include <cmath> | |
43 #include "buffer.h" | |
44 | |
45 using namespace std; | |
46 | |
47 CSndBuffer::CSndBuffer(const int& size, const int& mss): | |
48 m_BufLock(), | |
49 m_pBlock(NULL), | |
50 m_pFirstBlock(NULL), | |
51 m_pCurrBlock(NULL), | |
52 m_pLastBlock(NULL), | |
53 m_pBuffer(NULL), | |
54 m_iNextMsgNo(1), | |
55 m_iSize(size), | |
56 m_iMSS(mss), | |
57 m_iCount(0) | |
58 { | |
59 // initial physical buffer of "size" | |
60 m_pBuffer = new Buffer; | |
61 m_pBuffer->m_pcData = new char [m_iSize * m_iMSS]; | |
62 m_pBuffer->m_iSize = m_iSize; | |
63 m_pBuffer->m_pNext = NULL; | |
64 | |
65 // circular linked list for out bound packets | |
66 m_pBlock = new Block; | |
67 Block* pb = m_pBlock; | |
68 for (int i = 1; i < m_iSize; ++ i) | |
69 { | |
70 pb->m_pNext = new Block; | |
71 pb->m_iMsgNo = 0; | |
72 pb = pb->m_pNext; | |
73 } | |
74 pb->m_pNext = m_pBlock; | |
75 | |
76 pb = m_pBlock; | |
77 char* pc = m_pBuffer->m_pcData; | |
78 for (int i = 0; i < m_iSize; ++ i) | |
79 { | |
80 pb->m_pcData = pc; | |
81 pb = pb->m_pNext; | |
82 pc += m_iMSS; | |
83 } | |
84 | |
85 m_pFirstBlock = m_pCurrBlock = m_pLastBlock = m_pBlock; | |
86 | |
87 #ifndef WIN32 | |
88 pthread_mutex_init(&m_BufLock, NULL); | |
89 #else | |
90 m_BufLock = CreateMutex(NULL, false, NULL); | |
91 #endif | |
92 } | |
93 | |
94 CSndBuffer::~CSndBuffer() | |
95 { | |
96 Block* pb = m_pBlock->m_pNext; | |
97 while (pb != m_pBlock) | |
98 { | |
99 Block* temp = pb; | |
100 pb = pb->m_pNext; | |
101 delete temp; | |
102 } | |
103 delete m_pBlock; | |
104 | |
105 while (m_pBuffer != NULL) | |
106 { | |
107 Buffer* temp = m_pBuffer; | |
108 m_pBuffer = m_pBuffer->m_pNext; | |
109 delete [] temp->m_pcData; | |
110 delete temp; | |
111 } | |
112 | |
113 #ifndef WIN32 | |
114 pthread_mutex_destroy(&m_BufLock); | |
115 #else | |
116 CloseHandle(m_BufLock); | |
117 #endif | |
118 } | |
119 | |
120 void CSndBuffer::addBuffer(const char* data, const int& len, const int& ttl, con
st bool& order) | |
121 { | |
122 int size = len / m_iMSS; | |
123 if ((len % m_iMSS) != 0) | |
124 size ++; | |
125 | |
126 // dynamically increase sender buffer | |
127 while (size + m_iCount >= m_iSize) | |
128 increase(); | |
129 | |
130 uint64_t time = CTimer::getTime(); | |
131 int32_t inorder = order; | |
132 inorder <<= 29; | |
133 | |
134 Block* s = m_pLastBlock; | |
135 for (int i = 0; i < size; ++ i) | |
136 { | |
137 int pktlen = len - i * m_iMSS; | |
138 if (pktlen > m_iMSS) | |
139 pktlen = m_iMSS; | |
140 | |
141 memcpy(s->m_pcData, data + i * m_iMSS, pktlen); | |
142 s->m_iLength = pktlen; | |
143 | |
144 s->m_iMsgNo = m_iNextMsgNo | inorder; | |
145 if (i == 0) | |
146 s->m_iMsgNo |= 0x80000000; | |
147 if (i == size - 1) | |
148 s->m_iMsgNo |= 0x40000000; | |
149 | |
150 s->m_OriginTime = time; | |
151 s->m_iTTL = ttl; | |
152 | |
153 s = s->m_pNext; | |
154 } | |
155 m_pLastBlock = s; | |
156 | |
157 CGuard::enterCS(m_BufLock); | |
158 m_iCount += size; | |
159 CGuard::leaveCS(m_BufLock); | |
160 | |
161 m_iNextMsgNo ++; | |
162 if (m_iNextMsgNo == CMsgNo::m_iMaxMsgNo) | |
163 m_iNextMsgNo = 1; | |
164 } | |
165 | |
166 int CSndBuffer::addBufferFromFile(fstream& ifs, const int& len) | |
167 { | |
168 int size = len / m_iMSS; | |
169 if ((len % m_iMSS) != 0) | |
170 size ++; | |
171 | |
172 // dynamically increase sender buffer | |
173 while (size + m_iCount >= m_iSize) | |
174 increase(); | |
175 | |
176 Block* s = m_pLastBlock; | |
177 int total = 0; | |
178 for (int i = 0; i < size; ++ i) | |
179 { | |
180 if (ifs.bad() || ifs.fail() || ifs.eof()) | |
181 break; | |
182 | |
183 int pktlen = len - i * m_iMSS; | |
184 if (pktlen > m_iMSS) | |
185 pktlen = m_iMSS; | |
186 | |
187 ifs.read(s->m_pcData, pktlen); | |
188 if ((pktlen = ifs.gcount()) <= 0) | |
189 break; | |
190 | |
191 // currently file transfer is only available in streaming mode, message is
always in order, ttl = infinite | |
192 s->m_iMsgNo = m_iNextMsgNo | 0x20000000; | |
193 if (i == 0) | |
194 s->m_iMsgNo |= 0x80000000; | |
195 if (i == size - 1) | |
196 s->m_iMsgNo |= 0x40000000; | |
197 | |
198 s->m_iLength = pktlen; | |
199 s->m_iTTL = -1; | |
200 s = s->m_pNext; | |
201 | |
202 total += pktlen; | |
203 } | |
204 m_pLastBlock = s; | |
205 | |
206 CGuard::enterCS(m_BufLock); | |
207 m_iCount += size; | |
208 CGuard::leaveCS(m_BufLock); | |
209 | |
210 m_iNextMsgNo ++; | |
211 if (m_iNextMsgNo == CMsgNo::m_iMaxMsgNo) | |
212 m_iNextMsgNo = 1; | |
213 | |
214 return total; | |
215 } | |
216 | |
217 int CSndBuffer::readData(char** data, int32_t& msgno) | |
218 { | |
219 // No data to read | |
220 if (m_pCurrBlock == m_pLastBlock) | |
221 return 0; | |
222 | |
223 *data = m_pCurrBlock->m_pcData; | |
224 int readlen = m_pCurrBlock->m_iLength; | |
225 msgno = m_pCurrBlock->m_iMsgNo; | |
226 | |
227 m_pCurrBlock = m_pCurrBlock->m_pNext; | |
228 | |
229 return readlen; | |
230 } | |
231 | |
232 int CSndBuffer::readData(char** data, const int offset, int32_t& msgno, int& msg
len) | |
233 { | |
234 CGuard bufferguard(m_BufLock); | |
235 | |
236 Block* p = m_pFirstBlock; | |
237 | |
238 for (int i = 0; i < offset; ++ i) | |
239 p = p->m_pNext; | |
240 | |
241 if ((p->m_iTTL >= 0) && ((CTimer::getTime() - p->m_OriginTime) / 1000 > (uint
64_t)p->m_iTTL)) | |
242 { | |
243 msgno = p->m_iMsgNo & 0x1FFFFFFF; | |
244 | |
245 msglen = 1; | |
246 p = p->m_pNext; | |
247 bool move = false; | |
248 while (msgno == (p->m_iMsgNo & 0x1FFFFFFF)) | |
249 { | |
250 if (p == m_pCurrBlock) | |
251 move = true; | |
252 p = p->m_pNext; | |
253 if (move) | |
254 m_pCurrBlock = p; | |
255 msglen ++; | |
256 } | |
257 | |
258 return -1; | |
259 } | |
260 | |
261 *data = p->m_pcData; | |
262 int readlen = p->m_iLength; | |
263 msgno = p->m_iMsgNo; | |
264 | |
265 return readlen; | |
266 } | |
267 | |
268 void CSndBuffer::ackData(const int& offset) | |
269 { | |
270 CGuard bufferguard(m_BufLock); | |
271 | |
272 for (int i = 0; i < offset; ++ i) | |
273 m_pFirstBlock = m_pFirstBlock->m_pNext; | |
274 | |
275 m_iCount -= offset; | |
276 | |
277 CTimer::triggerEvent(); | |
278 } | |
279 | |
280 int CSndBuffer::getCurrBufSize() const | |
281 { | |
282 return m_iCount; | |
283 } | |
284 | |
285 void CSndBuffer::increase() | |
286 { | |
287 int unitsize = m_pBuffer->m_iSize; | |
288 | |
289 // new physical buffer | |
290 Buffer* nbuf = NULL; | |
291 try | |
292 { | |
293 nbuf = new Buffer; | |
294 nbuf->m_pcData = new char [unitsize * m_iMSS]; | |
295 } | |
296 catch (...) | |
297 { | |
298 delete nbuf; | |
299 throw CUDTException(3, 2, 0); | |
300 } | |
301 nbuf->m_iSize = unitsize; | |
302 nbuf->m_pNext = NULL; | |
303 | |
304 // insert the buffer at the end of the buffer list | |
305 Buffer* p = m_pBuffer; | |
306 while (NULL != p->m_pNext) | |
307 p = p->m_pNext; | |
308 p->m_pNext = nbuf; | |
309 | |
310 // new packet blocks | |
311 Block* nblk = NULL; | |
312 try | |
313 { | |
314 nblk = new Block; | |
315 } | |
316 catch (...) | |
317 { | |
318 delete nblk; | |
319 throw CUDTException(3, 2, 0); | |
320 } | |
321 Block* pb = nblk; | |
322 for (int i = 1; i < unitsize; ++ i) | |
323 { | |
324 pb->m_pNext = new Block; | |
325 pb = pb->m_pNext; | |
326 } | |
327 | |
328 // insert the new blocks onto the existing one | |
329 pb->m_pNext = m_pLastBlock->m_pNext; | |
330 m_pLastBlock->m_pNext = nblk; | |
331 | |
332 pb = nblk; | |
333 char* pc = nbuf->m_pcData; | |
334 for (int i = 0; i < unitsize; ++ i) | |
335 { | |
336 pb->m_pcData = pc; | |
337 pb = pb->m_pNext; | |
338 pc += m_iMSS; | |
339 } | |
340 | |
341 m_iSize += unitsize; | |
342 } | |
343 | |
344 //////////////////////////////////////////////////////////////////////////////// | |
345 | |
346 CRcvBuffer::CRcvBuffer(CUnitQueue* queue, const int& bufsize): | |
347 m_pUnit(NULL), | |
348 m_iSize(bufsize), | |
349 m_pUnitQueue(queue), | |
350 m_iStartPos(0), | |
351 m_iLastAckPos(0), | |
352 m_iMaxPos(-1), | |
353 m_iNotch(0) | |
354 { | |
355 m_pUnit = new CUnit* [m_iSize]; | |
356 for (int i = 0; i < m_iSize; ++ i) | |
357 m_pUnit[i] = NULL; | |
358 } | |
359 | |
360 CRcvBuffer::~CRcvBuffer() | |
361 { | |
362 for (int i = 0; i < m_iSize; ++ i) | |
363 { | |
364 if (NULL != m_pUnit[i]) | |
365 { | |
366 m_pUnit[i]->m_iFlag = 0; | |
367 -- m_pUnitQueue->m_iCount; | |
368 } | |
369 } | |
370 | |
371 delete [] m_pUnit; | |
372 } | |
373 | |
374 int CRcvBuffer::addData(CUnit* unit, int offset) | |
375 { | |
376 int pos = (m_iLastAckPos + offset) % m_iSize; | |
377 if (offset > m_iMaxPos) | |
378 m_iMaxPos = offset; | |
379 | |
380 if (NULL != m_pUnit[pos]) | |
381 return -1; | |
382 | |
383 m_pUnit[pos] = unit; | |
384 | |
385 unit->m_iFlag = 1; | |
386 ++ m_pUnitQueue->m_iCount; | |
387 | |
388 return 0; | |
389 } | |
390 | |
391 int CRcvBuffer::readBuffer(char* data, const int& len) | |
392 { | |
393 int p = m_iStartPos; | |
394 int lastack = m_iLastAckPos; | |
395 int rs = len; | |
396 | |
397 while ((p != lastack) && (rs > 0)) | |
398 { | |
399 int unitsize = m_pUnit[p]->m_Packet.getLength() - m_iNotch; | |
400 if (unitsize > rs) | |
401 unitsize = rs; | |
402 | |
403 memcpy(data, m_pUnit[p]->m_Packet.m_pcData + m_iNotch, unitsize); | |
404 data += unitsize; | |
405 | |
406 if ((rs > unitsize) || (rs == m_pUnit[p]->m_Packet.getLength() - m_iNotch)
) | |
407 { | |
408 CUnit* tmp = m_pUnit[p]; | |
409 m_pUnit[p] = NULL; | |
410 tmp->m_iFlag = 0; | |
411 -- m_pUnitQueue->m_iCount; | |
412 | |
413 if (++ p == m_iSize) | |
414 p = 0; | |
415 | |
416 m_iNotch = 0; | |
417 } | |
418 else | |
419 m_iNotch += rs; | |
420 | |
421 rs -= unitsize; | |
422 } | |
423 | |
424 m_iStartPos = p; | |
425 return len - rs; | |
426 } | |
427 | |
428 int CRcvBuffer::readBufferToFile(fstream& ofs, const int& len) | |
429 { | |
430 int p = m_iStartPos; | |
431 int lastack = m_iLastAckPos; | |
432 int rs = len; | |
433 | |
434 while ((p != lastack) && (rs > 0)) | |
435 { | |
436 int unitsize = m_pUnit[p]->m_Packet.getLength() - m_iNotch; | |
437 if (unitsize > rs) | |
438 unitsize = rs; | |
439 | |
440 ofs.write(m_pUnit[p]->m_Packet.m_pcData + m_iNotch, unitsize); | |
441 if (ofs.fail()) | |
442 break; | |
443 | |
444 if ((rs > unitsize) || (rs == m_pUnit[p]->m_Packet.getLength() - m_iNotch)
) | |
445 { | |
446 CUnit* tmp = m_pUnit[p]; | |
447 m_pUnit[p] = NULL; | |
448 tmp->m_iFlag = 0; | |
449 -- m_pUnitQueue->m_iCount; | |
450 | |
451 if (++ p == m_iSize) | |
452 p = 0; | |
453 | |
454 m_iNotch = 0; | |
455 } | |
456 else | |
457 m_iNotch += rs; | |
458 | |
459 rs -= unitsize; | |
460 } | |
461 | |
462 m_iStartPos = p; | |
463 | |
464 return len - rs; | |
465 } | |
466 | |
467 void CRcvBuffer::ackData(const int& len) | |
468 { | |
469 m_iLastAckPos = (m_iLastAckPos + len) % m_iSize; | |
470 m_iMaxPos -= len; | |
471 if (m_iMaxPos < 0) | |
472 m_iMaxPos = 0; | |
473 | |
474 CTimer::triggerEvent(); | |
475 } | |
476 | |
477 int CRcvBuffer::getAvailBufSize() const | |
478 { | |
479 // One slot must be empty in order to tell the difference between "empty buff
er" and "full buffer" | |
480 return m_iSize - getRcvDataSize() - 1; | |
481 } | |
482 | |
483 int CRcvBuffer::getRcvDataSize() const | |
484 { | |
485 if (m_iLastAckPos >= m_iStartPos) | |
486 return m_iLastAckPos - m_iStartPos; | |
487 | |
488 return m_iSize + m_iLastAckPos - m_iStartPos; | |
489 } | |
490 | |
491 void CRcvBuffer::dropMsg(const int32_t& msgno) | |
492 { | |
493 for (int i = m_iStartPos, n = (m_iLastAckPos + m_iMaxPos) % m_iSize; i != n;
i = (i + 1) % m_iSize) | |
494 if ((NULL != m_pUnit[i]) && (msgno == m_pUnit[i]->m_Packet.m_iMsgNo)) | |
495 m_pUnit[i]->m_iFlag = 3; | |
496 } | |
497 | |
498 int CRcvBuffer::readMsg(char* data, const int& len) | |
499 { | |
500 int p, q; | |
501 bool passack; | |
502 if (!scanMsg(p, q, passack)) | |
503 return 0; | |
504 | |
505 int rs = len; | |
506 while (p != (q + 1) % m_iSize) | |
507 { | |
508 int unitsize = m_pUnit[p]->m_Packet.getLength(); | |
509 if ((rs >= 0) && (unitsize > rs)) | |
510 unitsize = rs; | |
511 | |
512 if (unitsize > 0) | |
513 { | |
514 memcpy(data, m_pUnit[p]->m_Packet.m_pcData, unitsize); | |
515 data += unitsize; | |
516 rs -= unitsize; | |
517 } | |
518 | |
519 if (!passack) | |
520 { | |
521 CUnit* tmp = m_pUnit[p]; | |
522 m_pUnit[p] = NULL; | |
523 tmp->m_iFlag = 0; | |
524 -- m_pUnitQueue->m_iCount; | |
525 } | |
526 else | |
527 m_pUnit[p]->m_iFlag = 2; | |
528 | |
529 if (++ p == m_iSize) | |
530 p = 0; | |
531 } | |
532 | |
533 if (!passack) | |
534 m_iStartPos = (q + 1) % m_iSize; | |
535 | |
536 return len - rs; | |
537 } | |
538 | |
539 int CRcvBuffer::getRcvMsgNum() | |
540 { | |
541 int p, q; | |
542 bool passack; | |
543 return scanMsg(p, q, passack) ? 1 : 0; | |
544 } | |
545 | |
546 bool CRcvBuffer::scanMsg(int& p, int& q, bool& passack) | |
547 { | |
548 // empty buffer | |
549 if ((m_iStartPos == m_iLastAckPos) && (m_iMaxPos <= 0)) | |
550 return false; | |
551 | |
552 //skip all bad msgs at the beginning | |
553 while (m_iStartPos != m_iLastAckPos) | |
554 { | |
555 if (NULL == m_pUnit[m_iStartPos]) | |
556 { | |
557 if (++ m_iStartPos == m_iSize) | |
558 m_iStartPos = 0; | |
559 continue; | |
560 } | |
561 | |
562 if ((1 == m_pUnit[m_iStartPos]->m_iFlag) && (m_pUnit[m_iStartPos]->m_Packe
t.getMsgBoundary() > 1)) | |
563 { | |
564 bool good = true; | |
565 | |
566 // look ahead for the whole message | |
567 for (int i = m_iStartPos; i != m_iLastAckPos;) | |
568 { | |
569 if ((NULL == m_pUnit[i]) || (1 != m_pUnit[i]->m_iFlag)) | |
570 { | |
571 good = false; | |
572 break; | |
573 } | |
574 | |
575 if ((m_pUnit[i]->m_Packet.getMsgBoundary() == 1) || (m_pUnit[i]->m_P
acket.getMsgBoundary() == 3)) | |
576 break; | |
577 | |
578 if (++ i == m_iSize) | |
579 i = 0; | |
580 } | |
581 | |
582 if (good) | |
583 break; | |
584 } | |
585 | |
586 CUnit* tmp = m_pUnit[m_iStartPos]; | |
587 m_pUnit[m_iStartPos] = NULL; | |
588 tmp->m_iFlag = 0; | |
589 -- m_pUnitQueue->m_iCount; | |
590 | |
591 if (++ m_iStartPos == m_iSize) | |
592 m_iStartPos = 0; | |
593 } | |
594 | |
595 p = -1; // message head | |
596 q = m_iStartPos; // message tail | |
597 passack = m_iStartPos == m_iLastAckPos; | |
598 bool found = false; | |
599 | |
600 // looking for the first message | |
601 for (int i = 0, n = m_iMaxPos + getRcvDataSize(); i <= n; ++ i) | |
602 { | |
603 if ((NULL != m_pUnit[q]) && (1 == m_pUnit[q]->m_iFlag)) | |
604 { | |
605 switch (m_pUnit[q]->m_Packet.getMsgBoundary()) | |
606 { | |
607 case 3: // 11 | |
608 p = q; | |
609 found = true; | |
610 break; | |
611 | |
612 case 2: // 10 | |
613 p = q; | |
614 break; | |
615 | |
616 case 1: // 01 | |
617 if (p != -1) | |
618 found = true; | |
619 } | |
620 } | |
621 else | |
622 { | |
623 // a hole in this message, not valid, restart search | |
624 p = -1; | |
625 } | |
626 | |
627 if (found) | |
628 { | |
629 // the msg has to be ack'ed or it is allowed to read out of order, and
was not read before | |
630 if (!passack || !m_pUnit[q]->m_Packet.getMsgOrderFlag()) | |
631 break; | |
632 | |
633 found = false; | |
634 } | |
635 | |
636 if (++ q == m_iSize) | |
637 q = 0; | |
638 | |
639 if (q == m_iLastAckPos) | |
640 passack = true; | |
641 } | |
642 | |
643 // no msg found | |
644 if (!found) | |
645 { | |
646 // if the message is larger than the receiver buffer, return part of the m
essage | |
647 if ((p != -1) && ((q + 1) % m_iSize == p)) | |
648 found = true; | |
649 } | |
650 | |
651 return found; | |
652 } | |
OLD | NEW |