OLD | NEW |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "net/quic/reliable_quic_stream.h" | 5 #include "net/quic/reliable_quic_stream.h" |
6 | 6 |
7 #include "net/quic/quic_session.h" | 7 #include "net/quic/quic_session.h" |
8 #include "net/quic/quic_spdy_decompressor.h" | 8 #include "net/quic/quic_spdy_decompressor.h" |
9 #include "net/spdy/write_blocked_list.h" | 9 #include "net/spdy/write_blocked_list.h" |
10 | 10 |
11 using base::StringPiece; | 11 using base::StringPiece; |
12 using std::min; | 12 using std::min; |
13 | 13 |
14 namespace net { | 14 namespace net { |
15 | 15 |
| 16 #define ENDPOINT (is_server_ ? "Server: " : " Client: ") |
| 17 |
16 namespace { | 18 namespace { |
17 | 19 |
18 // This is somewhat arbitrary. It's possible, but unlikely, we will either fail | |
19 // to set a priority client-side, or cancel a stream before stripping the | |
20 // priority from the wire server-side. In either case, start out with a | |
21 // priority in the middle. | |
22 QuicPriority kDefaultPriority = 3; | |
23 | |
24 // Appends bytes from data into partial_data_buffer. Once partial_data_buffer | |
25 // reaches 4 bytes, copies the data into 'result' and clears | |
26 // partial_data_buffer. | |
27 // Returns the number of bytes consumed. | |
28 uint32 StripUint32(const char* data, uint32 data_len, | |
29 string* partial_data_buffer, | |
30 uint32* result) { | |
31 DCHECK_GT(4u, partial_data_buffer->length()); | |
32 size_t missing_size = 4 - partial_data_buffer->length(); | |
33 if (data_len < missing_size) { | |
34 StringPiece(data, data_len).AppendToString(partial_data_buffer); | |
35 return data_len; | |
36 } | |
37 StringPiece(data, missing_size).AppendToString(partial_data_buffer); | |
38 DCHECK_EQ(4u, partial_data_buffer->length()); | |
39 memcpy(result, partial_data_buffer->data(), 4); | |
40 partial_data_buffer->clear(); | |
41 return missing_size; | |
42 } | |
43 | |
44 struct iovec MakeIovec(StringPiece data) { | 20 struct iovec MakeIovec(StringPiece data) { |
45 struct iovec iov = {const_cast<char*>(data.data()), | 21 struct iovec iov = {const_cast<char*>(data.data()), |
46 static_cast<size_t>(data.size())}; | 22 static_cast<size_t>(data.size())}; |
47 return iov; | 23 return iov; |
48 } | 24 } |
49 | 25 |
50 } // namespace | 26 } // namespace |
51 | 27 |
52 ReliableQuicStream::ReliableQuicStream(QuicStreamId id, | 28 ReliableQuicStream::ReliableQuicStream(QuicStreamId id, |
53 QuicSession* session) | 29 QuicSession* session) |
54 : sequencer_(this), | 30 : sequencer_(this), |
55 id_(id), | 31 id_(id), |
56 session_(session), | 32 session_(session), |
57 visitor_(NULL), | |
58 stream_bytes_read_(0), | 33 stream_bytes_read_(0), |
59 stream_bytes_written_(0), | 34 stream_bytes_written_(0), |
60 headers_decompressed_(false), | |
61 priority_(kDefaultPriority), | |
62 headers_id_(0), | |
63 decompression_failed_(false), | |
64 stream_error_(QUIC_STREAM_NO_ERROR), | 35 stream_error_(QUIC_STREAM_NO_ERROR), |
65 connection_error_(QUIC_NO_ERROR), | 36 connection_error_(QUIC_NO_ERROR), |
66 read_side_closed_(false), | 37 read_side_closed_(false), |
67 write_side_closed_(false), | 38 write_side_closed_(false), |
68 priority_parsed_(false), | |
69 fin_buffered_(false), | 39 fin_buffered_(false), |
70 fin_sent_(false), | 40 fin_sent_(false), |
71 is_server_(session_->is_server()) { | 41 is_server_(session_->is_server()) { |
72 } | 42 } |
73 | 43 |
74 ReliableQuicStream::~ReliableQuicStream() { | 44 ReliableQuicStream::~ReliableQuicStream() { |
75 } | 45 } |
76 | 46 |
77 bool ReliableQuicStream::WillAcceptStreamFrame( | 47 bool ReliableQuicStream::WillAcceptStreamFrame( |
78 const QuicStreamFrame& frame) const { | 48 const QuicStreamFrame& frame) const { |
(...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
135 | 105 |
136 void ReliableQuicStream::CloseConnection(QuicErrorCode error) { | 106 void ReliableQuicStream::CloseConnection(QuicErrorCode error) { |
137 session()->connection()->SendConnectionClose(error); | 107 session()->connection()->SendConnectionClose(error); |
138 } | 108 } |
139 | 109 |
140 void ReliableQuicStream::CloseConnectionWithDetails(QuicErrorCode error, | 110 void ReliableQuicStream::CloseConnectionWithDetails(QuicErrorCode error, |
141 const string& details) { | 111 const string& details) { |
142 session()->connection()->SendConnectionCloseWithDetails(error, details); | 112 session()->connection()->SendConnectionCloseWithDetails(error, details); |
143 } | 113 } |
144 | 114 |
145 size_t ReliableQuicStream::Readv(const struct iovec* iov, size_t iov_len) { | |
146 if (headers_decompressed_ && decompressed_headers_.empty()) { | |
147 return sequencer_.Readv(iov, iov_len); | |
148 } | |
149 size_t bytes_consumed = 0; | |
150 size_t iov_index = 0; | |
151 while (iov_index < iov_len && | |
152 decompressed_headers_.length() > bytes_consumed) { | |
153 size_t bytes_to_read = min(iov[iov_index].iov_len, | |
154 decompressed_headers_.length() - bytes_consumed); | |
155 char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base); | |
156 memcpy(iov_ptr, | |
157 decompressed_headers_.data() + bytes_consumed, bytes_to_read); | |
158 bytes_consumed += bytes_to_read; | |
159 ++iov_index; | |
160 } | |
161 decompressed_headers_.erase(0, bytes_consumed); | |
162 return bytes_consumed; | |
163 } | |
164 | |
165 int ReliableQuicStream::GetReadableRegions(iovec* iov, size_t iov_len) { | |
166 if (headers_decompressed_ && decompressed_headers_.empty()) { | |
167 return sequencer_.GetReadableRegions(iov, iov_len); | |
168 } | |
169 if (iov_len == 0) { | |
170 return 0; | |
171 } | |
172 iov[0].iov_base = static_cast<void*>( | |
173 const_cast<char*>(decompressed_headers_.data())); | |
174 iov[0].iov_len = decompressed_headers_.length(); | |
175 return 1; | |
176 } | |
177 | |
178 bool ReliableQuicStream::IsDoneReading() const { | |
179 if (!headers_decompressed_ || !decompressed_headers_.empty()) { | |
180 return false; | |
181 } | |
182 return sequencer_.IsClosed(); | |
183 } | |
184 | |
185 bool ReliableQuicStream::HasBytesToRead() const { | |
186 return !decompressed_headers_.empty() || sequencer_.HasBytesToRead(); | |
187 } | |
188 | |
189 const IPEndPoint& ReliableQuicStream::GetPeerAddress() const { | |
190 return session_->peer_address(); | |
191 } | |
192 | |
193 QuicSpdyCompressor* ReliableQuicStream::compressor() { | |
194 return session_->compressor(); | |
195 } | |
196 | |
197 bool ReliableQuicStream::GetSSLInfo(SSLInfo* ssl_info) { | |
198 return session_->GetSSLInfo(ssl_info); | |
199 } | |
200 | |
201 void ReliableQuicStream::set_priority(QuicPriority priority) { | |
202 DCHECK_EQ(0u, stream_bytes_written_); | |
203 priority_ = priority; | |
204 } | |
205 | |
206 void ReliableQuicStream::WriteOrBufferData(StringPiece data, bool fin) { | 115 void ReliableQuicStream::WriteOrBufferData(StringPiece data, bool fin) { |
207 DCHECK(data.size() > 0 || fin); | 116 DCHECK(data.size() > 0 || fin); |
208 DCHECK(!fin_buffered_); | 117 DCHECK(!fin_buffered_); |
209 | 118 |
210 QuicConsumedData consumed_data(0, false); | 119 QuicConsumedData consumed_data(0, false); |
211 fin_buffered_ = fin; | 120 fin_buffered_ = fin; |
212 | 121 |
213 if (queued_data_.empty()) { | 122 if (queued_data_.empty()) { |
214 struct iovec iov(MakeIovec(data)); | 123 struct iovec iov(MakeIovec(data)); |
215 consumed_data = WritevData(&iov, 1, fin, NULL); | 124 consumed_data = WritevData(&iov, 1, fin, NULL); |
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
267 CloseWriteSide(); | 176 CloseWriteSide(); |
268 } else if (fin && !consumed_data.fin_consumed) { | 177 } else if (fin && !consumed_data.fin_consumed) { |
269 session_->MarkWriteBlocked(id(), EffectivePriority()); | 178 session_->MarkWriteBlocked(id(), EffectivePriority()); |
270 } | 179 } |
271 } else { | 180 } else { |
272 session_->MarkWriteBlocked(id(), EffectivePriority()); | 181 session_->MarkWriteBlocked(id(), EffectivePriority()); |
273 } | 182 } |
274 return consumed_data; | 183 return consumed_data; |
275 } | 184 } |
276 | 185 |
277 QuicPriority ReliableQuicStream::EffectivePriority() const { | |
278 return priority(); | |
279 } | |
280 | |
281 void ReliableQuicStream::CloseReadSide() { | 186 void ReliableQuicStream::CloseReadSide() { |
282 if (read_side_closed_) { | 187 if (read_side_closed_) { |
283 return; | 188 return; |
284 } | 189 } |
285 DVLOG(1) << ENDPOINT << "Done reading from stream " << id(); | 190 DVLOG(1) << ENDPOINT << "Done reading from stream " << id(); |
286 | 191 |
287 read_side_closed_ = true; | 192 read_side_closed_ = true; |
288 if (write_side_closed_) { | 193 if (write_side_closed_) { |
289 DVLOG(1) << ENDPOINT << "Closing stream: " << id(); | 194 DVLOG(1) << ENDPOINT << "Closing stream: " << id(); |
290 session_->CloseStream(id()); | 195 session_->CloseStream(id()); |
291 } | 196 } |
292 } | 197 } |
293 | 198 |
294 uint32 ReliableQuicStream::ProcessRawData(const char* data, uint32 data_len) { | |
295 DCHECK_NE(0u, data_len); | |
296 if (id() == kCryptoStreamId) { | |
297 // The crypto stream does not use compression. | |
298 return ProcessData(data, data_len); | |
299 } | |
300 | |
301 uint32 total_bytes_consumed = 0; | |
302 if (headers_id_ == 0u) { | |
303 total_bytes_consumed += StripPriorityAndHeaderId(data, data_len); | |
304 data += total_bytes_consumed; | |
305 data_len -= total_bytes_consumed; | |
306 if (data_len == 0 || total_bytes_consumed == 0) { | |
307 return total_bytes_consumed; | |
308 } | |
309 } | |
310 DCHECK_NE(0u, headers_id_); | |
311 | |
312 // Once the headers are finished, we simply pass the data through. | |
313 if (headers_decompressed_) { | |
314 // Some buffered header data remains. | |
315 if (!decompressed_headers_.empty()) { | |
316 ProcessHeaderData(); | |
317 } | |
318 if (decompressed_headers_.empty()) { | |
319 DVLOG(1) << "Delegating procesing to ProcessData"; | |
320 total_bytes_consumed += ProcessData(data, data_len); | |
321 } | |
322 return total_bytes_consumed; | |
323 } | |
324 | |
325 QuicHeaderId current_header_id = | |
326 session_->decompressor()->current_header_id(); | |
327 // Ensure that this header id looks sane. | |
328 if (headers_id_ < current_header_id || | |
329 headers_id_ > kMaxHeaderIdDelta + current_header_id) { | |
330 DVLOG(1) << ENDPOINT | |
331 << "Invalid headers for stream: " << id() | |
332 << " header_id: " << headers_id_ | |
333 << " current_header_id: " << current_header_id; | |
334 session_->connection()->SendConnectionClose(QUIC_INVALID_HEADER_ID); | |
335 return total_bytes_consumed; | |
336 } | |
337 | |
338 // If we are head-of-line blocked on decompression, then back up. | |
339 if (current_header_id != headers_id_) { | |
340 session_->MarkDecompressionBlocked(headers_id_, id()); | |
341 DVLOG(1) << ENDPOINT | |
342 << "Unable to decompress header data for stream: " << id() | |
343 << " header_id: " << headers_id_; | |
344 return total_bytes_consumed; | |
345 } | |
346 | |
347 // Decompressed data will be delivered to decompressed_headers_. | |
348 size_t bytes_consumed = session_->decompressor()->DecompressData( | |
349 StringPiece(data, data_len), this); | |
350 DCHECK_NE(0u, bytes_consumed); | |
351 if (bytes_consumed > data_len) { | |
352 DCHECK(false) << "DecompressData returned illegal value"; | |
353 OnDecompressionError(); | |
354 return total_bytes_consumed; | |
355 } | |
356 total_bytes_consumed += bytes_consumed; | |
357 data += bytes_consumed; | |
358 data_len -= bytes_consumed; | |
359 | |
360 if (decompression_failed_) { | |
361 // The session will have been closed in OnDecompressionError. | |
362 return total_bytes_consumed; | |
363 } | |
364 | |
365 // Headers are complete if the decompressor has moved on to the | |
366 // next stream. | |
367 headers_decompressed_ = | |
368 session_->decompressor()->current_header_id() != headers_id_; | |
369 if (!headers_decompressed_) { | |
370 DCHECK_EQ(0u, data_len); | |
371 } | |
372 | |
373 ProcessHeaderData(); | |
374 | |
375 if (!headers_decompressed_ || !decompressed_headers_.empty()) { | |
376 return total_bytes_consumed; | |
377 } | |
378 | |
379 // We have processed all of the decompressed data but we might | |
380 // have some more raw data to process. | |
381 if (data_len > 0) { | |
382 total_bytes_consumed += ProcessData(data, data_len); | |
383 } | |
384 | |
385 // The sequencer will push any additional buffered frames if this data | |
386 // has been completely consumed. | |
387 return total_bytes_consumed; | |
388 } | |
389 | |
390 uint32 ReliableQuicStream::ProcessHeaderData() { | |
391 if (decompressed_headers_.empty()) { | |
392 return 0; | |
393 } | |
394 | |
395 size_t bytes_processed = ProcessData(decompressed_headers_.data(), | |
396 decompressed_headers_.length()); | |
397 if (bytes_processed == decompressed_headers_.length()) { | |
398 decompressed_headers_.clear(); | |
399 } else { | |
400 decompressed_headers_ = decompressed_headers_.erase(0, bytes_processed); | |
401 } | |
402 return bytes_processed; | |
403 } | |
404 | |
405 void ReliableQuicStream::OnDecompressorAvailable() { | |
406 DCHECK_EQ(headers_id_, | |
407 session_->decompressor()->current_header_id()); | |
408 DCHECK(!headers_decompressed_); | |
409 DCHECK(!decompression_failed_); | |
410 DCHECK_EQ(0u, decompressed_headers_.length()); | |
411 | |
412 while (!headers_decompressed_) { | |
413 struct iovec iovec; | |
414 if (sequencer_.GetReadableRegions(&iovec, 1) == 0) { | |
415 return; | |
416 } | |
417 | |
418 size_t bytes_consumed = session_->decompressor()->DecompressData( | |
419 StringPiece(static_cast<char*>(iovec.iov_base), | |
420 iovec.iov_len), | |
421 this); | |
422 DCHECK_LE(bytes_consumed, iovec.iov_len); | |
423 if (decompression_failed_) { | |
424 return; | |
425 } | |
426 sequencer_.MarkConsumed(bytes_consumed); | |
427 | |
428 headers_decompressed_ = | |
429 session_->decompressor()->current_header_id() != headers_id_; | |
430 } | |
431 | |
432 // Either the headers are complete, or the all data as been consumed. | |
433 ProcessHeaderData(); // Unprocessed headers remain in decompressed_headers_. | |
434 if (IsDoneReading()) { | |
435 OnFinRead(); | |
436 } else if (headers_decompressed_ && decompressed_headers_.empty()) { | |
437 sequencer_.FlushBufferedFrames(); | |
438 } | |
439 } | |
440 | |
441 bool ReliableQuicStream::OnDecompressedData(StringPiece data) { | |
442 data.AppendToString(&decompressed_headers_); | |
443 return true; | |
444 } | |
445 | |
446 void ReliableQuicStream::OnDecompressionError() { | |
447 DCHECK(!decompression_failed_); | |
448 decompression_failed_ = true; | |
449 session_->connection()->SendConnectionClose(QUIC_DECOMPRESSION_FAILURE); | |
450 } | |
451 | |
452 | |
453 void ReliableQuicStream::CloseWriteSide() { | 199 void ReliableQuicStream::CloseWriteSide() { |
454 if (write_side_closed_) { | 200 if (write_side_closed_) { |
455 return; | 201 return; |
456 } | 202 } |
457 DVLOG(1) << ENDPOINT << "Done writing to stream " << id(); | 203 DVLOG(1) << ENDPOINT << "Done writing to stream " << id(); |
458 | 204 |
459 write_side_closed_ = true; | 205 write_side_closed_ = true; |
460 if (read_side_closed_) { | 206 if (read_side_closed_) { |
461 DVLOG(1) << ENDPOINT << "Closing stream: " << id(); | 207 DVLOG(1) << ENDPOINT << "Closing stream: " << id(); |
462 session_->CloseStream(id()); | 208 session_->CloseStream(id()); |
463 } | 209 } |
464 } | 210 } |
465 | 211 |
466 bool ReliableQuicStream::HasBufferedData() { | 212 bool ReliableQuicStream::HasBufferedData() { |
467 return !queued_data_.empty(); | 213 return !queued_data_.empty(); |
468 } | 214 } |
469 | 215 |
470 void ReliableQuicStream::OnClose() { | 216 void ReliableQuicStream::OnClose() { |
471 CloseReadSide(); | 217 CloseReadSide(); |
472 CloseWriteSide(); | 218 CloseWriteSide(); |
473 | |
474 if (visitor_) { | |
475 Visitor* visitor = visitor_; | |
476 // Calling Visitor::OnClose() may result the destruction of the visitor, | |
477 // so we need to ensure we don't call it again. | |
478 visitor_ = NULL; | |
479 visitor->OnClose(this); | |
480 } | |
481 } | |
482 | |
483 uint32 ReliableQuicStream::StripPriorityAndHeaderId( | |
484 const char* data, uint32 data_len) { | |
485 uint32 total_bytes_parsed = 0; | |
486 | |
487 if (!priority_parsed_ && session_->connection()->is_server()) { | |
488 QuicPriority temporary_priority = priority_; | |
489 total_bytes_parsed = StripUint32( | |
490 data, data_len, &headers_id_and_priority_buffer_, &temporary_priority); | |
491 if (total_bytes_parsed > 0 && headers_id_and_priority_buffer_.empty()) { | |
492 priority_parsed_ = true; | |
493 | |
494 // Spdy priorities are inverted, so the highest numerical value is the | |
495 // lowest legal priority. | |
496 if (temporary_priority > static_cast<QuicPriority>(kLowestPriority)) { | |
497 session_->connection()->SendConnectionClose(QUIC_INVALID_PRIORITY); | |
498 return 0; | |
499 } | |
500 priority_ = temporary_priority; | |
501 } | |
502 data += total_bytes_parsed; | |
503 data_len -= total_bytes_parsed; | |
504 } | |
505 if (data_len > 0 && headers_id_ == 0u) { | |
506 // The headers ID has not yet been read. Strip it from the beginning of | |
507 // the data stream. | |
508 total_bytes_parsed += StripUint32( | |
509 data, data_len, &headers_id_and_priority_buffer_, &headers_id_); | |
510 } | |
511 return total_bytes_parsed; | |
512 } | 219 } |
513 | 220 |
514 } // namespace net | 221 } // namespace net |
OLD | NEW |