OLD | NEW |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "net/quic/quic_stream_sequencer.h" | 5 #include "net/quic/quic_stream_sequencer.h" |
6 | 6 |
7 #include <algorithm> | 7 #include <algorithm> |
8 #include <limits> | 8 #include <limits> |
9 #include <utility> | 9 #include <utility> |
10 | 10 |
11 #include "base/logging.h" | 11 #include "base/logging.h" |
| 12 #include "net/quic/quic_frame_list.h" |
12 #include "net/quic/reliable_quic_stream.h" | 13 #include "net/quic/reliable_quic_stream.h" |
13 | 14 |
14 using std::min; | 15 using std::min; |
15 using std::numeric_limits; | 16 using std::numeric_limits; |
16 using std::string; | 17 using std::string; |
17 | 18 |
18 namespace net { | 19 namespace net { |
19 | 20 |
20 QuicStreamSequencer::FrameData::FrameData(QuicStreamOffset offset, | |
21 const string& segment) | |
22 : offset(offset), segment(segment) {} | |
23 | |
24 QuicStreamSequencer::QuicStreamSequencer(ReliableQuicStream* quic_stream) | 21 QuicStreamSequencer::QuicStreamSequencer(ReliableQuicStream* quic_stream) |
25 : stream_(quic_stream), | 22 : stream_(quic_stream), |
26 num_bytes_consumed_(0), | 23 num_bytes_consumed_(0), |
27 close_offset_(numeric_limits<QuicStreamOffset>::max()), | 24 close_offset_(numeric_limits<QuicStreamOffset>::max()), |
28 blocked_(false), | 25 blocked_(false), |
29 num_bytes_buffered_(0), | 26 num_bytes_buffered_(0), |
30 num_frames_received_(0), | 27 num_frames_received_(0), |
31 num_duplicate_frames_received_(0), | 28 num_duplicate_frames_received_(0), |
32 num_early_frames_received_(0) { | 29 num_early_frames_received_(0) {} |
33 } | |
34 | 30 |
35 QuicStreamSequencer::~QuicStreamSequencer() { | 31 QuicStreamSequencer::~QuicStreamSequencer() {} |
36 } | |
37 | 32 |
38 void QuicStreamSequencer::OnStreamFrame(const QuicStreamFrame& frame) { | 33 void QuicStreamSequencer::OnStreamFrame(const QuicStreamFrame& frame) { |
39 ++num_frames_received_; | 34 ++num_frames_received_; |
40 FrameList::iterator insertion_point = FindInsertionPoint(frame); | |
41 if (IsDuplicate(frame, insertion_point)) { | |
42 ++num_duplicate_frames_received_; | |
43 // Silently ignore duplicates. | |
44 return; | |
45 } | |
46 | |
47 if (FrameOverlapsBufferedData(frame, insertion_point)) { | |
48 stream_->CloseConnectionWithDetails( | |
49 QUIC_INVALID_STREAM_FRAME, "Stream frame overlaps with buffered data."); | |
50 return; | |
51 } | |
52 | |
53 const QuicStreamOffset byte_offset = frame.offset; | 35 const QuicStreamOffset byte_offset = frame.offset; |
54 const size_t data_len = frame.data.length(); | 36 const size_t data_len = frame.data.length(); |
55 if (data_len == 0 && !frame.fin) { | 37 if (data_len == 0 && !frame.fin) { |
56 // Stream frames must have data or a fin flag. | 38 // Stream frames must have data or a fin flag. |
57 stream_->CloseConnectionWithDetails(QUIC_INVALID_STREAM_FRAME, | 39 stream_->CloseConnectionWithDetails(QUIC_INVALID_STREAM_FRAME, |
58 "Empty stream frame without FIN set."); | 40 "Empty stream frame without FIN set."); |
59 return; | 41 return; |
60 } | 42 } |
61 | 43 |
62 if (frame.fin) { | 44 if (frame.fin) { |
63 CloseStreamAtOffset(frame.offset + data_len); | 45 CloseStreamAtOffset(frame.offset + data_len); |
64 if (data_len == 0) { | 46 if (data_len == 0) { |
65 return; | 47 return; |
66 } | 48 } |
67 } | 49 } |
| 50 size_t bytes_written; |
| 51 QuicErrorCode result = |
| 52 buffered_frames_.WriteAtOffset(byte_offset, frame.data, &bytes_written); |
| 53 |
| 54 if (result == QUIC_INVALID_STREAM_DATA) { |
| 55 stream_->CloseConnectionWithDetails( |
| 56 QUIC_INVALID_STREAM_FRAME, "Stream frame overlaps with buffered data."); |
| 57 return; |
| 58 } |
| 59 if (result == QUIC_NO_ERROR && bytes_written == 0) { |
| 60 ++num_duplicate_frames_received_; |
| 61 // Silently ignore duplicates. |
| 62 return; |
| 63 } |
68 | 64 |
69 if (byte_offset > num_bytes_consumed_) { | 65 if (byte_offset > num_bytes_consumed_) { |
70 ++num_early_frames_received_; | 66 ++num_early_frames_received_; |
71 } | 67 } |
72 | 68 |
73 DVLOG(1) << "Buffering stream data at offset " << byte_offset; | |
74 // Inserting an empty string and then copying to avoid the extra copy. | |
75 insertion_point = | |
76 buffered_frames_.insert(insertion_point, FrameData(byte_offset, "")); | |
77 frame.data.CopyToString(&insertion_point->segment); | |
78 num_bytes_buffered_ += data_len; | 69 num_bytes_buffered_ += data_len; |
79 | 70 |
80 if (blocked_) { | 71 if (blocked_) { |
81 return; | 72 return; |
82 } | 73 } |
83 | 74 |
84 if (byte_offset == num_bytes_consumed_) { | 75 if (byte_offset == num_bytes_consumed_) { |
85 stream_->OnDataAvailable(); | 76 stream_->OnDataAvailable(); |
86 } | 77 } |
87 } | 78 } |
88 | 79 |
89 void QuicStreamSequencer::CloseStreamAtOffset(QuicStreamOffset offset) { | 80 void QuicStreamSequencer::CloseStreamAtOffset(QuicStreamOffset offset) { |
90 const QuicStreamOffset kMaxOffset = numeric_limits<QuicStreamOffset>::max(); | 81 const QuicStreamOffset kMaxOffset = numeric_limits<QuicStreamOffset>::max(); |
91 | 82 |
92 // If there is a scheduled close, the new offset should match it. | 83 // If there is a scheduled close, the new offset should match it. |
93 if (close_offset_ != kMaxOffset && offset != close_offset_) { | 84 if (close_offset_ != kMaxOffset && offset != close_offset_) { |
94 stream_->Reset(QUIC_MULTIPLE_TERMINATION_OFFSETS); | 85 stream_->Reset(QUIC_MULTIPLE_TERMINATION_OFFSETS); |
95 return; | 86 return; |
96 } | 87 } |
97 | 88 |
98 close_offset_ = offset; | 89 close_offset_ = offset; |
99 | 90 |
100 MaybeCloseStream(); | 91 MaybeCloseStream(); |
101 } | 92 } |
102 | 93 |
103 bool QuicStreamSequencer::MaybeCloseStream() { | 94 bool QuicStreamSequencer::MaybeCloseStream() { |
104 if (!blocked_ && IsClosed()) { | 95 if (!blocked_ && IsClosed()) { |
105 DVLOG(1) << "Passing up termination, as we've processed " | 96 DVLOG(1) << "Passing up termination, as we've processed " |
106 << num_bytes_consumed_ << " of " << close_offset_ | 97 << num_bytes_consumed_ << " of " << close_offset_ << " bytes."; |
107 << " bytes."; | |
108 // This will cause the stream to consume the fin. | 98 // This will cause the stream to consume the fin. |
109 // Technically it's an error if num_bytes_consumed isn't exactly | 99 // Technically it's an error if num_bytes_consumed isn't exactly |
110 // equal, but error handling seems silly at this point. | 100 // equal, but error handling seems silly at this point. |
111 stream_->OnDataAvailable(); | 101 stream_->OnDataAvailable(); |
112 buffered_frames_.clear(); | 102 buffered_frames_.Clear(); |
113 num_bytes_buffered_ = 0; | 103 num_bytes_buffered_ = 0; |
114 return true; | 104 return true; |
115 } | 105 } |
116 return false; | 106 return false; |
117 } | 107 } |
118 | 108 |
119 int QuicStreamSequencer::GetReadableRegions(iovec* iov, size_t iov_len) const { | 109 int QuicStreamSequencer::GetReadableRegions(iovec* iov, size_t iov_len) const { |
120 DCHECK(!blocked_); | 110 DCHECK(!blocked_); |
121 FrameList::const_iterator it = buffered_frames_.begin(); | 111 return buffered_frames_.GetReadableRegions(iov, iov_len); |
122 size_t index = 0; | |
123 QuicStreamOffset offset = num_bytes_consumed_; | |
124 while (it != buffered_frames_.end() && index < iov_len) { | |
125 if (it->offset != offset) { | |
126 return index; | |
127 } | |
128 | |
129 iov[index].iov_base = | |
130 static_cast<void*>(const_cast<char*>(it->segment.data())); | |
131 iov[index].iov_len = it->segment.size(); | |
132 offset += it->segment.size(); | |
133 | |
134 ++index; | |
135 ++it; | |
136 } | |
137 return index; | |
138 } | 112 } |
139 | 113 |
140 int QuicStreamSequencer::Readv(const struct iovec* iov, size_t iov_len) { | 114 int QuicStreamSequencer::Readv(const struct iovec* iov, size_t iov_len) { |
141 DCHECK(!blocked_); | 115 DCHECK(!blocked_); |
142 FrameList::iterator it = buffered_frames_.begin(); | 116 size_t bytes_read = buffered_frames_.ReadvAndInvalidate(iov, iov_len); |
143 size_t iov_index = 0; | 117 RecordBytesConsumed(bytes_read); |
144 size_t iov_offset = 0; | 118 return static_cast<int>(bytes_read); |
145 size_t frame_offset = 0; | |
146 QuicStreamOffset initial_bytes_consumed = num_bytes_consumed_; | |
147 | |
148 while (iov_index < iov_len && it != buffered_frames_.end() && | |
149 it->offset == num_bytes_consumed_) { | |
150 int bytes_to_read = min(iov[iov_index].iov_len - iov_offset, | |
151 it->segment.size() - frame_offset); | |
152 | |
153 char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base) + iov_offset; | |
154 memcpy(iov_ptr, it->segment.data() + frame_offset, bytes_to_read); | |
155 frame_offset += bytes_to_read; | |
156 iov_offset += bytes_to_read; | |
157 | |
158 if (iov[iov_index].iov_len == iov_offset) { | |
159 // We've filled this buffer. | |
160 iov_offset = 0; | |
161 ++iov_index; | |
162 } | |
163 if (it->segment.size() == frame_offset) { | |
164 // We've copied this whole frame | |
165 RecordBytesConsumed(it->segment.size()); | |
166 buffered_frames_.erase(it); | |
167 it = buffered_frames_.begin(); | |
168 frame_offset = 0; | |
169 } | |
170 } | |
171 // Done copying. If there is a partial frame, update it. | |
172 if (frame_offset != 0) { | |
173 buffered_frames_.push_front( | |
174 FrameData(it->offset + frame_offset, it->segment.substr(frame_offset))); | |
175 buffered_frames_.erase(it); | |
176 RecordBytesConsumed(frame_offset); | |
177 } | |
178 return static_cast<int>(num_bytes_consumed_ - initial_bytes_consumed); | |
179 } | 119 } |
180 | 120 |
181 bool QuicStreamSequencer::HasBytesToRead() const { | 121 bool QuicStreamSequencer::HasBytesToRead() const { |
182 return !buffered_frames_.empty() && | 122 return buffered_frames_.HasBytesToRead(); |
183 buffered_frames_.begin()->offset == num_bytes_consumed_; | |
184 } | 123 } |
185 | 124 |
186 bool QuicStreamSequencer::IsClosed() const { | 125 bool QuicStreamSequencer::IsClosed() const { |
187 return num_bytes_consumed_ >= close_offset_; | 126 return num_bytes_consumed_ >= close_offset_; |
188 } | 127 } |
189 | 128 |
190 QuicStreamSequencer::FrameList::iterator | |
191 QuicStreamSequencer::FindInsertionPoint(const QuicStreamFrame& frame) { | |
192 if (buffered_frames_.empty()) { | |
193 return buffered_frames_.begin(); | |
194 } | |
195 // If it's after all buffered_frames, return the end. | |
196 if (frame.offset >= (buffered_frames_.rbegin()->offset + | |
197 buffered_frames_.rbegin()->segment.length())) { | |
198 return buffered_frames_.end(); | |
199 } | |
200 FrameList::iterator iter = buffered_frames_.begin(); | |
201 // Only advance the iterator if the data begins after the already received | |
202 // frame. If the new frame overlaps with an existing frame, the iterator will | |
203 // still point to the frame it overlaps with. | |
204 while (iter != buffered_frames_.end() && | |
205 frame.offset >= iter->offset + iter->segment.length()) { | |
206 ++iter; | |
207 } | |
208 return iter; | |
209 } | |
210 | |
211 bool QuicStreamSequencer::FrameOverlapsBufferedData( | |
212 const QuicStreamFrame& frame, | |
213 FrameList::const_iterator insertion_point) const { | |
214 if (buffered_frames_.empty() || insertion_point == buffered_frames_.end()) { | |
215 return false; | |
216 } | |
217 // If there is a buffered frame with a higher starting offset, then check to | |
218 // see if the new frame overlaps the beginning of the higher frame. | |
219 if (frame.offset < insertion_point->offset && | |
220 frame.offset + frame.data.length() > insertion_point->offset) { | |
221 DVLOG(1) << "New frame overlaps next frame: " << frame.offset << " + " | |
222 << frame.data.size() << " > " << insertion_point->offset; | |
223 return true; | |
224 } | |
225 // If there is a buffered frame with a lower starting offset, then check to | |
226 // see if the buffered frame runs into the new frame. | |
227 if (frame.offset >= insertion_point->offset && | |
228 frame.offset < | |
229 insertion_point->offset + insertion_point->segment.length()) { | |
230 DVLOG(1) << "Preceeding frame overlaps new frame: " | |
231 << insertion_point->offset << " + " | |
232 << insertion_point->segment.length() << " > " << frame.offset; | |
233 return true; | |
234 } | |
235 | |
236 return false; | |
237 } | |
238 | |
239 void QuicStreamSequencer::MarkConsumed(size_t num_bytes_consumed) { | 129 void QuicStreamSequencer::MarkConsumed(size_t num_bytes_consumed) { |
240 DCHECK(!blocked_); | 130 DCHECK(!blocked_); |
241 size_t end_offset = num_bytes_consumed_ + num_bytes_consumed; | 131 bool result = |
242 while (!buffered_frames_.empty() && end_offset != num_bytes_consumed_) { | 132 buffered_frames_.IncreaseTotalReadAndInvalidate(num_bytes_consumed); |
243 FrameList::iterator it = buffered_frames_.begin(); | 133 if (!result) { |
244 if (it->offset != num_bytes_consumed_) { | 134 LOG(DFATAL) << "Invalid argument to MarkConsumed." |
245 LOG(DFATAL) << "Invalid argument to MarkConsumed. " | 135 << " expect to consume: " << num_bytes_consumed |
246 << " num_bytes_consumed_: " << num_bytes_consumed_ | 136 << ", but not enough bytes available."; |
247 << " end_offset: " << end_offset << " offset: " << it->offset | 137 stream_->Reset(QUIC_ERROR_PROCESSING_STREAM); |
248 << " length: " << it->segment.length(); | 138 return; |
249 stream_->Reset(QUIC_ERROR_PROCESSING_STREAM); | |
250 return; | |
251 } | |
252 | |
253 if (it->offset + it->segment.length() <= end_offset) { | |
254 RecordBytesConsumed(it->segment.length()); | |
255 // This chunk is entirely consumed. | |
256 buffered_frames_.erase(it); | |
257 continue; | |
258 } | |
259 | |
260 // Partially consume this frame. | |
261 size_t delta = end_offset - it->offset; | |
262 RecordBytesConsumed(delta); | |
263 string new_data = it->segment.substr(delta); | |
264 buffered_frames_.erase(it); | |
265 buffered_frames_.push_front(FrameData(num_bytes_consumed_, new_data)); | |
266 break; | |
267 } | 139 } |
268 } | 140 RecordBytesConsumed(num_bytes_consumed); |
269 | |
270 bool QuicStreamSequencer::IsDuplicate( | |
271 const QuicStreamFrame& frame, | |
272 FrameList::const_iterator insertion_point) const { | |
273 // A frame is duplicate if the frame offset is smaller than the bytes consumed | |
274 // or identical to an already received frame. | |
275 return frame.offset < num_bytes_consumed_ || | |
276 (insertion_point != buffered_frames_.end() && | |
277 frame.offset == insertion_point->offset); | |
278 } | 141 } |
279 | 142 |
280 void QuicStreamSequencer::SetBlockedUntilFlush() { | 143 void QuicStreamSequencer::SetBlockedUntilFlush() { |
281 blocked_ = true; | 144 blocked_ = true; |
282 } | 145 } |
283 | 146 |
284 void QuicStreamSequencer::SetUnblocked() { | 147 void QuicStreamSequencer::SetUnblocked() { |
285 blocked_ = false; | 148 blocked_ = false; |
286 if (IsClosed() || HasBytesToRead()) { | 149 if (IsClosed() || HasBytesToRead()) { |
287 stream_->OnDataAvailable(); | 150 stream_->OnDataAvailable(); |
288 } | 151 } |
289 } | 152 } |
290 | 153 |
291 void QuicStreamSequencer::RecordBytesConsumed(size_t bytes_consumed) { | 154 void QuicStreamSequencer::RecordBytesConsumed(size_t bytes_consumed) { |
292 num_bytes_consumed_ += bytes_consumed; | 155 num_bytes_consumed_ += bytes_consumed; |
293 num_bytes_buffered_ -= bytes_consumed; | 156 num_bytes_buffered_ -= bytes_consumed; |
294 | 157 |
295 stream_->AddBytesConsumed(bytes_consumed); | 158 stream_->AddBytesConsumed(bytes_consumed); |
296 } | 159 } |
297 | 160 |
298 } // namespace net | 161 } // namespace net |
OLD | NEW |