OLD | NEW |
1 // Copyright (c) 2010 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2010 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "net/proxy/single_threaded_proxy_resolver.h" | 5 #include "net/proxy/multi_threaded_proxy_resolver.h" |
6 | 6 |
7 #include "base/message_loop.h" | 7 #include "base/message_loop.h" |
| 8 #include "base/string_util.h" |
8 #include "base/thread.h" | 9 #include "base/thread.h" |
9 #include "net/base/capturing_net_log.h" | 10 #include "net/base/capturing_net_log.h" |
10 #include "net/base/net_errors.h" | 11 #include "net/base/net_errors.h" |
11 #include "net/proxy/proxy_info.h" | 12 #include "net/proxy/proxy_info.h" |
12 | 13 |
| 14 // TODO(eroman): Have the MultiThreadedProxyResolver clear its PAC script |
| 15 // data when SetPacScript fails. That will reclaim memory when |
| 16 // testing bogus scripts. |
| 17 |
13 namespace net { | 18 namespace net { |
14 | 19 |
15 namespace { | 20 namespace { |
16 | 21 |
17 class PurgeMemoryTask : public base::RefCountedThreadSafe<PurgeMemoryTask> { | 22 class PurgeMemoryTask : public base::RefCountedThreadSafe<PurgeMemoryTask> { |
18 public: | 23 public: |
19 explicit PurgeMemoryTask(ProxyResolver* resolver) : resolver_(resolver) {} | 24 explicit PurgeMemoryTask(ProxyResolver* resolver) : resolver_(resolver) {} |
20 void PurgeMemory() { resolver_->PurgeMemory(); } | 25 void PurgeMemory() { resolver_->PurgeMemory(); } |
21 private: | 26 private: |
22 friend class base::RefCountedThreadSafe<PurgeMemoryTask>; | 27 friend class base::RefCountedThreadSafe<PurgeMemoryTask>; |
23 ~PurgeMemoryTask() {} | 28 ~PurgeMemoryTask() {} |
24 ProxyResolver* resolver_; | 29 ProxyResolver* resolver_; |
25 }; | 30 }; |
26 | 31 |
27 } // namespace | 32 } // namespace |
28 | 33 |
29 // SingleThreadedProxyResolver::SetPacScriptTask ------------------------------ | 34 // An "executor" is a job-runner for PAC requests. It encapsulates a worker |
| 35 // thread and a synchronous ProxyResolver (which will be operated on said |
| 36 // thread.) |
| 37 class MultiThreadedProxyResolver::Executor |
| 38 : public base::RefCountedThreadSafe<MultiThreadedProxyResolver::Executor > { |
| 39 public: |
| 40 // |coordinator| must remain valid throughout our lifetime. It is used to |
| 41 // signal when the executor is ready to receive work by calling |
| 42 // |coordinator->OnExecutorReady()|. |
| 43 // The constructor takes ownership of |resolver|. |
| 44 // |thread_number| is an identifier used when naming the worker thread. |
| 45 Executor(MultiThreadedProxyResolver* coordinator, |
| 46 ProxyResolver* resolver, |
| 47 int thread_number); |
| 48 |
| 49 // Submit a job to this executor. |
| 50 void StartJob(Job* job); |
| 51 |
| 52 // Callback for when a job has completed running on the executor's thread. |
| 53 void OnJobCompleted(Job* job); |
| 54 |
| 55 // Cleanup the executor. Cancels all outstanding work, and frees the thread |
| 56 // and resolver. |
| 57 void Destroy(); |
| 58 |
| 59 void PurgeMemory(); |
| 60 |
| 61 // Returns the outstanding job, or NULL. |
| 62 Job* outstanding_job() const { return outstanding_job_.get(); } |
| 63 |
| 64 ProxyResolver* resolver() { return resolver_.get(); } |
| 65 |
| 66 int thread_number() const { return thread_number_; } |
| 67 |
| 68 private: |
| 69 friend class base::RefCountedThreadSafe<Executor>; |
| 70 ~Executor(); |
| 71 |
| 72 MultiThreadedProxyResolver* coordinator_; |
| 73 const int thread_number_; |
| 74 |
| 75 // The currently active job for this executor (either a SetPacScript or |
| 76 // GetProxyForURL task). |
| 77 scoped_refptr<Job> outstanding_job_; |
| 78 |
| 79 // The synchronous resolver implementation. |
| 80 scoped_ptr<ProxyResolver> resolver_; |
| 81 |
| 82 // The thread where |resolver_| is run on. |
| 83 // Note that declaration ordering is important here. |thread_| needs to be |
| 84 // destroyed *before* |resolver_|, in case |resolver_| is currently |
| 85 // executing on |thread_|. |
| 86 scoped_ptr<base::Thread> thread_; |
| 87 }; |
| 88 |
| 89 // MultiThreadedProxyResolver::Job --------------------------------------------- |
| 90 |
| 91 class MultiThreadedProxyResolver::Job |
| 92 : public base::RefCountedThreadSafe<MultiThreadedProxyResolver::Job> { |
| 93 public: |
| 94 // Identifies the subclass of Job (only being used for debugging purposes). |
| 95 enum Type { |
| 96 TYPE_GET_PROXY_FOR_URL, |
| 97 TYPE_SET_PAC_SCRIPT, |
| 98 TYPE_SET_PAC_SCRIPT_INTERNAL, |
| 99 }; |
| 100 |
| 101 Job(Type type, CompletionCallback* user_callback) |
| 102 : type_(type), |
| 103 user_callback_(user_callback), |
| 104 executor_(NULL), |
| 105 was_cancelled_(false) { |
| 106 } |
| 107 |
| 108 void set_executor(Executor* executor) { |
| 109 executor_ = executor; |
| 110 } |
| 111 |
| 112 // The "executor" is the job runner that is scheduling this job. If |
| 113 // this job has not been submitted to an executor yet, this will be |
| 114 // NULL (and we know it hasn't started yet). |
| 115 Executor* executor() { |
| 116 return executor_; |
| 117 } |
| 118 |
| 119 // Mark the job as having been cancelled. |
| 120 void Cancel() { |
| 121 was_cancelled_ = true; |
| 122 } |
| 123 |
| 124 // Returns true if Cancel() has been called. |
| 125 bool was_cancelled() const { return was_cancelled_; } |
| 126 |
| 127 Type type() const { return type_; } |
| 128 |
| 129 // Returns true if this job still has a user callback. Some jobs |
| 130 // do not have a user callback, because they were helper jobs |
| 131 // scheduled internally (for example TYPE_SET_PAC_SCRIPT_INTERNAL). |
| 132 // |
| 133 // Otherwise jobs that correspond with user-initiated work will |
| 134 // have a non-NULL callback up until the callback is run. |
| 135 bool has_user_callback() const { return user_callback_ != NULL; } |
| 136 |
| 137 // This method is called when the job is inserted into a wait queue |
| 138 // because no executors were ready to accept it. |
| 139 virtual void WaitingForThread() {} |
| 140 |
| 141 // This method is called just before the job is posted to the work thread. |
| 142 virtual void FinishedWaitingForThread() {} |
| 143 |
| 144 // This method is called on the worker thread to do the job's work. On |
| 145 // completion, implementors are expected to call OnJobCompleted() on |
| 146 // |origin_loop|. |
| 147 virtual void Run(MessageLoop* origin_loop) = 0; |
| 148 |
| 149 protected: |
| 150 void OnJobCompleted() { |
| 151 // |executor_| will be NULL if the executor has already been deleted. |
| 152 if (executor_) |
| 153 executor_->OnJobCompleted(this); |
| 154 } |
| 155 |
| 156 void RunUserCallback(int result) { |
| 157 DCHECK(has_user_callback()); |
| 158 CompletionCallback* callback = user_callback_; |
| 159 // Null the callback so has_user_callback() will now return false. |
| 160 user_callback_ = NULL; |
| 161 callback->Run(result); |
| 162 } |
| 163 |
| 164 friend class base::RefCountedThreadSafe<MultiThreadedProxyResolver::Job>; |
| 165 |
| 166 virtual ~Job() {} |
| 167 |
| 168 private: |
| 169 const Type type_; |
| 170 CompletionCallback* user_callback_; |
| 171 Executor* executor_; |
| 172 bool was_cancelled_; |
| 173 }; |
| 174 |
| 175 // MultiThreadedProxyResolver::SetPacScriptJob --------------------------------- |
30 | 176 |
31 // Runs on the worker thread to call ProxyResolver::SetPacScript. | 177 // Runs on the worker thread to call ProxyResolver::SetPacScript. |
32 class SingleThreadedProxyResolver::SetPacScriptTask | 178 class MultiThreadedProxyResolver::SetPacScriptJob |
33 : public base::RefCountedThreadSafe< | 179 : public MultiThreadedProxyResolver::Job { |
34 SingleThreadedProxyResolver::SetPacScriptTask> { | |
35 public: | 180 public: |
36 SetPacScriptTask(SingleThreadedProxyResolver* coordinator, | 181 SetPacScriptJob(const GURL& pac_url, |
37 const GURL& pac_url, | 182 const string16& pac_script, |
38 const string16& pac_script, | 183 CompletionCallback* callback) |
39 CompletionCallback* callback) | 184 : Job(callback ? TYPE_SET_PAC_SCRIPT : TYPE_SET_PAC_SCRIPT_INTERNAL, |
40 : coordinator_(coordinator), | 185 callback), |
41 callback_(callback), | |
42 pac_script_(pac_script), | |
43 pac_url_(pac_url), | 186 pac_url_(pac_url), |
44 origin_loop_(MessageLoop::current()) { | 187 pac_script_(pac_script) { |
45 DCHECK(callback); | 188 } |
46 } | |
47 | |
48 // Start the SetPacScript request on the worker thread. | |
49 void Start() { | |
50 coordinator_->thread()->message_loop()->PostTask( | |
51 FROM_HERE, NewRunnableMethod(this, &SetPacScriptTask::DoRequest, | |
52 coordinator_->resolver_.get())); | |
53 } | |
54 | |
55 void Cancel() { | |
56 // Clear these to inform RequestComplete that it should not try to | |
57 // access them. | |
58 coordinator_ = NULL; | |
59 callback_ = NULL; | |
60 } | |
61 | |
62 // Returns true if Cancel() has been called. | |
63 bool was_cancelled() const { return callback_ == NULL; } | |
64 | |
65 private: | |
66 friend class base::RefCountedThreadSafe< | |
67 SingleThreadedProxyResolver::SetPacScriptTask>; | |
68 | |
69 ~SetPacScriptTask() {} | |
70 | 189 |
71 // Runs on the worker thread. | 190 // Runs on the worker thread. |
72 void DoRequest(ProxyResolver* resolver) { | 191 virtual void Run(MessageLoop* origin_loop) { |
| 192 ProxyResolver* resolver = executor()->resolver(); |
73 int rv = resolver->expects_pac_bytes() ? | 193 int rv = resolver->expects_pac_bytes() ? |
74 resolver->SetPacScriptByData(pac_script_, NULL) : | 194 resolver->SetPacScriptByData(pac_script_, NULL) : |
75 resolver->SetPacScriptByUrl(pac_url_, NULL); | 195 resolver->SetPacScriptByUrl(pac_url_, NULL); |
76 | 196 |
77 DCHECK_NE(rv, ERR_IO_PENDING); | 197 DCHECK_NE(rv, ERR_IO_PENDING); |
78 origin_loop_->PostTask(FROM_HERE, | 198 origin_loop->PostTask( |
79 NewRunnableMethod(this, &SetPacScriptTask::RequestComplete, rv)); | 199 FROM_HERE, |
80 } | 200 NewRunnableMethod(this, &SetPacScriptJob::RequestComplete, rv)); |
81 | 201 } |
| 202 |
| 203 private: |
82 // Runs the completion callback on the origin thread. | 204 // Runs the completion callback on the origin thread. |
83 void RequestComplete(int result_code) { | 205 void RequestComplete(int result_code) { |
84 // The task may have been cancelled after it was started. | 206 // The task may have been cancelled after it was started. |
85 if (!was_cancelled()) { | 207 if (!was_cancelled() && has_user_callback()) { |
86 CompletionCallback* callback = callback_; | 208 RunUserCallback(result_code); |
87 coordinator_->RemoveOutstandingSetPacScriptTask(this); | |
88 callback->Run(result_code); | |
89 } | 209 } |
90 } | 210 OnJobCompleted(); |
91 | 211 } |
92 // Must only be used on the "origin" thread. | 212 |
93 SingleThreadedProxyResolver* coordinator_; | 213 const GURL pac_url_; |
94 CompletionCallback* callback_; | 214 const string16 pac_script_; |
95 string16 pac_script_; | |
96 GURL pac_url_; | |
97 | |
98 // Usable from within DoQuery on the worker thread. | |
99 MessageLoop* origin_loop_; | |
100 }; | 215 }; |
101 | 216 |
102 // SingleThreadedProxyResolver::Job ------------------------------------------- | 217 // MultiThreadedProxyResolver::GetProxyForURLJob ------------------------------ |
103 | 218 |
104 class SingleThreadedProxyResolver::Job | 219 class MultiThreadedProxyResolver::GetProxyForURLJob |
105 : public base::RefCountedThreadSafe<SingleThreadedProxyResolver::Job> { | 220 : public MultiThreadedProxyResolver::Job { |
106 public: | 221 public: |
107 // |coordinator| -- the SingleThreadedProxyResolver that owns this job. | |
108 // |url| -- the URL of the query. | 222 // |url| -- the URL of the query. |
109 // |results| -- the structure to fill with proxy resolve results. | 223 // |results| -- the structure to fill with proxy resolve results. |
110 Job(SingleThreadedProxyResolver* coordinator, | 224 GetProxyForURLJob(const GURL& url, |
111 const GURL& url, | 225 ProxyInfo* results, |
112 ProxyInfo* results, | 226 CompletionCallback* callback, |
113 CompletionCallback* callback, | 227 const BoundNetLog& net_log) |
114 const BoundNetLog& net_log) | 228 : Job(TYPE_GET_PROXY_FOR_URL, callback), |
115 : coordinator_(coordinator), | 229 results_(results), |
116 callback_(callback), | 230 net_log_(net_log), |
117 results_(results), | 231 url_(url), |
118 net_log_(net_log), | 232 was_waiting_for_thread_(false) { |
119 url_(url), | |
120 is_started_(false), | |
121 origin_loop_(MessageLoop::current()) { | |
122 DCHECK(callback); | 233 DCHECK(callback); |
123 } | 234 } |
124 | 235 |
125 // Start the resolve proxy request on the worker thread. | |
126 void Start() { | |
127 is_started_ = true; | |
128 | |
129 size_t load_log_bound = 100; | |
130 | |
131 coordinator_->thread()->message_loop()->PostTask( | |
132 FROM_HERE, NewRunnableMethod(this, &Job::DoQuery, | |
133 coordinator_->resolver_.get(), | |
134 load_log_bound)); | |
135 } | |
136 | |
137 bool is_started() const { return is_started_; } | |
138 | |
139 void Cancel() { | |
140 // Clear these to inform QueryComplete that it should not try to | |
141 // access them. | |
142 coordinator_ = NULL; | |
143 callback_ = NULL; | |
144 results_ = NULL; | |
145 } | |
146 | |
147 // Returns true if Cancel() has been called. | |
148 bool was_cancelled() const { return callback_ == NULL; } | |
149 | |
150 BoundNetLog* net_log() { return &net_log_; } | 236 BoundNetLog* net_log() { return &net_log_; } |
151 | 237 |
| 238 virtual void WaitingForThread() { |
| 239 was_waiting_for_thread_ = true; |
| 240 net_log_.BeginEvent( |
| 241 NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD, NULL); |
| 242 } |
| 243 |
| 244 virtual void FinishedWaitingForThread() { |
| 245 DCHECK(executor()); |
| 246 |
| 247 if (was_waiting_for_thread_) { |
| 248 net_log_.EndEvent( |
| 249 NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD, NULL); |
| 250 } |
| 251 |
| 252 net_log_.AddEvent( |
| 253 NetLog::TYPE_SUBMITTED_TO_RESOLVER_THREAD, |
| 254 new NetLogIntegerParameter( |
| 255 "thread_number", executor()->thread_number())); |
| 256 } |
| 257 |
| 258 // Runs on the worker thread. |
| 259 virtual void Run(MessageLoop* origin_loop) { |
| 260 const size_t kNetLogBound = 50u; |
| 261 worker_log_.reset(new CapturingNetLog(kNetLogBound)); |
| 262 BoundNetLog bound_worker_log(NetLog::Source(), worker_log_.get()); |
| 263 |
| 264 ProxyResolver* resolver = executor()->resolver(); |
| 265 int rv = resolver->GetProxyForURL( |
| 266 url_, &results_buf_, NULL, NULL, bound_worker_log); |
| 267 DCHECK_NE(rv, ERR_IO_PENDING); |
| 268 |
| 269 origin_loop->PostTask( |
| 270 FROM_HERE, |
| 271 NewRunnableMethod(this, &GetProxyForURLJob::QueryComplete, rv)); |
| 272 } |
| 273 |
152 private: | 274 private: |
153 friend class base::RefCountedThreadSafe<SingleThreadedProxyResolver::Job>; | |
154 | |
155 ~Job() {} | |
156 | |
157 // Runs on the worker thread. | |
158 void DoQuery(ProxyResolver* resolver, size_t load_log_bound) { | |
159 worker_log_.reset(new CapturingNetLog(load_log_bound)); | |
160 BoundNetLog bound_worker_log(NetLog::Source(), worker_log_.get()); | |
161 | |
162 int rv = resolver->GetProxyForURL(url_, &results_buf_, NULL, NULL, | |
163 bound_worker_log); | |
164 DCHECK_NE(rv, ERR_IO_PENDING); | |
165 | |
166 origin_loop_->PostTask(FROM_HERE, | |
167 NewRunnableMethod(this, &Job::QueryComplete, rv)); | |
168 } | |
169 | |
170 // Runs the completion callback on the origin thread. | 275 // Runs the completion callback on the origin thread. |
171 void QueryComplete(int result_code) { | 276 void QueryComplete(int result_code) { |
172 // Merge the load log that was generated on the worker thread, into the | |
173 // main log. | |
174 CapturingBoundNetLog bound_worker_log(NetLog::Source(), | |
175 worker_log_.release()); | |
176 bound_worker_log.AppendTo(net_log_); | |
177 | |
178 // The Job may have been cancelled after it was started. | 277 // The Job may have been cancelled after it was started. |
179 if (!was_cancelled()) { | 278 if (!was_cancelled()) { |
| 279 // Merge the load log that was generated on the worker thread, into the |
| 280 // main log. |
| 281 CapturingBoundNetLog bound_worker_log(NetLog::Source(), |
| 282 worker_log_.release()); |
| 283 bound_worker_log.AppendTo(net_log_); |
| 284 |
180 if (result_code >= OK) { // Note: unit-tests use values > 0. | 285 if (result_code >= OK) { // Note: unit-tests use values > 0. |
181 results_->Use(results_buf_); | 286 results_->Use(results_buf_); |
182 } | 287 } |
183 callback_->Run(result_code); | 288 RunUserCallback(result_code); |
184 | |
185 // We check for cancellation once again, in case the callback deleted | |
186 // the owning ProxyService (whose destructor will in turn cancel us). | |
187 if (!was_cancelled()) | |
188 coordinator_->RemoveFrontOfJobsQueueAndStartNext(this); | |
189 } | 289 } |
| 290 OnJobCompleted(); |
190 } | 291 } |
191 | 292 |
192 // Must only be used on the "origin" thread. | 293 // Must only be used on the "origin" thread. |
193 SingleThreadedProxyResolver* coordinator_; | |
194 CompletionCallback* callback_; | |
195 ProxyInfo* results_; | 294 ProxyInfo* results_; |
196 BoundNetLog net_log_; | 295 BoundNetLog net_log_; |
197 GURL url_; | 296 const GURL url_; |
198 bool is_started_; | |
199 | 297 |
200 // Usable from within DoQuery on the worker thread. | 298 // Usable from within DoQuery on the worker thread. |
201 ProxyInfo results_buf_; | 299 ProxyInfo results_buf_; |
202 MessageLoop* origin_loop_; | |
203 | 300 |
204 // Used to pass the captured events between DoQuery [worker thread] and | 301 // Used to pass the captured events between DoQuery [worker thread] and |
205 // QueryComplete [origin thread]. | 302 // QueryComplete [origin thread]. |
206 scoped_ptr<CapturingNetLog> worker_log_; | 303 scoped_ptr<CapturingNetLog> worker_log_; |
| 304 |
| 305 bool was_waiting_for_thread_; |
207 }; | 306 }; |
208 | 307 |
209 // SingleThreadedProxyResolver ------------------------------------------------ | 308 // MultiThreadedProxyResolver::Executor ---------------------------------------- |
210 | 309 |
211 SingleThreadedProxyResolver::SingleThreadedProxyResolver( | 310 MultiThreadedProxyResolver::Executor::Executor( |
212 ProxyResolver* resolver) | 311 MultiThreadedProxyResolver* coordinator, |
213 : ProxyResolver(resolver->expects_pac_bytes()), | 312 ProxyResolver* resolver, |
| 313 int thread_number) |
| 314 : coordinator_(coordinator), |
| 315 thread_number_(thread_number), |
214 resolver_(resolver) { | 316 resolver_(resolver) { |
215 } | 317 DCHECK(coordinator); |
216 | 318 DCHECK(resolver); |
217 SingleThreadedProxyResolver::~SingleThreadedProxyResolver() { | 319 // Start up the thread. |
218 // Cancel the inprogress job (if any), and free the rest. | 320 // Note that it is safe to pass a temporary C-String to Thread(), as it will |
219 for (PendingJobsQueue::iterator it = pending_jobs_.begin(); | 321 // make a copy. |
220 it != pending_jobs_.end(); | 322 std::string thread_name = |
221 ++it) { | 323 StringPrintf("PAC thread #%d", thread_number); |
222 (*it)->Cancel(); | 324 thread_.reset(new base::Thread(thread_name.c_str())); |
223 } | 325 thread_->Start(); |
224 | 326 } |
225 if (outstanding_set_pac_script_task_) | 327 |
226 outstanding_set_pac_script_task_->Cancel(); | 328 void MultiThreadedProxyResolver::Executor::StartJob(Job* job) { |
227 | 329 DCHECK(!outstanding_job_); |
228 // Note that |thread_| is destroyed before |resolver_|. This is important | 330 outstanding_job_ = job; |
229 // since |resolver_| could be running on |thread_|. | 331 |
230 } | 332 // Run the job. Once it has completed (regardless of whether it was |
231 | 333 // cancelled), it will invoke OnJobCompleted() on this thread. |
232 int SingleThreadedProxyResolver::GetProxyForURL(const GURL& url, | 334 job->set_executor(this); |
233 ProxyInfo* results, | 335 job->FinishedWaitingForThread(); |
234 CompletionCallback* callback, | 336 thread_->message_loop()->PostTask( |
235 RequestHandle* request, | 337 FROM_HERE, |
236 const BoundNetLog& net_log) { | 338 NewRunnableMethod(job, &Job::Run, MessageLoop::current())); |
| 339 } |
| 340 |
| 341 void MultiThreadedProxyResolver::Executor::OnJobCompleted(Job* job) { |
| 342 DCHECK_EQ(job, outstanding_job_.get()); |
| 343 outstanding_job_ = NULL; |
| 344 coordinator_->OnExecutorReady(this); |
| 345 } |
| 346 |
| 347 void MultiThreadedProxyResolver::Executor::Destroy() { |
| 348 DCHECK(coordinator_); |
| 349 |
| 350 // Give the resolver an opportunity to shutdown from THIS THREAD before |
| 351 // joining on the resolver thread. This allows certain implementations |
| 352 // to avoid deadlocks. |
| 353 resolver_->Shutdown(); |
| 354 |
| 355 // Join the worker thread. |
| 356 thread_.reset(); |
| 357 |
| 358 // Cancel any outstanding job. |
| 359 if (outstanding_job_) { |
| 360 outstanding_job_->Cancel(); |
| 361 // Orphan the job (since this executor may be deleted soon). |
| 362 outstanding_job_->set_executor(NULL); |
| 363 } |
| 364 |
| 365 // It is now safe to free the ProxyResolver, since all the tasks that |
| 366 // were using it on the resolver thread have completed. |
| 367 resolver_.reset(); |
| 368 |
| 369 // Null some stuff as a precaution. |
| 370 coordinator_ = NULL; |
| 371 outstanding_job_ = NULL; |
| 372 } |
| 373 |
| 374 void MultiThreadedProxyResolver::Executor::PurgeMemory() { |
| 375 scoped_refptr<PurgeMemoryTask> helper(new PurgeMemoryTask(resolver_.get())); |
| 376 thread_->message_loop()->PostTask( |
| 377 FROM_HERE, |
| 378 NewRunnableMethod(helper.get(), &PurgeMemoryTask::PurgeMemory)); |
| 379 } |
| 380 |
| 381 MultiThreadedProxyResolver::Executor::~Executor() { |
| 382 // The important cleanup happens as part of Destroy(), which should always be |
| 383 // called first. |
| 384 DCHECK(!coordinator_) << "Destroy() was not called"; |
| 385 DCHECK(!thread_.get()); |
| 386 DCHECK(!resolver_.get()); |
| 387 DCHECK(!outstanding_job_); |
| 388 } |
| 389 |
| 390 // MultiThreadedProxyResolver -------------------------------------------------- |
| 391 |
| 392 MultiThreadedProxyResolver::MultiThreadedProxyResolver( |
| 393 ProxyResolverFactory* resolver_factory, |
| 394 size_t max_num_threads) |
| 395 : ProxyResolver(resolver_factory->resolvers_expect_pac_bytes()), |
| 396 resolver_factory_(resolver_factory), |
| 397 max_num_threads_(max_num_threads), |
| 398 was_set_pac_script_called_(false) { |
| 399 DCHECK_GE(max_num_threads, 1u); |
| 400 } |
| 401 |
| 402 MultiThreadedProxyResolver::~MultiThreadedProxyResolver() { |
| 403 // We will cancel all outstanding requests. |
| 404 pending_jobs_.clear(); |
| 405 ReleaseAllExecutors(); |
| 406 } |
| 407 |
| 408 int MultiThreadedProxyResolver::GetProxyForURL(const GURL& url, |
| 409 ProxyInfo* results, |
| 410 CompletionCallback* callback, |
| 411 RequestHandle* request, |
| 412 const BoundNetLog& net_log) { |
| 413 DCHECK(CalledOnValidThread()); |
237 DCHECK(callback); | 414 DCHECK(callback); |
238 | 415 DCHECK(was_set_pac_script_called_) |
239 scoped_refptr<Job> job = new Job(this, url, results, callback, net_log); | 416 << "Resolver is un-initialized. Must call SetPacScript() first!"; |
240 bool is_first_job = pending_jobs_.empty(); | 417 |
241 pending_jobs_.push_back(job); // Jobs can never finish synchronously. | 418 scoped_refptr<GetProxyForURLJob> job = |
242 | 419 new GetProxyForURLJob(url, results, callback, net_log); |
243 if (is_first_job) { | |
244 // If there is nothing already running, start the job now. | |
245 EnsureThreadStarted(); | |
246 job->Start(); | |
247 } else { | |
248 // Otherwise the job will get started eventually by ProcessPendingJobs(). | |
249 job->net_log()->BeginEvent( | |
250 NetLog::TYPE_WAITING_FOR_SINGLE_PROXY_RESOLVER_THREAD, NULL); | |
251 } | |
252 | 420 |
253 // Completion will be notified through |callback|, unless the caller cancels | 421 // Completion will be notified through |callback|, unless the caller cancels |
254 // the request using |request|. | 422 // the request using |request|. |
255 if (request) | 423 if (request) |
256 *request = reinterpret_cast<RequestHandle>(job.get()); | 424 *request = reinterpret_cast<RequestHandle>(job.get()); |
257 | 425 |
| 426 // If there is an executor that is ready to run this request, submit it! |
| 427 Executor* executor = FindIdleExecutor(); |
| 428 if (executor) { |
| 429 DCHECK_EQ(0u, pending_jobs_.size()); |
| 430 executor->StartJob(job); |
| 431 return ERR_IO_PENDING; |
| 432 } |
| 433 |
| 434 // Otherwise queue this request. (We will schedule it to a thread once one |
| 435 // becomes available). |
| 436 job->WaitingForThread(); |
| 437 pending_jobs_.push_back(job); |
| 438 |
| 439 // If we haven't already reached the thread limit, provision a new thread to |
| 440 // drain the requests more quickly. |
| 441 if (executors_.size() < max_num_threads_) { |
| 442 executor = AddNewExecutor(); |
| 443 executor->StartJob( |
| 444 new SetPacScriptJob(current_pac_url_, current_pac_script_, NULL)); |
| 445 } |
| 446 |
258 return ERR_IO_PENDING; | 447 return ERR_IO_PENDING; |
259 } | 448 } |
260 | 449 |
261 // There are three states of the request we need to handle: | 450 void MultiThreadedProxyResolver::CancelRequest(RequestHandle req) { |
262 // (1) Not started (just sitting in the queue). | 451 DCHECK(CalledOnValidThread()); |
263 // (2) Executing Job::DoQuery in the worker thread. | |
264 // (3) Waiting for Job::QueryComplete to be run on the origin thread. | |
265 void SingleThreadedProxyResolver::CancelRequest(RequestHandle req) { | |
266 DCHECK(req); | 452 DCHECK(req); |
267 | 453 |
268 Job* job = reinterpret_cast<Job*>(req); | 454 Job* job = reinterpret_cast<Job*>(req); |
269 | 455 DCHECK_EQ(Job::TYPE_GET_PROXY_FOR_URL, job->type()); |
270 bool is_active_job = job->is_started() && !pending_jobs_.empty() && | 456 |
271 pending_jobs_.front().get() == job; | 457 if (job->executor()) { |
272 | 458 // If the job was already submitted to the executor, just mark it |
273 job->Cancel(); | 459 // as cancelled so the user callback isn't run on completion. |
274 | 460 job->Cancel(); |
275 if (is_active_job) { | 461 } else { |
276 RemoveFrontOfJobsQueueAndStartNext(job); | 462 // Otherwise the job is just sitting in a queue. |
277 return; | 463 PendingJobsQueue::iterator it = |
278 } | 464 std::find(pending_jobs_.begin(), pending_jobs_.end(), job); |
279 | 465 DCHECK(it != pending_jobs_.end()); |
280 // Otherwise just delete the job from the queue. | 466 pending_jobs_.erase(it); |
281 PendingJobsQueue::iterator it = std::find( | 467 } |
282 pending_jobs_.begin(), pending_jobs_.end(), job); | 468 } |
283 DCHECK(it != pending_jobs_.end()); | 469 |
284 pending_jobs_.erase(it); | 470 void MultiThreadedProxyResolver::CancelSetPacScript() { |
285 } | 471 DCHECK(CalledOnValidThread()); |
286 | 472 DCHECK_EQ(0u, pending_jobs_.size()); |
287 void SingleThreadedProxyResolver::CancelSetPacScript() { | 473 DCHECK_EQ(1u, executors_.size()); |
288 DCHECK(outstanding_set_pac_script_task_); | 474 DCHECK_EQ(Job::TYPE_SET_PAC_SCRIPT, |
289 outstanding_set_pac_script_task_->Cancel(); | 475 executors_[0]->outstanding_job()->type()); |
290 outstanding_set_pac_script_task_ = NULL; | 476 |
291 } | 477 // Defensively clear some data which shouldn't be getting used |
292 | 478 // anymore. |
293 void SingleThreadedProxyResolver::PurgeMemory() { | 479 was_set_pac_script_called_ = false; |
294 if (thread_.get()) { | 480 current_pac_url_ = GURL(); |
295 scoped_refptr<PurgeMemoryTask> helper(new PurgeMemoryTask(resolver_.get())); | 481 current_pac_script_ = string16(); |
296 thread_->message_loop()->PostTask(FROM_HERE, | 482 |
297 NewRunnableMethod(helper.get(), &PurgeMemoryTask::PurgeMemory)); | 483 ReleaseAllExecutors(); |
298 } | 484 } |
299 } | 485 |
300 | 486 void MultiThreadedProxyResolver::PurgeMemory() { |
301 int SingleThreadedProxyResolver::SetPacScript( | 487 DCHECK(CalledOnValidThread()); |
| 488 for (ExecutorList::iterator it = executors_.begin(); |
| 489 it != executors_.end(); ++it) { |
| 490 Executor* executor = *it; |
| 491 executor->PurgeMemory(); |
| 492 } |
| 493 } |
| 494 |
| 495 int MultiThreadedProxyResolver::SetPacScript( |
302 const GURL& pac_url, | 496 const GURL& pac_url, |
303 const string16& pac_script, | 497 const string16& pac_script, |
304 CompletionCallback* callback) { | 498 CompletionCallback* callback) { |
305 EnsureThreadStarted(); | 499 DCHECK(CalledOnValidThread()); |
306 DCHECK(!outstanding_set_pac_script_task_); | 500 DCHECK(callback); |
307 | 501 |
308 SetPacScriptTask* task = new SetPacScriptTask( | 502 // Save the script details, so we can provision new executors later. |
309 this, pac_url, pac_script, callback); | 503 // (We rely on internal reference counting of strings to avoid this memory |
310 outstanding_set_pac_script_task_ = task; | 504 // being duplicated by each of the resolver threads). |
311 task->Start(); | 505 was_set_pac_script_called_ = true; |
| 506 current_pac_url_ = pac_url; |
| 507 current_pac_script_ = pac_script; |
| 508 |
| 509 // The user should not have any outstanding requests when they call |
| 510 // SetPacScript(). |
| 511 CheckNoOutstandingUserRequests(); |
| 512 |
| 513 // Destroy all of the current threads and their proxy resolvers. |
| 514 ReleaseAllExecutors(); |
| 515 |
| 516 // Provision a new executor, and run the SetPacScript request. On completion |
| 517 // notification will be sent through |callback|. |
| 518 Executor* executor = AddNewExecutor(); |
| 519 executor->StartJob(new SetPacScriptJob(pac_url, pac_script, callback)); |
312 return ERR_IO_PENDING; | 520 return ERR_IO_PENDING; |
313 } | 521 } |
314 | 522 |
315 void SingleThreadedProxyResolver::EnsureThreadStarted() { | 523 void MultiThreadedProxyResolver::CheckNoOutstandingUserRequests() const { |
316 if (!thread_.get()) { | 524 DCHECK(CalledOnValidThread()); |
317 thread_.reset(new base::Thread("pac-thread")); | 525 CHECK_EQ(0u, pending_jobs_.size()); |
318 thread_->Start(); | 526 |
319 } | 527 for (ExecutorList::const_iterator it = executors_.begin(); |
320 } | 528 it != executors_.end(); ++it) { |
321 | 529 const Executor* executor = *it; |
322 void SingleThreadedProxyResolver::ProcessPendingJobs() { | 530 Job* job = executor->outstanding_job(); |
| 531 // The "has_user_callback()" is to exclude jobs for which the callback |
| 532 // has already been invoked, or was not user-initiated (as in the case of |
| 533 // lazy thread provisions). User-initiated jobs may !has_user_callback() |
| 534 // when the callback has already been run. (Since we only clear the |
| 535 // outstanding job AFTER the callback has been invoked, it is possible |
| 536 // for a new request to be started from within the callback). |
| 537 CHECK(!job || job->was_cancelled() || !job->has_user_callback()); |
| 538 } |
| 539 } |
| 540 |
| 541 void MultiThreadedProxyResolver::ReleaseAllExecutors() { |
| 542 DCHECK(CalledOnValidThread()); |
| 543 for (ExecutorList::iterator it = executors_.begin(); |
| 544 it != executors_.end(); ++it) { |
| 545 Executor* executor = *it; |
| 546 executor->Destroy(); |
| 547 } |
| 548 executors_.clear(); |
| 549 } |
| 550 |
| 551 MultiThreadedProxyResolver::Executor* |
| 552 MultiThreadedProxyResolver::FindIdleExecutor() { |
| 553 DCHECK(CalledOnValidThread()); |
| 554 for (ExecutorList::iterator it = executors_.begin(); |
| 555 it != executors_.end(); ++it) { |
| 556 Executor* executor = *it; |
| 557 if (!executor->outstanding_job()) |
| 558 return executor; |
| 559 } |
| 560 return NULL; |
| 561 } |
| 562 |
| 563 MultiThreadedProxyResolver::Executor* |
| 564 MultiThreadedProxyResolver::AddNewExecutor() { |
| 565 DCHECK(CalledOnValidThread()); |
| 566 DCHECK_LT(executors_.size(), max_num_threads_); |
| 567 // The "thread number" is used to give the thread a unique name. |
| 568 int thread_number = executors_.size(); |
| 569 ProxyResolver* resolver = resolver_factory_->CreateProxyResolver(); |
| 570 Executor* executor = new Executor( |
| 571 this, resolver, thread_number); |
| 572 executors_.push_back(executor); |
| 573 return executor; |
| 574 } |
| 575 |
| 576 void MultiThreadedProxyResolver::OnExecutorReady(Executor* executor) { |
| 577 DCHECK(CalledOnValidThread()); |
323 if (pending_jobs_.empty()) | 578 if (pending_jobs_.empty()) |
324 return; | 579 return; |
325 | 580 |
326 // Get the next job to process (FIFO). | 581 // Get the next job to process (FIFO). Transfer it from the pending queue |
327 Job* job = pending_jobs_.front().get(); | 582 // to the executor. |
328 if (job->is_started()) | 583 scoped_refptr<Job> job = pending_jobs_.front(); |
329 return; | |
330 | |
331 job->net_log()->EndEvent( | |
332 NetLog::TYPE_WAITING_FOR_SINGLE_PROXY_RESOLVER_THREAD, NULL); | |
333 | |
334 EnsureThreadStarted(); | |
335 job->Start(); | |
336 } | |
337 | |
338 void SingleThreadedProxyResolver::RemoveFrontOfJobsQueueAndStartNext( | |
339 Job* expected_job) { | |
340 DCHECK_EQ(expected_job, pending_jobs_.front().get()); | |
341 pending_jobs_.pop_front(); | 584 pending_jobs_.pop_front(); |
342 | 585 executor->StartJob(job); |
343 // Start next work item. | |
344 ProcessPendingJobs(); | |
345 } | |
346 | |
347 void SingleThreadedProxyResolver::RemoveOutstandingSetPacScriptTask( | |
348 SetPacScriptTask* task) { | |
349 DCHECK_EQ(outstanding_set_pac_script_task_.get(), task); | |
350 outstanding_set_pac_script_task_ = NULL; | |
351 } | 586 } |
352 | 587 |
353 } // namespace net | 588 } // namespace net |
OLD | NEW |