| OLD | NEW |
| (Empty) |
| 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 #include "net/quic/reliable_quic_stream.h" | |
| 6 | |
| 7 #include "base/logging.h" | |
| 8 #include "base/profiler/scoped_tracker.h" | |
| 9 #include "net/quic/iovector.h" | |
| 10 #include "net/quic/quic_flow_controller.h" | |
| 11 #include "net/quic/quic_session.h" | |
| 12 #include "net/quic/quic_write_blocked_list.h" | |
| 13 | |
| 14 using base::StringPiece; | |
| 15 using std::min; | |
| 16 using std::string; | |
| 17 | |
| 18 namespace net { | |
| 19 | |
| 20 #define ENDPOINT (is_server_ ? "Server: " : " Client: ") | |
| 21 | |
| 22 namespace { | |
| 23 | |
| 24 struct iovec MakeIovec(StringPiece data) { | |
| 25 struct iovec iov = {const_cast<char*>(data.data()), | |
| 26 static_cast<size_t>(data.size())}; | |
| 27 return iov; | |
| 28 } | |
| 29 | |
| 30 size_t GetInitialStreamFlowControlWindowToSend(QuicSession* session) { | |
| 31 return session->config()->GetInitialStreamFlowControlWindowToSend(); | |
| 32 } | |
| 33 | |
| 34 size_t GetReceivedFlowControlWindow(QuicSession* session) { | |
| 35 if (session->config()->HasReceivedInitialStreamFlowControlWindowBytes()) { | |
| 36 return session->config()->ReceivedInitialStreamFlowControlWindowBytes(); | |
| 37 } | |
| 38 | |
| 39 return kMinimumFlowControlSendWindow; | |
| 40 } | |
| 41 | |
| 42 } // namespace | |
| 43 | |
| 44 // Wrapper that aggregates OnAckNotifications for packets sent using | |
| 45 // WriteOrBufferData and delivers them to the original | |
| 46 // QuicAckNotifier::DelegateInterface after all bytes written using | |
| 47 // WriteOrBufferData are acked. This level of indirection is | |
| 48 // necessary because the delegate interface provides no mechanism that | |
| 49 // WriteOrBufferData can use to inform it that the write required | |
| 50 // multiple WritevData calls or that only part of the data has been | |
| 51 // sent out by the time ACKs start arriving. | |
| 52 class ReliableQuicStream::ProxyAckNotifierDelegate | |
| 53 : public QuicAckNotifier::DelegateInterface { | |
| 54 public: | |
| 55 explicit ProxyAckNotifierDelegate(DelegateInterface* delegate) | |
| 56 : delegate_(delegate), | |
| 57 pending_acks_(0), | |
| 58 wrote_last_data_(false), | |
| 59 num_retransmitted_packets_(0), | |
| 60 num_retransmitted_bytes_(0) { | |
| 61 } | |
| 62 | |
| 63 void OnAckNotification(int num_retransmitted_packets, | |
| 64 int num_retransmitted_bytes, | |
| 65 QuicTime::Delta delta_largest_observed) override { | |
| 66 DCHECK_LT(0, pending_acks_); | |
| 67 --pending_acks_; | |
| 68 num_retransmitted_packets_ += num_retransmitted_packets; | |
| 69 num_retransmitted_bytes_ += num_retransmitted_bytes; | |
| 70 | |
| 71 if (wrote_last_data_ && pending_acks_ == 0) { | |
| 72 delegate_->OnAckNotification(num_retransmitted_packets_, | |
| 73 num_retransmitted_bytes_, | |
| 74 delta_largest_observed); | |
| 75 } | |
| 76 } | |
| 77 | |
| 78 void WroteData(bool last_data) { | |
| 79 DCHECK(!wrote_last_data_); | |
| 80 ++pending_acks_; | |
| 81 wrote_last_data_ = last_data; | |
| 82 } | |
| 83 | |
| 84 protected: | |
| 85 // Delegates are ref counted. | |
| 86 ~ProxyAckNotifierDelegate() override {} | |
| 87 | |
| 88 private: | |
| 89 // Original delegate. delegate_->OnAckNotification will be called when: | |
| 90 // wrote_last_data_ == true and pending_acks_ == 0 | |
| 91 scoped_refptr<DelegateInterface> delegate_; | |
| 92 | |
| 93 // Number of outstanding acks. | |
| 94 int pending_acks_; | |
| 95 | |
| 96 // True if no pending writes remain. | |
| 97 bool wrote_last_data_; | |
| 98 | |
| 99 // Accumulators. | |
| 100 int num_original_packets_; | |
| 101 int num_original_bytes_; | |
| 102 int num_retransmitted_packets_; | |
| 103 int num_retransmitted_bytes_; | |
| 104 | |
| 105 DISALLOW_COPY_AND_ASSIGN(ProxyAckNotifierDelegate); | |
| 106 }; | |
| 107 | |
| 108 ReliableQuicStream::PendingData::PendingData( | |
| 109 string data_in, scoped_refptr<ProxyAckNotifierDelegate> delegate_in) | |
| 110 : data(data_in), delegate(delegate_in) { | |
| 111 } | |
| 112 | |
| 113 ReliableQuicStream::PendingData::~PendingData() { | |
| 114 } | |
| 115 | |
| 116 ReliableQuicStream::ReliableQuicStream(QuicStreamId id, QuicSession* session) | |
| 117 : sequencer_(this), | |
| 118 id_(id), | |
| 119 session_(session), | |
| 120 stream_bytes_read_(0), | |
| 121 stream_bytes_written_(0), | |
| 122 stream_error_(QUIC_STREAM_NO_ERROR), | |
| 123 connection_error_(QUIC_NO_ERROR), | |
| 124 read_side_closed_(false), | |
| 125 write_side_closed_(false), | |
| 126 fin_buffered_(false), | |
| 127 fin_sent_(false), | |
| 128 fin_received_(false), | |
| 129 rst_sent_(false), | |
| 130 rst_received_(false), | |
| 131 fec_policy_(FEC_PROTECT_OPTIONAL), | |
| 132 is_server_(session_->is_server()), | |
| 133 flow_controller_( | |
| 134 session_->connection(), id_, is_server_, | |
| 135 GetReceivedFlowControlWindow(session), | |
| 136 GetInitialStreamFlowControlWindowToSend(session), | |
| 137 GetInitialStreamFlowControlWindowToSend(session)), | |
| 138 connection_flow_controller_(session_->flow_controller()), | |
| 139 stream_contributes_to_connection_flow_control_(true) { | |
| 140 } | |
| 141 | |
| 142 ReliableQuicStream::~ReliableQuicStream() { | |
| 143 } | |
| 144 | |
| 145 void ReliableQuicStream::OnStreamFrame(const QuicStreamFrame& frame) { | |
| 146 if (read_side_closed_) { | |
| 147 DVLOG(1) << ENDPOINT << "Ignoring frame " << frame.stream_id; | |
| 148 // We don't want to be reading: blackhole the data. | |
| 149 return; | |
| 150 } | |
| 151 | |
| 152 if (frame.stream_id != id_) { | |
| 153 session_->connection()->SendConnectionClose(QUIC_INTERNAL_ERROR); | |
| 154 return; | |
| 155 } | |
| 156 | |
| 157 if (frame.fin) { | |
| 158 fin_received_ = true; | |
| 159 } | |
| 160 | |
| 161 // This count include duplicate data received. | |
| 162 size_t frame_payload_size = frame.data.TotalBufferSize(); | |
| 163 stream_bytes_read_ += frame_payload_size; | |
| 164 | |
| 165 // Flow control is interested in tracking highest received offset. | |
| 166 if (MaybeIncreaseHighestReceivedOffset(frame.offset + frame_payload_size)) { | |
| 167 // As the highest received offset has changed, we should check to see if | |
| 168 // this is a violation of flow control. | |
| 169 if (flow_controller_.FlowControlViolation() || | |
| 170 connection_flow_controller_->FlowControlViolation()) { | |
| 171 session_->connection()->SendConnectionClose( | |
| 172 QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA); | |
| 173 return; | |
| 174 } | |
| 175 } | |
| 176 | |
| 177 sequencer_.OnStreamFrame(frame); | |
| 178 } | |
| 179 | |
| 180 int ReliableQuicStream::num_frames_received() const { | |
| 181 return sequencer_.num_frames_received(); | |
| 182 } | |
| 183 | |
| 184 int ReliableQuicStream::num_duplicate_frames_received() const { | |
| 185 return sequencer_.num_duplicate_frames_received(); | |
| 186 } | |
| 187 | |
| 188 void ReliableQuicStream::OnStreamReset(const QuicRstStreamFrame& frame) { | |
| 189 rst_received_ = true; | |
| 190 MaybeIncreaseHighestReceivedOffset(frame.byte_offset); | |
| 191 | |
| 192 stream_error_ = frame.error_code; | |
| 193 CloseWriteSide(); | |
| 194 CloseReadSide(); | |
| 195 } | |
| 196 | |
| 197 void ReliableQuicStream::OnConnectionClosed(QuicErrorCode error, | |
| 198 bool from_peer) { | |
| 199 if (read_side_closed_ && write_side_closed_) { | |
| 200 return; | |
| 201 } | |
| 202 if (error != QUIC_NO_ERROR) { | |
| 203 stream_error_ = QUIC_STREAM_CONNECTION_ERROR; | |
| 204 connection_error_ = error; | |
| 205 } | |
| 206 | |
| 207 CloseWriteSide(); | |
| 208 CloseReadSide(); | |
| 209 } | |
| 210 | |
| 211 void ReliableQuicStream::OnFinRead() { | |
| 212 DCHECK(sequencer_.IsClosed()); | |
| 213 fin_received_ = true; | |
| 214 CloseReadSide(); | |
| 215 } | |
| 216 | |
| 217 void ReliableQuicStream::Reset(QuicRstStreamErrorCode error) { | |
| 218 DCHECK_NE(QUIC_STREAM_NO_ERROR, error); | |
| 219 stream_error_ = error; | |
| 220 // Sending a RstStream results in calling CloseStream. | |
| 221 session()->SendRstStream(id(), error, stream_bytes_written_); | |
| 222 rst_sent_ = true; | |
| 223 } | |
| 224 | |
| 225 void ReliableQuicStream::CloseConnection(QuicErrorCode error) { | |
| 226 // TODO(vadimt): Remove ScopedTracker below once crbug.com/422516 is fixed. | |
| 227 tracked_objects::ScopedTracker tracking_profile( | |
| 228 FROM_HERE_WITH_EXPLICIT_FUNCTION( | |
| 229 "422516 ReliableQuicStream::CloseConnection")); | |
| 230 | |
| 231 session()->connection()->SendConnectionClose(error); | |
| 232 } | |
| 233 | |
| 234 void ReliableQuicStream::CloseConnectionWithDetails(QuicErrorCode error, | |
| 235 const string& details) { | |
| 236 session()->connection()->SendConnectionCloseWithDetails(error, details); | |
| 237 } | |
| 238 | |
| 239 QuicVersion ReliableQuicStream::version() const { | |
| 240 return session()->connection()->version(); | |
| 241 } | |
| 242 | |
| 243 void ReliableQuicStream::WriteOrBufferData( | |
| 244 StringPiece data, | |
| 245 bool fin, | |
| 246 QuicAckNotifier::DelegateInterface* ack_notifier_delegate) { | |
| 247 if (data.empty() && !fin) { | |
| 248 LOG(DFATAL) << "data.empty() && !fin"; | |
| 249 return; | |
| 250 } | |
| 251 | |
| 252 if (fin_buffered_) { | |
| 253 LOG(DFATAL) << "Fin already buffered"; | |
| 254 return; | |
| 255 } | |
| 256 | |
| 257 scoped_refptr<ProxyAckNotifierDelegate> proxy_delegate; | |
| 258 if (ack_notifier_delegate != nullptr) { | |
| 259 proxy_delegate = new ProxyAckNotifierDelegate(ack_notifier_delegate); | |
| 260 } | |
| 261 | |
| 262 QuicConsumedData consumed_data(0, false); | |
| 263 fin_buffered_ = fin; | |
| 264 | |
| 265 if (queued_data_.empty()) { | |
| 266 struct iovec iov(MakeIovec(data)); | |
| 267 consumed_data = WritevData(&iov, 1, fin, proxy_delegate.get()); | |
| 268 DCHECK_LE(consumed_data.bytes_consumed, data.length()); | |
| 269 } | |
| 270 | |
| 271 bool write_completed; | |
| 272 // If there's unconsumed data or an unconsumed fin, queue it. | |
| 273 if (consumed_data.bytes_consumed < data.length() || | |
| 274 (fin && !consumed_data.fin_consumed)) { | |
| 275 StringPiece remainder(data.substr(consumed_data.bytes_consumed)); | |
| 276 queued_data_.push_back(PendingData(remainder.as_string(), proxy_delegate)); | |
| 277 write_completed = false; | |
| 278 } else { | |
| 279 write_completed = true; | |
| 280 } | |
| 281 | |
| 282 if ((proxy_delegate.get() != nullptr) && | |
| 283 (consumed_data.bytes_consumed > 0 || consumed_data.fin_consumed)) { | |
| 284 proxy_delegate->WroteData(write_completed); | |
| 285 } | |
| 286 } | |
| 287 | |
| 288 void ReliableQuicStream::OnCanWrite() { | |
| 289 bool fin = false; | |
| 290 while (!queued_data_.empty()) { | |
| 291 PendingData* pending_data = &queued_data_.front(); | |
| 292 ProxyAckNotifierDelegate* delegate = pending_data->delegate.get(); | |
| 293 if (queued_data_.size() == 1 && fin_buffered_) { | |
| 294 fin = true; | |
| 295 } | |
| 296 struct iovec iov(MakeIovec(pending_data->data)); | |
| 297 QuicConsumedData consumed_data = WritevData(&iov, 1, fin, delegate); | |
| 298 if (consumed_data.bytes_consumed == pending_data->data.size() && | |
| 299 fin == consumed_data.fin_consumed) { | |
| 300 queued_data_.pop_front(); | |
| 301 if (delegate != nullptr) { | |
| 302 delegate->WroteData(true); | |
| 303 } | |
| 304 } else { | |
| 305 if (consumed_data.bytes_consumed > 0) { | |
| 306 pending_data->data.erase(0, consumed_data.bytes_consumed); | |
| 307 if (delegate != nullptr) { | |
| 308 delegate->WroteData(false); | |
| 309 } | |
| 310 } | |
| 311 break; | |
| 312 } | |
| 313 } | |
| 314 } | |
| 315 | |
| 316 void ReliableQuicStream::MaybeSendBlocked() { | |
| 317 flow_controller_.MaybeSendBlocked(); | |
| 318 if (!stream_contributes_to_connection_flow_control_) { | |
| 319 return; | |
| 320 } | |
| 321 connection_flow_controller_->MaybeSendBlocked(); | |
| 322 // If we are connection level flow control blocked, then add the stream | |
| 323 // to the write blocked list. It will be given a chance to write when a | |
| 324 // connection level WINDOW_UPDATE arrives. | |
| 325 if (connection_flow_controller_->IsBlocked() && | |
| 326 !flow_controller_.IsBlocked()) { | |
| 327 session_->MarkWriteBlocked(id(), EffectivePriority()); | |
| 328 } | |
| 329 } | |
| 330 | |
| 331 QuicConsumedData ReliableQuicStream::WritevData( | |
| 332 const struct iovec* iov, | |
| 333 int iov_count, | |
| 334 bool fin, | |
| 335 QuicAckNotifier::DelegateInterface* ack_notifier_delegate) { | |
| 336 if (write_side_closed_) { | |
| 337 DLOG(ERROR) << ENDPOINT << "Attempt to write when the write side is closed"; | |
| 338 return QuicConsumedData(0, false); | |
| 339 } | |
| 340 | |
| 341 // How much data we want to write. | |
| 342 size_t write_length = TotalIovecLength(iov, iov_count); | |
| 343 | |
| 344 // A FIN with zero data payload should not be flow control blocked. | |
| 345 bool fin_with_zero_data = (fin && write_length == 0); | |
| 346 | |
| 347 if (flow_controller_.IsEnabled()) { | |
| 348 // How much data we are allowed to write from flow control. | |
| 349 QuicByteCount send_window = flow_controller_.SendWindowSize(); | |
| 350 if (stream_contributes_to_connection_flow_control_) { | |
| 351 send_window = | |
| 352 min(send_window, connection_flow_controller_->SendWindowSize()); | |
| 353 } | |
| 354 | |
| 355 if (send_window == 0 && !fin_with_zero_data) { | |
| 356 // Quick return if we can't send anything. | |
| 357 MaybeSendBlocked(); | |
| 358 return QuicConsumedData(0, false); | |
| 359 } | |
| 360 | |
| 361 if (write_length > send_window) { | |
| 362 // Don't send the FIN if we aren't going to send all the data. | |
| 363 fin = false; | |
| 364 | |
| 365 // Writing more data would be a violation of flow control. | |
| 366 write_length = static_cast<size_t>(send_window); | |
| 367 } | |
| 368 } | |
| 369 | |
| 370 // Fill an IOVector with bytes from the iovec. | |
| 371 IOVector data; | |
| 372 data.AppendIovecAtMostBytes(iov, iov_count, write_length); | |
| 373 | |
| 374 QuicConsumedData consumed_data = session()->WritevData( | |
| 375 id(), data, stream_bytes_written_, fin, GetFecProtection(), | |
| 376 ack_notifier_delegate); | |
| 377 stream_bytes_written_ += consumed_data.bytes_consumed; | |
| 378 | |
| 379 AddBytesSent(consumed_data.bytes_consumed); | |
| 380 | |
| 381 if (consumed_data.bytes_consumed == write_length) { | |
| 382 if (!fin_with_zero_data) { | |
| 383 MaybeSendBlocked(); | |
| 384 } | |
| 385 if (fin && consumed_data.fin_consumed) { | |
| 386 fin_sent_ = true; | |
| 387 CloseWriteSide(); | |
| 388 } else if (fin && !consumed_data.fin_consumed) { | |
| 389 session_->MarkWriteBlocked(id(), EffectivePriority()); | |
| 390 } | |
| 391 } else { | |
| 392 session_->MarkWriteBlocked(id(), EffectivePriority()); | |
| 393 } | |
| 394 return consumed_data; | |
| 395 } | |
| 396 | |
| 397 FecProtection ReliableQuicStream::GetFecProtection() { | |
| 398 return fec_policy_ == FEC_PROTECT_ALWAYS ? MUST_FEC_PROTECT : MAY_FEC_PROTECT; | |
| 399 } | |
| 400 | |
| 401 void ReliableQuicStream::CloseReadSide() { | |
| 402 if (read_side_closed_) { | |
| 403 return; | |
| 404 } | |
| 405 DVLOG(1) << ENDPOINT << "Done reading from stream " << id(); | |
| 406 | |
| 407 read_side_closed_ = true; | |
| 408 if (write_side_closed_) { | |
| 409 DVLOG(1) << ENDPOINT << "Closing stream: " << id(); | |
| 410 session_->CloseStream(id()); | |
| 411 } | |
| 412 } | |
| 413 | |
| 414 void ReliableQuicStream::CloseWriteSide() { | |
| 415 if (write_side_closed_) { | |
| 416 return; | |
| 417 } | |
| 418 DVLOG(1) << ENDPOINT << "Done writing to stream " << id(); | |
| 419 | |
| 420 write_side_closed_ = true; | |
| 421 if (read_side_closed_) { | |
| 422 DVLOG(1) << ENDPOINT << "Closing stream: " << id(); | |
| 423 session_->CloseStream(id()); | |
| 424 } | |
| 425 } | |
| 426 | |
| 427 bool ReliableQuicStream::HasBufferedData() const { | |
| 428 return !queued_data_.empty(); | |
| 429 } | |
| 430 | |
| 431 void ReliableQuicStream::OnClose() { | |
| 432 CloseReadSide(); | |
| 433 CloseWriteSide(); | |
| 434 | |
| 435 if (!fin_sent_ && !rst_sent_) { | |
| 436 // For flow control accounting, we must tell the peer how many bytes we have | |
| 437 // written on this stream before termination. Done here if needed, using a | |
| 438 // RST frame. | |
| 439 DVLOG(1) << ENDPOINT << "Sending RST in OnClose: " << id(); | |
| 440 session_->SendRstStream(id(), QUIC_RST_ACKNOWLEDGEMENT, | |
| 441 stream_bytes_written_); | |
| 442 rst_sent_ = true; | |
| 443 } | |
| 444 | |
| 445 // We are closing the stream and will not process any further incoming bytes. | |
| 446 // As there may be more bytes in flight and we need to ensure that both | |
| 447 // endpoints have the same connection level flow control state, mark all | |
| 448 // unreceived or buffered bytes as consumed. | |
| 449 QuicByteCount bytes_to_consume = | |
| 450 flow_controller_.highest_received_byte_offset() - | |
| 451 flow_controller_.bytes_consumed(); | |
| 452 AddBytesConsumed(bytes_to_consume); | |
| 453 } | |
| 454 | |
| 455 void ReliableQuicStream::OnWindowUpdateFrame( | |
| 456 const QuicWindowUpdateFrame& frame) { | |
| 457 if (!flow_controller_.IsEnabled()) { | |
| 458 DLOG(DFATAL) << "Flow control not enabled! " << version(); | |
| 459 return; | |
| 460 } | |
| 461 if (flow_controller_.UpdateSendWindowOffset(frame.byte_offset)) { | |
| 462 // We can write again! | |
| 463 // TODO(rjshade): This does not respect priorities (e.g. multiple | |
| 464 // outstanding POSTs are unblocked on arrival of | |
| 465 // SHLO with initial window). | |
| 466 // As long as the connection is not flow control blocked, we can write! | |
| 467 OnCanWrite(); | |
| 468 } | |
| 469 } | |
| 470 | |
| 471 bool ReliableQuicStream::MaybeIncreaseHighestReceivedOffset( | |
| 472 QuicStreamOffset new_offset) { | |
| 473 if (!flow_controller_.IsEnabled()) { | |
| 474 return false; | |
| 475 } | |
| 476 uint64 increment = | |
| 477 new_offset - flow_controller_.highest_received_byte_offset(); | |
| 478 if (!flow_controller_.UpdateHighestReceivedOffset(new_offset)) { | |
| 479 return false; | |
| 480 } | |
| 481 | |
| 482 // If |new_offset| increased the stream flow controller's highest received | |
| 483 // offset, then we need to increase the connection flow controller's value | |
| 484 // by the incremental difference. | |
| 485 if (stream_contributes_to_connection_flow_control_) { | |
| 486 connection_flow_controller_->UpdateHighestReceivedOffset( | |
| 487 connection_flow_controller_->highest_received_byte_offset() + | |
| 488 increment); | |
| 489 } | |
| 490 return true; | |
| 491 } | |
| 492 | |
| 493 void ReliableQuicStream::AddBytesSent(QuicByteCount bytes) { | |
| 494 if (flow_controller_.IsEnabled()) { | |
| 495 flow_controller_.AddBytesSent(bytes); | |
| 496 if (stream_contributes_to_connection_flow_control_) { | |
| 497 connection_flow_controller_->AddBytesSent(bytes); | |
| 498 } | |
| 499 } | |
| 500 } | |
| 501 | |
| 502 void ReliableQuicStream::AddBytesConsumed(QuicByteCount bytes) { | |
| 503 if (flow_controller_.IsEnabled()) { | |
| 504 // Only adjust stream level flow controller if we are still reading. | |
| 505 if (!read_side_closed_) { | |
| 506 flow_controller_.AddBytesConsumed(bytes); | |
| 507 } | |
| 508 | |
| 509 if (stream_contributes_to_connection_flow_control_) { | |
| 510 connection_flow_controller_->AddBytesConsumed(bytes); | |
| 511 } | |
| 512 } | |
| 513 } | |
| 514 | |
| 515 void ReliableQuicStream::UpdateSendWindowOffset(QuicStreamOffset new_window) { | |
| 516 if (flow_controller_.UpdateSendWindowOffset(new_window)) { | |
| 517 OnCanWrite(); | |
| 518 } | |
| 519 } | |
| 520 | |
| 521 bool ReliableQuicStream::IsFlowControlBlocked() { | |
| 522 if (flow_controller_.IsBlocked()) { | |
| 523 return true; | |
| 524 } | |
| 525 return stream_contributes_to_connection_flow_control_ && | |
| 526 connection_flow_controller_->IsBlocked(); | |
| 527 } | |
| 528 | |
| 529 } // namespace net | |
| OLD | NEW |