Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(29)

Side by Side Diff: net/quic/quic_stream_sequencer.cc

Issue 1400293002: relnote: refactoring the buffering logic in QUIC's stream sequencer. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: disentangle merge of 104894802 Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « net/quic/quic_stream_sequencer.h ('k') | net/quic/quic_stream_sequencer_test.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "net/quic/quic_stream_sequencer.h" 5 #include "net/quic/quic_stream_sequencer.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 #include <limits> 8 #include <limits>
9 #include <utility> 9 #include <utility>
10 10
11 #include "base/logging.h" 11 #include "base/logging.h"
12 #include "net/quic/quic_frame_list.h"
12 #include "net/quic/reliable_quic_stream.h" 13 #include "net/quic/reliable_quic_stream.h"
13 14
14 using std::min; 15 using std::min;
15 using std::numeric_limits; 16 using std::numeric_limits;
16 using std::string; 17 using std::string;
17 18
18 namespace net { 19 namespace net {
19 20
20 QuicStreamSequencer::FrameData::FrameData(QuicStreamOffset offset,
21 const string& segment)
22 : offset(offset), segment(segment) {}
23
24 QuicStreamSequencer::QuicStreamSequencer(ReliableQuicStream* quic_stream) 21 QuicStreamSequencer::QuicStreamSequencer(ReliableQuicStream* quic_stream)
25 : stream_(quic_stream), 22 : stream_(quic_stream),
26 num_bytes_consumed_(0), 23 num_bytes_consumed_(0),
27 close_offset_(numeric_limits<QuicStreamOffset>::max()), 24 close_offset_(numeric_limits<QuicStreamOffset>::max()),
28 blocked_(false), 25 blocked_(false),
29 num_bytes_buffered_(0), 26 num_bytes_buffered_(0),
30 num_frames_received_(0), 27 num_frames_received_(0),
31 num_duplicate_frames_received_(0), 28 num_duplicate_frames_received_(0),
32 num_early_frames_received_(0) { 29 num_early_frames_received_(0) {}
33 }
34 30
35 QuicStreamSequencer::~QuicStreamSequencer() { 31 QuicStreamSequencer::~QuicStreamSequencer() {}
36 }
37 32
38 void QuicStreamSequencer::OnStreamFrame(const QuicStreamFrame& frame) { 33 void QuicStreamSequencer::OnStreamFrame(const QuicStreamFrame& frame) {
39 ++num_frames_received_; 34 ++num_frames_received_;
40 FrameList::iterator insertion_point = FindInsertionPoint(frame);
41 if (IsDuplicate(frame, insertion_point)) {
42 ++num_duplicate_frames_received_;
43 // Silently ignore duplicates.
44 return;
45 }
46
47 if (FrameOverlapsBufferedData(frame, insertion_point)) {
48 stream_->CloseConnectionWithDetails(
49 QUIC_INVALID_STREAM_FRAME, "Stream frame overlaps with buffered data.");
50 return;
51 }
52
53 const QuicStreamOffset byte_offset = frame.offset; 35 const QuicStreamOffset byte_offset = frame.offset;
54 const size_t data_len = frame.data.length(); 36 const size_t data_len = frame.data.length();
55 if (data_len == 0 && !frame.fin) { 37 if (data_len == 0 && !frame.fin) {
56 // Stream frames must have data or a fin flag. 38 // Stream frames must have data or a fin flag.
57 stream_->CloseConnectionWithDetails(QUIC_INVALID_STREAM_FRAME, 39 stream_->CloseConnectionWithDetails(QUIC_INVALID_STREAM_FRAME,
58 "Empty stream frame without FIN set."); 40 "Empty stream frame without FIN set.");
59 return; 41 return;
60 } 42 }
61 43
62 if (frame.fin) { 44 if (frame.fin) {
63 CloseStreamAtOffset(frame.offset + data_len); 45 CloseStreamAtOffset(frame.offset + data_len);
64 if (data_len == 0) { 46 if (data_len == 0) {
65 return; 47 return;
66 } 48 }
67 } 49 }
50 size_t bytes_written;
51 QuicErrorCode result =
52 buffered_frames_.WriteAtOffset(byte_offset, frame.data, &bytes_written);
53
54 if (result == QUIC_INVALID_STREAM_DATA) {
55 stream_->CloseConnectionWithDetails(
56 QUIC_INVALID_STREAM_FRAME, "Stream frame overlaps with buffered data.");
57 return;
58 }
59 if (result == QUIC_NO_ERROR && bytes_written == 0) {
60 ++num_duplicate_frames_received_;
61 // Silently ignore duplicates.
62 return;
63 }
68 64
69 if (byte_offset > num_bytes_consumed_) { 65 if (byte_offset > num_bytes_consumed_) {
70 ++num_early_frames_received_; 66 ++num_early_frames_received_;
71 } 67 }
72 68
73 DVLOG(1) << "Buffering stream data at offset " << byte_offset;
74 // Inserting an empty string and then copying to avoid the extra copy.
75 insertion_point =
76 buffered_frames_.insert(insertion_point, FrameData(byte_offset, ""));
77 frame.data.CopyToString(&insertion_point->segment);
78 num_bytes_buffered_ += data_len; 69 num_bytes_buffered_ += data_len;
79 70
80 if (blocked_) { 71 if (blocked_) {
81 return; 72 return;
82 } 73 }
83 74
84 if (byte_offset == num_bytes_consumed_) { 75 if (byte_offset == num_bytes_consumed_) {
85 stream_->OnDataAvailable(); 76 stream_->OnDataAvailable();
86 } 77 }
87 } 78 }
88 79
89 void QuicStreamSequencer::CloseStreamAtOffset(QuicStreamOffset offset) { 80 void QuicStreamSequencer::CloseStreamAtOffset(QuicStreamOffset offset) {
90 const QuicStreamOffset kMaxOffset = numeric_limits<QuicStreamOffset>::max(); 81 const QuicStreamOffset kMaxOffset = numeric_limits<QuicStreamOffset>::max();
91 82
92 // If there is a scheduled close, the new offset should match it. 83 // If there is a scheduled close, the new offset should match it.
93 if (close_offset_ != kMaxOffset && offset != close_offset_) { 84 if (close_offset_ != kMaxOffset && offset != close_offset_) {
94 stream_->Reset(QUIC_MULTIPLE_TERMINATION_OFFSETS); 85 stream_->Reset(QUIC_MULTIPLE_TERMINATION_OFFSETS);
95 return; 86 return;
96 } 87 }
97 88
98 close_offset_ = offset; 89 close_offset_ = offset;
99 90
100 MaybeCloseStream(); 91 MaybeCloseStream();
101 } 92 }
102 93
103 bool QuicStreamSequencer::MaybeCloseStream() { 94 bool QuicStreamSequencer::MaybeCloseStream() {
104 if (!blocked_ && IsClosed()) { 95 if (!blocked_ && IsClosed()) {
105 DVLOG(1) << "Passing up termination, as we've processed " 96 DVLOG(1) << "Passing up termination, as we've processed "
106 << num_bytes_consumed_ << " of " << close_offset_ 97 << num_bytes_consumed_ << " of " << close_offset_ << " bytes.";
107 << " bytes.";
108 // This will cause the stream to consume the fin. 98 // This will cause the stream to consume the fin.
109 // Technically it's an error if num_bytes_consumed isn't exactly 99 // Technically it's an error if num_bytes_consumed isn't exactly
110 // equal, but error handling seems silly at this point. 100 // equal, but error handling seems silly at this point.
111 stream_->OnDataAvailable(); 101 stream_->OnDataAvailable();
112 buffered_frames_.clear(); 102 buffered_frames_.Clear();
113 num_bytes_buffered_ = 0; 103 num_bytes_buffered_ = 0;
114 return true; 104 return true;
115 } 105 }
116 return false; 106 return false;
117 } 107 }
118 108
119 int QuicStreamSequencer::GetReadableRegions(iovec* iov, size_t iov_len) const { 109 int QuicStreamSequencer::GetReadableRegions(iovec* iov, size_t iov_len) const {
120 DCHECK(!blocked_); 110 DCHECK(!blocked_);
121 FrameList::const_iterator it = buffered_frames_.begin(); 111 return buffered_frames_.GetReadableRegions(iov, iov_len);
122 size_t index = 0;
123 QuicStreamOffset offset = num_bytes_consumed_;
124 while (it != buffered_frames_.end() && index < iov_len) {
125 if (it->offset != offset) {
126 return index;
127 }
128
129 iov[index].iov_base =
130 static_cast<void*>(const_cast<char*>(it->segment.data()));
131 iov[index].iov_len = it->segment.size();
132 offset += it->segment.size();
133
134 ++index;
135 ++it;
136 }
137 return index;
138 } 112 }
139 113
140 int QuicStreamSequencer::Readv(const struct iovec* iov, size_t iov_len) { 114 int QuicStreamSequencer::Readv(const struct iovec* iov, size_t iov_len) {
141 DCHECK(!blocked_); 115 DCHECK(!blocked_);
142 FrameList::iterator it = buffered_frames_.begin(); 116 size_t bytes_read = buffered_frames_.ReadvAndInvalidate(iov, iov_len);
143 size_t iov_index = 0; 117 RecordBytesConsumed(bytes_read);
144 size_t iov_offset = 0; 118 return static_cast<int>(bytes_read);
145 size_t frame_offset = 0;
146 QuicStreamOffset initial_bytes_consumed = num_bytes_consumed_;
147
148 while (iov_index < iov_len && it != buffered_frames_.end() &&
149 it->offset == num_bytes_consumed_) {
150 int bytes_to_read = min(iov[iov_index].iov_len - iov_offset,
151 it->segment.size() - frame_offset);
152
153 char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base) + iov_offset;
154 memcpy(iov_ptr, it->segment.data() + frame_offset, bytes_to_read);
155 frame_offset += bytes_to_read;
156 iov_offset += bytes_to_read;
157
158 if (iov[iov_index].iov_len == iov_offset) {
159 // We've filled this buffer.
160 iov_offset = 0;
161 ++iov_index;
162 }
163 if (it->segment.size() == frame_offset) {
164 // We've copied this whole frame
165 RecordBytesConsumed(it->segment.size());
166 buffered_frames_.erase(it);
167 it = buffered_frames_.begin();
168 frame_offset = 0;
169 }
170 }
171 // Done copying. If there is a partial frame, update it.
172 if (frame_offset != 0) {
173 buffered_frames_.push_front(
174 FrameData(it->offset + frame_offset, it->segment.substr(frame_offset)));
175 buffered_frames_.erase(it);
176 RecordBytesConsumed(frame_offset);
177 }
178 return static_cast<int>(num_bytes_consumed_ - initial_bytes_consumed);
179 } 119 }
180 120
181 bool QuicStreamSequencer::HasBytesToRead() const { 121 bool QuicStreamSequencer::HasBytesToRead() const {
182 return !buffered_frames_.empty() && 122 return buffered_frames_.HasBytesToRead();
183 buffered_frames_.begin()->offset == num_bytes_consumed_;
184 } 123 }
185 124
186 bool QuicStreamSequencer::IsClosed() const { 125 bool QuicStreamSequencer::IsClosed() const {
187 return num_bytes_consumed_ >= close_offset_; 126 return num_bytes_consumed_ >= close_offset_;
188 } 127 }
189 128
190 QuicStreamSequencer::FrameList::iterator
191 QuicStreamSequencer::FindInsertionPoint(const QuicStreamFrame& frame) {
192 if (buffered_frames_.empty()) {
193 return buffered_frames_.begin();
194 }
195 // If it's after all buffered_frames, return the end.
196 if (frame.offset >= (buffered_frames_.rbegin()->offset +
197 buffered_frames_.rbegin()->segment.length())) {
198 return buffered_frames_.end();
199 }
200 FrameList::iterator iter = buffered_frames_.begin();
201 // Only advance the iterator if the data begins after the already received
202 // frame. If the new frame overlaps with an existing frame, the iterator will
203 // still point to the frame it overlaps with.
204 while (iter != buffered_frames_.end() &&
205 frame.offset >= iter->offset + iter->segment.length()) {
206 ++iter;
207 }
208 return iter;
209 }
210
211 bool QuicStreamSequencer::FrameOverlapsBufferedData(
212 const QuicStreamFrame& frame,
213 FrameList::const_iterator insertion_point) const {
214 if (buffered_frames_.empty() || insertion_point == buffered_frames_.end()) {
215 return false;
216 }
217 // If there is a buffered frame with a higher starting offset, then check to
218 // see if the new frame overlaps the beginning of the higher frame.
219 if (frame.offset < insertion_point->offset &&
220 frame.offset + frame.data.length() > insertion_point->offset) {
221 DVLOG(1) << "New frame overlaps next frame: " << frame.offset << " + "
222 << frame.data.size() << " > " << insertion_point->offset;
223 return true;
224 }
225 // If there is a buffered frame with a lower starting offset, then check to
226 // see if the buffered frame runs into the new frame.
227 if (frame.offset >= insertion_point->offset &&
228 frame.offset <
229 insertion_point->offset + insertion_point->segment.length()) {
230 DVLOG(1) << "Preceeding frame overlaps new frame: "
231 << insertion_point->offset << " + "
232 << insertion_point->segment.length() << " > " << frame.offset;
233 return true;
234 }
235
236 return false;
237 }
238
239 void QuicStreamSequencer::MarkConsumed(size_t num_bytes_consumed) { 129 void QuicStreamSequencer::MarkConsumed(size_t num_bytes_consumed) {
240 DCHECK(!blocked_); 130 DCHECK(!blocked_);
241 size_t end_offset = num_bytes_consumed_ + num_bytes_consumed; 131 bool result =
242 while (!buffered_frames_.empty() && end_offset != num_bytes_consumed_) { 132 buffered_frames_.IncreaseTotalReadAndInvalidate(num_bytes_consumed);
243 FrameList::iterator it = buffered_frames_.begin(); 133 if (!result) {
244 if (it->offset != num_bytes_consumed_) { 134 LOG(DFATAL) << "Invalid argument to MarkConsumed."
245 LOG(DFATAL) << "Invalid argument to MarkConsumed. " 135 << " expect to consume: " << num_bytes_consumed
246 << " num_bytes_consumed_: " << num_bytes_consumed_ 136 << ", but not enough bytes available.";
247 << " end_offset: " << end_offset << " offset: " << it->offset 137 stream_->Reset(QUIC_ERROR_PROCESSING_STREAM);
248 << " length: " << it->segment.length(); 138 return;
249 stream_->Reset(QUIC_ERROR_PROCESSING_STREAM);
250 return;
251 }
252
253 if (it->offset + it->segment.length() <= end_offset) {
254 RecordBytesConsumed(it->segment.length());
255 // This chunk is entirely consumed.
256 buffered_frames_.erase(it);
257 continue;
258 }
259
260 // Partially consume this frame.
261 size_t delta = end_offset - it->offset;
262 RecordBytesConsumed(delta);
263 string new_data = it->segment.substr(delta);
264 buffered_frames_.erase(it);
265 buffered_frames_.push_front(FrameData(num_bytes_consumed_, new_data));
266 break;
267 } 139 }
268 } 140 RecordBytesConsumed(num_bytes_consumed);
269
270 bool QuicStreamSequencer::IsDuplicate(
271 const QuicStreamFrame& frame,
272 FrameList::const_iterator insertion_point) const {
273 // A frame is duplicate if the frame offset is smaller than the bytes consumed
274 // or identical to an already received frame.
275 return frame.offset < num_bytes_consumed_ ||
276 (insertion_point != buffered_frames_.end() &&
277 frame.offset == insertion_point->offset);
278 } 141 }
279 142
280 void QuicStreamSequencer::SetBlockedUntilFlush() { 143 void QuicStreamSequencer::SetBlockedUntilFlush() {
281 blocked_ = true; 144 blocked_ = true;
282 } 145 }
283 146
284 void QuicStreamSequencer::SetUnblocked() { 147 void QuicStreamSequencer::SetUnblocked() {
285 blocked_ = false; 148 blocked_ = false;
286 if (IsClosed() || HasBytesToRead()) { 149 if (IsClosed() || HasBytesToRead()) {
287 stream_->OnDataAvailable(); 150 stream_->OnDataAvailable();
288 } 151 }
289 } 152 }
290 153
291 void QuicStreamSequencer::RecordBytesConsumed(size_t bytes_consumed) { 154 void QuicStreamSequencer::RecordBytesConsumed(size_t bytes_consumed) {
292 num_bytes_consumed_ += bytes_consumed; 155 num_bytes_consumed_ += bytes_consumed;
293 num_bytes_buffered_ -= bytes_consumed; 156 num_bytes_buffered_ -= bytes_consumed;
294 157
295 stream_->AddBytesConsumed(bytes_consumed); 158 stream_->AddBytesConsumed(bytes_consumed);
296 } 159 }
297 160
298 } // namespace net 161 } // namespace net
OLDNEW
« no previous file with comments | « net/quic/quic_stream_sequencer.h ('k') | net/quic/quic_stream_sequencer_test.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698