OLD | NEW |
| (Empty) |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "net/quic/reliable_quic_stream.h" | |
6 | |
7 #include "base/logging.h" | |
8 #include "net/quic/iovector.h" | |
9 #include "net/quic/quic_bug_tracker.h" | |
10 #include "net/quic/quic_flags.h" | |
11 #include "net/quic/quic_flow_controller.h" | |
12 #include "net/quic/quic_session.h" | |
13 #include "net/quic/quic_write_blocked_list.h" | |
14 | |
15 using base::StringPiece; | |
16 using std::min; | |
17 using std::string; | |
18 | |
19 namespace net { | |
20 | |
21 #define ENDPOINT \ | |
22 (perspective_ == Perspective::IS_SERVER ? "Server: " : "Client: ") | |
23 | |
24 namespace { | |
25 | |
26 struct iovec MakeIovec(StringPiece data) { | |
27 struct iovec iov = {const_cast<char*>(data.data()), | |
28 static_cast<size_t>(data.size())}; | |
29 return iov; | |
30 } | |
31 | |
32 size_t GetInitialStreamFlowControlWindowToSend(QuicSession* session) { | |
33 return session->config()->GetInitialStreamFlowControlWindowToSend(); | |
34 } | |
35 | |
36 size_t GetReceivedFlowControlWindow(QuicSession* session) { | |
37 if (session->config()->HasReceivedInitialStreamFlowControlWindowBytes()) { | |
38 return session->config()->ReceivedInitialStreamFlowControlWindowBytes(); | |
39 } | |
40 | |
41 return kMinimumFlowControlSendWindow; | |
42 } | |
43 | |
44 } // namespace | |
45 | |
46 ReliableQuicStream::PendingData::PendingData( | |
47 string data_in, | |
48 QuicAckListenerInterface* ack_listener_in) | |
49 : data(data_in), offset(0), ack_listener(ack_listener_in) {} | |
50 | |
51 ReliableQuicStream::PendingData::~PendingData() {} | |
52 | |
53 ReliableQuicStream::ReliableQuicStream(QuicStreamId id, QuicSession* session) | |
54 : queued_data_bytes_(0), | |
55 sequencer_(this, session->connection()->clock()), | |
56 id_(id), | |
57 session_(session), | |
58 stream_bytes_read_(0), | |
59 stream_bytes_written_(0), | |
60 stream_error_(QUIC_STREAM_NO_ERROR), | |
61 connection_error_(QUIC_NO_ERROR), | |
62 read_side_closed_(false), | |
63 write_side_closed_(false), | |
64 fin_buffered_(false), | |
65 fin_sent_(false), | |
66 fin_received_(false), | |
67 rst_sent_(false), | |
68 rst_received_(false), | |
69 perspective_(session_->perspective()), | |
70 flow_controller_(session_->connection(), | |
71 id_, | |
72 perspective_, | |
73 GetReceivedFlowControlWindow(session), | |
74 GetInitialStreamFlowControlWindowToSend(session), | |
75 session_->flow_controller()->auto_tune_receive_window()), | |
76 connection_flow_controller_(session_->flow_controller()), | |
77 stream_contributes_to_connection_flow_control_(true) { | |
78 SetFromConfig(); | |
79 } | |
80 | |
81 ReliableQuicStream::~ReliableQuicStream() {} | |
82 | |
83 void ReliableQuicStream::SetFromConfig() {} | |
84 | |
85 void ReliableQuicStream::OnStreamFrame(const QuicStreamFrame& frame) { | |
86 DCHECK_EQ(frame.stream_id, id_); | |
87 | |
88 DCHECK(!(read_side_closed_ && write_side_closed_)); | |
89 | |
90 if (frame.fin) { | |
91 fin_received_ = true; | |
92 if (fin_sent_) { | |
93 session_->StreamDraining(id_); | |
94 } | |
95 } | |
96 | |
97 if (read_side_closed_) { | |
98 DVLOG(1) << ENDPOINT << "Ignoring data in frame " << frame.stream_id; | |
99 // The subclass does not want to read data: blackhole the data. | |
100 return; | |
101 } | |
102 | |
103 // This count includes duplicate data received. | |
104 size_t frame_payload_size = frame.data_length; | |
105 stream_bytes_read_ += frame_payload_size; | |
106 | |
107 // Flow control is interested in tracking highest received offset. | |
108 // Only interested in received frames that carry data. | |
109 if ((!FLAGS_quic_ignore_zero_length_frames || frame_payload_size > 0) && | |
110 MaybeIncreaseHighestReceivedOffset(frame.offset + frame_payload_size)) { | |
111 // As the highest received offset has changed, check to see if this is a | |
112 // violation of flow control. | |
113 if (flow_controller_.FlowControlViolation() || | |
114 connection_flow_controller_->FlowControlViolation()) { | |
115 CloseConnectionWithDetails( | |
116 QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA, | |
117 "Flow control violation after increasing offset"); | |
118 return; | |
119 } | |
120 } | |
121 | |
122 sequencer_.OnStreamFrame(frame); | |
123 } | |
124 | |
125 int ReliableQuicStream::num_frames_received() const { | |
126 return sequencer_.num_frames_received(); | |
127 } | |
128 | |
129 int ReliableQuicStream::num_early_frames_received() const { | |
130 return sequencer_.num_early_frames_received(); | |
131 } | |
132 | |
133 int ReliableQuicStream::num_duplicate_frames_received() const { | |
134 return sequencer_.num_duplicate_frames_received(); | |
135 } | |
136 | |
137 void ReliableQuicStream::OnStreamReset(const QuicRstStreamFrame& frame) { | |
138 rst_received_ = true; | |
139 MaybeIncreaseHighestReceivedOffset(frame.byte_offset); | |
140 | |
141 stream_error_ = frame.error_code; | |
142 CloseWriteSide(); | |
143 CloseReadSide(); | |
144 } | |
145 | |
146 void ReliableQuicStream::OnConnectionClosed(QuicErrorCode error, | |
147 ConnectionCloseSource /*source*/) { | |
148 if (read_side_closed_ && write_side_closed_) { | |
149 return; | |
150 } | |
151 if (error != QUIC_NO_ERROR) { | |
152 stream_error_ = QUIC_STREAM_CONNECTION_ERROR; | |
153 connection_error_ = error; | |
154 } | |
155 | |
156 CloseWriteSide(); | |
157 CloseReadSide(); | |
158 } | |
159 | |
160 void ReliableQuicStream::OnFinRead() { | |
161 DCHECK(sequencer_.IsClosed()); | |
162 // OnFinRead can be called due to a FIN flag in a headers block, so there may | |
163 // have been no OnStreamFrame call with a FIN in the frame. | |
164 fin_received_ = true; | |
165 // If fin_sent_ is true, then CloseWriteSide has already been called, and the | |
166 // stream will be destroyed by CloseReadSide, so don't need to call | |
167 // StreamDraining. | |
168 CloseReadSide(); | |
169 } | |
170 | |
171 void ReliableQuicStream::Reset(QuicRstStreamErrorCode error) { | |
172 stream_error_ = error; | |
173 // Sending a RstStream results in calling CloseStream. | |
174 session()->SendRstStream(id(), error, stream_bytes_written_); | |
175 rst_sent_ = true; | |
176 } | |
177 | |
178 void ReliableQuicStream::CloseConnectionWithDetails(QuicErrorCode error, | |
179 const string& details) { | |
180 session()->connection()->CloseConnection( | |
181 error, details, ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET); | |
182 } | |
183 | |
184 void ReliableQuicStream::WriteOrBufferData( | |
185 StringPiece data, | |
186 bool fin, | |
187 QuicAckListenerInterface* ack_listener) { | |
188 if (data.empty() && !fin) { | |
189 QUIC_BUG << "data.empty() && !fin"; | |
190 return; | |
191 } | |
192 | |
193 if (fin_buffered_) { | |
194 QUIC_BUG << "Fin already buffered"; | |
195 return; | |
196 } | |
197 if (write_side_closed_) { | |
198 DLOG(ERROR) << ENDPOINT << "Attempt to write when the write side is closed"; | |
199 return; | |
200 } | |
201 | |
202 QuicConsumedData consumed_data(0, false); | |
203 fin_buffered_ = fin; | |
204 | |
205 if (queued_data_.empty()) { | |
206 struct iovec iov(MakeIovec(data)); | |
207 consumed_data = WritevData(&iov, 1, fin, ack_listener); | |
208 DCHECK_LE(consumed_data.bytes_consumed, data.length()); | |
209 } | |
210 | |
211 // If there's unconsumed data or an unconsumed fin, queue it. | |
212 if (consumed_data.bytes_consumed < data.length() || | |
213 (fin && !consumed_data.fin_consumed)) { | |
214 StringPiece remainder(data.substr(consumed_data.bytes_consumed)); | |
215 queued_data_bytes_ += remainder.size(); | |
216 queued_data_.emplace_back(remainder.as_string(), ack_listener); | |
217 } | |
218 } | |
219 | |
220 void ReliableQuicStream::OnCanWrite() { | |
221 bool fin = false; | |
222 while (!queued_data_.empty()) { | |
223 PendingData* pending_data = &queued_data_.front(); | |
224 QuicAckListenerInterface* ack_listener = pending_data->ack_listener.get(); | |
225 if (queued_data_.size() == 1 && fin_buffered_) { | |
226 fin = true; | |
227 } | |
228 if (pending_data->offset > 0 && | |
229 pending_data->offset >= pending_data->data.size()) { | |
230 // This should be impossible because offset tracks the amount of | |
231 // pending_data written thus far. | |
232 QUIC_BUG << "Pending offset is beyond available data. offset: " | |
233 << pending_data->offset << " vs: " << pending_data->data.size(); | |
234 return; | |
235 } | |
236 size_t remaining_len = pending_data->data.size() - pending_data->offset; | |
237 struct iovec iov = { | |
238 const_cast<char*>(pending_data->data.data()) + pending_data->offset, | |
239 remaining_len}; | |
240 QuicConsumedData consumed_data = WritevData(&iov, 1, fin, ack_listener); | |
241 queued_data_bytes_ -= consumed_data.bytes_consumed; | |
242 if (consumed_data.bytes_consumed == remaining_len && | |
243 fin == consumed_data.fin_consumed) { | |
244 queued_data_.pop_front(); | |
245 } else { | |
246 if (consumed_data.bytes_consumed > 0) { | |
247 pending_data->offset += consumed_data.bytes_consumed; | |
248 } | |
249 break; | |
250 } | |
251 } | |
252 } | |
253 | |
254 void ReliableQuicStream::MaybeSendBlocked() { | |
255 flow_controller_.MaybeSendBlocked(); | |
256 if (!stream_contributes_to_connection_flow_control_) { | |
257 return; | |
258 } | |
259 connection_flow_controller_->MaybeSendBlocked(); | |
260 // If the stream is blocked by connection-level flow control but not by | |
261 // stream-level flow control, add the stream to the write blocked list so that | |
262 // the stream will be given a chance to write when a connection-level | |
263 // WINDOW_UPDATE arrives. | |
264 if (connection_flow_controller_->IsBlocked() && | |
265 !flow_controller_.IsBlocked()) { | |
266 session_->MarkConnectionLevelWriteBlocked(id()); | |
267 } | |
268 } | |
269 | |
270 QuicConsumedData ReliableQuicStream::WritevData( | |
271 const struct iovec* iov, | |
272 int iov_count, | |
273 bool fin, | |
274 QuicAckListenerInterface* ack_listener) { | |
275 if (write_side_closed_) { | |
276 DLOG(ERROR) << ENDPOINT << "Attempt to write when the write side is closed"; | |
277 return QuicConsumedData(0, false); | |
278 } | |
279 | |
280 // How much data was provided. | |
281 size_t write_length = TotalIovecLength(iov, iov_count); | |
282 | |
283 // A FIN with zero data payload should not be flow control blocked. | |
284 bool fin_with_zero_data = (fin && write_length == 0); | |
285 | |
286 // How much data flow control permits to be written. | |
287 QuicByteCount send_window = flow_controller_.SendWindowSize(); | |
288 if (stream_contributes_to_connection_flow_control_) { | |
289 send_window = | |
290 min(send_window, connection_flow_controller_->SendWindowSize()); | |
291 } | |
292 | |
293 if (session_->ShouldYield(id())) { | |
294 session_->MarkConnectionLevelWriteBlocked(id()); | |
295 return QuicConsumedData(0, false); | |
296 } | |
297 | |
298 if (send_window == 0 && !fin_with_zero_data) { | |
299 // Quick return if nothing can be sent. | |
300 MaybeSendBlocked(); | |
301 return QuicConsumedData(0, false); | |
302 } | |
303 | |
304 if (write_length > send_window) { | |
305 // Don't send the FIN unless all the data will be sent. | |
306 fin = false; | |
307 | |
308 // Writing more data would be a violation of flow control. | |
309 write_length = static_cast<size_t>(send_window); | |
310 } | |
311 | |
312 QuicConsumedData consumed_data = | |
313 WritevDataInner(QuicIOVector(iov, iov_count, write_length), | |
314 stream_bytes_written_, fin, ack_listener); | |
315 stream_bytes_written_ += consumed_data.bytes_consumed; | |
316 | |
317 AddBytesSent(consumed_data.bytes_consumed); | |
318 | |
319 // The write may have generated a write error causing this stream to be | |
320 // closed. If so, simply return without marking the stream write blocked. | |
321 if (write_side_closed_) { | |
322 return consumed_data; | |
323 } | |
324 | |
325 if (consumed_data.bytes_consumed == write_length) { | |
326 if (!fin_with_zero_data) { | |
327 MaybeSendBlocked(); | |
328 } | |
329 if (fin && consumed_data.fin_consumed) { | |
330 fin_sent_ = true; | |
331 if (fin_received_) { | |
332 session_->StreamDraining(id_); | |
333 } | |
334 CloseWriteSide(); | |
335 } else if (fin && !consumed_data.fin_consumed) { | |
336 session_->MarkConnectionLevelWriteBlocked(id()); | |
337 } | |
338 } else { | |
339 session_->MarkConnectionLevelWriteBlocked(id()); | |
340 } | |
341 return consumed_data; | |
342 } | |
343 | |
344 QuicConsumedData ReliableQuicStream::WritevDataInner( | |
345 QuicIOVector iov, | |
346 QuicStreamOffset offset, | |
347 bool fin, | |
348 QuicAckListenerInterface* ack_notifier_delegate) { | |
349 return session()->WritevData(this, id(), iov, offset, fin, | |
350 ack_notifier_delegate); | |
351 } | |
352 | |
353 void ReliableQuicStream::CloseReadSide() { | |
354 if (read_side_closed_) { | |
355 return; | |
356 } | |
357 DVLOG(1) << ENDPOINT << "Done reading from stream " << id(); | |
358 | |
359 read_side_closed_ = true; | |
360 if (write_side_closed_) { | |
361 DVLOG(1) << ENDPOINT << "Closing stream: " << id(); | |
362 session_->CloseStream(id()); | |
363 } | |
364 } | |
365 | |
366 void ReliableQuicStream::CloseWriteSide() { | |
367 if (write_side_closed_) { | |
368 return; | |
369 } | |
370 DVLOG(1) << ENDPOINT << "Done writing to stream " << id(); | |
371 | |
372 write_side_closed_ = true; | |
373 if (read_side_closed_) { | |
374 DVLOG(1) << ENDPOINT << "Closing stream: " << id(); | |
375 session_->CloseStream(id()); | |
376 } | |
377 } | |
378 | |
379 bool ReliableQuicStream::HasBufferedData() const { | |
380 return !queued_data_.empty(); | |
381 } | |
382 | |
383 QuicVersion ReliableQuicStream::version() const { | |
384 return session_->connection()->version(); | |
385 } | |
386 | |
387 void ReliableQuicStream::StopReading() { | |
388 DVLOG(1) << ENDPOINT << "Stop reading from stream " << id(); | |
389 sequencer_.StopReading(); | |
390 } | |
391 | |
392 const IPEndPoint& ReliableQuicStream::PeerAddressOfLatestPacket() const { | |
393 return session_->connection()->last_packet_source_address(); | |
394 } | |
395 | |
396 void ReliableQuicStream::OnClose() { | |
397 CloseReadSide(); | |
398 CloseWriteSide(); | |
399 | |
400 if (!fin_sent_ && !rst_sent_) { | |
401 // For flow control accounting, tell the peer how many bytes have been | |
402 // written on this stream before termination. Done here if needed, using a | |
403 // RST_STREAM frame. | |
404 DVLOG(1) << ENDPOINT << "Sending RST_STREAM in OnClose: " << id(); | |
405 session_->SendRstStream(id(), QUIC_RST_ACKNOWLEDGEMENT, | |
406 stream_bytes_written_); | |
407 rst_sent_ = true; | |
408 } | |
409 | |
410 // The stream is being closed and will not process any further incoming bytes. | |
411 // As there may be more bytes in flight, to ensure that both endpoints have | |
412 // the same connection level flow control state, mark all unreceived or | |
413 // buffered bytes as consumed. | |
414 QuicByteCount bytes_to_consume = | |
415 flow_controller_.highest_received_byte_offset() - | |
416 flow_controller_.bytes_consumed(); | |
417 AddBytesConsumed(bytes_to_consume); | |
418 } | |
419 | |
420 void ReliableQuicStream::OnWindowUpdateFrame( | |
421 const QuicWindowUpdateFrame& frame) { | |
422 if (flow_controller_.UpdateSendWindowOffset(frame.byte_offset)) { | |
423 // Writing can be done again! | |
424 // TODO(rjshade): This does not respect priorities (e.g. multiple | |
425 // outstanding POSTs are unblocked on arrival of | |
426 // SHLO with initial window). | |
427 // As long as the connection is not flow control blocked, write on! | |
428 OnCanWrite(); | |
429 } | |
430 } | |
431 | |
432 bool ReliableQuicStream::MaybeIncreaseHighestReceivedOffset( | |
433 QuicStreamOffset new_offset) { | |
434 uint64_t increment = | |
435 new_offset - flow_controller_.highest_received_byte_offset(); | |
436 if (!flow_controller_.UpdateHighestReceivedOffset(new_offset)) { | |
437 return false; | |
438 } | |
439 | |
440 // If |new_offset| increased the stream flow controller's highest received | |
441 // offset, increase the connection flow controller's value by the incremental | |
442 // difference. | |
443 if (stream_contributes_to_connection_flow_control_) { | |
444 connection_flow_controller_->UpdateHighestReceivedOffset( | |
445 connection_flow_controller_->highest_received_byte_offset() + | |
446 increment); | |
447 } | |
448 return true; | |
449 } | |
450 | |
451 void ReliableQuicStream::AddBytesSent(QuicByteCount bytes) { | |
452 flow_controller_.AddBytesSent(bytes); | |
453 if (stream_contributes_to_connection_flow_control_) { | |
454 connection_flow_controller_->AddBytesSent(bytes); | |
455 } | |
456 } | |
457 | |
458 void ReliableQuicStream::AddBytesConsumed(QuicByteCount bytes) { | |
459 // Only adjust stream level flow controller if still reading. | |
460 if (!read_side_closed_) { | |
461 flow_controller_.AddBytesConsumed(bytes); | |
462 } | |
463 | |
464 if (stream_contributes_to_connection_flow_control_) { | |
465 connection_flow_controller_->AddBytesConsumed(bytes); | |
466 } | |
467 } | |
468 | |
469 void ReliableQuicStream::UpdateSendWindowOffset(QuicStreamOffset new_window) { | |
470 if (flow_controller_.UpdateSendWindowOffset(new_window)) { | |
471 OnCanWrite(); | |
472 } | |
473 } | |
474 | |
475 } // namespace net | |
OLD | NEW |