Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(86)

Side by Side Diff: net/quic/reliable_quic_stream.cc

Issue 242453002: Pull out flow control functionality from ReliableQuicStream into a new (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Created 6 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « net/quic/reliable_quic_stream.h ('k') | net/quic/reliable_quic_stream_test.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "net/quic/reliable_quic_stream.h" 5 #include "net/quic/reliable_quic_stream.h"
6 6
7 #include "base/logging.h" 7 #include "base/logging.h"
8 #include "net/quic/iovector.h" 8 #include "net/quic/iovector.h"
9 #include "net/quic/quic_flow_controller.h"
9 #include "net/quic/quic_session.h" 10 #include "net/quic/quic_session.h"
10 #include "net/quic/quic_write_blocked_list.h" 11 #include "net/quic/quic_write_blocked_list.h"
11 12
12 using base::StringPiece; 13 using base::StringPiece;
13 using std::min; 14 using std::min;
14 15
15 namespace net { 16 namespace net {
16 17
17 #define ENDPOINT (is_server_ ? "Server: " : " Client: ") 18 #define ENDPOINT (is_server_ ? "Server: " : " Client: ")
18 19
(...skipping 90 matching lines...) Expand 10 before | Expand all | Expand 10 after
109 } 110 }
110 111
111 ReliableQuicStream::ReliableQuicStream(QuicStreamId id, QuicSession* session) 112 ReliableQuicStream::ReliableQuicStream(QuicStreamId id, QuicSession* session)
112 : sequencer_(this), 113 : sequencer_(this),
113 id_(id), 114 id_(id),
114 session_(session), 115 session_(session),
115 stream_bytes_read_(0), 116 stream_bytes_read_(0),
116 stream_bytes_written_(0), 117 stream_bytes_written_(0),
117 stream_error_(QUIC_STREAM_NO_ERROR), 118 stream_error_(QUIC_STREAM_NO_ERROR),
118 connection_error_(QUIC_NO_ERROR), 119 connection_error_(QUIC_NO_ERROR),
119 flow_control_send_limit_(
120 session_->config()->peer_initial_flow_control_window_bytes()),
121 max_flow_control_receive_window_bytes_(
122 session_->connection()->max_flow_control_receive_window_bytes()),
123 flow_control_receive_window_offset_bytes_(
124 session_->connection()->max_flow_control_receive_window_bytes()),
125 read_side_closed_(false), 120 read_side_closed_(false),
126 write_side_closed_(false), 121 write_side_closed_(false),
127 fin_buffered_(false), 122 fin_buffered_(false),
128 fin_sent_(false), 123 fin_sent_(false),
129 rst_sent_(false), 124 rst_sent_(false),
130 is_server_(session_->is_server()) { 125 is_server_(session_->is_server()),
131 DVLOG(1) << ENDPOINT << "Created stream " << id_ 126 flow_controller_(
132 << ", setting initial receive window to: " 127 id_,
133 << flow_control_receive_window_offset_bytes_ 128 is_server_,
134 << ", setting send window to: " << flow_control_send_limit_; 129 session_->config()->peer_initial_flow_control_window_bytes(),
130 session_->connection()->max_flow_control_receive_window_bytes(),
131 session_->connection()->max_flow_control_receive_window_bytes()) {
135 } 132 }
136 133
137 ReliableQuicStream::~ReliableQuicStream() { 134 ReliableQuicStream::~ReliableQuicStream() {
138 } 135 }
139 136
140 bool ReliableQuicStream::WillAcceptStreamFrame( 137 bool ReliableQuicStream::WillAcceptStreamFrame(
141 const QuicStreamFrame& frame) const { 138 const QuicStreamFrame& frame) const {
142 if (read_side_closed_) { 139 if (read_side_closed_) {
143 return true; 140 return true;
144 } 141 }
(...skipping 10 matching lines...) Expand all
155 DVLOG(1) << ENDPOINT << "Ignoring frame " << frame.stream_id; 152 DVLOG(1) << ENDPOINT << "Ignoring frame " << frame.stream_id;
156 // We don't want to be reading: blackhole the data. 153 // We don't want to be reading: blackhole the data.
157 return true; 154 return true;
158 } 155 }
159 156
160 // This count include duplicate data received. 157 // This count include duplicate data received.
161 stream_bytes_read_ += frame.data.TotalBufferSize(); 158 stream_bytes_read_ += frame.data.TotalBufferSize();
162 159
163 bool accepted = sequencer_.OnStreamFrame(frame); 160 bool accepted = sequencer_.OnStreamFrame(frame);
164 161
165 if (IsFlowControlEnabled()) { 162 if (version() >= QUIC_VERSION_17) {
166 if (flow_control_receive_window_offset_bytes_ < TotalReceivedBytes()) { 163 if (flow_controller_.FlowControlViolation()) {
167 // TODO(rjshade): Lower severity from DFATAL once we have established that
168 // flow control is working correctly.
169 LOG(DFATAL)
170 << ENDPOINT << "Flow control violation on stream: " << id()
171 << ", our receive offset is: "
172 << flow_control_receive_window_offset_bytes_
173 << ", we have consumed: " << sequencer_.num_bytes_consumed()
174 << ", we have buffered: " << sequencer_.num_bytes_buffered()
175 << ", total: " << TotalReceivedBytes();
176 session_->connection()->SendConnectionClose(QUIC_FLOW_CONTROL_ERROR); 164 session_->connection()->SendConnectionClose(QUIC_FLOW_CONTROL_ERROR);
177 return false; 165 return false;
178 } 166 }
179 MaybeSendWindowUpdate(); 167 MaybeSendWindowUpdate();
180 } 168 }
181 169
182 return accepted; 170 return accepted;
183 } 171 }
184 172
185 void ReliableQuicStream::MaybeSendWindowUpdate() { 173 void ReliableQuicStream::MaybeSendWindowUpdate() {
186 if (!IsFlowControlEnabled()) { 174 if (version() >= QUIC_VERSION_17) {
187 return; 175 flow_controller_.MaybeSendWindowUpdate(session()->connection());
188 }
189
190 // Send WindowUpdate to increase receive window if
191 // (receive window offset - consumed bytes) < (max window / 2).
192 // This is behaviour copied from SPDY.
193 size_t consumed_window = flow_control_receive_window_offset_bytes_ -
194 sequencer_.num_bytes_consumed();
195 size_t threshold = (max_flow_control_receive_window_bytes_ / 2);
196 if (consumed_window < threshold) {
197 // Update our receive window.
198 flow_control_receive_window_offset_bytes_ +=
199 (max_flow_control_receive_window_bytes_ - consumed_window);
200 DVLOG(1) << ENDPOINT << "Stream: " << id()
201 << ", sending WindowUpdate frame. "
202 << "Consumed bytes: " << sequencer_.num_bytes_consumed()
203 << ", Receive window offset: "
204 << flow_control_receive_window_offset_bytes_
205 << ", Consumed window: " << consumed_window
206 << ", and threshold: " << threshold
207 << ". New receive window offset is: "
208 << flow_control_receive_window_offset_bytes_;
209
210 // Inform the peer of our new receive window.
211 session()->connection()->SendWindowUpdate(
212 id(), flow_control_receive_window_offset_bytes_);
213 } 176 }
214 } 177 }
215 178
216 int ReliableQuicStream::num_frames_received() { 179 int ReliableQuicStream::num_frames_received() {
217 return sequencer_.num_frames_received(); 180 return sequencer_.num_frames_received();
218 } 181 }
219 182
220 int ReliableQuicStream::num_duplicate_frames_received() { 183 int ReliableQuicStream::num_duplicate_frames_received() {
221 return sequencer_.num_duplicate_frames_received(); 184 return sequencer_.num_duplicate_frames_received();
222 } 185 }
(...skipping 124 matching lines...) Expand 10 before | Expand all | Expand 10 after
347 QuicAckNotifier::DelegateInterface* ack_notifier_delegate) { 310 QuicAckNotifier::DelegateInterface* ack_notifier_delegate) {
348 if (write_side_closed_) { 311 if (write_side_closed_) {
349 DLOG(ERROR) << ENDPOINT << "Attempt to write when the write side is closed"; 312 DLOG(ERROR) << ENDPOINT << "Attempt to write when the write side is closed";
350 return QuicConsumedData(0, false); 313 return QuicConsumedData(0, false);
351 } 314 }
352 315
353 // How much data we want to write. 316 // How much data we want to write.
354 size_t write_length = TotalIovecLength(iov, iov_count); 317 size_t write_length = TotalIovecLength(iov, iov_count);
355 318
356 // How much data we are allowed to write from flow control. 319 // How much data we are allowed to write from flow control.
357 size_t send_window = SendWindowSize(); 320 size_t send_window = flow_controller_.SendWindowSize();
358 321
359 // A FIN with zero data payload should not be flow control blocked. 322 // A FIN with zero data payload should not be flow control blocked.
360 bool fin_with_zero_data = (fin && write_length == 0); 323 bool fin_with_zero_data = (fin && write_length == 0);
361 324
362 if (IsFlowControlEnabled()) { 325 if (version() >= QUIC_VERSION_17 && flow_controller_.IsEnabled()) {
363 if (send_window == 0 && !fin_with_zero_data) { 326 if (send_window == 0 && !fin_with_zero_data) {
364 // Quick return if we can't send anything. 327 // Quick return if we can't send anything.
365 session()->connection()->SendBlocked(id()); 328 flow_controller_.MaybeSendBlocked(session()->connection());
366 return QuicConsumedData(0, false); 329 return QuicConsumedData(0, false);
367 } 330 }
368 331
369 if (write_length > send_window) { 332 if (write_length > send_window) {
370 // Don't send the FIN if we aren't going to send all the data. 333 // Don't send the FIN if we aren't going to send all the data.
371 fin = false; 334 fin = false;
372 335
373 // Writing more data would be a violation of flow control. 336 // Writing more data would be a violation of flow control.
374 write_length = send_window; 337 write_length = send_window;
375 } 338 }
376 } 339 }
377 340
378 // Fill an IOVector with bytes from the iovec. 341 // Fill an IOVector with bytes from the iovec.
379 IOVector data; 342 IOVector data;
380 data.AppendIovecAtMostBytes(iov, iov_count, write_length); 343 data.AppendIovecAtMostBytes(iov, iov_count, write_length);
381 344
382 QuicConsumedData consumed_data = session()->WritevData( 345 QuicConsumedData consumed_data = session()->WritevData(
383 id(), data, stream_bytes_written_, fin, ack_notifier_delegate); 346 id(), data, stream_bytes_written_, fin, ack_notifier_delegate);
384 stream_bytes_written_ += consumed_data.bytes_consumed; 347 stream_bytes_written_ += consumed_data.bytes_consumed;
385 348
349 if (version() >= QUIC_VERSION_17 && flow_controller_.IsEnabled()) {
350 flow_controller_.AddBytesSent(consumed_data.bytes_consumed);
351 }
352
386 if (consumed_data.bytes_consumed == write_length) { 353 if (consumed_data.bytes_consumed == write_length) {
387 if (IsFlowControlEnabled() && write_length == send_window && 354 if (!fin_with_zero_data) {
388 !fin_with_zero_data) { 355 if (version() >= QUIC_VERSION_17) {
389 DVLOG(1) << ENDPOINT << "Stream " << id() 356 flow_controller_.MaybeSendBlocked(session()->connection());
390 << " is flow control blocked. " 357 }
391 << "Send window: " << send_window
392 << ", stream_bytes_written: " << stream_bytes_written_
393 << ", flow_control_send_limit: "
394 << flow_control_send_limit_;
395 // The entire send_window has been consumed, we are now flow control
396 // blocked.
397 session()->connection()->SendBlocked(id());
398 } 358 }
399 if (fin && consumed_data.fin_consumed) { 359 if (fin && consumed_data.fin_consumed) {
400 fin_sent_ = true; 360 fin_sent_ = true;
401 CloseWriteSide(); 361 CloseWriteSide();
402 } else if (fin && !consumed_data.fin_consumed) { 362 } else if (fin && !consumed_data.fin_consumed) {
403 session_->MarkWriteBlocked(id(), EffectivePriority()); 363 session_->MarkWriteBlocked(id(), EffectivePriority());
404 } 364 }
405 } else { 365 } else {
406 session_->MarkWriteBlocked(id(), EffectivePriority()); 366 session_->MarkWriteBlocked(id(), EffectivePriority());
407 } 367 }
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after
448 // written on this stream before termination. Done here if needed, using a 408 // written on this stream before termination. Done here if needed, using a
449 // RST frame. 409 // RST frame.
450 DVLOG(1) << ENDPOINT << "Sending RST in OnClose: " << id(); 410 DVLOG(1) << ENDPOINT << "Sending RST in OnClose: " << id();
451 session_->SendRstStream(id(), QUIC_STREAM_NO_ERROR, stream_bytes_written_); 411 session_->SendRstStream(id(), QUIC_STREAM_NO_ERROR, stream_bytes_written_);
452 rst_sent_ = true; 412 rst_sent_ = true;
453 } 413 }
454 } 414 }
455 415
456 void ReliableQuicStream::OnWindowUpdateFrame( 416 void ReliableQuicStream::OnWindowUpdateFrame(
457 const QuicWindowUpdateFrame& frame) { 417 const QuicWindowUpdateFrame& frame) {
458 if (!IsFlowControlEnabled()) { 418 if (!flow_controller_.IsEnabled()) {
459 DLOG(DFATAL) << "Flow control not enabled! " << version(); 419 DLOG(DFATAL) << "Flow control not enabled! " << version();
460 return; 420 return;
461 } 421 }
462 422
463 DVLOG(1) << ENDPOINT 423 if (flow_controller_.UpdateSendWindowOffset(frame.byte_offset)) {
464 << "OnWindowUpdateFrame for stream " << id() 424 // We can write again!
465 << " with byte offset " << frame.byte_offset 425 // TODO(rjshade): This does not respect priorities (e.g. multiple
466 << " , current offset: " << flow_control_send_limit_ << ")."; 426 // outstanding POSTs are unblocked on arrival of
467 427 // SHLO with initial window).
468 UpdateFlowControlSendLimit(frame.byte_offset); 428 OnCanWrite();
469 }
470
471 void ReliableQuicStream::UpdateFlowControlSendLimit(QuicStreamOffset offset) {
472 if (offset <= flow_control_send_limit_) {
473 DVLOG(1) << ENDPOINT << "Stream " << id()
474 << ", not changing window, current: " << flow_control_send_limit_
475 << " new: " << offset;
476 // No change to our send window.
477 return;
478 } 429 }
479
480 DVLOG(1) << ENDPOINT << "Stream " << id()
481 << ", changing window, current: " << flow_control_send_limit_
482 << " new: " << offset;
483 // Send window has increased.
484 flow_control_send_limit_ = offset;
485
486 // We can write again!
487 // TODO(rjshade): This does not respect priorities (e.g. multiple outstanding
488 // POSTs are unblocked on arrival of SHLO with initial window).
489 OnCanWrite();
490 }
491
492 bool ReliableQuicStream::IsFlowControlBlocked() const {
493 return IsFlowControlEnabled() && SendWindowSize() == 0;
494 }
495
496 uint64 ReliableQuicStream::SendWindowSize() const {
497 return flow_control_send_limit_ - stream_bytes_written();
498 }
499
500 uint64 ReliableQuicStream::TotalReceivedBytes() const {
501 return sequencer_.num_bytes_consumed() + sequencer_.num_bytes_buffered();
502 } 430 }
503 431
504 } // namespace net 432 } // namespace net
OLDNEW
« no previous file with comments | « net/quic/reliable_quic_stream.h ('k') | net/quic/reliable_quic_stream_test.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698