OLD | NEW |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "net/proxy/multi_threaded_proxy_resolver.h" | 5 #include "net/proxy/multi_threaded_proxy_resolver.h" |
6 | 6 |
7 #include <deque> | 7 #include <deque> |
8 #include <utility> | 8 #include <utility> |
9 #include <vector> | 9 #include <vector> |
10 | 10 |
(...skipping 99 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
110 size_t max_num_threads, | 110 size_t max_num_threads, |
111 const scoped_refptr<ProxyResolverScriptData>& script_data, | 111 const scoped_refptr<ProxyResolverScriptData>& script_data, |
112 scoped_refptr<Executor> executor); | 112 scoped_refptr<Executor> executor); |
113 | 113 |
114 ~MultiThreadedProxyResolver() override; | 114 ~MultiThreadedProxyResolver() override; |
115 | 115 |
116 // ProxyResolver implementation: | 116 // ProxyResolver implementation: |
117 int GetProxyForURL(const GURL& url, | 117 int GetProxyForURL(const GURL& url, |
118 ProxyInfo* results, | 118 ProxyInfo* results, |
119 const CompletionCallback& callback, | 119 const CompletionCallback& callback, |
120 RequestHandle* request, | 120 scoped_ptr<Request>* request, |
121 const BoundNetLog& net_log) override; | 121 const BoundNetLog& net_log) override; |
122 void CancelRequest(RequestHandle request) override; | |
123 LoadState GetLoadState(RequestHandle request) const override; | |
124 | 122 |
125 private: | 123 private: |
126 class GetProxyForURLJob; | 124 class GetProxyForURLJob; |
| 125 class RequestImpl; |
127 // FIFO queue of pending jobs waiting to be started. | 126 // FIFO queue of pending jobs waiting to be started. |
128 // TODO(eroman): Make this priority queue. | 127 // TODO(eroman): Make this priority queue. |
129 typedef std::deque<scoped_refptr<Job>> PendingJobsQueue; | 128 typedef std::deque<scoped_refptr<Job>> PendingJobsQueue; |
130 typedef std::vector<scoped_refptr<Executor>> ExecutorList; | 129 typedef std::vector<scoped_refptr<Executor>> ExecutorList; |
131 | 130 |
132 // Returns an idle worker thread which is ready to receive GetProxyForURL() | 131 // Returns an idle worker thread which is ready to receive GetProxyForURL() |
133 // requests. If all threads are occupied, returns NULL. | 132 // requests. If all threads are occupied, returns NULL. |
134 Executor* FindIdleExecutor(); | 133 Executor* FindIdleExecutor(); |
135 | 134 |
136 // Creates a new worker thread, and appends it to |executors_|. | 135 // Creates a new worker thread, and appends it to |executors_|. |
(...skipping 87 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
224 | 223 |
225 virtual ~Job() {} | 224 virtual ~Job() {} |
226 | 225 |
227 private: | 226 private: |
228 const Type type_; | 227 const Type type_; |
229 CompletionCallback callback_; | 228 CompletionCallback callback_; |
230 Executor* executor_; | 229 Executor* executor_; |
231 bool was_cancelled_; | 230 bool was_cancelled_; |
232 }; | 231 }; |
233 | 232 |
| 233 class MultiThreadedProxyResolver::RequestImpl : public ProxyResolver::Request { |
| 234 public: |
| 235 explicit RequestImpl(scoped_refptr<Job> job) : job_(std::move(job)) {} |
| 236 |
| 237 ~RequestImpl() override { job_->Cancel(); } |
| 238 |
| 239 LoadState GetLoadState() override { |
| 240 return LOAD_STATE_RESOLVING_PROXY_FOR_URL; |
| 241 } |
| 242 |
| 243 private: |
| 244 scoped_refptr<Job> job_; |
| 245 }; |
| 246 |
234 // CreateResolverJob ----------------------------------------------------------- | 247 // CreateResolverJob ----------------------------------------------------------- |
235 | 248 |
236 // Runs on the worker thread to call ProxyResolverFactory::CreateProxyResolver. | 249 // Runs on the worker thread to call ProxyResolverFactory::CreateProxyResolver. |
237 class CreateResolverJob : public Job { | 250 class CreateResolverJob : public Job { |
238 public: | 251 public: |
239 CreateResolverJob(const scoped_refptr<ProxyResolverScriptData>& script_data, | 252 CreateResolverJob(const scoped_refptr<ProxyResolverScriptData>& script_data, |
240 ProxyResolverFactory* factory) | 253 ProxyResolverFactory* factory) |
241 : Job(TYPE_CREATE_RESOLVER, CompletionCallback()), | 254 : Job(TYPE_CREATE_RESOLVER, CompletionCallback()), |
242 script_data_(script_data), | 255 script_data_(script_data), |
243 factory_(factory) {} | 256 factory_(factory) {} |
(...skipping 191 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
435 DCHECK(CalledOnValidThread()); | 448 DCHECK(CalledOnValidThread()); |
436 // We will cancel all outstanding requests. | 449 // We will cancel all outstanding requests. |
437 pending_jobs_.clear(); | 450 pending_jobs_.clear(); |
438 | 451 |
439 for (auto& executor : executors_) { | 452 for (auto& executor : executors_) { |
440 executor->Destroy(); | 453 executor->Destroy(); |
441 } | 454 } |
442 } | 455 } |
443 | 456 |
444 int MultiThreadedProxyResolver::GetProxyForURL( | 457 int MultiThreadedProxyResolver::GetProxyForURL( |
445 const GURL& url, ProxyInfo* results, const CompletionCallback& callback, | 458 const GURL& url, |
446 RequestHandle* request, const BoundNetLog& net_log) { | 459 ProxyInfo* results, |
| 460 const CompletionCallback& callback, |
| 461 scoped_ptr<Request>* request, |
| 462 const BoundNetLog& net_log) { |
447 DCHECK(CalledOnValidThread()); | 463 DCHECK(CalledOnValidThread()); |
448 DCHECK(!callback.is_null()); | 464 DCHECK(!callback.is_null()); |
449 | 465 |
450 scoped_refptr<GetProxyForURLJob> job( | 466 scoped_refptr<GetProxyForURLJob> job( |
451 new GetProxyForURLJob(url, results, callback, net_log)); | 467 new GetProxyForURLJob(url, results, callback, net_log)); |
452 | 468 |
453 // Completion will be notified through |callback|, unless the caller cancels | 469 // Completion will be notified through |callback|, unless the caller cancels |
454 // the request using |request|. | 470 // the request using |request|. |
455 if (request) | 471 if (request) |
456 *request = reinterpret_cast<RequestHandle>(job.get()); | 472 request->reset(new RequestImpl(job)); |
457 | 473 |
458 // If there is an executor that is ready to run this request, submit it! | 474 // If there is an executor that is ready to run this request, submit it! |
459 Executor* executor = FindIdleExecutor(); | 475 Executor* executor = FindIdleExecutor(); |
460 if (executor) { | 476 if (executor) { |
461 DCHECK_EQ(0u, pending_jobs_.size()); | 477 DCHECK_EQ(0u, pending_jobs_.size()); |
462 executor->StartJob(job.get()); | 478 executor->StartJob(job.get()); |
463 return ERR_IO_PENDING; | 479 return ERR_IO_PENDING; |
464 } | 480 } |
465 | 481 |
466 // Otherwise queue this request. (We will schedule it to a thread once one | 482 // Otherwise queue this request. (We will schedule it to a thread once one |
467 // becomes available). | 483 // becomes available). |
468 job->WaitingForThread(); | 484 job->WaitingForThread(); |
469 pending_jobs_.push_back(job); | 485 pending_jobs_.push_back(job); |
470 | 486 |
471 // If we haven't already reached the thread limit, provision a new thread to | 487 // If we haven't already reached the thread limit, provision a new thread to |
472 // drain the requests more quickly. | 488 // drain the requests more quickly. |
473 if (executors_.size() < max_num_threads_) | 489 if (executors_.size() < max_num_threads_) |
474 AddNewExecutor(); | 490 AddNewExecutor(); |
475 | 491 |
476 return ERR_IO_PENDING; | 492 return ERR_IO_PENDING; |
477 } | 493 } |
478 | 494 |
479 void MultiThreadedProxyResolver::CancelRequest(RequestHandle req) { | |
480 DCHECK(CalledOnValidThread()); | |
481 DCHECK(req); | |
482 | |
483 Job* job = reinterpret_cast<Job*>(req); | |
484 DCHECK_EQ(Job::TYPE_GET_PROXY_FOR_URL, job->type()); | |
485 | |
486 if (job->executor()) { | |
487 // If the job was already submitted to the executor, just mark it | |
488 // as cancelled so the user callback isn't run on completion. | |
489 job->Cancel(); | |
490 } else { | |
491 // Otherwise the job is just sitting in a queue. | |
492 PendingJobsQueue::iterator it = | |
493 std::find(pending_jobs_.begin(), pending_jobs_.end(), job); | |
494 DCHECK(it != pending_jobs_.end()); | |
495 pending_jobs_.erase(it); | |
496 } | |
497 } | |
498 | |
499 LoadState MultiThreadedProxyResolver::GetLoadState(RequestHandle req) const { | |
500 DCHECK(CalledOnValidThread()); | |
501 DCHECK(req); | |
502 return LOAD_STATE_RESOLVING_PROXY_FOR_URL; | |
503 } | |
504 | 495 |
505 Executor* MultiThreadedProxyResolver::FindIdleExecutor() { | 496 Executor* MultiThreadedProxyResolver::FindIdleExecutor() { |
506 DCHECK(CalledOnValidThread()); | 497 DCHECK(CalledOnValidThread()); |
507 for (ExecutorList::iterator it = executors_.begin(); | 498 for (ExecutorList::iterator it = executors_.begin(); |
508 it != executors_.end(); ++it) { | 499 it != executors_.end(); ++it) { |
509 Executor* executor = it->get(); | 500 Executor* executor = it->get(); |
510 if (!executor->outstanding_job()) | 501 if (!executor->outstanding_job()) |
511 return executor; | 502 return executor; |
512 } | 503 } |
513 return NULL; | 504 return NULL; |
514 } | 505 } |
515 | 506 |
516 void MultiThreadedProxyResolver::AddNewExecutor() { | 507 void MultiThreadedProxyResolver::AddNewExecutor() { |
517 DCHECK(CalledOnValidThread()); | 508 DCHECK(CalledOnValidThread()); |
518 DCHECK_LT(executors_.size(), max_num_threads_); | 509 DCHECK_LT(executors_.size(), max_num_threads_); |
519 // The "thread number" is used to give the thread a unique name. | 510 // The "thread number" is used to give the thread a unique name. |
520 int thread_number = executors_.size(); | 511 int thread_number = executors_.size(); |
521 Executor* executor = new Executor(this, thread_number); | 512 Executor* executor = new Executor(this, thread_number); |
522 executor->StartJob( | 513 executor->StartJob( |
523 new CreateResolverJob(script_data_, resolver_factory_.get())); | 514 new CreateResolverJob(script_data_, resolver_factory_.get())); |
524 executors_.push_back(make_scoped_refptr(executor)); | 515 executors_.push_back(make_scoped_refptr(executor)); |
525 } | 516 } |
526 | 517 |
527 void MultiThreadedProxyResolver::OnExecutorReady(Executor* executor) { | 518 void MultiThreadedProxyResolver::OnExecutorReady(Executor* executor) { |
528 DCHECK(CalledOnValidThread()); | 519 DCHECK(CalledOnValidThread()); |
529 if (pending_jobs_.empty()) | 520 while (!pending_jobs_.empty()) { |
530 return; | 521 scoped_refptr<Job> job = pending_jobs_.front(); |
531 | 522 pending_jobs_.pop_front(); |
532 // Get the next job to process (FIFO). Transfer it from the pending queue | 523 if (!job->was_cancelled()) { |
533 // to the executor. | 524 executor->StartJob(job.get()); |
534 scoped_refptr<Job> job = pending_jobs_.front(); | 525 return; |
535 pending_jobs_.pop_front(); | 526 } |
536 executor->StartJob(job.get()); | 527 } |
537 } | 528 } |
538 | 529 |
539 } // namespace | 530 } // namespace |
540 | 531 |
541 class MultiThreadedProxyResolverFactory::Job | 532 class MultiThreadedProxyResolverFactory::Job |
542 : public ProxyResolverFactory::Request, | 533 : public ProxyResolverFactory::Request, |
543 public Executor::Coordinator { | 534 public Executor::Coordinator { |
544 public: | 535 public: |
545 Job(MultiThreadedProxyResolverFactory* factory, | 536 Job(MultiThreadedProxyResolverFactory* factory, |
546 const scoped_refptr<ProxyResolverScriptData>& script_data, | 537 const scoped_refptr<ProxyResolverScriptData>& script_data, |
(...skipping 77 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
624 return ERR_IO_PENDING; | 615 return ERR_IO_PENDING; |
625 } | 616 } |
626 | 617 |
627 void MultiThreadedProxyResolverFactory::RemoveJob( | 618 void MultiThreadedProxyResolverFactory::RemoveJob( |
628 MultiThreadedProxyResolverFactory::Job* job) { | 619 MultiThreadedProxyResolverFactory::Job* job) { |
629 size_t erased = jobs_.erase(job); | 620 size_t erased = jobs_.erase(job); |
630 DCHECK_EQ(1u, erased); | 621 DCHECK_EQ(1u, erased); |
631 } | 622 } |
632 | 623 |
633 } // namespace net | 624 } // namespace net |
OLD | NEW |