Index: net/quic/reliable_quic_stream.cc |
diff --git a/net/quic/reliable_quic_stream.cc b/net/quic/reliable_quic_stream.cc |
index bc472f0bca8f7dde0ae6eb2c55652340e897cc42..1db53573efdfe5639c24b9764a9ff716c26b335e 100644 |
--- a/net/quic/reliable_quic_stream.cc |
+++ b/net/quic/reliable_quic_stream.cc |
@@ -13,32 +13,14 @@ using std::min; |
namespace net { |
+#define ENDPOINT (is_server_ ? "Server: " : " Client: ") |
+ |
namespace { |
-// This is somewhat arbitrary. It's possible, but unlikely, we will either fail |
-// to set a priority client-side, or cancel a stream before stripping the |
-// priority from the wire server-side. In either case, start out with a |
-// priority in the middle. |
-QuicPriority kDefaultPriority = 3; |
- |
-// Appends bytes from data into partial_data_buffer. Once partial_data_buffer |
-// reaches 4 bytes, copies the data into 'result' and clears |
-// partial_data_buffer. |
-// Returns the number of bytes consumed. |
-uint32 StripUint32(const char* data, uint32 data_len, |
- string* partial_data_buffer, |
- uint32* result) { |
- DCHECK_GT(4u, partial_data_buffer->length()); |
- size_t missing_size = 4 - partial_data_buffer->length(); |
- if (data_len < missing_size) { |
- StringPiece(data, data_len).AppendToString(partial_data_buffer); |
- return data_len; |
- } |
- StringPiece(data, missing_size).AppendToString(partial_data_buffer); |
- DCHECK_EQ(4u, partial_data_buffer->length()); |
- memcpy(result, partial_data_buffer->data(), 4); |
- partial_data_buffer->clear(); |
- return missing_size; |
+struct iovec MakeIovec(StringPiece data) { |
+ struct iovec iov = {const_cast<char*>(data.data()), |
+ static_cast<size_t>(data.size())}; |
+ return iov; |
} |
} // namespace |
@@ -48,18 +30,12 @@ ReliableQuicStream::ReliableQuicStream(QuicStreamId id, |
: sequencer_(this), |
id_(id), |
session_(session), |
- visitor_(NULL), |
stream_bytes_read_(0), |
stream_bytes_written_(0), |
- headers_decompressed_(false), |
- priority_(kDefaultPriority), |
- headers_id_(0), |
- decompression_failed_(false), |
stream_error_(QUIC_STREAM_NO_ERROR), |
connection_error_(QUIC_NO_ERROR), |
read_side_closed_(false), |
write_side_closed_(false), |
- priority_parsed_(false), |
fin_buffered_(false), |
fin_sent_(false), |
is_server_(session_->is_server()) { |
@@ -136,81 +112,16 @@ void ReliableQuicStream::CloseConnectionWithDetails(QuicErrorCode error, |
session()->connection()->SendConnectionCloseWithDetails(error, details); |
} |
-size_t ReliableQuicStream::Readv(const struct iovec* iov, size_t iov_len) { |
- if (headers_decompressed_ && decompressed_headers_.empty()) { |
- return sequencer_.Readv(iov, iov_len); |
- } |
- size_t bytes_consumed = 0; |
- size_t iov_index = 0; |
- while (iov_index < iov_len && |
- decompressed_headers_.length() > bytes_consumed) { |
- size_t bytes_to_read = min(iov[iov_index].iov_len, |
- decompressed_headers_.length() - bytes_consumed); |
- char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base); |
- memcpy(iov_ptr, |
- decompressed_headers_.data() + bytes_consumed, bytes_to_read); |
- bytes_consumed += bytes_to_read; |
- ++iov_index; |
- } |
- decompressed_headers_.erase(0, bytes_consumed); |
- return bytes_consumed; |
-} |
- |
-int ReliableQuicStream::GetReadableRegions(iovec* iov, size_t iov_len) { |
- if (headers_decompressed_ && decompressed_headers_.empty()) { |
- return sequencer_.GetReadableRegions(iov, iov_len); |
- } |
- if (iov_len == 0) { |
- return 0; |
- } |
- iov[0].iov_base = static_cast<void*>( |
- const_cast<char*>(decompressed_headers_.data())); |
- iov[0].iov_len = decompressed_headers_.length(); |
- return 1; |
-} |
- |
-bool ReliableQuicStream::IsDoneReading() const { |
- if (!headers_decompressed_ || !decompressed_headers_.empty()) { |
- return false; |
- } |
- return sequencer_.IsClosed(); |
-} |
- |
-bool ReliableQuicStream::HasBytesToRead() const { |
- return !decompressed_headers_.empty() || sequencer_.HasBytesToRead(); |
-} |
- |
-const IPEndPoint& ReliableQuicStream::GetPeerAddress() const { |
- return session_->peer_address(); |
-} |
- |
-QuicSpdyCompressor* ReliableQuicStream::compressor() { |
- return session_->compressor(); |
-} |
- |
-bool ReliableQuicStream::GetSSLInfo(SSLInfo* ssl_info) { |
- return session_->GetSSLInfo(ssl_info); |
-} |
- |
-QuicConsumedData ReliableQuicStream::WriteData(StringPiece data, bool fin) { |
+void ReliableQuicStream::WriteOrBufferData(StringPiece data, bool fin) { |
DCHECK(data.size() > 0 || fin); |
- return WriteOrBuffer(data, fin); |
-} |
- |
- |
-void ReliableQuicStream::set_priority(QuicPriority priority) { |
- DCHECK_EQ(0u, stream_bytes_written_); |
- priority_ = priority; |
-} |
- |
-QuicConsumedData ReliableQuicStream::WriteOrBuffer(StringPiece data, bool fin) { |
DCHECK(!fin_buffered_); |
QuicConsumedData consumed_data(0, false); |
fin_buffered_ = fin; |
if (queued_data_.empty()) { |
- consumed_data = WriteDataInternal(string(data.data(), data.length()), fin); |
+ struct iovec iov(MakeIovec(data)); |
+ consumed_data = WritevData(&iov, 1, fin, NULL); |
DCHECK_LE(consumed_data.bytes_consumed, data.length()); |
} |
@@ -221,8 +132,6 @@ QuicConsumedData ReliableQuicStream::WriteOrBuffer(StringPiece data, bool fin) { |
string(data.data() + consumed_data.bytes_consumed, |
data.length() - consumed_data.bytes_consumed)); |
} |
- |
- return QuicConsumedData(data.size(), true); |
} |
void ReliableQuicStream::OnCanWrite() { |
@@ -232,7 +141,8 @@ void ReliableQuicStream::OnCanWrite() { |
if (queued_data_.size() == 1 && fin_buffered_) { |
fin = true; |
} |
- QuicConsumedData consumed_data = WriteDataInternal(data, fin); |
+ struct iovec iov(MakeIovec(data)); |
+ QuicConsumedData consumed_data = WritevData(&iov, 1, fin, NULL); |
if (consumed_data.bytes_consumed == data.size() && |
fin == consumed_data.fin_consumed) { |
queued_data_.pop_front(); |
@@ -243,14 +153,7 @@ void ReliableQuicStream::OnCanWrite() { |
} |
} |
-QuicConsumedData ReliableQuicStream::WriteDataInternal( |
- StringPiece data, bool fin) { |
- struct iovec iov = {const_cast<char*>(data.data()), |
- static_cast<size_t>(data.size())}; |
- return WritevDataInternal(&iov, 1, fin, NULL); |
-} |
- |
-QuicConsumedData ReliableQuicStream::WritevDataInternal( |
+QuicConsumedData ReliableQuicStream::WritevData( |
const struct iovec* iov, |
int iov_count, |
bool fin, |
@@ -280,10 +183,6 @@ QuicConsumedData ReliableQuicStream::WritevDataInternal( |
return consumed_data; |
} |
-QuicPriority ReliableQuicStream::EffectivePriority() const { |
- return priority(); |
-} |
- |
void ReliableQuicStream::CloseReadSide() { |
if (read_side_closed_) { |
return; |
@@ -297,165 +196,6 @@ void ReliableQuicStream::CloseReadSide() { |
} |
} |
-uint32 ReliableQuicStream::ProcessRawData(const char* data, uint32 data_len) { |
- DCHECK_NE(0u, data_len); |
- if (id() == kCryptoStreamId) { |
- // The crypto stream does not use compression. |
- return ProcessData(data, data_len); |
- } |
- |
- uint32 total_bytes_consumed = 0; |
- if (headers_id_ == 0u) { |
- total_bytes_consumed += StripPriorityAndHeaderId(data, data_len); |
- data += total_bytes_consumed; |
- data_len -= total_bytes_consumed; |
- if (data_len == 0 || total_bytes_consumed == 0) { |
- return total_bytes_consumed; |
- } |
- } |
- DCHECK_NE(0u, headers_id_); |
- |
- // Once the headers are finished, we simply pass the data through. |
- if (headers_decompressed_) { |
- // Some buffered header data remains. |
- if (!decompressed_headers_.empty()) { |
- ProcessHeaderData(); |
- } |
- if (decompressed_headers_.empty()) { |
- DVLOG(1) << "Delegating procesing to ProcessData"; |
- total_bytes_consumed += ProcessData(data, data_len); |
- } |
- return total_bytes_consumed; |
- } |
- |
- QuicHeaderId current_header_id = |
- session_->decompressor()->current_header_id(); |
- // Ensure that this header id looks sane. |
- if (headers_id_ < current_header_id || |
- headers_id_ > kMaxHeaderIdDelta + current_header_id) { |
- DVLOG(1) << ENDPOINT |
- << "Invalid headers for stream: " << id() |
- << " header_id: " << headers_id_ |
- << " current_header_id: " << current_header_id; |
- session_->connection()->SendConnectionClose(QUIC_INVALID_HEADER_ID); |
- return total_bytes_consumed; |
- } |
- |
- // If we are head-of-line blocked on decompression, then back up. |
- if (current_header_id != headers_id_) { |
- session_->MarkDecompressionBlocked(headers_id_, id()); |
- DVLOG(1) << ENDPOINT |
- << "Unable to decompress header data for stream: " << id() |
- << " header_id: " << headers_id_; |
- return total_bytes_consumed; |
- } |
- |
- // Decompressed data will be delivered to decompressed_headers_. |
- size_t bytes_consumed = session_->decompressor()->DecompressData( |
- StringPiece(data, data_len), this); |
- DCHECK_NE(0u, bytes_consumed); |
- if (bytes_consumed > data_len) { |
- DCHECK(false) << "DecompressData returned illegal value"; |
- OnDecompressionError(); |
- return total_bytes_consumed; |
- } |
- total_bytes_consumed += bytes_consumed; |
- data += bytes_consumed; |
- data_len -= bytes_consumed; |
- |
- if (decompression_failed_) { |
- // The session will have been closed in OnDecompressionError. |
- return total_bytes_consumed; |
- } |
- |
- // Headers are complete if the decompressor has moved on to the |
- // next stream. |
- headers_decompressed_ = |
- session_->decompressor()->current_header_id() != headers_id_; |
- if (!headers_decompressed_) { |
- DCHECK_EQ(0u, data_len); |
- } |
- |
- ProcessHeaderData(); |
- |
- if (!headers_decompressed_ || !decompressed_headers_.empty()) { |
- return total_bytes_consumed; |
- } |
- |
- // We have processed all of the decompressed data but we might |
- // have some more raw data to process. |
- if (data_len > 0) { |
- total_bytes_consumed += ProcessData(data, data_len); |
- } |
- |
- // The sequencer will push any additional buffered frames if this data |
- // has been completely consumed. |
- return total_bytes_consumed; |
-} |
- |
-uint32 ReliableQuicStream::ProcessHeaderData() { |
- if (decompressed_headers_.empty()) { |
- return 0; |
- } |
- |
- size_t bytes_processed = ProcessData(decompressed_headers_.data(), |
- decompressed_headers_.length()); |
- if (bytes_processed == decompressed_headers_.length()) { |
- decompressed_headers_.clear(); |
- } else { |
- decompressed_headers_ = decompressed_headers_.erase(0, bytes_processed); |
- } |
- return bytes_processed; |
-} |
- |
-void ReliableQuicStream::OnDecompressorAvailable() { |
- DCHECK_EQ(headers_id_, |
- session_->decompressor()->current_header_id()); |
- DCHECK(!headers_decompressed_); |
- DCHECK(!decompression_failed_); |
- DCHECK_EQ(0u, decompressed_headers_.length()); |
- |
- while (!headers_decompressed_) { |
- struct iovec iovec; |
- if (sequencer_.GetReadableRegions(&iovec, 1) == 0) { |
- return; |
- } |
- |
- size_t bytes_consumed = session_->decompressor()->DecompressData( |
- StringPiece(static_cast<char*>(iovec.iov_base), |
- iovec.iov_len), |
- this); |
- DCHECK_LE(bytes_consumed, iovec.iov_len); |
- if (decompression_failed_) { |
- return; |
- } |
- sequencer_.MarkConsumed(bytes_consumed); |
- |
- headers_decompressed_ = |
- session_->decompressor()->current_header_id() != headers_id_; |
- } |
- |
- // Either the headers are complete, or the all data as been consumed. |
- ProcessHeaderData(); // Unprocessed headers remain in decompressed_headers_. |
- if (IsDoneReading()) { |
- OnFinRead(); |
- } else if (headers_decompressed_ && decompressed_headers_.empty()) { |
- sequencer_.FlushBufferedFrames(); |
- } |
-} |
- |
-bool ReliableQuicStream::OnDecompressedData(StringPiece data) { |
- data.AppendToString(&decompressed_headers_); |
- return true; |
-} |
- |
-void ReliableQuicStream::OnDecompressionError() { |
- DCHECK(!decompression_failed_); |
- decompression_failed_ = true; |
- session_->connection()->SendConnectionClose(QUIC_DECOMPRESSION_FAILURE); |
-} |
- |
- |
void ReliableQuicStream::CloseWriteSide() { |
if (write_side_closed_) { |
return; |
@@ -476,45 +216,6 @@ bool ReliableQuicStream::HasBufferedData() { |
void ReliableQuicStream::OnClose() { |
CloseReadSide(); |
CloseWriteSide(); |
- |
- if (visitor_) { |
- Visitor* visitor = visitor_; |
- // Calling Visitor::OnClose() may result the destruction of the visitor, |
- // so we need to ensure we don't call it again. |
- visitor_ = NULL; |
- visitor->OnClose(this); |
- } |
-} |
- |
-uint32 ReliableQuicStream::StripPriorityAndHeaderId( |
- const char* data, uint32 data_len) { |
- uint32 total_bytes_parsed = 0; |
- |
- if (!priority_parsed_ && session_->connection()->is_server()) { |
- QuicPriority temporary_priority = priority_; |
- total_bytes_parsed = StripUint32( |
- data, data_len, &headers_id_and_priority_buffer_, &temporary_priority); |
- if (total_bytes_parsed > 0 && headers_id_and_priority_buffer_.empty()) { |
- priority_parsed_ = true; |
- |
- // Spdy priorities are inverted, so the highest numerical value is the |
- // lowest legal priority. |
- if (temporary_priority > static_cast<QuicPriority>(kLowestPriority)) { |
- session_->connection()->SendConnectionClose(QUIC_INVALID_PRIORITY); |
- return 0; |
- } |
- priority_ = temporary_priority; |
- } |
- data += total_bytes_parsed; |
- data_len -= total_bytes_parsed; |
- } |
- if (data_len > 0 && headers_id_ == 0u) { |
- // The headers ID has not yet been read. Strip it from the beginning of |
- // the data stream. |
- total_bytes_parsed += StripUint32( |
- data, data_len, &headers_id_and_priority_buffer_, &headers_id_); |
- } |
- return total_bytes_parsed; |
} |
} // namespace net |