| OLD | NEW |
| 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "net/quic/reliable_quic_stream.h" | 5 #include "net/quic/reliable_quic_stream.h" |
| 6 | 6 |
| 7 #include "net/quic/quic_session.h" | 7 #include "net/quic/quic_session.h" |
| 8 #include "net/quic/quic_spdy_decompressor.h" | |
| 9 | 8 |
| 10 using base::StringPiece; | 9 using base::StringPiece; |
| 11 using std::min; | |
| 12 | 10 |
| 13 namespace net { | 11 namespace net { |
| 14 | 12 |
| 15 ReliableQuicStream::ReliableQuicStream(QuicStreamId id, | 13 ReliableQuicStream::ReliableQuicStream(QuicStreamId id, |
| 16 QuicSession* session) | 14 QuicSession* session) |
| 17 : sequencer_(this), | 15 : sequencer_(this), |
| 18 id_(id), | 16 id_(id), |
| 19 session_(session), | 17 session_(session), |
| 20 visitor_(NULL), | 18 visitor_(NULL), |
| 21 stream_bytes_read_(0), | 19 stream_bytes_read_(0), |
| 22 stream_bytes_written_(0), | 20 stream_bytes_written_(0), |
| 23 headers_complete_(false), | |
| 24 headers_id_(0), | |
| 25 stream_error_(QUIC_STREAM_NO_ERROR), | 21 stream_error_(QUIC_STREAM_NO_ERROR), |
| 26 connection_error_(QUIC_NO_ERROR), | 22 connection_error_(QUIC_NO_ERROR), |
| 27 read_side_closed_(false), | 23 read_side_closed_(false), |
| 28 write_side_closed_(false), | 24 write_side_closed_(false), |
| 29 fin_buffered_(false), | 25 fin_buffered_(false), |
| 30 fin_sent_(false) { | 26 fin_sent_(false) { |
| 31 } | 27 } |
| 32 | 28 |
| 33 ReliableQuicStream::~ReliableQuicStream() { | 29 ReliableQuicStream::~ReliableQuicStream() { |
| 34 } | 30 } |
| (...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 91 CloseWriteSide(); | 87 CloseWriteSide(); |
| 92 } | 88 } |
| 93 CloseReadSide(); | 89 CloseReadSide(); |
| 94 } | 90 } |
| 95 | 91 |
| 96 void ReliableQuicStream::Close(QuicRstStreamErrorCode error) { | 92 void ReliableQuicStream::Close(QuicRstStreamErrorCode error) { |
| 97 stream_error_ = error; | 93 stream_error_ = error; |
| 98 session()->SendRstStream(id(), error); | 94 session()->SendRstStream(id(), error); |
| 99 } | 95 } |
| 100 | 96 |
| 101 int ReliableQuicStream::Readv(const struct iovec* iov, int iov_len) { | |
| 102 if (headers_complete_ && decompressed_headers_.empty()) { | |
| 103 return sequencer_.Readv(iov, iov_len); | |
| 104 } | |
| 105 size_t bytes_consumed = 0; | |
| 106 int iov_index = 0; | |
| 107 while (iov_index < iov_len && | |
| 108 decompressed_headers_.length() > bytes_consumed) { | |
| 109 int bytes_to_read = min(iov[iov_index].iov_len, | |
| 110 decompressed_headers_.length() - bytes_consumed); | |
| 111 char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base); | |
| 112 memcpy(iov_ptr, | |
| 113 decompressed_headers_.data() + bytes_consumed, bytes_to_read); | |
| 114 bytes_consumed += bytes_to_read; | |
| 115 ++iov_index; | |
| 116 } | |
| 117 decompressed_headers_.erase(0, bytes_consumed); | |
| 118 return bytes_consumed; | |
| 119 } | |
| 120 | |
| 121 int ReliableQuicStream::GetReadableRegions(iovec* iov, int iov_len) { | |
| 122 if (headers_complete_ && decompressed_headers_.empty()) { | |
| 123 return sequencer_.GetReadableRegions(iov, iov_len); | |
| 124 } | |
| 125 if (iov_len == 0) { | |
| 126 return 0; | |
| 127 } | |
| 128 iov[0].iov_base = static_cast<void*>( | |
| 129 const_cast<char*>(decompressed_headers_.data())); | |
| 130 iov[0].iov_len = decompressed_headers_.length(); | |
| 131 return 1; | |
| 132 } | |
| 133 | |
| 134 bool ReliableQuicStream::IsHalfClosed() const { | 97 bool ReliableQuicStream::IsHalfClosed() const { |
| 135 if (!headers_complete_ || !decompressed_headers_.empty()) { | |
| 136 return false; | |
| 137 } | |
| 138 return sequencer_.IsHalfClosed(); | 98 return sequencer_.IsHalfClosed(); |
| 139 } | 99 } |
| 140 | 100 |
| 141 bool ReliableQuicStream::IsClosed() const { | 101 bool ReliableQuicStream::IsClosed() const { |
| 142 return write_side_closed_ && (read_side_closed_ || IsHalfClosed()); | 102 return write_side_closed_ && (read_side_closed_ || IsHalfClosed()); |
| 143 } | 103 } |
| 144 | 104 |
| 145 bool ReliableQuicStream::HasBytesToRead() const { | 105 bool ReliableQuicStream::HasBytesToRead() const { |
| 146 return !decompressed_headers_.empty() || sequencer_.HasBytesToRead(); | 106 return sequencer_.HasBytesToRead(); |
| 147 } | 107 } |
| 148 | 108 |
| 149 const IPEndPoint& ReliableQuicStream::GetPeerAddress() const { | 109 const IPEndPoint& ReliableQuicStream::GetPeerAddress() const { |
| 150 return session_->peer_address(); | 110 return session_->peer_address(); |
| 151 } | 111 } |
| 152 | 112 |
| 153 QuicConsumedData ReliableQuicStream::WriteData(StringPiece data, bool fin) { | 113 QuicConsumedData ReliableQuicStream::WriteData(StringPiece data, bool fin) { |
| 154 return WriteOrBuffer(data, fin); | 114 return WriteOrBuffer(data, fin); |
| 155 } | 115 } |
| 156 | 116 |
| (...skipping 64 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 221 } | 181 } |
| 222 DLOG(INFO) << "Done reading from stream " << id(); | 182 DLOG(INFO) << "Done reading from stream " << id(); |
| 223 | 183 |
| 224 read_side_closed_ = true; | 184 read_side_closed_ = true; |
| 225 if (write_side_closed_) { | 185 if (write_side_closed_) { |
| 226 DLOG(INFO) << "Closing stream: " << id(); | 186 DLOG(INFO) << "Closing stream: " << id(); |
| 227 session_->CloseStream(id()); | 187 session_->CloseStream(id()); |
| 228 } | 188 } |
| 229 } | 189 } |
| 230 | 190 |
| 231 uint32 ReliableQuicStream::ProcessRawData(const char* data, uint32 data_len) { | |
| 232 if (id() == kCryptoStreamId) { | |
| 233 // The crypto stream does not use compression. | |
| 234 return ProcessData(data, data_len); | |
| 235 } | |
| 236 uint32 total_bytes_consumed = 0; | |
| 237 if (headers_id_ == 0u) { | |
| 238 // The headers ID has not yet been read. Strip it from the beginning of | |
| 239 // the data stream. | |
| 240 DCHECK_GT(4u, headers_id_buffer_.length()); | |
| 241 size_t missing_size = 4 - headers_id_buffer_.length(); | |
| 242 if (data_len < missing_size) { | |
| 243 StringPiece(data, data_len).AppendToString(&headers_id_buffer_); | |
| 244 return data_len; | |
| 245 } | |
| 246 total_bytes_consumed += missing_size; | |
| 247 StringPiece(data, missing_size).AppendToString(&headers_id_buffer_); | |
| 248 DCHECK_EQ(4u, headers_id_buffer_.length()); | |
| 249 memcpy(&headers_id_, headers_id_buffer_.data(), 4); | |
| 250 headers_id_buffer_.clear(); | |
| 251 data += missing_size; | |
| 252 data_len -= missing_size; | |
| 253 } | |
| 254 DCHECK_NE(0u, headers_id_); | |
| 255 | |
| 256 // Once the headers are finished, we simply pass the data through. | |
| 257 if (headers_complete_ && decompressed_headers_.empty()) { | |
| 258 DVLOG(1) << "Delegating procesing to ProcessData"; | |
| 259 return total_bytes_consumed + ProcessData(data, data_len); | |
| 260 } | |
| 261 | |
| 262 QuicHeaderId current_header_id = | |
| 263 session_->decompressor()->current_header_id(); | |
| 264 // Ensure that this header id looks sane. | |
| 265 if (headers_id_ < current_header_id || | |
| 266 headers_id_ > kMaxHeaderIdDelta + current_header_id) { | |
| 267 DVLOG(1) << "Invalud headers for stream: " << id() | |
| 268 << " header_id: " << headers_id_; | |
| 269 session_->connection()->SendConnectionClose(QUIC_INVALID_HEADER_ID); | |
| 270 } | |
| 271 | |
| 272 // If we are head-of-line blocked on decompression, then back up. | |
| 273 if (current_header_id != headers_id_) { | |
| 274 session_->MarkDecompressionBlocked(headers_id_, id()); | |
| 275 DVLOG(1) << "Unable to decmpress header data for stream: " << id() | |
| 276 << " header_id: " << headers_id_; | |
| 277 return total_bytes_consumed; | |
| 278 } | |
| 279 | |
| 280 // Decompressed data will be delivered to decompressed_headers_. | |
| 281 size_t bytes_consumed = session_->decompressor()->DecompressData( | |
| 282 StringPiece(data, data_len), this); | |
| 283 total_bytes_consumed += bytes_consumed; | |
| 284 | |
| 285 // Headers are complete if the decompressor has moved on to the | |
| 286 // next stream. | |
| 287 headers_complete_ = | |
| 288 session_->decompressor()->current_header_id() != headers_id_; | |
| 289 | |
| 290 if (!decompressed_headers_.empty()) { | |
| 291 size_t bytes_processed = ProcessData(decompressed_headers_.data(), | |
| 292 decompressed_headers_.length()); | |
| 293 if (bytes_processed == decompressed_headers_.length()) { | |
| 294 decompressed_headers_.clear(); | |
| 295 } else { | |
| 296 decompressed_headers_ = decompressed_headers_.erase(0, bytes_processed); | |
| 297 } | |
| 298 } | |
| 299 | |
| 300 // We have processed all of the decompressed data but we might | |
| 301 // have some more raw data to process. | |
| 302 if (decompressed_headers_.empty() || total_bytes_consumed < data_len) { | |
| 303 total_bytes_consumed += ProcessData(data + bytes_consumed, | |
| 304 data_len - bytes_consumed); | |
| 305 } | |
| 306 | |
| 307 // The sequencer will push any additional buffered frames if this data | |
| 308 // has been completely consumed. | |
| 309 return total_bytes_consumed; | |
| 310 } | |
| 311 | |
| 312 uint32 ReliableQuicStream::ProcessHeaderData() { | |
| 313 if (decompressed_headers_.empty()) { | |
| 314 return 0; | |
| 315 } | |
| 316 | |
| 317 size_t bytes_processed = ProcessData(decompressed_headers_.data(), | |
| 318 decompressed_headers_.length()); | |
| 319 if (bytes_processed == decompressed_headers_.length()) { | |
| 320 decompressed_headers_.clear(); | |
| 321 } else { | |
| 322 decompressed_headers_ = decompressed_headers_.erase(0, bytes_processed); | |
| 323 } | |
| 324 return bytes_processed; | |
| 325 } | |
| 326 | |
| 327 void ReliableQuicStream::OnDecompressorAvailable() { | |
| 328 DCHECK_EQ(headers_id_, | |
| 329 session_->decompressor()->current_header_id()); | |
| 330 DCHECK(!headers_complete_); | |
| 331 DCHECK_EQ(0u, decompressed_headers_.length()); | |
| 332 | |
| 333 size_t total_bytes_consumed = 0; | |
| 334 struct iovec iovecs[5]; | |
| 335 while (!headers_complete_) { | |
| 336 size_t num_iovecs = | |
| 337 sequencer_.GetReadableRegions(iovecs, arraysize(iovecs)); | |
| 338 | |
| 339 if (num_iovecs == 0) { | |
| 340 return; | |
| 341 } | |
| 342 for (size_t i = 0; i < num_iovecs && !headers_complete_; i++) { | |
| 343 total_bytes_consumed += session_->decompressor()->DecompressData( | |
| 344 StringPiece(static_cast<char*>(iovecs[i].iov_base), | |
| 345 iovecs[i].iov_len), this); | |
| 346 | |
| 347 headers_complete_ = | |
| 348 session_->decompressor()->current_header_id() != headers_id_; | |
| 349 } | |
| 350 } | |
| 351 | |
| 352 // Either the headers are complete, or the all data as been consumed. | |
| 353 sequencer_.MarkConsumed(total_bytes_consumed); | |
| 354 | |
| 355 ProcessHeaderData(); // Unprocessed headers remain in decompressed_headers_. | |
| 356 | |
| 357 if (headers_complete_ && decompressed_headers_.empty()) { | |
| 358 sequencer_.FlushBufferedFrames(); | |
| 359 } | |
| 360 } | |
| 361 | |
| 362 bool ReliableQuicStream::OnDecompressedData(StringPiece data) { | |
| 363 data.AppendToString(&decompressed_headers_); | |
| 364 return true; | |
| 365 } | |
| 366 | |
| 367 void ReliableQuicStream::CloseWriteSide() { | 191 void ReliableQuicStream::CloseWriteSide() { |
| 368 if (write_side_closed_) { | 192 if (write_side_closed_) { |
| 369 return; | 193 return; |
| 370 } | 194 } |
| 371 DLOG(INFO) << "Done writing to stream " << id(); | 195 DLOG(INFO) << "Done writing to stream " << id(); |
| 372 | 196 |
| 373 write_side_closed_ = true; | 197 write_side_closed_ = true; |
| 374 if (read_side_closed_) { | 198 if (read_side_closed_) { |
| 375 DLOG(INFO) << "Closing stream: " << id(); | 199 DLOG(INFO) << "Closing stream: " << id(); |
| 376 session_->CloseStream(id()); | 200 session_->CloseStream(id()); |
| 377 } | 201 } |
| 378 } | 202 } |
| 379 | 203 |
| 380 void ReliableQuicStream::OnClose() { | 204 void ReliableQuicStream::OnClose() { |
| 381 CloseReadSide(); | 205 CloseReadSide(); |
| 382 CloseWriteSide(); | 206 CloseWriteSide(); |
| 383 | 207 |
| 384 if (visitor_) { | 208 if (visitor_) { |
| 385 Visitor* visitor = visitor_; | 209 Visitor* visitor = visitor_; |
| 386 // Calling Visitor::OnClose() may result the destruction of the visitor, | 210 // Calling Visitor::OnClose() may result the destruction of the visitor, |
| 387 // so we need to ensure we don't call it again. | 211 // so we need to ensure we don't call it again. |
| 388 visitor_ = NULL; | 212 visitor_ = NULL; |
| 389 visitor->OnClose(this); | 213 visitor->OnClose(this); |
| 390 } | 214 } |
| 391 } | 215 } |
| 392 | 216 |
| 393 } // namespace net | 217 } // namespace net |
| OLD | NEW |