Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(92)

Side by Side Diff: net/proxy/multi_threaded_proxy_resolver.cc

Issue 2822043: Add the capability to run multiple proxy PAC scripts in parallel.... (Closed) Base URL: svn://chrome-svn/chrome/trunk/src/
Patch Set: Re-upload after revert Created 10 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2010 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2010 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "net/proxy/single_threaded_proxy_resolver.h" 5 #include "net/proxy/multi_threaded_proxy_resolver.h"
6 6
7 #include "base/message_loop.h" 7 #include "base/message_loop.h"
8 #include "base/string_util.h"
8 #include "base/thread.h" 9 #include "base/thread.h"
9 #include "net/base/capturing_net_log.h" 10 #include "net/base/capturing_net_log.h"
10 #include "net/base/net_errors.h" 11 #include "net/base/net_errors.h"
11 #include "net/proxy/proxy_info.h" 12 #include "net/proxy/proxy_info.h"
12 13
14 // TODO(eroman): Have the MultiThreadedProxyResolver clear its PAC script
15 // data when SetPacScript fails. That will reclaim memory when
16 // testing bogus scripts.
17
13 namespace net { 18 namespace net {
14 19
15 namespace { 20 namespace {
16 21
17 class PurgeMemoryTask : public base::RefCountedThreadSafe<PurgeMemoryTask> { 22 class PurgeMemoryTask : public base::RefCountedThreadSafe<PurgeMemoryTask> {
18 public: 23 public:
19 explicit PurgeMemoryTask(ProxyResolver* resolver) : resolver_(resolver) {} 24 explicit PurgeMemoryTask(ProxyResolver* resolver) : resolver_(resolver) {}
20 void PurgeMemory() { resolver_->PurgeMemory(); } 25 void PurgeMemory() { resolver_->PurgeMemory(); }
21 private: 26 private:
22 friend class base::RefCountedThreadSafe<PurgeMemoryTask>; 27 friend class base::RefCountedThreadSafe<PurgeMemoryTask>;
23 ~PurgeMemoryTask() {} 28 ~PurgeMemoryTask() {}
24 ProxyResolver* resolver_; 29 ProxyResolver* resolver_;
25 }; 30 };
26 31
27 } // namespace 32 } // namespace
28 33
29 // SingleThreadedProxyResolver::SetPacScriptTask ------------------------------ 34 // An "executor" is a job-runner for PAC requests. It encapsulates a worker
35 // thread and a synchronous ProxyResolver (which will be operated on said
36 // thread.)
37 class MultiThreadedProxyResolver::Executor
38 : public base::RefCountedThreadSafe<MultiThreadedProxyResolver::Executor > {
39 public:
40 // |coordinator| must remain valid throughout our lifetime. It is used to
41 // signal when the executor is ready to receive work by calling
42 // |coordinator->OnExecutorReady()|.
43 // The constructor takes ownership of |resolver|.
44 // |thread_number| is an identifier used when naming the worker thread.
45 Executor(MultiThreadedProxyResolver* coordinator,
46 ProxyResolver* resolver,
47 int thread_number);
48
49 // Submit a job to this executor.
50 void StartJob(Job* job);
51
52 // Callback for when a job has completed running on the executor's thread.
53 void OnJobCompleted(Job* job);
54
55 // Cleanup the executor. Cancels all outstanding work, and frees the thread
56 // and resolver.
57 void Destroy();
58
59 void PurgeMemory();
60
61 // Returns the outstanding job, or NULL.
62 Job* outstanding_job() const { return outstanding_job_.get(); }
63
64 ProxyResolver* resolver() { return resolver_.get(); }
65
66 int thread_number() const { return thread_number_; }
67
68 private:
69 friend class base::RefCountedThreadSafe<Executor>;
70 ~Executor();
71
72 MultiThreadedProxyResolver* coordinator_;
73 const int thread_number_;
74
75 // The currently active job for this executor (either a SetPacScript or
76 // GetProxyForURL task).
77 scoped_refptr<Job> outstanding_job_;
78
79 // The synchronous resolver implementation.
80 scoped_ptr<ProxyResolver> resolver_;
81
82 // The thread where |resolver_| is run on.
83 // Note that declaration ordering is important here. |thread_| needs to be
84 // destroyed *before* |resolver_|, in case |resolver_| is currently
85 // executing on |thread_|.
86 scoped_ptr<base::Thread> thread_;
87 };
88
89 // MultiThreadedProxyResolver::Job ---------------------------------------------
90
91 class MultiThreadedProxyResolver::Job
92 : public base::RefCountedThreadSafe<MultiThreadedProxyResolver::Job> {
93 public:
94 // Identifies the subclass of Job (only being used for debugging purposes).
95 enum Type {
96 TYPE_GET_PROXY_FOR_URL,
97 TYPE_SET_PAC_SCRIPT,
98 TYPE_SET_PAC_SCRIPT_INTERNAL,
99 };
100
101 Job(Type type, CompletionCallback* user_callback)
102 : type_(type),
103 user_callback_(user_callback),
104 executor_(NULL),
105 was_cancelled_(false) {
106 }
107
108 void set_executor(Executor* executor) {
109 executor_ = executor;
110 }
111
112 // The "executor" is the job runner that is scheduling this job. If
113 // this job has not been submitted to an executor yet, this will be
114 // NULL (and we know it hasn't started yet).
115 Executor* executor() {
116 return executor_;
117 }
118
119 // Mark the job as having been cancelled.
120 void Cancel() {
121 was_cancelled_ = true;
122 }
123
124 // Returns true if Cancel() has been called.
125 bool was_cancelled() const { return was_cancelled_; }
126
127 Type type() const { return type_; }
128
129 // Returns true if this job still has a user callback. Some jobs
130 // do not have a user callback, because they were helper jobs
131 // scheduled internally (for example TYPE_SET_PAC_SCRIPT_INTERNAL).
132 //
133 // Otherwise jobs that correspond with user-initiated work will
134 // have a non-NULL callback up until the callback is run.
135 bool has_user_callback() const { return user_callback_ != NULL; }
136
137 // This method is called when the job is inserted into a wait queue
138 // because no executors were ready to accept it.
139 virtual void WaitingForThread() {}
140
141 // This method is called just before the job is posted to the work thread.
142 virtual void FinishedWaitingForThread() {}
143
144 // This method is called on the worker thread to do the job's work. On
145 // completion, implementors are expected to call OnJobCompleted() on
146 // |origin_loop|.
147 virtual void Run(MessageLoop* origin_loop) = 0;
148
149 protected:
150 void OnJobCompleted() {
151 // |executor_| will be NULL if the executor has already been deleted.
152 if (executor_)
153 executor_->OnJobCompleted(this);
154 }
155
156 void RunUserCallback(int result) {
157 DCHECK(has_user_callback());
158 CompletionCallback* callback = user_callback_;
159 // Null the callback so has_user_callback() will now return false.
160 user_callback_ = NULL;
161 callback->Run(result);
162 }
163
164 friend class base::RefCountedThreadSafe<MultiThreadedProxyResolver::Job>;
165
166 virtual ~Job() {}
167
168 private:
169 const Type type_;
170 CompletionCallback* user_callback_;
171 Executor* executor_;
172 bool was_cancelled_;
173 };
174
175 // MultiThreadedProxyResolver::SetPacScriptJob ---------------------------------
30 176
31 // Runs on the worker thread to call ProxyResolver::SetPacScript. 177 // Runs on the worker thread to call ProxyResolver::SetPacScript.
32 class SingleThreadedProxyResolver::SetPacScriptTask 178 class MultiThreadedProxyResolver::SetPacScriptJob
33 : public base::RefCountedThreadSafe< 179 : public MultiThreadedProxyResolver::Job {
34 SingleThreadedProxyResolver::SetPacScriptTask> {
35 public: 180 public:
36 SetPacScriptTask(SingleThreadedProxyResolver* coordinator, 181 SetPacScriptJob(const GURL& pac_url,
37 const GURL& pac_url, 182 const string16& pac_script,
38 const string16& pac_script, 183 CompletionCallback* callback)
39 CompletionCallback* callback) 184 : Job(callback ? TYPE_SET_PAC_SCRIPT : TYPE_SET_PAC_SCRIPT_INTERNAL,
40 : coordinator_(coordinator), 185 callback),
41 callback_(callback),
42 pac_script_(pac_script),
43 pac_url_(pac_url), 186 pac_url_(pac_url),
44 origin_loop_(MessageLoop::current()) { 187 pac_script_(pac_script) {
45 DCHECK(callback); 188 }
46 }
47
48 // Start the SetPacScript request on the worker thread.
49 void Start() {
50 coordinator_->thread()->message_loop()->PostTask(
51 FROM_HERE, NewRunnableMethod(this, &SetPacScriptTask::DoRequest,
52 coordinator_->resolver_.get()));
53 }
54
55 void Cancel() {
56 // Clear these to inform RequestComplete that it should not try to
57 // access them.
58 coordinator_ = NULL;
59 callback_ = NULL;
60 }
61
62 // Returns true if Cancel() has been called.
63 bool was_cancelled() const { return callback_ == NULL; }
64
65 private:
66 friend class base::RefCountedThreadSafe<
67 SingleThreadedProxyResolver::SetPacScriptTask>;
68
69 ~SetPacScriptTask() {}
70 189
71 // Runs on the worker thread. 190 // Runs on the worker thread.
72 void DoRequest(ProxyResolver* resolver) { 191 virtual void Run(MessageLoop* origin_loop) {
192 ProxyResolver* resolver = executor()->resolver();
73 int rv = resolver->expects_pac_bytes() ? 193 int rv = resolver->expects_pac_bytes() ?
74 resolver->SetPacScriptByData(pac_script_, NULL) : 194 resolver->SetPacScriptByData(pac_script_, NULL) :
75 resolver->SetPacScriptByUrl(pac_url_, NULL); 195 resolver->SetPacScriptByUrl(pac_url_, NULL);
76 196
77 DCHECK_NE(rv, ERR_IO_PENDING); 197 DCHECK_NE(rv, ERR_IO_PENDING);
78 origin_loop_->PostTask(FROM_HERE, 198 origin_loop->PostTask(
79 NewRunnableMethod(this, &SetPacScriptTask::RequestComplete, rv)); 199 FROM_HERE,
80 } 200 NewRunnableMethod(this, &SetPacScriptJob::RequestComplete, rv));
81 201 }
202
203 private:
82 // Runs the completion callback on the origin thread. 204 // Runs the completion callback on the origin thread.
83 void RequestComplete(int result_code) { 205 void RequestComplete(int result_code) {
84 // The task may have been cancelled after it was started. 206 // The task may have been cancelled after it was started.
85 if (!was_cancelled()) { 207 if (!was_cancelled() && has_user_callback()) {
86 CompletionCallback* callback = callback_; 208 RunUserCallback(result_code);
87 coordinator_->RemoveOutstandingSetPacScriptTask(this);
88 callback->Run(result_code);
89 } 209 }
90 } 210 OnJobCompleted();
91 211 }
92 // Must only be used on the "origin" thread. 212
93 SingleThreadedProxyResolver* coordinator_; 213 const GURL pac_url_;
94 CompletionCallback* callback_; 214 const string16 pac_script_;
95 string16 pac_script_;
96 GURL pac_url_;
97
98 // Usable from within DoQuery on the worker thread.
99 MessageLoop* origin_loop_;
100 }; 215 };
101 216
102 // SingleThreadedProxyResolver::Job ------------------------------------------- 217 // MultiThreadedProxyResolver::GetProxyForURLJob ------------------------------
103 218
104 class SingleThreadedProxyResolver::Job 219 class MultiThreadedProxyResolver::GetProxyForURLJob
105 : public base::RefCountedThreadSafe<SingleThreadedProxyResolver::Job> { 220 : public MultiThreadedProxyResolver::Job {
106 public: 221 public:
107 // |coordinator| -- the SingleThreadedProxyResolver that owns this job.
108 // |url| -- the URL of the query. 222 // |url| -- the URL of the query.
109 // |results| -- the structure to fill with proxy resolve results. 223 // |results| -- the structure to fill with proxy resolve results.
110 Job(SingleThreadedProxyResolver* coordinator, 224 GetProxyForURLJob(const GURL& url,
111 const GURL& url, 225 ProxyInfo* results,
112 ProxyInfo* results, 226 CompletionCallback* callback,
113 CompletionCallback* callback, 227 const BoundNetLog& net_log)
114 const BoundNetLog& net_log) 228 : Job(TYPE_GET_PROXY_FOR_URL, callback),
115 : coordinator_(coordinator), 229 results_(results),
116 callback_(callback), 230 net_log_(net_log),
117 results_(results), 231 url_(url),
118 net_log_(net_log), 232 was_waiting_for_thread_(false) {
119 url_(url),
120 is_started_(false),
121 origin_loop_(MessageLoop::current()) {
122 DCHECK(callback); 233 DCHECK(callback);
123 } 234 }
124 235
125 // Start the resolve proxy request on the worker thread.
126 void Start() {
127 is_started_ = true;
128
129 size_t load_log_bound = 100;
130
131 coordinator_->thread()->message_loop()->PostTask(
132 FROM_HERE, NewRunnableMethod(this, &Job::DoQuery,
133 coordinator_->resolver_.get(),
134 load_log_bound));
135 }
136
137 bool is_started() const { return is_started_; }
138
139 void Cancel() {
140 // Clear these to inform QueryComplete that it should not try to
141 // access them.
142 coordinator_ = NULL;
143 callback_ = NULL;
144 results_ = NULL;
145 }
146
147 // Returns true if Cancel() has been called.
148 bool was_cancelled() const { return callback_ == NULL; }
149
150 BoundNetLog* net_log() { return &net_log_; } 236 BoundNetLog* net_log() { return &net_log_; }
151 237
238 virtual void WaitingForThread() {
239 was_waiting_for_thread_ = true;
240 net_log_.BeginEvent(
241 NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD, NULL);
242 }
243
244 virtual void FinishedWaitingForThread() {
245 DCHECK(executor());
246
247 if (was_waiting_for_thread_) {
248 net_log_.EndEvent(
249 NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD, NULL);
250 }
251
252 net_log_.AddEvent(
253 NetLog::TYPE_SUBMITTED_TO_RESOLVER_THREAD,
254 new NetLogIntegerParameter(
255 "thread_number", executor()->thread_number()));
256 }
257
258 // Runs on the worker thread.
259 virtual void Run(MessageLoop* origin_loop) {
260 const size_t kNetLogBound = 50u;
261 worker_log_.reset(new CapturingNetLog(kNetLogBound));
262 BoundNetLog bound_worker_log(NetLog::Source(), worker_log_.get());
263
264 ProxyResolver* resolver = executor()->resolver();
265 int rv = resolver->GetProxyForURL(
266 url_, &results_buf_, NULL, NULL, bound_worker_log);
267 DCHECK_NE(rv, ERR_IO_PENDING);
268
269 origin_loop->PostTask(
270 FROM_HERE,
271 NewRunnableMethod(this, &GetProxyForURLJob::QueryComplete, rv));
272 }
273
152 private: 274 private:
153 friend class base::RefCountedThreadSafe<SingleThreadedProxyResolver::Job>;
154
155 ~Job() {}
156
157 // Runs on the worker thread.
158 void DoQuery(ProxyResolver* resolver, size_t load_log_bound) {
159 worker_log_.reset(new CapturingNetLog(load_log_bound));
160 BoundNetLog bound_worker_log(NetLog::Source(), worker_log_.get());
161
162 int rv = resolver->GetProxyForURL(url_, &results_buf_, NULL, NULL,
163 bound_worker_log);
164 DCHECK_NE(rv, ERR_IO_PENDING);
165
166 origin_loop_->PostTask(FROM_HERE,
167 NewRunnableMethod(this, &Job::QueryComplete, rv));
168 }
169
170 // Runs the completion callback on the origin thread. 275 // Runs the completion callback on the origin thread.
171 void QueryComplete(int result_code) { 276 void QueryComplete(int result_code) {
172 // Merge the load log that was generated on the worker thread, into the
173 // main log.
174 CapturingBoundNetLog bound_worker_log(NetLog::Source(),
175 worker_log_.release());
176 bound_worker_log.AppendTo(net_log_);
177
178 // The Job may have been cancelled after it was started. 277 // The Job may have been cancelled after it was started.
179 if (!was_cancelled()) { 278 if (!was_cancelled()) {
279 // Merge the load log that was generated on the worker thread, into the
280 // main log.
281 CapturingBoundNetLog bound_worker_log(NetLog::Source(),
282 worker_log_.release());
283 bound_worker_log.AppendTo(net_log_);
284
180 if (result_code >= OK) { // Note: unit-tests use values > 0. 285 if (result_code >= OK) { // Note: unit-tests use values > 0.
181 results_->Use(results_buf_); 286 results_->Use(results_buf_);
182 } 287 }
183 callback_->Run(result_code); 288 RunUserCallback(result_code);
184
185 // We check for cancellation once again, in case the callback deleted
186 // the owning ProxyService (whose destructor will in turn cancel us).
187 if (!was_cancelled())
188 coordinator_->RemoveFrontOfJobsQueueAndStartNext(this);
189 } 289 }
290 OnJobCompleted();
190 } 291 }
191 292
192 // Must only be used on the "origin" thread. 293 // Must only be used on the "origin" thread.
193 SingleThreadedProxyResolver* coordinator_;
194 CompletionCallback* callback_;
195 ProxyInfo* results_; 294 ProxyInfo* results_;
196 BoundNetLog net_log_; 295 BoundNetLog net_log_;
197 GURL url_; 296 const GURL url_;
198 bool is_started_;
199 297
200 // Usable from within DoQuery on the worker thread. 298 // Usable from within DoQuery on the worker thread.
201 ProxyInfo results_buf_; 299 ProxyInfo results_buf_;
202 MessageLoop* origin_loop_;
203 300
204 // Used to pass the captured events between DoQuery [worker thread] and 301 // Used to pass the captured events between DoQuery [worker thread] and
205 // QueryComplete [origin thread]. 302 // QueryComplete [origin thread].
206 scoped_ptr<CapturingNetLog> worker_log_; 303 scoped_ptr<CapturingNetLog> worker_log_;
304
305 bool was_waiting_for_thread_;
207 }; 306 };
208 307
209 // SingleThreadedProxyResolver ------------------------------------------------ 308 // MultiThreadedProxyResolver::Executor ----------------------------------------
210 309
211 SingleThreadedProxyResolver::SingleThreadedProxyResolver( 310 MultiThreadedProxyResolver::Executor::Executor(
212 ProxyResolver* resolver) 311 MultiThreadedProxyResolver* coordinator,
213 : ProxyResolver(resolver->expects_pac_bytes()), 312 ProxyResolver* resolver,
313 int thread_number)
314 : coordinator_(coordinator),
315 thread_number_(thread_number),
214 resolver_(resolver) { 316 resolver_(resolver) {
215 } 317 DCHECK(coordinator);
216 318 DCHECK(resolver);
217 SingleThreadedProxyResolver::~SingleThreadedProxyResolver() { 319 // Start up the thread.
218 // Cancel the inprogress job (if any), and free the rest. 320 // Note that it is safe to pass a temporary C-String to Thread(), as it will
219 for (PendingJobsQueue::iterator it = pending_jobs_.begin(); 321 // make a copy.
220 it != pending_jobs_.end(); 322 std::string thread_name =
221 ++it) { 323 StringPrintf("PAC thread #%d", thread_number);
222 (*it)->Cancel(); 324 thread_.reset(new base::Thread(thread_name.c_str()));
223 } 325 thread_->Start();
224 326 }
225 if (outstanding_set_pac_script_task_) 327
226 outstanding_set_pac_script_task_->Cancel(); 328 void MultiThreadedProxyResolver::Executor::StartJob(Job* job) {
227 329 DCHECK(!outstanding_job_);
228 // Note that |thread_| is destroyed before |resolver_|. This is important 330 outstanding_job_ = job;
229 // since |resolver_| could be running on |thread_|. 331
230 } 332 // Run the job. Once it has completed (regardless of whether it was
231 333 // cancelled), it will invoke OnJobCompleted() on this thread.
232 int SingleThreadedProxyResolver::GetProxyForURL(const GURL& url, 334 job->set_executor(this);
233 ProxyInfo* results, 335 job->FinishedWaitingForThread();
234 CompletionCallback* callback, 336 thread_->message_loop()->PostTask(
235 RequestHandle* request, 337 FROM_HERE,
236 const BoundNetLog& net_log) { 338 NewRunnableMethod(job, &Job::Run, MessageLoop::current()));
339 }
340
341 void MultiThreadedProxyResolver::Executor::OnJobCompleted(Job* job) {
342 DCHECK_EQ(job, outstanding_job_.get());
343 outstanding_job_ = NULL;
344 coordinator_->OnExecutorReady(this);
345 }
346
347 void MultiThreadedProxyResolver::Executor::Destroy() {
348 DCHECK(coordinator_);
349
350 // Give the resolver an opportunity to shutdown from THIS THREAD before
351 // joining on the resolver thread. This allows certain implementations
352 // to avoid deadlocks.
353 resolver_->Shutdown();
354
355 // Join the worker thread.
356 thread_.reset();
357
358 // Cancel any outstanding job.
359 if (outstanding_job_) {
360 outstanding_job_->Cancel();
361 // Orphan the job (since this executor may be deleted soon).
362 outstanding_job_->set_executor(NULL);
363 }
364
365 // It is now safe to free the ProxyResolver, since all the tasks that
366 // were using it on the resolver thread have completed.
367 resolver_.reset();
368
369 // Null some stuff as a precaution.
370 coordinator_ = NULL;
371 outstanding_job_ = NULL;
372 }
373
374 void MultiThreadedProxyResolver::Executor::PurgeMemory() {
375 scoped_refptr<PurgeMemoryTask> helper(new PurgeMemoryTask(resolver_.get()));
376 thread_->message_loop()->PostTask(
377 FROM_HERE,
378 NewRunnableMethod(helper.get(), &PurgeMemoryTask::PurgeMemory));
379 }
380
381 MultiThreadedProxyResolver::Executor::~Executor() {
382 // The important cleanup happens as part of Destroy(), which should always be
383 // called first.
384 DCHECK(!coordinator_) << "Destroy() was not called";
385 DCHECK(!thread_.get());
386 DCHECK(!resolver_.get());
387 DCHECK(!outstanding_job_);
388 }
389
390 // MultiThreadedProxyResolver --------------------------------------------------
391
392 MultiThreadedProxyResolver::MultiThreadedProxyResolver(
393 ProxyResolverFactory* resolver_factory,
394 size_t max_num_threads)
395 : ProxyResolver(resolver_factory->resolvers_expect_pac_bytes()),
396 resolver_factory_(resolver_factory),
397 max_num_threads_(max_num_threads),
398 was_set_pac_script_called_(false) {
399 DCHECK_GE(max_num_threads, 1u);
400 }
401
402 MultiThreadedProxyResolver::~MultiThreadedProxyResolver() {
403 // We will cancel all outstanding requests.
404 pending_jobs_.clear();
405 ReleaseAllExecutors();
406 }
407
408 int MultiThreadedProxyResolver::GetProxyForURL(const GURL& url,
409 ProxyInfo* results,
410 CompletionCallback* callback,
411 RequestHandle* request,
412 const BoundNetLog& net_log) {
413 DCHECK(CalledOnValidThread());
237 DCHECK(callback); 414 DCHECK(callback);
238 415 DCHECK(was_set_pac_script_called_)
239 scoped_refptr<Job> job = new Job(this, url, results, callback, net_log); 416 << "Resolver is un-initialized. Must call SetPacScript() first!";
240 bool is_first_job = pending_jobs_.empty(); 417
241 pending_jobs_.push_back(job); // Jobs can never finish synchronously. 418 scoped_refptr<GetProxyForURLJob> job =
242 419 new GetProxyForURLJob(url, results, callback, net_log);
243 if (is_first_job) {
244 // If there is nothing already running, start the job now.
245 EnsureThreadStarted();
246 job->Start();
247 } else {
248 // Otherwise the job will get started eventually by ProcessPendingJobs().
249 job->net_log()->BeginEvent(
250 NetLog::TYPE_WAITING_FOR_SINGLE_PROXY_RESOLVER_THREAD, NULL);
251 }
252 420
253 // Completion will be notified through |callback|, unless the caller cancels 421 // Completion will be notified through |callback|, unless the caller cancels
254 // the request using |request|. 422 // the request using |request|.
255 if (request) 423 if (request)
256 *request = reinterpret_cast<RequestHandle>(job.get()); 424 *request = reinterpret_cast<RequestHandle>(job.get());
257 425
426 // If there is an executor that is ready to run this request, submit it!
427 Executor* executor = FindIdleExecutor();
428 if (executor) {
429 DCHECK_EQ(0u, pending_jobs_.size());
430 executor->StartJob(job);
431 return ERR_IO_PENDING;
432 }
433
434 // Otherwise queue this request. (We will schedule it to a thread once one
435 // becomes available).
436 job->WaitingForThread();
437 pending_jobs_.push_back(job);
438
439 // If we haven't already reached the thread limit, provision a new thread to
440 // drain the requests more quickly.
441 if (executors_.size() < max_num_threads_) {
442 executor = AddNewExecutor();
443 executor->StartJob(
444 new SetPacScriptJob(current_pac_url_, current_pac_script_, NULL));
445 }
446
258 return ERR_IO_PENDING; 447 return ERR_IO_PENDING;
259 } 448 }
260 449
261 // There are three states of the request we need to handle: 450 void MultiThreadedProxyResolver::CancelRequest(RequestHandle req) {
262 // (1) Not started (just sitting in the queue). 451 DCHECK(CalledOnValidThread());
263 // (2) Executing Job::DoQuery in the worker thread.
264 // (3) Waiting for Job::QueryComplete to be run on the origin thread.
265 void SingleThreadedProxyResolver::CancelRequest(RequestHandle req) {
266 DCHECK(req); 452 DCHECK(req);
267 453
268 Job* job = reinterpret_cast<Job*>(req); 454 Job* job = reinterpret_cast<Job*>(req);
269 455 DCHECK_EQ(Job::TYPE_GET_PROXY_FOR_URL, job->type());
270 bool is_active_job = job->is_started() && !pending_jobs_.empty() && 456
271 pending_jobs_.front().get() == job; 457 if (job->executor()) {
272 458 // If the job was already submitted to the executor, just mark it
273 job->Cancel(); 459 // as cancelled so the user callback isn't run on completion.
274 460 job->Cancel();
275 if (is_active_job) { 461 } else {
276 RemoveFrontOfJobsQueueAndStartNext(job); 462 // Otherwise the job is just sitting in a queue.
277 return; 463 PendingJobsQueue::iterator it =
278 } 464 std::find(pending_jobs_.begin(), pending_jobs_.end(), job);
279 465 DCHECK(it != pending_jobs_.end());
280 // Otherwise just delete the job from the queue. 466 pending_jobs_.erase(it);
281 PendingJobsQueue::iterator it = std::find( 467 }
282 pending_jobs_.begin(), pending_jobs_.end(), job); 468 }
283 DCHECK(it != pending_jobs_.end()); 469
284 pending_jobs_.erase(it); 470 void MultiThreadedProxyResolver::CancelSetPacScript() {
285 } 471 DCHECK(CalledOnValidThread());
286 472 DCHECK_EQ(0u, pending_jobs_.size());
287 void SingleThreadedProxyResolver::CancelSetPacScript() { 473 DCHECK_EQ(1u, executors_.size());
288 DCHECK(outstanding_set_pac_script_task_); 474 DCHECK_EQ(Job::TYPE_SET_PAC_SCRIPT,
289 outstanding_set_pac_script_task_->Cancel(); 475 executors_[0]->outstanding_job()->type());
290 outstanding_set_pac_script_task_ = NULL; 476
291 } 477 // Defensively clear some data which shouldn't be getting used
292 478 // anymore.
293 void SingleThreadedProxyResolver::PurgeMemory() { 479 was_set_pac_script_called_ = false;
294 if (thread_.get()) { 480 current_pac_url_ = GURL();
295 scoped_refptr<PurgeMemoryTask> helper(new PurgeMemoryTask(resolver_.get())); 481 current_pac_script_ = string16();
296 thread_->message_loop()->PostTask(FROM_HERE, 482
297 NewRunnableMethod(helper.get(), &PurgeMemoryTask::PurgeMemory)); 483 ReleaseAllExecutors();
298 } 484 }
299 } 485
300 486 void MultiThreadedProxyResolver::PurgeMemory() {
301 int SingleThreadedProxyResolver::SetPacScript( 487 DCHECK(CalledOnValidThread());
488 for (ExecutorList::iterator it = executors_.begin();
489 it != executors_.end(); ++it) {
490 Executor* executor = *it;
491 executor->PurgeMemory();
492 }
493 }
494
495 int MultiThreadedProxyResolver::SetPacScript(
302 const GURL& pac_url, 496 const GURL& pac_url,
303 const string16& pac_script, 497 const string16& pac_script,
304 CompletionCallback* callback) { 498 CompletionCallback* callback) {
305 EnsureThreadStarted(); 499 DCHECK(CalledOnValidThread());
306 DCHECK(!outstanding_set_pac_script_task_); 500 DCHECK(callback);
307 501
308 SetPacScriptTask* task = new SetPacScriptTask( 502 // Save the script details, so we can provision new executors later.
309 this, pac_url, pac_script, callback); 503 // (We rely on internal reference counting of strings to avoid this memory
310 outstanding_set_pac_script_task_ = task; 504 // being duplicated by each of the resolver threads).
311 task->Start(); 505 was_set_pac_script_called_ = true;
506 current_pac_url_ = pac_url;
507 current_pac_script_ = pac_script;
508
509 // The user should not have any outstanding requests when they call
510 // SetPacScript().
511 CheckNoOutstandingUserRequests();
512
513 // Destroy all of the current threads and their proxy resolvers.
514 ReleaseAllExecutors();
515
516 // Provision a new executor, and run the SetPacScript request. On completion
517 // notification will be sent through |callback|.
518 Executor* executor = AddNewExecutor();
519 executor->StartJob(new SetPacScriptJob(pac_url, pac_script, callback));
312 return ERR_IO_PENDING; 520 return ERR_IO_PENDING;
313 } 521 }
314 522
315 void SingleThreadedProxyResolver::EnsureThreadStarted() { 523 void MultiThreadedProxyResolver::CheckNoOutstandingUserRequests() const {
316 if (!thread_.get()) { 524 DCHECK(CalledOnValidThread());
317 thread_.reset(new base::Thread("pac-thread")); 525 CHECK_EQ(0u, pending_jobs_.size());
318 thread_->Start(); 526
319 } 527 for (ExecutorList::const_iterator it = executors_.begin();
320 } 528 it != executors_.end(); ++it) {
321 529 const Executor* executor = *it;
322 void SingleThreadedProxyResolver::ProcessPendingJobs() { 530 Job* job = executor->outstanding_job();
531 // The "has_user_callback()" is to exclude jobs for which the callback
532 // has already been invoked, or was not user-initiated (as in the case of
533 // lazy thread provisions). User-initiated jobs may !has_user_callback()
534 // when the callback has already been run. (Since we only clear the
535 // outstanding job AFTER the callback has been invoked, it is possible
536 // for a new request to be started from within the callback).
537 CHECK(!job || job->was_cancelled() || !job->has_user_callback());
538 }
539 }
540
541 void MultiThreadedProxyResolver::ReleaseAllExecutors() {
542 DCHECK(CalledOnValidThread());
543 for (ExecutorList::iterator it = executors_.begin();
544 it != executors_.end(); ++it) {
545 Executor* executor = *it;
546 executor->Destroy();
547 }
548 executors_.clear();
549 }
550
551 MultiThreadedProxyResolver::Executor*
552 MultiThreadedProxyResolver::FindIdleExecutor() {
553 DCHECK(CalledOnValidThread());
554 for (ExecutorList::iterator it = executors_.begin();
555 it != executors_.end(); ++it) {
556 Executor* executor = *it;
557 if (!executor->outstanding_job())
558 return executor;
559 }
560 return NULL;
561 }
562
563 MultiThreadedProxyResolver::Executor*
564 MultiThreadedProxyResolver::AddNewExecutor() {
565 DCHECK(CalledOnValidThread());
566 DCHECK_LT(executors_.size(), max_num_threads_);
567 // The "thread number" is used to give the thread a unique name.
568 int thread_number = executors_.size();
569 ProxyResolver* resolver = resolver_factory_->CreateProxyResolver();
570 Executor* executor = new Executor(
571 this, resolver, thread_number);
572 executors_.push_back(executor);
573 return executor;
574 }
575
576 void MultiThreadedProxyResolver::OnExecutorReady(Executor* executor) {
577 DCHECK(CalledOnValidThread());
323 if (pending_jobs_.empty()) 578 if (pending_jobs_.empty())
324 return; 579 return;
325 580
326 // Get the next job to process (FIFO). 581 // Get the next job to process (FIFO). Transfer it from the pending queue
327 Job* job = pending_jobs_.front().get(); 582 // to the executor.
328 if (job->is_started()) 583 scoped_refptr<Job> job = pending_jobs_.front();
329 return;
330
331 job->net_log()->EndEvent(
332 NetLog::TYPE_WAITING_FOR_SINGLE_PROXY_RESOLVER_THREAD, NULL);
333
334 EnsureThreadStarted();
335 job->Start();
336 }
337
338 void SingleThreadedProxyResolver::RemoveFrontOfJobsQueueAndStartNext(
339 Job* expected_job) {
340 DCHECK_EQ(expected_job, pending_jobs_.front().get());
341 pending_jobs_.pop_front(); 584 pending_jobs_.pop_front();
342 585 executor->StartJob(job);
343 // Start next work item.
344 ProcessPendingJobs();
345 }
346
347 void SingleThreadedProxyResolver::RemoveOutstandingSetPacScriptTask(
348 SetPacScriptTask* task) {
349 DCHECK_EQ(outstanding_set_pac_script_task_.get(), task);
350 outstanding_set_pac_script_task_ = NULL;
351 } 586 }
352 587
353 } // namespace net 588 } // namespace net
OLDNEW
« no previous file with comments | « net/proxy/multi_threaded_proxy_resolver.h ('k') | net/proxy/multi_threaded_proxy_resolver_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698