Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(159)

Side by Side Diff: net/quic/core/reliable_quic_stream.cc

Issue 2480183002: Rename ReliableQuicStream to QuicStream. No behavior change, not protected. (Closed)
Patch Set: add missing files Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « net/quic/core/reliable_quic_stream.h ('k') | net/quic/core/reliable_quic_stream_test.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/quic/core/reliable_quic_stream.h"
6
7 #include "base/logging.h"
8 #include "net/quic/core/quic_bug_tracker.h"
9 #include "net/quic/core/quic_flags.h"
10 #include "net/quic/core/quic_flow_controller.h"
11 #include "net/quic/core/quic_session.h"
12 #include "net/quic/core/quic_write_blocked_list.h"
13
14 using base::StringPiece;
15 using std::min;
16 using std::string;
17
18 namespace net {
19
20 #define ENDPOINT \
21 (perspective_ == Perspective::IS_SERVER ? "Server: " : "Client: ")
22
23 namespace {
24
25 struct iovec MakeIovec(StringPiece data) {
26 struct iovec iov = {const_cast<char*>(data.data()),
27 static_cast<size_t>(data.size())};
28 return iov;
29 }
30
31 size_t GetInitialStreamFlowControlWindowToSend(QuicSession* session) {
32 return session->config()->GetInitialStreamFlowControlWindowToSend();
33 }
34
35 size_t GetReceivedFlowControlWindow(QuicSession* session) {
36 if (session->config()->HasReceivedInitialStreamFlowControlWindowBytes()) {
37 return session->config()->ReceivedInitialStreamFlowControlWindowBytes();
38 }
39
40 return kMinimumFlowControlSendWindow;
41 }
42
43 } // namespace
44
45 ReliableQuicStream::PendingData::PendingData(
46 string data_in,
47 QuicAckListenerInterface* ack_listener_in)
48 : data(std::move(data_in)), offset(0), ack_listener(ack_listener_in) {}
49
50 ReliableQuicStream::PendingData::~PendingData() {}
51
52 ReliableQuicStream::ReliableQuicStream(QuicStreamId id, QuicSession* session)
53 : queued_data_bytes_(0),
54 sequencer_(this, session->connection()->clock()),
55 id_(id),
56 session_(session),
57 stream_bytes_read_(0),
58 stream_bytes_written_(0),
59 stream_error_(QUIC_STREAM_NO_ERROR),
60 connection_error_(QUIC_NO_ERROR),
61 read_side_closed_(false),
62 write_side_closed_(false),
63 fin_buffered_(false),
64 fin_sent_(false),
65 fin_received_(false),
66 rst_sent_(false),
67 rst_received_(false),
68 perspective_(session_->perspective()),
69 flow_controller_(session_->connection(),
70 id_,
71 perspective_,
72 GetReceivedFlowControlWindow(session),
73 GetInitialStreamFlowControlWindowToSend(session),
74 session_->flow_controller()->auto_tune_receive_window()),
75 connection_flow_controller_(session_->flow_controller()),
76 stream_contributes_to_connection_flow_control_(true),
77 busy_counter_(0) {
78 SetFromConfig();
79 }
80
81 ReliableQuicStream::~ReliableQuicStream() {}
82
83 void ReliableQuicStream::SetFromConfig() {}
84
85 void ReliableQuicStream::OnStreamFrame(const QuicStreamFrame& frame) {
86 DCHECK_EQ(frame.stream_id, id_);
87
88 DCHECK(!(read_side_closed_ && write_side_closed_));
89
90 if (frame.fin) {
91 fin_received_ = true;
92 if (fin_sent_) {
93 session_->StreamDraining(id_);
94 }
95 }
96
97 if (read_side_closed_) {
98 DVLOG(1) << ENDPOINT << "Stream " << frame.stream_id
99 << " is closed for reading. Ignoring newly received stream data.";
100 // The subclass does not want to read data: blackhole the data.
101 return;
102 }
103
104 // This count includes duplicate data received.
105 size_t frame_payload_size = frame.data_length;
106 stream_bytes_read_ += frame_payload_size;
107
108 // Flow control is interested in tracking highest received offset.
109 // Only interested in received frames that carry data.
110 if (frame_payload_size > 0 &&
111 MaybeIncreaseHighestReceivedOffset(frame.offset + frame_payload_size)) {
112 // As the highest received offset has changed, check to see if this is a
113 // violation of flow control.
114 if (flow_controller_.FlowControlViolation() ||
115 connection_flow_controller_->FlowControlViolation()) {
116 CloseConnectionWithDetails(
117 QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA,
118 "Flow control violation after increasing offset");
119 return;
120 }
121 }
122
123 sequencer_.OnStreamFrame(frame);
124 }
125
126 int ReliableQuicStream::num_frames_received() const {
127 return sequencer_.num_frames_received();
128 }
129
130 int ReliableQuicStream::num_duplicate_frames_received() const {
131 return sequencer_.num_duplicate_frames_received();
132 }
133
134 void ReliableQuicStream::OnStreamReset(const QuicRstStreamFrame& frame) {
135 rst_received_ = true;
136 MaybeIncreaseHighestReceivedOffset(frame.byte_offset);
137
138 stream_error_ = frame.error_code;
139 CloseWriteSide();
140 CloseReadSide();
141 }
142
143 void ReliableQuicStream::OnConnectionClosed(QuicErrorCode error,
144 ConnectionCloseSource /*source*/) {
145 if (read_side_closed_ && write_side_closed_) {
146 return;
147 }
148 if (error != QUIC_NO_ERROR) {
149 stream_error_ = QUIC_STREAM_CONNECTION_ERROR;
150 connection_error_ = error;
151 }
152
153 CloseWriteSide();
154 CloseReadSide();
155 }
156
157 void ReliableQuicStream::OnFinRead() {
158 DCHECK(sequencer_.IsClosed());
159 // OnFinRead can be called due to a FIN flag in a headers block, so there may
160 // have been no OnStreamFrame call with a FIN in the frame.
161 fin_received_ = true;
162 // If fin_sent_ is true, then CloseWriteSide has already been called, and the
163 // stream will be destroyed by CloseReadSide, so don't need to call
164 // StreamDraining.
165 CloseReadSide();
166 }
167
168 void ReliableQuicStream::Reset(QuicRstStreamErrorCode error) {
169 stream_error_ = error;
170 // Sending a RstStream results in calling CloseStream.
171 session()->SendRstStream(id(), error, stream_bytes_written_);
172 rst_sent_ = true;
173 }
174
175 void ReliableQuicStream::CloseConnectionWithDetails(QuicErrorCode error,
176 const string& details) {
177 session()->connection()->CloseConnection(
178 error, details, ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
179 }
180
181 void ReliableQuicStream::WriteOrBufferData(
182 StringPiece data,
183 bool fin,
184 QuicAckListenerInterface* ack_listener) {
185 if (data.empty() && !fin) {
186 QUIC_BUG << "data.empty() && !fin";
187 return;
188 }
189
190 if (fin_buffered_) {
191 QUIC_BUG << "Fin already buffered";
192 return;
193 }
194 if (write_side_closed_) {
195 DLOG(ERROR) << ENDPOINT << "Attempt to write when the write side is closed";
196 return;
197 }
198
199 QuicConsumedData consumed_data(0, false);
200 fin_buffered_ = fin;
201
202 if (queued_data_.empty()) {
203 struct iovec iov(MakeIovec(data));
204 consumed_data = WritevData(&iov, 1, fin, ack_listener);
205 DCHECK_LE(consumed_data.bytes_consumed, data.length());
206 }
207
208 // If there's unconsumed data or an unconsumed fin, queue it.
209 if (consumed_data.bytes_consumed < data.length() ||
210 (fin && !consumed_data.fin_consumed)) {
211 StringPiece remainder(data.substr(consumed_data.bytes_consumed));
212 queued_data_bytes_ += remainder.size();
213 queued_data_.emplace_back(remainder.as_string(), ack_listener);
214 }
215 }
216
217 void ReliableQuicStream::OnCanWrite() {
218 bool fin = false;
219 while (!queued_data_.empty()) {
220 PendingData* pending_data = &queued_data_.front();
221 QuicAckListenerInterface* ack_listener = pending_data->ack_listener.get();
222 if (queued_data_.size() == 1 && fin_buffered_) {
223 fin = true;
224 }
225 if (pending_data->offset > 0 &&
226 pending_data->offset >= pending_data->data.size()) {
227 // This should be impossible because offset tracks the amount of
228 // pending_data written thus far.
229 QUIC_BUG << "Pending offset is beyond available data. offset: "
230 << pending_data->offset << " vs: " << pending_data->data.size();
231 return;
232 }
233 size_t remaining_len = pending_data->data.size() - pending_data->offset;
234 struct iovec iov = {
235 const_cast<char*>(pending_data->data.data()) + pending_data->offset,
236 remaining_len};
237 QuicConsumedData consumed_data = WritevData(&iov, 1, fin, ack_listener);
238 queued_data_bytes_ -= consumed_data.bytes_consumed;
239 if (consumed_data.bytes_consumed == remaining_len &&
240 fin == consumed_data.fin_consumed) {
241 queued_data_.pop_front();
242 } else {
243 if (consumed_data.bytes_consumed > 0) {
244 pending_data->offset += consumed_data.bytes_consumed;
245 }
246 break;
247 }
248 }
249 }
250
251 void ReliableQuicStream::MaybeSendBlocked() {
252 flow_controller_.MaybeSendBlocked();
253 if (!stream_contributes_to_connection_flow_control_) {
254 return;
255 }
256 connection_flow_controller_->MaybeSendBlocked();
257 // If the stream is blocked by connection-level flow control but not by
258 // stream-level flow control, add the stream to the write blocked list so that
259 // the stream will be given a chance to write when a connection-level
260 // WINDOW_UPDATE arrives.
261 if (connection_flow_controller_->IsBlocked() &&
262 !flow_controller_.IsBlocked()) {
263 session_->MarkConnectionLevelWriteBlocked(id());
264 }
265 }
266
267 QuicConsumedData ReliableQuicStream::WritevData(
268 const struct iovec* iov,
269 int iov_count,
270 bool fin,
271 QuicAckListenerInterface* ack_listener) {
272 if (write_side_closed_) {
273 DLOG(ERROR) << ENDPOINT << "Attempt to write when the write side is closed";
274 return QuicConsumedData(0, false);
275 }
276
277 // How much data was provided.
278 size_t write_length = 0;
279 if (iov != nullptr) {
280 for (int i = 0; i < iov_count; ++i) {
281 write_length += iov[i].iov_len;
282 }
283 }
284
285 // A FIN with zero data payload should not be flow control blocked.
286 bool fin_with_zero_data = (fin && write_length == 0);
287
288 // How much data flow control permits to be written.
289 QuicByteCount send_window = flow_controller_.SendWindowSize();
290 if (stream_contributes_to_connection_flow_control_) {
291 send_window =
292 min(send_window, connection_flow_controller_->SendWindowSize());
293 }
294
295 if (session_->ShouldYield(id())) {
296 session_->MarkConnectionLevelWriteBlocked(id());
297 return QuicConsumedData(0, false);
298 }
299
300 if (send_window == 0 && !fin_with_zero_data) {
301 // Quick return if nothing can be sent.
302 MaybeSendBlocked();
303 return QuicConsumedData(0, false);
304 }
305
306 if (write_length > send_window) {
307 // Don't send the FIN unless all the data will be sent.
308 fin = false;
309
310 // Writing more data would be a violation of flow control.
311 write_length = static_cast<size_t>(send_window);
312 DVLOG(1) << "stream " << id() << " shortens write length to "
313 << write_length << " due to flow control";
314 }
315
316 QuicConsumedData consumed_data =
317 WritevDataInner(QuicIOVector(iov, iov_count, write_length),
318 stream_bytes_written_, fin, ack_listener);
319 stream_bytes_written_ += consumed_data.bytes_consumed;
320
321 AddBytesSent(consumed_data.bytes_consumed);
322
323 // The write may have generated a write error causing this stream to be
324 // closed. If so, simply return without marking the stream write blocked.
325 if (write_side_closed_) {
326 return consumed_data;
327 }
328
329 if (consumed_data.bytes_consumed == write_length) {
330 if (!fin_with_zero_data) {
331 MaybeSendBlocked();
332 }
333 if (fin && consumed_data.fin_consumed) {
334 fin_sent_ = true;
335 if (fin_received_) {
336 session_->StreamDraining(id_);
337 }
338 CloseWriteSide();
339 } else if (fin && !consumed_data.fin_consumed) {
340 session_->MarkConnectionLevelWriteBlocked(id());
341 }
342 } else {
343 session_->MarkConnectionLevelWriteBlocked(id());
344 }
345 return consumed_data;
346 }
347
348 QuicConsumedData ReliableQuicStream::WritevDataInner(
349 QuicIOVector iov,
350 QuicStreamOffset offset,
351 bool fin,
352 QuicAckListenerInterface* ack_notifier_delegate) {
353 return session()->WritevData(this, id(), iov, offset, fin,
354 ack_notifier_delegate);
355 }
356
357 void ReliableQuicStream::CloseReadSide() {
358 if (read_side_closed_) {
359 return;
360 }
361 DVLOG(1) << ENDPOINT << "Done reading from stream " << id();
362
363 read_side_closed_ = true;
364 sequencer_.ReleaseBuffer();
365
366 if (write_side_closed_) {
367 DVLOG(1) << ENDPOINT << "Closing stream: " << id();
368 session_->CloseStream(id());
369 }
370 }
371
372 void ReliableQuicStream::CloseWriteSide() {
373 if (write_side_closed_) {
374 return;
375 }
376 DVLOG(1) << ENDPOINT << "Done writing to stream " << id();
377
378 write_side_closed_ = true;
379 if (read_side_closed_) {
380 DVLOG(1) << ENDPOINT << "Closing stream: " << id();
381 session_->CloseStream(id());
382 }
383 }
384
385 bool ReliableQuicStream::HasBufferedData() const {
386 return !queued_data_.empty();
387 }
388
389 QuicVersion ReliableQuicStream::version() const {
390 return session_->connection()->version();
391 }
392
393 void ReliableQuicStream::StopReading() {
394 DVLOG(1) << ENDPOINT << "Stop reading from stream " << id();
395 sequencer_.StopReading();
396 }
397
398 const IPEndPoint& ReliableQuicStream::PeerAddressOfLatestPacket() const {
399 return session_->connection()->last_packet_source_address();
400 }
401
402 void ReliableQuicStream::OnClose() {
403 CloseReadSide();
404 CloseWriteSide();
405
406 if (!fin_sent_ && !rst_sent_) {
407 // For flow control accounting, tell the peer how many bytes have been
408 // written on this stream before termination. Done here if needed, using a
409 // RST_STREAM frame.
410 DVLOG(1) << ENDPOINT << "Sending RST_STREAM in OnClose: " << id();
411 session_->SendRstStream(id(), QUIC_RST_ACKNOWLEDGEMENT,
412 stream_bytes_written_);
413 rst_sent_ = true;
414 }
415
416 // The stream is being closed and will not process any further incoming bytes.
417 // As there may be more bytes in flight, to ensure that both endpoints have
418 // the same connection level flow control state, mark all unreceived or
419 // buffered bytes as consumed.
420 QuicByteCount bytes_to_consume =
421 flow_controller_.highest_received_byte_offset() -
422 flow_controller_.bytes_consumed();
423 AddBytesConsumed(bytes_to_consume);
424 }
425
426 void ReliableQuicStream::OnWindowUpdateFrame(
427 const QuicWindowUpdateFrame& frame) {
428 if (flow_controller_.UpdateSendWindowOffset(frame.byte_offset)) {
429 // Writing can be done again!
430 // TODO(rjshade): This does not respect priorities (e.g. multiple
431 // outstanding POSTs are unblocked on arrival of
432 // SHLO with initial window).
433 // As long as the connection is not flow control blocked, write on!
434 OnCanWrite();
435 }
436 }
437
438 bool ReliableQuicStream::MaybeIncreaseHighestReceivedOffset(
439 QuicStreamOffset new_offset) {
440 uint64_t increment =
441 new_offset - flow_controller_.highest_received_byte_offset();
442 if (!flow_controller_.UpdateHighestReceivedOffset(new_offset)) {
443 return false;
444 }
445
446 // If |new_offset| increased the stream flow controller's highest received
447 // offset, increase the connection flow controller's value by the incremental
448 // difference.
449 if (stream_contributes_to_connection_flow_control_) {
450 connection_flow_controller_->UpdateHighestReceivedOffset(
451 connection_flow_controller_->highest_received_byte_offset() +
452 increment);
453 }
454 return true;
455 }
456
457 void ReliableQuicStream::AddBytesSent(QuicByteCount bytes) {
458 flow_controller_.AddBytesSent(bytes);
459 if (stream_contributes_to_connection_flow_control_) {
460 connection_flow_controller_->AddBytesSent(bytes);
461 }
462 }
463
464 void ReliableQuicStream::AddBytesConsumed(QuicByteCount bytes) {
465 // Only adjust stream level flow controller if still reading.
466 if (!read_side_closed_) {
467 flow_controller_.AddBytesConsumed(bytes);
468 }
469
470 if (stream_contributes_to_connection_flow_control_) {
471 connection_flow_controller_->AddBytesConsumed(bytes);
472 }
473 }
474
475 void ReliableQuicStream::UpdateSendWindowOffset(QuicStreamOffset new_window) {
476 if (flow_controller_.UpdateSendWindowOffset(new_window)) {
477 OnCanWrite();
478 }
479 }
480
481 } // namespace net
OLDNEW
« no previous file with comments | « net/quic/core/reliable_quic_stream.h ('k') | net/quic/core/reliable_quic_stream_test.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698