Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(88)

Side by Side Diff: net/socket/websocket_transport_client_socket_pool.cc

Issue 240873003: Create WebSocketTransportClientSocketPool (Closed) Base URL: http://git.chromium.org/chromium/src.git@master
Patch Set: Add a comment explaining the timing of the net logging of the handle binding. Update copyright mess… Created 6 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/socket/websocket_transport_client_socket_pool.h"
6
7 #include <algorithm>
8 #include <map>
9 #include <utility>
10
11 #include "base/compiler_specific.h"
12 #include "base/containers/linked_list.h"
13 #include "base/logging.h"
14 #include "base/memory/singleton.h"
15 #include "base/numerics/safe_conversions.h"
16 #include "base/stl_util.h"
17 #include "base/strings/string_util.h"
18 #include "base/time/time.h"
19 #include "base/values.h"
20 #include "net/base/ip_endpoint.h"
21 #include "net/base/net_errors.h"
22 #include "net/base/net_log.h"
23 #include "net/socket/client_socket_factory.h"
24 #include "net/socket/client_socket_handle.h"
25 #include "net/socket/client_socket_pool_base.h"
26
27 namespace net {
28
29 namespace {
30
31 using base::TimeDelta;
32
33 // TODO(ricea): For now, we implement a global timeout for compatability with
34 // TransportConnectJob. Since WebSocketTransportConnectJob controls the address
35 // selection process more tightly, it could do something smarter here.
36 const int kTransportConnectJobTimeoutInSeconds = 240; // 4 minutes.
37
38 } // namespace
39
40 class WebSocketEndpointLockManager {
41 public:
42 typedef WebSocketTransportConnectJob::SubJob SubJob;
43 static WebSocketEndpointLockManager* GetInstance();
44
45 // Returns OK if lock was acquired immediately, ERR_IO_PENDING if not. If the
46 // lock was not acquired, then |job->GotEndpointLock()| will be called when it
47 // is.
48 int LockEndpoint(const IPEndPoint& endpoint, SubJob* job);
49
50 // Record the IPEndPoint associated with a particular socket. This is
51 // necessary because TCPClientSocket refuses to return the PeerAddress after
52 // the connection is disconnected. The association will be forgotten when
53 // UnlockSocket() is called. The |socket| pointer must not be deleted between
54 // the call to RememberSocket().
tyoshino (SeeGerritForStatus) 2014/06/12 06:39:20 maybe this should be between the call to Remember
Adam Rice 2014/06/12 10:17:33 Thanks. Done.
55 void RememberSocket(StreamSocket* socket, const IPEndPoint& endpoint);
56
57 // Release the lock on an endpoint, and, if appropriate, trigger the next
58 // socket connection. It is permitted to call UnlockSocket() multiple times
59 // for the same |socket|; all calls after the first will be ignored.
60 void UnlockSocket(StreamSocket* socket);
61
62 // Release the lock on |endpoint|. Most callers should use UnlockSocket()
63 // instead.
64 void UnlockEndpoint(const IPEndPoint& endpoint);
65
66 private:
67 typedef base::LinkedList<SubJob> ConnectJobQueue;
68 typedef std::map<IPEndPoint, ConnectJobQueue*> EndPointJobMap;
69 typedef std::map<StreamSocket*, IPEndPoint> SocketEndPointMap;
70
71 WebSocketEndpointLockManager() {}
72 ~WebSocketEndpointLockManager() {
73 DCHECK(endpoint_job_map_.empty());
74 DCHECK(socket_endpoint_map_.empty());
75 }
76
77 EndPointJobMap endpoint_job_map_;
78 SocketEndPointMap socket_endpoint_map_;
79
80 friend struct DefaultSingletonTraits<WebSocketEndpointLockManager>;
81
82 DISALLOW_COPY_AND_ASSIGN(WebSocketEndpointLockManager);
83 };
84
85 WebSocketEndpointLockManager* WebSocketEndpointLockManager::GetInstance() {
86 return Singleton<WebSocketEndpointLockManager>::get();
87 }
88
89 class WebSocketTransportConnectJob::SubJob : public base::LinkNode<SubJob> {
90 public:
91 SubJob(const AddressList& addresses,
92 WebSocketTransportConnectJob* parent_job,
93 SubJobType type)
94 : addresses_(addresses),
95 parent_job_(parent_job),
96 next_state_(STATE_NONE),
97 current_address_index_(0),
98 type_(type) {}
99
100 ~SubJob() {
101 // We don't worry about cancelling the TCP connect, since ~StreamSocket will
102 // take care of it.
103 if (next()) {
104 DCHECK(previous());
105 DCHECK_EQ(STATE_WAIT_FOR_LOCK_COMPLETE, next_state_);
106 RemoveFromList();
107 } else if (next_state_ == STATE_TRANSPORT_CONNECT_COMPLETE) {
108 DCHECK_LT(current_address_index_, static_cast<int>(addresses_.size()));
109 WebSocketEndpointLockManager::GetInstance()->UnlockEndpoint(
110 addresses_[current_address_index_]);
111 }
112 }
113
114 // Start connecting.
115 int Start() {
116 DCHECK_EQ(STATE_NONE, next_state_);
117 next_state_ = STATE_WAIT_FOR_LOCK;
118 return DoLoop(OK);
119 }
120
121 bool started() { return next_state_ != STATE_NONE; }
122
123 // Called by WebSocketEndpointLockManager when the lock becomes available.
124 void GotEndpointLock() { OnIOComplete(OK); }
tyoshino (SeeGerritForStatus) 2014/06/12 06:39:20 how about DCHECK on next_state_?
Adam Rice 2014/06/12 10:17:33 Done.
125
126 LoadState GetLoadState() const {
127 switch (next_state_) {
128 case STATE_WAIT_FOR_LOCK:
129 case STATE_WAIT_FOR_LOCK_COMPLETE:
130 // TODO(ricea): Add a WebSocket-specific LOAD_STATE ?
131 return LOAD_STATE_WAITING_FOR_AVAILABLE_SOCKET;
132 case STATE_TRANSPORT_CONNECT:
133 case STATE_TRANSPORT_CONNECT_COMPLETE:
134 return LOAD_STATE_CONNECTING;
135 case STATE_NONE:
136 return LOAD_STATE_IDLE;
137 }
138 NOTREACHED();
139 return LOAD_STATE_IDLE;
140 }
141
142 SubJobType type() const { return type_; }
143
144 scoped_ptr<StreamSocket> PassSocket() { return transport_socket_.Pass(); }
145
146 private:
147 enum State {
148 STATE_NONE,
149 STATE_WAIT_FOR_LOCK,
150 STATE_WAIT_FOR_LOCK_COMPLETE,
151 STATE_TRANSPORT_CONNECT,
152 STATE_TRANSPORT_CONNECT_COMPLETE,
153 };
154
155 ClientSocketFactory* client_socket_factory() const {
156 return parent_job_->data_.client_socket_factory();
157 }
158
159 const BoundNetLog& net_log() const { return parent_job_->net_log(); }
160
161 void OnIOComplete(int result) {
162 int rv = DoLoop(result);
163 if (rv != ERR_IO_PENDING)
164 parent_job_->OnSubJobComplete(rv, this); // |this| deleted
165 }
166
167 int DoLoop(int result) {
168 DCHECK_NE(next_state_, STATE_NONE);
169
170 int rv = result;
171 do {
172 State state = next_state_;
173 next_state_ = STATE_NONE;
174 switch (state) {
175 case STATE_WAIT_FOR_LOCK:
176 rv = DoEndpointLock();
177 break;
178 case STATE_WAIT_FOR_LOCK_COMPLETE:
179 rv = DoEndpointLockComplete();
180 break;
181 case STATE_TRANSPORT_CONNECT:
182 DCHECK_EQ(OK, rv);
183 rv = DoTransportConnect();
184 break;
185 case STATE_TRANSPORT_CONNECT_COMPLETE:
186 rv = DoTransportConnectComplete(rv);
187 break;
188 default:
189 NOTREACHED();
190 rv = ERR_FAILED;
191 break;
192 }
193 } while (rv != ERR_IO_PENDING && next_state_ != STATE_NONE);
194
195 return rv;
196 }
197
198 int DoEndpointLock() {
199 DCHECK_LT(current_address_index_, static_cast<int>(addresses_.size()));
tyoshino (SeeGerritForStatus) 2014/06/12 06:39:20 factor out this DCHECK and addresses_[]?
Adam Rice 2014/06/12 10:17:32 Done.
200 int rv = WebSocketEndpointLockManager::GetInstance()->LockEndpoint(
201 addresses_[current_address_index_], this);
202 next_state_ = STATE_WAIT_FOR_LOCK_COMPLETE;
203 return rv;
204 }
205
206 int DoEndpointLockComplete() {
207 next_state_ = STATE_TRANSPORT_CONNECT;
208 return OK;
209 }
210
211 int DoTransportConnect() {
tyoshino (SeeGerritForStatus) 2014/06/12 06:39:20 add TODO to update g_last_connect_time and report
Adam Rice 2014/06/12 10:17:33 Done.
212 next_state_ = STATE_TRANSPORT_CONNECT_COMPLETE;
213 DCHECK_LT(current_address_index_, static_cast<int>(addresses_.size()));
214 AddressList one_address(addresses_[current_address_index_]);
215 transport_socket_ = client_socket_factory()->CreateTransportClientSocket(
216 one_address, net_log().net_log(), net_log().source());
217 // This use of base::Unretained() is safe because transport_socket_ is
218 // destroyed in the destructor.
219 return transport_socket_->Connect(
220 base::Bind(&SubJob::OnIOComplete, base::Unretained(this)));
221 }
222
223 int DoTransportConnectComplete(int result) {
224 WebSocketEndpointLockManager* endpoint_lock_manager =
225 WebSocketEndpointLockManager::GetInstance();
226 if (result != OK) {
227 endpoint_lock_manager->UnlockEndpoint(addresses_[current_address_index_]);
228
229 if (current_address_index_ + 1 < static_cast<int>(addresses_.size())) {
230 // Try falling back to the next address in the list.
231 next_state_ = STATE_WAIT_FOR_LOCK;
232 ++current_address_index_;
233 result = OK;
234 }
235
236 return result;
237 }
238
239 endpoint_lock_manager->RememberSocket(transport_socket_.get(),
240 addresses_[current_address_index_]);
241
242 return result;
243 }
244
245 const AddressList addresses_;
246 WebSocketTransportConnectJob* const parent_job_;
247
248 State next_state_;
249 int current_address_index_;
tyoshino (SeeGerritForStatus) 2014/06/12 06:39:20 how about grouping addresses_ and current_address_
Adam Rice 2014/06/12 10:17:33 Done.
250 const SubJobType type_;
251
252 scoped_ptr<StreamSocket> transport_socket_;
253
254 DISALLOW_COPY_AND_ASSIGN(SubJob);
255 };
256
257 int WebSocketEndpointLockManager::LockEndpoint(const IPEndPoint& endpoint,
258 SubJob* job) {
259 EndPointJobMap::value_type insert_value(endpoint, NULL);
260 std::pair<EndPointJobMap::iterator, bool> rv =
261 endpoint_job_map_.insert(insert_value);
262 if (rv.second) {
263 DVLOG(3) << "Locking endpoint " << endpoint.ToString();
264 rv.first->second = new ConnectJobQueue;
265 return OK;
266 }
267 DVLOG(3) << "Waiting for endpoint " << endpoint.ToString();
268 rv.first->second->Append(job);
269 return ERR_IO_PENDING;
270 }
271
272 void WebSocketEndpointLockManager::RememberSocket(StreamSocket* socket,
273 const IPEndPoint& endpoint) {
274 bool inserted = socket_endpoint_map_.insert(SocketEndPointMap::value_type(
275 socket, endpoint)).second;
276 DCHECK(inserted);
277 DCHECK(endpoint_job_map_.find(endpoint) != endpoint_job_map_.end());
278 DVLOG(3) << "Remembered (StreamSocket*)" << socket << " for "
279 << endpoint.ToString() << " (" << socket_endpoint_map_.size()
280 << " sockets remembered)";
281 }
282
283 void WebSocketEndpointLockManager::UnlockSocket(StreamSocket* socket) {
284 SocketEndPointMap::iterator socket_it = socket_endpoint_map_.find(socket);
285 if (socket_it == socket_endpoint_map_.end()) {
286 DVLOG(3) << "Ignoring request to unlock already-unlocked socket"
287 "(StreamSocket*)" << socket;
288 return;
289 }
290 const IPEndPoint& endpoint = socket_it->second;
291 DVLOG(3) << "Unlocking (StreamSocket*)" << socket << " for "
292 << endpoint.ToString() << " (" << socket_endpoint_map_.size()
293 << " sockets left)";
294 UnlockEndpoint(endpoint);
295 socket_endpoint_map_.erase(socket_it);
296 }
297
298 void WebSocketEndpointLockManager::UnlockEndpoint(const IPEndPoint& endpoint) {
299 EndPointJobMap::iterator found_it = endpoint_job_map_.find(endpoint);
300 CHECK(found_it != endpoint_job_map_.end()); // Security critical
301 ConnectJobQueue* queue = found_it->second;
302 if (queue->empty()) {
303 DVLOG(3) << "Unlocking endpoint " << endpoint.ToString();
304 delete queue;
305 endpoint_job_map_.erase(found_it);
306 } else {
307 DVLOG(3) << "Unlocking endpoint " << endpoint.ToString()
308 << " and activating next waiter";
309 SubJob* next_job = queue->head()->value();
310 next_job->RemoveFromList();
311 next_job->GotEndpointLock();
312 }
313 }
314
315 WebSocketTransportConnectJob::WebSocketTransportConnectJob(
316 const std::string& group_name,
317 RequestPriority priority,
318 const scoped_refptr<TransportSocketParams>& params,
319 TimeDelta timeout_duration,
320 const CompletionCallback& callback,
321 ClientSocketFactory* client_socket_factory,
322 HostResolver* host_resolver,
323 ClientSocketHandle* handle,
324 Delegate* delegate,
325 NetLog* pool_net_log,
326 const BoundNetLog& request_net_log)
327 : ConnectJob(group_name,
328 timeout_duration,
329 priority,
330 delegate,
331 BoundNetLog::Make(pool_net_log, NetLog::SOURCE_CONNECT_JOB)),
332 data_(params, client_socket_factory, host_resolver, &connect_timing_),
333 race_result_(TransportConnectJobHelper::CONNECTION_LATENCY_UNKNOWN),
334 handle_(handle),
335 callback_(callback),
336 request_net_log_(request_net_log),
337 had_ipv4_(false),
338 had_ipv6_(false) {
339 data_.SetOnIOComplete(this);
340 }
341
342 WebSocketTransportConnectJob::~WebSocketTransportConnectJob() {}
343
344 LoadState WebSocketTransportConnectJob::GetLoadState() const {
345 LoadState load_state = LOAD_STATE_RESOLVING_HOST;
346 if (ipv6_job_)
347 load_state = ipv6_job_->GetLoadState();
348 // LOAD_STATE_CONNECTING is better than
349 // LOAD_STATE_WAITING_FOR_AVAILABLE_SOCKET.
350 if (ipv4_job_ && load_state != LOAD_STATE_CONNECTING)
351 load_state = ipv4_job_->GetLoadState();
352 return load_state;
353 }
354
355 int WebSocketTransportConnectJob::DoResolveHost() {
356 return data_.DoResolveHost(priority(), net_log());
357 }
358
359 int WebSocketTransportConnectJob::DoResolveHostComplete(int result) {
360 return data_.DoResolveHostComplete(result, net_log());
361 }
362
363 int WebSocketTransportConnectJob::DoTransportConnect() {
364 AddressList ipv4_addresses;
365 AddressList ipv6_addresses;
366 int result = ERR_UNEXPECTED;
367 data_.set_next_state(
368 TransportConnectJobHelper::STATE_TRANSPORT_CONNECT_COMPLETE);
369
370 for (AddressList::const_iterator it = data_.addresses().begin();
371 it != data_.addresses().end();
372 ++it) {
373 switch (it->GetFamily()) {
374 case ADDRESS_FAMILY_IPV4:
375 ipv4_addresses.push_back(*it);
376 break;
377
378 case ADDRESS_FAMILY_IPV6:
379 ipv6_addresses.push_back(*it);
380 break;
381
382 default:
383 DVLOG(1) << "Unexpected ADDRESS_FAMILY: " << it->GetFamily();
384 break;
385 }
386 }
387
388 if (!ipv4_addresses.empty()) {
389 had_ipv4_ = true;
390 ipv4_job_.reset(new SubJob(ipv4_addresses, this, SUB_JOB_IPV4));
391 }
392
393 if (!ipv6_addresses.empty()) {
394 had_ipv6_ = true;
395 ipv6_job_.reset(new SubJob(ipv6_addresses, this, SUB_JOB_IPV6));
396 result = ipv6_job_->Start();
397 if (result == OK) {
398 SetSocket(ipv6_job_->PassSocket());
399 race_result_ =
400 had_ipv4_
401 ? TransportConnectJobHelper::CONNECTION_LATENCY_IPV6_RACEABLE
402 : TransportConnectJobHelper::CONNECTION_LATENCY_IPV6_SOLO;
403 return result;
404 }
405 if (result == ERR_IO_PENDING && ipv4_job_) {
406 // This use of base::Unretained is safe because fallback_timer_ is owned
407 // by this object.
408 fallback_timer_.Start(
409 FROM_HERE,
410 TimeDelta::FromMilliseconds(
411 TransportConnectJobHelper::kIPv6FallbackTimerInMs),
412 base::Bind(&WebSocketTransportConnectJob::StartIPv4JobAsync,
413 base::Unretained(this)));
414 }
415 if (result != ERR_IO_PENDING)
416 ipv6_job_.reset();
417 }
418
419 if (ipv4_job_ && !ipv6_job_) {
420 result = ipv4_job_->Start();
421 if (result == OK) {
422 SetSocket(ipv4_job_->PassSocket());
423 race_result_ =
424 had_ipv6_
425 ? TransportConnectJobHelper::CONNECTION_LATENCY_IPV4_WINS_RACE
426 : TransportConnectJobHelper::CONNECTION_LATENCY_IPV4_NO_RACE;
427 }
428 }
429
430 return result;
431 }
432
433 int WebSocketTransportConnectJob::DoTransportConnectComplete(int result) {
434 if (result == OK)
435 data_.HistogramDuration(race_result_);
436 return result;
437 }
438
439 void WebSocketTransportConnectJob::OnSubJobComplete(int result, SubJob* job) {
440 if (result == OK) {
441 switch (job->type()) {
442 case SUB_JOB_IPV4:
443 race_result_ =
444 had_ipv6_
445 ? TransportConnectJobHelper::CONNECTION_LATENCY_IPV4_WINS_RACE
446 : TransportConnectJobHelper::CONNECTION_LATENCY_IPV4_NO_RACE;
447 break;
448
449 case SUB_JOB_IPV6:
450 race_result_ =
451 had_ipv4_
452 ? TransportConnectJobHelper::CONNECTION_LATENCY_IPV6_RACEABLE
453 : TransportConnectJobHelper::CONNECTION_LATENCY_IPV6_SOLO;
454 break;
455 }
456 SetSocket(job->PassSocket());
457
458 // Make sure all connections are cancelled even if this object fails to be
459 // deleted.
460 ipv4_job_.reset();
461 ipv6_job_.reset();
462 } else {
463 switch (job->type()) {
464 case SUB_JOB_IPV4:
465 ipv4_job_.reset();
466 break;
467
468 case SUB_JOB_IPV6:
469 ipv6_job_.reset();
470 if (ipv4_job_ && !ipv4_job_->started()) {
471 fallback_timer_.Stop();
472 result = ipv4_job_->Start();
473 if (result != ERR_IO_PENDING) {
474 OnSubJobComplete(result, ipv4_job_.get());
475 return;
476 }
477 }
478 break;
479 }
480 if (ipv4_job_ || ipv6_job_)
481 return;
482 }
483 data_.OnIOComplete(this, result);
484 }
485
486 void WebSocketTransportConnectJob::StartIPv4JobAsync() {
487 DCHECK(ipv4_job_);
488 int result = ipv4_job_->Start();
489 if (result != ERR_IO_PENDING)
490 OnSubJobComplete(result, ipv4_job_.get());
491 }
492
493 int WebSocketTransportConnectJob::ConnectInternal() {
494 return data_.DoConnectInternal(this);
495 }
496
497 WebSocketTransportClientSocketPool::WebSocketTransportClientSocketPool(
498 int max_sockets,
499 int max_sockets_per_group,
500 ClientSocketPoolHistograms* histograms,
501 HostResolver* host_resolver,
502 ClientSocketFactory* client_socket_factory,
503 NetLog* net_log)
504 : TransportClientSocketPool(max_sockets,
505 max_sockets_per_group,
506 histograms,
507 host_resolver,
508 client_socket_factory,
509 net_log),
510 connect_job_delegate_(this),
511 histograms_(histograms),
512 pool_net_log_(net_log),
513 client_socket_factory_(client_socket_factory),
514 host_resolver_(host_resolver),
515 max_sockets_(max_sockets),
516 handed_out_socket_count_(0),
517 is_stalled_(false),
518 weak_factory_(this) {}
519
520 WebSocketTransportClientSocketPool::~WebSocketTransportClientSocketPool() {
521 DCHECK(pending_connects_.empty());
522 DCHECK_EQ(0, handed_out_socket_count_);
523 DCHECK(!is_stalled_);
524 }
525
526 // static
527 void WebSocketTransportClientSocketPool::UnlockEndpoint(
528 ClientSocketHandle* handle) {
529 DCHECK(handle->is_initialized());
530 WebSocketEndpointLockManager::GetInstance()->UnlockSocket(handle->socket());
531 }
532
533 int WebSocketTransportClientSocketPool::RequestSocket(
534 const std::string& group_name,
535 const void* params,
536 RequestPriority priority,
537 ClientSocketHandle* handle,
538 const CompletionCallback& callback,
539 const BoundNetLog& request_net_log) {
540 DCHECK(params);
541 const scoped_refptr<TransportSocketParams>& casted_params =
542 *static_cast<const scoped_refptr<TransportSocketParams>*>(params);
543
544 NetLogTcpClientSocketPoolRequestedSocket(request_net_log, &casted_params);
545
546 CHECK(!callback.is_null());
547 CHECK(handle);
548
549 request_net_log.BeginEvent(NetLog::TYPE_SOCKET_POOL);
550
551 if (ReachedMaxSocketsLimit() && !casted_params->ignore_limits()) {
552 request_net_log.AddEvent(NetLog::TYPE_SOCKET_POOL_STALLED_MAX_SOCKETS);
553 is_stalled_ = true;
554 return ERR_IO_PENDING;
555 }
556
557 scoped_ptr<WebSocketTransportConnectJob> connect_job(
558 new WebSocketTransportConnectJob(
559 group_name,
560 priority,
561 casted_params,
562 TimeDelta::FromSeconds(kTransportConnectJobTimeoutInSeconds),
563 callback,
564 client_socket_factory_,
565 host_resolver_,
566 handle,
567 &connect_job_delegate_,
568 pool_net_log_,
569 request_net_log));
570
571 int rv = connect_job->Connect();
572 // Regardless of the outcome of |connect_job|, it will always be bound to
573 // |handle|, since this pool uses early-binding. So the binding is logged
574 // here, without waiting for the result.
575 request_net_log.AddEvent(
576 NetLog::TYPE_SOCKET_POOL_BOUND_TO_CONNECT_JOB,
577 connect_job->net_log().source().ToEventParametersCallback());
578 if (rv == OK) {
579 HandOutSocket(connect_job->PassSocket(),
580 connect_job->connect_timing(),
581 handle,
582 request_net_log);
583 request_net_log.EndEvent(NetLog::TYPE_SOCKET_POOL);
584 } else if (rv == ERR_IO_PENDING) {
585 // TODO(ricea): Implement backup job timer?
586 AddJob(handle, connect_job.Pass());
587 } else {
588 scoped_ptr<StreamSocket> error_socket;
589 connect_job->GetAdditionalErrorState(handle);
590 error_socket = connect_job->PassSocket();
591 if (error_socket) {
592 HandOutSocket(error_socket.Pass(),
593 connect_job->connect_timing(),
594 handle,
595 request_net_log);
596 }
597 }
598
599 if (rv != ERR_IO_PENDING) {
600 request_net_log.EndEventWithNetErrorCode(NetLog::TYPE_SOCKET_POOL, rv);
601 }
602
603 return rv;
604 }
605
606 void WebSocketTransportClientSocketPool::RequestSockets(
607 const std::string& group_name,
608 const void* params,
609 int num_sockets,
610 const BoundNetLog& net_log) {
611 NOTIMPLEMENTED();
612 }
613
614 void WebSocketTransportClientSocketPool::CancelRequest(
615 const std::string& group_name,
616 ClientSocketHandle* handle) {
617 if (!DeleteJob(handle))
618 pending_callbacks_.erase(handle);
619 }
620
621 void WebSocketTransportClientSocketPool::ReleaseSocket(
622 const std::string& group_name,
623 scoped_ptr<StreamSocket> socket,
624 int id) {
625 WebSocketEndpointLockManager::GetInstance()->UnlockSocket(socket.get());
626 CHECK_GT(handed_out_socket_count_, 0);
627 --handed_out_socket_count_;
628 if (!ReachedMaxSocketsLimit())
629 is_stalled_ = false;
630 }
631
632 void WebSocketTransportClientSocketPool::FlushWithError(int error) {
633 CancelAllConnectJobs();
634 }
635
636 void WebSocketTransportClientSocketPool::CloseIdleSockets() {
637 // We have no idle sockets.
638 }
639
640 int WebSocketTransportClientSocketPool::IdleSocketCount() const { return 0; }
641
642 int WebSocketTransportClientSocketPool::IdleSocketCountInGroup(
643 const std::string& group_name) const {
644 return 0;
645 }
646
647 LoadState WebSocketTransportClientSocketPool::GetLoadState(
648 const std::string& group_name,
649 const ClientSocketHandle* handle) const {
650 if (pending_callbacks_.count(handle))
651 return LOAD_STATE_CONNECTING;
652 return LookupConnectJob(handle)->GetLoadState();
653 }
654
655 base::DictionaryValue* WebSocketTransportClientSocketPool::GetInfoAsValue(
656 const std::string& name,
657 const std::string& type,
658 bool include_nested_pools) const {
659 base::DictionaryValue* dict = new base::DictionaryValue();
660 dict->SetString("name", name);
661 dict->SetString("type", type);
662 dict->SetInteger("handed_out_socket_count", handed_out_socket_count_);
663 dict->SetInteger("connecting_socket_count", pending_connects_.size());
664 dict->SetInteger("idle_socket_count", 0);
665 dict->SetInteger("max_socket_count", max_sockets_);
666 dict->SetInteger("max_sockets_per_group", max_sockets_);
667 dict->SetInteger("pool_generation_number", 0);
668 return dict;
669 }
670
671 TimeDelta WebSocketTransportClientSocketPool::ConnectionTimeout() const {
672 return TimeDelta::FromSeconds(kTransportConnectJobTimeoutInSeconds);
673 }
674
675 ClientSocketPoolHistograms* WebSocketTransportClientSocketPool::histograms()
676 const {
677 return histograms_;
678 }
679
680 bool WebSocketTransportClientSocketPool::IsStalled() const {
681 return is_stalled_;
682 }
683
684 void WebSocketTransportClientSocketPool::OnConnectJobComplete(
685 int result,
686 WebSocketTransportConnectJob* job) {
687 DCHECK_NE(ERR_IO_PENDING, result);
688
689 scoped_ptr<StreamSocket> socket = job->PassSocket();
690
691 BoundNetLog request_net_log = job->request_net_log();
692 CompletionCallback callback = job->callback();
693 LoadTimingInfo::ConnectTiming connect_timing = job->connect_timing();
694
695 ClientSocketHandle* const handle = job->handle();
696
697 if (result == OK) {
698 DCHECK(socket.get());
699 HandOutSocket(socket.Pass(), connect_timing, handle, request_net_log);
700 request_net_log.EndEvent(NetLog::TYPE_SOCKET_POOL);
701 } else {
702 // If we got a socket, it must contain error information so pass that
703 // up so that the caller can retrieve it.
704 bool handed_out_socket = false;
705 job->GetAdditionalErrorState(handle);
706 if (socket.get()) {
707 handed_out_socket = true;
708 HandOutSocket(socket.Pass(), connect_timing, handle, request_net_log);
709 }
710 request_net_log.EndEventWithNetErrorCode(NetLog::TYPE_SOCKET_POOL, result);
711
712 if (!handed_out_socket && is_stalled_ && !ReachedMaxSocketsLimit())
713 is_stalled_ = false;
714 }
715 bool delete_succeeded = DeleteJob(handle);
716 DCHECK(delete_succeeded);
717 InvokeUserCallbackLater(handle, callback, result);
718 }
719
720 void WebSocketTransportClientSocketPool::InvokeUserCallbackLater(
721 ClientSocketHandle* handle,
722 const CompletionCallback& callback,
723 int rv) {
724 DCHECK(!pending_callbacks_.count(handle));
725 pending_callbacks_.insert(handle);
726 base::MessageLoop::current()->PostTask(
727 FROM_HERE,
728 base::Bind(&WebSocketTransportClientSocketPool::InvokeUserCallback,
729 weak_factory_.GetWeakPtr(),
730 handle,
731 callback,
732 rv));
733 }
734
735 void WebSocketTransportClientSocketPool::InvokeUserCallback(
736 ClientSocketHandle* handle,
737 const CompletionCallback& callback,
738 int rv) {
739 if (pending_callbacks_.erase(handle))
740 callback.Run(rv);
741 }
742
743 bool WebSocketTransportClientSocketPool::ReachedMaxSocketsLimit() const {
744 return base::checked_cast<int>(pending_connects_.size()) +
745 handed_out_socket_count_ >=
746 max_sockets_;
747 }
748
749 void WebSocketTransportClientSocketPool::HandOutSocket(
750 scoped_ptr<StreamSocket> socket,
751 const LoadTimingInfo::ConnectTiming& connect_timing,
752 ClientSocketHandle* handle,
753 const BoundNetLog& net_log) {
754 DCHECK(socket);
755 handle->SetSocket(socket.Pass());
756 handle->set_reuse_type(ClientSocketHandle::UNUSED);
757 handle->set_idle_time(TimeDelta());
758 handle->set_pool_id(0);
759 handle->set_connect_timing(connect_timing);
760
761 net_log.AddEvent(
762 NetLog::TYPE_SOCKET_POOL_BOUND_TO_SOCKET,
763 handle->socket()->NetLog().source().ToEventParametersCallback());
764
765 ++handed_out_socket_count_;
766 }
767
768 void WebSocketTransportClientSocketPool::AddJob(
769 ClientSocketHandle* handle,
770 scoped_ptr<WebSocketTransportConnectJob> connect_job) {
771 bool inserted =
772 pending_connects_.insert(PendingConnectsMap::value_type(
773 handle, connect_job.release())).second;
774 DCHECK(inserted);
775 }
776
777 bool WebSocketTransportClientSocketPool::DeleteJob(ClientSocketHandle* handle) {
778 PendingConnectsMap::iterator it = pending_connects_.find(handle);
779 if (it == pending_connects_.end())
780 return false;
781 delete it->second, it->second = NULL;
782 pending_connects_.erase(it);
783 return true;
784 }
785
786 void WebSocketTransportClientSocketPool::CancelAllConnectJobs() {
787 STLDeleteValues(&pending_connects_);
788 }
789
790 const WebSocketTransportConnectJob*
791 WebSocketTransportClientSocketPool::LookupConnectJob(
792 const ClientSocketHandle* handle) const {
793 PendingConnectsMap::const_iterator it = pending_connects_.find(handle);
794 CHECK(it != pending_connects_.end());
795 return it->second;
796 }
797
798 WebSocketTransportClientSocketPool::ConnectJobDelegate::ConnectJobDelegate(
799 WebSocketTransportClientSocketPool* owner)
800 : owner_(owner) {}
801
802 WebSocketTransportClientSocketPool::ConnectJobDelegate::~ConnectJobDelegate() {}
803
804 void
805 WebSocketTransportClientSocketPool::ConnectJobDelegate::OnConnectJobComplete(
806 int result,
807 ConnectJob* job) {
808 owner_->OnConnectJobComplete(result,
809 static_cast<WebSocketTransportConnectJob*>(job));
810 }
811
812 } // namespace net
OLDNEW
« no previous file with comments | « net/socket/websocket_transport_client_socket_pool.h ('k') | net/socket/websocket_transport_client_socket_pool_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698