Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(261)

Unified Diff: net/proxy/multi_threaded_proxy_resolver.cc

Issue 2822043: Add the capability to run multiple proxy PAC scripts in parallel.... (Closed) Base URL: svn://chrome-svn/chrome/trunk/src/
Patch Set: Re-upload after revert Created 10 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « net/proxy/multi_threaded_proxy_resolver.h ('k') | net/proxy/multi_threaded_proxy_resolver_unittest.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: net/proxy/multi_threaded_proxy_resolver.cc
===================================================================
--- net/proxy/multi_threaded_proxy_resolver.cc (revision 51914)
+++ net/proxy/multi_threaded_proxy_resolver.cc (working copy)
@@ -2,14 +2,19 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
-#include "net/proxy/single_threaded_proxy_resolver.h"
+#include "net/proxy/multi_threaded_proxy_resolver.h"
#include "base/message_loop.h"
+#include "base/string_util.h"
#include "base/thread.h"
#include "net/base/capturing_net_log.h"
#include "net/base/net_errors.h"
#include "net/proxy/proxy_info.h"
+// TODO(eroman): Have the MultiThreadedProxyResolver clear its PAC script
+// data when SetPacScript fails. That will reclaim memory when
+// testing bogus scripts.
+
namespace net {
namespace {
@@ -26,328 +31,558 @@
} // namespace
-// SingleThreadedProxyResolver::SetPacScriptTask ------------------------------
+// An "executor" is a job-runner for PAC requests. It encapsulates a worker
+// thread and a synchronous ProxyResolver (which will be operated on said
+// thread.)
+class MultiThreadedProxyResolver::Executor
+ : public base::RefCountedThreadSafe<MultiThreadedProxyResolver::Executor > {
+ public:
+ // |coordinator| must remain valid throughout our lifetime. It is used to
+ // signal when the executor is ready to receive work by calling
+ // |coordinator->OnExecutorReady()|.
+ // The constructor takes ownership of |resolver|.
+ // |thread_number| is an identifier used when naming the worker thread.
+ Executor(MultiThreadedProxyResolver* coordinator,
+ ProxyResolver* resolver,
+ int thread_number);
-// Runs on the worker thread to call ProxyResolver::SetPacScript.
-class SingleThreadedProxyResolver::SetPacScriptTask
- : public base::RefCountedThreadSafe<
- SingleThreadedProxyResolver::SetPacScriptTask> {
+ // Submit a job to this executor.
+ void StartJob(Job* job);
+
+ // Callback for when a job has completed running on the executor's thread.
+ void OnJobCompleted(Job* job);
+
+ // Cleanup the executor. Cancels all outstanding work, and frees the thread
+ // and resolver.
+ void Destroy();
+
+ void PurgeMemory();
+
+ // Returns the outstanding job, or NULL.
+ Job* outstanding_job() const { return outstanding_job_.get(); }
+
+ ProxyResolver* resolver() { return resolver_.get(); }
+
+ int thread_number() const { return thread_number_; }
+
+ private:
+ friend class base::RefCountedThreadSafe<Executor>;
+ ~Executor();
+
+ MultiThreadedProxyResolver* coordinator_;
+ const int thread_number_;
+
+ // The currently active job for this executor (either a SetPacScript or
+ // GetProxyForURL task).
+ scoped_refptr<Job> outstanding_job_;
+
+ // The synchronous resolver implementation.
+ scoped_ptr<ProxyResolver> resolver_;
+
+ // The thread where |resolver_| is run on.
+ // Note that declaration ordering is important here. |thread_| needs to be
+ // destroyed *before* |resolver_|, in case |resolver_| is currently
+ // executing on |thread_|.
+ scoped_ptr<base::Thread> thread_;
+};
+
+// MultiThreadedProxyResolver::Job ---------------------------------------------
+
+class MultiThreadedProxyResolver::Job
+ : public base::RefCountedThreadSafe<MultiThreadedProxyResolver::Job> {
public:
- SetPacScriptTask(SingleThreadedProxyResolver* coordinator,
- const GURL& pac_url,
- const string16& pac_script,
- CompletionCallback* callback)
- : coordinator_(coordinator),
- callback_(callback),
- pac_script_(pac_script),
- pac_url_(pac_url),
- origin_loop_(MessageLoop::current()) {
- DCHECK(callback);
+ // Identifies the subclass of Job (only being used for debugging purposes).
+ enum Type {
+ TYPE_GET_PROXY_FOR_URL,
+ TYPE_SET_PAC_SCRIPT,
+ TYPE_SET_PAC_SCRIPT_INTERNAL,
+ };
+
+ Job(Type type, CompletionCallback* user_callback)
+ : type_(type),
+ user_callback_(user_callback),
+ executor_(NULL),
+ was_cancelled_(false) {
}
- // Start the SetPacScript request on the worker thread.
- void Start() {
- coordinator_->thread()->message_loop()->PostTask(
- FROM_HERE, NewRunnableMethod(this, &SetPacScriptTask::DoRequest,
- coordinator_->resolver_.get()));
+ void set_executor(Executor* executor) {
+ executor_ = executor;
}
+ // The "executor" is the job runner that is scheduling this job. If
+ // this job has not been submitted to an executor yet, this will be
+ // NULL (and we know it hasn't started yet).
+ Executor* executor() {
+ return executor_;
+ }
+
+ // Mark the job as having been cancelled.
void Cancel() {
- // Clear these to inform RequestComplete that it should not try to
- // access them.
- coordinator_ = NULL;
- callback_ = NULL;
+ was_cancelled_ = true;
}
// Returns true if Cancel() has been called.
- bool was_cancelled() const { return callback_ == NULL; }
+ bool was_cancelled() const { return was_cancelled_; }
+ Type type() const { return type_; }
+
+ // Returns true if this job still has a user callback. Some jobs
+ // do not have a user callback, because they were helper jobs
+ // scheduled internally (for example TYPE_SET_PAC_SCRIPT_INTERNAL).
+ //
+ // Otherwise jobs that correspond with user-initiated work will
+ // have a non-NULL callback up until the callback is run.
+ bool has_user_callback() const { return user_callback_ != NULL; }
+
+ // This method is called when the job is inserted into a wait queue
+ // because no executors were ready to accept it.
+ virtual void WaitingForThread() {}
+
+ // This method is called just before the job is posted to the work thread.
+ virtual void FinishedWaitingForThread() {}
+
+ // This method is called on the worker thread to do the job's work. On
+ // completion, implementors are expected to call OnJobCompleted() on
+ // |origin_loop|.
+ virtual void Run(MessageLoop* origin_loop) = 0;
+
+ protected:
+ void OnJobCompleted() {
+ // |executor_| will be NULL if the executor has already been deleted.
+ if (executor_)
+ executor_->OnJobCompleted(this);
+ }
+
+ void RunUserCallback(int result) {
+ DCHECK(has_user_callback());
+ CompletionCallback* callback = user_callback_;
+ // Null the callback so has_user_callback() will now return false.
+ user_callback_ = NULL;
+ callback->Run(result);
+ }
+
+ friend class base::RefCountedThreadSafe<MultiThreadedProxyResolver::Job>;
+
+ virtual ~Job() {}
+
private:
- friend class base::RefCountedThreadSafe<
- SingleThreadedProxyResolver::SetPacScriptTask>;
+ const Type type_;
+ CompletionCallback* user_callback_;
+ Executor* executor_;
+ bool was_cancelled_;
+};
- ~SetPacScriptTask() {}
+// MultiThreadedProxyResolver::SetPacScriptJob ---------------------------------
+// Runs on the worker thread to call ProxyResolver::SetPacScript.
+class MultiThreadedProxyResolver::SetPacScriptJob
+ : public MultiThreadedProxyResolver::Job {
+ public:
+ SetPacScriptJob(const GURL& pac_url,
+ const string16& pac_script,
+ CompletionCallback* callback)
+ : Job(callback ? TYPE_SET_PAC_SCRIPT : TYPE_SET_PAC_SCRIPT_INTERNAL,
+ callback),
+ pac_url_(pac_url),
+ pac_script_(pac_script) {
+ }
+
// Runs on the worker thread.
- void DoRequest(ProxyResolver* resolver) {
+ virtual void Run(MessageLoop* origin_loop) {
+ ProxyResolver* resolver = executor()->resolver();
int rv = resolver->expects_pac_bytes() ?
resolver->SetPacScriptByData(pac_script_, NULL) :
resolver->SetPacScriptByUrl(pac_url_, NULL);
DCHECK_NE(rv, ERR_IO_PENDING);
- origin_loop_->PostTask(FROM_HERE,
- NewRunnableMethod(this, &SetPacScriptTask::RequestComplete, rv));
+ origin_loop->PostTask(
+ FROM_HERE,
+ NewRunnableMethod(this, &SetPacScriptJob::RequestComplete, rv));
}
+ private:
// Runs the completion callback on the origin thread.
void RequestComplete(int result_code) {
// The task may have been cancelled after it was started.
- if (!was_cancelled()) {
- CompletionCallback* callback = callback_;
- coordinator_->RemoveOutstandingSetPacScriptTask(this);
- callback->Run(result_code);
+ if (!was_cancelled() && has_user_callback()) {
+ RunUserCallback(result_code);
}
+ OnJobCompleted();
}
- // Must only be used on the "origin" thread.
- SingleThreadedProxyResolver* coordinator_;
- CompletionCallback* callback_;
- string16 pac_script_;
- GURL pac_url_;
-
- // Usable from within DoQuery on the worker thread.
- MessageLoop* origin_loop_;
+ const GURL pac_url_;
+ const string16 pac_script_;
};
-// SingleThreadedProxyResolver::Job -------------------------------------------
+// MultiThreadedProxyResolver::GetProxyForURLJob ------------------------------
-class SingleThreadedProxyResolver::Job
- : public base::RefCountedThreadSafe<SingleThreadedProxyResolver::Job> {
+class MultiThreadedProxyResolver::GetProxyForURLJob
+ : public MultiThreadedProxyResolver::Job {
public:
- // |coordinator| -- the SingleThreadedProxyResolver that owns this job.
// |url| -- the URL of the query.
// |results| -- the structure to fill with proxy resolve results.
- Job(SingleThreadedProxyResolver* coordinator,
- const GURL& url,
- ProxyInfo* results,
- CompletionCallback* callback,
- const BoundNetLog& net_log)
- : coordinator_(coordinator),
- callback_(callback),
- results_(results),
- net_log_(net_log),
- url_(url),
- is_started_(false),
- origin_loop_(MessageLoop::current()) {
+ GetProxyForURLJob(const GURL& url,
+ ProxyInfo* results,
+ CompletionCallback* callback,
+ const BoundNetLog& net_log)
+ : Job(TYPE_GET_PROXY_FOR_URL, callback),
+ results_(results),
+ net_log_(net_log),
+ url_(url),
+ was_waiting_for_thread_(false) {
DCHECK(callback);
}
- // Start the resolve proxy request on the worker thread.
- void Start() {
- is_started_ = true;
+ BoundNetLog* net_log() { return &net_log_; }
- size_t load_log_bound = 100;
-
- coordinator_->thread()->message_loop()->PostTask(
- FROM_HERE, NewRunnableMethod(this, &Job::DoQuery,
- coordinator_->resolver_.get(),
- load_log_bound));
+ virtual void WaitingForThread() {
+ was_waiting_for_thread_ = true;
+ net_log_.BeginEvent(
+ NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD, NULL);
}
- bool is_started() const { return is_started_; }
+ virtual void FinishedWaitingForThread() {
+ DCHECK(executor());
- void Cancel() {
- // Clear these to inform QueryComplete that it should not try to
- // access them.
- coordinator_ = NULL;
- callback_ = NULL;
- results_ = NULL;
+ if (was_waiting_for_thread_) {
+ net_log_.EndEvent(
+ NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD, NULL);
+ }
+
+ net_log_.AddEvent(
+ NetLog::TYPE_SUBMITTED_TO_RESOLVER_THREAD,
+ new NetLogIntegerParameter(
+ "thread_number", executor()->thread_number()));
}
- // Returns true if Cancel() has been called.
- bool was_cancelled() const { return callback_ == NULL; }
-
- BoundNetLog* net_log() { return &net_log_; }
-
- private:
- friend class base::RefCountedThreadSafe<SingleThreadedProxyResolver::Job>;
-
- ~Job() {}
-
// Runs on the worker thread.
- void DoQuery(ProxyResolver* resolver, size_t load_log_bound) {
- worker_log_.reset(new CapturingNetLog(load_log_bound));
+ virtual void Run(MessageLoop* origin_loop) {
+ const size_t kNetLogBound = 50u;
+ worker_log_.reset(new CapturingNetLog(kNetLogBound));
BoundNetLog bound_worker_log(NetLog::Source(), worker_log_.get());
- int rv = resolver->GetProxyForURL(url_, &results_buf_, NULL, NULL,
- bound_worker_log);
+ ProxyResolver* resolver = executor()->resolver();
+ int rv = resolver->GetProxyForURL(
+ url_, &results_buf_, NULL, NULL, bound_worker_log);
DCHECK_NE(rv, ERR_IO_PENDING);
- origin_loop_->PostTask(FROM_HERE,
- NewRunnableMethod(this, &Job::QueryComplete, rv));
+ origin_loop->PostTask(
+ FROM_HERE,
+ NewRunnableMethod(this, &GetProxyForURLJob::QueryComplete, rv));
}
+ private:
// Runs the completion callback on the origin thread.
void QueryComplete(int result_code) {
- // Merge the load log that was generated on the worker thread, into the
- // main log.
- CapturingBoundNetLog bound_worker_log(NetLog::Source(),
- worker_log_.release());
- bound_worker_log.AppendTo(net_log_);
-
// The Job may have been cancelled after it was started.
if (!was_cancelled()) {
+ // Merge the load log that was generated on the worker thread, into the
+ // main log.
+ CapturingBoundNetLog bound_worker_log(NetLog::Source(),
+ worker_log_.release());
+ bound_worker_log.AppendTo(net_log_);
+
if (result_code >= OK) { // Note: unit-tests use values > 0.
results_->Use(results_buf_);
}
- callback_->Run(result_code);
-
- // We check for cancellation once again, in case the callback deleted
- // the owning ProxyService (whose destructor will in turn cancel us).
- if (!was_cancelled())
- coordinator_->RemoveFrontOfJobsQueueAndStartNext(this);
+ RunUserCallback(result_code);
}
+ OnJobCompleted();
}
// Must only be used on the "origin" thread.
- SingleThreadedProxyResolver* coordinator_;
- CompletionCallback* callback_;
ProxyInfo* results_;
BoundNetLog net_log_;
- GURL url_;
- bool is_started_;
+ const GURL url_;
// Usable from within DoQuery on the worker thread.
ProxyInfo results_buf_;
- MessageLoop* origin_loop_;
// Used to pass the captured events between DoQuery [worker thread] and
// QueryComplete [origin thread].
scoped_ptr<CapturingNetLog> worker_log_;
+
+ bool was_waiting_for_thread_;
};
-// SingleThreadedProxyResolver ------------------------------------------------
+// MultiThreadedProxyResolver::Executor ----------------------------------------
-SingleThreadedProxyResolver::SingleThreadedProxyResolver(
- ProxyResolver* resolver)
- : ProxyResolver(resolver->expects_pac_bytes()),
+MultiThreadedProxyResolver::Executor::Executor(
+ MultiThreadedProxyResolver* coordinator,
+ ProxyResolver* resolver,
+ int thread_number)
+ : coordinator_(coordinator),
+ thread_number_(thread_number),
resolver_(resolver) {
+ DCHECK(coordinator);
+ DCHECK(resolver);
+ // Start up the thread.
+ // Note that it is safe to pass a temporary C-String to Thread(), as it will
+ // make a copy.
+ std::string thread_name =
+ StringPrintf("PAC thread #%d", thread_number);
+ thread_.reset(new base::Thread(thread_name.c_str()));
+ thread_->Start();
}
-SingleThreadedProxyResolver::~SingleThreadedProxyResolver() {
- // Cancel the inprogress job (if any), and free the rest.
- for (PendingJobsQueue::iterator it = pending_jobs_.begin();
- it != pending_jobs_.end();
- ++it) {
- (*it)->Cancel();
+void MultiThreadedProxyResolver::Executor::StartJob(Job* job) {
+ DCHECK(!outstanding_job_);
+ outstanding_job_ = job;
+
+ // Run the job. Once it has completed (regardless of whether it was
+ // cancelled), it will invoke OnJobCompleted() on this thread.
+ job->set_executor(this);
+ job->FinishedWaitingForThread();
+ thread_->message_loop()->PostTask(
+ FROM_HERE,
+ NewRunnableMethod(job, &Job::Run, MessageLoop::current()));
+}
+
+void MultiThreadedProxyResolver::Executor::OnJobCompleted(Job* job) {
+ DCHECK_EQ(job, outstanding_job_.get());
+ outstanding_job_ = NULL;
+ coordinator_->OnExecutorReady(this);
+}
+
+void MultiThreadedProxyResolver::Executor::Destroy() {
+ DCHECK(coordinator_);
+
+ // Give the resolver an opportunity to shutdown from THIS THREAD before
+ // joining on the resolver thread. This allows certain implementations
+ // to avoid deadlocks.
+ resolver_->Shutdown();
+
+ // Join the worker thread.
+ thread_.reset();
+
+ // Cancel any outstanding job.
+ if (outstanding_job_) {
+ outstanding_job_->Cancel();
+ // Orphan the job (since this executor may be deleted soon).
+ outstanding_job_->set_executor(NULL);
}
- if (outstanding_set_pac_script_task_)
- outstanding_set_pac_script_task_->Cancel();
+ // It is now safe to free the ProxyResolver, since all the tasks that
+ // were using it on the resolver thread have completed.
+ resolver_.reset();
- // Note that |thread_| is destroyed before |resolver_|. This is important
- // since |resolver_| could be running on |thread_|.
+ // Null some stuff as a precaution.
+ coordinator_ = NULL;
+ outstanding_job_ = NULL;
}
-int SingleThreadedProxyResolver::GetProxyForURL(const GURL& url,
- ProxyInfo* results,
- CompletionCallback* callback,
- RequestHandle* request,
- const BoundNetLog& net_log) {
+void MultiThreadedProxyResolver::Executor::PurgeMemory() {
+ scoped_refptr<PurgeMemoryTask> helper(new PurgeMemoryTask(resolver_.get()));
+ thread_->message_loop()->PostTask(
+ FROM_HERE,
+ NewRunnableMethod(helper.get(), &PurgeMemoryTask::PurgeMemory));
+}
+
+MultiThreadedProxyResolver::Executor::~Executor() {
+ // The important cleanup happens as part of Destroy(), which should always be
+ // called first.
+ DCHECK(!coordinator_) << "Destroy() was not called";
+ DCHECK(!thread_.get());
+ DCHECK(!resolver_.get());
+ DCHECK(!outstanding_job_);
+}
+
+// MultiThreadedProxyResolver --------------------------------------------------
+
+MultiThreadedProxyResolver::MultiThreadedProxyResolver(
+ ProxyResolverFactory* resolver_factory,
+ size_t max_num_threads)
+ : ProxyResolver(resolver_factory->resolvers_expect_pac_bytes()),
+ resolver_factory_(resolver_factory),
+ max_num_threads_(max_num_threads),
+ was_set_pac_script_called_(false) {
+ DCHECK_GE(max_num_threads, 1u);
+}
+
+MultiThreadedProxyResolver::~MultiThreadedProxyResolver() {
+ // We will cancel all outstanding requests.
+ pending_jobs_.clear();
+ ReleaseAllExecutors();
+}
+
+int MultiThreadedProxyResolver::GetProxyForURL(const GURL& url,
+ ProxyInfo* results,
+ CompletionCallback* callback,
+ RequestHandle* request,
+ const BoundNetLog& net_log) {
+ DCHECK(CalledOnValidThread());
DCHECK(callback);
+ DCHECK(was_set_pac_script_called_)
+ << "Resolver is un-initialized. Must call SetPacScript() first!";
- scoped_refptr<Job> job = new Job(this, url, results, callback, net_log);
- bool is_first_job = pending_jobs_.empty();
- pending_jobs_.push_back(job); // Jobs can never finish synchronously.
+ scoped_refptr<GetProxyForURLJob> job =
+ new GetProxyForURLJob(url, results, callback, net_log);
- if (is_first_job) {
- // If there is nothing already running, start the job now.
- EnsureThreadStarted();
- job->Start();
- } else {
- // Otherwise the job will get started eventually by ProcessPendingJobs().
- job->net_log()->BeginEvent(
- NetLog::TYPE_WAITING_FOR_SINGLE_PROXY_RESOLVER_THREAD, NULL);
- }
-
// Completion will be notified through |callback|, unless the caller cancels
// the request using |request|.
if (request)
*request = reinterpret_cast<RequestHandle>(job.get());
+ // If there is an executor that is ready to run this request, submit it!
+ Executor* executor = FindIdleExecutor();
+ if (executor) {
+ DCHECK_EQ(0u, pending_jobs_.size());
+ executor->StartJob(job);
+ return ERR_IO_PENDING;
+ }
+
+ // Otherwise queue this request. (We will schedule it to a thread once one
+ // becomes available).
+ job->WaitingForThread();
+ pending_jobs_.push_back(job);
+
+ // If we haven't already reached the thread limit, provision a new thread to
+ // drain the requests more quickly.
+ if (executors_.size() < max_num_threads_) {
+ executor = AddNewExecutor();
+ executor->StartJob(
+ new SetPacScriptJob(current_pac_url_, current_pac_script_, NULL));
+ }
+
return ERR_IO_PENDING;
}
-// There are three states of the request we need to handle:
-// (1) Not started (just sitting in the queue).
-// (2) Executing Job::DoQuery in the worker thread.
-// (3) Waiting for Job::QueryComplete to be run on the origin thread.
-void SingleThreadedProxyResolver::CancelRequest(RequestHandle req) {
+void MultiThreadedProxyResolver::CancelRequest(RequestHandle req) {
+ DCHECK(CalledOnValidThread());
DCHECK(req);
Job* job = reinterpret_cast<Job*>(req);
+ DCHECK_EQ(Job::TYPE_GET_PROXY_FOR_URL, job->type());
- bool is_active_job = job->is_started() && !pending_jobs_.empty() &&
- pending_jobs_.front().get() == job;
+ if (job->executor()) {
+ // If the job was already submitted to the executor, just mark it
+ // as cancelled so the user callback isn't run on completion.
+ job->Cancel();
+ } else {
+ // Otherwise the job is just sitting in a queue.
+ PendingJobsQueue::iterator it =
+ std::find(pending_jobs_.begin(), pending_jobs_.end(), job);
+ DCHECK(it != pending_jobs_.end());
+ pending_jobs_.erase(it);
+ }
+}
- job->Cancel();
+void MultiThreadedProxyResolver::CancelSetPacScript() {
+ DCHECK(CalledOnValidThread());
+ DCHECK_EQ(0u, pending_jobs_.size());
+ DCHECK_EQ(1u, executors_.size());
+ DCHECK_EQ(Job::TYPE_SET_PAC_SCRIPT,
+ executors_[0]->outstanding_job()->type());
- if (is_active_job) {
- RemoveFrontOfJobsQueueAndStartNext(job);
- return;
- }
+ // Defensively clear some data which shouldn't be getting used
+ // anymore.
+ was_set_pac_script_called_ = false;
+ current_pac_url_ = GURL();
+ current_pac_script_ = string16();
- // Otherwise just delete the job from the queue.
- PendingJobsQueue::iterator it = std::find(
- pending_jobs_.begin(), pending_jobs_.end(), job);
- DCHECK(it != pending_jobs_.end());
- pending_jobs_.erase(it);
+ ReleaseAllExecutors();
}
-void SingleThreadedProxyResolver::CancelSetPacScript() {
- DCHECK(outstanding_set_pac_script_task_);
- outstanding_set_pac_script_task_->Cancel();
- outstanding_set_pac_script_task_ = NULL;
-}
-
-void SingleThreadedProxyResolver::PurgeMemory() {
- if (thread_.get()) {
- scoped_refptr<PurgeMemoryTask> helper(new PurgeMemoryTask(resolver_.get()));
- thread_->message_loop()->PostTask(FROM_HERE,
- NewRunnableMethod(helper.get(), &PurgeMemoryTask::PurgeMemory));
+void MultiThreadedProxyResolver::PurgeMemory() {
+ DCHECK(CalledOnValidThread());
+ for (ExecutorList::iterator it = executors_.begin();
+ it != executors_.end(); ++it) {
+ Executor* executor = *it;
+ executor->PurgeMemory();
}
}
-int SingleThreadedProxyResolver::SetPacScript(
+int MultiThreadedProxyResolver::SetPacScript(
const GURL& pac_url,
const string16& pac_script,
CompletionCallback* callback) {
- EnsureThreadStarted();
- DCHECK(!outstanding_set_pac_script_task_);
+ DCHECK(CalledOnValidThread());
+ DCHECK(callback);
- SetPacScriptTask* task = new SetPacScriptTask(
- this, pac_url, pac_script, callback);
- outstanding_set_pac_script_task_ = task;
- task->Start();
+ // Save the script details, so we can provision new executors later.
+ // (We rely on internal reference counting of strings to avoid this memory
+ // being duplicated by each of the resolver threads).
+ was_set_pac_script_called_ = true;
+ current_pac_url_ = pac_url;
+ current_pac_script_ = pac_script;
+
+ // The user should not have any outstanding requests when they call
+ // SetPacScript().
+ CheckNoOutstandingUserRequests();
+
+ // Destroy all of the current threads and their proxy resolvers.
+ ReleaseAllExecutors();
+
+ // Provision a new executor, and run the SetPacScript request. On completion
+ // notification will be sent through |callback|.
+ Executor* executor = AddNewExecutor();
+ executor->StartJob(new SetPacScriptJob(pac_url, pac_script, callback));
return ERR_IO_PENDING;
}
-void SingleThreadedProxyResolver::EnsureThreadStarted() {
- if (!thread_.get()) {
- thread_.reset(new base::Thread("pac-thread"));
- thread_->Start();
+void MultiThreadedProxyResolver::CheckNoOutstandingUserRequests() const {
+ DCHECK(CalledOnValidThread());
+ CHECK_EQ(0u, pending_jobs_.size());
+
+ for (ExecutorList::const_iterator it = executors_.begin();
+ it != executors_.end(); ++it) {
+ const Executor* executor = *it;
+ Job* job = executor->outstanding_job();
+ // The "has_user_callback()" is to exclude jobs for which the callback
+ // has already been invoked, or was not user-initiated (as in the case of
+ // lazy thread provisions). User-initiated jobs may !has_user_callback()
+ // when the callback has already been run. (Since we only clear the
+ // outstanding job AFTER the callback has been invoked, it is possible
+ // for a new request to be started from within the callback).
+ CHECK(!job || job->was_cancelled() || !job->has_user_callback());
}
}
-void SingleThreadedProxyResolver::ProcessPendingJobs() {
- if (pending_jobs_.empty())
- return;
+void MultiThreadedProxyResolver::ReleaseAllExecutors() {
+ DCHECK(CalledOnValidThread());
+ for (ExecutorList::iterator it = executors_.begin();
+ it != executors_.end(); ++it) {
+ Executor* executor = *it;
+ executor->Destroy();
+ }
+ executors_.clear();
+}
- // Get the next job to process (FIFO).
- Job* job = pending_jobs_.front().get();
- if (job->is_started())
- return;
+MultiThreadedProxyResolver::Executor*
+MultiThreadedProxyResolver::FindIdleExecutor() {
+ DCHECK(CalledOnValidThread());
+ for (ExecutorList::iterator it = executors_.begin();
+ it != executors_.end(); ++it) {
+ Executor* executor = *it;
+ if (!executor->outstanding_job())
+ return executor;
+ }
+ return NULL;
+}
- job->net_log()->EndEvent(
- NetLog::TYPE_WAITING_FOR_SINGLE_PROXY_RESOLVER_THREAD, NULL);
-
- EnsureThreadStarted();
- job->Start();
+MultiThreadedProxyResolver::Executor*
+MultiThreadedProxyResolver::AddNewExecutor() {
+ DCHECK(CalledOnValidThread());
+ DCHECK_LT(executors_.size(), max_num_threads_);
+ // The "thread number" is used to give the thread a unique name.
+ int thread_number = executors_.size();
+ ProxyResolver* resolver = resolver_factory_->CreateProxyResolver();
+ Executor* executor = new Executor(
+ this, resolver, thread_number);
+ executors_.push_back(executor);
+ return executor;
}
-void SingleThreadedProxyResolver::RemoveFrontOfJobsQueueAndStartNext(
- Job* expected_job) {
- DCHECK_EQ(expected_job, pending_jobs_.front().get());
+void MultiThreadedProxyResolver::OnExecutorReady(Executor* executor) {
+ DCHECK(CalledOnValidThread());
+ if (pending_jobs_.empty())
+ return;
+
+ // Get the next job to process (FIFO). Transfer it from the pending queue
+ // to the executor.
+ scoped_refptr<Job> job = pending_jobs_.front();
pending_jobs_.pop_front();
-
- // Start next work item.
- ProcessPendingJobs();
+ executor->StartJob(job);
}
-void SingleThreadedProxyResolver::RemoveOutstandingSetPacScriptTask(
- SetPacScriptTask* task) {
- DCHECK_EQ(outstanding_set_pac_script_task_.get(), task);
- outstanding_set_pac_script_task_ = NULL;
-}
-
} // namespace net
« no previous file with comments | « net/proxy/multi_threaded_proxy_resolver.h ('k') | net/proxy/multi_threaded_proxy_resolver_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698