Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(525)

Side by Side Diff: net/quic/quic_stream_sequencer_buffer.cc

Issue 2193073003: Move shared files in net/quic/ into net/quic/core/ (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: io_thread_unittest.cc Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright (c) 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/quic/quic_stream_sequencer_buffer.h"
6
7 #include "base/logging.h"
8 #include "base/strings/string_number_conversions.h"
9 #include "net/quic/quic_bug_tracker.h"
10
11 using std::min;
12 using std::string;
13
14 namespace net {
15
16 namespace {
17
18 string RangeDebugString(QuicStreamOffset start, QuicStreamOffset end) {
19 return string("[") + base::Uint64ToString(start) + ", " +
20 base::Uint64ToString(end) + ") ";
21 }
22
23 } // namespace
24
25 QuicStreamSequencerBuffer::Gap::Gap(QuicStreamOffset begin_offset,
26 QuicStreamOffset end_offset)
27 : begin_offset(begin_offset), end_offset(end_offset) {}
28
29 QuicStreamSequencerBuffer::FrameInfo::FrameInfo()
30 : length(1), timestamp(QuicTime::Zero()) {}
31
32 QuicStreamSequencerBuffer::FrameInfo::FrameInfo(size_t length,
33 QuicTime timestamp)
34 : length(length), timestamp(timestamp) {}
35
36 QuicStreamSequencerBuffer::QuicStreamSequencerBuffer(size_t max_capacity_bytes)
37 : max_buffer_capacity_bytes_(max_capacity_bytes),
38 blocks_count_(
39 ceil(static_cast<double>(max_capacity_bytes) / kBlockSizeBytes)),
40 total_bytes_read_(0),
41 blocks_(blocks_count_) {
42 Clear();
43 }
44
45 QuicStreamSequencerBuffer::~QuicStreamSequencerBuffer() {
46 Clear();
47 }
48
49 void QuicStreamSequencerBuffer::Clear() {
50 for (size_t i = 0; i < blocks_count_; ++i) {
51 if (blocks_[i] != nullptr) {
52 RetireBlock(i);
53 }
54 }
55 num_bytes_buffered_ = 0;
56 // Reset gaps_ so that buffer is in a state as if all data before
57 // total_bytes_read_ has been consumed, and those after total_bytes_read_
58 // has never arrived.
59 gaps_ = std::list<Gap>(
60 1, Gap(total_bytes_read_, std::numeric_limits<QuicStreamOffset>::max())),
61 frame_arrival_time_map_.clear();
62 }
63
64 void QuicStreamSequencerBuffer::RetireBlock(size_t idx) {
65 DCHECK(blocks_[idx] != nullptr);
66 delete blocks_[idx];
67 blocks_[idx] = nullptr;
68 DVLOG(1) << "Retired block with index: " << idx;
69 }
70
71 QuicErrorCode QuicStreamSequencerBuffer::OnStreamData(
72 QuicStreamOffset starting_offset,
73 base::StringPiece data,
74 QuicTime timestamp,
75 size_t* const bytes_buffered,
76 std::string* error_details) {
77 *bytes_buffered = 0;
78 QuicStreamOffset offset = starting_offset;
79 size_t size = data.size();
80 if (size == 0) {
81 *error_details = "Received empty stream frame without FIN.";
82 return QUIC_EMPTY_STREAM_FRAME_NO_FIN;
83 }
84
85 // Find the first gap not ending before |offset|. This gap maybe the gap to
86 // fill if the arriving frame doesn't overlaps with previous ones.
87 std::list<Gap>::iterator current_gap = gaps_.begin();
88 while (current_gap != gaps_.end() && current_gap->end_offset <= offset) {
89 ++current_gap;
90 }
91
92 DCHECK(current_gap != gaps_.end());
93
94 // "duplication": might duplicate with data alread filled,but also might
95 // overlap across different base::StringPiece objects already written.
96 // In both cases, don't write the data,
97 // and allow the caller of this method to handle the result.
98 if (offset < current_gap->begin_offset &&
99 offset + size <= current_gap->begin_offset) {
100 DVLOG(1) << "Duplicated data at offset: " << offset << " length: " << size;
101 return QUIC_NO_ERROR;
102 }
103 if (offset < current_gap->begin_offset &&
104 offset + size > current_gap->begin_offset) {
105 // Beginning of new data overlaps data before current gap.
106 *error_details =
107 string("Beginning of received data overlaps with buffered data.\n") +
108 "New frame range " + RangeDebugString(offset, offset + size) +
109 " with first 128 bytes: " +
110 string(data.data(), data.length() < 128 ? data.length() : 128) +
111 "\nCurrently received frames: " + ReceivedFramesDebugString() +
112 "\nCurrent gaps: " + GapsDebugString();
113 return QUIC_OVERLAPPING_STREAM_DATA;
114 }
115 if (offset + size > current_gap->end_offset) {
116 // End of new data overlaps with data after current gap.
117 *error_details =
118 string("End of received data overlaps with buffered data.\n") +
119 "New frame range " + RangeDebugString(offset, offset + size) +
120 " with first 128 bytes: " +
121 string(data.data(), data.length() < 128 ? data.length() : 128) +
122 "\nCurrently received frames: " + ReceivedFramesDebugString() +
123 "\nCurrent gaps: " + GapsDebugString();
124 return QUIC_OVERLAPPING_STREAM_DATA;
125 }
126
127 // Write beyond the current range this buffer is covering.
128 if (offset + size > total_bytes_read_ + max_buffer_capacity_bytes_) {
129 *error_details = "Received data beyond available range.";
130 return QUIC_INTERNAL_ERROR;
131 }
132
133 size_t total_written = 0;
134 size_t source_remaining = size;
135 const char* source = data.data();
136 // Write data block by block. If corresponding block has not created yet,
137 // create it first.
138 // Stop when all data are written or reaches the logical end of the buffer.
139 while (source_remaining > 0) {
140 const size_t write_block_num = GetBlockIndex(offset);
141 const size_t write_block_offset = GetInBlockOffset(offset);
142 DCHECK_GT(blocks_count_, write_block_num);
143
144 size_t block_capacity = GetBlockCapacity(write_block_num);
145 size_t bytes_avail = block_capacity - write_block_offset;
146
147 // If this write meets the upper boundary of the buffer,
148 // reduce the available free bytes.
149 if (offset + bytes_avail > total_bytes_read_ + max_buffer_capacity_bytes_) {
150 bytes_avail = total_bytes_read_ + max_buffer_capacity_bytes_ - offset;
151 }
152
153 if (blocks_[write_block_num] == nullptr) {
154 // TODO(danzh): Investigate if using a freelist would improve performance.
155 // Same as RetireBlock().
156 blocks_[write_block_num] = new BufferBlock();
157 }
158
159 const size_t bytes_to_copy = min<size_t>(bytes_avail, source_remaining);
160 char* dest = blocks_[write_block_num]->buffer + write_block_offset;
161 DVLOG(1) << "Write at offset: " << offset << " length: " << bytes_to_copy;
162 memcpy(dest, source, bytes_to_copy);
163 source += bytes_to_copy;
164 source_remaining -= bytes_to_copy;
165 offset += bytes_to_copy;
166 total_written += bytes_to_copy;
167 }
168
169 DCHECK_GT(total_written, 0u);
170 *bytes_buffered = total_written;
171 UpdateGapList(current_gap, starting_offset, total_written);
172
173 frame_arrival_time_map_.insert(
174 std::make_pair(starting_offset, FrameInfo(size, timestamp)));
175 num_bytes_buffered_ += total_written;
176 return QUIC_NO_ERROR;
177 }
178
179 inline void QuicStreamSequencerBuffer::UpdateGapList(
180 std::list<Gap>::iterator gap_with_new_data_written,
181 QuicStreamOffset start_offset,
182 size_t bytes_written) {
183 if (gap_with_new_data_written->begin_offset == start_offset &&
184 gap_with_new_data_written->end_offset > start_offset + bytes_written) {
185 // New data has been written into the left part of the buffer.
186 gap_with_new_data_written->begin_offset = start_offset + bytes_written;
187 } else if (gap_with_new_data_written->begin_offset < start_offset &&
188 gap_with_new_data_written->end_offset ==
189 start_offset + bytes_written) {
190 // New data has been written into the right part of the buffer.
191 gap_with_new_data_written->end_offset = start_offset;
192 } else if (gap_with_new_data_written->begin_offset < start_offset &&
193 gap_with_new_data_written->end_offset >
194 start_offset + bytes_written) {
195 // New data has been written into the middle of the buffer.
196 auto current = gap_with_new_data_written++;
197 QuicStreamOffset current_end = current->end_offset;
198 current->end_offset = start_offset;
199 gaps_.insert(gap_with_new_data_written,
200 Gap(start_offset + bytes_written, current_end));
201 } else if (gap_with_new_data_written->begin_offset == start_offset &&
202 gap_with_new_data_written->end_offset ==
203 start_offset + bytes_written) {
204 // This gap has been filled with new data. So it's no longer a gap.
205 gaps_.erase(gap_with_new_data_written);
206 }
207 }
208
209 size_t QuicStreamSequencerBuffer::Readv(const iovec* dest_iov,
210 size_t dest_count) {
211 size_t bytes_read = 0;
212 for (size_t i = 0; i < dest_count && ReadableBytes() > 0; ++i) {
213 char* dest = reinterpret_cast<char*>(dest_iov[i].iov_base);
214 size_t dest_remaining = dest_iov[i].iov_len;
215 while (dest_remaining > 0 && ReadableBytes() > 0) {
216 size_t block_idx = NextBlockToRead();
217 size_t start_offset_in_block = ReadOffset();
218 size_t block_capacity = GetBlockCapacity(block_idx);
219 size_t bytes_available_in_block =
220 min<size_t>(ReadableBytes(), block_capacity - start_offset_in_block);
221 size_t bytes_to_copy =
222 min<size_t>(bytes_available_in_block, dest_remaining);
223 DCHECK_GT(bytes_to_copy, 0u);
224 DCHECK_NE(static_cast<BufferBlock*>(nullptr), blocks_[block_idx]);
225 memcpy(dest, blocks_[block_idx]->buffer + start_offset_in_block,
226 bytes_to_copy);
227 dest += bytes_to_copy;
228 dest_remaining -= bytes_to_copy;
229 num_bytes_buffered_ -= bytes_to_copy;
230 total_bytes_read_ += bytes_to_copy;
231 bytes_read += bytes_to_copy;
232
233 // Retire the block if all the data is read out
234 // and no other data is stored in this block.
235 if (bytes_to_copy == bytes_available_in_block) {
236 RetireBlockIfEmpty(block_idx);
237 }
238 }
239 }
240
241 if (bytes_read > 0) {
242 UpdateFrameArrivalMap(total_bytes_read_);
243 }
244 return bytes_read;
245 }
246
247 int QuicStreamSequencerBuffer::GetReadableRegions(struct iovec* iov,
248 int iov_count) const {
249 DCHECK(iov != nullptr);
250 DCHECK_GT(iov_count, 0);
251
252 if (ReadableBytes() == 0) {
253 iov[0].iov_base = nullptr;
254 iov[0].iov_len = 0;
255 return 0;
256 }
257
258 size_t start_block_idx = NextBlockToRead();
259 QuicStreamOffset readable_offset_end = gaps_.front().begin_offset - 1;
260 DCHECK_GE(readable_offset_end + 1, total_bytes_read_);
261 size_t end_block_offset = GetInBlockOffset(readable_offset_end);
262 size_t end_block_idx = GetBlockIndex(readable_offset_end);
263
264 // If readable region is within one block, deal with it seperately.
265 if (start_block_idx == end_block_idx && ReadOffset() <= end_block_offset) {
266 iov[0].iov_base = blocks_[start_block_idx]->buffer + ReadOffset();
267 iov[0].iov_len = ReadableBytes();
268 DVLOG(1) << "Got only a single block with index: " << start_block_idx;
269 return 1;
270 }
271
272 // Get first block
273 iov[0].iov_base = blocks_[start_block_idx]->buffer + ReadOffset();
274 iov[0].iov_len = GetBlockCapacity(start_block_idx) - ReadOffset();
275 DVLOG(1) << "Got first block " << start_block_idx << " with len "
276 << iov[0].iov_len;
277 DCHECK_GT(readable_offset_end + 1, total_bytes_read_ + iov[0].iov_len)
278 << "there should be more available data";
279
280 // Get readable regions of the rest blocks till either 2nd to last block
281 // before gap is met or |iov| is filled. For these blocks, one whole block is
282 // a region.
283 int iov_used = 1;
284 size_t block_idx = (start_block_idx + iov_used) % blocks_count_;
285 while (block_idx != end_block_idx && iov_used < iov_count) {
286 DCHECK_NE(static_cast<BufferBlock*>(nullptr), blocks_[block_idx]);
287 iov[iov_used].iov_base = blocks_[block_idx]->buffer;
288 iov[iov_used].iov_len = GetBlockCapacity(block_idx);
289 DVLOG(1) << "Got block with index: " << block_idx;
290 ++iov_used;
291 block_idx = (start_block_idx + iov_used) % blocks_count_;
292 }
293
294 // Deal with last block if |iov| can hold more.
295 if (iov_used < iov_count) {
296 DCHECK_NE(static_cast<BufferBlock*>(nullptr), blocks_[block_idx]);
297 iov[iov_used].iov_base = blocks_[end_block_idx]->buffer;
298 iov[iov_used].iov_len = end_block_offset + 1;
299 DVLOG(1) << "Got last block with index: " << end_block_idx;
300 ++iov_used;
301 }
302 return iov_used;
303 }
304
305 bool QuicStreamSequencerBuffer::GetReadableRegion(iovec* iov,
306 QuicTime* timestamp) const {
307 if (ReadableBytes() == 0) {
308 iov[0].iov_base = nullptr;
309 iov[0].iov_len = 0;
310 return false;
311 }
312
313 size_t start_block_idx = NextBlockToRead();
314 iov->iov_base = blocks_[start_block_idx]->buffer + ReadOffset();
315 size_t readable_bytes_in_block = min<size_t>(
316 GetBlockCapacity(start_block_idx) - ReadOffset(), ReadableBytes());
317 size_t region_len = 0;
318 auto iter = frame_arrival_time_map_.begin();
319 *timestamp = iter->second.timestamp;
320 DVLOG(1) << "Readable bytes in block: " << readable_bytes_in_block;
321 for (; iter != frame_arrival_time_map_.end() &&
322 region_len + iter->second.length <= readable_bytes_in_block;
323 ++iter) {
324 if (iter->second.timestamp != *timestamp) {
325 // If reaches a frame arrive at another timestamp, stop expanding current
326 // region.
327 DVLOG(1) << "Meet frame with different timestamp.";
328 break;
329 }
330 region_len += iter->second.length;
331 DVLOG(1) << "Added bytes to region: " << iter->second.length;
332 }
333 if (iter == frame_arrival_time_map_.end() ||
334 iter->second.timestamp == *timestamp) {
335 // If encountered the end of readable bytes before reaching a different
336 // timestamp.
337 DVLOG(1) << "Got all readable bytes in first block.";
338 region_len = readable_bytes_in_block;
339 }
340 iov->iov_len = region_len;
341 return true;
342 }
343
344 bool QuicStreamSequencerBuffer::MarkConsumed(size_t bytes_used) {
345 if (bytes_used > ReadableBytes()) {
346 return false;
347 }
348 size_t bytes_to_consume = bytes_used;
349 while (bytes_to_consume > 0) {
350 size_t block_idx = NextBlockToRead();
351 size_t offset_in_block = ReadOffset();
352 size_t bytes_available = min<size_t>(
353 ReadableBytes(), GetBlockCapacity(block_idx) - offset_in_block);
354 size_t bytes_read = min<size_t>(bytes_to_consume, bytes_available);
355 total_bytes_read_ += bytes_read;
356 num_bytes_buffered_ -= bytes_read;
357 bytes_to_consume -= bytes_read;
358 // If advanced to the end of current block and end of buffer hasn't wrapped
359 // to this block yet.
360 if (bytes_available == bytes_read) {
361 RetireBlockIfEmpty(block_idx);
362 }
363 }
364 if (bytes_used > 0) {
365 UpdateFrameArrivalMap(total_bytes_read_);
366 }
367 return true;
368 }
369
370 size_t QuicStreamSequencerBuffer::FlushBufferedFrames() {
371 size_t prev_total_bytes_read = total_bytes_read_;
372 total_bytes_read_ = gaps_.back().begin_offset;
373 Clear();
374 return total_bytes_read_ - prev_total_bytes_read;
375 }
376
377 size_t QuicStreamSequencerBuffer::ReadableBytes() const {
378 return gaps_.front().begin_offset - total_bytes_read_;
379 }
380
381 bool QuicStreamSequencerBuffer::HasBytesToRead() const {
382 return ReadableBytes() > 0;
383 }
384
385 QuicStreamOffset QuicStreamSequencerBuffer::BytesConsumed() const {
386 return total_bytes_read_;
387 }
388
389 size_t QuicStreamSequencerBuffer::BytesBuffered() const {
390 return num_bytes_buffered_;
391 }
392
393 size_t QuicStreamSequencerBuffer::GetBlockIndex(QuicStreamOffset offset) const {
394 return (offset % max_buffer_capacity_bytes_) / kBlockSizeBytes;
395 }
396
397 size_t QuicStreamSequencerBuffer::GetInBlockOffset(
398 QuicStreamOffset offset) const {
399 return (offset % max_buffer_capacity_bytes_) % kBlockSizeBytes;
400 }
401
402 size_t QuicStreamSequencerBuffer::ReadOffset() const {
403 return GetInBlockOffset(total_bytes_read_);
404 }
405
406 size_t QuicStreamSequencerBuffer::NextBlockToRead() const {
407 return GetBlockIndex(total_bytes_read_);
408 }
409
410 void QuicStreamSequencerBuffer::RetireBlockIfEmpty(size_t block_index) {
411 DCHECK(ReadableBytes() == 0 || GetInBlockOffset(total_bytes_read_) == 0)
412 << "RetireBlockIfEmpty() should only be called when advancing to next "
413 "block"
414 " or a gap has been reached.";
415 // If the whole buffer becomes empty, the last piece of data has been read.
416 if (Empty()) {
417 RetireBlock(block_index);
418 return;
419 }
420
421 // Check where the logical end of this buffer is.
422 // Not empty if the end of circular buffer has been wrapped to this block.
423 if (GetBlockIndex(gaps_.back().begin_offset - 1) == block_index) {
424 return;
425 }
426
427 // Read index remains in this block, which means a gap has been reached.
428 if (NextBlockToRead() == block_index) {
429 Gap first_gap = gaps_.front();
430 DCHECK(first_gap.begin_offset == total_bytes_read_);
431 // Check where the next piece data is.
432 // Not empty if next piece of data is still in this chunk.
433 bool gap_extends_to_infinity =
434 (first_gap.end_offset != std::numeric_limits<QuicStreamOffset>::max());
435 bool gap_ends_in_this_block =
436 (GetBlockIndex(first_gap.end_offset) == block_index);
437 if (gap_extends_to_infinity || gap_ends_in_this_block) {
438 return;
439 }
440 }
441 RetireBlock(block_index);
442 }
443
444 bool QuicStreamSequencerBuffer::Empty() const {
445 return gaps_.size() == 1 && gaps_.front().begin_offset == total_bytes_read_;
446 }
447
448 size_t QuicStreamSequencerBuffer::GetBlockCapacity(size_t block_index) const {
449 if ((block_index + 1) == blocks_count_) {
450 size_t result = max_buffer_capacity_bytes_ % kBlockSizeBytes;
451 if (result == 0) { // whole block
452 result = kBlockSizeBytes;
453 }
454 return result;
455 } else {
456 return kBlockSizeBytes;
457 }
458 }
459
460 void QuicStreamSequencerBuffer::UpdateFrameArrivalMap(QuicStreamOffset offset) {
461 // Get the frame before which all frames should be removed.
462 auto next_frame = frame_arrival_time_map_.upper_bound(offset);
463 DCHECK(next_frame != frame_arrival_time_map_.begin());
464 auto iter = frame_arrival_time_map_.begin();
465 while (iter != next_frame) {
466 auto erased = *iter;
467 iter = frame_arrival_time_map_.erase(iter);
468 DVLOG(1) << "Removed FrameInfo with offset: " << erased.first
469 << " and length: " << erased.second.length;
470 if (erased.first + erased.second.length > offset) {
471 // If last frame is partially read out, update this FrameInfo and insert
472 // it back.
473 auto updated = std::make_pair(
474 offset, FrameInfo(erased.first + erased.second.length - offset,
475 erased.second.timestamp));
476 DVLOG(1) << "Inserted FrameInfo with offset: " << updated.first
477 << " and length: " << updated.second.length;
478 frame_arrival_time_map_.insert(updated);
479 }
480 }
481 }
482
483 string QuicStreamSequencerBuffer::GapsDebugString() {
484 string current_gaps_string;
485 for (const Gap& gap : gaps_) {
486 QuicStreamOffset current_gap_begin = gap.begin_offset;
487 QuicStreamOffset current_gap_end = gap.end_offset;
488 current_gaps_string += RangeDebugString(current_gap_begin, current_gap_end);
489 }
490 return current_gaps_string;
491 }
492
493 string QuicStreamSequencerBuffer::ReceivedFramesDebugString() {
494 string current_frames_string;
495 for (auto it : frame_arrival_time_map_) {
496 QuicStreamOffset current_frame_begin_offset = it.first;
497 QuicStreamOffset current_frame_end_offset =
498 it.second.length + current_frame_begin_offset;
499 current_frames_string +=
500 RangeDebugString(current_frame_begin_offset, current_frame_end_offset);
501 }
502 return current_frames_string;
503 }
504
505 } // namespace net
OLDNEW
« no previous file with comments | « net/quic/quic_stream_sequencer_buffer.h ('k') | net/quic/quic_stream_sequencer_buffer_interface.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698