Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(119)

Side by Side Diff: net/socket/websocket_transport_client_socket_pool.cc

Issue 240873003: Create WebSocketTransportClientSocketPool (Closed) Base URL: http://git.chromium.org/chromium/src.git@master
Patch Set: Factor out CurrentAddress() method in SubJob class. Created 6 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/socket/websocket_transport_client_socket_pool.h"
6
7 #include <algorithm>
8 #include <limits>
9 #include <map>
10 #include <utility>
11
12 #include "base/compiler_specific.h"
13 #include "base/containers/linked_list.h"
14 #include "base/logging.h"
15 #include "base/memory/singleton.h"
16 #include "base/numerics/safe_conversions.h"
17 #include "base/stl_util.h"
18 #include "base/strings/string_util.h"
19 #include "base/time/time.h"
20 #include "base/values.h"
21 #include "net/base/ip_endpoint.h"
22 #include "net/base/net_errors.h"
23 #include "net/base/net_log.h"
24 #include "net/socket/client_socket_factory.h"
25 #include "net/socket/client_socket_handle.h"
26 #include "net/socket/client_socket_pool_base.h"
27
28 namespace net {
29
30 namespace {
31
32 using base::TimeDelta;
33
34 // TODO(ricea): For now, we implement a global timeout for compatability with
35 // TransportConnectJob. Since WebSocketTransportConnectJob controls the address
36 // selection process more tightly, it could do something smarter here.
37 const int kTransportConnectJobTimeoutInSeconds = 240; // 4 minutes.
38
39 } // namespace
40
41 class WebSocketEndpointLockManager {
Johnny 2014/06/16 23:41:11 Should this be in it's own .h & .cc?
Adam Rice 2014/06/19 13:55:20 Done.
42 public:
43 typedef WebSocketTransportConnectJob::SubJob SubJob;
44 static WebSocketEndpointLockManager* GetInstance();
45
46 // Returns OK if lock was acquired immediately, ERR_IO_PENDING if not. If the
47 // lock was not acquired, then |job->GotEndpointLock()| will be called when it
48 // is.
49 int LockEndpoint(const IPEndPoint& endpoint, SubJob* job);
50
51 // Record the IPEndPoint associated with a particular socket. This is
52 // necessary because TCPClientSocket refuses to return the PeerAddress after
53 // the connection is disconnected. The association will be forgotten when
54 // UnlockSocket() is called. The |socket| pointer must not be deleted between
55 // the call to RememberSocket() and the call to UnlockSocket().
56 void RememberSocket(StreamSocket* socket, const IPEndPoint& endpoint);
57
58 // Release the lock on an endpoint, and, if appropriate, trigger the next
59 // socket connection. It is permitted to call UnlockSocket() multiple times
60 // for the same |socket|; all calls after the first will be ignored.
Johnny 2014/06/16 23:41:11 Is this expected to happen? If not, why not be mor
Adam Rice 2014/06/19 13:55:20 I clarified the comment. In the case of a successf
61 void UnlockSocket(StreamSocket* socket);
62
63 // Release the lock on |endpoint|. Most callers should use UnlockSocket()
64 // instead.
65 void UnlockEndpoint(const IPEndPoint& endpoint);
66
67 private:
68 typedef base::LinkedList<SubJob> ConnectJobQueue;
69 typedef std::map<IPEndPoint, ConnectJobQueue*> EndPointJobMap;
70 typedef std::map<StreamSocket*, IPEndPoint> SocketEndPointMap;
71
72 WebSocketEndpointLockManager() {}
73 ~WebSocketEndpointLockManager() {
74 DCHECK(endpoint_job_map_.empty());
75 DCHECK(socket_endpoint_map_.empty());
76 }
77
78 EndPointJobMap endpoint_job_map_;
tyoshino (SeeGerritForStatus) 2014/06/18 06:49:42 please explain what an endpoint_job_map_ entry wit
Adam Rice 2014/06/19 13:55:20 Done.
79 SocketEndPointMap socket_endpoint_map_;
80
81 friend struct DefaultSingletonTraits<WebSocketEndpointLockManager>;
82
83 DISALLOW_COPY_AND_ASSIGN(WebSocketEndpointLockManager);
84 };
85
86 WebSocketEndpointLockManager* WebSocketEndpointLockManager::GetInstance() {
87 return Singleton<WebSocketEndpointLockManager>::get();
88 }
89
90 class WebSocketTransportConnectJob::SubJob : public base::LinkNode<SubJob> {
91 public:
92 SubJob(const AddressList& addresses,
93 WebSocketTransportConnectJob* parent_job,
94 SubJobType type)
95 : parent_job_(parent_job),
96 addresses_(addresses),
97 current_address_index_(0),
98 next_state_(STATE_NONE),
99 type_(type) {
100 CHECK_LE(addresses_.size(),
Johnny 2014/06/16 23:41:11 This is a pretty loose CHECK. I'm not sure what it
Adam Rice 2014/06/19 13:55:20 current_address_index_ was an int in imitation of
101 static_cast<size_t>(std::numeric_limits<int>::max()));
102 }
103
104 ~SubJob() {
105 // We don't worry about cancelling the TCP connect, since ~StreamSocket will
106 // take care of it.
107 if (next()) {
108 DCHECK(previous());
109 DCHECK_EQ(STATE_WAIT_FOR_LOCK_COMPLETE, next_state_);
110 RemoveFromList();
111 } else if (next_state_ == STATE_TRANSPORT_CONNECT_COMPLETE) {
112 WebSocketEndpointLockManager::GetInstance()->UnlockEndpoint(
tyoshino (SeeGerritForStatus) 2014/06/18 06:49:42 don't we call UnlockEndpoint() twice when DoTransp
Adam Rice 2014/06/19 13:55:20 No, because next_state_ is set to STATE_NONE in Do
113 CurrentAddress());
114 }
115 }
116
117 // Start connecting.
118 int Start() {
119 DCHECK_EQ(STATE_NONE, next_state_);
120 next_state_ = STATE_WAIT_FOR_LOCK;
121 return DoLoop(OK);
122 }
123
124 bool started() { return next_state_ != STATE_NONE; }
125
126 // Called by WebSocketEndpointLockManager when the lock becomes available.
127 void GotEndpointLock() {
128 DCHECK_EQ(STATE_WAIT_FOR_LOCK_COMPLETE, next_state_);
129 OnIOComplete(OK);
130 }
131
132 LoadState GetLoadState() const {
133 switch (next_state_) {
134 case STATE_WAIT_FOR_LOCK:
135 case STATE_WAIT_FOR_LOCK_COMPLETE:
136 // TODO(ricea): Add a WebSocket-specific LOAD_STATE ?
137 return LOAD_STATE_WAITING_FOR_AVAILABLE_SOCKET;
138 case STATE_TRANSPORT_CONNECT:
139 case STATE_TRANSPORT_CONNECT_COMPLETE:
140 return LOAD_STATE_CONNECTING;
141 case STATE_NONE:
142 return LOAD_STATE_IDLE;
143 }
144 NOTREACHED();
145 return LOAD_STATE_IDLE;
146 }
147
148 SubJobType type() const { return type_; }
149
150 scoped_ptr<StreamSocket> PassSocket() { return transport_socket_.Pass(); }
151
152 private:
153 enum State {
154 STATE_NONE,
155 STATE_WAIT_FOR_LOCK,
tyoshino (SeeGerritForStatus) 2014/06/18 06:49:43 [optional] wondering if s/WAIT/OBTAIN/ sounds bett
Adam Rice 2014/06/19 13:55:20 Sounds good to me. Most of the time we don't actua
156 STATE_WAIT_FOR_LOCK_COMPLETE,
157 STATE_TRANSPORT_CONNECT,
158 STATE_TRANSPORT_CONNECT_COMPLETE,
159 };
160
161 ClientSocketFactory* client_socket_factory() const {
162 return parent_job_->data_.client_socket_factory();
163 }
164
165 const BoundNetLog& net_log() const { return parent_job_->net_log(); }
166
167 const IPEndPoint& CurrentAddress() const {
168 DCHECK_LT(current_address_index_, static_cast<int>(addresses_.size()));
169 return addresses_[current_address_index_];
170 }
171
172 void OnIOComplete(int result) {
173 int rv = DoLoop(result);
174 if (rv != ERR_IO_PENDING)
175 parent_job_->OnSubJobComplete(rv, this); // |this| deleted
176 }
177
178 int DoLoop(int result) {
179 DCHECK_NE(next_state_, STATE_NONE);
180
181 int rv = result;
182 do {
183 State state = next_state_;
184 next_state_ = STATE_NONE;
185 switch (state) {
186 case STATE_WAIT_FOR_LOCK:
tyoshino (SeeGerritForStatus) 2014/06/18 06:49:43 DCHECK_EQ(OK, rv)
Adam Rice 2014/06/19 13:55:20 Done.
187 rv = DoEndpointLock();
188 break;
189 case STATE_WAIT_FOR_LOCK_COMPLETE:
tyoshino (SeeGerritForStatus) 2014/06/18 06:49:42 DCHECK_EQ(OK, rv)
Adam Rice 2014/06/19 13:55:20 Done.
190 rv = DoEndpointLockComplete();
191 break;
192 case STATE_TRANSPORT_CONNECT:
193 DCHECK_EQ(OK, rv);
194 rv = DoTransportConnect();
195 break;
196 case STATE_TRANSPORT_CONNECT_COMPLETE:
197 rv = DoTransportConnectComplete(rv);
198 break;
199 default:
200 NOTREACHED();
201 rv = ERR_FAILED;
202 break;
203 }
204 } while (rv != ERR_IO_PENDING && next_state_ != STATE_NONE);
205
206 return rv;
207 }
208
209 int DoEndpointLock() {
210 int rv = WebSocketEndpointLockManager::GetInstance()->LockEndpoint(
211 CurrentAddress(), this);
212 next_state_ = STATE_WAIT_FOR_LOCK_COMPLETE;
213 return rv;
214 }
215
216 int DoEndpointLockComplete() {
217 next_state_ = STATE_TRANSPORT_CONNECT;
218 return OK;
219 }
220
221 int DoTransportConnect() {
222 // TODO(ricea): Update global g_last_connect_time and report
223 // ConnectInterval.
224 next_state_ = STATE_TRANSPORT_CONNECT_COMPLETE;
225 AddressList one_address(CurrentAddress());
226 transport_socket_ = client_socket_factory()->CreateTransportClientSocket(
227 one_address, net_log().net_log(), net_log().source());
228 // This use of base::Unretained() is safe because transport_socket_ is
229 // destroyed in the destructor.
230 return transport_socket_->Connect(
231 base::Bind(&SubJob::OnIOComplete, base::Unretained(this)));
232 }
233
234 int DoTransportConnectComplete(int result) {
235 WebSocketEndpointLockManager* endpoint_lock_manager =
236 WebSocketEndpointLockManager::GetInstance();
237 if (result != OK) {
238 endpoint_lock_manager->UnlockEndpoint(CurrentAddress());
239
240 if (current_address_index_ + 1 < static_cast<int>(addresses_.size())) {
241 // Try falling back to the next address in the list.
242 next_state_ = STATE_WAIT_FOR_LOCK;
243 ++current_address_index_;
244 result = OK;
245 }
246
247 return result;
248 }
249
250 endpoint_lock_manager->RememberSocket(transport_socket_.get(),
251 CurrentAddress());
252
253 return result;
Johnny 2014/06/16 23:41:11 Sanity check: Looks like WebSocketBasicHandshakeSt
Adam Rice 2014/06/19 13:55:20 Yes.
254 }
255
256 WebSocketTransportConnectJob* const parent_job_;
257
258 const AddressList addresses_;
259 int current_address_index_;
260
261 State next_state_;
262 const SubJobType type_;
263
264 scoped_ptr<StreamSocket> transport_socket_;
265
266 DISALLOW_COPY_AND_ASSIGN(SubJob);
267 };
268
269 int WebSocketEndpointLockManager::LockEndpoint(const IPEndPoint& endpoint,
270 SubJob* job) {
271 EndPointJobMap::value_type insert_value(endpoint, NULL);
272 std::pair<EndPointJobMap::iterator, bool> rv =
273 endpoint_job_map_.insert(insert_value);
274 if (rv.second) {
275 DVLOG(3) << "Locking endpoint " << endpoint.ToString();
276 rv.first->second = new ConnectJobQueue;
277 return OK;
278 }
279 DVLOG(3) << "Waiting for endpoint " << endpoint.ToString();
280 rv.first->second->Append(job);
281 return ERR_IO_PENDING;
282 }
283
284 void WebSocketEndpointLockManager::RememberSocket(StreamSocket* socket,
285 const IPEndPoint& endpoint) {
286 bool inserted = socket_endpoint_map_.insert(SocketEndPointMap::value_type(
287 socket, endpoint)).second;
288 DCHECK(inserted);
289 DCHECK(endpoint_job_map_.find(endpoint) != endpoint_job_map_.end());
290 DVLOG(3) << "Remembered (StreamSocket*)" << socket << " for "
291 << endpoint.ToString() << " (" << socket_endpoint_map_.size()
292 << " sockets remembered)";
293 }
294
295 void WebSocketEndpointLockManager::UnlockSocket(StreamSocket* socket) {
296 SocketEndPointMap::iterator socket_it = socket_endpoint_map_.find(socket);
297 if (socket_it == socket_endpoint_map_.end()) {
298 DVLOG(3) << "Ignoring request to unlock already-unlocked socket"
299 "(StreamSocket*)" << socket;
300 return;
301 }
302 const IPEndPoint& endpoint = socket_it->second;
303 DVLOG(3) << "Unlocking (StreamSocket*)" << socket << " for "
304 << endpoint.ToString() << " (" << socket_endpoint_map_.size()
305 << " sockets left)";
306 UnlockEndpoint(endpoint);
307 socket_endpoint_map_.erase(socket_it);
308 }
309
310 void WebSocketEndpointLockManager::UnlockEndpoint(const IPEndPoint& endpoint) {
311 EndPointJobMap::iterator found_it = endpoint_job_map_.find(endpoint);
312 CHECK(found_it != endpoint_job_map_.end()); // Security critical
313 ConnectJobQueue* queue = found_it->second;
314 if (queue->empty()) {
315 DVLOG(3) << "Unlocking endpoint " << endpoint.ToString();
316 delete queue;
317 endpoint_job_map_.erase(found_it);
318 } else {
319 DVLOG(3) << "Unlocking endpoint " << endpoint.ToString()
320 << " and activating next waiter";
321 SubJob* next_job = queue->head()->value();
322 next_job->RemoveFromList();
323 next_job->GotEndpointLock();
324 }
325 }
326
327 WebSocketTransportConnectJob::WebSocketTransportConnectJob(
328 const std::string& group_name,
329 RequestPriority priority,
330 const scoped_refptr<TransportSocketParams>& params,
331 TimeDelta timeout_duration,
332 const CompletionCallback& callback,
333 ClientSocketFactory* client_socket_factory,
334 HostResolver* host_resolver,
335 ClientSocketHandle* handle,
336 Delegate* delegate,
337 NetLog* pool_net_log,
338 const BoundNetLog& request_net_log)
339 : ConnectJob(group_name,
340 timeout_duration,
341 priority,
342 delegate,
343 BoundNetLog::Make(pool_net_log, NetLog::SOURCE_CONNECT_JOB)),
344 data_(params, client_socket_factory, host_resolver, &connect_timing_),
345 race_result_(TransportConnectJobHelper::CONNECTION_LATENCY_UNKNOWN),
346 handle_(handle),
347 callback_(callback),
348 request_net_log_(request_net_log),
349 had_ipv4_(false),
350 had_ipv6_(false) {
351 data_.SetOnIOComplete(this);
352 }
353
354 WebSocketTransportConnectJob::~WebSocketTransportConnectJob() {}
355
356 LoadState WebSocketTransportConnectJob::GetLoadState() const {
357 LoadState load_state = LOAD_STATE_RESOLVING_HOST;
358 if (ipv6_job_)
359 load_state = ipv6_job_->GetLoadState();
360 // LOAD_STATE_CONNECTING is better than
tyoshino (SeeGerritForStatus) 2014/06/18 06:49:43 could you please elaborate better for what point
Adam Rice 2014/06/19 13:55:20 I updated the comment. PTAL.
tyoshino (SeeGerritForStatus) 2014/06/20 12:12:06 Thanks
361 // LOAD_STATE_WAITING_FOR_AVAILABLE_SOCKET.
362 if (ipv4_job_ && load_state != LOAD_STATE_CONNECTING)
363 load_state = ipv4_job_->GetLoadState();
364 return load_state;
365 }
366
367 int WebSocketTransportConnectJob::DoResolveHost() {
368 return data_.DoResolveHost(priority(), net_log());
369 }
370
371 int WebSocketTransportConnectJob::DoResolveHostComplete(int result) {
372 return data_.DoResolveHostComplete(result, net_log());
373 }
374
375 int WebSocketTransportConnectJob::DoTransportConnect() {
376 AddressList ipv4_addresses;
377 AddressList ipv6_addresses;
378 int result = ERR_UNEXPECTED;
379 data_.set_next_state(
380 TransportConnectJobHelper::STATE_TRANSPORT_CONNECT_COMPLETE);
381
382 for (AddressList::const_iterator it = data_.addresses().begin();
383 it != data_.addresses().end();
384 ++it) {
385 switch (it->GetFamily()) {
386 case ADDRESS_FAMILY_IPV4:
387 ipv4_addresses.push_back(*it);
388 break;
389
390 case ADDRESS_FAMILY_IPV6:
391 ipv6_addresses.push_back(*it);
392 break;
393
394 default:
395 DVLOG(1) << "Unexpected ADDRESS_FAMILY: " << it->GetFamily();
tyoshino (SeeGerritForStatus) 2014/06/18 06:49:43 do we want to ignore this?
Adam Rice 2014/06/19 13:55:20 At the moment this cannot happen, so we could put
tyoshino (SeeGerritForStatus) 2014/06/20 12:12:06 OK
396 break;
397 }
398 }
399
400 if (!ipv4_addresses.empty()) {
401 had_ipv4_ = true;
402 ipv4_job_.reset(new SubJob(ipv4_addresses, this, SUB_JOB_IPV4));
403 }
404
405 if (!ipv6_addresses.empty()) {
406 had_ipv6_ = true;
407 ipv6_job_.reset(new SubJob(ipv6_addresses, this, SUB_JOB_IPV6));
408 result = ipv6_job_->Start();
409 if (result == OK) {
410 SetSocket(ipv6_job_->PassSocket());
411 race_result_ =
412 had_ipv4_
413 ? TransportConnectJobHelper::CONNECTION_LATENCY_IPV6_RACEABLE
414 : TransportConnectJobHelper::CONNECTION_LATENCY_IPV6_SOLO;
415 return result;
416 }
417 if (result == ERR_IO_PENDING && ipv4_job_) {
418 // This use of base::Unretained is safe because fallback_timer_ is owned
tyoshino (SeeGerritForStatus) 2014/06/18 06:49:42 |fallback_timer_|
Adam Rice 2014/06/19 13:55:20 Done.
419 // by this object.
420 fallback_timer_.Start(
421 FROM_HERE,
422 TimeDelta::FromMilliseconds(
423 TransportConnectJobHelper::kIPv6FallbackTimerInMs),
424 base::Bind(&WebSocketTransportConnectJob::StartIPv4JobAsync,
425 base::Unretained(this)));
tyoshino (SeeGerritForStatus) 2014/06/18 06:49:43 return here?
426 }
tyoshino (SeeGerritForStatus) 2014/06/18 06:49:43 how about breaking L417 into two if-s and return w
Adam Rice 2014/06/19 13:55:20 Very clever. Done.
427 if (result != ERR_IO_PENDING)
428 ipv6_job_.reset();
429 }
430
431 if (ipv4_job_ && !ipv6_job_) {
432 result = ipv4_job_->Start();
433 if (result == OK) {
434 SetSocket(ipv4_job_->PassSocket());
435 race_result_ =
436 had_ipv6_
437 ? TransportConnectJobHelper::CONNECTION_LATENCY_IPV4_WINS_RACE
438 : TransportConnectJobHelper::CONNECTION_LATENCY_IPV4_NO_RACE;
439 }
440 }
441
442 return result;
443 }
444
445 int WebSocketTransportConnectJob::DoTransportConnectComplete(int result) {
446 if (result == OK)
447 data_.HistogramDuration(race_result_);
448 return result;
449 }
450
451 void WebSocketTransportConnectJob::OnSubJobComplete(int result, SubJob* job) {
452 if (result == OK) {
453 switch (job->type()) {
454 case SUB_JOB_IPV4:
455 race_result_ =
456 had_ipv6_
457 ? TransportConnectJobHelper::CONNECTION_LATENCY_IPV4_WINS_RACE
458 : TransportConnectJobHelper::CONNECTION_LATENCY_IPV4_NO_RACE;
459 break;
460
461 case SUB_JOB_IPV6:
462 race_result_ =
463 had_ipv4_
464 ? TransportConnectJobHelper::CONNECTION_LATENCY_IPV6_RACEABLE
465 : TransportConnectJobHelper::CONNECTION_LATENCY_IPV6_SOLO;
466 break;
467 }
468 SetSocket(job->PassSocket());
469
470 // Make sure all connections are cancelled even if this object fails to be
471 // deleted.
472 ipv4_job_.reset();
473 ipv6_job_.reset();
474 } else {
475 switch (job->type()) {
476 case SUB_JOB_IPV4:
477 ipv4_job_.reset();
478 break;
479
480 case SUB_JOB_IPV6:
481 ipv6_job_.reset();
482 if (ipv4_job_ && !ipv4_job_->started()) {
483 fallback_timer_.Stop();
484 result = ipv4_job_->Start();
485 if (result != ERR_IO_PENDING) {
486 OnSubJobComplete(result, ipv4_job_.get());
487 return;
488 }
489 }
490 break;
491 }
492 if (ipv4_job_ || ipv6_job_)
493 return;
494 }
495 data_.OnIOComplete(this, result);
496 }
497
498 void WebSocketTransportConnectJob::StartIPv4JobAsync() {
499 DCHECK(ipv4_job_);
500 int result = ipv4_job_->Start();
501 if (result != ERR_IO_PENDING)
502 OnSubJobComplete(result, ipv4_job_.get());
503 }
504
505 int WebSocketTransportConnectJob::ConnectInternal() {
506 return data_.DoConnectInternal(this);
507 }
508
509 WebSocketTransportClientSocketPool::WebSocketTransportClientSocketPool(
510 int max_sockets,
511 int max_sockets_per_group,
512 ClientSocketPoolHistograms* histograms,
513 HostResolver* host_resolver,
514 ClientSocketFactory* client_socket_factory,
515 NetLog* net_log)
516 : TransportClientSocketPool(max_sockets,
517 max_sockets_per_group,
518 histograms,
519 host_resolver,
520 client_socket_factory,
521 net_log),
522 connect_job_delegate_(this),
523 histograms_(histograms),
524 pool_net_log_(net_log),
525 client_socket_factory_(client_socket_factory),
526 host_resolver_(host_resolver),
527 max_sockets_(max_sockets),
528 handed_out_socket_count_(0),
529 is_stalled_(false),
530 weak_factory_(this) {}
531
532 WebSocketTransportClientSocketPool::~WebSocketTransportClientSocketPool() {
533 DCHECK(pending_connects_.empty());
534 DCHECK_EQ(0, handed_out_socket_count_);
535 DCHECK(!is_stalled_);
536 }
537
538 // static
539 void WebSocketTransportClientSocketPool::UnlockEndpoint(
540 ClientSocketHandle* handle) {
541 DCHECK(handle->is_initialized());
542 WebSocketEndpointLockManager::GetInstance()->UnlockSocket(handle->socket());
543 }
544
545 int WebSocketTransportClientSocketPool::RequestSocket(
546 const std::string& group_name,
547 const void* params,
548 RequestPriority priority,
549 ClientSocketHandle* handle,
550 const CompletionCallback& callback,
551 const BoundNetLog& request_net_log) {
552 DCHECK(params);
553 const scoped_refptr<TransportSocketParams>& casted_params =
554 *static_cast<const scoped_refptr<TransportSocketParams>*>(params);
555
556 NetLogTcpClientSocketPoolRequestedSocket(request_net_log, &casted_params);
557
558 CHECK(!callback.is_null());
559 CHECK(handle);
560
561 request_net_log.BeginEvent(NetLog::TYPE_SOCKET_POOL);
562
563 if (ReachedMaxSocketsLimit() && !casted_params->ignore_limits()) {
564 request_net_log.AddEvent(NetLog::TYPE_SOCKET_POOL_STALLED_MAX_SOCKETS);
565 is_stalled_ = true;
566 return ERR_IO_PENDING;
567 }
568
569 scoped_ptr<WebSocketTransportConnectJob> connect_job(
570 new WebSocketTransportConnectJob(
571 group_name,
572 priority,
573 casted_params,
574 TimeDelta::FromSeconds(kTransportConnectJobTimeoutInSeconds),
tyoshino (SeeGerritForStatus) 2014/06/18 06:49:42 ConnectionTimeout()?
Adam Rice 2014/06/19 13:55:20 Done.
575 callback,
576 client_socket_factory_,
577 host_resolver_,
578 handle,
579 &connect_job_delegate_,
580 pool_net_log_,
581 request_net_log));
582
583 int rv = connect_job->Connect();
584 // Regardless of the outcome of |connect_job|, it will always be bound to
585 // |handle|, since this pool uses early-binding. So the binding is logged
586 // here, without waiting for the result.
587 request_net_log.AddEvent(
588 NetLog::TYPE_SOCKET_POOL_BOUND_TO_CONNECT_JOB,
589 connect_job->net_log().source().ToEventParametersCallback());
590 if (rv == OK) {
591 HandOutSocket(connect_job->PassSocket(),
592 connect_job->connect_timing(),
593 handle,
594 request_net_log);
595 request_net_log.EndEvent(NetLog::TYPE_SOCKET_POOL);
596 } else if (rv == ERR_IO_PENDING) {
597 // TODO(ricea): Implement backup job timer?
598 AddJob(handle, connect_job.Pass());
599 } else {
600 scoped_ptr<StreamSocket> error_socket;
601 connect_job->GetAdditionalErrorState(handle);
602 error_socket = connect_job->PassSocket();
603 if (error_socket) {
604 HandOutSocket(error_socket.Pass(),
605 connect_job->connect_timing(),
606 handle,
607 request_net_log);
608 }
609 }
610
611 if (rv != ERR_IO_PENDING) {
612 request_net_log.EndEventWithNetErrorCode(NetLog::TYPE_SOCKET_POOL, rv);
613 }
614
615 return rv;
616 }
617
618 void WebSocketTransportClientSocketPool::RequestSockets(
619 const std::string& group_name,
620 const void* params,
621 int num_sockets,
622 const BoundNetLog& net_log) {
623 NOTIMPLEMENTED();
624 }
625
626 void WebSocketTransportClientSocketPool::CancelRequest(
627 const std::string& group_name,
628 ClientSocketHandle* handle) {
629 if (!DeleteJob(handle))
630 pending_callbacks_.erase(handle);
631 }
632
633 void WebSocketTransportClientSocketPool::ReleaseSocket(
634 const std::string& group_name,
635 scoped_ptr<StreamSocket> socket,
636 int id) {
637 WebSocketEndpointLockManager::GetInstance()->UnlockSocket(socket.get());
638 CHECK_GT(handed_out_socket_count_, 0);
639 --handed_out_socket_count_;
640 if (!ReachedMaxSocketsLimit())
641 is_stalled_ = false;
642 }
643
644 void WebSocketTransportClientSocketPool::FlushWithError(int error) {
645 CancelAllConnectJobs();
646 }
647
648 void WebSocketTransportClientSocketPool::CloseIdleSockets() {
649 // We have no idle sockets.
650 }
651
652 int WebSocketTransportClientSocketPool::IdleSocketCount() const { return 0; }
653
654 int WebSocketTransportClientSocketPool::IdleSocketCountInGroup(
655 const std::string& group_name) const {
656 return 0;
657 }
658
659 LoadState WebSocketTransportClientSocketPool::GetLoadState(
660 const std::string& group_name,
661 const ClientSocketHandle* handle) const {
662 if (pending_callbacks_.count(handle))
663 return LOAD_STATE_CONNECTING;
664 return LookupConnectJob(handle)->GetLoadState();
665 }
666
667 base::DictionaryValue* WebSocketTransportClientSocketPool::GetInfoAsValue(
668 const std::string& name,
669 const std::string& type,
670 bool include_nested_pools) const {
671 base::DictionaryValue* dict = new base::DictionaryValue();
672 dict->SetString("name", name);
673 dict->SetString("type", type);
674 dict->SetInteger("handed_out_socket_count", handed_out_socket_count_);
675 dict->SetInteger("connecting_socket_count", pending_connects_.size());
676 dict->SetInteger("idle_socket_count", 0);
677 dict->SetInteger("max_socket_count", max_sockets_);
678 dict->SetInteger("max_sockets_per_group", max_sockets_);
679 dict->SetInteger("pool_generation_number", 0);
680 return dict;
681 }
682
683 TimeDelta WebSocketTransportClientSocketPool::ConnectionTimeout() const {
684 return TimeDelta::FromSeconds(kTransportConnectJobTimeoutInSeconds);
685 }
686
687 ClientSocketPoolHistograms* WebSocketTransportClientSocketPool::histograms()
688 const {
689 return histograms_;
690 }
691
692 bool WebSocketTransportClientSocketPool::IsStalled() const {
693 return is_stalled_;
694 }
695
696 void WebSocketTransportClientSocketPool::OnConnectJobComplete(
697 int result,
698 WebSocketTransportConnectJob* job) {
699 DCHECK_NE(ERR_IO_PENDING, result);
700
701 scoped_ptr<StreamSocket> socket = job->PassSocket();
702
703 BoundNetLog request_net_log = job->request_net_log();
704 CompletionCallback callback = job->callback();
705 LoadTimingInfo::ConnectTiming connect_timing = job->connect_timing();
706
707 ClientSocketHandle* const handle = job->handle();
708
709 if (result == OK) {
710 DCHECK(socket.get());
711 HandOutSocket(socket.Pass(), connect_timing, handle, request_net_log);
712 request_net_log.EndEvent(NetLog::TYPE_SOCKET_POOL);
713 } else {
714 // If we got a socket, it must contain error information so pass that
715 // up so that the caller can retrieve it.
716 bool handed_out_socket = false;
717 job->GetAdditionalErrorState(handle);
718 if (socket.get()) {
719 handed_out_socket = true;
720 HandOutSocket(socket.Pass(), connect_timing, handle, request_net_log);
721 }
722 request_net_log.EndEventWithNetErrorCode(NetLog::TYPE_SOCKET_POOL, result);
723
724 if (!handed_out_socket && is_stalled_ && !ReachedMaxSocketsLimit())
tyoshino (SeeGerritForStatus) 2014/06/18 06:49:43 is this correct? pending_connects_.size() decrease
Adam Rice 2014/06/19 13:55:20 No, it's very wrong. I didn't write any code to re
725 is_stalled_ = false;
726 }
727 bool delete_succeeded = DeleteJob(handle);
728 DCHECK(delete_succeeded);
729 InvokeUserCallbackLater(handle, callback, result);
730 }
731
732 void WebSocketTransportClientSocketPool::InvokeUserCallbackLater(
733 ClientSocketHandle* handle,
734 const CompletionCallback& callback,
735 int rv) {
736 DCHECK(!pending_callbacks_.count(handle));
737 pending_callbacks_.insert(handle);
738 base::MessageLoop::current()->PostTask(
739 FROM_HERE,
740 base::Bind(&WebSocketTransportClientSocketPool::InvokeUserCallback,
741 weak_factory_.GetWeakPtr(),
742 handle,
743 callback,
744 rv));
745 }
746
747 void WebSocketTransportClientSocketPool::InvokeUserCallback(
748 ClientSocketHandle* handle,
749 const CompletionCallback& callback,
750 int rv) {
751 if (pending_callbacks_.erase(handle))
752 callback.Run(rv);
753 }
754
755 bool WebSocketTransportClientSocketPool::ReachedMaxSocketsLimit() const {
756 return base::checked_cast<int>(pending_connects_.size()) +
757 handed_out_socket_count_ >=
758 max_sockets_;
759 }
760
761 void WebSocketTransportClientSocketPool::HandOutSocket(
762 scoped_ptr<StreamSocket> socket,
763 const LoadTimingInfo::ConnectTiming& connect_timing,
764 ClientSocketHandle* handle,
765 const BoundNetLog& net_log) {
766 DCHECK(socket);
767 handle->SetSocket(socket.Pass());
768 handle->set_reuse_type(ClientSocketHandle::UNUSED);
769 handle->set_idle_time(TimeDelta());
tyoshino (SeeGerritForStatus) 2014/06/18 06:49:43 needed?
Adam Rice 2014/06/19 13:55:20 No. Replaced with DCHECKs.
770 handle->set_pool_id(0);
771 handle->set_connect_timing(connect_timing);
772
773 net_log.AddEvent(
774 NetLog::TYPE_SOCKET_POOL_BOUND_TO_SOCKET,
775 handle->socket()->NetLog().source().ToEventParametersCallback());
776
777 ++handed_out_socket_count_;
778 }
779
780 void WebSocketTransportClientSocketPool::AddJob(
781 ClientSocketHandle* handle,
782 scoped_ptr<WebSocketTransportConnectJob> connect_job) {
783 bool inserted =
784 pending_connects_.insert(PendingConnectsMap::value_type(
785 handle, connect_job.release())).second;
786 DCHECK(inserted);
787 }
788
789 bool WebSocketTransportClientSocketPool::DeleteJob(ClientSocketHandle* handle) {
790 PendingConnectsMap::iterator it = pending_connects_.find(handle);
791 if (it == pending_connects_.end())
792 return false;
793 delete it->second, it->second = NULL;
794 pending_connects_.erase(it);
795 return true;
796 }
797
798 void WebSocketTransportClientSocketPool::CancelAllConnectJobs() {
799 STLDeleteValues(&pending_connects_);
800 }
801
802 const WebSocketTransportConnectJob*
803 WebSocketTransportClientSocketPool::LookupConnectJob(
804 const ClientSocketHandle* handle) const {
805 PendingConnectsMap::const_iterator it = pending_connects_.find(handle);
806 CHECK(it != pending_connects_.end());
807 return it->second;
808 }
809
810 WebSocketTransportClientSocketPool::ConnectJobDelegate::ConnectJobDelegate(
811 WebSocketTransportClientSocketPool* owner)
812 : owner_(owner) {}
813
814 WebSocketTransportClientSocketPool::ConnectJobDelegate::~ConnectJobDelegate() {}
815
816 void
817 WebSocketTransportClientSocketPool::ConnectJobDelegate::OnConnectJobComplete(
818 int result,
819 ConnectJob* job) {
820 owner_->OnConnectJobComplete(result,
821 static_cast<WebSocketTransportConnectJob*>(job));
822 }
823
824 } // namespace net
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698