OLD | NEW |
(Empty) | |
| 1 // Copyright (c) 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 #include "net/quic/stream_sequencer_buffer.h" |
| 6 |
| 7 #include "base/basictypes.h" |
| 8 #include "base/logging.h" |
| 9 |
| 10 using std::min; |
| 11 |
| 12 namespace net { |
| 13 |
| 14 StreamSequencerBuffer::Gap::Gap(QuicStreamOffset begin_offset, |
| 15 QuicStreamOffset end_offset) |
| 16 : begin_offset(begin_offset), end_offset(end_offset) {} |
| 17 |
| 18 StreamSequencerBuffer::FrameInfo::FrameInfo() : length(1), |
| 19 timestamp(QuicTime::Zero()) {} |
| 20 |
| 21 StreamSequencerBuffer::FrameInfo::FrameInfo(size_t length, QuicTime timestamp) |
| 22 : length(length), timestamp(timestamp) {} |
| 23 |
| 24 StreamSequencerBuffer::StreamSequencerBuffer(size_t max_capacity_bytes) |
| 25 : max_buffer_capacity_bytes_(max_capacity_bytes), |
| 26 blocks_count_( |
| 27 ceil(static_cast<double>(max_capacity_bytes) / kBlockSizeBytes)), |
| 28 total_bytes_read_(0), |
| 29 blocks_(blocks_count_) { |
| 30 Clear(); |
| 31 } |
| 32 |
| 33 StreamSequencerBuffer::~StreamSequencerBuffer() { |
| 34 Clear(); |
| 35 } |
| 36 |
| 37 void StreamSequencerBuffer::Clear() { |
| 38 for (size_t i = 0; i < blocks_count_; ++i) { |
| 39 if (blocks_[i] != nullptr) { |
| 40 RetireBlock(i); |
| 41 } |
| 42 } |
| 43 num_bytes_buffered_ = 0; |
| 44 // Reset gaps_ so that buffer is in a state as if all data before |
| 45 // total_bytes_read_ has been consumed, and those after total_bytes_read_ |
| 46 // has never arrived. |
| 47 gaps_ = std::list<Gap>(1, Gap(total_bytes_read_, |
| 48 std::numeric_limits<QuicStreamOffset>::max())), |
| 49 frame_arrival_time_map_.clear(); |
| 50 } |
| 51 |
| 52 void StreamSequencerBuffer::RetireBlock(size_t idx) { |
| 53 DCHECK(blocks_[idx] != nullptr); |
| 54 delete blocks_[idx]; |
| 55 blocks_[idx] = nullptr; |
| 56 DVLOG(1) << "Retired block" << idx; |
| 57 } |
| 58 |
| 59 QuicErrorCode StreamSequencerBuffer::OnStreamData( |
| 60 QuicStreamOffset starting_offset, |
| 61 base::StringPiece data, |
| 62 QuicTime timestamp, |
| 63 size_t* const bytes_buffered) { |
| 64 *bytes_buffered = 0; |
| 65 QuicStreamOffset offset = starting_offset; |
| 66 size_t size = data.size(); |
| 67 if (size == 0) { |
| 68 LOG(DFATAL) << "Attempted to write 0 bytes of data."; |
| 69 return QUIC_INVALID_STREAM_FRAME; |
| 70 } |
| 71 |
| 72 // Find the first gap not ending before |offset|. This gap maybe the gap to |
| 73 // fill if the arriving frame doesn't overlaps with previous ones. |
| 74 std::list<Gap>::iterator current_gap = gaps_.begin(); |
| 75 while (current_gap != gaps_.end() && current_gap->end_offset <= offset) { |
| 76 ++current_gap; |
| 77 } |
| 78 |
| 79 DCHECK(current_gap != gaps_.end()); |
| 80 |
| 81 // "duplication": might duplicate with data alread filled,but also might |
| 82 // overlap across different base::StringPiece objects already written. |
| 83 // In both cases, don't write the data, |
| 84 // and allow the caller of this method to handle the result. |
| 85 if (offset < current_gap->begin_offset && |
| 86 offset + size <= current_gap->begin_offset) { |
| 87 DVLOG(1) << "duplicated data at offset:" << offset << " len: " << size; |
| 88 return QUIC_NO_ERROR; |
| 89 } |
| 90 if (offset < current_gap->begin_offset && |
| 91 offset + size > current_gap->begin_offset) { |
| 92 // Beginning of new data overlaps data before current gap. |
| 93 return QUIC_INVALID_STREAM_DATA; |
| 94 } |
| 95 if (offset + size > current_gap->end_offset) { |
| 96 // End of new data overlaps with data after current gap. |
| 97 return QUIC_INVALID_STREAM_DATA; |
| 98 } |
| 99 |
| 100 // Write beyond the current range this buffer is covering. |
| 101 if (offset + size > total_bytes_read_ + max_buffer_capacity_bytes_) { |
| 102 return QUIC_INTERNAL_ERROR; |
| 103 } |
| 104 |
| 105 size_t total_written = 0; |
| 106 size_t source_remaining = size; |
| 107 const char* source = data.data(); |
| 108 // Write data block by block. If corresponding block has not created yet, |
| 109 // create it first. |
| 110 // Stop when all data are written or reaches the logical end of the buffer. |
| 111 while (source_remaining > 0) { |
| 112 const size_t write_block_num = GetBlockIndex(offset); |
| 113 const size_t write_block_offset = GetInBlockOffset(offset); |
| 114 DCHECK_GT(blocks_count_, write_block_num); |
| 115 |
| 116 size_t block_capacity = GetBlockCapacity(write_block_num); |
| 117 size_t bytes_avail = block_capacity - write_block_offset; |
| 118 |
| 119 // If this write meets the upper boundary of the buffer, |
| 120 // reduce the available free bytes. |
| 121 if (offset + bytes_avail > total_bytes_read_ + max_buffer_capacity_bytes_) { |
| 122 bytes_avail = total_bytes_read_ + max_buffer_capacity_bytes_ - offset; |
| 123 } |
| 124 |
| 125 if (blocks_[write_block_num] == nullptr) { |
| 126 // TODO(danzh): Investigate if using a freelist would improve performance. |
| 127 // Same as RetireBlock(). |
| 128 blocks_[write_block_num] = new BufferBlock(); |
| 129 } |
| 130 |
| 131 const size_t bytes_to_copy = min<size_t>(bytes_avail, source_remaining); |
| 132 char* dest = blocks_[write_block_num]->buffer + write_block_offset; |
| 133 DVLOG(1) << "write at offset: " << offset << " len: " << bytes_to_copy; |
| 134 memcpy(dest, source, bytes_to_copy); |
| 135 source += bytes_to_copy; |
| 136 source_remaining -= bytes_to_copy; |
| 137 offset += bytes_to_copy; |
| 138 total_written += bytes_to_copy; |
| 139 } |
| 140 |
| 141 DCHECK_GT(total_written, 0u); |
| 142 *bytes_buffered = total_written; |
| 143 UpdateGapList(current_gap, starting_offset, total_written); |
| 144 |
| 145 frame_arrival_time_map_.insert( |
| 146 std::make_pair(starting_offset, FrameInfo(size, timestamp))); |
| 147 num_bytes_buffered_ += total_written; |
| 148 return QUIC_NO_ERROR; |
| 149 } |
| 150 |
| 151 inline void StreamSequencerBuffer::UpdateGapList( |
| 152 std::list<Gap>::iterator gap_with_new_data_written, |
| 153 QuicStreamOffset start_offset, |
| 154 size_t bytes_written) { |
| 155 if (gap_with_new_data_written->begin_offset == start_offset && |
| 156 gap_with_new_data_written->end_offset > start_offset + bytes_written) { |
| 157 // New data has been written into the left part of the buffer. |
| 158 gap_with_new_data_written->begin_offset = start_offset + bytes_written; |
| 159 } else if (gap_with_new_data_written->begin_offset < start_offset && |
| 160 gap_with_new_data_written->end_offset == |
| 161 start_offset + bytes_written) { |
| 162 // New data has been written into the right part of the buffer. |
| 163 gap_with_new_data_written->end_offset = start_offset; |
| 164 } else if (gap_with_new_data_written->begin_offset < start_offset && |
| 165 gap_with_new_data_written->end_offset > |
| 166 start_offset + bytes_written) { |
| 167 // New data has been written into the middle of the buffer. |
| 168 auto current = gap_with_new_data_written++; |
| 169 size_t current_end = current->end_offset; |
| 170 current->end_offset = start_offset; |
| 171 gaps_.insert(gap_with_new_data_written, |
| 172 Gap(start_offset + bytes_written, current_end)); |
| 173 } else if (gap_with_new_data_written->begin_offset == start_offset && |
| 174 gap_with_new_data_written->end_offset == |
| 175 start_offset + bytes_written) { |
| 176 // This gap has been filled with new data. So it's no longer a gap. |
| 177 gaps_.erase(gap_with_new_data_written); |
| 178 } |
| 179 } |
| 180 |
| 181 size_t StreamSequencerBuffer::Readv(const iovec* dest_iov, size_t dest_count) { |
| 182 size_t bytes_read = 0; |
| 183 for (size_t i = 0; i < dest_count && ReadableBytes() > 0; ++i) { |
| 184 char* dest = reinterpret_cast<char*>(dest_iov[i].iov_base); |
| 185 size_t dest_remaining = dest_iov[i].iov_len; |
| 186 while (dest_remaining > 0 && ReadableBytes() > 0) { |
| 187 size_t block_idx = NextBlockToRead(); |
| 188 size_t start_offset_in_block = ReadOffset(); |
| 189 size_t block_capacity = GetBlockCapacity(block_idx); |
| 190 size_t bytes_available_in_block = |
| 191 min<size_t>(ReadableBytes(), block_capacity - start_offset_in_block); |
| 192 size_t bytes_to_copy = min<size_t>(bytes_available_in_block, |
| 193 dest_remaining); |
| 194 DCHECK_GT(bytes_to_copy, 0u); |
| 195 DCHECK_NE(static_cast<BufferBlock*>(nullptr), blocks_[block_idx]); |
| 196 memcpy(dest, blocks_[block_idx]->buffer + start_offset_in_block, |
| 197 bytes_to_copy); |
| 198 dest += bytes_to_copy; |
| 199 dest_remaining -= bytes_to_copy; |
| 200 num_bytes_buffered_ -= bytes_to_copy; |
| 201 total_bytes_read_ += bytes_to_copy; |
| 202 bytes_read += bytes_to_copy; |
| 203 |
| 204 // Retire the block if all the data is read out |
| 205 // and no other data is stored in this block. |
| 206 if (bytes_to_copy == bytes_available_in_block) { |
| 207 RetireBlockIfEmpty(block_idx); |
| 208 } |
| 209 } |
| 210 } |
| 211 |
| 212 if (bytes_read > 0) { |
| 213 UpdateFrameArrivalMap(total_bytes_read_); |
| 214 } |
| 215 return bytes_read; |
| 216 } |
| 217 |
| 218 int StreamSequencerBuffer::GetReadableRegions(struct iovec* iov, |
| 219 int iov_count) const { |
| 220 DCHECK(iov != nullptr); |
| 221 DCHECK_GT(iov_count, 0); |
| 222 |
| 223 if (ReadableBytes() == 0) { |
| 224 iov[0].iov_base = nullptr; |
| 225 iov[0].iov_len = 0; |
| 226 return 0; |
| 227 } |
| 228 |
| 229 size_t start_block_idx = NextBlockToRead(); |
| 230 QuicStreamOffset readable_offset_end = gaps_.front().begin_offset - 1; |
| 231 DCHECK_GE(readable_offset_end + 1, total_bytes_read_); |
| 232 size_t end_block_offset = GetInBlockOffset(readable_offset_end); |
| 233 size_t end_block_idx = GetBlockIndex(readable_offset_end); |
| 234 |
| 235 // If readable region is within one block, deal with it seperately. |
| 236 if (start_block_idx == end_block_idx && ReadOffset() <= end_block_offset) { |
| 237 iov[0].iov_base = blocks_[start_block_idx]->buffer + ReadOffset(); |
| 238 iov[0].iov_len = ReadableBytes(); |
| 239 DVLOG(1) << "get only block" << start_block_idx; |
| 240 return 1; |
| 241 } |
| 242 |
| 243 // Get first block |
| 244 iov[0].iov_base = blocks_[start_block_idx]->buffer + ReadOffset(); |
| 245 iov[0].iov_len = GetBlockCapacity(start_block_idx) - ReadOffset(); |
| 246 DVLOG(1) << "get first block" << start_block_idx << " with len " |
| 247 << iov[0].iov_len; |
| 248 DCHECK_GT(readable_offset_end + 1, total_bytes_read_ + iov[0].iov_len) |
| 249 << "there should be more available data"; |
| 250 |
| 251 // Get readable regions of the rest blocks till either 2nd to last block |
| 252 // before gap is met or |iov| is filled. For these blocks, one whole block is |
| 253 // a region. |
| 254 int iov_used = 1; |
| 255 size_t block_idx = (start_block_idx + iov_used) % blocks_count_; |
| 256 while (block_idx != end_block_idx && iov_used < iov_count) { |
| 257 DCHECK_NE(static_cast<BufferBlock*>(nullptr), blocks_[block_idx]); |
| 258 iov[iov_used].iov_base = blocks_[block_idx]->buffer; |
| 259 iov[iov_used].iov_len = GetBlockCapacity(block_idx); |
| 260 DVLOG(1) << "get block" << block_idx; |
| 261 ++iov_used; |
| 262 block_idx = (start_block_idx + iov_used) % blocks_count_; |
| 263 } |
| 264 |
| 265 // Deal with last block if |iov| can hold more. |
| 266 if (iov_used < iov_count) { |
| 267 DCHECK_NE(static_cast<BufferBlock*>(nullptr), blocks_[block_idx]); |
| 268 iov[iov_used].iov_base = blocks_[end_block_idx]->buffer; |
| 269 iov[iov_used].iov_len = end_block_offset + 1; |
| 270 DVLOG(1) << "get last block " << end_block_idx; |
| 271 ++iov_used; |
| 272 } |
| 273 return iov_used; |
| 274 } |
| 275 |
| 276 bool StreamSequencerBuffer::GetReadableRegion(iovec* iov, |
| 277 QuicTime* timestamp) const { |
| 278 if (ReadableBytes() == 0) { |
| 279 iov[0].iov_base = nullptr; |
| 280 iov[0].iov_len = 0; |
| 281 return false; |
| 282 } |
| 283 |
| 284 size_t start_block_idx = NextBlockToRead(); |
| 285 iov->iov_base = blocks_[start_block_idx]->buffer + ReadOffset(); |
| 286 size_t readable_bytes_in_block = min<size_t>( |
| 287 GetBlockCapacity(start_block_idx) - ReadOffset(), ReadableBytes()); |
| 288 size_t region_len = 0; |
| 289 auto iter = frame_arrival_time_map_.begin(); |
| 290 *timestamp = iter->second.timestamp; |
| 291 DVLOG(1) << "readable bytes in block: " << readable_bytes_in_block; |
| 292 for (; iter != frame_arrival_time_map_.end() && |
| 293 region_len + iter->second.length <= readable_bytes_in_block; |
| 294 ++iter) { |
| 295 if (iter->second.timestamp != *timestamp) { |
| 296 // If reaches a frame arrive at another timestamp, stop expanding current |
| 297 // region. |
| 298 DVLOG(1) << "Meet frame with different timestamp."; |
| 299 break; |
| 300 } |
| 301 region_len += iter->second.length; |
| 302 DVLOG(1) << "Add bytes to region: " << iter->second.length; |
| 303 } |
| 304 if (iter == frame_arrival_time_map_.end() || |
| 305 iter->second.timestamp == *timestamp) { |
| 306 // If encountered the end of readable bytes before reaching a different |
| 307 // timestamp. |
| 308 DVLOG(1) << "Get all readable bytes in first block."; |
| 309 region_len = readable_bytes_in_block; |
| 310 } |
| 311 iov->iov_len = region_len; |
| 312 return true; |
| 313 } |
| 314 |
| 315 bool StreamSequencerBuffer::MarkConsumed(size_t bytes_used) { |
| 316 if (bytes_used > ReadableBytes()) { |
| 317 return false; |
| 318 } |
| 319 size_t bytes_to_consume = bytes_used; |
| 320 while (bytes_to_consume > 0) { |
| 321 size_t block_idx = NextBlockToRead(); |
| 322 size_t offset_in_block = ReadOffset(); |
| 323 size_t bytes_available = min<size_t>( |
| 324 ReadableBytes(), GetBlockCapacity(block_idx) - offset_in_block); |
| 325 size_t bytes_read = min<size_t>(bytes_to_consume, bytes_available); |
| 326 total_bytes_read_ += bytes_read; |
| 327 num_bytes_buffered_ -= bytes_read; |
| 328 bytes_to_consume -= bytes_read; |
| 329 // If advanced to the end of current block and end of buffer hasn't wrapped |
| 330 // to this block yet. |
| 331 if (bytes_available == bytes_read) { |
| 332 RetireBlockIfEmpty(block_idx); |
| 333 } |
| 334 } |
| 335 if (bytes_used > 0) { |
| 336 UpdateFrameArrivalMap(total_bytes_read_); |
| 337 } |
| 338 return true; |
| 339 } |
| 340 |
| 341 size_t StreamSequencerBuffer::FlushBufferedFrames() { |
| 342 size_t prev_total_bytes_read = total_bytes_read_; |
| 343 total_bytes_read_ = gaps_.back().begin_offset; |
| 344 Clear(); |
| 345 return total_bytes_read_ - prev_total_bytes_read; |
| 346 } |
| 347 |
| 348 size_t StreamSequencerBuffer::ReadableBytes() const { |
| 349 return gaps_.front().begin_offset - total_bytes_read_; |
| 350 } |
| 351 |
| 352 bool StreamSequencerBuffer::HasBytesToRead() const { |
| 353 return ReadableBytes() > 0; |
| 354 } |
| 355 |
| 356 QuicStreamOffset StreamSequencerBuffer::BytesConsumed() const { |
| 357 return total_bytes_read_; |
| 358 } |
| 359 |
| 360 size_t StreamSequencerBuffer::BytesBuffered() const { |
| 361 return num_bytes_buffered_; |
| 362 } |
| 363 |
| 364 size_t StreamSequencerBuffer::GetBlockIndex(QuicStreamOffset offset) const { |
| 365 return (offset % max_buffer_capacity_bytes_) / kBlockSizeBytes; |
| 366 } |
| 367 |
| 368 size_t StreamSequencerBuffer::GetInBlockOffset(QuicStreamOffset offset) const { |
| 369 return (offset % max_buffer_capacity_bytes_) % kBlockSizeBytes; |
| 370 } |
| 371 |
| 372 size_t StreamSequencerBuffer::ReadOffset() const { |
| 373 return GetInBlockOffset(total_bytes_read_); |
| 374 } |
| 375 |
| 376 size_t StreamSequencerBuffer::NextBlockToRead() const { |
| 377 return GetBlockIndex(total_bytes_read_); |
| 378 } |
| 379 |
| 380 void StreamSequencerBuffer::RetireBlockIfEmpty(size_t block_index) { |
| 381 DCHECK(ReadableBytes() == 0 || GetInBlockOffset(total_bytes_read_) == 0) |
| 382 << "RetireBlockIfEmpty() should only be called when advancing to next block" |
| 383 " or a gap has been reached."; |
| 384 // If the whole buffer becomes empty, the last piece of data has been read. |
| 385 if (Empty()) { |
| 386 RetireBlock(block_index); |
| 387 return; |
| 388 } |
| 389 |
| 390 // Check where the logical end of this buffer is. |
| 391 // Not empty if the end of circular buffer has been wrapped to this block. |
| 392 if (GetBlockIndex(gaps_.back().begin_offset - 1) == block_index) { |
| 393 return; |
| 394 } |
| 395 |
| 396 // Read index remains in this block, which means a gap has been reached. |
| 397 if (NextBlockToRead() == block_index) { |
| 398 Gap first_gap = gaps_.front(); |
| 399 DCHECK(first_gap.begin_offset == total_bytes_read_); |
| 400 // Check where the next piece data is. |
| 401 // Not empty if next piece of data is still in this chunk. |
| 402 bool gap_extends_to_infinity = (first_gap.end_offset != |
| 403 std::numeric_limits<QuicStreamOffset>::max()
); |
| 404 bool gap_ends_in_this_block = (GetBlockIndex(first_gap.end_offset) == |
| 405 block_index); |
| 406 if (gap_extends_to_infinity || gap_ends_in_this_block) { |
| 407 return; |
| 408 } |
| 409 } |
| 410 RetireBlock(block_index); |
| 411 } |
| 412 |
| 413 bool StreamSequencerBuffer::Empty() const { |
| 414 return gaps_.size() == 1 && gaps_.front().begin_offset == total_bytes_read_; |
| 415 } |
| 416 |
| 417 size_t StreamSequencerBuffer::GetBlockCapacity(size_t block_index) const { |
| 418 if ((block_index + 1) == blocks_count_) { |
| 419 size_t result = max_buffer_capacity_bytes_ % kBlockSizeBytes; |
| 420 if (result == 0) { // whole block |
| 421 result = kBlockSizeBytes; |
| 422 } |
| 423 return result; |
| 424 } else { |
| 425 return kBlockSizeBytes; |
| 426 } |
| 427 } |
| 428 |
| 429 void StreamSequencerBuffer::UpdateFrameArrivalMap(QuicStreamOffset offset) { |
| 430 // Get the frame before which all frames should be removed. |
| 431 auto next_frame = frame_arrival_time_map_.upper_bound(offset); |
| 432 DCHECK(next_frame != frame_arrival_time_map_.begin()); |
| 433 auto iter = frame_arrival_time_map_.begin(); |
| 434 while (iter != next_frame) { |
| 435 auto erased = *iter; |
| 436 iter = frame_arrival_time_map_.erase(iter); |
| 437 DVLOG(1) << "remove FrameInfo with offsest: " << erased.first |
| 438 << " len: " << erased.second.length; |
| 439 if (erased.first + erased.second.length > offset) { |
| 440 // If last frame is partially read out, update this FrameInfo and insert |
| 441 // it back. |
| 442 auto updated = std::make_pair( |
| 443 offset, FrameInfo(erased.first + erased.second.length - offset, |
| 444 erased.second.timestamp)); |
| 445 DVLOG(1) << "insert back FrameInfo with offset: " << updated.first |
| 446 << " len: " << updated.second.length; |
| 447 frame_arrival_time_map_.insert(updated); |
| 448 } |
| 449 } |
| 450 } |
| 451 |
| 452 } // namespace net |
OLD | NEW |