| OLD | NEW |
| 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "net/quic/reliable_quic_stream.h" | 5 #include "net/quic/reliable_quic_stream.h" |
| 6 | 6 |
| 7 #include "base/logging.h" | 7 #include "base/logging.h" |
| 8 #include "net/quic/iovector.h" | 8 #include "net/quic/iovector.h" |
| 9 #include "net/quic/quic_flow_controller.h" |
| 9 #include "net/quic/quic_session.h" | 10 #include "net/quic/quic_session.h" |
| 10 #include "net/quic/quic_write_blocked_list.h" | 11 #include "net/quic/quic_write_blocked_list.h" |
| 11 | 12 |
| 12 using base::StringPiece; | 13 using base::StringPiece; |
| 13 using std::min; | 14 using std::min; |
| 14 | 15 |
| 15 namespace net { | 16 namespace net { |
| 16 | 17 |
| 17 #define ENDPOINT (is_server_ ? "Server: " : " Client: ") | 18 #define ENDPOINT (is_server_ ? "Server: " : " Client: ") |
| 18 | 19 |
| (...skipping 24 matching lines...) Expand all Loading... |
| 43 wrote_last_data_(false), | 44 wrote_last_data_(false), |
| 44 num_original_packets_(0), | 45 num_original_packets_(0), |
| 45 num_original_bytes_(0), | 46 num_original_bytes_(0), |
| 46 num_retransmitted_packets_(0), | 47 num_retransmitted_packets_(0), |
| 47 num_retransmitted_bytes_(0) { | 48 num_retransmitted_bytes_(0) { |
| 48 } | 49 } |
| 49 | 50 |
| 50 virtual void OnAckNotification(int num_original_packets, | 51 virtual void OnAckNotification(int num_original_packets, |
| 51 int num_original_bytes, | 52 int num_original_bytes, |
| 52 int num_retransmitted_packets, | 53 int num_retransmitted_packets, |
| 53 int num_retransmitted_bytes) OVERRIDE { | 54 int num_retransmitted_bytes, |
| 55 QuicTime::Delta delta_largest_observed) |
| 56 OVERRIDE { |
| 54 DCHECK_LT(0, pending_acks_); | 57 DCHECK_LT(0, pending_acks_); |
| 55 --pending_acks_; | 58 --pending_acks_; |
| 56 num_original_packets_ += num_original_packets; | 59 num_original_packets_ += num_original_packets; |
| 57 num_original_bytes_ += num_original_bytes; | 60 num_original_bytes_ += num_original_bytes; |
| 58 num_retransmitted_packets_ += num_retransmitted_packets; | 61 num_retransmitted_packets_ += num_retransmitted_packets; |
| 59 num_retransmitted_bytes_ += num_retransmitted_bytes; | 62 num_retransmitted_bytes_ += num_retransmitted_bytes; |
| 60 | 63 |
| 61 if (wrote_last_data_ && pending_acks_ == 0) { | 64 if (wrote_last_data_ && pending_acks_ == 0) { |
| 62 delegate_->OnAckNotification(num_original_packets_, | 65 delegate_->OnAckNotification(num_original_packets_, |
| 63 num_original_bytes_, | 66 num_original_bytes_, |
| 64 num_retransmitted_packets_, | 67 num_retransmitted_packets_, |
| 65 num_retransmitted_bytes_); | 68 num_retransmitted_bytes_, |
| 69 delta_largest_observed); |
| 66 } | 70 } |
| 67 } | 71 } |
| 68 | 72 |
| 69 void WroteData(bool last_data) { | 73 void WroteData(bool last_data) { |
| 70 DCHECK(!wrote_last_data_); | 74 DCHECK(!wrote_last_data_); |
| 71 ++pending_acks_; | 75 ++pending_acks_; |
| 72 wrote_last_data_ = last_data; | 76 wrote_last_data_ = last_data; |
| 73 } | 77 } |
| 74 | 78 |
| 75 protected: | 79 protected: |
| (...skipping 30 matching lines...) Expand all Loading... |
| 106 } | 110 } |
| 107 | 111 |
| 108 ReliableQuicStream::ReliableQuicStream(QuicStreamId id, QuicSession* session) | 112 ReliableQuicStream::ReliableQuicStream(QuicStreamId id, QuicSession* session) |
| 109 : sequencer_(this), | 113 : sequencer_(this), |
| 110 id_(id), | 114 id_(id), |
| 111 session_(session), | 115 session_(session), |
| 112 stream_bytes_read_(0), | 116 stream_bytes_read_(0), |
| 113 stream_bytes_written_(0), | 117 stream_bytes_written_(0), |
| 114 stream_error_(QUIC_STREAM_NO_ERROR), | 118 stream_error_(QUIC_STREAM_NO_ERROR), |
| 115 connection_error_(QUIC_NO_ERROR), | 119 connection_error_(QUIC_NO_ERROR), |
| 116 flow_control_send_limit_( | |
| 117 session_->config()->peer_initial_flow_control_window_bytes()), | |
| 118 max_flow_control_receive_window_bytes_( | |
| 119 session_->connection()->max_flow_control_receive_window_bytes()), | |
| 120 flow_control_receive_window_offset_bytes_( | |
| 121 session_->connection()->max_flow_control_receive_window_bytes()), | |
| 122 read_side_closed_(false), | 120 read_side_closed_(false), |
| 123 write_side_closed_(false), | 121 write_side_closed_(false), |
| 124 fin_buffered_(false), | 122 fin_buffered_(false), |
| 125 fin_sent_(false), | 123 fin_sent_(false), |
| 126 rst_sent_(false), | 124 rst_sent_(false), |
| 127 is_server_(session_->is_server()) { | 125 is_server_(session_->is_server()), |
| 128 DVLOG(1) << ENDPOINT << "Created stream " << id_ | 126 flow_controller_( |
| 129 << ", setting initial receive window to: " | 127 id_, |
| 130 << flow_control_receive_window_offset_bytes_ | 128 is_server_, |
| 131 << ", setting send window to: " << flow_control_send_limit_; | 129 session_->config()->peer_initial_flow_control_window_bytes(), |
| 130 session_->connection()->max_flow_control_receive_window_bytes(), |
| 131 session_->connection()->max_flow_control_receive_window_bytes()) { |
| 132 if (session_->connection()->version() < QUIC_VERSION_17) { |
| 133 flow_controller_.Disable(); |
| 134 } |
| 132 } | 135 } |
| 133 | 136 |
| 134 ReliableQuicStream::~ReliableQuicStream() { | 137 ReliableQuicStream::~ReliableQuicStream() { |
| 135 } | 138 } |
| 136 | 139 |
| 137 bool ReliableQuicStream::WillAcceptStreamFrame( | 140 bool ReliableQuicStream::WillAcceptStreamFrame( |
| 138 const QuicStreamFrame& frame) const { | 141 const QuicStreamFrame& frame) const { |
| 139 if (read_side_closed_) { | 142 if (read_side_closed_) { |
| 140 return true; | 143 return true; |
| 141 } | 144 } |
| (...skipping 10 matching lines...) Expand all Loading... |
| 152 DVLOG(1) << ENDPOINT << "Ignoring frame " << frame.stream_id; | 155 DVLOG(1) << ENDPOINT << "Ignoring frame " << frame.stream_id; |
| 153 // We don't want to be reading: blackhole the data. | 156 // We don't want to be reading: blackhole the data. |
| 154 return true; | 157 return true; |
| 155 } | 158 } |
| 156 | 159 |
| 157 // This count include duplicate data received. | 160 // This count include duplicate data received. |
| 158 stream_bytes_read_ += frame.data.TotalBufferSize(); | 161 stream_bytes_read_ += frame.data.TotalBufferSize(); |
| 159 | 162 |
| 160 bool accepted = sequencer_.OnStreamFrame(frame); | 163 bool accepted = sequencer_.OnStreamFrame(frame); |
| 161 | 164 |
| 162 if (IsFlowControlEnabled()) { | 165 if (version() >= QUIC_VERSION_17) { |
| 163 if (flow_control_receive_window_offset_bytes_ < TotalReceivedBytes()) { | 166 if (flow_controller_.FlowControlViolation()) { |
| 164 // TODO(rjshade): Lower severity from DFATAL once we have established that | |
| 165 // flow control is working correctly. | |
| 166 LOG(DFATAL) | |
| 167 << ENDPOINT << "Flow control violation on stream: " << id() | |
| 168 << ", our receive offset is: " | |
| 169 << flow_control_receive_window_offset_bytes_ | |
| 170 << ", we have consumed: " << sequencer_.num_bytes_consumed() | |
| 171 << ", we have buffered: " << sequencer_.num_bytes_buffered() | |
| 172 << ", total: " << TotalReceivedBytes(); | |
| 173 session_->connection()->SendConnectionClose(QUIC_FLOW_CONTROL_ERROR); | 167 session_->connection()->SendConnectionClose(QUIC_FLOW_CONTROL_ERROR); |
| 174 return false; | 168 return false; |
| 175 } | 169 } |
| 176 MaybeSendWindowUpdate(); | 170 MaybeSendWindowUpdate(); |
| 177 } | 171 } |
| 178 | 172 |
| 179 return accepted; | 173 return accepted; |
| 180 } | 174 } |
| 181 | 175 |
| 182 void ReliableQuicStream::MaybeSendWindowUpdate() { | 176 void ReliableQuicStream::MaybeSendWindowUpdate() { |
| 183 if (!IsFlowControlEnabled()) { | 177 if (version() >= QUIC_VERSION_17) { |
| 184 return; | 178 flow_controller_.MaybeSendWindowUpdate(session()->connection()); |
| 185 } | |
| 186 | |
| 187 // Send WindowUpdate to increase receive window if | |
| 188 // (receive window offset - consumed bytes) < (max window / 2). | |
| 189 // This is behaviour copied from SPDY. | |
| 190 size_t consumed_window = flow_control_receive_window_offset_bytes_ - | |
| 191 sequencer_.num_bytes_consumed(); | |
| 192 size_t threshold = (max_flow_control_receive_window_bytes_ / 2); | |
| 193 if (consumed_window < threshold) { | |
| 194 // Update our receive window. | |
| 195 flow_control_receive_window_offset_bytes_ += | |
| 196 (max_flow_control_receive_window_bytes_ - consumed_window); | |
| 197 DVLOG(1) << ENDPOINT << "Stream: " << id() | |
| 198 << ", sending WindowUpdate frame. " | |
| 199 << "Consumed bytes: " << sequencer_.num_bytes_consumed() | |
| 200 << ", Receive window offset: " | |
| 201 << flow_control_receive_window_offset_bytes_ | |
| 202 << ", Consumed window: " << consumed_window | |
| 203 << ", and threshold: " << threshold | |
| 204 << ". New receive window offset is: " | |
| 205 << flow_control_receive_window_offset_bytes_; | |
| 206 | |
| 207 // Inform the peer of our new receive window. | |
| 208 session()->connection()->SendWindowUpdate( | |
| 209 id(), flow_control_receive_window_offset_bytes_); | |
| 210 } | 179 } |
| 211 } | 180 } |
| 212 | 181 |
| 213 int ReliableQuicStream::num_frames_received() { | 182 int ReliableQuicStream::num_frames_received() { |
| 214 return sequencer_.num_frames_received(); | 183 return sequencer_.num_frames_received(); |
| 215 } | 184 } |
| 216 | 185 |
| 217 int ReliableQuicStream::num_duplicate_frames_received() { | 186 int ReliableQuicStream::num_duplicate_frames_received() { |
| 218 return sequencer_.num_duplicate_frames_received(); | 187 return sequencer_.num_duplicate_frames_received(); |
| 219 } | 188 } |
| (...skipping 124 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 344 QuicAckNotifier::DelegateInterface* ack_notifier_delegate) { | 313 QuicAckNotifier::DelegateInterface* ack_notifier_delegate) { |
| 345 if (write_side_closed_) { | 314 if (write_side_closed_) { |
| 346 DLOG(ERROR) << ENDPOINT << "Attempt to write when the write side is closed"; | 315 DLOG(ERROR) << ENDPOINT << "Attempt to write when the write side is closed"; |
| 347 return QuicConsumedData(0, false); | 316 return QuicConsumedData(0, false); |
| 348 } | 317 } |
| 349 | 318 |
| 350 // How much data we want to write. | 319 // How much data we want to write. |
| 351 size_t write_length = TotalIovecLength(iov, iov_count); | 320 size_t write_length = TotalIovecLength(iov, iov_count); |
| 352 | 321 |
| 353 // How much data we are allowed to write from flow control. | 322 // How much data we are allowed to write from flow control. |
| 354 size_t send_window = SendWindowSize(); | 323 size_t send_window = flow_controller_.SendWindowSize(); |
| 355 | 324 |
| 356 // A FIN with zero data payload should not be flow control blocked. | 325 // A FIN with zero data payload should not be flow control blocked. |
| 357 bool fin_with_zero_data = (fin && write_length == 0); | 326 bool fin_with_zero_data = (fin && write_length == 0); |
| 358 | 327 |
| 359 if (IsFlowControlEnabled()) { | 328 if (version() >= QUIC_VERSION_17 && flow_controller_.IsEnabled()) { |
| 360 if (send_window == 0 && !fin_with_zero_data) { | 329 if (send_window == 0 && !fin_with_zero_data) { |
| 361 // Quick return if we can't send anything. | 330 // Quick return if we can't send anything. |
| 362 session()->connection()->SendBlocked(id()); | 331 flow_controller_.MaybeSendBlocked(session()->connection()); |
| 363 return QuicConsumedData(0, false); | 332 return QuicConsumedData(0, false); |
| 364 } | 333 } |
| 365 | 334 |
| 366 if (write_length > send_window) { | 335 if (write_length > send_window) { |
| 367 // Don't send the FIN if we aren't going to send all the data. | 336 // Don't send the FIN if we aren't going to send all the data. |
| 368 fin = false; | 337 fin = false; |
| 369 | 338 |
| 370 // Writing more data would be a violation of flow control. | 339 // Writing more data would be a violation of flow control. |
| 371 write_length = send_window; | 340 write_length = send_window; |
| 372 } | 341 } |
| 373 } | 342 } |
| 374 | 343 |
| 375 // Fill an IOVector with bytes from the iovec. | 344 // Fill an IOVector with bytes from the iovec. |
| 376 IOVector data; | 345 IOVector data; |
| 377 data.AppendIovecAtMostBytes(iov, iov_count, write_length); | 346 data.AppendIovecAtMostBytes(iov, iov_count, write_length); |
| 378 | 347 |
| 379 QuicConsumedData consumed_data = session()->WritevData( | 348 QuicConsumedData consumed_data = session()->WritevData( |
| 380 id(), data, stream_bytes_written_, fin, ack_notifier_delegate); | 349 id(), data, stream_bytes_written_, fin, ack_notifier_delegate); |
| 381 stream_bytes_written_ += consumed_data.bytes_consumed; | 350 stream_bytes_written_ += consumed_data.bytes_consumed; |
| 382 | 351 |
| 352 if (version() >= QUIC_VERSION_17 && flow_controller_.IsEnabled()) { |
| 353 flow_controller_.AddBytesSent(consumed_data.bytes_consumed); |
| 354 } |
| 355 |
| 383 if (consumed_data.bytes_consumed == write_length) { | 356 if (consumed_data.bytes_consumed == write_length) { |
| 384 if (IsFlowControlEnabled() && write_length == send_window && | 357 if (!fin_with_zero_data) { |
| 385 !fin_with_zero_data) { | 358 if (version() >= QUIC_VERSION_17) { |
| 386 DVLOG(1) << ENDPOINT << "Stream " << id() | 359 flow_controller_.MaybeSendBlocked(session()->connection()); |
| 387 << " is flow control blocked. " | 360 } |
| 388 << "Send window: " << send_window | |
| 389 << ", stream_bytes_written: " << stream_bytes_written_ | |
| 390 << ", flow_control_send_limit: " | |
| 391 << flow_control_send_limit_; | |
| 392 // The entire send_window has been consumed, we are now flow control | |
| 393 // blocked. | |
| 394 session()->connection()->SendBlocked(id()); | |
| 395 } | 361 } |
| 396 if (fin && consumed_data.fin_consumed) { | 362 if (fin && consumed_data.fin_consumed) { |
| 397 fin_sent_ = true; | 363 fin_sent_ = true; |
| 398 CloseWriteSide(); | 364 CloseWriteSide(); |
| 399 } else if (fin && !consumed_data.fin_consumed) { | 365 } else if (fin && !consumed_data.fin_consumed) { |
| 400 session_->MarkWriteBlocked(id(), EffectivePriority()); | 366 session_->MarkWriteBlocked(id(), EffectivePriority()); |
| 401 } | 367 } |
| 402 } else { | 368 } else { |
| 403 session_->MarkWriteBlocked(id(), EffectivePriority()); | 369 session_->MarkWriteBlocked(id(), EffectivePriority()); |
| 404 } | 370 } |
| (...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 438 void ReliableQuicStream::OnClose() { | 404 void ReliableQuicStream::OnClose() { |
| 439 CloseReadSide(); | 405 CloseReadSide(); |
| 440 CloseWriteSide(); | 406 CloseWriteSide(); |
| 441 | 407 |
| 442 if (version() > QUIC_VERSION_13 && | 408 if (version() > QUIC_VERSION_13 && |
| 443 !fin_sent_ && !rst_sent_) { | 409 !fin_sent_ && !rst_sent_) { |
| 444 // For flow control accounting, we must tell the peer how many bytes we have | 410 // For flow control accounting, we must tell the peer how many bytes we have |
| 445 // written on this stream before termination. Done here if needed, using a | 411 // written on this stream before termination. Done here if needed, using a |
| 446 // RST frame. | 412 // RST frame. |
| 447 DVLOG(1) << ENDPOINT << "Sending RST in OnClose: " << id(); | 413 DVLOG(1) << ENDPOINT << "Sending RST in OnClose: " << id(); |
| 448 session_->SendRstStream(id(), QUIC_STREAM_NO_ERROR, stream_bytes_written_); | 414 session_->SendRstStream(id(), QUIC_RST_FLOW_CONTROL_ACCOUNTING, |
| 415 stream_bytes_written_); |
| 449 rst_sent_ = true; | 416 rst_sent_ = true; |
| 450 } | 417 } |
| 451 } | 418 } |
| 452 | 419 |
| 453 void ReliableQuicStream::OnWindowUpdateFrame( | 420 void ReliableQuicStream::OnWindowUpdateFrame( |
| 454 const QuicWindowUpdateFrame& frame) { | 421 const QuicWindowUpdateFrame& frame) { |
| 455 if (!IsFlowControlEnabled()) { | 422 if (!flow_controller_.IsEnabled()) { |
| 456 DLOG(DFATAL) << "Flow control not enabled! " << version(); | 423 DLOG(DFATAL) << "Flow control not enabled! " << version(); |
| 457 return; | 424 return; |
| 458 } | 425 } |
| 459 | 426 |
| 460 DVLOG(1) << ENDPOINT | 427 if (flow_controller_.UpdateSendWindowOffset(frame.byte_offset)) { |
| 461 << "OnWindowUpdateFrame for stream " << id() | 428 // We can write again! |
| 462 << " with byte offset " << frame.byte_offset | 429 // TODO(rjshade): This does not respect priorities (e.g. multiple |
| 463 << " , current offset: " << flow_control_send_limit_ << ")."; | 430 // outstanding POSTs are unblocked on arrival of |
| 464 | 431 // SHLO with initial window). |
| 465 UpdateFlowControlSendLimit(frame.byte_offset); | 432 OnCanWrite(); |
| 466 } | |
| 467 | |
| 468 void ReliableQuicStream::UpdateFlowControlSendLimit(QuicStreamOffset offset) { | |
| 469 if (offset <= flow_control_send_limit_) { | |
| 470 DVLOG(1) << ENDPOINT << "Stream " << id() | |
| 471 << ", not changing window, current: " << flow_control_send_limit_ | |
| 472 << " new: " << offset; | |
| 473 // No change to our send window. | |
| 474 return; | |
| 475 } | 433 } |
| 476 | |
| 477 DVLOG(1) << ENDPOINT << "Stream " << id() | |
| 478 << ", changing window, current: " << flow_control_send_limit_ | |
| 479 << " new: " << offset; | |
| 480 // Send window has increased. | |
| 481 flow_control_send_limit_ = offset; | |
| 482 | |
| 483 // We can write again! | |
| 484 // TODO(rjshade): This does not respect priorities (e.g. multiple outstanding | |
| 485 // POSTs are unblocked on arrival of SHLO with initial window). | |
| 486 OnCanWrite(); | |
| 487 } | |
| 488 | |
| 489 bool ReliableQuicStream::IsFlowControlBlocked() const { | |
| 490 return IsFlowControlEnabled() && SendWindowSize() == 0; | |
| 491 } | |
| 492 | |
| 493 uint64 ReliableQuicStream::SendWindowSize() const { | |
| 494 return flow_control_send_limit_ - stream_bytes_written(); | |
| 495 } | |
| 496 | |
| 497 uint64 ReliableQuicStream::TotalReceivedBytes() const { | |
| 498 return sequencer_.num_bytes_consumed() + sequencer_.num_bytes_buffered(); | |
| 499 } | 434 } |
| 500 | 435 |
| 501 } // namespace net | 436 } // namespace net |
| OLD | NEW |