Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(786)

Side by Side Diff: net/socket/websocket_transport_client_socket_pool.cc

Issue 240873003: Create WebSocketTransportClientSocketPool (Closed) Base URL: http://git.chromium.org/chromium/src.git@master
Patch Set: Rebase. Created 6 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/socket/websocket_transport_client_socket_pool.h"
6
7 #include <algorithm>
8 #include <map>
9 #include <utility>
10
11 #include "base/compiler_specific.h"
12 #include "base/containers/linked_list.h"
13 #include "base/logging.h"
14 #include "base/memory/singleton.h"
15 #include "base/numerics/safe_conversions.h"
16 #include "base/stl_util.h"
17 #include "base/strings/string_util.h"
18 #include "base/time/time.h"
19 #include "base/values.h"
20 #include "net/base/ip_endpoint.h"
21 #include "net/base/net_errors.h"
22 #include "net/base/net_log.h"
23 #include "net/socket/client_socket_factory.h"
24 #include "net/socket/client_socket_handle.h"
25 #include "net/socket/client_socket_pool_base.h"
26
27 namespace net {
28
29 namespace {
30
31 using base::TimeDelta;
32
33 // TODO(ricea): For now, we implement a global timeout for compatability with
34 // TransportConnectJob. Since WebSocketTransportConnectJob controls the address
35 // selection process more tightly, it could do something smarter here.
36 const int kTransportConnectJobTimeoutInSeconds = 240; // 4 minutes.
37
38 } // namespace
39
40 class WebSocketEndpointLockManager {
41 public:
42 typedef WebSocketTransportConnectJob::SubJob SubJob;
43 static WebSocketEndpointLockManager* GetInstance();
44
45 // Returns OK if lock was acquired immediately, ERR_IO_PENDING if not. If the
46 // lock was not acquired, then |job->GotEndpointLock()| will be called when it
47 // is.
48 int LockEndpoint(const IPEndPoint& endpoint, SubJob* job);
49
50 // Record the IPEndPoint associated with a particular socket. This is
51 // necessary because TCPClientSocket refuses to return the PeerAddress after
52 // the connection is disconnected. The association will be forgotten when
53 // UnlockSocket() is called. The |socket| pointer must not be deleted between
54 // the call to RememberSocket().
55 void RememberSocket(StreamSocket* socket, const IPEndPoint& endpoint);
56
57 // Release the lock on an endpoint, and, if appropriate, trigger the next
58 // socket connection. It is permitted to call UnlockSocket() multiple times
59 // for the same |socket|; all calls after the first will be ignored.
60 void UnlockSocket(StreamSocket* socket);
61
62 // Release the lock on |endpoint|. Most callers should use UnlockSocket()
63 // instead.
64 void UnlockEndpoint(const IPEndPoint& endpoint);
65
66 private:
67 typedef base::LinkedList<SubJob> ConnectJobQueue;
68 typedef std::map<IPEndPoint, ConnectJobQueue*> EndPointJobMap;
69 typedef std::map<StreamSocket*, IPEndPoint> SocketEndPointMap;
70
71 WebSocketEndpointLockManager() {}
72 ~WebSocketEndpointLockManager() {
73 DCHECK(endpoint_job_map_.empty());
74 DCHECK(socket_endpoint_map_.empty());
75 }
76
77 EndPointJobMap endpoint_job_map_;
78 SocketEndPointMap socket_endpoint_map_;
79
80 friend struct DefaultSingletonTraits<WebSocketEndpointLockManager>;
81
82 DISALLOW_COPY_AND_ASSIGN(WebSocketEndpointLockManager);
83 };
84
85 WebSocketEndpointLockManager* WebSocketEndpointLockManager::GetInstance() {
86 return Singleton<WebSocketEndpointLockManager>::get();
87 }
88
89 class WebSocketTransportConnectJob::SubJob : public base::LinkNode<SubJob> {
90 public:
91 SubJob(const AddressList& addresses,
92 WebSocketTransportConnectJob* parent_job,
93 SubJobType type)
94 : addresses_(addresses),
95 parent_job_(parent_job),
96 next_state_(STATE_NONE),
97 current_address_index_(0),
98 type_(type) {}
99
100 ~SubJob() {
101 // We don't worry about cancelling the TCP connect, since ~StreamSocket will
102 // take care of it.
103 if (next()) {
104 DCHECK(previous());
105 DCHECK_EQ(STATE_WAIT_FOR_LOCK_COMPLETE, next_state_);
106 RemoveFromList();
107 } else if (next_state_ == STATE_TRANSPORT_CONNECT_COMPLETE) {
108 DCHECK_LT(current_address_index_, static_cast<int>(addresses_.size()));
109 WebSocketEndpointLockManager::GetInstance()->UnlockEndpoint(
110 addresses_[current_address_index_]);
111 }
112 }
113
114 // Start connecting.
115 int Start() {
116 DCHECK_EQ(STATE_NONE, next_state_);
117 next_state_ = STATE_WAIT_FOR_LOCK;
118 return DoLoop(OK);
119 }
120
121 bool started() { return next_state_ != STATE_NONE; }
122
123 // Called by WebSocketEndpointLockManager when the lock becomes available.
124 void GotEndpointLock() { OnIOComplete(OK); }
125
126 LoadState GetLoadState() const {
127 switch (next_state_) {
128 case STATE_WAIT_FOR_LOCK:
129 case STATE_WAIT_FOR_LOCK_COMPLETE:
130 // TODO(ricea): Add a WebSocket-specific LOAD_STATE ?
131 return LOAD_STATE_WAITING_FOR_AVAILABLE_SOCKET;
132 case STATE_TRANSPORT_CONNECT:
133 case STATE_TRANSPORT_CONNECT_COMPLETE:
134 return LOAD_STATE_CONNECTING;
135 case STATE_NONE:
136 return LOAD_STATE_IDLE;
137 }
138 NOTREACHED();
139 return LOAD_STATE_IDLE;
140 }
141
142 SubJobType type() const { return type_; }
143
144 scoped_ptr<StreamSocket> PassSocket() { return transport_socket_.Pass(); }
145
146 private:
147 enum State {
148 STATE_NONE,
149 STATE_WAIT_FOR_LOCK,
150 STATE_WAIT_FOR_LOCK_COMPLETE,
151 STATE_TRANSPORT_CONNECT,
152 STATE_TRANSPORT_CONNECT_COMPLETE,
153 };
154
155 ClientSocketFactory* client_socket_factory() const {
156 return parent_job_->data_.client_socket_factory();
157 }
158
159 const BoundNetLog& net_log() const { return parent_job_->net_log(); }
160
161 void OnIOComplete(int result) {
162 int rv = DoLoop(result);
163 if (rv != ERR_IO_PENDING)
164 parent_job_->OnSubJobComplete(rv, this); // |this| deleted
165 }
166
167 int DoLoop(int result) {
168 DCHECK_NE(next_state_, STATE_NONE);
169
170 int rv = result;
171 do {
172 State state = next_state_;
173 next_state_ = STATE_NONE;
174 switch (state) {
175 case STATE_WAIT_FOR_LOCK:
176 rv = DoEndpointLock();
177 break;
178 case STATE_WAIT_FOR_LOCK_COMPLETE:
179 rv = DoEndpointLockComplete();
180 break;
181 case STATE_TRANSPORT_CONNECT:
182 DCHECK_EQ(OK, rv);
183 rv = DoTransportConnect();
184 break;
185 case STATE_TRANSPORT_CONNECT_COMPLETE:
186 rv = DoTransportConnectComplete(rv);
187 break;
188 default:
189 NOTREACHED();
190 rv = ERR_FAILED;
191 break;
192 }
193 } while (rv != ERR_IO_PENDING && next_state_ != STATE_NONE);
194
195 return rv;
196 }
197
198 int DoEndpointLock() {
199 DCHECK_LT(current_address_index_, static_cast<int>(addresses_.size()));
200 int rv = WebSocketEndpointLockManager::GetInstance()->LockEndpoint(
201 addresses_[current_address_index_], this);
202 next_state_ = STATE_WAIT_FOR_LOCK_COMPLETE;
203 return rv;
204 }
205
206 int DoEndpointLockComplete() {
207 next_state_ = STATE_TRANSPORT_CONNECT;
208 return OK;
209 }
210
211 int DoTransportConnect() {
212 next_state_ = STATE_TRANSPORT_CONNECT_COMPLETE;
213 DCHECK_LT(current_address_index_, static_cast<int>(addresses_.size()));
214 AddressList one_address(addresses_[current_address_index_]);
215 transport_socket_ = client_socket_factory()->CreateTransportClientSocket(
216 one_address, net_log().net_log(), net_log().source());
217 // This use of base::Unretained() is safe because transport_socket_ is
218 // destroyed in the destructor.
219 return transport_socket_->Connect(
220 base::Bind(&SubJob::OnIOComplete, base::Unretained(this)));
221 }
222
223 int DoTransportConnectComplete(int result) {
224 WebSocketEndpointLockManager* endpoint_lock_manager =
225 WebSocketEndpointLockManager::GetInstance();
226 if (result != OK) {
227 endpoint_lock_manager->UnlockEndpoint(addresses_[current_address_index_]);
228
229 if (current_address_index_ + 1 < static_cast<int>(addresses_.size())) {
230 // Try falling back to the next address in the list.
231 next_state_ = STATE_WAIT_FOR_LOCK;
232 ++current_address_index_;
233 result = OK;
234 }
235
236 return result;
237 }
238
239 endpoint_lock_manager->RememberSocket(transport_socket_.get(),
240 addresses_[current_address_index_]);
241
242 return result;
243 }
244
245 const AddressList addresses_;
246 WebSocketTransportConnectJob* const parent_job_;
247
248 State next_state_;
249 int current_address_index_;
250 const SubJobType type_;
251
252 scoped_ptr<StreamSocket> transport_socket_;
253
254 DISALLOW_COPY_AND_ASSIGN(SubJob);
255 };
256
257 int WebSocketEndpointLockManager::LockEndpoint(const IPEndPoint& endpoint,
258 SubJob* job) {
259 EndPointJobMap::value_type insert_value(endpoint, NULL);
260 std::pair<EndPointJobMap::iterator, bool> rv =
261 endpoint_job_map_.insert(insert_value);
262 if (rv.second) {
263 DVLOG(3) << "Locking endpoint " << endpoint.ToString();
264 rv.first->second = new ConnectJobQueue;
265 return OK;
266 }
267 DVLOG(3) << "Waiting for endpoint " << endpoint.ToString();
268 rv.first->second->Append(job);
269 return ERR_IO_PENDING;
270 }
271
272 void WebSocketEndpointLockManager::RememberSocket(StreamSocket* socket,
273 const IPEndPoint& endpoint) {
274 bool inserted = socket_endpoint_map_.insert(SocketEndPointMap::value_type(
275 socket, endpoint)).second;
276 DCHECK(inserted);
277 DCHECK(endpoint_job_map_.find(endpoint) != endpoint_job_map_.end());
278 DVLOG(3) << "Remembered (StreamSocket*)" << socket << " for "
279 << endpoint.ToString() << " (" << socket_endpoint_map_.size()
280 << " sockets remembered)";
281 }
282
283 void WebSocketEndpointLockManager::UnlockSocket(StreamSocket* socket) {
284 SocketEndPointMap::iterator socket_it = socket_endpoint_map_.find(socket);
285 if (socket_it == socket_endpoint_map_.end()) {
286 DVLOG(3) << "Ignoring request to unlock already-unlocked socket"
287 "(StreamSocket*)" << socket;
288 return;
289 }
290 const IPEndPoint& endpoint = socket_it->second;
291 DVLOG(3) << "Unlocking (StreamSocket*)" << socket << " for "
292 << endpoint.ToString() << " (" << socket_endpoint_map_.size()
293 << " sockets left)";
294 UnlockEndpoint(endpoint);
295 socket_endpoint_map_.erase(socket_it);
296 }
297
298 void WebSocketEndpointLockManager::UnlockEndpoint(const IPEndPoint& endpoint) {
299 EndPointJobMap::iterator found_it = endpoint_job_map_.find(endpoint);
300 CHECK(found_it != endpoint_job_map_.end()); // Security critical
301 ConnectJobQueue* queue = found_it->second;
302 if (queue->empty()) {
303 DVLOG(3) << "Unlocking endpoint " << endpoint.ToString();
304 delete queue;
305 endpoint_job_map_.erase(found_it);
306 } else {
307 DVLOG(3) << "Unlocking endpoint " << endpoint.ToString()
308 << " and activating next waiter";
309 SubJob* next_job = queue->head()->value();
310 next_job->RemoveFromList();
311 next_job->GotEndpointLock();
312 }
313 }
314
315 WebSocketTransportConnectJob::WebSocketTransportConnectJob(
316 const std::string& group_name,
317 RequestPriority priority,
318 const scoped_refptr<TransportSocketParams>& params,
319 TimeDelta timeout_duration,
320 const CompletionCallback& callback,
321 ClientSocketFactory* client_socket_factory,
322 HostResolver* host_resolver,
323 ClientSocketHandle* handle,
324 Delegate* delegate,
325 NetLog* pool_net_log,
326 const BoundNetLog& request_net_log)
327 : ConnectJob(group_name,
328 timeout_duration,
329 priority,
330 delegate,
331 BoundNetLog::Make(pool_net_log, NetLog::SOURCE_CONNECT_JOB)),
332 data_(params, client_socket_factory, host_resolver, &connect_timing_),
333 race_result_(TransportConnectJobCommon::CONNECTION_LATENCY_UNKNOWN),
334 handle_(handle),
335 callback_(callback),
336 request_net_log_(request_net_log),
337 had_ipv4_(false),
338 had_ipv6_(false) {
339 data_.SetOnIOComplete(this);
340 }
341
342 WebSocketTransportConnectJob::~WebSocketTransportConnectJob() {}
343
344 LoadState WebSocketTransportConnectJob::GetLoadState() const {
345 LoadState load_state = LOAD_STATE_RESOLVING_HOST;
346 if (ipv6_job_)
347 load_state = ipv6_job_->GetLoadState();
348 // LOAD_STATE_CONNECTING is better than
349 // LOAD_STATE_WAITING_FOR_AVAILABLE_SOCKET.
350 if (ipv4_job_ && load_state != LOAD_STATE_CONNECTING)
351 load_state = ipv4_job_->GetLoadState();
352 return load_state;
353 }
354
355 int WebSocketTransportConnectJob::DoResolveHost() {
356 return data_.DoResolveHost(priority(), net_log());
357 }
358
359 int WebSocketTransportConnectJob::DoResolveHostComplete(int result) {
360 return data_.DoResolveHostComplete(result, net_log());
361 }
362
363 int WebSocketTransportConnectJob::DoTransportConnect() {
364 AddressList ipv4_addresses;
365 AddressList ipv6_addresses;
366 int result = ERR_UNEXPECTED;
367 data_.set_next_state(
368 TransportConnectJobCommon::STATE_TRANSPORT_CONNECT_COMPLETE);
369
370 for (AddressList::const_iterator it = data_.addresses().begin();
371 it != data_.addresses().end();
372 ++it) {
373 switch (it->GetFamily()) {
374 case ADDRESS_FAMILY_IPV4:
375 ipv4_addresses.push_back(*it);
376 break;
377
378 case ADDRESS_FAMILY_IPV6:
379 ipv6_addresses.push_back(*it);
380 break;
381
382 default:
383 DVLOG(1) << "Unexpected ADDRESS_FAMILY: " << it->GetFamily();
384 break;
385 }
386 }
387
388 if (!ipv4_addresses.empty()) {
389 had_ipv4_ = true;
390 ipv4_job_.reset(new SubJob(ipv4_addresses, this, SUB_JOB_IPV4));
391 }
392
393 if (!ipv6_addresses.empty()) {
394 had_ipv6_ = true;
395 ipv6_job_.reset(new SubJob(ipv6_addresses, this, SUB_JOB_IPV6));
396 result = ipv6_job_->Start();
397 if (result == OK) {
398 SetSocket(ipv6_job_->PassSocket());
399 race_result_ =
400 had_ipv4_
401 ? TransportConnectJobCommon::CONNECTION_LATENCY_IPV6_RACEABLE
402 : TransportConnectJobCommon::CONNECTION_LATENCY_IPV6_SOLO;
403 return result;
404 }
405 if (result == ERR_IO_PENDING && ipv4_job_) {
406 // This use of base::Unretained is safe because fallback_timer_ is owned
407 // by this object.
408 fallback_timer_.Start(
409 FROM_HERE,
410 TimeDelta::FromMilliseconds(
411 TransportConnectJobCommon::kIPv6FallbackTimerInMs),
412 base::Bind(&WebSocketTransportConnectJob::StartIPv4JobAsync,
413 base::Unretained(this)));
414 }
415 if (result != ERR_IO_PENDING)
416 ipv6_job_.reset();
417 }
418
419 if (ipv4_job_ && !ipv6_job_) {
420 result = ipv4_job_->Start();
421 if (result == OK) {
422 SetSocket(ipv4_job_->PassSocket());
423 race_result_ =
424 had_ipv6_
425 ? TransportConnectJobCommon::CONNECTION_LATENCY_IPV4_WINS_RACE
426 : TransportConnectJobCommon::CONNECTION_LATENCY_IPV4_NO_RACE;
427 }
428 }
429
430 return result;
431 }
432
433 int WebSocketTransportConnectJob::DoTransportConnectComplete(int result) {
434 if (result == OK)
435 data_.HistogramDuration(race_result_);
436 return result;
437 }
438
439 void WebSocketTransportConnectJob::OnSubJobComplete(int result, SubJob* job) {
440 if (result == OK) {
441 switch (job->type()) {
442 case SUB_JOB_IPV4:
443 race_result_ =
444 had_ipv6_
445 ? TransportConnectJobCommon::CONNECTION_LATENCY_IPV4_WINS_RACE
446 : TransportConnectJobCommon::CONNECTION_LATENCY_IPV4_NO_RACE;
447 break;
448
449 case SUB_JOB_IPV6:
450 race_result_ =
451 had_ipv4_
452 ? TransportConnectJobCommon::CONNECTION_LATENCY_IPV6_RACEABLE
453 : TransportConnectJobCommon::CONNECTION_LATENCY_IPV6_SOLO;
454 break;
455 }
456 SetSocket(job->PassSocket());
457
458 // Make sure all connections are cancelled even if this object fails to be
459 // deleted.
460 ipv4_job_.reset();
461 ipv6_job_.reset();
462 } else {
463 switch (job->type()) {
464 case SUB_JOB_IPV4:
465 ipv4_job_.reset();
466 break;
467
468 case SUB_JOB_IPV6:
469 ipv6_job_.reset();
470 if (ipv4_job_ && !ipv4_job_->started()) {
471 fallback_timer_.Stop();
472 result = ipv4_job_->Start();
473 if (result != ERR_IO_PENDING) {
474 OnSubJobComplete(result, ipv4_job_.get());
475 return;
476 }
477 }
478 break;
479 }
480 if (ipv4_job_ || ipv6_job_)
481 return;
482 }
483 data_.OnIOComplete(this, result);
484 }
485
486 void WebSocketTransportConnectJob::StartIPv4JobAsync() {
487 DCHECK(ipv4_job_);
488 int result = ipv4_job_->Start();
489 if (result != ERR_IO_PENDING)
490 OnSubJobComplete(result, ipv4_job_.get());
491 }
492
493 int WebSocketTransportConnectJob::ConnectInternal() {
494 return data_.DoConnectInternal(this);
495 }
496
497 WebSocketTransportClientSocketPool::WebSocketTransportClientSocketPool(
498 int max_sockets,
499 int max_sockets_per_group,
500 ClientSocketPoolHistograms* histograms,
501 HostResolver* host_resolver,
502 ClientSocketFactory* client_socket_factory,
503 NetLog* net_log)
504 : TransportClientSocketPool(max_sockets,
505 max_sockets_per_group,
506 histograms,
507 host_resolver,
508 client_socket_factory,
509 net_log),
510 connect_job_delegate_(this),
511 histograms_(histograms),
512 pool_net_log_(net_log),
513 client_socket_factory_(client_socket_factory),
514 host_resolver_(host_resolver),
515 max_sockets_(max_sockets),
516 handed_out_socket_count_(0),
517 is_stalled_(false),
518 weak_factory_(this) {}
519
520 WebSocketTransportClientSocketPool::~WebSocketTransportClientSocketPool() {
521 DCHECK(pending_connects_.empty());
522 DCHECK_EQ(0, handed_out_socket_count_);
523 DCHECK(!is_stalled_);
524 }
525
526 // static
527 void WebSocketTransportClientSocketPool::UnlockEndpoint(
528 ClientSocketHandle* handle) {
529 DCHECK(handle->is_initialized());
530 WebSocketEndpointLockManager::GetInstance()->UnlockSocket(handle->socket());
531 }
532
533 int WebSocketTransportClientSocketPool::RequestSocket(
534 const std::string& group_name,
535 const void* params,
536 RequestPriority priority,
537 ClientSocketHandle* handle,
538 const CompletionCallback& callback,
539 const BoundNetLog& request_net_log) {
540 DCHECK(params);
541 const scoped_refptr<TransportSocketParams>& casted_params =
542 *static_cast<const scoped_refptr<TransportSocketParams>*>(params);
543
544 NetLogTcpClientSocketPoolRequestedSocket(request_net_log, &casted_params);
545
546 CHECK(!callback.is_null());
547 CHECK(handle);
548
549 request_net_log.BeginEvent(NetLog::TYPE_SOCKET_POOL);
550
551 if (ReachedMaxSocketsLimit() && !casted_params->ignore_limits()) {
552 request_net_log.AddEvent(NetLog::TYPE_SOCKET_POOL_STALLED_MAX_SOCKETS);
553 is_stalled_ = true;
554 return ERR_IO_PENDING;
555 }
556
557 scoped_ptr<WebSocketTransportConnectJob> connect_job(
558 new WebSocketTransportConnectJob(
559 group_name,
560 priority,
561 casted_params,
562 TimeDelta::FromSeconds(kTransportConnectJobTimeoutInSeconds),
563 callback,
564 client_socket_factory_,
565 host_resolver_,
566 handle,
567 &connect_job_delegate_,
568 pool_net_log_,
569 request_net_log));
570
571 int rv = connect_job->Connect();
572 request_net_log.AddEvent(
573 NetLog::TYPE_SOCKET_POOL_BOUND_TO_CONNECT_JOB,
574 connect_job->net_log().source().ToEventParametersCallback());
tyoshino (SeeGerritForStatus) 2014/06/11 07:55:11 should we log this even when rv == ERR_IO_PENDING?
Adam Rice 2014/06/11 08:17:09 Because the WebSocketTransportClientSocketPool use
575 if (rv == OK) {
576 HandOutSocket(connect_job->PassSocket(),
577 connect_job->connect_timing(),
578 handle,
579 request_net_log);
580 request_net_log.EndEvent(NetLog::TYPE_SOCKET_POOL);
581 } else if (rv == ERR_IO_PENDING) {
582 // TODO(ricea): Implement backup job timer?
583 AddJob(handle, connect_job.Pass());
584 } else {
585 scoped_ptr<StreamSocket> error_socket;
586 connect_job->GetAdditionalErrorState(handle);
587 error_socket = connect_job->PassSocket();
588 if (error_socket) {
589 HandOutSocket(error_socket.Pass(),
590 connect_job->connect_timing(),
591 handle,
592 request_net_log);
593 }
594 }
595
596 if (rv != ERR_IO_PENDING) {
597 request_net_log.EndEventWithNetErrorCode(NetLog::TYPE_SOCKET_POOL, rv);
598 }
599
600 return rv;
601 }
602
603 void WebSocketTransportClientSocketPool::RequestSockets(
604 const std::string& group_name,
605 const void* params,
606 int num_sockets,
607 const BoundNetLog& net_log) {
608 NOTIMPLEMENTED();
609 }
610
611 void WebSocketTransportClientSocketPool::CancelRequest(
612 const std::string& group_name,
613 ClientSocketHandle* handle) {
614 CancelJob(handle);
615 }
616
617 void WebSocketTransportClientSocketPool::ReleaseSocket(
618 const std::string& group_name,
619 scoped_ptr<StreamSocket> socket,
620 int id) {
621 WebSocketEndpointLockManager::GetInstance()->UnlockSocket(socket.get());
622 CHECK_GT(handed_out_socket_count_, 0);
623 --handed_out_socket_count_;
624 if (!ReachedMaxSocketsLimit())
625 is_stalled_ = false;
626 }
627
628 void WebSocketTransportClientSocketPool::FlushWithError(int error) {
629 CancelAllConnectJobs();
630 }
631
632 void WebSocketTransportClientSocketPool::CloseIdleSockets() {
633 // We have no idle sockets.
634 }
635
636 int WebSocketTransportClientSocketPool::IdleSocketCount() const { return 0; }
637
638 int WebSocketTransportClientSocketPool::IdleSocketCountInGroup(
639 const std::string& group_name) const {
640 return 0;
641 }
642
643 LoadState WebSocketTransportClientSocketPool::GetLoadState(
644 const std::string& group_name,
645 const ClientSocketHandle* handle) const {
646 if (pending_callbacks_.count(handle))
647 return LOAD_STATE_CONNECTING;
648 return LookupConnectJob(handle)->GetLoadState();
649 }
650
651 base::DictionaryValue* WebSocketTransportClientSocketPool::GetInfoAsValue(
652 const std::string& name,
653 const std::string& type,
654 bool include_nested_pools) const {
655 base::DictionaryValue* dict = new base::DictionaryValue();
656 dict->SetString("name", name);
657 dict->SetString("type", type);
658 dict->SetInteger("handed_out_socket_count", handed_out_socket_count_);
659 dict->SetInteger("connecting_socket_count", pending_connects_.size());
660 dict->SetInteger("idle_socket_count", 0);
661 dict->SetInteger("max_socket_count", max_sockets_);
662 dict->SetInteger("max_sockets_per_group", max_sockets_);
663 dict->SetInteger("pool_generation_number", 0);
664 return dict;
665 }
666
667 TimeDelta WebSocketTransportClientSocketPool::ConnectionTimeout() const {
668 return TimeDelta::FromSeconds(kTransportConnectJobTimeoutInSeconds);
669 }
670
671 ClientSocketPoolHistograms* WebSocketTransportClientSocketPool::histograms()
672 const {
673 return histograms_;
674 }
675
676 bool WebSocketTransportClientSocketPool::IsStalled() const {
677 return is_stalled_;
678 }
679
680 void WebSocketTransportClientSocketPool::AddHigherLayeredPool(
tyoshino (SeeGerritForStatus) 2014/06/11 06:55:04 no need to override?
Adam Rice 2014/06/11 08:17:09 It seemed a little risky to me. The base class imp
tyoshino (SeeGerritForStatus) 2014/06/11 08:22:22 Yeah, right, like IsStalled.
681 HigherLayeredPool* higher_pool) {
682 CHECK(higher_pool);
683 CHECK(!ContainsKey(higher_pools_, higher_pool));
684 higher_pools_.insert(higher_pool);
685 }
686
687 void WebSocketTransportClientSocketPool::RemoveHigherLayeredPool(
tyoshino (SeeGerritForStatus) 2014/06/11 06:55:04 no need to override?
Adam Rice 2014/06/11 08:17:09 Removed.
688 HigherLayeredPool* higher_pool) {
689 CHECK(higher_pool);
690 CHECK(ContainsKey(higher_pools_, higher_pool));
691 higher_pools_.erase(higher_pool);
692 }
693
694 void WebSocketTransportClientSocketPool::OnConnectJobComplete(
695 int result,
696 WebSocketTransportConnectJob* job) {
697 DCHECK_NE(ERR_IO_PENDING, result);
698
699 scoped_ptr<StreamSocket> socket = job->PassSocket();
700
701 BoundNetLog request_net_log = job->request_net_log();
702 CompletionCallback callback = job->callback();
703 LoadTimingInfo::ConnectTiming connect_timing = job->connect_timing();
704
705 ClientSocketHandle* const handle = job->handle();
706
707 if (result == OK) {
708 DCHECK(socket.get());
709 HandOutSocket(socket.Pass(), connect_timing, handle, request_net_log);
710 request_net_log.EndEvent(NetLog::TYPE_SOCKET_POOL);
711 } else {
712 // If we got a socket, it must contain error information so pass that
713 // up so that the caller can retrieve it.
714 bool handed_out_socket = false;
715 job->GetAdditionalErrorState(handle);
716 if (socket.get()) {
717 handed_out_socket = true;
718 HandOutSocket(socket.Pass(), connect_timing, handle, request_net_log);
719 }
720 request_net_log.EndEventWithNetErrorCode(NetLog::TYPE_SOCKET_POOL, result);
721
722 if (!handed_out_socket && is_stalled_ && !ReachedMaxSocketsLimit())
723 is_stalled_ = false;
724 }
725 bool delete_succeeded = DeleteJob(handle);
726 DCHECK(delete_succeeded);
727 InvokeUserCallbackLater(handle, callback, result);
728 }
729
730 void WebSocketTransportClientSocketPool::InvokeUserCallbackLater(
731 ClientSocketHandle* handle,
732 const CompletionCallback& callback,
733 int rv) {
734 DCHECK(!pending_callbacks_.count(handle));
735 pending_callbacks_.insert(handle);
736 base::MessageLoop::current()->PostTask(
737 FROM_HERE,
738 base::Bind(&WebSocketTransportClientSocketPool::InvokeUserCallback,
739 weak_factory_.GetWeakPtr(),
740 handle,
741 callback,
742 rv));
743 }
744
745 void WebSocketTransportClientSocketPool::InvokeUserCallback(
746 ClientSocketHandle* handle,
747 const CompletionCallback& callback,
748 int rv) {
749 if (pending_callbacks_.erase(handle))
750 callback.Run(rv);
751 }
752
753 bool WebSocketTransportClientSocketPool::ReachedMaxSocketsLimit() const {
754 return base::checked_cast<int>(pending_connects_.size()) +
755 handed_out_socket_count_ >=
756 max_sockets_;
757 }
758
759 void WebSocketTransportClientSocketPool::HandOutSocket(
760 scoped_ptr<StreamSocket> socket,
761 const LoadTimingInfo::ConnectTiming& connect_timing,
762 ClientSocketHandle* handle,
763 const BoundNetLog& net_log) {
764 DCHECK(socket);
765 handle->SetSocket(socket.Pass());
766 handle->set_reuse_type(ClientSocketHandle::UNUSED);
767 handle->set_idle_time(TimeDelta());
768 handle->set_pool_id(0);
769 handle->set_connect_timing(connect_timing);
770
771 net_log.AddEvent(
772 NetLog::TYPE_SOCKET_POOL_BOUND_TO_SOCKET,
773 handle->socket()->NetLog().source().ToEventParametersCallback());
774
775 ++handed_out_socket_count_;
776 }
777
778 void WebSocketTransportClientSocketPool::AddJob(
779 ClientSocketHandle* handle,
780 scoped_ptr<WebSocketTransportConnectJob> connect_job) {
781 bool inserted =
782 pending_connects_.insert(PendingConnectsMap::value_type(
783 handle, connect_job.release())).second;
784 DCHECK(inserted);
785 }
786
787 bool WebSocketTransportClientSocketPool::DeleteJob(ClientSocketHandle* handle) {
788 PendingConnectsMap::iterator it = pending_connects_.find(handle);
789 if (it == pending_connects_.end())
790 return false;
791 delete it->second, it->second = NULL;
792 pending_connects_.erase(it);
793 return true;
794 }
795
796 void WebSocketTransportClientSocketPool::CancelJob(ClientSocketHandle* handle) {
tyoshino (SeeGerritForStatus) 2014/06/11 07:55:11 merge to CancelRequest?
Adam Rice 2014/06/11 08:17:09 Done.
797 if (!DeleteJob(handle))
798 pending_callbacks_.erase(handle);
799 }
800
801 void WebSocketTransportClientSocketPool::CancelAllConnectJobs() {
802 STLDeleteValues(&pending_connects_);
803 }
804
805 const WebSocketTransportConnectJob*
806 WebSocketTransportClientSocketPool::LookupConnectJob(
807 const ClientSocketHandle* handle) const {
808 PendingConnectsMap::const_iterator it = pending_connects_.find(handle);
809 CHECK(it != pending_connects_.end());
810 return it->second;
811 }
812
813 WebSocketTransportClientSocketPool::ConnectJobDelegate::ConnectJobDelegate(
814 WebSocketTransportClientSocketPool* owner)
815 : owner_(owner) {}
816
817 WebSocketTransportClientSocketPool::ConnectJobDelegate::~ConnectJobDelegate() {}
818
819 void
820 WebSocketTransportClientSocketPool::ConnectJobDelegate::OnConnectJobComplete(
821 int result,
822 ConnectJob* job) {
823 owner_->OnConnectJobComplete(result,
824 static_cast<WebSocketTransportConnectJob*>(job));
825 }
826
827 } // namespace net
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698