Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(414)

Unified Diff: net/http/http_stream_factory_impl_job.cc

Issue 1326503003: Added a net::BidirectionalStream to expose a bidirectional streaming interface (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Self review Created 5 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: net/http/http_stream_factory_impl_job.cc
diff --git a/net/http/http_stream_factory_impl_job.cc b/net/http/http_stream_factory_impl_job.cc
index eabbed0bba902d2475b00a21b1aff6706dadd966..f2ce8d833e985a2cb22a4ddb1a14072036d02912 100644
--- a/net/http/http_stream_factory_impl_job.cc
+++ b/net/http/http_stream_factory_impl_job.cc
@@ -50,6 +50,11 @@
#include "net/ssl/ssl_connection_status_flags.h"
#include "net/ssl/ssl_failure_state.h"
+#if defined(ENABLE_BIDIRECTIONAL_STREAM)
+#include "net/http/bidirectional_stream_job.h"
+#include "net/spdy/bidirectional_stream_spdy_job.h"
+#endif
+
namespace net {
// Returns parameters associated with the start of a HTTP stream job.
@@ -137,6 +142,7 @@ HttpStreamFactoryImpl::Job::Job(HttpStreamFactoryImpl* stream_factory,
spdy_session_direct_(false),
job_status_(STATUS_RUNNING),
other_job_status_(STATUS_RUNNING),
+ for_bidirectional_(false),
ptr_factory_(this) {
DCHECK(stream_factory);
DCHECK(session);
@@ -168,6 +174,8 @@ HttpStreamFactoryImpl::Job::~Job() {
void HttpStreamFactoryImpl::Job::Start(Request* request) {
DCHECK(request);
request_ = request;
+ // Saves |for_bidirectional_|, since request is nulled when job is orphaned.
+ for_bidirectional_ = request_->for_bidirectional();
StartInternal();
}
@@ -358,8 +366,33 @@ void HttpStreamFactoryImpl::Job::OnWebSocketHandshakeStreamReadyCallback() {
// |this| may be deleted after this call.
}
+void HttpStreamFactoryImpl::Job::OnBidirectionalStreamJobReadyCallback() {
+#if defined(ENABLE_BIDIRECTIONAL_STREAM)
+ DCHECK(bidirectional_stream_job_);
+
+ MaybeCopyConnectionAttemptsFromSocketOrHandle();
+
+ if (IsOrphaned()) {
+ stream_factory_->OnOrphanedJobComplete(this);
+ } else {
+ request_->Complete(was_npn_negotiated(), protocol_negotiated(),
+ using_spdy());
+ request_->OnBidirectionalStreamJobReady(
+ this, server_ssl_config_, proxy_info_,
+ bidirectional_stream_job_.release());
+ }
+// |this| may be deleted after this call.
+#else
+ DCHECK(false);
+#endif
+}
+
void HttpStreamFactoryImpl::Job::OnNewSpdySessionReadyCallback() {
+#if defined(ENABLE_BIDIRECTIONAL_STREAM)
+ DCHECK(stream_.get() || bidirectional_stream_job_.get());
+#else
DCHECK(stream_.get());
+#endif
DCHECK(!IsPreconnecting());
DCHECK(using_spdy());
// Note: an event loop iteration has passed, so |new_spdy_session_| may be
@@ -379,8 +412,22 @@ void HttpStreamFactoryImpl::Job::OnNewSpdySessionReadyCallback() {
}
stream_factory_->OnOrphanedJobComplete(this);
} else {
- request_->OnNewSpdySessionReady(
- this, stream_.Pass(), spdy_session, spdy_session_direct_);
+ if (for_bidirectional_) {
+#if defined(ENABLE_BIDIRECTIONAL_STREAM)
+ DCHECK(bidirectional_stream_job_);
+ request_->OnNewSpdySessionReady(this, /*spdy_http_stream=*/nullptr,
+ std::move(bidirectional_stream_job_),
+ spdy_session, spdy_session_direct_);
+#else
+ DCHECK(false);
+#endif
+
+ } else {
+ DCHECK(stream_);
+ request_->OnNewSpdySessionReady(this, std::move(stream_),
+ /** bidirectional_stream_job=*/nullptr,
+ spdy_session, spdy_session_direct_);
+ }
}
// |this| may be deleted after this call.
}
@@ -567,6 +614,20 @@ int HttpStreamFactoryImpl::Job::RunLoop(int result) {
base::ThreadTaskRunnerHandle::Get()->PostTask(
FROM_HERE, base::Bind(&Job::OnWebSocketHandshakeStreamReadyCallback,
ptr_factory_.GetWeakPtr()));
+ } else if (for_bidirectional_) {
+#if defined(ENABLE_BIDIRECTIONAL_STREAM)
+ if (!bidirectional_stream_job_) {
+ base::ThreadTaskRunnerHandle::Get()->PostTask(
+ FROM_HERE, base::Bind(&Job::OnStreamFailedCallback,
+ ptr_factory_.GetWeakPtr(), ERR_FAILED));
+ } else {
+ base::ThreadTaskRunnerHandle::Get()->PostTask(
+ FROM_HERE, base::Bind(&Job::OnBidirectionalStreamJobReadyCallback,
+ ptr_factory_.GetWeakPtr()));
+ }
+#else
+ DCHECK(false);
+#endif
} else {
DCHECK(stream_.get());
base::ThreadTaskRunnerHandle::Get()->PostTask(
@@ -1144,12 +1205,23 @@ int HttpStreamFactoryImpl::Job::DoWaitingUserAction(int result) {
return ERR_IO_PENDING;
}
-int HttpStreamFactoryImpl::Job::SetSpdyHttpStream(
- base::WeakPtr<SpdySession> session, bool direct) {
+int HttpStreamFactoryImpl::Job::SetSpdyHttpStreamOrBidirectionalStreamJob(
+ base::WeakPtr<SpdySession> session,
+ bool direct) {
// TODO(ricea): Restore the code for WebSockets over SPDY once it's
// implemented.
if (stream_factory_->for_websockets_)
return ERR_NOT_IMPLEMENTED;
+ if (for_bidirectional_) {
+#if defined(ENABLE_BIDIRECTIONAL_STREAM)
+ // TODO(xunjieli): Create QUIC's version of BidirectionalStreamJob.
+ bidirectional_stream_job_.reset(new BidirectionalStreamSpdyJob(session));
+ return OK;
+#else
+ DCHECK(false);
+ return ERR_FAILED;
+#endif
+ }
// TODO(willchan): Delete this code, because eventually, the
// HttpStreamFactoryImpl will be creating all the SpdyHttpStreams, since it
@@ -1203,7 +1275,8 @@ int HttpStreamFactoryImpl::Job::DoCreateStream() {
connection_->socket()->Disconnect();
connection_->Reset();
- int set_result = SetSpdyHttpStream(existing_spdy_session_, direct);
+ int set_result = SetSpdyHttpStreamOrBidirectionalStreamJob(
+ existing_spdy_session_, direct);
existing_spdy_session_.reset();
return set_result;
}
@@ -1216,7 +1289,7 @@ int HttpStreamFactoryImpl::Job::DoCreateStream() {
return result;
}
if (spdy_session) {
- return SetSpdyHttpStream(spdy_session, direct);
+ return SetSpdyHttpStreamOrBidirectionalStreamJob(spdy_session, direct);
}
result = valid_spdy_session_pool_->CreateAvailableSessionFromSocket(
@@ -1251,12 +1324,13 @@ int HttpStreamFactoryImpl::Job::DoCreateStream() {
if (http_server_properties)
http_server_properties->SetSupportsSpdy(host_port_pair, true);
- // Create a SpdyHttpStream attached to the session;
- // OnNewSpdySessionReadyCallback is not called until an event loop
+ // Create a SpdyHttpStream or a BidirectionalStreamJob attached to the
+ // session; OnNewSpdySessionReadyCallback is not called until an event loop
// iteration later, so if the SpdySession is closed between then, allow
// reuse state from the underlying socket, sampled by SpdyHttpStream,
// bubble up to the request.
- return SetSpdyHttpStream(new_spdy_session_, spdy_session_direct_);
+ return SetSpdyHttpStreamOrBidirectionalStreamJob(new_spdy_session_,
+ spdy_session_direct_);
}
int HttpStreamFactoryImpl::Job::DoCreateStreamComplete(int result) {

Powered by Google App Engine
This is Rietveld 408576698