OLD | NEW |
| (Empty) |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "net/quic/quic_stream_sequencer.h" | |
6 | |
7 #include <algorithm> | |
8 #include <limits> | |
9 | |
10 #include "base/logging.h" | |
11 #include "base/metrics/sparse_histogram.h" | |
12 #include "net/quic/reliable_quic_stream.h" | |
13 | |
14 using std::min; | |
15 using std::numeric_limits; | |
16 using std::string; | |
17 | |
18 namespace net { | |
19 | |
20 QuicStreamSequencer::QuicStreamSequencer(ReliableQuicStream* quic_stream) | |
21 : stream_(quic_stream), | |
22 num_bytes_consumed_(0), | |
23 close_offset_(numeric_limits<QuicStreamOffset>::max()), | |
24 blocked_(false), | |
25 num_bytes_buffered_(0), | |
26 num_frames_received_(0), | |
27 num_duplicate_frames_received_(0), | |
28 num_early_frames_received_(0) { | |
29 } | |
30 | |
31 QuicStreamSequencer::~QuicStreamSequencer() { | |
32 } | |
33 | |
34 void QuicStreamSequencer::OnStreamFrame(const QuicStreamFrame& frame) { | |
35 ++num_frames_received_; | |
36 if (IsDuplicate(frame)) { | |
37 ++num_duplicate_frames_received_; | |
38 // Silently ignore duplicates. | |
39 return; | |
40 } | |
41 | |
42 if (FrameOverlapsBufferedData(frame)) { | |
43 stream_->CloseConnectionWithDetails( | |
44 QUIC_INVALID_STREAM_FRAME, "Stream frame overlaps with buffered data."); | |
45 return; | |
46 } | |
47 | |
48 QuicStreamOffset byte_offset = frame.offset; | |
49 size_t data_len = frame.data.TotalBufferSize(); | |
50 if (data_len == 0 && !frame.fin) { | |
51 // Stream frames must have data or a fin flag. | |
52 stream_->CloseConnectionWithDetails(QUIC_INVALID_STREAM_FRAME, | |
53 "Empty stream frame without FIN set."); | |
54 return; | |
55 } | |
56 | |
57 if (frame.fin) { | |
58 CloseStreamAtOffset(frame.offset + data_len); | |
59 if (data_len == 0) { | |
60 return; | |
61 } | |
62 } | |
63 | |
64 IOVector data; | |
65 data.AppendIovec(frame.data.iovec(), frame.data.Size()); | |
66 | |
67 if (byte_offset > num_bytes_consumed_) { | |
68 ++num_early_frames_received_; | |
69 } | |
70 | |
71 // If the frame has arrived in-order then we can process it immediately, only | |
72 // buffering if the stream is unable to process it. | |
73 if (!blocked_ && byte_offset == num_bytes_consumed_) { | |
74 DVLOG(1) << "Processing byte offset " << byte_offset; | |
75 size_t bytes_consumed = 0; | |
76 for (size_t i = 0; i < data.Size(); ++i) { | |
77 bytes_consumed += stream_->ProcessRawData( | |
78 static_cast<char*>(data.iovec()[i].iov_base), | |
79 data.iovec()[i].iov_len); | |
80 } | |
81 num_bytes_consumed_ += bytes_consumed; | |
82 stream_->AddBytesConsumed(bytes_consumed); | |
83 | |
84 if (MaybeCloseStream()) { | |
85 return; | |
86 } | |
87 if (bytes_consumed > data_len) { | |
88 stream_->Reset(QUIC_ERROR_PROCESSING_STREAM); | |
89 return; | |
90 } else if (bytes_consumed == data_len) { | |
91 FlushBufferedFrames(); | |
92 return; // it's safe to ack this frame. | |
93 } else { | |
94 // Set ourselves up to buffer what's left. | |
95 data_len -= bytes_consumed; | |
96 data.Consume(bytes_consumed); | |
97 byte_offset += bytes_consumed; | |
98 } | |
99 } | |
100 | |
101 // Buffer any remaining data to be consumed by the stream when ready. | |
102 for (size_t i = 0; i < data.Size(); ++i) { | |
103 DVLOG(1) << "Buffering stream data at offset " << byte_offset; | |
104 const iovec& iov = data.iovec()[i]; | |
105 buffered_frames_.insert(std::make_pair( | |
106 byte_offset, string(static_cast<char*>(iov.iov_base), iov.iov_len))); | |
107 byte_offset += iov.iov_len; | |
108 num_bytes_buffered_ += iov.iov_len; | |
109 } | |
110 return; | |
111 } | |
112 | |
113 void QuicStreamSequencer::CloseStreamAtOffset(QuicStreamOffset offset) { | |
114 const QuicStreamOffset kMaxOffset = numeric_limits<QuicStreamOffset>::max(); | |
115 | |
116 // If we have a scheduled termination or close, any new offset should match | |
117 // it. | |
118 if (close_offset_ != kMaxOffset && offset != close_offset_) { | |
119 stream_->Reset(QUIC_MULTIPLE_TERMINATION_OFFSETS); | |
120 return; | |
121 } | |
122 | |
123 close_offset_ = offset; | |
124 | |
125 MaybeCloseStream(); | |
126 } | |
127 | |
128 bool QuicStreamSequencer::MaybeCloseStream() { | |
129 if (!blocked_ && IsClosed()) { | |
130 DVLOG(1) << "Passing up termination, as we've processed " | |
131 << num_bytes_consumed_ << " of " << close_offset_ | |
132 << " bytes."; | |
133 // Technically it's an error if num_bytes_consumed isn't exactly | |
134 // equal, but error handling seems silly at this point. | |
135 stream_->OnFinRead(); | |
136 buffered_frames_.clear(); | |
137 num_bytes_buffered_ = 0; | |
138 return true; | |
139 } | |
140 return false; | |
141 } | |
142 | |
143 int QuicStreamSequencer::GetReadableRegions(iovec* iov, size_t iov_len) { | |
144 DCHECK(!blocked_); | |
145 FrameMap::iterator it = buffered_frames_.begin(); | |
146 size_t index = 0; | |
147 QuicStreamOffset offset = num_bytes_consumed_; | |
148 while (it != buffered_frames_.end() && index < iov_len) { | |
149 if (it->first != offset) return index; | |
150 | |
151 iov[index].iov_base = static_cast<void*>( | |
152 const_cast<char*>(it->second.data())); | |
153 iov[index].iov_len = it->second.size(); | |
154 offset += it->second.size(); | |
155 | |
156 ++index; | |
157 ++it; | |
158 } | |
159 return index; | |
160 } | |
161 | |
162 int QuicStreamSequencer::Readv(const struct iovec* iov, size_t iov_len) { | |
163 DCHECK(!blocked_); | |
164 FrameMap::iterator it = buffered_frames_.begin(); | |
165 size_t iov_index = 0; | |
166 size_t iov_offset = 0; | |
167 size_t frame_offset = 0; | |
168 QuicStreamOffset initial_bytes_consumed = num_bytes_consumed_; | |
169 | |
170 while (iov_index < iov_len && | |
171 it != buffered_frames_.end() && | |
172 it->first == num_bytes_consumed_) { | |
173 int bytes_to_read = min(iov[iov_index].iov_len - iov_offset, | |
174 it->second.size() - frame_offset); | |
175 | |
176 char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base) + iov_offset; | |
177 memcpy(iov_ptr, | |
178 it->second.data() + frame_offset, bytes_to_read); | |
179 frame_offset += bytes_to_read; | |
180 iov_offset += bytes_to_read; | |
181 | |
182 if (iov[iov_index].iov_len == iov_offset) { | |
183 // We've filled this buffer. | |
184 iov_offset = 0; | |
185 ++iov_index; | |
186 } | |
187 if (it->second.size() == frame_offset) { | |
188 // We've copied this whole frame | |
189 RecordBytesConsumed(it->second.size()); | |
190 buffered_frames_.erase(it); | |
191 it = buffered_frames_.begin(); | |
192 frame_offset = 0; | |
193 } | |
194 } | |
195 // We've finished copying. If we have a partial frame, update it. | |
196 if (frame_offset != 0) { | |
197 buffered_frames_.insert(std::make_pair(it->first + frame_offset, | |
198 it->second.substr(frame_offset))); | |
199 buffered_frames_.erase(buffered_frames_.begin()); | |
200 RecordBytesConsumed(frame_offset); | |
201 } | |
202 return static_cast<int>(num_bytes_consumed_ - initial_bytes_consumed); | |
203 } | |
204 | |
205 bool QuicStreamSequencer::HasBytesToRead() const { | |
206 FrameMap::const_iterator it = buffered_frames_.begin(); | |
207 | |
208 return it != buffered_frames_.end() && it->first == num_bytes_consumed_; | |
209 } | |
210 | |
211 bool QuicStreamSequencer::IsClosed() const { | |
212 return num_bytes_consumed_ >= close_offset_; | |
213 } | |
214 | |
215 bool QuicStreamSequencer::FrameOverlapsBufferedData( | |
216 const QuicStreamFrame& frame) const { | |
217 if (buffered_frames_.empty()) { | |
218 return false; | |
219 } | |
220 | |
221 FrameMap::const_iterator next_frame = | |
222 buffered_frames_.lower_bound(frame.offset); | |
223 // Duplicate frames should have been dropped in IsDuplicate. | |
224 DCHECK(next_frame == buffered_frames_.end() || | |
225 next_frame->first != frame.offset); | |
226 | |
227 // If there is a buffered frame with a higher starting offset, then we check | |
228 // to see if the new frame runs into the higher frame. | |
229 if (next_frame != buffered_frames_.end() && | |
230 (frame.offset + frame.data.TotalBufferSize()) > next_frame->first) { | |
231 DVLOG(1) << "New frame overlaps next frame: " << frame.offset << " + " | |
232 << frame.data.TotalBufferSize() << " > " << next_frame->first; | |
233 return true; | |
234 } | |
235 | |
236 // If there is a buffered frame with a lower starting offset, then we check | |
237 // to see if the buffered frame runs into the new frame. | |
238 if (next_frame != buffered_frames_.begin()) { | |
239 FrameMap::const_iterator preceeding_frame = --next_frame; | |
240 QuicStreamOffset offset = preceeding_frame->first; | |
241 uint64 data_length = preceeding_frame->second.length(); | |
242 if ((offset + data_length) > frame.offset) { | |
243 DVLOG(1) << "Preceeding frame overlaps new frame: " << offset << " + " | |
244 << data_length << " > " << frame.offset; | |
245 return true; | |
246 } | |
247 } | |
248 return false; | |
249 } | |
250 | |
251 bool QuicStreamSequencer::IsDuplicate(const QuicStreamFrame& frame) const { | |
252 // A frame is duplicate if the frame offset is smaller than our bytes consumed | |
253 // or we have stored the frame in our map. | |
254 // TODO(pwestin): Is it possible that a new frame contain more data even if | |
255 // the offset is the same? | |
256 return frame.offset < num_bytes_consumed_ || | |
257 buffered_frames_.find(frame.offset) != buffered_frames_.end(); | |
258 } | |
259 | |
260 void QuicStreamSequencer::SetBlockedUntilFlush() { | |
261 blocked_ = true; | |
262 } | |
263 | |
264 void QuicStreamSequencer::FlushBufferedFrames() { | |
265 blocked_ = false; | |
266 FrameMap::iterator it = buffered_frames_.find(num_bytes_consumed_); | |
267 while (it != buffered_frames_.end()) { | |
268 DVLOG(1) << "Flushing buffered packet at offset " << it->first; | |
269 string* data = &it->second; | |
270 size_t bytes_consumed = stream_->ProcessRawData(data->c_str(), | |
271 data->size()); | |
272 RecordBytesConsumed(bytes_consumed); | |
273 if (MaybeCloseStream()) { | |
274 return; | |
275 } | |
276 if (bytes_consumed > data->size()) { | |
277 stream_->Reset(QUIC_ERROR_PROCESSING_STREAM); // Programming error | |
278 return; | |
279 } else if (bytes_consumed == data->size()) { | |
280 buffered_frames_.erase(it); | |
281 it = buffered_frames_.find(num_bytes_consumed_); | |
282 } else { | |
283 string new_data = it->second.substr(bytes_consumed); | |
284 buffered_frames_.erase(it); | |
285 buffered_frames_.insert(std::make_pair(num_bytes_consumed_, new_data)); | |
286 return; | |
287 } | |
288 } | |
289 MaybeCloseStream(); | |
290 } | |
291 | |
292 void QuicStreamSequencer::RecordBytesConsumed(size_t bytes_consumed) { | |
293 num_bytes_consumed_ += bytes_consumed; | |
294 num_bytes_buffered_ -= bytes_consumed; | |
295 | |
296 stream_->AddBytesConsumed(bytes_consumed); | |
297 } | |
298 | |
299 } // namespace net | |
OLD | NEW |