Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(698)

Unified Diff: net/proxy/multi_threaded_proxy_resolver.cc

Issue 2945004: Revert 51877, since SpdyNetworkTransactionTest.CorruptFrameSessionError start... (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src/
Patch Set: Created 10 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « net/proxy/multi_threaded_proxy_resolver.h ('k') | net/proxy/multi_threaded_proxy_resolver_unittest.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: net/proxy/multi_threaded_proxy_resolver.cc
===================================================================
--- net/proxy/multi_threaded_proxy_resolver.cc (revision 51891)
+++ net/proxy/multi_threaded_proxy_resolver.cc (working copy)
@@ -1,588 +0,0 @@
-// Copyright (c) 2010 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#include "net/proxy/multi_threaded_proxy_resolver.h"
-
-#include "base/message_loop.h"
-#include "base/string_util.h"
-#include "base/thread.h"
-#include "net/base/capturing_net_log.h"
-#include "net/base/net_errors.h"
-#include "net/proxy/proxy_info.h"
-
-// TODO(eroman): Have the MultiThreadedProxyResolver clear its PAC script
-// data when SetPacScript fails. That will reclaim memory when
-// testing bogus scripts.
-
-namespace net {
-
-namespace {
-
-class PurgeMemoryTask : public base::RefCountedThreadSafe<PurgeMemoryTask> {
- public:
- explicit PurgeMemoryTask(ProxyResolver* resolver) : resolver_(resolver) {}
- void PurgeMemory() { resolver_->PurgeMemory(); }
- private:
- friend class base::RefCountedThreadSafe<PurgeMemoryTask>;
- ~PurgeMemoryTask() {}
- ProxyResolver* resolver_;
-};
-
-} // namespace
-
-// An "executor" is a job-runner for PAC requests. It encapsulates a worker
-// thread and a synchronous ProxyResolver (which will be operated on said
-// thread.)
-class MultiThreadedProxyResolver::Executor
- : public base::RefCountedThreadSafe<MultiThreadedProxyResolver::Executor > {
- public:
- // |coordinator| must remain valid throughout our lifetime. It is used to
- // signal when the executor is ready to receive work by calling
- // |coordinator->OnExecutorReady()|.
- // The constructor takes ownership of |resolver|.
- // |thread_number| is an identifier used when naming the worker thread.
- Executor(MultiThreadedProxyResolver* coordinator,
- ProxyResolver* resolver,
- int thread_number);
-
- // Submit a job to this executor.
- void StartJob(Job* job);
-
- // Callback for when a job has completed running on the executor's thread.
- void OnJobCompleted(Job* job);
-
- // Cleanup the executor. Cancels all outstanding work, and frees the thread
- // and resolver.
- void Destroy();
-
- void PurgeMemory();
-
- // Returns the outstanding job, or NULL.
- Job* outstanding_job() const { return outstanding_job_.get(); }
-
- ProxyResolver* resolver() { return resolver_.get(); }
-
- int thread_number() const { return thread_number_; }
-
- private:
- friend class base::RefCountedThreadSafe<Executor>;
- ~Executor();
-
- MultiThreadedProxyResolver* coordinator_;
- const int thread_number_;
-
- // The currently active job for this executor (either a SetPacScript or
- // GetProxyForURL task).
- scoped_refptr<Job> outstanding_job_;
-
- // The synchronous resolver implementation.
- scoped_ptr<ProxyResolver> resolver_;
-
- // The thread where |resolver_| is run on.
- // Note that declaration ordering is important here. |thread_| needs to be
- // destroyed *before* |resolver_|, in case |resolver_| is currently
- // executing on |thread_|.
- scoped_ptr<base::Thread> thread_;
-};
-
-// MultiThreadedProxyResolver::Job ---------------------------------------------
-
-class MultiThreadedProxyResolver::Job
- : public base::RefCountedThreadSafe<MultiThreadedProxyResolver::Job> {
- public:
- // Identifies the subclass of Job (only being used for debugging purposes).
- enum Type {
- TYPE_GET_PROXY_FOR_URL,
- TYPE_SET_PAC_SCRIPT,
- TYPE_SET_PAC_SCRIPT_INTERNAL,
- };
-
- Job(Type type, CompletionCallback* user_callback)
- : type_(type),
- user_callback_(user_callback),
- executor_(NULL),
- was_cancelled_(false) {
- }
-
- void set_executor(Executor* executor) {
- executor_ = executor;
- }
-
- // The "executor" is the job runner that is scheduling this job. If
- // this job has not been submitted to an executor yet, this will be
- // NULL (and we know it hasn't started yet).
- Executor* executor() {
- return executor_;
- }
-
- // Mark the job as having been cancelled.
- void Cancel() {
- was_cancelled_ = true;
- }
-
- // Returns true if Cancel() has been called.
- bool was_cancelled() const { return was_cancelled_; }
-
- Type type() const { return type_; }
-
- // Returns true if this job still has a user callback. Some jobs
- // do not have a user callback, because they were helper jobs
- // scheduled internally (for example TYPE_SET_PAC_SCRIPT_INTERNAL).
- //
- // Otherwise jobs that correspond with user-initiated work will
- // have a non-NULL callback up until the callback is run.
- bool has_user_callback() const { return user_callback_ != NULL; }
-
- // This method is called when the job is inserted into a wait queue
- // because no executors were ready to accept it.
- virtual void WaitingForThread() {}
-
- // This method is called just before the job is posted to the work thread.
- virtual void FinishedWaitingForThread() {}
-
- // This method is called on the worker thread to do the job's work. On
- // completion, implementors are expected to call OnJobCompleted() on
- // |origin_loop|.
- virtual void Run(MessageLoop* origin_loop) = 0;
-
- protected:
- void OnJobCompleted() {
- // |executor_| will be NULL if the executor has already been deleted.
- if (executor_)
- executor_->OnJobCompleted(this);
- }
-
- void RunUserCallback(int result) {
- DCHECK(has_user_callback());
- CompletionCallback* callback = user_callback_;
- // Null the callback so has_user_callback() will now return false.
- user_callback_ = NULL;
- callback->Run(result);
- }
-
- friend class base::RefCountedThreadSafe<MultiThreadedProxyResolver::Job>;
-
- virtual ~Job() {}
-
- private:
- const Type type_;
- CompletionCallback* user_callback_;
- Executor* executor_;
- bool was_cancelled_;
-};
-
-// MultiThreadedProxyResolver::SetPacScriptJob ---------------------------------
-
-// Runs on the worker thread to call ProxyResolver::SetPacScript.
-class MultiThreadedProxyResolver::SetPacScriptJob
- : public MultiThreadedProxyResolver::Job {
- public:
- SetPacScriptJob(const GURL& pac_url,
- const string16& pac_script,
- CompletionCallback* callback)
- : Job(callback ? TYPE_SET_PAC_SCRIPT : TYPE_SET_PAC_SCRIPT_INTERNAL,
- callback),
- pac_url_(pac_url),
- pac_script_(pac_script) {
- }
-
- // Runs on the worker thread.
- virtual void Run(MessageLoop* origin_loop) {
- ProxyResolver* resolver = executor()->resolver();
- int rv = resolver->expects_pac_bytes() ?
- resolver->SetPacScriptByData(pac_script_, NULL) :
- resolver->SetPacScriptByUrl(pac_url_, NULL);
-
- DCHECK_NE(rv, ERR_IO_PENDING);
- origin_loop->PostTask(
- FROM_HERE,
- NewRunnableMethod(this, &SetPacScriptJob::RequestComplete, rv));
- }
-
- private:
- // Runs the completion callback on the origin thread.
- void RequestComplete(int result_code) {
- // The task may have been cancelled after it was started.
- if (!was_cancelled() && has_user_callback()) {
- RunUserCallback(result_code);
- }
- OnJobCompleted();
- }
-
- const GURL pac_url_;
- const string16 pac_script_;
-};
-
-// MultiThreadedProxyResolver::GetProxyForURLJob ------------------------------
-
-class MultiThreadedProxyResolver::GetProxyForURLJob
- : public MultiThreadedProxyResolver::Job {
- public:
- // |url| -- the URL of the query.
- // |results| -- the structure to fill with proxy resolve results.
- GetProxyForURLJob(const GURL& url,
- ProxyInfo* results,
- CompletionCallback* callback,
- const BoundNetLog& net_log)
- : Job(TYPE_GET_PROXY_FOR_URL, callback),
- results_(results),
- net_log_(net_log),
- url_(url),
- was_waiting_for_thread_(false) {
- DCHECK(callback);
- }
-
- BoundNetLog* net_log() { return &net_log_; }
-
- virtual void WaitingForThread() {
- was_waiting_for_thread_ = true;
- net_log_.BeginEvent(
- NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD, NULL);
- }
-
- virtual void FinishedWaitingForThread() {
- DCHECK(executor());
-
- if (was_waiting_for_thread_) {
- net_log_.EndEvent(
- NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD, NULL);
- }
-
- net_log_.AddEvent(
- NetLog::TYPE_SUBMITTED_TO_RESOLVER_THREAD,
- new NetLogIntegerParameter(
- "thread_number", executor()->thread_number()));
- }
-
- // Runs on the worker thread.
- virtual void Run(MessageLoop* origin_loop) {
- const size_t kNetLogBound = 50u;
- worker_log_.reset(new CapturingNetLog(kNetLogBound));
- BoundNetLog bound_worker_log(NetLog::Source(), worker_log_.get());
-
- ProxyResolver* resolver = executor()->resolver();
- int rv = resolver->GetProxyForURL(
- url_, &results_buf_, NULL, NULL, bound_worker_log);
- DCHECK_NE(rv, ERR_IO_PENDING);
-
- origin_loop->PostTask(
- FROM_HERE,
- NewRunnableMethod(this, &GetProxyForURLJob::QueryComplete, rv));
- }
-
- private:
- // Runs the completion callback on the origin thread.
- void QueryComplete(int result_code) {
- // The Job may have been cancelled after it was started.
- if (!was_cancelled()) {
- // Merge the load log that was generated on the worker thread, into the
- // main log.
- CapturingBoundNetLog bound_worker_log(NetLog::Source(),
- worker_log_.release());
- bound_worker_log.AppendTo(net_log_);
-
- if (result_code >= OK) { // Note: unit-tests use values > 0.
- results_->Use(results_buf_);
- }
- RunUserCallback(result_code);
- }
- OnJobCompleted();
- }
-
- // Must only be used on the "origin" thread.
- ProxyInfo* results_;
- BoundNetLog net_log_;
- const GURL url_;
-
- // Usable from within DoQuery on the worker thread.
- ProxyInfo results_buf_;
-
- // Used to pass the captured events between DoQuery [worker thread] and
- // QueryComplete [origin thread].
- scoped_ptr<CapturingNetLog> worker_log_;
-
- bool was_waiting_for_thread_;
-};
-
-// MultiThreadedProxyResolver::Executor ----------------------------------------
-
-MultiThreadedProxyResolver::Executor::Executor(
- MultiThreadedProxyResolver* coordinator,
- ProxyResolver* resolver,
- int thread_number)
- : coordinator_(coordinator),
- thread_number_(thread_number),
- resolver_(resolver) {
- DCHECK(coordinator);
- DCHECK(resolver);
- // Start up the thread.
- // Note that it is safe to pass a temporary C-String to Thread(), as it will
- // make a copy.
- std::string thread_name =
- StringPrintf("PAC thread #%d", thread_number);
- thread_.reset(new base::Thread(thread_name.c_str()));
- thread_->Start();
-}
-
-void MultiThreadedProxyResolver::Executor::StartJob(Job* job) {
- DCHECK(!outstanding_job_);
- outstanding_job_ = job;
-
- // Run the job. Once it has completed (regardless of whether it was
- // cancelled), it will invoke OnJobCompleted() on this thread.
- job->set_executor(this);
- job->FinishedWaitingForThread();
- thread_->message_loop()->PostTask(
- FROM_HERE,
- NewRunnableMethod(job, &Job::Run, MessageLoop::current()));
-}
-
-void MultiThreadedProxyResolver::Executor::OnJobCompleted(Job* job) {
- DCHECK_EQ(job, outstanding_job_.get());
- outstanding_job_ = NULL;
- coordinator_->OnExecutorReady(this);
-}
-
-void MultiThreadedProxyResolver::Executor::Destroy() {
- DCHECK(coordinator_);
-
- // Give the resolver an opportunity to shutdown from THIS THREAD before
- // joining on the resolver thread. This allows certain implementations
- // to avoid deadlocks.
- resolver_->Shutdown();
-
- // Join the worker thread.
- thread_.reset();
-
- // Cancel any outstanding job.
- if (outstanding_job_) {
- outstanding_job_->Cancel();
- // Orphan the job (since this executor may be deleted soon).
- outstanding_job_->set_executor(NULL);
- }
-
- // It is now safe to free the ProxyResolver, since all the tasks that
- // were using it on the resolver thread have completed.
- resolver_.reset();
-
- // Null some stuff as a precaution.
- coordinator_ = NULL;
- outstanding_job_ = NULL;
-}
-
-void MultiThreadedProxyResolver::Executor::PurgeMemory() {
- scoped_refptr<PurgeMemoryTask> helper(new PurgeMemoryTask(resolver_.get()));
- thread_->message_loop()->PostTask(
- FROM_HERE,
- NewRunnableMethod(helper.get(), &PurgeMemoryTask::PurgeMemory));
-}
-
-MultiThreadedProxyResolver::Executor::~Executor() {
- // The important cleanup happens as part of Destroy(), which should always be
- // called first.
- DCHECK(!coordinator_) << "Destroy() was not called";
- DCHECK(!thread_.get());
- DCHECK(!resolver_.get());
- DCHECK(!outstanding_job_);
-}
-
-// MultiThreadedProxyResolver --------------------------------------------------
-
-MultiThreadedProxyResolver::MultiThreadedProxyResolver(
- ProxyResolverFactory* resolver_factory,
- size_t max_num_threads)
- : ProxyResolver(resolver_factory->resolvers_expect_pac_bytes()),
- resolver_factory_(resolver_factory),
- max_num_threads_(max_num_threads),
- was_set_pac_script_called_(false) {
- DCHECK_GE(max_num_threads, 1u);
-}
-
-MultiThreadedProxyResolver::~MultiThreadedProxyResolver() {
- // We will cancel all outstanding requests.
- pending_jobs_.clear();
- ReleaseAllExecutors();
-}
-
-int MultiThreadedProxyResolver::GetProxyForURL(const GURL& url,
- ProxyInfo* results,
- CompletionCallback* callback,
- RequestHandle* request,
- const BoundNetLog& net_log) {
- DCHECK(CalledOnValidThread());
- DCHECK(callback);
- DCHECK(was_set_pac_script_called_)
- << "Resolver is un-initialized. Must call SetPacScript() first!";
-
- scoped_refptr<GetProxyForURLJob> job =
- new GetProxyForURLJob(url, results, callback, net_log);
-
- // Completion will be notified through |callback|, unless the caller cancels
- // the request using |request|.
- if (request)
- *request = reinterpret_cast<RequestHandle>(job.get());
-
- // If there is an executor that is ready to run this request, submit it!
- Executor* executor = FindIdleExecutor();
- if (executor) {
- DCHECK_EQ(0u, pending_jobs_.size());
- executor->StartJob(job);
- return ERR_IO_PENDING;
- }
-
- // Otherwise queue this request. (We will schedule it to a thread once one
- // becomes available).
- job->WaitingForThread();
- pending_jobs_.push_back(job);
-
- // If we haven't already reached the thread limit, provision a new thread to
- // drain the requests more quickly.
- if (executors_.size() < max_num_threads_) {
- executor = AddNewExecutor();
- executor->StartJob(
- new SetPacScriptJob(current_pac_url_, current_pac_script_, NULL));
- }
-
- return ERR_IO_PENDING;
-}
-
-void MultiThreadedProxyResolver::CancelRequest(RequestHandle req) {
- DCHECK(CalledOnValidThread());
- DCHECK(req);
-
- Job* job = reinterpret_cast<Job*>(req);
- DCHECK_EQ(Job::TYPE_GET_PROXY_FOR_URL, job->type());
-
- if (job->executor()) {
- // If the job was already submitted to the executor, just mark it
- // as cancelled so the user callback isn't run on completion.
- job->Cancel();
- } else {
- // Otherwise the job is just sitting in a queue.
- PendingJobsQueue::iterator it =
- std::find(pending_jobs_.begin(), pending_jobs_.end(), job);
- DCHECK(it != pending_jobs_.end());
- pending_jobs_.erase(it);
- }
-}
-
-void MultiThreadedProxyResolver::CancelSetPacScript() {
- DCHECK(CalledOnValidThread());
- DCHECK_EQ(0u, pending_jobs_.size());
- DCHECK_EQ(1u, executors_.size());
- DCHECK_EQ(Job::TYPE_SET_PAC_SCRIPT,
- executors_[0]->outstanding_job()->type());
-
- // Defensively clear some data which shouldn't be getting used
- // anymore.
- was_set_pac_script_called_ = false;
- current_pac_url_ = GURL();
- current_pac_script_ = string16();
-
- ReleaseAllExecutors();
-}
-
-void MultiThreadedProxyResolver::PurgeMemory() {
- DCHECK(CalledOnValidThread());
- for (ExecutorList::iterator it = executors_.begin();
- it != executors_.end(); ++it) {
- Executor* executor = *it;
- executor->PurgeMemory();
- }
-}
-
-int MultiThreadedProxyResolver::SetPacScript(
- const GURL& pac_url,
- const string16& pac_script,
- CompletionCallback* callback) {
- DCHECK(CalledOnValidThread());
- DCHECK(callback);
-
- // Save the script details, so we can provision new executors later.
- // (We rely on internal reference counting of strings to avoid this memory
- // being duplicated by each of the resolver threads).
- was_set_pac_script_called_ = true;
- current_pac_url_ = pac_url;
- current_pac_script_ = pac_script;
-
- // The user should not have any outstanding requests when they call
- // SetPacScript().
- CheckNoOutstandingUserRequests();
-
- // Destroy all of the current threads and their proxy resolvers.
- ReleaseAllExecutors();
-
- // Provision a new executor, and run the SetPacScript request. On completion
- // notification will be sent through |callback|.
- Executor* executor = AddNewExecutor();
- executor->StartJob(new SetPacScriptJob(pac_url, pac_script, callback));
- return ERR_IO_PENDING;
-}
-
-void MultiThreadedProxyResolver::CheckNoOutstandingUserRequests() const {
- DCHECK(CalledOnValidThread());
- CHECK_EQ(0u, pending_jobs_.size());
-
- for (ExecutorList::const_iterator it = executors_.begin();
- it != executors_.end(); ++it) {
- const Executor* executor = *it;
- Job* job = executor->outstanding_job();
- // The "has_user_callback()" is to exclude jobs for which the callback
- // has already been invoked, or was not user-initiated (as in the case of
- // lazy thread provisions). User-initiated jobs may !has_user_callback()
- // when the callback has already been run. (Since we only clear the
- // outstanding job AFTER the callback has been invoked, it is possible
- // for a new request to be started from within the callback).
- CHECK(!job || job->was_cancelled() || !job->has_user_callback());
- }
-}
-
-void MultiThreadedProxyResolver::ReleaseAllExecutors() {
- DCHECK(CalledOnValidThread());
- for (ExecutorList::iterator it = executors_.begin();
- it != executors_.end(); ++it) {
- Executor* executor = *it;
- executor->Destroy();
- }
- executors_.clear();
-}
-
-MultiThreadedProxyResolver::Executor*
-MultiThreadedProxyResolver::FindIdleExecutor() {
- DCHECK(CalledOnValidThread());
- for (ExecutorList::iterator it = executors_.begin();
- it != executors_.end(); ++it) {
- Executor* executor = *it;
- if (!executor->outstanding_job())
- return executor;
- }
- return NULL;
-}
-
-MultiThreadedProxyResolver::Executor*
-MultiThreadedProxyResolver::AddNewExecutor() {
- DCHECK(CalledOnValidThread());
- DCHECK_LT(executors_.size(), max_num_threads_);
- // The "thread number" is used to give the thread a unique name.
- int thread_number = executors_.size();
- ProxyResolver* resolver = resolver_factory_->CreateProxyResolver();
- Executor* executor = new Executor(
- this, resolver, thread_number);
- executors_.push_back(executor);
- return executor;
-}
-
-void MultiThreadedProxyResolver::OnExecutorReady(Executor* executor) {
- DCHECK(CalledOnValidThread());
- if (pending_jobs_.empty())
- return;
-
- // Get the next job to process (FIFO). Transfer it from the pending queue
- // to the executor.
- scoped_refptr<Job> job = pending_jobs_.front();
- pending_jobs_.pop_front();
- executor->StartJob(job);
-}
-
-} // namespace net
« no previous file with comments | « net/proxy/multi_threaded_proxy_resolver.h ('k') | net/proxy/multi_threaded_proxy_resolver_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698