Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1203)

Side by Side Diff: net/socket/websocket_transport_client_socket_pool.cc

Issue 240873003: Create WebSocketTransportClientSocketPool (Closed) Base URL: http://git.chromium.org/chromium/src.git@master
Patch Set: Minor fixes and comment tidying. Created 6 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/socket/websocket_transport_client_socket_pool.h"
6
7 #include <algorithm>
8 #include <map>
9
10 #include "base/compiler_specific.h"
11 #include "base/containers/linked_list.h"
12 #include "base/logging.h"
13 #include "base/memory/singleton.h"
14 #include "base/metrics/histogram.h"
15 #include "base/numerics/safe_conversions.h"
16 #include "base/stl_util.h"
17 #include "base/strings/string_util.h"
18 #include "base/time/time.h"
19 #include "base/values.h"
20 #include "net/base/ip_endpoint.h"
21 #include "net/base/net_errors.h"
22 #include "net/base/net_log.h"
23 #include "net/socket/client_socket_factory.h"
24 #include "net/socket/client_socket_handle.h"
25 #include "net/socket/client_socket_pool_base.h"
26 #include "net/socket/socket_net_log_params.h"
27 #include "net/socket/tcp_client_socket.h"
28
29 namespace net {
30
31 namespace {
32
33 using base::TimeDelta;
34
35 // TODO(ricea): For now, we implement a global timeout for compatability will
36 // TransportConnectJob. Since WebSocketTransportConnectJob controls the address
37 // selection process more tightly, it could do something smarter here.
38 const int kTransportConnectJobTimeoutInSeconds = 240; // 4 minutes.
39
40 } // namespace
41
42 // TODO(willchan): Base this off RTT instead of statically setting it.
43 const int WebSocketTransportConnectJob::kIPv6FallbackTimerInMs = 300;
44
45 class WebSocketEndpointLockManager {
46 public:
47 typedef WebSocketTransportConnectJob::SubJob SubJob;
48 static WebSocketEndpointLockManager* GetInstance();
49
50 // Returns OK if lock was acquired immediately, ERR_IO_PENDING if not. If the
51 // lock was not acquired, then |job->GotEndpointLock()| will be called at some
52 // future time.
53 int LockEndpoint(const IPEndPoint& endpoint, SubJob* job);
54
55 // Record the IPEndPoint associated with a particular socket. This is
56 // necessary because TCPClientSocket refuses to return the PeerAddress after
57 // the connection is disconnected. The association will be forgotten when
58 // UnlockSocket() is called. The |socket| pointer must not be deleted between
59 // the call to RememberSocket().
60 void RememberSocket(StreamSocket* socket, const IPEndPoint& endpoint);
61
62 // Release the lock on an endpoint, and, if appropriate, trigger the next
63 // socket connection. It is permitted to call UnlockSocket() multiple times
64 // for the same |socket|; all calls after the first will be ignored.
65 void UnlockSocket(StreamSocket* socket);
66
67 // Release the lock on |endpoint|. Most callers should use UnlockSocket()
68 // instead.
69 void UnlockEndpoint(const IPEndPoint& endpoint);
70
71 private:
72 typedef base::LinkedList<SubJob> ConnectJobQueue;
73 typedef std::map<IPEndPoint, ConnectJobQueue*> EndPointJobMap;
74 typedef std::map<StreamSocket*, IPEndPoint> SocketEndPointMap;
75
76 WebSocketEndpointLockManager() {}
77 ~WebSocketEndpointLockManager() {
78 DCHECK(endpoint_job_map_.empty());
79 DCHECK(socket_endpoint_map_.empty());
80 }
81
82 EndPointJobMap endpoint_job_map_;
83 SocketEndPointMap socket_endpoint_map_;
84
85 friend struct DefaultSingletonTraits<WebSocketEndpointLockManager>;
86
87 DISALLOW_COPY_AND_ASSIGN(WebSocketEndpointLockManager);
88 };
89
90 WebSocketEndpointLockManager* WebSocketEndpointLockManager::GetInstance() {
91 return Singleton<WebSocketEndpointLockManager>::get();
92 }
93
94 class WebSocketTransportConnectJob::SubJob : public base::LinkNode<SubJob> {
95 public:
96 SubJob(const AddressList& addresses,
97 WebSocketTransportConnectJob* parent_job,
98 SubJobType type)
99 : addresses_(addresses),
100 parent_job_(parent_job),
101 next_state_(STATE_NONE),
102 current_address_index_(0),
103 type_(type) {}
104
105 ~SubJob() {
106 // We don't worry about cancelling the host resolution and TCP connect,
107 // since ~SingleRequestHostResolver and ~StreamSocket will take care of it.
108 if (next()) {
109 DCHECK(previous());
110 DCHECK_EQ(STATE_WAIT_FOR_LOCK_COMPLETE, next_state_);
111 RemoveFromList();
112 } else if (next_state_ == STATE_TRANSPORT_CONNECT_COMPLETE) {
113 DCHECK_LT(current_address_index_, static_cast<int>(addresses_.size()));
114 WebSocketEndpointLockManager::GetInstance()->UnlockEndpoint(
115 addresses_[current_address_index_]);
116 }
117 }
118
119 // Start connecting.
120 int Start() {
121 DCHECK_EQ(STATE_NONE, next_state_);
122 next_state_ = STATE_WAIT_FOR_LOCK;
123 return DoLoop(OK);
124 }
125
126 bool started() { return next_state_ != STATE_NONE; }
127
128 // Called by WebSocketEndpointLockManager when the lock becomes available.
129 void GotEndpointLock() { OnIOComplete(OK); }
130
131 LoadState GetLoadState() const {
132 switch (next_state_) {
133 case STATE_WAIT_FOR_LOCK:
134 case STATE_WAIT_FOR_LOCK_COMPLETE:
135 // TODO(ricea): Add a WebSocket-specific LOAD_STATE ?
136 return LOAD_STATE_WAITING_FOR_AVAILABLE_SOCKET;
137 case STATE_TRANSPORT_CONNECT:
138 case STATE_TRANSPORT_CONNECT_COMPLETE:
139 return LOAD_STATE_CONNECTING;
140 case STATE_NONE:
141 return LOAD_STATE_IDLE;
142 }
143 NOTREACHED();
144 return LOAD_STATE_IDLE;
145 }
146
147 SubJobType type() const { return type_; }
148
149 scoped_ptr<StreamSocket> PassSocket() { return transport_socket_.Pass(); }
150
151 private:
152 enum State {
153 STATE_NONE,
154 STATE_WAIT_FOR_LOCK,
155 STATE_WAIT_FOR_LOCK_COMPLETE,
156 STATE_TRANSPORT_CONNECT,
157 STATE_TRANSPORT_CONNECT_COMPLETE,
158 };
159
160 ClientSocketFactory* client_socket_factory() const {
161 return parent_job_->client_socket_factory_;
162 }
163 const scoped_refptr<TransportSocketParams>& params() const {
164 return parent_job_->params_;
165 }
166 const BoundNetLog& net_log() const { return parent_job_->net_log(); }
167
168 void OnIOComplete(int result) {
169 int rv = DoLoop(result);
170 if (rv != ERR_IO_PENDING)
171 parent_job_->OnSubJobComplete(rv, this); // |this| deleted
172 }
173
174 int DoLoop(int result) {
175 DCHECK_NE(next_state_, STATE_NONE);
176
177 int rv = result;
178 do {
179 State state = next_state_;
180 next_state_ = STATE_NONE;
181 switch (state) {
182 case STATE_WAIT_FOR_LOCK:
183 rv = DoEndpointLock();
184 break;
185 case STATE_WAIT_FOR_LOCK_COMPLETE:
186 rv = DoEndpointLockComplete();
187 break;
188 case STATE_TRANSPORT_CONNECT:
189 DCHECK_EQ(OK, rv);
190 rv = DoTransportConnect();
191 break;
192 case STATE_TRANSPORT_CONNECT_COMPLETE:
193 rv = DoTransportConnectComplete(rv);
194 break;
195 default:
196 NOTREACHED();
197 rv = ERR_FAILED;
198 break;
199 }
200 } while (rv != ERR_IO_PENDING && next_state_ != STATE_NONE);
201
202 return rv;
203 }
204
205 int DoEndpointLock() {
206 DCHECK_LT(current_address_index_, static_cast<int>(addresses_.size()));
207 int rv = WebSocketEndpointLockManager::GetInstance()->LockEndpoint(
208 addresses_[current_address_index_], this);
209 next_state_ = STATE_WAIT_FOR_LOCK_COMPLETE;
210 return rv;
211 }
212
213 int DoEndpointLockComplete() {
214 next_state_ = STATE_TRANSPORT_CONNECT;
215 return OK;
216 }
217
218 int DoTransportConnect() {
219 next_state_ = STATE_TRANSPORT_CONNECT_COMPLETE;
220 DCHECK_LT(current_address_index_, static_cast<int>(addresses_.size()));
221 AddressList one_address(addresses_[current_address_index_]);
222 transport_socket_ = client_socket_factory()->CreateTransportClientSocket(
223 one_address, net_log().net_log(), net_log().source());
224 // This use of base::Unretained() is safe because transport_socket_ is
225 // destroyed in the destructor.
226 int rv = transport_socket_->Connect(
227 base::Bind(&SubJob::OnIOComplete, base::Unretained(this)));
228 return rv;
229 }
230
231 int DoTransportConnectComplete(int result) {
232 WebSocketEndpointLockManager* endpoint_lock_manager =
233 WebSocketEndpointLockManager::GetInstance();
234 if (result != OK) {
235 endpoint_lock_manager->UnlockEndpoint(addresses_[current_address_index_]);
236
237 if (current_address_index_ + 1 < static_cast<int>(addresses_.size())) {
238 // Try to fall back to the next address in the list.
239 next_state_ = STATE_WAIT_FOR_LOCK;
240 ++current_address_index_;
241 result = OK;
242 }
243
244 return result;
245 }
246
247 endpoint_lock_manager->RememberSocket(transport_socket_.get(),
248 addresses_[current_address_index_]);
249
250 return result;
251 }
252
253 const AddressList addresses_;
254 WebSocketTransportConnectJob* const parent_job_;
255
256 State next_state_;
257 int current_address_index_;
258 const SubJobType type_;
259
260 scoped_ptr<StreamSocket> transport_socket_;
261
262 DISALLOW_COPY_AND_ASSIGN(SubJob);
263 };
264
265 int WebSocketEndpointLockManager::LockEndpoint(const IPEndPoint& endpoint,
266 SubJob* job) {
267 EndPointJobMap::value_type insert_value(endpoint, NULL);
268 std::pair<EndPointJobMap::iterator, bool> rv =
269 endpoint_job_map_.insert(insert_value);
270 if (rv.second) {
271 DVLOG(3) << "Locking endpoint " << endpoint.ToString();
272 rv.first->second = new ConnectJobQueue;
273 return OK;
274 }
275 DVLOG(3) << "Waiting for endpoint " << endpoint.ToString();
276 rv.first->second->Append(job);
277 return ERR_IO_PENDING;
278 }
279
280 void WebSocketEndpointLockManager::RememberSocket(StreamSocket* socket,
281 const IPEndPoint& endpoint) {
282 bool inserted = socket_endpoint_map_.insert(SocketEndPointMap::value_type(
283 socket, endpoint)).second;
284 DCHECK(inserted);
285 DCHECK(endpoint_job_map_.find(endpoint) != endpoint_job_map_.end());
286 DVLOG(3) << "Remembered (StreamSocket*)" << socket << " for "
287 << endpoint.ToString() << " (" << socket_endpoint_map_.size()
288 << " sockets remembered)";
289 }
290
291 void WebSocketEndpointLockManager::UnlockSocket(StreamSocket* socket) {
292 SocketEndPointMap::iterator socket_it = socket_endpoint_map_.find(socket);
293 if (socket_it == socket_endpoint_map_.end()) {
294 DVLOG(3) << "Ignoring request to unlock already-unlocked socket"
295 "(StreamSocket*)" << socket;
296 return;
297 }
298 const IPEndPoint& endpoint = socket_it->second;
299 DVLOG(3) << "Unlocking (StreamSocket*)" << socket << " for "
300 << endpoint.ToString() << " (" << socket_endpoint_map_.size()
301 << " sockets left)";
302 UnlockEndpoint(endpoint);
303 socket_endpoint_map_.erase(socket_it);
304 }
305
306 void WebSocketEndpointLockManager::UnlockEndpoint(const IPEndPoint& endpoint) {
307 EndPointJobMap::iterator found_it = endpoint_job_map_.find(endpoint);
308 CHECK(found_it != endpoint_job_map_.end()); // Security critical
309 ConnectJobQueue* queue = found_it->second;
310 if (queue->empty()) {
311 DVLOG(3) << "Unlocking endpoint " << endpoint.ToString();
312 delete queue;
313 endpoint_job_map_.erase(found_it);
314 } else {
315 DVLOG(3) << "Unlocking endpoint " << endpoint.ToString()
316 << " and activating next waiter";
317 SubJob* next_job = queue->head()->value();
318 next_job->RemoveFromList();
319 next_job->GotEndpointLock();
320 }
321 }
322
323 WebSocketTransportConnectJob::WebSocketTransportConnectJob(
324 const std::string& group_name,
325 RequestPriority priority,
326 const scoped_refptr<TransportSocketParams>& params,
327 base::TimeDelta timeout_duration,
328 const CompletionCallback& callback,
329 ClientSocketFactory* client_socket_factory,
330 HostResolver* host_resolver,
331 ClientSocketHandle* handle,
332 Delegate* delegate,
333 NetLog* pool_net_log,
334 const BoundNetLog& request_net_log)
335 : ConnectJob(group_name,
336 timeout_duration,
337 priority,
338 delegate,
339 BoundNetLog::Make(pool_net_log, NetLog::SOURCE_CONNECT_JOB)),
340 params_(params),
341 client_socket_factory_(client_socket_factory),
342 resolver_(host_resolver),
343 histogram_(CONNECTION_LATENCY_UNKNOWN),
344 handle_(handle),
345 callback_(callback),
346 request_net_log_(request_net_log),
347 next_state_(STATE_NONE),
348 had_ipv4_(false),
349 had_ipv6_(false) {}
350
351 WebSocketTransportConnectJob::~WebSocketTransportConnectJob() {}
352
353 LoadState WebSocketTransportConnectJob::GetLoadState() const {
354 LoadState load_state = LOAD_STATE_RESOLVING_HOST;
355 if (ipv6_job_)
356 load_state = ipv6_job_->GetLoadState();
357 // LOAD_STATE_CONNECTING is better than
358 // LOAD_STATE_WAITING_FOR_AVAILABLE_SOCKET.
359 if (ipv4_job_ && load_state != LOAD_STATE_CONNECTING)
360 load_state = ipv4_job_->GetLoadState();
361 return load_state;
362 }
363
364 void WebSocketTransportConnectJob::OnIOComplete(int result) {
365 result = DoLoop(result);
366 if (result != ERR_IO_PENDING)
367 NotifyDelegateOfCompletion(result);
368 }
369
370 int WebSocketTransportConnectJob::DoLoop(int result) {
371 DCHECK_NE(next_state_, STATE_NONE);
372
373 int rv = result;
374 do {
375 State state = next_state_;
376 next_state_ = STATE_NONE;
377 switch (state) {
378 case STATE_RESOLVE_HOST:
379 rv = DoResolveHost();
380 break;
381 case STATE_RESOLVE_HOST_COMPLETE:
382 rv = DoResolveHostComplete(rv);
383 break;
384 case STATE_TRANSPORT_CONNECT:
385 DCHECK_EQ(OK, rv);
386 rv = DoTransportConnect();
387 break;
388 case STATE_TRANSPORT_CONNECT_COMPLETE:
389 rv = DoTransportConnectComplete(rv);
390 break;
391 default:
392 NOTREACHED();
393 rv = ERR_FAILED;
394 break;
395 }
396 } while (rv != ERR_IO_PENDING && next_state_ != STATE_NONE);
397
398 return rv;
399 }
400
401 int WebSocketTransportConnectJob::DoResolveHost() {
402 connect_timing_.dns_start = base::TimeTicks::Now();
403 next_state_ = STATE_RESOLVE_HOST_COMPLETE;
404
405 // This use of base::Unretained is safe because resolver_ is destroyed in this
406 // object's destructor.
407 int result =
408 resolver_.Resolve(params_->destination(),
409 priority(),
410 &addresses_,
411 base::Bind(&WebSocketTransportConnectJob::OnIOComplete,
412 base::Unretained(this)),
413 net_log());
414
415 return result;
416 }
417
418 int WebSocketTransportConnectJob::DoResolveHostComplete(int result) {
419 connect_timing_.dns_end = base::TimeTicks::Now();
420 // Overwrite connection start time, since for connections that do not go
421 // through proxies, |connect_start| should not include dns lookup time.
422 connect_timing_.connect_start = connect_timing_.dns_end;
423
424 if (result == OK) {
425 // Invoke callback, and abort if it fails.
426 if (!params_->host_resolution_callback().is_null())
427 result = params_->host_resolution_callback().Run(addresses_, net_log());
428
429 if (result == OK)
430 next_state_ = STATE_TRANSPORT_CONNECT;
431 }
432
433 return result;
434 }
435
436 int WebSocketTransportConnectJob::DoTransportConnect() {
437 AddressList ipv4_addresses;
438 AddressList ipv6_addresses;
439 int result = ERR_UNEXPECTED;
440 next_state_ = STATE_TRANSPORT_CONNECT_COMPLETE;
441
442 for (AddressList::const_iterator it = addresses_.begin();
443 it != addresses_.end();
444 ++it) {
445 switch (it->GetFamily()) {
446 case ADDRESS_FAMILY_IPV4:
447 ipv4_addresses.push_back(*it);
448 break;
449
450 case ADDRESS_FAMILY_IPV6:
451 ipv6_addresses.push_back(*it);
452 break;
453
454 default:
455 DVLOG(1) << "Unexpected ADDRESS_FAMILY: " << it->GetFamily();
456 break;
457 }
458 }
459
460 if (!ipv4_addresses.empty()) {
461 had_ipv4_ = true;
462 ipv4_job_.reset(new SubJob(ipv4_addresses, this, SUB_JOB_IPV4));
463 }
464
465 if (!ipv6_addresses.empty()) {
466 had_ipv6_ = true;
467 ipv6_job_.reset(new SubJob(ipv6_addresses, this, SUB_JOB_IPV6));
468 result = ipv6_job_->Start();
469 if (result == OK) {
470 SetSocket(ipv6_job_->PassSocket());
471 histogram_ = had_ipv4_ ? CONNECTION_LATENCY_IPV6_RACEABLE
472 : CONNECTION_LATENCY_IPV6_SOLO;
473 return result;
474 }
475 if (result == ERR_IO_PENDING && ipv4_job_) {
476 // This use of base::Unretained is safe because fallback_timer_ is owned
477 // by this object.
478 fallback_timer_.Start(
479 FROM_HERE,
480 base::TimeDelta::FromMilliseconds(kIPv6FallbackTimerInMs),
481 base::Bind(&WebSocketTransportConnectJob::StartIPv4JobAsync,
482 base::Unretained(this)));
483 }
484 if (result != ERR_IO_PENDING)
485 ipv6_job_.reset();
486 }
487
488 if (ipv4_job_ && !ipv6_job_) {
489 result = ipv4_job_->Start();
490 if (result == OK) {
491 SetSocket(ipv4_job_->PassSocket());
492 histogram_ = had_ipv6_ ? CONNECTION_LATENCY_IPV4_WINS_RACE
493 : CONNECTION_LATENCY_IPV4_NO_RACE;
494 }
495 }
496
497 return result;
498 }
499
500 int WebSocketTransportConnectJob::DoTransportConnectComplete(int result) {
501 if (result == OK) {
502 DCHECK(!connect_timing_.connect_start.is_null());
tyoshino (SeeGerritForStatus) 2014/05/22 06:04:01 please try to reduce code duplication even by spli
Adam Rice 2014/05/22 14:08:00 Okay, I have split out the common code from here a
503 DCHECK(!connect_timing_.dns_start.is_null());
504 const base::TimeTicks now = base::TimeTicks::Now();
505 const base::TimeDelta total_duration = now - connect_timing_.dns_start;
506 const base::TimeDelta one_millisecond =
507 base::TimeDelta::FromMilliseconds(1);
508 const base::TimeDelta ten_minutes = base::TimeDelta::FromMinutes(10);
509
510 UMA_HISTOGRAM_CUSTOM_TIMES("Net.DNS_Resolution_And_TCP_Connection_Latency2",
511 total_duration,
512 one_millisecond,
513 ten_minutes,
514 100);
515
516 base::TimeDelta connect_duration = now - connect_timing_.connect_start;
517 UMA_HISTOGRAM_CUSTOM_TIMES("Net.TCP_Connection_Latency",
518 connect_duration,
519 one_millisecond,
520 ten_minutes,
521 100);
522
523 switch (histogram_) {
524 case CONNECTION_LATENCY_IPV4_WINS_RACE:
525 UMA_HISTOGRAM_CUSTOM_TIMES("Net.TCP_Connection_Latency_IPv4_Wins_Race",
526 connect_duration,
527 one_millisecond,
528 ten_minutes,
529 100);
530 break;
531
532 case CONNECTION_LATENCY_IPV4_NO_RACE:
533 UMA_HISTOGRAM_CUSTOM_TIMES("Net.TCP_Connection_Latency_IPv4_No_Race",
534 connect_duration,
535 one_millisecond,
536 ten_minutes,
537 100);
538 break;
539
540 case CONNECTION_LATENCY_IPV6_RACEABLE:
541 UMA_HISTOGRAM_CUSTOM_TIMES("Net.TCP_Connection_Latency_IPv6_Raceable",
542 connect_duration,
543 one_millisecond,
544 ten_minutes,
545 100);
546 break;
547
548 case CONNECTION_LATENCY_IPV6_SOLO:
549 UMA_HISTOGRAM_CUSTOM_TIMES("Net.TCP_Connection_Latency_IPv6_Solo",
550 connect_duration,
551 one_millisecond,
552 ten_minutes,
553 100);
554 break;
555
556 default:
557 NOTREACHED();
558 break;
559 }
560 }
561 return result;
562 }
563
564 void WebSocketTransportConnectJob::OnSubJobComplete(int result, SubJob* job) {
565 if (result == OK) {
566 switch (job->type()) {
567 case SUB_JOB_IPV4:
568 histogram_ = had_ipv6_ ? CONNECTION_LATENCY_IPV4_WINS_RACE
569 : CONNECTION_LATENCY_IPV4_NO_RACE;
570 break;
571
572 case SUB_JOB_IPV6:
573 histogram_ = had_ipv4_ ? CONNECTION_LATENCY_IPV6_RACEABLE
574 : CONNECTION_LATENCY_IPV6_SOLO;
575 break;
576 }
577 SetSocket(job->PassSocket());
578
579 // Make sure all connections are cancelled even if this object fails to be
580 // deleted.
581 ipv4_job_.reset();
582 ipv6_job_.reset();
583 } else {
584 switch (job->type()) {
585 case SUB_JOB_IPV4:
586 ipv4_job_.reset();
587 break;
588
589 case SUB_JOB_IPV6:
590 ipv6_job_.reset();
591 if (ipv4_job_ && !ipv4_job_->started()) {
592 fallback_timer_.Stop();
593 result = ipv4_job_->Start();
594 if (result != ERR_IO_PENDING) {
595 OnSubJobComplete(result, ipv4_job_.get());
596 return;
597 }
598 }
599 break;
600 }
601 if (ipv4_job_ || ipv6_job_)
602 result = ERR_IO_PENDING;
603 }
604 OnIOComplete(result);
605 }
606
607 void WebSocketTransportConnectJob::StartIPv4JobAsync() {
608 DCHECK(ipv4_job_);
609 int result = ipv4_job_->Start();
610 if (result != ERR_IO_PENDING)
611 OnSubJobComplete(result, ipv4_job_.get());
612 }
613
614 int WebSocketTransportConnectJob::ConnectInternal() {
615 next_state_ = STATE_RESOLVE_HOST;
616 return DoLoop(OK);
617 }
618
619 WebSocketTransportClientSocketPool::WebSocketTransportClientSocketPool(
620 int max_sockets,
621 int max_sockets_per_group,
622 ClientSocketPoolHistograms* histograms,
623 HostResolver* host_resolver,
624 ClientSocketFactory* client_socket_factory,
625 NetLog* net_log)
626 : TransportClientSocketPool(max_sockets,
627 max_sockets_per_group,
628 histograms,
629 host_resolver,
630 client_socket_factory,
631 net_log),
632 connect_job_delegate_(this),
633 histograms_(histograms),
634 pool_net_log_(net_log),
635 client_socket_factory_(client_socket_factory),
636 host_resolver_(host_resolver),
637 max_sockets_(max_sockets),
638 handed_out_socket_count_(0),
639 is_stalled_(false),
640 weak_factory_(this) {}
641
642 WebSocketTransportClientSocketPool::~WebSocketTransportClientSocketPool() {
643 DCHECK(pending_connects_.empty());
644 DCHECK_EQ(0, handed_out_socket_count_);
645 DCHECK(!is_stalled_);
646 }
647
648 // static
649 void WebSocketTransportClientSocketPool::UnlockEndpoint(
650 ClientSocketHandle* handle) {
651 DCHECK(handle->is_initialized());
652 WebSocketEndpointLockManager::GetInstance()->UnlockSocket(handle->socket());
653 }
654
655 int WebSocketTransportClientSocketPool::RequestSocket(
656 const std::string& group_name,
657 const void* params,
658 RequestPriority priority,
659 ClientSocketHandle* handle,
660 const CompletionCallback& callback,
661 const BoundNetLog& request_net_log) {
662 DCHECK(params);
663 const scoped_refptr<TransportSocketParams>& casted_params =
664 *static_cast<const scoped_refptr<TransportSocketParams>*>(params);
665
666 if (request_net_log.IsLogging()) {
667 // TODO(eroman): Split out the host and port parameters.
668 request_net_log.AddEvent(
669 NetLog::TYPE_TCP_CLIENT_SOCKET_POOL_REQUESTED_SOCKET,
670 CreateNetLogHostPortPairCallback(
671 &casted_params->destination().host_port_pair()));
672 }
673
674 CHECK(!callback.is_null());
675 CHECK(handle);
676
677 request_net_log.BeginEvent(NetLog::TYPE_SOCKET_POOL);
678
679 if (ReachedMaxSocketsLimit() && !casted_params->ignore_limits()) {
680 request_net_log.AddEvent(NetLog::TYPE_SOCKET_POOL_STALLED_MAX_SOCKETS);
681 is_stalled_ = true;
682 return ERR_IO_PENDING;
683 }
684
685 scoped_ptr<WebSocketTransportConnectJob> connect_job(
686 new WebSocketTransportConnectJob(
687 group_name,
688 priority,
689 casted_params,
690 base::TimeDelta::FromSeconds(kTransportConnectJobTimeoutInSeconds),
691 callback,
692 client_socket_factory_,
693 host_resolver_,
694 handle,
695 &connect_job_delegate_,
696 pool_net_log_,
697 request_net_log));
698
699 int rv = connect_job->Connect();
700 request_net_log.AddEvent(
701 NetLog::TYPE_SOCKET_POOL_BOUND_TO_CONNECT_JOB,
702 connect_job->net_log().source().ToEventParametersCallback());
703 if (rv == OK) {
704 HandOutSocket(connect_job->PassSocket(),
705 connect_job->connect_timing(),
706 handle,
707 request_net_log);
708 } else if (rv == ERR_IO_PENDING) {
709 // TODO(ricea): Implement backup job timer?
710 AddJob(handle, connect_job.Pass());
711 } else {
712 scoped_ptr<StreamSocket> error_socket;
713 connect_job->GetAdditionalErrorState(handle);
714 error_socket = connect_job->PassSocket();
715 if (error_socket) {
716 HandOutSocket(error_socket.Pass(),
717 connect_job->connect_timing(),
718 handle,
719 request_net_log);
720 }
721 }
722
723 if (rv != ERR_IO_PENDING) {
724 request_net_log.EndEventWithNetErrorCode(NetLog::TYPE_SOCKET_POOL, rv);
725 }
726
727 return rv;
728 }
729
730 void WebSocketTransportClientSocketPool::RequestSockets(
731 const std::string& group_name,
732 const void* params,
733 int num_sockets,
734 const BoundNetLog& net_log) {
735 NOTIMPLEMENTED();
736 }
737
738 void WebSocketTransportClientSocketPool::CancelRequest(
739 const std::string& group_name,
740 ClientSocketHandle* handle) {
741 CancelJob(handle);
742 }
743
744 void WebSocketTransportClientSocketPool::ReleaseSocket(
745 const std::string& group_name,
746 scoped_ptr<StreamSocket> socket,
747 int id) {
748 WebSocketEndpointLockManager::GetInstance()->UnlockSocket(socket.get());
749 CHECK_GT(handed_out_socket_count_, 0);
750 --handed_out_socket_count_;
751 if (!ReachedMaxSocketsLimit())
752 is_stalled_ = false;
753 }
754
755 void WebSocketTransportClientSocketPool::FlushWithError(int error) {
756 CancelAllConnectJobs();
757 }
758
759 void WebSocketTransportClientSocketPool::CloseIdleSockets() {
760 // We have no idle sockets.
761 }
762
763 int WebSocketTransportClientSocketPool::IdleSocketCount() const { return 0; }
764
765 int WebSocketTransportClientSocketPool::IdleSocketCountInGroup(
766 const std::string& group_name) const {
767 return 0;
768 }
769
770 LoadState WebSocketTransportClientSocketPool::GetLoadState(
771 const std::string& group_name,
772 const ClientSocketHandle* handle) const {
773 if (pending_callbacks_.count(handle))
774 return LOAD_STATE_CONNECTING;
775 return LookupConnectJob(handle)->GetLoadState();
776 }
777
778 base::DictionaryValue* WebSocketTransportClientSocketPool::GetInfoAsValue(
779 const std::string& name,
780 const std::string& type,
781 bool include_nested_pools) const {
782 base::DictionaryValue* dict = new base::DictionaryValue();
783 dict->SetString("name", name);
784 dict->SetString("type", type);
785 dict->SetInteger("handed_out_socket_count", handed_out_socket_count_);
786 dict->SetInteger("connecting_socket_count", pending_connects_.size());
787 dict->SetInteger("idle_socket_count", 0);
788 dict->SetInteger("max_socket_count", max_sockets_);
789 dict->SetInteger("max_sockets_per_group", max_sockets_);
790 dict->SetInteger("pool_generation_number", 0);
791 return dict;
792 }
793
794 base::TimeDelta WebSocketTransportClientSocketPool::ConnectionTimeout() const {
795 return base::TimeDelta::FromSeconds(kTransportConnectJobTimeoutInSeconds);
796 }
797
798 ClientSocketPoolHistograms* WebSocketTransportClientSocketPool::histograms()
799 const {
800 return histograms_;
801 }
802
803 bool WebSocketTransportClientSocketPool::IsStalled() const {
804 return is_stalled_;
805 }
806
807 void WebSocketTransportClientSocketPool::AddHigherLayeredPool(
808 HigherLayeredPool* higher_pool) {
809 CHECK(higher_pool);
810 CHECK(!ContainsKey(higher_pools_, higher_pool));
811 higher_pools_.insert(higher_pool);
812 }
813
814 void WebSocketTransportClientSocketPool::RemoveHigherLayeredPool(
815 HigherLayeredPool* higher_pool) {
816 CHECK(higher_pool);
817 CHECK(ContainsKey(higher_pools_, higher_pool));
818 higher_pools_.erase(higher_pool);
819 }
820
821 void WebSocketTransportClientSocketPool::OnConnectJobComplete(
822 int result,
823 WebSocketTransportConnectJob* job) {
824 DCHECK_NE(ERR_IO_PENDING, result);
825
826 scoped_ptr<StreamSocket> socket = job->PassSocket();
827
828 BoundNetLog request_net_log = job->request_net_log();
829 CompletionCallback callback = job->callback();
830 LoadTimingInfo::ConnectTiming connect_timing = job->connect_timing();
831
832 ClientSocketHandle* const handle = job->handle();
833
834 if (result == OK) {
835 DCHECK(socket.get());
836 HandOutSocket(socket.Pass(), connect_timing, handle, request_net_log);
837 request_net_log.EndEvent(NetLog::TYPE_SOCKET_POOL);
838 } else {
839 // If we got a socket, it must contain error information so pass that
840 // up so that the caller can retrieve it.
841 bool handed_out_socket = false;
842 job->GetAdditionalErrorState(handle);
843 if (socket.get()) {
844 handed_out_socket = true;
845 HandOutSocket(socket.Pass(), connect_timing, handle, request_net_log);
846 }
847 request_net_log.EndEventWithNetErrorCode(NetLog::TYPE_SOCKET_POOL, result);
848
849 if (!handed_out_socket && is_stalled_ && !ReachedMaxSocketsLimit())
850 is_stalled_ = false;
851 }
852 bool delete_succeeded = DeleteJob(handle);
853 DCHECK(delete_succeeded);
854 InvokeUserCallbackLater(handle, callback, result);
855 }
856
857 void WebSocketTransportClientSocketPool::InvokeUserCallbackLater(
858 ClientSocketHandle* handle,
859 const CompletionCallback& callback,
860 int rv) {
861 DCHECK(!pending_callbacks_.count(handle));
862 pending_callbacks_.insert(handle);
863 base::MessageLoop::current()->PostTask(
864 FROM_HERE,
865 base::Bind(&WebSocketTransportClientSocketPool::InvokeUserCallback,
866 weak_factory_.GetWeakPtr(),
867 handle,
868 callback,
869 rv));
870 }
871
872 void WebSocketTransportClientSocketPool::InvokeUserCallback(
873 ClientSocketHandle* handle,
874 const CompletionCallback& callback,
875 int rv) {
876 if (pending_callbacks_.erase(handle))
877 callback.Run(rv);
878 }
879
880 bool WebSocketTransportClientSocketPool::ReachedMaxSocketsLimit() const {
881 return base::checked_cast<int>(pending_connects_.size()) +
882 handed_out_socket_count_ >=
883 max_sockets_;
884 }
885
886 void WebSocketTransportClientSocketPool::HandOutSocket(
887 scoped_ptr<StreamSocket> socket,
888 const LoadTimingInfo::ConnectTiming& connect_timing,
889 ClientSocketHandle* handle,
890 const BoundNetLog& net_log) {
891 DCHECK(socket);
892 handle->SetSocket(socket.Pass());
893 handle->set_reuse_type(ClientSocketHandle::UNUSED);
894 handle->set_idle_time(base::TimeDelta());
895 handle->set_pool_id(0);
896 handle->set_connect_timing(connect_timing);
897
898 net_log.AddEvent(
899 NetLog::TYPE_SOCKET_POOL_BOUND_TO_SOCKET,
900 handle->socket()->NetLog().source().ToEventParametersCallback());
901
902 ++handed_out_socket_count_;
903 }
904
905 void WebSocketTransportClientSocketPool::AddJob(
906 ClientSocketHandle* handle,
907 scoped_ptr<WebSocketTransportConnectJob> connect_job) {
908 bool inserted =
909 pending_connects_.insert(PendingConnectsMap::value_type(
910 handle, connect_job.release())).second;
911 DCHECK(inserted);
912 }
913
914 bool WebSocketTransportClientSocketPool::DeleteJob(ClientSocketHandle* handle) {
915 PendingConnectsMap::iterator it = pending_connects_.find(handle);
916 if (it == pending_connects_.end())
917 return false;
918 delete it->second, it->second = NULL;
919 pending_connects_.erase(it);
920 return true;
921 }
922
923 void WebSocketTransportClientSocketPool::CancelJob(ClientSocketHandle* handle) {
924 if (!DeleteJob(handle))
925 pending_callbacks_.erase(handle);
926 }
927
928 void WebSocketTransportClientSocketPool::CancelAllConnectJobs() {
929 STLDeleteValues(&pending_connects_);
930 }
931
932 const WebSocketTransportConnectJob*
933 WebSocketTransportClientSocketPool::LookupConnectJob(
934 const ClientSocketHandle* handle) const {
935 PendingConnectsMap::const_iterator it = pending_connects_.find(handle);
936 CHECK(it != pending_connects_.end());
937 return it->second;
938 }
939
940 WebSocketTransportClientSocketPool::ConnectJobDelegate::ConnectJobDelegate(
941 WebSocketTransportClientSocketPool* owner)
942 : owner_(owner) {}
943
944 WebSocketTransportClientSocketPool::ConnectJobDelegate::~ConnectJobDelegate() {}
945
946 void
947 WebSocketTransportClientSocketPool::ConnectJobDelegate::OnConnectJobComplete(
948 int result,
949 ConnectJob* job) {
950 owner_->OnConnectJobComplete(result,
951 static_cast<WebSocketTransportConnectJob*>(job));
952 }
953
954 } // namespace net
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698