| OLD | NEW |
| 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "net/quic/reliable_quic_stream.h" | 5 #include "net/quic/reliable_quic_stream.h" |
| 6 | 6 |
| 7 #include "base/logging.h" | 7 #include "base/logging.h" |
| 8 #include "net/quic/iovector.h" | 8 #include "net/quic/iovector.h" |
| 9 #include "net/quic/quic_flow_controller.h" |
| 9 #include "net/quic/quic_session.h" | 10 #include "net/quic/quic_session.h" |
| 10 #include "net/quic/quic_write_blocked_list.h" | 11 #include "net/quic/quic_write_blocked_list.h" |
| 11 | 12 |
| 12 using base::StringPiece; | 13 using base::StringPiece; |
| 13 using std::min; | 14 using std::min; |
| 14 | 15 |
| 15 namespace net { | 16 namespace net { |
| 16 | 17 |
| 17 #define ENDPOINT (is_server_ ? "Server: " : " Client: ") | 18 #define ENDPOINT (is_server_ ? "Server: " : " Client: ") |
| 18 | 19 |
| (...skipping 90 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 109 } | 110 } |
| 110 | 111 |
| 111 ReliableQuicStream::ReliableQuicStream(QuicStreamId id, QuicSession* session) | 112 ReliableQuicStream::ReliableQuicStream(QuicStreamId id, QuicSession* session) |
| 112 : sequencer_(this), | 113 : sequencer_(this), |
| 113 id_(id), | 114 id_(id), |
| 114 session_(session), | 115 session_(session), |
| 115 stream_bytes_read_(0), | 116 stream_bytes_read_(0), |
| 116 stream_bytes_written_(0), | 117 stream_bytes_written_(0), |
| 117 stream_error_(QUIC_STREAM_NO_ERROR), | 118 stream_error_(QUIC_STREAM_NO_ERROR), |
| 118 connection_error_(QUIC_NO_ERROR), | 119 connection_error_(QUIC_NO_ERROR), |
| 119 flow_control_send_limit_( | |
| 120 session_->config()->peer_initial_flow_control_window_bytes()), | |
| 121 max_flow_control_receive_window_bytes_( | |
| 122 session_->connection()->max_flow_control_receive_window_bytes()), | |
| 123 flow_control_receive_window_offset_bytes_( | |
| 124 session_->connection()->max_flow_control_receive_window_bytes()), | |
| 125 read_side_closed_(false), | 120 read_side_closed_(false), |
| 126 write_side_closed_(false), | 121 write_side_closed_(false), |
| 127 fin_buffered_(false), | 122 fin_buffered_(false), |
| 128 fin_sent_(false), | 123 fin_sent_(false), |
| 129 rst_sent_(false), | 124 rst_sent_(false), |
| 130 is_server_(session_->is_server()) { | 125 is_server_(session_->is_server()), |
| 131 DVLOG(1) << ENDPOINT << "Created stream " << id_ | 126 flow_controller_( |
| 132 << ", setting initial receive window to: " | 127 id_, |
| 133 << flow_control_receive_window_offset_bytes_ | 128 is_server_, |
| 134 << ", setting send window to: " << flow_control_send_limit_; | 129 session_->config()->peer_initial_flow_control_window_bytes(), |
| 130 session_->connection()->max_flow_control_receive_window_bytes(), |
| 131 session_->connection()->max_flow_control_receive_window_bytes()) { |
| 135 } | 132 } |
| 136 | 133 |
| 137 ReliableQuicStream::~ReliableQuicStream() { | 134 ReliableQuicStream::~ReliableQuicStream() { |
| 138 } | 135 } |
| 139 | 136 |
| 140 bool ReliableQuicStream::WillAcceptStreamFrame( | 137 bool ReliableQuicStream::WillAcceptStreamFrame( |
| 141 const QuicStreamFrame& frame) const { | 138 const QuicStreamFrame& frame) const { |
| 142 if (read_side_closed_) { | 139 if (read_side_closed_) { |
| 143 return true; | 140 return true; |
| 144 } | 141 } |
| (...skipping 10 matching lines...) Expand all Loading... |
| 155 DVLOG(1) << ENDPOINT << "Ignoring frame " << frame.stream_id; | 152 DVLOG(1) << ENDPOINT << "Ignoring frame " << frame.stream_id; |
| 156 // We don't want to be reading: blackhole the data. | 153 // We don't want to be reading: blackhole the data. |
| 157 return true; | 154 return true; |
| 158 } | 155 } |
| 159 | 156 |
| 160 // This count include duplicate data received. | 157 // This count include duplicate data received. |
| 161 stream_bytes_read_ += frame.data.TotalBufferSize(); | 158 stream_bytes_read_ += frame.data.TotalBufferSize(); |
| 162 | 159 |
| 163 bool accepted = sequencer_.OnStreamFrame(frame); | 160 bool accepted = sequencer_.OnStreamFrame(frame); |
| 164 | 161 |
| 165 if (IsFlowControlEnabled()) { | 162 if (version() >= QUIC_VERSION_17) { |
| 166 if (flow_control_receive_window_offset_bytes_ < TotalReceivedBytes()) { | 163 if (flow_controller_.FlowControlViolation()) { |
| 167 // TODO(rjshade): Lower severity from DFATAL once we have established that | |
| 168 // flow control is working correctly. | |
| 169 LOG(DFATAL) | |
| 170 << ENDPOINT << "Flow control violation on stream: " << id() | |
| 171 << ", our receive offset is: " | |
| 172 << flow_control_receive_window_offset_bytes_ | |
| 173 << ", we have consumed: " << sequencer_.num_bytes_consumed() | |
| 174 << ", we have buffered: " << sequencer_.num_bytes_buffered() | |
| 175 << ", total: " << TotalReceivedBytes(); | |
| 176 session_->connection()->SendConnectionClose(QUIC_FLOW_CONTROL_ERROR); | 164 session_->connection()->SendConnectionClose(QUIC_FLOW_CONTROL_ERROR); |
| 177 return false; | 165 return false; |
| 178 } | 166 } |
| 179 MaybeSendWindowUpdate(); | 167 MaybeSendWindowUpdate(); |
| 180 } | 168 } |
| 181 | 169 |
| 182 return accepted; | 170 return accepted; |
| 183 } | 171 } |
| 184 | 172 |
| 185 void ReliableQuicStream::MaybeSendWindowUpdate() { | 173 void ReliableQuicStream::MaybeSendWindowUpdate() { |
| 186 if (!IsFlowControlEnabled()) { | 174 if (version() >= QUIC_VERSION_17) { |
| 187 return; | 175 flow_controller_.MaybeSendWindowUpdate(session()->connection()); |
| 188 } | |
| 189 | |
| 190 // Send WindowUpdate to increase receive window if | |
| 191 // (receive window offset - consumed bytes) < (max window / 2). | |
| 192 // This is behaviour copied from SPDY. | |
| 193 size_t consumed_window = flow_control_receive_window_offset_bytes_ - | |
| 194 sequencer_.num_bytes_consumed(); | |
| 195 size_t threshold = (max_flow_control_receive_window_bytes_ / 2); | |
| 196 if (consumed_window < threshold) { | |
| 197 // Update our receive window. | |
| 198 flow_control_receive_window_offset_bytes_ += | |
| 199 (max_flow_control_receive_window_bytes_ - consumed_window); | |
| 200 DVLOG(1) << ENDPOINT << "Stream: " << id() | |
| 201 << ", sending WindowUpdate frame. " | |
| 202 << "Consumed bytes: " << sequencer_.num_bytes_consumed() | |
| 203 << ", Receive window offset: " | |
| 204 << flow_control_receive_window_offset_bytes_ | |
| 205 << ", Consumed window: " << consumed_window | |
| 206 << ", and threshold: " << threshold | |
| 207 << ". New receive window offset is: " | |
| 208 << flow_control_receive_window_offset_bytes_; | |
| 209 | |
| 210 // Inform the peer of our new receive window. | |
| 211 session()->connection()->SendWindowUpdate( | |
| 212 id(), flow_control_receive_window_offset_bytes_); | |
| 213 } | 176 } |
| 214 } | 177 } |
| 215 | 178 |
| 216 int ReliableQuicStream::num_frames_received() { | 179 int ReliableQuicStream::num_frames_received() { |
| 217 return sequencer_.num_frames_received(); | 180 return sequencer_.num_frames_received(); |
| 218 } | 181 } |
| 219 | 182 |
| 220 int ReliableQuicStream::num_duplicate_frames_received() { | 183 int ReliableQuicStream::num_duplicate_frames_received() { |
| 221 return sequencer_.num_duplicate_frames_received(); | 184 return sequencer_.num_duplicate_frames_received(); |
| 222 } | 185 } |
| (...skipping 124 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 347 QuicAckNotifier::DelegateInterface* ack_notifier_delegate) { | 310 QuicAckNotifier::DelegateInterface* ack_notifier_delegate) { |
| 348 if (write_side_closed_) { | 311 if (write_side_closed_) { |
| 349 DLOG(ERROR) << ENDPOINT << "Attempt to write when the write side is closed"; | 312 DLOG(ERROR) << ENDPOINT << "Attempt to write when the write side is closed"; |
| 350 return QuicConsumedData(0, false); | 313 return QuicConsumedData(0, false); |
| 351 } | 314 } |
| 352 | 315 |
| 353 // How much data we want to write. | 316 // How much data we want to write. |
| 354 size_t write_length = TotalIovecLength(iov, iov_count); | 317 size_t write_length = TotalIovecLength(iov, iov_count); |
| 355 | 318 |
| 356 // How much data we are allowed to write from flow control. | 319 // How much data we are allowed to write from flow control. |
| 357 size_t send_window = SendWindowSize(); | 320 size_t send_window = flow_controller_.SendWindowSize(); |
| 358 | 321 |
| 359 // A FIN with zero data payload should not be flow control blocked. | 322 // A FIN with zero data payload should not be flow control blocked. |
| 360 bool fin_with_zero_data = (fin && write_length == 0); | 323 bool fin_with_zero_data = (fin && write_length == 0); |
| 361 | 324 |
| 362 if (IsFlowControlEnabled()) { | 325 if (version() >= QUIC_VERSION_17 && flow_controller_.IsEnabled()) { |
| 363 if (send_window == 0 && !fin_with_zero_data) { | 326 if (send_window == 0 && !fin_with_zero_data) { |
| 364 // Quick return if we can't send anything. | 327 // Quick return if we can't send anything. |
| 365 session()->connection()->SendBlocked(id()); | 328 flow_controller_.MaybeSendBlocked(session()->connection()); |
| 366 return QuicConsumedData(0, false); | 329 return QuicConsumedData(0, false); |
| 367 } | 330 } |
| 368 | 331 |
| 369 if (write_length > send_window) { | 332 if (write_length > send_window) { |
| 370 // Don't send the FIN if we aren't going to send all the data. | 333 // Don't send the FIN if we aren't going to send all the data. |
| 371 fin = false; | 334 fin = false; |
| 372 | 335 |
| 373 // Writing more data would be a violation of flow control. | 336 // Writing more data would be a violation of flow control. |
| 374 write_length = send_window; | 337 write_length = send_window; |
| 375 } | 338 } |
| 376 } | 339 } |
| 377 | 340 |
| 378 // Fill an IOVector with bytes from the iovec. | 341 // Fill an IOVector with bytes from the iovec. |
| 379 IOVector data; | 342 IOVector data; |
| 380 data.AppendIovecAtMostBytes(iov, iov_count, write_length); | 343 data.AppendIovecAtMostBytes(iov, iov_count, write_length); |
| 381 | 344 |
| 382 QuicConsumedData consumed_data = session()->WritevData( | 345 QuicConsumedData consumed_data = session()->WritevData( |
| 383 id(), data, stream_bytes_written_, fin, ack_notifier_delegate); | 346 id(), data, stream_bytes_written_, fin, ack_notifier_delegate); |
| 384 stream_bytes_written_ += consumed_data.bytes_consumed; | 347 stream_bytes_written_ += consumed_data.bytes_consumed; |
| 385 | 348 |
| 349 if (version() >= QUIC_VERSION_17 && flow_controller_.IsEnabled()) { |
| 350 flow_controller_.AddBytesSent(consumed_data.bytes_consumed); |
| 351 } |
| 352 |
| 386 if (consumed_data.bytes_consumed == write_length) { | 353 if (consumed_data.bytes_consumed == write_length) { |
| 387 if (IsFlowControlEnabled() && write_length == send_window && | 354 if (!fin_with_zero_data) { |
| 388 !fin_with_zero_data) { | 355 if (version() >= QUIC_VERSION_17) { |
| 389 DVLOG(1) << ENDPOINT << "Stream " << id() | 356 flow_controller_.MaybeSendBlocked(session()->connection()); |
| 390 << " is flow control blocked. " | 357 } |
| 391 << "Send window: " << send_window | |
| 392 << ", stream_bytes_written: " << stream_bytes_written_ | |
| 393 << ", flow_control_send_limit: " | |
| 394 << flow_control_send_limit_; | |
| 395 // The entire send_window has been consumed, we are now flow control | |
| 396 // blocked. | |
| 397 session()->connection()->SendBlocked(id()); | |
| 398 } | 358 } |
| 399 if (fin && consumed_data.fin_consumed) { | 359 if (fin && consumed_data.fin_consumed) { |
| 400 fin_sent_ = true; | 360 fin_sent_ = true; |
| 401 CloseWriteSide(); | 361 CloseWriteSide(); |
| 402 } else if (fin && !consumed_data.fin_consumed) { | 362 } else if (fin && !consumed_data.fin_consumed) { |
| 403 session_->MarkWriteBlocked(id(), EffectivePriority()); | 363 session_->MarkWriteBlocked(id(), EffectivePriority()); |
| 404 } | 364 } |
| 405 } else { | 365 } else { |
| 406 session_->MarkWriteBlocked(id(), EffectivePriority()); | 366 session_->MarkWriteBlocked(id(), EffectivePriority()); |
| 407 } | 367 } |
| (...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 448 // written on this stream before termination. Done here if needed, using a | 408 // written on this stream before termination. Done here if needed, using a |
| 449 // RST frame. | 409 // RST frame. |
| 450 DVLOG(1) << ENDPOINT << "Sending RST in OnClose: " << id(); | 410 DVLOG(1) << ENDPOINT << "Sending RST in OnClose: " << id(); |
| 451 session_->SendRstStream(id(), QUIC_STREAM_NO_ERROR, stream_bytes_written_); | 411 session_->SendRstStream(id(), QUIC_STREAM_NO_ERROR, stream_bytes_written_); |
| 452 rst_sent_ = true; | 412 rst_sent_ = true; |
| 453 } | 413 } |
| 454 } | 414 } |
| 455 | 415 |
| 456 void ReliableQuicStream::OnWindowUpdateFrame( | 416 void ReliableQuicStream::OnWindowUpdateFrame( |
| 457 const QuicWindowUpdateFrame& frame) { | 417 const QuicWindowUpdateFrame& frame) { |
| 458 if (!IsFlowControlEnabled()) { | 418 if (!flow_controller_.IsEnabled()) { |
| 459 DLOG(DFATAL) << "Flow control not enabled! " << version(); | 419 DLOG(DFATAL) << "Flow control not enabled! " << version(); |
| 460 return; | 420 return; |
| 461 } | 421 } |
| 462 | 422 |
| 463 DVLOG(1) << ENDPOINT | 423 if (flow_controller_.UpdateSendWindowOffset(frame.byte_offset)) { |
| 464 << "OnWindowUpdateFrame for stream " << id() | 424 // We can write again! |
| 465 << " with byte offset " << frame.byte_offset | 425 // TODO(rjshade): This does not respect priorities (e.g. multiple |
| 466 << " , current offset: " << flow_control_send_limit_ << ")."; | 426 // outstanding POSTs are unblocked on arrival of |
| 467 | 427 // SHLO with initial window). |
| 468 UpdateFlowControlSendLimit(frame.byte_offset); | 428 OnCanWrite(); |
| 469 } | |
| 470 | |
| 471 void ReliableQuicStream::UpdateFlowControlSendLimit(QuicStreamOffset offset) { | |
| 472 if (offset <= flow_control_send_limit_) { | |
| 473 DVLOG(1) << ENDPOINT << "Stream " << id() | |
| 474 << ", not changing window, current: " << flow_control_send_limit_ | |
| 475 << " new: " << offset; | |
| 476 // No change to our send window. | |
| 477 return; | |
| 478 } | 429 } |
| 479 | |
| 480 DVLOG(1) << ENDPOINT << "Stream " << id() | |
| 481 << ", changing window, current: " << flow_control_send_limit_ | |
| 482 << " new: " << offset; | |
| 483 // Send window has increased. | |
| 484 flow_control_send_limit_ = offset; | |
| 485 | |
| 486 // We can write again! | |
| 487 // TODO(rjshade): This does not respect priorities (e.g. multiple outstanding | |
| 488 // POSTs are unblocked on arrival of SHLO with initial window). | |
| 489 OnCanWrite(); | |
| 490 } | |
| 491 | |
| 492 bool ReliableQuicStream::IsFlowControlBlocked() const { | |
| 493 return IsFlowControlEnabled() && SendWindowSize() == 0; | |
| 494 } | |
| 495 | |
| 496 uint64 ReliableQuicStream::SendWindowSize() const { | |
| 497 return flow_control_send_limit_ - stream_bytes_written(); | |
| 498 } | |
| 499 | |
| 500 uint64 ReliableQuicStream::TotalReceivedBytes() const { | |
| 501 return sequencer_.num_bytes_consumed() + sequencer_.num_bytes_buffered(); | |
| 502 } | 430 } |
| 503 | 431 |
| 504 } // namespace net | 432 } // namespace net |
| OLD | NEW |