OLD | NEW |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "net/proxy/multi_threaded_proxy_resolver.h" | 5 #include "net/proxy/multi_threaded_proxy_resolver.h" |
6 | 6 |
7 #include <deque> | 7 #include <deque> |
8 #include <utility> | 8 #include <utility> |
9 #include <vector> | 9 #include <vector> |
10 | 10 |
(...skipping 99 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
110 size_t max_num_threads, | 110 size_t max_num_threads, |
111 const scoped_refptr<ProxyResolverScriptData>& script_data, | 111 const scoped_refptr<ProxyResolverScriptData>& script_data, |
112 scoped_refptr<Executor> executor); | 112 scoped_refptr<Executor> executor); |
113 | 113 |
114 ~MultiThreadedProxyResolver() override; | 114 ~MultiThreadedProxyResolver() override; |
115 | 115 |
116 // ProxyResolver implementation: | 116 // ProxyResolver implementation: |
117 int GetProxyForURL(const GURL& url, | 117 int GetProxyForURL(const GURL& url, |
118 ProxyInfo* results, | 118 ProxyInfo* results, |
119 const CompletionCallback& callback, | 119 const CompletionCallback& callback, |
120 scoped_ptr<Request>* request, | 120 RequestHandle* request, |
121 const BoundNetLog& net_log) override; | 121 const BoundNetLog& net_log) override; |
| 122 void CancelRequest(RequestHandle request) override; |
| 123 LoadState GetLoadState(RequestHandle request) const override; |
122 | 124 |
123 private: | 125 private: |
124 class GetProxyForURLJob; | 126 class GetProxyForURLJob; |
125 class RequestImpl; | |
126 // FIFO queue of pending jobs waiting to be started. | 127 // FIFO queue of pending jobs waiting to be started. |
127 // TODO(eroman): Make this priority queue. | 128 // TODO(eroman): Make this priority queue. |
128 typedef std::deque<scoped_refptr<Job>> PendingJobsQueue; | 129 typedef std::deque<scoped_refptr<Job>> PendingJobsQueue; |
129 typedef std::vector<scoped_refptr<Executor>> ExecutorList; | 130 typedef std::vector<scoped_refptr<Executor>> ExecutorList; |
130 | 131 |
131 // Returns an idle worker thread which is ready to receive GetProxyForURL() | 132 // Returns an idle worker thread which is ready to receive GetProxyForURL() |
132 // requests. If all threads are occupied, returns NULL. | 133 // requests. If all threads are occupied, returns NULL. |
133 Executor* FindIdleExecutor(); | 134 Executor* FindIdleExecutor(); |
134 | 135 |
135 // Creates a new worker thread, and appends it to |executors_|. | 136 // Creates a new worker thread, and appends it to |executors_|. |
(...skipping 87 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
223 | 224 |
224 virtual ~Job() {} | 225 virtual ~Job() {} |
225 | 226 |
226 private: | 227 private: |
227 const Type type_; | 228 const Type type_; |
228 CompletionCallback callback_; | 229 CompletionCallback callback_; |
229 Executor* executor_; | 230 Executor* executor_; |
230 bool was_cancelled_; | 231 bool was_cancelled_; |
231 }; | 232 }; |
232 | 233 |
233 class MultiThreadedProxyResolver::RequestImpl : public ProxyResolver::Request { | |
234 public: | |
235 explicit RequestImpl(scoped_refptr<Job> job) : job_(std::move(job)) {} | |
236 | |
237 ~RequestImpl() override { job_->Cancel(); } | |
238 | |
239 LoadState GetLoadState() override { | |
240 return LOAD_STATE_RESOLVING_PROXY_FOR_URL; | |
241 } | |
242 | |
243 private: | |
244 scoped_refptr<Job> job_; | |
245 }; | |
246 | |
247 // CreateResolverJob ----------------------------------------------------------- | 234 // CreateResolverJob ----------------------------------------------------------- |
248 | 235 |
249 // Runs on the worker thread to call ProxyResolverFactory::CreateProxyResolver. | 236 // Runs on the worker thread to call ProxyResolverFactory::CreateProxyResolver. |
250 class CreateResolverJob : public Job { | 237 class CreateResolverJob : public Job { |
251 public: | 238 public: |
252 CreateResolverJob(const scoped_refptr<ProxyResolverScriptData>& script_data, | 239 CreateResolverJob(const scoped_refptr<ProxyResolverScriptData>& script_data, |
253 ProxyResolverFactory* factory) | 240 ProxyResolverFactory* factory) |
254 : Job(TYPE_CREATE_RESOLVER, CompletionCallback()), | 241 : Job(TYPE_CREATE_RESOLVER, CompletionCallback()), |
255 script_data_(script_data), | 242 script_data_(script_data), |
256 factory_(factory) {} | 243 factory_(factory) {} |
(...skipping 191 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
448 DCHECK(CalledOnValidThread()); | 435 DCHECK(CalledOnValidThread()); |
449 // We will cancel all outstanding requests. | 436 // We will cancel all outstanding requests. |
450 pending_jobs_.clear(); | 437 pending_jobs_.clear(); |
451 | 438 |
452 for (auto& executor : executors_) { | 439 for (auto& executor : executors_) { |
453 executor->Destroy(); | 440 executor->Destroy(); |
454 } | 441 } |
455 } | 442 } |
456 | 443 |
457 int MultiThreadedProxyResolver::GetProxyForURL( | 444 int MultiThreadedProxyResolver::GetProxyForURL( |
458 const GURL& url, | 445 const GURL& url, ProxyInfo* results, const CompletionCallback& callback, |
459 ProxyInfo* results, | 446 RequestHandle* request, const BoundNetLog& net_log) { |
460 const CompletionCallback& callback, | |
461 scoped_ptr<Request>* request, | |
462 const BoundNetLog& net_log) { | |
463 DCHECK(CalledOnValidThread()); | 447 DCHECK(CalledOnValidThread()); |
464 DCHECK(!callback.is_null()); | 448 DCHECK(!callback.is_null()); |
465 | 449 |
466 scoped_refptr<GetProxyForURLJob> job( | 450 scoped_refptr<GetProxyForURLJob> job( |
467 new GetProxyForURLJob(url, results, callback, net_log)); | 451 new GetProxyForURLJob(url, results, callback, net_log)); |
468 | 452 |
469 // Completion will be notified through |callback|, unless the caller cancels | 453 // Completion will be notified through |callback|, unless the caller cancels |
470 // the request using |request|. | 454 // the request using |request|. |
471 if (request) | 455 if (request) |
472 request->reset(new RequestImpl(job)); | 456 *request = reinterpret_cast<RequestHandle>(job.get()); |
473 | 457 |
474 // If there is an executor that is ready to run this request, submit it! | 458 // If there is an executor that is ready to run this request, submit it! |
475 Executor* executor = FindIdleExecutor(); | 459 Executor* executor = FindIdleExecutor(); |
476 if (executor) { | 460 if (executor) { |
477 DCHECK_EQ(0u, pending_jobs_.size()); | 461 DCHECK_EQ(0u, pending_jobs_.size()); |
478 executor->StartJob(job.get()); | 462 executor->StartJob(job.get()); |
479 return ERR_IO_PENDING; | 463 return ERR_IO_PENDING; |
480 } | 464 } |
481 | 465 |
482 // Otherwise queue this request. (We will schedule it to a thread once one | 466 // Otherwise queue this request. (We will schedule it to a thread once one |
483 // becomes available). | 467 // becomes available). |
484 job->WaitingForThread(); | 468 job->WaitingForThread(); |
485 pending_jobs_.push_back(job); | 469 pending_jobs_.push_back(job); |
486 | 470 |
487 // If we haven't already reached the thread limit, provision a new thread to | 471 // If we haven't already reached the thread limit, provision a new thread to |
488 // drain the requests more quickly. | 472 // drain the requests more quickly. |
489 if (executors_.size() < max_num_threads_) | 473 if (executors_.size() < max_num_threads_) |
490 AddNewExecutor(); | 474 AddNewExecutor(); |
491 | 475 |
492 return ERR_IO_PENDING; | 476 return ERR_IO_PENDING; |
493 } | 477 } |
494 | 478 |
| 479 void MultiThreadedProxyResolver::CancelRequest(RequestHandle req) { |
| 480 DCHECK(CalledOnValidThread()); |
| 481 DCHECK(req); |
| 482 |
| 483 Job* job = reinterpret_cast<Job*>(req); |
| 484 DCHECK_EQ(Job::TYPE_GET_PROXY_FOR_URL, job->type()); |
| 485 |
| 486 if (job->executor()) { |
| 487 // If the job was already submitted to the executor, just mark it |
| 488 // as cancelled so the user callback isn't run on completion. |
| 489 job->Cancel(); |
| 490 } else { |
| 491 // Otherwise the job is just sitting in a queue. |
| 492 PendingJobsQueue::iterator it = |
| 493 std::find(pending_jobs_.begin(), pending_jobs_.end(), job); |
| 494 DCHECK(it != pending_jobs_.end()); |
| 495 pending_jobs_.erase(it); |
| 496 } |
| 497 } |
| 498 |
| 499 LoadState MultiThreadedProxyResolver::GetLoadState(RequestHandle req) const { |
| 500 DCHECK(CalledOnValidThread()); |
| 501 DCHECK(req); |
| 502 return LOAD_STATE_RESOLVING_PROXY_FOR_URL; |
| 503 } |
495 | 504 |
496 Executor* MultiThreadedProxyResolver::FindIdleExecutor() { | 505 Executor* MultiThreadedProxyResolver::FindIdleExecutor() { |
497 DCHECK(CalledOnValidThread()); | 506 DCHECK(CalledOnValidThread()); |
498 for (ExecutorList::iterator it = executors_.begin(); | 507 for (ExecutorList::iterator it = executors_.begin(); |
499 it != executors_.end(); ++it) { | 508 it != executors_.end(); ++it) { |
500 Executor* executor = it->get(); | 509 Executor* executor = it->get(); |
501 if (!executor->outstanding_job()) | 510 if (!executor->outstanding_job()) |
502 return executor; | 511 return executor; |
503 } | 512 } |
504 return NULL; | 513 return NULL; |
505 } | 514 } |
506 | 515 |
507 void MultiThreadedProxyResolver::AddNewExecutor() { | 516 void MultiThreadedProxyResolver::AddNewExecutor() { |
508 DCHECK(CalledOnValidThread()); | 517 DCHECK(CalledOnValidThread()); |
509 DCHECK_LT(executors_.size(), max_num_threads_); | 518 DCHECK_LT(executors_.size(), max_num_threads_); |
510 // The "thread number" is used to give the thread a unique name. | 519 // The "thread number" is used to give the thread a unique name. |
511 int thread_number = executors_.size(); | 520 int thread_number = executors_.size(); |
512 Executor* executor = new Executor(this, thread_number); | 521 Executor* executor = new Executor(this, thread_number); |
513 executor->StartJob( | 522 executor->StartJob( |
514 new CreateResolverJob(script_data_, resolver_factory_.get())); | 523 new CreateResolverJob(script_data_, resolver_factory_.get())); |
515 executors_.push_back(make_scoped_refptr(executor)); | 524 executors_.push_back(make_scoped_refptr(executor)); |
516 } | 525 } |
517 | 526 |
518 void MultiThreadedProxyResolver::OnExecutorReady(Executor* executor) { | 527 void MultiThreadedProxyResolver::OnExecutorReady(Executor* executor) { |
519 DCHECK(CalledOnValidThread()); | 528 DCHECK(CalledOnValidThread()); |
520 while (!pending_jobs_.empty()) { | 529 if (pending_jobs_.empty()) |
521 scoped_refptr<Job> job = pending_jobs_.front(); | 530 return; |
522 pending_jobs_.pop_front(); | 531 |
523 if (!job->was_cancelled()) { | 532 // Get the next job to process (FIFO). Transfer it from the pending queue |
524 executor->StartJob(job.get()); | 533 // to the executor. |
525 return; | 534 scoped_refptr<Job> job = pending_jobs_.front(); |
526 } | 535 pending_jobs_.pop_front(); |
527 } | 536 executor->StartJob(job.get()); |
528 } | 537 } |
529 | 538 |
530 } // namespace | 539 } // namespace |
531 | 540 |
532 class MultiThreadedProxyResolverFactory::Job | 541 class MultiThreadedProxyResolverFactory::Job |
533 : public ProxyResolverFactory::Request, | 542 : public ProxyResolverFactory::Request, |
534 public Executor::Coordinator { | 543 public Executor::Coordinator { |
535 public: | 544 public: |
536 Job(MultiThreadedProxyResolverFactory* factory, | 545 Job(MultiThreadedProxyResolverFactory* factory, |
537 const scoped_refptr<ProxyResolverScriptData>& script_data, | 546 const scoped_refptr<ProxyResolverScriptData>& script_data, |
(...skipping 77 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
615 return ERR_IO_PENDING; | 624 return ERR_IO_PENDING; |
616 } | 625 } |
617 | 626 |
618 void MultiThreadedProxyResolverFactory::RemoveJob( | 627 void MultiThreadedProxyResolverFactory::RemoveJob( |
619 MultiThreadedProxyResolverFactory::Job* job) { | 628 MultiThreadedProxyResolverFactory::Job* job) { |
620 size_t erased = jobs_.erase(job); | 629 size_t erased = jobs_.erase(job); |
621 DCHECK_EQ(1u, erased); | 630 DCHECK_EQ(1u, erased); |
622 } | 631 } |
623 | 632 |
624 } // namespace net | 633 } // namespace net |
OLD | NEW |