Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(569)

Side by Side Diff: net/http/http_stream_factory_impl_job.cc

Issue 1326503003: Added a net::BidirectionalStream to expose a bidirectional streaming interface (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Get rid of Delegate::OnClose Created 5 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "net/http/http_stream_factory_impl_job.h" 5 #include "net/http/http_stream_factory_impl_job.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 #include <string> 8 #include <string>
9 9
10 #include "base/bind.h" 10 #include "base/bind.h"
(...skipping 24 matching lines...) Expand all
35 #include "net/http/http_stream_factory.h" 35 #include "net/http/http_stream_factory.h"
36 #include "net/http/http_stream_factory_impl_request.h" 36 #include "net/http/http_stream_factory_impl_request.h"
37 #include "net/log/net_log.h" 37 #include "net/log/net_log.h"
38 #include "net/quic/quic_http_stream.h" 38 #include "net/quic/quic_http_stream.h"
39 #include "net/socket/client_socket_handle.h" 39 #include "net/socket/client_socket_handle.h"
40 #include "net/socket/client_socket_pool.h" 40 #include "net/socket/client_socket_pool.h"
41 #include "net/socket/client_socket_pool_manager.h" 41 #include "net/socket/client_socket_pool_manager.h"
42 #include "net/socket/socks_client_socket_pool.h" 42 #include "net/socket/socks_client_socket_pool.h"
43 #include "net/socket/ssl_client_socket.h" 43 #include "net/socket/ssl_client_socket.h"
44 #include "net/socket/ssl_client_socket_pool.h" 44 #include "net/socket/ssl_client_socket_pool.h"
45 #include "net/spdy/bidirectional_stream_spdy_job.h"
45 #include "net/spdy/spdy_http_stream.h" 46 #include "net/spdy/spdy_http_stream.h"
46 #include "net/spdy/spdy_protocol.h" 47 #include "net/spdy/spdy_protocol.h"
47 #include "net/spdy/spdy_session.h" 48 #include "net/spdy/spdy_session.h"
48 #include "net/spdy/spdy_session_pool.h" 49 #include "net/spdy/spdy_session_pool.h"
49 #include "net/ssl/ssl_cert_request_info.h" 50 #include "net/ssl/ssl_cert_request_info.h"
50 #include "net/ssl/ssl_connection_status_flags.h" 51 #include "net/ssl/ssl_connection_status_flags.h"
51 #include "net/ssl/ssl_failure_state.h" 52 #include "net/ssl/ssl_failure_state.h"
52 53
53 namespace net { 54 namespace net {
54 55
(...skipping 75 matching lines...) Expand 10 before | Expand all | Expand 10 after
130 quic_request_(session_->quic_stream_factory()), 131 quic_request_(session_->quic_stream_factory()),
131 using_existing_quic_session_(false), 132 using_existing_quic_session_(false),
132 spdy_certificate_error_(OK), 133 spdy_certificate_error_(OK),
133 establishing_tunnel_(false), 134 establishing_tunnel_(false),
134 was_npn_negotiated_(false), 135 was_npn_negotiated_(false),
135 protocol_negotiated_(kProtoUnknown), 136 protocol_negotiated_(kProtoUnknown),
136 num_streams_(0), 137 num_streams_(0),
137 spdy_session_direct_(false), 138 spdy_session_direct_(false),
138 job_status_(STATUS_RUNNING), 139 job_status_(STATUS_RUNNING),
139 other_job_status_(STATUS_RUNNING), 140 other_job_status_(STATUS_RUNNING),
141 for_bidirectional_(false),
140 ptr_factory_(this) { 142 ptr_factory_(this) {
141 DCHECK(stream_factory); 143 DCHECK(stream_factory);
142 DCHECK(session); 144 DCHECK(session);
143 if (IsQuicAlternative()) { 145 if (IsQuicAlternative()) {
144 DCHECK(session_->params().enable_quic); 146 DCHECK(session_->params().enable_quic);
145 using_quic_ = true; 147 using_quic_ = true;
146 } 148 }
147 } 149 }
148 150
149 HttpStreamFactoryImpl::Job::~Job() { 151 HttpStreamFactoryImpl::Job::~Job() {
(...skipping 11 matching lines...) Expand all
161 session_->proxy_service()->CancelPacRequest(pac_request_); 163 session_->proxy_service()->CancelPacRequest(pac_request_);
162 164
163 // The stream could be in a partial state. It is not reusable. 165 // The stream could be in a partial state. It is not reusable.
164 if (stream_.get() && next_state_ != STATE_DONE) 166 if (stream_.get() && next_state_ != STATE_DONE)
165 stream_->Close(true /* not reusable */); 167 stream_->Close(true /* not reusable */);
166 } 168 }
167 169
168 void HttpStreamFactoryImpl::Job::Start(Request* request) { 170 void HttpStreamFactoryImpl::Job::Start(Request* request) {
169 DCHECK(request); 171 DCHECK(request);
170 request_ = request; 172 request_ = request;
173 // Saves |for_bidirectional_|, since request is nulled when job is orphaned.
174 for_bidirectional_ = request_->for_bidirectional();
171 StartInternal(); 175 StartInternal();
172 } 176 }
173 177
174 int HttpStreamFactoryImpl::Job::Preconnect(int num_streams) { 178 int HttpStreamFactoryImpl::Job::Preconnect(int num_streams) {
175 DCHECK_GT(num_streams, 0); 179 DCHECK_GT(num_streams, 0);
176 base::WeakPtr<HttpServerProperties> http_server_properties = 180 base::WeakPtr<HttpServerProperties> http_server_properties =
177 session_->http_server_properties(); 181 session_->http_server_properties();
178 if (http_server_properties && 182 if (http_server_properties &&
179 http_server_properties->SupportsRequestPriority( 183 http_server_properties->SupportsRequestPriority(
180 HostPortPair::FromURL(request_info_.url))) { 184 HostPortPair::FromURL(request_info_.url))) {
(...skipping 170 matching lines...) Expand 10 before | Expand all | Expand 10 after
351 MaybeCopyConnectionAttemptsFromSocketOrHandle(); 355 MaybeCopyConnectionAttemptsFromSocketOrHandle();
352 356
353 request_->Complete(was_npn_negotiated(), protocol_negotiated(), using_spdy()); 357 request_->Complete(was_npn_negotiated(), protocol_negotiated(), using_spdy());
354 request_->OnWebSocketHandshakeStreamReady(this, 358 request_->OnWebSocketHandshakeStreamReady(this,
355 server_ssl_config_, 359 server_ssl_config_,
356 proxy_info_, 360 proxy_info_,
357 websocket_stream_.release()); 361 websocket_stream_.release());
358 // |this| may be deleted after this call. 362 // |this| may be deleted after this call.
359 } 363 }
360 364
365 void HttpStreamFactoryImpl::Job::OnBidirectionalStreamJobReadyCallback() {
366 DCHECK(bidirectional_stream_job_);
367
368 MaybeCopyConnectionAttemptsFromSocketOrHandle();
369
370 if (IsOrphaned()) {
371 stream_factory_->OnOrphanedJobComplete(this);
372 } else {
373 request_->Complete(was_npn_negotiated(), protocol_negotiated(),
374 using_spdy());
375 request_->OnBidirectionalStreamJobReady(
376 this, server_ssl_config_, proxy_info_,
377 bidirectional_stream_job_.release());
378 }
379 // |this| may be deleted after this call.
380 }
381
361 void HttpStreamFactoryImpl::Job::OnNewSpdySessionReadyCallback() { 382 void HttpStreamFactoryImpl::Job::OnNewSpdySessionReadyCallback() {
362 DCHECK(stream_.get()); 383 DCHECK(stream_.get() || bidirectional_stream_job_.get());
363 DCHECK(!IsPreconnecting()); 384 DCHECK(!IsPreconnecting());
364 DCHECK(using_spdy()); 385 DCHECK(using_spdy());
365 // Note: an event loop iteration has passed, so |new_spdy_session_| may be 386 // Note: an event loop iteration has passed, so |new_spdy_session_| may be
366 // NULL at this point if the SpdySession closed immediately after creation. 387 // NULL at this point if the SpdySession closed immediately after creation.
367 base::WeakPtr<SpdySession> spdy_session = new_spdy_session_; 388 base::WeakPtr<SpdySession> spdy_session = new_spdy_session_;
368 new_spdy_session_.reset(); 389 new_spdy_session_.reset();
369 390
370 MaybeCopyConnectionAttemptsFromSocketOrHandle(); 391 MaybeCopyConnectionAttemptsFromSocketOrHandle();
371 392
372 // TODO(jgraettinger): Notify the factory, and let that notify |request_|, 393 // TODO(jgraettinger): Notify the factory, and let that notify |request_|,
373 // rather than notifying |request_| directly. 394 // rather than notifying |request_| directly.
374 if (IsOrphaned()) { 395 if (IsOrphaned()) {
375 if (spdy_session) { 396 if (spdy_session) {
376 stream_factory_->OnNewSpdySessionReady( 397 stream_factory_->OnNewSpdySessionReady(
377 spdy_session, spdy_session_direct_, server_ssl_config_, proxy_info_, 398 spdy_session, spdy_session_direct_, server_ssl_config_, proxy_info_,
378 was_npn_negotiated(), protocol_negotiated(), using_spdy(), net_log_); 399 was_npn_negotiated(), protocol_negotiated(), using_spdy(), net_log_);
379 } 400 }
380 stream_factory_->OnOrphanedJobComplete(this); 401 stream_factory_->OnOrphanedJobComplete(this);
381 } else { 402 } else {
382 request_->OnNewSpdySessionReady( 403 if (for_bidirectional_) {
383 this, stream_.Pass(), spdy_session, spdy_session_direct_); 404 DCHECK(bidirectional_stream_job_);
405 request_->OnNewSpdySessionReady(this, /*spdy_http_stream=*/nullptr,
406 bidirectional_stream_job_.Pass(),
407 spdy_session, spdy_session_direct_);
408
409 } else {
410 DCHECK(stream_);
411 request_->OnNewSpdySessionReady(this, stream_.Pass(),
412 /** bidirectional_stream_job=*/nullptr,
413 spdy_session, spdy_session_direct_);
414 }
384 } 415 }
385 // |this| may be deleted after this call. 416 // |this| may be deleted after this call.
386 } 417 }
387 418
388 void HttpStreamFactoryImpl::Job::OnStreamFailedCallback(int result) { 419 void HttpStreamFactoryImpl::Job::OnStreamFailedCallback(int result) {
389 DCHECK(!IsPreconnecting()); 420 DCHECK(!IsPreconnecting());
390 421
391 MaybeCopyConnectionAttemptsFromSocketOrHandle(); 422 MaybeCopyConnectionAttemptsFromSocketOrHandle();
392 423
393 if (IsOrphaned()) { 424 if (IsOrphaned()) {
(...skipping 166 matching lines...) Expand 10 before | Expand all | Expand 10 after
560 next_state_ = STATE_DONE; 591 next_state_ = STATE_DONE;
561 if (new_spdy_session_.get()) { 592 if (new_spdy_session_.get()) {
562 base::ThreadTaskRunnerHandle::Get()->PostTask( 593 base::ThreadTaskRunnerHandle::Get()->PostTask(
563 FROM_HERE, base::Bind(&Job::OnNewSpdySessionReadyCallback, 594 FROM_HERE, base::Bind(&Job::OnNewSpdySessionReadyCallback,
564 ptr_factory_.GetWeakPtr())); 595 ptr_factory_.GetWeakPtr()));
565 } else if (stream_factory_->for_websockets_) { 596 } else if (stream_factory_->for_websockets_) {
566 DCHECK(websocket_stream_); 597 DCHECK(websocket_stream_);
567 base::ThreadTaskRunnerHandle::Get()->PostTask( 598 base::ThreadTaskRunnerHandle::Get()->PostTask(
568 FROM_HERE, base::Bind(&Job::OnWebSocketHandshakeStreamReadyCallback, 599 FROM_HERE, base::Bind(&Job::OnWebSocketHandshakeStreamReadyCallback,
569 ptr_factory_.GetWeakPtr())); 600 ptr_factory_.GetWeakPtr()));
601 } else if (for_bidirectional_) {
602 if (!bidirectional_stream_job_) {
603 base::ThreadTaskRunnerHandle::Get()->PostTask(
604 FROM_HERE, base::Bind(&Job::OnStreamFailedCallback,
605 ptr_factory_.GetWeakPtr(), ERR_FAILED));
606 } else {
607 base::ThreadTaskRunnerHandle::Get()->PostTask(
608 FROM_HERE, base::Bind(&Job::OnBidirectionalStreamJobReadyCallback,
609 ptr_factory_.GetWeakPtr()));
610 }
570 } else { 611 } else {
571 DCHECK(stream_.get()); 612 DCHECK(stream_.get());
572 base::ThreadTaskRunnerHandle::Get()->PostTask( 613 base::ThreadTaskRunnerHandle::Get()->PostTask(
573 FROM_HERE, 614 FROM_HERE,
574 base::Bind(&Job::OnStreamReadyCallback, ptr_factory_.GetWeakPtr())); 615 base::Bind(&Job::OnStreamReadyCallback, ptr_factory_.GetWeakPtr()));
575 } 616 }
576 return ERR_IO_PENDING; 617 return ERR_IO_PENDING;
577 618
578 default: 619 default:
579 DCHECK(result != ERR_ALTERNATIVE_CERT_NOT_VALID_FOR_ORIGIN || 620 DCHECK(result != ERR_ALTERNATIVE_CERT_NOT_VALID_FOR_ORIGIN ||
(...skipping 557 matching lines...) Expand 10 before | Expand all | Expand 10 after
1137 1178
1138 int HttpStreamFactoryImpl::Job::DoWaitingUserAction(int result) { 1179 int HttpStreamFactoryImpl::Job::DoWaitingUserAction(int result) {
1139 // This state indicates that the stream request is in a partially 1180 // This state indicates that the stream request is in a partially
1140 // completed state, and we've called back to the delegate for more 1181 // completed state, and we've called back to the delegate for more
1141 // information. 1182 // information.
1142 1183
1143 // We're always waiting here for the delegate to call us back. 1184 // We're always waiting here for the delegate to call us back.
1144 return ERR_IO_PENDING; 1185 return ERR_IO_PENDING;
1145 } 1186 }
1146 1187
1147 int HttpStreamFactoryImpl::Job::SetSpdyHttpStream( 1188 int HttpStreamFactoryImpl::Job::SetSpdyHttpStreamOrBidirectionalStreamJob(
1148 base::WeakPtr<SpdySession> session, bool direct) { 1189 base::WeakPtr<SpdySession> session,
1190 bool direct) {
1149 // TODO(ricea): Restore the code for WebSockets over SPDY once it's 1191 // TODO(ricea): Restore the code for WebSockets over SPDY once it's
1150 // implemented. 1192 // implemented.
1151 if (stream_factory_->for_websockets_) 1193 if (stream_factory_->for_websockets_)
1152 return ERR_NOT_IMPLEMENTED; 1194 return ERR_NOT_IMPLEMENTED;
1195 if (for_bidirectional_) {
1196 // TODO(xunjieli): Create QUIC's version of BidirectionalStreamJob.
1197 bidirectional_stream_job_.reset(new BidirectionalStreamSpdyJob(session));
1198 return OK;
1199 }
1153 1200
1154 // TODO(willchan): Delete this code, because eventually, the 1201 // TODO(willchan): Delete this code, because eventually, the
1155 // HttpStreamFactoryImpl will be creating all the SpdyHttpStreams, since it 1202 // HttpStreamFactoryImpl will be creating all the SpdyHttpStreams, since it
1156 // will know when SpdySessions become available. 1203 // will know when SpdySessions become available.
1157 1204
1158 bool use_relative_url = direct || request_info_.url.SchemeIs("https"); 1205 bool use_relative_url = direct || request_info_.url.SchemeIs("https");
1159 stream_.reset(new SpdyHttpStream(session, use_relative_url)); 1206 stream_.reset(new SpdyHttpStream(session, use_relative_url));
1160 return OK; 1207 return OK;
1161 } 1208 }
1162 1209
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after
1196 1243
1197 CHECK(!stream_.get()); 1244 CHECK(!stream_.get());
1198 1245
1199 bool direct = !IsHttpsProxyAndHttpUrl(); 1246 bool direct = !IsHttpsProxyAndHttpUrl();
1200 if (existing_spdy_session_.get()) { 1247 if (existing_spdy_session_.get()) {
1201 // We picked up an existing session, so we don't need our socket. 1248 // We picked up an existing session, so we don't need our socket.
1202 if (connection_->socket()) 1249 if (connection_->socket())
1203 connection_->socket()->Disconnect(); 1250 connection_->socket()->Disconnect();
1204 connection_->Reset(); 1251 connection_->Reset();
1205 1252
1206 int set_result = SetSpdyHttpStream(existing_spdy_session_, direct); 1253 int set_result = SetSpdyHttpStreamOrBidirectionalStreamJob(
1254 existing_spdy_session_, direct);
1207 existing_spdy_session_.reset(); 1255 existing_spdy_session_.reset();
1208 return set_result; 1256 return set_result;
1209 } 1257 }
1210 1258
1211 SpdySessionKey spdy_session_key = GetSpdySessionKey(); 1259 SpdySessionKey spdy_session_key = GetSpdySessionKey();
1212 base::WeakPtr<SpdySession> spdy_session; 1260 base::WeakPtr<SpdySession> spdy_session;
1213 int result = valid_spdy_session_pool_->FindAvailableSession( 1261 int result = valid_spdy_session_pool_->FindAvailableSession(
1214 spdy_session_key, net_log_, &spdy_session); 1262 spdy_session_key, net_log_, &spdy_session);
1215 if (result != OK) { 1263 if (result != OK) {
1216 return result; 1264 return result;
1217 } 1265 }
1218 if (spdy_session) { 1266 if (spdy_session) {
1219 return SetSpdyHttpStream(spdy_session, direct); 1267 return SetSpdyHttpStreamOrBidirectionalStreamJob(spdy_session, direct);
1220 } 1268 }
1221 1269
1222 result = valid_spdy_session_pool_->CreateAvailableSessionFromSocket( 1270 result = valid_spdy_session_pool_->CreateAvailableSessionFromSocket(
1223 spdy_session_key, connection_.Pass(), net_log_, spdy_certificate_error_, 1271 spdy_session_key, connection_.Pass(), net_log_, spdy_certificate_error_,
1224 using_ssl_, &spdy_session); 1272 using_ssl_, &spdy_session);
1225 if (result != OK) { 1273 if (result != OK) {
1226 return result; 1274 return result;
1227 } 1275 }
1228 1276
1229 if (!spdy_session->HasAcceptableTransportSecurity()) { 1277 if (!spdy_session->HasAcceptableTransportSecurity()) {
(...skipping 14 matching lines...) Expand all
1244 } 1292 }
1245 1293
1246 new_spdy_session_ = spdy_session; 1294 new_spdy_session_ = spdy_session;
1247 spdy_session_direct_ = direct; 1295 spdy_session_direct_ = direct;
1248 const HostPortPair& host_port_pair = spdy_session_key.host_port_pair(); 1296 const HostPortPair& host_port_pair = spdy_session_key.host_port_pair();
1249 base::WeakPtr<HttpServerProperties> http_server_properties = 1297 base::WeakPtr<HttpServerProperties> http_server_properties =
1250 session_->http_server_properties(); 1298 session_->http_server_properties();
1251 if (http_server_properties) 1299 if (http_server_properties)
1252 http_server_properties->SetSupportsSpdy(host_port_pair, true); 1300 http_server_properties->SetSupportsSpdy(host_port_pair, true);
1253 1301
1254 // Create a SpdyHttpStream attached to the session; 1302 // Create a SpdyHttpStream or a BidirectionalStreamJob attached to the
1255 // OnNewSpdySessionReadyCallback is not called until an event loop 1303 // session; OnNewSpdySessionReadyCallback is not called until an event loop
1256 // iteration later, so if the SpdySession is closed between then, allow 1304 // iteration later, so if the SpdySession is closed between then, allow
1257 // reuse state from the underlying socket, sampled by SpdyHttpStream, 1305 // reuse state from the underlying socket, sampled by SpdyHttpStream,
1258 // bubble up to the request. 1306 // bubble up to the request.
1259 return SetSpdyHttpStream(new_spdy_session_, spdy_session_direct_); 1307 return SetSpdyHttpStreamOrBidirectionalStreamJob(new_spdy_session_,
1308 spdy_session_direct_);
1260 } 1309 }
1261 1310
1262 int HttpStreamFactoryImpl::Job::DoCreateStreamComplete(int result) { 1311 int HttpStreamFactoryImpl::Job::DoCreateStreamComplete(int result) {
1263 if (result < 0) 1312 if (result < 0)
1264 return result; 1313 return result;
1265 1314
1266 session_->proxy_service()->ReportSuccess(proxy_info_, 1315 session_->proxy_service()->ReportSuccess(proxy_info_,
1267 session_->network_delegate()); 1316 session_->network_delegate());
1268 next_state_ = STATE_NONE; 1317 next_state_ = STATE_NONE;
1269 return OK; 1318 return OK;
(...skipping 345 matching lines...) Expand 10 before | Expand all | Expand 10 after
1615 if (connection_->socket()) { 1664 if (connection_->socket()) {
1616 ConnectionAttempts socket_attempts; 1665 ConnectionAttempts socket_attempts;
1617 connection_->socket()->GetConnectionAttempts(&socket_attempts); 1666 connection_->socket()->GetConnectionAttempts(&socket_attempts);
1618 request_->AddConnectionAttempts(socket_attempts); 1667 request_->AddConnectionAttempts(socket_attempts);
1619 } else { 1668 } else {
1620 request_->AddConnectionAttempts(connection_->connection_attempts()); 1669 request_->AddConnectionAttempts(connection_->connection_attempts());
1621 } 1670 }
1622 } 1671 }
1623 1672
1624 } // namespace net 1673 } // namespace net
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698