Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "net/proxy/multi_threaded_proxy_resolver.h" | 5 #include "net/proxy/multi_threaded_proxy_resolver.h" |
| 6 | 6 |
| 7 #include <deque> | 7 #include <deque> |
| 8 #include <utility> | 8 #include <utility> |
| 9 #include <vector> | 9 #include <vector> |
| 10 | 10 |
| (...skipping 99 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 110 size_t max_num_threads, | 110 size_t max_num_threads, |
| 111 const scoped_refptr<ProxyResolverScriptData>& script_data, | 111 const scoped_refptr<ProxyResolverScriptData>& script_data, |
| 112 scoped_refptr<Executor> executor); | 112 scoped_refptr<Executor> executor); |
| 113 | 113 |
| 114 ~MultiThreadedProxyResolver() override; | 114 ~MultiThreadedProxyResolver() override; |
| 115 | 115 |
| 116 // ProxyResolver implementation: | 116 // ProxyResolver implementation: |
| 117 int GetProxyForURL(const GURL& url, | 117 int GetProxyForURL(const GURL& url, |
| 118 ProxyInfo* results, | 118 ProxyInfo* results, |
| 119 const CompletionCallback& callback, | 119 const CompletionCallback& callback, |
| 120 RequestHandle* request, | 120 scoped_ptr<Request>* request, |
| 121 const BoundNetLog& net_log) override; | 121 const BoundNetLog& net_log) override; |
| 122 void CancelRequest(RequestHandle request) override; | |
| 123 LoadState GetLoadState(RequestHandle request) const override; | |
| 124 | 122 |
| 125 private: | 123 private: |
| 126 class GetProxyForURLJob; | 124 class GetProxyForURLJob; |
| 125 class RequestImpl; | |
| 127 // FIFO queue of pending jobs waiting to be started. | 126 // FIFO queue of pending jobs waiting to be started. |
| 128 // TODO(eroman): Make this priority queue. | 127 // TODO(eroman): Make this priority queue. |
| 129 typedef std::deque<scoped_refptr<Job>> PendingJobsQueue; | 128 typedef std::deque<scoped_refptr<Job>> PendingJobsQueue; |
| 130 typedef std::vector<scoped_refptr<Executor>> ExecutorList; | 129 typedef std::vector<scoped_refptr<Executor>> ExecutorList; |
| 131 | 130 |
| 132 // Returns an idle worker thread which is ready to receive GetProxyForURL() | 131 // Returns an idle worker thread which is ready to receive GetProxyForURL() |
| 133 // requests. If all threads are occupied, returns NULL. | 132 // requests. If all threads are occupied, returns NULL. |
| 134 Executor* FindIdleExecutor(); | 133 Executor* FindIdleExecutor(); |
| 135 | 134 |
| 136 // Creates a new worker thread, and appends it to |executors_|. | 135 // Creates a new worker thread, and appends it to |executors_|. |
| (...skipping 87 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 224 | 223 |
| 225 virtual ~Job() {} | 224 virtual ~Job() {} |
| 226 | 225 |
| 227 private: | 226 private: |
| 228 const Type type_; | 227 const Type type_; |
| 229 CompletionCallback callback_; | 228 CompletionCallback callback_; |
| 230 Executor* executor_; | 229 Executor* executor_; |
| 231 bool was_cancelled_; | 230 bool was_cancelled_; |
| 232 }; | 231 }; |
| 233 | 232 |
| 233 class MultiThreadedProxyResolver::RequestImpl : public ProxyResolver::Request { | |
| 234 public: | |
| 235 RequestImpl(scoped_refptr<Job> job) : job_(job) {} | |
|
eroman
2016/02/24 03:08:06
You can do: job_(std::move(job)) to avoid a spurio
| |
| 236 | |
| 237 ~RequestImpl() override { job_->Cancel(); } | |
| 238 | |
| 239 LoadState GetLoadState() override { | |
| 240 return LOAD_STATE_RESOLVING_PROXY_FOR_URL; | |
| 241 } | |
| 242 | |
| 243 private: | |
| 244 scoped_refptr<Job> job_; | |
| 245 }; | |
| 246 | |
| 234 // CreateResolverJob ----------------------------------------------------------- | 247 // CreateResolverJob ----------------------------------------------------------- |
| 235 | 248 |
| 236 // Runs on the worker thread to call ProxyResolverFactory::CreateProxyResolver. | 249 // Runs on the worker thread to call ProxyResolverFactory::CreateProxyResolver. |
| 237 class CreateResolverJob : public Job { | 250 class CreateResolverJob : public Job { |
| 238 public: | 251 public: |
| 239 CreateResolverJob(const scoped_refptr<ProxyResolverScriptData>& script_data, | 252 CreateResolverJob(const scoped_refptr<ProxyResolverScriptData>& script_data, |
| 240 ProxyResolverFactory* factory) | 253 ProxyResolverFactory* factory) |
| 241 : Job(TYPE_CREATE_RESOLVER, CompletionCallback()), | 254 : Job(TYPE_CREATE_RESOLVER, CompletionCallback()), |
| 242 script_data_(script_data), | 255 script_data_(script_data), |
| 243 factory_(factory) {} | 256 factory_(factory) {} |
| (...skipping 191 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 435 DCHECK(CalledOnValidThread()); | 448 DCHECK(CalledOnValidThread()); |
| 436 // We will cancel all outstanding requests. | 449 // We will cancel all outstanding requests. |
| 437 pending_jobs_.clear(); | 450 pending_jobs_.clear(); |
| 438 | 451 |
| 439 for (auto& executor : executors_) { | 452 for (auto& executor : executors_) { |
| 440 executor->Destroy(); | 453 executor->Destroy(); |
| 441 } | 454 } |
| 442 } | 455 } |
| 443 | 456 |
| 444 int MultiThreadedProxyResolver::GetProxyForURL( | 457 int MultiThreadedProxyResolver::GetProxyForURL( |
| 445 const GURL& url, ProxyInfo* results, const CompletionCallback& callback, | 458 const GURL& url, |
| 446 RequestHandle* request, const BoundNetLog& net_log) { | 459 ProxyInfo* results, |
| 460 const CompletionCallback& callback, | |
| 461 scoped_ptr<Request>* request, | |
| 462 const BoundNetLog& net_log) { | |
| 447 DCHECK(CalledOnValidThread()); | 463 DCHECK(CalledOnValidThread()); |
| 448 DCHECK(!callback.is_null()); | 464 DCHECK(!callback.is_null()); |
| 449 | 465 |
| 450 scoped_refptr<GetProxyForURLJob> job( | 466 scoped_refptr<GetProxyForURLJob> job( |
| 451 new GetProxyForURLJob(url, results, callback, net_log)); | 467 new GetProxyForURLJob(url, results, callback, net_log)); |
| 452 | 468 |
| 453 // Completion will be notified through |callback|, unless the caller cancels | 469 // Completion will be notified through |callback|, unless the caller cancels |
| 454 // the request using |request|. | 470 // the request using |request|. |
| 455 if (request) | 471 if (request) |
| 456 *request = reinterpret_cast<RequestHandle>(job.get()); | 472 request->reset(new RequestImpl(job)); |
| 457 | 473 |
| 458 // If there is an executor that is ready to run this request, submit it! | 474 // If there is an executor that is ready to run this request, submit it! |
| 459 Executor* executor = FindIdleExecutor(); | 475 Executor* executor = FindIdleExecutor(); |
| 460 if (executor) { | 476 if (executor) { |
| 461 DCHECK_EQ(0u, pending_jobs_.size()); | 477 DCHECK_EQ(0u, pending_jobs_.size()); |
| 462 executor->StartJob(job.get()); | 478 executor->StartJob(job.get()); |
| 463 return ERR_IO_PENDING; | 479 return ERR_IO_PENDING; |
| 464 } | 480 } |
| 465 | 481 |
| 466 // Otherwise queue this request. (We will schedule it to a thread once one | 482 // Otherwise queue this request. (We will schedule it to a thread once one |
| 467 // becomes available). | 483 // becomes available). |
| 468 job->WaitingForThread(); | 484 job->WaitingForThread(); |
| 469 pending_jobs_.push_back(job); | 485 pending_jobs_.push_back(job); |
| 470 | 486 |
| 471 // If we haven't already reached the thread limit, provision a new thread to | 487 // If we haven't already reached the thread limit, provision a new thread to |
| 472 // drain the requests more quickly. | 488 // drain the requests more quickly. |
| 473 if (executors_.size() < max_num_threads_) | 489 if (executors_.size() < max_num_threads_) |
| 474 AddNewExecutor(); | 490 AddNewExecutor(); |
| 475 | 491 |
| 476 return ERR_IO_PENDING; | 492 return ERR_IO_PENDING; |
| 477 } | 493 } |
| 478 | 494 |
| 479 void MultiThreadedProxyResolver::CancelRequest(RequestHandle req) { | |
| 480 DCHECK(CalledOnValidThread()); | |
| 481 DCHECK(req); | |
| 482 | |
| 483 Job* job = reinterpret_cast<Job*>(req); | |
| 484 DCHECK_EQ(Job::TYPE_GET_PROXY_FOR_URL, job->type()); | |
| 485 | |
| 486 if (job->executor()) { | |
| 487 // If the job was already submitted to the executor, just mark it | |
| 488 // as cancelled so the user callback isn't run on completion. | |
| 489 job->Cancel(); | |
| 490 } else { | |
| 491 // Otherwise the job is just sitting in a queue. | |
| 492 PendingJobsQueue::iterator it = | |
| 493 std::find(pending_jobs_.begin(), pending_jobs_.end(), job); | |
| 494 DCHECK(it != pending_jobs_.end()); | |
| 495 pending_jobs_.erase(it); | |
| 496 } | |
| 497 } | |
| 498 | |
| 499 LoadState MultiThreadedProxyResolver::GetLoadState(RequestHandle req) const { | |
| 500 DCHECK(CalledOnValidThread()); | |
| 501 DCHECK(req); | |
| 502 return LOAD_STATE_RESOLVING_PROXY_FOR_URL; | |
| 503 } | |
| 504 | 495 |
| 505 Executor* MultiThreadedProxyResolver::FindIdleExecutor() { | 496 Executor* MultiThreadedProxyResolver::FindIdleExecutor() { |
| 506 DCHECK(CalledOnValidThread()); | 497 DCHECK(CalledOnValidThread()); |
| 507 for (ExecutorList::iterator it = executors_.begin(); | 498 for (ExecutorList::iterator it = executors_.begin(); |
| 508 it != executors_.end(); ++it) { | 499 it != executors_.end(); ++it) { |
| 509 Executor* executor = it->get(); | 500 Executor* executor = it->get(); |
| 510 if (!executor->outstanding_job()) | 501 if (!executor->outstanding_job()) |
| 511 return executor; | 502 return executor; |
| 512 } | 503 } |
| 513 return NULL; | 504 return NULL; |
| 514 } | 505 } |
| 515 | 506 |
| 516 void MultiThreadedProxyResolver::AddNewExecutor() { | 507 void MultiThreadedProxyResolver::AddNewExecutor() { |
| 517 DCHECK(CalledOnValidThread()); | 508 DCHECK(CalledOnValidThread()); |
| 518 DCHECK_LT(executors_.size(), max_num_threads_); | 509 DCHECK_LT(executors_.size(), max_num_threads_); |
| 519 // The "thread number" is used to give the thread a unique name. | 510 // The "thread number" is used to give the thread a unique name. |
| 520 int thread_number = executors_.size(); | 511 int thread_number = executors_.size(); |
| 521 Executor* executor = new Executor(this, thread_number); | 512 Executor* executor = new Executor(this, thread_number); |
| 522 executor->StartJob( | 513 executor->StartJob( |
| 523 new CreateResolverJob(script_data_, resolver_factory_.get())); | 514 new CreateResolverJob(script_data_, resolver_factory_.get())); |
| 524 executors_.push_back(make_scoped_refptr(executor)); | 515 executors_.push_back(make_scoped_refptr(executor)); |
| 525 } | 516 } |
| 526 | 517 |
| 527 void MultiThreadedProxyResolver::OnExecutorReady(Executor* executor) { | 518 void MultiThreadedProxyResolver::OnExecutorReady(Executor* executor) { |
| 528 DCHECK(CalledOnValidThread()); | 519 DCHECK(CalledOnValidThread()); |
| 529 if (pending_jobs_.empty()) | 520 while (!pending_jobs_.empty()) { |
| 530 return; | 521 scoped_refptr<Job> job = pending_jobs_.front(); |
| 531 | 522 pending_jobs_.pop_front(); |
| 532 // Get the next job to process (FIFO). Transfer it from the pending queue | 523 if (!job->was_cancelled()) { |
| 533 // to the executor. | 524 executor->StartJob(job.get()); |
| 534 scoped_refptr<Job> job = pending_jobs_.front(); | 525 return; |
| 535 pending_jobs_.pop_front(); | 526 } |
| 536 executor->StartJob(job.get()); | 527 } |
| 537 } | 528 } |
| 538 | 529 |
| 539 } // namespace | 530 } // namespace |
| 540 | 531 |
| 541 class MultiThreadedProxyResolverFactory::Job | 532 class MultiThreadedProxyResolverFactory::Job |
| 542 : public ProxyResolverFactory::Request, | 533 : public ProxyResolverFactory::Request, |
| 543 public Executor::Coordinator { | 534 public Executor::Coordinator { |
| 544 public: | 535 public: |
| 545 Job(MultiThreadedProxyResolverFactory* factory, | 536 Job(MultiThreadedProxyResolverFactory* factory, |
| 546 const scoped_refptr<ProxyResolverScriptData>& script_data, | 537 const scoped_refptr<ProxyResolverScriptData>& script_data, |
| (...skipping 77 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 624 return ERR_IO_PENDING; | 615 return ERR_IO_PENDING; |
| 625 } | 616 } |
| 626 | 617 |
| 627 void MultiThreadedProxyResolverFactory::RemoveJob( | 618 void MultiThreadedProxyResolverFactory::RemoveJob( |
| 628 MultiThreadedProxyResolverFactory::Job* job) { | 619 MultiThreadedProxyResolverFactory::Job* job) { |
| 629 size_t erased = jobs_.erase(job); | 620 size_t erased = jobs_.erase(job); |
| 630 DCHECK_EQ(1u, erased); | 621 DCHECK_EQ(1u, erased); |
| 631 } | 622 } |
| 632 | 623 |
| 633 } // namespace net | 624 } // namespace net |
| OLD | NEW |