Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(193)

Side by Side Diff: net/quic/reliable_quic_stream.cc

Issue 103973007: Land Recent QUIC Changes. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Fix for android compile error Created 7 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « net/quic/reliable_quic_stream.h ('k') | net/quic/reliable_quic_stream_test.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "net/quic/reliable_quic_stream.h" 5 #include "net/quic/reliable_quic_stream.h"
6 6
7 #include "net/quic/quic_session.h" 7 #include "net/quic/quic_session.h"
8 #include "net/quic/quic_spdy_decompressor.h" 8 #include "net/quic/quic_spdy_decompressor.h"
9 #include "net/spdy/write_blocked_list.h" 9 #include "net/spdy/write_blocked_list.h"
10 10
11 using base::StringPiece; 11 using base::StringPiece;
12 using std::min; 12 using std::min;
13 13
14 namespace net { 14 namespace net {
15 15
16 #define ENDPOINT (is_server_ ? "Server: " : " Client: ")
17
16 namespace { 18 namespace {
17 19
18 // This is somewhat arbitrary. It's possible, but unlikely, we will either fail 20 struct iovec MakeIovec(StringPiece data) {
19 // to set a priority client-side, or cancel a stream before stripping the 21 struct iovec iov = {const_cast<char*>(data.data()),
20 // priority from the wire server-side. In either case, start out with a 22 static_cast<size_t>(data.size())};
21 // priority in the middle. 23 return iov;
22 QuicPriority kDefaultPriority = 3;
23
24 // Appends bytes from data into partial_data_buffer. Once partial_data_buffer
25 // reaches 4 bytes, copies the data into 'result' and clears
26 // partial_data_buffer.
27 // Returns the number of bytes consumed.
28 uint32 StripUint32(const char* data, uint32 data_len,
29 string* partial_data_buffer,
30 uint32* result) {
31 DCHECK_GT(4u, partial_data_buffer->length());
32 size_t missing_size = 4 - partial_data_buffer->length();
33 if (data_len < missing_size) {
34 StringPiece(data, data_len).AppendToString(partial_data_buffer);
35 return data_len;
36 }
37 StringPiece(data, missing_size).AppendToString(partial_data_buffer);
38 DCHECK_EQ(4u, partial_data_buffer->length());
39 memcpy(result, partial_data_buffer->data(), 4);
40 partial_data_buffer->clear();
41 return missing_size;
42 } 24 }
43 25
44 } // namespace 26 } // namespace
45 27
46 ReliableQuicStream::ReliableQuicStream(QuicStreamId id, 28 ReliableQuicStream::ReliableQuicStream(QuicStreamId id,
47 QuicSession* session) 29 QuicSession* session)
48 : sequencer_(this), 30 : sequencer_(this),
49 id_(id), 31 id_(id),
50 session_(session), 32 session_(session),
51 visitor_(NULL),
52 stream_bytes_read_(0), 33 stream_bytes_read_(0),
53 stream_bytes_written_(0), 34 stream_bytes_written_(0),
54 headers_decompressed_(false),
55 priority_(kDefaultPriority),
56 headers_id_(0),
57 decompression_failed_(false),
58 stream_error_(QUIC_STREAM_NO_ERROR), 35 stream_error_(QUIC_STREAM_NO_ERROR),
59 connection_error_(QUIC_NO_ERROR), 36 connection_error_(QUIC_NO_ERROR),
60 read_side_closed_(false), 37 read_side_closed_(false),
61 write_side_closed_(false), 38 write_side_closed_(false),
62 priority_parsed_(false),
63 fin_buffered_(false), 39 fin_buffered_(false),
64 fin_sent_(false), 40 fin_sent_(false),
65 is_server_(session_->is_server()) { 41 is_server_(session_->is_server()) {
66 } 42 }
67 43
68 ReliableQuicStream::~ReliableQuicStream() { 44 ReliableQuicStream::~ReliableQuicStream() {
69 } 45 }
70 46
71 bool ReliableQuicStream::WillAcceptStreamFrame( 47 bool ReliableQuicStream::WillAcceptStreamFrame(
72 const QuicStreamFrame& frame) const { 48 const QuicStreamFrame& frame) const {
(...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after
129 105
130 void ReliableQuicStream::CloseConnection(QuicErrorCode error) { 106 void ReliableQuicStream::CloseConnection(QuicErrorCode error) {
131 session()->connection()->SendConnectionClose(error); 107 session()->connection()->SendConnectionClose(error);
132 } 108 }
133 109
134 void ReliableQuicStream::CloseConnectionWithDetails(QuicErrorCode error, 110 void ReliableQuicStream::CloseConnectionWithDetails(QuicErrorCode error,
135 const string& details) { 111 const string& details) {
136 session()->connection()->SendConnectionCloseWithDetails(error, details); 112 session()->connection()->SendConnectionCloseWithDetails(error, details);
137 } 113 }
138 114
139 size_t ReliableQuicStream::Readv(const struct iovec* iov, size_t iov_len) { 115 void ReliableQuicStream::WriteOrBufferData(StringPiece data, bool fin) {
140 if (headers_decompressed_ && decompressed_headers_.empty()) {
141 return sequencer_.Readv(iov, iov_len);
142 }
143 size_t bytes_consumed = 0;
144 size_t iov_index = 0;
145 while (iov_index < iov_len &&
146 decompressed_headers_.length() > bytes_consumed) {
147 size_t bytes_to_read = min(iov[iov_index].iov_len,
148 decompressed_headers_.length() - bytes_consumed);
149 char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base);
150 memcpy(iov_ptr,
151 decompressed_headers_.data() + bytes_consumed, bytes_to_read);
152 bytes_consumed += bytes_to_read;
153 ++iov_index;
154 }
155 decompressed_headers_.erase(0, bytes_consumed);
156 return bytes_consumed;
157 }
158
159 int ReliableQuicStream::GetReadableRegions(iovec* iov, size_t iov_len) {
160 if (headers_decompressed_ && decompressed_headers_.empty()) {
161 return sequencer_.GetReadableRegions(iov, iov_len);
162 }
163 if (iov_len == 0) {
164 return 0;
165 }
166 iov[0].iov_base = static_cast<void*>(
167 const_cast<char*>(decompressed_headers_.data()));
168 iov[0].iov_len = decompressed_headers_.length();
169 return 1;
170 }
171
172 bool ReliableQuicStream::IsDoneReading() const {
173 if (!headers_decompressed_ || !decompressed_headers_.empty()) {
174 return false;
175 }
176 return sequencer_.IsClosed();
177 }
178
179 bool ReliableQuicStream::HasBytesToRead() const {
180 return !decompressed_headers_.empty() || sequencer_.HasBytesToRead();
181 }
182
183 const IPEndPoint& ReliableQuicStream::GetPeerAddress() const {
184 return session_->peer_address();
185 }
186
187 QuicSpdyCompressor* ReliableQuicStream::compressor() {
188 return session_->compressor();
189 }
190
191 bool ReliableQuicStream::GetSSLInfo(SSLInfo* ssl_info) {
192 return session_->GetSSLInfo(ssl_info);
193 }
194
195 QuicConsumedData ReliableQuicStream::WriteData(StringPiece data, bool fin) {
196 DCHECK(data.size() > 0 || fin); 116 DCHECK(data.size() > 0 || fin);
197 return WriteOrBuffer(data, fin);
198 }
199
200
201 void ReliableQuicStream::set_priority(QuicPriority priority) {
202 DCHECK_EQ(0u, stream_bytes_written_);
203 priority_ = priority;
204 }
205
206 QuicConsumedData ReliableQuicStream::WriteOrBuffer(StringPiece data, bool fin) {
207 DCHECK(!fin_buffered_); 117 DCHECK(!fin_buffered_);
208 118
209 QuicConsumedData consumed_data(0, false); 119 QuicConsumedData consumed_data(0, false);
210 fin_buffered_ = fin; 120 fin_buffered_ = fin;
211 121
212 if (queued_data_.empty()) { 122 if (queued_data_.empty()) {
213 consumed_data = WriteDataInternal(string(data.data(), data.length()), fin); 123 struct iovec iov(MakeIovec(data));
124 consumed_data = WritevData(&iov, 1, fin, NULL);
214 DCHECK_LE(consumed_data.bytes_consumed, data.length()); 125 DCHECK_LE(consumed_data.bytes_consumed, data.length());
215 } 126 }
216 127
217 // If there's unconsumed data or an unconsumed fin, queue it. 128 // If there's unconsumed data or an unconsumed fin, queue it.
218 if (consumed_data.bytes_consumed < data.length() || 129 if (consumed_data.bytes_consumed < data.length() ||
219 (fin && !consumed_data.fin_consumed)) { 130 (fin && !consumed_data.fin_consumed)) {
220 queued_data_.push_back( 131 queued_data_.push_back(
221 string(data.data() + consumed_data.bytes_consumed, 132 string(data.data() + consumed_data.bytes_consumed,
222 data.length() - consumed_data.bytes_consumed)); 133 data.length() - consumed_data.bytes_consumed));
223 } 134 }
224
225 return QuicConsumedData(data.size(), true);
226 } 135 }
227 136
228 void ReliableQuicStream::OnCanWrite() { 137 void ReliableQuicStream::OnCanWrite() {
229 bool fin = false; 138 bool fin = false;
230 while (!queued_data_.empty()) { 139 while (!queued_data_.empty()) {
231 const string& data = queued_data_.front(); 140 const string& data = queued_data_.front();
232 if (queued_data_.size() == 1 && fin_buffered_) { 141 if (queued_data_.size() == 1 && fin_buffered_) {
233 fin = true; 142 fin = true;
234 } 143 }
235 QuicConsumedData consumed_data = WriteDataInternal(data, fin); 144 struct iovec iov(MakeIovec(data));
145 QuicConsumedData consumed_data = WritevData(&iov, 1, fin, NULL);
236 if (consumed_data.bytes_consumed == data.size() && 146 if (consumed_data.bytes_consumed == data.size() &&
237 fin == consumed_data.fin_consumed) { 147 fin == consumed_data.fin_consumed) {
238 queued_data_.pop_front(); 148 queued_data_.pop_front();
239 } else { 149 } else {
240 queued_data_.front().erase(0, consumed_data.bytes_consumed); 150 queued_data_.front().erase(0, consumed_data.bytes_consumed);
241 break; 151 break;
242 } 152 }
243 } 153 }
244 } 154 }
245 155
246 QuicConsumedData ReliableQuicStream::WriteDataInternal( 156 QuicConsumedData ReliableQuicStream::WritevData(
247 StringPiece data, bool fin) {
248 struct iovec iov = {const_cast<char*>(data.data()),
249 static_cast<size_t>(data.size())};
250 return WritevDataInternal(&iov, 1, fin, NULL);
251 }
252
253 QuicConsumedData ReliableQuicStream::WritevDataInternal(
254 const struct iovec* iov, 157 const struct iovec* iov,
255 int iov_count, 158 int iov_count,
256 bool fin, 159 bool fin,
257 QuicAckNotifier::DelegateInterface* ack_notifier_delegate) { 160 QuicAckNotifier::DelegateInterface* ack_notifier_delegate) {
258 if (write_side_closed_) { 161 if (write_side_closed_) {
259 DLOG(ERROR) << ENDPOINT << "Attempt to write when the write side is closed"; 162 DLOG(ERROR) << ENDPOINT << "Attempt to write when the write side is closed";
260 return QuicConsumedData(0, false); 163 return QuicConsumedData(0, false);
261 } 164 }
262 165
263 size_t write_length = 0u; 166 size_t write_length = 0u;
264 for (int i = 0; i < iov_count; ++i) { 167 for (int i = 0; i < iov_count; ++i) {
265 write_length += iov[i].iov_len; 168 write_length += iov[i].iov_len;
266 } 169 }
267 QuicConsumedData consumed_data = session()->WritevData( 170 QuicConsumedData consumed_data = session()->WritevData(
268 id(), iov, iov_count, stream_bytes_written_, fin, ack_notifier_delegate); 171 id(), iov, iov_count, stream_bytes_written_, fin, ack_notifier_delegate);
269 stream_bytes_written_ += consumed_data.bytes_consumed; 172 stream_bytes_written_ += consumed_data.bytes_consumed;
270 if (consumed_data.bytes_consumed == write_length) { 173 if (consumed_data.bytes_consumed == write_length) {
271 if (fin && consumed_data.fin_consumed) { 174 if (fin && consumed_data.fin_consumed) {
272 fin_sent_ = true; 175 fin_sent_ = true;
273 CloseWriteSide(); 176 CloseWriteSide();
274 } else if (fin && !consumed_data.fin_consumed) { 177 } else if (fin && !consumed_data.fin_consumed) {
275 session_->MarkWriteBlocked(id(), EffectivePriority()); 178 session_->MarkWriteBlocked(id(), EffectivePriority());
276 } 179 }
277 } else { 180 } else {
278 session_->MarkWriteBlocked(id(), EffectivePriority()); 181 session_->MarkWriteBlocked(id(), EffectivePriority());
279 } 182 }
280 return consumed_data; 183 return consumed_data;
281 } 184 }
282 185
283 QuicPriority ReliableQuicStream::EffectivePriority() const {
284 return priority();
285 }
286
287 void ReliableQuicStream::CloseReadSide() { 186 void ReliableQuicStream::CloseReadSide() {
288 if (read_side_closed_) { 187 if (read_side_closed_) {
289 return; 188 return;
290 } 189 }
291 DVLOG(1) << ENDPOINT << "Done reading from stream " << id(); 190 DVLOG(1) << ENDPOINT << "Done reading from stream " << id();
292 191
293 read_side_closed_ = true; 192 read_side_closed_ = true;
294 if (write_side_closed_) { 193 if (write_side_closed_) {
295 DVLOG(1) << ENDPOINT << "Closing stream: " << id(); 194 DVLOG(1) << ENDPOINT << "Closing stream: " << id();
296 session_->CloseStream(id()); 195 session_->CloseStream(id());
297 } 196 }
298 } 197 }
299 198
300 uint32 ReliableQuicStream::ProcessRawData(const char* data, uint32 data_len) {
301 DCHECK_NE(0u, data_len);
302 if (id() == kCryptoStreamId) {
303 // The crypto stream does not use compression.
304 return ProcessData(data, data_len);
305 }
306
307 uint32 total_bytes_consumed = 0;
308 if (headers_id_ == 0u) {
309 total_bytes_consumed += StripPriorityAndHeaderId(data, data_len);
310 data += total_bytes_consumed;
311 data_len -= total_bytes_consumed;
312 if (data_len == 0 || total_bytes_consumed == 0) {
313 return total_bytes_consumed;
314 }
315 }
316 DCHECK_NE(0u, headers_id_);
317
318 // Once the headers are finished, we simply pass the data through.
319 if (headers_decompressed_) {
320 // Some buffered header data remains.
321 if (!decompressed_headers_.empty()) {
322 ProcessHeaderData();
323 }
324 if (decompressed_headers_.empty()) {
325 DVLOG(1) << "Delegating procesing to ProcessData";
326 total_bytes_consumed += ProcessData(data, data_len);
327 }
328 return total_bytes_consumed;
329 }
330
331 QuicHeaderId current_header_id =
332 session_->decompressor()->current_header_id();
333 // Ensure that this header id looks sane.
334 if (headers_id_ < current_header_id ||
335 headers_id_ > kMaxHeaderIdDelta + current_header_id) {
336 DVLOG(1) << ENDPOINT
337 << "Invalid headers for stream: " << id()
338 << " header_id: " << headers_id_
339 << " current_header_id: " << current_header_id;
340 session_->connection()->SendConnectionClose(QUIC_INVALID_HEADER_ID);
341 return total_bytes_consumed;
342 }
343
344 // If we are head-of-line blocked on decompression, then back up.
345 if (current_header_id != headers_id_) {
346 session_->MarkDecompressionBlocked(headers_id_, id());
347 DVLOG(1) << ENDPOINT
348 << "Unable to decompress header data for stream: " << id()
349 << " header_id: " << headers_id_;
350 return total_bytes_consumed;
351 }
352
353 // Decompressed data will be delivered to decompressed_headers_.
354 size_t bytes_consumed = session_->decompressor()->DecompressData(
355 StringPiece(data, data_len), this);
356 DCHECK_NE(0u, bytes_consumed);
357 if (bytes_consumed > data_len) {
358 DCHECK(false) << "DecompressData returned illegal value";
359 OnDecompressionError();
360 return total_bytes_consumed;
361 }
362 total_bytes_consumed += bytes_consumed;
363 data += bytes_consumed;
364 data_len -= bytes_consumed;
365
366 if (decompression_failed_) {
367 // The session will have been closed in OnDecompressionError.
368 return total_bytes_consumed;
369 }
370
371 // Headers are complete if the decompressor has moved on to the
372 // next stream.
373 headers_decompressed_ =
374 session_->decompressor()->current_header_id() != headers_id_;
375 if (!headers_decompressed_) {
376 DCHECK_EQ(0u, data_len);
377 }
378
379 ProcessHeaderData();
380
381 if (!headers_decompressed_ || !decompressed_headers_.empty()) {
382 return total_bytes_consumed;
383 }
384
385 // We have processed all of the decompressed data but we might
386 // have some more raw data to process.
387 if (data_len > 0) {
388 total_bytes_consumed += ProcessData(data, data_len);
389 }
390
391 // The sequencer will push any additional buffered frames if this data
392 // has been completely consumed.
393 return total_bytes_consumed;
394 }
395
396 uint32 ReliableQuicStream::ProcessHeaderData() {
397 if (decompressed_headers_.empty()) {
398 return 0;
399 }
400
401 size_t bytes_processed = ProcessData(decompressed_headers_.data(),
402 decompressed_headers_.length());
403 if (bytes_processed == decompressed_headers_.length()) {
404 decompressed_headers_.clear();
405 } else {
406 decompressed_headers_ = decompressed_headers_.erase(0, bytes_processed);
407 }
408 return bytes_processed;
409 }
410
411 void ReliableQuicStream::OnDecompressorAvailable() {
412 DCHECK_EQ(headers_id_,
413 session_->decompressor()->current_header_id());
414 DCHECK(!headers_decompressed_);
415 DCHECK(!decompression_failed_);
416 DCHECK_EQ(0u, decompressed_headers_.length());
417
418 while (!headers_decompressed_) {
419 struct iovec iovec;
420 if (sequencer_.GetReadableRegions(&iovec, 1) == 0) {
421 return;
422 }
423
424 size_t bytes_consumed = session_->decompressor()->DecompressData(
425 StringPiece(static_cast<char*>(iovec.iov_base),
426 iovec.iov_len),
427 this);
428 DCHECK_LE(bytes_consumed, iovec.iov_len);
429 if (decompression_failed_) {
430 return;
431 }
432 sequencer_.MarkConsumed(bytes_consumed);
433
434 headers_decompressed_ =
435 session_->decompressor()->current_header_id() != headers_id_;
436 }
437
438 // Either the headers are complete, or the all data as been consumed.
439 ProcessHeaderData(); // Unprocessed headers remain in decompressed_headers_.
440 if (IsDoneReading()) {
441 OnFinRead();
442 } else if (headers_decompressed_ && decompressed_headers_.empty()) {
443 sequencer_.FlushBufferedFrames();
444 }
445 }
446
447 bool ReliableQuicStream::OnDecompressedData(StringPiece data) {
448 data.AppendToString(&decompressed_headers_);
449 return true;
450 }
451
452 void ReliableQuicStream::OnDecompressionError() {
453 DCHECK(!decompression_failed_);
454 decompression_failed_ = true;
455 session_->connection()->SendConnectionClose(QUIC_DECOMPRESSION_FAILURE);
456 }
457
458
459 void ReliableQuicStream::CloseWriteSide() { 199 void ReliableQuicStream::CloseWriteSide() {
460 if (write_side_closed_) { 200 if (write_side_closed_) {
461 return; 201 return;
462 } 202 }
463 DVLOG(1) << ENDPOINT << "Done writing to stream " << id(); 203 DVLOG(1) << ENDPOINT << "Done writing to stream " << id();
464 204
465 write_side_closed_ = true; 205 write_side_closed_ = true;
466 if (read_side_closed_) { 206 if (read_side_closed_) {
467 DVLOG(1) << ENDPOINT << "Closing stream: " << id(); 207 DVLOG(1) << ENDPOINT << "Closing stream: " << id();
468 session_->CloseStream(id()); 208 session_->CloseStream(id());
469 } 209 }
470 } 210 }
471 211
472 bool ReliableQuicStream::HasBufferedData() { 212 bool ReliableQuicStream::HasBufferedData() {
473 return !queued_data_.empty(); 213 return !queued_data_.empty();
474 } 214 }
475 215
476 void ReliableQuicStream::OnClose() { 216 void ReliableQuicStream::OnClose() {
477 CloseReadSide(); 217 CloseReadSide();
478 CloseWriteSide(); 218 CloseWriteSide();
479
480 if (visitor_) {
481 Visitor* visitor = visitor_;
482 // Calling Visitor::OnClose() may result the destruction of the visitor,
483 // so we need to ensure we don't call it again.
484 visitor_ = NULL;
485 visitor->OnClose(this);
486 }
487 }
488
489 uint32 ReliableQuicStream::StripPriorityAndHeaderId(
490 const char* data, uint32 data_len) {
491 uint32 total_bytes_parsed = 0;
492
493 if (!priority_parsed_ && session_->connection()->is_server()) {
494 QuicPriority temporary_priority = priority_;
495 total_bytes_parsed = StripUint32(
496 data, data_len, &headers_id_and_priority_buffer_, &temporary_priority);
497 if (total_bytes_parsed > 0 && headers_id_and_priority_buffer_.empty()) {
498 priority_parsed_ = true;
499
500 // Spdy priorities are inverted, so the highest numerical value is the
501 // lowest legal priority.
502 if (temporary_priority > static_cast<QuicPriority>(kLowestPriority)) {
503 session_->connection()->SendConnectionClose(QUIC_INVALID_PRIORITY);
504 return 0;
505 }
506 priority_ = temporary_priority;
507 }
508 data += total_bytes_parsed;
509 data_len -= total_bytes_parsed;
510 }
511 if (data_len > 0 && headers_id_ == 0u) {
512 // The headers ID has not yet been read. Strip it from the beginning of
513 // the data stream.
514 total_bytes_parsed += StripUint32(
515 data, data_len, &headers_id_and_priority_buffer_, &headers_id_);
516 }
517 return total_bytes_parsed;
518 } 219 }
519 220
520 } // namespace net 221 } // namespace net
OLDNEW
« no previous file with comments | « net/quic/reliable_quic_stream.h ('k') | net/quic/reliable_quic_stream_test.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698