Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(482)

Unified Diff: net/quic/stream_sequencer_buffer.cc

Issue 1409053006: Create a new data structure StreamSequencerBuffer for QuicStreamSequencer. Currently QuicStreamSequ… (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@106492030
Patch Set: Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « net/quic/stream_sequencer_buffer.h ('k') | net/quic/stream_sequencer_buffer_test.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: net/quic/stream_sequencer_buffer.cc
diff --git a/net/quic/stream_sequencer_buffer.cc b/net/quic/stream_sequencer_buffer.cc
new file mode 100644
index 0000000000000000000000000000000000000000..23c08c18d9379692b8d13ff7d68ecfa4350e863d
--- /dev/null
+++ b/net/quic/stream_sequencer_buffer.cc
@@ -0,0 +1,452 @@
+// Copyright (c) 2015 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/quic/stream_sequencer_buffer.h"
+
+#include "base/basictypes.h"
+#include "base/logging.h"
+
+using std::min;
+
+namespace net {
+
+StreamSequencerBuffer::Gap::Gap(QuicStreamOffset begin_offset,
+ QuicStreamOffset end_offset)
+ : begin_offset(begin_offset), end_offset(end_offset) {}
+
+StreamSequencerBuffer::FrameInfo::FrameInfo() : length(1),
+ timestamp(QuicTime::Zero()) {}
+
+StreamSequencerBuffer::FrameInfo::FrameInfo(size_t length, QuicTime timestamp)
+ : length(length), timestamp(timestamp) {}
+
+StreamSequencerBuffer::StreamSequencerBuffer(size_t max_capacity_bytes)
+ : max_buffer_capacity_bytes_(max_capacity_bytes),
+ blocks_count_(
+ ceil(static_cast<double>(max_capacity_bytes) / kBlockSizeBytes)),
+ total_bytes_read_(0),
+ blocks_(blocks_count_) {
+ Clear();
+}
+
+StreamSequencerBuffer::~StreamSequencerBuffer() {
+ Clear();
+}
+
+void StreamSequencerBuffer::Clear() {
+ for (size_t i = 0; i < blocks_count_; ++i) {
+ if (blocks_[i] != nullptr) {
+ RetireBlock(i);
+ }
+ }
+ num_bytes_buffered_ = 0;
+ // Reset gaps_ so that buffer is in a state as if all data before
+ // total_bytes_read_ has been consumed, and those after total_bytes_read_
+ // has never arrived.
+ gaps_ = std::list<Gap>(1, Gap(total_bytes_read_,
+ std::numeric_limits<QuicStreamOffset>::max())),
+ frame_arrival_time_map_.clear();
+}
+
+void StreamSequencerBuffer::RetireBlock(size_t idx) {
+ DCHECK(blocks_[idx] != nullptr);
+ delete blocks_[idx];
+ blocks_[idx] = nullptr;
+ DVLOG(1) << "Retired block" << idx;
+}
+
+QuicErrorCode StreamSequencerBuffer::OnStreamData(
+ QuicStreamOffset starting_offset,
+ base::StringPiece data,
+ QuicTime timestamp,
+ size_t* const bytes_buffered) {
+ *bytes_buffered = 0;
+ QuicStreamOffset offset = starting_offset;
+ size_t size = data.size();
+ if (size == 0) {
+ LOG(DFATAL) << "Attempted to write 0 bytes of data.";
+ return QUIC_INVALID_STREAM_FRAME;
+ }
+
+ // Find the first gap not ending before |offset|. This gap maybe the gap to
+ // fill if the arriving frame doesn't overlaps with previous ones.
+ std::list<Gap>::iterator current_gap = gaps_.begin();
+ while (current_gap != gaps_.end() && current_gap->end_offset <= offset) {
+ ++current_gap;
+ }
+
+ DCHECK(current_gap != gaps_.end());
+
+ // "duplication": might duplicate with data alread filled,but also might
+ // overlap across different base::StringPiece objects already written.
+ // In both cases, don't write the data,
+ // and allow the caller of this method to handle the result.
+ if (offset < current_gap->begin_offset &&
+ offset + size <= current_gap->begin_offset) {
+ DVLOG(1) << "duplicated data at offset:" << offset << " len: " << size;
+ return QUIC_NO_ERROR;
+ }
+ if (offset < current_gap->begin_offset &&
+ offset + size > current_gap->begin_offset) {
+ // Beginning of new data overlaps data before current gap.
+ return QUIC_INVALID_STREAM_DATA;
+ }
+ if (offset + size > current_gap->end_offset) {
+ // End of new data overlaps with data after current gap.
+ return QUIC_INVALID_STREAM_DATA;
+ }
+
+ // Write beyond the current range this buffer is covering.
+ if (offset + size > total_bytes_read_ + max_buffer_capacity_bytes_) {
+ return QUIC_INTERNAL_ERROR;
+ }
+
+ size_t total_written = 0;
+ size_t source_remaining = size;
+ const char* source = data.data();
+ // Write data block by block. If corresponding block has not created yet,
+ // create it first.
+ // Stop when all data are written or reaches the logical end of the buffer.
+ while (source_remaining > 0) {
+ const size_t write_block_num = GetBlockIndex(offset);
+ const size_t write_block_offset = GetInBlockOffset(offset);
+ DCHECK_GT(blocks_count_, write_block_num);
+
+ size_t block_capacity = GetBlockCapacity(write_block_num);
+ size_t bytes_avail = block_capacity - write_block_offset;
+
+ // If this write meets the upper boundary of the buffer,
+ // reduce the available free bytes.
+ if (offset + bytes_avail > total_bytes_read_ + max_buffer_capacity_bytes_) {
+ bytes_avail = total_bytes_read_ + max_buffer_capacity_bytes_ - offset;
+ }
+
+ if (blocks_[write_block_num] == nullptr) {
+ // TODO(danzh): Investigate if using a freelist would improve performance.
+ // Same as RetireBlock().
+ blocks_[write_block_num] = new BufferBlock();
+ }
+
+ const size_t bytes_to_copy = min<size_t>(bytes_avail, source_remaining);
+ char* dest = blocks_[write_block_num]->buffer + write_block_offset;
+ DVLOG(1) << "write at offset: " << offset << " len: " << bytes_to_copy;
+ memcpy(dest, source, bytes_to_copy);
+ source += bytes_to_copy;
+ source_remaining -= bytes_to_copy;
+ offset += bytes_to_copy;
+ total_written += bytes_to_copy;
+ }
+
+ DCHECK_GT(total_written, 0u);
+ *bytes_buffered = total_written;
+ UpdateGapList(current_gap, starting_offset, total_written);
+
+ frame_arrival_time_map_.insert(
+ std::make_pair(starting_offset, FrameInfo(size, timestamp)));
+ num_bytes_buffered_ += total_written;
+ return QUIC_NO_ERROR;
+}
+
+inline void StreamSequencerBuffer::UpdateGapList(
+ std::list<Gap>::iterator gap_with_new_data_written,
+ QuicStreamOffset start_offset,
+ size_t bytes_written) {
+ if (gap_with_new_data_written->begin_offset == start_offset &&
+ gap_with_new_data_written->end_offset > start_offset + bytes_written) {
+ // New data has been written into the left part of the buffer.
+ gap_with_new_data_written->begin_offset = start_offset + bytes_written;
+ } else if (gap_with_new_data_written->begin_offset < start_offset &&
+ gap_with_new_data_written->end_offset ==
+ start_offset + bytes_written) {
+ // New data has been written into the right part of the buffer.
+ gap_with_new_data_written->end_offset = start_offset;
+ } else if (gap_with_new_data_written->begin_offset < start_offset &&
+ gap_with_new_data_written->end_offset >
+ start_offset + bytes_written) {
+ // New data has been written into the middle of the buffer.
+ auto current = gap_with_new_data_written++;
+ size_t current_end = current->end_offset;
+ current->end_offset = start_offset;
+ gaps_.insert(gap_with_new_data_written,
+ Gap(start_offset + bytes_written, current_end));
+ } else if (gap_with_new_data_written->begin_offset == start_offset &&
+ gap_with_new_data_written->end_offset ==
+ start_offset + bytes_written) {
+ // This gap has been filled with new data. So it's no longer a gap.
+ gaps_.erase(gap_with_new_data_written);
+ }
+}
+
+size_t StreamSequencerBuffer::Readv(const iovec* dest_iov, size_t dest_count) {
+ size_t bytes_read = 0;
+ for (size_t i = 0; i < dest_count && ReadableBytes() > 0; ++i) {
+ char* dest = reinterpret_cast<char*>(dest_iov[i].iov_base);
+ size_t dest_remaining = dest_iov[i].iov_len;
+ while (dest_remaining > 0 && ReadableBytes() > 0) {
+ size_t block_idx = NextBlockToRead();
+ size_t start_offset_in_block = ReadOffset();
+ size_t block_capacity = GetBlockCapacity(block_idx);
+ size_t bytes_available_in_block =
+ min<size_t>(ReadableBytes(), block_capacity - start_offset_in_block);
+ size_t bytes_to_copy = min<size_t>(bytes_available_in_block,
+ dest_remaining);
+ DCHECK_GT(bytes_to_copy, 0u);
+ DCHECK_NE(static_cast<BufferBlock*>(nullptr), blocks_[block_idx]);
+ memcpy(dest, blocks_[block_idx]->buffer + start_offset_in_block,
+ bytes_to_copy);
+ dest += bytes_to_copy;
+ dest_remaining -= bytes_to_copy;
+ num_bytes_buffered_ -= bytes_to_copy;
+ total_bytes_read_ += bytes_to_copy;
+ bytes_read += bytes_to_copy;
+
+ // Retire the block if all the data is read out
+ // and no other data is stored in this block.
+ if (bytes_to_copy == bytes_available_in_block) {
+ RetireBlockIfEmpty(block_idx);
+ }
+ }
+ }
+
+ if (bytes_read > 0) {
+ UpdateFrameArrivalMap(total_bytes_read_);
+ }
+ return bytes_read;
+}
+
+int StreamSequencerBuffer::GetReadableRegions(struct iovec* iov,
+ int iov_count) const {
+ DCHECK(iov != nullptr);
+ DCHECK_GT(iov_count, 0);
+
+ if (ReadableBytes() == 0) {
+ iov[0].iov_base = nullptr;
+ iov[0].iov_len = 0;
+ return 0;
+ }
+
+ size_t start_block_idx = NextBlockToRead();
+ QuicStreamOffset readable_offset_end = gaps_.front().begin_offset - 1;
+ DCHECK_GE(readable_offset_end + 1, total_bytes_read_);
+ size_t end_block_offset = GetInBlockOffset(readable_offset_end);
+ size_t end_block_idx = GetBlockIndex(readable_offset_end);
+
+ // If readable region is within one block, deal with it seperately.
+ if (start_block_idx == end_block_idx && ReadOffset() <= end_block_offset) {
+ iov[0].iov_base = blocks_[start_block_idx]->buffer + ReadOffset();
+ iov[0].iov_len = ReadableBytes();
+ DVLOG(1) << "get only block" << start_block_idx;
+ return 1;
+ }
+
+ // Get first block
+ iov[0].iov_base = blocks_[start_block_idx]->buffer + ReadOffset();
+ iov[0].iov_len = GetBlockCapacity(start_block_idx) - ReadOffset();
+ DVLOG(1) << "get first block" << start_block_idx << " with len "
+ << iov[0].iov_len;
+ DCHECK_GT(readable_offset_end + 1, total_bytes_read_ + iov[0].iov_len)
+ << "there should be more available data";
+
+ // Get readable regions of the rest blocks till either 2nd to last block
+ // before gap is met or |iov| is filled. For these blocks, one whole block is
+ // a region.
+ int iov_used = 1;
+ size_t block_idx = (start_block_idx + iov_used) % blocks_count_;
+ while (block_idx != end_block_idx && iov_used < iov_count) {
+ DCHECK_NE(static_cast<BufferBlock*>(nullptr), blocks_[block_idx]);
+ iov[iov_used].iov_base = blocks_[block_idx]->buffer;
+ iov[iov_used].iov_len = GetBlockCapacity(block_idx);
+ DVLOG(1) << "get block" << block_idx;
+ ++iov_used;
+ block_idx = (start_block_idx + iov_used) % blocks_count_;
+ }
+
+ // Deal with last block if |iov| can hold more.
+ if (iov_used < iov_count) {
+ DCHECK_NE(static_cast<BufferBlock*>(nullptr), blocks_[block_idx]);
+ iov[iov_used].iov_base = blocks_[end_block_idx]->buffer;
+ iov[iov_used].iov_len = end_block_offset + 1;
+ DVLOG(1) << "get last block " << end_block_idx;
+ ++iov_used;
+ }
+ return iov_used;
+}
+
+bool StreamSequencerBuffer::GetReadableRegion(iovec* iov,
+ QuicTime* timestamp) const {
+ if (ReadableBytes() == 0) {
+ iov[0].iov_base = nullptr;
+ iov[0].iov_len = 0;
+ return false;
+ }
+
+ size_t start_block_idx = NextBlockToRead();
+ iov->iov_base = blocks_[start_block_idx]->buffer + ReadOffset();
+ size_t readable_bytes_in_block = min<size_t>(
+ GetBlockCapacity(start_block_idx) - ReadOffset(), ReadableBytes());
+ size_t region_len = 0;
+ auto iter = frame_arrival_time_map_.begin();
+ *timestamp = iter->second.timestamp;
+ DVLOG(1) << "readable bytes in block: " << readable_bytes_in_block;
+ for (; iter != frame_arrival_time_map_.end() &&
+ region_len + iter->second.length <= readable_bytes_in_block;
+ ++iter) {
+ if (iter->second.timestamp != *timestamp) {
+ // If reaches a frame arrive at another timestamp, stop expanding current
+ // region.
+ DVLOG(1) << "Meet frame with different timestamp.";
+ break;
+ }
+ region_len += iter->second.length;
+ DVLOG(1) << "Add bytes to region: " << iter->second.length;
+ }
+ if (iter == frame_arrival_time_map_.end() ||
+ iter->second.timestamp == *timestamp) {
+ // If encountered the end of readable bytes before reaching a different
+ // timestamp.
+ DVLOG(1) << "Get all readable bytes in first block.";
+ region_len = readable_bytes_in_block;
+ }
+ iov->iov_len = region_len;
+ return true;
+}
+
+bool StreamSequencerBuffer::MarkConsumed(size_t bytes_used) {
+ if (bytes_used > ReadableBytes()) {
+ return false;
+ }
+ size_t bytes_to_consume = bytes_used;
+ while (bytes_to_consume > 0) {
+ size_t block_idx = NextBlockToRead();
+ size_t offset_in_block = ReadOffset();
+ size_t bytes_available = min<size_t>(
+ ReadableBytes(), GetBlockCapacity(block_idx) - offset_in_block);
+ size_t bytes_read = min<size_t>(bytes_to_consume, bytes_available);
+ total_bytes_read_ += bytes_read;
+ num_bytes_buffered_ -= bytes_read;
+ bytes_to_consume -= bytes_read;
+ // If advanced to the end of current block and end of buffer hasn't wrapped
+ // to this block yet.
+ if (bytes_available == bytes_read) {
+ RetireBlockIfEmpty(block_idx);
+ }
+ }
+ if (bytes_used > 0) {
+ UpdateFrameArrivalMap(total_bytes_read_);
+ }
+ return true;
+}
+
+size_t StreamSequencerBuffer::FlushBufferedFrames() {
+ size_t prev_total_bytes_read = total_bytes_read_;
+ total_bytes_read_ = gaps_.back().begin_offset;
+ Clear();
+ return total_bytes_read_ - prev_total_bytes_read;
+}
+
+size_t StreamSequencerBuffer::ReadableBytes() const {
+ return gaps_.front().begin_offset - total_bytes_read_;
+}
+
+bool StreamSequencerBuffer::HasBytesToRead() const {
+ return ReadableBytes() > 0;
+}
+
+QuicStreamOffset StreamSequencerBuffer::BytesConsumed() const {
+ return total_bytes_read_;
+}
+
+size_t StreamSequencerBuffer::BytesBuffered() const {
+ return num_bytes_buffered_;
+}
+
+size_t StreamSequencerBuffer::GetBlockIndex(QuicStreamOffset offset) const {
+ return (offset % max_buffer_capacity_bytes_) / kBlockSizeBytes;
+}
+
+size_t StreamSequencerBuffer::GetInBlockOffset(QuicStreamOffset offset) const {
+ return (offset % max_buffer_capacity_bytes_) % kBlockSizeBytes;
+}
+
+size_t StreamSequencerBuffer::ReadOffset() const {
+ return GetInBlockOffset(total_bytes_read_);
+}
+
+size_t StreamSequencerBuffer::NextBlockToRead() const {
+ return GetBlockIndex(total_bytes_read_);
+}
+
+void StreamSequencerBuffer::RetireBlockIfEmpty(size_t block_index) {
+ DCHECK(ReadableBytes() == 0 || GetInBlockOffset(total_bytes_read_) == 0)
+ << "RetireBlockIfEmpty() should only be called when advancing to next block"
+ " or a gap has been reached.";
+ // If the whole buffer becomes empty, the last piece of data has been read.
+ if (Empty()) {
+ RetireBlock(block_index);
+ return;
+ }
+
+ // Check where the logical end of this buffer is.
+ // Not empty if the end of circular buffer has been wrapped to this block.
+ if (GetBlockIndex(gaps_.back().begin_offset - 1) == block_index) {
+ return;
+ }
+
+ // Read index remains in this block, which means a gap has been reached.
+ if (NextBlockToRead() == block_index) {
+ Gap first_gap = gaps_.front();
+ DCHECK(first_gap.begin_offset == total_bytes_read_);
+ // Check where the next piece data is.
+ // Not empty if next piece of data is still in this chunk.
+ bool gap_extends_to_infinity = (first_gap.end_offset !=
+ std::numeric_limits<QuicStreamOffset>::max());
+ bool gap_ends_in_this_block = (GetBlockIndex(first_gap.end_offset) ==
+ block_index);
+ if (gap_extends_to_infinity || gap_ends_in_this_block) {
+ return;
+ }
+ }
+ RetireBlock(block_index);
+}
+
+bool StreamSequencerBuffer::Empty() const {
+ return gaps_.size() == 1 && gaps_.front().begin_offset == total_bytes_read_;
+}
+
+size_t StreamSequencerBuffer::GetBlockCapacity(size_t block_index) const {
+ if ((block_index + 1) == blocks_count_) {
+ size_t result = max_buffer_capacity_bytes_ % kBlockSizeBytes;
+ if (result == 0) { // whole block
+ result = kBlockSizeBytes;
+ }
+ return result;
+ } else {
+ return kBlockSizeBytes;
+ }
+}
+
+void StreamSequencerBuffer::UpdateFrameArrivalMap(QuicStreamOffset offset) {
+ // Get the frame before which all frames should be removed.
+ auto next_frame = frame_arrival_time_map_.upper_bound(offset);
+ DCHECK(next_frame != frame_arrival_time_map_.begin());
+ auto iter = frame_arrival_time_map_.begin();
+ while (iter != next_frame) {
+ auto erased = *iter;
+ iter = frame_arrival_time_map_.erase(iter);
+ DVLOG(1) << "remove FrameInfo with offsest: " << erased.first
+ << " len: " << erased.second.length;
+ if (erased.first + erased.second.length > offset) {
+ // If last frame is partially read out, update this FrameInfo and insert
+ // it back.
+ auto updated = std::make_pair(
+ offset, FrameInfo(erased.first + erased.second.length - offset,
+ erased.second.timestamp));
+ DVLOG(1) << "insert back FrameInfo with offset: " << updated.first
+ << " len: " << updated.second.length;
+ frame_arrival_time_map_.insert(updated);
+ }
+ }
+}
+
+} // namespace net
« no previous file with comments | « net/quic/stream_sequencer_buffer.h ('k') | net/quic/stream_sequencer_buffer_test.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698