Index: mojo/public/cpp/system/simple_watcher.cc |
diff --git a/mojo/public/cpp/system/simple_watcher.cc b/mojo/public/cpp/system/simple_watcher.cc |
new file mode 100644 |
index 0000000000000000000000000000000000000000..f99ff3f91f4ff6be71d1ed4e992b6ae8cae47fe3 |
--- /dev/null |
+++ b/mojo/public/cpp/system/simple_watcher.cc |
@@ -0,0 +1,276 @@ |
+// Copyright 2017 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+#include "mojo/public/cpp/system/simple_watcher.h" |
+ |
+#include "base/bind.h" |
+#include "base/macros.h" |
+#include "base/memory/ptr_util.h" |
+#include "base/single_thread_task_runner.h" |
+#include "base/synchronization/lock.h" |
+#include "base/trace_event/heap_profiler.h" |
+#include "mojo/public/c/system/watcher.h" |
+ |
+namespace mojo { |
+ |
+// Thread-safe Context object used to dispatch watch notifications from a |
+// arbitrary threads. |
+class SimpleWatcher::Context : public base::RefCountedThreadSafe<Context> { |
+ public: |
+ // Creates a |Context| instance for a new watch on |watcher|, to watch |
+ // |handle| for |signals|. |
+ static scoped_refptr<Context> Create( |
+ base::WeakPtr<SimpleWatcher> watcher, |
+ scoped_refptr<base::SingleThreadTaskRunner> task_runner, |
+ WatcherHandle watcher_handle, |
+ Handle handle, |
+ MojoHandleSignals signals, |
+ MojoResult* watch_result) { |
+ // We use a scoped_refptr<Context> instance as watch context value. This |
+ // instance (and thus the Context ref it holds) is effectively owned by the |
+ // registered watch. We delete it on cancellation, which is a guaranteed |
+ // event. |
+ auto context_ref = base::MakeUnique<scoped_refptr<Context>>(); |
+ *context_ref = new Context(watcher, task_runner, |
+ reinterpret_cast<uintptr_t>(context_ref.get())); |
+ |
+ *watch_result = MojoWatch(watcher_handle.value(), handle.value(), signals, |
+ (*context_ref)->value()); |
+ if (*watch_result != MOJO_RESULT_OK) |
+ return nullptr; |
+ |
+ // Ownership of the ref has been transferred into the successfully |
+ // registered watch. |
+ return *context_ref.release(); |
+ } |
+ |
+ static void CallNotify(uintptr_t context_value, |
+ MojoResult result, |
+ MojoHandleSignalsState signals_state, |
+ MojoWatcherNotificationFlags flags) { |
+ auto* context_ref = |
+ reinterpret_cast<scoped_refptr<Context>*>(context_value); |
+ (*context_ref)->Notify(result, signals_state, flags); |
+ |
+ // That was the last notification for the context. We can delete the ref |
+ // owned by the watch. |
+ if (result == MOJO_RESULT_CANCELLED) |
+ delete context_ref; |
yzshen1
2017/03/13 20:21:18
Does it make sense to use Context* as the |context
Ken Rockot(use gerrit already)
2017/03/13 22:00:10
Sure, that's effectively equivalent to this, but I
|
+ } |
+ |
+ uintptr_t value() const { return context_value_; } |
+ |
+ void DisableCancellationNotifications() { |
+ base::AutoLock lock(lock_); |
+ enable_cancellation_notifications_ = false; |
+ } |
+ |
+ private: |
+ friend class base::RefCountedThreadSafe<Context>; |
+ |
+ Context(base::WeakPtr<SimpleWatcher> weak_watcher, |
+ scoped_refptr<base::SingleThreadTaskRunner> task_runner, |
+ uintptr_t context_value) |
+ : weak_watcher_(weak_watcher), |
+ task_runner_(task_runner), |
+ context_value_(context_value) {} |
+ ~Context() {} |
+ |
+ void Notify(MojoResult result, |
+ MojoHandleSignalsState signals_state, |
+ MojoWatcherNotificationFlags flags) { |
+ if (result == MOJO_RESULT_CANCELLED) { |
+ // The SimpleWatcher may have explicitly cancelled this watch, so we don't |
+ // bother dispatching the notification - it would be ignored anyway. |
+ // |
+ // TODO(rockot): This shouldn't really be necessary, but there are already |
+ // instances today where bindings object may be bound and subsequently |
+ // closed due to pipe error, all before the thread's TaskRunner has been |
+ // properly initialized. |
+ base::AutoLock lock(lock_); |
+ if (!enable_cancellation_notifications_) |
+ return; |
+ } |
+ |
+ if ((flags & MOJO_WATCHER_NOTIFICATION_FLAG_FROM_SYSTEM) && |
+ task_runner_->RunsTasksOnCurrentThread() && weak_watcher_ && |
+ weak_watcher_->is_default_task_runner_) { |
+ // System notifications will trigger from the task runner passed to |
+ // mojo::edk::InitIPCSupport(). In Chrome this happens to always be the |
+ // default task runner for the IO thread. |
+ weak_watcher_->OnHandleReady(make_scoped_refptr(this), result); |
+ } else { |
+ task_runner_->PostTask( |
+ FROM_HERE, base::Bind(&SimpleWatcher::OnHandleReady, weak_watcher_, |
+ make_scoped_refptr(this), result)); |
+ } |
+ } |
+ |
+ const base::WeakPtr<SimpleWatcher> weak_watcher_; |
+ const scoped_refptr<base::SingleThreadTaskRunner> task_runner_; |
+ const uintptr_t context_value_; |
+ |
+ base::Lock lock_; |
+ bool enable_cancellation_notifications_ = true; |
+ |
+ DISALLOW_COPY_AND_ASSIGN(Context); |
+}; |
+ |
+SimpleWatcher::SimpleWatcher(const tracked_objects::Location& from_here, |
+ ArmingPolicy arming_policy, |
+ scoped_refptr<base::SingleThreadTaskRunner> runner) |
+ : arming_policy_(arming_policy), |
+ task_runner_(std::move(runner)), |
+ is_default_task_runner_(task_runner_ == |
+ base::ThreadTaskRunnerHandle::Get()), |
+ heap_profiler_tag_(from_here.file_name()), |
+ weak_factory_(this) { |
+ MojoResult rv = CreateWatcher(&Context::CallNotify, &watcher_handle_); |
+ DCHECK_EQ(MOJO_RESULT_OK, rv); |
+ DCHECK(task_runner_->BelongsToCurrentThread()); |
+} |
+ |
+SimpleWatcher::~SimpleWatcher() { |
+ if (IsWatching()) |
+ Cancel(); |
+} |
+ |
+bool SimpleWatcher::IsWatching() const { |
+ DCHECK(thread_checker_.CalledOnValidThread()); |
+ return context_ != nullptr; |
+} |
+ |
+MojoResult SimpleWatcher::Watch(Handle handle, |
+ MojoHandleSignals signals, |
+ const ReadyCallback& callback) { |
+ DCHECK(thread_checker_.CalledOnValidThread()); |
+ DCHECK(!IsWatching()); |
+ DCHECK(!callback.is_null()); |
+ |
+ callback_ = callback; |
+ handle_ = handle; |
+ |
+ MojoResult watch_result = MOJO_RESULT_UNKNOWN; |
+ context_ = |
+ Context::Create(weak_factory_.GetWeakPtr(), task_runner_, |
+ watcher_handle_.get(), handle_, signals, &watch_result); |
+ if (!context_) { |
+ handle_.set_value(kInvalidHandleValue); |
+ callback_.Reset(); |
+ DCHECK_EQ(MOJO_RESULT_INVALID_ARGUMENT, watch_result); |
+ return watch_result; |
+ } |
+ |
+ if (arming_policy_ == ArmingPolicy::AUTOMATIC) |
+ ArmOrNotify(); |
+ |
+ return MOJO_RESULT_OK; |
+} |
+ |
+void SimpleWatcher::Cancel() { |
+ DCHECK(thread_checker_.CalledOnValidThread()); |
+ |
+ // The watcher may have already been cancelled if the handle was closed. |
+ if (!context_) |
+ return; |
+ |
+ context_->DisableCancellationNotifications(); |
+ |
+ handle_.set_value(kInvalidHandleValue); |
+ callback_.Reset(); |
+ |
+ // Ensure |context_| is unset by the time we call MojoCancelWatch, as may |
+ // re-enter the notification callback and we want to ensure |context_| is |
+ // unset by then. This prevents the cancellation notification from reaching |
yzshen1
2017/03/13 20:21:18
Is the last sentence a comment for line 178 instea
Ken Rockot(use gerrit already)
2017/03/13 22:00:10
Ah good catch. It used to be accurate here, but I
|
+ // OnHandleReady() when cancellation is explicit. |
+ scoped_refptr<Context> context; |
+ std::swap(context, context_); |
+ MojoResult rv = |
+ MojoCancelWatch(watcher_handle_.get().value(), context->value()); |
+ |
+ // It's possible this cancellation could race with a handle closure |
+ // notification, in which case the watch may have already been implicitly |
+ // cancelled. |
+ DCHECK(rv == MOJO_RESULT_OK || rv == MOJO_RESULT_NOT_FOUND); |
+} |
+ |
+MojoResult SimpleWatcher::Arm(MojoResult* ready_result) { |
+ DCHECK(thread_checker_.CalledOnValidThread()); |
+ uint32_t num_ready_contexts = 1; |
+ uintptr_t ready_context; |
+ MojoResult local_ready_result; |
+ MojoHandleSignalsState ready_state; |
+ MojoResult rv = |
+ MojoArmWatcher(watcher_handle_.get().value(), &num_ready_contexts, |
+ &ready_context, &local_ready_result, &ready_state); |
+ if (rv == MOJO_RESULT_FAILED_PRECONDITION) { |
+ DCHECK(context_); |
+ DCHECK_EQ(1u, num_ready_contexts); |
+ DCHECK_EQ(context_->value(), ready_context); |
+ if (ready_result) |
+ *ready_result = local_ready_result; |
+ } |
+ |
+ return rv; |
+} |
+ |
+void SimpleWatcher::ArmOrNotify() { |
+ DCHECK(thread_checker_.CalledOnValidThread()); |
+ |
+ // Already cancelled, nothing to do. |
+ if (!IsWatching()) |
+ return; |
+ |
+ MojoResult ready_result; |
+ MojoResult rv = Arm(&ready_result); |
+ if (rv == MOJO_RESULT_OK) |
+ return; |
+ |
+ DCHECK_EQ(MOJO_RESULT_FAILED_PRECONDITION, rv); |
+ task_runner_->PostTask(FROM_HERE, base::Bind(&SimpleWatcher::OnHandleReady, |
+ weak_factory_.GetWeakPtr(), |
+ context_, ready_result)); |
+} |
+ |
+void SimpleWatcher::OnHandleReady(scoped_refptr<const Context> context, |
+ MojoResult result) { |
+ DCHECK(thread_checker_.CalledOnValidThread()); |
+ |
+ // This notification may be for a previously watched context, in which case |
+ // we just ignore it. |
+ if (context != context_) |
+ return; |
+ |
+ ReadyCallback callback = callback_; |
+ if (result == MOJO_RESULT_CANCELLED) { |
+ // Implicit cancellation due to someone closing the watched handle. We clear |
+ // the SimppleWatcher's state before dispatching this. |
+ context_ = nullptr; |
+ handle_.set_value(kInvalidHandleValue); |
+ callback_.Reset(); |
+ } |
+ |
+ // NOTE: It's legal for |callback| to delete |this|. |
+ if (!callback.is_null()) { |
+ TRACE_HEAP_PROFILER_API_SCOPED_TASK_EXECUTION event(heap_profiler_tag_); |
+ |
+ base::WeakPtr<SimpleWatcher> weak_self = weak_factory_.GetWeakPtr(); |
+ callback.Run(result); |
+ if (!weak_self) |
+ return; |
+ |
+ if (unsatisfiable_) |
+ return; |
+ |
+ // Prevent |MOJO_RESULT_FAILED_PRECONDITION| task spam by only notifying |
+ // at most once in AUTOMATIC arming mode. |
+ if (result == MOJO_RESULT_FAILED_PRECONDITION) |
+ unsatisfiable_ = true; |
+ |
+ if (arming_policy_ == ArmingPolicy::AUTOMATIC && IsWatching()) |
+ ArmOrNotify(); |
+ } |
+} |
+ |
+} // namespace mojo |