Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(334)

Unified Diff: net/quic/reliable_quic_stream.cc

Issue 103973007: Land Recent QUIC Changes. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Fix for android compile error Created 7 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « net/quic/reliable_quic_stream.h ('k') | net/quic/reliable_quic_stream_test.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: net/quic/reliable_quic_stream.cc
diff --git a/net/quic/reliable_quic_stream.cc b/net/quic/reliable_quic_stream.cc
index bc472f0bca8f7dde0ae6eb2c55652340e897cc42..1db53573efdfe5639c24b9764a9ff716c26b335e 100644
--- a/net/quic/reliable_quic_stream.cc
+++ b/net/quic/reliable_quic_stream.cc
@@ -13,32 +13,14 @@ using std::min;
namespace net {
+#define ENDPOINT (is_server_ ? "Server: " : " Client: ")
+
namespace {
-// This is somewhat arbitrary. It's possible, but unlikely, we will either fail
-// to set a priority client-side, or cancel a stream before stripping the
-// priority from the wire server-side. In either case, start out with a
-// priority in the middle.
-QuicPriority kDefaultPriority = 3;
-
-// Appends bytes from data into partial_data_buffer. Once partial_data_buffer
-// reaches 4 bytes, copies the data into 'result' and clears
-// partial_data_buffer.
-// Returns the number of bytes consumed.
-uint32 StripUint32(const char* data, uint32 data_len,
- string* partial_data_buffer,
- uint32* result) {
- DCHECK_GT(4u, partial_data_buffer->length());
- size_t missing_size = 4 - partial_data_buffer->length();
- if (data_len < missing_size) {
- StringPiece(data, data_len).AppendToString(partial_data_buffer);
- return data_len;
- }
- StringPiece(data, missing_size).AppendToString(partial_data_buffer);
- DCHECK_EQ(4u, partial_data_buffer->length());
- memcpy(result, partial_data_buffer->data(), 4);
- partial_data_buffer->clear();
- return missing_size;
+struct iovec MakeIovec(StringPiece data) {
+ struct iovec iov = {const_cast<char*>(data.data()),
+ static_cast<size_t>(data.size())};
+ return iov;
}
} // namespace
@@ -48,18 +30,12 @@ ReliableQuicStream::ReliableQuicStream(QuicStreamId id,
: sequencer_(this),
id_(id),
session_(session),
- visitor_(NULL),
stream_bytes_read_(0),
stream_bytes_written_(0),
- headers_decompressed_(false),
- priority_(kDefaultPriority),
- headers_id_(0),
- decompression_failed_(false),
stream_error_(QUIC_STREAM_NO_ERROR),
connection_error_(QUIC_NO_ERROR),
read_side_closed_(false),
write_side_closed_(false),
- priority_parsed_(false),
fin_buffered_(false),
fin_sent_(false),
is_server_(session_->is_server()) {
@@ -136,81 +112,16 @@ void ReliableQuicStream::CloseConnectionWithDetails(QuicErrorCode error,
session()->connection()->SendConnectionCloseWithDetails(error, details);
}
-size_t ReliableQuicStream::Readv(const struct iovec* iov, size_t iov_len) {
- if (headers_decompressed_ && decompressed_headers_.empty()) {
- return sequencer_.Readv(iov, iov_len);
- }
- size_t bytes_consumed = 0;
- size_t iov_index = 0;
- while (iov_index < iov_len &&
- decompressed_headers_.length() > bytes_consumed) {
- size_t bytes_to_read = min(iov[iov_index].iov_len,
- decompressed_headers_.length() - bytes_consumed);
- char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base);
- memcpy(iov_ptr,
- decompressed_headers_.data() + bytes_consumed, bytes_to_read);
- bytes_consumed += bytes_to_read;
- ++iov_index;
- }
- decompressed_headers_.erase(0, bytes_consumed);
- return bytes_consumed;
-}
-
-int ReliableQuicStream::GetReadableRegions(iovec* iov, size_t iov_len) {
- if (headers_decompressed_ && decompressed_headers_.empty()) {
- return sequencer_.GetReadableRegions(iov, iov_len);
- }
- if (iov_len == 0) {
- return 0;
- }
- iov[0].iov_base = static_cast<void*>(
- const_cast<char*>(decompressed_headers_.data()));
- iov[0].iov_len = decompressed_headers_.length();
- return 1;
-}
-
-bool ReliableQuicStream::IsDoneReading() const {
- if (!headers_decompressed_ || !decompressed_headers_.empty()) {
- return false;
- }
- return sequencer_.IsClosed();
-}
-
-bool ReliableQuicStream::HasBytesToRead() const {
- return !decompressed_headers_.empty() || sequencer_.HasBytesToRead();
-}
-
-const IPEndPoint& ReliableQuicStream::GetPeerAddress() const {
- return session_->peer_address();
-}
-
-QuicSpdyCompressor* ReliableQuicStream::compressor() {
- return session_->compressor();
-}
-
-bool ReliableQuicStream::GetSSLInfo(SSLInfo* ssl_info) {
- return session_->GetSSLInfo(ssl_info);
-}
-
-QuicConsumedData ReliableQuicStream::WriteData(StringPiece data, bool fin) {
+void ReliableQuicStream::WriteOrBufferData(StringPiece data, bool fin) {
DCHECK(data.size() > 0 || fin);
- return WriteOrBuffer(data, fin);
-}
-
-
-void ReliableQuicStream::set_priority(QuicPriority priority) {
- DCHECK_EQ(0u, stream_bytes_written_);
- priority_ = priority;
-}
-
-QuicConsumedData ReliableQuicStream::WriteOrBuffer(StringPiece data, bool fin) {
DCHECK(!fin_buffered_);
QuicConsumedData consumed_data(0, false);
fin_buffered_ = fin;
if (queued_data_.empty()) {
- consumed_data = WriteDataInternal(string(data.data(), data.length()), fin);
+ struct iovec iov(MakeIovec(data));
+ consumed_data = WritevData(&iov, 1, fin, NULL);
DCHECK_LE(consumed_data.bytes_consumed, data.length());
}
@@ -221,8 +132,6 @@ QuicConsumedData ReliableQuicStream::WriteOrBuffer(StringPiece data, bool fin) {
string(data.data() + consumed_data.bytes_consumed,
data.length() - consumed_data.bytes_consumed));
}
-
- return QuicConsumedData(data.size(), true);
}
void ReliableQuicStream::OnCanWrite() {
@@ -232,7 +141,8 @@ void ReliableQuicStream::OnCanWrite() {
if (queued_data_.size() == 1 && fin_buffered_) {
fin = true;
}
- QuicConsumedData consumed_data = WriteDataInternal(data, fin);
+ struct iovec iov(MakeIovec(data));
+ QuicConsumedData consumed_data = WritevData(&iov, 1, fin, NULL);
if (consumed_data.bytes_consumed == data.size() &&
fin == consumed_data.fin_consumed) {
queued_data_.pop_front();
@@ -243,14 +153,7 @@ void ReliableQuicStream::OnCanWrite() {
}
}
-QuicConsumedData ReliableQuicStream::WriteDataInternal(
- StringPiece data, bool fin) {
- struct iovec iov = {const_cast<char*>(data.data()),
- static_cast<size_t>(data.size())};
- return WritevDataInternal(&iov, 1, fin, NULL);
-}
-
-QuicConsumedData ReliableQuicStream::WritevDataInternal(
+QuicConsumedData ReliableQuicStream::WritevData(
const struct iovec* iov,
int iov_count,
bool fin,
@@ -280,10 +183,6 @@ QuicConsumedData ReliableQuicStream::WritevDataInternal(
return consumed_data;
}
-QuicPriority ReliableQuicStream::EffectivePriority() const {
- return priority();
-}
-
void ReliableQuicStream::CloseReadSide() {
if (read_side_closed_) {
return;
@@ -297,165 +196,6 @@ void ReliableQuicStream::CloseReadSide() {
}
}
-uint32 ReliableQuicStream::ProcessRawData(const char* data, uint32 data_len) {
- DCHECK_NE(0u, data_len);
- if (id() == kCryptoStreamId) {
- // The crypto stream does not use compression.
- return ProcessData(data, data_len);
- }
-
- uint32 total_bytes_consumed = 0;
- if (headers_id_ == 0u) {
- total_bytes_consumed += StripPriorityAndHeaderId(data, data_len);
- data += total_bytes_consumed;
- data_len -= total_bytes_consumed;
- if (data_len == 0 || total_bytes_consumed == 0) {
- return total_bytes_consumed;
- }
- }
- DCHECK_NE(0u, headers_id_);
-
- // Once the headers are finished, we simply pass the data through.
- if (headers_decompressed_) {
- // Some buffered header data remains.
- if (!decompressed_headers_.empty()) {
- ProcessHeaderData();
- }
- if (decompressed_headers_.empty()) {
- DVLOG(1) << "Delegating procesing to ProcessData";
- total_bytes_consumed += ProcessData(data, data_len);
- }
- return total_bytes_consumed;
- }
-
- QuicHeaderId current_header_id =
- session_->decompressor()->current_header_id();
- // Ensure that this header id looks sane.
- if (headers_id_ < current_header_id ||
- headers_id_ > kMaxHeaderIdDelta + current_header_id) {
- DVLOG(1) << ENDPOINT
- << "Invalid headers for stream: " << id()
- << " header_id: " << headers_id_
- << " current_header_id: " << current_header_id;
- session_->connection()->SendConnectionClose(QUIC_INVALID_HEADER_ID);
- return total_bytes_consumed;
- }
-
- // If we are head-of-line blocked on decompression, then back up.
- if (current_header_id != headers_id_) {
- session_->MarkDecompressionBlocked(headers_id_, id());
- DVLOG(1) << ENDPOINT
- << "Unable to decompress header data for stream: " << id()
- << " header_id: " << headers_id_;
- return total_bytes_consumed;
- }
-
- // Decompressed data will be delivered to decompressed_headers_.
- size_t bytes_consumed = session_->decompressor()->DecompressData(
- StringPiece(data, data_len), this);
- DCHECK_NE(0u, bytes_consumed);
- if (bytes_consumed > data_len) {
- DCHECK(false) << "DecompressData returned illegal value";
- OnDecompressionError();
- return total_bytes_consumed;
- }
- total_bytes_consumed += bytes_consumed;
- data += bytes_consumed;
- data_len -= bytes_consumed;
-
- if (decompression_failed_) {
- // The session will have been closed in OnDecompressionError.
- return total_bytes_consumed;
- }
-
- // Headers are complete if the decompressor has moved on to the
- // next stream.
- headers_decompressed_ =
- session_->decompressor()->current_header_id() != headers_id_;
- if (!headers_decompressed_) {
- DCHECK_EQ(0u, data_len);
- }
-
- ProcessHeaderData();
-
- if (!headers_decompressed_ || !decompressed_headers_.empty()) {
- return total_bytes_consumed;
- }
-
- // We have processed all of the decompressed data but we might
- // have some more raw data to process.
- if (data_len > 0) {
- total_bytes_consumed += ProcessData(data, data_len);
- }
-
- // The sequencer will push any additional buffered frames if this data
- // has been completely consumed.
- return total_bytes_consumed;
-}
-
-uint32 ReliableQuicStream::ProcessHeaderData() {
- if (decompressed_headers_.empty()) {
- return 0;
- }
-
- size_t bytes_processed = ProcessData(decompressed_headers_.data(),
- decompressed_headers_.length());
- if (bytes_processed == decompressed_headers_.length()) {
- decompressed_headers_.clear();
- } else {
- decompressed_headers_ = decompressed_headers_.erase(0, bytes_processed);
- }
- return bytes_processed;
-}
-
-void ReliableQuicStream::OnDecompressorAvailable() {
- DCHECK_EQ(headers_id_,
- session_->decompressor()->current_header_id());
- DCHECK(!headers_decompressed_);
- DCHECK(!decompression_failed_);
- DCHECK_EQ(0u, decompressed_headers_.length());
-
- while (!headers_decompressed_) {
- struct iovec iovec;
- if (sequencer_.GetReadableRegions(&iovec, 1) == 0) {
- return;
- }
-
- size_t bytes_consumed = session_->decompressor()->DecompressData(
- StringPiece(static_cast<char*>(iovec.iov_base),
- iovec.iov_len),
- this);
- DCHECK_LE(bytes_consumed, iovec.iov_len);
- if (decompression_failed_) {
- return;
- }
- sequencer_.MarkConsumed(bytes_consumed);
-
- headers_decompressed_ =
- session_->decompressor()->current_header_id() != headers_id_;
- }
-
- // Either the headers are complete, or the all data as been consumed.
- ProcessHeaderData(); // Unprocessed headers remain in decompressed_headers_.
- if (IsDoneReading()) {
- OnFinRead();
- } else if (headers_decompressed_ && decompressed_headers_.empty()) {
- sequencer_.FlushBufferedFrames();
- }
-}
-
-bool ReliableQuicStream::OnDecompressedData(StringPiece data) {
- data.AppendToString(&decompressed_headers_);
- return true;
-}
-
-void ReliableQuicStream::OnDecompressionError() {
- DCHECK(!decompression_failed_);
- decompression_failed_ = true;
- session_->connection()->SendConnectionClose(QUIC_DECOMPRESSION_FAILURE);
-}
-
-
void ReliableQuicStream::CloseWriteSide() {
if (write_side_closed_) {
return;
@@ -476,45 +216,6 @@ bool ReliableQuicStream::HasBufferedData() {
void ReliableQuicStream::OnClose() {
CloseReadSide();
CloseWriteSide();
-
- if (visitor_) {
- Visitor* visitor = visitor_;
- // Calling Visitor::OnClose() may result the destruction of the visitor,
- // so we need to ensure we don't call it again.
- visitor_ = NULL;
- visitor->OnClose(this);
- }
-}
-
-uint32 ReliableQuicStream::StripPriorityAndHeaderId(
- const char* data, uint32 data_len) {
- uint32 total_bytes_parsed = 0;
-
- if (!priority_parsed_ && session_->connection()->is_server()) {
- QuicPriority temporary_priority = priority_;
- total_bytes_parsed = StripUint32(
- data, data_len, &headers_id_and_priority_buffer_, &temporary_priority);
- if (total_bytes_parsed > 0 && headers_id_and_priority_buffer_.empty()) {
- priority_parsed_ = true;
-
- // Spdy priorities are inverted, so the highest numerical value is the
- // lowest legal priority.
- if (temporary_priority > static_cast<QuicPriority>(kLowestPriority)) {
- session_->connection()->SendConnectionClose(QUIC_INVALID_PRIORITY);
- return 0;
- }
- priority_ = temporary_priority;
- }
- data += total_bytes_parsed;
- data_len -= total_bytes_parsed;
- }
- if (data_len > 0 && headers_id_ == 0u) {
- // The headers ID has not yet been read. Strip it from the beginning of
- // the data stream.
- total_bytes_parsed += StripUint32(
- data, data_len, &headers_id_and_priority_buffer_, &headers_id_);
- }
- return total_bytes_parsed;
}
} // namespace net
« no previous file with comments | « net/quic/reliable_quic_stream.h ('k') | net/quic/reliable_quic_stream_test.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698