Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(238)

Side by Side Diff: net/socket/websocket_transport_client_socket_pool.cc

Issue 2328453002: Refactor WebSocketTransportClientSocketPool's socket handing out code (Closed)
Patch Set: Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « net/socket/websocket_transport_client_socket_pool.h ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "net/socket/websocket_transport_client_socket_pool.h" 5 #include "net/socket/websocket_transport_client_socket_pool.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 #include <utility> 8 #include <utility>
9 9
10 #include "base/compiler_specific.h" 10 #include "base/compiler_specific.h"
(...skipping 341 matching lines...) Expand 10 before | Expand all | Expand 10 after
352 StalledRequestMap::value_type(handle, iterator)); 352 StalledRequestMap::value_type(handle, iterator));
353 return ERR_IO_PENDING; 353 return ERR_IO_PENDING;
354 } 354 }
355 355
356 std::unique_ptr<WebSocketTransportConnectJob> connect_job( 356 std::unique_ptr<WebSocketTransportConnectJob> connect_job(
357 new WebSocketTransportConnectJob( 357 new WebSocketTransportConnectJob(
358 group_name, priority, respect_limits, casted_params, 358 group_name, priority, respect_limits, casted_params,
359 ConnectionTimeout(), callback, client_socket_factory_, host_resolver_, 359 ConnectionTimeout(), callback, client_socket_factory_, host_resolver_,
360 handle, &connect_job_delegate_, pool_net_log_, request_net_log)); 360 handle, &connect_job_delegate_, pool_net_log_, request_net_log));
361 361
362 int rv = connect_job->Connect(); 362 int result = connect_job->Connect();
363
363 // Regardless of the outcome of |connect_job|, it will always be bound to 364 // Regardless of the outcome of |connect_job|, it will always be bound to
364 // |handle|, since this pool uses early-binding. So the binding is logged 365 // |handle|, since this pool uses early-binding. So the binding is logged
365 // here, without waiting for the result. 366 // here, without waiting for the result.
366 request_net_log.AddEvent( 367 request_net_log.AddEvent(
367 NetLog::TYPE_SOCKET_POOL_BOUND_TO_CONNECT_JOB, 368 NetLog::TYPE_SOCKET_POOL_BOUND_TO_CONNECT_JOB,
368 connect_job->net_log().source().ToEventParametersCallback()); 369 connect_job->net_log().source().ToEventParametersCallback());
369 if (rv == OK) { 370
370 HandOutSocket(connect_job->PassSocket(), 371 if (result == ERR_IO_PENDING) {
371 connect_job->connect_timing(),
372 handle,
373 request_net_log);
374 request_net_log.EndEvent(NetLog::TYPE_SOCKET_POOL);
375 } else if (rv == ERR_IO_PENDING) {
376 // TODO(ricea): Implement backup job timer? 372 // TODO(ricea): Implement backup job timer?
377 AddJob(handle, std::move(connect_job)); 373 AddJob(handle, std::move(connect_job));
378 } else { 374 } else {
379 std::unique_ptr<StreamSocket> error_socket; 375 TryHandOutSocket(result, connect_job.get());
380 connect_job->GetAdditionalErrorState(handle);
381 error_socket = connect_job->PassSocket();
382 if (error_socket) {
383 HandOutSocket(std::move(error_socket), connect_job->connect_timing(),
384 handle, request_net_log);
385 }
386 } 376 }
387 377
388 if (rv != ERR_IO_PENDING) { 378 return result;
389 request_net_log.EndEventWithNetErrorCode(NetLog::TYPE_SOCKET_POOL, rv);
390 }
391
392 return rv;
393 } 379 }
394 380
395 void WebSocketTransportClientSocketPool::RequestSockets( 381 void WebSocketTransportClientSocketPool::RequestSockets(
396 const std::string& group_name, 382 const std::string& group_name,
397 const void* params, 383 const void* params,
398 int num_sockets, 384 int num_sockets,
399 const BoundNetLog& net_log) { 385 const BoundNetLog& net_log) {
400 NOTIMPLEMENTED(); 386 NOTIMPLEMENTED();
401 } 387 }
402 388
403 void WebSocketTransportClientSocketPool::CancelRequest( 389 void WebSocketTransportClientSocketPool::CancelRequest(
404 const std::string& group_name, 390 const std::string& group_name,
405 ClientSocketHandle* handle) { 391 ClientSocketHandle* handle) {
406 DCHECK(!handle->is_initialized()); 392 DCHECK(!handle->is_initialized());
407 if (DeleteStalledRequest(handle)) 393 if (DeleteStalledRequest(handle))
408 return; 394 return;
409 std::unique_ptr<StreamSocket> socket = handle->PassSocket(); 395 std::unique_ptr<StreamSocket> socket = handle->PassSocket();
410 if (socket) 396 if (socket)
411 ReleaseSocket(handle->group_name(), std::move(socket), handle->id()); 397 ReleaseSocket(handle->group_name(), std::move(socket), handle->id());
412 if (!DeleteJob(handle)) 398 if (!DeleteJob(handle))
413 pending_callbacks_.erase(handle); 399 pending_callbacks_.erase(handle);
414 if (!ReachedMaxSocketsLimit() && !stalled_request_queue_.empty()) 400
415 ActivateStalledRequest(); 401 ActivateStalledRequest();
416 } 402 }
417 403
418 void WebSocketTransportClientSocketPool::ReleaseSocket( 404 void WebSocketTransportClientSocketPool::ReleaseSocket(
419 const std::string& group_name, 405 const std::string& group_name,
420 std::unique_ptr<StreamSocket> socket, 406 std::unique_ptr<StreamSocket> socket,
421 int id) { 407 int id) {
422 WebSocketEndpointLockManager::GetInstance()->UnlockSocket(socket.get()); 408 WebSocketEndpointLockManager::GetInstance()->UnlockSocket(socket.get());
423 CHECK_GT(handed_out_socket_count_, 0); 409 CHECK_GT(handed_out_socket_count_, 0);
424 --handed_out_socket_count_; 410 --handed_out_socket_count_;
425 if (!ReachedMaxSocketsLimit() && !stalled_request_queue_.empty()) 411
426 ActivateStalledRequest(); 412 ActivateStalledRequest();
427 } 413 }
428 414
429 void WebSocketTransportClientSocketPool::FlushWithError(int error) { 415 void WebSocketTransportClientSocketPool::FlushWithError(int error) {
416 DCHECK_NE(error, OK);
417
430 // Sockets which are in LOAD_STATE_CONNECTING are in danger of unlocking 418 // Sockets which are in LOAD_STATE_CONNECTING are in danger of unlocking
431 // sockets waiting for the endpoint lock. If they connected synchronously, 419 // sockets waiting for the endpoint lock. If they connected synchronously,
432 // then OnConnectJobComplete(). The |flushing_| flag tells this object to 420 // then OnConnectJobComplete(). The |flushing_| flag tells this object to
433 // ignore spurious calls to OnConnectJobComplete(). It is safe to ignore those 421 // ignore spurious calls to OnConnectJobComplete(). It is safe to ignore those
434 // calls because this method will delete the jobs and call their callbacks 422 // calls because this method will delete the jobs and call their callbacks
435 // anyway. 423 // anyway.
436 flushing_ = true; 424 flushing_ = true;
437 for (PendingConnectsMap::iterator it = pending_connects_.begin(); 425 for (PendingConnectsMap::iterator it = pending_connects_.begin();
438 it != pending_connects_.end(); 426 it != pending_connects_.end();
439 ++it) { 427 ++it) {
(...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after
496 // TODO(ricea): For now, we implement a global timeout for compatibility with 484 // TODO(ricea): For now, we implement a global timeout for compatibility with
497 // TransportConnectJob. Since WebSocketTransportConnectJob controls the 485 // TransportConnectJob. Since WebSocketTransportConnectJob controls the
498 // address selection process more tightly, it could do something smarter here. 486 // address selection process more tightly, it could do something smarter here.
499 return base::TimeDelta::FromSeconds(TransportConnectJob::kTimeoutInSeconds); 487 return base::TimeDelta::FromSeconds(TransportConnectJob::kTimeoutInSeconds);
500 } 488 }
501 489
502 bool WebSocketTransportClientSocketPool::IsStalled() const { 490 bool WebSocketTransportClientSocketPool::IsStalled() const {
503 return !stalled_request_queue_.empty(); 491 return !stalled_request_queue_.empty();
504 } 492 }
505 493
494 bool WebSocketTransportClientSocketPool::TryHandOutSocket(
495 int result,
496 WebSocketTransportConnectJob* job) {
497 DCHECK_NE(result, ERR_IO_PENDING);
498
499 std::unique_ptr<StreamSocket> socket = job->PassSocket();
500 ClientSocketHandle* const handle = job->handle();
501 BoundNetLog request_net_log = job->request_net_log();
502 LoadTimingInfo::ConnectTiming connect_timing = job->connect_timing();
503
504 if (result == OK) {
505 DCHECK(socket.get());
yhirano 2016/09/12 10:29:33 .get() is not needed.
tyoshino (SeeGerritForStatus) 2016/09/13 02:21:01 Done
506
507 HandOutSocket(std::move(socket), connect_timing, handle, request_net_log);
508
509 request_net_log.EndEvent(NetLog::TYPE_SOCKET_POOL);
510
511 return true;
512 }
513
514 bool handed_out_socket = false;
515
516 // If we got a socket, it must contain error information so pass that
517 // up so that the caller can retrieve it.
518 job->GetAdditionalErrorState(handle);
519 if (socket.get()) {
yhirano 2016/09/12 10:29:33 .get() is not needed.
tyoshino (SeeGerritForStatus) 2016/09/13 02:21:01 Done
520 HandOutSocket(std::move(socket), connect_timing, handle, request_net_log);
521 handed_out_socket = true;
522 }
523
524 request_net_log.EndEventWithNetErrorCode(NetLog::TYPE_SOCKET_POOL, result);
525
526 return handed_out_socket;
527 }
528
506 void WebSocketTransportClientSocketPool::OnConnectJobComplete( 529 void WebSocketTransportClientSocketPool::OnConnectJobComplete(
507 int result, 530 int result,
508 WebSocketTransportConnectJob* job) { 531 WebSocketTransportConnectJob* job) {
509 DCHECK_NE(ERR_IO_PENDING, result); 532 DCHECK_NE(ERR_IO_PENDING, result);
510 533
511 std::unique_ptr<StreamSocket> socket = job->PassSocket();
512
513 // See comment in FlushWithError. 534 // See comment in FlushWithError.
514 if (flushing_) { 535 if (flushing_) {
536 std::unique_ptr<StreamSocket> socket = job->PassSocket();
515 WebSocketEndpointLockManager::GetInstance()->UnlockSocket(socket.get()); 537 WebSocketEndpointLockManager::GetInstance()->UnlockSocket(socket.get());
516 return; 538 return;
517 } 539 }
518 540
519 BoundNetLog request_net_log = job->request_net_log(); 541 bool handed_out_socket = TryHandOutSocket(result, job);
520 CompletionCallback callback = job->callback();
521 LoadTimingInfo::ConnectTiming connect_timing = job->connect_timing();
522 542
523 ClientSocketHandle* const handle = job->handle(); 543 ClientSocketHandle* const handle = job->handle();
524 bool handed_out_socket = false;
525 544
526 if (result == OK) {
527 DCHECK(socket.get());
528 handed_out_socket = true;
529 HandOutSocket(std::move(socket), connect_timing, handle, request_net_log);
530 request_net_log.EndEvent(NetLog::TYPE_SOCKET_POOL);
531 } else {
532 // If we got a socket, it must contain error information so pass that
533 // up so that the caller can retrieve it.
534 job->GetAdditionalErrorState(handle);
535 if (socket.get()) {
536 handed_out_socket = true;
537 HandOutSocket(std::move(socket), connect_timing, handle, request_net_log);
538 }
539 request_net_log.EndEventWithNetErrorCode(NetLog::TYPE_SOCKET_POOL, result);
540 }
541 bool delete_succeeded = DeleteJob(handle); 545 bool delete_succeeded = DeleteJob(handle);
542 DCHECK(delete_succeeded); 546 DCHECK(delete_succeeded);
543 if (!handed_out_socket && !stalled_request_queue_.empty() && 547
544 !ReachedMaxSocketsLimit()) 548 if (!handed_out_socket)
545 ActivateStalledRequest(); 549 ActivateStalledRequest();
546 InvokeUserCallbackLater(handle, callback, result); 550
551 InvokeUserCallbackLater(handle, job->callback(), result);
547 } 552 }
548 553
549 void WebSocketTransportClientSocketPool::InvokeUserCallbackLater( 554 void WebSocketTransportClientSocketPool::InvokeUserCallbackLater(
550 ClientSocketHandle* handle, 555 ClientSocketHandle* handle,
551 const CompletionCallback& callback, 556 const CompletionCallback& callback,
552 int rv) { 557 int rv) {
553 DCHECK(!pending_callbacks_.count(handle)); 558 DCHECK(!pending_callbacks_.count(handle));
554 pending_callbacks_.insert(handle); 559 pending_callbacks_.insert(handle);
555 base::ThreadTaskRunnerHandle::Get()->PostTask( 560 base::ThreadTaskRunnerHandle::Get()->PostTask(
556 FROM_HERE, 561 FROM_HERE,
(...skipping 14 matching lines...) Expand all
571 base::checked_cast<int>(pending_connects_.size()) >= 576 base::checked_cast<int>(pending_connects_.size()) >=
572 max_sockets_ - handed_out_socket_count_; 577 max_sockets_ - handed_out_socket_count_;
573 } 578 }
574 579
575 void WebSocketTransportClientSocketPool::HandOutSocket( 580 void WebSocketTransportClientSocketPool::HandOutSocket(
576 std::unique_ptr<StreamSocket> socket, 581 std::unique_ptr<StreamSocket> socket,
577 const LoadTimingInfo::ConnectTiming& connect_timing, 582 const LoadTimingInfo::ConnectTiming& connect_timing,
578 ClientSocketHandle* handle, 583 ClientSocketHandle* handle,
579 const BoundNetLog& net_log) { 584 const BoundNetLog& net_log) {
580 DCHECK(socket); 585 DCHECK(socket);
581 handle->SetSocket(std::move(socket));
582 DCHECK_EQ(ClientSocketHandle::UNUSED, handle->reuse_type()); 586 DCHECK_EQ(ClientSocketHandle::UNUSED, handle->reuse_type());
583 DCHECK_EQ(0, handle->idle_time().InMicroseconds()); 587 DCHECK_EQ(0, handle->idle_time().InMicroseconds());
588
589 handle->SetSocket(std::move(socket));
584 handle->set_pool_id(0); 590 handle->set_pool_id(0);
585 handle->set_connect_timing(connect_timing); 591 handle->set_connect_timing(connect_timing);
586 592
587 net_log.AddEvent( 593 net_log.AddEvent(
588 NetLog::TYPE_SOCKET_POOL_BOUND_TO_SOCKET, 594 NetLog::TYPE_SOCKET_POOL_BOUND_TO_SOCKET,
589 handle->socket()->NetLog().source().ToEventParametersCallback()); 595 handle->socket()->NetLog().source().ToEventParametersCallback());
590 596
591 ++handed_out_socket_count_; 597 ++handed_out_socket_count_;
592 } 598 }
593 599
(...skipping 23 matching lines...) Expand all
617 623
618 const WebSocketTransportConnectJob* 624 const WebSocketTransportConnectJob*
619 WebSocketTransportClientSocketPool::LookupConnectJob( 625 WebSocketTransportClientSocketPool::LookupConnectJob(
620 const ClientSocketHandle* handle) const { 626 const ClientSocketHandle* handle) const {
621 PendingConnectsMap::const_iterator it = pending_connects_.find(handle); 627 PendingConnectsMap::const_iterator it = pending_connects_.find(handle);
622 CHECK(it != pending_connects_.end()); 628 CHECK(it != pending_connects_.end());
623 return it->second; 629 return it->second;
624 } 630 }
625 631
626 void WebSocketTransportClientSocketPool::ActivateStalledRequest() { 632 void WebSocketTransportClientSocketPool::ActivateStalledRequest() {
627 DCHECK(!stalled_request_queue_.empty());
628 DCHECK(!ReachedMaxSocketsLimit());
629 // Usually we will only be able to activate one stalled request at a time, 633 // Usually we will only be able to activate one stalled request at a time,
630 // however if all the connects fail synchronously for some reason, we may be 634 // however if all the connects fail synchronously for some reason, we may be
631 // able to clear the whole queue at once. 635 // able to clear the whole queue at once.
632 while (!stalled_request_queue_.empty() && !ReachedMaxSocketsLimit()) { 636 while (!stalled_request_queue_.empty() && !ReachedMaxSocketsLimit()) {
633 StalledRequest request(stalled_request_queue_.front()); 637 StalledRequest request(stalled_request_queue_.front());
634 stalled_request_queue_.pop_front(); 638 stalled_request_queue_.pop_front();
635 stalled_request_map_.erase(request.handle); 639 stalled_request_map_.erase(request.handle);
640
636 int rv = RequestSocket("ignored", &request.params, request.priority, 641 int rv = RequestSocket("ignored", &request.params, request.priority,
637 // Stalled requests can't have |respect_limits| 642 // Stalled requests can't have |respect_limits|
638 // DISABLED. 643 // DISABLED.
639 RespectLimits::ENABLED, request.handle, 644 RespectLimits::ENABLED, request.handle,
640 request.callback, request.net_log); 645 request.callback, request.net_log);
646
641 // ActivateStalledRequest() never returns synchronously, so it is never 647 // ActivateStalledRequest() never returns synchronously, so it is never
642 // called re-entrantly. 648 // called re-entrantly.
643 if (rv != ERR_IO_PENDING) 649 if (rv != ERR_IO_PENDING)
644 InvokeUserCallbackLater(request.handle, request.callback, rv); 650 InvokeUserCallbackLater(request.handle, request.callback, rv);
645 } 651 }
646 } 652 }
647 653
648 bool WebSocketTransportClientSocketPool::DeleteStalledRequest( 654 bool WebSocketTransportClientSocketPool::DeleteStalledRequest(
649 ClientSocketHandle* handle) { 655 ClientSocketHandle* handle) {
650 StalledRequestMap::iterator it = stalled_request_map_.find(handle); 656 StalledRequestMap::iterator it = stalled_request_map_.find(handle);
(...skipping 29 matching lines...) Expand all
680 handle(handle), 686 handle(handle),
681 callback(callback), 687 callback(callback),
682 net_log(net_log) {} 688 net_log(net_log) {}
683 689
684 WebSocketTransportClientSocketPool::StalledRequest::StalledRequest( 690 WebSocketTransportClientSocketPool::StalledRequest::StalledRequest(
685 const StalledRequest& other) = default; 691 const StalledRequest& other) = default;
686 692
687 WebSocketTransportClientSocketPool::StalledRequest::~StalledRequest() {} 693 WebSocketTransportClientSocketPool::StalledRequest::~StalledRequest() {}
688 694
689 } // namespace net 695 } // namespace net
OLDNEW
« no previous file with comments | « net/socket/websocket_transport_client_socket_pool.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698