| Index: net/proxy/multi_threaded_proxy_resolver.cc
|
| ===================================================================
|
| --- net/proxy/multi_threaded_proxy_resolver.cc (revision 51914)
|
| +++ net/proxy/multi_threaded_proxy_resolver.cc (working copy)
|
| @@ -2,14 +2,19 @@
|
| // Use of this source code is governed by a BSD-style license that can be
|
| // found in the LICENSE file.
|
|
|
| -#include "net/proxy/single_threaded_proxy_resolver.h"
|
| +#include "net/proxy/multi_threaded_proxy_resolver.h"
|
|
|
| #include "base/message_loop.h"
|
| +#include "base/string_util.h"
|
| #include "base/thread.h"
|
| #include "net/base/capturing_net_log.h"
|
| #include "net/base/net_errors.h"
|
| #include "net/proxy/proxy_info.h"
|
|
|
| +// TODO(eroman): Have the MultiThreadedProxyResolver clear its PAC script
|
| +// data when SetPacScript fails. That will reclaim memory when
|
| +// testing bogus scripts.
|
| +
|
| namespace net {
|
|
|
| namespace {
|
| @@ -26,328 +31,558 @@
|
|
|
| } // namespace
|
|
|
| -// SingleThreadedProxyResolver::SetPacScriptTask ------------------------------
|
| +// An "executor" is a job-runner for PAC requests. It encapsulates a worker
|
| +// thread and a synchronous ProxyResolver (which will be operated on said
|
| +// thread.)
|
| +class MultiThreadedProxyResolver::Executor
|
| + : public base::RefCountedThreadSafe<MultiThreadedProxyResolver::Executor > {
|
| + public:
|
| + // |coordinator| must remain valid throughout our lifetime. It is used to
|
| + // signal when the executor is ready to receive work by calling
|
| + // |coordinator->OnExecutorReady()|.
|
| + // The constructor takes ownership of |resolver|.
|
| + // |thread_number| is an identifier used when naming the worker thread.
|
| + Executor(MultiThreadedProxyResolver* coordinator,
|
| + ProxyResolver* resolver,
|
| + int thread_number);
|
|
|
| -// Runs on the worker thread to call ProxyResolver::SetPacScript.
|
| -class SingleThreadedProxyResolver::SetPacScriptTask
|
| - : public base::RefCountedThreadSafe<
|
| - SingleThreadedProxyResolver::SetPacScriptTask> {
|
| + // Submit a job to this executor.
|
| + void StartJob(Job* job);
|
| +
|
| + // Callback for when a job has completed running on the executor's thread.
|
| + void OnJobCompleted(Job* job);
|
| +
|
| + // Cleanup the executor. Cancels all outstanding work, and frees the thread
|
| + // and resolver.
|
| + void Destroy();
|
| +
|
| + void PurgeMemory();
|
| +
|
| + // Returns the outstanding job, or NULL.
|
| + Job* outstanding_job() const { return outstanding_job_.get(); }
|
| +
|
| + ProxyResolver* resolver() { return resolver_.get(); }
|
| +
|
| + int thread_number() const { return thread_number_; }
|
| +
|
| + private:
|
| + friend class base::RefCountedThreadSafe<Executor>;
|
| + ~Executor();
|
| +
|
| + MultiThreadedProxyResolver* coordinator_;
|
| + const int thread_number_;
|
| +
|
| + // The currently active job for this executor (either a SetPacScript or
|
| + // GetProxyForURL task).
|
| + scoped_refptr<Job> outstanding_job_;
|
| +
|
| + // The synchronous resolver implementation.
|
| + scoped_ptr<ProxyResolver> resolver_;
|
| +
|
| + // The thread where |resolver_| is run on.
|
| + // Note that declaration ordering is important here. |thread_| needs to be
|
| + // destroyed *before* |resolver_|, in case |resolver_| is currently
|
| + // executing on |thread_|.
|
| + scoped_ptr<base::Thread> thread_;
|
| +};
|
| +
|
| +// MultiThreadedProxyResolver::Job ---------------------------------------------
|
| +
|
| +class MultiThreadedProxyResolver::Job
|
| + : public base::RefCountedThreadSafe<MultiThreadedProxyResolver::Job> {
|
| public:
|
| - SetPacScriptTask(SingleThreadedProxyResolver* coordinator,
|
| - const GURL& pac_url,
|
| - const string16& pac_script,
|
| - CompletionCallback* callback)
|
| - : coordinator_(coordinator),
|
| - callback_(callback),
|
| - pac_script_(pac_script),
|
| - pac_url_(pac_url),
|
| - origin_loop_(MessageLoop::current()) {
|
| - DCHECK(callback);
|
| + // Identifies the subclass of Job (only being used for debugging purposes).
|
| + enum Type {
|
| + TYPE_GET_PROXY_FOR_URL,
|
| + TYPE_SET_PAC_SCRIPT,
|
| + TYPE_SET_PAC_SCRIPT_INTERNAL,
|
| + };
|
| +
|
| + Job(Type type, CompletionCallback* user_callback)
|
| + : type_(type),
|
| + user_callback_(user_callback),
|
| + executor_(NULL),
|
| + was_cancelled_(false) {
|
| }
|
|
|
| - // Start the SetPacScript request on the worker thread.
|
| - void Start() {
|
| - coordinator_->thread()->message_loop()->PostTask(
|
| - FROM_HERE, NewRunnableMethod(this, &SetPacScriptTask::DoRequest,
|
| - coordinator_->resolver_.get()));
|
| + void set_executor(Executor* executor) {
|
| + executor_ = executor;
|
| }
|
|
|
| + // The "executor" is the job runner that is scheduling this job. If
|
| + // this job has not been submitted to an executor yet, this will be
|
| + // NULL (and we know it hasn't started yet).
|
| + Executor* executor() {
|
| + return executor_;
|
| + }
|
| +
|
| + // Mark the job as having been cancelled.
|
| void Cancel() {
|
| - // Clear these to inform RequestComplete that it should not try to
|
| - // access them.
|
| - coordinator_ = NULL;
|
| - callback_ = NULL;
|
| + was_cancelled_ = true;
|
| }
|
|
|
| // Returns true if Cancel() has been called.
|
| - bool was_cancelled() const { return callback_ == NULL; }
|
| + bool was_cancelled() const { return was_cancelled_; }
|
|
|
| + Type type() const { return type_; }
|
| +
|
| + // Returns true if this job still has a user callback. Some jobs
|
| + // do not have a user callback, because they were helper jobs
|
| + // scheduled internally (for example TYPE_SET_PAC_SCRIPT_INTERNAL).
|
| + //
|
| + // Otherwise jobs that correspond with user-initiated work will
|
| + // have a non-NULL callback up until the callback is run.
|
| + bool has_user_callback() const { return user_callback_ != NULL; }
|
| +
|
| + // This method is called when the job is inserted into a wait queue
|
| + // because no executors were ready to accept it.
|
| + virtual void WaitingForThread() {}
|
| +
|
| + // This method is called just before the job is posted to the work thread.
|
| + virtual void FinishedWaitingForThread() {}
|
| +
|
| + // This method is called on the worker thread to do the job's work. On
|
| + // completion, implementors are expected to call OnJobCompleted() on
|
| + // |origin_loop|.
|
| + virtual void Run(MessageLoop* origin_loop) = 0;
|
| +
|
| + protected:
|
| + void OnJobCompleted() {
|
| + // |executor_| will be NULL if the executor has already been deleted.
|
| + if (executor_)
|
| + executor_->OnJobCompleted(this);
|
| + }
|
| +
|
| + void RunUserCallback(int result) {
|
| + DCHECK(has_user_callback());
|
| + CompletionCallback* callback = user_callback_;
|
| + // Null the callback so has_user_callback() will now return false.
|
| + user_callback_ = NULL;
|
| + callback->Run(result);
|
| + }
|
| +
|
| + friend class base::RefCountedThreadSafe<MultiThreadedProxyResolver::Job>;
|
| +
|
| + virtual ~Job() {}
|
| +
|
| private:
|
| - friend class base::RefCountedThreadSafe<
|
| - SingleThreadedProxyResolver::SetPacScriptTask>;
|
| + const Type type_;
|
| + CompletionCallback* user_callback_;
|
| + Executor* executor_;
|
| + bool was_cancelled_;
|
| +};
|
|
|
| - ~SetPacScriptTask() {}
|
| +// MultiThreadedProxyResolver::SetPacScriptJob ---------------------------------
|
|
|
| +// Runs on the worker thread to call ProxyResolver::SetPacScript.
|
| +class MultiThreadedProxyResolver::SetPacScriptJob
|
| + : public MultiThreadedProxyResolver::Job {
|
| + public:
|
| + SetPacScriptJob(const GURL& pac_url,
|
| + const string16& pac_script,
|
| + CompletionCallback* callback)
|
| + : Job(callback ? TYPE_SET_PAC_SCRIPT : TYPE_SET_PAC_SCRIPT_INTERNAL,
|
| + callback),
|
| + pac_url_(pac_url),
|
| + pac_script_(pac_script) {
|
| + }
|
| +
|
| // Runs on the worker thread.
|
| - void DoRequest(ProxyResolver* resolver) {
|
| + virtual void Run(MessageLoop* origin_loop) {
|
| + ProxyResolver* resolver = executor()->resolver();
|
| int rv = resolver->expects_pac_bytes() ?
|
| resolver->SetPacScriptByData(pac_script_, NULL) :
|
| resolver->SetPacScriptByUrl(pac_url_, NULL);
|
|
|
| DCHECK_NE(rv, ERR_IO_PENDING);
|
| - origin_loop_->PostTask(FROM_HERE,
|
| - NewRunnableMethod(this, &SetPacScriptTask::RequestComplete, rv));
|
| + origin_loop->PostTask(
|
| + FROM_HERE,
|
| + NewRunnableMethod(this, &SetPacScriptJob::RequestComplete, rv));
|
| }
|
|
|
| + private:
|
| // Runs the completion callback on the origin thread.
|
| void RequestComplete(int result_code) {
|
| // The task may have been cancelled after it was started.
|
| - if (!was_cancelled()) {
|
| - CompletionCallback* callback = callback_;
|
| - coordinator_->RemoveOutstandingSetPacScriptTask(this);
|
| - callback->Run(result_code);
|
| + if (!was_cancelled() && has_user_callback()) {
|
| + RunUserCallback(result_code);
|
| }
|
| + OnJobCompleted();
|
| }
|
|
|
| - // Must only be used on the "origin" thread.
|
| - SingleThreadedProxyResolver* coordinator_;
|
| - CompletionCallback* callback_;
|
| - string16 pac_script_;
|
| - GURL pac_url_;
|
| -
|
| - // Usable from within DoQuery on the worker thread.
|
| - MessageLoop* origin_loop_;
|
| + const GURL pac_url_;
|
| + const string16 pac_script_;
|
| };
|
|
|
| -// SingleThreadedProxyResolver::Job -------------------------------------------
|
| +// MultiThreadedProxyResolver::GetProxyForURLJob ------------------------------
|
|
|
| -class SingleThreadedProxyResolver::Job
|
| - : public base::RefCountedThreadSafe<SingleThreadedProxyResolver::Job> {
|
| +class MultiThreadedProxyResolver::GetProxyForURLJob
|
| + : public MultiThreadedProxyResolver::Job {
|
| public:
|
| - // |coordinator| -- the SingleThreadedProxyResolver that owns this job.
|
| // |url| -- the URL of the query.
|
| // |results| -- the structure to fill with proxy resolve results.
|
| - Job(SingleThreadedProxyResolver* coordinator,
|
| - const GURL& url,
|
| - ProxyInfo* results,
|
| - CompletionCallback* callback,
|
| - const BoundNetLog& net_log)
|
| - : coordinator_(coordinator),
|
| - callback_(callback),
|
| - results_(results),
|
| - net_log_(net_log),
|
| - url_(url),
|
| - is_started_(false),
|
| - origin_loop_(MessageLoop::current()) {
|
| + GetProxyForURLJob(const GURL& url,
|
| + ProxyInfo* results,
|
| + CompletionCallback* callback,
|
| + const BoundNetLog& net_log)
|
| + : Job(TYPE_GET_PROXY_FOR_URL, callback),
|
| + results_(results),
|
| + net_log_(net_log),
|
| + url_(url),
|
| + was_waiting_for_thread_(false) {
|
| DCHECK(callback);
|
| }
|
|
|
| - // Start the resolve proxy request on the worker thread.
|
| - void Start() {
|
| - is_started_ = true;
|
| + BoundNetLog* net_log() { return &net_log_; }
|
|
|
| - size_t load_log_bound = 100;
|
| -
|
| - coordinator_->thread()->message_loop()->PostTask(
|
| - FROM_HERE, NewRunnableMethod(this, &Job::DoQuery,
|
| - coordinator_->resolver_.get(),
|
| - load_log_bound));
|
| + virtual void WaitingForThread() {
|
| + was_waiting_for_thread_ = true;
|
| + net_log_.BeginEvent(
|
| + NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD, NULL);
|
| }
|
|
|
| - bool is_started() const { return is_started_; }
|
| + virtual void FinishedWaitingForThread() {
|
| + DCHECK(executor());
|
|
|
| - void Cancel() {
|
| - // Clear these to inform QueryComplete that it should not try to
|
| - // access them.
|
| - coordinator_ = NULL;
|
| - callback_ = NULL;
|
| - results_ = NULL;
|
| + if (was_waiting_for_thread_) {
|
| + net_log_.EndEvent(
|
| + NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD, NULL);
|
| + }
|
| +
|
| + net_log_.AddEvent(
|
| + NetLog::TYPE_SUBMITTED_TO_RESOLVER_THREAD,
|
| + new NetLogIntegerParameter(
|
| + "thread_number", executor()->thread_number()));
|
| }
|
|
|
| - // Returns true if Cancel() has been called.
|
| - bool was_cancelled() const { return callback_ == NULL; }
|
| -
|
| - BoundNetLog* net_log() { return &net_log_; }
|
| -
|
| - private:
|
| - friend class base::RefCountedThreadSafe<SingleThreadedProxyResolver::Job>;
|
| -
|
| - ~Job() {}
|
| -
|
| // Runs on the worker thread.
|
| - void DoQuery(ProxyResolver* resolver, size_t load_log_bound) {
|
| - worker_log_.reset(new CapturingNetLog(load_log_bound));
|
| + virtual void Run(MessageLoop* origin_loop) {
|
| + const size_t kNetLogBound = 50u;
|
| + worker_log_.reset(new CapturingNetLog(kNetLogBound));
|
| BoundNetLog bound_worker_log(NetLog::Source(), worker_log_.get());
|
|
|
| - int rv = resolver->GetProxyForURL(url_, &results_buf_, NULL, NULL,
|
| - bound_worker_log);
|
| + ProxyResolver* resolver = executor()->resolver();
|
| + int rv = resolver->GetProxyForURL(
|
| + url_, &results_buf_, NULL, NULL, bound_worker_log);
|
| DCHECK_NE(rv, ERR_IO_PENDING);
|
|
|
| - origin_loop_->PostTask(FROM_HERE,
|
| - NewRunnableMethod(this, &Job::QueryComplete, rv));
|
| + origin_loop->PostTask(
|
| + FROM_HERE,
|
| + NewRunnableMethod(this, &GetProxyForURLJob::QueryComplete, rv));
|
| }
|
|
|
| + private:
|
| // Runs the completion callback on the origin thread.
|
| void QueryComplete(int result_code) {
|
| - // Merge the load log that was generated on the worker thread, into the
|
| - // main log.
|
| - CapturingBoundNetLog bound_worker_log(NetLog::Source(),
|
| - worker_log_.release());
|
| - bound_worker_log.AppendTo(net_log_);
|
| -
|
| // The Job may have been cancelled after it was started.
|
| if (!was_cancelled()) {
|
| + // Merge the load log that was generated on the worker thread, into the
|
| + // main log.
|
| + CapturingBoundNetLog bound_worker_log(NetLog::Source(),
|
| + worker_log_.release());
|
| + bound_worker_log.AppendTo(net_log_);
|
| +
|
| if (result_code >= OK) { // Note: unit-tests use values > 0.
|
| results_->Use(results_buf_);
|
| }
|
| - callback_->Run(result_code);
|
| -
|
| - // We check for cancellation once again, in case the callback deleted
|
| - // the owning ProxyService (whose destructor will in turn cancel us).
|
| - if (!was_cancelled())
|
| - coordinator_->RemoveFrontOfJobsQueueAndStartNext(this);
|
| + RunUserCallback(result_code);
|
| }
|
| + OnJobCompleted();
|
| }
|
|
|
| // Must only be used on the "origin" thread.
|
| - SingleThreadedProxyResolver* coordinator_;
|
| - CompletionCallback* callback_;
|
| ProxyInfo* results_;
|
| BoundNetLog net_log_;
|
| - GURL url_;
|
| - bool is_started_;
|
| + const GURL url_;
|
|
|
| // Usable from within DoQuery on the worker thread.
|
| ProxyInfo results_buf_;
|
| - MessageLoop* origin_loop_;
|
|
|
| // Used to pass the captured events between DoQuery [worker thread] and
|
| // QueryComplete [origin thread].
|
| scoped_ptr<CapturingNetLog> worker_log_;
|
| +
|
| + bool was_waiting_for_thread_;
|
| };
|
|
|
| -// SingleThreadedProxyResolver ------------------------------------------------
|
| +// MultiThreadedProxyResolver::Executor ----------------------------------------
|
|
|
| -SingleThreadedProxyResolver::SingleThreadedProxyResolver(
|
| - ProxyResolver* resolver)
|
| - : ProxyResolver(resolver->expects_pac_bytes()),
|
| +MultiThreadedProxyResolver::Executor::Executor(
|
| + MultiThreadedProxyResolver* coordinator,
|
| + ProxyResolver* resolver,
|
| + int thread_number)
|
| + : coordinator_(coordinator),
|
| + thread_number_(thread_number),
|
| resolver_(resolver) {
|
| + DCHECK(coordinator);
|
| + DCHECK(resolver);
|
| + // Start up the thread.
|
| + // Note that it is safe to pass a temporary C-String to Thread(), as it will
|
| + // make a copy.
|
| + std::string thread_name =
|
| + StringPrintf("PAC thread #%d", thread_number);
|
| + thread_.reset(new base::Thread(thread_name.c_str()));
|
| + thread_->Start();
|
| }
|
|
|
| -SingleThreadedProxyResolver::~SingleThreadedProxyResolver() {
|
| - // Cancel the inprogress job (if any), and free the rest.
|
| - for (PendingJobsQueue::iterator it = pending_jobs_.begin();
|
| - it != pending_jobs_.end();
|
| - ++it) {
|
| - (*it)->Cancel();
|
| +void MultiThreadedProxyResolver::Executor::StartJob(Job* job) {
|
| + DCHECK(!outstanding_job_);
|
| + outstanding_job_ = job;
|
| +
|
| + // Run the job. Once it has completed (regardless of whether it was
|
| + // cancelled), it will invoke OnJobCompleted() on this thread.
|
| + job->set_executor(this);
|
| + job->FinishedWaitingForThread();
|
| + thread_->message_loop()->PostTask(
|
| + FROM_HERE,
|
| + NewRunnableMethod(job, &Job::Run, MessageLoop::current()));
|
| +}
|
| +
|
| +void MultiThreadedProxyResolver::Executor::OnJobCompleted(Job* job) {
|
| + DCHECK_EQ(job, outstanding_job_.get());
|
| + outstanding_job_ = NULL;
|
| + coordinator_->OnExecutorReady(this);
|
| +}
|
| +
|
| +void MultiThreadedProxyResolver::Executor::Destroy() {
|
| + DCHECK(coordinator_);
|
| +
|
| + // Give the resolver an opportunity to shutdown from THIS THREAD before
|
| + // joining on the resolver thread. This allows certain implementations
|
| + // to avoid deadlocks.
|
| + resolver_->Shutdown();
|
| +
|
| + // Join the worker thread.
|
| + thread_.reset();
|
| +
|
| + // Cancel any outstanding job.
|
| + if (outstanding_job_) {
|
| + outstanding_job_->Cancel();
|
| + // Orphan the job (since this executor may be deleted soon).
|
| + outstanding_job_->set_executor(NULL);
|
| }
|
|
|
| - if (outstanding_set_pac_script_task_)
|
| - outstanding_set_pac_script_task_->Cancel();
|
| + // It is now safe to free the ProxyResolver, since all the tasks that
|
| + // were using it on the resolver thread have completed.
|
| + resolver_.reset();
|
|
|
| - // Note that |thread_| is destroyed before |resolver_|. This is important
|
| - // since |resolver_| could be running on |thread_|.
|
| + // Null some stuff as a precaution.
|
| + coordinator_ = NULL;
|
| + outstanding_job_ = NULL;
|
| }
|
|
|
| -int SingleThreadedProxyResolver::GetProxyForURL(const GURL& url,
|
| - ProxyInfo* results,
|
| - CompletionCallback* callback,
|
| - RequestHandle* request,
|
| - const BoundNetLog& net_log) {
|
| +void MultiThreadedProxyResolver::Executor::PurgeMemory() {
|
| + scoped_refptr<PurgeMemoryTask> helper(new PurgeMemoryTask(resolver_.get()));
|
| + thread_->message_loop()->PostTask(
|
| + FROM_HERE,
|
| + NewRunnableMethod(helper.get(), &PurgeMemoryTask::PurgeMemory));
|
| +}
|
| +
|
| +MultiThreadedProxyResolver::Executor::~Executor() {
|
| + // The important cleanup happens as part of Destroy(), which should always be
|
| + // called first.
|
| + DCHECK(!coordinator_) << "Destroy() was not called";
|
| + DCHECK(!thread_.get());
|
| + DCHECK(!resolver_.get());
|
| + DCHECK(!outstanding_job_);
|
| +}
|
| +
|
| +// MultiThreadedProxyResolver --------------------------------------------------
|
| +
|
| +MultiThreadedProxyResolver::MultiThreadedProxyResolver(
|
| + ProxyResolverFactory* resolver_factory,
|
| + size_t max_num_threads)
|
| + : ProxyResolver(resolver_factory->resolvers_expect_pac_bytes()),
|
| + resolver_factory_(resolver_factory),
|
| + max_num_threads_(max_num_threads),
|
| + was_set_pac_script_called_(false) {
|
| + DCHECK_GE(max_num_threads, 1u);
|
| +}
|
| +
|
| +MultiThreadedProxyResolver::~MultiThreadedProxyResolver() {
|
| + // We will cancel all outstanding requests.
|
| + pending_jobs_.clear();
|
| + ReleaseAllExecutors();
|
| +}
|
| +
|
| +int MultiThreadedProxyResolver::GetProxyForURL(const GURL& url,
|
| + ProxyInfo* results,
|
| + CompletionCallback* callback,
|
| + RequestHandle* request,
|
| + const BoundNetLog& net_log) {
|
| + DCHECK(CalledOnValidThread());
|
| DCHECK(callback);
|
| + DCHECK(was_set_pac_script_called_)
|
| + << "Resolver is un-initialized. Must call SetPacScript() first!";
|
|
|
| - scoped_refptr<Job> job = new Job(this, url, results, callback, net_log);
|
| - bool is_first_job = pending_jobs_.empty();
|
| - pending_jobs_.push_back(job); // Jobs can never finish synchronously.
|
| + scoped_refptr<GetProxyForURLJob> job =
|
| + new GetProxyForURLJob(url, results, callback, net_log);
|
|
|
| - if (is_first_job) {
|
| - // If there is nothing already running, start the job now.
|
| - EnsureThreadStarted();
|
| - job->Start();
|
| - } else {
|
| - // Otherwise the job will get started eventually by ProcessPendingJobs().
|
| - job->net_log()->BeginEvent(
|
| - NetLog::TYPE_WAITING_FOR_SINGLE_PROXY_RESOLVER_THREAD, NULL);
|
| - }
|
| -
|
| // Completion will be notified through |callback|, unless the caller cancels
|
| // the request using |request|.
|
| if (request)
|
| *request = reinterpret_cast<RequestHandle>(job.get());
|
|
|
| + // If there is an executor that is ready to run this request, submit it!
|
| + Executor* executor = FindIdleExecutor();
|
| + if (executor) {
|
| + DCHECK_EQ(0u, pending_jobs_.size());
|
| + executor->StartJob(job);
|
| + return ERR_IO_PENDING;
|
| + }
|
| +
|
| + // Otherwise queue this request. (We will schedule it to a thread once one
|
| + // becomes available).
|
| + job->WaitingForThread();
|
| + pending_jobs_.push_back(job);
|
| +
|
| + // If we haven't already reached the thread limit, provision a new thread to
|
| + // drain the requests more quickly.
|
| + if (executors_.size() < max_num_threads_) {
|
| + executor = AddNewExecutor();
|
| + executor->StartJob(
|
| + new SetPacScriptJob(current_pac_url_, current_pac_script_, NULL));
|
| + }
|
| +
|
| return ERR_IO_PENDING;
|
| }
|
|
|
| -// There are three states of the request we need to handle:
|
| -// (1) Not started (just sitting in the queue).
|
| -// (2) Executing Job::DoQuery in the worker thread.
|
| -// (3) Waiting for Job::QueryComplete to be run on the origin thread.
|
| -void SingleThreadedProxyResolver::CancelRequest(RequestHandle req) {
|
| +void MultiThreadedProxyResolver::CancelRequest(RequestHandle req) {
|
| + DCHECK(CalledOnValidThread());
|
| DCHECK(req);
|
|
|
| Job* job = reinterpret_cast<Job*>(req);
|
| + DCHECK_EQ(Job::TYPE_GET_PROXY_FOR_URL, job->type());
|
|
|
| - bool is_active_job = job->is_started() && !pending_jobs_.empty() &&
|
| - pending_jobs_.front().get() == job;
|
| + if (job->executor()) {
|
| + // If the job was already submitted to the executor, just mark it
|
| + // as cancelled so the user callback isn't run on completion.
|
| + job->Cancel();
|
| + } else {
|
| + // Otherwise the job is just sitting in a queue.
|
| + PendingJobsQueue::iterator it =
|
| + std::find(pending_jobs_.begin(), pending_jobs_.end(), job);
|
| + DCHECK(it != pending_jobs_.end());
|
| + pending_jobs_.erase(it);
|
| + }
|
| +}
|
|
|
| - job->Cancel();
|
| +void MultiThreadedProxyResolver::CancelSetPacScript() {
|
| + DCHECK(CalledOnValidThread());
|
| + DCHECK_EQ(0u, pending_jobs_.size());
|
| + DCHECK_EQ(1u, executors_.size());
|
| + DCHECK_EQ(Job::TYPE_SET_PAC_SCRIPT,
|
| + executors_[0]->outstanding_job()->type());
|
|
|
| - if (is_active_job) {
|
| - RemoveFrontOfJobsQueueAndStartNext(job);
|
| - return;
|
| - }
|
| + // Defensively clear some data which shouldn't be getting used
|
| + // anymore.
|
| + was_set_pac_script_called_ = false;
|
| + current_pac_url_ = GURL();
|
| + current_pac_script_ = string16();
|
|
|
| - // Otherwise just delete the job from the queue.
|
| - PendingJobsQueue::iterator it = std::find(
|
| - pending_jobs_.begin(), pending_jobs_.end(), job);
|
| - DCHECK(it != pending_jobs_.end());
|
| - pending_jobs_.erase(it);
|
| + ReleaseAllExecutors();
|
| }
|
|
|
| -void SingleThreadedProxyResolver::CancelSetPacScript() {
|
| - DCHECK(outstanding_set_pac_script_task_);
|
| - outstanding_set_pac_script_task_->Cancel();
|
| - outstanding_set_pac_script_task_ = NULL;
|
| -}
|
| -
|
| -void SingleThreadedProxyResolver::PurgeMemory() {
|
| - if (thread_.get()) {
|
| - scoped_refptr<PurgeMemoryTask> helper(new PurgeMemoryTask(resolver_.get()));
|
| - thread_->message_loop()->PostTask(FROM_HERE,
|
| - NewRunnableMethod(helper.get(), &PurgeMemoryTask::PurgeMemory));
|
| +void MultiThreadedProxyResolver::PurgeMemory() {
|
| + DCHECK(CalledOnValidThread());
|
| + for (ExecutorList::iterator it = executors_.begin();
|
| + it != executors_.end(); ++it) {
|
| + Executor* executor = *it;
|
| + executor->PurgeMemory();
|
| }
|
| }
|
|
|
| -int SingleThreadedProxyResolver::SetPacScript(
|
| +int MultiThreadedProxyResolver::SetPacScript(
|
| const GURL& pac_url,
|
| const string16& pac_script,
|
| CompletionCallback* callback) {
|
| - EnsureThreadStarted();
|
| - DCHECK(!outstanding_set_pac_script_task_);
|
| + DCHECK(CalledOnValidThread());
|
| + DCHECK(callback);
|
|
|
| - SetPacScriptTask* task = new SetPacScriptTask(
|
| - this, pac_url, pac_script, callback);
|
| - outstanding_set_pac_script_task_ = task;
|
| - task->Start();
|
| + // Save the script details, so we can provision new executors later.
|
| + // (We rely on internal reference counting of strings to avoid this memory
|
| + // being duplicated by each of the resolver threads).
|
| + was_set_pac_script_called_ = true;
|
| + current_pac_url_ = pac_url;
|
| + current_pac_script_ = pac_script;
|
| +
|
| + // The user should not have any outstanding requests when they call
|
| + // SetPacScript().
|
| + CheckNoOutstandingUserRequests();
|
| +
|
| + // Destroy all of the current threads and their proxy resolvers.
|
| + ReleaseAllExecutors();
|
| +
|
| + // Provision a new executor, and run the SetPacScript request. On completion
|
| + // notification will be sent through |callback|.
|
| + Executor* executor = AddNewExecutor();
|
| + executor->StartJob(new SetPacScriptJob(pac_url, pac_script, callback));
|
| return ERR_IO_PENDING;
|
| }
|
|
|
| -void SingleThreadedProxyResolver::EnsureThreadStarted() {
|
| - if (!thread_.get()) {
|
| - thread_.reset(new base::Thread("pac-thread"));
|
| - thread_->Start();
|
| +void MultiThreadedProxyResolver::CheckNoOutstandingUserRequests() const {
|
| + DCHECK(CalledOnValidThread());
|
| + CHECK_EQ(0u, pending_jobs_.size());
|
| +
|
| + for (ExecutorList::const_iterator it = executors_.begin();
|
| + it != executors_.end(); ++it) {
|
| + const Executor* executor = *it;
|
| + Job* job = executor->outstanding_job();
|
| + // The "has_user_callback()" is to exclude jobs for which the callback
|
| + // has already been invoked, or was not user-initiated (as in the case of
|
| + // lazy thread provisions). User-initiated jobs may !has_user_callback()
|
| + // when the callback has already been run. (Since we only clear the
|
| + // outstanding job AFTER the callback has been invoked, it is possible
|
| + // for a new request to be started from within the callback).
|
| + CHECK(!job || job->was_cancelled() || !job->has_user_callback());
|
| }
|
| }
|
|
|
| -void SingleThreadedProxyResolver::ProcessPendingJobs() {
|
| - if (pending_jobs_.empty())
|
| - return;
|
| +void MultiThreadedProxyResolver::ReleaseAllExecutors() {
|
| + DCHECK(CalledOnValidThread());
|
| + for (ExecutorList::iterator it = executors_.begin();
|
| + it != executors_.end(); ++it) {
|
| + Executor* executor = *it;
|
| + executor->Destroy();
|
| + }
|
| + executors_.clear();
|
| +}
|
|
|
| - // Get the next job to process (FIFO).
|
| - Job* job = pending_jobs_.front().get();
|
| - if (job->is_started())
|
| - return;
|
| +MultiThreadedProxyResolver::Executor*
|
| +MultiThreadedProxyResolver::FindIdleExecutor() {
|
| + DCHECK(CalledOnValidThread());
|
| + for (ExecutorList::iterator it = executors_.begin();
|
| + it != executors_.end(); ++it) {
|
| + Executor* executor = *it;
|
| + if (!executor->outstanding_job())
|
| + return executor;
|
| + }
|
| + return NULL;
|
| +}
|
|
|
| - job->net_log()->EndEvent(
|
| - NetLog::TYPE_WAITING_FOR_SINGLE_PROXY_RESOLVER_THREAD, NULL);
|
| -
|
| - EnsureThreadStarted();
|
| - job->Start();
|
| +MultiThreadedProxyResolver::Executor*
|
| +MultiThreadedProxyResolver::AddNewExecutor() {
|
| + DCHECK(CalledOnValidThread());
|
| + DCHECK_LT(executors_.size(), max_num_threads_);
|
| + // The "thread number" is used to give the thread a unique name.
|
| + int thread_number = executors_.size();
|
| + ProxyResolver* resolver = resolver_factory_->CreateProxyResolver();
|
| + Executor* executor = new Executor(
|
| + this, resolver, thread_number);
|
| + executors_.push_back(executor);
|
| + return executor;
|
| }
|
|
|
| -void SingleThreadedProxyResolver::RemoveFrontOfJobsQueueAndStartNext(
|
| - Job* expected_job) {
|
| - DCHECK_EQ(expected_job, pending_jobs_.front().get());
|
| +void MultiThreadedProxyResolver::OnExecutorReady(Executor* executor) {
|
| + DCHECK(CalledOnValidThread());
|
| + if (pending_jobs_.empty())
|
| + return;
|
| +
|
| + // Get the next job to process (FIFO). Transfer it from the pending queue
|
| + // to the executor.
|
| + scoped_refptr<Job> job = pending_jobs_.front();
|
| pending_jobs_.pop_front();
|
| -
|
| - // Start next work item.
|
| - ProcessPendingJobs();
|
| + executor->StartJob(job);
|
| }
|
|
|
| -void SingleThreadedProxyResolver::RemoveOutstandingSetPacScriptTask(
|
| - SetPacScriptTask* task) {
|
| - DCHECK_EQ(outstanding_set_pac_script_task_.get(), task);
|
| - outstanding_set_pac_script_task_ = NULL;
|
| -}
|
| -
|
| } // namespace net
|
|
|