| Index: chrome/browser/storage_monitor/schedule.cc
|
| diff --git a/chrome/browser/storage_monitor/schedule.cc b/chrome/browser/storage_monitor/schedule.cc
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..7889a22cc41eeeb09f09564127579eecfad34202
|
| --- /dev/null
|
| +++ b/chrome/browser/storage_monitor/schedule.cc
|
| @@ -0,0 +1,436 @@
|
| +// Copyright (c) 2013 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +#include "chrome/browser/storage_monitor/schedule.h"
|
| +
|
| +#include <string>
|
| +#include <vector>
|
| +
|
| +#include "base/bind.h"
|
| +#include "base/logging.h"
|
| +#include "base/memory/ref_counted.h"
|
| +#include "base/memory/weak_ptr.h"
|
| +#include "base/message_loop/message_loop.h"
|
| +#include "base/stl_util.h"
|
| +#include "base/synchronization/lock.h"
|
| +#include "base/threading/thread_checker.h"
|
| +
|
| +namespace chrome {
|
| +
|
| +#ifndef NDEBUG
|
| +// Debug-only guard asserting that a wrapped closure is run at most once.
|
| +// Ref-counted so the closure returned by SingleUse() below can share
|
| +// ownership with any copies Bind makes of it.
|
| +class SingleUseClosure : public base::RefCounted<SingleUseClosure> {
|
| + public:
|
| +  explicit SingleUseClosure(const base::Closure& closure)
|
| +      : closure_(closure), called_(false) {}
|
| +
|
| +  void Run() {
|
| +    {
|
| +      // Lock-guarded flag so a second Run() from any thread is caught.
|
| +      base::AutoLock l(guard_);
|
| +      if (!called_)
|
| +        called_ = true;
|
| +      else
|
| +        NOTREACHED();
|
| +    }
|
| +
|
| +    closure_.Run();
|
| +  }
|
| +
|
| + private:
|
| +  friend class base::RefCounted<SingleUseClosure>;
|
| +
|
| +  virtual ~SingleUseClosure() {}
|
| +
|
| +  base::Closure closure_;
|
| +  base::Lock guard_;
|
| +  bool called_;
|
| +};
|
| +
|
| +// Returns a closure that (in debug builds) trips NOTREACHED if run twice.
|
| +base::Closure SingleUse(const base::Closure& closure) {
|
| +  // Explicitly qualified base::Bind for consistency with the rest of this
|
| +  // file; the original relied on argument-dependent lookup.
|
| +  return base::Bind(&SingleUseClosure::Run, new SingleUseClosure(closure));
|
| +}
|
| +#endif
|
| +
|
| +// Runs |closure| on |task_runner|: immediately when already on that
|
| +// runner's thread, otherwise by posting it there from |location|.
|
| +void OnRunner(const tracked_objects::Location& location,
|
| +              const base::Closure& closure,
|
| +              scoped_refptr<base::TaskRunner> task_runner) {
|
| +  if (task_runner->RunsTasksOnCurrentThread())
|
| +    closure.Run();
|
| +  else
|
| +    task_runner->PostTask(location, closure);
|
| +}
|
| +
|
| +//////////////////////////////////////////////////////////////////////////
|
| +// ScheduleStage
|
| +
|
| +// One stage of a Schedule: an optional gating precondition plus the set
|
| +// of callbacks posted when the stage is reached.
|
| +class ScheduleStage {
|
| + public:
|
| +  ScheduleStage() {}
|
| +  ~ScheduleStage() {}
|
| +
|
| +  // Must be called before any callbacks are added (CHECKed).
|
| +  void SetPrecondition(const base::Callback<bool(void)>& condition);
|
| +  void AddCallback(const base::Closure& task,
|
| +                   scoped_refptr<base::TaskRunner> task_runner,
|
| +                   bool async);
|
| +
|
| +  int num_callbacks();
|
| +
|
| +  bool PreconditionMet();
|
| +
|
| +  bool StartAll(tracked_objects::Location location,
|
| +                const base::Closure& on_done);
|
| +
|
| + private:
|
| +  // Plain value type stored by copy in |tasks_|.  It is never used
|
| +  // polymorphically, so it needs no (virtual) destructor; the original's
|
| +  // virtual dtor only added a vtable to every element.
|
| +  struct Task {
|
| +    Task(base::Closure c, scoped_refptr<base::TaskRunner> r, bool a)
|
| +        : closure(c), runner(r), async(a) {}
|
| +
|
| +    base::Closure closure;
|
| +    scoped_refptr<base::TaskRunner> runner;
|
| +    bool async;  // Async tasks are posted without a completion reply.
|
| +  };
|
| +
|
| +  base::Callback<bool(void)> precondition_;
|
| +  std::vector<Task> tasks_;
|
| +};
|
| +
|
| +int ScheduleStage::num_callbacks() {
|
| +  // Explicit cast: size() is size_t but callers compare against int
|
| +  // completion counters; the original converted implicitly (narrowing).
|
| +  return static_cast<int>(tasks_.size());
|
| +}
|
| +
|
| +// Records the stage's gating condition.  The CHECK enforces that the
|
| +// precondition is installed before any callbacks join the stage.
|
| +void ScheduleStage::SetPrecondition(
|
| +    const base::Callback<bool(void)>& condition) {
|
| +  CHECK(num_callbacks() == 0);
|
| +  precondition_ = condition;
|
| +}
|
| +
|
| +// Caller must only evaluate this method on the thread the schedule was
|
| +// created on.
|
| +bool ScheduleStage::PreconditionMet() {
|
| +  // An unset (null) precondition always passes; short-circuit avoids
|
| +  // running a null callback.
|
| +  return precondition_.is_null() || precondition_.Run();
|
| +}
|
| +
|
| +// Queues |task| for this stage; it is posted to |task_runner| when the
|
| +// stage is started by StartAll().
|
| +void ScheduleStage::AddCallback(const base::Closure& task,
|
| +                                scoped_refptr<base::TaskRunner> task_runner,
|
| +                                bool async) {
|
| +  Task entry(task, task_runner, async);
|
| +  tasks_.push_back(entry);
|
| +}
|
| +
|
| +// TODO(gbillock): Idea. Trampoline this call through a bunch of indexed
|
| +// functions so any crashes record in the trace the stage they are on. Useful?
|
| +// Posts every queued task to its runner.  Synchronous tasks get |on_done|
|
| +// posted back as a reply; async tasks do not -- presumably they signal
|
| +// completion via the schedule's ContinueClosure() (confirm with callers).
|
| +// Returns false if any post fails (e.g. the target runner is shut down).
|
| +bool ScheduleStage::StartAll(tracked_objects::Location location,
|
| +                             const base::Closure& on_done) {
|
| +  bool success = true;
|
| +
|
| +  for (size_t i = 0; i < tasks_.size(); ++i) {
|
| +    if (!tasks_[i].async) {
|
| +      success &= tasks_[i].runner->PostTaskAndReply(
|
| +          location, tasks_[i].closure, on_done);
|
| +    } else {
|
| +      success &= tasks_[i].runner->PostTask(location, tasks_[i].closure);
|
| +    }
|
| +  }
|
| +
|
| +  return success;
|
| +}
|
| +
|
| +///////////////////////////////////////////////////////////////////////////
|
| +
|
| +// Concrete Schedule: owns the stages, tracks per-stage completion counts,
|
| +// and deletes itself when it finishes or is cancelled (see Shutdown()).
|
| +class ScheduleImpl : public Schedule,
|
| +                     public base::SupportsWeakPtr<ScheduleImpl> {
|
| + public:
|
| +  ScheduleImpl(const char* name, tracked_objects::Location location);
|
| +  virtual ~ScheduleImpl();
|
| +
|
| +  // Returns a closure an async task runs to report its completion.
|
| +  base::Closure ContinueClosure();
|
| +
|
| +  // Schedule implementation
|
| +  virtual void Start() OVERRIDE;
|
| +  virtual void Cancel(const tracked_objects::Location& location) OVERRIDE;
|
| +  virtual void OnCancel(const base::Closure& closure) OVERRIDE;
|
| +  virtual void OnExit(const base::Closure& closure) OVERRIDE;
|
| +
|
| +  // Adds |task| to the stage currently being built.  Legal only before
|
| +  // Start(), on the creating thread (both CHECKed).
|
| +  void Run(const base::Closure& task,
|
| +           scoped_refptr<base::TaskRunner> task_runner,
|
| +           bool async);
|
| +
|
| +  // Opens a new stage gated on |condition|.  Legal only before Start().
|
| +  void NextStage(const base::Callback<bool(void)>& condition);
|
| +
|
| +  const scoped_refptr<base::TaskRunner>& default_task_runner() {
|
| +    return default_task_runner_;
|
| +  }
|
| +
|
| + private:
|
| +  // True once Start() has moved |currently_running_| past its initial -1.
|
| +  bool started() { return currently_running_ >= 0; }
|
| +  bool CalledOnValidThread() { return thread_checker_.CalledOnValidThread(); }
|
| +
|
| +  void Continue();
|
| +
|
| +  // This function is scheduled to be run whenever a scheduled callback is
|
| +  // finished.
|
| +  void OnReply();
|
| +
|
| +  // Moves to the next runnable stage, or into shutdown when done/cancelled.
|
| +  void Advance();
|
| +  void PreShutdown();
|
| +  void Shutdown();
|
| +
|
| +  std::string name_;
|
| +  tracked_objects::Location location_;
|
| +  tracked_objects::Location cancel_location_;
|
| +  base::ThreadChecker thread_checker_;
|
| +
|
| +  // This variable is potentially accessed multi-threaded. It is
|
| +  // thread-compatible: set during initialization and never touched after.
|
| +  scoped_refptr<base::TaskRunner> default_task_runner_;
|
| +
|
| +  // Owned; deleted in the destructor via STLDeleteElements.
|
| +  std::vector<ScheduleStage*> stages_;
|
| +  // completions_[i] counts replies received for stages_[i].
|
| +  std::vector<int> completions_;
|
| +  // Index of the stage in flight; -1 before Start().
|
| +  int currently_running_;
|
| +  std::vector<base::Closure> on_cancel_callbacks_;
|
| +  // Cancel callbacks still outstanding during shutdown (see Shutdown()).
|
| +  int cancels_pending_;
|
| +  // Run (in reverse registration order) just before self-deletion.
|
| +  std::vector<base::Closure> on_exit_callbacks_;
|
| +
|
| +  bool cancelled_;
|
| +};
|
| +
|
| +// Captures the creating thread's message loop as the default runner, and
|
| +// eagerly creates stage 0 so Run() can add callbacks immediately.
|
| +ScheduleImpl::ScheduleImpl(const char* name, tracked_objects::Location location)
|
| +    : name_(name),
|
| +      location_(location),
|
| +      currently_running_(-1),
|
| +      cancels_pending_(0),
|
| +      cancelled_(false) {
|
| +  default_task_runner_ = base::MessageLoop::current()->message_loop_proxy();
|
| +  stages_.push_back(new ScheduleStage());
|
| +  completions_.push_back(0);
|
| +}
|
| +
|
| +// Frees the owned ScheduleStage objects.
|
| +ScheduleImpl::~ScheduleImpl() { STLDeleteElements(&stages_); }
|
| +
|
| +// Kicks off stage 0.  May only be called once, on the creating thread.
|
| +void ScheduleImpl::Start() {
|
| +  CHECK(!started());
|
| +  CHECK(CalledOnValidThread());
|
| +
|
| +  Advance();
|
| +}
|
| +
|
| +// Adds |task| to the stage currently being built (the last one).  Building
|
| +// is only legal before Start(), on the creating thread.
|
| +void ScheduleImpl::Run(const base::Closure& task,
|
| +                       scoped_refptr<base::TaskRunner> task_runner,
|
| +                       bool async) {
|
| +  CHECK(!started());
|
| +  CHECK(CalledOnValidThread());
|
| +  stages_.back()->AddCallback(task, task_runner, async);
|
| +}
|
| +
|
| +// Registers |closure| to run if the schedule is cancelled.  Self-posts to
|
| +// the schedule's home thread when invoked from another thread.  Must not
|
| +// be called once cancellation processing has begun.
|
| +void ScheduleImpl::OnCancel(const base::Closure& closure) {
|
| +  if (!default_task_runner_->RunsTasksOnCurrentThread()) {
|
| +    default_task_runner_->PostTask(
|
| +        location_, base::Bind(&ScheduleImpl::OnCancel, AsWeakPtr(), closure));
|
| +    return;
|
| +  }
|
| +  // The original duplicated this CHECK; one is sufficient.
|
| +  // TODO(gbillock): Is this too harsh? Could bounce to the right thread...
|
| +  CHECK(CalledOnValidThread());
|
| +  // Too late to register once cancel callbacks are already in flight.
|
| +  CHECK(!cancels_pending_);
|
| +  on_cancel_callbacks_.push_back(closure);
|
| +}
|
| +
|
| +// Registers |closure| to run when the schedule exits.  Self-posts to the
|
| +// schedule's home thread when invoked from another thread.
|
| +void ScheduleImpl::OnExit(const base::Closure& closure) {
|
| +  if (default_task_runner_->RunsTasksOnCurrentThread()) {
|
| +    CHECK(CalledOnValidThread());
|
| +
|
| +    // TODO(gbillock): figure out how to keep exit closures from posting more
|
| +    // exit closures?
|
| +    on_exit_callbacks_.push_back(closure);
|
| +    return;
|
| +  }
|
| +
|
| +  default_task_runner_->PostTask(
|
| +      location_, base::Bind(&ScheduleImpl::OnExit, AsWeakPtr(), closure));
|
| +}
|
| +
|
| +// Opens a fresh stage gated on |condition| with a zeroed completion count.
|
| +// Only legal while still building (before Start()).
|
| +void ScheduleImpl::NextStage(const base::Callback<bool(void)>& condition) {
|
| +  CHECK(!started());
|
| +  CHECK(CalledOnValidThread());
|
| +  ScheduleStage* stage = new ScheduleStage();
|
| +  stage->SetPrecondition(condition);
|
| +  stages_.push_back(stage);
|
| +  completions_.push_back(0);
|
| +}
|
| +
|
| +// Marks the schedule cancelled.  If it has not started, tears down
|
| +// immediately; otherwise Advance() observes |cancelled_| after the current
|
| +// stage completes.  Safe to call from any thread (self-posts home).
|
| +void ScheduleImpl::Cancel(const tracked_objects::Location& location) {
|
| +  if (!default_task_runner_->RunsTasksOnCurrentThread()) {
|
| +    default_task_runner_->PostTask(
|
| +        location, base::Bind(&ScheduleImpl::Cancel, AsWeakPtr(), location));
|
| +    return;
|
| +  }
|
| +
|
| +  CHECK(CalledOnValidThread());
|
| +
|
| +  // Remember where the cancel came from for the cancel-callback posts.
|
| +  cancel_location_ = location;
|
| +  cancelled_ = true;
|
| +
|
| +  if (!started())
|
| +    PreShutdown();
|
| +}
|
| +
|
| +// Builds the completion closure handed out to async tasks.  In debug
|
| +// builds it is wrapped in SingleUse() so running it twice trips
|
| +// NOTREACHED().  OnRunner bounces the call onto the default runner.
|
| +base::Closure ScheduleImpl::ContinueClosure() {
|
| +#ifndef NDEBUG
|
| +  return SingleUse(base::Bind(&OnRunner,
|
| +                              location_,
|
| +                              base::Bind(&ScheduleImpl::Continue, AsWeakPtr()),
|
| +                              default_task_runner_));
|
| +#else
|
| +  return base::Bind(&OnRunner,
|
| +                    location_,
|
| +                    base::Bind(&ScheduleImpl::Continue, AsWeakPtr()),
|
| +                    default_task_runner_);
|
| +#endif
|
| +}
|
| +
|
| +// Completion entry point for async tasks.  Funnels into OnReply() on the
|
| +// schedule's home thread; note the off-thread branch posts OnReply
|
| +// directly, skipping the re-entry through Continue().
|
| +void ScheduleImpl::Continue() {
|
| +  if (!default_task_runner_->RunsTasksOnCurrentThread()) {
|
| +    // Bounce to the correct thread if not already there.
|
| +    // TODO(gbillock): Resolve potential cross-thread use of AsWeakPtr()
|
| +    // here and similar spots elsewhere.
|
| +    default_task_runner_->PostTask(
|
| +        location_, base::Bind(&ScheduleImpl::OnReply, AsWeakPtr()));
|
| +    return;
|
| +  }
|
| +
|
| +  OnReply();
|
| +}
|
| +
|
| +// Moves to the next stage whose precondition holds and posts its
|
| +// callbacks.  Recurses past empty or precondition-failing stages; enters
|
| +// shutdown when cancelled or when all stages are exhausted.
|
| +void ScheduleImpl::Advance() {
|
| +  CHECK(CalledOnValidThread());
|
| +
|
| +  currently_running_++;
|
| +
|
| +  // If the scheduler has been cancelled, or we've run off the end of
|
| +  // the registered stages, then stop.
|
| +  if (cancelled_ || currently_running_ >= static_cast<int>(stages_.size())) {
|
| +    PreShutdown();
|
| +    return;
|
| +  }
|
| +
|
| +  ScheduleStage* stage = stages_.at(currently_running_);
|
| +
|
| +  // Check for a precondition. If not met, then advance past the stage.
|
| +  if (!stage->PreconditionMet()) {
|
| +    Advance();
|
| +    return;
|
| +  }
|
| +
|
| +  // Schedule all the callbacks in the stage.
|
| +  DCHECK(completions_[currently_running_] == 0);
|
| +  if (stage->num_callbacks() > 0) {
|
| +    bool success = stage->StartAll(
|
| +        location_, base::Bind(&ScheduleImpl::OnReply, AsWeakPtr()));
|
| +    if (!success) {
|
| +      // Could not schedule work. We must be in shutdown mode.
|
| +      Cancel(location_);
|
| +    }
|
| +  } else {
|
| +    // Empty stage: nothing will ever reply, so advance immediately.
|
| +    Advance();
|
| +  }
|
| +}
|
| +
|
| +// Counts one completed callback for the in-flight stage and advances once
|
| +// the whole stage has reported in.
|
| +void ScheduleImpl::OnReply() {
|
| +  CHECK(CalledOnValidThread());
|
| +
|
| +  // If we've gotten all the callbacks for the current stage done, then advance.
|
| +  completions_[currently_running_]++;
|
| +  if (completions_[currently_running_] >=
|
| +      stages_[currently_running_]->num_callbacks()) {
|
| +    Advance();
|
| +    return;
|
| +  }
|
| +}
|
| +
|
| +// TODO(gbillock): rework this as a conditional execution stage?
|
| +void ScheduleImpl::PreShutdown() {
|
| + if (!cancelled_ || on_cancel_callbacks_.size() == 0) {
|
| + Shutdown();
|
| + return;
|
| + }
|
| +
|
| + cancels_pending_ = on_cancel_callbacks_.size();
|
| + base::Closure on_cancelled = base::Bind(&ScheduleImpl::Shutdown, AsWeakPtr());
|
| + for (size_t i = 0; i < on_cancel_callbacks_.size(); ++i) {
|
| + // TODO(gbillock): Should we just directly Run() these instead of
|
| + // posting them?
|
| + default_task_runner_->PostTaskAndReply(
|
| + cancel_location_, on_cancel_callbacks_[i], on_cancelled);
|
| + }
|
| +}
|
| +
|
| +// Final teardown: waits until every pending cancel-callback reply has
|
| +// arrived, posts the exit callbacks in reverse registration order, then
|
| +// self-deletes.  Nothing may touch members after |delete this|.
|
| +void ScheduleImpl::Shutdown() {
|
| +  // If we're doing any cancellation callbacks, wait for them all.
|
| +  // Each reply decrements the count; only the last one falls through.
|
| +  // TODO(gbillock): too slick?
|
| +  if (cancels_pending_ > 0 && --cancels_pending_ > 0)
|
| +    return;
|
| +
|
| +  for (std::vector<base::Closure>::reverse_iterator riter =
|
| +           on_exit_callbacks_.rbegin();
|
| +       riter != on_exit_callbacks_.rend();
|
| +       ++riter) {
|
| +    // TODO(gbillock): Should we just directly Run() these instead of
|
| +    // posting them?
|
| +    default_task_runner_->PostTask(location_, *riter);
|
| +  }
|
| +
|
| +  delete this;
|
| +}
|
| +
|
| +////////////////////////////////////////////////////////////////////////////
|
| +// ScheduleBuilder
|
| +////////////////////////////////////////////////////////////////////////////
|
| +
|
| +// Builder: accumulates stages and callbacks on a privately-owned
|
| +// ScheduleImpl, then releases ownership to the schedule itself in Build().
|
| +ScheduleBuilder::ScheduleBuilder(const char* name,
|
| +                                 tracked_objects::Location location)
|
| +    : impl_(new ScheduleImpl(name, location)) {}
|
| +
|
| +ScheduleBuilder::~ScheduleBuilder() {}
|
| +
|
| +// Forwards to the impl; see ScheduleImpl::ContinueClosure().
|
| +base::Closure ScheduleBuilder::ContinueClosure() {
|
| +  return impl_->ContinueClosure();
|
| +}
|
| +
|
| +// Weak handle only: the schedule deletes itself when finished or cancelled.
|
| +base::WeakPtr<Schedule> ScheduleBuilder::schedule() {
|
| +  return impl_->AsWeakPtr();
|
| +}
|
| +
|
| +// Adds a synchronous task on the schedule's default runner.
|
| +void ScheduleBuilder::Run(const base::Closure& task) {
|
| +  RunOn(task, impl_->default_task_runner());
|
| +}
|
| +
|
| +// Adds a synchronous task on |task_runner|.
|
| +void ScheduleBuilder::RunOn(const base::Closure& task,
|
| +                            scoped_refptr<base::TaskRunner> task_runner) {
|
| +  impl_->Run(task, task_runner, false);
|
| +}
|
| +
|
| +// Adds an async task (no automatic completion reply) on the default runner.
|
| +void ScheduleBuilder::RunAsync(const base::Closure& task) {
|
| +  RunAsyncOn(task, impl_->default_task_runner());
|
| +}
|
| +
|
| +// Adds an async task on |task_runner|.
|
| +void ScheduleBuilder::RunAsyncOn(const base::Closure& task,
|
| +                                 scoped_refptr<base::TaskRunner> task_runner) {
|
| +  impl_->Run(task, task_runner, true);
|
| +}
|
| +
|
| +// Starts a new unconditional stage (null precondition always passes).
|
| +void ScheduleBuilder::NextStage() {
|
| +  NextStageConditional(base::Callback<bool(void)>());
|
| +}
|
| +
|
| +// Starts a new stage that only runs when |condition| evaluates true.
|
| +void ScheduleBuilder::NextStageConditional(
|
| +    const base::Callback<bool(void)>& condition) {
|
| +  impl_->NextStage(condition);
|
| +}
|
| +
|
| +// Transfers ownership: after Build() the ScheduleImpl manages its own
|
| +// lifetime (it runs |delete this| in Shutdown()).  The builder keeps no
|
| +// reference.
|
| +base::WeakPtr<Schedule> ScheduleBuilder::Build() {
|
| +  // Makes the ScheduleImpl self-deleting
|
| +  return impl_.release()->AsWeakPtr();
|
| +}
|
| +
|
| +Schedule::~Schedule() {}
|
| +
|
| +} // namespace chrome
|
|
|