Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(65)

Unified Diff: net/spdy/bidirectional_stream_spdy_job.cc

Issue 1326503003: Added a net::BidirectionalStream to expose a bidirectional streaming interface (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Address Matt's comments Created 5 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: net/spdy/bidirectional_stream_spdy_job.cc
diff --git a/net/spdy/bidirectional_stream_spdy_job.cc b/net/spdy/bidirectional_stream_spdy_job.cc
new file mode 100644
index 0000000000000000000000000000000000000000..fbc0d083f4601bd9ad58c20877f75ed71907d2f0
--- /dev/null
+++ b/net/spdy/bidirectional_stream_spdy_job.cc
@@ -0,0 +1,266 @@
+// Copyright 2015 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/spdy/bidirectional_stream_spdy_job.h"
+
+#include "base/bind.h"
+#include "base/memory/scoped_ptr.h"
mmenke 2015/12/11 22:19:58 Nit: This should be in the header.
xunjieli 2015/12/11 23:48:40 Done.
+#include "base/time/time.h"
+#include "base/timer/timer.h"
+#include "net/base/request_priority.h"
+#include "net/spdy/spdy_buffer.h"
+#include "net/spdy/spdy_header_block.h"
+#include "net/spdy/spdy_http_utils.h"
+#include "net/spdy/spdy_stream.h"
+
+namespace net {
+
+const base::TimeDelta kBufferTime = base::TimeDelta::FromMilliseconds(1);
mmenke 2015/12/11 22:19:59 global must be POD types. Standard way to do this
xunjieli 2015/12/11 23:48:40 Done.
+
+BidirectionalStreamSpdyJob::BidirectionalStreamSpdyJob(
+ const base::WeakPtr<SpdySession>& spdy_session,
+ scoped_ptr<base::Timer> timer)
+ : spdy_session_(spdy_session),
+ timer_(timer.release()),
mmenke 2015/12/11 22:19:58 std::move(timer)
xunjieli 2015/12/11 23:48:40 Done.
+ stream_closed_(false),
+ closed_stream_status_(ERR_FAILED),
+ more_read_data_pending_(false),
+ negotiated_protocol_(kProtoUnknown),
+ closed_stream_received_bytes_(0),
+ closed_stream_sent_bytes_(0),
+ weak_factory_(this) {}
+
+BidirectionalStreamSpdyJob::BidirectionalStreamSpdyJob(
mmenke 2015/12/11 22:19:59 definition order should match delcaration order.
xunjieli 2015/12/11 23:48:40 Done.
+ const base::WeakPtr<SpdySession>& spdy_session)
+ : BidirectionalStreamSpdyJob(
+ spdy_session,
+ make_scoped_ptr(new base::Timer(false, false))) {}
+
+BidirectionalStreamSpdyJob::~BidirectionalStreamSpdyJob() {
+ if (stream_.get()) {
mmenke 2015/12/11 22:19:59 nit: .get() not needed.
xunjieli 2015/12/11 23:48:40 Done.
+ stream_->DetachDelegate();
+ DCHECK(!stream_.get());
mmenke 2015/12/11 22:19:58 Am I missing something, or can this not currently
mmenke 2015/12/11 22:19:58 nit: .get() not needed.
xunjieli 2015/12/11 23:48:40 Why is it unsafe? The delegate can't delete the jo
xunjieli 2015/12/11 23:48:40 Done.
mmenke 2015/12/14 19:48:37 Sorry, I wasn't remotely clear there. See: https:
mmenke 2015/12/14 19:52:34 And I think we should have tests for destruction d
xunjieli 2015/12/14 21:03:05 Done. Thanks for pointing me to the code. I have d
+ }
+}
+
+void BidirectionalStreamSpdyJob::Start(
+ const HttpRequestInfo& request_info,
+ RequestPriority priority,
+ const BoundNetLog& net_log,
+ BidirectionalStreamJob::Delegate* delegate) {
+ delegate_ = delegate;
+ DCHECK(!stream_);
+ if (!spdy_session_) {
+ delegate_->OnFailed(ERR_CONNECTION_CLOSED);
+ return;
+ }
+
+ request_info_ = request_info;
+
+ int rv = stream_request_.StartRequest(
+ SPDY_BIDIRECTIONAL_STREAM, spdy_session_, request_info_.url, priority,
+ net_log, base::Bind(&BidirectionalStreamSpdyJob::OnStreamInitialized,
+ weak_factory_.GetWeakPtr()));
+ if (rv != ERR_IO_PENDING)
+ OnStreamInitialized(rv);
+}
+
+int BidirectionalStreamSpdyJob::ReadData(IOBuffer* buf, int buf_len) {
+ if (stream_.get())
+ CHECK(!stream_->IsIdle());
+
+ CHECK(buf);
+ CHECK(buf_len);
+ CHECK(!timer_->IsRunning()) << "There should be only one ReadData in flight";
+
+ if (!stream_closed_)
+ CHECK(stream_);
mmenke 2015/12/11 22:19:58 All these CHECKs should be DCHECKs.
xunjieli 2015/12/11 23:48:40 Done.
+
+ // If there is data buffered, complete the IO immediately.
+ if (!data_queue_.IsEmpty()) {
+ return data_queue_.Dequeue(buf->data(), buf_len);
+ } else if (stream_closed_) {
+ return closed_stream_status_;
+ }
+ // Read will complete asynchronously and Delegate::OnReadCompleted will be
+ // called upon completion.
+ user_buffer_ = buf;
+ user_buffer_len_ = buf_len;
+ return ERR_IO_PENDING;
+}
+
+void BidirectionalStreamSpdyJob::SendData(IOBuffer* data,
+ int length,
+ bool end_stream) {
+ CHECK(!stream_closed_);
+ CHECK(stream_);
+
+ stream_->SendData(data, length,
+ end_stream ? NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND);
+}
+
+void BidirectionalStreamSpdyJob::Cancel() {
+ if (!stream_)
+ return;
+ // Cancels the stream and detaches the delegate so it doesn't get called back.
+ stream_->DetachDelegate();
+ DCHECK(!stream_);
+}
+
+NextProto BidirectionalStreamSpdyJob::GetProtocol() const {
+ return negotiated_protocol_;
+}
+
+int64_t BidirectionalStreamSpdyJob::GetTotalReceivedBytes() const {
+ if (stream_closed_)
+ return closed_stream_received_bytes_;
+
+ if (!stream_)
+ return 0;
+
+ return stream_->raw_received_bytes();
+}
+
+int64_t BidirectionalStreamSpdyJob::GetTotalSentBytes() const {
+ if (stream_closed_)
+ return closed_stream_sent_bytes_;
+
+ if (!stream_)
+ return 0;
+
+ return stream_->raw_sent_bytes();
+}
+
+void BidirectionalStreamSpdyJob::OnRequestHeadersSent() {
+ DCHECK(stream_);
+
+ delegate_->OnHeadersSent();
+}
+
+SpdyResponseHeadersStatus BidirectionalStreamSpdyJob::OnResponseHeadersUpdated(
+ const SpdyHeaderBlock& response_headers) {
+ DCHECK(stream_);
+
+ negotiated_protocol_ = stream_->GetProtocol();
+ delegate_->OnHeadersReceived(response_headers);
+ return RESPONSE_HEADERS_ARE_COMPLETE;
+}
+
+void BidirectionalStreamSpdyJob::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) {
+ DCHECK(stream_);
+ DCHECK(!stream_closed_);
+
+ if (buffer) {
mmenke 2015/12/11 22:19:58 Maybe early return if NULL?
xunjieli 2015/12/11 23:48:40 Done.
+ data_queue_.Enqueue(buffer.Pass());
mmenke 2015/12/11 22:19:59 std::move
mmenke 2015/12/11 22:19:59 What about flow control? I'm not familiar with th
xunjieli 2015/12/11 23:48:40 Done.
xunjieli 2015/12/11 23:48:40 Good question. I added comment. I believe that Spd
mmenke 2015/12/14 19:48:37 I'll dig into this a bit before responding.
xunjieli 2015/12/14 21:03:05 Acknowledged.
+ if (user_buffer_) {
+ // Handing small chunks of data to the caller creates measurable overhead.
+ // So buffer data in short time-spans and send a single read notification.
+ ScheduleBufferedRead();
mmenke 2015/12/11 22:19:58 Can we just not make this call if data_queue_ was
xunjieli 2015/12/11 23:48:40 There is one case where we still need to invoke On
mmenke 2015/12/14 19:48:37 Think it would actually be simpler to just have On
xunjieli 2015/12/14 21:03:05 We are having OnClose calling into Delegate via De
+ }
+ }
+ // If |buffer| is null, BidirectionalStreamSpdyJob::OnClose will be invoked by
+ // SpdyStream to indicate the end of stream.
+}
+
+void BidirectionalStreamSpdyJob::OnDataSent() {
+ DCHECK(stream_);
+ DCHECK(!stream_closed_);
+
+ delegate_->OnDataSent();
+}
+
+void BidirectionalStreamSpdyJob::OnTrailers(const SpdyHeaderBlock& trailers) {
+ DCHECK(stream_);
+ DCHECK(!stream_closed_);
+
+ delegate_->OnTrailersReceived(trailers);
+}
+
+void BidirectionalStreamSpdyJob::OnClose(int status) {
+ DCHECK(stream_);
+
+ stream_closed_ = true;
+ closed_stream_status_ = status;
+ closed_stream_received_bytes_ = stream_->raw_received_bytes();
+ closed_stream_sent_bytes_ = stream_->raw_sent_bytes();
+ stream_.reset();
+
+ if (status != OK) {
+ delegate_->OnFailed(status);
+ return;
+ }
+ // Complete any remaining read, as all data has been buffered.
+ // If user has not called ReadData (i.e |user_buffer_| is nullptr), this will
+ // do nothing.
+ DCHECK_EQ(OK, status);
+ timer_->Stop();
+ DoBufferedRead();
+}
+
+void BidirectionalStreamSpdyJob::SendRequestHeaders() {
+ scoped_ptr<SpdyHeaderBlock> headers(new SpdyHeaderBlock);
+ CreateSpdyHeadersFromHttpRequest(request_info_, request_info_.extra_headers,
+ stream_->GetProtocolVersion(), true,
+ headers.get());
+ stream_->SendRequestHeaders(headers.Pass(), MORE_DATA_TO_SEND);
mmenke 2015/12/11 22:19:58 std::move
xunjieli 2015/12/11 23:48:40 Done.
+}
+
+void BidirectionalStreamSpdyJob::OnStreamInitialized(int rv) {
+ DCHECK_NE(ERR_IO_PENDING, rv);
mmenke 2015/12/11 22:19:58 nit: include base/logging.h
xunjieli 2015/12/11 23:48:40 Done.
+ if (rv == OK) {
+ stream_ = stream_request_.ReleaseStream();
+ stream_->SetDelegate(this);
+ SendRequestHeaders();
+ return;
+ }
+ delegate_->OnFailed(static_cast<Error>(rv));
+}
+
+void BidirectionalStreamSpdyJob::ScheduleBufferedRead() {
+ // If there is already a scheduled DoBufferedRead, don't issue
+ // another one. Mark that we have received more data and return.
+ if (timer_->IsRunning()) {
+ more_read_data_pending_ = true;
+ return;
+ }
+
+ more_read_data_pending_ = false;
+ timer_->Start(FROM_HERE, kBufferTime,
mmenke 2015/12/11 22:19:58 nit: Include base/location.h for FROM_HERE.
xunjieli 2015/12/11 23:48:40 Done.
+ base::Bind(&BidirectionalStreamSpdyJob::DoBufferedRead,
+ weak_factory_.GetWeakPtr()));
+}
+
+void BidirectionalStreamSpdyJob::DoBufferedRead() {
+ DCHECK(!timer_->IsRunning());
+ // If the stream errored out, do not complete the read.
mmenke 2015/12/11 22:19:59 Should we just DCHECK on this, and not allow it, i
xunjieli 2015/12/11 23:48:40 Done.
+ if (!stream_ && !stream_closed_)
+ return;
+ if (stream_closed_ && closed_stream_status_ != OK)
+ return;
+
+ // When |more_read_data_pending_| is true, it means that more data has arrived
+ // since started waiting. Wait a little longer and continue to buffer.
+ if (more_read_data_pending_ && ShouldWaitForMoreBufferedData()) {
mmenke 2015/12/11 22:19:58 Why do we need more_read_data_pending_? Can't we
xunjieli 2015/12/11 23:48:41 I was copying SpdyHttpStream's logic. The ShouldWa
mmenke 2015/12/14 19:48:37 Hrm...I think the extra variable here makes this c
xunjieli 2015/12/14 21:03:05 I don't think ShouldWaitForMoreBufferedData is eno
+ ScheduleBufferedRead();
+ return;
+ }
+
+ int rv = 0;
+ if (user_buffer_) {
+ rv = ReadData(user_buffer_.get(), user_buffer_len_);
+ DCHECK_NE(ERR_IO_PENDING, rv);
+ user_buffer_ = nullptr;
+ user_buffer_len_ = 0;
+ delegate_->OnDataRead(rv);
+ }
+}
+
+bool BidirectionalStreamSpdyJob::ShouldWaitForMoreBufferedData() const {
+ if (stream_closed_)
+ return false;
+ DCHECK_GT(user_buffer_len_, 0);
+ return data_queue_.GetTotalSize() < static_cast<size_t>(user_buffer_len_);
+}
+
+} // namespace net

Powered by Google App Engine
This is Rietveld 408576698