Index: net/socket/websocket_transport_client_socket_pool.cc |
diff --git a/net/socket/websocket_transport_client_socket_pool.cc b/net/socket/websocket_transport_client_socket_pool.cc |
new file mode 100644 |
index 0000000000000000000000000000000000000000..03531eb5e3d72045a02423ffb20e8bf9058a1078 |
--- /dev/null |
+++ b/net/socket/websocket_transport_client_socket_pool.cc |
@@ -0,0 +1,827 @@ |
+// Copyright (c) 2012 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+#include "net/socket/websocket_transport_client_socket_pool.h" |
+ |
+#include <algorithm> |
+#include <map> |
+#include <utility> |
+ |
+#include "base/compiler_specific.h" |
+#include "base/containers/linked_list.h" |
+#include "base/logging.h" |
+#include "base/memory/singleton.h" |
+#include "base/numerics/safe_conversions.h" |
+#include "base/stl_util.h" |
+#include "base/strings/string_util.h" |
+#include "base/time/time.h" |
+#include "base/values.h" |
+#include "net/base/ip_endpoint.h" |
+#include "net/base/net_errors.h" |
+#include "net/base/net_log.h" |
+#include "net/socket/client_socket_factory.h" |
+#include "net/socket/client_socket_handle.h" |
+#include "net/socket/client_socket_pool_base.h" |
+ |
+namespace net { |
+ |
+namespace { |
+ |
+using base::TimeDelta; |
+ |
+// TODO(ricea): For now, we implement a global timeout for compatability with |
+// TransportConnectJob. Since WebSocketTransportConnectJob controls the address |
+// selection process more tightly, it could do something smarter here. |
+const int kTransportConnectJobTimeoutInSeconds = 240; // 4 minutes. |
+ |
+} // namespace |
+ |
+class WebSocketEndpointLockManager { |
+ public: |
+ typedef WebSocketTransportConnectJob::SubJob SubJob; |
+ static WebSocketEndpointLockManager* GetInstance(); |
+ |
+ // Returns OK if lock was acquired immediately, ERR_IO_PENDING if not. If the |
+ // lock was not acquired, then |job->GotEndpointLock()| will be called when it |
+ // is. |
+ int LockEndpoint(const IPEndPoint& endpoint, SubJob* job); |
+ |
+ // Record the IPEndPoint associated with a particular socket. This is |
+ // necessary because TCPClientSocket refuses to return the PeerAddress after |
+ // the connection is disconnected. The association will be forgotten when |
+ // UnlockSocket() is called. The |socket| pointer must not be deleted between |
+ // the call to RememberSocket(). |
+ void RememberSocket(StreamSocket* socket, const IPEndPoint& endpoint); |
+ |
+ // Release the lock on an endpoint, and, if appropriate, trigger the next |
+ // socket connection. It is permitted to call UnlockSocket() multiple times |
+ // for the same |socket|; all calls after the first will be ignored. |
+ void UnlockSocket(StreamSocket* socket); |
+ |
+ // Release the lock on |endpoint|. Most callers should use UnlockSocket() |
+ // instead. |
+ void UnlockEndpoint(const IPEndPoint& endpoint); |
+ |
+ private: |
+ typedef base::LinkedList<SubJob> ConnectJobQueue; |
+ typedef std::map<IPEndPoint, ConnectJobQueue*> EndPointJobMap; |
+ typedef std::map<StreamSocket*, IPEndPoint> SocketEndPointMap; |
+ |
+ WebSocketEndpointLockManager() {} |
+ ~WebSocketEndpointLockManager() { |
+ DCHECK(endpoint_job_map_.empty()); |
+ DCHECK(socket_endpoint_map_.empty()); |
+ } |
+ |
+ EndPointJobMap endpoint_job_map_; |
+ SocketEndPointMap socket_endpoint_map_; |
+ |
+ friend struct DefaultSingletonTraits<WebSocketEndpointLockManager>; |
+ |
+ DISALLOW_COPY_AND_ASSIGN(WebSocketEndpointLockManager); |
+}; |
+ |
+WebSocketEndpointLockManager* WebSocketEndpointLockManager::GetInstance() { |
+ return Singleton<WebSocketEndpointLockManager>::get(); |
+} |
+ |
+class WebSocketTransportConnectJob::SubJob : public base::LinkNode<SubJob> { |
+ public: |
+ SubJob(const AddressList& addresses, |
+ WebSocketTransportConnectJob* parent_job, |
+ SubJobType type) |
+ : addresses_(addresses), |
+ parent_job_(parent_job), |
+ next_state_(STATE_NONE), |
+ current_address_index_(0), |
+ type_(type) {} |
+ |
+ ~SubJob() { |
+ // We don't worry about cancelling the TCP connect, since ~StreamSocket will |
+ // take care of it. |
+ if (next()) { |
+ DCHECK(previous()); |
+ DCHECK_EQ(STATE_WAIT_FOR_LOCK_COMPLETE, next_state_); |
+ RemoveFromList(); |
+ } else if (next_state_ == STATE_TRANSPORT_CONNECT_COMPLETE) { |
+ DCHECK_LT(current_address_index_, static_cast<int>(addresses_.size())); |
+ WebSocketEndpointLockManager::GetInstance()->UnlockEndpoint( |
+ addresses_[current_address_index_]); |
+ } |
+ } |
+ |
+ // Start connecting. |
+ int Start() { |
+ DCHECK_EQ(STATE_NONE, next_state_); |
+ next_state_ = STATE_WAIT_FOR_LOCK; |
+ return DoLoop(OK); |
+ } |
+ |
+ bool started() { return next_state_ != STATE_NONE; } |
+ |
+ // Called by WebSocketEndpointLockManager when the lock becomes available. |
+ void GotEndpointLock() { OnIOComplete(OK); } |
+ |
+ LoadState GetLoadState() const { |
+ switch (next_state_) { |
+ case STATE_WAIT_FOR_LOCK: |
+ case STATE_WAIT_FOR_LOCK_COMPLETE: |
+ // TODO(ricea): Add a WebSocket-specific LOAD_STATE ? |
+ return LOAD_STATE_WAITING_FOR_AVAILABLE_SOCKET; |
+ case STATE_TRANSPORT_CONNECT: |
+ case STATE_TRANSPORT_CONNECT_COMPLETE: |
+ return LOAD_STATE_CONNECTING; |
+ case STATE_NONE: |
+ return LOAD_STATE_IDLE; |
+ } |
+ NOTREACHED(); |
+ return LOAD_STATE_IDLE; |
+ } |
+ |
+ SubJobType type() const { return type_; } |
+ |
+ scoped_ptr<StreamSocket> PassSocket() { return transport_socket_.Pass(); } |
+ |
+ private: |
+ enum State { |
+ STATE_NONE, |
+ STATE_WAIT_FOR_LOCK, |
+ STATE_WAIT_FOR_LOCK_COMPLETE, |
+ STATE_TRANSPORT_CONNECT, |
+ STATE_TRANSPORT_CONNECT_COMPLETE, |
+ }; |
+ |
+ ClientSocketFactory* client_socket_factory() const { |
+ return parent_job_->data_.client_socket_factory(); |
+ } |
+ |
+ const BoundNetLog& net_log() const { return parent_job_->net_log(); } |
+ |
+ void OnIOComplete(int result) { |
+ int rv = DoLoop(result); |
+ if (rv != ERR_IO_PENDING) |
+ parent_job_->OnSubJobComplete(rv, this); // |this| deleted |
+ } |
+ |
+ int DoLoop(int result) { |
+ DCHECK_NE(next_state_, STATE_NONE); |
+ |
+ int rv = result; |
+ do { |
+ State state = next_state_; |
+ next_state_ = STATE_NONE; |
+ switch (state) { |
+ case STATE_WAIT_FOR_LOCK: |
+ rv = DoEndpointLock(); |
+ break; |
+ case STATE_WAIT_FOR_LOCK_COMPLETE: |
+ rv = DoEndpointLockComplete(); |
+ break; |
+ case STATE_TRANSPORT_CONNECT: |
+ DCHECK_EQ(OK, rv); |
+ rv = DoTransportConnect(); |
+ break; |
+ case STATE_TRANSPORT_CONNECT_COMPLETE: |
+ rv = DoTransportConnectComplete(rv); |
+ break; |
+ default: |
+ NOTREACHED(); |
+ rv = ERR_FAILED; |
+ break; |
+ } |
+ } while (rv != ERR_IO_PENDING && next_state_ != STATE_NONE); |
+ |
+ return rv; |
+ } |
+ |
+ int DoEndpointLock() { |
+ DCHECK_LT(current_address_index_, static_cast<int>(addresses_.size())); |
+ int rv = WebSocketEndpointLockManager::GetInstance()->LockEndpoint( |
+ addresses_[current_address_index_], this); |
+ next_state_ = STATE_WAIT_FOR_LOCK_COMPLETE; |
+ return rv; |
+ } |
+ |
+ int DoEndpointLockComplete() { |
+ next_state_ = STATE_TRANSPORT_CONNECT; |
+ return OK; |
+ } |
+ |
+ int DoTransportConnect() { |
+ next_state_ = STATE_TRANSPORT_CONNECT_COMPLETE; |
+ DCHECK_LT(current_address_index_, static_cast<int>(addresses_.size())); |
+ AddressList one_address(addresses_[current_address_index_]); |
+ transport_socket_ = client_socket_factory()->CreateTransportClientSocket( |
+ one_address, net_log().net_log(), net_log().source()); |
+ // This use of base::Unretained() is safe because transport_socket_ is |
+ // destroyed in the destructor. |
+ return transport_socket_->Connect( |
+ base::Bind(&SubJob::OnIOComplete, base::Unretained(this))); |
+ } |
+ |
+ int DoTransportConnectComplete(int result) { |
+ WebSocketEndpointLockManager* endpoint_lock_manager = |
+ WebSocketEndpointLockManager::GetInstance(); |
+ if (result != OK) { |
+ endpoint_lock_manager->UnlockEndpoint(addresses_[current_address_index_]); |
+ |
+ if (current_address_index_ + 1 < static_cast<int>(addresses_.size())) { |
+ // Try falling back to the next address in the list. |
+ next_state_ = STATE_WAIT_FOR_LOCK; |
+ ++current_address_index_; |
+ result = OK; |
+ } |
+ |
+ return result; |
+ } |
+ |
+ endpoint_lock_manager->RememberSocket(transport_socket_.get(), |
+ addresses_[current_address_index_]); |
+ |
+ return result; |
+ } |
+ |
+ const AddressList addresses_; |
+ WebSocketTransportConnectJob* const parent_job_; |
+ |
+ State next_state_; |
+ int current_address_index_; |
+ const SubJobType type_; |
+ |
+ scoped_ptr<StreamSocket> transport_socket_; |
+ |
+ DISALLOW_COPY_AND_ASSIGN(SubJob); |
+}; |
+ |
+int WebSocketEndpointLockManager::LockEndpoint(const IPEndPoint& endpoint, |
+ SubJob* job) { |
+ EndPointJobMap::value_type insert_value(endpoint, NULL); |
+ std::pair<EndPointJobMap::iterator, bool> rv = |
+ endpoint_job_map_.insert(insert_value); |
+ if (rv.second) { |
+ DVLOG(3) << "Locking endpoint " << endpoint.ToString(); |
+ rv.first->second = new ConnectJobQueue; |
+ return OK; |
+ } |
+ DVLOG(3) << "Waiting for endpoint " << endpoint.ToString(); |
+ rv.first->second->Append(job); |
+ return ERR_IO_PENDING; |
+} |
+ |
+void WebSocketEndpointLockManager::RememberSocket(StreamSocket* socket, |
+ const IPEndPoint& endpoint) { |
+ bool inserted = socket_endpoint_map_.insert(SocketEndPointMap::value_type( |
+ socket, endpoint)).second; |
+ DCHECK(inserted); |
+ DCHECK(endpoint_job_map_.find(endpoint) != endpoint_job_map_.end()); |
+ DVLOG(3) << "Remembered (StreamSocket*)" << socket << " for " |
+ << endpoint.ToString() << " (" << socket_endpoint_map_.size() |
+ << " sockets remembered)"; |
+} |
+ |
+void WebSocketEndpointLockManager::UnlockSocket(StreamSocket* socket) { |
+ SocketEndPointMap::iterator socket_it = socket_endpoint_map_.find(socket); |
+ if (socket_it == socket_endpoint_map_.end()) { |
+ DVLOG(3) << "Ignoring request to unlock already-unlocked socket" |
+ "(StreamSocket*)" << socket; |
+ return; |
+ } |
+ const IPEndPoint& endpoint = socket_it->second; |
+ DVLOG(3) << "Unlocking (StreamSocket*)" << socket << " for " |
+ << endpoint.ToString() << " (" << socket_endpoint_map_.size() |
+ << " sockets left)"; |
+ UnlockEndpoint(endpoint); |
+ socket_endpoint_map_.erase(socket_it); |
+} |
+ |
+void WebSocketEndpointLockManager::UnlockEndpoint(const IPEndPoint& endpoint) { |
+ EndPointJobMap::iterator found_it = endpoint_job_map_.find(endpoint); |
+ CHECK(found_it != endpoint_job_map_.end()); // Security critical |
+ ConnectJobQueue* queue = found_it->second; |
+ if (queue->empty()) { |
+ DVLOG(3) << "Unlocking endpoint " << endpoint.ToString(); |
+ delete queue; |
+ endpoint_job_map_.erase(found_it); |
+ } else { |
+ DVLOG(3) << "Unlocking endpoint " << endpoint.ToString() |
+ << " and activating next waiter"; |
+ SubJob* next_job = queue->head()->value(); |
+ next_job->RemoveFromList(); |
+ next_job->GotEndpointLock(); |
+ } |
+} |
+ |
+WebSocketTransportConnectJob::WebSocketTransportConnectJob( |
+ const std::string& group_name, |
+ RequestPriority priority, |
+ const scoped_refptr<TransportSocketParams>& params, |
+ TimeDelta timeout_duration, |
+ const CompletionCallback& callback, |
+ ClientSocketFactory* client_socket_factory, |
+ HostResolver* host_resolver, |
+ ClientSocketHandle* handle, |
+ Delegate* delegate, |
+ NetLog* pool_net_log, |
+ const BoundNetLog& request_net_log) |
+ : ConnectJob(group_name, |
+ timeout_duration, |
+ priority, |
+ delegate, |
+ BoundNetLog::Make(pool_net_log, NetLog::SOURCE_CONNECT_JOB)), |
+ data_(params, client_socket_factory, host_resolver, &connect_timing_), |
+ race_result_(TransportConnectJobCommon::CONNECTION_LATENCY_UNKNOWN), |
+ handle_(handle), |
+ callback_(callback), |
+ request_net_log_(request_net_log), |
+ had_ipv4_(false), |
+ had_ipv6_(false) { |
+ data_.SetOnIOComplete(this); |
+} |
+ |
+WebSocketTransportConnectJob::~WebSocketTransportConnectJob() {} |
+ |
+LoadState WebSocketTransportConnectJob::GetLoadState() const { |
+ LoadState load_state = LOAD_STATE_RESOLVING_HOST; |
+ if (ipv6_job_) |
+ load_state = ipv6_job_->GetLoadState(); |
+ // LOAD_STATE_CONNECTING is better than |
+ // LOAD_STATE_WAITING_FOR_AVAILABLE_SOCKET. |
+ if (ipv4_job_ && load_state != LOAD_STATE_CONNECTING) |
+ load_state = ipv4_job_->GetLoadState(); |
+ return load_state; |
+} |
+ |
+int WebSocketTransportConnectJob::DoResolveHost() { |
+ return data_.DoResolveHost(priority(), net_log()); |
+} |
+ |
+int WebSocketTransportConnectJob::DoResolveHostComplete(int result) { |
+ return data_.DoResolveHostComplete(result, net_log()); |
+} |
+ |
+int WebSocketTransportConnectJob::DoTransportConnect() { |
+ AddressList ipv4_addresses; |
+ AddressList ipv6_addresses; |
+ int result = ERR_UNEXPECTED; |
+ data_.set_next_state( |
+ TransportConnectJobCommon::STATE_TRANSPORT_CONNECT_COMPLETE); |
+ |
+ for (AddressList::const_iterator it = data_.addresses().begin(); |
+ it != data_.addresses().end(); |
+ ++it) { |
+ switch (it->GetFamily()) { |
+ case ADDRESS_FAMILY_IPV4: |
+ ipv4_addresses.push_back(*it); |
+ break; |
+ |
+ case ADDRESS_FAMILY_IPV6: |
+ ipv6_addresses.push_back(*it); |
+ break; |
+ |
+ default: |
+ DVLOG(1) << "Unexpected ADDRESS_FAMILY: " << it->GetFamily(); |
+ break; |
+ } |
+ } |
+ |
+ if (!ipv4_addresses.empty()) { |
+ had_ipv4_ = true; |
+ ipv4_job_.reset(new SubJob(ipv4_addresses, this, SUB_JOB_IPV4)); |
+ } |
+ |
+ if (!ipv6_addresses.empty()) { |
+ had_ipv6_ = true; |
+ ipv6_job_.reset(new SubJob(ipv6_addresses, this, SUB_JOB_IPV6)); |
+ result = ipv6_job_->Start(); |
+ if (result == OK) { |
+ SetSocket(ipv6_job_->PassSocket()); |
+ race_result_ = |
+ had_ipv4_ |
+ ? TransportConnectJobCommon::CONNECTION_LATENCY_IPV6_RACEABLE |
+ : TransportConnectJobCommon::CONNECTION_LATENCY_IPV6_SOLO; |
+ return result; |
+ } |
+ if (result == ERR_IO_PENDING && ipv4_job_) { |
+ // This use of base::Unretained is safe because fallback_timer_ is owned |
+ // by this object. |
+ fallback_timer_.Start( |
+ FROM_HERE, |
+ TimeDelta::FromMilliseconds( |
+ TransportConnectJobCommon::kIPv6FallbackTimerInMs), |
+ base::Bind(&WebSocketTransportConnectJob::StartIPv4JobAsync, |
+ base::Unretained(this))); |
+ } |
+ if (result != ERR_IO_PENDING) |
+ ipv6_job_.reset(); |
+ } |
+ |
+ if (ipv4_job_ && !ipv6_job_) { |
+ result = ipv4_job_->Start(); |
+ if (result == OK) { |
+ SetSocket(ipv4_job_->PassSocket()); |
+ race_result_ = |
+ had_ipv6_ |
+ ? TransportConnectJobCommon::CONNECTION_LATENCY_IPV4_WINS_RACE |
+ : TransportConnectJobCommon::CONNECTION_LATENCY_IPV4_NO_RACE; |
+ } |
+ } |
+ |
+ return result; |
+} |
+ |
+int WebSocketTransportConnectJob::DoTransportConnectComplete(int result) { |
+ if (result == OK) |
+ data_.HistogramDuration(race_result_); |
+ return result; |
+} |
+ |
+void WebSocketTransportConnectJob::OnSubJobComplete(int result, SubJob* job) { |
+ if (result == OK) { |
+ switch (job->type()) { |
+ case SUB_JOB_IPV4: |
+ race_result_ = |
+ had_ipv6_ |
+ ? TransportConnectJobCommon::CONNECTION_LATENCY_IPV4_WINS_RACE |
+ : TransportConnectJobCommon::CONNECTION_LATENCY_IPV4_NO_RACE; |
+ break; |
+ |
+ case SUB_JOB_IPV6: |
+ race_result_ = |
+ had_ipv4_ |
+ ? TransportConnectJobCommon::CONNECTION_LATENCY_IPV6_RACEABLE |
+ : TransportConnectJobCommon::CONNECTION_LATENCY_IPV6_SOLO; |
+ break; |
+ } |
+ SetSocket(job->PassSocket()); |
+ |
+ // Make sure all connections are cancelled even if this object fails to be |
+ // deleted. |
+ ipv4_job_.reset(); |
+ ipv6_job_.reset(); |
+ } else { |
+ switch (job->type()) { |
+ case SUB_JOB_IPV4: |
+ ipv4_job_.reset(); |
+ break; |
+ |
+ case SUB_JOB_IPV6: |
+ ipv6_job_.reset(); |
+ if (ipv4_job_ && !ipv4_job_->started()) { |
+ fallback_timer_.Stop(); |
+ result = ipv4_job_->Start(); |
+ if (result != ERR_IO_PENDING) { |
+ OnSubJobComplete(result, ipv4_job_.get()); |
+ return; |
+ } |
+ } |
+ break; |
+ } |
+ if (ipv4_job_ || ipv6_job_) |
+ return; |
+ } |
+ data_.OnIOComplete(this, result); |
+} |
+ |
+void WebSocketTransportConnectJob::StartIPv4JobAsync() { |
+ DCHECK(ipv4_job_); |
+ int result = ipv4_job_->Start(); |
+ if (result != ERR_IO_PENDING) |
+ OnSubJobComplete(result, ipv4_job_.get()); |
+} |
+ |
+int WebSocketTransportConnectJob::ConnectInternal() { |
+ return data_.DoConnectInternal(this); |
+} |
+ |
+WebSocketTransportClientSocketPool::WebSocketTransportClientSocketPool( |
+ int max_sockets, |
+ int max_sockets_per_group, |
+ ClientSocketPoolHistograms* histograms, |
+ HostResolver* host_resolver, |
+ ClientSocketFactory* client_socket_factory, |
+ NetLog* net_log) |
+ : TransportClientSocketPool(max_sockets, |
+ max_sockets_per_group, |
+ histograms, |
+ host_resolver, |
+ client_socket_factory, |
+ net_log), |
+ connect_job_delegate_(this), |
+ histograms_(histograms), |
+ pool_net_log_(net_log), |
+ client_socket_factory_(client_socket_factory), |
+ host_resolver_(host_resolver), |
+ max_sockets_(max_sockets), |
+ handed_out_socket_count_(0), |
+ is_stalled_(false), |
+ weak_factory_(this) {} |
+ |
+WebSocketTransportClientSocketPool::~WebSocketTransportClientSocketPool() { |
+ DCHECK(pending_connects_.empty()); |
+ DCHECK_EQ(0, handed_out_socket_count_); |
+ DCHECK(!is_stalled_); |
+} |
+ |
+// static |
+void WebSocketTransportClientSocketPool::UnlockEndpoint( |
+ ClientSocketHandle* handle) { |
+ DCHECK(handle->is_initialized()); |
+ WebSocketEndpointLockManager::GetInstance()->UnlockSocket(handle->socket()); |
+} |
+ |
+int WebSocketTransportClientSocketPool::RequestSocket( |
+ const std::string& group_name, |
+ const void* params, |
+ RequestPriority priority, |
+ ClientSocketHandle* handle, |
+ const CompletionCallback& callback, |
+ const BoundNetLog& request_net_log) { |
+ DCHECK(params); |
+ const scoped_refptr<TransportSocketParams>& casted_params = |
+ *static_cast<const scoped_refptr<TransportSocketParams>*>(params); |
+ |
+ NetLogTcpClientSocketPoolRequestedSocket(request_net_log, &casted_params); |
+ |
+ CHECK(!callback.is_null()); |
+ CHECK(handle); |
+ |
+ request_net_log.BeginEvent(NetLog::TYPE_SOCKET_POOL); |
+ |
+ if (ReachedMaxSocketsLimit() && !casted_params->ignore_limits()) { |
+ request_net_log.AddEvent(NetLog::TYPE_SOCKET_POOL_STALLED_MAX_SOCKETS); |
+ is_stalled_ = true; |
+ return ERR_IO_PENDING; |
+ } |
+ |
+ scoped_ptr<WebSocketTransportConnectJob> connect_job( |
+ new WebSocketTransportConnectJob( |
+ group_name, |
+ priority, |
+ casted_params, |
+ TimeDelta::FromSeconds(kTransportConnectJobTimeoutInSeconds), |
+ callback, |
+ client_socket_factory_, |
+ host_resolver_, |
+ handle, |
+ &connect_job_delegate_, |
+ pool_net_log_, |
+ request_net_log)); |
+ |
+ int rv = connect_job->Connect(); |
+ request_net_log.AddEvent( |
+ NetLog::TYPE_SOCKET_POOL_BOUND_TO_CONNECT_JOB, |
+ connect_job->net_log().source().ToEventParametersCallback()); |
tyoshino (SeeGerritForStatus)
2014/06/11 07:55:11
should we log this even when rv == ERR_IO_PENDING?
Adam Rice
2014/06/11 08:17:09
Because the WebSocketTransportClientSocketPool use
|
+ if (rv == OK) { |
+ HandOutSocket(connect_job->PassSocket(), |
+ connect_job->connect_timing(), |
+ handle, |
+ request_net_log); |
+ request_net_log.EndEvent(NetLog::TYPE_SOCKET_POOL); |
+ } else if (rv == ERR_IO_PENDING) { |
+ // TODO(ricea): Implement backup job timer? |
+ AddJob(handle, connect_job.Pass()); |
+ } else { |
+ scoped_ptr<StreamSocket> error_socket; |
+ connect_job->GetAdditionalErrorState(handle); |
+ error_socket = connect_job->PassSocket(); |
+ if (error_socket) { |
+ HandOutSocket(error_socket.Pass(), |
+ connect_job->connect_timing(), |
+ handle, |
+ request_net_log); |
+ } |
+ } |
+ |
+ if (rv != ERR_IO_PENDING) { |
+ request_net_log.EndEventWithNetErrorCode(NetLog::TYPE_SOCKET_POOL, rv); |
+ } |
+ |
+ return rv; |
+} |
+ |
+void WebSocketTransportClientSocketPool::RequestSockets( |
+ const std::string& group_name, |
+ const void* params, |
+ int num_sockets, |
+ const BoundNetLog& net_log) { |
+ NOTIMPLEMENTED(); |
+} |
+ |
+void WebSocketTransportClientSocketPool::CancelRequest( |
+ const std::string& group_name, |
+ ClientSocketHandle* handle) { |
+ CancelJob(handle); |
+} |
+ |
+void WebSocketTransportClientSocketPool::ReleaseSocket( |
+ const std::string& group_name, |
+ scoped_ptr<StreamSocket> socket, |
+ int id) { |
+ WebSocketEndpointLockManager::GetInstance()->UnlockSocket(socket.get()); |
+ CHECK_GT(handed_out_socket_count_, 0); |
+ --handed_out_socket_count_; |
+ if (!ReachedMaxSocketsLimit()) |
+ is_stalled_ = false; |
+} |
+ |
+void WebSocketTransportClientSocketPool::FlushWithError(int error) { |
+ CancelAllConnectJobs(); |
+} |
+ |
+void WebSocketTransportClientSocketPool::CloseIdleSockets() { |
+ // We have no idle sockets. |
+} |
+ |
+int WebSocketTransportClientSocketPool::IdleSocketCount() const { return 0; } |
+ |
+int WebSocketTransportClientSocketPool::IdleSocketCountInGroup( |
+ const std::string& group_name) const { |
+ return 0; |
+} |
+ |
+LoadState WebSocketTransportClientSocketPool::GetLoadState( |
+ const std::string& group_name, |
+ const ClientSocketHandle* handle) const { |
+ if (pending_callbacks_.count(handle)) |
+ return LOAD_STATE_CONNECTING; |
+ return LookupConnectJob(handle)->GetLoadState(); |
+} |
+ |
+base::DictionaryValue* WebSocketTransportClientSocketPool::GetInfoAsValue( |
+ const std::string& name, |
+ const std::string& type, |
+ bool include_nested_pools) const { |
+ base::DictionaryValue* dict = new base::DictionaryValue(); |
+ dict->SetString("name", name); |
+ dict->SetString("type", type); |
+ dict->SetInteger("handed_out_socket_count", handed_out_socket_count_); |
+ dict->SetInteger("connecting_socket_count", pending_connects_.size()); |
+ dict->SetInteger("idle_socket_count", 0); |
+ dict->SetInteger("max_socket_count", max_sockets_); |
+ dict->SetInteger("max_sockets_per_group", max_sockets_); |
+ dict->SetInteger("pool_generation_number", 0); |
+ return dict; |
+} |
+ |
+TimeDelta WebSocketTransportClientSocketPool::ConnectionTimeout() const { |
+ return TimeDelta::FromSeconds(kTransportConnectJobTimeoutInSeconds); |
+} |
+ |
+ClientSocketPoolHistograms* WebSocketTransportClientSocketPool::histograms() |
+ const { |
+ return histograms_; |
+} |
+ |
+bool WebSocketTransportClientSocketPool::IsStalled() const { |
+ return is_stalled_; |
+} |
+ |
+void WebSocketTransportClientSocketPool::AddHigherLayeredPool( |
tyoshino (SeeGerritForStatus)
2014/06/11 06:55:04
no need to override?
Adam Rice
2014/06/11 08:17:09
It seemed a little risky to me. The base class imp
tyoshino (SeeGerritForStatus)
2014/06/11 08:22:22
Yeah, right, like IsStalled.
|
+ HigherLayeredPool* higher_pool) { |
+ CHECK(higher_pool); |
+ CHECK(!ContainsKey(higher_pools_, higher_pool)); |
+ higher_pools_.insert(higher_pool); |
+} |
+ |
+void WebSocketTransportClientSocketPool::RemoveHigherLayeredPool( |
tyoshino (SeeGerritForStatus)
2014/06/11 06:55:04
no need to override?
Adam Rice
2014/06/11 08:17:09
Removed.
|
+ HigherLayeredPool* higher_pool) { |
+ CHECK(higher_pool); |
+ CHECK(ContainsKey(higher_pools_, higher_pool)); |
+ higher_pools_.erase(higher_pool); |
+} |
+ |
+void WebSocketTransportClientSocketPool::OnConnectJobComplete( |
+ int result, |
+ WebSocketTransportConnectJob* job) { |
+ DCHECK_NE(ERR_IO_PENDING, result); |
+ |
+ scoped_ptr<StreamSocket> socket = job->PassSocket(); |
+ |
+ BoundNetLog request_net_log = job->request_net_log(); |
+ CompletionCallback callback = job->callback(); |
+ LoadTimingInfo::ConnectTiming connect_timing = job->connect_timing(); |
+ |
+ ClientSocketHandle* const handle = job->handle(); |
+ |
+ if (result == OK) { |
+ DCHECK(socket.get()); |
+ HandOutSocket(socket.Pass(), connect_timing, handle, request_net_log); |
+ request_net_log.EndEvent(NetLog::TYPE_SOCKET_POOL); |
+ } else { |
+ // If we got a socket, it must contain error information so pass that |
+ // up so that the caller can retrieve it. |
+ bool handed_out_socket = false; |
+ job->GetAdditionalErrorState(handle); |
+ if (socket.get()) { |
+ handed_out_socket = true; |
+ HandOutSocket(socket.Pass(), connect_timing, handle, request_net_log); |
+ } |
+ request_net_log.EndEventWithNetErrorCode(NetLog::TYPE_SOCKET_POOL, result); |
+ |
+ if (!handed_out_socket && is_stalled_ && !ReachedMaxSocketsLimit()) |
+ is_stalled_ = false; |
+ } |
+ bool delete_succeeded = DeleteJob(handle); |
+ DCHECK(delete_succeeded); |
+ InvokeUserCallbackLater(handle, callback, result); |
+} |
+ |
+void WebSocketTransportClientSocketPool::InvokeUserCallbackLater( |
+ ClientSocketHandle* handle, |
+ const CompletionCallback& callback, |
+ int rv) { |
+ DCHECK(!pending_callbacks_.count(handle)); |
+ pending_callbacks_.insert(handle); |
+ base::MessageLoop::current()->PostTask( |
+ FROM_HERE, |
+ base::Bind(&WebSocketTransportClientSocketPool::InvokeUserCallback, |
+ weak_factory_.GetWeakPtr(), |
+ handle, |
+ callback, |
+ rv)); |
+} |
+ |
+void WebSocketTransportClientSocketPool::InvokeUserCallback( |
+ ClientSocketHandle* handle, |
+ const CompletionCallback& callback, |
+ int rv) { |
+ if (pending_callbacks_.erase(handle)) |
+ callback.Run(rv); |
+} |
+ |
+bool WebSocketTransportClientSocketPool::ReachedMaxSocketsLimit() const { |
+ return base::checked_cast<int>(pending_connects_.size()) + |
+ handed_out_socket_count_ >= |
+ max_sockets_; |
+} |
+ |
+void WebSocketTransportClientSocketPool::HandOutSocket( |
+ scoped_ptr<StreamSocket> socket, |
+ const LoadTimingInfo::ConnectTiming& connect_timing, |
+ ClientSocketHandle* handle, |
+ const BoundNetLog& net_log) { |
+ DCHECK(socket); |
+ handle->SetSocket(socket.Pass()); |
+ handle->set_reuse_type(ClientSocketHandle::UNUSED); |
+ handle->set_idle_time(TimeDelta()); |
+ handle->set_pool_id(0); |
+ handle->set_connect_timing(connect_timing); |
+ |
+ net_log.AddEvent( |
+ NetLog::TYPE_SOCKET_POOL_BOUND_TO_SOCKET, |
+ handle->socket()->NetLog().source().ToEventParametersCallback()); |
+ |
+ ++handed_out_socket_count_; |
+} |
+ |
+void WebSocketTransportClientSocketPool::AddJob( |
+ ClientSocketHandle* handle, |
+ scoped_ptr<WebSocketTransportConnectJob> connect_job) { |
+ bool inserted = |
+ pending_connects_.insert(PendingConnectsMap::value_type( |
+ handle, connect_job.release())).second; |
+ DCHECK(inserted); |
+} |
+ |
+bool WebSocketTransportClientSocketPool::DeleteJob(ClientSocketHandle* handle) { |
+ PendingConnectsMap::iterator it = pending_connects_.find(handle); |
+ if (it == pending_connects_.end()) |
+ return false; |
+ delete it->second, it->second = NULL; |
+ pending_connects_.erase(it); |
+ return true; |
+} |
+ |
+void WebSocketTransportClientSocketPool::CancelJob(ClientSocketHandle* handle) { |
tyoshino (SeeGerritForStatus)
2014/06/11 07:55:11
merge to CancelRequest?
Adam Rice
2014/06/11 08:17:09
Done.
|
+ if (!DeleteJob(handle)) |
+ pending_callbacks_.erase(handle); |
+} |
+ |
+void WebSocketTransportClientSocketPool::CancelAllConnectJobs() { |
+ STLDeleteValues(&pending_connects_); |
+} |
+ |
+const WebSocketTransportConnectJob* |
+WebSocketTransportClientSocketPool::LookupConnectJob( |
+ const ClientSocketHandle* handle) const { |
+ PendingConnectsMap::const_iterator it = pending_connects_.find(handle); |
+ CHECK(it != pending_connects_.end()); |
+ return it->second; |
+} |
+ |
+WebSocketTransportClientSocketPool::ConnectJobDelegate::ConnectJobDelegate( |
+ WebSocketTransportClientSocketPool* owner) |
+ : owner_(owner) {} |
+ |
+WebSocketTransportClientSocketPool::ConnectJobDelegate::~ConnectJobDelegate() {} |
+ |
+void |
+WebSocketTransportClientSocketPool::ConnectJobDelegate::OnConnectJobComplete( |
+ int result, |
+ ConnectJob* job) { |
+ owner_->OnConnectJobComplete(result, |
+ static_cast<WebSocketTransportConnectJob*>(job)); |
+} |
+ |
+} // namespace net |