Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(370)

Side by Side Diff: net/http/http_stream_factory_impl_job.cc

Issue 1326503003: Added a net::BidirectionalStream to expose a bidirectional streaming interface (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "net/http/http_stream_factory_impl_job.h" 5 #include "net/http/http_stream_factory_impl_job.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 #include <string> 8 #include <string>
9 9
10 #include "base/bind.h" 10 #include "base/bind.h"
11 #include "base/bind_helpers.h" 11 #include "base/bind_helpers.h"
12 #include "base/location.h" 12 #include "base/location.h"
13 #include "base/logging.h" 13 #include "base/logging.h"
14 #include "base/metrics/histogram_macros.h" 14 #include "base/metrics/histogram_macros.h"
15 #include "base/profiler/scoped_tracker.h" 15 #include "base/profiler/scoped_tracker.h"
16 #include "base/single_thread_task_runner.h" 16 #include "base/single_thread_task_runner.h"
17 #include "base/stl_util.h" 17 #include "base/stl_util.h"
18 #include "base/strings/string_number_conversions.h" 18 #include "base/strings/string_number_conversions.h"
19 #include "base/strings/string_util.h" 19 #include "base/strings/string_util.h"
20 #include "base/strings/stringprintf.h" 20 #include "base/strings/stringprintf.h"
21 #include "base/thread_task_runner_handle.h" 21 #include "base/thread_task_runner_handle.h"
22 #include "base/values.h" 22 #include "base/values.h"
23 #include "build/build_config.h" 23 #include "build/build_config.h"
24 #include "net/base/connection_type_histograms.h" 24 #include "net/base/connection_type_histograms.h"
25 #include "net/base/net_util.h" 25 #include "net/base/net_util.h"
26 #include "net/base/port_util.h" 26 #include "net/base/port_util.h"
27 #include "net/cert/cert_verifier.h" 27 #include "net/cert/cert_verifier.h"
28 #include "net/http/bidirectional_stream.h"
28 #include "net/http/http_basic_stream.h" 29 #include "net/http/http_basic_stream.h"
29 #include "net/http/http_network_session.h" 30 #include "net/http/http_network_session.h"
30 #include "net/http/http_proxy_client_socket.h" 31 #include "net/http/http_proxy_client_socket.h"
31 #include "net/http/http_proxy_client_socket_pool.h" 32 #include "net/http/http_proxy_client_socket_pool.h"
32 #include "net/http/http_request_info.h" 33 #include "net/http/http_request_info.h"
33 #include "net/http/http_server_properties.h" 34 #include "net/http/http_server_properties.h"
34 #include "net/http/http_stream_factory.h" 35 #include "net/http/http_stream_factory.h"
35 #include "net/http/http_stream_factory_impl_request.h" 36 #include "net/http/http_stream_factory_impl_request.h"
36 #include "net/log/net_log.h" 37 #include "net/log/net_log.h"
37 #include "net/quic/quic_http_stream.h" 38 #include "net/quic/quic_http_stream.h"
(...skipping 89 matching lines...) Expand 10 before | Expand all | Expand 10 after
127 quic_request_(session_->quic_stream_factory()), 128 quic_request_(session_->quic_stream_factory()),
128 using_existing_quic_session_(false), 129 using_existing_quic_session_(false),
129 spdy_certificate_error_(OK), 130 spdy_certificate_error_(OK),
130 establishing_tunnel_(false), 131 establishing_tunnel_(false),
131 was_npn_negotiated_(false), 132 was_npn_negotiated_(false),
132 protocol_negotiated_(kProtoUnknown), 133 protocol_negotiated_(kProtoUnknown),
133 num_streams_(0), 134 num_streams_(0),
134 spdy_session_direct_(false), 135 spdy_session_direct_(false),
135 job_status_(STATUS_RUNNING), 136 job_status_(STATUS_RUNNING),
136 other_job_status_(STATUS_RUNNING), 137 other_job_status_(STATUS_RUNNING),
138 for_bidirectional_(false),
137 ptr_factory_(this) { 139 ptr_factory_(this) {
138 DCHECK(stream_factory); 140 DCHECK(stream_factory);
139 DCHECK(session); 141 DCHECK(session);
140 if (IsQuicAlternative()) { 142 if (IsQuicAlternative()) {
141 DCHECK(session_->params().enable_quic); 143 DCHECK(session_->params().enable_quic);
142 using_quic_ = true; 144 using_quic_ = true;
143 } 145 }
144 } 146 }
145 147
146 HttpStreamFactoryImpl::Job::~Job() { 148 HttpStreamFactoryImpl::Job::~Job() {
(...skipping 11 matching lines...) Expand all
158 session_->proxy_service()->CancelPacRequest(pac_request_); 160 session_->proxy_service()->CancelPacRequest(pac_request_);
159 161
160 // The stream could be in a partial state. It is not reusable. 162 // The stream could be in a partial state. It is not reusable.
161 if (stream_.get() && next_state_ != STATE_DONE) 163 if (stream_.get() && next_state_ != STATE_DONE)
162 stream_->Close(true /* not reusable */); 164 stream_->Close(true /* not reusable */);
163 } 165 }
164 166
165 void HttpStreamFactoryImpl::Job::Start(Request* request) { 167 void HttpStreamFactoryImpl::Job::Start(Request* request) {
166 DCHECK(request); 168 DCHECK(request);
167 request_ = request; 169 request_ = request;
170 // Saves |for_bidirectional_|, since request is nulled when job is orphaned.
171 for_bidirectional_ = request_->for_bidirectional();
168 StartInternal(); 172 StartInternal();
169 } 173 }
170 174
171 int HttpStreamFactoryImpl::Job::Preconnect(int num_streams) { 175 int HttpStreamFactoryImpl::Job::Preconnect(int num_streams) {
172 DCHECK_GT(num_streams, 0); 176 DCHECK_GT(num_streams, 0);
173 base::WeakPtr<HttpServerProperties> http_server_properties = 177 base::WeakPtr<HttpServerProperties> http_server_properties =
174 session_->http_server_properties(); 178 session_->http_server_properties();
175 if (http_server_properties && 179 if (http_server_properties &&
176 http_server_properties->SupportsRequestPriority( 180 http_server_properties->SupportsRequestPriority(
177 HostPortPair::FromURL(request_info_.url))) { 181 HostPortPair::FromURL(request_info_.url))) {
(...skipping 170 matching lines...) Expand 10 before | Expand all | Expand 10 after
348 MaybeCopyConnectionAttemptsFromSocketOrHandle(); 352 MaybeCopyConnectionAttemptsFromSocketOrHandle();
349 353
350 request_->Complete(was_npn_negotiated(), protocol_negotiated(), using_spdy()); 354 request_->Complete(was_npn_negotiated(), protocol_negotiated(), using_spdy());
351 request_->OnWebSocketHandshakeStreamReady(this, 355 request_->OnWebSocketHandshakeStreamReady(this,
352 server_ssl_config_, 356 server_ssl_config_,
353 proxy_info_, 357 proxy_info_,
354 websocket_stream_.release()); 358 websocket_stream_.release());
355 // |this| may be deleted after this call. 359 // |this| may be deleted after this call.
356 } 360 }
357 361
362 void HttpStreamFactoryImpl::Job::OnBidirectionalStreamReadyCallback() {
363 DCHECK(bidirectional_stream_);
364 DCHECK(!IsPreconnecting());
365 DCHECK(!stream_factory_->for_websockets_);
366
367 MaybeCopyConnectionAttemptsFromSocketOrHandle();
368
369 if (IsOrphaned()) {
370 stream_factory_->OnOrphanedJobComplete(this);
371 } else {
372 request_->Complete(was_npn_negotiated(), protocol_negotiated(),
373 using_spdy());
374 request_->OnBidirectionalStreamReady(this, server_ssl_config_, proxy_info_,
375 bidirectional_stream_.release());
376 }
377 // |this| may be deleted after this call.
378 }
379
358 void HttpStreamFactoryImpl::Job::OnNewSpdySessionReadyCallback() { 380 void HttpStreamFactoryImpl::Job::OnNewSpdySessionReadyCallback() {
359 DCHECK(stream_.get()); 381 DCHECK(stream_.get() || bidirectional_stream_.get());
360 DCHECK(!IsPreconnecting()); 382 DCHECK(!IsPreconnecting());
361 DCHECK(using_spdy()); 383 DCHECK(using_spdy());
362 // Note: an event loop iteration has passed, so |new_spdy_session_| may be 384 // Note: an event loop iteration has passed, so |new_spdy_session_| may be
363 // NULL at this point if the SpdySession closed immediately after creation. 385 // NULL at this point if the SpdySession closed immediately after creation.
364 base::WeakPtr<SpdySession> spdy_session = new_spdy_session_; 386 base::WeakPtr<SpdySession> spdy_session = new_spdy_session_;
365 new_spdy_session_.reset(); 387 new_spdy_session_.reset();
366 388
367 MaybeCopyConnectionAttemptsFromSocketOrHandle(); 389 MaybeCopyConnectionAttemptsFromSocketOrHandle();
368 390
369 // TODO(jgraettinger): Notify the factory, and let that notify |request_|, 391 // TODO(jgraettinger): Notify the factory, and let that notify |request_|,
370 // rather than notifying |request_| directly. 392 // rather than notifying |request_| directly.
371 if (IsOrphaned()) { 393 if (IsOrphaned()) {
372 if (spdy_session) { 394 if (spdy_session) {
373 stream_factory_->OnNewSpdySessionReady( 395 stream_factory_->OnNewSpdySessionReady(
374 spdy_session, spdy_session_direct_, server_ssl_config_, proxy_info_, 396 spdy_session, spdy_session_direct_, server_ssl_config_, proxy_info_,
375 was_npn_negotiated(), protocol_negotiated(), using_spdy(), net_log_); 397 was_npn_negotiated(), protocol_negotiated(), using_spdy(), net_log_);
376 } 398 }
377 stream_factory_->OnOrphanedJobComplete(this); 399 stream_factory_->OnOrphanedJobComplete(this);
378 } else { 400 } else {
379 request_->OnNewSpdySessionReady( 401 if (for_bidirectional_) {
380 this, stream_.Pass(), spdy_session, spdy_session_direct_); 402 request_->OnNewSpdySessionReady(this, nullptr /** spdy_http_stream */,
Bence 2015/10/01 05:29:28 I think the syntax most often used is /*spdy_http_
xunjieli 2015/10/01 18:41:16 Done.
403 bidirectional_stream_.Pass(),
404 spdy_session, spdy_session_direct_);
405
406 } else {
407 request_->OnNewSpdySessionReady(this, stream_.Pass(),
408 nullptr /** bidirectional_stream */,
Bence 2015/10/01 05:29:28 /*bidirectional_stream=*/ nullptr
xunjieli 2015/10/01 18:41:16 Done.
409 spdy_session, spdy_session_direct_);
410 }
381 } 411 }
382 // |this| may be deleted after this call. 412 // |this| may be deleted after this call.
383 } 413 }
384 414
385 void HttpStreamFactoryImpl::Job::OnStreamFailedCallback(int result) { 415 void HttpStreamFactoryImpl::Job::OnStreamFailedCallback(int result) {
386 DCHECK(!IsPreconnecting()); 416 DCHECK(!IsPreconnecting());
387 417
388 MaybeCopyConnectionAttemptsFromSocketOrHandle(); 418 MaybeCopyConnectionAttemptsFromSocketOrHandle();
389 419
390 if (IsOrphaned()) { 420 if (IsOrphaned()) {
(...skipping 166 matching lines...) Expand 10 before | Expand all | Expand 10 after
557 next_state_ = STATE_DONE; 587 next_state_ = STATE_DONE;
558 if (new_spdy_session_.get()) { 588 if (new_spdy_session_.get()) {
559 base::ThreadTaskRunnerHandle::Get()->PostTask( 589 base::ThreadTaskRunnerHandle::Get()->PostTask(
560 FROM_HERE, base::Bind(&Job::OnNewSpdySessionReadyCallback, 590 FROM_HERE, base::Bind(&Job::OnNewSpdySessionReadyCallback,
561 ptr_factory_.GetWeakPtr())); 591 ptr_factory_.GetWeakPtr()));
562 } else if (stream_factory_->for_websockets_) { 592 } else if (stream_factory_->for_websockets_) {
563 DCHECK(websocket_stream_); 593 DCHECK(websocket_stream_);
564 base::ThreadTaskRunnerHandle::Get()->PostTask( 594 base::ThreadTaskRunnerHandle::Get()->PostTask(
565 FROM_HERE, base::Bind(&Job::OnWebSocketHandshakeStreamReadyCallback, 595 FROM_HERE, base::Bind(&Job::OnWebSocketHandshakeStreamReadyCallback,
566 ptr_factory_.GetWeakPtr())); 596 ptr_factory_.GetWeakPtr()));
597 } else if (for_bidirectional_) {
598 if (bidirectional_stream_ == nullptr) {
599 base::ThreadTaskRunnerHandle::Get()->PostTask(
600 FROM_HERE, base::Bind(&Job::OnStreamFailedCallback,
601 ptr_factory_.GetWeakPtr(), ERR_FAILED));
602 } else {
603 base::ThreadTaskRunnerHandle::Get()->PostTask(
604 FROM_HERE, base::Bind(&Job::OnBidirectionalStreamReadyCallback,
605 ptr_factory_.GetWeakPtr()));
606 }
567 } else { 607 } else {
568 DCHECK(stream_.get()); 608 DCHECK(stream_.get());
569 base::ThreadTaskRunnerHandle::Get()->PostTask( 609 base::ThreadTaskRunnerHandle::Get()->PostTask(
570 FROM_HERE, 610 FROM_HERE,
571 base::Bind(&Job::OnStreamReadyCallback, ptr_factory_.GetWeakPtr())); 611 base::Bind(&Job::OnStreamReadyCallback, ptr_factory_.GetWeakPtr()));
572 } 612 }
573 return ERR_IO_PENDING; 613 return ERR_IO_PENDING;
574 614
575 default: 615 default:
576 DCHECK(result != ERR_ALTERNATIVE_CERT_NOT_VALID_FOR_ORIGIN || 616 DCHECK(result != ERR_ALTERNATIVE_CERT_NOT_VALID_FOR_ORIGIN ||
(...skipping 564 matching lines...) Expand 10 before | Expand all | Expand 10 after
1141 // We're always waiting here for the delegate to call us back. 1181 // We're always waiting here for the delegate to call us back.
1142 return ERR_IO_PENDING; 1182 return ERR_IO_PENDING;
1143 } 1183 }
1144 1184
1145 int HttpStreamFactoryImpl::Job::SetSpdyHttpStream( 1185 int HttpStreamFactoryImpl::Job::SetSpdyHttpStream(
1146 base::WeakPtr<SpdySession> session, bool direct) { 1186 base::WeakPtr<SpdySession> session, bool direct) {
1147 // TODO(ricea): Restore the code for WebSockets over SPDY once it's 1187 // TODO(ricea): Restore the code for WebSockets over SPDY once it's
1148 // implemented. 1188 // implemented.
1149 if (stream_factory_->for_websockets_) 1189 if (stream_factory_->for_websockets_)
1150 return ERR_NOT_IMPLEMENTED; 1190 return ERR_NOT_IMPLEMENTED;
1191 if (for_bidirectional_) {
1192 bidirectional_stream_.reset(new BidirectionalStream(session));
1193 return OK;
1194 }
1151 1195
1152 // TODO(willchan): Delete this code, because eventually, the 1196 // TODO(willchan): Delete this code, because eventually, the
1153 // HttpStreamFactoryImpl will be creating all the SpdyHttpStreams, since it 1197 // HttpStreamFactoryImpl will be creating all the SpdyHttpStreams, since it
1154 // will know when SpdySessions become available. 1198 // will know when SpdySessions become available.
1155 1199
1156 bool use_relative_url = direct || request_info_.url.SchemeIs("https"); 1200 bool use_relative_url = direct || request_info_.url.SchemeIs("https");
1157 stream_.reset(new SpdyHttpStream(session, use_relative_url)); 1201 stream_.reset(new SpdyHttpStream(session, use_relative_url));
1158 return OK; 1202 return OK;
1159 } 1203 }
1160 1204
(...skipping 450 matching lines...) Expand 10 before | Expand all | Expand 10 after
1611 if (connection_->socket()) { 1655 if (connection_->socket()) {
1612 ConnectionAttempts socket_attempts; 1656 ConnectionAttempts socket_attempts;
1613 connection_->socket()->GetConnectionAttempts(&socket_attempts); 1657 connection_->socket()->GetConnectionAttempts(&socket_attempts);
1614 request_->AddConnectionAttempts(socket_attempts); 1658 request_->AddConnectionAttempts(socket_attempts);
1615 } else { 1659 } else {
1616 request_->AddConnectionAttempts(connection_->connection_attempts()); 1660 request_->AddConnectionAttempts(connection_->connection_attempts());
1617 } 1661 }
1618 } 1662 }
1619 1663
1620 } // namespace net 1664 } // namespace net
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698