Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(5)

Side by Side Diff: net/quic/quic_stream_sequencer.cc

Issue 1414573004: Introduces QuicStreamSequencerBufferInterface with one implementation. Refactor, no behavior change. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Fixing the build file (previously missing a comma) Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "net/quic/quic_stream_sequencer.h" 5 #include "net/quic/quic_stream_sequencer.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 #include <limits> 8 #include <limits>
9 #include <utility> 9 #include <utility>
10 10
11 #include "base/logging.h" 11 #include "base/logging.h"
12 #include "net/quic/quic_clock.h" 12 #include "net/quic/quic_clock.h"
13 #include "net/quic/quic_flags.h" 13 #include "net/quic/quic_flags.h"
14 #include "net/quic/quic_frame_list.h" 14 #include "net/quic/quic_frame_list.h"
15 #include "net/quic/reliable_quic_stream.h" 15 #include "net/quic/reliable_quic_stream.h"
16 16
17 using std::min; 17 using std::min;
18 using std::numeric_limits; 18 using std::numeric_limits;
19 using std::string; 19 using std::string;
20 20
21 namespace net { 21 namespace net {
22 22
23 QuicStreamSequencer::QuicStreamSequencer(ReliableQuicStream* quic_stream, 23 QuicStreamSequencer::QuicStreamSequencer(ReliableQuicStream* quic_stream,
24 const QuicClock* clock) 24 const QuicClock* clock)
25 : stream_(quic_stream), 25 : stream_(quic_stream),
26 num_bytes_consumed_(0), 26 buffered_frames_(new QuicFrameList()),
27 close_offset_(numeric_limits<QuicStreamOffset>::max()), 27 close_offset_(numeric_limits<QuicStreamOffset>::max()),
28 blocked_(false), 28 blocked_(false),
29 num_bytes_buffered_(0),
30 num_frames_received_(0), 29 num_frames_received_(0),
31 num_duplicate_frames_received_(0), 30 num_duplicate_frames_received_(0),
32 num_early_frames_received_(0), 31 num_early_frames_received_(0),
33 clock_(clock), 32 clock_(clock),
34 ignore_read_data_(false) {} 33 ignore_read_data_(false) {}
35 34
36 QuicStreamSequencer::~QuicStreamSequencer() {} 35 QuicStreamSequencer::~QuicStreamSequencer() {}
37 36
38 void QuicStreamSequencer::OnStreamFrame(const QuicStreamFrame& frame) { 37 void QuicStreamSequencer::OnStreamFrame(const QuicStreamFrame& frame) {
39 ++num_frames_received_; 38 ++num_frames_received_;
40 const QuicStreamOffset byte_offset = frame.offset; 39 const QuicStreamOffset byte_offset = frame.offset;
41 const size_t data_len = frame.data.length(); 40 const size_t data_len = frame.data.length();
42 if (data_len == 0 && !frame.fin) { 41 if (data_len == 0 && !frame.fin) {
43 // Stream frames must have data or a fin flag. 42 // Stream frames must have data or a fin flag.
44 stream_->CloseConnectionWithDetails(QUIC_INVALID_STREAM_FRAME, 43 stream_->CloseConnectionWithDetails(QUIC_INVALID_STREAM_FRAME,
45 "Empty stream frame without FIN set."); 44 "Empty stream frame without FIN set.");
46 return; 45 return;
47 } 46 }
48 47
49 if (frame.fin) { 48 if (frame.fin) {
50 CloseStreamAtOffset(frame.offset + data_len); 49 CloseStreamAtOffset(frame.offset + data_len);
51 if (data_len == 0) { 50 if (data_len == 0) {
52 return; 51 return;
53 } 52 }
54 } 53 }
55 size_t bytes_written; 54 size_t bytes_written;
56 QuicErrorCode result = buffered_frames_.WriteAtOffset( 55 QuicErrorCode result = buffered_frames_->OnStreamData(
57 byte_offset, frame.data, clock_->ApproximateNow(), &bytes_written); 56 byte_offset, frame.data, clock_->ApproximateNow(), &bytes_written);
58 57
59 if (result == QUIC_INVALID_STREAM_DATA) { 58 if (result == QUIC_INVALID_STREAM_DATA) {
60 stream_->CloseConnectionWithDetails( 59 stream_->CloseConnectionWithDetails(
61 QUIC_INVALID_STREAM_FRAME, "Stream frame overlaps with buffered data."); 60 QUIC_INVALID_STREAM_FRAME, "Stream frame overlaps with buffered data.");
62 return; 61 return;
63 } 62 }
64 if (result == QUIC_NO_ERROR && bytes_written == 0) { 63 if (result == QUIC_NO_ERROR && bytes_written == 0) {
65 ++num_duplicate_frames_received_; 64 ++num_duplicate_frames_received_;
66 // Silently ignore duplicates. 65 // Silently ignore duplicates.
67 return; 66 return;
68 } 67 }
69 68
70 if (byte_offset > num_bytes_consumed_) { 69 if (byte_offset > buffered_frames_->BytesConsumed()) {
71 ++num_early_frames_received_; 70 ++num_early_frames_received_;
72 } 71 }
73 72
74 num_bytes_buffered_ += data_len;
75
76 if (blocked_) { 73 if (blocked_) {
77 return; 74 return;
78 } 75 }
79 76
80 if (byte_offset == num_bytes_consumed_) { 77 if (byte_offset == buffered_frames_->BytesConsumed()) {
81 if (FLAGS_quic_implement_stop_reading && ignore_read_data_) { 78 if (FLAGS_quic_implement_stop_reading && ignore_read_data_) {
82 FlushBufferedFrames(); 79 FlushBufferedFrames();
83 } else { 80 } else {
84 stream_->OnDataAvailable(); 81 stream_->OnDataAvailable();
85 } 82 }
86 } 83 }
87 } 84 }
88 85
89 void QuicStreamSequencer::CloseStreamAtOffset(QuicStreamOffset offset) { 86 void QuicStreamSequencer::CloseStreamAtOffset(QuicStreamOffset offset) {
90 const QuicStreamOffset kMaxOffset = numeric_limits<QuicStreamOffset>::max(); 87 const QuicStreamOffset kMaxOffset = numeric_limits<QuicStreamOffset>::max();
91 88
92 // If there is a scheduled close, the new offset should match it. 89 // If there is a scheduled close, the new offset should match it.
93 if (close_offset_ != kMaxOffset && offset != close_offset_) { 90 if (close_offset_ != kMaxOffset && offset != close_offset_) {
94 stream_->Reset(QUIC_MULTIPLE_TERMINATION_OFFSETS); 91 stream_->Reset(QUIC_MULTIPLE_TERMINATION_OFFSETS);
95 return; 92 return;
96 } 93 }
97 94
98 close_offset_ = offset; 95 close_offset_ = offset;
99 96
100 MaybeCloseStream(); 97 MaybeCloseStream();
101 } 98 }
102 99
103 bool QuicStreamSequencer::MaybeCloseStream() { 100 bool QuicStreamSequencer::MaybeCloseStream() {
104 if (blocked_ || !IsClosed()) { 101 if (blocked_ || !IsClosed()) {
105 return false; 102 return false;
106 } 103 }
107 104
108 DVLOG(1) << "Passing up termination, as we've processed " 105 DVLOG(1) << "Passing up termination, as we've processed "
109 << num_bytes_consumed_ << " of " << close_offset_ << " bytes."; 106 << buffered_frames_->BytesConsumed() << " of " << close_offset_
107 << " bytes.";
110 // This will cause the stream to consume the FIN. 108 // This will cause the stream to consume the FIN.
111 // Technically it's an error if |num_bytes_consumed| isn't exactly 109 // Technically it's an error if |num_bytes_consumed| isn't exactly
112 // equal to |close_offset|, but error handling seems silly at this point. 110 // equal to |close_offset|, but error handling seems silly at this point.
113 if (FLAGS_quic_implement_stop_reading && ignore_read_data_) { 111 if (FLAGS_quic_implement_stop_reading && ignore_read_data_) {
114 // The sequencer is discarding stream data and must notify the stream on 112 // The sequencer is discarding stream data and must notify the stream on
115 // receipt of a FIN because the consumer won't. 113 // receipt of a FIN because the consumer won't.
116 stream_->OnFinRead(); 114 stream_->OnFinRead();
117 } else { 115 } else {
118 stream_->OnDataAvailable(); 116 stream_->OnDataAvailable();
119 } 117 }
120 buffered_frames_.Clear(); 118 buffered_frames_->Clear();
121 num_bytes_buffered_ = 0;
122 return true; 119 return true;
123 } 120 }
124 121
125 int QuicStreamSequencer::GetReadableRegions(iovec* iov, size_t iov_len) const { 122 int QuicStreamSequencer::GetReadableRegions(iovec* iov, size_t iov_len) const {
126 DCHECK(!blocked_); 123 DCHECK(!blocked_);
127 return buffered_frames_.GetReadableRegions(iov, iov_len); 124 return buffered_frames_->GetReadableRegions(iov, iov_len);
128 } 125 }
129 126
130 bool QuicStreamSequencer::GetReadableRegion(iovec* iov, 127 bool QuicStreamSequencer::GetReadableRegion(iovec* iov,
131 QuicTime* timestamp) const { 128 QuicTime* timestamp) const {
132 DCHECK(!blocked_); 129 DCHECK(!blocked_);
133 return buffered_frames_.GetReadableRegion(iov, timestamp); 130 return buffered_frames_->GetReadableRegion(iov, timestamp);
134 } 131 }
135 132
136 int QuicStreamSequencer::Readv(const struct iovec* iov, size_t iov_len) { 133 int QuicStreamSequencer::Readv(const struct iovec* iov, size_t iov_len) {
137 DCHECK(!blocked_); 134 DCHECK(!blocked_);
138 size_t bytes_read = buffered_frames_.ReadvAndInvalidate(iov, iov_len); 135 size_t bytes_read = buffered_frames_->Readv(iov, iov_len);
139 RecordBytesConsumed(bytes_read); 136 stream_->AddBytesConsumed(bytes_read);
140 return static_cast<int>(bytes_read); 137 return static_cast<int>(bytes_read);
141 } 138 }
142 139
143 bool QuicStreamSequencer::HasBytesToRead() const { 140 bool QuicStreamSequencer::HasBytesToRead() const {
144 return buffered_frames_.HasBytesToRead(); 141 return buffered_frames_->HasBytesToRead();
145 } 142 }
146 143
147 bool QuicStreamSequencer::IsClosed() const { 144 bool QuicStreamSequencer::IsClosed() const {
148 return num_bytes_consumed_ >= close_offset_; 145 return buffered_frames_->BytesConsumed() >= close_offset_;
149 } 146 }
150 147
151 void QuicStreamSequencer::MarkConsumed(size_t num_bytes_consumed) { 148 void QuicStreamSequencer::MarkConsumed(size_t num_bytes_consumed) {
152 DCHECK(!blocked_); 149 DCHECK(!blocked_);
153 bool result = 150 bool result = buffered_frames_->MarkConsumed(num_bytes_consumed);
154 buffered_frames_.IncreaseTotalReadAndInvalidate(num_bytes_consumed);
155 if (!result) { 151 if (!result) {
156 LOG(DFATAL) << "Invalid argument to MarkConsumed." 152 LOG(DFATAL) << "Invalid argument to MarkConsumed."
157 << " expect to consume: " << num_bytes_consumed 153 << " expect to consume: " << num_bytes_consumed
158 << ", but not enough bytes available."; 154 << ", but not enough bytes available.";
159 stream_->Reset(QUIC_ERROR_PROCESSING_STREAM); 155 stream_->Reset(QUIC_ERROR_PROCESSING_STREAM);
160 return; 156 return;
161 } 157 }
162 RecordBytesConsumed(num_bytes_consumed); 158 stream_->AddBytesConsumed(num_bytes_consumed);
163 } 159 }
164 160
165 void QuicStreamSequencer::SetBlockedUntilFlush() { 161 void QuicStreamSequencer::SetBlockedUntilFlush() {
166 blocked_ = true; 162 blocked_ = true;
167 } 163 }
168 164
169 void QuicStreamSequencer::SetUnblocked() { 165 void QuicStreamSequencer::SetUnblocked() {
170 blocked_ = false; 166 blocked_ = false;
171 if (IsClosed() || HasBytesToRead()) { 167 if (IsClosed() || HasBytesToRead()) {
172 stream_->OnDataAvailable(); 168 stream_->OnDataAvailable();
173 } 169 }
174 } 170 }
175 171
176 void QuicStreamSequencer::StopReading() { 172 void QuicStreamSequencer::StopReading() {
177 if (ignore_read_data_) { 173 if (ignore_read_data_) {
178 return; 174 return;
179 } 175 }
180 ignore_read_data_ = true; 176 ignore_read_data_ = true;
181 FlushBufferedFrames(); 177 FlushBufferedFrames();
182 } 178 }
183 179
184 void QuicStreamSequencer::FlushBufferedFrames() { 180 void QuicStreamSequencer::FlushBufferedFrames() {
185 DCHECK(ignore_read_data_); 181 DCHECK(ignore_read_data_);
186 size_t bytes_flushed = buffered_frames_.FlushBufferedFrames(); 182 size_t bytes_flushed = buffered_frames_->FlushBufferedFrames();
187 DVLOG(1) << "Flushing buffered data at offset " << num_bytes_consumed_ 183 DVLOG(1) << "Flushing buffered data at offset "
188 << " length " << bytes_flushed << " for stream " << stream_->id(); 184 << buffered_frames_->BytesConsumed() << " length " << bytes_flushed
189 RecordBytesConsumed(bytes_flushed); 185 << " for stream " << stream_->id();
186 stream_->AddBytesConsumed(bytes_flushed);
190 MaybeCloseStream(); 187 MaybeCloseStream();
191 } 188 }
192 189
193 void QuicStreamSequencer::RecordBytesConsumed(size_t bytes_consumed) { 190 size_t QuicStreamSequencer::NumBytesBuffered() const {
194 num_bytes_consumed_ += bytes_consumed; 191 return buffered_frames_->BytesBuffered();
195 num_bytes_buffered_ -= bytes_consumed; 192 }
196 193
197 stream_->AddBytesConsumed(bytes_consumed); 194 QuicStreamOffset QuicStreamSequencer::NumBytesConsumed() const {
195 return buffered_frames_->BytesConsumed();
198 } 196 }
199 197
200 } // namespace net 198 } // namespace net
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698