Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(440)

Unified Diff: mojo/edk/system/watcher_dispatcher.cc

Issue 2725133002: Mojo: Armed Watchers (Closed)
Patch Set: rebase Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: mojo/edk/system/watcher_dispatcher.cc
diff --git a/mojo/edk/system/watcher_dispatcher.cc b/mojo/edk/system/watcher_dispatcher.cc
new file mode 100644
index 0000000000000000000000000000000000000000..e4cc875580ad3f82322cf6559dc3f4ac677d1e71
--- /dev/null
+++ b/mojo/edk/system/watcher_dispatcher.cc
@@ -0,0 +1,219 @@
+// Copyright 2017 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/edk/system/watcher_dispatcher.h"
+
+#include <limits>
+#include <map>
+
+#include "base/macros.h"
+#include "base/memory/ptr_util.h"
+#include "mojo/edk/system/watch.h"
+
+namespace mojo {
+namespace edk {
+
+WatcherDispatcher::WatcherDispatcher(MojoWatcherCallback callback)
+ : callback_(callback) {}
+
+void WatcherDispatcher::NotifyHandleState(Dispatcher* dispatcher,
+ const HandleSignalsState& state) {
+ base::AutoLock lock(lock_);
+ auto it = watched_handles_.find(dispatcher);
+ if (it == watched_handles_.end())
+ return;
+
+ // Maybe fire a notification to the watch assoicated with this dispatcher,
+ // provided we're armed it cares about the new state.
+ if (it->second->NotifyState(state, armed_)) {
+ ready_watches_.insert(it->second.get());
+
+ // If we were armed and got here, we notified the watch. Disarm.
+ armed_ = false;
+ } else {
+ ready_watches_.erase(it->second.get());
+ }
+}
+
+void WatcherDispatcher::NotifyHandleClosed(Dispatcher* dispatcher) {
+ scoped_refptr<Watch> watch;
+ {
+ base::AutoLock lock(lock_);
+ auto it = watched_handles_.find(dispatcher);
+ if (it == watched_handles_.end())
+ return;
+
+ watch = std::move(it->second);
+
+ // Wipe out all state associated with the closed dispatcher.
+ watches_.erase(watch->context());
+ ready_watches_.erase(watch.get());
+ watched_handles_.erase(it);
+ }
+
+ // NOTE: It's important that this is called outside of |lock_| since it
+ // acquires internal Watch locks.
+ watch->Cancel();
+}
+
+void WatcherDispatcher::InvokeWatchCallback(
+ uintptr_t context,
+ MojoResult result,
+ const HandleSignalsState& state,
+ MojoWatcherNotificationFlags flags) {
+ {
+ // We avoid holding the lock during dispatch. It's OK for notification
+ // callbacks to close this watcher, and it's OK for notifications to race
+ // with closure, if for example the watcher is closed from another thread
+ // between this test and the invocation of |callback_| below.
+ //
+ // Because cancellation synchronously blocks all future notifications, and
+ // because notifications themselves are mutually exclusive for any given
+ // context, we still guarantee that a single MOJO_RESULT_CANCELLED result
+ // is the last notification received for any given context.
+ //
+ // This guarantee is sufficient to make safe, synchronized, per-context
+ // state management possible in user code.
+ base::AutoLock lock(lock_);
+ if (closed_ && result != MOJO_RESULT_CANCELLED)
+ return;
+ }
+
+ callback_(context, result, static_cast<MojoHandleSignalsState>(state), flags);
+}
+
+Dispatcher::Type WatcherDispatcher::GetType() const {
+ return Type::WATCHER;
+}
+
+MojoResult WatcherDispatcher::Close() {
+ // We swap out all the watched handle information onto the stack so we can
+ // call into their dispatchers without our own lock held.
+ std::map<uintptr_t, scoped_refptr<Watch>> watches;
+ {
+ base::AutoLock lock(lock_);
+ DCHECK(!closed_);
+ closed_ = true;
+ std::swap(watches, watches_);
+ watched_handles_.clear();
+ }
+
+ // Remove all refs from our watched dispatchers and fire cancellations.
+ for (auto& entry : watches) {
+ entry.second->dispatcher()->RemoveWatcherRef(this, entry.first);
+ entry.second->Cancel();
+ }
+
+ return MOJO_RESULT_OK;
+}
+
+MojoResult WatcherDispatcher::WatchDispatcher(
+ scoped_refptr<Dispatcher> dispatcher,
+ MojoHandleSignals signals,
+ uintptr_t context) {
+ // NOTE: Because it's critical to avoid acquiring any other dispatcher locks
+ // while |lock_| is held, we defer adding oursevles to the dispatcher until
+ // after we've updated all our own relevant state and released |lock_|.
+ {
+ base::AutoLock lock(lock_);
+ if (watches_.count(context) || watched_handles_.count(dispatcher.get()))
yzshen1 2017/03/13 20:21:18 [just to double check] IIUC, we want |context| to
Ken Rockot(use gerrit already) 2017/03/13 22:00:10 Correct. Unlike the WaitSet API, I didn't want wat
+ return MOJO_RESULT_ALREADY_EXISTS;
+
+ scoped_refptr<Watch> watch = new Watch(this, dispatcher, context, signals);
+ watches_.insert({context, watch});
+ auto result =
+ watched_handles_.insert(std::make_pair(dispatcher.get(), watch));
+ DCHECK(result.second);
+ }
+
+ MojoResult rv = dispatcher->AddWatcherRef(this, context);
+ if (rv != MOJO_RESULT_OK) {
+ // Oops. This was not a valid handle to watch. Undo the above work and
+ // fail gracefully.
+ base::AutoLock lock(lock_);
+ watches_.erase(context);
+ watched_handles_.erase(dispatcher.get());
+ return rv;
+ }
+
+ return MOJO_RESULT_OK;
+}
+
+MojoResult WatcherDispatcher::CancelWatch(uintptr_t context) {
+ // We may remove the last stored ref to the Watch below, so we retain
+ // a reference on the stack.
+ scoped_refptr<Watch> watch;
+ {
+ base::AutoLock lock(lock_);
+ auto it = watches_.find(context);
+ if (it == watches_.end())
+ return MOJO_RESULT_NOT_FOUND;
+ watch = it->second;
+ watches_.erase(it);
+ }
+
+ // Mark the watch as cancelled so no further notifications get through.
+ watch->Cancel();
+
+ // We remove the watcher ref for this context before updating any more
+ // internal watcher state, ensuring that we don't receiving further
+ // notifications for this context.
+ watch->dispatcher()->RemoveWatcherRef(this, context);
+
+ {
+ base::AutoLock lock(lock_);
+ auto handle_it = watched_handles_.find(watch->dispatcher().get());
+ DCHECK(handle_it != watched_handles_.end());
+ ready_watches_.erase(handle_it->second.get());
+ watched_handles_.erase(handle_it);
+ }
+
+ return MOJO_RESULT_OK;
+}
+
+MojoResult WatcherDispatcher::Arm(
+ uint32_t* num_ready_contexts,
+ uintptr_t* ready_contexts,
+ MojoResult* ready_results,
+ MojoHandleSignalsState* ready_signals_states) {
+ base::AutoLock lock(lock_);
+ if (num_ready_contexts &&
+ (!ready_contexts || !ready_results || !ready_signals_states)) {
+ return MOJO_RESULT_INVALID_ARGUMENT;
+ }
+
+ if (watched_handles_.empty())
+ return MOJO_RESULT_NOT_FOUND;
+
+ if (ready_watches_.empty()) {
+ // Fast path: No watches are ready to notify, so we're done.
+ armed_ = true;
+ return MOJO_RESULT_OK;
+ }
+
+ if (num_ready_contexts) {
+ uint32_t max_ready_contexts = *num_ready_contexts;
+ *num_ready_contexts = 0;
+ for (auto& entry : watched_handles_) {
+ if (max_ready_contexts == *num_ready_contexts)
+ break;
+
+ const Watch* watch = entry.second.get();
+ if (!watch->ready())
+ continue;
+
+ *(ready_contexts++) = watch->context();
+ *(ready_results++) = watch->last_known_result();
+ *(ready_signals_states++) = watch->last_known_signals_state();
+ ++(*num_ready_contexts);
+ }
+ }
+
+ return MOJO_RESULT_FAILED_PRECONDITION;
+}
+
+WatcherDispatcher::~WatcherDispatcher() {}
+
+} // namespace edk
+} // namespace mojo

Powered by Google App Engine
This is Rietveld 408576698