Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(97)

Side by Side Diff: net/proxy/multi_threaded_proxy_resolver.cc

Issue 2945004: Revert 51877, since SpdyNetworkTransactionTest.CorruptFrameSessionError start... (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src/
Patch Set: Created 10 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 // Copyright (c) 2010 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/proxy/multi_threaded_proxy_resolver.h"
6
7 #include "base/message_loop.h"
8 #include "base/string_util.h"
9 #include "base/thread.h"
10 #include "net/base/capturing_net_log.h"
11 #include "net/base/net_errors.h"
12 #include "net/proxy/proxy_info.h"
13
14 // TODO(eroman): Have the MultiThreadedProxyResolver clear its PAC script
15 // data when SetPacScript fails. That will reclaim memory when
16 // testing bogus scripts.
17
18 namespace net {
19
20 namespace {
21
22 class PurgeMemoryTask : public base::RefCountedThreadSafe<PurgeMemoryTask> {
23 public:
24 explicit PurgeMemoryTask(ProxyResolver* resolver) : resolver_(resolver) {}
25 void PurgeMemory() { resolver_->PurgeMemory(); }
26 private:
27 friend class base::RefCountedThreadSafe<PurgeMemoryTask>;
28 ~PurgeMemoryTask() {}
29 ProxyResolver* resolver_;
30 };
31
32 } // namespace
33
34 // An "executor" is a job-runner for PAC requests. It encapsulates a worker
35 // thread and a synchronous ProxyResolver (which will be operated on said
36 // thread.)
37 class MultiThreadedProxyResolver::Executor
38 : public base::RefCountedThreadSafe<MultiThreadedProxyResolver::Executor > {
39 public:
40 // |coordinator| must remain valid throughout our lifetime. It is used to
41 // signal when the executor is ready to receive work by calling
42 // |coordinator->OnExecutorReady()|.
43 // The constructor takes ownership of |resolver|.
44 // |thread_number| is an identifier used when naming the worker thread.
45 Executor(MultiThreadedProxyResolver* coordinator,
46 ProxyResolver* resolver,
47 int thread_number);
48
49 // Submit a job to this executor.
50 void StartJob(Job* job);
51
52 // Callback for when a job has completed running on the executor's thread.
53 void OnJobCompleted(Job* job);
54
55 // Cleanup the executor. Cancels all outstanding work, and frees the thread
56 // and resolver.
57 void Destroy();
58
59 void PurgeMemory();
60
61 // Returns the outstanding job, or NULL.
62 Job* outstanding_job() const { return outstanding_job_.get(); }
63
64 ProxyResolver* resolver() { return resolver_.get(); }
65
66 int thread_number() const { return thread_number_; }
67
68 private:
69 friend class base::RefCountedThreadSafe<Executor>;
70 ~Executor();
71
72 MultiThreadedProxyResolver* coordinator_;
73 const int thread_number_;
74
75 // The currently active job for this executor (either a SetPacScript or
76 // GetProxyForURL task).
77 scoped_refptr<Job> outstanding_job_;
78
79 // The synchronous resolver implementation.
80 scoped_ptr<ProxyResolver> resolver_;
81
82 // The thread where |resolver_| is run on.
83 // Note that declaration ordering is important here. |thread_| needs to be
84 // destroyed *before* |resolver_|, in case |resolver_| is currently
85 // executing on |thread_|.
86 scoped_ptr<base::Thread> thread_;
87 };
88
89 // MultiThreadedProxyResolver::Job ---------------------------------------------
90
91 class MultiThreadedProxyResolver::Job
92 : public base::RefCountedThreadSafe<MultiThreadedProxyResolver::Job> {
93 public:
94 // Identifies the subclass of Job (only being used for debugging purposes).
95 enum Type {
96 TYPE_GET_PROXY_FOR_URL,
97 TYPE_SET_PAC_SCRIPT,
98 TYPE_SET_PAC_SCRIPT_INTERNAL,
99 };
100
101 Job(Type type, CompletionCallback* user_callback)
102 : type_(type),
103 user_callback_(user_callback),
104 executor_(NULL),
105 was_cancelled_(false) {
106 }
107
108 void set_executor(Executor* executor) {
109 executor_ = executor;
110 }
111
112 // The "executor" is the job runner that is scheduling this job. If
113 // this job has not been submitted to an executor yet, this will be
114 // NULL (and we know it hasn't started yet).
115 Executor* executor() {
116 return executor_;
117 }
118
119 // Mark the job as having been cancelled.
120 void Cancel() {
121 was_cancelled_ = true;
122 }
123
124 // Returns true if Cancel() has been called.
125 bool was_cancelled() const { return was_cancelled_; }
126
127 Type type() const { return type_; }
128
129 // Returns true if this job still has a user callback. Some jobs
130 // do not have a user callback, because they were helper jobs
131 // scheduled internally (for example TYPE_SET_PAC_SCRIPT_INTERNAL).
132 //
133 // Otherwise jobs that correspond with user-initiated work will
134 // have a non-NULL callback up until the callback is run.
135 bool has_user_callback() const { return user_callback_ != NULL; }
136
137 // This method is called when the job is inserted into a wait queue
138 // because no executors were ready to accept it.
139 virtual void WaitingForThread() {}
140
141 // This method is called just before the job is posted to the work thread.
142 virtual void FinishedWaitingForThread() {}
143
144 // This method is called on the worker thread to do the job's work. On
145 // completion, implementors are expected to call OnJobCompleted() on
146 // |origin_loop|.
147 virtual void Run(MessageLoop* origin_loop) = 0;
148
149 protected:
150 void OnJobCompleted() {
151 // |executor_| will be NULL if the executor has already been deleted.
152 if (executor_)
153 executor_->OnJobCompleted(this);
154 }
155
156 void RunUserCallback(int result) {
157 DCHECK(has_user_callback());
158 CompletionCallback* callback = user_callback_;
159 // Null the callback so has_user_callback() will now return false.
160 user_callback_ = NULL;
161 callback->Run(result);
162 }
163
164 friend class base::RefCountedThreadSafe<MultiThreadedProxyResolver::Job>;
165
166 virtual ~Job() {}
167
168 private:
169 const Type type_;
170 CompletionCallback* user_callback_;
171 Executor* executor_;
172 bool was_cancelled_;
173 };
174
175 // MultiThreadedProxyResolver::SetPacScriptJob ---------------------------------
176
177 // Runs on the worker thread to call ProxyResolver::SetPacScript.
178 class MultiThreadedProxyResolver::SetPacScriptJob
179 : public MultiThreadedProxyResolver::Job {
180 public:
181 SetPacScriptJob(const GURL& pac_url,
182 const string16& pac_script,
183 CompletionCallback* callback)
184 : Job(callback ? TYPE_SET_PAC_SCRIPT : TYPE_SET_PAC_SCRIPT_INTERNAL,
185 callback),
186 pac_url_(pac_url),
187 pac_script_(pac_script) {
188 }
189
190 // Runs on the worker thread.
191 virtual void Run(MessageLoop* origin_loop) {
192 ProxyResolver* resolver = executor()->resolver();
193 int rv = resolver->expects_pac_bytes() ?
194 resolver->SetPacScriptByData(pac_script_, NULL) :
195 resolver->SetPacScriptByUrl(pac_url_, NULL);
196
197 DCHECK_NE(rv, ERR_IO_PENDING);
198 origin_loop->PostTask(
199 FROM_HERE,
200 NewRunnableMethod(this, &SetPacScriptJob::RequestComplete, rv));
201 }
202
203 private:
204 // Runs the completion callback on the origin thread.
205 void RequestComplete(int result_code) {
206 // The task may have been cancelled after it was started.
207 if (!was_cancelled() && has_user_callback()) {
208 RunUserCallback(result_code);
209 }
210 OnJobCompleted();
211 }
212
213 const GURL pac_url_;
214 const string16 pac_script_;
215 };
216
217 // MultiThreadedProxyResolver::GetProxyForURLJob ------------------------------
218
219 class MultiThreadedProxyResolver::GetProxyForURLJob
220 : public MultiThreadedProxyResolver::Job {
221 public:
222 // |url| -- the URL of the query.
223 // |results| -- the structure to fill with proxy resolve results.
224 GetProxyForURLJob(const GURL& url,
225 ProxyInfo* results,
226 CompletionCallback* callback,
227 const BoundNetLog& net_log)
228 : Job(TYPE_GET_PROXY_FOR_URL, callback),
229 results_(results),
230 net_log_(net_log),
231 url_(url),
232 was_waiting_for_thread_(false) {
233 DCHECK(callback);
234 }
235
236 BoundNetLog* net_log() { return &net_log_; }
237
238 virtual void WaitingForThread() {
239 was_waiting_for_thread_ = true;
240 net_log_.BeginEvent(
241 NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD, NULL);
242 }
243
244 virtual void FinishedWaitingForThread() {
245 DCHECK(executor());
246
247 if (was_waiting_for_thread_) {
248 net_log_.EndEvent(
249 NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD, NULL);
250 }
251
252 net_log_.AddEvent(
253 NetLog::TYPE_SUBMITTED_TO_RESOLVER_THREAD,
254 new NetLogIntegerParameter(
255 "thread_number", executor()->thread_number()));
256 }
257
258 // Runs on the worker thread.
259 virtual void Run(MessageLoop* origin_loop) {
260 const size_t kNetLogBound = 50u;
261 worker_log_.reset(new CapturingNetLog(kNetLogBound));
262 BoundNetLog bound_worker_log(NetLog::Source(), worker_log_.get());
263
264 ProxyResolver* resolver = executor()->resolver();
265 int rv = resolver->GetProxyForURL(
266 url_, &results_buf_, NULL, NULL, bound_worker_log);
267 DCHECK_NE(rv, ERR_IO_PENDING);
268
269 origin_loop->PostTask(
270 FROM_HERE,
271 NewRunnableMethod(this, &GetProxyForURLJob::QueryComplete, rv));
272 }
273
274 private:
275 // Runs the completion callback on the origin thread.
276 void QueryComplete(int result_code) {
277 // The Job may have been cancelled after it was started.
278 if (!was_cancelled()) {
279 // Merge the load log that was generated on the worker thread, into the
280 // main log.
281 CapturingBoundNetLog bound_worker_log(NetLog::Source(),
282 worker_log_.release());
283 bound_worker_log.AppendTo(net_log_);
284
285 if (result_code >= OK) { // Note: unit-tests use values > 0.
286 results_->Use(results_buf_);
287 }
288 RunUserCallback(result_code);
289 }
290 OnJobCompleted();
291 }
292
293 // Must only be used on the "origin" thread.
294 ProxyInfo* results_;
295 BoundNetLog net_log_;
296 const GURL url_;
297
298 // Usable from within DoQuery on the worker thread.
299 ProxyInfo results_buf_;
300
301 // Used to pass the captured events between DoQuery [worker thread] and
302 // QueryComplete [origin thread].
303 scoped_ptr<CapturingNetLog> worker_log_;
304
305 bool was_waiting_for_thread_;
306 };
307
308 // MultiThreadedProxyResolver::Executor ----------------------------------------
309
310 MultiThreadedProxyResolver::Executor::Executor(
311 MultiThreadedProxyResolver* coordinator,
312 ProxyResolver* resolver,
313 int thread_number)
314 : coordinator_(coordinator),
315 thread_number_(thread_number),
316 resolver_(resolver) {
317 DCHECK(coordinator);
318 DCHECK(resolver);
319 // Start up the thread.
320 // Note that it is safe to pass a temporary C-String to Thread(), as it will
321 // make a copy.
322 std::string thread_name =
323 StringPrintf("PAC thread #%d", thread_number);
324 thread_.reset(new base::Thread(thread_name.c_str()));
325 thread_->Start();
326 }
327
328 void MultiThreadedProxyResolver::Executor::StartJob(Job* job) {
329 DCHECK(!outstanding_job_);
330 outstanding_job_ = job;
331
332 // Run the job. Once it has completed (regardless of whether it was
333 // cancelled), it will invoke OnJobCompleted() on this thread.
334 job->set_executor(this);
335 job->FinishedWaitingForThread();
336 thread_->message_loop()->PostTask(
337 FROM_HERE,
338 NewRunnableMethod(job, &Job::Run, MessageLoop::current()));
339 }
340
341 void MultiThreadedProxyResolver::Executor::OnJobCompleted(Job* job) {
342 DCHECK_EQ(job, outstanding_job_.get());
343 outstanding_job_ = NULL;
344 coordinator_->OnExecutorReady(this);
345 }
346
347 void MultiThreadedProxyResolver::Executor::Destroy() {
348 DCHECK(coordinator_);
349
350 // Give the resolver an opportunity to shutdown from THIS THREAD before
351 // joining on the resolver thread. This allows certain implementations
352 // to avoid deadlocks.
353 resolver_->Shutdown();
354
355 // Join the worker thread.
356 thread_.reset();
357
358 // Cancel any outstanding job.
359 if (outstanding_job_) {
360 outstanding_job_->Cancel();
361 // Orphan the job (since this executor may be deleted soon).
362 outstanding_job_->set_executor(NULL);
363 }
364
365 // It is now safe to free the ProxyResolver, since all the tasks that
366 // were using it on the resolver thread have completed.
367 resolver_.reset();
368
369 // Null some stuff as a precaution.
370 coordinator_ = NULL;
371 outstanding_job_ = NULL;
372 }
373
374 void MultiThreadedProxyResolver::Executor::PurgeMemory() {
375 scoped_refptr<PurgeMemoryTask> helper(new PurgeMemoryTask(resolver_.get()));
376 thread_->message_loop()->PostTask(
377 FROM_HERE,
378 NewRunnableMethod(helper.get(), &PurgeMemoryTask::PurgeMemory));
379 }
380
381 MultiThreadedProxyResolver::Executor::~Executor() {
382 // The important cleanup happens as part of Destroy(), which should always be
383 // called first.
384 DCHECK(!coordinator_) << "Destroy() was not called";
385 DCHECK(!thread_.get());
386 DCHECK(!resolver_.get());
387 DCHECK(!outstanding_job_);
388 }
389
390 // MultiThreadedProxyResolver --------------------------------------------------
391
392 MultiThreadedProxyResolver::MultiThreadedProxyResolver(
393 ProxyResolverFactory* resolver_factory,
394 size_t max_num_threads)
395 : ProxyResolver(resolver_factory->resolvers_expect_pac_bytes()),
396 resolver_factory_(resolver_factory),
397 max_num_threads_(max_num_threads),
398 was_set_pac_script_called_(false) {
399 DCHECK_GE(max_num_threads, 1u);
400 }
401
402 MultiThreadedProxyResolver::~MultiThreadedProxyResolver() {
403 // We will cancel all outstanding requests.
404 pending_jobs_.clear();
405 ReleaseAllExecutors();
406 }
407
408 int MultiThreadedProxyResolver::GetProxyForURL(const GURL& url,
409 ProxyInfo* results,
410 CompletionCallback* callback,
411 RequestHandle* request,
412 const BoundNetLog& net_log) {
413 DCHECK(CalledOnValidThread());
414 DCHECK(callback);
415 DCHECK(was_set_pac_script_called_)
416 << "Resolver is un-initialized. Must call SetPacScript() first!";
417
418 scoped_refptr<GetProxyForURLJob> job =
419 new GetProxyForURLJob(url, results, callback, net_log);
420
421 // Completion will be notified through |callback|, unless the caller cancels
422 // the request using |request|.
423 if (request)
424 *request = reinterpret_cast<RequestHandle>(job.get());
425
426 // If there is an executor that is ready to run this request, submit it!
427 Executor* executor = FindIdleExecutor();
428 if (executor) {
429 DCHECK_EQ(0u, pending_jobs_.size());
430 executor->StartJob(job);
431 return ERR_IO_PENDING;
432 }
433
434 // Otherwise queue this request. (We will schedule it to a thread once one
435 // becomes available).
436 job->WaitingForThread();
437 pending_jobs_.push_back(job);
438
439 // If we haven't already reached the thread limit, provision a new thread to
440 // drain the requests more quickly.
441 if (executors_.size() < max_num_threads_) {
442 executor = AddNewExecutor();
443 executor->StartJob(
444 new SetPacScriptJob(current_pac_url_, current_pac_script_, NULL));
445 }
446
447 return ERR_IO_PENDING;
448 }
449
450 void MultiThreadedProxyResolver::CancelRequest(RequestHandle req) {
451 DCHECK(CalledOnValidThread());
452 DCHECK(req);
453
454 Job* job = reinterpret_cast<Job*>(req);
455 DCHECK_EQ(Job::TYPE_GET_PROXY_FOR_URL, job->type());
456
457 if (job->executor()) {
458 // If the job was already submitted to the executor, just mark it
459 // as cancelled so the user callback isn't run on completion.
460 job->Cancel();
461 } else {
462 // Otherwise the job is just sitting in a queue.
463 PendingJobsQueue::iterator it =
464 std::find(pending_jobs_.begin(), pending_jobs_.end(), job);
465 DCHECK(it != pending_jobs_.end());
466 pending_jobs_.erase(it);
467 }
468 }
469
470 void MultiThreadedProxyResolver::CancelSetPacScript() {
471 DCHECK(CalledOnValidThread());
472 DCHECK_EQ(0u, pending_jobs_.size());
473 DCHECK_EQ(1u, executors_.size());
474 DCHECK_EQ(Job::TYPE_SET_PAC_SCRIPT,
475 executors_[0]->outstanding_job()->type());
476
477 // Defensively clear some data which shouldn't be getting used
478 // anymore.
479 was_set_pac_script_called_ = false;
480 current_pac_url_ = GURL();
481 current_pac_script_ = string16();
482
483 ReleaseAllExecutors();
484 }
485
486 void MultiThreadedProxyResolver::PurgeMemory() {
487 DCHECK(CalledOnValidThread());
488 for (ExecutorList::iterator it = executors_.begin();
489 it != executors_.end(); ++it) {
490 Executor* executor = *it;
491 executor->PurgeMemory();
492 }
493 }
494
495 int MultiThreadedProxyResolver::SetPacScript(
496 const GURL& pac_url,
497 const string16& pac_script,
498 CompletionCallback* callback) {
499 DCHECK(CalledOnValidThread());
500 DCHECK(callback);
501
502 // Save the script details, so we can provision new executors later.
503 // (We rely on internal reference counting of strings to avoid this memory
504 // being duplicated by each of the resolver threads).
505 was_set_pac_script_called_ = true;
506 current_pac_url_ = pac_url;
507 current_pac_script_ = pac_script;
508
509 // The user should not have any outstanding requests when they call
510 // SetPacScript().
511 CheckNoOutstandingUserRequests();
512
513 // Destroy all of the current threads and their proxy resolvers.
514 ReleaseAllExecutors();
515
516 // Provision a new executor, and run the SetPacScript request. On completion
517 // notification will be sent through |callback|.
518 Executor* executor = AddNewExecutor();
519 executor->StartJob(new SetPacScriptJob(pac_url, pac_script, callback));
520 return ERR_IO_PENDING;
521 }
522
523 void MultiThreadedProxyResolver::CheckNoOutstandingUserRequests() const {
524 DCHECK(CalledOnValidThread());
525 CHECK_EQ(0u, pending_jobs_.size());
526
527 for (ExecutorList::const_iterator it = executors_.begin();
528 it != executors_.end(); ++it) {
529 const Executor* executor = *it;
530 Job* job = executor->outstanding_job();
531 // The "has_user_callback()" is to exclude jobs for which the callback
532 // has already been invoked, or was not user-initiated (as in the case of
533 // lazy thread provisions). User-initiated jobs may !has_user_callback()
534 // when the callback has already been run. (Since we only clear the
535 // outstanding job AFTER the callback has been invoked, it is possible
536 // for a new request to be started from within the callback).
537 CHECK(!job || job->was_cancelled() || !job->has_user_callback());
538 }
539 }
540
541 void MultiThreadedProxyResolver::ReleaseAllExecutors() {
542 DCHECK(CalledOnValidThread());
543 for (ExecutorList::iterator it = executors_.begin();
544 it != executors_.end(); ++it) {
545 Executor* executor = *it;
546 executor->Destroy();
547 }
548 executors_.clear();
549 }
550
551 MultiThreadedProxyResolver::Executor*
552 MultiThreadedProxyResolver::FindIdleExecutor() {
553 DCHECK(CalledOnValidThread());
554 for (ExecutorList::iterator it = executors_.begin();
555 it != executors_.end(); ++it) {
556 Executor* executor = *it;
557 if (!executor->outstanding_job())
558 return executor;
559 }
560 return NULL;
561 }
562
563 MultiThreadedProxyResolver::Executor*
564 MultiThreadedProxyResolver::AddNewExecutor() {
565 DCHECK(CalledOnValidThread());
566 DCHECK_LT(executors_.size(), max_num_threads_);
567 // The "thread number" is used to give the thread a unique name.
568 int thread_number = executors_.size();
569 ProxyResolver* resolver = resolver_factory_->CreateProxyResolver();
570 Executor* executor = new Executor(
571 this, resolver, thread_number);
572 executors_.push_back(executor);
573 return executor;
574 }
575
576 void MultiThreadedProxyResolver::OnExecutorReady(Executor* executor) {
577 DCHECK(CalledOnValidThread());
578 if (pending_jobs_.empty())
579 return;
580
581 // Get the next job to process (FIFO). Transfer it from the pending queue
582 // to the executor.
583 scoped_refptr<Job> job = pending_jobs_.front();
584 pending_jobs_.pop_front();
585 executor->StartJob(job);
586 }
587
588 } // namespace net
OLDNEW
« no previous file with comments | « net/proxy/multi_threaded_proxy_resolver.h ('k') | net/proxy/multi_threaded_proxy_resolver_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698