Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(285)

Side by Side Diff: net/http/http_stream_factory_impl_job.cc

Issue 1326503003: Added a net::BidirectionalStream to expose a bidirectional streaming interface (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Address Misha's comments Created 5 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "net/http/http_stream_factory_impl_job.h" 5 #include "net/http/http_stream_factory_impl_job.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 #include <string> 8 #include <string>
9 9
10 #include "base/bind.h" 10 #include "base/bind.h"
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after
43 #include "net/socket/ssl_client_socket.h" 43 #include "net/socket/ssl_client_socket.h"
44 #include "net/socket/ssl_client_socket_pool.h" 44 #include "net/socket/ssl_client_socket_pool.h"
45 #include "net/spdy/spdy_http_stream.h" 45 #include "net/spdy/spdy_http_stream.h"
46 #include "net/spdy/spdy_protocol.h" 46 #include "net/spdy/spdy_protocol.h"
47 #include "net/spdy/spdy_session.h" 47 #include "net/spdy/spdy_session.h"
48 #include "net/spdy/spdy_session_pool.h" 48 #include "net/spdy/spdy_session_pool.h"
49 #include "net/ssl/ssl_cert_request_info.h" 49 #include "net/ssl/ssl_cert_request_info.h"
50 #include "net/ssl/ssl_connection_status_flags.h" 50 #include "net/ssl/ssl_connection_status_flags.h"
51 #include "net/ssl/ssl_failure_state.h" 51 #include "net/ssl/ssl_failure_state.h"
52 52
53 #if defined(ENABLE_BIDIRECTIONAL_STREAM)
54 #include "net/http/bidirectional_stream_job.h"
55 #include "net/spdy/bidirectional_stream_spdy_job.h"
56 #endif
57
53 namespace net { 58 namespace net {
54 59
55 // Returns parameters associated with the start of a HTTP stream job. 60 // Returns parameters associated with the start of a HTTP stream job.
56 scoped_ptr<base::Value> NetLogHttpStreamJobCallback( 61 scoped_ptr<base::Value> NetLogHttpStreamJobCallback(
57 const NetLog::Source& source, 62 const NetLog::Source& source,
58 const GURL* original_url, 63 const GURL* original_url,
59 const GURL* url, 64 const GURL* url,
60 const AlternativeService* alternative_service, 65 const AlternativeService* alternative_service,
61 RequestPriority priority, 66 RequestPriority priority,
62 NetLogCaptureMode /* capture_mode */) { 67 NetLogCaptureMode /* capture_mode */) {
(...skipping 67 matching lines...) Expand 10 before | Expand all | Expand 10 after
130 quic_request_(session_->quic_stream_factory()), 135 quic_request_(session_->quic_stream_factory()),
131 using_existing_quic_session_(false), 136 using_existing_quic_session_(false),
132 spdy_certificate_error_(OK), 137 spdy_certificate_error_(OK),
133 establishing_tunnel_(false), 138 establishing_tunnel_(false),
134 was_npn_negotiated_(false), 139 was_npn_negotiated_(false),
135 protocol_negotiated_(kProtoUnknown), 140 protocol_negotiated_(kProtoUnknown),
136 num_streams_(0), 141 num_streams_(0),
137 spdy_session_direct_(false), 142 spdy_session_direct_(false),
138 job_status_(STATUS_RUNNING), 143 job_status_(STATUS_RUNNING),
139 other_job_status_(STATUS_RUNNING), 144 other_job_status_(STATUS_RUNNING),
145 for_bidirectional_(false),
140 ptr_factory_(this) { 146 ptr_factory_(this) {
141 DCHECK(stream_factory); 147 DCHECK(stream_factory);
142 DCHECK(session); 148 DCHECK(session);
143 if (IsQuicAlternative()) { 149 if (IsQuicAlternative()) {
144 DCHECK(session_->params().enable_quic); 150 DCHECK(session_->params().enable_quic);
145 using_quic_ = true; 151 using_quic_ = true;
146 } 152 }
147 } 153 }
148 154
149 HttpStreamFactoryImpl::Job::~Job() { 155 HttpStreamFactoryImpl::Job::~Job() {
(...skipping 11 matching lines...) Expand all
161 session_->proxy_service()->CancelPacRequest(pac_request_); 167 session_->proxy_service()->CancelPacRequest(pac_request_);
162 168
163 // The stream could be in a partial state. It is not reusable. 169 // The stream could be in a partial state. It is not reusable.
164 if (stream_.get() && next_state_ != STATE_DONE) 170 if (stream_.get() && next_state_ != STATE_DONE)
165 stream_->Close(true /* not reusable */); 171 stream_->Close(true /* not reusable */);
166 } 172 }
167 173
168 void HttpStreamFactoryImpl::Job::Start(Request* request) { 174 void HttpStreamFactoryImpl::Job::Start(Request* request) {
169 DCHECK(request); 175 DCHECK(request);
170 request_ = request; 176 request_ = request;
177 // Saves |for_bidirectional_|, since request is nulled when job is orphaned.
178 for_bidirectional_ = request_->for_bidirectional();
171 StartInternal(); 179 StartInternal();
172 } 180 }
173 181
174 int HttpStreamFactoryImpl::Job::Preconnect(int num_streams) { 182 int HttpStreamFactoryImpl::Job::Preconnect(int num_streams) {
175 DCHECK_GT(num_streams, 0); 183 DCHECK_GT(num_streams, 0);
176 base::WeakPtr<HttpServerProperties> http_server_properties = 184 base::WeakPtr<HttpServerProperties> http_server_properties =
177 session_->http_server_properties(); 185 session_->http_server_properties();
178 if (http_server_properties && 186 if (http_server_properties &&
179 http_server_properties->SupportsRequestPriority( 187 http_server_properties->SupportsRequestPriority(
180 HostPortPair::FromURL(request_info_.url))) { 188 HostPortPair::FromURL(request_info_.url))) {
(...skipping 170 matching lines...) Expand 10 before | Expand all | Expand 10 after
351 MaybeCopyConnectionAttemptsFromSocketOrHandle(); 359 MaybeCopyConnectionAttemptsFromSocketOrHandle();
352 360
353 request_->Complete(was_npn_negotiated(), protocol_negotiated(), using_spdy()); 361 request_->Complete(was_npn_negotiated(), protocol_negotiated(), using_spdy());
354 request_->OnWebSocketHandshakeStreamReady(this, 362 request_->OnWebSocketHandshakeStreamReady(this,
355 server_ssl_config_, 363 server_ssl_config_,
356 proxy_info_, 364 proxy_info_,
357 websocket_stream_.release()); 365 websocket_stream_.release());
358 // |this| may be deleted after this call. 366 // |this| may be deleted after this call.
359 } 367 }
360 368
369 void HttpStreamFactoryImpl::Job::OnBidirectionalStreamJobReadyCallback() {
370 #if defined(ENABLE_BIDIRECTIONAL_STREAM)
371 DCHECK(bidirectional_stream_job_);
372
373 MaybeCopyConnectionAttemptsFromSocketOrHandle();
374
375 if (IsOrphaned()) {
376 stream_factory_->OnOrphanedJobComplete(this);
377 } else {
378 request_->Complete(was_npn_negotiated(), protocol_negotiated(),
379 using_spdy());
380 request_->OnBidirectionalStreamJobReady(
381 this, server_ssl_config_, proxy_info_,
382 bidirectional_stream_job_.release());
383 }
384 // |this| may be deleted after this call.
385 #else
386 DCHECK(false);
387 #endif
388 }
389
361 void HttpStreamFactoryImpl::Job::OnNewSpdySessionReadyCallback() { 390 void HttpStreamFactoryImpl::Job::OnNewSpdySessionReadyCallback() {
391 #if defined(ENABLE_BIDIRECTIONAL_STREAM)
392 DCHECK(stream_.get() || bidirectional_stream_job_.get());
393 #else
362 DCHECK(stream_.get()); 394 DCHECK(stream_.get());
395 #endif
363 DCHECK(!IsPreconnecting()); 396 DCHECK(!IsPreconnecting());
364 DCHECK(using_spdy()); 397 DCHECK(using_spdy());
365 // Note: an event loop iteration has passed, so |new_spdy_session_| may be 398 // Note: an event loop iteration has passed, so |new_spdy_session_| may be
366 // NULL at this point if the SpdySession closed immediately after creation. 399 // NULL at this point if the SpdySession closed immediately after creation.
367 base::WeakPtr<SpdySession> spdy_session = new_spdy_session_; 400 base::WeakPtr<SpdySession> spdy_session = new_spdy_session_;
368 new_spdy_session_.reset(); 401 new_spdy_session_.reset();
369 402
370 MaybeCopyConnectionAttemptsFromSocketOrHandle(); 403 MaybeCopyConnectionAttemptsFromSocketOrHandle();
371 404
372 // TODO(jgraettinger): Notify the factory, and let that notify |request_|, 405 // TODO(jgraettinger): Notify the factory, and let that notify |request_|,
373 // rather than notifying |request_| directly. 406 // rather than notifying |request_| directly.
374 if (IsOrphaned()) { 407 if (IsOrphaned()) {
375 if (spdy_session) { 408 if (spdy_session) {
376 stream_factory_->OnNewSpdySessionReady( 409 stream_factory_->OnNewSpdySessionReady(
377 spdy_session, spdy_session_direct_, server_ssl_config_, proxy_info_, 410 spdy_session, spdy_session_direct_, server_ssl_config_, proxy_info_,
378 was_npn_negotiated(), protocol_negotiated(), using_spdy(), net_log_); 411 was_npn_negotiated(), protocol_negotiated(), using_spdy(), net_log_);
379 } 412 }
380 stream_factory_->OnOrphanedJobComplete(this); 413 stream_factory_->OnOrphanedJobComplete(this);
381 } else { 414 } else {
382 request_->OnNewSpdySessionReady( 415 if (for_bidirectional_) {
383 this, stream_.Pass(), spdy_session, spdy_session_direct_); 416 #if defined(ENABLE_BIDIRECTIONAL_STREAM)
417 DCHECK(bidirectional_stream_job_);
418 request_->OnNewSpdySessionReady(this, /*spdy_http_stream=*/nullptr,
419 std::move(bidirectional_stream_job_),
420 spdy_session, spdy_session_direct_);
421 #else
422 DCHECK(false);
423 #endif
424
425 } else {
426 DCHECK(stream_);
427 request_->OnNewSpdySessionReady(this, std::move(stream_),
428 /** bidirectional_stream_job=*/nullptr,
429 spdy_session, spdy_session_direct_);
430 }
384 } 431 }
385 // |this| may be deleted after this call. 432 // |this| may be deleted after this call.
386 } 433 }
387 434
388 void HttpStreamFactoryImpl::Job::OnStreamFailedCallback(int result) { 435 void HttpStreamFactoryImpl::Job::OnStreamFailedCallback(int result) {
389 DCHECK(!IsPreconnecting()); 436 DCHECK(!IsPreconnecting());
390 437
391 MaybeCopyConnectionAttemptsFromSocketOrHandle(); 438 MaybeCopyConnectionAttemptsFromSocketOrHandle();
392 439
393 if (IsOrphaned()) { 440 if (IsOrphaned()) {
(...skipping 166 matching lines...) Expand 10 before | Expand all | Expand 10 after
560 next_state_ = STATE_DONE; 607 next_state_ = STATE_DONE;
561 if (new_spdy_session_.get()) { 608 if (new_spdy_session_.get()) {
562 base::ThreadTaskRunnerHandle::Get()->PostTask( 609 base::ThreadTaskRunnerHandle::Get()->PostTask(
563 FROM_HERE, base::Bind(&Job::OnNewSpdySessionReadyCallback, 610 FROM_HERE, base::Bind(&Job::OnNewSpdySessionReadyCallback,
564 ptr_factory_.GetWeakPtr())); 611 ptr_factory_.GetWeakPtr()));
565 } else if (stream_factory_->for_websockets_) { 612 } else if (stream_factory_->for_websockets_) {
566 DCHECK(websocket_stream_); 613 DCHECK(websocket_stream_);
567 base::ThreadTaskRunnerHandle::Get()->PostTask( 614 base::ThreadTaskRunnerHandle::Get()->PostTask(
568 FROM_HERE, base::Bind(&Job::OnWebSocketHandshakeStreamReadyCallback, 615 FROM_HERE, base::Bind(&Job::OnWebSocketHandshakeStreamReadyCallback,
569 ptr_factory_.GetWeakPtr())); 616 ptr_factory_.GetWeakPtr()));
617 } else if (for_bidirectional_) {
618 #if defined(ENABLE_BIDIRECTIONAL_STREAM)
619 if (!bidirectional_stream_job_) {
620 base::ThreadTaskRunnerHandle::Get()->PostTask(
621 FROM_HERE, base::Bind(&Job::OnStreamFailedCallback,
622 ptr_factory_.GetWeakPtr(), ERR_FAILED));
623 } else {
624 base::ThreadTaskRunnerHandle::Get()->PostTask(
625 FROM_HERE, base::Bind(&Job::OnBidirectionalStreamJobReadyCallback,
626 ptr_factory_.GetWeakPtr()));
627 }
628 #else
629 DCHECK(false);
630 #endif
570 } else { 631 } else {
571 DCHECK(stream_.get()); 632 DCHECK(stream_.get());
572 base::ThreadTaskRunnerHandle::Get()->PostTask( 633 base::ThreadTaskRunnerHandle::Get()->PostTask(
573 FROM_HERE, 634 FROM_HERE,
574 base::Bind(&Job::OnStreamReadyCallback, ptr_factory_.GetWeakPtr())); 635 base::Bind(&Job::OnStreamReadyCallback, ptr_factory_.GetWeakPtr()));
575 } 636 }
576 return ERR_IO_PENDING; 637 return ERR_IO_PENDING;
577 638
578 default: 639 default:
579 DCHECK(result != ERR_ALTERNATIVE_CERT_NOT_VALID_FOR_ORIGIN || 640 DCHECK(result != ERR_ALTERNATIVE_CERT_NOT_VALID_FOR_ORIGIN ||
(...skipping 557 matching lines...) Expand 10 before | Expand all | Expand 10 after
1137 1198
1138 int HttpStreamFactoryImpl::Job::DoWaitingUserAction(int result) { 1199 int HttpStreamFactoryImpl::Job::DoWaitingUserAction(int result) {
1139 // This state indicates that the stream request is in a partially 1200 // This state indicates that the stream request is in a partially
1140 // completed state, and we've called back to the delegate for more 1201 // completed state, and we've called back to the delegate for more
1141 // information. 1202 // information.
1142 1203
1143 // We're always waiting here for the delegate to call us back. 1204 // We're always waiting here for the delegate to call us back.
1144 return ERR_IO_PENDING; 1205 return ERR_IO_PENDING;
1145 } 1206 }
1146 1207
1147 int HttpStreamFactoryImpl::Job::SetSpdyHttpStream( 1208 int HttpStreamFactoryImpl::Job::SetSpdyHttpStreamOrBidirectionalStreamJob(
1148 base::WeakPtr<SpdySession> session, bool direct) { 1209 base::WeakPtr<SpdySession> session,
1210 bool direct) {
1149 // TODO(ricea): Restore the code for WebSockets over SPDY once it's 1211 // TODO(ricea): Restore the code for WebSockets over SPDY once it's
1150 // implemented. 1212 // implemented.
1151 if (stream_factory_->for_websockets_) 1213 if (stream_factory_->for_websockets_)
1152 return ERR_NOT_IMPLEMENTED; 1214 return ERR_NOT_IMPLEMENTED;
1215 if (for_bidirectional_) {
1216 #if defined(ENABLE_BIDIRECTIONAL_STREAM)
1217 // TODO(xunjieli): Create QUIC's version of BidirectionalStreamJob.
1218 bidirectional_stream_job_.reset(new BidirectionalStreamSpdyJob(session));
1219 return OK;
1220 #else
1221 DCHECK(false);
1222 return ERR_FAILED;
1223 #endif
1224 }
1153 1225
1154 // TODO(willchan): Delete this code, because eventually, the 1226 // TODO(willchan): Delete this code, because eventually, the
1155 // HttpStreamFactoryImpl will be creating all the SpdyHttpStreams, since it 1227 // HttpStreamFactoryImpl will be creating all the SpdyHttpStreams, since it
1156 // will know when SpdySessions become available. 1228 // will know when SpdySessions become available.
1157 1229
1158 bool use_relative_url = direct || request_info_.url.SchemeIs("https"); 1230 bool use_relative_url = direct || request_info_.url.SchemeIs("https");
1159 stream_.reset(new SpdyHttpStream(session, use_relative_url)); 1231 stream_.reset(new SpdyHttpStream(session, use_relative_url));
1160 return OK; 1232 return OK;
1161 } 1233 }
1162 1234
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after
1196 1268
1197 CHECK(!stream_.get()); 1269 CHECK(!stream_.get());
1198 1270
1199 bool direct = !IsHttpsProxyAndHttpUrl(); 1271 bool direct = !IsHttpsProxyAndHttpUrl();
1200 if (existing_spdy_session_.get()) { 1272 if (existing_spdy_session_.get()) {
1201 // We picked up an existing session, so we don't need our socket. 1273 // We picked up an existing session, so we don't need our socket.
1202 if (connection_->socket()) 1274 if (connection_->socket())
1203 connection_->socket()->Disconnect(); 1275 connection_->socket()->Disconnect();
1204 connection_->Reset(); 1276 connection_->Reset();
1205 1277
1206 int set_result = SetSpdyHttpStream(existing_spdy_session_, direct); 1278 int set_result = SetSpdyHttpStreamOrBidirectionalStreamJob(
1279 existing_spdy_session_, direct);
1207 existing_spdy_session_.reset(); 1280 existing_spdy_session_.reset();
1208 return set_result; 1281 return set_result;
1209 } 1282 }
1210 1283
1211 SpdySessionKey spdy_session_key = GetSpdySessionKey(); 1284 SpdySessionKey spdy_session_key = GetSpdySessionKey();
1212 base::WeakPtr<SpdySession> spdy_session; 1285 base::WeakPtr<SpdySession> spdy_session;
1213 int result = valid_spdy_session_pool_->FindAvailableSession( 1286 int result = valid_spdy_session_pool_->FindAvailableSession(
1214 spdy_session_key, net_log_, &spdy_session); 1287 spdy_session_key, net_log_, &spdy_session);
1215 if (result != OK) { 1288 if (result != OK) {
1216 return result; 1289 return result;
1217 } 1290 }
1218 if (spdy_session) { 1291 if (spdy_session) {
1219 return SetSpdyHttpStream(spdy_session, direct); 1292 return SetSpdyHttpStreamOrBidirectionalStreamJob(spdy_session, direct);
1220 } 1293 }
1221 1294
1222 result = valid_spdy_session_pool_->CreateAvailableSessionFromSocket( 1295 result = valid_spdy_session_pool_->CreateAvailableSessionFromSocket(
1223 spdy_session_key, connection_.Pass(), net_log_, spdy_certificate_error_, 1296 spdy_session_key, connection_.Pass(), net_log_, spdy_certificate_error_,
1224 using_ssl_, &spdy_session); 1297 using_ssl_, &spdy_session);
1225 if (result != OK) { 1298 if (result != OK) {
1226 return result; 1299 return result;
1227 } 1300 }
1228 1301
1229 if (!spdy_session->HasAcceptableTransportSecurity()) { 1302 if (!spdy_session->HasAcceptableTransportSecurity()) {
(...skipping 14 matching lines...) Expand all
1244 } 1317 }
1245 1318
1246 new_spdy_session_ = spdy_session; 1319 new_spdy_session_ = spdy_session;
1247 spdy_session_direct_ = direct; 1320 spdy_session_direct_ = direct;
1248 const HostPortPair& host_port_pair = spdy_session_key.host_port_pair(); 1321 const HostPortPair& host_port_pair = spdy_session_key.host_port_pair();
1249 base::WeakPtr<HttpServerProperties> http_server_properties = 1322 base::WeakPtr<HttpServerProperties> http_server_properties =
1250 session_->http_server_properties(); 1323 session_->http_server_properties();
1251 if (http_server_properties) 1324 if (http_server_properties)
1252 http_server_properties->SetSupportsSpdy(host_port_pair, true); 1325 http_server_properties->SetSupportsSpdy(host_port_pair, true);
1253 1326
1254 // Create a SpdyHttpStream attached to the session; 1327 // Create a SpdyHttpStream or a BidirectionalStreamJob attached to the
1255 // OnNewSpdySessionReadyCallback is not called until an event loop 1328 // session; OnNewSpdySessionReadyCallback is not called until an event loop
1256 // iteration later, so if the SpdySession is closed between then, allow 1329 // iteration later, so if the SpdySession is closed between then, allow
1257 // reuse state from the underlying socket, sampled by SpdyHttpStream, 1330 // reuse state from the underlying socket, sampled by SpdyHttpStream,
1258 // bubble up to the request. 1331 // bubble up to the request.
1259 return SetSpdyHttpStream(new_spdy_session_, spdy_session_direct_); 1332 return SetSpdyHttpStreamOrBidirectionalStreamJob(new_spdy_session_,
1333 spdy_session_direct_);
1260 } 1334 }
1261 1335
1262 int HttpStreamFactoryImpl::Job::DoCreateStreamComplete(int result) { 1336 int HttpStreamFactoryImpl::Job::DoCreateStreamComplete(int result) {
1263 if (result < 0) 1337 if (result < 0)
1264 return result; 1338 return result;
1265 1339
1266 session_->proxy_service()->ReportSuccess(proxy_info_, 1340 session_->proxy_service()->ReportSuccess(proxy_info_,
1267 session_->network_delegate()); 1341 session_->network_delegate());
1268 next_state_ = STATE_NONE; 1342 next_state_ = STATE_NONE;
1269 return OK; 1343 return OK;
(...skipping 345 matching lines...) Expand 10 before | Expand all | Expand 10 after
1615 if (connection_->socket()) { 1689 if (connection_->socket()) {
1616 ConnectionAttempts socket_attempts; 1690 ConnectionAttempts socket_attempts;
1617 connection_->socket()->GetConnectionAttempts(&socket_attempts); 1691 connection_->socket()->GetConnectionAttempts(&socket_attempts);
1618 request_->AddConnectionAttempts(socket_attempts); 1692 request_->AddConnectionAttempts(socket_attempts);
1619 } else { 1693 } else {
1620 request_->AddConnectionAttempts(connection_->connection_attempts()); 1694 request_->AddConnectionAttempts(connection_->connection_attempts());
1621 } 1695 }
1622 } 1696 }
1623 1697
1624 } // namespace net 1698 } // namespace net
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698