Chromium Code Reviews| Index: net/socket/websocket_transport_client_socket_pool.cc |
| diff --git a/net/socket/websocket_transport_client_socket_pool.cc b/net/socket/websocket_transport_client_socket_pool.cc |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..91a8da8865298a2c197920e1f580c1c6b23a7da3 |
| --- /dev/null |
| +++ b/net/socket/websocket_transport_client_socket_pool.cc |
| @@ -0,0 +1,954 @@ |
| +// Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +#include "net/socket/websocket_transport_client_socket_pool.h" |
| + |
| +#include <algorithm> |
| +#include <map> |
| + |
| +#include "base/compiler_specific.h" |
| +#include "base/containers/linked_list.h" |
| +#include "base/logging.h" |
| +#include "base/memory/singleton.h" |
| +#include "base/metrics/histogram.h" |
| +#include "base/numerics/safe_conversions.h" |
| +#include "base/stl_util.h" |
| +#include "base/strings/string_util.h" |
| +#include "base/time/time.h" |
| +#include "base/values.h" |
| +#include "net/base/ip_endpoint.h" |
| +#include "net/base/net_errors.h" |
| +#include "net/base/net_log.h" |
| +#include "net/socket/client_socket_factory.h" |
| +#include "net/socket/client_socket_handle.h" |
| +#include "net/socket/client_socket_pool_base.h" |
| +#include "net/socket/socket_net_log_params.h" |
| +#include "net/socket/tcp_client_socket.h" |
| + |
| +namespace net { |
| + |
namespace {

using base::TimeDelta;

// TODO(ricea): For now, we implement a global timeout for compatibility with
// TransportConnectJob. Since WebSocketTransportConnectJob controls the address
// selection process more tightly, it could do something smarter here.
const int kTransportConnectJobTimeoutInSeconds = 240;  // 4 minutes.

}  // namespace
| + |
// Delay before the IPv4 sub-job is started when an IPv6 attempt has been
// started first (see DoTransportConnect()).
// TODO(willchan): Base this off RTT instead of statically setting it.
const int WebSocketTransportConnectJob::kIPv6FallbackTimerInMs = 300;
| + |
// Tracks which TCP endpoints currently have an in-progress connection
// attempt, and queues SubJobs that are waiting for an endpoint to become
// free, so that connection attempts to a single endpoint are serialized.
class WebSocketEndpointLockManager {
 public:
  typedef WebSocketTransportConnectJob::SubJob SubJob;

  // Returns the process-wide singleton instance.
  static WebSocketEndpointLockManager* GetInstance();

  // Returns OK if lock was acquired immediately, ERR_IO_PENDING if not. If the
  // lock was not acquired, then |job->GotEndpointLock()| will be called at some
  // future time.
  int LockEndpoint(const IPEndPoint& endpoint, SubJob* job);

  // Record the IPEndPoint associated with a particular socket. This is
  // necessary because TCPClientSocket refuses to return the PeerAddress after
  // the connection is disconnected. The association will be forgotten when
  // UnlockSocket() is called. The |socket| pointer must not be deleted between
  // the call to RememberSocket() and the matching call to UnlockSocket().
  void RememberSocket(StreamSocket* socket, const IPEndPoint& endpoint);

  // Release the lock on an endpoint, and, if appropriate, trigger the next
  // socket connection. It is permitted to call UnlockSocket() multiple times
  // for the same |socket|; all calls after the first will be ignored.
  void UnlockSocket(StreamSocket* socket);

  // Release the lock on |endpoint|. Most callers should use UnlockSocket()
  // instead.
  void UnlockEndpoint(const IPEndPoint& endpoint);

 private:
  // Intrusive list of SubJobs waiting for one endpoint's lock.
  typedef base::LinkedList<SubJob> ConnectJobQueue;
  // Maps each locked endpoint to its (heap-allocated, map-owned) wait queue.
  typedef std::map<IPEndPoint, ConnectJobQueue*> EndPointJobMap;
  // Maps connected sockets back to their endpoints; see RememberSocket().
  typedef std::map<StreamSocket*, IPEndPoint> SocketEndPointMap;

  WebSocketEndpointLockManager() {}
  ~WebSocketEndpointLockManager() {
    // All locks must have been released before the singleton is destroyed.
    DCHECK(endpoint_job_map_.empty());
    DCHECK(socket_endpoint_map_.empty());
  }

  EndPointJobMap endpoint_job_map_;
  SocketEndPointMap socket_endpoint_map_;

  friend struct DefaultSingletonTraits<WebSocketEndpointLockManager>;

  DISALLOW_COPY_AND_ASSIGN(WebSocketEndpointLockManager);
};
| + |
// static
WebSocketEndpointLockManager* WebSocketEndpointLockManager::GetInstance() {
  return Singleton<WebSocketEndpointLockManager>::get();
}
| + |
// A SubJob attempts to connect to the addresses of a single address family.
// The parent WebSocketTransportConnectJob may run an IPv6 SubJob and an IPv4
// SubJob concurrently; the first one to connect supplies the socket.
class WebSocketTransportConnectJob::SubJob : public base::LinkNode<SubJob> {
 public:
  SubJob(const AddressList& addresses,
         WebSocketTransportConnectJob* parent_job,
         SubJobType type)
      : addresses_(addresses),
        parent_job_(parent_job),
        next_state_(STATE_NONE),
        current_address_index_(0),
        type_(type) {}

  ~SubJob() {
    // We don't worry about cancelling the host resolution and TCP connect,
    // since ~SingleRequestHostResolver and ~StreamSocket will take care of it.
    if (next()) {
      // Still queued in WebSocketEndpointLockManager waiting for an endpoint
      // lock; remove ourselves so GotEndpointLock() is never invoked on a
      // deleted object.
      DCHECK(previous());
      DCHECK_EQ(STATE_WAIT_FOR_LOCK_COMPLETE, next_state_);
      RemoveFromList();
    } else if (next_state_ == STATE_TRANSPORT_CONNECT_COMPLETE) {
      // A connect was in flight, which means we hold the endpoint lock;
      // release it so any queued SubJob can proceed.
      DCHECK_LT(current_address_index_, static_cast<int>(addresses_.size()));
      WebSocketEndpointLockManager::GetInstance()->UnlockEndpoint(
          addresses_[current_address_index_]);
    }
  }

  // Start connecting.
  int Start() {
    DCHECK_EQ(STATE_NONE, next_state_);
    next_state_ = STATE_WAIT_FOR_LOCK;
    return DoLoop(OK);
  }

  // True once Start() has been called.
  bool started() { return next_state_ != STATE_NONE; }

  // Called by WebSocketEndpointLockManager when the lock becomes available.
  void GotEndpointLock() { OnIOComplete(OK); }

  LoadState GetLoadState() const {
    switch (next_state_) {
      case STATE_WAIT_FOR_LOCK:
      case STATE_WAIT_FOR_LOCK_COMPLETE:
        // TODO(ricea): Add a WebSocket-specific LOAD_STATE ?
        return LOAD_STATE_WAITING_FOR_AVAILABLE_SOCKET;
      case STATE_TRANSPORT_CONNECT:
      case STATE_TRANSPORT_CONNECT_COMPLETE:
        return LOAD_STATE_CONNECTING;
      case STATE_NONE:
        return LOAD_STATE_IDLE;
    }
    NOTREACHED();
    return LOAD_STATE_IDLE;
  }

  SubJobType type() const { return type_; }

  // Transfers ownership of the connected socket to the caller.
  scoped_ptr<StreamSocket> PassSocket() { return transport_socket_.Pass(); }

 private:
  // States run in order: acquire the per-endpoint lock, then attempt the TCP
  // connect. On connect failure the machine loops back to
  // STATE_WAIT_FOR_LOCK for the next address in |addresses_|.
  enum State {
    STATE_NONE,
    STATE_WAIT_FOR_LOCK,
    STATE_WAIT_FOR_LOCK_COMPLETE,
    STATE_TRANSPORT_CONNECT,
    STATE_TRANSPORT_CONNECT_COMPLETE,
  };

  // Convenience accessors for state shared with the parent job.
  ClientSocketFactory* client_socket_factory() const {
    return parent_job_->client_socket_factory_;
  }
  const scoped_refptr<TransportSocketParams>& params() const {
    return parent_job_->params_;
  }
  const BoundNetLog& net_log() const { return parent_job_->net_log(); }

  void OnIOComplete(int result) {
    int rv = DoLoop(result);
    if (rv != ERR_IO_PENDING)
      parent_job_->OnSubJobComplete(rv, this);  // |this| deleted
  }

  // Runs the state machine until it completes or blocks on I/O.
  int DoLoop(int result) {
    DCHECK_NE(next_state_, STATE_NONE);

    int rv = result;
    do {
      State state = next_state_;
      next_state_ = STATE_NONE;
      switch (state) {
        case STATE_WAIT_FOR_LOCK:
          rv = DoEndpointLock();
          break;
        case STATE_WAIT_FOR_LOCK_COMPLETE:
          rv = DoEndpointLockComplete();
          break;
        case STATE_TRANSPORT_CONNECT:
          DCHECK_EQ(OK, rv);
          rv = DoTransportConnect();
          break;
        case STATE_TRANSPORT_CONNECT_COMPLETE:
          rv = DoTransportConnectComplete(rv);
          break;
        default:
          NOTREACHED();
          rv = ERR_FAILED;
          break;
      }
    } while (rv != ERR_IO_PENDING && next_state_ != STATE_NONE);

    return rv;
  }

  int DoEndpointLock() {
    DCHECK_LT(current_address_index_, static_cast<int>(addresses_.size()));
    int rv = WebSocketEndpointLockManager::GetInstance()->LockEndpoint(
        addresses_[current_address_index_], this);
    next_state_ = STATE_WAIT_FOR_LOCK_COMPLETE;
    return rv;
  }

  int DoEndpointLockComplete() {
    next_state_ = STATE_TRANSPORT_CONNECT;
    return OK;
  }

  // Creates a socket for the current address only, and starts connecting.
  int DoTransportConnect() {
    next_state_ = STATE_TRANSPORT_CONNECT_COMPLETE;
    DCHECK_LT(current_address_index_, static_cast<int>(addresses_.size()));
    AddressList one_address(addresses_[current_address_index_]);
    transport_socket_ = client_socket_factory()->CreateTransportClientSocket(
        one_address, net_log().net_log(), net_log().source());
    // This use of base::Unretained() is safe because transport_socket_ is
    // destroyed in the destructor.
    int rv = transport_socket_->Connect(
        base::Bind(&SubJob::OnIOComplete, base::Unretained(this)));
    return rv;
  }

  // On success, records the socket->endpoint association (the endpoint
  // remains locked until UnlockSocket() is called). On failure, releases the
  // endpoint lock and falls back to the next address, if any.
  int DoTransportConnectComplete(int result) {
    WebSocketEndpointLockManager* endpoint_lock_manager =
        WebSocketEndpointLockManager::GetInstance();
    if (result != OK) {
      endpoint_lock_manager->UnlockEndpoint(addresses_[current_address_index_]);

      if (current_address_index_ + 1 < static_cast<int>(addresses_.size())) {
        // Try to fall back to the next address in the list.
        next_state_ = STATE_WAIT_FOR_LOCK;
        ++current_address_index_;
        result = OK;
      }

      return result;
    }

    endpoint_lock_manager->RememberSocket(transport_socket_.get(),
                                          addresses_[current_address_index_]);

    return result;
  }

  const AddressList addresses_;
  WebSocketTransportConnectJob* const parent_job_;  // Not owned.

  State next_state_;
  // Index into |addresses_| of the address currently being attempted.
  int current_address_index_;
  const SubJobType type_;

  scoped_ptr<StreamSocket> transport_socket_;

  DISALLOW_COPY_AND_ASSIGN(SubJob);
};
| + |
| +int WebSocketEndpointLockManager::LockEndpoint(const IPEndPoint& endpoint, |
| + SubJob* job) { |
| + EndPointJobMap::value_type insert_value(endpoint, NULL); |
| + std::pair<EndPointJobMap::iterator, bool> rv = |
| + endpoint_job_map_.insert(insert_value); |
| + if (rv.second) { |
| + DVLOG(3) << "Locking endpoint " << endpoint.ToString(); |
| + rv.first->second = new ConnectJobQueue; |
| + return OK; |
| + } |
| + DVLOG(3) << "Waiting for endpoint " << endpoint.ToString(); |
| + rv.first->second->Append(job); |
| + return ERR_IO_PENDING; |
| +} |
| + |
| +void WebSocketEndpointLockManager::RememberSocket(StreamSocket* socket, |
| + const IPEndPoint& endpoint) { |
| + bool inserted = socket_endpoint_map_.insert(SocketEndPointMap::value_type( |
| + socket, endpoint)).second; |
| + DCHECK(inserted); |
| + DCHECK(endpoint_job_map_.find(endpoint) != endpoint_job_map_.end()); |
| + DVLOG(3) << "Remembered (StreamSocket*)" << socket << " for " |
| + << endpoint.ToString() << " (" << socket_endpoint_map_.size() |
| + << " sockets remembered)"; |
| +} |
| + |
| +void WebSocketEndpointLockManager::UnlockSocket(StreamSocket* socket) { |
| + SocketEndPointMap::iterator socket_it = socket_endpoint_map_.find(socket); |
| + if (socket_it == socket_endpoint_map_.end()) { |
| + DVLOG(3) << "Ignoring request to unlock already-unlocked socket" |
| + "(StreamSocket*)" << socket; |
| + return; |
| + } |
| + const IPEndPoint& endpoint = socket_it->second; |
| + DVLOG(3) << "Unlocking (StreamSocket*)" << socket << " for " |
| + << endpoint.ToString() << " (" << socket_endpoint_map_.size() |
| + << " sockets left)"; |
| + UnlockEndpoint(endpoint); |
| + socket_endpoint_map_.erase(socket_it); |
| +} |
| + |
| +void WebSocketEndpointLockManager::UnlockEndpoint(const IPEndPoint& endpoint) { |
| + EndPointJobMap::iterator found_it = endpoint_job_map_.find(endpoint); |
| + CHECK(found_it != endpoint_job_map_.end()); // Security critical |
| + ConnectJobQueue* queue = found_it->second; |
| + if (queue->empty()) { |
| + DVLOG(3) << "Unlocking endpoint " << endpoint.ToString(); |
| + delete queue; |
| + endpoint_job_map_.erase(found_it); |
| + } else { |
| + DVLOG(3) << "Unlocking endpoint " << endpoint.ToString() |
| + << " and activating next waiter"; |
| + SubJob* next_job = queue->head()->value(); |
| + next_job->RemoveFromList(); |
| + next_job->GotEndpointLock(); |
| + } |
| +} |
| + |
// A ConnectJob that resolves |params->destination()| and then races IPv4 and
// IPv6 connection attempts via SubJobs. |handle| and |callback| identify the
// pool request this job was created for; |delegate| is notified on
// completion.
WebSocketTransportConnectJob::WebSocketTransportConnectJob(
    const std::string& group_name,
    RequestPriority priority,
    const scoped_refptr<TransportSocketParams>& params,
    base::TimeDelta timeout_duration,
    const CompletionCallback& callback,
    ClientSocketFactory* client_socket_factory,
    HostResolver* host_resolver,
    ClientSocketHandle* handle,
    Delegate* delegate,
    NetLog* pool_net_log,
    const BoundNetLog& request_net_log)
    : ConnectJob(group_name,
                 timeout_duration,
                 priority,
                 delegate,
                 BoundNetLog::Make(pool_net_log, NetLog::SOURCE_CONNECT_JOB)),
      params_(params),
      client_socket_factory_(client_socket_factory),
      resolver_(host_resolver),
      histogram_(CONNECTION_LATENCY_UNKNOWN),
      handle_(handle),
      callback_(callback),
      request_net_log_(request_net_log),
      next_state_(STATE_NONE),
      had_ipv4_(false),
      had_ipv6_(false) {}

WebSocketTransportConnectJob::~WebSocketTransportConnectJob() {}
| + |
| +LoadState WebSocketTransportConnectJob::GetLoadState() const { |
| + LoadState load_state = LOAD_STATE_RESOLVING_HOST; |
| + if (ipv6_job_) |
| + load_state = ipv6_job_->GetLoadState(); |
| + // LOAD_STATE_CONNECTING is better than |
| + // LOAD_STATE_WAITING_FOR_AVAILABLE_SOCKET. |
| + if (ipv4_job_ && load_state != LOAD_STATE_CONNECTING) |
| + load_state = ipv4_job_->GetLoadState(); |
| + return load_state; |
| +} |
| + |
| +void WebSocketTransportConnectJob::OnIOComplete(int result) { |
| + result = DoLoop(result); |
| + if (result != ERR_IO_PENDING) |
| + NotifyDelegateOfCompletion(result); |
| +} |
| + |
// Standard //net state-machine driver: runs states until one returns
// ERR_IO_PENDING or the machine reaches STATE_NONE.
int WebSocketTransportConnectJob::DoLoop(int result) {
  DCHECK_NE(next_state_, STATE_NONE);

  int rv = result;
  do {
    State state = next_state_;
    next_state_ = STATE_NONE;
    switch (state) {
      case STATE_RESOLVE_HOST:
        rv = DoResolveHost();
        break;
      case STATE_RESOLVE_HOST_COMPLETE:
        rv = DoResolveHostComplete(rv);
        break;
      case STATE_TRANSPORT_CONNECT:
        DCHECK_EQ(OK, rv);
        rv = DoTransportConnect();
        break;
      case STATE_TRANSPORT_CONNECT_COMPLETE:
        rv = DoTransportConnectComplete(rv);
        break;
      default:
        NOTREACHED();
        rv = ERR_FAILED;
        break;
    }
  } while (rv != ERR_IO_PENDING && next_state_ != STATE_NONE);

  return rv;
}
| + |
| +int WebSocketTransportConnectJob::DoResolveHost() { |
| + connect_timing_.dns_start = base::TimeTicks::Now(); |
| + next_state_ = STATE_RESOLVE_HOST_COMPLETE; |
| + |
| + // This use of base::Unretained is safe because resolver_ is destroyed in this |
| + // object's destructor. |
| + int result = |
| + resolver_.Resolve(params_->destination(), |
| + priority(), |
| + &addresses_, |
| + base::Bind(&WebSocketTransportConnectJob::OnIOComplete, |
| + base::Unretained(this)), |
| + net_log()); |
| + |
| + return result; |
| +} |
| + |
| +int WebSocketTransportConnectJob::DoResolveHostComplete(int result) { |
| + connect_timing_.dns_end = base::TimeTicks::Now(); |
| + // Overwrite connection start time, since for connections that do not go |
| + // through proxies, |connect_start| should not include dns lookup time. |
| + connect_timing_.connect_start = connect_timing_.dns_end; |
| + |
| + if (result == OK) { |
| + // Invoke callback, and abort if it fails. |
| + if (!params_->host_resolution_callback().is_null()) |
| + result = params_->host_resolution_callback().Run(addresses_, net_log()); |
| + |
| + if (result == OK) |
| + next_state_ = STATE_TRANSPORT_CONNECT; |
| + } |
| + |
| + return result; |
| +} |
| + |
// Splits the resolved addresses by family and starts connecting. When both
// families are present the IPv6 attempt starts first and the IPv4 attempt is
// scheduled kIPv6FallbackTimerInMs later as a fallback.
int WebSocketTransportConnectJob::DoTransportConnect() {
  AddressList ipv4_addresses;
  AddressList ipv6_addresses;
  // If neither family yields any addresses, no sub-job is started and
  // ERR_UNEXPECTED is returned.
  int result = ERR_UNEXPECTED;
  next_state_ = STATE_TRANSPORT_CONNECT_COMPLETE;

  for (AddressList::const_iterator it = addresses_.begin();
       it != addresses_.end();
       ++it) {
    switch (it->GetFamily()) {
      case ADDRESS_FAMILY_IPV4:
        ipv4_addresses.push_back(*it);
        break;

      case ADDRESS_FAMILY_IPV6:
        ipv6_addresses.push_back(*it);
        break;

      default:
        // Unrecognized families are silently skipped.
        DVLOG(1) << "Unexpected ADDRESS_FAMILY: " << it->GetFamily();
        break;
    }
  }

  if (!ipv4_addresses.empty()) {
    had_ipv4_ = true;
    ipv4_job_.reset(new SubJob(ipv4_addresses, this, SUB_JOB_IPV4));
  }

  if (!ipv6_addresses.empty()) {
    had_ipv6_ = true;
    ipv6_job_.reset(new SubJob(ipv6_addresses, this, SUB_JOB_IPV6));
    result = ipv6_job_->Start();
    if (result == OK) {
      // IPv6 connected synchronously; adopt its socket and finish.
      SetSocket(ipv6_job_->PassSocket());
      histogram_ = had_ipv4_ ? CONNECTION_LATENCY_IPV6_RACEABLE
                             : CONNECTION_LATENCY_IPV6_SOLO;
      return result;
    }
    if (result == ERR_IO_PENDING && ipv4_job_) {
      // IPv6 is connecting; arm the fallback timer that starts the IPv4
      // attempt if IPv6 has not finished in time.
      // This use of base::Unretained is safe because fallback_timer_ is owned
      // by this object.
      fallback_timer_.Start(
          FROM_HERE,
          base::TimeDelta::FromMilliseconds(kIPv6FallbackTimerInMs),
          base::Bind(&WebSocketTransportConnectJob::StartIPv4JobAsync,
                     base::Unretained(this)));
    }
    if (result != ERR_IO_PENDING)
      ipv6_job_.reset();
  }

  // With no (live) IPv6 job, start IPv4 immediately.
  if (ipv4_job_ && !ipv6_job_) {
    result = ipv4_job_->Start();
    if (result == OK) {
      SetSocket(ipv4_job_->PassSocket());
      histogram_ = had_ipv6_ ? CONNECTION_LATENCY_IPV4_WINS_RACE
                             : CONNECTION_LATENCY_IPV4_NO_RACE;
    }
  }

  return result;
}
| + |
// Records connection-latency UMA histograms on success. |histogram_| was
// chosen when the winning sub-job completed and selects the family-specific
// bucket; |result| is passed through unchanged.
int WebSocketTransportConnectJob::DoTransportConnectComplete(int result) {
  if (result == OK) {
    DCHECK(!connect_timing_.connect_start.is_null());
    DCHECK(!connect_timing_.dns_start.is_null());
    const base::TimeTicks now = base::TimeTicks::Now();
    const base::TimeDelta total_duration = now - connect_timing_.dns_start;
    const base::TimeDelta one_millisecond =
        base::TimeDelta::FromMilliseconds(1);
    const base::TimeDelta ten_minutes = base::TimeDelta::FromMinutes(10);

    UMA_HISTOGRAM_CUSTOM_TIMES("Net.DNS_Resolution_And_TCP_Connection_Latency2",
                               total_duration,
                               one_millisecond,
                               ten_minutes,
                               100);

    base::TimeDelta connect_duration = now - connect_timing_.connect_start;
    UMA_HISTOGRAM_CUSTOM_TIMES("Net.TCP_Connection_Latency",
                               connect_duration,
                               one_millisecond,
                               ten_minutes,
                               100);

    switch (histogram_) {
      case CONNECTION_LATENCY_IPV4_WINS_RACE:
        UMA_HISTOGRAM_CUSTOM_TIMES("Net.TCP_Connection_Latency_IPv4_Wins_Race",
                                   connect_duration,
                                   one_millisecond,
                                   ten_minutes,
                                   100);
        break;

      case CONNECTION_LATENCY_IPV4_NO_RACE:
        UMA_HISTOGRAM_CUSTOM_TIMES("Net.TCP_Connection_Latency_IPv4_No_Race",
                                   connect_duration,
                                   one_millisecond,
                                   ten_minutes,
                                   100);
        break;

      case CONNECTION_LATENCY_IPV6_RACEABLE:
        UMA_HISTOGRAM_CUSTOM_TIMES("Net.TCP_Connection_Latency_IPv6_Raceable",
                                   connect_duration,
                                   one_millisecond,
                                   ten_minutes,
                                   100);
        break;

      case CONNECTION_LATENCY_IPV6_SOLO:
        UMA_HISTOGRAM_CUSTOM_TIMES("Net.TCP_Connection_Latency_IPv6_Solo",
                                   connect_duration,
                                   one_millisecond,
                                   ten_minutes,
                                   100);
        break;

      default:
        // On success |histogram_| must have been set to a family-specific
        // value by DoTransportConnect() or OnSubJobComplete().
        NOTREACHED();
        break;
    }
  }
  return result;
}
| + |
// Called when a sub-job finishes. On success the winning socket is adopted
// and both sub-jobs are destroyed (destroying a SubJob abandons its connect
// attempt). On failure of the IPv6 job, the delayed IPv4 job is started
// immediately if it has not run yet; the overall job only completes with an
// error once no sub-jobs remain.
void WebSocketTransportConnectJob::OnSubJobComplete(int result, SubJob* job) {
  if (result == OK) {
    switch (job->type()) {
      case SUB_JOB_IPV4:
        histogram_ = had_ipv6_ ? CONNECTION_LATENCY_IPV4_WINS_RACE
                               : CONNECTION_LATENCY_IPV4_NO_RACE;
        break;

      case SUB_JOB_IPV6:
        histogram_ = had_ipv4_ ? CONNECTION_LATENCY_IPV6_RACEABLE
                               : CONNECTION_LATENCY_IPV6_SOLO;
        break;
    }
    SetSocket(job->PassSocket());

    // Make sure all connections are cancelled even if this object fails to be
    // deleted.
    ipv4_job_.reset();
    ipv6_job_.reset();
  } else {
    switch (job->type()) {
      case SUB_JOB_IPV4:
        ipv4_job_.reset();
        break;

      case SUB_JOB_IPV6:
        ipv6_job_.reset();
        if (ipv4_job_ && !ipv4_job_->started()) {
          // IPv6 lost; don't wait for the fallback timer, start IPv4 now.
          fallback_timer_.Stop();
          result = ipv4_job_->Start();
          if (result != ERR_IO_PENDING) {
            // IPv4 finished synchronously; re-enter to handle its result.
            OnSubJobComplete(result, ipv4_job_.get());
            return;
          }
        }
        break;
    }
    // While either sub-job is still running, the overall job is pending.
    if (ipv4_job_ || ipv6_job_)
      result = ERR_IO_PENDING;
  }
  OnIOComplete(result);
}
| + |
| +void WebSocketTransportConnectJob::StartIPv4JobAsync() { |
| + DCHECK(ipv4_job_); |
| + int result = ipv4_job_->Start(); |
| + if (result != ERR_IO_PENDING) |
| + OnSubJobComplete(result, ipv4_job_.get()); |
| +} |
| + |
// Entry point invoked by ConnectJob::Connect(): starts the state machine
// with host resolution.
int WebSocketTransportConnectJob::ConnectInternal() {
  next_state_ = STATE_RESOLVE_HOST;
  return DoLoop(OK);
}
| + |
// A socket pool specialized for WebSockets: no idle sockets, one socket per
// request, with endpoint locking via WebSocketEndpointLockManager. The base
// TransportClientSocketPool receives the same arguments but this class
// overrides the request path entirely.
WebSocketTransportClientSocketPool::WebSocketTransportClientSocketPool(
    int max_sockets,
    int max_sockets_per_group,
    ClientSocketPoolHistograms* histograms,
    HostResolver* host_resolver,
    ClientSocketFactory* client_socket_factory,
    NetLog* net_log)
    : TransportClientSocketPool(max_sockets,
                                max_sockets_per_group,
                                histograms,
                                host_resolver,
                                client_socket_factory,
                                net_log),
      connect_job_delegate_(this),
      histograms_(histograms),
      pool_net_log_(net_log),
      client_socket_factory_(client_socket_factory),
      host_resolver_(host_resolver),
      max_sockets_(max_sockets),
      handed_out_socket_count_(0),
      is_stalled_(false),
      weak_factory_(this) {}

WebSocketTransportClientSocketPool::~WebSocketTransportClientSocketPool() {
  // All requests must have been cancelled or completed, and all handed-out
  // sockets released, before the pool is destroyed.
  DCHECK(pending_connects_.empty());
  DCHECK_EQ(0, handed_out_socket_count_);
  DCHECK(!is_stalled_);
}
| + |
| +// static |
| +void WebSocketTransportClientSocketPool::UnlockEndpoint( |
| + ClientSocketHandle* handle) { |
| + DCHECK(handle->is_initialized()); |
| + WebSocketEndpointLockManager::GetInstance()->UnlockSocket(handle->socket()); |
| +} |
| + |
// Creates and starts a WebSocketTransportConnectJob for |handle|. Returns OK
// with the socket handed out, ERR_IO_PENDING with the job tracked in
// |pending_connects_|, or an error (possibly with an error socket attached
// for diagnosis).
int WebSocketTransportClientSocketPool::RequestSocket(
    const std::string& group_name,
    const void* params,
    RequestPriority priority,
    ClientSocketHandle* handle,
    const CompletionCallback& callback,
    const BoundNetLog& request_net_log) {
  DCHECK(params);
  // |params| is always a scoped_refptr<TransportSocketParams>* for this pool.
  const scoped_refptr<TransportSocketParams>& casted_params =
      *static_cast<const scoped_refptr<TransportSocketParams>*>(params);

  if (request_net_log.IsLogging()) {
    // TODO(eroman): Split out the host and port parameters.
    request_net_log.AddEvent(
        NetLog::TYPE_TCP_CLIENT_SOCKET_POOL_REQUESTED_SOCKET,
        CreateNetLogHostPortPairCallback(
            &casted_params->destination().host_port_pair()));
  }

  CHECK(!callback.is_null());
  CHECK(handle);

  request_net_log.BeginEvent(NetLog::TYPE_SOCKET_POOL);

  if (ReachedMaxSocketsLimit() && !casted_params->ignore_limits()) {
    request_net_log.AddEvent(NetLog::TYPE_SOCKET_POOL_STALLED_MAX_SOCKETS);
    // NOTE(review): the stalled request is not queued anywhere; the caller
    // gets ERR_IO_PENDING but no completion is scheduled here -- confirm
    // callers recover via IsStalled()/higher-layered pools.
    is_stalled_ = true;
    return ERR_IO_PENDING;
  }

  scoped_ptr<WebSocketTransportConnectJob> connect_job(
      new WebSocketTransportConnectJob(
          group_name,
          priority,
          casted_params,
          base::TimeDelta::FromSeconds(kTransportConnectJobTimeoutInSeconds),
          callback,
          client_socket_factory_,
          host_resolver_,
          handle,
          &connect_job_delegate_,
          pool_net_log_,
          request_net_log));

  int rv = connect_job->Connect();
  request_net_log.AddEvent(
      NetLog::TYPE_SOCKET_POOL_BOUND_TO_CONNECT_JOB,
      connect_job->net_log().source().ToEventParametersCallback());
  if (rv == OK) {
    // Synchronous success: hand the socket out immediately.
    HandOutSocket(connect_job->PassSocket(),
                  connect_job->connect_timing(),
                  handle,
                  request_net_log);
  } else if (rv == ERR_IO_PENDING) {
    // TODO(ricea): Implement backup job timer?
    AddJob(handle, connect_job.Pass());
  } else {
    // Synchronous failure: the job may still carry a socket holding error
    // details for the caller to inspect.
    scoped_ptr<StreamSocket> error_socket;
    connect_job->GetAdditionalErrorState(handle);
    error_socket = connect_job->PassSocket();
    if (error_socket) {
      HandOutSocket(error_socket.Pass(),
                    connect_job->connect_timing(),
                    handle,
                    request_net_log);
    }
  }

  if (rv != ERR_IO_PENDING) {
    request_net_log.EndEventWithNetErrorCode(NetLog::TYPE_SOCKET_POOL, rv);
  }

  return rv;
}
| + |
// Batch socket requests are not supported by this pool.
void WebSocketTransportClientSocketPool::RequestSockets(
    const std::string& group_name,
    const void* params,
    int num_sockets,
    const BoundNetLog& net_log) {
  NOTIMPLEMENTED();
}
| + |
// Cancels whatever |handle| currently has outstanding: an in-flight connect
// job or a queued user callback (see CancelJob()).
void WebSocketTransportClientSocketPool::CancelRequest(
    const std::string& group_name,
    ClientSocketHandle* handle) {
  CancelJob(handle);
}
| + |
| +void WebSocketTransportClientSocketPool::ReleaseSocket( |
| + const std::string& group_name, |
| + scoped_ptr<StreamSocket> socket, |
| + int id) { |
| + WebSocketEndpointLockManager::GetInstance()->UnlockSocket(socket.get()); |
| + CHECK_GT(handed_out_socket_count_, 0); |
| + --handed_out_socket_count_; |
| + if (!ReachedMaxSocketsLimit()) |
| + is_stalled_ = false; |
| +} |
| + |
void WebSocketTransportClientSocketPool::FlushWithError(int error) {
  // This pool has no idle sockets to close, so flushing only cancels the
  // in-flight connect jobs. NOTE(review): |error| is not propagated to the
  // cancelled jobs' callbacks -- confirm callers do not rely on it.
  CancelAllConnectJobs();
}

void WebSocketTransportClientSocketPool::CloseIdleSockets() {
  // We have no idle sockets.
}

// Sockets are handed out as soon as they connect, so the idle count is
// always zero.
int WebSocketTransportClientSocketPool::IdleSocketCount() const { return 0; }

int WebSocketTransportClientSocketPool::IdleSocketCountInGroup(
    const std::string& group_name) const {
  return 0;
}
| + |
LoadState WebSocketTransportClientSocketPool::GetLoadState(
    const std::string& group_name,
    const ClientSocketHandle* handle) const {
  // A handle in |pending_callbacks_| has already finished connecting but its
  // user callback has not run yet; report it as still connecting.
  if (pending_callbacks_.count(handle))
    return LOAD_STATE_CONNECTING;
  return LookupConnectJob(handle)->GetLoadState();
}
| + |
// Returns pool statistics for net-internals. The caller takes ownership of
// the returned dictionary.
base::DictionaryValue* WebSocketTransportClientSocketPool::GetInfoAsValue(
    const std::string& name,
    const std::string& type,
    bool include_nested_pools) const {
  base::DictionaryValue* dict = new base::DictionaryValue();
  dict->SetString("name", name);
  dict->SetString("type", type);
  dict->SetInteger("handed_out_socket_count", handed_out_socket_count_);
  dict->SetInteger("connecting_socket_count", pending_connects_.size());
  dict->SetInteger("idle_socket_count", 0);
  dict->SetInteger("max_socket_count", max_sockets_);
  // NOTE(review): the constructor's |max_sockets_per_group| is not stored;
  // the overall limit is reported here instead -- confirm this is intended.
  dict->SetInteger("max_sockets_per_group", max_sockets_);
  dict->SetInteger("pool_generation_number", 0);
  return dict;
}
| + |
base::TimeDelta WebSocketTransportClientSocketPool::ConnectionTimeout() const {
  // Matches the timeout passed to each WebSocketTransportConnectJob in
  // RequestSocket().
  return base::TimeDelta::FromSeconds(kTransportConnectJobTimeoutInSeconds);
}

ClientSocketPoolHistograms* WebSocketTransportClientSocketPool::histograms()
    const {
  return histograms_;
}

// True while a request has been refused due to the socket limit and capacity
// has not yet been freed.
bool WebSocketTransportClientSocketPool::IsStalled() const {
  return is_stalled_;
}
| + |
| +void WebSocketTransportClientSocketPool::AddHigherLayeredPool( |
| + HigherLayeredPool* higher_pool) { |
| + CHECK(higher_pool); |
| + CHECK(!ContainsKey(higher_pools_, higher_pool)); |
| + higher_pools_.insert(higher_pool); |
| +} |
| + |
| +void WebSocketTransportClientSocketPool::RemoveHigherLayeredPool( |
| + HigherLayeredPool* higher_pool) { |
| + CHECK(higher_pool); |
| + CHECK(ContainsKey(higher_pools_, higher_pool)); |
| + higher_pools_.erase(higher_pool); |
| +} |
| + |
// Called (via ConnectJobDelegate) when an asynchronous connect job finishes.
// Copies the data needed out of |job|, deletes the job, and schedules the
// user callback to run in a fresh task (see InvokeUserCallbackLater()).
void WebSocketTransportClientSocketPool::OnConnectJobComplete(
    int result,
    WebSocketTransportConnectJob* job) {
  DCHECK_NE(ERR_IO_PENDING, result);

  scoped_ptr<StreamSocket> socket = job->PassSocket();

  // Copies of these are needed because |job| is deleted before they are used.
  BoundNetLog request_net_log = job->request_net_log();
  CompletionCallback callback = job->callback();
  LoadTimingInfo::ConnectTiming connect_timing = job->connect_timing();

  ClientSocketHandle* const handle = job->handle();

  if (result == OK) {
    DCHECK(socket.get());
    HandOutSocket(socket.Pass(), connect_timing, handle, request_net_log);
    request_net_log.EndEvent(NetLog::TYPE_SOCKET_POOL);
  } else {
    // If we got a socket, it must contain error information so pass that
    // up so that the caller can retrieve it.
    bool handed_out_socket = false;
    job->GetAdditionalErrorState(handle);
    if (socket.get()) {
      handed_out_socket = true;
      HandOutSocket(socket.Pass(), connect_timing, handle, request_net_log);
    }
    request_net_log.EndEventWithNetErrorCode(NetLog::TYPE_SOCKET_POOL, result);

    // A failed connect frees capacity, so the pool may no longer be stalled.
    if (!handed_out_socket && is_stalled_ && !ReachedMaxSocketsLimit())
      is_stalled_ = false;
  }
  bool delete_succeeded = DeleteJob(handle);
  DCHECK(delete_succeeded);
  InvokeUserCallbackLater(handle, callback, result);
}
| + |
// Queues |callback| to run with |rv| in a separate task, so the user callback
// runs after the current call stack unwinds. The |handle| entry in
// |pending_callbacks_| lets CancelRequest() suppress the callback before the
// task runs.
void WebSocketTransportClientSocketPool::InvokeUserCallbackLater(
    ClientSocketHandle* handle,
    const CompletionCallback& callback,
    int rv) {
  DCHECK(!pending_callbacks_.count(handle));
  pending_callbacks_.insert(handle);
  base::MessageLoop::current()->PostTask(
      FROM_HERE,
      base::Bind(&WebSocketTransportClientSocketPool::InvokeUserCallback,
                 weak_factory_.GetWeakPtr(),
                 handle,
                 callback,
                 rv));
}
| + |
| +void WebSocketTransportClientSocketPool::InvokeUserCallback( |
| + ClientSocketHandle* handle, |
| + const CompletionCallback& callback, |
| + int rv) { |
| + if (pending_callbacks_.erase(handle)) |
| + callback.Run(rv); |
| +} |
| + |
| +bool WebSocketTransportClientSocketPool::ReachedMaxSocketsLimit() const { |
| + return base::checked_cast<int>(pending_connects_.size()) + |
| + handed_out_socket_count_ >= |
| + max_sockets_; |
| +} |
| + |
// Transfers a freshly-connected socket into |handle| and updates the
// handed-out count. Every socket from this pool is new (never reused).
void WebSocketTransportClientSocketPool::HandOutSocket(
    scoped_ptr<StreamSocket> socket,
    const LoadTimingInfo::ConnectTiming& connect_timing,
    ClientSocketHandle* handle,
    const BoundNetLog& net_log) {
  DCHECK(socket);
  handle->SetSocket(socket.Pass());
  handle->set_reuse_type(ClientSocketHandle::UNUSED);
  handle->set_idle_time(base::TimeDelta());
  handle->set_pool_id(0);
  handle->set_connect_timing(connect_timing);

  net_log.AddEvent(
      NetLog::TYPE_SOCKET_POOL_BOUND_TO_SOCKET,
      handle->socket()->NetLog().source().ToEventParametersCallback());

  ++handed_out_socket_count_;
}
| + |
| +void WebSocketTransportClientSocketPool::AddJob( |
| + ClientSocketHandle* handle, |
| + scoped_ptr<WebSocketTransportConnectJob> connect_job) { |
| + bool inserted = |
| + pending_connects_.insert(PendingConnectsMap::value_type( |
| + handle, connect_job.release())).second; |
| + DCHECK(inserted); |
| +} |
| + |
| +bool WebSocketTransportClientSocketPool::DeleteJob(ClientSocketHandle* handle) { |
| + PendingConnectsMap::iterator it = pending_connects_.find(handle); |
| + if (it == pending_connects_.end()) |
| + return false; |
| + delete it->second, it->second = NULL; |
| + pending_connects_.erase(it); |
| + return true; |
| +} |
| + |
// If |handle| still has an in-flight job, deleting it cancels the connect.
// Otherwise the job already completed and a user callback is queued;
// removing |handle| from |pending_callbacks_| stops that callback from
// running (see InvokeUserCallback()).
void WebSocketTransportClientSocketPool::CancelJob(ClientSocketHandle* handle) {
  if (!DeleteJob(handle))
    pending_callbacks_.erase(handle);
}
| + |
// Deletes (and thereby cancels) every in-flight connect job.
// NOTE(review): deleting a job can re-enter the endpoint lock manager and
// complete other jobs; confirm STLDeleteValues is safe against re-entrant
// modification of |pending_connects_|.
void WebSocketTransportClientSocketPool::CancelAllConnectJobs() {
  STLDeleteValues(&pending_connects_);
}
| + |
// Returns the in-flight connect job for |handle|. Crashes if there is none;
// callers must first check |pending_callbacks_| (see GetLoadState()).
const WebSocketTransportConnectJob*
WebSocketTransportClientSocketPool::LookupConnectJob(
    const ClientSocketHandle* handle) const {
  PendingConnectsMap::const_iterator it = pending_connects_.find(handle);
  CHECK(it != pending_connects_.end());
  return it->second;
}
| + |
WebSocketTransportClientSocketPool::ConnectJobDelegate::ConnectJobDelegate(
    WebSocketTransportClientSocketPool* owner)
    : owner_(owner) {}

WebSocketTransportClientSocketPool::ConnectJobDelegate::~ConnectJobDelegate() {}

// Forwards ConnectJob completion to the owning pool, downcasting to the
// concrete type (this pool only ever creates WebSocketTransportConnectJobs).
void
WebSocketTransportClientSocketPool::ConnectJobDelegate::OnConnectJobComplete(
    int result,
    ConnectJob* job) {
  owner_->OnConnectJobComplete(result,
                               static_cast<WebSocketTransportConnectJob*>(job));
}
| + |
| +} // namespace net |