Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(620)

Unified Diff: net/http/http_stream_factory_impl_job_controller.cc

Issue 1941083002: JobController 1: Adding a new class HttpStreamFactoryImpl::JobController (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: net/http/http_stream_factory_impl_job_controller.cc
diff --git a/net/http/http_stream_factory_impl_job_controller.cc b/net/http/http_stream_factory_impl_job_controller.cc
new file mode 100644
index 0000000000000000000000000000000000000000..6b648d3cc3771b74a0491c66bf0d806dda4faaeb
--- /dev/null
+++ b/net/http/http_stream_factory_impl_job_controller.cc
@@ -0,0 +1,520 @@
+// Copyright (c) 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/http/http_stream_factory_impl_job_controller.h"
+
+#include "net/http/bidirectional_stream_impl.h"
+#include "net/http/http_stream_factory_impl_job.h"
+#include "net/http/http_stream_factory_impl_request.h"
+#include "net/spdy/spdy_session.h"
+
+namespace net {
+
+HttpStreamFactoryImpl::JobController::JobController(
+ HttpStreamFactoryImpl* factory)
+ : factory_(factory),
+ request_(NULL),
+ main_job_(NULL),
+ alternative_job_(NULL) {
+ DCHECK(factory);
+}
+
+HttpStreamFactoryImpl::JobController::~JobController() {
+ std::set<const Job*> tmp_job_set;
+ tmp_job_set.swap(orphaned_job_set_);
+ STLDeleteContainerPointers(tmp_job_set.begin(), tmp_job_set.end());
Ryan Hamilton 2016/05/06 20:49:01 Would this work: STLDeleteContainerPointers(orpha
Zhongyi Shi 2016/05/12 07:26:23 Acknowledged.
+ DCHECK(orphaned_job_set_.empty());
+}
+
+HttpStreamFactoryImpl::Request*
+HttpStreamFactoryImpl::JobController::CreatRequest(
+ const HttpRequestInfo& request_info,
+ HttpStreamRequest::Delegate* delegate,
+ WebSocketHandshakeStreamBase::CreateHelper*
+ websocket_handshake_stream_create_helper,
+ const BoundNetLog& net_log,
+ HttpStreamRequest::StreamType stream_type) {
+ DCHECK(factory_);
+ DCHECK(!request_);
+
+ request_ = new Request(request_info.url, this, delegate,
+ websocket_handshake_stream_create_helper, net_log,
+ stream_type);
+ return request_;
+}
+
+void HttpStreamFactoryImpl::JobController::Start(
+ HttpNetworkSession* session,
+ const HttpRequestInfo& request_info,
+ RequestPriority priority,
+ const SSLConfig& server_ssl_config,
+ const SSLConfig& proxy_ssl_config,
+ HttpStreamRequest::Delegate* delegate,
+ HttpStreamRequest::StreamType stream_type,
+ const BoundNetLog& net_log) {
+ DCHECK(!main_job_);
+ DCHECK(!alternative_job_);
+ HostPortPair destination(HostPortPair::FromURL(request_info.url));
+ GURL origin_url =
+ factory_->ApplyHostMappingRules(request_info.url, &destination);
+
+ main_job_ =
+ new Job(this, session, request_info, priority, server_ssl_config,
+ proxy_ssl_config, destination, origin_url, net_log.net_log());
+ jobs_.insert(main_job_);
+ AttachJob(main_job_);
+
+ // Create an alternative job if alternative service is set up for this domain.
+ const AlternativeService alternative_service =
+ factory_->GetAlternativeServiceFor(request_info, delegate, stream_type);
+
+ if (alternative_service.protocol != UNINITIALIZED_ALTERNATE_PROTOCOL) {
+ // Never share connection with other jobs for FTP requests.
+ DVLOG(1) << "Selected alternative service (host: "
+ << alternative_service.host_port_pair().host()
+ << " port: " << alternative_service.host_port_pair().port() << ")";
+
+ DCHECK(!request_info.url.SchemeIs("ftp"));
+ HostPortPair alternative_destination(alternative_service.host_port_pair());
+ ignore_result(factory_->ApplyHostMappingRules(request_info.url,
Ryan Hamilton 2016/05/06 20:49:02 I wonder if GetAlternativeServiceFor and ApplyHost
Zhongyi Shi 2016/05/12 07:26:23 Done! They are all moved in the new patch.
+ &alternative_destination));
+
+ alternative_job_ =
+ new Job(this, session, request_info, priority, server_ssl_config,
+ proxy_ssl_config, alternative_destination, origin_url,
+ alternative_service, net_log.net_log());
+ jobs_.insert(alternative_job_);
+ AttachJob(alternative_job_);
+
+ main_job_->WaitFor(alternative_job_);
+ // Make sure to wait until we call WaitFor(), before starting
+ // |alternative_job|, otherwise |alternative_job| will not notify |job|
+ // appropriately.
+ alternative_job_->Start(request_);
+ }
+ // Even if |alternative_job| has already finished, it will not have notified
+ // the request yet, since we defer that to the next iteration of the
+ // MessageLoop, so starting |job| is always safe.
+ main_job_->Start(request_);
+}
+
+void HttpStreamFactoryImpl::JobController::Preconnect(
+ int num_streams,
+ HttpNetworkSession* session,
+ const HttpRequestInfo& request_info,
+ const SSLConfig& server_ssl_config,
+ const SSLConfig& proxy_ssl_config) {
+ DCHECK(!main_job_);
+ DCHECK(!alternative_job_);
+
+ HostPortPair destination(HostPortPair::FromURL(request_info.url));
+ GURL origin_url =
+ factory_->ApplyHostMappingRules(request_info.url, &destination);
+
+ const AlternativeService alternative_service =
+ factory_->GetAlternativeServiceFor(request_info, nullptr,
+ HttpStreamRequest::HTTP_STREAM);
+
+ if (alternative_service.protocol != UNINITIALIZED_ALTERNATE_PROTOCOL) {
+ if (session->params().quic_disable_preconnect_if_0rtt &&
+ alternative_service.protocol == QUIC &&
+ session->quic_stream_factory()->ZeroRTTEnabledFor(QuicServerId(
+ alternative_service.host_port_pair(), request_info.privacy_mode))) {
+ MaybeNotifyFactoryOfCompletion();
+ return;
+ }
+ destination = alternative_service.host_port_pair();
+ ignore_result(
+ factory_->ApplyHostMappingRules(request_info.url, &destination));
+ }
+
+ // Due to how the socket pools handle priorities and idle sockets, only IDLE
+ // priority currently makes sense for preconnects. The priority for
+ // preconnects is currently ignored (see RequestSocketsForPool()), but could
+ // be used at some point for proxy resolution or something.
+ main_job_ = new Job(this, session, request_info, IDLE, server_ssl_config,
+ proxy_ssl_config, destination, origin_url,
+ alternative_service, session->net_log());
+ jobs_.insert(main_job_);
+ main_job_->Preconnect(num_streams);
+}
+
+LoadState HttpStreamFactoryImpl::JobController::GetLoadState() const {
+ DCHECK(request_);
+ if (bound_job_.get())
+ return bound_job_->GetLoadState();
+ DCHECK(!jobs_.empty());
+
+ // Just pick the first one.
+ return (*jobs_.begin())->GetLoadState();
+}
+
+void HttpStreamFactoryImpl::JobController::SetPriority(
+ RequestPriority priority) {
+ for (std::set<HttpStreamFactoryImpl::Job*>::const_iterator it = jobs_.begin();
+ it != jobs_.end(); ++it) {
Ryan Hamilton 2016/05/06 20:49:02 nit: I think you can do a C++ loop here: for (aut
Zhongyi Shi 2016/05/12 07:26:23 Acknowledged.
+ (*it)->SetPriority(priority);
+ }
+ if (bound_job_)
+ bound_job_->SetPriority(priority);
+}
+
+int HttpStreamFactoryImpl::JobController::RestartTunnelWithProxyAuth(
+ const AuthCredentials& credentials) {
+ DCHECK(bound_job_.get());
+ return bound_job_->RestartTunnelWithProxyAuth(credentials);
+}
+
+void HttpStreamFactoryImpl::JobController::AttachJob(Job* job) {
+ DCHECK(job);
+ jobs_.insert(job);
+ factory_->request_map_[job] = request_;
Ryan Hamilton 2016/05/06 20:49:02 Is request_map_ still used in your new world? If s
Zhongyi Shi 2016/05/12 07:26:23 This is still used as it is in the old code to mak
+}
+
+void HttpStreamFactoryImpl::JobController::BindJob(Job* job) {
+ DCHECK(request_);
+ DCHECK(job);
+ DCHECK(!bound_job_.get());
+ DCHECK(ContainsKey(jobs_, job));
+ bound_job_.reset(job);
+ jobs_.erase(job);
+ factory_->request_map_.erase(job);
+
+ request_->net_log().AddEvent(
+ NetLog::TYPE_HTTP_STREAM_REQUEST_BOUND_TO_JOB,
+ job->net_log().source().ToEventParametersCallback());
+ job->net_log().AddEvent(
+ NetLog::TYPE_HTTP_STREAM_JOB_BOUND_TO_REQUEST,
+ request_->net_log().source().ToEventParametersCallback());
+
+ OrphanJobs();
+}
+
+void HttpStreamFactoryImpl::JobController::OrphanJobs() {
+ DCHECK(request_);
+ request_->RemoveRequestFromSpdySessionRequestMap();
+
+ std::set<Job*> tmp;
+ tmp.swap(jobs_);
+
+ for (Job* job : tmp) {
+ DCHECK(ContainsKey(factory_->request_map_, job));
+ DCHECK_EQ(factory_->request_map_[job], request_);
+ DCHECK(!ContainsKey(orphaned_job_set_, job));
+
+ factory_->request_map_.erase(job);
+
+ orphaned_job_set_.insert(job);
+ job->Orphan(request_);
+ }
+}
+
+void HttpStreamFactoryImpl::JobController::CancelJobs() {
+ DCHECK(request_);
+ request_->RemoveRequestFromSpdySessionRequestMap();
+ std::set<Job*> tmp;
+ tmp.swap(jobs_);
+
+ for (Job* job : tmp) {
+ factory_->request_map_.erase(job);
+ delete job;
+ }
+}
+
+void HttpStreamFactoryImpl::JobController::OnJobSucceeded(Job* job) {
+ // |job| should only be NULL if we're being serviced by a late bound
+ // SpdySession (one that was not created by a job in our |jobs_| set).
+ if (!job) {
+ DCHECK(!bound_job_.get());
+ DCHECK(!jobs_.empty());
+ // NOTE(willchan): We do *NOT* call OrphanJobs() here. The reason is because
+ // we *WANT* to cancel the unnecessary Jobs from other requests if another
+ // Job completes first.
+ // TODO(mbelshe): Revisit this when we implement ip connection pooling of
+ // SpdySessions. Do we want to orphan the jobs for a different hostname so
+ // they complete? Or do we want to prevent connecting a new SpdySession if
+ // we've already got one available for a different hostname where the ip
+ // address matches up?
+ CancelJobs();
+ return;
+ }
+ if (!bound_job_.get()) {
+ if (jobs_.size() > 1)
+ job->ReportJobSucceededForRequest();
+ // Notify all the other jobs that this one succeeded.
+ for (std::set<Job*>::iterator it = jobs_.begin(); it != jobs_.end(); ++it) {
+ if (*it != job) {
+ (*it)->MarkOtherJobComplete(*job);
+ }
+ }
+ // We may have other jobs in |jobs_|. For example, if we start multiple jobs
+ // for Alternate-Protocol.
+ BindJob(job);
+ return;
+ }
+ DCHECK(jobs_.empty());
+}
+
+void HttpStreamFactoryImpl::JobController::OnRequestFinish() {
+ DCHECK(request_);
+ if (bound_job_.get()) {
+ DCHECK(jobs_.empty());
+ }
+ request_ = NULL;
+ bound_job_.reset();
+ MaybeNotifyFactoryOfCompletion();
+}
+
+void HttpStreamFactoryImpl::JobController::MarkRequestComplete(
+ bool was_npn_negotiated,
+ NextProto protocol_negotiated,
+ bool using_spdy) {
+ if (request_)
+ request_->Complete(was_npn_negotiated, protocol_negotiated, using_spdy);
+}
+
+void HttpStreamFactoryImpl::JobController::OnStreamReady(
+ Job* job,
+ const SSLConfig& used_ssl_config,
+ const ProxyInfo& used_proxy_info,
+ HttpStream* stream) {
+ if (!request_)
+ return;
+ DCHECK(!factory_->for_websockets_);
+ DCHECK_EQ(HttpStreamRequest::HTTP_STREAM, request_->stream_type());
+ DCHECK(stream);
+
+ if (job)
+ OnJobSucceeded(job);
+ request_->OnStreamReady(used_ssl_config, used_proxy_info, stream);
+}
+
+void HttpStreamFactoryImpl::JobController::OnBidirectionalStreamImplReady(
+ Job* job,
+ const SSLConfig& used_ssl_config,
+ const ProxyInfo& used_proxy_info,
+ BidirectionalStreamImpl* stream) {
+ if (!request_)
+ return;
+ DCHECK(!factory_->for_websockets_);
+ DCHECK_EQ(HttpStreamRequest::BIDIRECTIONAL_STREAM, request_->stream_type());
+ DCHECK(stream);
+
+ if (job)
+ OnJobSucceeded(job);
+ request_->OnBidirectionalStreamImplReady(used_ssl_config, used_proxy_info,
+ stream);
+}
+
+void HttpStreamFactoryImpl::JobController::OnWebSocketHandshakeStreamReady(
+ Job* job,
+ const SSLConfig& used_ssl_config,
+ const ProxyInfo& used_proxy_info,
+ WebSocketHandshakeStreamBase* stream) {
+ if (!request_)
+ return;
+ DCHECK(factory_->for_websockets_);
+ DCHECK_EQ(HttpStreamRequest::HTTP_STREAM, request_->stream_type());
+ DCHECK(stream);
+
+ if (job)
+ OnJobSucceeded(job);
+ request_->OnWebSocketHandshakeStreamReady(used_ssl_config, used_proxy_info,
+ stream);
+}
+
+void HttpStreamFactoryImpl::JobController::OnStreamFailed(
+ Job* job,
+ int status,
+ const SSLConfig& used_ssl_config,
+ SSLFailureState ssl_failure_state) {
+ if (!request_)
+ return;
+ DCHECK_NE(OK, status);
+ DCHECK(job);
+
+ if (!bound_job_.get()) {
+ if (jobs_.size() > 1) {
+ // Hey, we've got other jobs! Maybe one of them will succeed, let's just
+ // ignore this failure.
+ jobs_.erase(job);
+ factory_->request_map_.erase(job);
+ // Notify all the other jobs that this one failed.
+ for (std::set<Job*>::iterator it = jobs_.begin(); it != jobs_.end(); ++it)
+ (*it)->MarkOtherJobComplete(*job);
+ delete job;
+ return;
+ } else {
+ BindJob(job);
+ }
+ } else {
+ DCHECK(jobs_.empty());
+ }
+
+ request_->OnStreamFailed(status, used_ssl_config, ssl_failure_state);
+}
+
+void HttpStreamFactoryImpl::JobController::OnCertificateError(
+ Job* job,
+ int status,
+ const SSLConfig& used_ssl_config,
+ const SSLInfo& ssl_info) {
+ if (!request_)
+ return;
+ DCHECK_NE(OK, status);
+ if (!bound_job_.get())
+ BindJob(job);
+ else
+ DCHECK(jobs_.empty());
+
+ request_->OnCertificateError(status, used_ssl_config, ssl_info);
+}
+
+void HttpStreamFactoryImpl::JobController::OnNeedsProxyAuth(
+ Job* job,
+ const HttpResponseInfo& proxy_response,
+ const SSLConfig& used_ssl_config,
+ const ProxyInfo& used_proxy_info,
+ HttpAuthController* auth_controller) {
+ if (!request_)
+ return;
+ if (!bound_job_.get())
+ BindJob(job);
+ else
+ DCHECK(jobs_.empty());
+ request_->OnNeedsProxyAuth(proxy_response, used_ssl_config, used_proxy_info,
+ auth_controller);
+}
+
+void HttpStreamFactoryImpl::JobController::OnNeedsClientAuth(
+ Job* job,
+ const SSLConfig& used_ssl_config,
+ SSLCertRequestInfo* cert_info) {
+ if (!request_)
+ return;
+ if (!bound_job_.get())
+ BindJob(job);
+ else
+ DCHECK(jobs_.empty());
+
+ request_->OnNeedsClientAuth(used_ssl_config, cert_info);
+}
+
+void HttpStreamFactoryImpl::JobController::OnHttpsProxyTunnelResponse(
+ Job* job,
+ const HttpResponseInfo& response_info,
+ const SSLConfig& used_ssl_config,
+ const ProxyInfo& used_proxy_info,
+ HttpStream* stream) {
+ if (!bound_job_.get())
+ BindJob(job);
+ else
+ DCHECK(jobs_.empty());
+ if (!request_)
+ return;
+ request_->OnHttpsProxyTunnelResponse(response_info, used_ssl_config,
+ used_proxy_info, stream);
+}
+
+void HttpStreamFactoryImpl::JobController::OnNewSpdySessionReady(
+ Job* job,
+ std::unique_ptr<HttpStream> stream,
+ std::unique_ptr<BidirectionalStreamImpl> bidirectional_stream_impl,
+ const base::WeakPtr<SpdySession>& spdy_session,
+ bool direct) {
+ DCHECK(job);
+ DCHECK(job->using_spdy());
+
+ // Cache these values in case the job gets deleted.
+ const SSLConfig used_ssl_config = job->server_ssl_config();
+ const ProxyInfo used_proxy_info = job->proxy_info();
+ const bool was_npn_negotiated = job->was_npn_negotiated();
+ const NextProto protocol_negotiated = job->protocol_negotiated();
+ const bool using_spdy = job->using_spdy();
+ const BoundNetLog net_log = job->net_log();
+
+ // Cache this so we can still use it if the JobController is deleted.
+ HttpStreamFactoryImpl* factory = factory_;
+
+ // Notify |request_|.
+ if (!job->IsPreconnecting() && !job->IsOrphaned()) {
+ DCHECK(request_);
+
+ // The first case is the usual case.
+ if (!bound_job_.get()) {
+ BindJob(job);
+ } else { // This is the case for HTTPS proxy tunneling.
+ DCHECK_EQ(bound_job_.get(), job);
+ DCHECK(jobs_.empty());
+ }
+
+ request_->Complete(was_npn_negotiated, protocol_negotiated, using_spdy);
+
+ request_->OnNewSpdySessionReady(
+ std::move(stream), std::move(bidirectional_stream_impl), spdy_session,
+ used_ssl_config, used_proxy_info, was_npn_negotiated,
+ protocol_negotiated, using_spdy, net_log, direct);
+ }
+
+ // Notify |factory_|. |request_| might be deleted already.
+ if (spdy_session && spdy_session->IsAvailable()) {
+ factory->OnNewSpdySessionReady(spdy_session, direct, used_ssl_config,
+ used_proxy_info, was_npn_negotiated,
+ protocol_negotiated, using_spdy, net_log);
+ }
+}
+
+void HttpStreamFactoryImpl::JobController::OnPreconnectsComplete(Job* job) {
+ jobs_.erase(job);
+ delete job;
+ factory_->OnPreconnectsCompleteInternal();
+ MaybeNotifyFactoryOfCompletion();
+}
+
+void HttpStreamFactoryImpl::JobController::OnOrphanedJobComplete(
+ const Job* job) {
+ orphaned_job_set_.erase(job);
+ delete job;
+ MaybeNotifyFactoryOfCompletion();
+}
+
+void HttpStreamFactoryImpl::JobController::AddConnectionAttemptsToRequest(
+ const ConnectionAttempts& attempts) {
+ DCHECK(request_);
+ request_->AddConnectionAttempts(attempts);
+}
+
+void HttpStreamFactoryImpl::JobController::SetSpdySessionKey(
+ const SpdySessionKey& spdy_session_key) {
+ DCHECK(request_);
+ if (!request_->HasSpdySessionKey()) {
+ request_->SetSpdySessionKey(spdy_session_key);
+ }
+}
+
+void HttpStreamFactoryImpl::JobController::
+ RemoveRequestFromSpdySessionRequestMap() {
+ DCHECK(request_);
+ request_->RemoveRequestFromSpdySessionRequestMap();
+}
+
+void HttpStreamFactoryImpl::JobController::MaybeNotifyFactoryOfCompletion() {
+ if (!request_ && jobs_.empty() && orphaned_job_set_.empty()) {
+ DCHECK(jobs_.empty());
+ factory_->OnJobControllerComplete(this);
+ }
+}
+
+const BoundNetLog& HttpStreamFactoryImpl::JobController::GetNetLogFromRequest()
+ const {
+ DCHECK(request_);
+ return request_->net_log();
+}
+
+WebSocketHandshakeStreamBase::CreateHelper* HttpStreamFactoryImpl::
+ JobController::websocket_handshake_stream_create_helper() {
+ DCHECK(request_);
+ return request_->websocket_handshake_stream_create_helper();
+}
+}

Powered by Google App Engine
This is Rietveld 408576698