| Index: mojo/public/cpp/system/simple_watcher.cc
|
| diff --git a/mojo/public/cpp/system/simple_watcher.cc b/mojo/public/cpp/system/simple_watcher.cc
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..f99ff3f91f4ff6be71d1ed4e992b6ae8cae47fe3
|
| --- /dev/null
|
| +++ b/mojo/public/cpp/system/simple_watcher.cc
|
| @@ -0,0 +1,276 @@
|
| +// Copyright 2017 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +#include "mojo/public/cpp/system/simple_watcher.h"
|
| +
|
| +#include "base/bind.h"
|
| +#include "base/macros.h"
|
| +#include "base/memory/ptr_util.h"
|
| +#include "base/single_thread_task_runner.h"
|
| +#include "base/synchronization/lock.h"
|
| +#include "base/trace_event/heap_profiler.h"
|
| +#include "mojo/public/c/system/watcher.h"
|
| +
|
| +namespace mojo {
|
| +
|
| +// Thread-safe Context object used to dispatch watch notifications from a
|
| +// arbitrary threads.
|
| +class SimpleWatcher::Context : public base::RefCountedThreadSafe<Context> {
|
| + public:
|
| + // Creates a |Context| instance for a new watch on |watcher|, to watch
|
| + // |handle| for |signals|.
|
| + static scoped_refptr<Context> Create(
|
| + base::WeakPtr<SimpleWatcher> watcher,
|
| + scoped_refptr<base::SingleThreadTaskRunner> task_runner,
|
| + WatcherHandle watcher_handle,
|
| + Handle handle,
|
| + MojoHandleSignals signals,
|
| + MojoResult* watch_result) {
|
| + // We use a scoped_refptr<Context> instance as watch context value. This
|
| + // instance (and thus the Context ref it holds) is effectively owned by the
|
| + // registered watch. We delete it on cancellation, which is a guaranteed
|
| + // event.
|
| + auto context_ref = base::MakeUnique<scoped_refptr<Context>>();
|
| + *context_ref = new Context(watcher, task_runner,
|
| + reinterpret_cast<uintptr_t>(context_ref.get()));
|
| +
|
| + *watch_result = MojoWatch(watcher_handle.value(), handle.value(), signals,
|
| + (*context_ref)->value());
|
| + if (*watch_result != MOJO_RESULT_OK)
|
| + return nullptr;
|
| +
|
| + // Ownership of the ref has been transferred into the successfully
|
| + // registered watch.
|
| + return *context_ref.release();
|
| + }
|
| +
|
| + static void CallNotify(uintptr_t context_value,
|
| + MojoResult result,
|
| + MojoHandleSignalsState signals_state,
|
| + MojoWatcherNotificationFlags flags) {
|
| + auto* context_ref =
|
| + reinterpret_cast<scoped_refptr<Context>*>(context_value);
|
| + (*context_ref)->Notify(result, signals_state, flags);
|
| +
|
| + // That was the last notification for the context. We can delete the ref
|
| + // owned by the watch.
|
| + if (result == MOJO_RESULT_CANCELLED)
|
| + delete context_ref;
|
| + }
|
| +
|
| + uintptr_t value() const { return context_value_; }
|
| +
|
| + void DisableCancellationNotifications() {
|
| + base::AutoLock lock(lock_);
|
| + enable_cancellation_notifications_ = false;
|
| + }
|
| +
|
| + private:
|
| + friend class base::RefCountedThreadSafe<Context>;
|
| +
|
| + Context(base::WeakPtr<SimpleWatcher> weak_watcher,
|
| + scoped_refptr<base::SingleThreadTaskRunner> task_runner,
|
| + uintptr_t context_value)
|
| + : weak_watcher_(weak_watcher),
|
| + task_runner_(task_runner),
|
| + context_value_(context_value) {}
|
| + ~Context() {}
|
| +
|
| + void Notify(MojoResult result,
|
| + MojoHandleSignalsState signals_state,
|
| + MojoWatcherNotificationFlags flags) {
|
| + if (result == MOJO_RESULT_CANCELLED) {
|
| + // The SimpleWatcher may have explicitly cancelled this watch, so we don't
|
| + // bother dispatching the notification - it would be ignored anyway.
|
| + //
|
| + // TODO(rockot): This shouldn't really be necessary, but there are already
|
| + // instances today where bindings object may be bound and subsequently
|
| + // closed due to pipe error, all before the thread's TaskRunner has been
|
| + // properly initialized.
|
| + base::AutoLock lock(lock_);
|
| + if (!enable_cancellation_notifications_)
|
| + return;
|
| + }
|
| +
|
| + if ((flags & MOJO_WATCHER_NOTIFICATION_FLAG_FROM_SYSTEM) &&
|
| + task_runner_->RunsTasksOnCurrentThread() && weak_watcher_ &&
|
| + weak_watcher_->is_default_task_runner_) {
|
| + // System notifications will trigger from the task runner passed to
|
| + // mojo::edk::InitIPCSupport(). In Chrome this happens to always be the
|
| + // default task runner for the IO thread.
|
| + weak_watcher_->OnHandleReady(make_scoped_refptr(this), result);
|
| + } else {
|
| + task_runner_->PostTask(
|
| + FROM_HERE, base::Bind(&SimpleWatcher::OnHandleReady, weak_watcher_,
|
| + make_scoped_refptr(this), result));
|
| + }
|
| + }
|
| +
|
| + const base::WeakPtr<SimpleWatcher> weak_watcher_;
|
| + const scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
|
| + const uintptr_t context_value_;
|
| +
|
| + base::Lock lock_;
|
| + bool enable_cancellation_notifications_ = true;
|
| +
|
| + DISALLOW_COPY_AND_ASSIGN(Context);
|
| +};
|
| +
|
| +SimpleWatcher::SimpleWatcher(const tracked_objects::Location& from_here,
|
| + ArmingPolicy arming_policy,
|
| + scoped_refptr<base::SingleThreadTaskRunner> runner)
|
| + : arming_policy_(arming_policy),
|
| + task_runner_(std::move(runner)),
|
| + is_default_task_runner_(task_runner_ ==
|
| + base::ThreadTaskRunnerHandle::Get()),
|
| + heap_profiler_tag_(from_here.file_name()),
|
| + weak_factory_(this) {
|
| + MojoResult rv = CreateWatcher(&Context::CallNotify, &watcher_handle_);
|
| + DCHECK_EQ(MOJO_RESULT_OK, rv);
|
| + DCHECK(task_runner_->BelongsToCurrentThread());
|
| +}
|
| +
|
| +SimpleWatcher::~SimpleWatcher() {
|
| + if (IsWatching())
|
| + Cancel();
|
| +}
|
| +
|
| +bool SimpleWatcher::IsWatching() const {
|
| + DCHECK(thread_checker_.CalledOnValidThread());
|
| + return context_ != nullptr;
|
| +}
|
| +
|
| +MojoResult SimpleWatcher::Watch(Handle handle,
|
| + MojoHandleSignals signals,
|
| + const ReadyCallback& callback) {
|
| + DCHECK(thread_checker_.CalledOnValidThread());
|
| + DCHECK(!IsWatching());
|
| + DCHECK(!callback.is_null());
|
| +
|
| + callback_ = callback;
|
| + handle_ = handle;
|
| +
|
| + MojoResult watch_result = MOJO_RESULT_UNKNOWN;
|
| + context_ =
|
| + Context::Create(weak_factory_.GetWeakPtr(), task_runner_,
|
| + watcher_handle_.get(), handle_, signals, &watch_result);
|
| + if (!context_) {
|
| + handle_.set_value(kInvalidHandleValue);
|
| + callback_.Reset();
|
| + DCHECK_EQ(MOJO_RESULT_INVALID_ARGUMENT, watch_result);
|
| + return watch_result;
|
| + }
|
| +
|
| + if (arming_policy_ == ArmingPolicy::AUTOMATIC)
|
| + ArmOrNotify();
|
| +
|
| + return MOJO_RESULT_OK;
|
| +}
|
| +
|
| +void SimpleWatcher::Cancel() {
|
| + DCHECK(thread_checker_.CalledOnValidThread());
|
| +
|
| + // The watcher may have already been cancelled if the handle was closed.
|
| + if (!context_)
|
| + return;
|
| +
|
| + context_->DisableCancellationNotifications();
|
| +
|
| + handle_.set_value(kInvalidHandleValue);
|
| + callback_.Reset();
|
| +
|
| + // Ensure |context_| is unset by the time we call MojoCancelWatch, as may
|
| + // re-enter the notification callback and we want to ensure |context_| is
|
| + // unset by then. This prevents the cancellation notification from reaching
|
| + // OnHandleReady() when cancellation is explicit.
|
| + scoped_refptr<Context> context;
|
| + std::swap(context, context_);
|
| + MojoResult rv =
|
| + MojoCancelWatch(watcher_handle_.get().value(), context->value());
|
| +
|
| + // It's possible this cancellation could race with a handle closure
|
| + // notification, in which case the watch may have already been implicitly
|
| + // cancelled.
|
| + DCHECK(rv == MOJO_RESULT_OK || rv == MOJO_RESULT_NOT_FOUND);
|
| +}
|
| +
|
| +MojoResult SimpleWatcher::Arm(MojoResult* ready_result) {
|
| + DCHECK(thread_checker_.CalledOnValidThread());
|
| + uint32_t num_ready_contexts = 1;
|
| + uintptr_t ready_context;
|
| + MojoResult local_ready_result;
|
| + MojoHandleSignalsState ready_state;
|
| + MojoResult rv =
|
| + MojoArmWatcher(watcher_handle_.get().value(), &num_ready_contexts,
|
| + &ready_context, &local_ready_result, &ready_state);
|
| + if (rv == MOJO_RESULT_FAILED_PRECONDITION) {
|
| + DCHECK(context_);
|
| + DCHECK_EQ(1u, num_ready_contexts);
|
| + DCHECK_EQ(context_->value(), ready_context);
|
| + if (ready_result)
|
| + *ready_result = local_ready_result;
|
| + }
|
| +
|
| + return rv;
|
| +}
|
| +
|
| +void SimpleWatcher::ArmOrNotify() {
|
| + DCHECK(thread_checker_.CalledOnValidThread());
|
| +
|
| + // Already cancelled, nothing to do.
|
| + if (!IsWatching())
|
| + return;
|
| +
|
| + MojoResult ready_result;
|
| + MojoResult rv = Arm(&ready_result);
|
| + if (rv == MOJO_RESULT_OK)
|
| + return;
|
| +
|
| + DCHECK_EQ(MOJO_RESULT_FAILED_PRECONDITION, rv);
|
| + task_runner_->PostTask(FROM_HERE, base::Bind(&SimpleWatcher::OnHandleReady,
|
| + weak_factory_.GetWeakPtr(),
|
| + context_, ready_result));
|
| +}
|
| +
|
| +void SimpleWatcher::OnHandleReady(scoped_refptr<const Context> context,
|
| + MojoResult result) {
|
| + DCHECK(thread_checker_.CalledOnValidThread());
|
| +
|
| + // This notification may be for a previously watched context, in which case
|
| + // we just ignore it.
|
| + if (context != context_)
|
| + return;
|
| +
|
| + ReadyCallback callback = callback_;
|
| + if (result == MOJO_RESULT_CANCELLED) {
|
| + // Implicit cancellation due to someone closing the watched handle. We clear
|
| + // the SimppleWatcher's state before dispatching this.
|
| + context_ = nullptr;
|
| + handle_.set_value(kInvalidHandleValue);
|
| + callback_.Reset();
|
| + }
|
| +
|
| + // NOTE: It's legal for |callback| to delete |this|.
|
| + if (!callback.is_null()) {
|
| + TRACE_HEAP_PROFILER_API_SCOPED_TASK_EXECUTION event(heap_profiler_tag_);
|
| +
|
| + base::WeakPtr<SimpleWatcher> weak_self = weak_factory_.GetWeakPtr();
|
| + callback.Run(result);
|
| + if (!weak_self)
|
| + return;
|
| +
|
| + if (unsatisfiable_)
|
| + return;
|
| +
|
| + // Prevent |MOJO_RESULT_FAILED_PRECONDITION| task spam by only notifying
|
| + // at most once in AUTOMATIC arming mode.
|
| + if (result == MOJO_RESULT_FAILED_PRECONDITION)
|
| + unsatisfiable_ = true;
|
| +
|
| + if (arming_policy_ == ArmingPolicy::AUTOMATIC && IsWatching())
|
| + ArmOrNotify();
|
| + }
|
| +}
|
| +
|
| +} // namespace mojo
|
|
|