Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(146)

Side by Side Diff: net/http/http_stream_factory_impl_job.cc

Issue 1326503003: Added a net::BidirectionalStream to expose a bidirectional streaming interface (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Avoid inlining Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "net/http/http_stream_factory_impl_job.h" 5 #include "net/http/http_stream_factory_impl_job.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 #include <string> 8 #include <string>
9 9
10 #include "base/bind.h" 10 #include "base/bind.h"
(...skipping 24 matching lines...) Expand all
35 #include "net/http/http_stream_factory.h" 35 #include "net/http/http_stream_factory.h"
36 #include "net/http/http_stream_factory_impl_request.h" 36 #include "net/http/http_stream_factory_impl_request.h"
37 #include "net/log/net_log.h" 37 #include "net/log/net_log.h"
38 #include "net/quic/quic_http_stream.h" 38 #include "net/quic/quic_http_stream.h"
39 #include "net/socket/client_socket_handle.h" 39 #include "net/socket/client_socket_handle.h"
40 #include "net/socket/client_socket_pool.h" 40 #include "net/socket/client_socket_pool.h"
41 #include "net/socket/client_socket_pool_manager.h" 41 #include "net/socket/client_socket_pool_manager.h"
42 #include "net/socket/socks_client_socket_pool.h" 42 #include "net/socket/socks_client_socket_pool.h"
43 #include "net/socket/ssl_client_socket.h" 43 #include "net/socket/ssl_client_socket.h"
44 #include "net/socket/ssl_client_socket_pool.h" 44 #include "net/socket/ssl_client_socket_pool.h"
45 #include "net/spdy/bidirectional_spdy_stream.h"
45 #include "net/spdy/spdy_http_stream.h" 46 #include "net/spdy/spdy_http_stream.h"
46 #include "net/spdy/spdy_protocol.h" 47 #include "net/spdy/spdy_protocol.h"
47 #include "net/spdy/spdy_session.h" 48 #include "net/spdy/spdy_session.h"
48 #include "net/spdy/spdy_session_pool.h" 49 #include "net/spdy/spdy_session_pool.h"
49 #include "net/ssl/ssl_cert_request_info.h" 50 #include "net/ssl/ssl_cert_request_info.h"
50 #include "net/ssl/ssl_connection_status_flags.h" 51 #include "net/ssl/ssl_connection_status_flags.h"
51 #include "net/ssl/ssl_failure_state.h" 52 #include "net/ssl/ssl_failure_state.h"
52 53
53 namespace net { 54 namespace net {
54 55
(...skipping 75 matching lines...) Expand 10 before | Expand all | Expand 10 after
130 quic_request_(session_->quic_stream_factory()), 131 quic_request_(session_->quic_stream_factory()),
131 using_existing_quic_session_(false), 132 using_existing_quic_session_(false),
132 spdy_certificate_error_(OK), 133 spdy_certificate_error_(OK),
133 establishing_tunnel_(false), 134 establishing_tunnel_(false),
134 was_npn_negotiated_(false), 135 was_npn_negotiated_(false),
135 protocol_negotiated_(kProtoUnknown), 136 protocol_negotiated_(kProtoUnknown),
136 num_streams_(0), 137 num_streams_(0),
137 spdy_session_direct_(false), 138 spdy_session_direct_(false),
138 job_status_(STATUS_RUNNING), 139 job_status_(STATUS_RUNNING),
139 other_job_status_(STATUS_RUNNING), 140 other_job_status_(STATUS_RUNNING),
141 for_bidirectional_(false),
140 ptr_factory_(this) { 142 ptr_factory_(this) {
141 DCHECK(stream_factory); 143 DCHECK(stream_factory);
142 DCHECK(session); 144 DCHECK(session);
143 if (IsQuicAlternative()) { 145 if (IsQuicAlternative()) {
144 DCHECK(session_->params().enable_quic); 146 DCHECK(session_->params().enable_quic);
145 using_quic_ = true; 147 using_quic_ = true;
146 } 148 }
147 } 149 }
148 150
149 HttpStreamFactoryImpl::Job::~Job() { 151 HttpStreamFactoryImpl::Job::~Job() {
(...skipping 11 matching lines...) Expand all
161 session_->proxy_service()->CancelPacRequest(pac_request_); 163 session_->proxy_service()->CancelPacRequest(pac_request_);
162 164
163 // The stream could be in a partial state. It is not reusable. 165 // The stream could be in a partial state. It is not reusable.
164 if (stream_.get() && next_state_ != STATE_DONE) 166 if (stream_.get() && next_state_ != STATE_DONE)
165 stream_->Close(true /* not reusable */); 167 stream_->Close(true /* not reusable */);
166 } 168 }
167 169
168 void HttpStreamFactoryImpl::Job::Start(Request* request) { 170 void HttpStreamFactoryImpl::Job::Start(Request* request) {
169 DCHECK(request); 171 DCHECK(request);
170 request_ = request; 172 request_ = request;
173 // Saves |for_bidirectional_|, since request is nulled when job is orphaned.
174 for_bidirectional_ = request_->for_bidirectional();
171 StartInternal(); 175 StartInternal();
172 } 176 }
173 177
174 int HttpStreamFactoryImpl::Job::Preconnect(int num_streams) { 178 int HttpStreamFactoryImpl::Job::Preconnect(int num_streams) {
175 DCHECK_GT(num_streams, 0); 179 DCHECK_GT(num_streams, 0);
176 base::WeakPtr<HttpServerProperties> http_server_properties = 180 base::WeakPtr<HttpServerProperties> http_server_properties =
177 session_->http_server_properties(); 181 session_->http_server_properties();
178 if (http_server_properties && 182 if (http_server_properties &&
179 http_server_properties->SupportsRequestPriority( 183 http_server_properties->SupportsRequestPriority(
180 HostPortPair::FromURL(request_info_.url))) { 184 HostPortPair::FromURL(request_info_.url))) {
(...skipping 170 matching lines...) Expand 10 before | Expand all | Expand 10 after
351 MaybeCopyConnectionAttemptsFromSocketOrHandle(); 355 MaybeCopyConnectionAttemptsFromSocketOrHandle();
352 356
353 request_->Complete(was_npn_negotiated(), protocol_negotiated(), using_spdy()); 357 request_->Complete(was_npn_negotiated(), protocol_negotiated(), using_spdy());
354 request_->OnWebSocketHandshakeStreamReady(this, 358 request_->OnWebSocketHandshakeStreamReady(this,
355 server_ssl_config_, 359 server_ssl_config_,
356 proxy_info_, 360 proxy_info_,
357 websocket_stream_.release()); 361 websocket_stream_.release());
358 // |this| may be deleted after this call. 362 // |this| may be deleted after this call.
359 } 363 }
360 364
365 void HttpStreamFactoryImpl::Job::OnBidirectionalStreamReadyCallback() {
366 DCHECK(bidirectional_stream_);
367
368 MaybeCopyConnectionAttemptsFromSocketOrHandle();
369
370 if (IsOrphaned()) {
371 stream_factory_->OnOrphanedJobComplete(this);
372 } else {
373 request_->Complete(was_npn_negotiated(), protocol_negotiated(),
374 using_spdy());
375 request_->OnBidirectionalStreamReady(this, server_ssl_config_, proxy_info_,
376 bidirectional_stream_.release());
377 }
378 // |this| may be deleted after this call.
379 }
380
361 void HttpStreamFactoryImpl::Job::OnNewSpdySessionReadyCallback() { 381 void HttpStreamFactoryImpl::Job::OnNewSpdySessionReadyCallback() {
362 DCHECK(stream_.get()); 382 DCHECK(stream_.get() || bidirectional_stream_.get());
363 DCHECK(!IsPreconnecting()); 383 DCHECK(!IsPreconnecting());
364 DCHECK(using_spdy()); 384 DCHECK(using_spdy());
365 // Note: an event loop iteration has passed, so |new_spdy_session_| may be 385 // Note: an event loop iteration has passed, so |new_spdy_session_| may be
366 // NULL at this point if the SpdySession closed immediately after creation. 386 // NULL at this point if the SpdySession closed immediately after creation.
367 base::WeakPtr<SpdySession> spdy_session = new_spdy_session_; 387 base::WeakPtr<SpdySession> spdy_session = new_spdy_session_;
368 new_spdy_session_.reset(); 388 new_spdy_session_.reset();
369 389
370 MaybeCopyConnectionAttemptsFromSocketOrHandle(); 390 MaybeCopyConnectionAttemptsFromSocketOrHandle();
371 391
372 // TODO(jgraettinger): Notify the factory, and let that notify |request_|, 392 // TODO(jgraettinger): Notify the factory, and let that notify |request_|,
373 // rather than notifying |request_| directly. 393 // rather than notifying |request_| directly.
374 if (IsOrphaned()) { 394 if (IsOrphaned()) {
375 if (spdy_session) { 395 if (spdy_session) {
376 stream_factory_->OnNewSpdySessionReady( 396 stream_factory_->OnNewSpdySessionReady(
377 spdy_session, spdy_session_direct_, server_ssl_config_, proxy_info_, 397 spdy_session, spdy_session_direct_, server_ssl_config_, proxy_info_,
378 was_npn_negotiated(), protocol_negotiated(), using_spdy(), net_log_); 398 was_npn_negotiated(), protocol_negotiated(), using_spdy(), net_log_);
379 } 399 }
380 stream_factory_->OnOrphanedJobComplete(this); 400 stream_factory_->OnOrphanedJobComplete(this);
381 } else { 401 } else {
382 request_->OnNewSpdySessionReady( 402 if (for_bidirectional_) {
383 this, stream_.Pass(), spdy_session, spdy_session_direct_); 403 DCHECK(bidirectional_stream_);
404 request_->OnNewSpdySessionReady(this, /*spdy_http_stream=*/nullptr,
405 bidirectional_stream_.Pass(),
406 spdy_session, spdy_session_direct_);
407
408 } else {
409 DCHECK(stream_);
410 request_->OnNewSpdySessionReady(this, stream_.Pass(),
411 /** bidirectional_stream=*/nullptr,
412 spdy_session, spdy_session_direct_);
413 }
384 } 414 }
385 // |this| may be deleted after this call. 415 // |this| may be deleted after this call.
386 } 416 }
387 417
388 void HttpStreamFactoryImpl::Job::OnStreamFailedCallback(int result) { 418 void HttpStreamFactoryImpl::Job::OnStreamFailedCallback(int result) {
389 DCHECK(!IsPreconnecting()); 419 DCHECK(!IsPreconnecting());
390 420
391 MaybeCopyConnectionAttemptsFromSocketOrHandle(); 421 MaybeCopyConnectionAttemptsFromSocketOrHandle();
392 422
393 if (IsOrphaned()) { 423 if (IsOrphaned()) {
(...skipping 166 matching lines...) Expand 10 before | Expand all | Expand 10 after
560 next_state_ = STATE_DONE; 590 next_state_ = STATE_DONE;
561 if (new_spdy_session_.get()) { 591 if (new_spdy_session_.get()) {
562 base::ThreadTaskRunnerHandle::Get()->PostTask( 592 base::ThreadTaskRunnerHandle::Get()->PostTask(
563 FROM_HERE, base::Bind(&Job::OnNewSpdySessionReadyCallback, 593 FROM_HERE, base::Bind(&Job::OnNewSpdySessionReadyCallback,
564 ptr_factory_.GetWeakPtr())); 594 ptr_factory_.GetWeakPtr()));
565 } else if (stream_factory_->for_websockets_) { 595 } else if (stream_factory_->for_websockets_) {
566 DCHECK(websocket_stream_); 596 DCHECK(websocket_stream_);
567 base::ThreadTaskRunnerHandle::Get()->PostTask( 597 base::ThreadTaskRunnerHandle::Get()->PostTask(
568 FROM_HERE, base::Bind(&Job::OnWebSocketHandshakeStreamReadyCallback, 598 FROM_HERE, base::Bind(&Job::OnWebSocketHandshakeStreamReadyCallback,
569 ptr_factory_.GetWeakPtr())); 599 ptr_factory_.GetWeakPtr()));
600 } else if (for_bidirectional_) {
601 if (!bidirectional_stream_) {
602 base::ThreadTaskRunnerHandle::Get()->PostTask(
603 FROM_HERE, base::Bind(&Job::OnStreamFailedCallback,
604 ptr_factory_.GetWeakPtr(), ERR_FAILED));
605 } else {
606 base::ThreadTaskRunnerHandle::Get()->PostTask(
607 FROM_HERE, base::Bind(&Job::OnBidirectionalStreamReadyCallback,
608 ptr_factory_.GetWeakPtr()));
609 }
570 } else { 610 } else {
571 DCHECK(stream_.get()); 611 DCHECK(stream_.get());
572 base::ThreadTaskRunnerHandle::Get()->PostTask( 612 base::ThreadTaskRunnerHandle::Get()->PostTask(
573 FROM_HERE, 613 FROM_HERE,
574 base::Bind(&Job::OnStreamReadyCallback, ptr_factory_.GetWeakPtr())); 614 base::Bind(&Job::OnStreamReadyCallback, ptr_factory_.GetWeakPtr()));
575 } 615 }
576 return ERR_IO_PENDING; 616 return ERR_IO_PENDING;
577 617
578 default: 618 default:
579 DCHECK(result != ERR_ALTERNATIVE_CERT_NOT_VALID_FOR_ORIGIN || 619 DCHECK(result != ERR_ALTERNATIVE_CERT_NOT_VALID_FOR_ORIGIN ||
(...skipping 558 matching lines...) Expand 10 before | Expand all | Expand 10 after
1138 1178
1139 int HttpStreamFactoryImpl::Job::DoWaitingUserAction(int result) { 1179 int HttpStreamFactoryImpl::Job::DoWaitingUserAction(int result) {
1140 // This state indicates that the stream request is in a partially 1180 // This state indicates that the stream request is in a partially
1141 // completed state, and we've called back to the delegate for more 1181 // completed state, and we've called back to the delegate for more
1142 // information. 1182 // information.
1143 1183
1144 // We're always waiting here for the delegate to call us back. 1184 // We're always waiting here for the delegate to call us back.
1145 return ERR_IO_PENDING; 1185 return ERR_IO_PENDING;
1146 } 1186 }
1147 1187
1148 int HttpStreamFactoryImpl::Job::SetSpdyHttpStream( 1188 int HttpStreamFactoryImpl::Job::SetSpdyHttpStreamOrBidirectionalStream(
1149 base::WeakPtr<SpdySession> session, bool direct) { 1189 base::WeakPtr<SpdySession> session,
1190 bool direct) {
1150 // TODO(ricea): Restore the code for WebSockets over SPDY once it's 1191 // TODO(ricea): Restore the code for WebSockets over SPDY once it's
1151 // implemented. 1192 // implemented.
1152 if (stream_factory_->for_websockets_) 1193 if (stream_factory_->for_websockets_)
1153 return ERR_NOT_IMPLEMENTED; 1194 return ERR_NOT_IMPLEMENTED;
1195 if (for_bidirectional_) {
1196 // TODO(xunjieli): Enable creation of QUIC's version of BidirectionalStream.
1197 bidirectional_stream_.reset(new BidirectionalSpdyStream(session));
1198 return OK;
1199 }
1154 1200
1155 // TODO(willchan): Delete this code, because eventually, the 1201 // TODO(willchan): Delete this code, because eventually, the
1156 // HttpStreamFactoryImpl will be creating all the SpdyHttpStreams, since it 1202 // HttpStreamFactoryImpl will be creating all the SpdyHttpStreams, since it
1157 // will know when SpdySessions become available. 1203 // will know when SpdySessions become available.
1158 1204
1159 bool use_relative_url = direct || request_info_.url.SchemeIs("https"); 1205 bool use_relative_url = direct || request_info_.url.SchemeIs("https");
1160 stream_.reset(new SpdyHttpStream(session, use_relative_url)); 1206 stream_.reset(new SpdyHttpStream(session, use_relative_url));
1161 return OK; 1207 return OK;
1162 } 1208 }
1163 1209
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after
1197 1243
1198 CHECK(!stream_.get()); 1244 CHECK(!stream_.get());
1199 1245
1200 bool direct = !IsHttpsProxyAndHttpUrl(); 1246 bool direct = !IsHttpsProxyAndHttpUrl();
1201 if (existing_spdy_session_.get()) { 1247 if (existing_spdy_session_.get()) {
1202 // We picked up an existing session, so we don't need our socket. 1248 // We picked up an existing session, so we don't need our socket.
1203 if (connection_->socket()) 1249 if (connection_->socket())
1204 connection_->socket()->Disconnect(); 1250 connection_->socket()->Disconnect();
1205 connection_->Reset(); 1251 connection_->Reset();
1206 1252
1207 int set_result = SetSpdyHttpStream(existing_spdy_session_, direct); 1253 int set_result =
1254 SetSpdyHttpStreamOrBidirectionalStream(existing_spdy_session_, direct);
1208 existing_spdy_session_.reset(); 1255 existing_spdy_session_.reset();
1209 return set_result; 1256 return set_result;
1210 } 1257 }
1211 1258
1212 SpdySessionKey spdy_session_key = GetSpdySessionKey(); 1259 SpdySessionKey spdy_session_key = GetSpdySessionKey();
1213 base::WeakPtr<SpdySession> spdy_session; 1260 base::WeakPtr<SpdySession> spdy_session;
1214 int result = valid_spdy_session_pool_->FindAvailableSession( 1261 int result = valid_spdy_session_pool_->FindAvailableSession(
1215 spdy_session_key, net_log_, &spdy_session); 1262 spdy_session_key, net_log_, &spdy_session);
1216 if (result != OK) { 1263 if (result != OK) {
1217 return result; 1264 return result;
1218 } 1265 }
1219 if (spdy_session) { 1266 if (spdy_session) {
1220 return SetSpdyHttpStream(spdy_session, direct); 1267 return SetSpdyHttpStreamOrBidirectionalStream(spdy_session, direct);
1221 } 1268 }
1222 1269
1223 result = valid_spdy_session_pool_->CreateAvailableSessionFromSocket( 1270 result = valid_spdy_session_pool_->CreateAvailableSessionFromSocket(
1224 spdy_session_key, connection_.Pass(), net_log_, spdy_certificate_error_, 1271 spdy_session_key, connection_.Pass(), net_log_, spdy_certificate_error_,
1225 using_ssl_, &spdy_session); 1272 using_ssl_, &spdy_session);
1226 if (result != OK) { 1273 if (result != OK) {
1227 return result; 1274 return result;
1228 } 1275 }
1229 1276
1230 if (!spdy_session->HasAcceptableTransportSecurity()) { 1277 if (!spdy_session->HasAcceptableTransportSecurity()) {
(...skipping 14 matching lines...) Expand all
1245 } 1292 }
1246 1293
1247 new_spdy_session_ = spdy_session; 1294 new_spdy_session_ = spdy_session;
1248 spdy_session_direct_ = direct; 1295 spdy_session_direct_ = direct;
1249 const HostPortPair& host_port_pair = spdy_session_key.host_port_pair(); 1296 const HostPortPair& host_port_pair = spdy_session_key.host_port_pair();
1250 base::WeakPtr<HttpServerProperties> http_server_properties = 1297 base::WeakPtr<HttpServerProperties> http_server_properties =
1251 session_->http_server_properties(); 1298 session_->http_server_properties();
1252 if (http_server_properties) 1299 if (http_server_properties)
1253 http_server_properties->SetSupportsSpdy(host_port_pair, true); 1300 http_server_properties->SetSupportsSpdy(host_port_pair, true);
1254 1301
1255 // Create a SpdyHttpStream attached to the session; 1302 // Create a SpdyHttpStream or a BidirectionalStream attached to the session;
1256 // OnNewSpdySessionReadyCallback is not called until an event loop 1303 // OnNewSpdySessionReadyCallback is not called until an event loop
1257 // iteration later, so if the SpdySession is closed between then, allow 1304 // iteration later, so if the SpdySession is closed between then, allow
1258 // reuse state from the underlying socket, sampled by SpdyHttpStream, 1305 // reuse state from the underlying socket, sampled by SpdyHttpStream,
1259 // bubble up to the request. 1306 // bubble up to the request.
1260 return SetSpdyHttpStream(new_spdy_session_, spdy_session_direct_); 1307 return SetSpdyHttpStreamOrBidirectionalStream(new_spdy_session_,
1308 spdy_session_direct_);
1261 } 1309 }
1262 1310
1263 int HttpStreamFactoryImpl::Job::DoCreateStreamComplete(int result) { 1311 int HttpStreamFactoryImpl::Job::DoCreateStreamComplete(int result) {
1264 if (result < 0) 1312 if (result < 0)
1265 return result; 1313 return result;
1266 1314
1267 session_->proxy_service()->ReportSuccess(proxy_info_, 1315 session_->proxy_service()->ReportSuccess(proxy_info_,
1268 session_->network_delegate()); 1316 session_->network_delegate());
1269 next_state_ = STATE_NONE; 1317 next_state_ = STATE_NONE;
1270 return OK; 1318 return OK;
(...skipping 345 matching lines...) Expand 10 before | Expand all | Expand 10 after
1616 if (connection_->socket()) { 1664 if (connection_->socket()) {
1617 ConnectionAttempts socket_attempts; 1665 ConnectionAttempts socket_attempts;
1618 connection_->socket()->GetConnectionAttempts(&socket_attempts); 1666 connection_->socket()->GetConnectionAttempts(&socket_attempts);
1619 request_->AddConnectionAttempts(socket_attempts); 1667 request_->AddConnectionAttempts(socket_attempts);
1620 } else { 1668 } else {
1621 request_->AddConnectionAttempts(connection_->connection_attempts()); 1669 request_->AddConnectionAttempts(connection_->connection_attempts());
1622 } 1670 }
1623 } 1671 }
1624 1672
1625 } // namespace net 1673 } // namespace net
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698