Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(518)

Unified Diff: mojo/public/cpp/system/simple_watcher.cc

Issue 2725133002: Mojo: Armed Watchers (Closed)
Patch Set: rebase Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: mojo/public/cpp/system/simple_watcher.cc
diff --git a/mojo/public/cpp/system/simple_watcher.cc b/mojo/public/cpp/system/simple_watcher.cc
new file mode 100644
index 0000000000000000000000000000000000000000..f99ff3f91f4ff6be71d1ed4e992b6ae8cae47fe3
--- /dev/null
+++ b/mojo/public/cpp/system/simple_watcher.cc
@@ -0,0 +1,276 @@
+// Copyright 2017 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/public/cpp/system/simple_watcher.h"
+
+#include "base/bind.h"
+#include "base/macros.h"
+#include "base/memory/ptr_util.h"
+#include "base/single_thread_task_runner.h"
+#include "base/synchronization/lock.h"
+#include "base/trace_event/heap_profiler.h"
+#include "mojo/public/c/system/watcher.h"
+
+namespace mojo {
+
+// Thread-safe Context object used to dispatch watch notifications from a
+// arbitrary threads.
+class SimpleWatcher::Context : public base::RefCountedThreadSafe<Context> {
+ public:
+ // Creates a |Context| instance for a new watch on |watcher|, to watch
+ // |handle| for |signals|.
+ static scoped_refptr<Context> Create(
+ base::WeakPtr<SimpleWatcher> watcher,
+ scoped_refptr<base::SingleThreadTaskRunner> task_runner,
+ WatcherHandle watcher_handle,
+ Handle handle,
+ MojoHandleSignals signals,
+ MojoResult* watch_result) {
+ // We use a scoped_refptr<Context> instance as watch context value. This
+ // instance (and thus the Context ref it holds) is effectively owned by the
+ // registered watch. We delete it on cancellation, which is a guaranteed
+ // event.
+ auto context_ref = base::MakeUnique<scoped_refptr<Context>>();
+ *context_ref = new Context(watcher, task_runner,
+ reinterpret_cast<uintptr_t>(context_ref.get()));
+
+ *watch_result = MojoWatch(watcher_handle.value(), handle.value(), signals,
+ (*context_ref)->value());
+ if (*watch_result != MOJO_RESULT_OK)
+ return nullptr;
+
+ // Ownership of the ref has been transferred into the successfully
+ // registered watch.
+ return *context_ref.release();
+ }
+
+ static void CallNotify(uintptr_t context_value,
+ MojoResult result,
+ MojoHandleSignalsState signals_state,
+ MojoWatcherNotificationFlags flags) {
+ auto* context_ref =
+ reinterpret_cast<scoped_refptr<Context>*>(context_value);
+ (*context_ref)->Notify(result, signals_state, flags);
+
+ // That was the last notification for the context. We can delete the ref
+ // owned by the watch.
+ if (result == MOJO_RESULT_CANCELLED)
+ delete context_ref;
yzshen1 2017/03/13 20:21:18 Does it make sense to use Context* as the |context
Ken Rockot(use gerrit already) 2017/03/13 22:00:10 Sure, that's effectively equivalent to this, but I
+ }
+
+ uintptr_t value() const { return context_value_; }
+
+ void DisableCancellationNotifications() {
+ base::AutoLock lock(lock_);
+ enable_cancellation_notifications_ = false;
+ }
+
+ private:
+ friend class base::RefCountedThreadSafe<Context>;
+
+ Context(base::WeakPtr<SimpleWatcher> weak_watcher,
+ scoped_refptr<base::SingleThreadTaskRunner> task_runner,
+ uintptr_t context_value)
+ : weak_watcher_(weak_watcher),
+ task_runner_(task_runner),
+ context_value_(context_value) {}
+ ~Context() {}
+
+ void Notify(MojoResult result,
+ MojoHandleSignalsState signals_state,
+ MojoWatcherNotificationFlags flags) {
+ if (result == MOJO_RESULT_CANCELLED) {
+ // The SimpleWatcher may have explicitly cancelled this watch, so we don't
+ // bother dispatching the notification - it would be ignored anyway.
+ //
+ // TODO(rockot): This shouldn't really be necessary, but there are already
+ // instances today where bindings object may be bound and subsequently
+ // closed due to pipe error, all before the thread's TaskRunner has been
+ // properly initialized.
+ base::AutoLock lock(lock_);
+ if (!enable_cancellation_notifications_)
+ return;
+ }
+
+ if ((flags & MOJO_WATCHER_NOTIFICATION_FLAG_FROM_SYSTEM) &&
+ task_runner_->RunsTasksOnCurrentThread() && weak_watcher_ &&
+ weak_watcher_->is_default_task_runner_) {
+ // System notifications will trigger from the task runner passed to
+ // mojo::edk::InitIPCSupport(). In Chrome this happens to always be the
+ // default task runner for the IO thread.
+ weak_watcher_->OnHandleReady(make_scoped_refptr(this), result);
+ } else {
+ task_runner_->PostTask(
+ FROM_HERE, base::Bind(&SimpleWatcher::OnHandleReady, weak_watcher_,
+ make_scoped_refptr(this), result));
+ }
+ }
+
+ const base::WeakPtr<SimpleWatcher> weak_watcher_;
+ const scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
+ const uintptr_t context_value_;
+
+ base::Lock lock_;
+ bool enable_cancellation_notifications_ = true;
+
+ DISALLOW_COPY_AND_ASSIGN(Context);
+};
+
+SimpleWatcher::SimpleWatcher(const tracked_objects::Location& from_here,
+ ArmingPolicy arming_policy,
+ scoped_refptr<base::SingleThreadTaskRunner> runner)
+ : arming_policy_(arming_policy),
+ task_runner_(std::move(runner)),
+ is_default_task_runner_(task_runner_ ==
+ base::ThreadTaskRunnerHandle::Get()),
+ heap_profiler_tag_(from_here.file_name()),
+ weak_factory_(this) {
+ MojoResult rv = CreateWatcher(&Context::CallNotify, &watcher_handle_);
+ DCHECK_EQ(MOJO_RESULT_OK, rv);
+ DCHECK(task_runner_->BelongsToCurrentThread());
+}
+
+SimpleWatcher::~SimpleWatcher() {
+ if (IsWatching())
+ Cancel();
+}
+
+bool SimpleWatcher::IsWatching() const {
+ DCHECK(thread_checker_.CalledOnValidThread());
+ return context_ != nullptr;
+}
+
+MojoResult SimpleWatcher::Watch(Handle handle,
+ MojoHandleSignals signals,
+ const ReadyCallback& callback) {
+ DCHECK(thread_checker_.CalledOnValidThread());
+ DCHECK(!IsWatching());
+ DCHECK(!callback.is_null());
+
+ callback_ = callback;
+ handle_ = handle;
+
+ MojoResult watch_result = MOJO_RESULT_UNKNOWN;
+ context_ =
+ Context::Create(weak_factory_.GetWeakPtr(), task_runner_,
+ watcher_handle_.get(), handle_, signals, &watch_result);
+ if (!context_) {
+ handle_.set_value(kInvalidHandleValue);
+ callback_.Reset();
+ DCHECK_EQ(MOJO_RESULT_INVALID_ARGUMENT, watch_result);
+ return watch_result;
+ }
+
+ if (arming_policy_ == ArmingPolicy::AUTOMATIC)
+ ArmOrNotify();
+
+ return MOJO_RESULT_OK;
+}
+
+void SimpleWatcher::Cancel() {
+ DCHECK(thread_checker_.CalledOnValidThread());
+
+ // The watcher may have already been cancelled if the handle was closed.
+ if (!context_)
+ return;
+
+ context_->DisableCancellationNotifications();
+
+ handle_.set_value(kInvalidHandleValue);
+ callback_.Reset();
+
+ // Ensure |context_| is unset by the time we call MojoCancelWatch, as may
+ // re-enter the notification callback and we want to ensure |context_| is
+ // unset by then. This prevents the cancellation notification from reaching
yzshen1 2017/03/13 20:21:18 Is the last sentence a comment for line 178 instea
Ken Rockot(use gerrit already) 2017/03/13 22:00:10 Ah good catch. It used to be accurate here, but I
+ // OnHandleReady() when cancellation is explicit.
+ scoped_refptr<Context> context;
+ std::swap(context, context_);
+ MojoResult rv =
+ MojoCancelWatch(watcher_handle_.get().value(), context->value());
+
+ // It's possible this cancellation could race with a handle closure
+ // notification, in which case the watch may have already been implicitly
+ // cancelled.
+ DCHECK(rv == MOJO_RESULT_OK || rv == MOJO_RESULT_NOT_FOUND);
+}
+
+MojoResult SimpleWatcher::Arm(MojoResult* ready_result) {
+ DCHECK(thread_checker_.CalledOnValidThread());
+ uint32_t num_ready_contexts = 1;
+ uintptr_t ready_context;
+ MojoResult local_ready_result;
+ MojoHandleSignalsState ready_state;
+ MojoResult rv =
+ MojoArmWatcher(watcher_handle_.get().value(), &num_ready_contexts,
+ &ready_context, &local_ready_result, &ready_state);
+ if (rv == MOJO_RESULT_FAILED_PRECONDITION) {
+ DCHECK(context_);
+ DCHECK_EQ(1u, num_ready_contexts);
+ DCHECK_EQ(context_->value(), ready_context);
+ if (ready_result)
+ *ready_result = local_ready_result;
+ }
+
+ return rv;
+}
+
+void SimpleWatcher::ArmOrNotify() {
+ DCHECK(thread_checker_.CalledOnValidThread());
+
+ // Already cancelled, nothing to do.
+ if (!IsWatching())
+ return;
+
+ MojoResult ready_result;
+ MojoResult rv = Arm(&ready_result);
+ if (rv == MOJO_RESULT_OK)
+ return;
+
+ DCHECK_EQ(MOJO_RESULT_FAILED_PRECONDITION, rv);
+ task_runner_->PostTask(FROM_HERE, base::Bind(&SimpleWatcher::OnHandleReady,
+ weak_factory_.GetWeakPtr(),
+ context_, ready_result));
+}
+
+void SimpleWatcher::OnHandleReady(scoped_refptr<const Context> context,
+ MojoResult result) {
+ DCHECK(thread_checker_.CalledOnValidThread());
+
+ // This notification may be for a previously watched context, in which case
+ // we just ignore it.
+ if (context != context_)
+ return;
+
+ ReadyCallback callback = callback_;
+ if (result == MOJO_RESULT_CANCELLED) {
+ // Implicit cancellation due to someone closing the watched handle. We clear
+ // the SimppleWatcher's state before dispatching this.
+ context_ = nullptr;
+ handle_.set_value(kInvalidHandleValue);
+ callback_.Reset();
+ }
+
+ // NOTE: It's legal for |callback| to delete |this|.
+ if (!callback.is_null()) {
+ TRACE_HEAP_PROFILER_API_SCOPED_TASK_EXECUTION event(heap_profiler_tag_);
+
+ base::WeakPtr<SimpleWatcher> weak_self = weak_factory_.GetWeakPtr();
+ callback.Run(result);
+ if (!weak_self)
+ return;
+
+ if (unsatisfiable_)
+ return;
+
+ // Prevent |MOJO_RESULT_FAILED_PRECONDITION| task spam by only notifying
+ // at most once in AUTOMATIC arming mode.
+ if (result == MOJO_RESULT_FAILED_PRECONDITION)
+ unsatisfiable_ = true;
+
+ if (arming_policy_ == ArmingPolicy::AUTOMATIC && IsWatching())
+ ArmOrNotify();
+ }
+}
+
+} // namespace mojo

Powered by Google App Engine
This is Rietveld 408576698