Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(216)

Side by Side Diff: net/quic/core/reliable_quic_stream.cc

Issue 2487613002: Landing Recent QUIC changes until 12:43 PM, Nov 5, 2016 UTC+8 (Closed)
Patch Set: Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « net/quic/core/reliable_quic_stream.h ('k') | net/quic/core/reliable_quic_stream_test.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/quic/core/reliable_quic_stream.h"
6
7 #include "base/logging.h"
8 #include "net/quic/core/quic_bug_tracker.h"
9 #include "net/quic/core/quic_flags.h"
10 #include "net/quic/core/quic_flow_controller.h"
11 #include "net/quic/core/quic_session.h"
12 #include "net/quic/core/quic_write_blocked_list.h"
13
14 using base::StringPiece;
15 using std::min;
16 using std::string;
17
18 namespace net {
19
20 #define ENDPOINT \
21 (perspective_ == Perspective::IS_SERVER ? "Server: " : "Client: ")
22
23 namespace {
24
25 struct iovec MakeIovec(StringPiece data) {
26 struct iovec iov = {const_cast<char*>(data.data()),
27 static_cast<size_t>(data.size())};
28 return iov;
29 }
30
31 size_t GetInitialStreamFlowControlWindowToSend(QuicSession* session) {
32 return session->config()->GetInitialStreamFlowControlWindowToSend();
33 }
34
35 size_t GetReceivedFlowControlWindow(QuicSession* session) {
36 if (session->config()->HasReceivedInitialStreamFlowControlWindowBytes()) {
37 return session->config()->ReceivedInitialStreamFlowControlWindowBytes();
38 }
39
40 return kMinimumFlowControlSendWindow;
41 }
42
43 } // namespace
44
45 ReliableQuicStream::PendingData::PendingData(
46 string data_in,
47 QuicAckListenerInterface* ack_listener_in)
48 : data(std::move(data_in)), offset(0), ack_listener(ack_listener_in) {}
49
50 ReliableQuicStream::PendingData::~PendingData() {}
51
52 ReliableQuicStream::ReliableQuicStream(QuicStreamId id, QuicSession* session)
53 : queued_data_bytes_(0),
54 sequencer_(this, session->connection()->clock()),
55 id_(id),
56 session_(session),
57 stream_bytes_read_(0),
58 stream_bytes_written_(0),
59 stream_error_(QUIC_STREAM_NO_ERROR),
60 connection_error_(QUIC_NO_ERROR),
61 read_side_closed_(false),
62 write_side_closed_(false),
63 fin_buffered_(false),
64 fin_sent_(false),
65 fin_received_(false),
66 rst_sent_(false),
67 rst_received_(false),
68 perspective_(session_->perspective()),
69 flow_controller_(session_->connection(),
70 id_,
71 perspective_,
72 GetReceivedFlowControlWindow(session),
73 GetInitialStreamFlowControlWindowToSend(session),
74 session_->flow_controller()->auto_tune_receive_window()),
75 connection_flow_controller_(session_->flow_controller()),
76 stream_contributes_to_connection_flow_control_(true),
77 busy_counter_(0) {
78 SetFromConfig();
79 }
80
81 ReliableQuicStream::~ReliableQuicStream() {}
82
83 void ReliableQuicStream::SetFromConfig() {}
84
85 void ReliableQuicStream::OnStreamFrame(const QuicStreamFrame& frame) {
86 DCHECK_EQ(frame.stream_id, id_);
87
88 DCHECK(!(read_side_closed_ && write_side_closed_));
89
90 if (frame.fin) {
91 fin_received_ = true;
92 if (fin_sent_) {
93 session_->StreamDraining(id_);
94 }
95 }
96
97 if (read_side_closed_) {
98 DVLOG(1) << ENDPOINT << "Ignoring data in frame " << frame.stream_id;
99 // The subclass does not want to read data: blackhole the data.
100 return;
101 }
102
103 // This count includes duplicate data received.
104 size_t frame_payload_size = frame.data_length;
105 stream_bytes_read_ += frame_payload_size;
106
107 // Flow control is interested in tracking highest received offset.
108 // Only interested in received frames that carry data.
109 if (frame_payload_size > 0 &&
110 MaybeIncreaseHighestReceivedOffset(frame.offset + frame_payload_size)) {
111 // As the highest received offset has changed, check to see if this is a
112 // violation of flow control.
113 if (flow_controller_.FlowControlViolation() ||
114 connection_flow_controller_->FlowControlViolation()) {
115 CloseConnectionWithDetails(
116 QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA,
117 "Flow control violation after increasing offset");
118 return;
119 }
120 }
121
122 sequencer_.OnStreamFrame(frame);
123 }
124
125 int ReliableQuicStream::num_frames_received() const {
126 return sequencer_.num_frames_received();
127 }
128
129 int ReliableQuicStream::num_duplicate_frames_received() const {
130 return sequencer_.num_duplicate_frames_received();
131 }
132
133 void ReliableQuicStream::OnStreamReset(const QuicRstStreamFrame& frame) {
134 rst_received_ = true;
135 MaybeIncreaseHighestReceivedOffset(frame.byte_offset);
136
137 stream_error_ = frame.error_code;
138 CloseWriteSide();
139 CloseReadSide();
140 }
141
142 void ReliableQuicStream::OnConnectionClosed(QuicErrorCode error,
143 ConnectionCloseSource /*source*/) {
144 if (read_side_closed_ && write_side_closed_) {
145 return;
146 }
147 if (error != QUIC_NO_ERROR) {
148 stream_error_ = QUIC_STREAM_CONNECTION_ERROR;
149 connection_error_ = error;
150 }
151
152 CloseWriteSide();
153 CloseReadSide();
154 }
155
156 void ReliableQuicStream::OnFinRead() {
157 DCHECK(sequencer_.IsClosed());
158 // OnFinRead can be called due to a FIN flag in a headers block, so there may
159 // have been no OnStreamFrame call with a FIN in the frame.
160 fin_received_ = true;
161 // If fin_sent_ is true, then CloseWriteSide has already been called, and the
162 // stream will be destroyed by CloseReadSide, so don't need to call
163 // StreamDraining.
164 CloseReadSide();
165 }
166
167 void ReliableQuicStream::Reset(QuicRstStreamErrorCode error) {
168 stream_error_ = error;
169 // Sending a RstStream results in calling CloseStream.
170 session()->SendRstStream(id(), error, stream_bytes_written_);
171 rst_sent_ = true;
172 }
173
174 void ReliableQuicStream::CloseConnectionWithDetails(QuicErrorCode error,
175 const string& details) {
176 session()->connection()->CloseConnection(
177 error, details, ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
178 }
179
180 void ReliableQuicStream::WriteOrBufferData(
181 StringPiece data,
182 bool fin,
183 QuicAckListenerInterface* ack_listener) {
184 if (data.empty() && !fin) {
185 QUIC_BUG << "data.empty() && !fin";
186 return;
187 }
188
189 if (fin_buffered_) {
190 QUIC_BUG << "Fin already buffered";
191 return;
192 }
193 if (write_side_closed_) {
194 DLOG(ERROR) << ENDPOINT << "Attempt to write when the write side is closed";
195 return;
196 }
197
198 QuicConsumedData consumed_data(0, false);
199 fin_buffered_ = fin;
200
201 if (queued_data_.empty()) {
202 struct iovec iov(MakeIovec(data));
203 consumed_data = WritevData(&iov, 1, fin, ack_listener);
204 DCHECK_LE(consumed_data.bytes_consumed, data.length());
205 }
206
207 // If there's unconsumed data or an unconsumed fin, queue it.
208 if (consumed_data.bytes_consumed < data.length() ||
209 (fin && !consumed_data.fin_consumed)) {
210 StringPiece remainder(data.substr(consumed_data.bytes_consumed));
211 queued_data_bytes_ += remainder.size();
212 queued_data_.emplace_back(remainder.as_string(), ack_listener);
213 }
214 }
215
216 void ReliableQuicStream::OnCanWrite() {
217 bool fin = false;
218 while (!queued_data_.empty()) {
219 PendingData* pending_data = &queued_data_.front();
220 QuicAckListenerInterface* ack_listener = pending_data->ack_listener.get();
221 if (queued_data_.size() == 1 && fin_buffered_) {
222 fin = true;
223 }
224 if (pending_data->offset > 0 &&
225 pending_data->offset >= pending_data->data.size()) {
226 // This should be impossible because offset tracks the amount of
227 // pending_data written thus far.
228 QUIC_BUG << "Pending offset is beyond available data. offset: "
229 << pending_data->offset << " vs: " << pending_data->data.size();
230 return;
231 }
232 size_t remaining_len = pending_data->data.size() - pending_data->offset;
233 struct iovec iov = {
234 const_cast<char*>(pending_data->data.data()) + pending_data->offset,
235 remaining_len};
236 QuicConsumedData consumed_data = WritevData(&iov, 1, fin, ack_listener);
237 queued_data_bytes_ -= consumed_data.bytes_consumed;
238 if (consumed_data.bytes_consumed == remaining_len &&
239 fin == consumed_data.fin_consumed) {
240 queued_data_.pop_front();
241 } else {
242 if (consumed_data.bytes_consumed > 0) {
243 pending_data->offset += consumed_data.bytes_consumed;
244 }
245 break;
246 }
247 }
248 }
249
250 void ReliableQuicStream::MaybeSendBlocked() {
251 flow_controller_.MaybeSendBlocked();
252 if (!stream_contributes_to_connection_flow_control_) {
253 return;
254 }
255 connection_flow_controller_->MaybeSendBlocked();
256 // If the stream is blocked by connection-level flow control but not by
257 // stream-level flow control, add the stream to the write blocked list so that
258 // the stream will be given a chance to write when a connection-level
259 // WINDOW_UPDATE arrives.
260 if (connection_flow_controller_->IsBlocked() &&
261 !flow_controller_.IsBlocked()) {
262 session_->MarkConnectionLevelWriteBlocked(id());
263 }
264 }
265
266 QuicConsumedData ReliableQuicStream::WritevData(
267 const struct iovec* iov,
268 int iov_count,
269 bool fin,
270 QuicAckListenerInterface* ack_listener) {
271 if (write_side_closed_) {
272 DLOG(ERROR) << ENDPOINT << "Attempt to write when the write side is closed";
273 return QuicConsumedData(0, false);
274 }
275
276 // How much data was provided.
277 size_t write_length = 0;
278 if (iov != nullptr) {
279 for (int i = 0; i < iov_count; ++i) {
280 write_length += iov[i].iov_len;
281 }
282 }
283
284 // A FIN with zero data payload should not be flow control blocked.
285 bool fin_with_zero_data = (fin && write_length == 0);
286
287 // How much data flow control permits to be written.
288 QuicByteCount send_window = flow_controller_.SendWindowSize();
289 if (stream_contributes_to_connection_flow_control_) {
290 send_window =
291 min(send_window, connection_flow_controller_->SendWindowSize());
292 }
293
294 if (session_->ShouldYield(id())) {
295 session_->MarkConnectionLevelWriteBlocked(id());
296 return QuicConsumedData(0, false);
297 }
298
299 if (send_window == 0 && !fin_with_zero_data) {
300 // Quick return if nothing can be sent.
301 MaybeSendBlocked();
302 return QuicConsumedData(0, false);
303 }
304
305 if (write_length > send_window) {
306 // Don't send the FIN unless all the data will be sent.
307 fin = false;
308
309 // Writing more data would be a violation of flow control.
310 write_length = static_cast<size_t>(send_window);
311 DVLOG(1) << "stream " << id() << " shortens write length to "
312 << write_length << " due to flow control";
313 }
314
315 QuicConsumedData consumed_data =
316 WritevDataInner(QuicIOVector(iov, iov_count, write_length),
317 stream_bytes_written_, fin, ack_listener);
318 stream_bytes_written_ += consumed_data.bytes_consumed;
319
320 AddBytesSent(consumed_data.bytes_consumed);
321
322 // The write may have generated a write error causing this stream to be
323 // closed. If so, simply return without marking the stream write blocked.
324 if (write_side_closed_) {
325 return consumed_data;
326 }
327
328 if (consumed_data.bytes_consumed == write_length) {
329 if (!fin_with_zero_data) {
330 MaybeSendBlocked();
331 }
332 if (fin && consumed_data.fin_consumed) {
333 fin_sent_ = true;
334 if (fin_received_) {
335 session_->StreamDraining(id_);
336 }
337 CloseWriteSide();
338 } else if (fin && !consumed_data.fin_consumed) {
339 session_->MarkConnectionLevelWriteBlocked(id());
340 }
341 } else {
342 session_->MarkConnectionLevelWriteBlocked(id());
343 }
344 return consumed_data;
345 }
346
347 QuicConsumedData ReliableQuicStream::WritevDataInner(
348 QuicIOVector iov,
349 QuicStreamOffset offset,
350 bool fin,
351 QuicAckListenerInterface* ack_notifier_delegate) {
352 return session()->WritevData(this, id(), iov, offset, fin,
353 ack_notifier_delegate);
354 }
355
356 void ReliableQuicStream::CloseReadSide() {
357 if (read_side_closed_) {
358 return;
359 }
360 DVLOG(1) << ENDPOINT << "Done reading from stream " << id();
361
362 read_side_closed_ = true;
363 sequencer_.ReleaseBuffer();
364
365 if (write_side_closed_) {
366 DVLOG(1) << ENDPOINT << "Closing stream: " << id();
367 session_->CloseStream(id());
368 }
369 }
370
371 void ReliableQuicStream::CloseWriteSide() {
372 if (write_side_closed_) {
373 return;
374 }
375 DVLOG(1) << ENDPOINT << "Done writing to stream " << id();
376
377 write_side_closed_ = true;
378 if (read_side_closed_) {
379 DVLOG(1) << ENDPOINT << "Closing stream: " << id();
380 session_->CloseStream(id());
381 }
382 }
383
384 bool ReliableQuicStream::HasBufferedData() const {
385 return !queued_data_.empty();
386 }
387
388 QuicVersion ReliableQuicStream::version() const {
389 return session_->connection()->version();
390 }
391
392 void ReliableQuicStream::StopReading() {
393 DVLOG(1) << ENDPOINT << "Stop reading from stream " << id();
394 sequencer_.StopReading();
395 }
396
397 const IPEndPoint& ReliableQuicStream::PeerAddressOfLatestPacket() const {
398 return session_->connection()->last_packet_source_address();
399 }
400
401 void ReliableQuicStream::OnClose() {
402 CloseReadSide();
403 CloseWriteSide();
404
405 if (!fin_sent_ && !rst_sent_) {
406 // For flow control accounting, tell the peer how many bytes have been
407 // written on this stream before termination. Done here if needed, using a
408 // RST_STREAM frame.
409 DVLOG(1) << ENDPOINT << "Sending RST_STREAM in OnClose: " << id();
410 session_->SendRstStream(id(), QUIC_RST_ACKNOWLEDGEMENT,
411 stream_bytes_written_);
412 rst_sent_ = true;
413 }
414
415 // The stream is being closed and will not process any further incoming bytes.
416 // As there may be more bytes in flight, to ensure that both endpoints have
417 // the same connection level flow control state, mark all unreceived or
418 // buffered bytes as consumed.
419 QuicByteCount bytes_to_consume =
420 flow_controller_.highest_received_byte_offset() -
421 flow_controller_.bytes_consumed();
422 AddBytesConsumed(bytes_to_consume);
423 }
424
425 void ReliableQuicStream::OnWindowUpdateFrame(
426 const QuicWindowUpdateFrame& frame) {
427 if (flow_controller_.UpdateSendWindowOffset(frame.byte_offset)) {
428 // Writing can be done again!
429 // TODO(rjshade): This does not respect priorities (e.g. multiple
430 // outstanding POSTs are unblocked on arrival of
431 // SHLO with initial window).
432 // As long as the connection is not flow control blocked, write on!
433 OnCanWrite();
434 }
435 }
436
437 bool ReliableQuicStream::MaybeIncreaseHighestReceivedOffset(
438 QuicStreamOffset new_offset) {
439 uint64_t increment =
440 new_offset - flow_controller_.highest_received_byte_offset();
441 if (!flow_controller_.UpdateHighestReceivedOffset(new_offset)) {
442 return false;
443 }
444
445 // If |new_offset| increased the stream flow controller's highest received
446 // offset, increase the connection flow controller's value by the incremental
447 // difference.
448 if (stream_contributes_to_connection_flow_control_) {
449 connection_flow_controller_->UpdateHighestReceivedOffset(
450 connection_flow_controller_->highest_received_byte_offset() +
451 increment);
452 }
453 return true;
454 }
455
456 void ReliableQuicStream::AddBytesSent(QuicByteCount bytes) {
457 flow_controller_.AddBytesSent(bytes);
458 if (stream_contributes_to_connection_flow_control_) {
459 connection_flow_controller_->AddBytesSent(bytes);
460 }
461 }
462
463 void ReliableQuicStream::AddBytesConsumed(QuicByteCount bytes) {
464 // Only adjust stream level flow controller if still reading.
465 if (!read_side_closed_) {
466 flow_controller_.AddBytesConsumed(bytes);
467 }
468
469 if (stream_contributes_to_connection_flow_control_) {
470 connection_flow_controller_->AddBytesConsumed(bytes);
471 }
472 }
473
474 void ReliableQuicStream::UpdateSendWindowOffset(QuicStreamOffset new_window) {
475 if (flow_controller_.UpdateSendWindowOffset(new_window)) {
476 OnCanWrite();
477 }
478 }
479
480 } // namespace net
OLDNEW
« no previous file with comments | « net/quic/core/reliable_quic_stream.h ('k') | net/quic/core/reliable_quic_stream_test.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698