OLD | NEW |
| (Empty) |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "net/proxy/multi_threaded_proxy_resolver.h" | |
6 | |
7 #include "base/bind.h" | |
8 #include "base/bind_helpers.h" | |
9 #include "base/message_loop/message_loop_proxy.h" | |
10 #include "base/strings/string_util.h" | |
11 #include "base/strings/stringprintf.h" | |
12 #include "base/threading/thread.h" | |
13 #include "base/threading/thread_restrictions.h" | |
14 #include "net/base/net_errors.h" | |
15 #include "net/base/net_log.h" | |
16 #include "net/proxy/proxy_info.h" | |
17 | |
18 // TODO(eroman): Have the MultiThreadedProxyResolver clear its PAC script | |
19 // data when SetPacScript fails. That will reclaim memory when | |
20 // testing bogus scripts. | |
21 | |
22 namespace net { | |
23 | |
24 // An "executor" is a job-runner for PAC requests. It encapsulates a worker | |
25 // thread and a synchronous ProxyResolver (which will be operated on said | |
26 // thread.) | |
27 class MultiThreadedProxyResolver::Executor | |
28 : public base::RefCountedThreadSafe<MultiThreadedProxyResolver::Executor > { | |
29 public: | |
30 // |coordinator| must remain valid throughout our lifetime. It is used to | |
31 // signal when the executor is ready to receive work by calling | |
32 // |coordinator->OnExecutorReady()|. | |
33 // The constructor takes ownership of |resolver|. | |
34 // |thread_number| is an identifier used when naming the worker thread. | |
35 Executor(MultiThreadedProxyResolver* coordinator, | |
36 ProxyResolver* resolver, | |
37 int thread_number); | |
38 | |
39 // Submit a job to this executor. | |
40 void StartJob(Job* job); | |
41 | |
42 // Callback for when a job has completed running on the executor's thread. | |
43 void OnJobCompleted(Job* job); | |
44 | |
45 // Cleanup the executor. Cancels all outstanding work, and frees the thread | |
46 // and resolver. | |
47 void Destroy(); | |
48 | |
49 // Returns the outstanding job, or NULL. | |
50 Job* outstanding_job() const { return outstanding_job_.get(); } | |
51 | |
52 ProxyResolver* resolver() { return resolver_.get(); } | |
53 | |
54 int thread_number() const { return thread_number_; } | |
55 | |
56 private: | |
57 friend class base::RefCountedThreadSafe<Executor>; | |
58 ~Executor(); | |
59 | |
60 MultiThreadedProxyResolver* coordinator_; | |
61 const int thread_number_; | |
62 | |
63 // The currently active job for this executor (either a SetPacScript or | |
64 // GetProxyForURL task). | |
65 scoped_refptr<Job> outstanding_job_; | |
66 | |
67 // The synchronous resolver implementation. | |
68 scoped_ptr<ProxyResolver> resolver_; | |
69 | |
70 // The thread where |resolver_| is run on. | |
71 // Note that declaration ordering is important here. |thread_| needs to be | |
72 // destroyed *before* |resolver_|, in case |resolver_| is currently | |
73 // executing on |thread_|. | |
74 scoped_ptr<base::Thread> thread_; | |
75 }; | |
76 | |
77 // MultiThreadedProxyResolver::Job --------------------------------------------- | |
78 | |
79 class MultiThreadedProxyResolver::Job | |
80 : public base::RefCountedThreadSafe<MultiThreadedProxyResolver::Job> { | |
81 public: | |
82 // Identifies the subclass of Job (only being used for debugging purposes). | |
83 enum Type { | |
84 TYPE_GET_PROXY_FOR_URL, | |
85 TYPE_SET_PAC_SCRIPT, | |
86 TYPE_SET_PAC_SCRIPT_INTERNAL, | |
87 }; | |
88 | |
89 Job(Type type, const CompletionCallback& callback) | |
90 : type_(type), | |
91 callback_(callback), | |
92 executor_(NULL), | |
93 was_cancelled_(false) { | |
94 } | |
95 | |
96 void set_executor(Executor* executor) { | |
97 executor_ = executor; | |
98 } | |
99 | |
100 // The "executor" is the job runner that is scheduling this job. If | |
101 // this job has not been submitted to an executor yet, this will be | |
102 // NULL (and we know it hasn't started yet). | |
103 Executor* executor() { | |
104 return executor_; | |
105 } | |
106 | |
107 // Mark the job as having been cancelled. | |
108 void Cancel() { | |
109 was_cancelled_ = true; | |
110 } | |
111 | |
112 // Returns true if Cancel() has been called. | |
113 bool was_cancelled() const { return was_cancelled_; } | |
114 | |
115 Type type() const { return type_; } | |
116 | |
117 // Returns true if this job still has a user callback. Some jobs | |
118 // do not have a user callback, because they were helper jobs | |
119 // scheduled internally (for example TYPE_SET_PAC_SCRIPT_INTERNAL). | |
120 // | |
121 // Otherwise jobs that correspond with user-initiated work will | |
122 // have a non-null callback up until the callback is run. | |
123 bool has_user_callback() const { return !callback_.is_null(); } | |
124 | |
125 // This method is called when the job is inserted into a wait queue | |
126 // because no executors were ready to accept it. | |
127 virtual void WaitingForThread() {} | |
128 | |
129 // This method is called just before the job is posted to the work thread. | |
130 virtual void FinishedWaitingForThread() {} | |
131 | |
132 // This method is called on the worker thread to do the job's work. On | |
133 // completion, implementors are expected to call OnJobCompleted() on | |
134 // |origin_loop|. | |
135 virtual void Run(scoped_refptr<base::MessageLoopProxy> origin_loop) = 0; | |
136 | |
137 protected: | |
138 void OnJobCompleted() { | |
139 // |executor_| will be NULL if the executor has already been deleted. | |
140 if (executor_) | |
141 executor_->OnJobCompleted(this); | |
142 } | |
143 | |
144 void RunUserCallback(int result) { | |
145 DCHECK(has_user_callback()); | |
146 CompletionCallback callback = callback_; | |
147 // Reset the callback so has_user_callback() will now return false. | |
148 callback_.Reset(); | |
149 callback.Run(result); | |
150 } | |
151 | |
152 friend class base::RefCountedThreadSafe<MultiThreadedProxyResolver::Job>; | |
153 | |
154 virtual ~Job() {} | |
155 | |
156 private: | |
157 const Type type_; | |
158 CompletionCallback callback_; | |
159 Executor* executor_; | |
160 bool was_cancelled_; | |
161 }; | |
162 | |
163 // MultiThreadedProxyResolver::SetPacScriptJob --------------------------------- | |
164 | |
165 // Runs on the worker thread to call ProxyResolver::SetPacScript. | |
166 class MultiThreadedProxyResolver::SetPacScriptJob | |
167 : public MultiThreadedProxyResolver::Job { | |
168 public: | |
169 SetPacScriptJob(const scoped_refptr<ProxyResolverScriptData>& script_data, | |
170 const CompletionCallback& callback) | |
171 : Job(!callback.is_null() ? TYPE_SET_PAC_SCRIPT : | |
172 TYPE_SET_PAC_SCRIPT_INTERNAL, | |
173 callback), | |
174 script_data_(script_data) { | |
175 } | |
176 | |
177 // Runs on the worker thread. | |
178 void Run(scoped_refptr<base::MessageLoopProxy> origin_loop) override { | |
179 ProxyResolver* resolver = executor()->resolver(); | |
180 int rv = resolver->SetPacScript(script_data_, CompletionCallback()); | |
181 | |
182 DCHECK_NE(rv, ERR_IO_PENDING); | |
183 origin_loop->PostTask( | |
184 FROM_HERE, | |
185 base::Bind(&SetPacScriptJob::RequestComplete, this, rv)); | |
186 } | |
187 | |
188 protected: | |
189 ~SetPacScriptJob() override {} | |
190 | |
191 private: | |
192 // Runs the completion callback on the origin thread. | |
193 void RequestComplete(int result_code) { | |
194 // The task may have been cancelled after it was started. | |
195 if (!was_cancelled() && has_user_callback()) { | |
196 RunUserCallback(result_code); | |
197 } | |
198 OnJobCompleted(); | |
199 } | |
200 | |
201 const scoped_refptr<ProxyResolverScriptData> script_data_; | |
202 }; | |
203 | |
204 // MultiThreadedProxyResolver::GetProxyForURLJob ------------------------------ | |
205 | |
206 class MultiThreadedProxyResolver::GetProxyForURLJob | |
207 : public MultiThreadedProxyResolver::Job { | |
208 public: | |
209 // |url| -- the URL of the query. | |
210 // |results| -- the structure to fill with proxy resolve results. | |
211 GetProxyForURLJob(const GURL& url, | |
212 ProxyInfo* results, | |
213 const CompletionCallback& callback, | |
214 const BoundNetLog& net_log) | |
215 : Job(TYPE_GET_PROXY_FOR_URL, callback), | |
216 results_(results), | |
217 net_log_(net_log), | |
218 url_(url), | |
219 was_waiting_for_thread_(false) { | |
220 DCHECK(!callback.is_null()); | |
221 } | |
222 | |
223 BoundNetLog* net_log() { return &net_log_; } | |
224 | |
225 void WaitingForThread() override { | |
226 was_waiting_for_thread_ = true; | |
227 net_log_.BeginEvent(NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD); | |
228 } | |
229 | |
230 void FinishedWaitingForThread() override { | |
231 DCHECK(executor()); | |
232 | |
233 if (was_waiting_for_thread_) { | |
234 net_log_.EndEvent(NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD); | |
235 } | |
236 | |
237 net_log_.AddEvent( | |
238 NetLog::TYPE_SUBMITTED_TO_RESOLVER_THREAD, | |
239 NetLog::IntegerCallback("thread_number", executor()->thread_number())); | |
240 } | |
241 | |
242 // Runs on the worker thread. | |
243 void Run(scoped_refptr<base::MessageLoopProxy> origin_loop) override { | |
244 ProxyResolver* resolver = executor()->resolver(); | |
245 int rv = resolver->GetProxyForURL( | |
246 url_, &results_buf_, CompletionCallback(), NULL, net_log_); | |
247 DCHECK_NE(rv, ERR_IO_PENDING); | |
248 | |
249 origin_loop->PostTask( | |
250 FROM_HERE, | |
251 base::Bind(&GetProxyForURLJob::QueryComplete, this, rv)); | |
252 } | |
253 | |
254 protected: | |
255 ~GetProxyForURLJob() override {} | |
256 | |
257 private: | |
258 // Runs the completion callback on the origin thread. | |
259 void QueryComplete(int result_code) { | |
260 // The Job may have been cancelled after it was started. | |
261 if (!was_cancelled()) { | |
262 if (result_code >= OK) { // Note: unit-tests use values > 0. | |
263 results_->Use(results_buf_); | |
264 } | |
265 RunUserCallback(result_code); | |
266 } | |
267 OnJobCompleted(); | |
268 } | |
269 | |
270 // Must only be used on the "origin" thread. | |
271 ProxyInfo* results_; | |
272 | |
273 // Can be used on either "origin" or worker thread. | |
274 BoundNetLog net_log_; | |
275 const GURL url_; | |
276 | |
277 // Usable from within DoQuery on the worker thread. | |
278 ProxyInfo results_buf_; | |
279 | |
280 bool was_waiting_for_thread_; | |
281 }; | |
282 | |
283 // MultiThreadedProxyResolver::Executor ---------------------------------------- | |
284 | |
285 MultiThreadedProxyResolver::Executor::Executor( | |
286 MultiThreadedProxyResolver* coordinator, | |
287 ProxyResolver* resolver, | |
288 int thread_number) | |
289 : coordinator_(coordinator), | |
290 thread_number_(thread_number), | |
291 resolver_(resolver) { | |
292 DCHECK(coordinator); | |
293 DCHECK(resolver); | |
294 // Start up the thread. | |
295 thread_.reset(new base::Thread(base::StringPrintf("PAC thread #%d", | |
296 thread_number))); | |
297 CHECK(thread_->Start()); | |
298 } | |
299 | |
300 void MultiThreadedProxyResolver::Executor::StartJob(Job* job) { | |
301 DCHECK(!outstanding_job_.get()); | |
302 outstanding_job_ = job; | |
303 | |
304 // Run the job. Once it has completed (regardless of whether it was | |
305 // cancelled), it will invoke OnJobCompleted() on this thread. | |
306 job->set_executor(this); | |
307 job->FinishedWaitingForThread(); | |
308 thread_->message_loop()->PostTask( | |
309 FROM_HERE, | |
310 base::Bind(&Job::Run, job, base::MessageLoopProxy::current())); | |
311 } | |
312 | |
313 void MultiThreadedProxyResolver::Executor::OnJobCompleted(Job* job) { | |
314 DCHECK_EQ(job, outstanding_job_.get()); | |
315 outstanding_job_ = NULL; | |
316 coordinator_->OnExecutorReady(this); | |
317 } | |
318 | |
319 void MultiThreadedProxyResolver::Executor::Destroy() { | |
320 DCHECK(coordinator_); | |
321 | |
322 { | |
323 // See http://crbug.com/69710. | |
324 base::ThreadRestrictions::ScopedAllowIO allow_io; | |
325 | |
326 // Join the worker thread. | |
327 thread_.reset(); | |
328 } | |
329 | |
330 // Cancel any outstanding job. | |
331 if (outstanding_job_.get()) { | |
332 outstanding_job_->Cancel(); | |
333 // Orphan the job (since this executor may be deleted soon). | |
334 outstanding_job_->set_executor(NULL); | |
335 } | |
336 | |
337 // It is now safe to free the ProxyResolver, since all the tasks that | |
338 // were using it on the resolver thread have completed. | |
339 resolver_.reset(); | |
340 | |
341 // Null some stuff as a precaution. | |
342 coordinator_ = NULL; | |
343 outstanding_job_ = NULL; | |
344 } | |
345 | |
346 MultiThreadedProxyResolver::Executor::~Executor() { | |
347 // The important cleanup happens as part of Destroy(), which should always be | |
348 // called first. | |
349 DCHECK(!coordinator_) << "Destroy() was not called"; | |
350 DCHECK(!thread_.get()); | |
351 DCHECK(!resolver_.get()); | |
352 DCHECK(!outstanding_job_.get()); | |
353 } | |
354 | |
355 // MultiThreadedProxyResolver -------------------------------------------------- | |
356 | |
357 MultiThreadedProxyResolver::MultiThreadedProxyResolver( | |
358 ProxyResolverFactory* resolver_factory, | |
359 size_t max_num_threads) | |
360 : ProxyResolver(resolver_factory->resolvers_expect_pac_bytes()), | |
361 resolver_factory_(resolver_factory), | |
362 max_num_threads_(max_num_threads) { | |
363 DCHECK_GE(max_num_threads, 1u); | |
364 } | |
365 | |
366 MultiThreadedProxyResolver::~MultiThreadedProxyResolver() { | |
367 // We will cancel all outstanding requests. | |
368 pending_jobs_.clear(); | |
369 ReleaseAllExecutors(); | |
370 } | |
371 | |
372 int MultiThreadedProxyResolver::GetProxyForURL( | |
373 const GURL& url, ProxyInfo* results, const CompletionCallback& callback, | |
374 RequestHandle* request, const BoundNetLog& net_log) { | |
375 DCHECK(CalledOnValidThread()); | |
376 DCHECK(!callback.is_null()); | |
377 DCHECK(current_script_data_.get()) | |
378 << "Resolver is un-initialized. Must call SetPacScript() first!"; | |
379 | |
380 scoped_refptr<GetProxyForURLJob> job( | |
381 new GetProxyForURLJob(url, results, callback, net_log)); | |
382 | |
383 // Completion will be notified through |callback|, unless the caller cancels | |
384 // the request using |request|. | |
385 if (request) | |
386 *request = reinterpret_cast<RequestHandle>(job.get()); | |
387 | |
388 // If there is an executor that is ready to run this request, submit it! | |
389 Executor* executor = FindIdleExecutor(); | |
390 if (executor) { | |
391 DCHECK_EQ(0u, pending_jobs_.size()); | |
392 executor->StartJob(job.get()); | |
393 return ERR_IO_PENDING; | |
394 } | |
395 | |
396 // Otherwise queue this request. (We will schedule it to a thread once one | |
397 // becomes available). | |
398 job->WaitingForThread(); | |
399 pending_jobs_.push_back(job); | |
400 | |
401 // If we haven't already reached the thread limit, provision a new thread to | |
402 // drain the requests more quickly. | |
403 if (executors_.size() < max_num_threads_) { | |
404 executor = AddNewExecutor(); | |
405 executor->StartJob( | |
406 new SetPacScriptJob(current_script_data_, CompletionCallback())); | |
407 } | |
408 | |
409 return ERR_IO_PENDING; | |
410 } | |
411 | |
412 void MultiThreadedProxyResolver::CancelRequest(RequestHandle req) { | |
413 DCHECK(CalledOnValidThread()); | |
414 DCHECK(req); | |
415 | |
416 Job* job = reinterpret_cast<Job*>(req); | |
417 DCHECK_EQ(Job::TYPE_GET_PROXY_FOR_URL, job->type()); | |
418 | |
419 if (job->executor()) { | |
420 // If the job was already submitted to the executor, just mark it | |
421 // as cancelled so the user callback isn't run on completion. | |
422 job->Cancel(); | |
423 } else { | |
424 // Otherwise the job is just sitting in a queue. | |
425 PendingJobsQueue::iterator it = | |
426 std::find(pending_jobs_.begin(), pending_jobs_.end(), job); | |
427 DCHECK(it != pending_jobs_.end()); | |
428 pending_jobs_.erase(it); | |
429 } | |
430 } | |
431 | |
432 LoadState MultiThreadedProxyResolver::GetLoadState(RequestHandle req) const { | |
433 DCHECK(CalledOnValidThread()); | |
434 DCHECK(req); | |
435 return LOAD_STATE_RESOLVING_PROXY_FOR_URL; | |
436 } | |
437 | |
438 void MultiThreadedProxyResolver::CancelSetPacScript() { | |
439 DCHECK(CalledOnValidThread()); | |
440 DCHECK_EQ(0u, pending_jobs_.size()); | |
441 DCHECK_EQ(1u, executors_.size()); | |
442 DCHECK_EQ(Job::TYPE_SET_PAC_SCRIPT, | |
443 executors_[0]->outstanding_job()->type()); | |
444 | |
445 // Defensively clear some data which shouldn't be getting used | |
446 // anymore. | |
447 current_script_data_ = NULL; | |
448 | |
449 ReleaseAllExecutors(); | |
450 } | |
451 | |
452 int MultiThreadedProxyResolver::SetPacScript( | |
453 const scoped_refptr<ProxyResolverScriptData>& script_data, | |
454 const CompletionCallback&callback) { | |
455 DCHECK(CalledOnValidThread()); | |
456 DCHECK(!callback.is_null()); | |
457 | |
458 // Save the script details, so we can provision new executors later. | |
459 current_script_data_ = script_data; | |
460 | |
461 // The user should not have any outstanding requests when they call | |
462 // SetPacScript(). | |
463 CheckNoOutstandingUserRequests(); | |
464 | |
465 // Destroy all of the current threads and their proxy resolvers. | |
466 ReleaseAllExecutors(); | |
467 | |
468 // Provision a new executor, and run the SetPacScript request. On completion | |
469 // notification will be sent through |callback|. | |
470 Executor* executor = AddNewExecutor(); | |
471 executor->StartJob(new SetPacScriptJob(script_data, callback)); | |
472 return ERR_IO_PENDING; | |
473 } | |
474 | |
475 void MultiThreadedProxyResolver::CheckNoOutstandingUserRequests() const { | |
476 DCHECK(CalledOnValidThread()); | |
477 CHECK_EQ(0u, pending_jobs_.size()); | |
478 | |
479 for (ExecutorList::const_iterator it = executors_.begin(); | |
480 it != executors_.end(); ++it) { | |
481 const Executor* executor = it->get(); | |
482 Job* job = executor->outstanding_job(); | |
483 // The "has_user_callback()" is to exclude jobs for which the callback | |
484 // has already been invoked, or was not user-initiated (as in the case of | |
485 // lazy thread provisions). User-initiated jobs may !has_user_callback() | |
486 // when the callback has already been run. (Since we only clear the | |
487 // outstanding job AFTER the callback has been invoked, it is possible | |
488 // for a new request to be started from within the callback). | |
489 CHECK(!job || job->was_cancelled() || !job->has_user_callback()); | |
490 } | |
491 } | |
492 | |
493 void MultiThreadedProxyResolver::ReleaseAllExecutors() { | |
494 DCHECK(CalledOnValidThread()); | |
495 for (ExecutorList::iterator it = executors_.begin(); | |
496 it != executors_.end(); ++it) { | |
497 Executor* executor = it->get(); | |
498 executor->Destroy(); | |
499 } | |
500 executors_.clear(); | |
501 } | |
502 | |
503 MultiThreadedProxyResolver::Executor* | |
504 MultiThreadedProxyResolver::FindIdleExecutor() { | |
505 DCHECK(CalledOnValidThread()); | |
506 for (ExecutorList::iterator it = executors_.begin(); | |
507 it != executors_.end(); ++it) { | |
508 Executor* executor = it->get(); | |
509 if (!executor->outstanding_job()) | |
510 return executor; | |
511 } | |
512 return NULL; | |
513 } | |
514 | |
515 MultiThreadedProxyResolver::Executor* | |
516 MultiThreadedProxyResolver::AddNewExecutor() { | |
517 DCHECK(CalledOnValidThread()); | |
518 DCHECK_LT(executors_.size(), max_num_threads_); | |
519 // The "thread number" is used to give the thread a unique name. | |
520 int thread_number = executors_.size(); | |
521 ProxyResolver* resolver = resolver_factory_->CreateProxyResolver(); | |
522 Executor* executor = new Executor( | |
523 this, resolver, thread_number); | |
524 executors_.push_back(make_scoped_refptr(executor)); | |
525 return executor; | |
526 } | |
527 | |
528 void MultiThreadedProxyResolver::OnExecutorReady(Executor* executor) { | |
529 DCHECK(CalledOnValidThread()); | |
530 if (pending_jobs_.empty()) | |
531 return; | |
532 | |
533 // Get the next job to process (FIFO). Transfer it from the pending queue | |
534 // to the executor. | |
535 scoped_refptr<Job> job = pending_jobs_.front(); | |
536 pending_jobs_.pop_front(); | |
537 executor->StartJob(job.get()); | |
538 } | |
539 | |
540 } // namespace net | |
OLD | NEW |