| OLD | NEW |
| (Empty) |
| 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 #include "net/quic/core/reliable_quic_stream.h" | |
| 6 | |
| 7 #include "base/logging.h" | |
| 8 #include "net/quic/core/quic_bug_tracker.h" | |
| 9 #include "net/quic/core/quic_flags.h" | |
| 10 #include "net/quic/core/quic_flow_controller.h" | |
| 11 #include "net/quic/core/quic_session.h" | |
| 12 #include "net/quic/core/quic_write_blocked_list.h" | |
| 13 | |
| 14 using base::StringPiece; | |
| 15 using std::min; | |
| 16 using std::string; | |
| 17 | |
| 18 namespace net { | |
| 19 | |
| 20 #define ENDPOINT \ | |
| 21 (perspective_ == Perspective::IS_SERVER ? "Server: " : "Client: ") | |
| 22 | |
| 23 namespace { | |
| 24 | |
| 25 struct iovec MakeIovec(StringPiece data) { | |
| 26 struct iovec iov = {const_cast<char*>(data.data()), | |
| 27 static_cast<size_t>(data.size())}; | |
| 28 return iov; | |
| 29 } | |
| 30 | |
| 31 size_t GetInitialStreamFlowControlWindowToSend(QuicSession* session) { | |
| 32 return session->config()->GetInitialStreamFlowControlWindowToSend(); | |
| 33 } | |
| 34 | |
| 35 size_t GetReceivedFlowControlWindow(QuicSession* session) { | |
| 36 if (session->config()->HasReceivedInitialStreamFlowControlWindowBytes()) { | |
| 37 return session->config()->ReceivedInitialStreamFlowControlWindowBytes(); | |
| 38 } | |
| 39 | |
| 40 return kMinimumFlowControlSendWindow; | |
| 41 } | |
| 42 | |
| 43 } // namespace | |
| 44 | |
| 45 ReliableQuicStream::PendingData::PendingData( | |
| 46 string data_in, | |
| 47 QuicAckListenerInterface* ack_listener_in) | |
| 48 : data(std::move(data_in)), offset(0), ack_listener(ack_listener_in) {} | |
| 49 | |
| 50 ReliableQuicStream::PendingData::~PendingData() {} | |
| 51 | |
| 52 ReliableQuicStream::ReliableQuicStream(QuicStreamId id, QuicSession* session) | |
| 53 : queued_data_bytes_(0), | |
| 54 sequencer_(this, session->connection()->clock()), | |
| 55 id_(id), | |
| 56 session_(session), | |
| 57 stream_bytes_read_(0), | |
| 58 stream_bytes_written_(0), | |
| 59 stream_error_(QUIC_STREAM_NO_ERROR), | |
| 60 connection_error_(QUIC_NO_ERROR), | |
| 61 read_side_closed_(false), | |
| 62 write_side_closed_(false), | |
| 63 fin_buffered_(false), | |
| 64 fin_sent_(false), | |
| 65 fin_received_(false), | |
| 66 rst_sent_(false), | |
| 67 rst_received_(false), | |
| 68 perspective_(session_->perspective()), | |
| 69 flow_controller_(session_->connection(), | |
| 70 id_, | |
| 71 perspective_, | |
| 72 GetReceivedFlowControlWindow(session), | |
| 73 GetInitialStreamFlowControlWindowToSend(session), | |
| 74 session_->flow_controller()->auto_tune_receive_window()), | |
| 75 connection_flow_controller_(session_->flow_controller()), | |
| 76 stream_contributes_to_connection_flow_control_(true), | |
| 77 busy_counter_(0) { | |
| 78 SetFromConfig(); | |
| 79 } | |
| 80 | |
| 81 ReliableQuicStream::~ReliableQuicStream() {} | |
| 82 | |
| 83 void ReliableQuicStream::SetFromConfig() {} | |
| 84 | |
| 85 void ReliableQuicStream::OnStreamFrame(const QuicStreamFrame& frame) { | |
| 86 DCHECK_EQ(frame.stream_id, id_); | |
| 87 | |
| 88 DCHECK(!(read_side_closed_ && write_side_closed_)); | |
| 89 | |
| 90 if (frame.fin) { | |
| 91 fin_received_ = true; | |
| 92 if (fin_sent_) { | |
| 93 session_->StreamDraining(id_); | |
| 94 } | |
| 95 } | |
| 96 | |
| 97 if (read_side_closed_) { | |
| 98 DVLOG(1) << ENDPOINT << "Ignoring data in frame " << frame.stream_id; | |
| 99 // The subclass does not want to read data: blackhole the data. | |
| 100 return; | |
| 101 } | |
| 102 | |
| 103 // This count includes duplicate data received. | |
| 104 size_t frame_payload_size = frame.data_length; | |
| 105 stream_bytes_read_ += frame_payload_size; | |
| 106 | |
| 107 // Flow control is interested in tracking highest received offset. | |
| 108 // Only interested in received frames that carry data. | |
| 109 if (frame_payload_size > 0 && | |
| 110 MaybeIncreaseHighestReceivedOffset(frame.offset + frame_payload_size)) { | |
| 111 // As the highest received offset has changed, check to see if this is a | |
| 112 // violation of flow control. | |
| 113 if (flow_controller_.FlowControlViolation() || | |
| 114 connection_flow_controller_->FlowControlViolation()) { | |
| 115 CloseConnectionWithDetails( | |
| 116 QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA, | |
| 117 "Flow control violation after increasing offset"); | |
| 118 return; | |
| 119 } | |
| 120 } | |
| 121 | |
| 122 sequencer_.OnStreamFrame(frame); | |
| 123 } | |
| 124 | |
| 125 int ReliableQuicStream::num_frames_received() const { | |
| 126 return sequencer_.num_frames_received(); | |
| 127 } | |
| 128 | |
| 129 int ReliableQuicStream::num_duplicate_frames_received() const { | |
| 130 return sequencer_.num_duplicate_frames_received(); | |
| 131 } | |
| 132 | |
| 133 void ReliableQuicStream::OnStreamReset(const QuicRstStreamFrame& frame) { | |
| 134 rst_received_ = true; | |
| 135 MaybeIncreaseHighestReceivedOffset(frame.byte_offset); | |
| 136 | |
| 137 stream_error_ = frame.error_code; | |
| 138 CloseWriteSide(); | |
| 139 CloseReadSide(); | |
| 140 } | |
| 141 | |
| 142 void ReliableQuicStream::OnConnectionClosed(QuicErrorCode error, | |
| 143 ConnectionCloseSource /*source*/) { | |
| 144 if (read_side_closed_ && write_side_closed_) { | |
| 145 return; | |
| 146 } | |
| 147 if (error != QUIC_NO_ERROR) { | |
| 148 stream_error_ = QUIC_STREAM_CONNECTION_ERROR; | |
| 149 connection_error_ = error; | |
| 150 } | |
| 151 | |
| 152 CloseWriteSide(); | |
| 153 CloseReadSide(); | |
| 154 } | |
| 155 | |
| 156 void ReliableQuicStream::OnFinRead() { | |
| 157 DCHECK(sequencer_.IsClosed()); | |
| 158 // OnFinRead can be called due to a FIN flag in a headers block, so there may | |
| 159 // have been no OnStreamFrame call with a FIN in the frame. | |
| 160 fin_received_ = true; | |
| 161 // If fin_sent_ is true, then CloseWriteSide has already been called, and the | |
| 162 // stream will be destroyed by CloseReadSide, so don't need to call | |
| 163 // StreamDraining. | |
| 164 CloseReadSide(); | |
| 165 } | |
| 166 | |
| 167 void ReliableQuicStream::Reset(QuicRstStreamErrorCode error) { | |
| 168 stream_error_ = error; | |
| 169 // Sending a RstStream results in calling CloseStream. | |
| 170 session()->SendRstStream(id(), error, stream_bytes_written_); | |
| 171 rst_sent_ = true; | |
| 172 } | |
| 173 | |
| 174 void ReliableQuicStream::CloseConnectionWithDetails(QuicErrorCode error, | |
| 175 const string& details) { | |
| 176 session()->connection()->CloseConnection( | |
| 177 error, details, ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET); | |
| 178 } | |
| 179 | |
| 180 void ReliableQuicStream::WriteOrBufferData( | |
| 181 StringPiece data, | |
| 182 bool fin, | |
| 183 QuicAckListenerInterface* ack_listener) { | |
| 184 if (data.empty() && !fin) { | |
| 185 QUIC_BUG << "data.empty() && !fin"; | |
| 186 return; | |
| 187 } | |
| 188 | |
| 189 if (fin_buffered_) { | |
| 190 QUIC_BUG << "Fin already buffered"; | |
| 191 return; | |
| 192 } | |
| 193 if (write_side_closed_) { | |
| 194 DLOG(ERROR) << ENDPOINT << "Attempt to write when the write side is closed"; | |
| 195 return; | |
| 196 } | |
| 197 | |
| 198 QuicConsumedData consumed_data(0, false); | |
| 199 fin_buffered_ = fin; | |
| 200 | |
| 201 if (queued_data_.empty()) { | |
| 202 struct iovec iov(MakeIovec(data)); | |
| 203 consumed_data = WritevData(&iov, 1, fin, ack_listener); | |
| 204 DCHECK_LE(consumed_data.bytes_consumed, data.length()); | |
| 205 } | |
| 206 | |
| 207 // If there's unconsumed data or an unconsumed fin, queue it. | |
| 208 if (consumed_data.bytes_consumed < data.length() || | |
| 209 (fin && !consumed_data.fin_consumed)) { | |
| 210 StringPiece remainder(data.substr(consumed_data.bytes_consumed)); | |
| 211 queued_data_bytes_ += remainder.size(); | |
| 212 queued_data_.emplace_back(remainder.as_string(), ack_listener); | |
| 213 } | |
| 214 } | |
| 215 | |
| 216 void ReliableQuicStream::OnCanWrite() { | |
| 217 bool fin = false; | |
| 218 while (!queued_data_.empty()) { | |
| 219 PendingData* pending_data = &queued_data_.front(); | |
| 220 QuicAckListenerInterface* ack_listener = pending_data->ack_listener.get(); | |
| 221 if (queued_data_.size() == 1 && fin_buffered_) { | |
| 222 fin = true; | |
| 223 } | |
| 224 if (pending_data->offset > 0 && | |
| 225 pending_data->offset >= pending_data->data.size()) { | |
| 226 // This should be impossible because offset tracks the amount of | |
| 227 // pending_data written thus far. | |
| 228 QUIC_BUG << "Pending offset is beyond available data. offset: " | |
| 229 << pending_data->offset << " vs: " << pending_data->data.size(); | |
| 230 return; | |
| 231 } | |
| 232 size_t remaining_len = pending_data->data.size() - pending_data->offset; | |
| 233 struct iovec iov = { | |
| 234 const_cast<char*>(pending_data->data.data()) + pending_data->offset, | |
| 235 remaining_len}; | |
| 236 QuicConsumedData consumed_data = WritevData(&iov, 1, fin, ack_listener); | |
| 237 queued_data_bytes_ -= consumed_data.bytes_consumed; | |
| 238 if (consumed_data.bytes_consumed == remaining_len && | |
| 239 fin == consumed_data.fin_consumed) { | |
| 240 queued_data_.pop_front(); | |
| 241 } else { | |
| 242 if (consumed_data.bytes_consumed > 0) { | |
| 243 pending_data->offset += consumed_data.bytes_consumed; | |
| 244 } | |
| 245 break; | |
| 246 } | |
| 247 } | |
| 248 } | |
| 249 | |
| 250 void ReliableQuicStream::MaybeSendBlocked() { | |
| 251 flow_controller_.MaybeSendBlocked(); | |
| 252 if (!stream_contributes_to_connection_flow_control_) { | |
| 253 return; | |
| 254 } | |
| 255 connection_flow_controller_->MaybeSendBlocked(); | |
| 256 // If the stream is blocked by connection-level flow control but not by | |
| 257 // stream-level flow control, add the stream to the write blocked list so that | |
| 258 // the stream will be given a chance to write when a connection-level | |
| 259 // WINDOW_UPDATE arrives. | |
| 260 if (connection_flow_controller_->IsBlocked() && | |
| 261 !flow_controller_.IsBlocked()) { | |
| 262 session_->MarkConnectionLevelWriteBlocked(id()); | |
| 263 } | |
| 264 } | |
| 265 | |
| 266 QuicConsumedData ReliableQuicStream::WritevData( | |
| 267 const struct iovec* iov, | |
| 268 int iov_count, | |
| 269 bool fin, | |
| 270 QuicAckListenerInterface* ack_listener) { | |
| 271 if (write_side_closed_) { | |
| 272 DLOG(ERROR) << ENDPOINT << "Attempt to write when the write side is closed"; | |
| 273 return QuicConsumedData(0, false); | |
| 274 } | |
| 275 | |
| 276 // How much data was provided. | |
| 277 size_t write_length = 0; | |
| 278 if (iov != nullptr) { | |
| 279 for (int i = 0; i < iov_count; ++i) { | |
| 280 write_length += iov[i].iov_len; | |
| 281 } | |
| 282 } | |
| 283 | |
| 284 // A FIN with zero data payload should not be flow control blocked. | |
| 285 bool fin_with_zero_data = (fin && write_length == 0); | |
| 286 | |
| 287 // How much data flow control permits to be written. | |
| 288 QuicByteCount send_window = flow_controller_.SendWindowSize(); | |
| 289 if (stream_contributes_to_connection_flow_control_) { | |
| 290 send_window = | |
| 291 min(send_window, connection_flow_controller_->SendWindowSize()); | |
| 292 } | |
| 293 | |
| 294 if (session_->ShouldYield(id())) { | |
| 295 session_->MarkConnectionLevelWriteBlocked(id()); | |
| 296 return QuicConsumedData(0, false); | |
| 297 } | |
| 298 | |
| 299 if (send_window == 0 && !fin_with_zero_data) { | |
| 300 // Quick return if nothing can be sent. | |
| 301 MaybeSendBlocked(); | |
| 302 return QuicConsumedData(0, false); | |
| 303 } | |
| 304 | |
| 305 if (write_length > send_window) { | |
| 306 // Don't send the FIN unless all the data will be sent. | |
| 307 fin = false; | |
| 308 | |
| 309 // Writing more data would be a violation of flow control. | |
| 310 write_length = static_cast<size_t>(send_window); | |
| 311 DVLOG(1) << "stream " << id() << " shortens write length to " | |
| 312 << write_length << " due to flow control"; | |
| 313 } | |
| 314 | |
| 315 QuicConsumedData consumed_data = | |
| 316 WritevDataInner(QuicIOVector(iov, iov_count, write_length), | |
| 317 stream_bytes_written_, fin, ack_listener); | |
| 318 stream_bytes_written_ += consumed_data.bytes_consumed; | |
| 319 | |
| 320 AddBytesSent(consumed_data.bytes_consumed); | |
| 321 | |
| 322 // The write may have generated a write error causing this stream to be | |
| 323 // closed. If so, simply return without marking the stream write blocked. | |
| 324 if (write_side_closed_) { | |
| 325 return consumed_data; | |
| 326 } | |
| 327 | |
| 328 if (consumed_data.bytes_consumed == write_length) { | |
| 329 if (!fin_with_zero_data) { | |
| 330 MaybeSendBlocked(); | |
| 331 } | |
| 332 if (fin && consumed_data.fin_consumed) { | |
| 333 fin_sent_ = true; | |
| 334 if (fin_received_) { | |
| 335 session_->StreamDraining(id_); | |
| 336 } | |
| 337 CloseWriteSide(); | |
| 338 } else if (fin && !consumed_data.fin_consumed) { | |
| 339 session_->MarkConnectionLevelWriteBlocked(id()); | |
| 340 } | |
| 341 } else { | |
| 342 session_->MarkConnectionLevelWriteBlocked(id()); | |
| 343 } | |
| 344 return consumed_data; | |
| 345 } | |
| 346 | |
| 347 QuicConsumedData ReliableQuicStream::WritevDataInner( | |
| 348 QuicIOVector iov, | |
| 349 QuicStreamOffset offset, | |
| 350 bool fin, | |
| 351 QuicAckListenerInterface* ack_notifier_delegate) { | |
| 352 return session()->WritevData(this, id(), iov, offset, fin, | |
| 353 ack_notifier_delegate); | |
| 354 } | |
| 355 | |
| 356 void ReliableQuicStream::CloseReadSide() { | |
| 357 if (read_side_closed_) { | |
| 358 return; | |
| 359 } | |
| 360 DVLOG(1) << ENDPOINT << "Done reading from stream " << id(); | |
| 361 | |
| 362 read_side_closed_ = true; | |
| 363 sequencer_.ReleaseBuffer(); | |
| 364 | |
| 365 if (write_side_closed_) { | |
| 366 DVLOG(1) << ENDPOINT << "Closing stream: " << id(); | |
| 367 session_->CloseStream(id()); | |
| 368 } | |
| 369 } | |
| 370 | |
| 371 void ReliableQuicStream::CloseWriteSide() { | |
| 372 if (write_side_closed_) { | |
| 373 return; | |
| 374 } | |
| 375 DVLOG(1) << ENDPOINT << "Done writing to stream " << id(); | |
| 376 | |
| 377 write_side_closed_ = true; | |
| 378 if (read_side_closed_) { | |
| 379 DVLOG(1) << ENDPOINT << "Closing stream: " << id(); | |
| 380 session_->CloseStream(id()); | |
| 381 } | |
| 382 } | |
| 383 | |
| 384 bool ReliableQuicStream::HasBufferedData() const { | |
| 385 return !queued_data_.empty(); | |
| 386 } | |
| 387 | |
| 388 QuicVersion ReliableQuicStream::version() const { | |
| 389 return session_->connection()->version(); | |
| 390 } | |
| 391 | |
| 392 void ReliableQuicStream::StopReading() { | |
| 393 DVLOG(1) << ENDPOINT << "Stop reading from stream " << id(); | |
| 394 sequencer_.StopReading(); | |
| 395 } | |
| 396 | |
| 397 const IPEndPoint& ReliableQuicStream::PeerAddressOfLatestPacket() const { | |
| 398 return session_->connection()->last_packet_source_address(); | |
| 399 } | |
| 400 | |
| 401 void ReliableQuicStream::OnClose() { | |
| 402 CloseReadSide(); | |
| 403 CloseWriteSide(); | |
| 404 | |
| 405 if (!fin_sent_ && !rst_sent_) { | |
| 406 // For flow control accounting, tell the peer how many bytes have been | |
| 407 // written on this stream before termination. Done here if needed, using a | |
| 408 // RST_STREAM frame. | |
| 409 DVLOG(1) << ENDPOINT << "Sending RST_STREAM in OnClose: " << id(); | |
| 410 session_->SendRstStream(id(), QUIC_RST_ACKNOWLEDGEMENT, | |
| 411 stream_bytes_written_); | |
| 412 rst_sent_ = true; | |
| 413 } | |
| 414 | |
| 415 // The stream is being closed and will not process any further incoming bytes. | |
| 416 // As there may be more bytes in flight, to ensure that both endpoints have | |
| 417 // the same connection level flow control state, mark all unreceived or | |
| 418 // buffered bytes as consumed. | |
| 419 QuicByteCount bytes_to_consume = | |
| 420 flow_controller_.highest_received_byte_offset() - | |
| 421 flow_controller_.bytes_consumed(); | |
| 422 AddBytesConsumed(bytes_to_consume); | |
| 423 } | |
| 424 | |
| 425 void ReliableQuicStream::OnWindowUpdateFrame( | |
| 426 const QuicWindowUpdateFrame& frame) { | |
| 427 if (flow_controller_.UpdateSendWindowOffset(frame.byte_offset)) { | |
| 428 // Writing can be done again! | |
| 429 // TODO(rjshade): This does not respect priorities (e.g. multiple | |
| 430 // outstanding POSTs are unblocked on arrival of | |
| 431 // SHLO with initial window). | |
| 432 // As long as the connection is not flow control blocked, write on! | |
| 433 OnCanWrite(); | |
| 434 } | |
| 435 } | |
| 436 | |
| 437 bool ReliableQuicStream::MaybeIncreaseHighestReceivedOffset( | |
| 438 QuicStreamOffset new_offset) { | |
| 439 uint64_t increment = | |
| 440 new_offset - flow_controller_.highest_received_byte_offset(); | |
| 441 if (!flow_controller_.UpdateHighestReceivedOffset(new_offset)) { | |
| 442 return false; | |
| 443 } | |
| 444 | |
| 445 // If |new_offset| increased the stream flow controller's highest received | |
| 446 // offset, increase the connection flow controller's value by the incremental | |
| 447 // difference. | |
| 448 if (stream_contributes_to_connection_flow_control_) { | |
| 449 connection_flow_controller_->UpdateHighestReceivedOffset( | |
| 450 connection_flow_controller_->highest_received_byte_offset() + | |
| 451 increment); | |
| 452 } | |
| 453 return true; | |
| 454 } | |
| 455 | |
| 456 void ReliableQuicStream::AddBytesSent(QuicByteCount bytes) { | |
| 457 flow_controller_.AddBytesSent(bytes); | |
| 458 if (stream_contributes_to_connection_flow_control_) { | |
| 459 connection_flow_controller_->AddBytesSent(bytes); | |
| 460 } | |
| 461 } | |
| 462 | |
| 463 void ReliableQuicStream::AddBytesConsumed(QuicByteCount bytes) { | |
| 464 // Only adjust stream level flow controller if still reading. | |
| 465 if (!read_side_closed_) { | |
| 466 flow_controller_.AddBytesConsumed(bytes); | |
| 467 } | |
| 468 | |
| 469 if (stream_contributes_to_connection_flow_control_) { | |
| 470 connection_flow_controller_->AddBytesConsumed(bytes); | |
| 471 } | |
| 472 } | |
| 473 | |
| 474 void ReliableQuicStream::UpdateSendWindowOffset(QuicStreamOffset new_window) { | |
| 475 if (flow_controller_.UpdateSendWindowOffset(new_window)) { | |
| 476 OnCanWrite(); | |
| 477 } | |
| 478 } | |
| 479 | |
| 480 } // namespace net | |
| OLD | NEW |