| OLD | NEW |
| (Empty) |
| 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 #include "net/quic/quic_stream_sequencer.h" | |
| 6 | |
| 7 #include <algorithm> | |
| 8 #include <limits> | |
| 9 | |
| 10 #include "base/logging.h" | |
| 11 #include "base/metrics/sparse_histogram.h" | |
| 12 #include "net/quic/reliable_quic_stream.h" | |
| 13 | |
| 14 using std::min; | |
| 15 using std::numeric_limits; | |
| 16 using std::string; | |
| 17 | |
| 18 namespace net { | |
| 19 | |
| 20 QuicStreamSequencer::QuicStreamSequencer(ReliableQuicStream* quic_stream) | |
| 21 : stream_(quic_stream), | |
| 22 num_bytes_consumed_(0), | |
| 23 close_offset_(numeric_limits<QuicStreamOffset>::max()), | |
| 24 blocked_(false), | |
| 25 num_bytes_buffered_(0), | |
| 26 num_frames_received_(0), | |
| 27 num_duplicate_frames_received_(0), | |
| 28 num_early_frames_received_(0) { | |
| 29 } | |
| 30 | |
| 31 QuicStreamSequencer::~QuicStreamSequencer() { | |
| 32 } | |
| 33 | |
| 34 void QuicStreamSequencer::OnStreamFrame(const QuicStreamFrame& frame) { | |
| 35 ++num_frames_received_; | |
| 36 if (IsDuplicate(frame)) { | |
| 37 ++num_duplicate_frames_received_; | |
| 38 // Silently ignore duplicates. | |
| 39 return; | |
| 40 } | |
| 41 | |
| 42 if (FrameOverlapsBufferedData(frame)) { | |
| 43 stream_->CloseConnectionWithDetails( | |
| 44 QUIC_INVALID_STREAM_FRAME, "Stream frame overlaps with buffered data."); | |
| 45 return; | |
| 46 } | |
| 47 | |
| 48 QuicStreamOffset byte_offset = frame.offset; | |
| 49 size_t data_len = frame.data.TotalBufferSize(); | |
| 50 if (data_len == 0 && !frame.fin) { | |
| 51 // Stream frames must have data or a fin flag. | |
| 52 stream_->CloseConnectionWithDetails(QUIC_INVALID_STREAM_FRAME, | |
| 53 "Empty stream frame without FIN set."); | |
| 54 return; | |
| 55 } | |
| 56 | |
| 57 if (frame.fin) { | |
| 58 CloseStreamAtOffset(frame.offset + data_len); | |
| 59 if (data_len == 0) { | |
| 60 return; | |
| 61 } | |
| 62 } | |
| 63 | |
| 64 IOVector data; | |
| 65 data.AppendIovec(frame.data.iovec(), frame.data.Size()); | |
| 66 | |
| 67 if (byte_offset > num_bytes_consumed_) { | |
| 68 ++num_early_frames_received_; | |
| 69 } | |
| 70 | |
| 71 // If the frame has arrived in-order then we can process it immediately, only | |
| 72 // buffering if the stream is unable to process it. | |
| 73 if (!blocked_ && byte_offset == num_bytes_consumed_) { | |
| 74 DVLOG(1) << "Processing byte offset " << byte_offset; | |
| 75 size_t bytes_consumed = 0; | |
| 76 for (size_t i = 0; i < data.Size(); ++i) { | |
| 77 bytes_consumed += stream_->ProcessRawData( | |
| 78 static_cast<char*>(data.iovec()[i].iov_base), | |
| 79 data.iovec()[i].iov_len); | |
| 80 } | |
| 81 num_bytes_consumed_ += bytes_consumed; | |
| 82 stream_->AddBytesConsumed(bytes_consumed); | |
| 83 | |
| 84 if (MaybeCloseStream()) { | |
| 85 return; | |
| 86 } | |
| 87 if (bytes_consumed > data_len) { | |
| 88 stream_->Reset(QUIC_ERROR_PROCESSING_STREAM); | |
| 89 return; | |
| 90 } else if (bytes_consumed == data_len) { | |
| 91 FlushBufferedFrames(); | |
| 92 return; // it's safe to ack this frame. | |
| 93 } else { | |
| 94 // Set ourselves up to buffer what's left. | |
| 95 data_len -= bytes_consumed; | |
| 96 data.Consume(bytes_consumed); | |
| 97 byte_offset += bytes_consumed; | |
| 98 } | |
| 99 } | |
| 100 | |
| 101 // Buffer any remaining data to be consumed by the stream when ready. | |
| 102 for (size_t i = 0; i < data.Size(); ++i) { | |
| 103 DVLOG(1) << "Buffering stream data at offset " << byte_offset; | |
| 104 const iovec& iov = data.iovec()[i]; | |
| 105 buffered_frames_.insert(std::make_pair( | |
| 106 byte_offset, string(static_cast<char*>(iov.iov_base), iov.iov_len))); | |
| 107 byte_offset += iov.iov_len; | |
| 108 num_bytes_buffered_ += iov.iov_len; | |
| 109 } | |
| 110 return; | |
| 111 } | |
| 112 | |
| 113 void QuicStreamSequencer::CloseStreamAtOffset(QuicStreamOffset offset) { | |
| 114 const QuicStreamOffset kMaxOffset = numeric_limits<QuicStreamOffset>::max(); | |
| 115 | |
| 116 // If we have a scheduled termination or close, any new offset should match | |
| 117 // it. | |
| 118 if (close_offset_ != kMaxOffset && offset != close_offset_) { | |
| 119 stream_->Reset(QUIC_MULTIPLE_TERMINATION_OFFSETS); | |
| 120 return; | |
| 121 } | |
| 122 | |
| 123 close_offset_ = offset; | |
| 124 | |
| 125 MaybeCloseStream(); | |
| 126 } | |
| 127 | |
| 128 bool QuicStreamSequencer::MaybeCloseStream() { | |
| 129 if (!blocked_ && IsClosed()) { | |
| 130 DVLOG(1) << "Passing up termination, as we've processed " | |
| 131 << num_bytes_consumed_ << " of " << close_offset_ | |
| 132 << " bytes."; | |
| 133 // Technically it's an error if num_bytes_consumed isn't exactly | |
| 134 // equal, but error handling seems silly at this point. | |
| 135 stream_->OnFinRead(); | |
| 136 buffered_frames_.clear(); | |
| 137 num_bytes_buffered_ = 0; | |
| 138 return true; | |
| 139 } | |
| 140 return false; | |
| 141 } | |
| 142 | |
| 143 int QuicStreamSequencer::GetReadableRegions(iovec* iov, size_t iov_len) { | |
| 144 DCHECK(!blocked_); | |
| 145 FrameMap::iterator it = buffered_frames_.begin(); | |
| 146 size_t index = 0; | |
| 147 QuicStreamOffset offset = num_bytes_consumed_; | |
| 148 while (it != buffered_frames_.end() && index < iov_len) { | |
| 149 if (it->first != offset) return index; | |
| 150 | |
| 151 iov[index].iov_base = static_cast<void*>( | |
| 152 const_cast<char*>(it->second.data())); | |
| 153 iov[index].iov_len = it->second.size(); | |
| 154 offset += it->second.size(); | |
| 155 | |
| 156 ++index; | |
| 157 ++it; | |
| 158 } | |
| 159 return index; | |
| 160 } | |
| 161 | |
| 162 int QuicStreamSequencer::Readv(const struct iovec* iov, size_t iov_len) { | |
| 163 DCHECK(!blocked_); | |
| 164 FrameMap::iterator it = buffered_frames_.begin(); | |
| 165 size_t iov_index = 0; | |
| 166 size_t iov_offset = 0; | |
| 167 size_t frame_offset = 0; | |
| 168 QuicStreamOffset initial_bytes_consumed = num_bytes_consumed_; | |
| 169 | |
| 170 while (iov_index < iov_len && | |
| 171 it != buffered_frames_.end() && | |
| 172 it->first == num_bytes_consumed_) { | |
| 173 int bytes_to_read = min(iov[iov_index].iov_len - iov_offset, | |
| 174 it->second.size() - frame_offset); | |
| 175 | |
| 176 char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base) + iov_offset; | |
| 177 memcpy(iov_ptr, | |
| 178 it->second.data() + frame_offset, bytes_to_read); | |
| 179 frame_offset += bytes_to_read; | |
| 180 iov_offset += bytes_to_read; | |
| 181 | |
| 182 if (iov[iov_index].iov_len == iov_offset) { | |
| 183 // We've filled this buffer. | |
| 184 iov_offset = 0; | |
| 185 ++iov_index; | |
| 186 } | |
| 187 if (it->second.size() == frame_offset) { | |
| 188 // We've copied this whole frame | |
| 189 RecordBytesConsumed(it->second.size()); | |
| 190 buffered_frames_.erase(it); | |
| 191 it = buffered_frames_.begin(); | |
| 192 frame_offset = 0; | |
| 193 } | |
| 194 } | |
| 195 // We've finished copying. If we have a partial frame, update it. | |
| 196 if (frame_offset != 0) { | |
| 197 buffered_frames_.insert(std::make_pair(it->first + frame_offset, | |
| 198 it->second.substr(frame_offset))); | |
| 199 buffered_frames_.erase(buffered_frames_.begin()); | |
| 200 RecordBytesConsumed(frame_offset); | |
| 201 } | |
| 202 return static_cast<int>(num_bytes_consumed_ - initial_bytes_consumed); | |
| 203 } | |
| 204 | |
| 205 bool QuicStreamSequencer::HasBytesToRead() const { | |
| 206 FrameMap::const_iterator it = buffered_frames_.begin(); | |
| 207 | |
| 208 return it != buffered_frames_.end() && it->first == num_bytes_consumed_; | |
| 209 } | |
| 210 | |
| 211 bool QuicStreamSequencer::IsClosed() const { | |
| 212 return num_bytes_consumed_ >= close_offset_; | |
| 213 } | |
| 214 | |
| 215 bool QuicStreamSequencer::FrameOverlapsBufferedData( | |
| 216 const QuicStreamFrame& frame) const { | |
| 217 if (buffered_frames_.empty()) { | |
| 218 return false; | |
| 219 } | |
| 220 | |
| 221 FrameMap::const_iterator next_frame = | |
| 222 buffered_frames_.lower_bound(frame.offset); | |
| 223 // Duplicate frames should have been dropped in IsDuplicate. | |
| 224 DCHECK(next_frame == buffered_frames_.end() || | |
| 225 next_frame->first != frame.offset); | |
| 226 | |
| 227 // If there is a buffered frame with a higher starting offset, then we check | |
| 228 // to see if the new frame runs into the higher frame. | |
| 229 if (next_frame != buffered_frames_.end() && | |
| 230 (frame.offset + frame.data.TotalBufferSize()) > next_frame->first) { | |
| 231 DVLOG(1) << "New frame overlaps next frame: " << frame.offset << " + " | |
| 232 << frame.data.TotalBufferSize() << " > " << next_frame->first; | |
| 233 return true; | |
| 234 } | |
| 235 | |
| 236 // If there is a buffered frame with a lower starting offset, then we check | |
| 237 // to see if the buffered frame runs into the new frame. | |
| 238 if (next_frame != buffered_frames_.begin()) { | |
| 239 FrameMap::const_iterator preceeding_frame = --next_frame; | |
| 240 QuicStreamOffset offset = preceeding_frame->first; | |
| 241 uint64 data_length = preceeding_frame->second.length(); | |
| 242 if ((offset + data_length) > frame.offset) { | |
| 243 DVLOG(1) << "Preceeding frame overlaps new frame: " << offset << " + " | |
| 244 << data_length << " > " << frame.offset; | |
| 245 return true; | |
| 246 } | |
| 247 } | |
| 248 return false; | |
| 249 } | |
| 250 | |
| 251 bool QuicStreamSequencer::IsDuplicate(const QuicStreamFrame& frame) const { | |
| 252 // A frame is duplicate if the frame offset is smaller than our bytes consumed | |
| 253 // or we have stored the frame in our map. | |
| 254 // TODO(pwestin): Is it possible that a new frame contain more data even if | |
| 255 // the offset is the same? | |
| 256 return frame.offset < num_bytes_consumed_ || | |
| 257 buffered_frames_.find(frame.offset) != buffered_frames_.end(); | |
| 258 } | |
| 259 | |
| 260 void QuicStreamSequencer::SetBlockedUntilFlush() { | |
| 261 blocked_ = true; | |
| 262 } | |
| 263 | |
| 264 void QuicStreamSequencer::FlushBufferedFrames() { | |
| 265 blocked_ = false; | |
| 266 FrameMap::iterator it = buffered_frames_.find(num_bytes_consumed_); | |
| 267 while (it != buffered_frames_.end()) { | |
| 268 DVLOG(1) << "Flushing buffered packet at offset " << it->first; | |
| 269 string* data = &it->second; | |
| 270 size_t bytes_consumed = stream_->ProcessRawData(data->c_str(), | |
| 271 data->size()); | |
| 272 RecordBytesConsumed(bytes_consumed); | |
| 273 if (MaybeCloseStream()) { | |
| 274 return; | |
| 275 } | |
| 276 if (bytes_consumed > data->size()) { | |
| 277 stream_->Reset(QUIC_ERROR_PROCESSING_STREAM); // Programming error | |
| 278 return; | |
| 279 } else if (bytes_consumed == data->size()) { | |
| 280 buffered_frames_.erase(it); | |
| 281 it = buffered_frames_.find(num_bytes_consumed_); | |
| 282 } else { | |
| 283 string new_data = it->second.substr(bytes_consumed); | |
| 284 buffered_frames_.erase(it); | |
| 285 buffered_frames_.insert(std::make_pair(num_bytes_consumed_, new_data)); | |
| 286 return; | |
| 287 } | |
| 288 } | |
| 289 MaybeCloseStream(); | |
| 290 } | |
| 291 | |
| 292 void QuicStreamSequencer::RecordBytesConsumed(size_t bytes_consumed) { | |
| 293 num_bytes_consumed_ += bytes_consumed; | |
| 294 num_bytes_buffered_ -= bytes_consumed; | |
| 295 | |
| 296 stream_->AddBytesConsumed(bytes_consumed); | |
| 297 } | |
| 298 | |
| 299 } // namespace net | |
| OLD | NEW |