Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(108)

Side by Side Diff: net/quic/quic_data_stream.cc

Issue 185203003: Killing off QUICv12, including cleaning out all of the code for handling (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Deleted unused StripUint32 Created 6 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « net/quic/quic_data_stream.h ('k') | net/quic/quic_data_stream_test.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "net/quic/quic_data_stream.h" 5 #include "net/quic/quic_data_stream.h"
6 6
7 #include "base/logging.h" 7 #include "base/logging.h"
8 #include "net/quic/quic_session.h" 8 #include "net/quic/quic_session.h"
9 #include "net/quic/quic_spdy_decompressor.h"
10 #include "net/quic/quic_utils.h" 9 #include "net/quic/quic_utils.h"
11 #include "net/quic/quic_write_blocked_list.h" 10 #include "net/quic/quic_write_blocked_list.h"
12 11
13 using base::StringPiece; 12 using base::StringPiece;
14 using std::min; 13 using std::min;
15 14
16 namespace net { 15 namespace net {
17 16
18 #define ENDPOINT (session()->is_server() ? "Server: " : " Client: ") 17 #define ENDPOINT (session()->is_server() ? "Server: " : " Client: ")
19 18
20 namespace { 19 namespace {
21 20
22 // This is somewhat arbitrary. It's possible, but unlikely, we will either fail 21 // This is somewhat arbitrary. It's possible, but unlikely, we will either fail
23 // to set a priority client-side, or cancel a stream before stripping the 22 // to set a priority client-side, or cancel a stream before stripping the
24 // priority from the wire server-side. In either case, start out with a 23 // priority from the wire server-side. In either case, start out with a
25 // priority in the middle. 24 // priority in the middle.
26 QuicPriority kDefaultPriority = 3; 25 QuicPriority kDefaultPriority = 3;
27 26
28 // Appends bytes from data into partial_data_buffer. Once partial_data_buffer
29 // reaches 4 bytes, copies the data into 'result' and clears
30 // partial_data_buffer.
31 // Returns the number of bytes consumed.
32 uint32 StripUint32(const char* data, uint32 data_len,
33 string* partial_data_buffer,
34 uint32* result) {
35 DCHECK_GT(4u, partial_data_buffer->length());
36 size_t missing_size = 4 - partial_data_buffer->length();
37 if (data_len < missing_size) {
38 StringPiece(data, data_len).AppendToString(partial_data_buffer);
39 return data_len;
40 }
41 StringPiece(data, missing_size).AppendToString(partial_data_buffer);
42 DCHECK_EQ(4u, partial_data_buffer->length());
43 memcpy(result, partial_data_buffer->data(), 4);
44 partial_data_buffer->clear();
45 return missing_size;
46 }
47
48 } // namespace 27 } // namespace
49 28
50 QuicDataStream::QuicDataStream(QuicStreamId id, 29 QuicDataStream::QuicDataStream(QuicStreamId id,
51 QuicSession* session) 30 QuicSession* session)
52 : ReliableQuicStream(id, session), 31 : ReliableQuicStream(id, session),
53 visitor_(NULL), 32 visitor_(NULL),
54 headers_decompressed_(false), 33 headers_decompressed_(false),
55 priority_(kDefaultPriority), 34 priority_(kDefaultPriority),
56 headers_id_(0),
57 decompression_failed_(false), 35 decompression_failed_(false),
58 priority_parsed_(false) { 36 priority_parsed_(false) {
59 DCHECK_NE(kCryptoStreamId, id); 37 DCHECK_NE(kCryptoStreamId, id);
60 if (version() > QUIC_VERSION_12) { 38 // Don't receive any callbacks from the sequencer until headers
61 // Don't receive any callbacks from the sequencer until headers 39 // are complete.
62 // are complete. 40 sequencer()->SetBlockedUntilFlush();
63 sequencer()->SetBlockedUntilFlush();
64 }
65 } 41 }
66 42
67 QuicDataStream::~QuicDataStream() { 43 QuicDataStream::~QuicDataStream() {
68 } 44 }
69 45
70 size_t QuicDataStream::WriteHeaders(const SpdyHeaderBlock& header_block, 46 size_t QuicDataStream::WriteHeaders(const SpdyHeaderBlock& header_block,
71 bool fin) { 47 bool fin) {
72 size_t bytes_written = session()->WriteHeaders(id(), header_block, fin); 48 size_t bytes_written = session()->WriteHeaders(id(), header_block, fin);
73 if (fin) { 49 if (fin) {
74 // TODO(rch): Add test to ensure fin_sent_ is set whenever a fin is sent. 50 // TODO(rch): Add test to ensure fin_sent_ is set whenever a fin is sent.
(...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after
131 void QuicDataStream::set_priority(QuicPriority priority) { 107 void QuicDataStream::set_priority(QuicPriority priority) {
132 DCHECK_EQ(0u, stream_bytes_written()); 108 DCHECK_EQ(0u, stream_bytes_written());
133 priority_ = priority; 109 priority_ = priority;
134 } 110 }
135 111
136 QuicPriority QuicDataStream::EffectivePriority() const { 112 QuicPriority QuicDataStream::EffectivePriority() const {
137 return priority(); 113 return priority();
138 } 114 }
139 115
140 uint32 QuicDataStream::ProcessRawData(const char* data, uint32 data_len) { 116 uint32 QuicDataStream::ProcessRawData(const char* data, uint32 data_len) {
141 if (version() <= QUIC_VERSION_12) {
142 return ProcessRawData12(data, data_len);
143 }
144
145 if (!FinishedReadingHeaders()) { 117 if (!FinishedReadingHeaders()) {
146 LOG(DFATAL) << "ProcessRawData called before headers have been finished"; 118 LOG(DFATAL) << "ProcessRawData called before headers have been finished";
147 return 0; 119 return 0;
148 } 120 }
149 return ProcessData(data, data_len); 121 return ProcessData(data, data_len);
150 } 122 }
151 123
152 uint32 QuicDataStream::ProcessRawData12(const char* data, uint32 data_len) {
153 DCHECK_NE(0u, data_len);
154
155 uint32 total_bytes_consumed = 0;
156 if (headers_id_ == 0u) {
157 total_bytes_consumed += StripPriorityAndHeaderId(data, data_len);
158 data += total_bytes_consumed;
159 data_len -= total_bytes_consumed;
160 if (data_len == 0 || total_bytes_consumed == 0) {
161 return total_bytes_consumed;
162 }
163 }
164 DCHECK_NE(0u, headers_id_);
165
166 // Once the headers are finished, we simply pass the data through.
167 if (headers_decompressed_) {
168 // Some buffered header data remains.
169 if (!decompressed_headers_.empty()) {
170 ProcessHeaderData();
171 }
172 if (decompressed_headers_.empty()) {
173 DVLOG(1) << "Delegating procesing to ProcessData";
174 total_bytes_consumed += ProcessData(data, data_len);
175 }
176 return total_bytes_consumed;
177 }
178
179 QuicHeaderId current_header_id =
180 session()->decompressor()->current_header_id();
181 // Ensure that this header id looks sane.
182 if (headers_id_ < current_header_id ||
183 headers_id_ > kMaxHeaderIdDelta + current_header_id) {
184 DVLOG(1) << ENDPOINT
185 << "Invalid headers for stream: " << id()
186 << " header_id: " << headers_id_
187 << " current_header_id: " << current_header_id;
188 session()->connection()->SendConnectionClose(QUIC_INVALID_HEADER_ID);
189 return total_bytes_consumed;
190 }
191
192 // If we are head-of-line blocked on decompression, then back up.
193 if (current_header_id != headers_id_) {
194 session()->MarkDecompressionBlocked(headers_id_, id());
195 DVLOG(1) << ENDPOINT
196 << "Unable to decompress header data for stream: " << id()
197 << " header_id: " << headers_id_;
198 return total_bytes_consumed;
199 }
200
201 // Decompressed data will be delivered to decompressed_headers_.
202 size_t bytes_consumed = session()->decompressor()->DecompressData(
203 StringPiece(data, data_len), this);
204 DCHECK_NE(0u, bytes_consumed);
205 if (bytes_consumed > data_len) {
206 DCHECK(false) << "DecompressData returned illegal value";
207 OnDecompressionError();
208 return total_bytes_consumed;
209 }
210 total_bytes_consumed += bytes_consumed;
211 data += bytes_consumed;
212 data_len -= bytes_consumed;
213
214 if (decompression_failed_) {
215 // The session will have been closed in OnDecompressionError.
216 return total_bytes_consumed;
217 }
218
219 // Headers are complete if the decompressor has moved on to the
220 // next stream.
221 headers_decompressed_ =
222 session()->decompressor()->current_header_id() != headers_id_;
223 if (!headers_decompressed_) {
224 DCHECK_EQ(0u, data_len);
225 }
226
227 ProcessHeaderData();
228
229 if (!headers_decompressed_ || !decompressed_headers_.empty()) {
230 return total_bytes_consumed;
231 }
232
233 // We have processed all of the decompressed data but we might
234 // have some more raw data to process.
235 if (data_len > 0) {
236 total_bytes_consumed += ProcessData(data, data_len);
237 }
238
239 // The sequencer will push any additional buffered frames if this data
240 // has been completely consumed.
241 return total_bytes_consumed;
242 }
243
244 const IPEndPoint& QuicDataStream::GetPeerAddress() { 124 const IPEndPoint& QuicDataStream::GetPeerAddress() {
245 return session()->peer_address(); 125 return session()->peer_address();
246 } 126 }
247 127
248 QuicSpdyCompressor* QuicDataStream::compressor() {
249 return session()->compressor();
250 }
251
252 bool QuicDataStream::GetSSLInfo(SSLInfo* ssl_info) { 128 bool QuicDataStream::GetSSLInfo(SSLInfo* ssl_info) {
253 return session()->GetSSLInfo(ssl_info); 129 return session()->GetSSLInfo(ssl_info);
254 } 130 }
255 131
256 uint32 QuicDataStream::ProcessHeaderData() { 132 uint32 QuicDataStream::ProcessHeaderData() {
257 if (decompressed_headers_.empty()) { 133 if (decompressed_headers_.empty()) {
258 return 0; 134 return 0;
259 } 135 }
260 136
261 size_t bytes_processed = ProcessData(decompressed_headers_.data(), 137 size_t bytes_processed = ProcessData(decompressed_headers_.data(),
262 decompressed_headers_.length()); 138 decompressed_headers_.length());
263 if (bytes_processed == decompressed_headers_.length()) { 139 if (bytes_processed == decompressed_headers_.length()) {
264 decompressed_headers_.clear(); 140 decompressed_headers_.clear();
265 } else { 141 } else {
266 decompressed_headers_ = decompressed_headers_.erase(0, bytes_processed); 142 decompressed_headers_ = decompressed_headers_.erase(0, bytes_processed);
267 } 143 }
268 return bytes_processed; 144 return bytes_processed;
269 } 145 }
270 146
271 void QuicDataStream::OnDecompressorAvailable() {
272 DCHECK_LE(QUIC_VERSION_12, version());
273 DCHECK_EQ(headers_id_,
274 session()->decompressor()->current_header_id());
275 DCHECK(!headers_decompressed_);
276 DCHECK(!decompression_failed_);
277 DCHECK_EQ(0u, decompressed_headers_.length());
278
279 while (!headers_decompressed_) {
280 struct iovec iovec;
281 if (sequencer()->GetReadableRegions(&iovec, 1) == 0) {
282 return;
283 }
284
285 size_t bytes_consumed = session()->decompressor()->DecompressData(
286 StringPiece(static_cast<char*>(iovec.iov_base),
287 iovec.iov_len),
288 this);
289 DCHECK_LE(bytes_consumed, iovec.iov_len);
290 if (decompression_failed_) {
291 return;
292 }
293 sequencer()->MarkConsumed(bytes_consumed);
294
295 headers_decompressed_ =
296 session()->decompressor()->current_header_id() != headers_id_;
297 }
298
299 // Either the headers are complete, or the all data as been consumed.
300 ProcessHeaderData(); // Unprocessed headers remain in decompressed_headers_.
301 if (IsDoneReading()) {
302 OnFinRead();
303 } else if (FinishedReadingHeaders()) {
304 sequencer()->FlushBufferedFrames();
305 }
306 }
307
308 bool QuicDataStream::OnDecompressedData(StringPiece data) {
309 DCHECK_GE(QUIC_VERSION_12, version());
310 data.AppendToString(&decompressed_headers_);
311 return true;
312 }
313
314 void QuicDataStream::OnDecompressionError() {
315 DCHECK_LE(QUIC_VERSION_12, version());
316 DCHECK(!decompression_failed_);
317 decompression_failed_ = true;
318 session()->connection()->SendConnectionClose(QUIC_DECOMPRESSION_FAILURE);
319 }
320
321 void QuicDataStream::OnStreamHeaders(StringPiece headers_data) { 147 void QuicDataStream::OnStreamHeaders(StringPiece headers_data) {
322 DCHECK_LT(QUIC_VERSION_12, version());
323 headers_data.AppendToString(&decompressed_headers_); 148 headers_data.AppendToString(&decompressed_headers_);
324 ProcessHeaderData(); 149 ProcessHeaderData();
325 } 150 }
326 151
327 void QuicDataStream::OnStreamHeadersPriority(QuicPriority priority) { 152 void QuicDataStream::OnStreamHeadersPriority(QuicPriority priority) {
328 DCHECK(session()->connection()->is_server()); 153 DCHECK(session()->connection()->is_server());
329 set_priority(priority); 154 set_priority(priority);
330 } 155 }
331 156
332 void QuicDataStream::OnStreamHeadersComplete(bool fin, size_t frame_len) { 157 void QuicDataStream::OnStreamHeadersComplete(bool fin, size_t frame_len) {
333 DCHECK_LT(QUIC_VERSION_12, version());
334 headers_decompressed_ = true; 158 headers_decompressed_ = true;
335 if (fin) { 159 if (fin) {
336 sequencer()->OnStreamFrame(QuicStreamFrame(id(), fin, 0, IOVector())); 160 sequencer()->OnStreamFrame(QuicStreamFrame(id(), fin, 0, IOVector()));
337 } 161 }
338 ProcessHeaderData(); 162 ProcessHeaderData();
339 if (FinishedReadingHeaders()) { 163 if (FinishedReadingHeaders()) {
340 sequencer()->FlushBufferedFrames(); 164 sequencer()->FlushBufferedFrames();
341 } 165 }
342 } 166 }
343 167
344 void QuicDataStream::OnClose() { 168 void QuicDataStream::OnClose() {
345 ReliableQuicStream::OnClose(); 169 ReliableQuicStream::OnClose();
346 170
347 if (visitor_) { 171 if (visitor_) {
348 Visitor* visitor = visitor_; 172 Visitor* visitor = visitor_;
349 // Calling Visitor::OnClose() may result the destruction of the visitor, 173 // Calling Visitor::OnClose() may result the destruction of the visitor,
350 // so we need to ensure we don't call it again. 174 // so we need to ensure we don't call it again.
351 visitor_ = NULL; 175 visitor_ = NULL;
352 visitor->OnClose(this); 176 visitor->OnClose(this);
353 } 177 }
354 } 178 }
355 179
356 uint32 QuicDataStream::StripPriorityAndHeaderId(
357 const char* data, uint32 data_len) {
358 uint32 total_bytes_parsed = 0;
359
360 if (!priority_parsed_ && session()->connection()->is_server()) {
361 QuicPriority temporary_priority = priority_;
362 total_bytes_parsed = StripUint32(
363 data, data_len, &headers_id_and_priority_buffer_, &temporary_priority);
364 if (total_bytes_parsed > 0 && headers_id_and_priority_buffer_.size() == 0) {
365 priority_parsed_ = true;
366
367 // Spdy priorities are inverted, so the highest numerical value is the
368 // lowest legal priority.
369 if (temporary_priority > QuicUtils::LowestPriority()) {
370 session()->connection()->SendConnectionClose(QUIC_INVALID_PRIORITY);
371 return 0;
372 }
373 priority_ = temporary_priority;
374 }
375 data += total_bytes_parsed;
376 data_len -= total_bytes_parsed;
377 }
378 if (data_len > 0 && headers_id_ == 0u) {
379 // The headers ID has not yet been read. Strip it from the beginning of
380 // the data stream.
381 total_bytes_parsed += StripUint32(
382 data, data_len, &headers_id_and_priority_buffer_, &headers_id_);
383 }
384 return total_bytes_parsed;
385 }
386
387 bool QuicDataStream::FinishedReadingHeaders() { 180 bool QuicDataStream::FinishedReadingHeaders() {
388 return (headers_id_ != 0 || version() > QUIC_VERSION_12) && 181 return headers_decompressed_ && decompressed_headers_.empty();
389 headers_decompressed_ && decompressed_headers_.empty();
390 } 182 }
391 183
392 void QuicDataStream::OnWindowUpdateFrame(const QuicWindowUpdateFrame& frame) { 184 void QuicDataStream::OnWindowUpdateFrame(const QuicWindowUpdateFrame& frame) {
393 DVLOG(1) << "Received WindowUpdateFrame for stream: " << id() 185 DVLOG(1) << "Received WindowUpdateFrame for stream: " << id()
394 << ", with byte offset: " << frame.byte_offset; 186 << ", with byte offset: " << frame.byte_offset;
395 // TODO(rjshade): Adjust flow control window. 187 // TODO(rjshade): Adjust flow control window.
396 } 188 }
397 189
398 } // namespace net 190 } // namespace net
OLDNEW
« no previous file with comments | « net/quic/quic_data_stream.h ('k') | net/quic/quic_data_stream_test.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698