Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(199)

Side by Side Diff: net/http/http_stream_factory_impl_job.cc

Issue 1326503003: Added a net::BidirectionalStream to expose a bidirectional streaming interface (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Address Comments Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "net/http/http_stream_factory_impl_job.h" 5 #include "net/http/http_stream_factory_impl_job.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 #include <string> 8 #include <string>
9 9
10 #include "base/bind.h" 10 #include "base/bind.h"
(...skipping 24 matching lines...) Expand all
35 #include "net/http/http_stream_factory.h" 35 #include "net/http/http_stream_factory.h"
36 #include "net/http/http_stream_factory_impl_request.h" 36 #include "net/http/http_stream_factory_impl_request.h"
37 #include "net/log/net_log.h" 37 #include "net/log/net_log.h"
38 #include "net/quic/quic_http_stream.h" 38 #include "net/quic/quic_http_stream.h"
39 #include "net/socket/client_socket_handle.h" 39 #include "net/socket/client_socket_handle.h"
40 #include "net/socket/client_socket_pool.h" 40 #include "net/socket/client_socket_pool.h"
41 #include "net/socket/client_socket_pool_manager.h" 41 #include "net/socket/client_socket_pool_manager.h"
42 #include "net/socket/socks_client_socket_pool.h" 42 #include "net/socket/socks_client_socket_pool.h"
43 #include "net/socket/ssl_client_socket.h" 43 #include "net/socket/ssl_client_socket.h"
44 #include "net/socket/ssl_client_socket_pool.h" 44 #include "net/socket/ssl_client_socket_pool.h"
45 #include "net/spdy/bidirectional_spdy_stream.h"
45 #include "net/spdy/spdy_http_stream.h" 46 #include "net/spdy/spdy_http_stream.h"
46 #include "net/spdy/spdy_protocol.h" 47 #include "net/spdy/spdy_protocol.h"
47 #include "net/spdy/spdy_session.h" 48 #include "net/spdy/spdy_session.h"
48 #include "net/spdy/spdy_session_pool.h" 49 #include "net/spdy/spdy_session_pool.h"
49 #include "net/ssl/ssl_cert_request_info.h" 50 #include "net/ssl/ssl_cert_request_info.h"
50 #include "net/ssl/ssl_connection_status_flags.h" 51 #include "net/ssl/ssl_connection_status_flags.h"
51 #include "net/ssl/ssl_failure_state.h" 52 #include "net/ssl/ssl_failure_state.h"
52 53
53 namespace net { 54 namespace net {
54 55
(...skipping 75 matching lines...) Expand 10 before | Expand all | Expand 10 after
130 quic_request_(session_->quic_stream_factory()), 131 quic_request_(session_->quic_stream_factory()),
131 using_existing_quic_session_(false), 132 using_existing_quic_session_(false),
132 spdy_certificate_error_(OK), 133 spdy_certificate_error_(OK),
133 establishing_tunnel_(false), 134 establishing_tunnel_(false),
134 was_npn_negotiated_(false), 135 was_npn_negotiated_(false),
135 protocol_negotiated_(kProtoUnknown), 136 protocol_negotiated_(kProtoUnknown),
136 num_streams_(0), 137 num_streams_(0),
137 spdy_session_direct_(false), 138 spdy_session_direct_(false),
138 job_status_(STATUS_RUNNING), 139 job_status_(STATUS_RUNNING),
139 other_job_status_(STATUS_RUNNING), 140 other_job_status_(STATUS_RUNNING),
141 for_bidirectional_(false),
140 ptr_factory_(this) { 142 ptr_factory_(this) {
141 DCHECK(stream_factory); 143 DCHECK(stream_factory);
142 DCHECK(session); 144 DCHECK(session);
143 if (IsQuicAlternative()) { 145 if (IsQuicAlternative()) {
144 DCHECK(session_->params().enable_quic); 146 DCHECK(session_->params().enable_quic);
145 using_quic_ = true; 147 using_quic_ = true;
146 } 148 }
147 } 149 }
148 150
149 HttpStreamFactoryImpl::Job::~Job() { 151 HttpStreamFactoryImpl::Job::~Job() {
(...skipping 11 matching lines...) Expand all
161 session_->proxy_service()->CancelPacRequest(pac_request_); 163 session_->proxy_service()->CancelPacRequest(pac_request_);
162 164
163 // The stream could be in a partial state. It is not reusable. 165 // The stream could be in a partial state. It is not reusable.
164 if (stream_.get() && next_state_ != STATE_DONE) 166 if (stream_.get() && next_state_ != STATE_DONE)
165 stream_->Close(true /* not reusable */); 167 stream_->Close(true /* not reusable */);
166 } 168 }
167 169
168 void HttpStreamFactoryImpl::Job::Start(Request* request) { 170 void HttpStreamFactoryImpl::Job::Start(Request* request) {
169 DCHECK(request); 171 DCHECK(request);
170 request_ = request; 172 request_ = request;
173 // Saves |for_bidirectional_|, since request is nulled when job is orphaned.
174 for_bidirectional_ = request_->for_bidirectional();
171 StartInternal(); 175 StartInternal();
172 } 176 }
173 177
174 int HttpStreamFactoryImpl::Job::Preconnect(int num_streams) { 178 int HttpStreamFactoryImpl::Job::Preconnect(int num_streams) {
175 DCHECK_GT(num_streams, 0); 179 DCHECK_GT(num_streams, 0);
176 base::WeakPtr<HttpServerProperties> http_server_properties = 180 base::WeakPtr<HttpServerProperties> http_server_properties =
177 session_->http_server_properties(); 181 session_->http_server_properties();
178 if (http_server_properties && 182 if (http_server_properties &&
179 http_server_properties->SupportsRequestPriority( 183 http_server_properties->SupportsRequestPriority(
180 HostPortPair::FromURL(request_info_.url))) { 184 HostPortPair::FromURL(request_info_.url))) {
(...skipping 170 matching lines...) Expand 10 before | Expand all | Expand 10 after
351 MaybeCopyConnectionAttemptsFromSocketOrHandle(); 355 MaybeCopyConnectionAttemptsFromSocketOrHandle();
352 356
353 request_->Complete(was_npn_negotiated(), protocol_negotiated(), using_spdy()); 357 request_->Complete(was_npn_negotiated(), protocol_negotiated(), using_spdy());
354 request_->OnWebSocketHandshakeStreamReady(this, 358 request_->OnWebSocketHandshakeStreamReady(this,
355 server_ssl_config_, 359 server_ssl_config_,
356 proxy_info_, 360 proxy_info_,
357 websocket_stream_.release()); 361 websocket_stream_.release());
358 // |this| may be deleted after this call. 362 // |this| may be deleted after this call.
359 } 363 }
360 364
365 void HttpStreamFactoryImpl::Job::OnBidirectionalStreamReadyCallback() {
366 DCHECK(bidirectional_stream_);
367
368 MaybeCopyConnectionAttemptsFromSocketOrHandle();
369
370 if (IsOrphaned()) {
371 stream_factory_->OnOrphanedJobComplete(this);
372 } else {
373 request_->Complete(was_npn_negotiated(), protocol_negotiated(),
374 using_spdy());
375 request_->OnBidirectionalStreamReady(this, server_ssl_config_, proxy_info_,
376 bidirectional_stream_.release());
377 }
378 // |this| may be deleted after this call.
379 }
380
361 void HttpStreamFactoryImpl::Job::OnNewSpdySessionReadyCallback() { 381 void HttpStreamFactoryImpl::Job::OnNewSpdySessionReadyCallback() {
362 DCHECK(stream_.get()); 382 DCHECK(stream_.get() || bidirectional_stream_.get());
363 DCHECK(!IsPreconnecting()); 383 DCHECK(!IsPreconnecting());
364 DCHECK(using_spdy()); 384 DCHECK(using_spdy());
365 // Note: an event loop iteration has passed, so |new_spdy_session_| may be 385 // Note: an event loop iteration has passed, so |new_spdy_session_| may be
366 // NULL at this point if the SpdySession closed immediately after creation. 386 // NULL at this point if the SpdySession closed immediately after creation.
367 base::WeakPtr<SpdySession> spdy_session = new_spdy_session_; 387 base::WeakPtr<SpdySession> spdy_session = new_spdy_session_;
368 new_spdy_session_.reset(); 388 new_spdy_session_.reset();
369 389
370 MaybeCopyConnectionAttemptsFromSocketOrHandle(); 390 MaybeCopyConnectionAttemptsFromSocketOrHandle();
371 391
372 // TODO(jgraettinger): Notify the factory, and let that notify |request_|, 392 // TODO(jgraettinger): Notify the factory, and let that notify |request_|,
373 // rather than notifying |request_| directly. 393 // rather than notifying |request_| directly.
374 if (IsOrphaned()) { 394 if (IsOrphaned()) {
375 if (spdy_session) { 395 if (spdy_session) {
376 stream_factory_->OnNewSpdySessionReady( 396 stream_factory_->OnNewSpdySessionReady(
377 spdy_session, spdy_session_direct_, server_ssl_config_, proxy_info_, 397 spdy_session, spdy_session_direct_, server_ssl_config_, proxy_info_,
378 was_npn_negotiated(), protocol_negotiated(), using_spdy(), net_log_); 398 was_npn_negotiated(), protocol_negotiated(), using_spdy(), net_log_);
379 } 399 }
380 stream_factory_->OnOrphanedJobComplete(this); 400 stream_factory_->OnOrphanedJobComplete(this);
381 } else { 401 } else {
382 request_->OnNewSpdySessionReady( 402 if (for_bidirectional_) {
383 this, stream_.Pass(), spdy_session, spdy_session_direct_); 403 request_->OnNewSpdySessionReady(this, /*spdy_http_stream=*/nullptr,
404 bidirectional_stream_.Pass(),
405 spdy_session, spdy_session_direct_);
406
407 } else {
408 request_->OnNewSpdySessionReady(this, stream_.Pass(),
409 /** bidirectional_stream=*/nullptr,
mef 2015/10/20 21:56:35 I wonder what would happen if you call Pass() on s
xunjieli 2015/10/21 19:35:36 I am not sure. I guess that will give nulls? I wil
410 spdy_session, spdy_session_direct_);
411 }
384 } 412 }
385 // |this| may be deleted after this call. 413 // |this| may be deleted after this call.
386 } 414 }
387 415
388 void HttpStreamFactoryImpl::Job::OnStreamFailedCallback(int result) { 416 void HttpStreamFactoryImpl::Job::OnStreamFailedCallback(int result) {
389 DCHECK(!IsPreconnecting()); 417 DCHECK(!IsPreconnecting());
390 418
391 MaybeCopyConnectionAttemptsFromSocketOrHandle(); 419 MaybeCopyConnectionAttemptsFromSocketOrHandle();
392 420
393 if (IsOrphaned()) { 421 if (IsOrphaned()) {
(...skipping 166 matching lines...) Expand 10 before | Expand all | Expand 10 after
560 next_state_ = STATE_DONE; 588 next_state_ = STATE_DONE;
561 if (new_spdy_session_.get()) { 589 if (new_spdy_session_.get()) {
562 base::ThreadTaskRunnerHandle::Get()->PostTask( 590 base::ThreadTaskRunnerHandle::Get()->PostTask(
563 FROM_HERE, base::Bind(&Job::OnNewSpdySessionReadyCallback, 591 FROM_HERE, base::Bind(&Job::OnNewSpdySessionReadyCallback,
564 ptr_factory_.GetWeakPtr())); 592 ptr_factory_.GetWeakPtr()));
565 } else if (stream_factory_->for_websockets_) { 593 } else if (stream_factory_->for_websockets_) {
566 DCHECK(websocket_stream_); 594 DCHECK(websocket_stream_);
567 base::ThreadTaskRunnerHandle::Get()->PostTask( 595 base::ThreadTaskRunnerHandle::Get()->PostTask(
568 FROM_HERE, base::Bind(&Job::OnWebSocketHandshakeStreamReadyCallback, 596 FROM_HERE, base::Bind(&Job::OnWebSocketHandshakeStreamReadyCallback,
569 ptr_factory_.GetWeakPtr())); 597 ptr_factory_.GetWeakPtr()));
598 } else if (for_bidirectional_) {
599 if (bidirectional_stream_ == nullptr) {
mef 2015/10/20 21:56:35 why do you need this guard? Shouldn't it go to def
xunjieli 2015/10/21 19:35:36 If H2 is not negotiated, we will still enter the O
600 base::ThreadTaskRunnerHandle::Get()->PostTask(
601 FROM_HERE, base::Bind(&Job::OnStreamFailedCallback,
602 ptr_factory_.GetWeakPtr(), ERR_FAILED));
603 } else {
604 base::ThreadTaskRunnerHandle::Get()->PostTask(
605 FROM_HERE, base::Bind(&Job::OnBidirectionalStreamReadyCallback,
606 ptr_factory_.GetWeakPtr()));
607 }
570 } else { 608 } else {
571 DCHECK(stream_.get()); 609 DCHECK(stream_.get());
572 base::ThreadTaskRunnerHandle::Get()->PostTask( 610 base::ThreadTaskRunnerHandle::Get()->PostTask(
573 FROM_HERE, 611 FROM_HERE,
574 base::Bind(&Job::OnStreamReadyCallback, ptr_factory_.GetWeakPtr())); 612 base::Bind(&Job::OnStreamReadyCallback, ptr_factory_.GetWeakPtr()));
575 } 613 }
576 return ERR_IO_PENDING; 614 return ERR_IO_PENDING;
577 615
578 default: 616 default:
579 DCHECK(result != ERR_ALTERNATIVE_CERT_NOT_VALID_FOR_ORIGIN || 617 DCHECK(result != ERR_ALTERNATIVE_CERT_NOT_VALID_FOR_ORIGIN ||
(...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after
646 NOTREACHED() << "bad state"; 684 NOTREACHED() << "bad state";
647 rv = ERR_FAILED; 685 rv = ERR_FAILED;
648 break; 686 break;
649 } 687 }
650 } while (rv != ERR_IO_PENDING && next_state_ != STATE_NONE); 688 } while (rv != ERR_IO_PENDING && next_state_ != STATE_NONE);
651 return rv; 689 return rv;
652 } 690 }
653 691
654 int HttpStreamFactoryImpl::Job::StartInternal() { 692 int HttpStreamFactoryImpl::Job::StartInternal() {
655 CHECK_EQ(STATE_NONE, next_state_); 693 CHECK_EQ(STATE_NONE, next_state_);
694
mef 2015/10/20 21:56:35 spurious nl?
xunjieli 2015/10/21 19:35:36 Done.
656 next_state_ = STATE_START; 695 next_state_ = STATE_START;
657 int rv = RunLoop(OK); 696 int rv = RunLoop(OK);
658 DCHECK_EQ(ERR_IO_PENDING, rv); 697 DCHECK_EQ(ERR_IO_PENDING, rv);
659 return rv; 698 return rv;
660 } 699 }
661 700
662 int HttpStreamFactoryImpl::Job::DoStart() { 701 int HttpStreamFactoryImpl::Job::DoStart() {
663 if (IsSpdyAlternative() || IsQuicAlternative()) { 702 if (IsSpdyAlternative() || IsQuicAlternative()) {
664 server_ = alternative_service_.host_port_pair(); 703 server_ = alternative_service_.host_port_pair();
665 } else { 704 } else {
(...skipping 472 matching lines...) Expand 10 before | Expand all | Expand 10 after
1138 1177
1139 int HttpStreamFactoryImpl::Job::DoWaitingUserAction(int result) { 1178 int HttpStreamFactoryImpl::Job::DoWaitingUserAction(int result) {
1140 // This state indicates that the stream request is in a partially 1179 // This state indicates that the stream request is in a partially
1141 // completed state, and we've called back to the delegate for more 1180 // completed state, and we've called back to the delegate for more
1142 // information. 1181 // information.
1143 1182
1144 // We're always waiting here for the delegate to call us back. 1183 // We're always waiting here for the delegate to call us back.
1145 return ERR_IO_PENDING; 1184 return ERR_IO_PENDING;
1146 } 1185 }
1147 1186
1148 int HttpStreamFactoryImpl::Job::SetSpdyHttpStream( 1187 int HttpStreamFactoryImpl::Job::SetSpdyHttpStreamOrBidirectionalStream(
1149 base::WeakPtr<SpdySession> session, bool direct) { 1188 base::WeakPtr<SpdySession> session,
1189 bool direct) {
1150 // TODO(ricea): Restore the code for WebSockets over SPDY once it's 1190 // TODO(ricea): Restore the code for WebSockets over SPDY once it's
1151 // implemented. 1191 // implemented.
1152 if (stream_factory_->for_websockets_) 1192 if (stream_factory_->for_websockets_)
1153 return ERR_NOT_IMPLEMENTED; 1193 return ERR_NOT_IMPLEMENTED;
1194 if (for_bidirectional_) {
1195 // TODO(xunjieli): Enable creation of QUIC's version of BidirectionalStream.
1196 bidirectional_stream_.reset(new BidirectionalSpdyStream(session));
1197 return OK;
1198 }
1154 1199
1155 // TODO(willchan): Delete this code, because eventually, the 1200 // TODO(willchan): Delete this code, because eventually, the
1156 // HttpStreamFactoryImpl will be creating all the SpdyHttpStreams, since it 1201 // HttpStreamFactoryImpl will be creating all the SpdyHttpStreams, since it
1157 // will know when SpdySessions become available. 1202 // will know when SpdySessions become available.
1158 1203
1159 bool use_relative_url = direct || request_info_.url.SchemeIs("https"); 1204 bool use_relative_url = direct || request_info_.url.SchemeIs("https");
1160 stream_.reset(new SpdyHttpStream(session, use_relative_url)); 1205 stream_.reset(new SpdyHttpStream(session, use_relative_url));
1161 return OK; 1206 return OK;
1162 } 1207 }
1163 1208
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after
1197 1242
1198 CHECK(!stream_.get()); 1243 CHECK(!stream_.get());
1199 1244
1200 bool direct = !IsHttpsProxyAndHttpUrl(); 1245 bool direct = !IsHttpsProxyAndHttpUrl();
1201 if (existing_spdy_session_.get()) { 1246 if (existing_spdy_session_.get()) {
1202 // We picked up an existing session, so we don't need our socket. 1247 // We picked up an existing session, so we don't need our socket.
1203 if (connection_->socket()) 1248 if (connection_->socket())
1204 connection_->socket()->Disconnect(); 1249 connection_->socket()->Disconnect();
1205 connection_->Reset(); 1250 connection_->Reset();
1206 1251
1207 int set_result = SetSpdyHttpStream(existing_spdy_session_, direct); 1252 int set_result =
1253 SetSpdyHttpStreamOrBidirectionalStream(existing_spdy_session_, direct);
1208 existing_spdy_session_.reset(); 1254 existing_spdy_session_.reset();
1209 return set_result; 1255 return set_result;
1210 } 1256 }
1211 1257
1212 SpdySessionKey spdy_session_key = GetSpdySessionKey(); 1258 SpdySessionKey spdy_session_key = GetSpdySessionKey();
1213 base::WeakPtr<SpdySession> spdy_session; 1259 base::WeakPtr<SpdySession> spdy_session;
1214 int result = valid_spdy_session_pool_->FindAvailableSession( 1260 int result = valid_spdy_session_pool_->FindAvailableSession(
1215 spdy_session_key, net_log_, &spdy_session); 1261 spdy_session_key, net_log_, &spdy_session);
1216 if (result != OK) { 1262 if (result != OK) {
1217 return result; 1263 return result;
1218 } 1264 }
1219 if (spdy_session) { 1265 if (spdy_session) {
1220 return SetSpdyHttpStream(spdy_session, direct); 1266 return SetSpdyHttpStreamOrBidirectionalStream(spdy_session, direct);
1221 } 1267 }
1222 1268
1223 result = valid_spdy_session_pool_->CreateAvailableSessionFromSocket( 1269 result = valid_spdy_session_pool_->CreateAvailableSessionFromSocket(
1224 spdy_session_key, connection_.Pass(), net_log_, spdy_certificate_error_, 1270 spdy_session_key, connection_.Pass(), net_log_, spdy_certificate_error_,
1225 using_ssl_, &spdy_session); 1271 using_ssl_, &spdy_session);
1226 if (result != OK) { 1272 if (result != OK) {
1227 return result; 1273 return result;
1228 } 1274 }
1229 1275
1230 if (!spdy_session->HasAcceptableTransportSecurity()) { 1276 if (!spdy_session->HasAcceptableTransportSecurity()) {
(...skipping 14 matching lines...) Expand all
1245 } 1291 }
1246 1292
1247 new_spdy_session_ = spdy_session; 1293 new_spdy_session_ = spdy_session;
1248 spdy_session_direct_ = direct; 1294 spdy_session_direct_ = direct;
1249 const HostPortPair& host_port_pair = spdy_session_key.host_port_pair(); 1295 const HostPortPair& host_port_pair = spdy_session_key.host_port_pair();
1250 base::WeakPtr<HttpServerProperties> http_server_properties = 1296 base::WeakPtr<HttpServerProperties> http_server_properties =
1251 session_->http_server_properties(); 1297 session_->http_server_properties();
1252 if (http_server_properties) 1298 if (http_server_properties)
1253 http_server_properties->SetSupportsSpdy(host_port_pair, true); 1299 http_server_properties->SetSupportsSpdy(host_port_pair, true);
1254 1300
1255 // Create a SpdyHttpStream attached to the session; 1301 // Create a SpdyHttpStream or a BidirectionalStream attached to the session;
1256 // OnNewSpdySessionReadyCallback is not called until an event loop 1302 // OnNewSpdySessionReadyCallback is not called until an event loop
1257 // iteration later, so if the SpdySession is closed between then, allow 1303 // iteration later, so if the SpdySession is closed between then, allow
1258 // reuse state from the underlying socket, sampled by SpdyHttpStream, 1304 // reuse state from the underlying socket, sampled by SpdyHttpStream,
1259 // bubble up to the request. 1305 // bubble up to the request.
1260 return SetSpdyHttpStream(new_spdy_session_, spdy_session_direct_); 1306 return SetSpdyHttpStreamOrBidirectionalStream(new_spdy_session_,
1307 spdy_session_direct_);
1261 } 1308 }
1262 1309
1263 int HttpStreamFactoryImpl::Job::DoCreateStreamComplete(int result) { 1310 int HttpStreamFactoryImpl::Job::DoCreateStreamComplete(int result) {
1264 if (result < 0) 1311 if (result < 0)
1265 return result; 1312 return result;
1266 1313
1267 session_->proxy_service()->ReportSuccess(proxy_info_, 1314 session_->proxy_service()->ReportSuccess(proxy_info_,
1268 session_->network_delegate()); 1315 session_->network_delegate());
1269 next_state_ = STATE_NONE; 1316 next_state_ = STATE_NONE;
1270 return OK; 1317 return OK;
(...skipping 345 matching lines...) Expand 10 before | Expand all | Expand 10 after
1616 if (connection_->socket()) { 1663 if (connection_->socket()) {
1617 ConnectionAttempts socket_attempts; 1664 ConnectionAttempts socket_attempts;
1618 connection_->socket()->GetConnectionAttempts(&socket_attempts); 1665 connection_->socket()->GetConnectionAttempts(&socket_attempts);
1619 request_->AddConnectionAttempts(socket_attempts); 1666 request_->AddConnectionAttempts(socket_attempts);
1620 } else { 1667 } else {
1621 request_->AddConnectionAttempts(connection_->connection_attempts()); 1668 request_->AddConnectionAttempts(connection_->connection_attempts());
1622 } 1669 }
1623 } 1670 }
1624 1671
1625 } // namespace net 1672 } // namespace net
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698