Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(77)

Side by Side Diff: net/proxy/multi_threaded_proxy_resolver.cc

Issue 1439053002: Change ProxyResolver::GetProxyForURL() to take a scoped_ptr<Request>* rather than a RequestHandle* (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: ToT Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "net/proxy/multi_threaded_proxy_resolver.h" 5 #include "net/proxy/multi_threaded_proxy_resolver.h"
6 6
7 #include <deque> 7 #include <deque>
8 #include <utility> 8 #include <utility>
9 #include <vector> 9 #include <vector>
10 10
(...skipping 99 matching lines...) Expand 10 before | Expand all | Expand 10 after
110 size_t max_num_threads, 110 size_t max_num_threads,
111 const scoped_refptr<ProxyResolverScriptData>& script_data, 111 const scoped_refptr<ProxyResolverScriptData>& script_data,
112 scoped_refptr<Executor> executor); 112 scoped_refptr<Executor> executor);
113 113
114 ~MultiThreadedProxyResolver() override; 114 ~MultiThreadedProxyResolver() override;
115 115
116 // ProxyResolver implementation: 116 // ProxyResolver implementation:
117 int GetProxyForURL(const GURL& url, 117 int GetProxyForURL(const GURL& url,
118 ProxyInfo* results, 118 ProxyInfo* results,
119 const CompletionCallback& callback, 119 const CompletionCallback& callback,
120 RequestHandle* request, 120 scoped_ptr<Request>* request,
121 const BoundNetLog& net_log) override; 121 const BoundNetLog& net_log) override;
122 void CancelRequest(RequestHandle request) override; 122
123 LoadState GetLoadState(RequestHandle request) const override; 123 void RemovePendingJob(base::WeakPtr<Job> job) {
eroman 2016/01/23 02:17:01 I believe this can be deleted based on my later su
124 PendingJobsQueue::iterator it =
125 std::find(pending_jobs_.begin(), pending_jobs_.end(), job.get());
126 DCHECK(it != pending_jobs_.end());
127 pending_jobs_.erase(it);
128 }
124 129
125 private: 130 private:
126 class GetProxyForURLJob; 131 class GetProxyForURLJob;
132 class RequestImpl;
127 // FIFO queue of pending jobs waiting to be started. 133 // FIFO queue of pending jobs waiting to be started.
128 // TODO(eroman): Make this priority queue. 134 // TODO(eroman): Make this priority queue.
129 typedef std::deque<scoped_refptr<Job>> PendingJobsQueue; 135 typedef std::deque<scoped_refptr<Job>> PendingJobsQueue;
130 typedef std::vector<scoped_refptr<Executor>> ExecutorList; 136 typedef std::vector<scoped_refptr<Executor>> ExecutorList;
131 137
132 // Returns an idle worker thread which is ready to receive GetProxyForURL() 138 // Returns an idle worker thread which is ready to receive GetProxyForURL()
133 // requests. If all threads are occupied, returns NULL. 139 // requests. If all threads are occupied, returns NULL.
134 Executor* FindIdleExecutor(); 140 Executor* FindIdleExecutor();
135 141
136 // Creates a new worker thread, and appends it to |executors_|. 142 // Creates a new worker thread, and appends it to |executors_|.
137 void AddNewExecutor(); 143 void AddNewExecutor();
138 144
139 // Starts the next job from |pending_jobs_| if possible. 145 // Starts the next job from |pending_jobs_| if possible.
140 void OnExecutorReady(Executor* executor) override; 146 void OnExecutorReady(Executor* executor) override;
141 147
142 const scoped_ptr<ProxyResolverFactory> resolver_factory_; 148 const scoped_ptr<ProxyResolverFactory> resolver_factory_;
143 const size_t max_num_threads_; 149 const size_t max_num_threads_;
144 PendingJobsQueue pending_jobs_; 150 PendingJobsQueue pending_jobs_;
145 ExecutorList executors_; 151 ExecutorList executors_;
146 scoped_refptr<ProxyResolverScriptData> script_data_; 152 scoped_refptr<ProxyResolverScriptData> script_data_;
147 }; 153 };
148 154
149 // Job --------------------------------------------- 155 // Job ---------------------------------------------
150 156
151 class Job : public base::RefCountedThreadSafe<Job> { 157 class Job : public base::SupportsWeakPtr<Job>,
eroman 2016/01/23 02:17:01 I don't believe this should expose a weak pointer.
158 public base::RefCountedThreadSafe<Job> {
152 public: 159 public:
153 // Identifies the subclass of Job (only being used for debugging purposes). 160 // Identifies the subclass of Job (only being used for debugging purposes).
154 enum Type { 161 enum Type {
155 TYPE_GET_PROXY_FOR_URL, 162 TYPE_GET_PROXY_FOR_URL,
156 TYPE_CREATE_RESOLVER, 163 TYPE_CREATE_RESOLVER,
157 }; 164 };
158 165
159 Job(Type type, const CompletionCallback& callback) 166 Job(Type type, const CompletionCallback& callback)
160 : type_(type), 167 : type_(type),
161 callback_(callback), 168 callback_(callback),
(...skipping 62 matching lines...) Expand 10 before | Expand all | Expand 10 after
224 231
225 virtual ~Job() {} 232 virtual ~Job() {}
226 233
227 private: 234 private:
228 const Type type_; 235 const Type type_;
229 CompletionCallback callback_; 236 CompletionCallback callback_;
230 Executor* executor_; 237 Executor* executor_;
231 bool was_cancelled_; 238 bool was_cancelled_;
232 }; 239 };
233 240
241 class MultiThreadedProxyResolver::RequestImpl : public ProxyResolver::Request {
242 public:
243 RequestImpl(base::WeakPtr<Job> job, MultiThreadedProxyResolver* resolver)
eroman 2016/01/23 02:17:01 Rather than a WeakPtr, can this instead just hold
244 : job_(job), resolver_(resolver) {}
245
246 ~RequestImpl() override {
eroman 2016/01/23 02:17:01 We can simplify this to a single line: job_->Canc
247 DCHECK(resolver_->CalledOnValidThread());
248 if (job_) {
249 if (job_->executor()) {
250 // If the job was already submitted to the executor, just mark it
251 // as cancelled so the user callback isn't run on completion.
252 job_->Cancel();
253 } else {
254 // Otherwise the job is just sitting in a queue.
255 resolver_->RemovePendingJob(job_);
256 }
257 }
258 }
259
260 LoadState GetLoadState() override {
261 DCHECK(resolver_->CalledOnValidThread());
262 return LOAD_STATE_RESOLVING_PROXY_FOR_URL;
263 }
264
265 private:
266 base::WeakPtr<Job> job_;
267 MultiThreadedProxyResolver* resolver_;
eroman 2016/01/23 02:17:01 With the suggestion above, this member is no longe
268 };
269
234 // CreateResolverJob ----------------------------------------------------------- 270 // CreateResolverJob -----------------------------------------------------------
235 271
236 // Runs on the worker thread to call ProxyResolverFactory::CreateProxyResolver. 272 // Runs on the worker thread to call ProxyResolverFactory::CreateProxyResolver.
237 class CreateResolverJob : public Job { 273 class CreateResolverJob : public Job {
238 public: 274 public:
239 CreateResolverJob(const scoped_refptr<ProxyResolverScriptData>& script_data, 275 CreateResolverJob(const scoped_refptr<ProxyResolverScriptData>& script_data,
240 ProxyResolverFactory* factory) 276 ProxyResolverFactory* factory)
241 : Job(TYPE_CREATE_RESOLVER, CompletionCallback()), 277 : Job(TYPE_CREATE_RESOLVER, CompletionCallback()),
242 script_data_(script_data), 278 script_data_(script_data),
243 factory_(factory) {} 279 factory_(factory) {}
(...skipping 191 matching lines...) Expand 10 before | Expand all | Expand 10 after
435 DCHECK(CalledOnValidThread()); 471 DCHECK(CalledOnValidThread());
436 // We will cancel all outstanding requests. 472 // We will cancel all outstanding requests.
437 pending_jobs_.clear(); 473 pending_jobs_.clear();
438 474
439 for (auto& executor : executors_) { 475 for (auto& executor : executors_) {
440 executor->Destroy(); 476 executor->Destroy();
441 } 477 }
442 } 478 }
443 479
444 int MultiThreadedProxyResolver::GetProxyForURL( 480 int MultiThreadedProxyResolver::GetProxyForURL(
445 const GURL& url, ProxyInfo* results, const CompletionCallback& callback, 481 const GURL& url,
446 RequestHandle* request, const BoundNetLog& net_log) { 482 ProxyInfo* results,
483 const CompletionCallback& callback,
484 scoped_ptr<Request>* request,
485 const BoundNetLog& net_log) {
447 DCHECK(CalledOnValidThread()); 486 DCHECK(CalledOnValidThread());
448 DCHECK(!callback.is_null()); 487 DCHECK(!callback.is_null());
449 488
450 scoped_refptr<GetProxyForURLJob> job( 489 scoped_refptr<GetProxyForURLJob> job(
451 new GetProxyForURLJob(url, results, callback, net_log)); 490 new GetProxyForURLJob(url, results, callback, net_log));
452 491
453 // Completion will be notified through |callback|, unless the caller cancels 492 // Completion will be notified through |callback|, unless the caller cancels
454 // the request using |request|. 493 // the request using |request|.
455 if (request) 494 if (request)
456 *request = reinterpret_cast<RequestHandle>(job.get()); 495 request->reset(new RequestImpl(job->AsWeakPtr(), this));
457 496
458 // If there is an executor that is ready to run this request, submit it! 497 // If there is an executor that is ready to run this request, submit it!
459 Executor* executor = FindIdleExecutor(); 498 Executor* executor = FindIdleExecutor();
460 if (executor) { 499 if (executor) {
461 DCHECK_EQ(0u, pending_jobs_.size()); 500 DCHECK_EQ(0u, pending_jobs_.size());
462 executor->StartJob(job.get()); 501 executor->StartJob(job.get());
463 return ERR_IO_PENDING; 502 return ERR_IO_PENDING;
464 } 503 }
465 504
466 // Otherwise queue this request. (We will schedule it to a thread once one 505 // Otherwise queue this request. (We will schedule it to a thread once one
467 // becomes available). 506 // becomes available).
468 job->WaitingForThread(); 507 job->WaitingForThread();
469 pending_jobs_.push_back(job); 508 pending_jobs_.push_back(job);
470 509
471 // If we haven't already reached the thread limit, provision a new thread to 510 // If we haven't already reached the thread limit, provision a new thread to
472 // drain the requests more quickly. 511 // drain the requests more quickly.
473 if (executors_.size() < max_num_threads_) 512 if (executors_.size() < max_num_threads_)
474 AddNewExecutor(); 513 AddNewExecutor();
475 514
476 return ERR_IO_PENDING; 515 return ERR_IO_PENDING;
477 } 516 }
478 517
479 void MultiThreadedProxyResolver::CancelRequest(RequestHandle req) {
480 DCHECK(CalledOnValidThread());
481 DCHECK(req);
482
483 Job* job = reinterpret_cast<Job*>(req);
484 DCHECK_EQ(Job::TYPE_GET_PROXY_FOR_URL, job->type());
485
486 if (job->executor()) {
487 // If the job was already submitted to the executor, just mark it
488 // as cancelled so the user callback isn't run on completion.
489 job->Cancel();
490 } else {
491 // Otherwise the job is just sitting in a queue.
492 PendingJobsQueue::iterator it =
493 std::find(pending_jobs_.begin(), pending_jobs_.end(), job);
494 DCHECK(it != pending_jobs_.end());
495 pending_jobs_.erase(it);
496 }
497 }
498
499 LoadState MultiThreadedProxyResolver::GetLoadState(RequestHandle req) const {
500 DCHECK(CalledOnValidThread());
501 DCHECK(req);
502 return LOAD_STATE_RESOLVING_PROXY_FOR_URL;
503 }
504 518
505 Executor* MultiThreadedProxyResolver::FindIdleExecutor() { 519 Executor* MultiThreadedProxyResolver::FindIdleExecutor() {
506 DCHECK(CalledOnValidThread()); 520 DCHECK(CalledOnValidThread());
507 for (ExecutorList::iterator it = executors_.begin(); 521 for (ExecutorList::iterator it = executors_.begin();
508 it != executors_.end(); ++it) { 522 it != executors_.end(); ++it) {
509 Executor* executor = it->get(); 523 Executor* executor = it->get();
510 if (!executor->outstanding_job()) 524 if (!executor->outstanding_job())
511 return executor; 525 return executor;
512 } 526 }
513 return NULL; 527 return NULL;
(...skipping 110 matching lines...) Expand 10 before | Expand all | Expand 10 after
624 return ERR_IO_PENDING; 638 return ERR_IO_PENDING;
625 } 639 }
626 640
627 void MultiThreadedProxyResolverFactory::RemoveJob( 641 void MultiThreadedProxyResolverFactory::RemoveJob(
628 MultiThreadedProxyResolverFactory::Job* job) { 642 MultiThreadedProxyResolverFactory::Job* job) {
629 size_t erased = jobs_.erase(job); 643 size_t erased = jobs_.erase(job);
630 DCHECK_EQ(1u, erased); 644 DCHECK_EQ(1u, erased);
631 } 645 }
632 646
633 } // namespace net 647 } // namespace net
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698