OLD | NEW |
| (Empty) |
1 // Copyright (c) 2015 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "net/quic/quic_stream_sequencer_buffer.h" | |
6 | |
7 #include "base/logging.h" | |
8 #include "base/strings/string_number_conversions.h" | |
9 #include "net/quic/quic_bug_tracker.h" | |
10 | |
11 using std::min; | |
12 using std::string; | |
13 | |
14 namespace net { | |
15 | |
16 namespace { | |
17 | |
18 string RangeDebugString(QuicStreamOffset start, QuicStreamOffset end) { | |
19 return string("[") + base::Uint64ToString(start) + ", " + | |
20 base::Uint64ToString(end) + ") "; | |
21 } | |
22 | |
23 } // namespace | |
24 | |
25 QuicStreamSequencerBuffer::Gap::Gap(QuicStreamOffset begin_offset, | |
26 QuicStreamOffset end_offset) | |
27 : begin_offset(begin_offset), end_offset(end_offset) {} | |
28 | |
29 QuicStreamSequencerBuffer::FrameInfo::FrameInfo() | |
30 : length(1), timestamp(QuicTime::Zero()) {} | |
31 | |
32 QuicStreamSequencerBuffer::FrameInfo::FrameInfo(size_t length, | |
33 QuicTime timestamp) | |
34 : length(length), timestamp(timestamp) {} | |
35 | |
36 QuicStreamSequencerBuffer::QuicStreamSequencerBuffer(size_t max_capacity_bytes) | |
37 : max_buffer_capacity_bytes_(max_capacity_bytes), | |
38 blocks_count_( | |
39 ceil(static_cast<double>(max_capacity_bytes) / kBlockSizeBytes)), | |
40 total_bytes_read_(0), | |
41 blocks_(blocks_count_) { | |
42 Clear(); | |
43 } | |
44 | |
45 QuicStreamSequencerBuffer::~QuicStreamSequencerBuffer() { | |
46 Clear(); | |
47 } | |
48 | |
49 void QuicStreamSequencerBuffer::Clear() { | |
50 for (size_t i = 0; i < blocks_count_; ++i) { | |
51 if (blocks_[i] != nullptr) { | |
52 RetireBlock(i); | |
53 } | |
54 } | |
55 num_bytes_buffered_ = 0; | |
56 // Reset gaps_ so that buffer is in a state as if all data before | |
57 // total_bytes_read_ has been consumed, and those after total_bytes_read_ | |
58 // has never arrived. | |
59 gaps_ = std::list<Gap>( | |
60 1, Gap(total_bytes_read_, std::numeric_limits<QuicStreamOffset>::max())), | |
61 frame_arrival_time_map_.clear(); | |
62 } | |
63 | |
64 void QuicStreamSequencerBuffer::RetireBlock(size_t idx) { | |
65 DCHECK(blocks_[idx] != nullptr); | |
66 delete blocks_[idx]; | |
67 blocks_[idx] = nullptr; | |
68 DVLOG(1) << "Retired block with index: " << idx; | |
69 } | |
70 | |
71 QuicErrorCode QuicStreamSequencerBuffer::OnStreamData( | |
72 QuicStreamOffset starting_offset, | |
73 base::StringPiece data, | |
74 QuicTime timestamp, | |
75 size_t* const bytes_buffered, | |
76 std::string* error_details) { | |
77 *bytes_buffered = 0; | |
78 QuicStreamOffset offset = starting_offset; | |
79 size_t size = data.size(); | |
80 if (size == 0) { | |
81 *error_details = "Received empty stream frame without FIN."; | |
82 return QUIC_EMPTY_STREAM_FRAME_NO_FIN; | |
83 } | |
84 | |
85 // Find the first gap not ending before |offset|. This gap maybe the gap to | |
86 // fill if the arriving frame doesn't overlaps with previous ones. | |
87 std::list<Gap>::iterator current_gap = gaps_.begin(); | |
88 while (current_gap != gaps_.end() && current_gap->end_offset <= offset) { | |
89 ++current_gap; | |
90 } | |
91 | |
92 DCHECK(current_gap != gaps_.end()); | |
93 | |
94 // "duplication": might duplicate with data alread filled,but also might | |
95 // overlap across different base::StringPiece objects already written. | |
96 // In both cases, don't write the data, | |
97 // and allow the caller of this method to handle the result. | |
98 if (offset < current_gap->begin_offset && | |
99 offset + size <= current_gap->begin_offset) { | |
100 DVLOG(1) << "Duplicated data at offset: " << offset << " length: " << size; | |
101 return QUIC_NO_ERROR; | |
102 } | |
103 if (offset < current_gap->begin_offset && | |
104 offset + size > current_gap->begin_offset) { | |
105 // Beginning of new data overlaps data before current gap. | |
106 *error_details = | |
107 string("Beginning of received data overlaps with buffered data.\n") + | |
108 "New frame range " + RangeDebugString(offset, offset + size) + | |
109 " with first 128 bytes: " + | |
110 string(data.data(), data.length() < 128 ? data.length() : 128) + | |
111 "\nCurrently received frames: " + ReceivedFramesDebugString() + | |
112 "\nCurrent gaps: " + GapsDebugString(); | |
113 return QUIC_OVERLAPPING_STREAM_DATA; | |
114 } | |
115 if (offset + size > current_gap->end_offset) { | |
116 // End of new data overlaps with data after current gap. | |
117 *error_details = | |
118 string("End of received data overlaps with buffered data.\n") + | |
119 "New frame range " + RangeDebugString(offset, offset + size) + | |
120 " with first 128 bytes: " + | |
121 string(data.data(), data.length() < 128 ? data.length() : 128) + | |
122 "\nCurrently received frames: " + ReceivedFramesDebugString() + | |
123 "\nCurrent gaps: " + GapsDebugString(); | |
124 return QUIC_OVERLAPPING_STREAM_DATA; | |
125 } | |
126 | |
127 // Write beyond the current range this buffer is covering. | |
128 if (offset + size > total_bytes_read_ + max_buffer_capacity_bytes_) { | |
129 *error_details = "Received data beyond available range."; | |
130 return QUIC_INTERNAL_ERROR; | |
131 } | |
132 | |
133 size_t total_written = 0; | |
134 size_t source_remaining = size; | |
135 const char* source = data.data(); | |
136 // Write data block by block. If corresponding block has not created yet, | |
137 // create it first. | |
138 // Stop when all data are written or reaches the logical end of the buffer. | |
139 while (source_remaining > 0) { | |
140 const size_t write_block_num = GetBlockIndex(offset); | |
141 const size_t write_block_offset = GetInBlockOffset(offset); | |
142 DCHECK_GT(blocks_count_, write_block_num); | |
143 | |
144 size_t block_capacity = GetBlockCapacity(write_block_num); | |
145 size_t bytes_avail = block_capacity - write_block_offset; | |
146 | |
147 // If this write meets the upper boundary of the buffer, | |
148 // reduce the available free bytes. | |
149 if (offset + bytes_avail > total_bytes_read_ + max_buffer_capacity_bytes_) { | |
150 bytes_avail = total_bytes_read_ + max_buffer_capacity_bytes_ - offset; | |
151 } | |
152 | |
153 if (blocks_[write_block_num] == nullptr) { | |
154 // TODO(danzh): Investigate if using a freelist would improve performance. | |
155 // Same as RetireBlock(). | |
156 blocks_[write_block_num] = new BufferBlock(); | |
157 } | |
158 | |
159 const size_t bytes_to_copy = min<size_t>(bytes_avail, source_remaining); | |
160 char* dest = blocks_[write_block_num]->buffer + write_block_offset; | |
161 DVLOG(1) << "Write at offset: " << offset << " length: " << bytes_to_copy; | |
162 memcpy(dest, source, bytes_to_copy); | |
163 source += bytes_to_copy; | |
164 source_remaining -= bytes_to_copy; | |
165 offset += bytes_to_copy; | |
166 total_written += bytes_to_copy; | |
167 } | |
168 | |
169 DCHECK_GT(total_written, 0u); | |
170 *bytes_buffered = total_written; | |
171 UpdateGapList(current_gap, starting_offset, total_written); | |
172 | |
173 frame_arrival_time_map_.insert( | |
174 std::make_pair(starting_offset, FrameInfo(size, timestamp))); | |
175 num_bytes_buffered_ += total_written; | |
176 return QUIC_NO_ERROR; | |
177 } | |
178 | |
179 inline void QuicStreamSequencerBuffer::UpdateGapList( | |
180 std::list<Gap>::iterator gap_with_new_data_written, | |
181 QuicStreamOffset start_offset, | |
182 size_t bytes_written) { | |
183 if (gap_with_new_data_written->begin_offset == start_offset && | |
184 gap_with_new_data_written->end_offset > start_offset + bytes_written) { | |
185 // New data has been written into the left part of the buffer. | |
186 gap_with_new_data_written->begin_offset = start_offset + bytes_written; | |
187 } else if (gap_with_new_data_written->begin_offset < start_offset && | |
188 gap_with_new_data_written->end_offset == | |
189 start_offset + bytes_written) { | |
190 // New data has been written into the right part of the buffer. | |
191 gap_with_new_data_written->end_offset = start_offset; | |
192 } else if (gap_with_new_data_written->begin_offset < start_offset && | |
193 gap_with_new_data_written->end_offset > | |
194 start_offset + bytes_written) { | |
195 // New data has been written into the middle of the buffer. | |
196 auto current = gap_with_new_data_written++; | |
197 QuicStreamOffset current_end = current->end_offset; | |
198 current->end_offset = start_offset; | |
199 gaps_.insert(gap_with_new_data_written, | |
200 Gap(start_offset + bytes_written, current_end)); | |
201 } else if (gap_with_new_data_written->begin_offset == start_offset && | |
202 gap_with_new_data_written->end_offset == | |
203 start_offset + bytes_written) { | |
204 // This gap has been filled with new data. So it's no longer a gap. | |
205 gaps_.erase(gap_with_new_data_written); | |
206 } | |
207 } | |
208 | |
209 size_t QuicStreamSequencerBuffer::Readv(const iovec* dest_iov, | |
210 size_t dest_count) { | |
211 size_t bytes_read = 0; | |
212 for (size_t i = 0; i < dest_count && ReadableBytes() > 0; ++i) { | |
213 char* dest = reinterpret_cast<char*>(dest_iov[i].iov_base); | |
214 size_t dest_remaining = dest_iov[i].iov_len; | |
215 while (dest_remaining > 0 && ReadableBytes() > 0) { | |
216 size_t block_idx = NextBlockToRead(); | |
217 size_t start_offset_in_block = ReadOffset(); | |
218 size_t block_capacity = GetBlockCapacity(block_idx); | |
219 size_t bytes_available_in_block = | |
220 min<size_t>(ReadableBytes(), block_capacity - start_offset_in_block); | |
221 size_t bytes_to_copy = | |
222 min<size_t>(bytes_available_in_block, dest_remaining); | |
223 DCHECK_GT(bytes_to_copy, 0u); | |
224 DCHECK_NE(static_cast<BufferBlock*>(nullptr), blocks_[block_idx]); | |
225 memcpy(dest, blocks_[block_idx]->buffer + start_offset_in_block, | |
226 bytes_to_copy); | |
227 dest += bytes_to_copy; | |
228 dest_remaining -= bytes_to_copy; | |
229 num_bytes_buffered_ -= bytes_to_copy; | |
230 total_bytes_read_ += bytes_to_copy; | |
231 bytes_read += bytes_to_copy; | |
232 | |
233 // Retire the block if all the data is read out | |
234 // and no other data is stored in this block. | |
235 if (bytes_to_copy == bytes_available_in_block) { | |
236 RetireBlockIfEmpty(block_idx); | |
237 } | |
238 } | |
239 } | |
240 | |
241 if (bytes_read > 0) { | |
242 UpdateFrameArrivalMap(total_bytes_read_); | |
243 } | |
244 return bytes_read; | |
245 } | |
246 | |
247 int QuicStreamSequencerBuffer::GetReadableRegions(struct iovec* iov, | |
248 int iov_count) const { | |
249 DCHECK(iov != nullptr); | |
250 DCHECK_GT(iov_count, 0); | |
251 | |
252 if (ReadableBytes() == 0) { | |
253 iov[0].iov_base = nullptr; | |
254 iov[0].iov_len = 0; | |
255 return 0; | |
256 } | |
257 | |
258 size_t start_block_idx = NextBlockToRead(); | |
259 QuicStreamOffset readable_offset_end = gaps_.front().begin_offset - 1; | |
260 DCHECK_GE(readable_offset_end + 1, total_bytes_read_); | |
261 size_t end_block_offset = GetInBlockOffset(readable_offset_end); | |
262 size_t end_block_idx = GetBlockIndex(readable_offset_end); | |
263 | |
264 // If readable region is within one block, deal with it seperately. | |
265 if (start_block_idx == end_block_idx && ReadOffset() <= end_block_offset) { | |
266 iov[0].iov_base = blocks_[start_block_idx]->buffer + ReadOffset(); | |
267 iov[0].iov_len = ReadableBytes(); | |
268 DVLOG(1) << "Got only a single block with index: " << start_block_idx; | |
269 return 1; | |
270 } | |
271 | |
272 // Get first block | |
273 iov[0].iov_base = blocks_[start_block_idx]->buffer + ReadOffset(); | |
274 iov[0].iov_len = GetBlockCapacity(start_block_idx) - ReadOffset(); | |
275 DVLOG(1) << "Got first block " << start_block_idx << " with len " | |
276 << iov[0].iov_len; | |
277 DCHECK_GT(readable_offset_end + 1, total_bytes_read_ + iov[0].iov_len) | |
278 << "there should be more available data"; | |
279 | |
280 // Get readable regions of the rest blocks till either 2nd to last block | |
281 // before gap is met or |iov| is filled. For these blocks, one whole block is | |
282 // a region. | |
283 int iov_used = 1; | |
284 size_t block_idx = (start_block_idx + iov_used) % blocks_count_; | |
285 while (block_idx != end_block_idx && iov_used < iov_count) { | |
286 DCHECK_NE(static_cast<BufferBlock*>(nullptr), blocks_[block_idx]); | |
287 iov[iov_used].iov_base = blocks_[block_idx]->buffer; | |
288 iov[iov_used].iov_len = GetBlockCapacity(block_idx); | |
289 DVLOG(1) << "Got block with index: " << block_idx; | |
290 ++iov_used; | |
291 block_idx = (start_block_idx + iov_used) % blocks_count_; | |
292 } | |
293 | |
294 // Deal with last block if |iov| can hold more. | |
295 if (iov_used < iov_count) { | |
296 DCHECK_NE(static_cast<BufferBlock*>(nullptr), blocks_[block_idx]); | |
297 iov[iov_used].iov_base = blocks_[end_block_idx]->buffer; | |
298 iov[iov_used].iov_len = end_block_offset + 1; | |
299 DVLOG(1) << "Got last block with index: " << end_block_idx; | |
300 ++iov_used; | |
301 } | |
302 return iov_used; | |
303 } | |
304 | |
305 bool QuicStreamSequencerBuffer::GetReadableRegion(iovec* iov, | |
306 QuicTime* timestamp) const { | |
307 if (ReadableBytes() == 0) { | |
308 iov[0].iov_base = nullptr; | |
309 iov[0].iov_len = 0; | |
310 return false; | |
311 } | |
312 | |
313 size_t start_block_idx = NextBlockToRead(); | |
314 iov->iov_base = blocks_[start_block_idx]->buffer + ReadOffset(); | |
315 size_t readable_bytes_in_block = min<size_t>( | |
316 GetBlockCapacity(start_block_idx) - ReadOffset(), ReadableBytes()); | |
317 size_t region_len = 0; | |
318 auto iter = frame_arrival_time_map_.begin(); | |
319 *timestamp = iter->second.timestamp; | |
320 DVLOG(1) << "Readable bytes in block: " << readable_bytes_in_block; | |
321 for (; iter != frame_arrival_time_map_.end() && | |
322 region_len + iter->second.length <= readable_bytes_in_block; | |
323 ++iter) { | |
324 if (iter->second.timestamp != *timestamp) { | |
325 // If reaches a frame arrive at another timestamp, stop expanding current | |
326 // region. | |
327 DVLOG(1) << "Meet frame with different timestamp."; | |
328 break; | |
329 } | |
330 region_len += iter->second.length; | |
331 DVLOG(1) << "Added bytes to region: " << iter->second.length; | |
332 } | |
333 if (iter == frame_arrival_time_map_.end() || | |
334 iter->second.timestamp == *timestamp) { | |
335 // If encountered the end of readable bytes before reaching a different | |
336 // timestamp. | |
337 DVLOG(1) << "Got all readable bytes in first block."; | |
338 region_len = readable_bytes_in_block; | |
339 } | |
340 iov->iov_len = region_len; | |
341 return true; | |
342 } | |
343 | |
344 bool QuicStreamSequencerBuffer::MarkConsumed(size_t bytes_used) { | |
345 if (bytes_used > ReadableBytes()) { | |
346 return false; | |
347 } | |
348 size_t bytes_to_consume = bytes_used; | |
349 while (bytes_to_consume > 0) { | |
350 size_t block_idx = NextBlockToRead(); | |
351 size_t offset_in_block = ReadOffset(); | |
352 size_t bytes_available = min<size_t>( | |
353 ReadableBytes(), GetBlockCapacity(block_idx) - offset_in_block); | |
354 size_t bytes_read = min<size_t>(bytes_to_consume, bytes_available); | |
355 total_bytes_read_ += bytes_read; | |
356 num_bytes_buffered_ -= bytes_read; | |
357 bytes_to_consume -= bytes_read; | |
358 // If advanced to the end of current block and end of buffer hasn't wrapped | |
359 // to this block yet. | |
360 if (bytes_available == bytes_read) { | |
361 RetireBlockIfEmpty(block_idx); | |
362 } | |
363 } | |
364 if (bytes_used > 0) { | |
365 UpdateFrameArrivalMap(total_bytes_read_); | |
366 } | |
367 return true; | |
368 } | |
369 | |
370 size_t QuicStreamSequencerBuffer::FlushBufferedFrames() { | |
371 size_t prev_total_bytes_read = total_bytes_read_; | |
372 total_bytes_read_ = gaps_.back().begin_offset; | |
373 Clear(); | |
374 return total_bytes_read_ - prev_total_bytes_read; | |
375 } | |
376 | |
377 size_t QuicStreamSequencerBuffer::ReadableBytes() const { | |
378 return gaps_.front().begin_offset - total_bytes_read_; | |
379 } | |
380 | |
381 bool QuicStreamSequencerBuffer::HasBytesToRead() const { | |
382 return ReadableBytes() > 0; | |
383 } | |
384 | |
385 QuicStreamOffset QuicStreamSequencerBuffer::BytesConsumed() const { | |
386 return total_bytes_read_; | |
387 } | |
388 | |
389 size_t QuicStreamSequencerBuffer::BytesBuffered() const { | |
390 return num_bytes_buffered_; | |
391 } | |
392 | |
393 size_t QuicStreamSequencerBuffer::GetBlockIndex(QuicStreamOffset offset) const { | |
394 return (offset % max_buffer_capacity_bytes_) / kBlockSizeBytes; | |
395 } | |
396 | |
397 size_t QuicStreamSequencerBuffer::GetInBlockOffset( | |
398 QuicStreamOffset offset) const { | |
399 return (offset % max_buffer_capacity_bytes_) % kBlockSizeBytes; | |
400 } | |
401 | |
402 size_t QuicStreamSequencerBuffer::ReadOffset() const { | |
403 return GetInBlockOffset(total_bytes_read_); | |
404 } | |
405 | |
406 size_t QuicStreamSequencerBuffer::NextBlockToRead() const { | |
407 return GetBlockIndex(total_bytes_read_); | |
408 } | |
409 | |
410 void QuicStreamSequencerBuffer::RetireBlockIfEmpty(size_t block_index) { | |
411 DCHECK(ReadableBytes() == 0 || GetInBlockOffset(total_bytes_read_) == 0) | |
412 << "RetireBlockIfEmpty() should only be called when advancing to next " | |
413 "block" | |
414 " or a gap has been reached."; | |
415 // If the whole buffer becomes empty, the last piece of data has been read. | |
416 if (Empty()) { | |
417 RetireBlock(block_index); | |
418 return; | |
419 } | |
420 | |
421 // Check where the logical end of this buffer is. | |
422 // Not empty if the end of circular buffer has been wrapped to this block. | |
423 if (GetBlockIndex(gaps_.back().begin_offset - 1) == block_index) { | |
424 return; | |
425 } | |
426 | |
427 // Read index remains in this block, which means a gap has been reached. | |
428 if (NextBlockToRead() == block_index) { | |
429 Gap first_gap = gaps_.front(); | |
430 DCHECK(first_gap.begin_offset == total_bytes_read_); | |
431 // Check where the next piece data is. | |
432 // Not empty if next piece of data is still in this chunk. | |
433 bool gap_extends_to_infinity = | |
434 (first_gap.end_offset != std::numeric_limits<QuicStreamOffset>::max()); | |
435 bool gap_ends_in_this_block = | |
436 (GetBlockIndex(first_gap.end_offset) == block_index); | |
437 if (gap_extends_to_infinity || gap_ends_in_this_block) { | |
438 return; | |
439 } | |
440 } | |
441 RetireBlock(block_index); | |
442 } | |
443 | |
444 bool QuicStreamSequencerBuffer::Empty() const { | |
445 return gaps_.size() == 1 && gaps_.front().begin_offset == total_bytes_read_; | |
446 } | |
447 | |
448 size_t QuicStreamSequencerBuffer::GetBlockCapacity(size_t block_index) const { | |
449 if ((block_index + 1) == blocks_count_) { | |
450 size_t result = max_buffer_capacity_bytes_ % kBlockSizeBytes; | |
451 if (result == 0) { // whole block | |
452 result = kBlockSizeBytes; | |
453 } | |
454 return result; | |
455 } else { | |
456 return kBlockSizeBytes; | |
457 } | |
458 } | |
459 | |
460 void QuicStreamSequencerBuffer::UpdateFrameArrivalMap(QuicStreamOffset offset) { | |
461 // Get the frame before which all frames should be removed. | |
462 auto next_frame = frame_arrival_time_map_.upper_bound(offset); | |
463 DCHECK(next_frame != frame_arrival_time_map_.begin()); | |
464 auto iter = frame_arrival_time_map_.begin(); | |
465 while (iter != next_frame) { | |
466 auto erased = *iter; | |
467 iter = frame_arrival_time_map_.erase(iter); | |
468 DVLOG(1) << "Removed FrameInfo with offset: " << erased.first | |
469 << " and length: " << erased.second.length; | |
470 if (erased.first + erased.second.length > offset) { | |
471 // If last frame is partially read out, update this FrameInfo and insert | |
472 // it back. | |
473 auto updated = std::make_pair( | |
474 offset, FrameInfo(erased.first + erased.second.length - offset, | |
475 erased.second.timestamp)); | |
476 DVLOG(1) << "Inserted FrameInfo with offset: " << updated.first | |
477 << " and length: " << updated.second.length; | |
478 frame_arrival_time_map_.insert(updated); | |
479 } | |
480 } | |
481 } | |
482 | |
483 string QuicStreamSequencerBuffer::GapsDebugString() { | |
484 string current_gaps_string; | |
485 for (const Gap& gap : gaps_) { | |
486 QuicStreamOffset current_gap_begin = gap.begin_offset; | |
487 QuicStreamOffset current_gap_end = gap.end_offset; | |
488 current_gaps_string += RangeDebugString(current_gap_begin, current_gap_end); | |
489 } | |
490 return current_gaps_string; | |
491 } | |
492 | |
493 string QuicStreamSequencerBuffer::ReceivedFramesDebugString() { | |
494 string current_frames_string; | |
495 for (auto it : frame_arrival_time_map_) { | |
496 QuicStreamOffset current_frame_begin_offset = it.first; | |
497 QuicStreamOffset current_frame_end_offset = | |
498 it.second.length + current_frame_begin_offset; | |
499 current_frames_string += | |
500 RangeDebugString(current_frame_begin_offset, current_frame_end_offset); | |
501 } | |
502 return current_frames_string; | |
503 } | |
504 | |
505 } // namespace net | |
OLD | NEW |