Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(245)

Unified Diff: net/socket/websocket_transport_client_socket_pool.cc

Issue 240873003: Create WebSocketTransportClientSocketPool (Closed) Base URL: http://git.chromium.org/chromium/src.git@master
Patch Set: Factor out CurrentAddress() method in SubJob class. Created 6 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: net/socket/websocket_transport_client_socket_pool.cc
diff --git a/net/socket/websocket_transport_client_socket_pool.cc b/net/socket/websocket_transport_client_socket_pool.cc
new file mode 100644
index 0000000000000000000000000000000000000000..c01038b4badf626fc7b842ce0b6597737496a3d3
--- /dev/null
+++ b/net/socket/websocket_transport_client_socket_pool.cc
@@ -0,0 +1,824 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/socket/websocket_transport_client_socket_pool.h"
+
+#include <algorithm>
+#include <limits>
+#include <map>
+#include <utility>
+
+#include "base/compiler_specific.h"
+#include "base/containers/linked_list.h"
+#include "base/logging.h"
+#include "base/memory/singleton.h"
+#include "base/numerics/safe_conversions.h"
+#include "base/stl_util.h"
+#include "base/strings/string_util.h"
+#include "base/time/time.h"
+#include "base/values.h"
+#include "net/base/ip_endpoint.h"
+#include "net/base/net_errors.h"
+#include "net/base/net_log.h"
+#include "net/socket/client_socket_factory.h"
+#include "net/socket/client_socket_handle.h"
+#include "net/socket/client_socket_pool_base.h"
+
+namespace net {
+
+namespace {
+
+using base::TimeDelta;
+
+// TODO(ricea): For now, we implement a global timeout for compatability with
+// TransportConnectJob. Since WebSocketTransportConnectJob controls the address
+// selection process more tightly, it could do something smarter here.
+const int kTransportConnectJobTimeoutInSeconds = 240; // 4 minutes.
+
+} // namespace
+
+class WebSocketEndpointLockManager {
Johnny 2014/06/16 23:41:11 Should this be in it's own .h & .cc?
Adam Rice 2014/06/19 13:55:20 Done.
+ public:
+ typedef WebSocketTransportConnectJob::SubJob SubJob;
+ static WebSocketEndpointLockManager* GetInstance();
+
+ // Returns OK if lock was acquired immediately, ERR_IO_PENDING if not. If the
+ // lock was not acquired, then |job->GotEndpointLock()| will be called when it
+ // is.
+ int LockEndpoint(const IPEndPoint& endpoint, SubJob* job);
+
+ // Record the IPEndPoint associated with a particular socket. This is
+ // necessary because TCPClientSocket refuses to return the PeerAddress after
+ // the connection is disconnected. The association will be forgotten when
+ // UnlockSocket() is called. The |socket| pointer must not be deleted between
+ // the call to RememberSocket() and the call to UnlockSocket().
+ void RememberSocket(StreamSocket* socket, const IPEndPoint& endpoint);
+
+ // Release the lock on an endpoint, and, if appropriate, trigger the next
+ // socket connection. It is permitted to call UnlockSocket() multiple times
+ // for the same |socket|; all calls after the first will be ignored.
Johnny 2014/06/16 23:41:11 Is this expected to happen? If not, why not be mor
Adam Rice 2014/06/19 13:55:20 I clarified the comment. In the case of a successf
+ void UnlockSocket(StreamSocket* socket);
+
+ // Release the lock on |endpoint|. Most callers should use UnlockSocket()
+ // instead.
+ void UnlockEndpoint(const IPEndPoint& endpoint);
+
+ private:
+ typedef base::LinkedList<SubJob> ConnectJobQueue;
+ typedef std::map<IPEndPoint, ConnectJobQueue*> EndPointJobMap;
+ typedef std::map<StreamSocket*, IPEndPoint> SocketEndPointMap;
+
+ WebSocketEndpointLockManager() {}
+ ~WebSocketEndpointLockManager() {
+ DCHECK(endpoint_job_map_.empty());
+ DCHECK(socket_endpoint_map_.empty());
+ }
+
+ EndPointJobMap endpoint_job_map_;
tyoshino (SeeGerritForStatus) 2014/06/18 06:49:42 please explain what an endpoint_job_map_ entry wit
Adam Rice 2014/06/19 13:55:20 Done.
+ SocketEndPointMap socket_endpoint_map_;
+
+ friend struct DefaultSingletonTraits<WebSocketEndpointLockManager>;
+
+ DISALLOW_COPY_AND_ASSIGN(WebSocketEndpointLockManager);
+};
+
+WebSocketEndpointLockManager* WebSocketEndpointLockManager::GetInstance() {
+ return Singleton<WebSocketEndpointLockManager>::get();
+}
+
+class WebSocketTransportConnectJob::SubJob : public base::LinkNode<SubJob> {
+ public:
+ SubJob(const AddressList& addresses,
+ WebSocketTransportConnectJob* parent_job,
+ SubJobType type)
+ : parent_job_(parent_job),
+ addresses_(addresses),
+ current_address_index_(0),
+ next_state_(STATE_NONE),
+ type_(type) {
+ CHECK_LE(addresses_.size(),
Johnny 2014/06/16 23:41:11 This is a pretty loose CHECK. I'm not sure what it
Adam Rice 2014/06/19 13:55:20 current_address_index_ was an int in imitation of
+ static_cast<size_t>(std::numeric_limits<int>::max()));
+ }
+
+ ~SubJob() {
+ // We don't worry about cancelling the TCP connect, since ~StreamSocket will
+ // take care of it.
+ if (next()) {
+ DCHECK(previous());
+ DCHECK_EQ(STATE_WAIT_FOR_LOCK_COMPLETE, next_state_);
+ RemoveFromList();
+ } else if (next_state_ == STATE_TRANSPORT_CONNECT_COMPLETE) {
+ WebSocketEndpointLockManager::GetInstance()->UnlockEndpoint(
tyoshino (SeeGerritForStatus) 2014/06/18 06:49:42 don't we call UnlockEndpoint() twice when DoTransp
Adam Rice 2014/06/19 13:55:20 No, because next_state_ is set to STATE_NONE in Do
+ CurrentAddress());
+ }
+ }
+
+ // Start connecting.
+ int Start() {
+ DCHECK_EQ(STATE_NONE, next_state_);
+ next_state_ = STATE_WAIT_FOR_LOCK;
+ return DoLoop(OK);
+ }
+
+ bool started() { return next_state_ != STATE_NONE; }
+
+ // Called by WebSocketEndpointLockManager when the lock becomes available.
+ void GotEndpointLock() {
+ DCHECK_EQ(STATE_WAIT_FOR_LOCK_COMPLETE, next_state_);
+ OnIOComplete(OK);
+ }
+
+ LoadState GetLoadState() const {
+ switch (next_state_) {
+ case STATE_WAIT_FOR_LOCK:
+ case STATE_WAIT_FOR_LOCK_COMPLETE:
+ // TODO(ricea): Add a WebSocket-specific LOAD_STATE ?
+ return LOAD_STATE_WAITING_FOR_AVAILABLE_SOCKET;
+ case STATE_TRANSPORT_CONNECT:
+ case STATE_TRANSPORT_CONNECT_COMPLETE:
+ return LOAD_STATE_CONNECTING;
+ case STATE_NONE:
+ return LOAD_STATE_IDLE;
+ }
+ NOTREACHED();
+ return LOAD_STATE_IDLE;
+ }
+
+ SubJobType type() const { return type_; }
+
+ scoped_ptr<StreamSocket> PassSocket() { return transport_socket_.Pass(); }
+
+ private:
+ enum State {
+ STATE_NONE,
+ STATE_WAIT_FOR_LOCK,
tyoshino (SeeGerritForStatus) 2014/06/18 06:49:43 [optional] wondering if s/WAIT/OBTAIN/ sounds bett
Adam Rice 2014/06/19 13:55:20 Sounds good to me. Most of the time we don't actua
+ STATE_WAIT_FOR_LOCK_COMPLETE,
+ STATE_TRANSPORT_CONNECT,
+ STATE_TRANSPORT_CONNECT_COMPLETE,
+ };
+
+ ClientSocketFactory* client_socket_factory() const {
+ return parent_job_->data_.client_socket_factory();
+ }
+
+ const BoundNetLog& net_log() const { return parent_job_->net_log(); }
+
+ const IPEndPoint& CurrentAddress() const {
+ DCHECK_LT(current_address_index_, static_cast<int>(addresses_.size()));
+ return addresses_[current_address_index_];
+ }
+
+ void OnIOComplete(int result) {
+ int rv = DoLoop(result);
+ if (rv != ERR_IO_PENDING)
+ parent_job_->OnSubJobComplete(rv, this); // |this| deleted
+ }
+
+ int DoLoop(int result) {
+ DCHECK_NE(next_state_, STATE_NONE);
+
+ int rv = result;
+ do {
+ State state = next_state_;
+ next_state_ = STATE_NONE;
+ switch (state) {
+ case STATE_WAIT_FOR_LOCK:
tyoshino (SeeGerritForStatus) 2014/06/18 06:49:43 DCHECK_EQ(OK, rv)
Adam Rice 2014/06/19 13:55:20 Done.
+ rv = DoEndpointLock();
+ break;
+ case STATE_WAIT_FOR_LOCK_COMPLETE:
tyoshino (SeeGerritForStatus) 2014/06/18 06:49:42 DCHECK_EQ(OK, rv)
Adam Rice 2014/06/19 13:55:20 Done.
+ rv = DoEndpointLockComplete();
+ break;
+ case STATE_TRANSPORT_CONNECT:
+ DCHECK_EQ(OK, rv);
+ rv = DoTransportConnect();
+ break;
+ case STATE_TRANSPORT_CONNECT_COMPLETE:
+ rv = DoTransportConnectComplete(rv);
+ break;
+ default:
+ NOTREACHED();
+ rv = ERR_FAILED;
+ break;
+ }
+ } while (rv != ERR_IO_PENDING && next_state_ != STATE_NONE);
+
+ return rv;
+ }
+
+ int DoEndpointLock() {
+ int rv = WebSocketEndpointLockManager::GetInstance()->LockEndpoint(
+ CurrentAddress(), this);
+ next_state_ = STATE_WAIT_FOR_LOCK_COMPLETE;
+ return rv;
+ }
+
+ int DoEndpointLockComplete() {
+ next_state_ = STATE_TRANSPORT_CONNECT;
+ return OK;
+ }
+
+ int DoTransportConnect() {
+ // TODO(ricea): Update global g_last_connect_time and report
+ // ConnectInterval.
+ next_state_ = STATE_TRANSPORT_CONNECT_COMPLETE;
+ AddressList one_address(CurrentAddress());
+ transport_socket_ = client_socket_factory()->CreateTransportClientSocket(
+ one_address, net_log().net_log(), net_log().source());
+ // This use of base::Unretained() is safe because transport_socket_ is
+ // destroyed in the destructor.
+ return transport_socket_->Connect(
+ base::Bind(&SubJob::OnIOComplete, base::Unretained(this)));
+ }
+
+ int DoTransportConnectComplete(int result) {
+ WebSocketEndpointLockManager* endpoint_lock_manager =
+ WebSocketEndpointLockManager::GetInstance();
+ if (result != OK) {
+ endpoint_lock_manager->UnlockEndpoint(CurrentAddress());
+
+ if (current_address_index_ + 1 < static_cast<int>(addresses_.size())) {
+ // Try falling back to the next address in the list.
+ next_state_ = STATE_WAIT_FOR_LOCK;
+ ++current_address_index_;
+ result = OK;
+ }
+
+ return result;
+ }
+
+ endpoint_lock_manager->RememberSocket(transport_socket_.get(),
+ CurrentAddress());
+
+ return result;
Johnny 2014/06/16 23:41:11 Sanity check: Looks like WebSocketBasicHandshakeSt
Adam Rice 2014/06/19 13:55:20 Yes.
+ }
+
+ WebSocketTransportConnectJob* const parent_job_;
+
+ const AddressList addresses_;
+ int current_address_index_;
+
+ State next_state_;
+ const SubJobType type_;
+
+ scoped_ptr<StreamSocket> transport_socket_;
+
+ DISALLOW_COPY_AND_ASSIGN(SubJob);
+};
+
+int WebSocketEndpointLockManager::LockEndpoint(const IPEndPoint& endpoint,
+ SubJob* job) {
+ EndPointJobMap::value_type insert_value(endpoint, NULL);
+ std::pair<EndPointJobMap::iterator, bool> rv =
+ endpoint_job_map_.insert(insert_value);
+ if (rv.second) {
+ DVLOG(3) << "Locking endpoint " << endpoint.ToString();
+ rv.first->second = new ConnectJobQueue;
+ return OK;
+ }
+ DVLOG(3) << "Waiting for endpoint " << endpoint.ToString();
+ rv.first->second->Append(job);
+ return ERR_IO_PENDING;
+}
+
+void WebSocketEndpointLockManager::RememberSocket(StreamSocket* socket,
+ const IPEndPoint& endpoint) {
+ bool inserted = socket_endpoint_map_.insert(SocketEndPointMap::value_type(
+ socket, endpoint)).second;
+ DCHECK(inserted);
+ DCHECK(endpoint_job_map_.find(endpoint) != endpoint_job_map_.end());
+ DVLOG(3) << "Remembered (StreamSocket*)" << socket << " for "
+ << endpoint.ToString() << " (" << socket_endpoint_map_.size()
+ << " sockets remembered)";
+}
+
+void WebSocketEndpointLockManager::UnlockSocket(StreamSocket* socket) {
+ SocketEndPointMap::iterator socket_it = socket_endpoint_map_.find(socket);
+ if (socket_it == socket_endpoint_map_.end()) {
+ DVLOG(3) << "Ignoring request to unlock already-unlocked socket"
+ "(StreamSocket*)" << socket;
+ return;
+ }
+ const IPEndPoint& endpoint = socket_it->second;
+ DVLOG(3) << "Unlocking (StreamSocket*)" << socket << " for "
+ << endpoint.ToString() << " (" << socket_endpoint_map_.size()
+ << " sockets left)";
+ UnlockEndpoint(endpoint);
+ socket_endpoint_map_.erase(socket_it);
+}
+
+void WebSocketEndpointLockManager::UnlockEndpoint(const IPEndPoint& endpoint) {
+ EndPointJobMap::iterator found_it = endpoint_job_map_.find(endpoint);
+ CHECK(found_it != endpoint_job_map_.end()); // Security critical
+ ConnectJobQueue* queue = found_it->second;
+ if (queue->empty()) {
+ DVLOG(3) << "Unlocking endpoint " << endpoint.ToString();
+ delete queue;
+ endpoint_job_map_.erase(found_it);
+ } else {
+ DVLOG(3) << "Unlocking endpoint " << endpoint.ToString()
+ << " and activating next waiter";
+ SubJob* next_job = queue->head()->value();
+ next_job->RemoveFromList();
+ next_job->GotEndpointLock();
+ }
+}
+
+WebSocketTransportConnectJob::WebSocketTransportConnectJob(
+ const std::string& group_name,
+ RequestPriority priority,
+ const scoped_refptr<TransportSocketParams>& params,
+ TimeDelta timeout_duration,
+ const CompletionCallback& callback,
+ ClientSocketFactory* client_socket_factory,
+ HostResolver* host_resolver,
+ ClientSocketHandle* handle,
+ Delegate* delegate,
+ NetLog* pool_net_log,
+ const BoundNetLog& request_net_log)
+ : ConnectJob(group_name,
+ timeout_duration,
+ priority,
+ delegate,
+ BoundNetLog::Make(pool_net_log, NetLog::SOURCE_CONNECT_JOB)),
+ data_(params, client_socket_factory, host_resolver, &connect_timing_),
+ race_result_(TransportConnectJobHelper::CONNECTION_LATENCY_UNKNOWN),
+ handle_(handle),
+ callback_(callback),
+ request_net_log_(request_net_log),
+ had_ipv4_(false),
+ had_ipv6_(false) {
+ data_.SetOnIOComplete(this);
+}
+
+WebSocketTransportConnectJob::~WebSocketTransportConnectJob() {}
+
+LoadState WebSocketTransportConnectJob::GetLoadState() const {
+ LoadState load_state = LOAD_STATE_RESOLVING_HOST;
+ if (ipv6_job_)
+ load_state = ipv6_job_->GetLoadState();
+ // LOAD_STATE_CONNECTING is better than
tyoshino (SeeGerritForStatus) 2014/06/18 06:49:43 could you please elaborate better for what point
Adam Rice 2014/06/19 13:55:20 I updated the comment. PTAL.
tyoshino (SeeGerritForStatus) 2014/06/20 12:12:06 Thanks
+ // LOAD_STATE_WAITING_FOR_AVAILABLE_SOCKET.
+ if (ipv4_job_ && load_state != LOAD_STATE_CONNECTING)
+ load_state = ipv4_job_->GetLoadState();
+ return load_state;
+}
+
+int WebSocketTransportConnectJob::DoResolveHost() {
+ return data_.DoResolveHost(priority(), net_log());
+}
+
+int WebSocketTransportConnectJob::DoResolveHostComplete(int result) {
+ return data_.DoResolveHostComplete(result, net_log());
+}
+
+int WebSocketTransportConnectJob::DoTransportConnect() {
+ AddressList ipv4_addresses;
+ AddressList ipv6_addresses;
+ int result = ERR_UNEXPECTED;
+ data_.set_next_state(
+ TransportConnectJobHelper::STATE_TRANSPORT_CONNECT_COMPLETE);
+
+ for (AddressList::const_iterator it = data_.addresses().begin();
+ it != data_.addresses().end();
+ ++it) {
+ switch (it->GetFamily()) {
+ case ADDRESS_FAMILY_IPV4:
+ ipv4_addresses.push_back(*it);
+ break;
+
+ case ADDRESS_FAMILY_IPV6:
+ ipv6_addresses.push_back(*it);
+ break;
+
+ default:
+ DVLOG(1) << "Unexpected ADDRESS_FAMILY: " << it->GetFamily();
tyoshino (SeeGerritForStatus) 2014/06/18 06:49:43 do we want to ignore this?
Adam Rice 2014/06/19 13:55:20 At the moment this cannot happen, so we could put
tyoshino (SeeGerritForStatus) 2014/06/20 12:12:06 OK
+ break;
+ }
+ }
+
+ if (!ipv4_addresses.empty()) {
+ had_ipv4_ = true;
+ ipv4_job_.reset(new SubJob(ipv4_addresses, this, SUB_JOB_IPV4));
+ }
+
+ if (!ipv6_addresses.empty()) {
+ had_ipv6_ = true;
+ ipv6_job_.reset(new SubJob(ipv6_addresses, this, SUB_JOB_IPV6));
+ result = ipv6_job_->Start();
+ if (result == OK) {
+ SetSocket(ipv6_job_->PassSocket());
+ race_result_ =
+ had_ipv4_
+ ? TransportConnectJobHelper::CONNECTION_LATENCY_IPV6_RACEABLE
+ : TransportConnectJobHelper::CONNECTION_LATENCY_IPV6_SOLO;
+ return result;
+ }
+ if (result == ERR_IO_PENDING && ipv4_job_) {
+ // This use of base::Unretained is safe because fallback_timer_ is owned
tyoshino (SeeGerritForStatus) 2014/06/18 06:49:42 |fallback_timer_|
Adam Rice 2014/06/19 13:55:20 Done.
+ // by this object.
+ fallback_timer_.Start(
+ FROM_HERE,
+ TimeDelta::FromMilliseconds(
+ TransportConnectJobHelper::kIPv6FallbackTimerInMs),
+ base::Bind(&WebSocketTransportConnectJob::StartIPv4JobAsync,
+ base::Unretained(this)));
tyoshino (SeeGerritForStatus) 2014/06/18 06:49:43 return here?
+ }
tyoshino (SeeGerritForStatus) 2014/06/18 06:49:43 how about breaking L417 into two if-s and return w
Adam Rice 2014/06/19 13:55:20 Very clever. Done.
+ if (result != ERR_IO_PENDING)
+ ipv6_job_.reset();
+ }
+
+ if (ipv4_job_ && !ipv6_job_) {
+ result = ipv4_job_->Start();
+ if (result == OK) {
+ SetSocket(ipv4_job_->PassSocket());
+ race_result_ =
+ had_ipv6_
+ ? TransportConnectJobHelper::CONNECTION_LATENCY_IPV4_WINS_RACE
+ : TransportConnectJobHelper::CONNECTION_LATENCY_IPV4_NO_RACE;
+ }
+ }
+
+ return result;
+}
+
+int WebSocketTransportConnectJob::DoTransportConnectComplete(int result) {
+ if (result == OK)
+ data_.HistogramDuration(race_result_);
+ return result;
+}
+
+void WebSocketTransportConnectJob::OnSubJobComplete(int result, SubJob* job) {
+ if (result == OK) {
+ switch (job->type()) {
+ case SUB_JOB_IPV4:
+ race_result_ =
+ had_ipv6_
+ ? TransportConnectJobHelper::CONNECTION_LATENCY_IPV4_WINS_RACE
+ : TransportConnectJobHelper::CONNECTION_LATENCY_IPV4_NO_RACE;
+ break;
+
+ case SUB_JOB_IPV6:
+ race_result_ =
+ had_ipv4_
+ ? TransportConnectJobHelper::CONNECTION_LATENCY_IPV6_RACEABLE
+ : TransportConnectJobHelper::CONNECTION_LATENCY_IPV6_SOLO;
+ break;
+ }
+ SetSocket(job->PassSocket());
+
+ // Make sure all connections are cancelled even if this object fails to be
+ // deleted.
+ ipv4_job_.reset();
+ ipv6_job_.reset();
+ } else {
+ switch (job->type()) {
+ case SUB_JOB_IPV4:
+ ipv4_job_.reset();
+ break;
+
+ case SUB_JOB_IPV6:
+ ipv6_job_.reset();
+ if (ipv4_job_ && !ipv4_job_->started()) {
+ fallback_timer_.Stop();
+ result = ipv4_job_->Start();
+ if (result != ERR_IO_PENDING) {
+ OnSubJobComplete(result, ipv4_job_.get());
+ return;
+ }
+ }
+ break;
+ }
+ if (ipv4_job_ || ipv6_job_)
+ return;
+ }
+ data_.OnIOComplete(this, result);
+}
+
+void WebSocketTransportConnectJob::StartIPv4JobAsync() {
+ DCHECK(ipv4_job_);
+ int result = ipv4_job_->Start();
+ if (result != ERR_IO_PENDING)
+ OnSubJobComplete(result, ipv4_job_.get());
+}
+
+int WebSocketTransportConnectJob::ConnectInternal() {
+ return data_.DoConnectInternal(this);
+}
+
+WebSocketTransportClientSocketPool::WebSocketTransportClientSocketPool(
+ int max_sockets,
+ int max_sockets_per_group,
+ ClientSocketPoolHistograms* histograms,
+ HostResolver* host_resolver,
+ ClientSocketFactory* client_socket_factory,
+ NetLog* net_log)
+ : TransportClientSocketPool(max_sockets,
+ max_sockets_per_group,
+ histograms,
+ host_resolver,
+ client_socket_factory,
+ net_log),
+ connect_job_delegate_(this),
+ histograms_(histograms),
+ pool_net_log_(net_log),
+ client_socket_factory_(client_socket_factory),
+ host_resolver_(host_resolver),
+ max_sockets_(max_sockets),
+ handed_out_socket_count_(0),
+ is_stalled_(false),
+ weak_factory_(this) {}
+
+WebSocketTransportClientSocketPool::~WebSocketTransportClientSocketPool() {
+ DCHECK(pending_connects_.empty());
+ DCHECK_EQ(0, handed_out_socket_count_);
+ DCHECK(!is_stalled_);
+}
+
+// static
+void WebSocketTransportClientSocketPool::UnlockEndpoint(
+ ClientSocketHandle* handle) {
+ DCHECK(handle->is_initialized());
+ WebSocketEndpointLockManager::GetInstance()->UnlockSocket(handle->socket());
+}
+
+int WebSocketTransportClientSocketPool::RequestSocket(
+ const std::string& group_name,
+ const void* params,
+ RequestPriority priority,
+ ClientSocketHandle* handle,
+ const CompletionCallback& callback,
+ const BoundNetLog& request_net_log) {
+ DCHECK(params);
+ const scoped_refptr<TransportSocketParams>& casted_params =
+ *static_cast<const scoped_refptr<TransportSocketParams>*>(params);
+
+ NetLogTcpClientSocketPoolRequestedSocket(request_net_log, &casted_params);
+
+ CHECK(!callback.is_null());
+ CHECK(handle);
+
+ request_net_log.BeginEvent(NetLog::TYPE_SOCKET_POOL);
+
+ if (ReachedMaxSocketsLimit() && !casted_params->ignore_limits()) {
+ request_net_log.AddEvent(NetLog::TYPE_SOCKET_POOL_STALLED_MAX_SOCKETS);
+ is_stalled_ = true;
+ return ERR_IO_PENDING;
+ }
+
+ scoped_ptr<WebSocketTransportConnectJob> connect_job(
+ new WebSocketTransportConnectJob(
+ group_name,
+ priority,
+ casted_params,
+ TimeDelta::FromSeconds(kTransportConnectJobTimeoutInSeconds),
tyoshino (SeeGerritForStatus) 2014/06/18 06:49:42 ConnectionTimeout()?
Adam Rice 2014/06/19 13:55:20 Done.
+ callback,
+ client_socket_factory_,
+ host_resolver_,
+ handle,
+ &connect_job_delegate_,
+ pool_net_log_,
+ request_net_log));
+
+ int rv = connect_job->Connect();
+ // Regardless of the outcome of |connect_job|, it will always be bound to
+ // |handle|, since this pool uses early-binding. So the binding is logged
+ // here, without waiting for the result.
+ request_net_log.AddEvent(
+ NetLog::TYPE_SOCKET_POOL_BOUND_TO_CONNECT_JOB,
+ connect_job->net_log().source().ToEventParametersCallback());
+ if (rv == OK) {
+ HandOutSocket(connect_job->PassSocket(),
+ connect_job->connect_timing(),
+ handle,
+ request_net_log);
+ request_net_log.EndEvent(NetLog::TYPE_SOCKET_POOL);
+ } else if (rv == ERR_IO_PENDING) {
+ // TODO(ricea): Implement backup job timer?
+ AddJob(handle, connect_job.Pass());
+ } else {
+ scoped_ptr<StreamSocket> error_socket;
+ connect_job->GetAdditionalErrorState(handle);
+ error_socket = connect_job->PassSocket();
+ if (error_socket) {
+ HandOutSocket(error_socket.Pass(),
+ connect_job->connect_timing(),
+ handle,
+ request_net_log);
+ }
+ }
+
+ if (rv != ERR_IO_PENDING) {
+ request_net_log.EndEventWithNetErrorCode(NetLog::TYPE_SOCKET_POOL, rv);
+ }
+
+ return rv;
+}
+
+void WebSocketTransportClientSocketPool::RequestSockets(
+ const std::string& group_name,
+ const void* params,
+ int num_sockets,
+ const BoundNetLog& net_log) {
+ NOTIMPLEMENTED();
+}
+
+void WebSocketTransportClientSocketPool::CancelRequest(
+ const std::string& group_name,
+ ClientSocketHandle* handle) {
+ if (!DeleteJob(handle))
+ pending_callbacks_.erase(handle);
+}
+
+void WebSocketTransportClientSocketPool::ReleaseSocket(
+ const std::string& group_name,
+ scoped_ptr<StreamSocket> socket,
+ int id) {
+ WebSocketEndpointLockManager::GetInstance()->UnlockSocket(socket.get());
+ CHECK_GT(handed_out_socket_count_, 0);
+ --handed_out_socket_count_;
+ if (!ReachedMaxSocketsLimit())
+ is_stalled_ = false;
+}
+
+void WebSocketTransportClientSocketPool::FlushWithError(int error) {
+ CancelAllConnectJobs();
+}
+
+void WebSocketTransportClientSocketPool::CloseIdleSockets() {
+ // We have no idle sockets.
+}
+
+int WebSocketTransportClientSocketPool::IdleSocketCount() const { return 0; }
+
+int WebSocketTransportClientSocketPool::IdleSocketCountInGroup(
+ const std::string& group_name) const {
+ return 0;
+}
+
+LoadState WebSocketTransportClientSocketPool::GetLoadState(
+ const std::string& group_name,
+ const ClientSocketHandle* handle) const {
+ if (pending_callbacks_.count(handle))
+ return LOAD_STATE_CONNECTING;
+ return LookupConnectJob(handle)->GetLoadState();
+}
+
+base::DictionaryValue* WebSocketTransportClientSocketPool::GetInfoAsValue(
+ const std::string& name,
+ const std::string& type,
+ bool include_nested_pools) const {
+ base::DictionaryValue* dict = new base::DictionaryValue();
+ dict->SetString("name", name);
+ dict->SetString("type", type);
+ dict->SetInteger("handed_out_socket_count", handed_out_socket_count_);
+ dict->SetInteger("connecting_socket_count", pending_connects_.size());
+ dict->SetInteger("idle_socket_count", 0);
+ dict->SetInteger("max_socket_count", max_sockets_);
+ dict->SetInteger("max_sockets_per_group", max_sockets_);
+ dict->SetInteger("pool_generation_number", 0);
+ return dict;
+}
+
+TimeDelta WebSocketTransportClientSocketPool::ConnectionTimeout() const {
+ return TimeDelta::FromSeconds(kTransportConnectJobTimeoutInSeconds);
+}
+
+ClientSocketPoolHistograms* WebSocketTransportClientSocketPool::histograms()
+ const {
+ return histograms_;
+}
+
+bool WebSocketTransportClientSocketPool::IsStalled() const {
+ return is_stalled_;
+}
+
+void WebSocketTransportClientSocketPool::OnConnectJobComplete(
+ int result,
+ WebSocketTransportConnectJob* job) {
+ DCHECK_NE(ERR_IO_PENDING, result);
+
+ scoped_ptr<StreamSocket> socket = job->PassSocket();
+
+ BoundNetLog request_net_log = job->request_net_log();
+ CompletionCallback callback = job->callback();
+ LoadTimingInfo::ConnectTiming connect_timing = job->connect_timing();
+
+ ClientSocketHandle* const handle = job->handle();
+
+ if (result == OK) {
+ DCHECK(socket.get());
+ HandOutSocket(socket.Pass(), connect_timing, handle, request_net_log);
+ request_net_log.EndEvent(NetLog::TYPE_SOCKET_POOL);
+ } else {
+ // If we got a socket, it must contain error information so pass that
+ // up so that the caller can retrieve it.
+ bool handed_out_socket = false;
+ job->GetAdditionalErrorState(handle);
+ if (socket.get()) {
+ handed_out_socket = true;
+ HandOutSocket(socket.Pass(), connect_timing, handle, request_net_log);
+ }
+ request_net_log.EndEventWithNetErrorCode(NetLog::TYPE_SOCKET_POOL, result);
+
+ if (!handed_out_socket && is_stalled_ && !ReachedMaxSocketsLimit())
tyoshino (SeeGerritForStatus) 2014/06/18 06:49:43 is this correct? pending_connects_.size() decrease
Adam Rice 2014/06/19 13:55:20 No, it's very wrong. I didn't write any code to re
+ is_stalled_ = false;
+ }
+ bool delete_succeeded = DeleteJob(handle);
+ DCHECK(delete_succeeded);
+ InvokeUserCallbackLater(handle, callback, result);
+}
+
+void WebSocketTransportClientSocketPool::InvokeUserCallbackLater(
+ ClientSocketHandle* handle,
+ const CompletionCallback& callback,
+ int rv) {
+ DCHECK(!pending_callbacks_.count(handle));
+ pending_callbacks_.insert(handle);
+ base::MessageLoop::current()->PostTask(
+ FROM_HERE,
+ base::Bind(&WebSocketTransportClientSocketPool::InvokeUserCallback,
+ weak_factory_.GetWeakPtr(),
+ handle,
+ callback,
+ rv));
+}
+
+void WebSocketTransportClientSocketPool::InvokeUserCallback(
+ ClientSocketHandle* handle,
+ const CompletionCallback& callback,
+ int rv) {
+ if (pending_callbacks_.erase(handle))
+ callback.Run(rv);
+}
+
+bool WebSocketTransportClientSocketPool::ReachedMaxSocketsLimit() const {
+ return base::checked_cast<int>(pending_connects_.size()) +
+ handed_out_socket_count_ >=
+ max_sockets_;
+}
+
+void WebSocketTransportClientSocketPool::HandOutSocket(
+ scoped_ptr<StreamSocket> socket,
+ const LoadTimingInfo::ConnectTiming& connect_timing,
+ ClientSocketHandle* handle,
+ const BoundNetLog& net_log) {
+ DCHECK(socket);
+ handle->SetSocket(socket.Pass());
+ handle->set_reuse_type(ClientSocketHandle::UNUSED);
+ handle->set_idle_time(TimeDelta());
tyoshino (SeeGerritForStatus) 2014/06/18 06:49:43 needed?
Adam Rice 2014/06/19 13:55:20 No. Replaced with DCHECKs.
+ handle->set_pool_id(0);
+ handle->set_connect_timing(connect_timing);
+
+ net_log.AddEvent(
+ NetLog::TYPE_SOCKET_POOL_BOUND_TO_SOCKET,
+ handle->socket()->NetLog().source().ToEventParametersCallback());
+
+ ++handed_out_socket_count_;
+}
+
+void WebSocketTransportClientSocketPool::AddJob(
+ ClientSocketHandle* handle,
+ scoped_ptr<WebSocketTransportConnectJob> connect_job) {
+ bool inserted =
+ pending_connects_.insert(PendingConnectsMap::value_type(
+ handle, connect_job.release())).second;
+ DCHECK(inserted);
+}
+
+bool WebSocketTransportClientSocketPool::DeleteJob(ClientSocketHandle* handle) {
+ PendingConnectsMap::iterator it = pending_connects_.find(handle);
+ if (it == pending_connects_.end())
+ return false;
+ delete it->second, it->second = NULL;
+ pending_connects_.erase(it);
+ return true;
+}
+
+void WebSocketTransportClientSocketPool::CancelAllConnectJobs() {
+ STLDeleteValues(&pending_connects_);
+}
+
+const WebSocketTransportConnectJob*
+WebSocketTransportClientSocketPool::LookupConnectJob(
+ const ClientSocketHandle* handle) const {
+ PendingConnectsMap::const_iterator it = pending_connects_.find(handle);
+ CHECK(it != pending_connects_.end());
+ return it->second;
+}
+
+WebSocketTransportClientSocketPool::ConnectJobDelegate::ConnectJobDelegate(
+ WebSocketTransportClientSocketPool* owner)
+ : owner_(owner) {}
+
+WebSocketTransportClientSocketPool::ConnectJobDelegate::~ConnectJobDelegate() {}
+
+void
+WebSocketTransportClientSocketPool::ConnectJobDelegate::OnConnectJobComplete(
+ int result,
+ ConnectJob* job) {
+ owner_->OnConnectJobComplete(result,
+ static_cast<WebSocketTransportConnectJob*>(job));
+}
+
+} // namespace net

Powered by Google App Engine
This is Rietveld 408576698