Index: net/proxy/multi_threaded_proxy_resolver.cc |
=================================================================== |
--- net/proxy/multi_threaded_proxy_resolver.cc (revision 51914) |
+++ net/proxy/multi_threaded_proxy_resolver.cc (working copy) |
@@ -2,14 +2,19 @@ |
// Use of this source code is governed by a BSD-style license that can be |
// found in the LICENSE file. |
-#include "net/proxy/single_threaded_proxy_resolver.h" |
+#include "net/proxy/multi_threaded_proxy_resolver.h" |
#include "base/message_loop.h" |
+#include "base/string_util.h" |
#include "base/thread.h" |
#include "net/base/capturing_net_log.h" |
#include "net/base/net_errors.h" |
#include "net/proxy/proxy_info.h" |
+// TODO(eroman): Have the MultiThreadedProxyResolver clear its PAC script |
+// data when SetPacScript fails. That will reclaim memory when |
+// testing bogus scripts. |
+ |
namespace net { |
namespace { |
@@ -26,328 +31,558 @@ |
} // namespace |
-// SingleThreadedProxyResolver::SetPacScriptTask ------------------------------ |
+// An "executor" is a job-runner for PAC requests. It encapsulates a worker |
+// thread and a synchronous ProxyResolver (which will be operated on said |
+// thread.) |
+class MultiThreadedProxyResolver::Executor |
+ : public base::RefCountedThreadSafe<MultiThreadedProxyResolver::Executor > { |
+ public: |
+ // |coordinator| must remain valid throughout our lifetime. It is used to |
+ // signal when the executor is ready to receive work by calling |
+ // |coordinator->OnExecutorReady()|. |
+ // The constructor takes ownership of |resolver|. |
+ // |thread_number| is an identifier used when naming the worker thread. |
+ Executor(MultiThreadedProxyResolver* coordinator, |
+ ProxyResolver* resolver, |
+ int thread_number); |
-// Runs on the worker thread to call ProxyResolver::SetPacScript. |
-class SingleThreadedProxyResolver::SetPacScriptTask |
- : public base::RefCountedThreadSafe< |
- SingleThreadedProxyResolver::SetPacScriptTask> { |
+ // Submit a job to this executor. |
+ void StartJob(Job* job); |
+ |
+ // Callback for when a job has completed running on the executor's thread. |
+ void OnJobCompleted(Job* job); |
+ |
+ // Cleanup the executor. Cancels all outstanding work, and frees the thread |
+ // and resolver. |
+ void Destroy(); |
+ |
+ void PurgeMemory(); |
+ |
+ // Returns the outstanding job, or NULL. |
+ Job* outstanding_job() const { return outstanding_job_.get(); } |
+ |
+ ProxyResolver* resolver() { return resolver_.get(); } |
+ |
+ int thread_number() const { return thread_number_; } |
+ |
+ private: |
+ friend class base::RefCountedThreadSafe<Executor>; |
+ ~Executor(); |
+ |
+ MultiThreadedProxyResolver* coordinator_; |
+ const int thread_number_; |
+ |
+ // The currently active job for this executor (either a SetPacScript or |
+ // GetProxyForURL task). |
+ scoped_refptr<Job> outstanding_job_; |
+ |
+ // The synchronous resolver implementation. |
+ scoped_ptr<ProxyResolver> resolver_; |
+ |
+ // The thread where |resolver_| is run on. |
+ // Note that declaration ordering is important here. |thread_| needs to be |
+ // destroyed *before* |resolver_|, in case |resolver_| is currently |
+ // executing on |thread_|. |
+ scoped_ptr<base::Thread> thread_; |
+}; |
+ |
+// MultiThreadedProxyResolver::Job --------------------------------------------- |
+ |
+class MultiThreadedProxyResolver::Job |
+ : public base::RefCountedThreadSafe<MultiThreadedProxyResolver::Job> { |
public: |
- SetPacScriptTask(SingleThreadedProxyResolver* coordinator, |
- const GURL& pac_url, |
- const string16& pac_script, |
- CompletionCallback* callback) |
- : coordinator_(coordinator), |
- callback_(callback), |
- pac_script_(pac_script), |
- pac_url_(pac_url), |
- origin_loop_(MessageLoop::current()) { |
- DCHECK(callback); |
+ // Identifies the subclass of Job (only being used for debugging purposes). |
+ enum Type { |
+ TYPE_GET_PROXY_FOR_URL, |
+ TYPE_SET_PAC_SCRIPT, |
+ TYPE_SET_PAC_SCRIPT_INTERNAL, |
+ }; |
+ |
+ Job(Type type, CompletionCallback* user_callback) |
+ : type_(type), |
+ user_callback_(user_callback), |
+ executor_(NULL), |
+ was_cancelled_(false) { |
} |
- // Start the SetPacScript request on the worker thread. |
- void Start() { |
- coordinator_->thread()->message_loop()->PostTask( |
- FROM_HERE, NewRunnableMethod(this, &SetPacScriptTask::DoRequest, |
- coordinator_->resolver_.get())); |
+ void set_executor(Executor* executor) { |
+ executor_ = executor; |
} |
+ // The "executor" is the job runner that is scheduling this job. If |
+ // this job has not been submitted to an executor yet, this will be |
+ // NULL (and we know it hasn't started yet). |
+ Executor* executor() { |
+ return executor_; |
+ } |
+ |
+ // Mark the job as having been cancelled. |
void Cancel() { |
- // Clear these to inform RequestComplete that it should not try to |
- // access them. |
- coordinator_ = NULL; |
- callback_ = NULL; |
+ was_cancelled_ = true; |
} |
// Returns true if Cancel() has been called. |
- bool was_cancelled() const { return callback_ == NULL; } |
+ bool was_cancelled() const { return was_cancelled_; } |
+ Type type() const { return type_; } |
+ |
+ // Returns true if this job still has a user callback. Some jobs |
+ // do not have a user callback, because they were helper jobs |
+ // scheduled internally (for example TYPE_SET_PAC_SCRIPT_INTERNAL). |
+ // |
+ // Otherwise jobs that correspond with user-initiated work will |
+ // have a non-NULL callback up until the callback is run. |
+ bool has_user_callback() const { return user_callback_ != NULL; } |
+ |
+ // This method is called when the job is inserted into a wait queue |
+ // because no executors were ready to accept it. |
+ virtual void WaitingForThread() {} |
+ |
+ // This method is called just before the job is posted to the work thread. |
+ virtual void FinishedWaitingForThread() {} |
+ |
+ // This method is called on the worker thread to do the job's work. On |
+ // completion, implementors are expected to call OnJobCompleted() on |
+ // |origin_loop|. |
+ virtual void Run(MessageLoop* origin_loop) = 0; |
+ |
+ protected: |
+ void OnJobCompleted() { |
+ // |executor_| will be NULL if the executor has already been deleted. |
+ if (executor_) |
+ executor_->OnJobCompleted(this); |
+ } |
+ |
+ void RunUserCallback(int result) { |
+ DCHECK(has_user_callback()); |
+ CompletionCallback* callback = user_callback_; |
+ // Null the callback so has_user_callback() will now return false. |
+ user_callback_ = NULL; |
+ callback->Run(result); |
+ } |
+ |
+ friend class base::RefCountedThreadSafe<MultiThreadedProxyResolver::Job>; |
+ |
+ virtual ~Job() {} |
+ |
private: |
- friend class base::RefCountedThreadSafe< |
- SingleThreadedProxyResolver::SetPacScriptTask>; |
+ const Type type_; |
+ CompletionCallback* user_callback_; |
+ Executor* executor_; |
+ bool was_cancelled_; |
+}; |
- ~SetPacScriptTask() {} |
+// MultiThreadedProxyResolver::SetPacScriptJob --------------------------------- |
+// Runs on the worker thread to call ProxyResolver::SetPacScript. |
+class MultiThreadedProxyResolver::SetPacScriptJob |
+ : public MultiThreadedProxyResolver::Job { |
+ public: |
+ SetPacScriptJob(const GURL& pac_url, |
+ const string16& pac_script, |
+ CompletionCallback* callback) |
+ : Job(callback ? TYPE_SET_PAC_SCRIPT : TYPE_SET_PAC_SCRIPT_INTERNAL, |
+ callback), |
+ pac_url_(pac_url), |
+ pac_script_(pac_script) { |
+ } |
+ |
// Runs on the worker thread. |
- void DoRequest(ProxyResolver* resolver) { |
+ virtual void Run(MessageLoop* origin_loop) { |
+ ProxyResolver* resolver = executor()->resolver(); |
int rv = resolver->expects_pac_bytes() ? |
resolver->SetPacScriptByData(pac_script_, NULL) : |
resolver->SetPacScriptByUrl(pac_url_, NULL); |
DCHECK_NE(rv, ERR_IO_PENDING); |
- origin_loop_->PostTask(FROM_HERE, |
- NewRunnableMethod(this, &SetPacScriptTask::RequestComplete, rv)); |
+ origin_loop->PostTask( |
+ FROM_HERE, |
+ NewRunnableMethod(this, &SetPacScriptJob::RequestComplete, rv)); |
} |
+ private: |
// Runs the completion callback on the origin thread. |
void RequestComplete(int result_code) { |
// The task may have been cancelled after it was started. |
- if (!was_cancelled()) { |
- CompletionCallback* callback = callback_; |
- coordinator_->RemoveOutstandingSetPacScriptTask(this); |
- callback->Run(result_code); |
+ if (!was_cancelled() && has_user_callback()) { |
+ RunUserCallback(result_code); |
} |
+ OnJobCompleted(); |
} |
- // Must only be used on the "origin" thread. |
- SingleThreadedProxyResolver* coordinator_; |
- CompletionCallback* callback_; |
- string16 pac_script_; |
- GURL pac_url_; |
- |
- // Usable from within DoQuery on the worker thread. |
- MessageLoop* origin_loop_; |
+ const GURL pac_url_; |
+ const string16 pac_script_; |
}; |
-// SingleThreadedProxyResolver::Job ------------------------------------------- |
+// MultiThreadedProxyResolver::GetProxyForURLJob ------------------------------ |
-class SingleThreadedProxyResolver::Job |
- : public base::RefCountedThreadSafe<SingleThreadedProxyResolver::Job> { |
+class MultiThreadedProxyResolver::GetProxyForURLJob |
+ : public MultiThreadedProxyResolver::Job { |
public: |
- // |coordinator| -- the SingleThreadedProxyResolver that owns this job. |
// |url| -- the URL of the query. |
// |results| -- the structure to fill with proxy resolve results. |
- Job(SingleThreadedProxyResolver* coordinator, |
- const GURL& url, |
- ProxyInfo* results, |
- CompletionCallback* callback, |
- const BoundNetLog& net_log) |
- : coordinator_(coordinator), |
- callback_(callback), |
- results_(results), |
- net_log_(net_log), |
- url_(url), |
- is_started_(false), |
- origin_loop_(MessageLoop::current()) { |
+ GetProxyForURLJob(const GURL& url, |
+ ProxyInfo* results, |
+ CompletionCallback* callback, |
+ const BoundNetLog& net_log) |
+ : Job(TYPE_GET_PROXY_FOR_URL, callback), |
+ results_(results), |
+ net_log_(net_log), |
+ url_(url), |
+ was_waiting_for_thread_(false) { |
DCHECK(callback); |
} |
- // Start the resolve proxy request on the worker thread. |
- void Start() { |
- is_started_ = true; |
+ BoundNetLog* net_log() { return &net_log_; } |
- size_t load_log_bound = 100; |
- |
- coordinator_->thread()->message_loop()->PostTask( |
- FROM_HERE, NewRunnableMethod(this, &Job::DoQuery, |
- coordinator_->resolver_.get(), |
- load_log_bound)); |
+ virtual void WaitingForThread() { |
+ was_waiting_for_thread_ = true; |
+ net_log_.BeginEvent( |
+ NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD, NULL); |
} |
- bool is_started() const { return is_started_; } |
+ virtual void FinishedWaitingForThread() { |
+ DCHECK(executor()); |
- void Cancel() { |
- // Clear these to inform QueryComplete that it should not try to |
- // access them. |
- coordinator_ = NULL; |
- callback_ = NULL; |
- results_ = NULL; |
+ if (was_waiting_for_thread_) { |
+ net_log_.EndEvent( |
+ NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD, NULL); |
+ } |
+ |
+ net_log_.AddEvent( |
+ NetLog::TYPE_SUBMITTED_TO_RESOLVER_THREAD, |
+ new NetLogIntegerParameter( |
+ "thread_number", executor()->thread_number())); |
} |
- // Returns true if Cancel() has been called. |
- bool was_cancelled() const { return callback_ == NULL; } |
- |
- BoundNetLog* net_log() { return &net_log_; } |
- |
- private: |
- friend class base::RefCountedThreadSafe<SingleThreadedProxyResolver::Job>; |
- |
- ~Job() {} |
- |
// Runs on the worker thread. |
- void DoQuery(ProxyResolver* resolver, size_t load_log_bound) { |
- worker_log_.reset(new CapturingNetLog(load_log_bound)); |
+ virtual void Run(MessageLoop* origin_loop) { |
+ const size_t kNetLogBound = 50u; |
+ worker_log_.reset(new CapturingNetLog(kNetLogBound)); |
BoundNetLog bound_worker_log(NetLog::Source(), worker_log_.get()); |
- int rv = resolver->GetProxyForURL(url_, &results_buf_, NULL, NULL, |
- bound_worker_log); |
+ ProxyResolver* resolver = executor()->resolver(); |
+ int rv = resolver->GetProxyForURL( |
+ url_, &results_buf_, NULL, NULL, bound_worker_log); |
DCHECK_NE(rv, ERR_IO_PENDING); |
- origin_loop_->PostTask(FROM_HERE, |
- NewRunnableMethod(this, &Job::QueryComplete, rv)); |
+ origin_loop->PostTask( |
+ FROM_HERE, |
+ NewRunnableMethod(this, &GetProxyForURLJob::QueryComplete, rv)); |
} |
+ private: |
// Runs the completion callback on the origin thread. |
void QueryComplete(int result_code) { |
- // Merge the load log that was generated on the worker thread, into the |
- // main log. |
- CapturingBoundNetLog bound_worker_log(NetLog::Source(), |
- worker_log_.release()); |
- bound_worker_log.AppendTo(net_log_); |
- |
// The Job may have been cancelled after it was started. |
if (!was_cancelled()) { |
+ // Merge the load log that was generated on the worker thread, into the |
+ // main log. |
+ CapturingBoundNetLog bound_worker_log(NetLog::Source(), |
+ worker_log_.release()); |
+ bound_worker_log.AppendTo(net_log_); |
+ |
if (result_code >= OK) { // Note: unit-tests use values > 0. |
results_->Use(results_buf_); |
} |
- callback_->Run(result_code); |
- |
- // We check for cancellation once again, in case the callback deleted |
- // the owning ProxyService (whose destructor will in turn cancel us). |
- if (!was_cancelled()) |
- coordinator_->RemoveFrontOfJobsQueueAndStartNext(this); |
+ RunUserCallback(result_code); |
} |
+ OnJobCompleted(); |
} |
// Must only be used on the "origin" thread. |
- SingleThreadedProxyResolver* coordinator_; |
- CompletionCallback* callback_; |
ProxyInfo* results_; |
BoundNetLog net_log_; |
- GURL url_; |
- bool is_started_; |
+ const GURL url_; |
// Usable from within DoQuery on the worker thread. |
ProxyInfo results_buf_; |
- MessageLoop* origin_loop_; |
// Used to pass the captured events between DoQuery [worker thread] and |
// QueryComplete [origin thread]. |
scoped_ptr<CapturingNetLog> worker_log_; |
+ |
+ bool was_waiting_for_thread_; |
}; |
-// SingleThreadedProxyResolver ------------------------------------------------ |
+// MultiThreadedProxyResolver::Executor ---------------------------------------- |
-SingleThreadedProxyResolver::SingleThreadedProxyResolver( |
- ProxyResolver* resolver) |
- : ProxyResolver(resolver->expects_pac_bytes()), |
+MultiThreadedProxyResolver::Executor::Executor( |
+ MultiThreadedProxyResolver* coordinator, |
+ ProxyResolver* resolver, |
+ int thread_number) |
+ : coordinator_(coordinator), |
+ thread_number_(thread_number), |
resolver_(resolver) { |
+ DCHECK(coordinator); |
+ DCHECK(resolver); |
+ // Start up the thread. |
+ // Note that it is safe to pass a temporary C-String to Thread(), as it will |
+ // make a copy. |
+ std::string thread_name = |
+ StringPrintf("PAC thread #%d", thread_number); |
+ thread_.reset(new base::Thread(thread_name.c_str())); |
+ thread_->Start(); |
} |
-SingleThreadedProxyResolver::~SingleThreadedProxyResolver() { |
- // Cancel the inprogress job (if any), and free the rest. |
- for (PendingJobsQueue::iterator it = pending_jobs_.begin(); |
- it != pending_jobs_.end(); |
- ++it) { |
- (*it)->Cancel(); |
+void MultiThreadedProxyResolver::Executor::StartJob(Job* job) { |
+ DCHECK(!outstanding_job_); |
+ outstanding_job_ = job; |
+ |
+ // Run the job. Once it has completed (regardless of whether it was |
+ // cancelled), it will invoke OnJobCompleted() on this thread. |
+ job->set_executor(this); |
+ job->FinishedWaitingForThread(); |
+ thread_->message_loop()->PostTask( |
+ FROM_HERE, |
+ NewRunnableMethod(job, &Job::Run, MessageLoop::current())); |
+} |
+ |
+void MultiThreadedProxyResolver::Executor::OnJobCompleted(Job* job) { |
+ DCHECK_EQ(job, outstanding_job_.get()); |
+ outstanding_job_ = NULL; |
+ coordinator_->OnExecutorReady(this); |
+} |
+ |
+void MultiThreadedProxyResolver::Executor::Destroy() { |
+ DCHECK(coordinator_); |
+ |
+ // Give the resolver an opportunity to shutdown from THIS THREAD before |
+ // joining on the resolver thread. This allows certain implementations |
+ // to avoid deadlocks. |
+ resolver_->Shutdown(); |
+ |
+ // Join the worker thread. |
+ thread_.reset(); |
+ |
+ // Cancel any outstanding job. |
+ if (outstanding_job_) { |
+ outstanding_job_->Cancel(); |
+ // Orphan the job (since this executor may be deleted soon). |
+ outstanding_job_->set_executor(NULL); |
} |
- if (outstanding_set_pac_script_task_) |
- outstanding_set_pac_script_task_->Cancel(); |
+ // It is now safe to free the ProxyResolver, since all the tasks that |
+ // were using it on the resolver thread have completed. |
+ resolver_.reset(); |
- // Note that |thread_| is destroyed before |resolver_|. This is important |
- // since |resolver_| could be running on |thread_|. |
+ // Null some stuff as a precaution. |
+ coordinator_ = NULL; |
+ outstanding_job_ = NULL; |
} |
-int SingleThreadedProxyResolver::GetProxyForURL(const GURL& url, |
- ProxyInfo* results, |
- CompletionCallback* callback, |
- RequestHandle* request, |
- const BoundNetLog& net_log) { |
+void MultiThreadedProxyResolver::Executor::PurgeMemory() { |
+ scoped_refptr<PurgeMemoryTask> helper(new PurgeMemoryTask(resolver_.get())); |
+ thread_->message_loop()->PostTask( |
+ FROM_HERE, |
+ NewRunnableMethod(helper.get(), &PurgeMemoryTask::PurgeMemory)); |
+} |
+ |
+MultiThreadedProxyResolver::Executor::~Executor() { |
+ // The important cleanup happens as part of Destroy(), which should always be |
+ // called first. |
+ DCHECK(!coordinator_) << "Destroy() was not called"; |
+ DCHECK(!thread_.get()); |
+ DCHECK(!resolver_.get()); |
+ DCHECK(!outstanding_job_); |
+} |
+ |
+// MultiThreadedProxyResolver -------------------------------------------------- |
+ |
+MultiThreadedProxyResolver::MultiThreadedProxyResolver( |
+ ProxyResolverFactory* resolver_factory, |
+ size_t max_num_threads) |
+ : ProxyResolver(resolver_factory->resolvers_expect_pac_bytes()), |
+ resolver_factory_(resolver_factory), |
+ max_num_threads_(max_num_threads), |
+ was_set_pac_script_called_(false) { |
+ DCHECK_GE(max_num_threads, 1u); |
+} |
+ |
+MultiThreadedProxyResolver::~MultiThreadedProxyResolver() { |
+ // We will cancel all outstanding requests. |
+ pending_jobs_.clear(); |
+ ReleaseAllExecutors(); |
+} |
+ |
+int MultiThreadedProxyResolver::GetProxyForURL(const GURL& url, |
+ ProxyInfo* results, |
+ CompletionCallback* callback, |
+ RequestHandle* request, |
+ const BoundNetLog& net_log) { |
+ DCHECK(CalledOnValidThread()); |
DCHECK(callback); |
+ DCHECK(was_set_pac_script_called_) |
+ << "Resolver is un-initialized. Must call SetPacScript() first!"; |
- scoped_refptr<Job> job = new Job(this, url, results, callback, net_log); |
- bool is_first_job = pending_jobs_.empty(); |
- pending_jobs_.push_back(job); // Jobs can never finish synchronously. |
+ scoped_refptr<GetProxyForURLJob> job = |
+ new GetProxyForURLJob(url, results, callback, net_log); |
- if (is_first_job) { |
- // If there is nothing already running, start the job now. |
- EnsureThreadStarted(); |
- job->Start(); |
- } else { |
- // Otherwise the job will get started eventually by ProcessPendingJobs(). |
- job->net_log()->BeginEvent( |
- NetLog::TYPE_WAITING_FOR_SINGLE_PROXY_RESOLVER_THREAD, NULL); |
- } |
- |
// Completion will be notified through |callback|, unless the caller cancels |
// the request using |request|. |
if (request) |
*request = reinterpret_cast<RequestHandle>(job.get()); |
+ // If there is an executor that is ready to run this request, submit it! |
+ Executor* executor = FindIdleExecutor(); |
+ if (executor) { |
+ DCHECK_EQ(0u, pending_jobs_.size()); |
+ executor->StartJob(job); |
+ return ERR_IO_PENDING; |
+ } |
+ |
+ // Otherwise queue this request. (We will schedule it to a thread once one |
+ // becomes available). |
+ job->WaitingForThread(); |
+ pending_jobs_.push_back(job); |
+ |
+ // If we haven't already reached the thread limit, provision a new thread to |
+ // drain the requests more quickly. |
+ if (executors_.size() < max_num_threads_) { |
+ executor = AddNewExecutor(); |
+ executor->StartJob( |
+ new SetPacScriptJob(current_pac_url_, current_pac_script_, NULL)); |
+ } |
+ |
return ERR_IO_PENDING; |
} |
-// There are three states of the request we need to handle: |
-// (1) Not started (just sitting in the queue). |
-// (2) Executing Job::DoQuery in the worker thread. |
-// (3) Waiting for Job::QueryComplete to be run on the origin thread. |
-void SingleThreadedProxyResolver::CancelRequest(RequestHandle req) { |
+void MultiThreadedProxyResolver::CancelRequest(RequestHandle req) { |
+ DCHECK(CalledOnValidThread()); |
DCHECK(req); |
Job* job = reinterpret_cast<Job*>(req); |
+ DCHECK_EQ(Job::TYPE_GET_PROXY_FOR_URL, job->type()); |
- bool is_active_job = job->is_started() && !pending_jobs_.empty() && |
- pending_jobs_.front().get() == job; |
+ if (job->executor()) { |
+ // If the job was already submitted to the executor, just mark it |
+ // as cancelled so the user callback isn't run on completion. |
+ job->Cancel(); |
+ } else { |
+ // Otherwise the job is just sitting in a queue. |
+ PendingJobsQueue::iterator it = |
+ std::find(pending_jobs_.begin(), pending_jobs_.end(), job); |
+ DCHECK(it != pending_jobs_.end()); |
+ pending_jobs_.erase(it); |
+ } |
+} |
- job->Cancel(); |
+void MultiThreadedProxyResolver::CancelSetPacScript() { |
+ DCHECK(CalledOnValidThread()); |
+ DCHECK_EQ(0u, pending_jobs_.size()); |
+ DCHECK_EQ(1u, executors_.size()); |
+ DCHECK_EQ(Job::TYPE_SET_PAC_SCRIPT, |
+ executors_[0]->outstanding_job()->type()); |
- if (is_active_job) { |
- RemoveFrontOfJobsQueueAndStartNext(job); |
- return; |
- } |
+ // Defensively clear some data which shouldn't be getting used |
+ // anymore. |
+ was_set_pac_script_called_ = false; |
+ current_pac_url_ = GURL(); |
+ current_pac_script_ = string16(); |
- // Otherwise just delete the job from the queue. |
- PendingJobsQueue::iterator it = std::find( |
- pending_jobs_.begin(), pending_jobs_.end(), job); |
- DCHECK(it != pending_jobs_.end()); |
- pending_jobs_.erase(it); |
+ ReleaseAllExecutors(); |
} |
-void SingleThreadedProxyResolver::CancelSetPacScript() { |
- DCHECK(outstanding_set_pac_script_task_); |
- outstanding_set_pac_script_task_->Cancel(); |
- outstanding_set_pac_script_task_ = NULL; |
-} |
- |
-void SingleThreadedProxyResolver::PurgeMemory() { |
- if (thread_.get()) { |
- scoped_refptr<PurgeMemoryTask> helper(new PurgeMemoryTask(resolver_.get())); |
- thread_->message_loop()->PostTask(FROM_HERE, |
- NewRunnableMethod(helper.get(), &PurgeMemoryTask::PurgeMemory)); |
+void MultiThreadedProxyResolver::PurgeMemory() { |
+ DCHECK(CalledOnValidThread()); |
+ for (ExecutorList::iterator it = executors_.begin(); |
+ it != executors_.end(); ++it) { |
+ Executor* executor = *it; |
+ executor->PurgeMemory(); |
} |
} |
-int SingleThreadedProxyResolver::SetPacScript( |
+int MultiThreadedProxyResolver::SetPacScript( |
const GURL& pac_url, |
const string16& pac_script, |
CompletionCallback* callback) { |
- EnsureThreadStarted(); |
- DCHECK(!outstanding_set_pac_script_task_); |
+ DCHECK(CalledOnValidThread()); |
+ DCHECK(callback); |
- SetPacScriptTask* task = new SetPacScriptTask( |
- this, pac_url, pac_script, callback); |
- outstanding_set_pac_script_task_ = task; |
- task->Start(); |
+ // Save the script details, so we can provision new executors later. |
+ // (We rely on internal reference counting of strings to avoid this memory |
+ // being duplicated by each of the resolver threads). |
+ was_set_pac_script_called_ = true; |
+ current_pac_url_ = pac_url; |
+ current_pac_script_ = pac_script; |
+ |
+ // The user should not have any outstanding requests when they call |
+ // SetPacScript(). |
+ CheckNoOutstandingUserRequests(); |
+ |
+ // Destroy all of the current threads and their proxy resolvers. |
+ ReleaseAllExecutors(); |
+ |
+ // Provision a new executor, and run the SetPacScript request. On completion |
+ // notification will be sent through |callback|. |
+ Executor* executor = AddNewExecutor(); |
+ executor->StartJob(new SetPacScriptJob(pac_url, pac_script, callback)); |
return ERR_IO_PENDING; |
} |
-void SingleThreadedProxyResolver::EnsureThreadStarted() { |
- if (!thread_.get()) { |
- thread_.reset(new base::Thread("pac-thread")); |
- thread_->Start(); |
+void MultiThreadedProxyResolver::CheckNoOutstandingUserRequests() const { |
+ DCHECK(CalledOnValidThread()); |
+ CHECK_EQ(0u, pending_jobs_.size()); |
+ |
+ for (ExecutorList::const_iterator it = executors_.begin(); |
+ it != executors_.end(); ++it) { |
+ const Executor* executor = *it; |
+ Job* job = executor->outstanding_job(); |
+ // The "has_user_callback()" is to exclude jobs for which the callback |
+ // has already been invoked, or was not user-initiated (as in the case of |
+ // lazy thread provisions). User-initiated jobs may !has_user_callback() |
+ // when the callback has already been run. (Since we only clear the |
+ // outstanding job AFTER the callback has been invoked, it is possible |
+ // for a new request to be started from within the callback). |
+ CHECK(!job || job->was_cancelled() || !job->has_user_callback()); |
} |
} |
-void SingleThreadedProxyResolver::ProcessPendingJobs() { |
- if (pending_jobs_.empty()) |
- return; |
+void MultiThreadedProxyResolver::ReleaseAllExecutors() { |
+ DCHECK(CalledOnValidThread()); |
+ for (ExecutorList::iterator it = executors_.begin(); |
+ it != executors_.end(); ++it) { |
+ Executor* executor = *it; |
+ executor->Destroy(); |
+ } |
+ executors_.clear(); |
+} |
- // Get the next job to process (FIFO). |
- Job* job = pending_jobs_.front().get(); |
- if (job->is_started()) |
- return; |
+MultiThreadedProxyResolver::Executor* |
+MultiThreadedProxyResolver::FindIdleExecutor() { |
+ DCHECK(CalledOnValidThread()); |
+ for (ExecutorList::iterator it = executors_.begin(); |
+ it != executors_.end(); ++it) { |
+ Executor* executor = *it; |
+ if (!executor->outstanding_job()) |
+ return executor; |
+ } |
+ return NULL; |
+} |
- job->net_log()->EndEvent( |
- NetLog::TYPE_WAITING_FOR_SINGLE_PROXY_RESOLVER_THREAD, NULL); |
- |
- EnsureThreadStarted(); |
- job->Start(); |
+MultiThreadedProxyResolver::Executor* |
+MultiThreadedProxyResolver::AddNewExecutor() { |
+ DCHECK(CalledOnValidThread()); |
+ DCHECK_LT(executors_.size(), max_num_threads_); |
+ // The "thread number" is used to give the thread a unique name. |
+ int thread_number = executors_.size(); |
+ ProxyResolver* resolver = resolver_factory_->CreateProxyResolver(); |
+ Executor* executor = new Executor( |
+ this, resolver, thread_number); |
+ executors_.push_back(executor); |
+ return executor; |
} |
-void SingleThreadedProxyResolver::RemoveFrontOfJobsQueueAndStartNext( |
- Job* expected_job) { |
- DCHECK_EQ(expected_job, pending_jobs_.front().get()); |
+void MultiThreadedProxyResolver::OnExecutorReady(Executor* executor) { |
+ DCHECK(CalledOnValidThread()); |
+ if (pending_jobs_.empty()) |
+ return; |
+ |
+ // Get the next job to process (FIFO). Transfer it from the pending queue |
+ // to the executor. |
+ scoped_refptr<Job> job = pending_jobs_.front(); |
pending_jobs_.pop_front(); |
- |
- // Start next work item. |
- ProcessPendingJobs(); |
+ executor->StartJob(job); |
} |
-void SingleThreadedProxyResolver::RemoveOutstandingSetPacScriptTask( |
- SetPacScriptTask* task) { |
- DCHECK_EQ(outstanding_set_pac_script_task_.get(), task); |
- outstanding_set_pac_script_task_ = NULL; |
-} |
- |
} // namespace net |