Index: net/quic/quic_data_stream.cc
diff --git a/net/quic/quic_data_stream.cc b/net/quic/quic_data_stream.cc
new file mode 100644
index 0000000000000000000000000000000000000000..29616b8c5ab56844006efe16d62bd9fbb883a247
--- /dev/null
+++ b/net/quic/quic_data_stream.cc
@@ -0,0 +1,333 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/quic/quic_data_stream.h"
+
+#include "base/logging.h"
+#include "net/quic/quic_session.h"
+#include "net/quic/quic_spdy_decompressor.h"
+#include "net/spdy/write_blocked_list.h"
+
+using base::StringPiece;
+using std::min;
+
+namespace net {
+
+#define ENDPOINT (session()->is_server() ? "Server: " : " Client: ")
+
+namespace {
+
+// This is somewhat arbitrary. It's possible, but unlikely, we will either fail
+// to set a priority client-side, or cancel a stream before stripping the
+// priority from the wire server-side. In either case, start out with a
+// priority in the middle.
+QuicPriority kDefaultPriority = 3;
+
+// Appends bytes from data into partial_data_buffer. Once partial_data_buffer
+// reaches 4 bytes, copies the data into 'result' and clears
+// partial_data_buffer.
+// Returns the number of bytes consumed.
+uint32 StripUint32(const char* data, uint32 data_len,
+                   std::string* partial_data_buffer,
+                   uint32* result) {
+  DCHECK_GT(4u, partial_data_buffer->length());
+  size_t missing_size = 4 - partial_data_buffer->length();
+  if (data_len < missing_size) {
+    StringPiece(data, data_len).AppendToString(partial_data_buffer);
+    return data_len;
+  }
+  StringPiece(data, missing_size).AppendToString(partial_data_buffer);
+  DCHECK_EQ(4u, partial_data_buffer->length());
+  memcpy(result, partial_data_buffer->data(), 4);
+  partial_data_buffer->clear();
+  return missing_size;
+}
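
Illustrative sketch (not part of the patch): how a caller of StripUint32 might see a 4-byte value accumulate across two calls. The names buffer, value, part1, and part2 and the byte contents are hypothetical, and the assembled value is simply the four raw bytes interpreted in host byte order.

  std::string buffer;   // persists across calls, like headers_id_and_priority_buffer_
  uint32 value = 0;

  const char part1[] = { 0x01, 0x02, 0x03 };             // only 3 of the 4 bytes so far
  uint32 used = StripUint32(part1, 3, &buffer, &value);  // used == 3; value untouched
  const char part2[] = { 0x04, 0x05, 0x06 };             // 4th byte plus later data
  used = StripUint32(part2, 3, &buffer, &value);         // used == 1; value now holds the
                                                         // 4 buffered bytes; buffer cleared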
+
+}  // namespace
+
+QuicDataStream::QuicDataStream(QuicStreamId id,
+                               QuicSession* session)
+    : ReliableQuicStream(id, session),
+      visitor_(NULL),
+      headers_decompressed_(false),
+      priority_(kDefaultPriority),
+      headers_id_(0),
+      decompression_failed_(false),
+      priority_parsed_(false) {
+  DCHECK_NE(kCryptoStreamId, id);
+}
+
+QuicDataStream::~QuicDataStream() {
+}
+
+size_t QuicDataStream::Readv(const struct iovec* iov, size_t iov_len) {
+  if (FinishedReadingHeaders()) {
+    // If the headers have been read, simply delegate to the sequencer's
+    // Readv method.
+    return sequencer()->Readv(iov, iov_len);
+  }
+  // Otherwise, copy decompressed header data into |iov|.
+  size_t bytes_consumed = 0;
+  size_t iov_index = 0;
+  while (iov_index < iov_len &&
+         decompressed_headers_.length() > bytes_consumed) {
+    size_t bytes_to_read = min(iov[iov_index].iov_len,
+                               decompressed_headers_.length() - bytes_consumed);
+    char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base);
+    memcpy(iov_ptr,
+           decompressed_headers_.data() + bytes_consumed, bytes_to_read);
+    bytes_consumed += bytes_to_read;
+    ++iov_index;
+  }
+  decompressed_headers_.erase(0, bytes_consumed);
+  return bytes_consumed;
+}
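
Illustrative sketch (not part of the patch): a minimal read loop a caller might run against a QuicDataStream, assuming a pointer named stream and a single iovec over a local buffer. Before FinishedReadingHeaders() the returned bytes come from the buffered decompressed headers; afterwards they come from the sequencer.

  char buf[4096];
  struct iovec iov;
  iov.iov_base = buf;
  iov.iov_len = sizeof(buf);
  while (stream->HasBytesToRead()) {
    size_t n = stream->Readv(&iov, 1);
    if (n == 0) {
      break;  // headers not yet decompressed, so nothing is readable here yet
    }
    // ... consume n bytes from buf ...
  }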
+
+int QuicDataStream::GetReadableRegions(iovec* iov, size_t iov_len) {
+  if (FinishedReadingHeaders()) {
+    return sequencer()->GetReadableRegions(iov, iov_len);
+  }
+  if (iov_len == 0) {
+    return 0;
+  }
+  iov[0].iov_base = static_cast<void*>(
+      const_cast<char*>(decompressed_headers_.data()));
+  iov[0].iov_len = decompressed_headers_.length();
+  return 1;
+}
+
+bool QuicDataStream::IsDoneReading() const {
+  if (!headers_decompressed_ || !decompressed_headers_.empty()) {
+    return false;
+  }
+  return sequencer()->IsClosed();
+}
+
+bool QuicDataStream::HasBytesToRead() const {
+  return !decompressed_headers_.empty() || sequencer()->HasBytesToRead();
+}
+
+void QuicDataStream::set_priority(QuicPriority priority) {
+  DCHECK_EQ(0u, stream_bytes_written());
+  priority_ = priority;
+}
+
+QuicPriority QuicDataStream::EffectivePriority() const {
+  return priority();
+}
+
+uint32 QuicDataStream::ProcessRawData(const char* data, uint32 data_len) {
+  DCHECK_NE(0u, data_len);
+
+  uint32 total_bytes_consumed = 0;
+  if (headers_id_ == 0u) {
+    total_bytes_consumed += StripPriorityAndHeaderId(data, data_len);
+    data += total_bytes_consumed;
+    data_len -= total_bytes_consumed;
+    if (data_len == 0 || total_bytes_consumed == 0) {
+      return total_bytes_consumed;
+    }
+  }
+  DCHECK_NE(0u, headers_id_);
+
+  // Once the headers are finished, we simply pass the data through.
+  if (headers_decompressed_) {
+    // Some buffered header data remains.
+    if (!decompressed_headers_.empty()) {
+      ProcessHeaderData();
+    }
+    if (decompressed_headers_.empty()) {
+      DVLOG(1) << "Delegating processing to ProcessData";
+      total_bytes_consumed += ProcessData(data, data_len);
+    }
+    return total_bytes_consumed;
+  }
+
+  QuicHeaderId current_header_id =
+      session()->decompressor()->current_header_id();
+  // Ensure that this header id looks sane.
+  if (headers_id_ < current_header_id ||
+      headers_id_ > kMaxHeaderIdDelta + current_header_id) {
+    DVLOG(1) << ENDPOINT
+             << "Invalid headers for stream: " << id()
+             << " header_id: " << headers_id_
+             << " current_header_id: " << current_header_id;
+    session()->connection()->SendConnectionClose(QUIC_INVALID_HEADER_ID);
+    return total_bytes_consumed;
+  }
+
+  // If we are head-of-line blocked on decompression, then back up.
+  if (current_header_id != headers_id_) {
+    session()->MarkDecompressionBlocked(headers_id_, id());
+    DVLOG(1) << ENDPOINT
+             << "Unable to decompress header data for stream: " << id()
+             << " header_id: " << headers_id_;
+    return total_bytes_consumed;
+  }
+
+  // Decompressed data will be delivered to decompressed_headers_.
+  size_t bytes_consumed = session()->decompressor()->DecompressData(
+      StringPiece(data, data_len), this);
+  DCHECK_NE(0u, bytes_consumed);
+  if (bytes_consumed > data_len) {
+    DCHECK(false) << "DecompressData returned illegal value";
+    OnDecompressionError();
+    return total_bytes_consumed;
+  }
+  total_bytes_consumed += bytes_consumed;
+  data += bytes_consumed;
+  data_len -= bytes_consumed;
+
+  if (decompression_failed_) {
+    // The session will have been closed in OnDecompressionError.
+    return total_bytes_consumed;
+  }
+
+  // Headers are complete if the decompressor has moved on to the
+  // next stream.
+  headers_decompressed_ =
+      session()->decompressor()->current_header_id() != headers_id_;
+  if (!headers_decompressed_) {
+    DCHECK_EQ(0u, data_len);
+  }
+
+  ProcessHeaderData();
+
+  if (!headers_decompressed_ || !decompressed_headers_.empty()) {
+    return total_bytes_consumed;
+  }
+
+  // We have processed all of the decompressed data but we might
+  // have some more raw data to process.
+  if (data_len > 0) {
+    total_bytes_consumed += ProcessData(data, data_len);
+  }
+
+  // The sequencer will push any additional buffered frames if this data
+  // has been completely consumed.
+  return total_bytes_consumed;
+}
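
Illustrative sketch (not part of the patch): a hypothetical driver named FeedToStream, standing in for the sequencer, to show the ProcessRawData() return-value contract; a return of 0 means no progress was made (for example, the stream is head-of-line blocked behind an earlier header id, or the connection is being closed).

  size_t FeedToStream(QuicDataStream* stream, const char* data, uint32 len) {
    uint32 total = 0;
    while (total < len) {
      uint32 consumed = stream->ProcessRawData(data + total, len - total);
      if (consumed == 0) {
        break;  // blocked or closing; the remaining bytes stay buffered elsewhere
      }
      total += consumed;
    }
    return total;
  }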
+
+const IPEndPoint& QuicDataStream::GetPeerAddress() {
+  return session()->peer_address();
+}
+
+QuicSpdyCompressor* QuicDataStream::compressor() {
+  return session()->compressor();
+}
+
+bool QuicDataStream::GetSSLInfo(SSLInfo* ssl_info) {
+  return session()->GetSSLInfo(ssl_info);
+}
+
+uint32 QuicDataStream::ProcessHeaderData() {
+  if (decompressed_headers_.empty()) {
+    return 0;
+  }
+
+  size_t bytes_processed = ProcessData(decompressed_headers_.data(),
+                                       decompressed_headers_.length());
+  if (bytes_processed == decompressed_headers_.length()) {
+    decompressed_headers_.clear();
+  } else {
+    decompressed_headers_ = decompressed_headers_.erase(0, bytes_processed);
+  }
+  return bytes_processed;
+}
+
+void QuicDataStream::OnDecompressorAvailable() {
+  DCHECK_EQ(headers_id_,
+            session()->decompressor()->current_header_id());
+  DCHECK(!headers_decompressed_);
+  DCHECK(!decompression_failed_);
+  DCHECK_EQ(0u, decompressed_headers_.length());
+
+  while (!headers_decompressed_) {
+    struct iovec iovec;
+    if (sequencer()->GetReadableRegions(&iovec, 1) == 0) {
+      return;
+    }
+
+    size_t bytes_consumed = session()->decompressor()->DecompressData(
+        StringPiece(static_cast<char*>(iovec.iov_base),
+                    iovec.iov_len),
+        this);
+    DCHECK_LE(bytes_consumed, iovec.iov_len);
+    if (decompression_failed_) {
+      return;
+    }
+    sequencer()->MarkConsumed(bytes_consumed);
+
+    headers_decompressed_ =
+        session()->decompressor()->current_header_id() != headers_id_;
+  }
+
+  // Either the headers are complete, or all of the data has been consumed.
+  ProcessHeaderData();  // Unprocessed headers remain in decompressed_headers_.
+  if (IsDoneReading()) {
+    OnFinRead();
+  } else if (FinishedReadingHeaders()) {
+    sequencer()->FlushBufferedFrames();
+  }
+}
+
+bool QuicDataStream::OnDecompressedData(StringPiece data) {
+  data.AppendToString(&decompressed_headers_);
+  return true;
+}
+
+void QuicDataStream::OnDecompressionError() {
+  DCHECK(!decompression_failed_);
+  decompression_failed_ = true;
+  session()->connection()->SendConnectionClose(QUIC_DECOMPRESSION_FAILURE);
+}
+
+void QuicDataStream::OnClose() {
+  ReliableQuicStream::OnClose();
+
+  if (visitor_) {
+    Visitor* visitor = visitor_;
+    // Calling Visitor::OnClose() may result in the destruction of the
+    // visitor, so we need to ensure we don't call it again.
+    visitor_ = NULL;
+    visitor->OnClose(this);
+  }
+}
+
+uint32 QuicDataStream::StripPriorityAndHeaderId(
+    const char* data, uint32 data_len) {
+  uint32 total_bytes_parsed = 0;
+
+  if (!priority_parsed_ && session()->connection()->is_server()) {
+    QuicPriority temporary_priority = priority_;
+    total_bytes_parsed = StripUint32(
+        data, data_len, &headers_id_and_priority_buffer_, &temporary_priority);
+    if (total_bytes_parsed > 0 && headers_id_and_priority_buffer_.size() == 0) {
+      priority_parsed_ = true;
+
+      // Spdy priorities are inverted, so the highest numerical value is the
+      // lowest legal priority.
+      if (temporary_priority > static_cast<QuicPriority>(kLowestPriority)) {
+        session()->connection()->SendConnectionClose(QUIC_INVALID_PRIORITY);
+        return 0;
+      }
+      priority_ = temporary_priority;
+    }
+    data += total_bytes_parsed;
+    data_len -= total_bytes_parsed;
+  }
+  if (data_len > 0 && headers_id_ == 0u) {
+    // The headers ID has not yet been read. Strip it from the beginning of
+    // the data stream.
+    total_bytes_parsed += StripUint32(
+        data, data_len, &headers_id_and_priority_buffer_, &headers_id_);
+  }
+  return total_bytes_parsed;
+}
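
Illustrative note (not part of the patch): the stream prefix this method strips, with the layout inferred from the code above. Both fields are copied raw, so they arrive in host byte order, and a headers_id_ of 0 doubles as the "not yet read" sentinel.

  client-to-server stream data begins with:
    bytes 0..3  priority   (stripped only by the server side)
    bytes 4..7  header id  (0 is treated as "not yet read")
    bytes 8...  compressed header block, then body data
  the client side strips only the 4-byte header id before the header block.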
+
+bool QuicDataStream::FinishedReadingHeaders() {
+  return headers_decompressed_ && decompressed_headers_.empty();
+}
+
+}  // namespace net