Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(118)

Side by Side Diff: net/quic/reliable_quic_stream.cc

Issue 100173005: Break out the basic reliable QUIC stream functionality from the (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Created 7 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « net/quic/reliable_quic_stream.h ('k') | net/quic/reliable_quic_stream_test.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "net/quic/reliable_quic_stream.h" 5 #include "net/quic/reliable_quic_stream.h"
6 6
7 #include "net/quic/quic_session.h" 7 #include "net/quic/quic_session.h"
8 #include "net/quic/quic_spdy_decompressor.h" 8 #include "net/quic/quic_spdy_decompressor.h"
9 #include "net/spdy/write_blocked_list.h" 9 #include "net/spdy/write_blocked_list.h"
10 10
11 using base::StringPiece; 11 using base::StringPiece;
12 using std::min; 12 using std::min;
13 13
14 namespace net { 14 namespace net {
15 15
16 #define ENDPOINT (is_server_ ? "Server: " : " Client: ")
17
16 namespace { 18 namespace {
17 19
18 // This is somewhat arbitrary. It's possible, but unlikely, we will either fail
19 // to set a priority client-side, or cancel a stream before stripping the
20 // priority from the wire server-side. In either case, start out with a
21 // priority in the middle.
22 QuicPriority kDefaultPriority = 3;
23
24 // Appends bytes from data into partial_data_buffer. Once partial_data_buffer
25 // reaches 4 bytes, copies the data into 'result' and clears
26 // partial_data_buffer.
27 // Returns the number of bytes consumed.
28 uint32 StripUint32(const char* data, uint32 data_len,
29 string* partial_data_buffer,
30 uint32* result) {
31 DCHECK_GT(4u, partial_data_buffer->length());
32 size_t missing_size = 4 - partial_data_buffer->length();
33 if (data_len < missing_size) {
34 StringPiece(data, data_len).AppendToString(partial_data_buffer);
35 return data_len;
36 }
37 StringPiece(data, missing_size).AppendToString(partial_data_buffer);
38 DCHECK_EQ(4u, partial_data_buffer->length());
39 memcpy(result, partial_data_buffer->data(), 4);
40 partial_data_buffer->clear();
41 return missing_size;
42 }
43
44 struct iovec MakeIovec(StringPiece data) { 20 struct iovec MakeIovec(StringPiece data) {
45 struct iovec iov = {const_cast<char*>(data.data()), 21 struct iovec iov = {const_cast<char*>(data.data()),
46 static_cast<size_t>(data.size())}; 22 static_cast<size_t>(data.size())};
47 return iov; 23 return iov;
48 } 24 }
49 25
50 } // namespace 26 } // namespace
51 27
52 ReliableQuicStream::ReliableQuicStream(QuicStreamId id, 28 ReliableQuicStream::ReliableQuicStream(QuicStreamId id,
53 QuicSession* session) 29 QuicSession* session)
54 : sequencer_(this), 30 : sequencer_(this),
55 id_(id), 31 id_(id),
56 session_(session), 32 session_(session),
57 visitor_(NULL),
58 stream_bytes_read_(0), 33 stream_bytes_read_(0),
59 stream_bytes_written_(0), 34 stream_bytes_written_(0),
60 headers_decompressed_(false),
61 priority_(kDefaultPriority),
62 headers_id_(0),
63 decompression_failed_(false),
64 stream_error_(QUIC_STREAM_NO_ERROR), 35 stream_error_(QUIC_STREAM_NO_ERROR),
65 connection_error_(QUIC_NO_ERROR), 36 connection_error_(QUIC_NO_ERROR),
66 read_side_closed_(false), 37 read_side_closed_(false),
67 write_side_closed_(false), 38 write_side_closed_(false),
68 priority_parsed_(false),
69 fin_buffered_(false), 39 fin_buffered_(false),
70 fin_sent_(false), 40 fin_sent_(false),
71 is_server_(session_->is_server()) { 41 is_server_(session_->is_server()) {
72 } 42 }
73 43
74 ReliableQuicStream::~ReliableQuicStream() { 44 ReliableQuicStream::~ReliableQuicStream() {
75 } 45 }
76 46
77 bool ReliableQuicStream::WillAcceptStreamFrame( 47 bool ReliableQuicStream::WillAcceptStreamFrame(
78 const QuicStreamFrame& frame) const { 48 const QuicStreamFrame& frame) const {
(...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after
135 105
136 void ReliableQuicStream::CloseConnection(QuicErrorCode error) { 106 void ReliableQuicStream::CloseConnection(QuicErrorCode error) {
137 session()->connection()->SendConnectionClose(error); 107 session()->connection()->SendConnectionClose(error);
138 } 108 }
139 109
140 void ReliableQuicStream::CloseConnectionWithDetails(QuicErrorCode error, 110 void ReliableQuicStream::CloseConnectionWithDetails(QuicErrorCode error,
141 const string& details) { 111 const string& details) {
142 session()->connection()->SendConnectionCloseWithDetails(error, details); 112 session()->connection()->SendConnectionCloseWithDetails(error, details);
143 } 113 }
144 114
145 size_t ReliableQuicStream::Readv(const struct iovec* iov, size_t iov_len) {
146 if (headers_decompressed_ && decompressed_headers_.empty()) {
147 return sequencer_.Readv(iov, iov_len);
148 }
149 size_t bytes_consumed = 0;
150 size_t iov_index = 0;
151 while (iov_index < iov_len &&
152 decompressed_headers_.length() > bytes_consumed) {
153 size_t bytes_to_read = min(iov[iov_index].iov_len,
154 decompressed_headers_.length() - bytes_consumed);
155 char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base);
156 memcpy(iov_ptr,
157 decompressed_headers_.data() + bytes_consumed, bytes_to_read);
158 bytes_consumed += bytes_to_read;
159 ++iov_index;
160 }
161 decompressed_headers_.erase(0, bytes_consumed);
162 return bytes_consumed;
163 }
164
165 int ReliableQuicStream::GetReadableRegions(iovec* iov, size_t iov_len) {
166 if (headers_decompressed_ && decompressed_headers_.empty()) {
167 return sequencer_.GetReadableRegions(iov, iov_len);
168 }
169 if (iov_len == 0) {
170 return 0;
171 }
172 iov[0].iov_base = static_cast<void*>(
173 const_cast<char*>(decompressed_headers_.data()));
174 iov[0].iov_len = decompressed_headers_.length();
175 return 1;
176 }
177
178 bool ReliableQuicStream::IsDoneReading() const {
179 if (!headers_decompressed_ || !decompressed_headers_.empty()) {
180 return false;
181 }
182 return sequencer_.IsClosed();
183 }
184
185 bool ReliableQuicStream::HasBytesToRead() const {
186 return !decompressed_headers_.empty() || sequencer_.HasBytesToRead();
187 }
188
189 const IPEndPoint& ReliableQuicStream::GetPeerAddress() const {
190 return session_->peer_address();
191 }
192
193 QuicSpdyCompressor* ReliableQuicStream::compressor() {
194 return session_->compressor();
195 }
196
197 bool ReliableQuicStream::GetSSLInfo(SSLInfo* ssl_info) {
198 return session_->GetSSLInfo(ssl_info);
199 }
200
201 void ReliableQuicStream::set_priority(QuicPriority priority) {
202 DCHECK_EQ(0u, stream_bytes_written_);
203 priority_ = priority;
204 }
205
206 void ReliableQuicStream::WriteOrBufferData(StringPiece data, bool fin) { 115 void ReliableQuicStream::WriteOrBufferData(StringPiece data, bool fin) {
207 DCHECK(data.size() > 0 || fin); 116 DCHECK(data.size() > 0 || fin);
208 DCHECK(!fin_buffered_); 117 DCHECK(!fin_buffered_);
209 118
210 QuicConsumedData consumed_data(0, false); 119 QuicConsumedData consumed_data(0, false);
211 fin_buffered_ = fin; 120 fin_buffered_ = fin;
212 121
213 if (queued_data_.empty()) { 122 if (queued_data_.empty()) {
214 struct iovec iov(MakeIovec(data)); 123 struct iovec iov(MakeIovec(data));
215 consumed_data = WritevData(&iov, 1, fin, NULL); 124 consumed_data = WritevData(&iov, 1, fin, NULL);
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after
267 CloseWriteSide(); 176 CloseWriteSide();
268 } else if (fin && !consumed_data.fin_consumed) { 177 } else if (fin && !consumed_data.fin_consumed) {
269 session_->MarkWriteBlocked(id(), EffectivePriority()); 178 session_->MarkWriteBlocked(id(), EffectivePriority());
270 } 179 }
271 } else { 180 } else {
272 session_->MarkWriteBlocked(id(), EffectivePriority()); 181 session_->MarkWriteBlocked(id(), EffectivePriority());
273 } 182 }
274 return consumed_data; 183 return consumed_data;
275 } 184 }
276 185
277 QuicPriority ReliableQuicStream::EffectivePriority() const {
278 return priority();
279 }
280
281 void ReliableQuicStream::CloseReadSide() { 186 void ReliableQuicStream::CloseReadSide() {
282 if (read_side_closed_) { 187 if (read_side_closed_) {
283 return; 188 return;
284 } 189 }
285 DVLOG(1) << ENDPOINT << "Done reading from stream " << id(); 190 DVLOG(1) << ENDPOINT << "Done reading from stream " << id();
286 191
287 read_side_closed_ = true; 192 read_side_closed_ = true;
288 if (write_side_closed_) { 193 if (write_side_closed_) {
289 DVLOG(1) << ENDPOINT << "Closing stream: " << id(); 194 DVLOG(1) << ENDPOINT << "Closing stream: " << id();
290 session_->CloseStream(id()); 195 session_->CloseStream(id());
291 } 196 }
292 } 197 }
293 198
294 uint32 ReliableQuicStream::ProcessRawData(const char* data, uint32 data_len) {
295 DCHECK_NE(0u, data_len);
296 if (id() == kCryptoStreamId) {
297 // The crypto stream does not use compression.
298 return ProcessData(data, data_len);
299 }
300
301 uint32 total_bytes_consumed = 0;
302 if (headers_id_ == 0u) {
303 total_bytes_consumed += StripPriorityAndHeaderId(data, data_len);
304 data += total_bytes_consumed;
305 data_len -= total_bytes_consumed;
306 if (data_len == 0 || total_bytes_consumed == 0) {
307 return total_bytes_consumed;
308 }
309 }
310 DCHECK_NE(0u, headers_id_);
311
312 // Once the headers are finished, we simply pass the data through.
313 if (headers_decompressed_) {
314 // Some buffered header data remains.
315 if (!decompressed_headers_.empty()) {
316 ProcessHeaderData();
317 }
318 if (decompressed_headers_.empty()) {
319 DVLOG(1) << "Delegating procesing to ProcessData";
320 total_bytes_consumed += ProcessData(data, data_len);
321 }
322 return total_bytes_consumed;
323 }
324
325 QuicHeaderId current_header_id =
326 session_->decompressor()->current_header_id();
327 // Ensure that this header id looks sane.
328 if (headers_id_ < current_header_id ||
329 headers_id_ > kMaxHeaderIdDelta + current_header_id) {
330 DVLOG(1) << ENDPOINT
331 << "Invalid headers for stream: " << id()
332 << " header_id: " << headers_id_
333 << " current_header_id: " << current_header_id;
334 session_->connection()->SendConnectionClose(QUIC_INVALID_HEADER_ID);
335 return total_bytes_consumed;
336 }
337
338 // If we are head-of-line blocked on decompression, then back up.
339 if (current_header_id != headers_id_) {
340 session_->MarkDecompressionBlocked(headers_id_, id());
341 DVLOG(1) << ENDPOINT
342 << "Unable to decompress header data for stream: " << id()
343 << " header_id: " << headers_id_;
344 return total_bytes_consumed;
345 }
346
347 // Decompressed data will be delivered to decompressed_headers_.
348 size_t bytes_consumed = session_->decompressor()->DecompressData(
349 StringPiece(data, data_len), this);
350 DCHECK_NE(0u, bytes_consumed);
351 if (bytes_consumed > data_len) {
352 DCHECK(false) << "DecompressData returned illegal value";
353 OnDecompressionError();
354 return total_bytes_consumed;
355 }
356 total_bytes_consumed += bytes_consumed;
357 data += bytes_consumed;
358 data_len -= bytes_consumed;
359
360 if (decompression_failed_) {
361 // The session will have been closed in OnDecompressionError.
362 return total_bytes_consumed;
363 }
364
365 // Headers are complete if the decompressor has moved on to the
366 // next stream.
367 headers_decompressed_ =
368 session_->decompressor()->current_header_id() != headers_id_;
369 if (!headers_decompressed_) {
370 DCHECK_EQ(0u, data_len);
371 }
372
373 ProcessHeaderData();
374
375 if (!headers_decompressed_ || !decompressed_headers_.empty()) {
376 return total_bytes_consumed;
377 }
378
379 // We have processed all of the decompressed data but we might
380 // have some more raw data to process.
381 if (data_len > 0) {
382 total_bytes_consumed += ProcessData(data, data_len);
383 }
384
385 // The sequencer will push any additional buffered frames if this data
386 // has been completely consumed.
387 return total_bytes_consumed;
388 }
389
390 uint32 ReliableQuicStream::ProcessHeaderData() {
391 if (decompressed_headers_.empty()) {
392 return 0;
393 }
394
395 size_t bytes_processed = ProcessData(decompressed_headers_.data(),
396 decompressed_headers_.length());
397 if (bytes_processed == decompressed_headers_.length()) {
398 decompressed_headers_.clear();
399 } else {
400 decompressed_headers_ = decompressed_headers_.erase(0, bytes_processed);
401 }
402 return bytes_processed;
403 }
404
405 void ReliableQuicStream::OnDecompressorAvailable() {
406 DCHECK_EQ(headers_id_,
407 session_->decompressor()->current_header_id());
408 DCHECK(!headers_decompressed_);
409 DCHECK(!decompression_failed_);
410 DCHECK_EQ(0u, decompressed_headers_.length());
411
412 while (!headers_decompressed_) {
413 struct iovec iovec;
414 if (sequencer_.GetReadableRegions(&iovec, 1) == 0) {
415 return;
416 }
417
418 size_t bytes_consumed = session_->decompressor()->DecompressData(
419 StringPiece(static_cast<char*>(iovec.iov_base),
420 iovec.iov_len),
421 this);
422 DCHECK_LE(bytes_consumed, iovec.iov_len);
423 if (decompression_failed_) {
424 return;
425 }
426 sequencer_.MarkConsumed(bytes_consumed);
427
428 headers_decompressed_ =
429 session_->decompressor()->current_header_id() != headers_id_;
430 }
431
432 // Either the headers are complete, or the all data as been consumed.
433 ProcessHeaderData(); // Unprocessed headers remain in decompressed_headers_.
434 if (IsDoneReading()) {
435 OnFinRead();
436 } else if (headers_decompressed_ && decompressed_headers_.empty()) {
437 sequencer_.FlushBufferedFrames();
438 }
439 }
440
441 bool ReliableQuicStream::OnDecompressedData(StringPiece data) {
442 data.AppendToString(&decompressed_headers_);
443 return true;
444 }
445
446 void ReliableQuicStream::OnDecompressionError() {
447 DCHECK(!decompression_failed_);
448 decompression_failed_ = true;
449 session_->connection()->SendConnectionClose(QUIC_DECOMPRESSION_FAILURE);
450 }
451
452
453 void ReliableQuicStream::CloseWriteSide() { 199 void ReliableQuicStream::CloseWriteSide() {
454 if (write_side_closed_) { 200 if (write_side_closed_) {
455 return; 201 return;
456 } 202 }
457 DVLOG(1) << ENDPOINT << "Done writing to stream " << id(); 203 DVLOG(1) << ENDPOINT << "Done writing to stream " << id();
458 204
459 write_side_closed_ = true; 205 write_side_closed_ = true;
460 if (read_side_closed_) { 206 if (read_side_closed_) {
461 DVLOG(1) << ENDPOINT << "Closing stream: " << id(); 207 DVLOG(1) << ENDPOINT << "Closing stream: " << id();
462 session_->CloseStream(id()); 208 session_->CloseStream(id());
463 } 209 }
464 } 210 }
465 211
466 bool ReliableQuicStream::HasBufferedData() { 212 bool ReliableQuicStream::HasBufferedData() {
467 return !queued_data_.empty(); 213 return !queued_data_.empty();
468 } 214 }
469 215
470 void ReliableQuicStream::OnClose() { 216 void ReliableQuicStream::OnClose() {
471 CloseReadSide(); 217 CloseReadSide();
472 CloseWriteSide(); 218 CloseWriteSide();
473
474 if (visitor_) {
475 Visitor* visitor = visitor_;
476 // Calling Visitor::OnClose() may result the destruction of the visitor,
477 // so we need to ensure we don't call it again.
478 visitor_ = NULL;
479 visitor->OnClose(this);
480 }
481 }
482
483 uint32 ReliableQuicStream::StripPriorityAndHeaderId(
484 const char* data, uint32 data_len) {
485 uint32 total_bytes_parsed = 0;
486
487 if (!priority_parsed_ && session_->connection()->is_server()) {
488 QuicPriority temporary_priority = priority_;
489 total_bytes_parsed = StripUint32(
490 data, data_len, &headers_id_and_priority_buffer_, &temporary_priority);
491 if (total_bytes_parsed > 0 && headers_id_and_priority_buffer_.empty()) {
492 priority_parsed_ = true;
493
494 // Spdy priorities are inverted, so the highest numerical value is the
495 // lowest legal priority.
496 if (temporary_priority > static_cast<QuicPriority>(kLowestPriority)) {
497 session_->connection()->SendConnectionClose(QUIC_INVALID_PRIORITY);
498 return 0;
499 }
500 priority_ = temporary_priority;
501 }
502 data += total_bytes_parsed;
503 data_len -= total_bytes_parsed;
504 }
505 if (data_len > 0 && headers_id_ == 0u) {
506 // The headers ID has not yet been read. Strip it from the beginning of
507 // the data stream.
508 total_bytes_parsed += StripUint32(
509 data, data_len, &headers_id_and_priority_buffer_, &headers_id_);
510 }
511 return total_bytes_parsed;
512 } 219 }
513 220
514 } // namespace net 221 } // namespace net
OLDNEW
« no previous file with comments | « net/quic/reliable_quic_stream.h ('k') | net/quic/reliable_quic_stream_test.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698