| Index: net/socket/websocket_transport_client_socket_pool.cc
|
| diff --git a/net/socket/websocket_transport_client_socket_pool.cc b/net/socket/websocket_transport_client_socket_pool.cc
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..bdd5312bb7b424214d02bd191da1b5dc0108a902
|
| --- /dev/null
|
| +++ b/net/socket/websocket_transport_client_socket_pool.cc
|
| @@ -0,0 +1,813 @@
|
| +// Copyright (c) 2012 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +#include "net/socket/websocket_transport_client_socket_pool.h"
|
| +
|
| +#include <algorithm>
|
| +#include <map>
|
| +#include <utility>
|
| +
|
| +#include "base/compiler_specific.h"
|
| +#include "base/containers/linked_list.h"
|
| +#include "base/logging.h"
|
| +#include "base/memory/singleton.h"
|
| +#include "base/numerics/safe_conversions.h"
|
| +#include "base/stl_util.h"
|
| +#include "base/strings/string_util.h"
|
| +#include "base/time/time.h"
|
| +#include "base/values.h"
|
| +#include "net/base/ip_endpoint.h"
|
| +#include "net/base/net_errors.h"
|
| +#include "net/base/net_log.h"
|
| +#include "net/socket/client_socket_factory.h"
|
| +#include "net/socket/client_socket_handle.h"
|
| +#include "net/socket/client_socket_pool_base.h"
|
| +
|
| +namespace net {
|
| +
|
| +namespace {
|
| +
|
| +using base::TimeDelta;
|
| +
|
| +// TODO(ricea): For now, we implement a global timeout for compatibility with
|
| +// TransportConnectJob. Since WebSocketTransportConnectJob controls the address
|
| +// selection process more tightly, it could do something smarter here.
|
| +const int kTransportConnectJobTimeoutInSeconds = 240; // 4 minutes.
|
| +
|
| +} // namespace
|
| +
|
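| +// WebSocketEndpointLockManager keeps track of which IP endpoints currently
|
| +// have a connection attempt in flight, and queues later attempts to the same
|
| +// endpoint until the lock holder releases it. This serialises connection
|
| +// attempts per endpoint, matching the RFC 6455 section 4.1 requirement that
|
| +// a client have no more than one connection in the CONNECTING state to the
|
| +// same IP address. Typical sequence (illustrative sketch; |lock_manager| is
|
| +// the singleton instance):
|
| +//
|
| +//   lock_manager->LockEndpoint(endpoint, job);   // OK or ERR_IO_PENDING
|
| +//   /* ... connect a socket to |endpoint| ... */
|
| +//   lock_manager->RememberSocket(socket, endpoint);
|
| +//   /* ... later, when the socket is released ... */
|
| +//   lock_manager->UnlockSocket(socket);  // wakes the next waiting job
|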
| +class WebSocketEndpointLockManager {
|
| + public:
|
| + typedef WebSocketTransportConnectJob::SubJob SubJob;
|
| + static WebSocketEndpointLockManager* GetInstance();
|
| +
|
| + // Returns OK if the lock was acquired immediately, or ERR_IO_PENDING if
|
| + // not. If the lock was not acquired, |job->GotEndpointLock()| will be
|
| + // called when it is.
|
| + int LockEndpoint(const IPEndPoint& endpoint, SubJob* job);
|
| +
|
| + // Record the IPEndPoint associated with a particular socket. This is
|
| + // necessary because TCPClientSocket refuses to return the PeerAddress after
|
| + // the connection is disconnected. The association will be forgotten when
|
| + // UnlockSocket() is called. The |socket| pointer must not be deleted between
|
| + // the call to RememberSocket() and the call to UnlockSocket().
|
| + void RememberSocket(StreamSocket* socket, const IPEndPoint& endpoint);
|
| +
|
| + // Release the lock on an endpoint, and, if appropriate, trigger the next
|
| + // socket connection. It is permitted to call UnlockSocket() multiple times
|
| + // for the same |socket|; all calls after the first will be ignored.
|
| + void UnlockSocket(StreamSocket* socket);
|
| +
|
| + // Release the lock on |endpoint|. Most callers should use UnlockSocket()
|
| + // instead.
|
| + void UnlockEndpoint(const IPEndPoint& endpoint);
|
| +
|
| + private:
|
| + typedef base::LinkedList<SubJob> ConnectJobQueue;
|
| + typedef std::map<IPEndPoint, ConnectJobQueue*> EndPointJobMap;
|
| + typedef std::map<StreamSocket*, IPEndPoint> SocketEndPointMap;
|
| +
|
| + WebSocketEndpointLockManager() {}
|
| + ~WebSocketEndpointLockManager() {
|
| + DCHECK(endpoint_job_map_.empty());
|
| + DCHECK(socket_endpoint_map_.empty());
|
| + }
|
| +
|
| + EndPointJobMap endpoint_job_map_;
|
| + SocketEndPointMap socket_endpoint_map_;
|
| +
|
| + friend struct DefaultSingletonTraits<WebSocketEndpointLockManager>;
|
| +
|
| + DISALLOW_COPY_AND_ASSIGN(WebSocketEndpointLockManager);
|
| +};
|
| +
|
| +WebSocketEndpointLockManager* WebSocketEndpointLockManager::GetInstance() {
|
| + return Singleton<WebSocketEndpointLockManager>::get();
|
| +}
|
| +
|
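| +// A SubJob performs the connection attempts for the addresses of a single
|
| +// family (all IPv4 or all IPv6), trying each address in turn until one
|
| +// connects. It is owned by its parent WebSocketTransportConnectJob, which may
|
| +// race an IPv4 SubJob against an IPv6 SubJob.
|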
| +class WebSocketTransportConnectJob::SubJob : public base::LinkNode<SubJob> {
|
| + public:
|
| + SubJob(const AddressList& addresses,
|
| + WebSocketTransportConnectJob* parent_job,
|
| + SubJobType type)
|
| + : addresses_(addresses),
|
| + parent_job_(parent_job),
|
| + next_state_(STATE_NONE),
|
| + current_address_index_(0),
|
| + type_(type) {}
|
| +
|
| + ~SubJob() {
|
| + // We don't worry about cancelling the TCP connect, since ~StreamSocket will
|
| + // take care of it.
|
| + if (next()) {
|
| + DCHECK(previous());
|
| + DCHECK_EQ(STATE_WAIT_FOR_LOCK_COMPLETE, next_state_);
|
| + RemoveFromList();
|
| + } else if (next_state_ == STATE_TRANSPORT_CONNECT_COMPLETE) {
|
| + DCHECK_LT(current_address_index_, static_cast<int>(addresses_.size()));
|
| + WebSocketEndpointLockManager::GetInstance()->UnlockEndpoint(
|
| + addresses_[current_address_index_]);
|
| + }
|
| + }
|
| +
|
| + // Start connecting.
|
| + int Start() {
|
| + DCHECK_EQ(STATE_NONE, next_state_);
|
| + next_state_ = STATE_WAIT_FOR_LOCK;
|
| + return DoLoop(OK);
|
| + }
|
| +
|
| + bool started() const { return next_state_ != STATE_NONE; }
|
| +
|
| + // Called by WebSocketEndpointLockManager when the lock becomes available.
|
| + void GotEndpointLock() { OnIOComplete(OK); }
|
| +
|
| + LoadState GetLoadState() const {
|
| + switch (next_state_) {
|
| + case STATE_WAIT_FOR_LOCK:
|
| + case STATE_WAIT_FOR_LOCK_COMPLETE:
|
| + // TODO(ricea): Add a WebSocket-specific LOAD_STATE?
|
| + return LOAD_STATE_WAITING_FOR_AVAILABLE_SOCKET;
|
| + case STATE_TRANSPORT_CONNECT:
|
| + case STATE_TRANSPORT_CONNECT_COMPLETE:
|
| + return LOAD_STATE_CONNECTING;
|
| + case STATE_NONE:
|
| + return LOAD_STATE_IDLE;
|
| + }
|
| + NOTREACHED();
|
| + return LOAD_STATE_IDLE;
|
| + }
|
| +
|
| + SubJobType type() const { return type_; }
|
| +
|
| + scoped_ptr<StreamSocket> PassSocket() { return transport_socket_.Pass(); }
|
| +
|
| + private:
|
| + enum State {
|
| + STATE_NONE,
|
| + STATE_WAIT_FOR_LOCK,
|
| + STATE_WAIT_FOR_LOCK_COMPLETE,
|
| + STATE_TRANSPORT_CONNECT,
|
| + STATE_TRANSPORT_CONNECT_COMPLETE,
|
| + };
|
| +
|
| + ClientSocketFactory* client_socket_factory() const {
|
| + return parent_job_->data_.client_socket_factory();
|
| + }
|
| +
|
| + const BoundNetLog& net_log() const { return parent_job_->net_log(); }
|
| +
|
| + void OnIOComplete(int result) {
|
| + int rv = DoLoop(result);
|
| + if (rv != ERR_IO_PENDING)
|
| + parent_job_->OnSubJobComplete(rv, this); // |this| deleted
|
| + }
|
| +
|
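| + // Standard //net-style state machine: each Do* method performs one step,
|
| + // sets |next_state_|, and returns a net error code. The loop runs until a
|
| + // step returns ERR_IO_PENDING (an asynchronous operation is in progress)
|
| + // or the machine reaches STATE_NONE.
|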
| + int DoLoop(int result) {
|
| + DCHECK_NE(next_state_, STATE_NONE);
|
| +
|
| + int rv = result;
|
| + do {
|
| + State state = next_state_;
|
| + next_state_ = STATE_NONE;
|
| + switch (state) {
|
| + case STATE_WAIT_FOR_LOCK:
|
| + rv = DoEndpointLock();
|
| + break;
|
| + case STATE_WAIT_FOR_LOCK_COMPLETE:
|
| + rv = DoEndpointLockComplete();
|
| + break;
|
| + case STATE_TRANSPORT_CONNECT:
|
| + DCHECK_EQ(OK, rv);
|
| + rv = DoTransportConnect();
|
| + break;
|
| + case STATE_TRANSPORT_CONNECT_COMPLETE:
|
| + rv = DoTransportConnectComplete(rv);
|
| + break;
|
| + default:
|
| + NOTREACHED();
|
| + rv = ERR_FAILED;
|
| + break;
|
| + }
|
| + } while (rv != ERR_IO_PENDING && next_state_ != STATE_NONE);
|
| +
|
| + return rv;
|
| + }
|
| +
|
| + int DoEndpointLock() {
|
| + DCHECK_LT(current_address_index_, static_cast<int>(addresses_.size()));
|
| + int rv = WebSocketEndpointLockManager::GetInstance()->LockEndpoint(
|
| + addresses_[current_address_index_], this);
|
| + next_state_ = STATE_WAIT_FOR_LOCK_COMPLETE;
|
| + return rv;
|
| + }
|
| +
|
| + int DoEndpointLockComplete() {
|
| + next_state_ = STATE_TRANSPORT_CONNECT;
|
| + return OK;
|
| + }
|
| +
|
| + int DoTransportConnect() {
|
| + next_state_ = STATE_TRANSPORT_CONNECT_COMPLETE;
|
| + DCHECK_LT(current_address_index_, static_cast<int>(addresses_.size()));
|
| + AddressList one_address(addresses_[current_address_index_]);
|
| + transport_socket_ = client_socket_factory()->CreateTransportClientSocket(
|
| + one_address, net_log().net_log(), net_log().source());
|
| + // This use of base::Unretained() is safe because transport_socket_ is
|
| + // destroyed in the destructor.
|
| + return transport_socket_->Connect(
|
| + base::Bind(&SubJob::OnIOComplete, base::Unretained(this)));
|
| + }
|
| +
|
| + int DoTransportConnectComplete(int result) {
|
| + WebSocketEndpointLockManager* endpoint_lock_manager =
|
| + WebSocketEndpointLockManager::GetInstance();
|
| + if (result != OK) {
|
| + endpoint_lock_manager->UnlockEndpoint(addresses_[current_address_index_]);
|
| +
|
| + if (current_address_index_ + 1 < static_cast<int>(addresses_.size())) {
|
| + // Try falling back to the next address in the list.
|
| + next_state_ = STATE_WAIT_FOR_LOCK;
|
| + ++current_address_index_;
|
| + result = OK;
|
| + }
|
| +
|
| + return result;
|
| + }
|
| +
|
| + endpoint_lock_manager->RememberSocket(transport_socket_.get(),
|
| + addresses_[current_address_index_]);
|
| +
|
| + return result;
|
| + }
|
| +
|
| + const AddressList addresses_;
|
| + WebSocketTransportConnectJob* const parent_job_;
|
| +
|
| + State next_state_;
|
| + int current_address_index_;
|
| + const SubJobType type_;
|
| +
|
| + scoped_ptr<StreamSocket> transport_socket_;
|
| +
|
| + DISALLOW_COPY_AND_ASSIGN(SubJob);
|
| +};
|
| +
|
| +int WebSocketEndpointLockManager::LockEndpoint(const IPEndPoint& endpoint,
|
| + SubJob* job) {
|
| + EndPointJobMap::value_type insert_value(endpoint, NULL);
|
| + std::pair<EndPointJobMap::iterator, bool> rv =
|
| + endpoint_job_map_.insert(insert_value);
|
| + if (rv.second) {
|
| + DVLOG(3) << "Locking endpoint " << endpoint.ToString();
|
| + rv.first->second = new ConnectJobQueue;
|
| + return OK;
|
| + }
|
| + DVLOG(3) << "Waiting for endpoint " << endpoint.ToString();
|
| + rv.first->second->Append(job);
|
| + return ERR_IO_PENDING;
|
| +}
|
| +
|
| +void WebSocketEndpointLockManager::RememberSocket(StreamSocket* socket,
|
| + const IPEndPoint& endpoint) {
|
| + bool inserted = socket_endpoint_map_.insert(SocketEndPointMap::value_type(
|
| + socket, endpoint)).second;
|
| + DCHECK(inserted);
|
| + DCHECK(endpoint_job_map_.find(endpoint) != endpoint_job_map_.end());
|
| + DVLOG(3) << "Remembered (StreamSocket*)" << socket << " for "
|
| + << endpoint.ToString() << " (" << socket_endpoint_map_.size()
|
| + << " sockets remembered)";
|
| +}
|
| +
|
| +void WebSocketEndpointLockManager::UnlockSocket(StreamSocket* socket) {
|
| + SocketEndPointMap::iterator socket_it = socket_endpoint_map_.find(socket);
|
| + if (socket_it == socket_endpoint_map_.end()) {
|
| + DVLOG(3) << "Ignoring request to unlock already-unlocked socket"
|
| + "(StreamSocket*)" << socket;
|
| + return;
|
| + }
|
| + const IPEndPoint& endpoint = socket_it->second;
|
| + DVLOG(3) << "Unlocking (StreamSocket*)" << socket << " for "
|
| + << endpoint.ToString() << " (" << socket_endpoint_map_.size()
|
| + << " sockets left)";
|
| + UnlockEndpoint(endpoint);
|
| + socket_endpoint_map_.erase(socket_it);
|
| +}
|
| +
|
| +void WebSocketEndpointLockManager::UnlockEndpoint(const IPEndPoint& endpoint) {
|
| + EndPointJobMap::iterator found_it = endpoint_job_map_.find(endpoint);
|
| + CHECK(found_it != endpoint_job_map_.end()); // Security critical
|
| + ConnectJobQueue* queue = found_it->second;
|
| + if (queue->empty()) {
|
| + DVLOG(3) << "Unlocking endpoint " << endpoint.ToString();
|
| + delete queue;
|
| + endpoint_job_map_.erase(found_it);
|
| + } else {
|
| + DVLOG(3) << "Unlocking endpoint " << endpoint.ToString()
|
| + << " and activating next waiter";
|
| + SubJob* next_job = queue->head()->value();
|
| + next_job->RemoveFromList();
|
| + next_job->GotEndpointLock();
|
| + }
|
| +}
|
| +
|
| +WebSocketTransportConnectJob::WebSocketTransportConnectJob(
|
| + const std::string& group_name,
|
| + RequestPriority priority,
|
| + const scoped_refptr<TransportSocketParams>& params,
|
| + TimeDelta timeout_duration,
|
| + const CompletionCallback& callback,
|
| + ClientSocketFactory* client_socket_factory,
|
| + HostResolver* host_resolver,
|
| + ClientSocketHandle* handle,
|
| + Delegate* delegate,
|
| + NetLog* pool_net_log,
|
| + const BoundNetLog& request_net_log)
|
| + : ConnectJob(group_name,
|
| + timeout_duration,
|
| + priority,
|
| + delegate,
|
| + BoundNetLog::Make(pool_net_log, NetLog::SOURCE_CONNECT_JOB)),
|
| + data_(params, client_socket_factory, host_resolver, &connect_timing_),
|
| + race_result_(TransportConnectJobCommon::CONNECTION_LATENCY_UNKNOWN),
|
| + handle_(handle),
|
| + callback_(callback),
|
| + request_net_log_(request_net_log),
|
| + had_ipv4_(false),
|
| + had_ipv6_(false) {
|
| + data_.SetOnIOComplete(this);
|
| +}
|
| +
|
| +WebSocketTransportConnectJob::~WebSocketTransportConnectJob() {}
|
| +
|
| +LoadState WebSocketTransportConnectJob::GetLoadState() const {
|
| + LoadState load_state = LOAD_STATE_RESOLVING_HOST;
|
| + if (ipv6_job_)
|
| + load_state = ipv6_job_->GetLoadState();
|
| + // LOAD_STATE_CONNECTING is better than
|
| + // LOAD_STATE_WAITING_FOR_AVAILABLE_SOCKET.
|
| + if (ipv4_job_ && load_state != LOAD_STATE_CONNECTING)
|
| + load_state = ipv4_job_->GetLoadState();
|
| + return load_state;
|
| +}
|
| +
|
| +int WebSocketTransportConnectJob::DoResolveHost() {
|
| + return data_.DoResolveHost(priority(), net_log());
|
| +}
|
| +
|
| +int WebSocketTransportConnectJob::DoResolveHostComplete(int result) {
|
| + return data_.DoResolveHostComplete(result, net_log());
|
| +}
|
| +
|
| +int WebSocketTransportConnectJob::DoTransportConnect() {
|
| + AddressList ipv4_addresses;
|
| + AddressList ipv6_addresses;
|
| + int result = ERR_UNEXPECTED;
|
| + data_.set_next_state(
|
| + TransportConnectJobCommon::STATE_TRANSPORT_CONNECT_COMPLETE);
|
| +
|
| + for (AddressList::const_iterator it = data_.addresses().begin();
|
| + it != data_.addresses().end();
|
| + ++it) {
|
| + switch (it->GetFamily()) {
|
| + case ADDRESS_FAMILY_IPV4:
|
| + ipv4_addresses.push_back(*it);
|
| + break;
|
| +
|
| + case ADDRESS_FAMILY_IPV6:
|
| + ipv6_addresses.push_back(*it);
|
| + break;
|
| +
|
| + default:
|
| + DVLOG(1) << "Unexpected ADDRESS_FAMILY: " << it->GetFamily();
|
| + break;
|
| + }
|
| + }
|
| +
|
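| + // Race the two address families in the style of RFC 6555 ("Happy
|
| + // Eyeballs"): prefer IPv6, but if the IPv6 sub-job has not connected by
|
| + // the time the fallback timer fires, start the IPv4 sub-job in parallel
|
| + // and hand out whichever socket connects first.
|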
| + if (!ipv4_addresses.empty()) {
|
| + had_ipv4_ = true;
|
| + ipv4_job_.reset(new SubJob(ipv4_addresses, this, SUB_JOB_IPV4));
|
| + }
|
| +
|
| + if (!ipv6_addresses.empty()) {
|
| + had_ipv6_ = true;
|
| + ipv6_job_.reset(new SubJob(ipv6_addresses, this, SUB_JOB_IPV6));
|
| + result = ipv6_job_->Start();
|
| + if (result == OK) {
|
| + SetSocket(ipv6_job_->PassSocket());
|
| + race_result_ =
|
| + had_ipv4_
|
| + ? TransportConnectJobCommon::CONNECTION_LATENCY_IPV6_RACEABLE
|
| + : TransportConnectJobCommon::CONNECTION_LATENCY_IPV6_SOLO;
|
| + return result;
|
| + }
|
| + if (result == ERR_IO_PENDING && ipv4_job_) {
|
| + // This use of base::Unretained is safe because fallback_timer_ is owned
|
| + // by this object.
|
| + fallback_timer_.Start(
|
| + FROM_HERE,
|
| + TimeDelta::FromMilliseconds(
|
| + TransportConnectJobCommon::kIPv6FallbackTimerInMs),
|
| + base::Bind(&WebSocketTransportConnectJob::StartIPv4JobAsync,
|
| + base::Unretained(this)));
|
| + }
|
| + if (result != ERR_IO_PENDING)
|
| + ipv6_job_.reset();
|
| + }
|
| +
|
| + if (ipv4_job_ && !ipv6_job_) {
|
| + result = ipv4_job_->Start();
|
| + if (result == OK) {
|
| + SetSocket(ipv4_job_->PassSocket());
|
| + race_result_ =
|
| + had_ipv6_
|
| + ? TransportConnectJobCommon::CONNECTION_LATENCY_IPV4_WINS_RACE
|
| + : TransportConnectJobCommon::CONNECTION_LATENCY_IPV4_NO_RACE;
|
| + }
|
| + }
|
| +
|
| + return result;
|
| +}
|
| +
|
| +int WebSocketTransportConnectJob::DoTransportConnectComplete(int result) {
|
| + if (result == OK)
|
| + data_.HistogramDuration(race_result_);
|
| + return result;
|
| +}
|
| +
|
| +void WebSocketTransportConnectJob::OnSubJobComplete(int result, SubJob* job) {
|
| + if (result == OK) {
|
| + switch (job->type()) {
|
| + case SUB_JOB_IPV4:
|
| + race_result_ =
|
| + had_ipv6_
|
| + ? TransportConnectJobCommon::CONNECTION_LATENCY_IPV4_WINS_RACE
|
| + : TransportConnectJobCommon::CONNECTION_LATENCY_IPV4_NO_RACE;
|
| + break;
|
| +
|
| + case SUB_JOB_IPV6:
|
| + race_result_ =
|
| + had_ipv4_
|
| + ? TransportConnectJobCommon::CONNECTION_LATENCY_IPV6_RACEABLE
|
| + : TransportConnectJobCommon::CONNECTION_LATENCY_IPV6_SOLO;
|
| + break;
|
| + }
|
| + SetSocket(job->PassSocket());
|
| +
|
| + // Make sure all connections are cancelled even if this object fails to be
|
| + // deleted.
|
| + ipv4_job_.reset();
|
| + ipv6_job_.reset();
|
| + } else {
|
| + switch (job->type()) {
|
| + case SUB_JOB_IPV4:
|
| + ipv4_job_.reset();
|
| + break;
|
| +
|
| + case SUB_JOB_IPV6:
|
| + ipv6_job_.reset();
|
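| + // The IPv6 attempt failed before the fallback timer fired, so start
|
| + // the IPv4 sub-job immediately rather than waiting for the timer.
|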
| + if (ipv4_job_ && !ipv4_job_->started()) {
|
| + fallback_timer_.Stop();
|
| + result = ipv4_job_->Start();
|
| + if (result != ERR_IO_PENDING) {
|
| + OnSubJobComplete(result, ipv4_job_.get());
|
| + return;
|
| + }
|
| + }
|
| + break;
|
| + }
|
| + if (ipv4_job_ || ipv6_job_)
|
| + return;
|
| + }
|
| + data_.OnIOComplete(this, result);
|
| +}
|
| +
|
| +void WebSocketTransportConnectJob::StartIPv4JobAsync() {
|
| + DCHECK(ipv4_job_);
|
| + int result = ipv4_job_->Start();
|
| + if (result != ERR_IO_PENDING)
|
| + OnSubJobComplete(result, ipv4_job_.get());
|
| +}
|
| +
|
| +int WebSocketTransportConnectJob::ConnectInternal() {
|
| + return data_.DoConnectInternal(this);
|
| +}
|
| +
|
| +WebSocketTransportClientSocketPool::WebSocketTransportClientSocketPool(
|
| + int max_sockets,
|
| + int max_sockets_per_group,
|
| + ClientSocketPoolHistograms* histograms,
|
| + HostResolver* host_resolver,
|
| + ClientSocketFactory* client_socket_factory,
|
| + NetLog* net_log)
|
| + : TransportClientSocketPool(max_sockets,
|
| + max_sockets_per_group,
|
| + histograms,
|
| + host_resolver,
|
| + client_socket_factory,
|
| + net_log),
|
| + connect_job_delegate_(this),
|
| + histograms_(histograms),
|
| + pool_net_log_(net_log),
|
| + client_socket_factory_(client_socket_factory),
|
| + host_resolver_(host_resolver),
|
| + max_sockets_(max_sockets),
|
| + handed_out_socket_count_(0),
|
| + is_stalled_(false),
|
| + weak_factory_(this) {}
|
| +
|
| +WebSocketTransportClientSocketPool::~WebSocketTransportClientSocketPool() {
|
| + DCHECK(pending_connects_.empty());
|
| + DCHECK_EQ(0, handed_out_socket_count_);
|
| + DCHECK(!is_stalled_);
|
| +}
|
| +
|
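| +// Releases the endpoint lock held on behalf of |handle|'s socket, allowing
|
| +// the next queued connection attempt to the same endpoint to proceed.
|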
| +// static
|
| +void WebSocketTransportClientSocketPool::UnlockEndpoint(
|
| + ClientSocketHandle* handle) {
|
| + DCHECK(handle->is_initialized());
|
| + WebSocketEndpointLockManager::GetInstance()->UnlockSocket(handle->socket());
|
| +}
|
| +
|
| +int WebSocketTransportClientSocketPool::RequestSocket(
|
| + const std::string& group_name,
|
| + const void* params,
|
| + RequestPriority priority,
|
| + ClientSocketHandle* handle,
|
| + const CompletionCallback& callback,
|
| + const BoundNetLog& request_net_log) {
|
| + DCHECK(params);
|
| + const scoped_refptr<TransportSocketParams>& casted_params =
|
| + *static_cast<const scoped_refptr<TransportSocketParams>*>(params);
|
| +
|
| + NetLogTcpClientSocketPoolRequestedSocket(request_net_log, &casted_params);
|
| +
|
| + CHECK(!callback.is_null());
|
| + CHECK(handle);
|
| +
|
| + request_net_log.BeginEvent(NetLog::TYPE_SOCKET_POOL);
|
| +
|
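| + // If the pool-wide socket limit has been reached, mark the pool as
|
| + // stalled (observable via IsStalled()); unlike the base class, this pool
|
| + // keeps no queue of pending requests.
|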
| + if (ReachedMaxSocketsLimit() && !casted_params->ignore_limits()) {
|
| + request_net_log.AddEvent(NetLog::TYPE_SOCKET_POOL_STALLED_MAX_SOCKETS);
|
| + is_stalled_ = true;
|
| + return ERR_IO_PENDING;
|
| + }
|
| +
|
| + scoped_ptr<WebSocketTransportConnectJob> connect_job(
|
| + new WebSocketTransportConnectJob(
|
| + group_name,
|
| + priority,
|
| + casted_params,
|
| + TimeDelta::FromSeconds(kTransportConnectJobTimeoutInSeconds),
|
| + callback,
|
| + client_socket_factory_,
|
| + host_resolver_,
|
| + handle,
|
| + &connect_job_delegate_,
|
| + pool_net_log_,
|
| + request_net_log));
|
| +
|
| + int rv = connect_job->Connect();
|
| + request_net_log.AddEvent(
|
| + NetLog::TYPE_SOCKET_POOL_BOUND_TO_CONNECT_JOB,
|
| + connect_job->net_log().source().ToEventParametersCallback());
|
| + if (rv == OK) {
|
| + HandOutSocket(connect_job->PassSocket(),
|
| + connect_job->connect_timing(),
|
| + handle,
|
| + request_net_log);
|
| + request_net_log.EndEvent(NetLog::TYPE_SOCKET_POOL);
|
| + } else if (rv == ERR_IO_PENDING) {
|
| + // TODO(ricea): Implement backup job timer?
|
| + AddJob(handle, connect_job.Pass());
|
| + } else {
|
| + scoped_ptr<StreamSocket> error_socket;
|
| + connect_job->GetAdditionalErrorState(handle);
|
| + error_socket = connect_job->PassSocket();
|
| + if (error_socket) {
|
| + HandOutSocket(error_socket.Pass(),
|
| + connect_job->connect_timing(),
|
| + handle,
|
| + request_net_log);
|
| + }
|
| + }
|
| +
|
| + if (rv != ERR_IO_PENDING) {
|
| + request_net_log.EndEventWithNetErrorCode(NetLog::TYPE_SOCKET_POOL, rv);
|
| + }
|
| +
|
| + return rv;
|
| +}
|
| +
|
| +void WebSocketTransportClientSocketPool::RequestSockets(
|
| + const std::string& group_name,
|
| + const void* params,
|
| + int num_sockets,
|
| + const BoundNetLog& net_log) {
|
| + NOTIMPLEMENTED();
|
| +}
|
| +
|
| +void WebSocketTransportClientSocketPool::CancelRequest(
|
| + const std::string& group_name,
|
| + ClientSocketHandle* handle) {
|
| + CancelJob(handle);
|
| +}
|
| +
|
| +void WebSocketTransportClientSocketPool::ReleaseSocket(
|
| + const std::string& group_name,
|
| + scoped_ptr<StreamSocket> socket,
|
| + int id) {
|
| + WebSocketEndpointLockManager::GetInstance()->UnlockSocket(socket.get());
|
| + CHECK_GT(handed_out_socket_count_, 0);
|
| + --handed_out_socket_count_;
|
| + if (!ReachedMaxSocketsLimit())
|
| + is_stalled_ = false;
|
| +}
|
| +
|
| +void WebSocketTransportClientSocketPool::FlushWithError(int error) {
|
| + CancelAllConnectJobs();
|
| +}
|
| +
|
| +void WebSocketTransportClientSocketPool::CloseIdleSockets() {
|
| + // We have no idle sockets.
|
| +}
|
| +
|
| +int WebSocketTransportClientSocketPool::IdleSocketCount() const { return 0; }
|
| +
|
| +int WebSocketTransportClientSocketPool::IdleSocketCountInGroup(
|
| + const std::string& group_name) const {
|
| + return 0;
|
| +}
|
| +
|
| +LoadState WebSocketTransportClientSocketPool::GetLoadState(
|
| + const std::string& group_name,
|
| + const ClientSocketHandle* handle) const {
|
| + if (pending_callbacks_.count(handle))
|
| + return LOAD_STATE_CONNECTING;
|
| + return LookupConnectJob(handle)->GetLoadState();
|
| +}
|
| +
|
| +base::DictionaryValue* WebSocketTransportClientSocketPool::GetInfoAsValue(
|
| + const std::string& name,
|
| + const std::string& type,
|
| + bool include_nested_pools) const {
|
| + base::DictionaryValue* dict = new base::DictionaryValue();
|
| + dict->SetString("name", name);
|
| + dict->SetString("type", type);
|
| + dict->SetInteger("handed_out_socket_count", handed_out_socket_count_);
|
| + dict->SetInteger("connecting_socket_count", pending_connects_.size());
|
| + dict->SetInteger("idle_socket_count", 0);
|
| + dict->SetInteger("max_socket_count", max_sockets_);
|
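| + // This pool ignores groups, so the per-group limit equals the pool-wide
|
| + // limit.
|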
| + dict->SetInteger("max_sockets_per_group", max_sockets_);
|
| + dict->SetInteger("pool_generation_number", 0);
|
| + return dict;
|
| +}
|
| +
|
| +TimeDelta WebSocketTransportClientSocketPool::ConnectionTimeout() const {
|
| + return TimeDelta::FromSeconds(kTransportConnectJobTimeoutInSeconds);
|
| +}
|
| +
|
| +ClientSocketPoolHistograms* WebSocketTransportClientSocketPool::histograms()
|
| + const {
|
| + return histograms_;
|
| +}
|
| +
|
| +bool WebSocketTransportClientSocketPool::IsStalled() const {
|
| + return is_stalled_;
|
| +}
|
| +
|
| +void WebSocketTransportClientSocketPool::OnConnectJobComplete(
|
| + int result,
|
| + WebSocketTransportConnectJob* job) {
|
| + DCHECK_NE(ERR_IO_PENDING, result);
|
| +
|
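| + // Copy the state needed from |job| now, since DeleteJob() below destroys
|
| + // it.
|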
| + scoped_ptr<StreamSocket> socket = job->PassSocket();
|
| +
|
| + BoundNetLog request_net_log = job->request_net_log();
|
| + CompletionCallback callback = job->callback();
|
| + LoadTimingInfo::ConnectTiming connect_timing = job->connect_timing();
|
| +
|
| + ClientSocketHandle* const handle = job->handle();
|
| +
|
| + if (result == OK) {
|
| + DCHECK(socket.get());
|
| + HandOutSocket(socket.Pass(), connect_timing, handle, request_net_log);
|
| + request_net_log.EndEvent(NetLog::TYPE_SOCKET_POOL);
|
| + } else {
|
| + // If we got a socket, it must contain error information, so pass it up
|
| + // so that the caller can retrieve it.
|
| + bool handed_out_socket = false;
|
| + job->GetAdditionalErrorState(handle);
|
| + if (socket.get()) {
|
| + handed_out_socket = true;
|
| + HandOutSocket(socket.Pass(), connect_timing, handle, request_net_log);
|
| + }
|
| + request_net_log.EndEventWithNetErrorCode(NetLog::TYPE_SOCKET_POOL, result);
|
| +
|
| + if (!handed_out_socket && is_stalled_ && !ReachedMaxSocketsLimit())
|
| + is_stalled_ = false;
|
| + }
|
| + bool delete_succeeded = DeleteJob(handle);
|
| + DCHECK(delete_succeeded);
|
| + InvokeUserCallbackLater(handle, callback, result);
|
| +}
|
| +
|
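| +// Runs the user callback from a posted task rather than synchronously, so
|
| +// that pool methods are never re-entered from the callback. The
|
| +// |pending_callbacks_| set lets CancelRequest() suppress a callback that has
|
| +// been posted but not yet run.
|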
| +void WebSocketTransportClientSocketPool::InvokeUserCallbackLater(
|
| + ClientSocketHandle* handle,
|
| + const CompletionCallback& callback,
|
| + int rv) {
|
| + DCHECK(!pending_callbacks_.count(handle));
|
| + pending_callbacks_.insert(handle);
|
| + base::MessageLoop::current()->PostTask(
|
| + FROM_HERE,
|
| + base::Bind(&WebSocketTransportClientSocketPool::InvokeUserCallback,
|
| + weak_factory_.GetWeakPtr(),
|
| + handle,
|
| + callback,
|
| + rv));
|
| +}
|
| +
|
| +void WebSocketTransportClientSocketPool::InvokeUserCallback(
|
| + ClientSocketHandle* handle,
|
| + const CompletionCallback& callback,
|
| + int rv) {
|
| + if (pending_callbacks_.erase(handle))
|
| + callback.Run(rv);
|
| +}
|
| +
|
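| +// This pool never holds idle sockets, so the limit is applied to the sum of
|
| +// connecting and handed-out sockets.
|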
| +bool WebSocketTransportClientSocketPool::ReachedMaxSocketsLimit() const {
|
| + return base::checked_cast<int>(pending_connects_.size()) +
|
| + handed_out_socket_count_ >=
|
| + max_sockets_;
|
| +}
|
| +
|
| +void WebSocketTransportClientSocketPool::HandOutSocket(
|
| + scoped_ptr<StreamSocket> socket,
|
| + const LoadTimingInfo::ConnectTiming& connect_timing,
|
| + ClientSocketHandle* handle,
|
| + const BoundNetLog& net_log) {
|
| + DCHECK(socket);
|
| + handle->SetSocket(socket.Pass());
|
| + handle->set_reuse_type(ClientSocketHandle::UNUSED);
|
| + handle->set_idle_time(TimeDelta());
|
| + handle->set_pool_id(0);
|
| + handle->set_connect_timing(connect_timing);
|
| +
|
| + net_log.AddEvent(
|
| + NetLog::TYPE_SOCKET_POOL_BOUND_TO_SOCKET,
|
| + handle->socket()->NetLog().source().ToEventParametersCallback());
|
| +
|
| + ++handed_out_socket_count_;
|
| +}
|
| +
|
| +void WebSocketTransportClientSocketPool::AddJob(
|
| + ClientSocketHandle* handle,
|
| + scoped_ptr<WebSocketTransportConnectJob> connect_job) {
|
| + bool inserted =
|
| + pending_connects_.insert(PendingConnectsMap::value_type(
|
| + handle, connect_job.release())).second;
|
| + DCHECK(inserted);
|
| +}
|
| +
|
| +bool WebSocketTransportClientSocketPool::DeleteJob(ClientSocketHandle* handle) {
|
| + PendingConnectsMap::iterator it = pending_connects_.find(handle);
|
| + if (it == pending_connects_.end())
|
| + return false;
|
| + delete it->second;
|
| + it->second = NULL;
|
| + pending_connects_.erase(it);
|
| + return true;
|
| +}
|
| +
|
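| +// If DeleteJob() returns false, the job has already completed and its
|
| +// callback has been posted; erasing |handle| from |pending_callbacks_|
|
| +// prevents that callback from running.
|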
| +void WebSocketTransportClientSocketPool::CancelJob(ClientSocketHandle* handle) {
|
| + if (!DeleteJob(handle))
|
| + pending_callbacks_.erase(handle);
|
| +}
|
| +
|
| +void WebSocketTransportClientSocketPool::CancelAllConnectJobs() {
|
| + STLDeleteValues(&pending_connects_);
|
| +}
|
| +
|
| +const WebSocketTransportConnectJob*
|
| +WebSocketTransportClientSocketPool::LookupConnectJob(
|
| + const ClientSocketHandle* handle) const {
|
| + PendingConnectsMap::const_iterator it = pending_connects_.find(handle);
|
| + CHECK(it != pending_connects_.end());
|
| + return it->second;
|
| +}
|
| +
|
| +WebSocketTransportClientSocketPool::ConnectJobDelegate::ConnectJobDelegate(
|
| + WebSocketTransportClientSocketPool* owner)
|
| + : owner_(owner) {}
|
| +
|
| +WebSocketTransportClientSocketPool::ConnectJobDelegate::~ConnectJobDelegate() {}
|
| +
|
| +void
|
| +WebSocketTransportClientSocketPool::ConnectJobDelegate::OnConnectJobComplete(
|
| + int result,
|
| + ConnectJob* job) {
|
| + owner_->OnConnectJobComplete(result,
|
| + static_cast<WebSocketTransportConnectJob*>(job));
|
| +}
|
| +
|
| +} // namespace net
|