Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(134)

Side by Side Diff: trunk/src/net/quic/reliable_quic_stream.cc

Issue 15018013: Revert 198736 "Land Recent QUIC changes" (Closed) Base URL: svn://svn.chromium.org/chrome/
Patch Set: Created 7 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "net/quic/reliable_quic_stream.h" 5 #include "net/quic/reliable_quic_stream.h"
6 6
7 #include "net/quic/quic_session.h" 7 #include "net/quic/quic_session.h"
8 #include "net/quic/quic_spdy_decompressor.h"
9 8
10 using base::StringPiece; 9 using base::StringPiece;
11 using std::min;
12 10
13 namespace net { 11 namespace net {
14 12
15 ReliableQuicStream::ReliableQuicStream(QuicStreamId id, 13 ReliableQuicStream::ReliableQuicStream(QuicStreamId id,
16 QuicSession* session) 14 QuicSession* session)
17 : sequencer_(this), 15 : sequencer_(this),
18 id_(id), 16 id_(id),
19 session_(session), 17 session_(session),
20 visitor_(NULL), 18 visitor_(NULL),
21 stream_bytes_read_(0), 19 stream_bytes_read_(0),
22 stream_bytes_written_(0), 20 stream_bytes_written_(0),
23 headers_complete_(false),
24 headers_id_(0),
25 stream_error_(QUIC_STREAM_NO_ERROR), 21 stream_error_(QUIC_STREAM_NO_ERROR),
26 connection_error_(QUIC_NO_ERROR), 22 connection_error_(QUIC_NO_ERROR),
27 read_side_closed_(false), 23 read_side_closed_(false),
28 write_side_closed_(false), 24 write_side_closed_(false),
29 fin_buffered_(false), 25 fin_buffered_(false),
30 fin_sent_(false) { 26 fin_sent_(false) {
31 } 27 }
32 28
33 ReliableQuicStream::~ReliableQuicStream() { 29 ReliableQuicStream::~ReliableQuicStream() {
34 } 30 }
(...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after
91 CloseWriteSide(); 87 CloseWriteSide();
92 } 88 }
93 CloseReadSide(); 89 CloseReadSide();
94 } 90 }
95 91
96 void ReliableQuicStream::Close(QuicRstStreamErrorCode error) { 92 void ReliableQuicStream::Close(QuicRstStreamErrorCode error) {
97 stream_error_ = error; 93 stream_error_ = error;
98 session()->SendRstStream(id(), error); 94 session()->SendRstStream(id(), error);
99 } 95 }
100 96
101 int ReliableQuicStream::Readv(const struct iovec* iov, int iov_len) {
102 if (headers_complete_ && decompressed_headers_.empty()) {
103 return sequencer_.Readv(iov, iov_len);
104 }
105 size_t bytes_consumed = 0;
106 int iov_index = 0;
107 while (iov_index < iov_len &&
108 decompressed_headers_.length() > bytes_consumed) {
109 int bytes_to_read = min(iov[iov_index].iov_len,
110 decompressed_headers_.length() - bytes_consumed);
111 char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base);
112 memcpy(iov_ptr,
113 decompressed_headers_.data() + bytes_consumed, bytes_to_read);
114 bytes_consumed += bytes_to_read;
115 ++iov_index;
116 }
117 decompressed_headers_.erase(0, bytes_consumed);
118 return bytes_consumed;
119 }
120
121 int ReliableQuicStream::GetReadableRegions(iovec* iov, int iov_len) {
122 if (headers_complete_ && decompressed_headers_.empty()) {
123 return sequencer_.GetReadableRegions(iov, iov_len);
124 }
125 if (iov_len == 0) {
126 return 0;
127 }
128 iov[0].iov_base = static_cast<void*>(
129 const_cast<char*>(decompressed_headers_.data()));
130 iov[0].iov_len = decompressed_headers_.length();
131 return 1;
132 }
133
134 bool ReliableQuicStream::IsHalfClosed() const { 97 bool ReliableQuicStream::IsHalfClosed() const {
135 if (!headers_complete_ || !decompressed_headers_.empty()) {
136 return false;
137 }
138 return sequencer_.IsHalfClosed(); 98 return sequencer_.IsHalfClosed();
139 } 99 }
140 100
141 bool ReliableQuicStream::IsClosed() const { 101 bool ReliableQuicStream::IsClosed() const {
142 return write_side_closed_ && (read_side_closed_ || IsHalfClosed()); 102 return write_side_closed_ && (read_side_closed_ || IsHalfClosed());
143 } 103 }
144 104
145 bool ReliableQuicStream::HasBytesToRead() const { 105 bool ReliableQuicStream::HasBytesToRead() const {
146 return !decompressed_headers_.empty() || sequencer_.HasBytesToRead(); 106 return sequencer_.HasBytesToRead();
147 } 107 }
148 108
149 const IPEndPoint& ReliableQuicStream::GetPeerAddress() const { 109 const IPEndPoint& ReliableQuicStream::GetPeerAddress() const {
150 return session_->peer_address(); 110 return session_->peer_address();
151 } 111 }
152 112
153 QuicConsumedData ReliableQuicStream::WriteData(StringPiece data, bool fin) { 113 QuicConsumedData ReliableQuicStream::WriteData(StringPiece data, bool fin) {
154 return WriteOrBuffer(data, fin); 114 return WriteOrBuffer(data, fin);
155 } 115 }
156 116
(...skipping 64 matching lines...) Expand 10 before | Expand all | Expand 10 after
221 } 181 }
222 DLOG(INFO) << "Done reading from stream " << id(); 182 DLOG(INFO) << "Done reading from stream " << id();
223 183
224 read_side_closed_ = true; 184 read_side_closed_ = true;
225 if (write_side_closed_) { 185 if (write_side_closed_) {
226 DLOG(INFO) << "Closing stream: " << id(); 186 DLOG(INFO) << "Closing stream: " << id();
227 session_->CloseStream(id()); 187 session_->CloseStream(id());
228 } 188 }
229 } 189 }
230 190
231 uint32 ReliableQuicStream::ProcessRawData(const char* data, uint32 data_len) {
232 if (id() == kCryptoStreamId) {
233 // The crypto stream does not use compression.
234 return ProcessData(data, data_len);
235 }
236 uint32 total_bytes_consumed = 0;
237 if (headers_id_ == 0u) {
238 // The headers ID has not yet been read. Strip it from the beginning of
239 // the data stream.
240 DCHECK_GT(4u, headers_id_buffer_.length());
241 size_t missing_size = 4 - headers_id_buffer_.length();
242 if (data_len < missing_size) {
243 StringPiece(data, data_len).AppendToString(&headers_id_buffer_);
244 return data_len;
245 }
246 total_bytes_consumed += missing_size;
247 StringPiece(data, missing_size).AppendToString(&headers_id_buffer_);
248 DCHECK_EQ(4u, headers_id_buffer_.length());
249 memcpy(&headers_id_, headers_id_buffer_.data(), 4);
250 headers_id_buffer_.clear();
251 data += missing_size;
252 data_len -= missing_size;
253 }
254 DCHECK_NE(0u, headers_id_);
255
256 // Once the headers are finished, we simply pass the data through.
257 if (headers_complete_ && decompressed_headers_.empty()) {
258 DVLOG(1) << "Delegating procesing to ProcessData";
259 return total_bytes_consumed + ProcessData(data, data_len);
260 }
261
262 QuicHeaderId current_header_id =
263 session_->decompressor()->current_header_id();
264 // Ensure that this header id looks sane.
265 if (headers_id_ < current_header_id ||
266 headers_id_ > kMaxHeaderIdDelta + current_header_id) {
267 DVLOG(1) << "Invalud headers for stream: " << id()
268 << " header_id: " << headers_id_;
269 session_->connection()->SendConnectionClose(QUIC_INVALID_HEADER_ID);
270 }
271
272 // If we are head-of-line blocked on decompression, then back up.
273 if (current_header_id != headers_id_) {
274 session_->MarkDecompressionBlocked(headers_id_, id());
275 DVLOG(1) << "Unable to decmpress header data for stream: " << id()
276 << " header_id: " << headers_id_;
277 return total_bytes_consumed;
278 }
279
280 // Decompressed data will be delivered to decompressed_headers_.
281 size_t bytes_consumed = session_->decompressor()->DecompressData(
282 StringPiece(data, data_len), this);
283 total_bytes_consumed += bytes_consumed;
284
285 // Headers are complete if the decompressor has moved on to the
286 // next stream.
287 headers_complete_ =
288 session_->decompressor()->current_header_id() != headers_id_;
289
290 if (!decompressed_headers_.empty()) {
291 size_t bytes_processed = ProcessData(decompressed_headers_.data(),
292 decompressed_headers_.length());
293 if (bytes_processed == decompressed_headers_.length()) {
294 decompressed_headers_.clear();
295 } else {
296 decompressed_headers_ = decompressed_headers_.erase(0, bytes_processed);
297 }
298 }
299
300 // We have processed all of the decompressed data but we might
301 // have some more raw data to process.
302 if (decompressed_headers_.empty() || total_bytes_consumed < data_len) {
303 total_bytes_consumed += ProcessData(data + bytes_consumed,
304 data_len - bytes_consumed);
305 }
306
307 // The sequencer will push any additional buffered frames if this data
308 // has been completely consumed.
309 return total_bytes_consumed;
310 }
311
312 uint32 ReliableQuicStream::ProcessHeaderData() {
313 if (decompressed_headers_.empty()) {
314 return 0;
315 }
316
317 size_t bytes_processed = ProcessData(decompressed_headers_.data(),
318 decompressed_headers_.length());
319 if (bytes_processed == decompressed_headers_.length()) {
320 decompressed_headers_.clear();
321 } else {
322 decompressed_headers_ = decompressed_headers_.erase(0, bytes_processed);
323 }
324 return bytes_processed;
325 }
326
327 void ReliableQuicStream::OnDecompressorAvailable() {
328 DCHECK_EQ(headers_id_,
329 session_->decompressor()->current_header_id());
330 DCHECK(!headers_complete_);
331 DCHECK_EQ(0u, decompressed_headers_.length());
332
333 size_t total_bytes_consumed = 0;
334 struct iovec iovecs[5];
335 while (!headers_complete_) {
336 size_t num_iovecs =
337 sequencer_.GetReadableRegions(iovecs, arraysize(iovecs));
338
339 if (num_iovecs == 0) {
340 return;
341 }
342 for (size_t i = 0; i < num_iovecs && !headers_complete_; i++) {
343 total_bytes_consumed += session_->decompressor()->DecompressData(
344 StringPiece(static_cast<char*>(iovecs[i].iov_base),
345 iovecs[i].iov_len), this);
346
347 headers_complete_ =
348 session_->decompressor()->current_header_id() != headers_id_;
349 }
350 }
351
352 // Either the headers are complete, or the all data as been consumed.
353 sequencer_.MarkConsumed(total_bytes_consumed);
354
355 ProcessHeaderData(); // Unprocessed headers remain in decompressed_headers_.
356
357 if (headers_complete_ && decompressed_headers_.empty()) {
358 sequencer_.FlushBufferedFrames();
359 }
360 }
361
362 bool ReliableQuicStream::OnDecompressedData(StringPiece data) {
363 data.AppendToString(&decompressed_headers_);
364 return true;
365 }
366
367 void ReliableQuicStream::CloseWriteSide() { 191 void ReliableQuicStream::CloseWriteSide() {
368 if (write_side_closed_) { 192 if (write_side_closed_) {
369 return; 193 return;
370 } 194 }
371 DLOG(INFO) << "Done writing to stream " << id(); 195 DLOG(INFO) << "Done writing to stream " << id();
372 196
373 write_side_closed_ = true; 197 write_side_closed_ = true;
374 if (read_side_closed_) { 198 if (read_side_closed_) {
375 DLOG(INFO) << "Closing stream: " << id(); 199 DLOG(INFO) << "Closing stream: " << id();
376 session_->CloseStream(id()); 200 session_->CloseStream(id());
377 } 201 }
378 } 202 }
379 203
380 void ReliableQuicStream::OnClose() { 204 void ReliableQuicStream::OnClose() {
381 CloseReadSide(); 205 CloseReadSide();
382 CloseWriteSide(); 206 CloseWriteSide();
383 207
384 if (visitor_) { 208 if (visitor_) {
385 Visitor* visitor = visitor_; 209 Visitor* visitor = visitor_;
386 // Calling Visitor::OnClose() may result the destruction of the visitor, 210 // Calling Visitor::OnClose() may result the destruction of the visitor,
387 // so we need to ensure we don't call it again. 211 // so we need to ensure we don't call it again.
388 visitor_ = NULL; 212 visitor_ = NULL;
389 visitor->OnClose(this); 213 visitor->OnClose(this);
390 } 214 }
391 } 215 }
392 216
393 } // namespace net 217 } // namespace net
OLDNEW
« no previous file with comments | « trunk/src/net/quic/reliable_quic_stream.h ('k') | trunk/src/net/quic/reliable_quic_stream_test.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698