Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(35)

Side by Side Diff: net/quic/reliable_quic_stream.cc

Issue 2193073003: Move shared files in net/quic/ into net/quic/core/ (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: io_thread_unittest.cc Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « net/quic/reliable_quic_stream.h ('k') | net/quic/reliable_quic_stream_test.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/quic/reliable_quic_stream.h"
6
7 #include "base/logging.h"
8 #include "net/quic/iovector.h"
9 #include "net/quic/quic_bug_tracker.h"
10 #include "net/quic/quic_flags.h"
11 #include "net/quic/quic_flow_controller.h"
12 #include "net/quic/quic_session.h"
13 #include "net/quic/quic_write_blocked_list.h"
14
15 using base::StringPiece;
16 using std::min;
17 using std::string;
18
19 namespace net {
20
21 #define ENDPOINT \
22 (perspective_ == Perspective::IS_SERVER ? "Server: " : "Client: ")
23
24 namespace {
25
26 struct iovec MakeIovec(StringPiece data) {
27 struct iovec iov = {const_cast<char*>(data.data()),
28 static_cast<size_t>(data.size())};
29 return iov;
30 }
31
32 size_t GetInitialStreamFlowControlWindowToSend(QuicSession* session) {
33 return session->config()->GetInitialStreamFlowControlWindowToSend();
34 }
35
36 size_t GetReceivedFlowControlWindow(QuicSession* session) {
37 if (session->config()->HasReceivedInitialStreamFlowControlWindowBytes()) {
38 return session->config()->ReceivedInitialStreamFlowControlWindowBytes();
39 }
40
41 return kMinimumFlowControlSendWindow;
42 }
43
44 } // namespace
45
46 ReliableQuicStream::PendingData::PendingData(
47 string data_in,
48 QuicAckListenerInterface* ack_listener_in)
49 : data(data_in), offset(0), ack_listener(ack_listener_in) {}
50
51 ReliableQuicStream::PendingData::~PendingData() {}
52
53 ReliableQuicStream::ReliableQuicStream(QuicStreamId id, QuicSession* session)
54 : queued_data_bytes_(0),
55 sequencer_(this, session->connection()->clock()),
56 id_(id),
57 session_(session),
58 stream_bytes_read_(0),
59 stream_bytes_written_(0),
60 stream_error_(QUIC_STREAM_NO_ERROR),
61 connection_error_(QUIC_NO_ERROR),
62 read_side_closed_(false),
63 write_side_closed_(false),
64 fin_buffered_(false),
65 fin_sent_(false),
66 fin_received_(false),
67 rst_sent_(false),
68 rst_received_(false),
69 perspective_(session_->perspective()),
70 flow_controller_(session_->connection(),
71 id_,
72 perspective_,
73 GetReceivedFlowControlWindow(session),
74 GetInitialStreamFlowControlWindowToSend(session),
75 session_->flow_controller()->auto_tune_receive_window()),
76 connection_flow_controller_(session_->flow_controller()),
77 stream_contributes_to_connection_flow_control_(true) {
78 SetFromConfig();
79 }
80
81 ReliableQuicStream::~ReliableQuicStream() {}
82
83 void ReliableQuicStream::SetFromConfig() {}
84
85 void ReliableQuicStream::OnStreamFrame(const QuicStreamFrame& frame) {
86 DCHECK_EQ(frame.stream_id, id_);
87
88 DCHECK(!(read_side_closed_ && write_side_closed_));
89
90 if (frame.fin) {
91 fin_received_ = true;
92 if (fin_sent_) {
93 session_->StreamDraining(id_);
94 }
95 }
96
97 if (read_side_closed_) {
98 DVLOG(1) << ENDPOINT << "Ignoring data in frame " << frame.stream_id;
99 // The subclass does not want to read data: blackhole the data.
100 return;
101 }
102
103 // This count includes duplicate data received.
104 size_t frame_payload_size = frame.data_length;
105 stream_bytes_read_ += frame_payload_size;
106
107 // Flow control is interested in tracking highest received offset.
108 // Only interested in received frames that carry data.
109 if ((!FLAGS_quic_ignore_zero_length_frames || frame_payload_size > 0) &&
110 MaybeIncreaseHighestReceivedOffset(frame.offset + frame_payload_size)) {
111 // As the highest received offset has changed, check to see if this is a
112 // violation of flow control.
113 if (flow_controller_.FlowControlViolation() ||
114 connection_flow_controller_->FlowControlViolation()) {
115 CloseConnectionWithDetails(
116 QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA,
117 "Flow control violation after increasing offset");
118 return;
119 }
120 }
121
122 sequencer_.OnStreamFrame(frame);
123 }
124
125 int ReliableQuicStream::num_frames_received() const {
126 return sequencer_.num_frames_received();
127 }
128
129 int ReliableQuicStream::num_early_frames_received() const {
130 return sequencer_.num_early_frames_received();
131 }
132
133 int ReliableQuicStream::num_duplicate_frames_received() const {
134 return sequencer_.num_duplicate_frames_received();
135 }
136
137 void ReliableQuicStream::OnStreamReset(const QuicRstStreamFrame& frame) {
138 rst_received_ = true;
139 MaybeIncreaseHighestReceivedOffset(frame.byte_offset);
140
141 stream_error_ = frame.error_code;
142 CloseWriteSide();
143 CloseReadSide();
144 }
145
146 void ReliableQuicStream::OnConnectionClosed(QuicErrorCode error,
147 ConnectionCloseSource /*source*/) {
148 if (read_side_closed_ && write_side_closed_) {
149 return;
150 }
151 if (error != QUIC_NO_ERROR) {
152 stream_error_ = QUIC_STREAM_CONNECTION_ERROR;
153 connection_error_ = error;
154 }
155
156 CloseWriteSide();
157 CloseReadSide();
158 }
159
160 void ReliableQuicStream::OnFinRead() {
161 DCHECK(sequencer_.IsClosed());
162 // OnFinRead can be called due to a FIN flag in a headers block, so there may
163 // have been no OnStreamFrame call with a FIN in the frame.
164 fin_received_ = true;
165 // If fin_sent_ is true, then CloseWriteSide has already been called, and the
166 // stream will be destroyed by CloseReadSide, so don't need to call
167 // StreamDraining.
168 CloseReadSide();
169 }
170
171 void ReliableQuicStream::Reset(QuicRstStreamErrorCode error) {
172 stream_error_ = error;
173 // Sending a RstStream results in calling CloseStream.
174 session()->SendRstStream(id(), error, stream_bytes_written_);
175 rst_sent_ = true;
176 }
177
178 void ReliableQuicStream::CloseConnectionWithDetails(QuicErrorCode error,
179 const string& details) {
180 session()->connection()->CloseConnection(
181 error, details, ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
182 }
183
184 void ReliableQuicStream::WriteOrBufferData(
185 StringPiece data,
186 bool fin,
187 QuicAckListenerInterface* ack_listener) {
188 if (data.empty() && !fin) {
189 QUIC_BUG << "data.empty() && !fin";
190 return;
191 }
192
193 if (fin_buffered_) {
194 QUIC_BUG << "Fin already buffered";
195 return;
196 }
197 if (write_side_closed_) {
198 DLOG(ERROR) << ENDPOINT << "Attempt to write when the write side is closed";
199 return;
200 }
201
202 QuicConsumedData consumed_data(0, false);
203 fin_buffered_ = fin;
204
205 if (queued_data_.empty()) {
206 struct iovec iov(MakeIovec(data));
207 consumed_data = WritevData(&iov, 1, fin, ack_listener);
208 DCHECK_LE(consumed_data.bytes_consumed, data.length());
209 }
210
211 // If there's unconsumed data or an unconsumed fin, queue it.
212 if (consumed_data.bytes_consumed < data.length() ||
213 (fin && !consumed_data.fin_consumed)) {
214 StringPiece remainder(data.substr(consumed_data.bytes_consumed));
215 queued_data_bytes_ += remainder.size();
216 queued_data_.emplace_back(remainder.as_string(), ack_listener);
217 }
218 }
219
220 void ReliableQuicStream::OnCanWrite() {
221 bool fin = false;
222 while (!queued_data_.empty()) {
223 PendingData* pending_data = &queued_data_.front();
224 QuicAckListenerInterface* ack_listener = pending_data->ack_listener.get();
225 if (queued_data_.size() == 1 && fin_buffered_) {
226 fin = true;
227 }
228 if (pending_data->offset > 0 &&
229 pending_data->offset >= pending_data->data.size()) {
230 // This should be impossible because offset tracks the amount of
231 // pending_data written thus far.
232 QUIC_BUG << "Pending offset is beyond available data. offset: "
233 << pending_data->offset << " vs: " << pending_data->data.size();
234 return;
235 }
236 size_t remaining_len = pending_data->data.size() - pending_data->offset;
237 struct iovec iov = {
238 const_cast<char*>(pending_data->data.data()) + pending_data->offset,
239 remaining_len};
240 QuicConsumedData consumed_data = WritevData(&iov, 1, fin, ack_listener);
241 queued_data_bytes_ -= consumed_data.bytes_consumed;
242 if (consumed_data.bytes_consumed == remaining_len &&
243 fin == consumed_data.fin_consumed) {
244 queued_data_.pop_front();
245 } else {
246 if (consumed_data.bytes_consumed > 0) {
247 pending_data->offset += consumed_data.bytes_consumed;
248 }
249 break;
250 }
251 }
252 }
253
254 void ReliableQuicStream::MaybeSendBlocked() {
255 flow_controller_.MaybeSendBlocked();
256 if (!stream_contributes_to_connection_flow_control_) {
257 return;
258 }
259 connection_flow_controller_->MaybeSendBlocked();
260 // If the stream is blocked by connection-level flow control but not by
261 // stream-level flow control, add the stream to the write blocked list so that
262 // the stream will be given a chance to write when a connection-level
263 // WINDOW_UPDATE arrives.
264 if (connection_flow_controller_->IsBlocked() &&
265 !flow_controller_.IsBlocked()) {
266 session_->MarkConnectionLevelWriteBlocked(id());
267 }
268 }
269
270 QuicConsumedData ReliableQuicStream::WritevData(
271 const struct iovec* iov,
272 int iov_count,
273 bool fin,
274 QuicAckListenerInterface* ack_listener) {
275 if (write_side_closed_) {
276 DLOG(ERROR) << ENDPOINT << "Attempt to write when the write side is closed";
277 return QuicConsumedData(0, false);
278 }
279
280 // How much data was provided.
281 size_t write_length = TotalIovecLength(iov, iov_count);
282
283 // A FIN with zero data payload should not be flow control blocked.
284 bool fin_with_zero_data = (fin && write_length == 0);
285
286 // How much data flow control permits to be written.
287 QuicByteCount send_window = flow_controller_.SendWindowSize();
288 if (stream_contributes_to_connection_flow_control_) {
289 send_window =
290 min(send_window, connection_flow_controller_->SendWindowSize());
291 }
292
293 if (session_->ShouldYield(id())) {
294 session_->MarkConnectionLevelWriteBlocked(id());
295 return QuicConsumedData(0, false);
296 }
297
298 if (send_window == 0 && !fin_with_zero_data) {
299 // Quick return if nothing can be sent.
300 MaybeSendBlocked();
301 return QuicConsumedData(0, false);
302 }
303
304 if (write_length > send_window) {
305 // Don't send the FIN unless all the data will be sent.
306 fin = false;
307
308 // Writing more data would be a violation of flow control.
309 write_length = static_cast<size_t>(send_window);
310 }
311
312 QuicConsumedData consumed_data =
313 WritevDataInner(QuicIOVector(iov, iov_count, write_length),
314 stream_bytes_written_, fin, ack_listener);
315 stream_bytes_written_ += consumed_data.bytes_consumed;
316
317 AddBytesSent(consumed_data.bytes_consumed);
318
319 // The write may have generated a write error causing this stream to be
320 // closed. If so, simply return without marking the stream write blocked.
321 if (write_side_closed_) {
322 return consumed_data;
323 }
324
325 if (consumed_data.bytes_consumed == write_length) {
326 if (!fin_with_zero_data) {
327 MaybeSendBlocked();
328 }
329 if (fin && consumed_data.fin_consumed) {
330 fin_sent_ = true;
331 if (fin_received_) {
332 session_->StreamDraining(id_);
333 }
334 CloseWriteSide();
335 } else if (fin && !consumed_data.fin_consumed) {
336 session_->MarkConnectionLevelWriteBlocked(id());
337 }
338 } else {
339 session_->MarkConnectionLevelWriteBlocked(id());
340 }
341 return consumed_data;
342 }
343
344 QuicConsumedData ReliableQuicStream::WritevDataInner(
345 QuicIOVector iov,
346 QuicStreamOffset offset,
347 bool fin,
348 QuicAckListenerInterface* ack_notifier_delegate) {
349 return session()->WritevData(this, id(), iov, offset, fin,
350 ack_notifier_delegate);
351 }
352
353 void ReliableQuicStream::CloseReadSide() {
354 if (read_side_closed_) {
355 return;
356 }
357 DVLOG(1) << ENDPOINT << "Done reading from stream " << id();
358
359 read_side_closed_ = true;
360 if (write_side_closed_) {
361 DVLOG(1) << ENDPOINT << "Closing stream: " << id();
362 session_->CloseStream(id());
363 }
364 }
365
366 void ReliableQuicStream::CloseWriteSide() {
367 if (write_side_closed_) {
368 return;
369 }
370 DVLOG(1) << ENDPOINT << "Done writing to stream " << id();
371
372 write_side_closed_ = true;
373 if (read_side_closed_) {
374 DVLOG(1) << ENDPOINT << "Closing stream: " << id();
375 session_->CloseStream(id());
376 }
377 }
378
379 bool ReliableQuicStream::HasBufferedData() const {
380 return !queued_data_.empty();
381 }
382
383 QuicVersion ReliableQuicStream::version() const {
384 return session_->connection()->version();
385 }
386
387 void ReliableQuicStream::StopReading() {
388 DVLOG(1) << ENDPOINT << "Stop reading from stream " << id();
389 sequencer_.StopReading();
390 }
391
392 const IPEndPoint& ReliableQuicStream::PeerAddressOfLatestPacket() const {
393 return session_->connection()->last_packet_source_address();
394 }
395
396 void ReliableQuicStream::OnClose() {
397 CloseReadSide();
398 CloseWriteSide();
399
400 if (!fin_sent_ && !rst_sent_) {
401 // For flow control accounting, tell the peer how many bytes have been
402 // written on this stream before termination. Done here if needed, using a
403 // RST_STREAM frame.
404 DVLOG(1) << ENDPOINT << "Sending RST_STREAM in OnClose: " << id();
405 session_->SendRstStream(id(), QUIC_RST_ACKNOWLEDGEMENT,
406 stream_bytes_written_);
407 rst_sent_ = true;
408 }
409
410 // The stream is being closed and will not process any further incoming bytes.
411 // As there may be more bytes in flight, to ensure that both endpoints have
412 // the same connection level flow control state, mark all unreceived or
413 // buffered bytes as consumed.
414 QuicByteCount bytes_to_consume =
415 flow_controller_.highest_received_byte_offset() -
416 flow_controller_.bytes_consumed();
417 AddBytesConsumed(bytes_to_consume);
418 }
419
420 void ReliableQuicStream::OnWindowUpdateFrame(
421 const QuicWindowUpdateFrame& frame) {
422 if (flow_controller_.UpdateSendWindowOffset(frame.byte_offset)) {
423 // Writing can be done again!
424 // TODO(rjshade): This does not respect priorities (e.g. multiple
425 // outstanding POSTs are unblocked on arrival of
426 // SHLO with initial window).
427 // As long as the connection is not flow control blocked, write on!
428 OnCanWrite();
429 }
430 }
431
432 bool ReliableQuicStream::MaybeIncreaseHighestReceivedOffset(
433 QuicStreamOffset new_offset) {
434 uint64_t increment =
435 new_offset - flow_controller_.highest_received_byte_offset();
436 if (!flow_controller_.UpdateHighestReceivedOffset(new_offset)) {
437 return false;
438 }
439
440 // If |new_offset| increased the stream flow controller's highest received
441 // offset, increase the connection flow controller's value by the incremental
442 // difference.
443 if (stream_contributes_to_connection_flow_control_) {
444 connection_flow_controller_->UpdateHighestReceivedOffset(
445 connection_flow_controller_->highest_received_byte_offset() +
446 increment);
447 }
448 return true;
449 }
450
451 void ReliableQuicStream::AddBytesSent(QuicByteCount bytes) {
452 flow_controller_.AddBytesSent(bytes);
453 if (stream_contributes_to_connection_flow_control_) {
454 connection_flow_controller_->AddBytesSent(bytes);
455 }
456 }
457
458 void ReliableQuicStream::AddBytesConsumed(QuicByteCount bytes) {
459 // Only adjust stream level flow controller if still reading.
460 if (!read_side_closed_) {
461 flow_controller_.AddBytesConsumed(bytes);
462 }
463
464 if (stream_contributes_to_connection_flow_control_) {
465 connection_flow_controller_->AddBytesConsumed(bytes);
466 }
467 }
468
469 void ReliableQuicStream::UpdateSendWindowOffset(QuicStreamOffset new_window) {
470 if (flow_controller_.UpdateSendWindowOffset(new_window)) {
471 OnCanWrite();
472 }
473 }
474
475 } // namespace net
OLDNEW
« no previous file with comments | « net/quic/reliable_quic_stream.h ('k') | net/quic/reliable_quic_stream_test.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698