| OLD | NEW |
| 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "net/quic/core/reliable_quic_stream.h" | 5 #include "net/quic/core/quic_stream.h" |
| 6 | 6 |
| 7 #include "base/logging.h" | 7 #include "base/logging.h" |
| 8 #include "net/quic/core/quic_bug_tracker.h" | 8 #include "net/quic/core/quic_bug_tracker.h" |
| 9 #include "net/quic/core/quic_flags.h" | 9 #include "net/quic/core/quic_flags.h" |
| 10 #include "net/quic/core/quic_flow_controller.h" | 10 #include "net/quic/core/quic_flow_controller.h" |
| 11 #include "net/quic/core/quic_session.h" | 11 #include "net/quic/core/quic_session.h" |
| 12 #include "net/quic/core/quic_write_blocked_list.h" | 12 #include "net/quic/core/quic_write_blocked_list.h" |
| 13 | 13 |
| 14 using base::StringPiece; | 14 using base::StringPiece; |
| 15 using std::min; | 15 using std::min; |
| (...skipping 19 matching lines...) Expand all Loading... |
| 35 size_t GetReceivedFlowControlWindow(QuicSession* session) { | 35 size_t GetReceivedFlowControlWindow(QuicSession* session) { |
| 36 if (session->config()->HasReceivedInitialStreamFlowControlWindowBytes()) { | 36 if (session->config()->HasReceivedInitialStreamFlowControlWindowBytes()) { |
| 37 return session->config()->ReceivedInitialStreamFlowControlWindowBytes(); | 37 return session->config()->ReceivedInitialStreamFlowControlWindowBytes(); |
| 38 } | 38 } |
| 39 | 39 |
| 40 return kMinimumFlowControlSendWindow; | 40 return kMinimumFlowControlSendWindow; |
| 41 } | 41 } |
| 42 | 42 |
| 43 } // namespace | 43 } // namespace |
| 44 | 44 |
| 45 ReliableQuicStream::PendingData::PendingData( | 45 QuicStream::PendingData::PendingData(string data_in, |
| 46 string data_in, | 46 QuicAckListenerInterface* ack_listener_in) |
| 47 QuicAckListenerInterface* ack_listener_in) | |
| 48 : data(std::move(data_in)), offset(0), ack_listener(ack_listener_in) {} | 47 : data(std::move(data_in)), offset(0), ack_listener(ack_listener_in) {} |
| 49 | 48 |
| 50 ReliableQuicStream::PendingData::~PendingData() {} | 49 QuicStream::PendingData::~PendingData() {} |
| 51 | 50 |
| 52 ReliableQuicStream::ReliableQuicStream(QuicStreamId id, QuicSession* session) | 51 QuicStream::QuicStream(QuicStreamId id, QuicSession* session) |
| 53 : queued_data_bytes_(0), | 52 : queued_data_bytes_(0), |
| 54 sequencer_(this, session->connection()->clock()), | 53 sequencer_(this, session->connection()->clock()), |
| 55 id_(id), | 54 id_(id), |
| 56 session_(session), | 55 session_(session), |
| 57 stream_bytes_read_(0), | 56 stream_bytes_read_(0), |
| 58 stream_bytes_written_(0), | 57 stream_bytes_written_(0), |
| 59 stream_error_(QUIC_STREAM_NO_ERROR), | 58 stream_error_(QUIC_STREAM_NO_ERROR), |
| 60 connection_error_(QUIC_NO_ERROR), | 59 connection_error_(QUIC_NO_ERROR), |
| 61 read_side_closed_(false), | 60 read_side_closed_(false), |
| 62 write_side_closed_(false), | 61 write_side_closed_(false), |
| 63 fin_buffered_(false), | 62 fin_buffered_(false), |
| 64 fin_sent_(false), | 63 fin_sent_(false), |
| 65 fin_received_(false), | 64 fin_received_(false), |
| 66 rst_sent_(false), | 65 rst_sent_(false), |
| 67 rst_received_(false), | 66 rst_received_(false), |
| 68 perspective_(session_->perspective()), | 67 perspective_(session_->perspective()), |
| 69 flow_controller_(session_->connection(), | 68 flow_controller_(session_->connection(), |
| 70 id_, | 69 id_, |
| 71 perspective_, | 70 perspective_, |
| 72 GetReceivedFlowControlWindow(session), | 71 GetReceivedFlowControlWindow(session), |
| 73 GetInitialStreamFlowControlWindowToSend(session), | 72 GetInitialStreamFlowControlWindowToSend(session), |
| 74 session_->flow_controller()->auto_tune_receive_window()), | 73 session_->flow_controller()->auto_tune_receive_window()), |
| 75 connection_flow_controller_(session_->flow_controller()), | 74 connection_flow_controller_(session_->flow_controller()), |
| 76 stream_contributes_to_connection_flow_control_(true), | 75 stream_contributes_to_connection_flow_control_(true), |
| 77 busy_counter_(0) { | 76 busy_counter_(0) { |
| 78 SetFromConfig(); | 77 SetFromConfig(); |
| 79 } | 78 } |
| 80 | 79 |
| 81 ReliableQuicStream::~ReliableQuicStream() {} | 80 QuicStream::~QuicStream() {} |
| 82 | 81 |
| 83 void ReliableQuicStream::SetFromConfig() {} | 82 void QuicStream::SetFromConfig() {} |
| 84 | 83 |
| 85 void ReliableQuicStream::OnStreamFrame(const QuicStreamFrame& frame) { | 84 void QuicStream::OnStreamFrame(const QuicStreamFrame& frame) { |
| 86 DCHECK_EQ(frame.stream_id, id_); | 85 DCHECK_EQ(frame.stream_id, id_); |
| 87 | 86 |
| 88 DCHECK(!(read_side_closed_ && write_side_closed_)); | 87 DCHECK(!(read_side_closed_ && write_side_closed_)); |
| 89 | 88 |
| 90 if (frame.fin) { | 89 if (frame.fin) { |
| 91 fin_received_ = true; | 90 fin_received_ = true; |
| 92 if (fin_sent_) { | 91 if (fin_sent_) { |
| 93 session_->StreamDraining(id_); | 92 session_->StreamDraining(id_); |
| 94 } | 93 } |
| 95 } | 94 } |
| 96 | 95 |
| 97 if (read_side_closed_) { | 96 if (read_side_closed_) { |
| 98 DVLOG(1) << ENDPOINT << "Ignoring data in frame " << frame.stream_id; | 97 DVLOG(1) << ENDPOINT << "Stream " << frame.stream_id |
| 98 << " is closed for reading. Ignoring newly received stream data."; |
| 99 // The subclass does not want to read data: blackhole the data. | 99 // The subclass does not want to read data: blackhole the data. |
| 100 return; | 100 return; |
| 101 } | 101 } |
| 102 | 102 |
| 103 // This count includes duplicate data received. | 103 // This count includes duplicate data received. |
| 104 size_t frame_payload_size = frame.data_length; | 104 size_t frame_payload_size = frame.data_length; |
| 105 stream_bytes_read_ += frame_payload_size; | 105 stream_bytes_read_ += frame_payload_size; |
| 106 | 106 |
| 107 // Flow control is interested in tracking highest received offset. | 107 // Flow control is interested in tracking highest received offset. |
| 108 // Only interested in received frames that carry data. | 108 // Only interested in received frames that carry data. |
| 109 if (frame_payload_size > 0 && | 109 if (frame_payload_size > 0 && |
| 110 MaybeIncreaseHighestReceivedOffset(frame.offset + frame_payload_size)) { | 110 MaybeIncreaseHighestReceivedOffset(frame.offset + frame_payload_size)) { |
| 111 // As the highest received offset has changed, check to see if this is a | 111 // As the highest received offset has changed, check to see if this is a |
| 112 // violation of flow control. | 112 // violation of flow control. |
| 113 if (flow_controller_.FlowControlViolation() || | 113 if (flow_controller_.FlowControlViolation() || |
| 114 connection_flow_controller_->FlowControlViolation()) { | 114 connection_flow_controller_->FlowControlViolation()) { |
| 115 CloseConnectionWithDetails( | 115 CloseConnectionWithDetails( |
| 116 QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA, | 116 QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA, |
| 117 "Flow control violation after increasing offset"); | 117 "Flow control violation after increasing offset"); |
| 118 return; | 118 return; |
| 119 } | 119 } |
| 120 } | 120 } |
| 121 | 121 |
| 122 sequencer_.OnStreamFrame(frame); | 122 sequencer_.OnStreamFrame(frame); |
| 123 } | 123 } |
| 124 | 124 |
| 125 int ReliableQuicStream::num_frames_received() const { | 125 int QuicStream::num_frames_received() const { |
| 126 return sequencer_.num_frames_received(); | 126 return sequencer_.num_frames_received(); |
| 127 } | 127 } |
| 128 | 128 |
| 129 int ReliableQuicStream::num_duplicate_frames_received() const { | 129 int QuicStream::num_duplicate_frames_received() const { |
| 130 return sequencer_.num_duplicate_frames_received(); | 130 return sequencer_.num_duplicate_frames_received(); |
| 131 } | 131 } |
| 132 | 132 |
| 133 void ReliableQuicStream::OnStreamReset(const QuicRstStreamFrame& frame) { | 133 void QuicStream::OnStreamReset(const QuicRstStreamFrame& frame) { |
| 134 rst_received_ = true; | 134 rst_received_ = true; |
| 135 MaybeIncreaseHighestReceivedOffset(frame.byte_offset); | 135 MaybeIncreaseHighestReceivedOffset(frame.byte_offset); |
| 136 | 136 |
| 137 stream_error_ = frame.error_code; | 137 stream_error_ = frame.error_code; |
| 138 CloseWriteSide(); | 138 CloseWriteSide(); |
| 139 CloseReadSide(); | 139 CloseReadSide(); |
| 140 } | 140 } |
| 141 | 141 |
| 142 void ReliableQuicStream::OnConnectionClosed(QuicErrorCode error, | 142 void QuicStream::OnConnectionClosed(QuicErrorCode error, |
| 143 ConnectionCloseSource /*source*/) { | 143 ConnectionCloseSource /*source*/) { |
| 144 if (read_side_closed_ && write_side_closed_) { | 144 if (read_side_closed_ && write_side_closed_) { |
| 145 return; | 145 return; |
| 146 } | 146 } |
| 147 if (error != QUIC_NO_ERROR) { | 147 if (error != QUIC_NO_ERROR) { |
| 148 stream_error_ = QUIC_STREAM_CONNECTION_ERROR; | 148 stream_error_ = QUIC_STREAM_CONNECTION_ERROR; |
| 149 connection_error_ = error; | 149 connection_error_ = error; |
| 150 } | 150 } |
| 151 | 151 |
| 152 CloseWriteSide(); | 152 CloseWriteSide(); |
| 153 CloseReadSide(); | 153 CloseReadSide(); |
| 154 } | 154 } |
| 155 | 155 |
| 156 void ReliableQuicStream::OnFinRead() { | 156 void QuicStream::OnFinRead() { |
| 157 DCHECK(sequencer_.IsClosed()); | 157 DCHECK(sequencer_.IsClosed()); |
| 158 // OnFinRead can be called due to a FIN flag in a headers block, so there may | 158 // OnFinRead can be called due to a FIN flag in a headers block, so there may |
| 159 // have been no OnStreamFrame call with a FIN in the frame. | 159 // have been no OnStreamFrame call with a FIN in the frame. |
| 160 fin_received_ = true; | 160 fin_received_ = true; |
| 161 // If fin_sent_ is true, then CloseWriteSide has already been called, and the | 161 // If fin_sent_ is true, then CloseWriteSide has already been called, and the |
| 162 // stream will be destroyed by CloseReadSide, so don't need to call | 162 // stream will be destroyed by CloseReadSide, so don't need to call |
| 163 // StreamDraining. | 163 // StreamDraining. |
| 164 CloseReadSide(); | 164 CloseReadSide(); |
| 165 } | 165 } |
| 166 | 166 |
| 167 void ReliableQuicStream::Reset(QuicRstStreamErrorCode error) { | 167 void QuicStream::Reset(QuicRstStreamErrorCode error) { |
| 168 stream_error_ = error; | 168 stream_error_ = error; |
| 169 // Sending a RstStream results in calling CloseStream. | 169 // Sending a RstStream results in calling CloseStream. |
| 170 session()->SendRstStream(id(), error, stream_bytes_written_); | 170 session()->SendRstStream(id(), error, stream_bytes_written_); |
| 171 rst_sent_ = true; | 171 rst_sent_ = true; |
| 172 } | 172 } |
| 173 | 173 |
| 174 void ReliableQuicStream::CloseConnectionWithDetails(QuicErrorCode error, | 174 void QuicStream::CloseConnectionWithDetails(QuicErrorCode error, |
| 175 const string& details) { | 175 const string& details) { |
| 176 session()->connection()->CloseConnection( | 176 session()->connection()->CloseConnection( |
| 177 error, details, ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET); | 177 error, details, ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET); |
| 178 } | 178 } |
| 179 | 179 |
| 180 void ReliableQuicStream::WriteOrBufferData( | 180 void QuicStream::WriteOrBufferData(StringPiece data, |
| 181 StringPiece data, | 181 bool fin, |
| 182 bool fin, | 182 QuicAckListenerInterface* ack_listener) { |
| 183 QuicAckListenerInterface* ack_listener) { | |
| 184 if (data.empty() && !fin) { | 183 if (data.empty() && !fin) { |
| 185 QUIC_BUG << "data.empty() && !fin"; | 184 QUIC_BUG << "data.empty() && !fin"; |
| 186 return; | 185 return; |
| 187 } | 186 } |
| 188 | 187 |
| 189 if (fin_buffered_) { | 188 if (fin_buffered_) { |
| 190 QUIC_BUG << "Fin already buffered"; | 189 QUIC_BUG << "Fin already buffered"; |
| 191 return; | 190 return; |
| 192 } | 191 } |
| 193 if (write_side_closed_) { | 192 if (write_side_closed_) { |
| (...skipping 12 matching lines...) Expand all Loading... |
| 206 | 205 |
| 207 // If there's unconsumed data or an unconsumed fin, queue it. | 206 // If there's unconsumed data or an unconsumed fin, queue it. |
| 208 if (consumed_data.bytes_consumed < data.length() || | 207 if (consumed_data.bytes_consumed < data.length() || |
| 209 (fin && !consumed_data.fin_consumed)) { | 208 (fin && !consumed_data.fin_consumed)) { |
| 210 StringPiece remainder(data.substr(consumed_data.bytes_consumed)); | 209 StringPiece remainder(data.substr(consumed_data.bytes_consumed)); |
| 211 queued_data_bytes_ += remainder.size(); | 210 queued_data_bytes_ += remainder.size(); |
| 212 queued_data_.emplace_back(remainder.as_string(), ack_listener); | 211 queued_data_.emplace_back(remainder.as_string(), ack_listener); |
| 213 } | 212 } |
| 214 } | 213 } |
| 215 | 214 |
| 216 void ReliableQuicStream::OnCanWrite() { | 215 void QuicStream::OnCanWrite() { |
| 217 bool fin = false; | 216 bool fin = false; |
| 218 while (!queued_data_.empty()) { | 217 while (!queued_data_.empty()) { |
| 219 PendingData* pending_data = &queued_data_.front(); | 218 PendingData* pending_data = &queued_data_.front(); |
| 220 QuicAckListenerInterface* ack_listener = pending_data->ack_listener.get(); | 219 QuicAckListenerInterface* ack_listener = pending_data->ack_listener.get(); |
| 221 if (queued_data_.size() == 1 && fin_buffered_) { | 220 if (queued_data_.size() == 1 && fin_buffered_) { |
| 222 fin = true; | 221 fin = true; |
| 223 } | 222 } |
| 224 if (pending_data->offset > 0 && | 223 if (pending_data->offset > 0 && |
| 225 pending_data->offset >= pending_data->data.size()) { | 224 pending_data->offset >= pending_data->data.size()) { |
| 226 // This should be impossible because offset tracks the amount of | 225 // This should be impossible because offset tracks the amount of |
| (...skipping 13 matching lines...) Expand all Loading... |
| 240 queued_data_.pop_front(); | 239 queued_data_.pop_front(); |
| 241 } else { | 240 } else { |
| 242 if (consumed_data.bytes_consumed > 0) { | 241 if (consumed_data.bytes_consumed > 0) { |
| 243 pending_data->offset += consumed_data.bytes_consumed; | 242 pending_data->offset += consumed_data.bytes_consumed; |
| 244 } | 243 } |
| 245 break; | 244 break; |
| 246 } | 245 } |
| 247 } | 246 } |
| 248 } | 247 } |
| 249 | 248 |
| 250 void ReliableQuicStream::MaybeSendBlocked() { | 249 void QuicStream::MaybeSendBlocked() { |
| 251 flow_controller_.MaybeSendBlocked(); | 250 flow_controller_.MaybeSendBlocked(); |
| 252 if (!stream_contributes_to_connection_flow_control_) { | 251 if (!stream_contributes_to_connection_flow_control_) { |
| 253 return; | 252 return; |
| 254 } | 253 } |
| 255 connection_flow_controller_->MaybeSendBlocked(); | 254 connection_flow_controller_->MaybeSendBlocked(); |
| 256 // If the stream is blocked by connection-level flow control but not by | 255 // If the stream is blocked by connection-level flow control but not by |
| 257 // stream-level flow control, add the stream to the write blocked list so that | 256 // stream-level flow control, add the stream to the write blocked list so that |
| 258 // the stream will be given a chance to write when a connection-level | 257 // the stream will be given a chance to write when a connection-level |
| 259 // WINDOW_UPDATE arrives. | 258 // WINDOW_UPDATE arrives. |
| 260 if (connection_flow_controller_->IsBlocked() && | 259 if (connection_flow_controller_->IsBlocked() && |
| 261 !flow_controller_.IsBlocked()) { | 260 !flow_controller_.IsBlocked()) { |
| 262 session_->MarkConnectionLevelWriteBlocked(id()); | 261 session_->MarkConnectionLevelWriteBlocked(id()); |
| 263 } | 262 } |
| 264 } | 263 } |
| 265 | 264 |
| 266 QuicConsumedData ReliableQuicStream::WritevData( | 265 QuicConsumedData QuicStream::WritevData( |
| 267 const struct iovec* iov, | 266 const struct iovec* iov, |
| 268 int iov_count, | 267 int iov_count, |
| 269 bool fin, | 268 bool fin, |
| 270 QuicAckListenerInterface* ack_listener) { | 269 QuicAckListenerInterface* ack_listener) { |
| 271 if (write_side_closed_) { | 270 if (write_side_closed_) { |
| 272 DLOG(ERROR) << ENDPOINT << "Attempt to write when the write side is closed"; | 271 DLOG(ERROR) << ENDPOINT << "Attempt to write when the write side is closed"; |
| 273 return QuicConsumedData(0, false); | 272 return QuicConsumedData(0, false); |
| 274 } | 273 } |
| 275 | 274 |
| 276 // How much data was provided. | 275 // How much data was provided. |
| (...skipping 60 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 337 CloseWriteSide(); | 336 CloseWriteSide(); |
| 338 } else if (fin && !consumed_data.fin_consumed) { | 337 } else if (fin && !consumed_data.fin_consumed) { |
| 339 session_->MarkConnectionLevelWriteBlocked(id()); | 338 session_->MarkConnectionLevelWriteBlocked(id()); |
| 340 } | 339 } |
| 341 } else { | 340 } else { |
| 342 session_->MarkConnectionLevelWriteBlocked(id()); | 341 session_->MarkConnectionLevelWriteBlocked(id()); |
| 343 } | 342 } |
| 344 return consumed_data; | 343 return consumed_data; |
| 345 } | 344 } |
| 346 | 345 |
| 347 QuicConsumedData ReliableQuicStream::WritevDataInner( | 346 QuicConsumedData QuicStream::WritevDataInner( |
| 348 QuicIOVector iov, | 347 QuicIOVector iov, |
| 349 QuicStreamOffset offset, | 348 QuicStreamOffset offset, |
| 350 bool fin, | 349 bool fin, |
| 351 QuicAckListenerInterface* ack_notifier_delegate) { | 350 QuicAckListenerInterface* ack_notifier_delegate) { |
| 352 return session()->WritevData(this, id(), iov, offset, fin, | 351 return session()->WritevData(this, id(), iov, offset, fin, |
| 353 ack_notifier_delegate); | 352 ack_notifier_delegate); |
| 354 } | 353 } |
| 355 | 354 |
| 356 void ReliableQuicStream::CloseReadSide() { | 355 void QuicStream::CloseReadSide() { |
| 357 if (read_side_closed_) { | 356 if (read_side_closed_) { |
| 358 return; | 357 return; |
| 359 } | 358 } |
| 360 DVLOG(1) << ENDPOINT << "Done reading from stream " << id(); | 359 DVLOG(1) << ENDPOINT << "Done reading from stream " << id(); |
| 361 | 360 |
| 362 read_side_closed_ = true; | 361 read_side_closed_ = true; |
| 363 sequencer_.ReleaseBuffer(); | 362 sequencer_.ReleaseBuffer(); |
| 364 | 363 |
| 365 if (write_side_closed_) { | 364 if (write_side_closed_) { |
| 366 DVLOG(1) << ENDPOINT << "Closing stream: " << id(); | 365 DVLOG(1) << ENDPOINT << "Closing stream: " << id(); |
| 367 session_->CloseStream(id()); | 366 session_->CloseStream(id()); |
| 368 } | 367 } |
| 369 } | 368 } |
| 370 | 369 |
| 371 void ReliableQuicStream::CloseWriteSide() { | 370 void QuicStream::CloseWriteSide() { |
| 372 if (write_side_closed_) { | 371 if (write_side_closed_) { |
| 373 return; | 372 return; |
| 374 } | 373 } |
| 375 DVLOG(1) << ENDPOINT << "Done writing to stream " << id(); | 374 DVLOG(1) << ENDPOINT << "Done writing to stream " << id(); |
| 376 | 375 |
| 377 write_side_closed_ = true; | 376 write_side_closed_ = true; |
| 378 if (read_side_closed_) { | 377 if (read_side_closed_) { |
| 379 DVLOG(1) << ENDPOINT << "Closing stream: " << id(); | 378 DVLOG(1) << ENDPOINT << "Closing stream: " << id(); |
| 380 session_->CloseStream(id()); | 379 session_->CloseStream(id()); |
| 381 } | 380 } |
| 382 } | 381 } |
| 383 | 382 |
| 384 bool ReliableQuicStream::HasBufferedData() const { | 383 bool QuicStream::HasBufferedData() const { |
| 385 return !queued_data_.empty(); | 384 return !queued_data_.empty(); |
| 386 } | 385 } |
| 387 | 386 |
| 388 QuicVersion ReliableQuicStream::version() const { | 387 QuicVersion QuicStream::version() const { |
| 389 return session_->connection()->version(); | 388 return session_->connection()->version(); |
| 390 } | 389 } |
| 391 | 390 |
| 392 void ReliableQuicStream::StopReading() { | 391 void QuicStream::StopReading() { |
| 393 DVLOG(1) << ENDPOINT << "Stop reading from stream " << id(); | 392 DVLOG(1) << ENDPOINT << "Stop reading from stream " << id(); |
| 394 sequencer_.StopReading(); | 393 sequencer_.StopReading(); |
| 395 } | 394 } |
| 396 | 395 |
| 397 const IPEndPoint& ReliableQuicStream::PeerAddressOfLatestPacket() const { | 396 const IPEndPoint& QuicStream::PeerAddressOfLatestPacket() const { |
| 398 return session_->connection()->last_packet_source_address(); | 397 return session_->connection()->last_packet_source_address(); |
| 399 } | 398 } |
| 400 | 399 |
| 401 void ReliableQuicStream::OnClose() { | 400 void QuicStream::OnClose() { |
| 402 CloseReadSide(); | 401 CloseReadSide(); |
| 403 CloseWriteSide(); | 402 CloseWriteSide(); |
| 404 | 403 |
| 405 if (!fin_sent_ && !rst_sent_) { | 404 if (!fin_sent_ && !rst_sent_) { |
| 406 // For flow control accounting, tell the peer how many bytes have been | 405 // For flow control accounting, tell the peer how many bytes have been |
| 407 // written on this stream before termination. Done here if needed, using a | 406 // written on this stream before termination. Done here if needed, using a |
| 408 // RST_STREAM frame. | 407 // RST_STREAM frame. |
| 409 DVLOG(1) << ENDPOINT << "Sending RST_STREAM in OnClose: " << id(); | 408 DVLOG(1) << ENDPOINT << "Sending RST_STREAM in OnClose: " << id(); |
| 410 session_->SendRstStream(id(), QUIC_RST_ACKNOWLEDGEMENT, | 409 session_->SendRstStream(id(), QUIC_RST_ACKNOWLEDGEMENT, |
| 411 stream_bytes_written_); | 410 stream_bytes_written_); |
| 412 rst_sent_ = true; | 411 rst_sent_ = true; |
| 413 } | 412 } |
| 414 | 413 |
| 415 // The stream is being closed and will not process any further incoming bytes. | 414 // The stream is being closed and will not process any further incoming bytes. |
| 416 // As there may be more bytes in flight, to ensure that both endpoints have | 415 // As there may be more bytes in flight, to ensure that both endpoints have |
| 417 // the same connection level flow control state, mark all unreceived or | 416 // the same connection level flow control state, mark all unreceived or |
| 418 // buffered bytes as consumed. | 417 // buffered bytes as consumed. |
| 419 QuicByteCount bytes_to_consume = | 418 QuicByteCount bytes_to_consume = |
| 420 flow_controller_.highest_received_byte_offset() - | 419 flow_controller_.highest_received_byte_offset() - |
| 421 flow_controller_.bytes_consumed(); | 420 flow_controller_.bytes_consumed(); |
| 422 AddBytesConsumed(bytes_to_consume); | 421 AddBytesConsumed(bytes_to_consume); |
| 423 } | 422 } |
| 424 | 423 |
| 425 void ReliableQuicStream::OnWindowUpdateFrame( | 424 void QuicStream::OnWindowUpdateFrame(const QuicWindowUpdateFrame& frame) { |
| 426 const QuicWindowUpdateFrame& frame) { | |
| 427 if (flow_controller_.UpdateSendWindowOffset(frame.byte_offset)) { | 425 if (flow_controller_.UpdateSendWindowOffset(frame.byte_offset)) { |
| 428 // Writing can be done again! | 426 // Writing can be done again! |
| 429 // TODO(rjshade): This does not respect priorities (e.g. multiple | 427 // TODO(rjshade): This does not respect priorities (e.g. multiple |
| 430 // outstanding POSTs are unblocked on arrival of | 428 // outstanding POSTs are unblocked on arrival of |
| 431 // SHLO with initial window). | 429 // SHLO with initial window). |
| 432 // As long as the connection is not flow control blocked, write on! | 430 // As long as the connection is not flow control blocked, write on! |
| 433 OnCanWrite(); | 431 OnCanWrite(); |
| 434 } | 432 } |
| 435 } | 433 } |
| 436 | 434 |
| 437 bool ReliableQuicStream::MaybeIncreaseHighestReceivedOffset( | 435 bool QuicStream::MaybeIncreaseHighestReceivedOffset( |
| 438 QuicStreamOffset new_offset) { | 436 QuicStreamOffset new_offset) { |
| 439 uint64_t increment = | 437 uint64_t increment = |
| 440 new_offset - flow_controller_.highest_received_byte_offset(); | 438 new_offset - flow_controller_.highest_received_byte_offset(); |
| 441 if (!flow_controller_.UpdateHighestReceivedOffset(new_offset)) { | 439 if (!flow_controller_.UpdateHighestReceivedOffset(new_offset)) { |
| 442 return false; | 440 return false; |
| 443 } | 441 } |
| 444 | 442 |
| 445 // If |new_offset| increased the stream flow controller's highest received | 443 // If |new_offset| increased the stream flow controller's highest received |
| 446 // offset, increase the connection flow controller's value by the incremental | 444 // offset, increase the connection flow controller's value by the incremental |
| 447 // difference. | 445 // difference. |
| 448 if (stream_contributes_to_connection_flow_control_) { | 446 if (stream_contributes_to_connection_flow_control_) { |
| 449 connection_flow_controller_->UpdateHighestReceivedOffset( | 447 connection_flow_controller_->UpdateHighestReceivedOffset( |
| 450 connection_flow_controller_->highest_received_byte_offset() + | 448 connection_flow_controller_->highest_received_byte_offset() + |
| 451 increment); | 449 increment); |
| 452 } | 450 } |
| 453 return true; | 451 return true; |
| 454 } | 452 } |
| 455 | 453 |
| 456 void ReliableQuicStream::AddBytesSent(QuicByteCount bytes) { | 454 void QuicStream::AddBytesSent(QuicByteCount bytes) { |
| 457 flow_controller_.AddBytesSent(bytes); | 455 flow_controller_.AddBytesSent(bytes); |
| 458 if (stream_contributes_to_connection_flow_control_) { | 456 if (stream_contributes_to_connection_flow_control_) { |
| 459 connection_flow_controller_->AddBytesSent(bytes); | 457 connection_flow_controller_->AddBytesSent(bytes); |
| 460 } | 458 } |
| 461 } | 459 } |
| 462 | 460 |
| 463 void ReliableQuicStream::AddBytesConsumed(QuicByteCount bytes) { | 461 void QuicStream::AddBytesConsumed(QuicByteCount bytes) { |
| 464 // Only adjust stream level flow controller if still reading. | 462 // Only adjust stream level flow controller if still reading. |
| 465 if (!read_side_closed_) { | 463 if (!read_side_closed_) { |
| 466 flow_controller_.AddBytesConsumed(bytes); | 464 flow_controller_.AddBytesConsumed(bytes); |
| 467 } | 465 } |
| 468 | 466 |
| 469 if (stream_contributes_to_connection_flow_control_) { | 467 if (stream_contributes_to_connection_flow_control_) { |
| 470 connection_flow_controller_->AddBytesConsumed(bytes); | 468 connection_flow_controller_->AddBytesConsumed(bytes); |
| 471 } | 469 } |
| 472 } | 470 } |
| 473 | 471 |
| 474 void ReliableQuicStream::UpdateSendWindowOffset(QuicStreamOffset new_window) { | 472 void QuicStream::UpdateSendWindowOffset(QuicStreamOffset new_window) { |
| 475 if (flow_controller_.UpdateSendWindowOffset(new_window)) { | 473 if (flow_controller_.UpdateSendWindowOffset(new_window)) { |
| 476 OnCanWrite(); | 474 OnCanWrite(); |
| 477 } | 475 } |
| 478 } | 476 } |
| 479 | 477 |
| 480 } // namespace net | 478 } // namespace net |
| OLD | NEW |