Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(56)

Side by Side Diff: net/socket/websocket_transport_client_socket_pool.cc

Issue 2328453002: Refactor WebSocketTransportClientSocketPool's socket handing out code (Closed)
Patch Set: Rebase Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « net/socket/websocket_transport_client_socket_pool.h ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "net/socket/websocket_transport_client_socket_pool.h" 5 #include "net/socket/websocket_transport_client_socket_pool.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 #include <utility> 8 #include <utility>
9 9
10 #include "base/compiler_specific.h" 10 #include "base/compiler_specific.h"
(...skipping 344 matching lines...) Expand 10 before | Expand all | Expand 10 after
355 StalledRequestMap::value_type(handle, iterator)); 355 StalledRequestMap::value_type(handle, iterator));
356 return ERR_IO_PENDING; 356 return ERR_IO_PENDING;
357 } 357 }
358 358
359 std::unique_ptr<WebSocketTransportConnectJob> connect_job( 359 std::unique_ptr<WebSocketTransportConnectJob> connect_job(
360 new WebSocketTransportConnectJob( 360 new WebSocketTransportConnectJob(
361 group_name, priority, respect_limits, casted_params, 361 group_name, priority, respect_limits, casted_params,
362 ConnectionTimeout(), callback, client_socket_factory_, host_resolver_, 362 ConnectionTimeout(), callback, client_socket_factory_, host_resolver_,
363 handle, &connect_job_delegate_, pool_net_log_, request_net_log)); 363 handle, &connect_job_delegate_, pool_net_log_, request_net_log));
364 364
365 int rv = connect_job->Connect(); 365 int result = connect_job->Connect();
366
366 // Regardless of the outcome of |connect_job|, it will always be bound to 367 // Regardless of the outcome of |connect_job|, it will always be bound to
367 // |handle|, since this pool uses early-binding. So the binding is logged 368 // |handle|, since this pool uses early-binding. So the binding is logged
368 // here, without waiting for the result. 369 // here, without waiting for the result.
369 request_net_log.AddEvent( 370 request_net_log.AddEvent(
370 NetLogEventType::SOCKET_POOL_BOUND_TO_CONNECT_JOB, 371 NetLogEventType::SOCKET_POOL_BOUND_TO_CONNECT_JOB,
371 connect_job->net_log().source().ToEventParametersCallback()); 372 connect_job->net_log().source().ToEventParametersCallback());
372 if (rv == OK) { 373
373 HandOutSocket(connect_job->PassSocket(), 374 if (result == ERR_IO_PENDING) {
374 connect_job->connect_timing(),
375 handle,
376 request_net_log);
377 request_net_log.EndEvent(NetLogEventType::SOCKET_POOL);
378 } else if (rv == ERR_IO_PENDING) {
379 // TODO(ricea): Implement backup job timer? 375 // TODO(ricea): Implement backup job timer?
380 AddJob(handle, std::move(connect_job)); 376 AddJob(handle, std::move(connect_job));
381 } else { 377 } else {
382 std::unique_ptr<StreamSocket> error_socket; 378 TryHandOutSocket(result, connect_job.get());
383 connect_job->GetAdditionalErrorState(handle);
384 error_socket = connect_job->PassSocket();
385 if (error_socket) {
386 HandOutSocket(std::move(error_socket), connect_job->connect_timing(),
387 handle, request_net_log);
388 }
389 } 379 }
390 380
391 if (rv != ERR_IO_PENDING) { 381 return result;
392 request_net_log.EndEventWithNetErrorCode(NetLogEventType::SOCKET_POOL, rv);
393 }
394
395 return rv;
396 } 382 }
397 383
398 void WebSocketTransportClientSocketPool::RequestSockets( 384 void WebSocketTransportClientSocketPool::RequestSockets(
399 const std::string& group_name, 385 const std::string& group_name,
400 const void* params, 386 const void* params,
401 int num_sockets, 387 int num_sockets,
402 const BoundNetLog& net_log) { 388 const BoundNetLog& net_log) {
403 NOTIMPLEMENTED(); 389 NOTIMPLEMENTED();
404 } 390 }
405 391
406 void WebSocketTransportClientSocketPool::CancelRequest( 392 void WebSocketTransportClientSocketPool::CancelRequest(
407 const std::string& group_name, 393 const std::string& group_name,
408 ClientSocketHandle* handle) { 394 ClientSocketHandle* handle) {
409 DCHECK(!handle->is_initialized()); 395 DCHECK(!handle->is_initialized());
410 if (DeleteStalledRequest(handle)) 396 if (DeleteStalledRequest(handle))
411 return; 397 return;
412 std::unique_ptr<StreamSocket> socket = handle->PassSocket(); 398 std::unique_ptr<StreamSocket> socket = handle->PassSocket();
413 if (socket) 399 if (socket)
414 ReleaseSocket(handle->group_name(), std::move(socket), handle->id()); 400 ReleaseSocket(handle->group_name(), std::move(socket), handle->id());
415 if (!DeleteJob(handle)) 401 if (!DeleteJob(handle))
416 pending_callbacks_.erase(handle); 402 pending_callbacks_.erase(handle);
417 if (!ReachedMaxSocketsLimit() && !stalled_request_queue_.empty()) 403
418 ActivateStalledRequest(); 404 ActivateStalledRequest();
419 } 405 }
420 406
421 void WebSocketTransportClientSocketPool::ReleaseSocket( 407 void WebSocketTransportClientSocketPool::ReleaseSocket(
422 const std::string& group_name, 408 const std::string& group_name,
423 std::unique_ptr<StreamSocket> socket, 409 std::unique_ptr<StreamSocket> socket,
424 int id) { 410 int id) {
425 WebSocketEndpointLockManager::GetInstance()->UnlockSocket(socket.get()); 411 WebSocketEndpointLockManager::GetInstance()->UnlockSocket(socket.get());
426 CHECK_GT(handed_out_socket_count_, 0); 412 CHECK_GT(handed_out_socket_count_, 0);
427 --handed_out_socket_count_; 413 --handed_out_socket_count_;
428 if (!ReachedMaxSocketsLimit() && !stalled_request_queue_.empty()) 414
429 ActivateStalledRequest(); 415 ActivateStalledRequest();
430 } 416 }
431 417
432 void WebSocketTransportClientSocketPool::FlushWithError(int error) { 418 void WebSocketTransportClientSocketPool::FlushWithError(int error) {
419 DCHECK_NE(error, OK);
420
433 // Sockets which are in LOAD_STATE_CONNECTING are in danger of unlocking 421 // Sockets which are in LOAD_STATE_CONNECTING are in danger of unlocking
434 // sockets waiting for the endpoint lock. If they connected synchronously, 422 // sockets waiting for the endpoint lock. If they connected synchronously,
435 // then OnConnectJobComplete(). The |flushing_| flag tells this object to 423 // then OnConnectJobComplete(). The |flushing_| flag tells this object to
436 // ignore spurious calls to OnConnectJobComplete(). It is safe to ignore those 424 // ignore spurious calls to OnConnectJobComplete(). It is safe to ignore those
437 // calls because this method will delete the jobs and call their callbacks 425 // calls because this method will delete the jobs and call their callbacks
438 // anyway. 426 // anyway.
439 flushing_ = true; 427 flushing_ = true;
440 for (PendingConnectsMap::iterator it = pending_connects_.begin(); 428 for (PendingConnectsMap::iterator it = pending_connects_.begin();
441 it != pending_connects_.end(); 429 it != pending_connects_.end();
442 ++it) { 430 ++it) {
(...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after
499 // TODO(ricea): For now, we implement a global timeout for compatibility with 487 // TODO(ricea): For now, we implement a global timeout for compatibility with
500 // TransportConnectJob. Since WebSocketTransportConnectJob controls the 488 // TransportConnectJob. Since WebSocketTransportConnectJob controls the
501 // address selection process more tightly, it could do something smarter here. 489 // address selection process more tightly, it could do something smarter here.
502 return base::TimeDelta::FromSeconds(TransportConnectJob::kTimeoutInSeconds); 490 return base::TimeDelta::FromSeconds(TransportConnectJob::kTimeoutInSeconds);
503 } 491 }
504 492
505 bool WebSocketTransportClientSocketPool::IsStalled() const { 493 bool WebSocketTransportClientSocketPool::IsStalled() const {
506 return !stalled_request_queue_.empty(); 494 return !stalled_request_queue_.empty();
507 } 495 }
508 496
497 bool WebSocketTransportClientSocketPool::TryHandOutSocket(
498 int result,
499 WebSocketTransportConnectJob* job) {
500 DCHECK_NE(result, ERR_IO_PENDING);
501
502 std::unique_ptr<StreamSocket> socket = job->PassSocket();
503 ClientSocketHandle* const handle = job->handle();
504 BoundNetLog request_net_log = job->request_net_log();
505 LoadTimingInfo::ConnectTiming connect_timing = job->connect_timing();
506
507 if (result == OK) {
508 DCHECK(socket);
509
510 HandOutSocket(std::move(socket), connect_timing, handle, request_net_log);
511
512 request_net_log.EndEvent(NetLogEventType::SOCKET_POOL);
513
514 return true;
515 }
516
517 bool handed_out_socket = false;
518
519 // If we got a socket, it must contain error information so pass that
520 // up so that the caller can retrieve it.
521 job->GetAdditionalErrorState(handle);
522 if (socket) {
523 HandOutSocket(std::move(socket), connect_timing, handle, request_net_log);
524 handed_out_socket = true;
525 }
526
527 request_net_log.EndEventWithNetErrorCode(NetLogEventType::SOCKET_POOL,
528 result);
529
530 return handed_out_socket;
531 }
532
509 void WebSocketTransportClientSocketPool::OnConnectJobComplete( 533 void WebSocketTransportClientSocketPool::OnConnectJobComplete(
510 int result, 534 int result,
511 WebSocketTransportConnectJob* job) { 535 WebSocketTransportConnectJob* job) {
512 DCHECK_NE(ERR_IO_PENDING, result); 536 DCHECK_NE(ERR_IO_PENDING, result);
513 537
514 std::unique_ptr<StreamSocket> socket = job->PassSocket();
515
516 // See comment in FlushWithError. 538 // See comment in FlushWithError.
517 if (flushing_) { 539 if (flushing_) {
540 std::unique_ptr<StreamSocket> socket = job->PassSocket();
518 WebSocketEndpointLockManager::GetInstance()->UnlockSocket(socket.get()); 541 WebSocketEndpointLockManager::GetInstance()->UnlockSocket(socket.get());
519 return; 542 return;
520 } 543 }
521 544
522 BoundNetLog request_net_log = job->request_net_log(); 545 bool handed_out_socket = TryHandOutSocket(result, job);
546
523 CompletionCallback callback = job->callback(); 547 CompletionCallback callback = job->callback();
524 LoadTimingInfo::ConnectTiming connect_timing = job->connect_timing();
525 548
526 ClientSocketHandle* const handle = job->handle(); 549 ClientSocketHandle* const handle = job->handle();
527 bool handed_out_socket = false;
528 550
529 if (result == OK) {
530 DCHECK(socket.get());
531 handed_out_socket = true;
532 HandOutSocket(std::move(socket), connect_timing, handle, request_net_log);
533 request_net_log.EndEvent(NetLogEventType::SOCKET_POOL);
534 } else {
535 // If we got a socket, it must contain error information so pass that
536 // up so that the caller can retrieve it.
537 job->GetAdditionalErrorState(handle);
538 if (socket.get()) {
539 handed_out_socket = true;
540 HandOutSocket(std::move(socket), connect_timing, handle, request_net_log);
541 }
542 request_net_log.EndEventWithNetErrorCode(NetLogEventType::SOCKET_POOL,
543 result);
544 }
545 bool delete_succeeded = DeleteJob(handle); 551 bool delete_succeeded = DeleteJob(handle);
546 DCHECK(delete_succeeded); 552 DCHECK(delete_succeeded);
547 if (!handed_out_socket && !stalled_request_queue_.empty() && 553
548 !ReachedMaxSocketsLimit()) 554 job = NULL;
555
556 if (!handed_out_socket)
549 ActivateStalledRequest(); 557 ActivateStalledRequest();
558
550 InvokeUserCallbackLater(handle, callback, result); 559 InvokeUserCallbackLater(handle, callback, result);
551 } 560 }
552 561
553 void WebSocketTransportClientSocketPool::InvokeUserCallbackLater( 562 void WebSocketTransportClientSocketPool::InvokeUserCallbackLater(
554 ClientSocketHandle* handle, 563 ClientSocketHandle* handle,
555 const CompletionCallback& callback, 564 const CompletionCallback& callback,
556 int rv) { 565 int rv) {
557 DCHECK(!pending_callbacks_.count(handle)); 566 DCHECK(!pending_callbacks_.count(handle));
558 pending_callbacks_.insert(handle); 567 pending_callbacks_.insert(handle);
559 base::ThreadTaskRunnerHandle::Get()->PostTask( 568 base::ThreadTaskRunnerHandle::Get()->PostTask(
(...skipping 15 matching lines...) Expand all
575 base::checked_cast<int>(pending_connects_.size()) >= 584 base::checked_cast<int>(pending_connects_.size()) >=
576 max_sockets_ - handed_out_socket_count_; 585 max_sockets_ - handed_out_socket_count_;
577 } 586 }
578 587
579 void WebSocketTransportClientSocketPool::HandOutSocket( 588 void WebSocketTransportClientSocketPool::HandOutSocket(
580 std::unique_ptr<StreamSocket> socket, 589 std::unique_ptr<StreamSocket> socket,
581 const LoadTimingInfo::ConnectTiming& connect_timing, 590 const LoadTimingInfo::ConnectTiming& connect_timing,
582 ClientSocketHandle* handle, 591 ClientSocketHandle* handle,
583 const BoundNetLog& net_log) { 592 const BoundNetLog& net_log) {
584 DCHECK(socket); 593 DCHECK(socket);
585 handle->SetSocket(std::move(socket));
586 DCHECK_EQ(ClientSocketHandle::UNUSED, handle->reuse_type()); 594 DCHECK_EQ(ClientSocketHandle::UNUSED, handle->reuse_type());
587 DCHECK_EQ(0, handle->idle_time().InMicroseconds()); 595 DCHECK_EQ(0, handle->idle_time().InMicroseconds());
596
597 handle->SetSocket(std::move(socket));
588 handle->set_pool_id(0); 598 handle->set_pool_id(0);
589 handle->set_connect_timing(connect_timing); 599 handle->set_connect_timing(connect_timing);
590 600
591 net_log.AddEvent( 601 net_log.AddEvent(
592 NetLogEventType::SOCKET_POOL_BOUND_TO_SOCKET, 602 NetLogEventType::SOCKET_POOL_BOUND_TO_SOCKET,
593 handle->socket()->NetLog().source().ToEventParametersCallback()); 603 handle->socket()->NetLog().source().ToEventParametersCallback());
594 604
595 ++handed_out_socket_count_; 605 ++handed_out_socket_count_;
596 } 606 }
597 607
(...skipping 23 matching lines...) Expand all
621 631
622 const WebSocketTransportConnectJob* 632 const WebSocketTransportConnectJob*
623 WebSocketTransportClientSocketPool::LookupConnectJob( 633 WebSocketTransportClientSocketPool::LookupConnectJob(
624 const ClientSocketHandle* handle) const { 634 const ClientSocketHandle* handle) const {
625 PendingConnectsMap::const_iterator it = pending_connects_.find(handle); 635 PendingConnectsMap::const_iterator it = pending_connects_.find(handle);
626 CHECK(it != pending_connects_.end()); 636 CHECK(it != pending_connects_.end());
627 return it->second; 637 return it->second;
628 } 638 }
629 639
630 void WebSocketTransportClientSocketPool::ActivateStalledRequest() { 640 void WebSocketTransportClientSocketPool::ActivateStalledRequest() {
631 DCHECK(!stalled_request_queue_.empty());
632 DCHECK(!ReachedMaxSocketsLimit());
633 // Usually we will only be able to activate one stalled request at a time, 641 // Usually we will only be able to activate one stalled request at a time,
634 // however if all the connects fail synchronously for some reason, we may be 642 // however if all the connects fail synchronously for some reason, we may be
635 // able to clear the whole queue at once. 643 // able to clear the whole queue at once.
636 while (!stalled_request_queue_.empty() && !ReachedMaxSocketsLimit()) { 644 while (!stalled_request_queue_.empty() && !ReachedMaxSocketsLimit()) {
637 StalledRequest request(stalled_request_queue_.front()); 645 StalledRequest request(stalled_request_queue_.front());
638 stalled_request_queue_.pop_front(); 646 stalled_request_queue_.pop_front();
639 stalled_request_map_.erase(request.handle); 647 stalled_request_map_.erase(request.handle);
648
640 int rv = RequestSocket("ignored", &request.params, request.priority, 649 int rv = RequestSocket("ignored", &request.params, request.priority,
641 // Stalled requests can't have |respect_limits| 650 // Stalled requests can't have |respect_limits|
642 // DISABLED. 651 // DISABLED.
643 RespectLimits::ENABLED, request.handle, 652 RespectLimits::ENABLED, request.handle,
644 request.callback, request.net_log); 653 request.callback, request.net_log);
654
645 // ActivateStalledRequest() never returns synchronously, so it is never 655 // ActivateStalledRequest() never returns synchronously, so it is never
646 // called re-entrantly. 656 // called re-entrantly.
647 if (rv != ERR_IO_PENDING) 657 if (rv != ERR_IO_PENDING)
648 InvokeUserCallbackLater(request.handle, request.callback, rv); 658 InvokeUserCallbackLater(request.handle, request.callback, rv);
649 } 659 }
650 } 660 }
651 661
652 bool WebSocketTransportClientSocketPool::DeleteStalledRequest( 662 bool WebSocketTransportClientSocketPool::DeleteStalledRequest(
653 ClientSocketHandle* handle) { 663 ClientSocketHandle* handle) {
654 StalledRequestMap::iterator it = stalled_request_map_.find(handle); 664 StalledRequestMap::iterator it = stalled_request_map_.find(handle);
(...skipping 29 matching lines...) Expand all
684 handle(handle), 694 handle(handle),
685 callback(callback), 695 callback(callback),
686 net_log(net_log) {} 696 net_log(net_log) {}
687 697
688 WebSocketTransportClientSocketPool::StalledRequest::StalledRequest( 698 WebSocketTransportClientSocketPool::StalledRequest::StalledRequest(
689 const StalledRequest& other) = default; 699 const StalledRequest& other) = default;
690 700
691 WebSocketTransportClientSocketPool::StalledRequest::~StalledRequest() {} 701 WebSocketTransportClientSocketPool::StalledRequest::~StalledRequest() {}
692 702
693 } // namespace net 703 } // namespace net
OLDNEW
« no previous file with comments | « net/socket/websocket_transport_client_socket_pool.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698