OLD | NEW |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "net/quic/reliable_quic_stream.h" | 5 #include "net/quic/reliable_quic_stream.h" |
6 | 6 |
7 #include "base/logging.h" | 7 #include "base/logging.h" |
8 #include "net/quic/iovector.h" | 8 #include "net/quic/iovector.h" |
| 9 #include "net/quic/quic_flow_controller.h" |
9 #include "net/quic/quic_session.h" | 10 #include "net/quic/quic_session.h" |
10 #include "net/quic/quic_write_blocked_list.h" | 11 #include "net/quic/quic_write_blocked_list.h" |
11 | 12 |
12 using base::StringPiece; | 13 using base::StringPiece; |
13 using std::min; | 14 using std::min; |
14 | 15 |
15 namespace net { | 16 namespace net { |
16 | 17 |
17 #define ENDPOINT (is_server_ ? "Server: " : " Client: ") | 18 #define ENDPOINT (is_server_ ? "Server: " : " Client: ") |
18 | 19 |
(...skipping 24 matching lines...) Expand all Loading... |
43 wrote_last_data_(false), | 44 wrote_last_data_(false), |
44 num_original_packets_(0), | 45 num_original_packets_(0), |
45 num_original_bytes_(0), | 46 num_original_bytes_(0), |
46 num_retransmitted_packets_(0), | 47 num_retransmitted_packets_(0), |
47 num_retransmitted_bytes_(0) { | 48 num_retransmitted_bytes_(0) { |
48 } | 49 } |
49 | 50 |
50 virtual void OnAckNotification(int num_original_packets, | 51 virtual void OnAckNotification(int num_original_packets, |
51 int num_original_bytes, | 52 int num_original_bytes, |
52 int num_retransmitted_packets, | 53 int num_retransmitted_packets, |
53 int num_retransmitted_bytes) OVERRIDE { | 54 int num_retransmitted_bytes, |
| 55 QuicTime::Delta delta_largest_observed) |
| 56 OVERRIDE { |
54 DCHECK_LT(0, pending_acks_); | 57 DCHECK_LT(0, pending_acks_); |
55 --pending_acks_; | 58 --pending_acks_; |
56 num_original_packets_ += num_original_packets; | 59 num_original_packets_ += num_original_packets; |
57 num_original_bytes_ += num_original_bytes; | 60 num_original_bytes_ += num_original_bytes; |
58 num_retransmitted_packets_ += num_retransmitted_packets; | 61 num_retransmitted_packets_ += num_retransmitted_packets; |
59 num_retransmitted_bytes_ += num_retransmitted_bytes; | 62 num_retransmitted_bytes_ += num_retransmitted_bytes; |
60 | 63 |
61 if (wrote_last_data_ && pending_acks_ == 0) { | 64 if (wrote_last_data_ && pending_acks_ == 0) { |
62 delegate_->OnAckNotification(num_original_packets_, | 65 delegate_->OnAckNotification(num_original_packets_, |
63 num_original_bytes_, | 66 num_original_bytes_, |
64 num_retransmitted_packets_, | 67 num_retransmitted_packets_, |
65 num_retransmitted_bytes_); | 68 num_retransmitted_bytes_, |
| 69 delta_largest_observed); |
66 } | 70 } |
67 } | 71 } |
68 | 72 |
69 void WroteData(bool last_data) { | 73 void WroteData(bool last_data) { |
70 DCHECK(!wrote_last_data_); | 74 DCHECK(!wrote_last_data_); |
71 ++pending_acks_; | 75 ++pending_acks_; |
72 wrote_last_data_ = last_data; | 76 wrote_last_data_ = last_data; |
73 } | 77 } |
74 | 78 |
75 protected: | 79 protected: |
(...skipping 30 matching lines...) Expand all Loading... |
106 } | 110 } |
107 | 111 |
108 ReliableQuicStream::ReliableQuicStream(QuicStreamId id, QuicSession* session) | 112 ReliableQuicStream::ReliableQuicStream(QuicStreamId id, QuicSession* session) |
109 : sequencer_(this), | 113 : sequencer_(this), |
110 id_(id), | 114 id_(id), |
111 session_(session), | 115 session_(session), |
112 stream_bytes_read_(0), | 116 stream_bytes_read_(0), |
113 stream_bytes_written_(0), | 117 stream_bytes_written_(0), |
114 stream_error_(QUIC_STREAM_NO_ERROR), | 118 stream_error_(QUIC_STREAM_NO_ERROR), |
115 connection_error_(QUIC_NO_ERROR), | 119 connection_error_(QUIC_NO_ERROR), |
116 flow_control_send_limit_( | |
117 session_->config()->peer_initial_flow_control_window_bytes()), | |
118 max_flow_control_receive_window_bytes_( | |
119 session_->connection()->max_flow_control_receive_window_bytes()), | |
120 flow_control_receive_window_offset_bytes_( | |
121 session_->connection()->max_flow_control_receive_window_bytes()), | |
122 read_side_closed_(false), | 120 read_side_closed_(false), |
123 write_side_closed_(false), | 121 write_side_closed_(false), |
124 fin_buffered_(false), | 122 fin_buffered_(false), |
125 fin_sent_(false), | 123 fin_sent_(false), |
126 rst_sent_(false), | 124 rst_sent_(false), |
127 is_server_(session_->is_server()) { | 125 is_server_(session_->is_server()), |
128 DVLOG(1) << ENDPOINT << "Created stream " << id_ | 126 flow_controller_( |
129 << ", setting initial receive window to: " | 127 id_, |
130 << flow_control_receive_window_offset_bytes_ | 128 is_server_, |
131 << ", setting send window to: " << flow_control_send_limit_; | 129 session_->config()->peer_initial_flow_control_window_bytes(), |
| 130 session_->connection()->max_flow_control_receive_window_bytes(), |
| 131 session_->connection()->max_flow_control_receive_window_bytes()) { |
| 132 if (session_->connection()->version() < QUIC_VERSION_17) { |
| 133 flow_controller_.Disable(); |
| 134 } |
132 } | 135 } |
133 | 136 |
134 ReliableQuicStream::~ReliableQuicStream() { | 137 ReliableQuicStream::~ReliableQuicStream() { |
135 } | 138 } |
136 | 139 |
137 bool ReliableQuicStream::WillAcceptStreamFrame( | 140 bool ReliableQuicStream::WillAcceptStreamFrame( |
138 const QuicStreamFrame& frame) const { | 141 const QuicStreamFrame& frame) const { |
139 if (read_side_closed_) { | 142 if (read_side_closed_) { |
140 return true; | 143 return true; |
141 } | 144 } |
(...skipping 10 matching lines...) Expand all Loading... |
152 DVLOG(1) << ENDPOINT << "Ignoring frame " << frame.stream_id; | 155 DVLOG(1) << ENDPOINT << "Ignoring frame " << frame.stream_id; |
153 // We don't want to be reading: blackhole the data. | 156 // We don't want to be reading: blackhole the data. |
154 return true; | 157 return true; |
155 } | 158 } |
156 | 159 |
157 // This count include duplicate data received. | 160 // This count include duplicate data received. |
158 stream_bytes_read_ += frame.data.TotalBufferSize(); | 161 stream_bytes_read_ += frame.data.TotalBufferSize(); |
159 | 162 |
160 bool accepted = sequencer_.OnStreamFrame(frame); | 163 bool accepted = sequencer_.OnStreamFrame(frame); |
161 | 164 |
162 if (IsFlowControlEnabled()) { | 165 if (version() >= QUIC_VERSION_17) { |
163 if (flow_control_receive_window_offset_bytes_ < TotalReceivedBytes()) { | 166 if (flow_controller_.FlowControlViolation()) { |
164 // TODO(rjshade): Lower severity from DFATAL once we have established that | |
165 // flow control is working correctly. | |
166 LOG(DFATAL) | |
167 << ENDPOINT << "Flow control violation on stream: " << id() | |
168 << ", our receive offset is: " | |
169 << flow_control_receive_window_offset_bytes_ | |
170 << ", we have consumed: " << sequencer_.num_bytes_consumed() | |
171 << ", we have buffered: " << sequencer_.num_bytes_buffered() | |
172 << ", total: " << TotalReceivedBytes(); | |
173 session_->connection()->SendConnectionClose(QUIC_FLOW_CONTROL_ERROR); | 167 session_->connection()->SendConnectionClose(QUIC_FLOW_CONTROL_ERROR); |
174 return false; | 168 return false; |
175 } | 169 } |
176 MaybeSendWindowUpdate(); | 170 MaybeSendWindowUpdate(); |
177 } | 171 } |
178 | 172 |
179 return accepted; | 173 return accepted; |
180 } | 174 } |
181 | 175 |
182 void ReliableQuicStream::MaybeSendWindowUpdate() { | 176 void ReliableQuicStream::MaybeSendWindowUpdate() { |
183 if (!IsFlowControlEnabled()) { | 177 if (version() >= QUIC_VERSION_17) { |
184 return; | 178 flow_controller_.MaybeSendWindowUpdate(session()->connection()); |
185 } | |
186 | |
187 // Send WindowUpdate to increase receive window if | |
188 // (receive window offset - consumed bytes) < (max window / 2). | |
189 // This is behaviour copied from SPDY. | |
190 size_t consumed_window = flow_control_receive_window_offset_bytes_ - | |
191 sequencer_.num_bytes_consumed(); | |
192 size_t threshold = (max_flow_control_receive_window_bytes_ / 2); | |
193 if (consumed_window < threshold) { | |
194 // Update our receive window. | |
195 flow_control_receive_window_offset_bytes_ += | |
196 (max_flow_control_receive_window_bytes_ - consumed_window); | |
197 DVLOG(1) << ENDPOINT << "Stream: " << id() | |
198 << ", sending WindowUpdate frame. " | |
199 << "Consumed bytes: " << sequencer_.num_bytes_consumed() | |
200 << ", Receive window offset: " | |
201 << flow_control_receive_window_offset_bytes_ | |
202 << ", Consumed window: " << consumed_window | |
203 << ", and threshold: " << threshold | |
204 << ". New receive window offset is: " | |
205 << flow_control_receive_window_offset_bytes_; | |
206 | |
207 // Inform the peer of our new receive window. | |
208 session()->connection()->SendWindowUpdate( | |
209 id(), flow_control_receive_window_offset_bytes_); | |
210 } | 179 } |
211 } | 180 } |
212 | 181 |
213 int ReliableQuicStream::num_frames_received() { | 182 int ReliableQuicStream::num_frames_received() { |
214 return sequencer_.num_frames_received(); | 183 return sequencer_.num_frames_received(); |
215 } | 184 } |
216 | 185 |
217 int ReliableQuicStream::num_duplicate_frames_received() { | 186 int ReliableQuicStream::num_duplicate_frames_received() { |
218 return sequencer_.num_duplicate_frames_received(); | 187 return sequencer_.num_duplicate_frames_received(); |
219 } | 188 } |
(...skipping 124 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
344 QuicAckNotifier::DelegateInterface* ack_notifier_delegate) { | 313 QuicAckNotifier::DelegateInterface* ack_notifier_delegate) { |
345 if (write_side_closed_) { | 314 if (write_side_closed_) { |
346 DLOG(ERROR) << ENDPOINT << "Attempt to write when the write side is closed"; | 315 DLOG(ERROR) << ENDPOINT << "Attempt to write when the write side is closed"; |
347 return QuicConsumedData(0, false); | 316 return QuicConsumedData(0, false); |
348 } | 317 } |
349 | 318 |
350 // How much data we want to write. | 319 // How much data we want to write. |
351 size_t write_length = TotalIovecLength(iov, iov_count); | 320 size_t write_length = TotalIovecLength(iov, iov_count); |
352 | 321 |
353 // How much data we are allowed to write from flow control. | 322 // How much data we are allowed to write from flow control. |
354 size_t send_window = SendWindowSize(); | 323 size_t send_window = flow_controller_.SendWindowSize(); |
355 | 324 |
356 // A FIN with zero data payload should not be flow control blocked. | 325 // A FIN with zero data payload should not be flow control blocked. |
357 bool fin_with_zero_data = (fin && write_length == 0); | 326 bool fin_with_zero_data = (fin && write_length == 0); |
358 | 327 |
359 if (IsFlowControlEnabled()) { | 328 if (version() >= QUIC_VERSION_17 && flow_controller_.IsEnabled()) { |
360 if (send_window == 0 && !fin_with_zero_data) { | 329 if (send_window == 0 && !fin_with_zero_data) { |
361 // Quick return if we can't send anything. | 330 // Quick return if we can't send anything. |
362 session()->connection()->SendBlocked(id()); | 331 flow_controller_.MaybeSendBlocked(session()->connection()); |
363 return QuicConsumedData(0, false); | 332 return QuicConsumedData(0, false); |
364 } | 333 } |
365 | 334 |
366 if (write_length > send_window) { | 335 if (write_length > send_window) { |
367 // Don't send the FIN if we aren't going to send all the data. | 336 // Don't send the FIN if we aren't going to send all the data. |
368 fin = false; | 337 fin = false; |
369 | 338 |
370 // Writing more data would be a violation of flow control. | 339 // Writing more data would be a violation of flow control. |
371 write_length = send_window; | 340 write_length = send_window; |
372 } | 341 } |
373 } | 342 } |
374 | 343 |
375 // Fill an IOVector with bytes from the iovec. | 344 // Fill an IOVector with bytes from the iovec. |
376 IOVector data; | 345 IOVector data; |
377 data.AppendIovecAtMostBytes(iov, iov_count, write_length); | 346 data.AppendIovecAtMostBytes(iov, iov_count, write_length); |
378 | 347 |
379 QuicConsumedData consumed_data = session()->WritevData( | 348 QuicConsumedData consumed_data = session()->WritevData( |
380 id(), data, stream_bytes_written_, fin, ack_notifier_delegate); | 349 id(), data, stream_bytes_written_, fin, ack_notifier_delegate); |
381 stream_bytes_written_ += consumed_data.bytes_consumed; | 350 stream_bytes_written_ += consumed_data.bytes_consumed; |
382 | 351 |
| 352 if (version() >= QUIC_VERSION_17 && flow_controller_.IsEnabled()) { |
| 353 flow_controller_.AddBytesSent(consumed_data.bytes_consumed); |
| 354 } |
| 355 |
383 if (consumed_data.bytes_consumed == write_length) { | 356 if (consumed_data.bytes_consumed == write_length) { |
384 if (IsFlowControlEnabled() && write_length == send_window && | 357 if (!fin_with_zero_data) { |
385 !fin_with_zero_data) { | 358 if (version() >= QUIC_VERSION_17) { |
386 DVLOG(1) << ENDPOINT << "Stream " << id() | 359 flow_controller_.MaybeSendBlocked(session()->connection()); |
387 << " is flow control blocked. " | 360 } |
388 << "Send window: " << send_window | |
389 << ", stream_bytes_written: " << stream_bytes_written_ | |
390 << ", flow_control_send_limit: " | |
391 << flow_control_send_limit_; | |
392 // The entire send_window has been consumed, we are now flow control | |
393 // blocked. | |
394 session()->connection()->SendBlocked(id()); | |
395 } | 361 } |
396 if (fin && consumed_data.fin_consumed) { | 362 if (fin && consumed_data.fin_consumed) { |
397 fin_sent_ = true; | 363 fin_sent_ = true; |
398 CloseWriteSide(); | 364 CloseWriteSide(); |
399 } else if (fin && !consumed_data.fin_consumed) { | 365 } else if (fin && !consumed_data.fin_consumed) { |
400 session_->MarkWriteBlocked(id(), EffectivePriority()); | 366 session_->MarkWriteBlocked(id(), EffectivePriority()); |
401 } | 367 } |
402 } else { | 368 } else { |
403 session_->MarkWriteBlocked(id(), EffectivePriority()); | 369 session_->MarkWriteBlocked(id(), EffectivePriority()); |
404 } | 370 } |
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
438 void ReliableQuicStream::OnClose() { | 404 void ReliableQuicStream::OnClose() { |
439 CloseReadSide(); | 405 CloseReadSide(); |
440 CloseWriteSide(); | 406 CloseWriteSide(); |
441 | 407 |
442 if (version() > QUIC_VERSION_13 && | 408 if (version() > QUIC_VERSION_13 && |
443 !fin_sent_ && !rst_sent_) { | 409 !fin_sent_ && !rst_sent_) { |
444 // For flow control accounting, we must tell the peer how many bytes we have | 410 // For flow control accounting, we must tell the peer how many bytes we have |
445 // written on this stream before termination. Done here if needed, using a | 411 // written on this stream before termination. Done here if needed, using a |
446 // RST frame. | 412 // RST frame. |
447 DVLOG(1) << ENDPOINT << "Sending RST in OnClose: " << id(); | 413 DVLOG(1) << ENDPOINT << "Sending RST in OnClose: " << id(); |
448 session_->SendRstStream(id(), QUIC_STREAM_NO_ERROR, stream_bytes_written_); | 414 session_->SendRstStream(id(), QUIC_RST_FLOW_CONTROL_ACCOUNTING, |
| 415 stream_bytes_written_); |
449 rst_sent_ = true; | 416 rst_sent_ = true; |
450 } | 417 } |
451 } | 418 } |
452 | 419 |
453 void ReliableQuicStream::OnWindowUpdateFrame( | 420 void ReliableQuicStream::OnWindowUpdateFrame( |
454 const QuicWindowUpdateFrame& frame) { | 421 const QuicWindowUpdateFrame& frame) { |
455 if (!IsFlowControlEnabled()) { | 422 if (!flow_controller_.IsEnabled()) { |
456 DLOG(DFATAL) << "Flow control not enabled! " << version(); | 423 DLOG(DFATAL) << "Flow control not enabled! " << version(); |
457 return; | 424 return; |
458 } | 425 } |
459 | 426 |
460 DVLOG(1) << ENDPOINT | 427 if (flow_controller_.UpdateSendWindowOffset(frame.byte_offset)) { |
461 << "OnWindowUpdateFrame for stream " << id() | 428 // We can write again! |
462 << " with byte offset " << frame.byte_offset | 429 // TODO(rjshade): This does not respect priorities (e.g. multiple |
463 << " , current offset: " << flow_control_send_limit_ << ")."; | 430 // outstanding POSTs are unblocked on arrival of |
464 | 431 // SHLO with initial window). |
465 UpdateFlowControlSendLimit(frame.byte_offset); | 432 OnCanWrite(); |
466 } | |
467 | |
468 void ReliableQuicStream::UpdateFlowControlSendLimit(QuicStreamOffset offset) { | |
469 if (offset <= flow_control_send_limit_) { | |
470 DVLOG(1) << ENDPOINT << "Stream " << id() | |
471 << ", not changing window, current: " << flow_control_send_limit_ | |
472 << " new: " << offset; | |
473 // No change to our send window. | |
474 return; | |
475 } | 433 } |
476 | |
477 DVLOG(1) << ENDPOINT << "Stream " << id() | |
478 << ", changing window, current: " << flow_control_send_limit_ | |
479 << " new: " << offset; | |
480 // Send window has increased. | |
481 flow_control_send_limit_ = offset; | |
482 | |
483 // We can write again! | |
484 // TODO(rjshade): This does not respect priorities (e.g. multiple outstanding | |
485 // POSTs are unblocked on arrival of SHLO with initial window). | |
486 OnCanWrite(); | |
487 } | |
488 | |
489 bool ReliableQuicStream::IsFlowControlBlocked() const { | |
490 return IsFlowControlEnabled() && SendWindowSize() == 0; | |
491 } | |
492 | |
493 uint64 ReliableQuicStream::SendWindowSize() const { | |
494 return flow_control_send_limit_ - stream_bytes_written(); | |
495 } | |
496 | |
497 uint64 ReliableQuicStream::TotalReceivedBytes() const { | |
498 return sequencer_.num_bytes_consumed() + sequencer_.num_bytes_buffered(); | |
499 } | 434 } |
500 | 435 |
501 } // namespace net | 436 } // namespace net |
OLD | NEW |