| OLD | NEW |
| 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "net/quic/quic_stream_sequencer.h" | 5 #include "net/quic/quic_stream_sequencer.h" |
| 6 | 6 |
| 7 #include <algorithm> | 7 #include <algorithm> |
| 8 #include <limits> | 8 #include <limits> |
| 9 #include <utility> | 9 #include <utility> |
| 10 | 10 |
| 11 #include "base/logging.h" | 11 #include "base/logging.h" |
| 12 #include "net/quic/reliable_quic_stream.h" | 12 #include "net/quic/reliable_quic_stream.h" |
| 13 | 13 |
| 14 using std::min; | 14 using std::min; |
| 15 using std::numeric_limits; | 15 using std::numeric_limits; |
| 16 using std::string; | 16 using std::string; |
| 17 | 17 |
| 18 namespace net { | 18 namespace net { |
| 19 | 19 |
| 20 QuicStreamSequencer::FrameData::FrameData(QuicStreamOffset offset, |
| 21 string segment) |
| 22 : offset(offset), segment(segment) {} |
| 23 |
| 20 QuicStreamSequencer::QuicStreamSequencer(ReliableQuicStream* quic_stream) | 24 QuicStreamSequencer::QuicStreamSequencer(ReliableQuicStream* quic_stream) |
| 21 : stream_(quic_stream), | 25 : stream_(quic_stream), |
| 22 num_bytes_consumed_(0), | 26 num_bytes_consumed_(0), |
| 23 close_offset_(numeric_limits<QuicStreamOffset>::max()), | 27 close_offset_(numeric_limits<QuicStreamOffset>::max()), |
| 24 blocked_(false), | 28 blocked_(false), |
| 25 num_bytes_buffered_(0), | 29 num_bytes_buffered_(0), |
| 26 num_frames_received_(0), | 30 num_frames_received_(0), |
| 27 num_duplicate_frames_received_(0), | 31 num_duplicate_frames_received_(0), |
| 28 num_early_frames_received_(0) { | 32 num_early_frames_received_(0) { |
| 29 } | 33 } |
| 30 | 34 |
| 31 QuicStreamSequencer::~QuicStreamSequencer() { | 35 QuicStreamSequencer::~QuicStreamSequencer() { |
| 32 } | 36 } |
| 33 | 37 |
| 34 void QuicStreamSequencer::OnStreamFrame(const QuicStreamFrame& frame) { | 38 void QuicStreamSequencer::OnStreamFrame(const QuicStreamFrame& frame) { |
| 35 ++num_frames_received_; | 39 ++num_frames_received_; |
| 36 if (IsDuplicate(frame)) { | 40 FrameList::iterator insertion_point = FindInsertionPoint(frame); |
| 41 if (IsDuplicate(frame, insertion_point)) { |
| 37 ++num_duplicate_frames_received_; | 42 ++num_duplicate_frames_received_; |
| 38 // Silently ignore duplicates. | 43 // Silently ignore duplicates. |
| 39 return; | 44 return; |
| 40 } | 45 } |
| 41 | 46 |
| 42 if (FrameOverlapsBufferedData(frame)) { | 47 if (FrameOverlapsBufferedData(frame, insertion_point)) { |
| 43 stream_->CloseConnectionWithDetails( | 48 stream_->CloseConnectionWithDetails( |
| 44 QUIC_INVALID_STREAM_FRAME, "Stream frame overlaps with buffered data."); | 49 QUIC_INVALID_STREAM_FRAME, "Stream frame overlaps with buffered data."); |
| 45 return; | 50 return; |
| 46 } | 51 } |
| 47 | 52 |
| 48 const QuicStreamOffset byte_offset = frame.offset; | 53 const QuicStreamOffset byte_offset = frame.offset; |
| 49 const size_t data_len = frame.data.length(); | 54 const size_t data_len = frame.data.length(); |
| 50 if (data_len == 0 && !frame.fin) { | 55 if (data_len == 0 && !frame.fin) { |
| 51 // Stream frames must have data or a fin flag. | 56 // Stream frames must have data or a fin flag. |
| 52 stream_->CloseConnectionWithDetails(QUIC_INVALID_STREAM_FRAME, | 57 stream_->CloseConnectionWithDetails(QUIC_INVALID_STREAM_FRAME, |
| 53 "Empty stream frame without FIN set."); | 58 "Empty stream frame without FIN set."); |
| 54 return; | 59 return; |
| 55 } | 60 } |
| 56 | 61 |
| 57 if (frame.fin) { | 62 if (frame.fin) { |
| 58 CloseStreamAtOffset(frame.offset + data_len); | 63 CloseStreamAtOffset(frame.offset + data_len); |
| 59 if (data_len == 0) { | 64 if (data_len == 0) { |
| 60 return; | 65 return; |
| 61 } | 66 } |
| 62 } | 67 } |
| 63 | 68 |
| 64 if (byte_offset > num_bytes_consumed_) { | 69 if (byte_offset > num_bytes_consumed_) { |
| 65 ++num_early_frames_received_; | 70 ++num_early_frames_received_; |
| 66 } | 71 } |
| 67 | 72 |
| 68 // If the frame has arrived in-order then we can process it immediately, only | 73 // If the frame has arrived in-order then process it immediately, only |
| 69 // buffering if the stream is unable to process it. | 74 // buffering if the stream is unable to process it. |
| 70 size_t bytes_consumed = 0; | 75 size_t bytes_consumed = 0; |
| 71 if (!blocked_ && byte_offset == num_bytes_consumed_) { | 76 if (!blocked_ && byte_offset == num_bytes_consumed_) { |
| 72 DVLOG(1) << "Processing byte offset " << byte_offset; | 77 DVLOG(1) << "Processing byte offset " << byte_offset; |
| 73 bytes_consumed = | 78 bytes_consumed = |
| 74 stream_->ProcessRawData(frame.data.data(), frame.data.length()); | 79 stream_->ProcessRawData(frame.data.data(), frame.data.length()); |
| 75 num_bytes_consumed_ += bytes_consumed; | 80 num_bytes_consumed_ += bytes_consumed; |
| 76 stream_->AddBytesConsumed(bytes_consumed); | 81 stream_->AddBytesConsumed(bytes_consumed); |
| 77 | 82 |
| 78 if (MaybeCloseStream()) { | 83 if (MaybeCloseStream()) { |
| 79 return; | 84 return; |
| 80 } | 85 } |
| 81 if (bytes_consumed > data_len) { | 86 if (bytes_consumed > data_len) { |
| 82 stream_->Reset(QUIC_ERROR_PROCESSING_STREAM); | 87 stream_->Reset(QUIC_ERROR_PROCESSING_STREAM); |
| 83 return; | 88 return; |
| 84 } else if (bytes_consumed == data_len) { | 89 } else if (bytes_consumed == data_len) { |
| 85 FlushBufferedFrames(); | 90 FlushBufferedFrames(); |
| 86 return; // it's safe to ack this frame. | 91 return; // it's safe to ack this frame. |
| 87 } | 92 } |
| 88 } | 93 } |
| 89 | 94 |
| 90 // Buffer any remaining data to be consumed by the stream when ready. | 95 // Buffer any remaining data to be consumed by the stream when ready. |
| 91 if (bytes_consumed < data_len) { | 96 if (bytes_consumed < data_len) { |
| 92 DVLOG(1) << "Buffering stream data at offset " << byte_offset; | 97 DVLOG(1) << "Buffering stream data at offset " << byte_offset; |
| 93 const size_t remaining_length = data_len - bytes_consumed; | 98 const size_t remaining_length = data_len - bytes_consumed; |
| 94 buffered_frames_.insert(std::make_pair( | 99 // Inserting an empty string and then copying to avoid the extra copy. |
| 95 byte_offset + bytes_consumed, | 100 insertion_point = buffered_frames_.insert( |
| 96 string(frame.data.data() + bytes_consumed, remaining_length))); | 101 insertion_point, FrameData(byte_offset + bytes_consumed, "")); |
| 102 insertion_point->segment = |
| 103 string(frame.data.data() + bytes_consumed, remaining_length); |
| 97 num_bytes_buffered_ += remaining_length; | 104 num_bytes_buffered_ += remaining_length; |
| 98 } | 105 } |
| 99 } | 106 } |
| 100 | 107 |
| 101 void QuicStreamSequencer::CloseStreamAtOffset(QuicStreamOffset offset) { | 108 void QuicStreamSequencer::CloseStreamAtOffset(QuicStreamOffset offset) { |
| 102 const QuicStreamOffset kMaxOffset = numeric_limits<QuicStreamOffset>::max(); | 109 const QuicStreamOffset kMaxOffset = numeric_limits<QuicStreamOffset>::max(); |
| 103 | 110 |
| 104 // If we have a scheduled termination or close, any new offset should match | 111 // If there is a scheduled close, the new offset should match it. |
| 105 // it. | |
| 106 if (close_offset_ != kMaxOffset && offset != close_offset_) { | 112 if (close_offset_ != kMaxOffset && offset != close_offset_) { |
| 107 stream_->Reset(QUIC_MULTIPLE_TERMINATION_OFFSETS); | 113 stream_->Reset(QUIC_MULTIPLE_TERMINATION_OFFSETS); |
| 108 return; | 114 return; |
| 109 } | 115 } |
| 110 | 116 |
| 111 close_offset_ = offset; | 117 close_offset_ = offset; |
| 112 | 118 |
| 113 MaybeCloseStream(); | 119 MaybeCloseStream(); |
| 114 } | 120 } |
| 115 | 121 |
| 116 bool QuicStreamSequencer::MaybeCloseStream() { | 122 bool QuicStreamSequencer::MaybeCloseStream() { |
| 117 if (!blocked_ && IsClosed()) { | 123 if (!blocked_ && IsClosed()) { |
| 118 DVLOG(1) << "Passing up termination, as we've processed " | 124 DVLOG(1) << "Passing up termination, as we've processed " |
| 119 << num_bytes_consumed_ << " of " << close_offset_ | 125 << num_bytes_consumed_ << " of " << close_offset_ |
| 120 << " bytes."; | 126 << " bytes."; |
| 121 // Technically it's an error if num_bytes_consumed isn't exactly | 127 // Technically it's an error if num_bytes_consumed isn't exactly |
| 122 // equal, but error handling seems silly at this point. | 128 // equal, but error handling seems silly at this point. |
| 123 stream_->OnFinRead(); | 129 stream_->OnFinRead(); |
| 124 buffered_frames_.clear(); | 130 buffered_frames_.clear(); |
| 125 num_bytes_buffered_ = 0; | 131 num_bytes_buffered_ = 0; |
| 126 return true; | 132 return true; |
| 127 } | 133 } |
| 128 return false; | 134 return false; |
| 129 } | 135 } |
| 130 | 136 |
| 131 int QuicStreamSequencer::GetReadableRegions(iovec* iov, size_t iov_len) const { | 137 int QuicStreamSequencer::GetReadableRegions(iovec* iov, size_t iov_len) const { |
| 132 DCHECK(!blocked_); | 138 DCHECK(!blocked_); |
| 133 FrameMap::const_iterator it = buffered_frames_.begin(); | 139 FrameList::const_iterator it = buffered_frames_.begin(); |
| 134 size_t index = 0; | 140 size_t index = 0; |
| 135 QuicStreamOffset offset = num_bytes_consumed_; | 141 QuicStreamOffset offset = num_bytes_consumed_; |
| 136 while (it != buffered_frames_.end() && index < iov_len) { | 142 while (it != buffered_frames_.end() && index < iov_len) { |
| 137 if (it->first != offset) return index; | 143 if (it->offset != offset) { |
| 144 return index; |
| 145 } |
| 138 | 146 |
| 139 iov[index].iov_base = static_cast<void*>( | 147 iov[index].iov_base = |
| 140 const_cast<char*>(it->second.data())); | 148 static_cast<void*>(const_cast<char*>(it->segment.data())); |
| 141 iov[index].iov_len = it->second.size(); | 149 iov[index].iov_len = it->segment.size(); |
| 142 offset += it->second.size(); | 150 offset += it->segment.size(); |
| 143 | 151 |
| 144 ++index; | 152 ++index; |
| 145 ++it; | 153 ++it; |
| 146 } | 154 } |
| 147 return index; | 155 return index; |
| 148 } | 156 } |
| 149 | 157 |
| 150 int QuicStreamSequencer::Readv(const struct iovec* iov, size_t iov_len) { | 158 int QuicStreamSequencer::Readv(const struct iovec* iov, size_t iov_len) { |
| 151 DCHECK(!blocked_); | 159 DCHECK(!blocked_); |
| 152 FrameMap::iterator it = buffered_frames_.begin(); | 160 FrameList::iterator it = buffered_frames_.begin(); |
| 153 size_t iov_index = 0; | 161 size_t iov_index = 0; |
| 154 size_t iov_offset = 0; | 162 size_t iov_offset = 0; |
| 155 size_t frame_offset = 0; | 163 size_t frame_offset = 0; |
| 156 QuicStreamOffset initial_bytes_consumed = num_bytes_consumed_; | 164 QuicStreamOffset initial_bytes_consumed = num_bytes_consumed_; |
| 157 | 165 |
| 158 while (iov_index < iov_len && | 166 while (iov_index < iov_len && it != buffered_frames_.end() && |
| 159 it != buffered_frames_.end() && | 167 it->offset == num_bytes_consumed_) { |
| 160 it->first == num_bytes_consumed_) { | |
| 161 int bytes_to_read = min(iov[iov_index].iov_len - iov_offset, | 168 int bytes_to_read = min(iov[iov_index].iov_len - iov_offset, |
| 162 it->second.size() - frame_offset); | 169 it->segment.size() - frame_offset); |
| 163 | 170 |
| 164 char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base) + iov_offset; | 171 char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base) + iov_offset; |
| 165 memcpy(iov_ptr, | 172 memcpy(iov_ptr, it->segment.data() + frame_offset, bytes_to_read); |
| 166 it->second.data() + frame_offset, bytes_to_read); | |
| 167 frame_offset += bytes_to_read; | 173 frame_offset += bytes_to_read; |
| 168 iov_offset += bytes_to_read; | 174 iov_offset += bytes_to_read; |
| 169 | 175 |
| 170 if (iov[iov_index].iov_len == iov_offset) { | 176 if (iov[iov_index].iov_len == iov_offset) { |
| 171 // We've filled this buffer. | 177 // We've filled this buffer. |
| 172 iov_offset = 0; | 178 iov_offset = 0; |
| 173 ++iov_index; | 179 ++iov_index; |
| 174 } | 180 } |
| 175 if (it->second.size() == frame_offset) { | 181 if (it->segment.size() == frame_offset) { |
| 176 // We've copied this whole frame | 182 // We've copied this whole frame |
| 177 RecordBytesConsumed(it->second.size()); | 183 RecordBytesConsumed(it->segment.size()); |
| 178 buffered_frames_.erase(it); | 184 buffered_frames_.erase(it); |
| 179 it = buffered_frames_.begin(); | 185 it = buffered_frames_.begin(); |
| 180 frame_offset = 0; | 186 frame_offset = 0; |
| 181 } | 187 } |
| 182 } | 188 } |
| 183 // We've finished copying. If we have a partial frame, update it. | 189 // Done copying. If there is a partial frame, update it. |
| 184 if (frame_offset != 0) { | 190 if (frame_offset != 0) { |
| 185 buffered_frames_.insert(std::make_pair(it->first + frame_offset, | 191 buffered_frames_.push_front( |
| 186 it->second.substr(frame_offset))); | 192 FrameData(it->offset + frame_offset, it->segment.substr(frame_offset))); |
| 187 buffered_frames_.erase(buffered_frames_.begin()); | 193 buffered_frames_.erase(it); |
| 188 RecordBytesConsumed(frame_offset); | 194 RecordBytesConsumed(frame_offset); |
| 189 } | 195 } |
| 190 return static_cast<int>(num_bytes_consumed_ - initial_bytes_consumed); | 196 return static_cast<int>(num_bytes_consumed_ - initial_bytes_consumed); |
| 191 } | 197 } |
| 192 | 198 |
| 193 bool QuicStreamSequencer::HasBytesToRead() const { | 199 bool QuicStreamSequencer::HasBytesToRead() const { |
| 194 FrameMap::const_iterator it = buffered_frames_.begin(); | 200 return !buffered_frames_.empty() && |
| 195 | 201 buffered_frames_.begin()->offset == num_bytes_consumed_; |
| 196 return it != buffered_frames_.end() && it->first == num_bytes_consumed_; | |
| 197 } | 202 } |
| 198 | 203 |
| 199 bool QuicStreamSequencer::IsClosed() const { | 204 bool QuicStreamSequencer::IsClosed() const { |
| 200 return num_bytes_consumed_ >= close_offset_; | 205 return num_bytes_consumed_ >= close_offset_; |
| 201 } | 206 } |
| 202 | 207 |
| 208 QuicStreamSequencer::FrameList::iterator |
| 209 QuicStreamSequencer::FindInsertionPoint(const QuicStreamFrame& frame) { |
| 210 if (buffered_frames_.empty()) { |
| 211 return buffered_frames_.begin(); |
| 212 } |
| 213 // If it's after all buffered_frames, return the end. |
| 214 if (frame.offset >= (buffered_frames_.rbegin()->offset + |
| 215 buffered_frames_.rbegin()->segment.length())) { |
| 216 return buffered_frames_.end(); |
| 217 } |
| 218 FrameList::iterator iter = buffered_frames_.begin(); |
| 219 // Only advance the iterator if the data begins after the already received |
| 220 // frame. If the new frame overlaps with an existing frame, the iterator will |
| 221 // still point to the frame it overlaps with. |
| 222 while (iter != buffered_frames_.end() && |
| 223 frame.offset >= iter->offset + iter->segment.length()) { |
| 224 ++iter; |
| 225 } |
| 226 return iter; |
| 227 } |
| 228 |
| 203 bool QuicStreamSequencer::FrameOverlapsBufferedData( | 229 bool QuicStreamSequencer::FrameOverlapsBufferedData( |
| 204 const QuicStreamFrame& frame) const { | 230 const QuicStreamFrame& frame, |
| 205 if (buffered_frames_.empty()) { | 231 FrameList::const_iterator insertion_point) const { |
| 232 if (buffered_frames_.empty() || insertion_point == buffered_frames_.end()) { |
| 206 return false; | 233 return false; |
| 207 } | 234 } |
| 208 | 235 // If there is a buffered frame with a higher starting offset, then check to |
| 209 FrameMap::const_iterator next_frame = | 236 // see if the new frame overlaps the beginning of the higher frame. |
| 210 buffered_frames_.lower_bound(frame.offset); | 237 if (frame.offset < insertion_point->offset && |
| 211 // Duplicate frames should have been dropped in IsDuplicate. | 238 frame.offset + frame.data.length() > insertion_point->offset) { |
| 212 DCHECK(next_frame == buffered_frames_.end() || | |
| 213 next_frame->first != frame.offset); | |
| 214 | |
| 215 // If there is a buffered frame with a higher starting offset, then we check | |
| 216 // to see if the new frame runs into the higher frame. | |
| 217 if (next_frame != buffered_frames_.end() && | |
| 218 (frame.offset + frame.data.size()) > next_frame->first) { | |
| 219 DVLOG(1) << "New frame overlaps next frame: " << frame.offset << " + " | 239 DVLOG(1) << "New frame overlaps next frame: " << frame.offset << " + " |
| 220 << frame.data.size() << " > " << next_frame->first; | 240 << frame.data.size() << " > " << insertion_point->offset; |
| 241 return true; |
| 242 } |
| 243 // If there is a buffered frame with a lower starting offset, then check to |
| 244 // see if the buffered frame runs into the new frame. |
| 245 if (frame.offset >= insertion_point->offset && |
| 246 frame.offset < |
| 247 insertion_point->offset + insertion_point->segment.length()) { |
| 248 DVLOG(1) << "Preceeding frame overlaps new frame: " |
| 249 << insertion_point->offset << " + " |
| 250 << insertion_point->segment.length() << " > " << frame.offset; |
| 221 return true; | 251 return true; |
| 222 } | 252 } |
| 223 | 253 |
| 224 // If there is a buffered frame with a lower starting offset, then we check | |
| 225 // to see if the buffered frame runs into the new frame. | |
| 226 if (next_frame != buffered_frames_.begin()) { | |
| 227 FrameMap::const_iterator preceeding_frame = --next_frame; | |
| 228 QuicStreamOffset offset = preceeding_frame->first; | |
| 229 uint64 data_length = preceeding_frame->second.length(); | |
| 230 if ((offset + data_length) > frame.offset) { | |
| 231 DVLOG(1) << "Preceeding frame overlaps new frame: " << offset << " + " | |
| 232 << data_length << " > " << frame.offset; | |
| 233 return true; | |
| 234 } | |
| 235 } | |
| 236 return false; | 254 return false; |
| 237 } | 255 } |
| 238 | 256 |
| 239 void QuicStreamSequencer::MarkConsumed(size_t num_bytes_consumed) { | 257 void QuicStreamSequencer::MarkConsumed(size_t num_bytes_consumed) { |
| 240 DCHECK(!blocked_); | 258 DCHECK(!blocked_); |
| 241 size_t end_offset = num_bytes_consumed_ + num_bytes_consumed; | 259 size_t end_offset = num_bytes_consumed_ + num_bytes_consumed; |
| 242 while (!buffered_frames_.empty() && end_offset != num_bytes_consumed_) { | 260 while (!buffered_frames_.empty() && end_offset != num_bytes_consumed_) { |
| 243 FrameMap::iterator it = buffered_frames_.begin(); | 261 FrameList::iterator it = buffered_frames_.begin(); |
| 244 if (it->first != num_bytes_consumed_) { | 262 if (it->offset != num_bytes_consumed_) { |
| 245 LOG(DFATAL) << "Invalid argument to MarkConsumed. " | 263 LOG(DFATAL) << "Invalid argument to MarkConsumed. " |
| 246 << " num_bytes_consumed_: " << num_bytes_consumed_ | 264 << " num_bytes_consumed_: " << num_bytes_consumed_ |
| 247 << " end_offset: " << end_offset << " offset: " << it->first | 265 << " end_offset: " << end_offset << " offset: " << it->offset |
| 248 << " length: " << it->second.length(); | 266 << " length: " << it->segment.length(); |
| 249 stream_->Reset(QUIC_ERROR_PROCESSING_STREAM); | 267 stream_->Reset(QUIC_ERROR_PROCESSING_STREAM); |
| 250 return; | 268 return; |
| 251 } | 269 } |
| 252 | 270 |
| 253 if (it->first + it->second.length() <= end_offset) { | 271 if (it->offset + it->segment.length() <= end_offset) { |
| 254 num_bytes_consumed_ += it->second.length(); | 272 num_bytes_consumed_ += it->segment.length(); |
| 255 num_bytes_buffered_ -= it->second.length(); | 273 num_bytes_buffered_ -= it->segment.length(); |
| 256 // This chunk is entirely consumed. | 274 // This chunk is entirely consumed. |
| 257 buffered_frames_.erase(it); | 275 buffered_frames_.erase(it); |
| 258 continue; | 276 continue; |
| 259 } | 277 } |
| 260 | 278 |
| 261 // Partially consume this frame. | 279 // Partially consume this frame. |
| 262 size_t delta = end_offset - it->first; | 280 size_t delta = end_offset - it->offset; |
| 263 RecordBytesConsumed(delta); | 281 RecordBytesConsumed(delta); |
| 264 buffered_frames_.insert(make_pair(end_offset, it->second.substr(delta))); | 282 string new_data = it->segment.substr(delta); |
| 265 buffered_frames_.erase(it); | 283 buffered_frames_.erase(it); |
| 284 buffered_frames_.push_front(FrameData(num_bytes_consumed_, new_data)); |
| 266 break; | 285 break; |
| 267 } | 286 } |
| 268 } | 287 } |
| 269 | 288 |
| 270 bool QuicStreamSequencer::IsDuplicate(const QuicStreamFrame& frame) const { | 289 bool QuicStreamSequencer::IsDuplicate( |
| 271 // A frame is duplicate if the frame offset is smaller than our bytes consumed | 290 const QuicStreamFrame& frame, |
| 272 // or we have stored the frame in our map. | 291 FrameList::const_iterator insertion_point) const { |
| 273 // TODO(pwestin): Is it possible that a new frame contain more data even if | 292 // A frame is duplicate if the frame offset is smaller than the bytes consumed |
| 274 // the offset is the same? | 293 // or identical to an already received frame. |
| 275 return frame.offset < num_bytes_consumed_ || | 294 return frame.offset < num_bytes_consumed_ || |
| 276 buffered_frames_.find(frame.offset) != buffered_frames_.end(); | 295 (insertion_point != buffered_frames_.end() && |
| 296 frame.offset == insertion_point->offset); |
| 277 } | 297 } |
| 278 | 298 |
| 279 void QuicStreamSequencer::SetBlockedUntilFlush() { | 299 void QuicStreamSequencer::SetBlockedUntilFlush() { |
| 280 blocked_ = true; | 300 blocked_ = true; |
| 281 } | 301 } |
| 282 | 302 |
| 283 void QuicStreamSequencer::FlushBufferedFrames() { | 303 void QuicStreamSequencer::FlushBufferedFrames() { |
| 284 blocked_ = false; | 304 blocked_ = false; |
| 285 FrameMap::iterator it = buffered_frames_.find(num_bytes_consumed_); | 305 FrameList::iterator it = buffered_frames_.begin(); |
| 286 while (it != buffered_frames_.end()) { | 306 while (it != buffered_frames_.end() && it->offset == num_bytes_consumed_) { |
| 287 DVLOG(1) << "Flushing buffered packet at offset " << it->first; | 307 DVLOG(1) << "Flushing buffered packet at offset " << it->offset; |
| 288 string* data = &it->second; | 308 const string* data = &it->segment; |
| 289 size_t bytes_consumed = stream_->ProcessRawData(data->c_str(), | 309 size_t bytes_consumed = stream_->ProcessRawData(data->c_str(), |
| 290 data->size()); | 310 data->size()); |
| 291 RecordBytesConsumed(bytes_consumed); | 311 RecordBytesConsumed(bytes_consumed); |
| 292 if (MaybeCloseStream()) { | 312 if (MaybeCloseStream()) { |
| 293 return; | 313 return; |
| 294 } | 314 } |
| 295 if (bytes_consumed > data->size()) { | 315 if (bytes_consumed > data->size()) { |
| 296 stream_->Reset(QUIC_ERROR_PROCESSING_STREAM); // Programming error | 316 stream_->Reset(QUIC_ERROR_PROCESSING_STREAM); // Programming error |
| 297 return; | 317 return; |
| 298 } else if (bytes_consumed == data->size()) { | 318 } |
| 319 if (bytes_consumed < data->size()) { |
| 320 string new_data = data->substr(bytes_consumed); |
| 299 buffered_frames_.erase(it); | 321 buffered_frames_.erase(it); |
| 300 it = buffered_frames_.find(num_bytes_consumed_); | 322 buffered_frames_.push_front(FrameData(num_bytes_consumed_, new_data)); |
| 301 } else { | |
| 302 string new_data = it->second.substr(bytes_consumed); | |
| 303 buffered_frames_.erase(it); | |
| 304 buffered_frames_.insert(std::make_pair(num_bytes_consumed_, new_data)); | |
| 305 return; | 323 return; |
| 306 } | 324 } |
| 325 buffered_frames_.erase(it++); |
| 307 } | 326 } |
| 308 MaybeCloseStream(); | 327 MaybeCloseStream(); |
| 309 } | 328 } |
| 310 | 329 |
| 311 void QuicStreamSequencer::RecordBytesConsumed(size_t bytes_consumed) { | 330 void QuicStreamSequencer::RecordBytesConsumed(size_t bytes_consumed) { |
| 312 num_bytes_consumed_ += bytes_consumed; | 331 num_bytes_consumed_ += bytes_consumed; |
| 313 num_bytes_buffered_ -= bytes_consumed; | 332 num_bytes_buffered_ -= bytes_consumed; |
| 314 | 333 |
| 315 stream_->AddBytesConsumed(bytes_consumed); | 334 stream_->AddBytesConsumed(bytes_consumed); |
| 316 } | 335 } |
| 317 | 336 |
| 318 } // namespace net | 337 } // namespace net |
| OLD | NEW |