Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(53)

Side by Side Diff: net/quic/reliable_quic_stream.cc

Issue 242593002: Land Recent QUIC Changes. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Build fix. Use uint32 instead of int Created 6 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « net/quic/reliable_quic_stream.h ('k') | net/quic/reliable_quic_stream_test.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "net/quic/reliable_quic_stream.h" 5 #include "net/quic/reliable_quic_stream.h"
6 6
7 #include "base/logging.h" 7 #include "base/logging.h"
8 #include "net/quic/iovector.h" 8 #include "net/quic/iovector.h"
9 #include "net/quic/quic_flow_controller.h"
9 #include "net/quic/quic_session.h" 10 #include "net/quic/quic_session.h"
10 #include "net/quic/quic_write_blocked_list.h" 11 #include "net/quic/quic_write_blocked_list.h"
11 12
12 using base::StringPiece; 13 using base::StringPiece;
13 using std::min; 14 using std::min;
14 15
15 namespace net { 16 namespace net {
16 17
17 #define ENDPOINT (is_server_ ? "Server: " : " Client: ") 18 #define ENDPOINT (is_server_ ? "Server: " : " Client: ")
18 19
(...skipping 24 matching lines...) Expand all
43 wrote_last_data_(false), 44 wrote_last_data_(false),
44 num_original_packets_(0), 45 num_original_packets_(0),
45 num_original_bytes_(0), 46 num_original_bytes_(0),
46 num_retransmitted_packets_(0), 47 num_retransmitted_packets_(0),
47 num_retransmitted_bytes_(0) { 48 num_retransmitted_bytes_(0) {
48 } 49 }
49 50
50 virtual void OnAckNotification(int num_original_packets, 51 virtual void OnAckNotification(int num_original_packets,
51 int num_original_bytes, 52 int num_original_bytes,
52 int num_retransmitted_packets, 53 int num_retransmitted_packets,
53 int num_retransmitted_bytes) OVERRIDE { 54 int num_retransmitted_bytes,
55 QuicTime::Delta delta_largest_observed)
56 OVERRIDE {
54 DCHECK_LT(0, pending_acks_); 57 DCHECK_LT(0, pending_acks_);
55 --pending_acks_; 58 --pending_acks_;
56 num_original_packets_ += num_original_packets; 59 num_original_packets_ += num_original_packets;
57 num_original_bytes_ += num_original_bytes; 60 num_original_bytes_ += num_original_bytes;
58 num_retransmitted_packets_ += num_retransmitted_packets; 61 num_retransmitted_packets_ += num_retransmitted_packets;
59 num_retransmitted_bytes_ += num_retransmitted_bytes; 62 num_retransmitted_bytes_ += num_retransmitted_bytes;
60 63
61 if (wrote_last_data_ && pending_acks_ == 0) { 64 if (wrote_last_data_ && pending_acks_ == 0) {
62 delegate_->OnAckNotification(num_original_packets_, 65 delegate_->OnAckNotification(num_original_packets_,
63 num_original_bytes_, 66 num_original_bytes_,
64 num_retransmitted_packets_, 67 num_retransmitted_packets_,
65 num_retransmitted_bytes_); 68 num_retransmitted_bytes_,
69 delta_largest_observed);
66 } 70 }
67 } 71 }
68 72
69 void WroteData(bool last_data) { 73 void WroteData(bool last_data) {
70 DCHECK(!wrote_last_data_); 74 DCHECK(!wrote_last_data_);
71 ++pending_acks_; 75 ++pending_acks_;
72 wrote_last_data_ = last_data; 76 wrote_last_data_ = last_data;
73 } 77 }
74 78
75 protected: 79 protected:
(...skipping 30 matching lines...) Expand all
106 } 110 }
107 111
108 ReliableQuicStream::ReliableQuicStream(QuicStreamId id, QuicSession* session) 112 ReliableQuicStream::ReliableQuicStream(QuicStreamId id, QuicSession* session)
109 : sequencer_(this), 113 : sequencer_(this),
110 id_(id), 114 id_(id),
111 session_(session), 115 session_(session),
112 stream_bytes_read_(0), 116 stream_bytes_read_(0),
113 stream_bytes_written_(0), 117 stream_bytes_written_(0),
114 stream_error_(QUIC_STREAM_NO_ERROR), 118 stream_error_(QUIC_STREAM_NO_ERROR),
115 connection_error_(QUIC_NO_ERROR), 119 connection_error_(QUIC_NO_ERROR),
116 flow_control_send_limit_(
117 session_->config()->peer_initial_flow_control_window_bytes()),
118 max_flow_control_receive_window_bytes_(
119 session_->connection()->max_flow_control_receive_window_bytes()),
120 flow_control_receive_window_offset_bytes_(
121 session_->connection()->max_flow_control_receive_window_bytes()),
122 read_side_closed_(false), 120 read_side_closed_(false),
123 write_side_closed_(false), 121 write_side_closed_(false),
124 fin_buffered_(false), 122 fin_buffered_(false),
125 fin_sent_(false), 123 fin_sent_(false),
126 rst_sent_(false), 124 rst_sent_(false),
127 is_server_(session_->is_server()) { 125 is_server_(session_->is_server()),
128 DVLOG(1) << ENDPOINT << "Created stream " << id_ 126 flow_controller_(
129 << ", setting initial receive window to: " 127 id_,
130 << flow_control_receive_window_offset_bytes_ 128 is_server_,
131 << ", setting send window to: " << flow_control_send_limit_; 129 session_->config()->peer_initial_flow_control_window_bytes(),
130 session_->connection()->max_flow_control_receive_window_bytes(),
131 session_->connection()->max_flow_control_receive_window_bytes()) {
132 if (session_->connection()->version() < QUIC_VERSION_17) {
133 flow_controller_.Disable();
134 }
132 } 135 }
133 136
134 ReliableQuicStream::~ReliableQuicStream() { 137 ReliableQuicStream::~ReliableQuicStream() {
135 } 138 }
136 139
137 bool ReliableQuicStream::WillAcceptStreamFrame( 140 bool ReliableQuicStream::WillAcceptStreamFrame(
138 const QuicStreamFrame& frame) const { 141 const QuicStreamFrame& frame) const {
139 if (read_side_closed_) { 142 if (read_side_closed_) {
140 return true; 143 return true;
141 } 144 }
(...skipping 10 matching lines...) Expand all
152 DVLOG(1) << ENDPOINT << "Ignoring frame " << frame.stream_id; 155 DVLOG(1) << ENDPOINT << "Ignoring frame " << frame.stream_id;
153 // We don't want to be reading: blackhole the data. 156 // We don't want to be reading: blackhole the data.
154 return true; 157 return true;
155 } 158 }
156 159
157 // This count include duplicate data received. 160 // This count include duplicate data received.
158 stream_bytes_read_ += frame.data.TotalBufferSize(); 161 stream_bytes_read_ += frame.data.TotalBufferSize();
159 162
160 bool accepted = sequencer_.OnStreamFrame(frame); 163 bool accepted = sequencer_.OnStreamFrame(frame);
161 164
162 if (IsFlowControlEnabled()) { 165 if (version() >= QUIC_VERSION_17) {
163 if (flow_control_receive_window_offset_bytes_ < TotalReceivedBytes()) { 166 if (flow_controller_.FlowControlViolation()) {
164 // TODO(rjshade): Lower severity from DFATAL once we have established that
165 // flow control is working correctly.
166 LOG(DFATAL)
167 << ENDPOINT << "Flow control violation on stream: " << id()
168 << ", our receive offset is: "
169 << flow_control_receive_window_offset_bytes_
170 << ", we have consumed: " << sequencer_.num_bytes_consumed()
171 << ", we have buffered: " << sequencer_.num_bytes_buffered()
172 << ", total: " << TotalReceivedBytes();
173 session_->connection()->SendConnectionClose(QUIC_FLOW_CONTROL_ERROR); 167 session_->connection()->SendConnectionClose(QUIC_FLOW_CONTROL_ERROR);
174 return false; 168 return false;
175 } 169 }
176 MaybeSendWindowUpdate(); 170 MaybeSendWindowUpdate();
177 } 171 }
178 172
179 return accepted; 173 return accepted;
180 } 174 }
181 175
182 void ReliableQuicStream::MaybeSendWindowUpdate() { 176 void ReliableQuicStream::MaybeSendWindowUpdate() {
183 if (!IsFlowControlEnabled()) { 177 if (version() >= QUIC_VERSION_17) {
184 return; 178 flow_controller_.MaybeSendWindowUpdate(session()->connection());
185 }
186
187 // Send WindowUpdate to increase receive window if
188 // (receive window offset - consumed bytes) < (max window / 2).
189 // This is behaviour copied from SPDY.
190 size_t consumed_window = flow_control_receive_window_offset_bytes_ -
191 sequencer_.num_bytes_consumed();
192 size_t threshold = (max_flow_control_receive_window_bytes_ / 2);
193 if (consumed_window < threshold) {
194 // Update our receive window.
195 flow_control_receive_window_offset_bytes_ +=
196 (max_flow_control_receive_window_bytes_ - consumed_window);
197 DVLOG(1) << ENDPOINT << "Stream: " << id()
198 << ", sending WindowUpdate frame. "
199 << "Consumed bytes: " << sequencer_.num_bytes_consumed()
200 << ", Receive window offset: "
201 << flow_control_receive_window_offset_bytes_
202 << ", Consumed window: " << consumed_window
203 << ", and threshold: " << threshold
204 << ". New receive window offset is: "
205 << flow_control_receive_window_offset_bytes_;
206
207 // Inform the peer of our new receive window.
208 session()->connection()->SendWindowUpdate(
209 id(), flow_control_receive_window_offset_bytes_);
210 } 179 }
211 } 180 }
212 181
213 int ReliableQuicStream::num_frames_received() { 182 int ReliableQuicStream::num_frames_received() {
214 return sequencer_.num_frames_received(); 183 return sequencer_.num_frames_received();
215 } 184 }
216 185
217 int ReliableQuicStream::num_duplicate_frames_received() { 186 int ReliableQuicStream::num_duplicate_frames_received() {
218 return sequencer_.num_duplicate_frames_received(); 187 return sequencer_.num_duplicate_frames_received();
219 } 188 }
(...skipping 124 matching lines...) Expand 10 before | Expand all | Expand 10 after
344 QuicAckNotifier::DelegateInterface* ack_notifier_delegate) { 313 QuicAckNotifier::DelegateInterface* ack_notifier_delegate) {
345 if (write_side_closed_) { 314 if (write_side_closed_) {
346 DLOG(ERROR) << ENDPOINT << "Attempt to write when the write side is closed"; 315 DLOG(ERROR) << ENDPOINT << "Attempt to write when the write side is closed";
347 return QuicConsumedData(0, false); 316 return QuicConsumedData(0, false);
348 } 317 }
349 318
350 // How much data we want to write. 319 // How much data we want to write.
351 size_t write_length = TotalIovecLength(iov, iov_count); 320 size_t write_length = TotalIovecLength(iov, iov_count);
352 321
353 // How much data we are allowed to write from flow control. 322 // How much data we are allowed to write from flow control.
354 size_t send_window = SendWindowSize(); 323 size_t send_window = flow_controller_.SendWindowSize();
355 324
356 // A FIN with zero data payload should not be flow control blocked. 325 // A FIN with zero data payload should not be flow control blocked.
357 bool fin_with_zero_data = (fin && write_length == 0); 326 bool fin_with_zero_data = (fin && write_length == 0);
358 327
359 if (IsFlowControlEnabled()) { 328 if (version() >= QUIC_VERSION_17 && flow_controller_.IsEnabled()) {
360 if (send_window == 0 && !fin_with_zero_data) { 329 if (send_window == 0 && !fin_with_zero_data) {
361 // Quick return if we can't send anything. 330 // Quick return if we can't send anything.
362 session()->connection()->SendBlocked(id()); 331 flow_controller_.MaybeSendBlocked(session()->connection());
363 return QuicConsumedData(0, false); 332 return QuicConsumedData(0, false);
364 } 333 }
365 334
366 if (write_length > send_window) { 335 if (write_length > send_window) {
367 // Don't send the FIN if we aren't going to send all the data. 336 // Don't send the FIN if we aren't going to send all the data.
368 fin = false; 337 fin = false;
369 338
370 // Writing more data would be a violation of flow control. 339 // Writing more data would be a violation of flow control.
371 write_length = send_window; 340 write_length = send_window;
372 } 341 }
373 } 342 }
374 343
375 // Fill an IOVector with bytes from the iovec. 344 // Fill an IOVector with bytes from the iovec.
376 IOVector data; 345 IOVector data;
377 data.AppendIovecAtMostBytes(iov, iov_count, write_length); 346 data.AppendIovecAtMostBytes(iov, iov_count, write_length);
378 347
379 QuicConsumedData consumed_data = session()->WritevData( 348 QuicConsumedData consumed_data = session()->WritevData(
380 id(), data, stream_bytes_written_, fin, ack_notifier_delegate); 349 id(), data, stream_bytes_written_, fin, ack_notifier_delegate);
381 stream_bytes_written_ += consumed_data.bytes_consumed; 350 stream_bytes_written_ += consumed_data.bytes_consumed;
382 351
352 if (version() >= QUIC_VERSION_17 && flow_controller_.IsEnabled()) {
353 flow_controller_.AddBytesSent(consumed_data.bytes_consumed);
354 }
355
383 if (consumed_data.bytes_consumed == write_length) { 356 if (consumed_data.bytes_consumed == write_length) {
384 if (IsFlowControlEnabled() && write_length == send_window && 357 if (!fin_with_zero_data) {
385 !fin_with_zero_data) { 358 if (version() >= QUIC_VERSION_17) {
386 DVLOG(1) << ENDPOINT << "Stream " << id() 359 flow_controller_.MaybeSendBlocked(session()->connection());
387 << " is flow control blocked. " 360 }
388 << "Send window: " << send_window
389 << ", stream_bytes_written: " << stream_bytes_written_
390 << ", flow_control_send_limit: "
391 << flow_control_send_limit_;
392 // The entire send_window has been consumed, we are now flow control
393 // blocked.
394 session()->connection()->SendBlocked(id());
395 } 361 }
396 if (fin && consumed_data.fin_consumed) { 362 if (fin && consumed_data.fin_consumed) {
397 fin_sent_ = true; 363 fin_sent_ = true;
398 CloseWriteSide(); 364 CloseWriteSide();
399 } else if (fin && !consumed_data.fin_consumed) { 365 } else if (fin && !consumed_data.fin_consumed) {
400 session_->MarkWriteBlocked(id(), EffectivePriority()); 366 session_->MarkWriteBlocked(id(), EffectivePriority());
401 } 367 }
402 } else { 368 } else {
403 session_->MarkWriteBlocked(id(), EffectivePriority()); 369 session_->MarkWriteBlocked(id(), EffectivePriority());
404 } 370 }
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after
438 void ReliableQuicStream::OnClose() { 404 void ReliableQuicStream::OnClose() {
439 CloseReadSide(); 405 CloseReadSide();
440 CloseWriteSide(); 406 CloseWriteSide();
441 407
442 if (version() > QUIC_VERSION_13 && 408 if (version() > QUIC_VERSION_13 &&
443 !fin_sent_ && !rst_sent_) { 409 !fin_sent_ && !rst_sent_) {
444 // For flow control accounting, we must tell the peer how many bytes we have 410 // For flow control accounting, we must tell the peer how many bytes we have
445 // written on this stream before termination. Done here if needed, using a 411 // written on this stream before termination. Done here if needed, using a
446 // RST frame. 412 // RST frame.
447 DVLOG(1) << ENDPOINT << "Sending RST in OnClose: " << id(); 413 DVLOG(1) << ENDPOINT << "Sending RST in OnClose: " << id();
448 session_->SendRstStream(id(), QUIC_STREAM_NO_ERROR, stream_bytes_written_); 414 session_->SendRstStream(id(), QUIC_RST_FLOW_CONTROL_ACCOUNTING,
415 stream_bytes_written_);
449 rst_sent_ = true; 416 rst_sent_ = true;
450 } 417 }
451 } 418 }
452 419
453 void ReliableQuicStream::OnWindowUpdateFrame( 420 void ReliableQuicStream::OnWindowUpdateFrame(
454 const QuicWindowUpdateFrame& frame) { 421 const QuicWindowUpdateFrame& frame) {
455 if (!IsFlowControlEnabled()) { 422 if (!flow_controller_.IsEnabled()) {
456 DLOG(DFATAL) << "Flow control not enabled! " << version(); 423 DLOG(DFATAL) << "Flow control not enabled! " << version();
457 return; 424 return;
458 } 425 }
459 426
460 DVLOG(1) << ENDPOINT 427 if (flow_controller_.UpdateSendWindowOffset(frame.byte_offset)) {
461 << "OnWindowUpdateFrame for stream " << id() 428 // We can write again!
462 << " with byte offset " << frame.byte_offset 429 // TODO(rjshade): This does not respect priorities (e.g. multiple
463 << " , current offset: " << flow_control_send_limit_ << ")."; 430 // outstanding POSTs are unblocked on arrival of
464 431 // SHLO with initial window).
465 UpdateFlowControlSendLimit(frame.byte_offset); 432 OnCanWrite();
466 }
467
468 void ReliableQuicStream::UpdateFlowControlSendLimit(QuicStreamOffset offset) {
469 if (offset <= flow_control_send_limit_) {
470 DVLOG(1) << ENDPOINT << "Stream " << id()
471 << ", not changing window, current: " << flow_control_send_limit_
472 << " new: " << offset;
473 // No change to our send window.
474 return;
475 } 433 }
476
477 DVLOG(1) << ENDPOINT << "Stream " << id()
478 << ", changing window, current: " << flow_control_send_limit_
479 << " new: " << offset;
480 // Send window has increased.
481 flow_control_send_limit_ = offset;
482
483 // We can write again!
484 // TODO(rjshade): This does not respect priorities (e.g. multiple outstanding
485 // POSTs are unblocked on arrival of SHLO with initial window).
486 OnCanWrite();
487 }
488
489 bool ReliableQuicStream::IsFlowControlBlocked() const {
490 return IsFlowControlEnabled() && SendWindowSize() == 0;
491 }
492
493 uint64 ReliableQuicStream::SendWindowSize() const {
494 return flow_control_send_limit_ - stream_bytes_written();
495 }
496
497 uint64 ReliableQuicStream::TotalReceivedBytes() const {
498 return sequencer_.num_bytes_consumed() + sequencer_.num_bytes_buffered();
499 } 434 }
500 435
501 } // namespace net 436 } // namespace net
OLDNEW
« no previous file with comments | « net/quic/reliable_quic_stream.h ('k') | net/quic/reliable_quic_stream_test.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698