OLD | NEW |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "net/quic/reliable_quic_stream.h" | 5 #include "net/quic/reliable_quic_stream.h" |
6 | 6 |
7 #include "base/logging.h" | 7 #include "base/logging.h" |
8 #include "net/quic/iovector.h" | 8 #include "net/quic/iovector.h" |
| 9 #include "net/quic/quic_flow_controller.h" |
9 #include "net/quic/quic_session.h" | 10 #include "net/quic/quic_session.h" |
10 #include "net/quic/quic_write_blocked_list.h" | 11 #include "net/quic/quic_write_blocked_list.h" |
11 | 12 |
12 using base::StringPiece; | 13 using base::StringPiece; |
13 using std::min; | 14 using std::min; |
14 | 15 |
15 namespace net { | 16 namespace net { |
16 | 17 |
17 #define ENDPOINT (is_server_ ? "Server: " : " Client: ") | 18 #define ENDPOINT (is_server_ ? "Server: " : " Client: ") |
18 | 19 |
(...skipping 90 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
109 } | 110 } |
110 | 111 |
111 ReliableQuicStream::ReliableQuicStream(QuicStreamId id, QuicSession* session) | 112 ReliableQuicStream::ReliableQuicStream(QuicStreamId id, QuicSession* session) |
112 : sequencer_(this), | 113 : sequencer_(this), |
113 id_(id), | 114 id_(id), |
114 session_(session), | 115 session_(session), |
115 stream_bytes_read_(0), | 116 stream_bytes_read_(0), |
116 stream_bytes_written_(0), | 117 stream_bytes_written_(0), |
117 stream_error_(QUIC_STREAM_NO_ERROR), | 118 stream_error_(QUIC_STREAM_NO_ERROR), |
118 connection_error_(QUIC_NO_ERROR), | 119 connection_error_(QUIC_NO_ERROR), |
119 flow_control_send_limit_( | |
120 session_->config()->peer_initial_flow_control_window_bytes()), | |
121 max_flow_control_receive_window_bytes_( | |
122 session_->connection()->max_flow_control_receive_window_bytes()), | |
123 flow_control_receive_window_offset_bytes_( | |
124 session_->connection()->max_flow_control_receive_window_bytes()), | |
125 read_side_closed_(false), | 120 read_side_closed_(false), |
126 write_side_closed_(false), | 121 write_side_closed_(false), |
127 fin_buffered_(false), | 122 fin_buffered_(false), |
128 fin_sent_(false), | 123 fin_sent_(false), |
129 rst_sent_(false), | 124 rst_sent_(false), |
130 is_server_(session_->is_server()) { | 125 is_server_(session_->is_server()), |
131 DVLOG(1) << ENDPOINT << "Created stream " << id_ | 126 flow_controller_( |
132 << ", setting initial receive window to: " | 127 id_, |
133 << flow_control_receive_window_offset_bytes_ | 128 is_server_, |
134 << ", setting send window to: " << flow_control_send_limit_; | 129 session_->config()->peer_initial_flow_control_window_bytes(), |
| 130 session_->connection()->max_flow_control_receive_window_bytes(), |
| 131 session_->connection()->max_flow_control_receive_window_bytes()) { |
135 } | 132 } |
136 | 133 |
137 ReliableQuicStream::~ReliableQuicStream() { | 134 ReliableQuicStream::~ReliableQuicStream() { |
138 } | 135 } |
139 | 136 |
140 bool ReliableQuicStream::WillAcceptStreamFrame( | 137 bool ReliableQuicStream::WillAcceptStreamFrame( |
141 const QuicStreamFrame& frame) const { | 138 const QuicStreamFrame& frame) const { |
142 if (read_side_closed_) { | 139 if (read_side_closed_) { |
143 return true; | 140 return true; |
144 } | 141 } |
(...skipping 10 matching lines...) Expand all Loading... |
155 DVLOG(1) << ENDPOINT << "Ignoring frame " << frame.stream_id; | 152 DVLOG(1) << ENDPOINT << "Ignoring frame " << frame.stream_id; |
156 // We don't want to be reading: blackhole the data. | 153 // We don't want to be reading: blackhole the data. |
157 return true; | 154 return true; |
158 } | 155 } |
159 | 156 |
160 // This count include duplicate data received. | 157 // This count include duplicate data received. |
161 stream_bytes_read_ += frame.data.TotalBufferSize(); | 158 stream_bytes_read_ += frame.data.TotalBufferSize(); |
162 | 159 |
163 bool accepted = sequencer_.OnStreamFrame(frame); | 160 bool accepted = sequencer_.OnStreamFrame(frame); |
164 | 161 |
165 if (IsFlowControlEnabled()) { | 162 if (version() >= QUIC_VERSION_17) { |
166 if (flow_control_receive_window_offset_bytes_ < TotalReceivedBytes()) { | 163 if (flow_controller_.FlowControlViolation()) { |
167 // TODO(rjshade): Lower severity from DFATAL once we have established that | |
168 // flow control is working correctly. | |
169 LOG(DFATAL) | |
170 << ENDPOINT << "Flow control violation on stream: " << id() | |
171 << ", our receive offset is: " | |
172 << flow_control_receive_window_offset_bytes_ | |
173 << ", we have consumed: " << sequencer_.num_bytes_consumed() | |
174 << ", we have buffered: " << sequencer_.num_bytes_buffered() | |
175 << ", total: " << TotalReceivedBytes(); | |
176 session_->connection()->SendConnectionClose(QUIC_FLOW_CONTROL_ERROR); | 164 session_->connection()->SendConnectionClose(QUIC_FLOW_CONTROL_ERROR); |
177 return false; | 165 return false; |
178 } | 166 } |
179 MaybeSendWindowUpdate(); | 167 MaybeSendWindowUpdate(); |
180 } | 168 } |
181 | 169 |
182 return accepted; | 170 return accepted; |
183 } | 171 } |
184 | 172 |
185 void ReliableQuicStream::MaybeSendWindowUpdate() { | 173 void ReliableQuicStream::MaybeSendWindowUpdate() { |
186 if (!IsFlowControlEnabled()) { | 174 if (version() >= QUIC_VERSION_17) { |
187 return; | 175 flow_controller_.MaybeSendWindowUpdate(session()->connection()); |
188 } | |
189 | |
190 // Send WindowUpdate to increase receive window if | |
191 // (receive window offset - consumed bytes) < (max window / 2). | |
192 // This is behaviour copied from SPDY. | |
193 size_t consumed_window = flow_control_receive_window_offset_bytes_ - | |
194 sequencer_.num_bytes_consumed(); | |
195 size_t threshold = (max_flow_control_receive_window_bytes_ / 2); | |
196 if (consumed_window < threshold) { | |
197 // Update our receive window. | |
198 flow_control_receive_window_offset_bytes_ += | |
199 (max_flow_control_receive_window_bytes_ - consumed_window); | |
200 DVLOG(1) << ENDPOINT << "Stream: " << id() | |
201 << ", sending WindowUpdate frame. " | |
202 << "Consumed bytes: " << sequencer_.num_bytes_consumed() | |
203 << ", Receive window offset: " | |
204 << flow_control_receive_window_offset_bytes_ | |
205 << ", Consumed window: " << consumed_window | |
206 << ", and threshold: " << threshold | |
207 << ". New receive window offset is: " | |
208 << flow_control_receive_window_offset_bytes_; | |
209 | |
210 // Inform the peer of our new receive window. | |
211 session()->connection()->SendWindowUpdate( | |
212 id(), flow_control_receive_window_offset_bytes_); | |
213 } | 176 } |
214 } | 177 } |
215 | 178 |
216 int ReliableQuicStream::num_frames_received() { | 179 int ReliableQuicStream::num_frames_received() { |
217 return sequencer_.num_frames_received(); | 180 return sequencer_.num_frames_received(); |
218 } | 181 } |
219 | 182 |
220 int ReliableQuicStream::num_duplicate_frames_received() { | 183 int ReliableQuicStream::num_duplicate_frames_received() { |
221 return sequencer_.num_duplicate_frames_received(); | 184 return sequencer_.num_duplicate_frames_received(); |
222 } | 185 } |
(...skipping 124 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
347 QuicAckNotifier::DelegateInterface* ack_notifier_delegate) { | 310 QuicAckNotifier::DelegateInterface* ack_notifier_delegate) { |
348 if (write_side_closed_) { | 311 if (write_side_closed_) { |
349 DLOG(ERROR) << ENDPOINT << "Attempt to write when the write side is closed"; | 312 DLOG(ERROR) << ENDPOINT << "Attempt to write when the write side is closed"; |
350 return QuicConsumedData(0, false); | 313 return QuicConsumedData(0, false); |
351 } | 314 } |
352 | 315 |
353 // How much data we want to write. | 316 // How much data we want to write. |
354 size_t write_length = TotalIovecLength(iov, iov_count); | 317 size_t write_length = TotalIovecLength(iov, iov_count); |
355 | 318 |
356 // How much data we are allowed to write from flow control. | 319 // How much data we are allowed to write from flow control. |
357 size_t send_window = SendWindowSize(); | 320 size_t send_window = flow_controller_.SendWindowSize(); |
358 | 321 |
359 // A FIN with zero data payload should not be flow control blocked. | 322 // A FIN with zero data payload should not be flow control blocked. |
360 bool fin_with_zero_data = (fin && write_length == 0); | 323 bool fin_with_zero_data = (fin && write_length == 0); |
361 | 324 |
362 if (IsFlowControlEnabled()) { | 325 if (version() >= QUIC_VERSION_17 && flow_controller_.IsEnabled()) { |
363 if (send_window == 0 && !fin_with_zero_data) { | 326 if (send_window == 0 && !fin_with_zero_data) { |
364 // Quick return if we can't send anything. | 327 // Quick return if we can't send anything. |
365 session()->connection()->SendBlocked(id()); | 328 flow_controller_.MaybeSendBlocked(session()->connection()); |
366 return QuicConsumedData(0, false); | 329 return QuicConsumedData(0, false); |
367 } | 330 } |
368 | 331 |
369 if (write_length > send_window) { | 332 if (write_length > send_window) { |
370 // Don't send the FIN if we aren't going to send all the data. | 333 // Don't send the FIN if we aren't going to send all the data. |
371 fin = false; | 334 fin = false; |
372 | 335 |
373 // Writing more data would be a violation of flow control. | 336 // Writing more data would be a violation of flow control. |
374 write_length = send_window; | 337 write_length = send_window; |
375 } | 338 } |
376 } | 339 } |
377 | 340 |
378 // Fill an IOVector with bytes from the iovec. | 341 // Fill an IOVector with bytes from the iovec. |
379 IOVector data; | 342 IOVector data; |
380 data.AppendIovecAtMostBytes(iov, iov_count, write_length); | 343 data.AppendIovecAtMostBytes(iov, iov_count, write_length); |
381 | 344 |
382 QuicConsumedData consumed_data = session()->WritevData( | 345 QuicConsumedData consumed_data = session()->WritevData( |
383 id(), data, stream_bytes_written_, fin, ack_notifier_delegate); | 346 id(), data, stream_bytes_written_, fin, ack_notifier_delegate); |
384 stream_bytes_written_ += consumed_data.bytes_consumed; | 347 stream_bytes_written_ += consumed_data.bytes_consumed; |
385 | 348 |
| 349 if (version() >= QUIC_VERSION_17 && flow_controller_.IsEnabled()) { |
| 350 flow_controller_.AddBytesSent(consumed_data.bytes_consumed); |
| 351 } |
| 352 |
386 if (consumed_data.bytes_consumed == write_length) { | 353 if (consumed_data.bytes_consumed == write_length) { |
387 if (IsFlowControlEnabled() && write_length == send_window && | 354 if (!fin_with_zero_data) { |
388 !fin_with_zero_data) { | 355 if (version() >= QUIC_VERSION_17) { |
389 DVLOG(1) << ENDPOINT << "Stream " << id() | 356 flow_controller_.MaybeSendBlocked(session()->connection()); |
390 << " is flow control blocked. " | 357 } |
391 << "Send window: " << send_window | |
392 << ", stream_bytes_written: " << stream_bytes_written_ | |
393 << ", flow_control_send_limit: " | |
394 << flow_control_send_limit_; | |
395 // The entire send_window has been consumed, we are now flow control | |
396 // blocked. | |
397 session()->connection()->SendBlocked(id()); | |
398 } | 358 } |
399 if (fin && consumed_data.fin_consumed) { | 359 if (fin && consumed_data.fin_consumed) { |
400 fin_sent_ = true; | 360 fin_sent_ = true; |
401 CloseWriteSide(); | 361 CloseWriteSide(); |
402 } else if (fin && !consumed_data.fin_consumed) { | 362 } else if (fin && !consumed_data.fin_consumed) { |
403 session_->MarkWriteBlocked(id(), EffectivePriority()); | 363 session_->MarkWriteBlocked(id(), EffectivePriority()); |
404 } | 364 } |
405 } else { | 365 } else { |
406 session_->MarkWriteBlocked(id(), EffectivePriority()); | 366 session_->MarkWriteBlocked(id(), EffectivePriority()); |
407 } | 367 } |
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
448 // written on this stream before termination. Done here if needed, using a | 408 // written on this stream before termination. Done here if needed, using a |
449 // RST frame. | 409 // RST frame. |
450 DVLOG(1) << ENDPOINT << "Sending RST in OnClose: " << id(); | 410 DVLOG(1) << ENDPOINT << "Sending RST in OnClose: " << id(); |
451 session_->SendRstStream(id(), QUIC_STREAM_NO_ERROR, stream_bytes_written_); | 411 session_->SendRstStream(id(), QUIC_STREAM_NO_ERROR, stream_bytes_written_); |
452 rst_sent_ = true; | 412 rst_sent_ = true; |
453 } | 413 } |
454 } | 414 } |
455 | 415 |
456 void ReliableQuicStream::OnWindowUpdateFrame( | 416 void ReliableQuicStream::OnWindowUpdateFrame( |
457 const QuicWindowUpdateFrame& frame) { | 417 const QuicWindowUpdateFrame& frame) { |
458 if (!IsFlowControlEnabled()) { | 418 if (!flow_controller_.IsEnabled()) { |
459 DLOG(DFATAL) << "Flow control not enabled! " << version(); | 419 DLOG(DFATAL) << "Flow control not enabled! " << version(); |
460 return; | 420 return; |
461 } | 421 } |
462 | 422 |
463 DVLOG(1) << ENDPOINT | 423 if (flow_controller_.UpdateSendWindowOffset(frame.byte_offset)) { |
464 << "OnWindowUpdateFrame for stream " << id() | 424 // We can write again! |
465 << " with byte offset " << frame.byte_offset | 425 // TODO(rjshade): This does not respect priorities (e.g. multiple |
466 << " , current offset: " << flow_control_send_limit_ << ")."; | 426 // outstanding POSTs are unblocked on arrival of |
467 | 427 // SHLO with initial window). |
468 UpdateFlowControlSendLimit(frame.byte_offset); | 428 OnCanWrite(); |
469 } | |
470 | |
471 void ReliableQuicStream::UpdateFlowControlSendLimit(QuicStreamOffset offset) { | |
472 if (offset <= flow_control_send_limit_) { | |
473 DVLOG(1) << ENDPOINT << "Stream " << id() | |
474 << ", not changing window, current: " << flow_control_send_limit_ | |
475 << " new: " << offset; | |
476 // No change to our send window. | |
477 return; | |
478 } | 429 } |
479 | |
480 DVLOG(1) << ENDPOINT << "Stream " << id() | |
481 << ", changing window, current: " << flow_control_send_limit_ | |
482 << " new: " << offset; | |
483 // Send window has increased. | |
484 flow_control_send_limit_ = offset; | |
485 | |
486 // We can write again! | |
487 // TODO(rjshade): This does not respect priorities (e.g. multiple outstanding | |
488 // POSTs are unblocked on arrival of SHLO with initial window). | |
489 OnCanWrite(); | |
490 } | |
491 | |
492 bool ReliableQuicStream::IsFlowControlBlocked() const { | |
493 return IsFlowControlEnabled() && SendWindowSize() == 0; | |
494 } | |
495 | |
496 uint64 ReliableQuicStream::SendWindowSize() const { | |
497 return flow_control_send_limit_ - stream_bytes_written(); | |
498 } | |
499 | |
500 uint64 ReliableQuicStream::TotalReceivedBytes() const { | |
501 return sequencer_.num_bytes_consumed() + sequencer_.num_bytes_buffered(); | |
502 } | 430 } |
503 | 431 |
504 } // namespace net | 432 } // namespace net |
OLD | NEW |