Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(127)

Unified Diff: chrome/browser/storage_monitor/schedule.cc

Issue 24298002: Media Galleries API: Use Scheduler in MediaGalleriesPreferences initialization. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Created 7 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: chrome/browser/storage_monitor/schedule.cc
diff --git a/chrome/browser/storage_monitor/schedule.cc b/chrome/browser/storage_monitor/schedule.cc
new file mode 100644
index 0000000000000000000000000000000000000000..7889a22cc41eeeb09f09564127579eecfad34202
--- /dev/null
+++ b/chrome/browser/storage_monitor/schedule.cc
@@ -0,0 +1,436 @@
+// Copyright (c) 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "chrome/browser/storage_monitor/schedule.h"
+
+#include <string>
+#include <vector>
+
+#include "base/bind.h"
+#include "base/memory/ref_counted.h"
+#include "base/memory/weak_ptr.h"
+#include "base/message_loop/message_loop.h"
+#include "base/stl_util.h"
+#include "base/synchronization/lock.h"
+#include "base/threading/thread_checker.h"
+
+namespace chrome {
+
+#ifndef NDEBUG
+class SingleUseClosure : public base::RefCounted<SingleUseClosure> {
+ public:
+ explicit SingleUseClosure(const base::Closure& closure)
+ : closure_(closure), called_(false) {}
+
+ void Run() {
+ {
+ base::AutoLock l(guard_);
+ if (!called_)
+ called_ = true;
+ else
+ NOTREACHED();
+ }
+
+ closure_.Run();
+ }
+
+ private:
+ friend class base::RefCounted<SingleUseClosure>;
+
+ virtual ~SingleUseClosure() {}
+
+ base::Closure closure_;
+ base::Lock guard_;
+ bool called_;
+};
+
+base::Closure SingleUse(const base::Closure& closure) {
+ return Bind(&SingleUseClosure::Run, new SingleUseClosure(closure));
+}
+#endif
+
+void OnRunner(const tracked_objects::Location& location,
+ const base::Closure& closure,
+ scoped_refptr<base::TaskRunner> task_runner) {
+ if (!task_runner->RunsTasksOnCurrentThread()) {
+ task_runner->PostTask(location, closure);
+ return;
+ }
+ closure.Run();
+}
+
+//////////////////////////////////////////////////////////////////////////
+// ScheduleStage
+
+class ScheduleStage {
+ public:
+ ScheduleStage() {}
+ ~ScheduleStage() {}
+
+ void SetPrecondition(const base::Callback<bool(void)>& condition);
+ void AddCallback(const base::Closure& task,
+ scoped_refptr<base::TaskRunner> task_runner,
+ bool async);
+
+ int num_callbacks();
+
+ bool PreconditionMet();
+
+ bool StartAll(tracked_objects::Location location,
+ const base::Closure& on_done);
+
+ private:
+ struct Task {
+ Task(base::Closure c, scoped_refptr<base::TaskRunner> r, bool a)
+ : closure(c), runner(r), async(a) {}
+ virtual ~Task() {}
+
+ base::Closure closure;
+ scoped_refptr<base::TaskRunner> runner;
+ bool async;
+ };
+
+ base::Callback<bool(void)> precondition_;
+ std::vector<Task> tasks_;
+};
+
+int ScheduleStage::num_callbacks() { return tasks_.size(); }
+
+void ScheduleStage::SetPrecondition(
+ const base::Callback<bool(void)>& condition) {
+ CHECK(num_callbacks() == 0);
+ precondition_ = condition;
+}
+
+// Caller must only evaluate this method on the thread the schedule was
+// created on.
+bool ScheduleStage::PreconditionMet() {
+ if (precondition_.is_null())
+ return true;
+
+ return precondition_.Run();
+}
+
+void ScheduleStage::AddCallback(const base::Closure& task,
+ scoped_refptr<base::TaskRunner> task_runner,
+ bool async) {
+ tasks_.push_back(Task(task, task_runner, async));
+}
+
+// TODO(gbillock): Idea. Trampoline this call through a bunch of indexed
+// functions so any crashes record in the trace the stage they are on. Useful?
+bool ScheduleStage::StartAll(tracked_objects::Location location,
+ const base::Closure& on_done) {
+ bool success = true;
+
+ for (size_t i = 0; i < tasks_.size(); ++i) {
+ if (!tasks_[i].async) {
+ success &= tasks_[i].runner->PostTaskAndReply(
+ location, tasks_[i].closure, on_done);
+ } else {
+ success &= tasks_[i].runner->PostTask(location, tasks_[i].closure);
+ }
+ }
+
+ return success;
+}
+
+///////////////////////////////////////////////////////////////////////////
+
+class ScheduleImpl : public Schedule,
+ public base::SupportsWeakPtr<ScheduleImpl> {
+ public:
+ ScheduleImpl(const char* name, tracked_objects::Location location);
+ virtual ~ScheduleImpl();
+
+ base::Closure ContinueClosure();
+
+ // Schedule implementation
+ virtual void Start() OVERRIDE;
+ virtual void Cancel(const tracked_objects::Location& location) OVERRIDE;
+ virtual void OnCancel(const base::Closure& closure) OVERRIDE;
+ virtual void OnExit(const base::Closure& closure) OVERRIDE;
+
+ void Run(const base::Closure& task,
+ scoped_refptr<base::TaskRunner> task_runner,
+ bool async);
+
+ void NextStage(const base::Callback<bool(void)>& condition);
+
+ const scoped_refptr<base::TaskRunner>& default_task_runner() {
+ return default_task_runner_;
+ }
+
+ private:
+ bool started() { return currently_running_ >= 0; }
+ bool CalledOnValidThread() { return thread_checker_.CalledOnValidThread(); }
+
+ void Continue();
+
+ // This function is scheduled to be run whenever a scheduled callback is
+ // finished.
+ void OnReply();
+
+ void Advance();
+ void PreShutdown();
+ void Shutdown();
+
+ std::string name_;
+ tracked_objects::Location location_;
+ tracked_objects::Location cancel_location_;
+ base::ThreadChecker thread_checker_;
+
+ // This variable is potentially accessed multi-threaded. It is
+ // thread-compatible: set during initialization and never touched after.
+ scoped_refptr<base::TaskRunner> default_task_runner_;
+
+ std::vector<ScheduleStage*> stages_;
+ std::vector<int> completions_;
+ int currently_running_;
+ std::vector<base::Closure> on_cancel_callbacks_;
+ int cancels_pending_;
+ std::vector<base::Closure> on_exit_callbacks_;
+
+ bool cancelled_;
+};
+
+ScheduleImpl::ScheduleImpl(const char* name, tracked_objects::Location location)
+ : name_(name),
+ location_(location),
+ currently_running_(-1),
+ cancels_pending_(0),
+ cancelled_(false) {
+ default_task_runner_ = base::MessageLoop::current()->message_loop_proxy();
+ stages_.push_back(new ScheduleStage());
+ completions_.push_back(0);
+}
+
+ScheduleImpl::~ScheduleImpl() { STLDeleteElements(&stages_); }
+
+void ScheduleImpl::Start() {
+ CHECK(!started());
+ CHECK(CalledOnValidThread());
+
+ Advance();
+}
+
+void ScheduleImpl::Run(const base::Closure& task,
+ scoped_refptr<base::TaskRunner> task_runner,
+ bool async) {
+ CHECK(!started());
+ CHECK(CalledOnValidThread());
+ stages_.back()->AddCallback(task, task_runner, async);
+}
+
+void ScheduleImpl::OnCancel(const base::Closure& closure) {
+ if (!default_task_runner_->RunsTasksOnCurrentThread()) {
+ default_task_runner_->PostTask(
+ location_, base::Bind(&ScheduleImpl::OnCancel, AsWeakPtr(), closure));
+ return;
+ }
+ CHECK(CalledOnValidThread());
+ // TODO(gbillock): Is this too harsh? Could bounce to the right thread...
+ CHECK(CalledOnValidThread());
+ CHECK(!cancels_pending_);
+ on_cancel_callbacks_.push_back(closure);
+}
+
+void ScheduleImpl::OnExit(const base::Closure& closure) {
+ if (!default_task_runner_->RunsTasksOnCurrentThread()) {
+ default_task_runner_->PostTask(
+ location_, base::Bind(&ScheduleImpl::OnExit, AsWeakPtr(), closure));
+ return;
+ }
+ CHECK(CalledOnValidThread());
+
+ // TODO(gbillock): figure out how to keep exit closures from posting more
+ // exit closures?
+ on_exit_callbacks_.push_back(closure);
+}
+
+void ScheduleImpl::NextStage(const base::Callback<bool(void)>& condition) {
+ CHECK(!started());
+ CHECK(CalledOnValidThread());
+ stages_.push_back(new ScheduleStage);
+ completions_.push_back(0);
+ stages_.back()->SetPrecondition(condition);
+}
+
+void ScheduleImpl::Cancel(const tracked_objects::Location& location) {
+ if (!default_task_runner_->RunsTasksOnCurrentThread()) {
+ default_task_runner_->PostTask(
+ location, base::Bind(&ScheduleImpl::Cancel, AsWeakPtr(), location));
+ return;
+ }
+
+ CHECK(CalledOnValidThread());
+
+ cancel_location_ = location;
+ cancelled_ = true;
+
+ if (!started())
+ PreShutdown();
+}
+
+base::Closure ScheduleImpl::ContinueClosure() {
+#ifndef NDEBUG
+ return SingleUse(base::Bind(&OnRunner,
+ location_,
+ base::Bind(&ScheduleImpl::Continue, AsWeakPtr()),
+ default_task_runner_));
+#else
+ return base::Bind(&OnRunner,
+ location_,
+ base::Bind(&ScheduleImpl::Continue, AsWeakPtr()),
+ default_task_runner_);
+#endif
+}
+
+void ScheduleImpl::Continue() {
+ if (!default_task_runner_->RunsTasksOnCurrentThread()) {
+ // Bounce to the correct thread if not already there.
+ // TODO(gbillock): Resolve potential cross-thread use of AsWeakPtr()
+ // here and similar spots elsewhere.
+ default_task_runner_->PostTask(
+ location_, base::Bind(&ScheduleImpl::OnReply, AsWeakPtr()));
+ return;
+ }
+
+ OnReply();
+}
+
+void ScheduleImpl::Advance() {
+ CHECK(CalledOnValidThread());
+
+ currently_running_++;
+
+ // If the scheduler has been cancelled, or we've run off the end of
+ // the registered stages, then stop.
+ if (cancelled_ || currently_running_ >= static_cast<int>(stages_.size())) {
+ PreShutdown();
+ return;
+ }
+
+ ScheduleStage* stage = stages_.at(currently_running_);
+
+ // Check for a precondition. If not met, then advance past the stage.
+ if (!stage->PreconditionMet()) {
+ Advance();
+ return;
+ }
+
+ // Schedule all the callbacks in the stage.
+ DCHECK(completions_[currently_running_] == 0);
+ if (stage->num_callbacks() > 0) {
+ bool success = stage->StartAll(
+ location_, base::Bind(&ScheduleImpl::OnReply, AsWeakPtr()));
+ if (!success) {
+ // Could not schedule work. We must be in shutdown mode.
+ Cancel(location_);
+ }
+ } else {
+ Advance();
+ }
+}
+
+void ScheduleImpl::OnReply() {
+ CHECK(CalledOnValidThread());
+
+ // If we've gotten all the callbacks for the current stage done, then advance.
+ completions_[currently_running_]++;
+ if (completions_[currently_running_] >=
+ stages_[currently_running_]->num_callbacks()) {
+ Advance();
+ return;
+ }
+}
+
+// TODO(gbillock): rework this as a conditional execution stage?
+void ScheduleImpl::PreShutdown() {
+ if (!cancelled_ || on_cancel_callbacks_.size() == 0) {
+ Shutdown();
+ return;
+ }
+
+ cancels_pending_ = on_cancel_callbacks_.size();
+ base::Closure on_cancelled = base::Bind(&ScheduleImpl::Shutdown, AsWeakPtr());
+ for (size_t i = 0; i < on_cancel_callbacks_.size(); ++i) {
+ // TODO(gbillock): Should we just directly Run() these instead of
+ // posting them?
+ default_task_runner_->PostTaskAndReply(
+ cancel_location_, on_cancel_callbacks_[i], on_cancelled);
+ }
+}
+
+void ScheduleImpl::Shutdown() {
+ // If we're doing any cancellation callbacks, wait for them all.
+ // TODO(gbillock): too slick?
+ if (cancels_pending_ > 0 && --cancels_pending_ > 0)
+ return;
+
+ for (std::vector<base::Closure>::reverse_iterator riter =
+ on_exit_callbacks_.rbegin();
+ riter != on_exit_callbacks_.rend();
+ ++riter) {
+ // TODO(gbillock): Should we just directly Run() these instead of
+ // posting them?
+ default_task_runner_->PostTask(location_, *riter);
+ }
+
+ delete this;
+}
+
+////////////////////////////////////////////////////////////////////////////
+// ScheduleBuilder
+////////////////////////////////////////////////////////////////////////////
+
+ScheduleBuilder::ScheduleBuilder(const char* name,
+ tracked_objects::Location location)
+ : impl_(new ScheduleImpl(name, location)) {}
+
+ScheduleBuilder::~ScheduleBuilder() {}
+
+base::Closure ScheduleBuilder::ContinueClosure() {
+ return impl_->ContinueClosure();
+}
+
+base::WeakPtr<Schedule> ScheduleBuilder::schedule() {
+ return impl_->AsWeakPtr();
+}
+
+void ScheduleBuilder::Run(const base::Closure& task) {
+ impl_->Run(task, impl_->default_task_runner(), false);
+}
+
+void ScheduleBuilder::RunOn(const base::Closure& task,
+ scoped_refptr<base::TaskRunner> task_runner) {
+ impl_->Run(task, task_runner, false);
+}
+
+void ScheduleBuilder::RunAsync(const base::Closure& task) {
+ impl_->Run(task, impl_->default_task_runner(), true);
+}
+
+void ScheduleBuilder::RunAsyncOn(const base::Closure& task,
+ scoped_refptr<base::TaskRunner> task_runner) {
+ impl_->Run(task, task_runner, true);
+}
+
+void ScheduleBuilder::NextStage() {
+ impl_->NextStage(base::Callback<bool(void)>());
+}
+
+void ScheduleBuilder::NextStageConditional(
+ const base::Callback<bool(void)>& condition) {
+ impl_->NextStage(condition);
+}
+
+base::WeakPtr<Schedule> ScheduleBuilder::Build() {
+ // Makes the ScheduleImpl self-deleting
+ return impl_.release()->AsWeakPtr();
+}
+
+Schedule::~Schedule() {}
+
+} // namespace chrome

Powered by Google App Engine
This is Rietveld 408576698