Chromium Code Reviews| Index: net/quic/bidirectional_stream_quic_job.cc |
| diff --git a/net/quic/bidirectional_stream_quic_job.cc b/net/quic/bidirectional_stream_quic_job.cc |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..9c58a8c628de9754ea24452f66ab87ea94776cde |
| --- /dev/null |
| +++ b/net/quic/bidirectional_stream_quic_job.cc |
| @@ -0,0 +1,272 @@ |
| +// Copyright 2016 The Chromium Authors. All rights reserved. |
|
Ryan Hamilton
2016/02/27 00:21:14
It looks like this class has a lot of duplicate co
xunjieli
2016/02/29 15:21:37
I'd like to think this is a thin wrapper on top of
|
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +#include "net/quic/bidirectional_stream_quic_job.h" |
| + |
| +#include "base/bind.h" |
| +#include "base/location.h" |
| +#include "base/logging.h" |
| +#include "base/timer/timer.h" |
| +#include "net/http/bidirectional_stream_request_info.h" |
| +#include "net/socket/next_proto.h" |
| +#include "net/spdy/spdy_header_block.h" |
| +#include "net/spdy/spdy_http_utils.h" |
| + |
| +namespace net { |
| + |
| +BidirectionalStreamQuicJob::BidirectionalStreamQuicJob( |
| + const base::WeakPtr<QuicChromiumClientSession>& session) |
| + : session_(session), |
| + was_handshake_confirmed_(session->IsCryptoHandshakeConfirmed()), |
| + stream_(nullptr), |
| + request_info_(nullptr), |
| + delegate_(nullptr), |
| + response_status_(OK), |
| + negotiated_protocol_(kProtoUnknown), |
| + read_buffer_len_(0), |
| + headers_bytes_received_(0), |
| + headers_bytes_sent_(0), |
| + closed_stream_received_bytes_(0), |
| + closed_stream_sent_bytes_(0), |
| + has_sent_headers_(false), |
| + has_received_headers_(false), |
| + weak_factory_(this) { |
| + DCHECK(session_); |
| + session_->AddObserver(this); |
| +} |
| + |
| +BidirectionalStreamQuicJob::~BidirectionalStreamQuicJob() { |
| + Cancel(); |
| + if (session_) |
| + session_->RemoveObserver(this); |
| +} |
| + |
| +void BidirectionalStreamQuicJob::Start( |
| + const BidirectionalStreamRequestInfo* request_info, |
| + const BoundNetLog& net_log, |
| + BidirectionalStreamJob::Delegate* delegate, |
| + scoped_ptr<base::Timer> /* timer */) { |
| + DCHECK(!stream_); |
| + |
| + if (!session_) { |
| + response_status_ = was_handshake_confirmed_ ? ERR_QUIC_PROTOCOL_ERROR |
| + : ERR_QUIC_HANDSHAKE_FAILED; |
| + NotifyError(response_status_); |
| + return; |
| + } |
| + |
| + delegate_ = delegate; |
| + request_info_ = request_info; |
| + |
| + int rv = stream_request_.StartRequest( |
| + session_, &stream_, base::Bind(&BidirectionalStreamQuicJob::OnStreamReady, |
| + weak_factory_.GetWeakPtr())); |
| + if (rv == OK) { |
| + OnStreamReady(rv); |
| + } else if (!was_handshake_confirmed_) { |
| + response_status_ = ERR_QUIC_HANDSHAKE_FAILED; |
| + NotifyError(response_status_); |
| + } |
| +} |
| + |
| +int BidirectionalStreamQuicJob::ReadData(IOBuffer* buf, int buf_len) { |
| + DCHECK(buf); |
| + DCHECK(buf_len); |
| + |
| + if (!stream_) { |
| + // If the stream is already closed, there is no body to read. |
| + return response_status_; |
| + } |
| + int rv = stream_->Read(buf, buf_len); |
| + if (rv != ERR_IO_PENDING) { |
| + if (stream_->IsDoneReading()) { |
| + stream_->SetDelegate(nullptr); |
| + stream_->OnFinRead(); // If write side is close, will call OnClose. |
| + ResetStream(); |
| + } |
| + return rv; |
| + } |
| + // Read will complete asynchronously and Delegate::OnReadCompleted will be |
| + // called upon completion. |
| + read_buffer_ = buf; |
| + read_buffer_len_ = buf_len; |
| + return ERR_IO_PENDING; |
| +} |
| + |
| +void BidirectionalStreamQuicJob::SendData(IOBuffer* data, |
| + int length, |
| + bool end_stream) { |
| + DCHECK(stream_); |
| + |
| + if (length > 0 || end_stream) { |
| + base::StringPiece string_data(data->data(), length); |
| + int rv = stream_->WriteStreamData( |
| + string_data, end_stream, |
| + base::Bind(&BidirectionalStreamQuicJob::OnSendDataComplete, |
| + weak_factory_.GetWeakPtr())); |
| + DCHECK(rv == OK || rv == ERR_IO_PENDING); |
| + if (rv == OK) { |
| + base::ThreadTaskRunnerHandle::Get()->PostTask( |
| + FROM_HERE, base::Bind(&BidirectionalStreamQuicJob::OnSendDataComplete, |
| + weak_factory_.GetWeakPtr(), OK)); |
| + } |
| + } |
| +} |
| + |
| +void BidirectionalStreamQuicJob::Cancel() { |
| + if (stream_) { |
| + stream_->SetDelegate(nullptr); |
| + stream_->Reset(QUIC_STREAM_CANCELLED); |
| + ResetStream(); |
| + } |
| +} |
| + |
| +NextProto BidirectionalStreamQuicJob::GetProtocol() const { |
| + return negotiated_protocol_; |
| +} |
| + |
| +int64_t BidirectionalStreamQuicJob::GetTotalReceivedBytes() const { |
| + int64_t total_received_bytes = headers_bytes_received_; |
| + if (stream_) { |
| + total_received_bytes += stream_->stream_bytes_read(); |
| + } else { |
| + total_received_bytes += closed_stream_received_bytes_; |
| + } |
| + return total_received_bytes; |
| +} |
| + |
| +int64_t BidirectionalStreamQuicJob::GetTotalSentBytes() const { |
| + int64_t total_sent_bytes = headers_bytes_sent_; |
| + if (stream_) { |
| + total_sent_bytes += stream_->stream_bytes_written(); |
| + } else { |
| + total_sent_bytes += closed_stream_sent_bytes_; |
| + } |
| + return total_sent_bytes; |
| +} |
| + |
| +void BidirectionalStreamQuicJob::OnHeadersAvailable( |
| + const SpdyHeaderBlock& headers, |
| + size_t frame_len) { |
| + headers_bytes_received_ += frame_len; |
| + negotiated_protocol_ = kProtoQUIC1SPDY3; |
| + if (!has_received_headers_) { |
| + has_received_headers_ = true; |
| + delegate_->OnHeadersReceived(headers); |
| + } else { |
| + if (stream_->IsDoneReading()) { |
| + stream_->SetDelegate(nullptr); |
| + stream_->OnFinRead(); // If write side is close, will call OnClose |
| + ResetStream(); |
| + } |
| + delegate_->OnTrailersReceived(headers); |
| + } |
| +} |
| + |
| +void BidirectionalStreamQuicJob::OnDataAvailable() { |
| + // Return early if ReadData has not been called. |
| + if (!read_buffer_) |
| + return; |
| + |
| + CHECK(read_buffer_); |
| + CHECK_NE(0, read_buffer_len_); |
| + int rv = ReadData(read_buffer_.get(), read_buffer_len_); |
| + if (rv == ERR_IO_PENDING) { |
| + // Spurrious notification. Wait for the next one. |
| + return; |
| + } |
| + read_buffer_ = nullptr; |
| + read_buffer_len_ = 0; |
| + delegate_->OnDataRead(rv); |
| +} |
| + |
| +void BidirectionalStreamQuicJob::OnClose(QuicErrorCode error) { |
| + DCHECK(stream_); |
| + if (error == QUIC_NO_ERROR && |
| + stream_->stream_error() == QUIC_STREAM_NO_ERROR) { |
| + ResetStream(); |
| + return; |
| + } |
| + response_status_ = was_handshake_confirmed_ ? ERR_QUIC_PROTOCOL_ERROR |
| + : ERR_QUIC_HANDSHAKE_FAILED; |
| + ResetStream(); |
| + NotifyError(response_status_); |
| +} |
| + |
| +void BidirectionalStreamQuicJob::OnError(int error) { |
| + NotifyError(error); |
| +} |
| + |
| +bool BidirectionalStreamQuicJob::HasSendHeadersComplete() { |
| + return has_sent_headers_; |
| +} |
| + |
| +void BidirectionalStreamQuicJob::OnCryptoHandshakeConfirmed() { |
| + was_handshake_confirmed_ = true; |
| +} |
| + |
| +void BidirectionalStreamQuicJob::OnSessionClosed(int error) { |
| + DCHECK_NE(OK, error); |
| + session_.reset(); |
| + NotifyError(error); |
| +} |
| + |
| +void BidirectionalStreamQuicJob::OnStreamReady(int rv) { |
| + DCHECK_NE(ERR_IO_PENDING, rv); |
| + DCHECK(rv == OK || !stream_); |
| + if (rv == OK) { |
| + stream_->SetDelegate(this); |
| + SendRequestHeaders(); |
| + } else { |
| + response_status_ = rv; |
| + NotifyError(response_status_); |
| + } |
| +} |
| + |
| +void BidirectionalStreamQuicJob::OnSendDataComplete(int rv) { |
| + DCHECK(rv == OK || !stream_); |
| + if (rv == OK) { |
| + delegate_->OnDataSent(); |
| + } else { |
| + NotifyError(rv); |
| + } |
| +} |
| + |
| +void BidirectionalStreamQuicJob::SendRequestHeaders() { |
| + DCHECK(!has_sent_headers_); |
| + DCHECK(stream_); |
| + |
| + SpdyHeaderBlock headers; |
| + HttpRequestInfo http_request_info; |
| + http_request_info.url = request_info_->url; |
| + http_request_info.method = request_info_->method; |
| + http_request_info.extra_headers = request_info_->extra_headers; |
| + |
| + CreateSpdyHeadersFromHttpRequest(http_request_info, |
| + http_request_info.extra_headers, HTTP2, true, |
| + &headers); |
| + size_t frame_len = stream_->WriteHeaders( |
| + headers, request_info_->end_stream_on_headers, nullptr); |
| + headers_bytes_sent_ += frame_len; |
| + has_sent_headers_ = true; |
| + delegate_->OnHeadersSent(); |
| +} |
| + |
| +void BidirectionalStreamQuicJob::NotifyError(int error) { |
| + DCHECK_NE(OK, error); |
| + DCHECK_NE(ERR_IO_PENDING, error); |
| + |
| + ResetStream(); |
| + delegate_->OnFailed(error); |
| +} |
| + |
| +void BidirectionalStreamQuicJob::ResetStream() { |
| + if (!stream_) |
| + return; |
| + closed_stream_received_bytes_ = stream_->stream_bytes_read(); |
| + closed_stream_sent_bytes_ = stream_->stream_bytes_written(); |
| + stream_ = nullptr; |
| +} |
| + |
| +} // namespace net |