| OLD | NEW |
| 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "net/proxy/multi_threaded_proxy_resolver.h" | 5 #include "net/proxy/multi_threaded_proxy_resolver.h" |
| 6 | 6 |
| 7 #include <deque> | 7 #include <deque> |
| 8 #include <utility> | 8 #include <utility> |
| 9 #include <vector> | 9 #include <vector> |
| 10 | 10 |
| (...skipping 99 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 110 size_t max_num_threads, | 110 size_t max_num_threads, |
| 111 const scoped_refptr<ProxyResolverScriptData>& script_data, | 111 const scoped_refptr<ProxyResolverScriptData>& script_data, |
| 112 scoped_refptr<Executor> executor); | 112 scoped_refptr<Executor> executor); |
| 113 | 113 |
| 114 ~MultiThreadedProxyResolver() override; | 114 ~MultiThreadedProxyResolver() override; |
| 115 | 115 |
| 116 // ProxyResolver implementation: | 116 // ProxyResolver implementation: |
| 117 int GetProxyForURL(const GURL& url, | 117 int GetProxyForURL(const GURL& url, |
| 118 ProxyInfo* results, | 118 ProxyInfo* results, |
| 119 const CompletionCallback& callback, | 119 const CompletionCallback& callback, |
| 120 scoped_ptr<Request>* request, | 120 RequestHandle* request, |
| 121 const BoundNetLog& net_log) override; | 121 const BoundNetLog& net_log) override; |
| 122 void CancelRequest(RequestHandle request) override; |
| 123 LoadState GetLoadState(RequestHandle request) const override; |
| 122 | 124 |
| 123 private: | 125 private: |
| 124 class GetProxyForURLJob; | 126 class GetProxyForURLJob; |
| 125 class RequestImpl; | |
| 126 // FIFO queue of pending jobs waiting to be started. | 127 // FIFO queue of pending jobs waiting to be started. |
| 127 // TODO(eroman): Make this priority queue. | 128 // TODO(eroman): Make this priority queue. |
| 128 typedef std::deque<scoped_refptr<Job>> PendingJobsQueue; | 129 typedef std::deque<scoped_refptr<Job>> PendingJobsQueue; |
| 129 typedef std::vector<scoped_refptr<Executor>> ExecutorList; | 130 typedef std::vector<scoped_refptr<Executor>> ExecutorList; |
| 130 | 131 |
| 131 // Returns an idle worker thread which is ready to receive GetProxyForURL() | 132 // Returns an idle worker thread which is ready to receive GetProxyForURL() |
| 132 // requests. If all threads are occupied, returns NULL. | 133 // requests. If all threads are occupied, returns NULL. |
| 133 Executor* FindIdleExecutor(); | 134 Executor* FindIdleExecutor(); |
| 134 | 135 |
| 135 // Creates a new worker thread, and appends it to |executors_|. | 136 // Creates a new worker thread, and appends it to |executors_|. |
| (...skipping 87 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 223 | 224 |
| 224 virtual ~Job() {} | 225 virtual ~Job() {} |
| 225 | 226 |
| 226 private: | 227 private: |
| 227 const Type type_; | 228 const Type type_; |
| 228 CompletionCallback callback_; | 229 CompletionCallback callback_; |
| 229 Executor* executor_; | 230 Executor* executor_; |
| 230 bool was_cancelled_; | 231 bool was_cancelled_; |
| 231 }; | 232 }; |
| 232 | 233 |
| 233 class MultiThreadedProxyResolver::RequestImpl : public ProxyResolver::Request { | |
| 234 public: | |
| 235 explicit RequestImpl(scoped_refptr<Job> job) : job_(std::move(job)) {} | |
| 236 | |
| 237 ~RequestImpl() override { job_->Cancel(); } | |
| 238 | |
| 239 LoadState GetLoadState() override { | |
| 240 return LOAD_STATE_RESOLVING_PROXY_FOR_URL; | |
| 241 } | |
| 242 | |
| 243 private: | |
| 244 scoped_refptr<Job> job_; | |
| 245 }; | |
| 246 | |
| 247 // CreateResolverJob ----------------------------------------------------------- | 234 // CreateResolverJob ----------------------------------------------------------- |
| 248 | 235 |
| 249 // Runs on the worker thread to call ProxyResolverFactory::CreateProxyResolver. | 236 // Runs on the worker thread to call ProxyResolverFactory::CreateProxyResolver. |
| 250 class CreateResolverJob : public Job { | 237 class CreateResolverJob : public Job { |
| 251 public: | 238 public: |
| 252 CreateResolverJob(const scoped_refptr<ProxyResolverScriptData>& script_data, | 239 CreateResolverJob(const scoped_refptr<ProxyResolverScriptData>& script_data, |
| 253 ProxyResolverFactory* factory) | 240 ProxyResolverFactory* factory) |
| 254 : Job(TYPE_CREATE_RESOLVER, CompletionCallback()), | 241 : Job(TYPE_CREATE_RESOLVER, CompletionCallback()), |
| 255 script_data_(script_data), | 242 script_data_(script_data), |
| 256 factory_(factory) {} | 243 factory_(factory) {} |
| (...skipping 191 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 448 DCHECK(CalledOnValidThread()); | 435 DCHECK(CalledOnValidThread()); |
| 449 // We will cancel all outstanding requests. | 436 // We will cancel all outstanding requests. |
| 450 pending_jobs_.clear(); | 437 pending_jobs_.clear(); |
| 451 | 438 |
| 452 for (auto& executor : executors_) { | 439 for (auto& executor : executors_) { |
| 453 executor->Destroy(); | 440 executor->Destroy(); |
| 454 } | 441 } |
| 455 } | 442 } |
| 456 | 443 |
| 457 int MultiThreadedProxyResolver::GetProxyForURL( | 444 int MultiThreadedProxyResolver::GetProxyForURL( |
| 458 const GURL& url, | 445 const GURL& url, ProxyInfo* results, const CompletionCallback& callback, |
| 459 ProxyInfo* results, | 446 RequestHandle* request, const BoundNetLog& net_log) { |
| 460 const CompletionCallback& callback, | |
| 461 scoped_ptr<Request>* request, | |
| 462 const BoundNetLog& net_log) { | |
| 463 DCHECK(CalledOnValidThread()); | 447 DCHECK(CalledOnValidThread()); |
| 464 DCHECK(!callback.is_null()); | 448 DCHECK(!callback.is_null()); |
| 465 | 449 |
| 466 scoped_refptr<GetProxyForURLJob> job( | 450 scoped_refptr<GetProxyForURLJob> job( |
| 467 new GetProxyForURLJob(url, results, callback, net_log)); | 451 new GetProxyForURLJob(url, results, callback, net_log)); |
| 468 | 452 |
| 469 // Completion will be notified through |callback|, unless the caller cancels | 453 // Completion will be notified through |callback|, unless the caller cancels |
| 470 // the request using |request|. | 454 // the request using |request|. |
| 471 if (request) | 455 if (request) |
| 472 request->reset(new RequestImpl(job)); | 456 *request = reinterpret_cast<RequestHandle>(job.get()); |
| 473 | 457 |
| 474 // If there is an executor that is ready to run this request, submit it! | 458 // If there is an executor that is ready to run this request, submit it! |
| 475 Executor* executor = FindIdleExecutor(); | 459 Executor* executor = FindIdleExecutor(); |
| 476 if (executor) { | 460 if (executor) { |
| 477 DCHECK_EQ(0u, pending_jobs_.size()); | 461 DCHECK_EQ(0u, pending_jobs_.size()); |
| 478 executor->StartJob(job.get()); | 462 executor->StartJob(job.get()); |
| 479 return ERR_IO_PENDING; | 463 return ERR_IO_PENDING; |
| 480 } | 464 } |
| 481 | 465 |
| 482 // Otherwise queue this request. (We will schedule it to a thread once one | 466 // Otherwise queue this request. (We will schedule it to a thread once one |
| 483 // becomes available). | 467 // becomes available). |
| 484 job->WaitingForThread(); | 468 job->WaitingForThread(); |
| 485 pending_jobs_.push_back(job); | 469 pending_jobs_.push_back(job); |
| 486 | 470 |
| 487 // If we haven't already reached the thread limit, provision a new thread to | 471 // If we haven't already reached the thread limit, provision a new thread to |
| 488 // drain the requests more quickly. | 472 // drain the requests more quickly. |
| 489 if (executors_.size() < max_num_threads_) | 473 if (executors_.size() < max_num_threads_) |
| 490 AddNewExecutor(); | 474 AddNewExecutor(); |
| 491 | 475 |
| 492 return ERR_IO_PENDING; | 476 return ERR_IO_PENDING; |
| 493 } | 477 } |
| 494 | 478 |
| 479 void MultiThreadedProxyResolver::CancelRequest(RequestHandle req) { |
| 480 DCHECK(CalledOnValidThread()); |
| 481 DCHECK(req); |
| 482 |
| 483 Job* job = reinterpret_cast<Job*>(req); |
| 484 DCHECK_EQ(Job::TYPE_GET_PROXY_FOR_URL, job->type()); |
| 485 |
| 486 if (job->executor()) { |
| 487 // If the job was already submitted to the executor, just mark it |
| 488 // as cancelled so the user callback isn't run on completion. |
| 489 job->Cancel(); |
| 490 } else { |
| 491 // Otherwise the job is just sitting in a queue. |
| 492 PendingJobsQueue::iterator it = |
| 493 std::find(pending_jobs_.begin(), pending_jobs_.end(), job); |
| 494 DCHECK(it != pending_jobs_.end()); |
| 495 pending_jobs_.erase(it); |
| 496 } |
| 497 } |
| 498 |
| 499 LoadState MultiThreadedProxyResolver::GetLoadState(RequestHandle req) const { |
| 500 DCHECK(CalledOnValidThread()); |
| 501 DCHECK(req); |
| 502 return LOAD_STATE_RESOLVING_PROXY_FOR_URL; |
| 503 } |
| 495 | 504 |
| 496 Executor* MultiThreadedProxyResolver::FindIdleExecutor() { | 505 Executor* MultiThreadedProxyResolver::FindIdleExecutor() { |
| 497 DCHECK(CalledOnValidThread()); | 506 DCHECK(CalledOnValidThread()); |
| 498 for (ExecutorList::iterator it = executors_.begin(); | 507 for (ExecutorList::iterator it = executors_.begin(); |
| 499 it != executors_.end(); ++it) { | 508 it != executors_.end(); ++it) { |
| 500 Executor* executor = it->get(); | 509 Executor* executor = it->get(); |
| 501 if (!executor->outstanding_job()) | 510 if (!executor->outstanding_job()) |
| 502 return executor; | 511 return executor; |
| 503 } | 512 } |
| 504 return NULL; | 513 return NULL; |
| 505 } | 514 } |
| 506 | 515 |
| 507 void MultiThreadedProxyResolver::AddNewExecutor() { | 516 void MultiThreadedProxyResolver::AddNewExecutor() { |
| 508 DCHECK(CalledOnValidThread()); | 517 DCHECK(CalledOnValidThread()); |
| 509 DCHECK_LT(executors_.size(), max_num_threads_); | 518 DCHECK_LT(executors_.size(), max_num_threads_); |
| 510 // The "thread number" is used to give the thread a unique name. | 519 // The "thread number" is used to give the thread a unique name. |
| 511 int thread_number = executors_.size(); | 520 int thread_number = executors_.size(); |
| 512 Executor* executor = new Executor(this, thread_number); | 521 Executor* executor = new Executor(this, thread_number); |
| 513 executor->StartJob( | 522 executor->StartJob( |
| 514 new CreateResolverJob(script_data_, resolver_factory_.get())); | 523 new CreateResolverJob(script_data_, resolver_factory_.get())); |
| 515 executors_.push_back(make_scoped_refptr(executor)); | 524 executors_.push_back(make_scoped_refptr(executor)); |
| 516 } | 525 } |
| 517 | 526 |
| 518 void MultiThreadedProxyResolver::OnExecutorReady(Executor* executor) { | 527 void MultiThreadedProxyResolver::OnExecutorReady(Executor* executor) { |
| 519 DCHECK(CalledOnValidThread()); | 528 DCHECK(CalledOnValidThread()); |
| 520 while (!pending_jobs_.empty()) { | 529 if (pending_jobs_.empty()) |
| 521 scoped_refptr<Job> job = pending_jobs_.front(); | 530 return; |
| 522 pending_jobs_.pop_front(); | 531 |
| 523 if (!job->was_cancelled()) { | 532 // Get the next job to process (FIFO). Transfer it from the pending queue |
| 524 executor->StartJob(job.get()); | 533 // to the executor. |
| 525 return; | 534 scoped_refptr<Job> job = pending_jobs_.front(); |
| 526 } | 535 pending_jobs_.pop_front(); |
| 527 } | 536 executor->StartJob(job.get()); |
| 528 } | 537 } |
| 529 | 538 |
| 530 } // namespace | 539 } // namespace |
| 531 | 540 |
| 532 class MultiThreadedProxyResolverFactory::Job | 541 class MultiThreadedProxyResolverFactory::Job |
| 533 : public ProxyResolverFactory::Request, | 542 : public ProxyResolverFactory::Request, |
| 534 public Executor::Coordinator { | 543 public Executor::Coordinator { |
| 535 public: | 544 public: |
| 536 Job(MultiThreadedProxyResolverFactory* factory, | 545 Job(MultiThreadedProxyResolverFactory* factory, |
| 537 const scoped_refptr<ProxyResolverScriptData>& script_data, | 546 const scoped_refptr<ProxyResolverScriptData>& script_data, |
| (...skipping 77 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 615 return ERR_IO_PENDING; | 624 return ERR_IO_PENDING; |
| 616 } | 625 } |
| 617 | 626 |
| 618 void MultiThreadedProxyResolverFactory::RemoveJob( | 627 void MultiThreadedProxyResolverFactory::RemoveJob( |
| 619 MultiThreadedProxyResolverFactory::Job* job) { | 628 MultiThreadedProxyResolverFactory::Job* job) { |
| 620 size_t erased = jobs_.erase(job); | 629 size_t erased = jobs_.erase(job); |
| 621 DCHECK_EQ(1u, erased); | 630 DCHECK_EQ(1u, erased); |
| 622 } | 631 } |
| 623 | 632 |
| 624 } // namespace net | 633 } // namespace net |
| OLD | NEW |