| OLD | NEW |
| 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "net/quic/reliable_quic_stream.h" | 5 #include "net/quic/reliable_quic_stream.h" |
| 6 | 6 |
| 7 #include "net/quic/quic_session.h" | 7 #include "net/quic/quic_session.h" |
| 8 #include "net/quic/quic_spdy_decompressor.h" | 8 #include "net/quic/quic_spdy_decompressor.h" |
| 9 #include "net/spdy/write_blocked_list.h" | 9 #include "net/spdy/write_blocked_list.h" |
| 10 | 10 |
| 11 using base::StringPiece; | 11 using base::StringPiece; |
| 12 using std::min; | 12 using std::min; |
| 13 | 13 |
| 14 namespace net { | 14 namespace net { |
| 15 | 15 |
| 16 #define ENDPOINT (is_server_ ? "Server: " : " Client: ") |
| 17 |
| 16 namespace { | 18 namespace { |
| 17 | 19 |
| 18 // This is somewhat arbitrary. It's possible, but unlikely, we will either fail | |
| 19 // to set a priority client-side, or cancel a stream before stripping the | |
| 20 // priority from the wire server-side. In either case, start out with a | |
| 21 // priority in the middle. | |
| 22 QuicPriority kDefaultPriority = 3; | |
| 23 | |
| 24 // Appends bytes from data into partial_data_buffer. Once partial_data_buffer | |
| 25 // reaches 4 bytes, copies the data into 'result' and clears | |
| 26 // partial_data_buffer. | |
| 27 // Returns the number of bytes consumed. | |
| 28 uint32 StripUint32(const char* data, uint32 data_len, | |
| 29 string* partial_data_buffer, | |
| 30 uint32* result) { | |
| 31 DCHECK_GT(4u, partial_data_buffer->length()); | |
| 32 size_t missing_size = 4 - partial_data_buffer->length(); | |
| 33 if (data_len < missing_size) { | |
| 34 StringPiece(data, data_len).AppendToString(partial_data_buffer); | |
| 35 return data_len; | |
| 36 } | |
| 37 StringPiece(data, missing_size).AppendToString(partial_data_buffer); | |
| 38 DCHECK_EQ(4u, partial_data_buffer->length()); | |
| 39 memcpy(result, partial_data_buffer->data(), 4); | |
| 40 partial_data_buffer->clear(); | |
| 41 return missing_size; | |
| 42 } | |
| 43 | |
| 44 struct iovec MakeIovec(StringPiece data) { | 20 struct iovec MakeIovec(StringPiece data) { |
| 45 struct iovec iov = {const_cast<char*>(data.data()), | 21 struct iovec iov = {const_cast<char*>(data.data()), |
| 46 static_cast<size_t>(data.size())}; | 22 static_cast<size_t>(data.size())}; |
| 47 return iov; | 23 return iov; |
| 48 } | 24 } |
| 49 | 25 |
| 50 } // namespace | 26 } // namespace |
| 51 | 27 |
| 52 ReliableQuicStream::ReliableQuicStream(QuicStreamId id, | 28 ReliableQuicStream::ReliableQuicStream(QuicStreamId id, |
| 53 QuicSession* session) | 29 QuicSession* session) |
| 54 : sequencer_(this), | 30 : sequencer_(this), |
| 55 id_(id), | 31 id_(id), |
| 56 session_(session), | 32 session_(session), |
| 57 visitor_(NULL), | |
| 58 stream_bytes_read_(0), | 33 stream_bytes_read_(0), |
| 59 stream_bytes_written_(0), | 34 stream_bytes_written_(0), |
| 60 headers_decompressed_(false), | |
| 61 priority_(kDefaultPriority), | |
| 62 headers_id_(0), | |
| 63 decompression_failed_(false), | |
| 64 stream_error_(QUIC_STREAM_NO_ERROR), | 35 stream_error_(QUIC_STREAM_NO_ERROR), |
| 65 connection_error_(QUIC_NO_ERROR), | 36 connection_error_(QUIC_NO_ERROR), |
| 66 read_side_closed_(false), | 37 read_side_closed_(false), |
| 67 write_side_closed_(false), | 38 write_side_closed_(false), |
| 68 priority_parsed_(false), | |
| 69 fin_buffered_(false), | 39 fin_buffered_(false), |
| 70 fin_sent_(false), | 40 fin_sent_(false), |
| 71 is_server_(session_->is_server()) { | 41 is_server_(session_->is_server()) { |
| 72 } | 42 } |
| 73 | 43 |
| 74 ReliableQuicStream::~ReliableQuicStream() { | 44 ReliableQuicStream::~ReliableQuicStream() { |
| 75 } | 45 } |
| 76 | 46 |
| 77 bool ReliableQuicStream::WillAcceptStreamFrame( | 47 bool ReliableQuicStream::WillAcceptStreamFrame( |
| 78 const QuicStreamFrame& frame) const { | 48 const QuicStreamFrame& frame) const { |
| (...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 135 | 105 |
| 136 void ReliableQuicStream::CloseConnection(QuicErrorCode error) { | 106 void ReliableQuicStream::CloseConnection(QuicErrorCode error) { |
| 137 session()->connection()->SendConnectionClose(error); | 107 session()->connection()->SendConnectionClose(error); |
| 138 } | 108 } |
| 139 | 109 |
| 140 void ReliableQuicStream::CloseConnectionWithDetails(QuicErrorCode error, | 110 void ReliableQuicStream::CloseConnectionWithDetails(QuicErrorCode error, |
| 141 const string& details) { | 111 const string& details) { |
| 142 session()->connection()->SendConnectionCloseWithDetails(error, details); | 112 session()->connection()->SendConnectionCloseWithDetails(error, details); |
| 143 } | 113 } |
| 144 | 114 |
| 145 size_t ReliableQuicStream::Readv(const struct iovec* iov, size_t iov_len) { | |
| 146 if (headers_decompressed_ && decompressed_headers_.empty()) { | |
| 147 return sequencer_.Readv(iov, iov_len); | |
| 148 } | |
| 149 size_t bytes_consumed = 0; | |
| 150 size_t iov_index = 0; | |
| 151 while (iov_index < iov_len && | |
| 152 decompressed_headers_.length() > bytes_consumed) { | |
| 153 size_t bytes_to_read = min(iov[iov_index].iov_len, | |
| 154 decompressed_headers_.length() - bytes_consumed); | |
| 155 char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base); | |
| 156 memcpy(iov_ptr, | |
| 157 decompressed_headers_.data() + bytes_consumed, bytes_to_read); | |
| 158 bytes_consumed += bytes_to_read; | |
| 159 ++iov_index; | |
| 160 } | |
| 161 decompressed_headers_.erase(0, bytes_consumed); | |
| 162 return bytes_consumed; | |
| 163 } | |
| 164 | |
| 165 int ReliableQuicStream::GetReadableRegions(iovec* iov, size_t iov_len) { | |
| 166 if (headers_decompressed_ && decompressed_headers_.empty()) { | |
| 167 return sequencer_.GetReadableRegions(iov, iov_len); | |
| 168 } | |
| 169 if (iov_len == 0) { | |
| 170 return 0; | |
| 171 } | |
| 172 iov[0].iov_base = static_cast<void*>( | |
| 173 const_cast<char*>(decompressed_headers_.data())); | |
| 174 iov[0].iov_len = decompressed_headers_.length(); | |
| 175 return 1; | |
| 176 } | |
| 177 | |
| 178 bool ReliableQuicStream::IsDoneReading() const { | |
| 179 if (!headers_decompressed_ || !decompressed_headers_.empty()) { | |
| 180 return false; | |
| 181 } | |
| 182 return sequencer_.IsClosed(); | |
| 183 } | |
| 184 | |
| 185 bool ReliableQuicStream::HasBytesToRead() const { | |
| 186 return !decompressed_headers_.empty() || sequencer_.HasBytesToRead(); | |
| 187 } | |
| 188 | |
| 189 const IPEndPoint& ReliableQuicStream::GetPeerAddress() const { | |
| 190 return session_->peer_address(); | |
| 191 } | |
| 192 | |
| 193 QuicSpdyCompressor* ReliableQuicStream::compressor() { | |
| 194 return session_->compressor(); | |
| 195 } | |
| 196 | |
| 197 bool ReliableQuicStream::GetSSLInfo(SSLInfo* ssl_info) { | |
| 198 return session_->GetSSLInfo(ssl_info); | |
| 199 } | |
| 200 | |
| 201 void ReliableQuicStream::set_priority(QuicPriority priority) { | |
| 202 DCHECK_EQ(0u, stream_bytes_written_); | |
| 203 priority_ = priority; | |
| 204 } | |
| 205 | |
| 206 void ReliableQuicStream::WriteOrBufferData(StringPiece data, bool fin) { | 115 void ReliableQuicStream::WriteOrBufferData(StringPiece data, bool fin) { |
| 207 DCHECK(data.size() > 0 || fin); | 116 DCHECK(data.size() > 0 || fin); |
| 208 DCHECK(!fin_buffered_); | 117 DCHECK(!fin_buffered_); |
| 209 | 118 |
| 210 QuicConsumedData consumed_data(0, false); | 119 QuicConsumedData consumed_data(0, false); |
| 211 fin_buffered_ = fin; | 120 fin_buffered_ = fin; |
| 212 | 121 |
| 213 if (queued_data_.empty()) { | 122 if (queued_data_.empty()) { |
| 214 struct iovec iov(MakeIovec(data)); | 123 struct iovec iov(MakeIovec(data)); |
| 215 consumed_data = WritevData(&iov, 1, fin, NULL); | 124 consumed_data = WritevData(&iov, 1, fin, NULL); |
| (...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 267 CloseWriteSide(); | 176 CloseWriteSide(); |
| 268 } else if (fin && !consumed_data.fin_consumed) { | 177 } else if (fin && !consumed_data.fin_consumed) { |
| 269 session_->MarkWriteBlocked(id(), EffectivePriority()); | 178 session_->MarkWriteBlocked(id(), EffectivePriority()); |
| 270 } | 179 } |
| 271 } else { | 180 } else { |
| 272 session_->MarkWriteBlocked(id(), EffectivePriority()); | 181 session_->MarkWriteBlocked(id(), EffectivePriority()); |
| 273 } | 182 } |
| 274 return consumed_data; | 183 return consumed_data; |
| 275 } | 184 } |
| 276 | 185 |
| 277 QuicPriority ReliableQuicStream::EffectivePriority() const { | |
| 278 return priority(); | |
| 279 } | |
| 280 | |
| 281 void ReliableQuicStream::CloseReadSide() { | 186 void ReliableQuicStream::CloseReadSide() { |
| 282 if (read_side_closed_) { | 187 if (read_side_closed_) { |
| 283 return; | 188 return; |
| 284 } | 189 } |
| 285 DVLOG(1) << ENDPOINT << "Done reading from stream " << id(); | 190 DVLOG(1) << ENDPOINT << "Done reading from stream " << id(); |
| 286 | 191 |
| 287 read_side_closed_ = true; | 192 read_side_closed_ = true; |
| 288 if (write_side_closed_) { | 193 if (write_side_closed_) { |
| 289 DVLOG(1) << ENDPOINT << "Closing stream: " << id(); | 194 DVLOG(1) << ENDPOINT << "Closing stream: " << id(); |
| 290 session_->CloseStream(id()); | 195 session_->CloseStream(id()); |
| 291 } | 196 } |
| 292 } | 197 } |
| 293 | 198 |
| 294 uint32 ReliableQuicStream::ProcessRawData(const char* data, uint32 data_len) { | |
| 295 DCHECK_NE(0u, data_len); | |
| 296 if (id() == kCryptoStreamId) { | |
| 297 // The crypto stream does not use compression. | |
| 298 return ProcessData(data, data_len); | |
| 299 } | |
| 300 | |
| 301 uint32 total_bytes_consumed = 0; | |
| 302 if (headers_id_ == 0u) { | |
| 303 total_bytes_consumed += StripPriorityAndHeaderId(data, data_len); | |
| 304 data += total_bytes_consumed; | |
| 305 data_len -= total_bytes_consumed; | |
| 306 if (data_len == 0 || total_bytes_consumed == 0) { | |
| 307 return total_bytes_consumed; | |
| 308 } | |
| 309 } | |
| 310 DCHECK_NE(0u, headers_id_); | |
| 311 | |
| 312 // Once the headers are finished, we simply pass the data through. | |
| 313 if (headers_decompressed_) { | |
| 314 // Some buffered header data remains. | |
| 315 if (!decompressed_headers_.empty()) { | |
| 316 ProcessHeaderData(); | |
| 317 } | |
| 318 if (decompressed_headers_.empty()) { | |
| 319 DVLOG(1) << "Delegating procesing to ProcessData"; | |
| 320 total_bytes_consumed += ProcessData(data, data_len); | |
| 321 } | |
| 322 return total_bytes_consumed; | |
| 323 } | |
| 324 | |
| 325 QuicHeaderId current_header_id = | |
| 326 session_->decompressor()->current_header_id(); | |
| 327 // Ensure that this header id looks sane. | |
| 328 if (headers_id_ < current_header_id || | |
| 329 headers_id_ > kMaxHeaderIdDelta + current_header_id) { | |
| 330 DVLOG(1) << ENDPOINT | |
| 331 << "Invalid headers for stream: " << id() | |
| 332 << " header_id: " << headers_id_ | |
| 333 << " current_header_id: " << current_header_id; | |
| 334 session_->connection()->SendConnectionClose(QUIC_INVALID_HEADER_ID); | |
| 335 return total_bytes_consumed; | |
| 336 } | |
| 337 | |
| 338 // If we are head-of-line blocked on decompression, then back up. | |
| 339 if (current_header_id != headers_id_) { | |
| 340 session_->MarkDecompressionBlocked(headers_id_, id()); | |
| 341 DVLOG(1) << ENDPOINT | |
| 342 << "Unable to decompress header data for stream: " << id() | |
| 343 << " header_id: " << headers_id_; | |
| 344 return total_bytes_consumed; | |
| 345 } | |
| 346 | |
| 347 // Decompressed data will be delivered to decompressed_headers_. | |
| 348 size_t bytes_consumed = session_->decompressor()->DecompressData( | |
| 349 StringPiece(data, data_len), this); | |
| 350 DCHECK_NE(0u, bytes_consumed); | |
| 351 if (bytes_consumed > data_len) { | |
| 352 DCHECK(false) << "DecompressData returned illegal value"; | |
| 353 OnDecompressionError(); | |
| 354 return total_bytes_consumed; | |
| 355 } | |
| 356 total_bytes_consumed += bytes_consumed; | |
| 357 data += bytes_consumed; | |
| 358 data_len -= bytes_consumed; | |
| 359 | |
| 360 if (decompression_failed_) { | |
| 361 // The session will have been closed in OnDecompressionError. | |
| 362 return total_bytes_consumed; | |
| 363 } | |
| 364 | |
| 365 // Headers are complete if the decompressor has moved on to the | |
| 366 // next stream. | |
| 367 headers_decompressed_ = | |
| 368 session_->decompressor()->current_header_id() != headers_id_; | |
| 369 if (!headers_decompressed_) { | |
| 370 DCHECK_EQ(0u, data_len); | |
| 371 } | |
| 372 | |
| 373 ProcessHeaderData(); | |
| 374 | |
| 375 if (!headers_decompressed_ || !decompressed_headers_.empty()) { | |
| 376 return total_bytes_consumed; | |
| 377 } | |
| 378 | |
| 379 // We have processed all of the decompressed data but we might | |
| 380 // have some more raw data to process. | |
| 381 if (data_len > 0) { | |
| 382 total_bytes_consumed += ProcessData(data, data_len); | |
| 383 } | |
| 384 | |
| 385 // The sequencer will push any additional buffered frames if this data | |
| 386 // has been completely consumed. | |
| 387 return total_bytes_consumed; | |
| 388 } | |
| 389 | |
| 390 uint32 ReliableQuicStream::ProcessHeaderData() { | |
| 391 if (decompressed_headers_.empty()) { | |
| 392 return 0; | |
| 393 } | |
| 394 | |
| 395 size_t bytes_processed = ProcessData(decompressed_headers_.data(), | |
| 396 decompressed_headers_.length()); | |
| 397 if (bytes_processed == decompressed_headers_.length()) { | |
| 398 decompressed_headers_.clear(); | |
| 399 } else { | |
| 400 decompressed_headers_ = decompressed_headers_.erase(0, bytes_processed); | |
| 401 } | |
| 402 return bytes_processed; | |
| 403 } | |
| 404 | |
| 405 void ReliableQuicStream::OnDecompressorAvailable() { | |
| 406 DCHECK_EQ(headers_id_, | |
| 407 session_->decompressor()->current_header_id()); | |
| 408 DCHECK(!headers_decompressed_); | |
| 409 DCHECK(!decompression_failed_); | |
| 410 DCHECK_EQ(0u, decompressed_headers_.length()); | |
| 411 | |
| 412 while (!headers_decompressed_) { | |
| 413 struct iovec iovec; | |
| 414 if (sequencer_.GetReadableRegions(&iovec, 1) == 0) { | |
| 415 return; | |
| 416 } | |
| 417 | |
| 418 size_t bytes_consumed = session_->decompressor()->DecompressData( | |
| 419 StringPiece(static_cast<char*>(iovec.iov_base), | |
| 420 iovec.iov_len), | |
| 421 this); | |
| 422 DCHECK_LE(bytes_consumed, iovec.iov_len); | |
| 423 if (decompression_failed_) { | |
| 424 return; | |
| 425 } | |
| 426 sequencer_.MarkConsumed(bytes_consumed); | |
| 427 | |
| 428 headers_decompressed_ = | |
| 429 session_->decompressor()->current_header_id() != headers_id_; | |
| 430 } | |
| 431 | |
| 432 // Either the headers are complete, or the all data as been consumed. | |
| 433 ProcessHeaderData(); // Unprocessed headers remain in decompressed_headers_. | |
| 434 if (IsDoneReading()) { | |
| 435 OnFinRead(); | |
| 436 } else if (headers_decompressed_ && decompressed_headers_.empty()) { | |
| 437 sequencer_.FlushBufferedFrames(); | |
| 438 } | |
| 439 } | |
| 440 | |
| 441 bool ReliableQuicStream::OnDecompressedData(StringPiece data) { | |
| 442 data.AppendToString(&decompressed_headers_); | |
| 443 return true; | |
| 444 } | |
| 445 | |
| 446 void ReliableQuicStream::OnDecompressionError() { | |
| 447 DCHECK(!decompression_failed_); | |
| 448 decompression_failed_ = true; | |
| 449 session_->connection()->SendConnectionClose(QUIC_DECOMPRESSION_FAILURE); | |
| 450 } | |
| 451 | |
| 452 | |
| 453 void ReliableQuicStream::CloseWriteSide() { | 199 void ReliableQuicStream::CloseWriteSide() { |
| 454 if (write_side_closed_) { | 200 if (write_side_closed_) { |
| 455 return; | 201 return; |
| 456 } | 202 } |
| 457 DVLOG(1) << ENDPOINT << "Done writing to stream " << id(); | 203 DVLOG(1) << ENDPOINT << "Done writing to stream " << id(); |
| 458 | 204 |
| 459 write_side_closed_ = true; | 205 write_side_closed_ = true; |
| 460 if (read_side_closed_) { | 206 if (read_side_closed_) { |
| 461 DVLOG(1) << ENDPOINT << "Closing stream: " << id(); | 207 DVLOG(1) << ENDPOINT << "Closing stream: " << id(); |
| 462 session_->CloseStream(id()); | 208 session_->CloseStream(id()); |
| 463 } | 209 } |
| 464 } | 210 } |
| 465 | 211 |
| 466 bool ReliableQuicStream::HasBufferedData() { | 212 bool ReliableQuicStream::HasBufferedData() { |
| 467 return !queued_data_.empty(); | 213 return !queued_data_.empty(); |
| 468 } | 214 } |
| 469 | 215 |
| 470 void ReliableQuicStream::OnClose() { | 216 void ReliableQuicStream::OnClose() { |
| 471 CloseReadSide(); | 217 CloseReadSide(); |
| 472 CloseWriteSide(); | 218 CloseWriteSide(); |
| 473 | |
| 474 if (visitor_) { | |
| 475 Visitor* visitor = visitor_; | |
| 476 // Calling Visitor::OnClose() may result the destruction of the visitor, | |
| 477 // so we need to ensure we don't call it again. | |
| 478 visitor_ = NULL; | |
| 479 visitor->OnClose(this); | |
| 480 } | |
| 481 } | |
| 482 | |
| 483 uint32 ReliableQuicStream::StripPriorityAndHeaderId( | |
| 484 const char* data, uint32 data_len) { | |
| 485 uint32 total_bytes_parsed = 0; | |
| 486 | |
| 487 if (!priority_parsed_ && session_->connection()->is_server()) { | |
| 488 QuicPriority temporary_priority = priority_; | |
| 489 total_bytes_parsed = StripUint32( | |
| 490 data, data_len, &headers_id_and_priority_buffer_, &temporary_priority); | |
| 491 if (total_bytes_parsed > 0 && headers_id_and_priority_buffer_.empty()) { | |
| 492 priority_parsed_ = true; | |
| 493 | |
| 494 // Spdy priorities are inverted, so the highest numerical value is the | |
| 495 // lowest legal priority. | |
| 496 if (temporary_priority > static_cast<QuicPriority>(kLowestPriority)) { | |
| 497 session_->connection()->SendConnectionClose(QUIC_INVALID_PRIORITY); | |
| 498 return 0; | |
| 499 } | |
| 500 priority_ = temporary_priority; | |
| 501 } | |
| 502 data += total_bytes_parsed; | |
| 503 data_len -= total_bytes_parsed; | |
| 504 } | |
| 505 if (data_len > 0 && headers_id_ == 0u) { | |
| 506 // The headers ID has not yet been read. Strip it from the beginning of | |
| 507 // the data stream. | |
| 508 total_bytes_parsed += StripUint32( | |
| 509 data, data_len, &headers_id_and_priority_buffer_, &headers_id_); | |
| 510 } | |
| 511 return total_bytes_parsed; | |
| 512 } | 219 } |
| 513 | 220 |
| 514 } // namespace net | 221 } // namespace net |
| OLD | NEW |