| Index: net/quic/bidirectional_stream_quic_impl.cc
|
| diff --git a/net/quic/bidirectional_stream_quic_impl.cc b/net/quic/bidirectional_stream_quic_impl.cc
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..bd2812e21cdc8ef579c3202a3b96680e691bf718
|
| --- /dev/null
|
| +++ b/net/quic/bidirectional_stream_quic_impl.cc
|
| @@ -0,0 +1,264 @@
|
| +// Copyright 2016 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +#include "net/quic/bidirectional_stream_quic_impl.h"
|
| +
|
| +#include "base/bind.h"
|
| +#include "base/location.h"
|
| +#include "base/logging.h"
|
| +#include "base/timer/timer.h"
|
| +#include "net/http/bidirectional_stream_request_info.h"
|
| +#include "net/socket/next_proto.h"
|
| +#include "net/spdy/spdy_header_block.h"
|
| +#include "net/spdy/spdy_http_utils.h"
|
| +
|
| +namespace net {
|
| +
|
| +BidirectionalStreamQuicImpl::BidirectionalStreamQuicImpl(
|
| + const base::WeakPtr<QuicChromiumClientSession>& session)
|
| + : session_(session),
|
| + was_handshake_confirmed_(session->IsCryptoHandshakeConfirmed()),
|
| + stream_(nullptr),
|
| + request_info_(nullptr),
|
| + delegate_(nullptr),
|
| + response_status_(OK),
|
| + negotiated_protocol_(kProtoUnknown),
|
| + read_buffer_len_(0),
|
| + headers_bytes_received_(0),
|
| + headers_bytes_sent_(0),
|
| + closed_stream_received_bytes_(0),
|
| + closed_stream_sent_bytes_(0),
|
| + has_sent_headers_(false),
|
| + has_received_headers_(false),
|
| + weak_factory_(this) {
|
| + DCHECK(session_);
|
| + session_->AddObserver(this);
|
| +}
|
| +
|
| +BidirectionalStreamQuicImpl::~BidirectionalStreamQuicImpl() {
|
| + Cancel();
|
| + if (session_)
|
| + session_->RemoveObserver(this);
|
| +}
|
| +
|
| +void BidirectionalStreamQuicImpl::Start(
|
| + const BidirectionalStreamRequestInfo* request_info,
|
| + const BoundNetLog& net_log,
|
| + BidirectionalStreamJob::Delegate* delegate,
|
| + scoped_ptr<base::Timer> /* timer */) {
|
| + DCHECK(!stream_);
|
| +
|
| + if (!session_) {
|
| + NotifyError(was_handshake_confirmed_ ? ERR_QUIC_PROTOCOL_ERROR
|
| + : ERR_QUIC_HANDSHAKE_FAILED);
|
| + return;
|
| + }
|
| +
|
| + delegate_ = delegate;
|
| + request_info_ = request_info;
|
| +
|
| + int rv = stream_request_.StartRequest(
|
| + session_, &stream_,
|
| + base::Bind(&BidirectionalStreamQuicImpl::OnStreamReady,
|
| + weak_factory_.GetWeakPtr()));
|
| + if (rv == OK) {
|
| + OnStreamReady(rv);
|
| + } else if (!was_handshake_confirmed_) {
|
| + NotifyError(ERR_QUIC_HANDSHAKE_FAILED);
|
| + }
|
| +}
|
| +
|
| +int BidirectionalStreamQuicImpl::ReadData(IOBuffer* buffer, int buffer_len) {
|
| + DCHECK(buffer);
|
| + DCHECK(buffer_len);
|
| +
|
| + if (!stream_) {
|
| + // If the stream is already closed, there is no body to read.
|
| + return response_status_;
|
| + }
|
| + int rv = stream_->Read(buffer, buffer_len);
|
| + if (rv != ERR_IO_PENDING) {
|
| + if (stream_->IsDoneReading()) {
|
| + // If the write side is closed, OnFinRead() will call
|
| + // BidirectionalStreamQuicImpl::OnClose().
|
| + stream_->OnFinRead();
|
| + }
|
| + return rv;
|
| + }
|
| + // Read will complete asynchronously and Delegate::OnReadCompleted will be
|
| + // called upon completion.
|
| + read_buffer_ = buffer;
|
| + read_buffer_len_ = buffer_len;
|
| + return ERR_IO_PENDING;
|
| +}
|
| +
|
| +void BidirectionalStreamQuicImpl::SendData(IOBuffer* data,
|
| + int length,
|
| + bool end_stream) {
|
| + DCHECK(stream_);
|
| + DCHECK(length > 0 || (length == 0 && end_stream));
|
| +
|
| + base::StringPiece string_data(data->data(), length);
|
| + int rv = stream_->WriteStreamData(
|
| + string_data, end_stream,
|
| + base::Bind(&BidirectionalStreamQuicImpl::OnSendDataComplete,
|
| + weak_factory_.GetWeakPtr()));
|
| + DCHECK(rv == OK || rv == ERR_IO_PENDING);
|
| + if (rv == OK) {
|
| + base::ThreadTaskRunnerHandle::Get()->PostTask(
|
| + FROM_HERE, base::Bind(&BidirectionalStreamQuicImpl::OnSendDataComplete,
|
| + weak_factory_.GetWeakPtr(), OK));
|
| + }
|
| +}
|
| +
|
| +void BidirectionalStreamQuicImpl::Cancel() {
|
| + if (stream_) {
|
| + stream_->SetDelegate(nullptr);
|
| + stream_->Reset(QUIC_STREAM_CANCELLED);
|
| + ResetStream();
|
| + }
|
| +}
|
| +
|
| +NextProto BidirectionalStreamQuicImpl::GetProtocol() const {
|
| + return negotiated_protocol_;
|
| +}
|
| +
|
| +int64_t BidirectionalStreamQuicImpl::GetTotalReceivedBytes() const {
|
| + if (stream_)
|
| + return headers_bytes_received_ + stream_->stream_bytes_read();
|
| + return headers_bytes_received_ + closed_stream_received_bytes_;
|
| +}
|
| +
|
| +int64_t BidirectionalStreamQuicImpl::GetTotalSentBytes() const {
|
| + if (stream_)
|
| + return headers_bytes_sent_ + stream_->stream_bytes_written();
|
| + return headers_bytes_sent_ + closed_stream_sent_bytes_;
|
| +}
|
| +
|
| +void BidirectionalStreamQuicImpl::OnHeadersAvailable(
|
| + const SpdyHeaderBlock& headers,
|
| + size_t frame_len) {
|
| + headers_bytes_received_ += frame_len;
|
| + negotiated_protocol_ = kProtoQUIC1SPDY3;
|
| + if (!has_received_headers_) {
|
| + has_received_headers_ = true;
|
| + delegate_->OnHeadersReceived(headers);
|
| + } else {
|
| + if (stream_->IsDoneReading()) {
|
| + // If the write side is closed, OnFinRead() will call
|
| + // BidirectionalStreamQuicImpl::OnClose().
|
| + stream_->OnFinRead();
|
| + }
|
| + delegate_->OnTrailersReceived(headers);
|
| + }
|
| +}
|
| +
|
| +void BidirectionalStreamQuicImpl::OnDataAvailable() {
|
| + // Return early if ReadData has not been called.
|
| + if (!read_buffer_)
|
| + return;
|
| +
|
| + CHECK(read_buffer_);
|
| + CHECK_NE(0, read_buffer_len_);
|
| + int rv = ReadData(read_buffer_.get(), read_buffer_len_);
|
| + if (rv == ERR_IO_PENDING) {
|
| + // Spurrious notification. Wait for the next one.
|
| + return;
|
| + }
|
| + read_buffer_ = nullptr;
|
| + read_buffer_len_ = 0;
|
| + delegate_->OnDataRead(rv);
|
| +}
|
| +
|
| +void BidirectionalStreamQuicImpl::OnClose(QuicErrorCode error) {
|
| + DCHECK(stream_);
|
| + if (error == QUIC_NO_ERROR &&
|
| + stream_->stream_error() == QUIC_STREAM_NO_ERROR) {
|
| + ResetStream();
|
| + return;
|
| + }
|
| + ResetStream();
|
| + NotifyError(was_handshake_confirmed_ ? ERR_QUIC_PROTOCOL_ERROR
|
| + : ERR_QUIC_HANDSHAKE_FAILED);
|
| +}
|
| +
|
| +void BidirectionalStreamQuicImpl::OnError(int error) {
|
| + NotifyError(error);
|
| +}
|
| +
|
| +bool BidirectionalStreamQuicImpl::HasSendHeadersComplete() {
|
| + return has_sent_headers_;
|
| +}
|
| +
|
| +void BidirectionalStreamQuicImpl::OnCryptoHandshakeConfirmed() {
|
| + was_handshake_confirmed_ = true;
|
| +}
|
| +
|
| +void BidirectionalStreamQuicImpl::OnSessionClosed(
|
| + int error,
|
| + bool /*port_migration_detected*/) {
|
| + DCHECK_NE(OK, error);
|
| + session_.reset();
|
| + NotifyError(error);
|
| +}
|
| +
|
| +void BidirectionalStreamQuicImpl::OnStreamReady(int rv) {
|
| + DCHECK_NE(ERR_IO_PENDING, rv);
|
| + DCHECK(rv == OK || !stream_);
|
| + if (rv == OK) {
|
| + stream_->SetDelegate(this);
|
| + SendRequestHeaders();
|
| + } else {
|
| + NotifyError(rv);
|
| + }
|
| +}
|
| +
|
| +void BidirectionalStreamQuicImpl::OnSendDataComplete(int rv) {
|
| + DCHECK(rv == OK || !stream_);
|
| + if (rv == OK) {
|
| + delegate_->OnDataSent();
|
| + } else {
|
| + NotifyError(rv);
|
| + }
|
| +}
|
| +
|
| +void BidirectionalStreamQuicImpl::SendRequestHeaders() {
|
| + DCHECK(!has_sent_headers_);
|
| + DCHECK(stream_);
|
| +
|
| + SpdyHeaderBlock headers;
|
| + HttpRequestInfo http_request_info;
|
| + http_request_info.url = request_info_->url;
|
| + http_request_info.method = request_info_->method;
|
| + http_request_info.extra_headers = request_info_->extra_headers;
|
| +
|
| + CreateSpdyHeadersFromHttpRequest(http_request_info,
|
| + http_request_info.extra_headers, HTTP2, true,
|
| + &headers);
|
| + size_t frame_len = stream_->WriteHeaders(
|
| + headers, request_info_->end_stream_on_headers, nullptr);
|
| + headers_bytes_sent_ += frame_len;
|
| + has_sent_headers_ = true;
|
| + delegate_->OnHeadersSent();
|
| +}
|
| +
|
| +void BidirectionalStreamQuicImpl::NotifyError(int error) {
|
| + DCHECK_NE(OK, error);
|
| + DCHECK_NE(ERR_IO_PENDING, error);
|
| +
|
| + response_status_ = error;
|
| + ResetStream();
|
| + delegate_->OnFailed(error);
|
| +}
|
| +
|
| +void BidirectionalStreamQuicImpl::ResetStream() {
|
| + if (!stream_)
|
| + return;
|
| + closed_stream_received_bytes_ = stream_->stream_bytes_read();
|
| + closed_stream_sent_bytes_ = stream_->stream_bytes_written();
|
| + stream_->SetDelegate(nullptr);
|
| + stream_ = nullptr;
|
| +}
|
| +
|
| +} // namespace net
|
|
|