Index: net/quic/quic_data_stream.cc |
diff --git a/net/quic/quic_data_stream.cc b/net/quic/quic_data_stream.cc |
new file mode 100644 |
index 0000000000000000000000000000000000000000..29616b8c5ab56844006efe16d62bd9fbb883a247 |
--- /dev/null |
+++ b/net/quic/quic_data_stream.cc |
@@ -0,0 +1,333 @@ |
+// Copyright (c) 2012 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+#include "net/quic/quic_data_stream.h" |
+ |
+#include "base/logging.h" |
+#include "net/quic/quic_session.h" |
+#include "net/quic/quic_spdy_decompressor.h" |
+#include "net/spdy/write_blocked_list.h" |
+ |
+using base::StringPiece; |
+using std::min; |
+ |
+namespace net { |
+ |
+#define ENDPOINT (session()->is_server() ? "Server: " : " Client: ") |
+ |
+namespace { |
+ |
+// This is somewhat arbitrary. It's possible, but unlikely, we will either fail |
+// to set a priority client-side, or cancel a stream before stripping the |
+// priority from the wire server-side. In either case, start out with a |
+// priority in the middle. |
+QuicPriority kDefaultPriority = 3; |
+ |
+// Appends bytes from data into partial_data_buffer. Once partial_data_buffer |
+// reaches 4 bytes, copies the data into 'result' and clears |
+// partial_data_buffer. |
+// Returns the number of bytes consumed. |
+uint32 StripUint32(const char* data, uint32 data_len, |
+ string* partial_data_buffer, |
+ uint32* result) { |
+ DCHECK_GT(4u, partial_data_buffer->length()); |
+ size_t missing_size = 4 - partial_data_buffer->length(); |
+ if (data_len < missing_size) { |
+ StringPiece(data, data_len).AppendToString(partial_data_buffer); |
+ return data_len; |
+ } |
+ StringPiece(data, missing_size).AppendToString(partial_data_buffer); |
+ DCHECK_EQ(4u, partial_data_buffer->length()); |
+ memcpy(result, partial_data_buffer->data(), 4); |
+ partial_data_buffer->clear(); |
+ return missing_size; |
+} |
+ |
+} // namespace |
+ |
+QuicDataStream::QuicDataStream(QuicStreamId id, |
+ QuicSession* session) |
+ : ReliableQuicStream(id, session), |
+ visitor_(NULL), |
+ headers_decompressed_(false), |
+ priority_(kDefaultPriority), |
+ headers_id_(0), |
+ decompression_failed_(false), |
+ priority_parsed_(false) { |
+ DCHECK_NE(kCryptoStreamId, id); |
+} |
+ |
+QuicDataStream::~QuicDataStream() { |
+} |
+ |
+size_t QuicDataStream::Readv(const struct iovec* iov, size_t iov_len) { |
+ if (FinishedReadingHeaders()) { |
+ // If the headers have been read, simply delegate to the sequencer's |
+ // Readv method. |
+ return sequencer()->Readv(iov, iov_len); |
+ } |
+ // Otherwise, copy decompressed header data into |iov|. |
+ size_t bytes_consumed = 0; |
+ size_t iov_index = 0; |
+ while (iov_index < iov_len && |
+ decompressed_headers_.length() > bytes_consumed) { |
+ size_t bytes_to_read = min(iov[iov_index].iov_len, |
+ decompressed_headers_.length() - bytes_consumed); |
+ char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base); |
+ memcpy(iov_ptr, |
+ decompressed_headers_.data() + bytes_consumed, bytes_to_read); |
+ bytes_consumed += bytes_to_read; |
+ ++iov_index; |
+ } |
+ decompressed_headers_.erase(0, bytes_consumed); |
+ return bytes_consumed; |
+} |
+ |
+int QuicDataStream::GetReadableRegions(iovec* iov, size_t iov_len) { |
+ if (FinishedReadingHeaders()) { |
+ return sequencer()->GetReadableRegions(iov, iov_len); |
+ } |
+ if (iov_len == 0) { |
+ return 0; |
+ } |
+ iov[0].iov_base = static_cast<void*>( |
+ const_cast<char*>(decompressed_headers_.data())); |
+ iov[0].iov_len = decompressed_headers_.length(); |
+ return 1; |
+} |
+ |
+bool QuicDataStream::IsDoneReading() const { |
+ if (!headers_decompressed_ || !decompressed_headers_.empty()) { |
+ return false; |
+ } |
+ return sequencer()->IsClosed(); |
+} |
+ |
+bool QuicDataStream::HasBytesToRead() const { |
+ return !decompressed_headers_.empty() || sequencer()->HasBytesToRead(); |
+} |
+ |
+void QuicDataStream::set_priority(QuicPriority priority) { |
+ DCHECK_EQ(0u, stream_bytes_written()); |
+ priority_ = priority; |
+} |
+ |
+QuicPriority QuicDataStream::EffectivePriority() const { |
+ return priority(); |
+} |
+ |
+uint32 QuicDataStream::ProcessRawData(const char* data, uint32 data_len) { |
+ DCHECK_NE(0u, data_len); |
+ |
+ uint32 total_bytes_consumed = 0; |
+ if (headers_id_ == 0u) { |
+ total_bytes_consumed += StripPriorityAndHeaderId(data, data_len); |
+ data += total_bytes_consumed; |
+ data_len -= total_bytes_consumed; |
+ if (data_len == 0 || total_bytes_consumed == 0) { |
+ return total_bytes_consumed; |
+ } |
+ } |
+ DCHECK_NE(0u, headers_id_); |
+ |
+ // Once the headers are finished, we simply pass the data through. |
+ if (headers_decompressed_) { |
+ // Some buffered header data remains. |
+ if (!decompressed_headers_.empty()) { |
+ ProcessHeaderData(); |
+ } |
+ if (decompressed_headers_.empty()) { |
+ DVLOG(1) << "Delegating procesing to ProcessData"; |
+ total_bytes_consumed += ProcessData(data, data_len); |
+ } |
+ return total_bytes_consumed; |
+ } |
+ |
+ QuicHeaderId current_header_id = |
+ session()->decompressor()->current_header_id(); |
+ // Ensure that this header id looks sane. |
+ if (headers_id_ < current_header_id || |
+ headers_id_ > kMaxHeaderIdDelta + current_header_id) { |
+ DVLOG(1) << ENDPOINT |
+ << "Invalid headers for stream: " << id() |
+ << " header_id: " << headers_id_ |
+ << " current_header_id: " << current_header_id; |
+ session()->connection()->SendConnectionClose(QUIC_INVALID_HEADER_ID); |
+ return total_bytes_consumed; |
+ } |
+ |
+ // If we are head-of-line blocked on decompression, then back up. |
+ if (current_header_id != headers_id_) { |
+ session()->MarkDecompressionBlocked(headers_id_, id()); |
+ DVLOG(1) << ENDPOINT |
+ << "Unable to decompress header data for stream: " << id() |
+ << " header_id: " << headers_id_; |
+ return total_bytes_consumed; |
+ } |
+ |
+ // Decompressed data will be delivered to decompressed_headers_. |
+ size_t bytes_consumed = session()->decompressor()->DecompressData( |
+ StringPiece(data, data_len), this); |
+ DCHECK_NE(0u, bytes_consumed); |
+ if (bytes_consumed > data_len) { |
+ DCHECK(false) << "DecompressData returned illegal value"; |
+ OnDecompressionError(); |
+ return total_bytes_consumed; |
+ } |
+ total_bytes_consumed += bytes_consumed; |
+ data += bytes_consumed; |
+ data_len -= bytes_consumed; |
+ |
+ if (decompression_failed_) { |
+ // The session will have been closed in OnDecompressionError. |
+ return total_bytes_consumed; |
+ } |
+ |
+ // Headers are complete if the decompressor has moved on to the |
+ // next stream. |
+ headers_decompressed_ = |
+ session()->decompressor()->current_header_id() != headers_id_; |
+ if (!headers_decompressed_) { |
+ DCHECK_EQ(0u, data_len); |
+ } |
+ |
+ ProcessHeaderData(); |
+ |
+ if (!headers_decompressed_ || !decompressed_headers_.empty()) { |
+ return total_bytes_consumed; |
+ } |
+ |
+ // We have processed all of the decompressed data but we might |
+ // have some more raw data to process. |
+ if (data_len > 0) { |
+ total_bytes_consumed += ProcessData(data, data_len); |
+ } |
+ |
+ // The sequencer will push any additional buffered frames if this data |
+ // has been completely consumed. |
+ return total_bytes_consumed; |
+} |
+ |
+const IPEndPoint& QuicDataStream::GetPeerAddress() { |
+ return session()->peer_address(); |
+} |
+ |
+QuicSpdyCompressor* QuicDataStream::compressor() { |
+ return session()->compressor(); |
+} |
+ |
+bool QuicDataStream::GetSSLInfo(SSLInfo* ssl_info) { |
+ return session()->GetSSLInfo(ssl_info); |
+} |
+ |
+uint32 QuicDataStream::ProcessHeaderData() { |
+ if (decompressed_headers_.empty()) { |
+ return 0; |
+ } |
+ |
+ size_t bytes_processed = ProcessData(decompressed_headers_.data(), |
+ decompressed_headers_.length()); |
+ if (bytes_processed == decompressed_headers_.length()) { |
+ decompressed_headers_.clear(); |
+ } else { |
+ decompressed_headers_ = decompressed_headers_.erase(0, bytes_processed); |
+ } |
+ return bytes_processed; |
+} |
+ |
+void QuicDataStream::OnDecompressorAvailable() { |
+ DCHECK_EQ(headers_id_, |
+ session()->decompressor()->current_header_id()); |
+ DCHECK(!headers_decompressed_); |
+ DCHECK(!decompression_failed_); |
+ DCHECK_EQ(0u, decompressed_headers_.length()); |
+ |
+ while (!headers_decompressed_) { |
+ struct iovec iovec; |
+ if (sequencer()->GetReadableRegions(&iovec, 1) == 0) { |
+ return; |
+ } |
+ |
+ size_t bytes_consumed = session()->decompressor()->DecompressData( |
+ StringPiece(static_cast<char*>(iovec.iov_base), |
+ iovec.iov_len), |
+ this); |
+ DCHECK_LE(bytes_consumed, iovec.iov_len); |
+ if (decompression_failed_) { |
+ return; |
+ } |
+ sequencer()->MarkConsumed(bytes_consumed); |
+ |
+ headers_decompressed_ = |
+ session()->decompressor()->current_header_id() != headers_id_; |
+ } |
+ |
+ // Either the headers are complete, or the all data as been consumed. |
+ ProcessHeaderData(); // Unprocessed headers remain in decompressed_headers_. |
+ if (IsDoneReading()) { |
+ OnFinRead(); |
+ } else if (FinishedReadingHeaders()) { |
+ sequencer()->FlushBufferedFrames(); |
+ } |
+} |
+ |
+bool QuicDataStream::OnDecompressedData(StringPiece data) { |
+ data.AppendToString(&decompressed_headers_); |
+ return true; |
+} |
+ |
+void QuicDataStream::OnDecompressionError() { |
+ DCHECK(!decompression_failed_); |
+ decompression_failed_ = true; |
+ session()->connection()->SendConnectionClose(QUIC_DECOMPRESSION_FAILURE); |
+} |
+ |
+void QuicDataStream::OnClose() { |
+ ReliableQuicStream::OnClose(); |
+ |
+ if (visitor_) { |
+ Visitor* visitor = visitor_; |
+ // Calling Visitor::OnClose() may result the destruction of the visitor, |
+ // so we need to ensure we don't call it again. |
+ visitor_ = NULL; |
+ visitor->OnClose(this); |
+ } |
+} |
+ |
+uint32 QuicDataStream::StripPriorityAndHeaderId( |
+ const char* data, uint32 data_len) { |
+ uint32 total_bytes_parsed = 0; |
+ |
+ if (!priority_parsed_ && session()->connection()->is_server()) { |
+ QuicPriority temporary_priority = priority_; |
+ total_bytes_parsed = StripUint32( |
+ data, data_len, &headers_id_and_priority_buffer_, &temporary_priority); |
+ if (total_bytes_parsed > 0 && headers_id_and_priority_buffer_.size() == 0) { |
+ priority_parsed_ = true; |
+ |
+ // Spdy priorities are inverted, so the highest numerical value is the |
+ // lowest legal priority. |
+ if (temporary_priority > static_cast<QuicPriority>(kLowestPriority)) { |
+ session()->connection()->SendConnectionClose(QUIC_INVALID_PRIORITY); |
+ return 0; |
+ } |
+ priority_ = temporary_priority; |
+ } |
+ data += total_bytes_parsed; |
+ data_len -= total_bytes_parsed; |
+ } |
+ if (data_len > 0 && headers_id_ == 0u) { |
+ // The headers ID has not yet been read. Strip it from the beginning of |
+ // the data stream. |
+ total_bytes_parsed += StripUint32( |
+ data, data_len, &headers_id_and_priority_buffer_, &headers_id_); |
+ } |
+ return total_bytes_parsed; |
+} |
+ |
+bool QuicDataStream::FinishedReadingHeaders() { |
+ return headers_decompressed_ && decompressed_headers_.empty(); |
+} |
+ |
+} // namespace net |