Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(92)

Side by Side Diff: net/quic/stream_sequencer_buffer.cc

Issue 1409053006: Create a new data structure StreamSequencerBuffer for QuicStreamSequencer. Currently QuicStreamSequ… (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@106492030
Patch Set: Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « net/quic/stream_sequencer_buffer.h ('k') | net/quic/stream_sequencer_buffer_test.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright (c) 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/quic/stream_sequencer_buffer.h"
6
7 #include "base/basictypes.h"
8 #include "base/logging.h"
9
10 using std::min;
11
12 namespace net {
13
14 StreamSequencerBuffer::Gap::Gap(QuicStreamOffset begin_offset,
15 QuicStreamOffset end_offset)
16 : begin_offset(begin_offset), end_offset(end_offset) {}
17
18 StreamSequencerBuffer::FrameInfo::FrameInfo() : length(1),
19 timestamp(QuicTime::Zero()) {}
20
21 StreamSequencerBuffer::FrameInfo::FrameInfo(size_t length, QuicTime timestamp)
22 : length(length), timestamp(timestamp) {}
23
24 StreamSequencerBuffer::StreamSequencerBuffer(size_t max_capacity_bytes)
25 : max_buffer_capacity_bytes_(max_capacity_bytes),
26 blocks_count_(
27 ceil(static_cast<double>(max_capacity_bytes) / kBlockSizeBytes)),
28 total_bytes_read_(0),
29 blocks_(blocks_count_) {
30 Clear();
31 }
32
33 StreamSequencerBuffer::~StreamSequencerBuffer() {
34 Clear();
35 }
36
37 void StreamSequencerBuffer::Clear() {
38 for (size_t i = 0; i < blocks_count_; ++i) {
39 if (blocks_[i] != nullptr) {
40 RetireBlock(i);
41 }
42 }
43 num_bytes_buffered_ = 0;
44 // Reset gaps_ so that buffer is in a state as if all data before
45 // total_bytes_read_ has been consumed, and those after total_bytes_read_
46 // has never arrived.
47 gaps_ = std::list<Gap>(1, Gap(total_bytes_read_,
48 std::numeric_limits<QuicStreamOffset>::max())),
49 frame_arrival_time_map_.clear();
50 }
51
52 void StreamSequencerBuffer::RetireBlock(size_t idx) {
53 DCHECK(blocks_[idx] != nullptr);
54 delete blocks_[idx];
55 blocks_[idx] = nullptr;
56 DVLOG(1) << "Retired block" << idx;
57 }
58
59 QuicErrorCode StreamSequencerBuffer::OnStreamData(
60 QuicStreamOffset starting_offset,
61 base::StringPiece data,
62 QuicTime timestamp,
63 size_t* const bytes_buffered) {
64 *bytes_buffered = 0;
65 QuicStreamOffset offset = starting_offset;
66 size_t size = data.size();
67 if (size == 0) {
68 LOG(DFATAL) << "Attempted to write 0 bytes of data.";
69 return QUIC_INVALID_STREAM_FRAME;
70 }
71
72 // Find the first gap not ending before |offset|. This gap maybe the gap to
73 // fill if the arriving frame doesn't overlaps with previous ones.
74 std::list<Gap>::iterator current_gap = gaps_.begin();
75 while (current_gap != gaps_.end() && current_gap->end_offset <= offset) {
76 ++current_gap;
77 }
78
79 DCHECK(current_gap != gaps_.end());
80
81 // "duplication": might duplicate with data alread filled,but also might
82 // overlap across different base::StringPiece objects already written.
83 // In both cases, don't write the data,
84 // and allow the caller of this method to handle the result.
85 if (offset < current_gap->begin_offset &&
86 offset + size <= current_gap->begin_offset) {
87 DVLOG(1) << "duplicated data at offset:" << offset << " len: " << size;
88 return QUIC_NO_ERROR;
89 }
90 if (offset < current_gap->begin_offset &&
91 offset + size > current_gap->begin_offset) {
92 // Beginning of new data overlaps data before current gap.
93 return QUIC_INVALID_STREAM_DATA;
94 }
95 if (offset + size > current_gap->end_offset) {
96 // End of new data overlaps with data after current gap.
97 return QUIC_INVALID_STREAM_DATA;
98 }
99
100 // Write beyond the current range this buffer is covering.
101 if (offset + size > total_bytes_read_ + max_buffer_capacity_bytes_) {
102 return QUIC_INTERNAL_ERROR;
103 }
104
105 size_t total_written = 0;
106 size_t source_remaining = size;
107 const char* source = data.data();
108 // Write data block by block. If corresponding block has not created yet,
109 // create it first.
110 // Stop when all data are written or reaches the logical end of the buffer.
111 while (source_remaining > 0) {
112 const size_t write_block_num = GetBlockIndex(offset);
113 const size_t write_block_offset = GetInBlockOffset(offset);
114 DCHECK_GT(blocks_count_, write_block_num);
115
116 size_t block_capacity = GetBlockCapacity(write_block_num);
117 size_t bytes_avail = block_capacity - write_block_offset;
118
119 // If this write meets the upper boundary of the buffer,
120 // reduce the available free bytes.
121 if (offset + bytes_avail > total_bytes_read_ + max_buffer_capacity_bytes_) {
122 bytes_avail = total_bytes_read_ + max_buffer_capacity_bytes_ - offset;
123 }
124
125 if (blocks_[write_block_num] == nullptr) {
126 // TODO(danzh): Investigate if using a freelist would improve performance.
127 // Same as RetireBlock().
128 blocks_[write_block_num] = new BufferBlock();
129 }
130
131 const size_t bytes_to_copy = min<size_t>(bytes_avail, source_remaining);
132 char* dest = blocks_[write_block_num]->buffer + write_block_offset;
133 DVLOG(1) << "write at offset: " << offset << " len: " << bytes_to_copy;
134 memcpy(dest, source, bytes_to_copy);
135 source += bytes_to_copy;
136 source_remaining -= bytes_to_copy;
137 offset += bytes_to_copy;
138 total_written += bytes_to_copy;
139 }
140
141 DCHECK_GT(total_written, 0u);
142 *bytes_buffered = total_written;
143 UpdateGapList(current_gap, starting_offset, total_written);
144
145 frame_arrival_time_map_.insert(
146 std::make_pair(starting_offset, FrameInfo(size, timestamp)));
147 num_bytes_buffered_ += total_written;
148 return QUIC_NO_ERROR;
149 }
150
151 inline void StreamSequencerBuffer::UpdateGapList(
152 std::list<Gap>::iterator gap_with_new_data_written,
153 QuicStreamOffset start_offset,
154 size_t bytes_written) {
155 if (gap_with_new_data_written->begin_offset == start_offset &&
156 gap_with_new_data_written->end_offset > start_offset + bytes_written) {
157 // New data has been written into the left part of the buffer.
158 gap_with_new_data_written->begin_offset = start_offset + bytes_written;
159 } else if (gap_with_new_data_written->begin_offset < start_offset &&
160 gap_with_new_data_written->end_offset ==
161 start_offset + bytes_written) {
162 // New data has been written into the right part of the buffer.
163 gap_with_new_data_written->end_offset = start_offset;
164 } else if (gap_with_new_data_written->begin_offset < start_offset &&
165 gap_with_new_data_written->end_offset >
166 start_offset + bytes_written) {
167 // New data has been written into the middle of the buffer.
168 auto current = gap_with_new_data_written++;
169 size_t current_end = current->end_offset;
170 current->end_offset = start_offset;
171 gaps_.insert(gap_with_new_data_written,
172 Gap(start_offset + bytes_written, current_end));
173 } else if (gap_with_new_data_written->begin_offset == start_offset &&
174 gap_with_new_data_written->end_offset ==
175 start_offset + bytes_written) {
176 // This gap has been filled with new data. So it's no longer a gap.
177 gaps_.erase(gap_with_new_data_written);
178 }
179 }
180
181 size_t StreamSequencerBuffer::Readv(const iovec* dest_iov, size_t dest_count) {
182 size_t bytes_read = 0;
183 for (size_t i = 0; i < dest_count && ReadableBytes() > 0; ++i) {
184 char* dest = reinterpret_cast<char*>(dest_iov[i].iov_base);
185 size_t dest_remaining = dest_iov[i].iov_len;
186 while (dest_remaining > 0 && ReadableBytes() > 0) {
187 size_t block_idx = NextBlockToRead();
188 size_t start_offset_in_block = ReadOffset();
189 size_t block_capacity = GetBlockCapacity(block_idx);
190 size_t bytes_available_in_block =
191 min<size_t>(ReadableBytes(), block_capacity - start_offset_in_block);
192 size_t bytes_to_copy = min<size_t>(bytes_available_in_block,
193 dest_remaining);
194 DCHECK_GT(bytes_to_copy, 0u);
195 DCHECK_NE(static_cast<BufferBlock*>(nullptr), blocks_[block_idx]);
196 memcpy(dest, blocks_[block_idx]->buffer + start_offset_in_block,
197 bytes_to_copy);
198 dest += bytes_to_copy;
199 dest_remaining -= bytes_to_copy;
200 num_bytes_buffered_ -= bytes_to_copy;
201 total_bytes_read_ += bytes_to_copy;
202 bytes_read += bytes_to_copy;
203
204 // Retire the block if all the data is read out
205 // and no other data is stored in this block.
206 if (bytes_to_copy == bytes_available_in_block) {
207 RetireBlockIfEmpty(block_idx);
208 }
209 }
210 }
211
212 if (bytes_read > 0) {
213 UpdateFrameArrivalMap(total_bytes_read_);
214 }
215 return bytes_read;
216 }
217
218 int StreamSequencerBuffer::GetReadableRegions(struct iovec* iov,
219 int iov_count) const {
220 DCHECK(iov != nullptr);
221 DCHECK_GT(iov_count, 0);
222
223 if (ReadableBytes() == 0) {
224 iov[0].iov_base = nullptr;
225 iov[0].iov_len = 0;
226 return 0;
227 }
228
229 size_t start_block_idx = NextBlockToRead();
230 QuicStreamOffset readable_offset_end = gaps_.front().begin_offset - 1;
231 DCHECK_GE(readable_offset_end + 1, total_bytes_read_);
232 size_t end_block_offset = GetInBlockOffset(readable_offset_end);
233 size_t end_block_idx = GetBlockIndex(readable_offset_end);
234
235 // If readable region is within one block, deal with it seperately.
236 if (start_block_idx == end_block_idx && ReadOffset() <= end_block_offset) {
237 iov[0].iov_base = blocks_[start_block_idx]->buffer + ReadOffset();
238 iov[0].iov_len = ReadableBytes();
239 DVLOG(1) << "get only block" << start_block_idx;
240 return 1;
241 }
242
243 // Get first block
244 iov[0].iov_base = blocks_[start_block_idx]->buffer + ReadOffset();
245 iov[0].iov_len = GetBlockCapacity(start_block_idx) - ReadOffset();
246 DVLOG(1) << "get first block" << start_block_idx << " with len "
247 << iov[0].iov_len;
248 DCHECK_GT(readable_offset_end + 1, total_bytes_read_ + iov[0].iov_len)
249 << "there should be more available data";
250
251 // Get readable regions of the rest blocks till either 2nd to last block
252 // before gap is met or |iov| is filled. For these blocks, one whole block is
253 // a region.
254 int iov_used = 1;
255 size_t block_idx = (start_block_idx + iov_used) % blocks_count_;
256 while (block_idx != end_block_idx && iov_used < iov_count) {
257 DCHECK_NE(static_cast<BufferBlock*>(nullptr), blocks_[block_idx]);
258 iov[iov_used].iov_base = blocks_[block_idx]->buffer;
259 iov[iov_used].iov_len = GetBlockCapacity(block_idx);
260 DVLOG(1) << "get block" << block_idx;
261 ++iov_used;
262 block_idx = (start_block_idx + iov_used) % blocks_count_;
263 }
264
265 // Deal with last block if |iov| can hold more.
266 if (iov_used < iov_count) {
267 DCHECK_NE(static_cast<BufferBlock*>(nullptr), blocks_[block_idx]);
268 iov[iov_used].iov_base = blocks_[end_block_idx]->buffer;
269 iov[iov_used].iov_len = end_block_offset + 1;
270 DVLOG(1) << "get last block " << end_block_idx;
271 ++iov_used;
272 }
273 return iov_used;
274 }
275
276 bool StreamSequencerBuffer::GetReadableRegion(iovec* iov,
277 QuicTime* timestamp) const {
278 if (ReadableBytes() == 0) {
279 iov[0].iov_base = nullptr;
280 iov[0].iov_len = 0;
281 return false;
282 }
283
284 size_t start_block_idx = NextBlockToRead();
285 iov->iov_base = blocks_[start_block_idx]->buffer + ReadOffset();
286 size_t readable_bytes_in_block = min<size_t>(
287 GetBlockCapacity(start_block_idx) - ReadOffset(), ReadableBytes());
288 size_t region_len = 0;
289 auto iter = frame_arrival_time_map_.begin();
290 *timestamp = iter->second.timestamp;
291 DVLOG(1) << "readable bytes in block: " << readable_bytes_in_block;
292 for (; iter != frame_arrival_time_map_.end() &&
293 region_len + iter->second.length <= readable_bytes_in_block;
294 ++iter) {
295 if (iter->second.timestamp != *timestamp) {
296 // If reaches a frame arrive at another timestamp, stop expanding current
297 // region.
298 DVLOG(1) << "Meet frame with different timestamp.";
299 break;
300 }
301 region_len += iter->second.length;
302 DVLOG(1) << "Add bytes to region: " << iter->second.length;
303 }
304 if (iter == frame_arrival_time_map_.end() ||
305 iter->second.timestamp == *timestamp) {
306 // If encountered the end of readable bytes before reaching a different
307 // timestamp.
308 DVLOG(1) << "Get all readable bytes in first block.";
309 region_len = readable_bytes_in_block;
310 }
311 iov->iov_len = region_len;
312 return true;
313 }
314
315 bool StreamSequencerBuffer::MarkConsumed(size_t bytes_used) {
316 if (bytes_used > ReadableBytes()) {
317 return false;
318 }
319 size_t bytes_to_consume = bytes_used;
320 while (bytes_to_consume > 0) {
321 size_t block_idx = NextBlockToRead();
322 size_t offset_in_block = ReadOffset();
323 size_t bytes_available = min<size_t>(
324 ReadableBytes(), GetBlockCapacity(block_idx) - offset_in_block);
325 size_t bytes_read = min<size_t>(bytes_to_consume, bytes_available);
326 total_bytes_read_ += bytes_read;
327 num_bytes_buffered_ -= bytes_read;
328 bytes_to_consume -= bytes_read;
329 // If advanced to the end of current block and end of buffer hasn't wrapped
330 // to this block yet.
331 if (bytes_available == bytes_read) {
332 RetireBlockIfEmpty(block_idx);
333 }
334 }
335 if (bytes_used > 0) {
336 UpdateFrameArrivalMap(total_bytes_read_);
337 }
338 return true;
339 }
340
341 size_t StreamSequencerBuffer::FlushBufferedFrames() {
342 size_t prev_total_bytes_read = total_bytes_read_;
343 total_bytes_read_ = gaps_.back().begin_offset;
344 Clear();
345 return total_bytes_read_ - prev_total_bytes_read;
346 }
347
348 size_t StreamSequencerBuffer::ReadableBytes() const {
349 return gaps_.front().begin_offset - total_bytes_read_;
350 }
351
352 bool StreamSequencerBuffer::HasBytesToRead() const {
353 return ReadableBytes() > 0;
354 }
355
356 QuicStreamOffset StreamSequencerBuffer::BytesConsumed() const {
357 return total_bytes_read_;
358 }
359
360 size_t StreamSequencerBuffer::BytesBuffered() const {
361 return num_bytes_buffered_;
362 }
363
364 size_t StreamSequencerBuffer::GetBlockIndex(QuicStreamOffset offset) const {
365 return (offset % max_buffer_capacity_bytes_) / kBlockSizeBytes;
366 }
367
368 size_t StreamSequencerBuffer::GetInBlockOffset(QuicStreamOffset offset) const {
369 return (offset % max_buffer_capacity_bytes_) % kBlockSizeBytes;
370 }
371
372 size_t StreamSequencerBuffer::ReadOffset() const {
373 return GetInBlockOffset(total_bytes_read_);
374 }
375
376 size_t StreamSequencerBuffer::NextBlockToRead() const {
377 return GetBlockIndex(total_bytes_read_);
378 }
379
380 void StreamSequencerBuffer::RetireBlockIfEmpty(size_t block_index) {
381 DCHECK(ReadableBytes() == 0 || GetInBlockOffset(total_bytes_read_) == 0)
382 << "RetireBlockIfEmpty() should only be called when advancing to next block"
383 " or a gap has been reached.";
384 // If the whole buffer becomes empty, the last piece of data has been read.
385 if (Empty()) {
386 RetireBlock(block_index);
387 return;
388 }
389
390 // Check where the logical end of this buffer is.
391 // Not empty if the end of circular buffer has been wrapped to this block.
392 if (GetBlockIndex(gaps_.back().begin_offset - 1) == block_index) {
393 return;
394 }
395
396 // Read index remains in this block, which means a gap has been reached.
397 if (NextBlockToRead() == block_index) {
398 Gap first_gap = gaps_.front();
399 DCHECK(first_gap.begin_offset == total_bytes_read_);
400 // Check where the next piece data is.
401 // Not empty if next piece of data is still in this chunk.
402 bool gap_extends_to_infinity = (first_gap.end_offset !=
403 std::numeric_limits<QuicStreamOffset>::max() );
404 bool gap_ends_in_this_block = (GetBlockIndex(first_gap.end_offset) ==
405 block_index);
406 if (gap_extends_to_infinity || gap_ends_in_this_block) {
407 return;
408 }
409 }
410 RetireBlock(block_index);
411 }
412
413 bool StreamSequencerBuffer::Empty() const {
414 return gaps_.size() == 1 && gaps_.front().begin_offset == total_bytes_read_;
415 }
416
417 size_t StreamSequencerBuffer::GetBlockCapacity(size_t block_index) const {
418 if ((block_index + 1) == blocks_count_) {
419 size_t result = max_buffer_capacity_bytes_ % kBlockSizeBytes;
420 if (result == 0) { // whole block
421 result = kBlockSizeBytes;
422 }
423 return result;
424 } else {
425 return kBlockSizeBytes;
426 }
427 }
428
429 void StreamSequencerBuffer::UpdateFrameArrivalMap(QuicStreamOffset offset) {
430 // Get the frame before which all frames should be removed.
431 auto next_frame = frame_arrival_time_map_.upper_bound(offset);
432 DCHECK(next_frame != frame_arrival_time_map_.begin());
433 auto iter = frame_arrival_time_map_.begin();
434 while (iter != next_frame) {
435 auto erased = *iter;
436 iter = frame_arrival_time_map_.erase(iter);
437 DVLOG(1) << "remove FrameInfo with offsest: " << erased.first
438 << " len: " << erased.second.length;
439 if (erased.first + erased.second.length > offset) {
440 // If last frame is partially read out, update this FrameInfo and insert
441 // it back.
442 auto updated = std::make_pair(
443 offset, FrameInfo(erased.first + erased.second.length - offset,
444 erased.second.timestamp));
445 DVLOG(1) << "insert back FrameInfo with offset: " << updated.first
446 << " len: " << updated.second.length;
447 frame_arrival_time_map_.insert(updated);
448 }
449 }
450 }
451
452 } // namespace net
OLDNEW
« no previous file with comments | « net/quic/stream_sequencer_buffer.h ('k') | net/quic/stream_sequencer_buffer_test.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698