OLD | NEW |
1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "net/quic/quic_data_stream.h" | 5 #include "net/quic/quic_data_stream.h" |
6 | 6 |
7 #include "base/logging.h" | 7 #include "base/logging.h" |
8 #include "net/quic/quic_session.h" | 8 #include "net/quic/quic_session.h" |
9 #include "net/quic/quic_spdy_decompressor.h" | |
10 #include "net/quic/quic_utils.h" | 9 #include "net/quic/quic_utils.h" |
11 #include "net/quic/quic_write_blocked_list.h" | 10 #include "net/quic/quic_write_blocked_list.h" |
12 | 11 |
13 using base::StringPiece; | 12 using base::StringPiece; |
14 using std::min; | 13 using std::min; |
15 | 14 |
16 namespace net { | 15 namespace net { |
17 | 16 |
18 #define ENDPOINT (session()->is_server() ? "Server: " : " Client: ") | 17 #define ENDPOINT (session()->is_server() ? "Server: " : " Client: ") |
19 | 18 |
20 namespace { | 19 namespace { |
21 | 20 |
22 // This is somewhat arbitrary. It's possible, but unlikely, we will either fail | 21 // This is somewhat arbitrary. It's possible, but unlikely, we will either fail |
23 // to set a priority client-side, or cancel a stream before stripping the | 22 // to set a priority client-side, or cancel a stream before stripping the |
24 // priority from the wire server-side. In either case, start out with a | 23 // priority from the wire server-side. In either case, start out with a |
25 // priority in the middle. | 24 // priority in the middle. |
26 QuicPriority kDefaultPriority = 3; | 25 QuicPriority kDefaultPriority = 3; |
27 | 26 |
28 // Appends bytes from data into partial_data_buffer. Once partial_data_buffer | |
29 // reaches 4 bytes, copies the data into 'result' and clears | |
30 // partial_data_buffer. | |
31 // Returns the number of bytes consumed. | |
32 uint32 StripUint32(const char* data, uint32 data_len, | |
33 string* partial_data_buffer, | |
34 uint32* result) { | |
35 DCHECK_GT(4u, partial_data_buffer->length()); | |
36 size_t missing_size = 4 - partial_data_buffer->length(); | |
37 if (data_len < missing_size) { | |
38 StringPiece(data, data_len).AppendToString(partial_data_buffer); | |
39 return data_len; | |
40 } | |
41 StringPiece(data, missing_size).AppendToString(partial_data_buffer); | |
42 DCHECK_EQ(4u, partial_data_buffer->length()); | |
43 memcpy(result, partial_data_buffer->data(), 4); | |
44 partial_data_buffer->clear(); | |
45 return missing_size; | |
46 } | |
47 | |
48 } // namespace | 27 } // namespace |
49 | 28 |
50 QuicDataStream::QuicDataStream(QuicStreamId id, | 29 QuicDataStream::QuicDataStream(QuicStreamId id, |
51 QuicSession* session) | 30 QuicSession* session) |
52 : ReliableQuicStream(id, session), | 31 : ReliableQuicStream(id, session), |
53 visitor_(NULL), | 32 visitor_(NULL), |
54 headers_decompressed_(false), | 33 headers_decompressed_(false), |
55 priority_(kDefaultPriority), | 34 priority_(kDefaultPriority), |
56 headers_id_(0), | |
57 decompression_failed_(false), | 35 decompression_failed_(false), |
58 priority_parsed_(false) { | 36 priority_parsed_(false) { |
59 DCHECK_NE(kCryptoStreamId, id); | 37 DCHECK_NE(kCryptoStreamId, id); |
60 if (version() > QUIC_VERSION_12) { | 38 // Don't receive any callbacks from the sequencer until headers |
61 // Don't receive any callbacks from the sequencer until headers | 39 // are complete. |
62 // are complete. | 40 sequencer()->SetBlockedUntilFlush(); |
63 sequencer()->SetBlockedUntilFlush(); | |
64 } | |
65 } | 41 } |
66 | 42 |
67 QuicDataStream::~QuicDataStream() { | 43 QuicDataStream::~QuicDataStream() { |
68 } | 44 } |
69 | 45 |
70 size_t QuicDataStream::WriteHeaders(const SpdyHeaderBlock& header_block, | 46 size_t QuicDataStream::WriteHeaders(const SpdyHeaderBlock& header_block, |
71 bool fin) { | 47 bool fin) { |
72 size_t bytes_written = session()->WriteHeaders(id(), header_block, fin); | 48 size_t bytes_written = session()->WriteHeaders(id(), header_block, fin); |
73 if (fin) { | 49 if (fin) { |
74 // TODO(rch): Add test to ensure fin_sent_ is set whenever a fin is sent. | 50 // TODO(rch): Add test to ensure fin_sent_ is set whenever a fin is sent. |
(...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
131 void QuicDataStream::set_priority(QuicPriority priority) { | 107 void QuicDataStream::set_priority(QuicPriority priority) { |
132 DCHECK_EQ(0u, stream_bytes_written()); | 108 DCHECK_EQ(0u, stream_bytes_written()); |
133 priority_ = priority; | 109 priority_ = priority; |
134 } | 110 } |
135 | 111 |
136 QuicPriority QuicDataStream::EffectivePriority() const { | 112 QuicPriority QuicDataStream::EffectivePriority() const { |
137 return priority(); | 113 return priority(); |
138 } | 114 } |
139 | 115 |
140 uint32 QuicDataStream::ProcessRawData(const char* data, uint32 data_len) { | 116 uint32 QuicDataStream::ProcessRawData(const char* data, uint32 data_len) { |
141 if (version() <= QUIC_VERSION_12) { | |
142 return ProcessRawData12(data, data_len); | |
143 } | |
144 | |
145 if (!FinishedReadingHeaders()) { | 117 if (!FinishedReadingHeaders()) { |
146 LOG(DFATAL) << "ProcessRawData called before headers have been finished"; | 118 LOG(DFATAL) << "ProcessRawData called before headers have been finished"; |
147 return 0; | 119 return 0; |
148 } | 120 } |
149 return ProcessData(data, data_len); | 121 return ProcessData(data, data_len); |
150 } | 122 } |
151 | 123 |
152 uint32 QuicDataStream::ProcessRawData12(const char* data, uint32 data_len) { | |
153 DCHECK_NE(0u, data_len); | |
154 | |
155 uint32 total_bytes_consumed = 0; | |
156 if (headers_id_ == 0u) { | |
157 total_bytes_consumed += StripPriorityAndHeaderId(data, data_len); | |
158 data += total_bytes_consumed; | |
159 data_len -= total_bytes_consumed; | |
160 if (data_len == 0 || total_bytes_consumed == 0) { | |
161 return total_bytes_consumed; | |
162 } | |
163 } | |
164 DCHECK_NE(0u, headers_id_); | |
165 | |
166 // Once the headers are finished, we simply pass the data through. | |
167 if (headers_decompressed_) { | |
168 // Some buffered header data remains. | |
169 if (!decompressed_headers_.empty()) { | |
170 ProcessHeaderData(); | |
171 } | |
172 if (decompressed_headers_.empty()) { | |
173 DVLOG(1) << "Delegating procesing to ProcessData"; | |
174 total_bytes_consumed += ProcessData(data, data_len); | |
175 } | |
176 return total_bytes_consumed; | |
177 } | |
178 | |
179 QuicHeaderId current_header_id = | |
180 session()->decompressor()->current_header_id(); | |
181 // Ensure that this header id looks sane. | |
182 if (headers_id_ < current_header_id || | |
183 headers_id_ > kMaxHeaderIdDelta + current_header_id) { | |
184 DVLOG(1) << ENDPOINT | |
185 << "Invalid headers for stream: " << id() | |
186 << " header_id: " << headers_id_ | |
187 << " current_header_id: " << current_header_id; | |
188 session()->connection()->SendConnectionClose(QUIC_INVALID_HEADER_ID); | |
189 return total_bytes_consumed; | |
190 } | |
191 | |
192 // If we are head-of-line blocked on decompression, then back up. | |
193 if (current_header_id != headers_id_) { | |
194 session()->MarkDecompressionBlocked(headers_id_, id()); | |
195 DVLOG(1) << ENDPOINT | |
196 << "Unable to decompress header data for stream: " << id() | |
197 << " header_id: " << headers_id_; | |
198 return total_bytes_consumed; | |
199 } | |
200 | |
201 // Decompressed data will be delivered to decompressed_headers_. | |
202 size_t bytes_consumed = session()->decompressor()->DecompressData( | |
203 StringPiece(data, data_len), this); | |
204 DCHECK_NE(0u, bytes_consumed); | |
205 if (bytes_consumed > data_len) { | |
206 DCHECK(false) << "DecompressData returned illegal value"; | |
207 OnDecompressionError(); | |
208 return total_bytes_consumed; | |
209 } | |
210 total_bytes_consumed += bytes_consumed; | |
211 data += bytes_consumed; | |
212 data_len -= bytes_consumed; | |
213 | |
214 if (decompression_failed_) { | |
215 // The session will have been closed in OnDecompressionError. | |
216 return total_bytes_consumed; | |
217 } | |
218 | |
219 // Headers are complete if the decompressor has moved on to the | |
220 // next stream. | |
221 headers_decompressed_ = | |
222 session()->decompressor()->current_header_id() != headers_id_; | |
223 if (!headers_decompressed_) { | |
224 DCHECK_EQ(0u, data_len); | |
225 } | |
226 | |
227 ProcessHeaderData(); | |
228 | |
229 if (!headers_decompressed_ || !decompressed_headers_.empty()) { | |
230 return total_bytes_consumed; | |
231 } | |
232 | |
233 // We have processed all of the decompressed data but we might | |
234 // have some more raw data to process. | |
235 if (data_len > 0) { | |
236 total_bytes_consumed += ProcessData(data, data_len); | |
237 } | |
238 | |
239 // The sequencer will push any additional buffered frames if this data | |
240 // has been completely consumed. | |
241 return total_bytes_consumed; | |
242 } | |
243 | |
244 const IPEndPoint& QuicDataStream::GetPeerAddress() { | 124 const IPEndPoint& QuicDataStream::GetPeerAddress() { |
245 return session()->peer_address(); | 125 return session()->peer_address(); |
246 } | 126 } |
247 | 127 |
248 QuicSpdyCompressor* QuicDataStream::compressor() { | |
249 return session()->compressor(); | |
250 } | |
251 | |
252 bool QuicDataStream::GetSSLInfo(SSLInfo* ssl_info) { | 128 bool QuicDataStream::GetSSLInfo(SSLInfo* ssl_info) { |
253 return session()->GetSSLInfo(ssl_info); | 129 return session()->GetSSLInfo(ssl_info); |
254 } | 130 } |
255 | 131 |
256 uint32 QuicDataStream::ProcessHeaderData() { | 132 uint32 QuicDataStream::ProcessHeaderData() { |
257 if (decompressed_headers_.empty()) { | 133 if (decompressed_headers_.empty()) { |
258 return 0; | 134 return 0; |
259 } | 135 } |
260 | 136 |
261 size_t bytes_processed = ProcessData(decompressed_headers_.data(), | 137 size_t bytes_processed = ProcessData(decompressed_headers_.data(), |
262 decompressed_headers_.length()); | 138 decompressed_headers_.length()); |
263 if (bytes_processed == decompressed_headers_.length()) { | 139 if (bytes_processed == decompressed_headers_.length()) { |
264 decompressed_headers_.clear(); | 140 decompressed_headers_.clear(); |
265 } else { | 141 } else { |
266 decompressed_headers_ = decompressed_headers_.erase(0, bytes_processed); | 142 decompressed_headers_ = decompressed_headers_.erase(0, bytes_processed); |
267 } | 143 } |
268 return bytes_processed; | 144 return bytes_processed; |
269 } | 145 } |
270 | 146 |
271 void QuicDataStream::OnDecompressorAvailable() { | |
272 DCHECK_LE(QUIC_VERSION_12, version()); | |
273 DCHECK_EQ(headers_id_, | |
274 session()->decompressor()->current_header_id()); | |
275 DCHECK(!headers_decompressed_); | |
276 DCHECK(!decompression_failed_); | |
277 DCHECK_EQ(0u, decompressed_headers_.length()); | |
278 | |
279 while (!headers_decompressed_) { | |
280 struct iovec iovec; | |
281 if (sequencer()->GetReadableRegions(&iovec, 1) == 0) { | |
282 return; | |
283 } | |
284 | |
285 size_t bytes_consumed = session()->decompressor()->DecompressData( | |
286 StringPiece(static_cast<char*>(iovec.iov_base), | |
287 iovec.iov_len), | |
288 this); | |
289 DCHECK_LE(bytes_consumed, iovec.iov_len); | |
290 if (decompression_failed_) { | |
291 return; | |
292 } | |
293 sequencer()->MarkConsumed(bytes_consumed); | |
294 | |
295 headers_decompressed_ = | |
296 session()->decompressor()->current_header_id() != headers_id_; | |
297 } | |
298 | |
299 // Either the headers are complete, or the all data as been consumed. | |
300 ProcessHeaderData(); // Unprocessed headers remain in decompressed_headers_. | |
301 if (IsDoneReading()) { | |
302 OnFinRead(); | |
303 } else if (FinishedReadingHeaders()) { | |
304 sequencer()->FlushBufferedFrames(); | |
305 } | |
306 } | |
307 | |
308 bool QuicDataStream::OnDecompressedData(StringPiece data) { | |
309 DCHECK_GE(QUIC_VERSION_12, version()); | |
310 data.AppendToString(&decompressed_headers_); | |
311 return true; | |
312 } | |
313 | |
314 void QuicDataStream::OnDecompressionError() { | |
315 DCHECK_LE(QUIC_VERSION_12, version()); | |
316 DCHECK(!decompression_failed_); | |
317 decompression_failed_ = true; | |
318 session()->connection()->SendConnectionClose(QUIC_DECOMPRESSION_FAILURE); | |
319 } | |
320 | |
321 void QuicDataStream::OnStreamHeaders(StringPiece headers_data) { | 147 void QuicDataStream::OnStreamHeaders(StringPiece headers_data) { |
322 DCHECK_LT(QUIC_VERSION_12, version()); | |
323 headers_data.AppendToString(&decompressed_headers_); | 148 headers_data.AppendToString(&decompressed_headers_); |
324 ProcessHeaderData(); | 149 ProcessHeaderData(); |
325 } | 150 } |
326 | 151 |
327 void QuicDataStream::OnStreamHeadersPriority(QuicPriority priority) { | 152 void QuicDataStream::OnStreamHeadersPriority(QuicPriority priority) { |
328 DCHECK(session()->connection()->is_server()); | 153 DCHECK(session()->connection()->is_server()); |
329 set_priority(priority); | 154 set_priority(priority); |
330 } | 155 } |
331 | 156 |
332 void QuicDataStream::OnStreamHeadersComplete(bool fin, size_t frame_len) { | 157 void QuicDataStream::OnStreamHeadersComplete(bool fin, size_t frame_len) { |
333 DCHECK_LT(QUIC_VERSION_12, version()); | |
334 headers_decompressed_ = true; | 158 headers_decompressed_ = true; |
335 if (fin) { | 159 if (fin) { |
336 sequencer()->OnStreamFrame(QuicStreamFrame(id(), fin, 0, IOVector())); | 160 sequencer()->OnStreamFrame(QuicStreamFrame(id(), fin, 0, IOVector())); |
337 } | 161 } |
338 ProcessHeaderData(); | 162 ProcessHeaderData(); |
339 if (FinishedReadingHeaders()) { | 163 if (FinishedReadingHeaders()) { |
340 sequencer()->FlushBufferedFrames(); | 164 sequencer()->FlushBufferedFrames(); |
341 } | 165 } |
342 } | 166 } |
343 | 167 |
344 void QuicDataStream::OnClose() { | 168 void QuicDataStream::OnClose() { |
345 ReliableQuicStream::OnClose(); | 169 ReliableQuicStream::OnClose(); |
346 | 170 |
347 if (visitor_) { | 171 if (visitor_) { |
348 Visitor* visitor = visitor_; | 172 Visitor* visitor = visitor_; |
349 // Calling Visitor::OnClose() may result the destruction of the visitor, | 173 // Calling Visitor::OnClose() may result the destruction of the visitor, |
350 // so we need to ensure we don't call it again. | 174 // so we need to ensure we don't call it again. |
351 visitor_ = NULL; | 175 visitor_ = NULL; |
352 visitor->OnClose(this); | 176 visitor->OnClose(this); |
353 } | 177 } |
354 } | 178 } |
355 | 179 |
356 uint32 QuicDataStream::StripPriorityAndHeaderId( | |
357 const char* data, uint32 data_len) { | |
358 uint32 total_bytes_parsed = 0; | |
359 | |
360 if (!priority_parsed_ && session()->connection()->is_server()) { | |
361 QuicPriority temporary_priority = priority_; | |
362 total_bytes_parsed = StripUint32( | |
363 data, data_len, &headers_id_and_priority_buffer_, &temporary_priority); | |
364 if (total_bytes_parsed > 0 && headers_id_and_priority_buffer_.size() == 0) { | |
365 priority_parsed_ = true; | |
366 | |
367 // Spdy priorities are inverted, so the highest numerical value is the | |
368 // lowest legal priority. | |
369 if (temporary_priority > QuicUtils::LowestPriority()) { | |
370 session()->connection()->SendConnectionClose(QUIC_INVALID_PRIORITY); | |
371 return 0; | |
372 } | |
373 priority_ = temporary_priority; | |
374 } | |
375 data += total_bytes_parsed; | |
376 data_len -= total_bytes_parsed; | |
377 } | |
378 if (data_len > 0 && headers_id_ == 0u) { | |
379 // The headers ID has not yet been read. Strip it from the beginning of | |
380 // the data stream. | |
381 total_bytes_parsed += StripUint32( | |
382 data, data_len, &headers_id_and_priority_buffer_, &headers_id_); | |
383 } | |
384 return total_bytes_parsed; | |
385 } | |
386 | |
387 bool QuicDataStream::FinishedReadingHeaders() { | 180 bool QuicDataStream::FinishedReadingHeaders() { |
388 return (headers_id_ != 0 || version() > QUIC_VERSION_12) && | 181 return headers_decompressed_ && decompressed_headers_.empty(); |
389 headers_decompressed_ && decompressed_headers_.empty(); | |
390 } | 182 } |
391 | 183 |
392 void QuicDataStream::OnWindowUpdateFrame(const QuicWindowUpdateFrame& frame) { | 184 void QuicDataStream::OnWindowUpdateFrame(const QuicWindowUpdateFrame& frame) { |
393 DVLOG(1) << "Received WindowUpdateFrame for stream: " << id() | 185 DVLOG(1) << "Received WindowUpdateFrame for stream: " << id() |
394 << ", with byte offset: " << frame.byte_offset; | 186 << ", with byte offset: " << frame.byte_offset; |
395 // TODO(rjshade): Adjust flow control window. | 187 // TODO(rjshade): Adjust flow control window. |
396 } | 188 } |
397 | 189 |
398 } // namespace net | 190 } // namespace net |
OLD | NEW |