OLD | NEW |
| (Empty) |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "net/quic/reliable_quic_stream.h" | |
6 | |
7 #include "base/logging.h" | |
8 #include "base/profiler/scoped_tracker.h" | |
9 #include "net/quic/iovector.h" | |
10 #include "net/quic/quic_flow_controller.h" | |
11 #include "net/quic/quic_session.h" | |
12 #include "net/quic/quic_write_blocked_list.h" | |
13 | |
14 using base::StringPiece; | |
15 using std::min; | |
16 using std::string; | |
17 | |
18 namespace net { | |
19 | |
20 #define ENDPOINT (is_server_ ? "Server: " : " Client: ") | |
21 | |
22 namespace { | |
23 | |
24 struct iovec MakeIovec(StringPiece data) { | |
25 struct iovec iov = {const_cast<char*>(data.data()), | |
26 static_cast<size_t>(data.size())}; | |
27 return iov; | |
28 } | |
29 | |
30 size_t GetInitialStreamFlowControlWindowToSend(QuicSession* session) { | |
31 return session->config()->GetInitialStreamFlowControlWindowToSend(); | |
32 } | |
33 | |
34 size_t GetReceivedFlowControlWindow(QuicSession* session) { | |
35 if (session->config()->HasReceivedInitialStreamFlowControlWindowBytes()) { | |
36 return session->config()->ReceivedInitialStreamFlowControlWindowBytes(); | |
37 } | |
38 | |
39 return kMinimumFlowControlSendWindow; | |
40 } | |
41 | |
42 } // namespace | |
43 | |
44 // Wrapper that aggregates OnAckNotifications for packets sent using | |
45 // WriteOrBufferData and delivers them to the original | |
46 // QuicAckNotifier::DelegateInterface after all bytes written using | |
47 // WriteOrBufferData are acked. This level of indirection is | |
48 // necessary because the delegate interface provides no mechanism that | |
49 // WriteOrBufferData can use to inform it that the write required | |
50 // multiple WritevData calls or that only part of the data has been | |
51 // sent out by the time ACKs start arriving. | |
52 class ReliableQuicStream::ProxyAckNotifierDelegate | |
53 : public QuicAckNotifier::DelegateInterface { | |
54 public: | |
55 explicit ProxyAckNotifierDelegate(DelegateInterface* delegate) | |
56 : delegate_(delegate), | |
57 pending_acks_(0), | |
58 wrote_last_data_(false), | |
59 num_retransmitted_packets_(0), | |
60 num_retransmitted_bytes_(0) { | |
61 } | |
62 | |
63 void OnAckNotification(int num_retransmitted_packets, | |
64 int num_retransmitted_bytes, | |
65 QuicTime::Delta delta_largest_observed) override { | |
66 DCHECK_LT(0, pending_acks_); | |
67 --pending_acks_; | |
68 num_retransmitted_packets_ += num_retransmitted_packets; | |
69 num_retransmitted_bytes_ += num_retransmitted_bytes; | |
70 | |
71 if (wrote_last_data_ && pending_acks_ == 0) { | |
72 delegate_->OnAckNotification(num_retransmitted_packets_, | |
73 num_retransmitted_bytes_, | |
74 delta_largest_observed); | |
75 } | |
76 } | |
77 | |
78 void WroteData(bool last_data) { | |
79 DCHECK(!wrote_last_data_); | |
80 ++pending_acks_; | |
81 wrote_last_data_ = last_data; | |
82 } | |
83 | |
84 protected: | |
85 // Delegates are ref counted. | |
86 ~ProxyAckNotifierDelegate() override {} | |
87 | |
88 private: | |
89 // Original delegate. delegate_->OnAckNotification will be called when: | |
90 // wrote_last_data_ == true and pending_acks_ == 0 | |
91 scoped_refptr<DelegateInterface> delegate_; | |
92 | |
93 // Number of outstanding acks. | |
94 int pending_acks_; | |
95 | |
96 // True if no pending writes remain. | |
97 bool wrote_last_data_; | |
98 | |
99 // Accumulators. | |
100 int num_original_packets_; | |
101 int num_original_bytes_; | |
102 int num_retransmitted_packets_; | |
103 int num_retransmitted_bytes_; | |
104 | |
105 DISALLOW_COPY_AND_ASSIGN(ProxyAckNotifierDelegate); | |
106 }; | |
107 | |
108 ReliableQuicStream::PendingData::PendingData( | |
109 string data_in, scoped_refptr<ProxyAckNotifierDelegate> delegate_in) | |
110 : data(data_in), delegate(delegate_in) { | |
111 } | |
112 | |
113 ReliableQuicStream::PendingData::~PendingData() { | |
114 } | |
115 | |
116 ReliableQuicStream::ReliableQuicStream(QuicStreamId id, QuicSession* session) | |
117 : sequencer_(this), | |
118 id_(id), | |
119 session_(session), | |
120 stream_bytes_read_(0), | |
121 stream_bytes_written_(0), | |
122 stream_error_(QUIC_STREAM_NO_ERROR), | |
123 connection_error_(QUIC_NO_ERROR), | |
124 read_side_closed_(false), | |
125 write_side_closed_(false), | |
126 fin_buffered_(false), | |
127 fin_sent_(false), | |
128 fin_received_(false), | |
129 rst_sent_(false), | |
130 rst_received_(false), | |
131 fec_policy_(FEC_PROTECT_OPTIONAL), | |
132 is_server_(session_->is_server()), | |
133 flow_controller_( | |
134 session_->connection(), id_, is_server_, | |
135 GetReceivedFlowControlWindow(session), | |
136 GetInitialStreamFlowControlWindowToSend(session), | |
137 GetInitialStreamFlowControlWindowToSend(session)), | |
138 connection_flow_controller_(session_->flow_controller()), | |
139 stream_contributes_to_connection_flow_control_(true) { | |
140 } | |
141 | |
142 ReliableQuicStream::~ReliableQuicStream() { | |
143 } | |
144 | |
145 void ReliableQuicStream::OnStreamFrame(const QuicStreamFrame& frame) { | |
146 if (read_side_closed_) { | |
147 DVLOG(1) << ENDPOINT << "Ignoring frame " << frame.stream_id; | |
148 // We don't want to be reading: blackhole the data. | |
149 return; | |
150 } | |
151 | |
152 if (frame.stream_id != id_) { | |
153 session_->connection()->SendConnectionClose(QUIC_INTERNAL_ERROR); | |
154 return; | |
155 } | |
156 | |
157 if (frame.fin) { | |
158 fin_received_ = true; | |
159 } | |
160 | |
161 // This count include duplicate data received. | |
162 size_t frame_payload_size = frame.data.TotalBufferSize(); | |
163 stream_bytes_read_ += frame_payload_size; | |
164 | |
165 // Flow control is interested in tracking highest received offset. | |
166 if (MaybeIncreaseHighestReceivedOffset(frame.offset + frame_payload_size)) { | |
167 // As the highest received offset has changed, we should check to see if | |
168 // this is a violation of flow control. | |
169 if (flow_controller_.FlowControlViolation() || | |
170 connection_flow_controller_->FlowControlViolation()) { | |
171 session_->connection()->SendConnectionClose( | |
172 QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA); | |
173 return; | |
174 } | |
175 } | |
176 | |
177 sequencer_.OnStreamFrame(frame); | |
178 } | |
179 | |
180 int ReliableQuicStream::num_frames_received() const { | |
181 return sequencer_.num_frames_received(); | |
182 } | |
183 | |
184 int ReliableQuicStream::num_duplicate_frames_received() const { | |
185 return sequencer_.num_duplicate_frames_received(); | |
186 } | |
187 | |
188 void ReliableQuicStream::OnStreamReset(const QuicRstStreamFrame& frame) { | |
189 rst_received_ = true; | |
190 MaybeIncreaseHighestReceivedOffset(frame.byte_offset); | |
191 | |
192 stream_error_ = frame.error_code; | |
193 CloseWriteSide(); | |
194 CloseReadSide(); | |
195 } | |
196 | |
197 void ReliableQuicStream::OnConnectionClosed(QuicErrorCode error, | |
198 bool from_peer) { | |
199 if (read_side_closed_ && write_side_closed_) { | |
200 return; | |
201 } | |
202 if (error != QUIC_NO_ERROR) { | |
203 stream_error_ = QUIC_STREAM_CONNECTION_ERROR; | |
204 connection_error_ = error; | |
205 } | |
206 | |
207 CloseWriteSide(); | |
208 CloseReadSide(); | |
209 } | |
210 | |
211 void ReliableQuicStream::OnFinRead() { | |
212 DCHECK(sequencer_.IsClosed()); | |
213 fin_received_ = true; | |
214 CloseReadSide(); | |
215 } | |
216 | |
217 void ReliableQuicStream::Reset(QuicRstStreamErrorCode error) { | |
218 DCHECK_NE(QUIC_STREAM_NO_ERROR, error); | |
219 stream_error_ = error; | |
220 // Sending a RstStream results in calling CloseStream. | |
221 session()->SendRstStream(id(), error, stream_bytes_written_); | |
222 rst_sent_ = true; | |
223 } | |
224 | |
225 void ReliableQuicStream::CloseConnection(QuicErrorCode error) { | |
226 // TODO(vadimt): Remove ScopedTracker below once crbug.com/422516 is fixed. | |
227 tracked_objects::ScopedTracker tracking_profile( | |
228 FROM_HERE_WITH_EXPLICIT_FUNCTION( | |
229 "422516 ReliableQuicStream::CloseConnection")); | |
230 | |
231 session()->connection()->SendConnectionClose(error); | |
232 } | |
233 | |
234 void ReliableQuicStream::CloseConnectionWithDetails(QuicErrorCode error, | |
235 const string& details) { | |
236 session()->connection()->SendConnectionCloseWithDetails(error, details); | |
237 } | |
238 | |
239 QuicVersion ReliableQuicStream::version() const { | |
240 return session()->connection()->version(); | |
241 } | |
242 | |
243 void ReliableQuicStream::WriteOrBufferData( | |
244 StringPiece data, | |
245 bool fin, | |
246 QuicAckNotifier::DelegateInterface* ack_notifier_delegate) { | |
247 if (data.empty() && !fin) { | |
248 LOG(DFATAL) << "data.empty() && !fin"; | |
249 return; | |
250 } | |
251 | |
252 if (fin_buffered_) { | |
253 LOG(DFATAL) << "Fin already buffered"; | |
254 return; | |
255 } | |
256 | |
257 scoped_refptr<ProxyAckNotifierDelegate> proxy_delegate; | |
258 if (ack_notifier_delegate != nullptr) { | |
259 proxy_delegate = new ProxyAckNotifierDelegate(ack_notifier_delegate); | |
260 } | |
261 | |
262 QuicConsumedData consumed_data(0, false); | |
263 fin_buffered_ = fin; | |
264 | |
265 if (queued_data_.empty()) { | |
266 struct iovec iov(MakeIovec(data)); | |
267 consumed_data = WritevData(&iov, 1, fin, proxy_delegate.get()); | |
268 DCHECK_LE(consumed_data.bytes_consumed, data.length()); | |
269 } | |
270 | |
271 bool write_completed; | |
272 // If there's unconsumed data or an unconsumed fin, queue it. | |
273 if (consumed_data.bytes_consumed < data.length() || | |
274 (fin && !consumed_data.fin_consumed)) { | |
275 StringPiece remainder(data.substr(consumed_data.bytes_consumed)); | |
276 queued_data_.push_back(PendingData(remainder.as_string(), proxy_delegate)); | |
277 write_completed = false; | |
278 } else { | |
279 write_completed = true; | |
280 } | |
281 | |
282 if ((proxy_delegate.get() != nullptr) && | |
283 (consumed_data.bytes_consumed > 0 || consumed_data.fin_consumed)) { | |
284 proxy_delegate->WroteData(write_completed); | |
285 } | |
286 } | |
287 | |
288 void ReliableQuicStream::OnCanWrite() { | |
289 bool fin = false; | |
290 while (!queued_data_.empty()) { | |
291 PendingData* pending_data = &queued_data_.front(); | |
292 ProxyAckNotifierDelegate* delegate = pending_data->delegate.get(); | |
293 if (queued_data_.size() == 1 && fin_buffered_) { | |
294 fin = true; | |
295 } | |
296 struct iovec iov(MakeIovec(pending_data->data)); | |
297 QuicConsumedData consumed_data = WritevData(&iov, 1, fin, delegate); | |
298 if (consumed_data.bytes_consumed == pending_data->data.size() && | |
299 fin == consumed_data.fin_consumed) { | |
300 queued_data_.pop_front(); | |
301 if (delegate != nullptr) { | |
302 delegate->WroteData(true); | |
303 } | |
304 } else { | |
305 if (consumed_data.bytes_consumed > 0) { | |
306 pending_data->data.erase(0, consumed_data.bytes_consumed); | |
307 if (delegate != nullptr) { | |
308 delegate->WroteData(false); | |
309 } | |
310 } | |
311 break; | |
312 } | |
313 } | |
314 } | |
315 | |
316 void ReliableQuicStream::MaybeSendBlocked() { | |
317 flow_controller_.MaybeSendBlocked(); | |
318 if (!stream_contributes_to_connection_flow_control_) { | |
319 return; | |
320 } | |
321 connection_flow_controller_->MaybeSendBlocked(); | |
322 // If we are connection level flow control blocked, then add the stream | |
323 // to the write blocked list. It will be given a chance to write when a | |
324 // connection level WINDOW_UPDATE arrives. | |
325 if (connection_flow_controller_->IsBlocked() && | |
326 !flow_controller_.IsBlocked()) { | |
327 session_->MarkWriteBlocked(id(), EffectivePriority()); | |
328 } | |
329 } | |
330 | |
331 QuicConsumedData ReliableQuicStream::WritevData( | |
332 const struct iovec* iov, | |
333 int iov_count, | |
334 bool fin, | |
335 QuicAckNotifier::DelegateInterface* ack_notifier_delegate) { | |
336 if (write_side_closed_) { | |
337 DLOG(ERROR) << ENDPOINT << "Attempt to write when the write side is closed"; | |
338 return QuicConsumedData(0, false); | |
339 } | |
340 | |
341 // How much data we want to write. | |
342 size_t write_length = TotalIovecLength(iov, iov_count); | |
343 | |
344 // A FIN with zero data payload should not be flow control blocked. | |
345 bool fin_with_zero_data = (fin && write_length == 0); | |
346 | |
347 if (flow_controller_.IsEnabled()) { | |
348 // How much data we are allowed to write from flow control. | |
349 QuicByteCount send_window = flow_controller_.SendWindowSize(); | |
350 if (stream_contributes_to_connection_flow_control_) { | |
351 send_window = | |
352 min(send_window, connection_flow_controller_->SendWindowSize()); | |
353 } | |
354 | |
355 if (send_window == 0 && !fin_with_zero_data) { | |
356 // Quick return if we can't send anything. | |
357 MaybeSendBlocked(); | |
358 return QuicConsumedData(0, false); | |
359 } | |
360 | |
361 if (write_length > send_window) { | |
362 // Don't send the FIN if we aren't going to send all the data. | |
363 fin = false; | |
364 | |
365 // Writing more data would be a violation of flow control. | |
366 write_length = static_cast<size_t>(send_window); | |
367 } | |
368 } | |
369 | |
370 // Fill an IOVector with bytes from the iovec. | |
371 IOVector data; | |
372 data.AppendIovecAtMostBytes(iov, iov_count, write_length); | |
373 | |
374 QuicConsumedData consumed_data = session()->WritevData( | |
375 id(), data, stream_bytes_written_, fin, GetFecProtection(), | |
376 ack_notifier_delegate); | |
377 stream_bytes_written_ += consumed_data.bytes_consumed; | |
378 | |
379 AddBytesSent(consumed_data.bytes_consumed); | |
380 | |
381 if (consumed_data.bytes_consumed == write_length) { | |
382 if (!fin_with_zero_data) { | |
383 MaybeSendBlocked(); | |
384 } | |
385 if (fin && consumed_data.fin_consumed) { | |
386 fin_sent_ = true; | |
387 CloseWriteSide(); | |
388 } else if (fin && !consumed_data.fin_consumed) { | |
389 session_->MarkWriteBlocked(id(), EffectivePriority()); | |
390 } | |
391 } else { | |
392 session_->MarkWriteBlocked(id(), EffectivePriority()); | |
393 } | |
394 return consumed_data; | |
395 } | |
396 | |
397 FecProtection ReliableQuicStream::GetFecProtection() { | |
398 return fec_policy_ == FEC_PROTECT_ALWAYS ? MUST_FEC_PROTECT : MAY_FEC_PROTECT; | |
399 } | |
400 | |
401 void ReliableQuicStream::CloseReadSide() { | |
402 if (read_side_closed_) { | |
403 return; | |
404 } | |
405 DVLOG(1) << ENDPOINT << "Done reading from stream " << id(); | |
406 | |
407 read_side_closed_ = true; | |
408 if (write_side_closed_) { | |
409 DVLOG(1) << ENDPOINT << "Closing stream: " << id(); | |
410 session_->CloseStream(id()); | |
411 } | |
412 } | |
413 | |
414 void ReliableQuicStream::CloseWriteSide() { | |
415 if (write_side_closed_) { | |
416 return; | |
417 } | |
418 DVLOG(1) << ENDPOINT << "Done writing to stream " << id(); | |
419 | |
420 write_side_closed_ = true; | |
421 if (read_side_closed_) { | |
422 DVLOG(1) << ENDPOINT << "Closing stream: " << id(); | |
423 session_->CloseStream(id()); | |
424 } | |
425 } | |
426 | |
427 bool ReliableQuicStream::HasBufferedData() const { | |
428 return !queued_data_.empty(); | |
429 } | |
430 | |
431 void ReliableQuicStream::OnClose() { | |
432 CloseReadSide(); | |
433 CloseWriteSide(); | |
434 | |
435 if (!fin_sent_ && !rst_sent_) { | |
436 // For flow control accounting, we must tell the peer how many bytes we have | |
437 // written on this stream before termination. Done here if needed, using a | |
438 // RST frame. | |
439 DVLOG(1) << ENDPOINT << "Sending RST in OnClose: " << id(); | |
440 session_->SendRstStream(id(), QUIC_RST_ACKNOWLEDGEMENT, | |
441 stream_bytes_written_); | |
442 rst_sent_ = true; | |
443 } | |
444 | |
445 // We are closing the stream and will not process any further incoming bytes. | |
446 // As there may be more bytes in flight and we need to ensure that both | |
447 // endpoints have the same connection level flow control state, mark all | |
448 // unreceived or buffered bytes as consumed. | |
449 QuicByteCount bytes_to_consume = | |
450 flow_controller_.highest_received_byte_offset() - | |
451 flow_controller_.bytes_consumed(); | |
452 AddBytesConsumed(bytes_to_consume); | |
453 } | |
454 | |
455 void ReliableQuicStream::OnWindowUpdateFrame( | |
456 const QuicWindowUpdateFrame& frame) { | |
457 if (!flow_controller_.IsEnabled()) { | |
458 DLOG(DFATAL) << "Flow control not enabled! " << version(); | |
459 return; | |
460 } | |
461 if (flow_controller_.UpdateSendWindowOffset(frame.byte_offset)) { | |
462 // We can write again! | |
463 // TODO(rjshade): This does not respect priorities (e.g. multiple | |
464 // outstanding POSTs are unblocked on arrival of | |
465 // SHLO with initial window). | |
466 // As long as the connection is not flow control blocked, we can write! | |
467 OnCanWrite(); | |
468 } | |
469 } | |
470 | |
471 bool ReliableQuicStream::MaybeIncreaseHighestReceivedOffset( | |
472 QuicStreamOffset new_offset) { | |
473 if (!flow_controller_.IsEnabled()) { | |
474 return false; | |
475 } | |
476 uint64 increment = | |
477 new_offset - flow_controller_.highest_received_byte_offset(); | |
478 if (!flow_controller_.UpdateHighestReceivedOffset(new_offset)) { | |
479 return false; | |
480 } | |
481 | |
482 // If |new_offset| increased the stream flow controller's highest received | |
483 // offset, then we need to increase the connection flow controller's value | |
484 // by the incremental difference. | |
485 if (stream_contributes_to_connection_flow_control_) { | |
486 connection_flow_controller_->UpdateHighestReceivedOffset( | |
487 connection_flow_controller_->highest_received_byte_offset() + | |
488 increment); | |
489 } | |
490 return true; | |
491 } | |
492 | |
493 void ReliableQuicStream::AddBytesSent(QuicByteCount bytes) { | |
494 if (flow_controller_.IsEnabled()) { | |
495 flow_controller_.AddBytesSent(bytes); | |
496 if (stream_contributes_to_connection_flow_control_) { | |
497 connection_flow_controller_->AddBytesSent(bytes); | |
498 } | |
499 } | |
500 } | |
501 | |
502 void ReliableQuicStream::AddBytesConsumed(QuicByteCount bytes) { | |
503 if (flow_controller_.IsEnabled()) { | |
504 // Only adjust stream level flow controller if we are still reading. | |
505 if (!read_side_closed_) { | |
506 flow_controller_.AddBytesConsumed(bytes); | |
507 } | |
508 | |
509 if (stream_contributes_to_connection_flow_control_) { | |
510 connection_flow_controller_->AddBytesConsumed(bytes); | |
511 } | |
512 } | |
513 } | |
514 | |
515 void ReliableQuicStream::UpdateSendWindowOffset(QuicStreamOffset new_window) { | |
516 if (flow_controller_.UpdateSendWindowOffset(new_window)) { | |
517 OnCanWrite(); | |
518 } | |
519 } | |
520 | |
521 bool ReliableQuicStream::IsFlowControlBlocked() { | |
522 if (flow_controller_.IsBlocked()) { | |
523 return true; | |
524 } | |
525 return stream_contributes_to_connection_flow_control_ && | |
526 connection_flow_controller_->IsBlocked(); | |
527 } | |
528 | |
529 } // namespace net | |
OLD | NEW |