Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(537)

Unified Diff: mojo/edk/system/watcher_dispatcher.cc

Issue 2725133002: Mojo: Armed Watchers (Closed)
Patch Set: . Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: mojo/edk/system/watcher_dispatcher.cc
diff --git a/mojo/edk/system/watcher_dispatcher.cc b/mojo/edk/system/watcher_dispatcher.cc
new file mode 100644
index 0000000000000000000000000000000000000000..93ee18b073e80e4af52d77ac557a363563ad4292
--- /dev/null
+++ b/mojo/edk/system/watcher_dispatcher.cc
@@ -0,0 +1,374 @@
+// Copyright 2017 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/edk/system/watcher_dispatcher.h"
+
+#include <limits>
+#include <map>
+
+#include "base/macros.h"
+#include "base/memory/ptr_util.h"
+#include "mojo/edk/system/watch.h"
+
+namespace mojo {
+namespace edk {
+
+// Helper to track contexts relevant to a specific handle. This is used to avoid
+// redundant lookups when notifying interested watches about a handle change.
+//
+// Not thread-safe: Should only be accessed while the dispatcher holds its lock,
+// with the exception that during watch removal these may be moved out of the
+// WatcherDispatcher and safely used without locking.
+class WatcherDispatcher::WatchedHandle {
+ public:
+ struct WatchEntry {
+ explicit WatchEntry(const scoped_refptr<Watch>& watch) : watch(watch) {}
+ ~WatchEntry() {}
+
+ // Indicates whether the last computed MojoResult (combinatation of
+ // watched signals and current signal state) indicates that a notification
+ // should happen when our watcher is armed.
+ //
+ // |MOJO_RESULT_CANCELLED| is not included in the set of "ready" results
+ // because cancellation is a one-time event that always fires immediately.
+ bool ready() const {
+ return last_known_result != MOJO_RESULT_SHOULD_WAIT &&
+ last_known_result != MOJO_RESULT_CANCELLED;
+ }
+
+ scoped_refptr<Watch> watch;
+ MojoResult last_known_result = MOJO_RESULT_UNKNOWN;
+ };
+
+ using WatchMap = std::map<uintptr_t, std::unique_ptr<WatchEntry>>;
+
+ explicit WatchedHandle(const scoped_refptr<Dispatcher>& dispatcher)
+ : dispatcher_(dispatcher) {}
+ ~WatchedHandle() {}
+
+ void AddWatch(uintptr_t context, const scoped_refptr<Watch>& watch) {
+ auto result = watches_.insert(
+ std::make_pair(context, base::MakeUnique<WatchEntry>(watch)));
+ DCHECK(result.second);
+ }
+
+ std::unique_ptr<WatchEntry> RemoveWatch(uintptr_t context) {
+ auto it = watches_.find(context);
+ DCHECK(it != watches_.end());
+
+ std::unique_ptr<WatchEntry> entry = std::move(it->second);
+ watches_.erase(it);
+ return entry;
+ }
+
+ bool NotifyState(const HandleSignalsState& state,
+ bool allowed_to_call_callback) {
+ last_known_signals_state_ = state;
+ bool handle_ready = false;
+ for (auto& entry : watches_) {
+ MojoResult rv =
+ entry.second->watch->NotifyState(state, allowed_to_call_callback);
+ if (rv != MOJO_RESULT_SHOULD_WAIT && rv != MOJO_RESULT_CANCELLED)
+ handle_ready |= true;
+ entry.second->last_known_result = rv;
+ }
+ return handle_ready;
+ }
+
+ void CancelAllWatches() {
+ for (auto& entry : watches_)
+ entry.second->watch->Cancel();
+ }
+
+ // Determines if any watch on this handle is ready.
+ bool IsAnyWatchReady() const {
+ for (auto& entry : watches_)
+ if (entry.second->ready())
+ return true;
+ return false;
+ }
+
+ const WatchMap& watches() const { return watches_; }
+
+ // Stores at most |max_contexts| elements at the output locations, regarding
+ // ready contexts associated with this handle. Returns the number of elements
+ // actually stored.
+ //
+ // This is used to implement WatcherDispatcher::Arm()'s already-satisfied/
+ // already-unsatisfiable failure mode by filling in a user-provided buffer
+ // with as much information as we can fit.
+ uint32_t StoreReadyContextState(uint32_t max_contexts,
+ uintptr_t* out_contexts,
+ MojoResult* out_results,
+ MojoHandleSignalsState* out_states) {
+ uint32_t num_contexts = 0;
+ for (auto& entry : watches_) {
+ if (!max_contexts)
+ return num_contexts;
+
+ if (entry.second->last_known_result == MOJO_RESULT_OK ||
+ entry.second->last_known_result == MOJO_RESULT_FAILED_PRECONDITION) {
+ out_contexts[num_contexts] = entry.first;
+ out_results[num_contexts] = entry.second->last_known_result;
+ out_states[num_contexts] =
+ static_cast<MojoHandleSignalsState>(last_known_signals_state_);
+ num_contexts++;
+ max_contexts--;
+ }
+ }
+
+ return num_contexts;
+ }
+
+ private:
+ // The dispatcher to which all these Watch instances belong.
+ const scoped_refptr<Dispatcher> dispatcher_;
+
+ // A ref to every Watch actively watching |dispatcher_| on behalf of our
+ // owning watcher.
+ WatchMap watches_;
+
+ // The last known state of |dispatcher_|. This is tracked for quick access in
+ // StoreReadyContextState().
+ HandleSignalsState last_known_signals_state_ = {0, 0};
+
+ DISALLOW_COPY_AND_ASSIGN(WatchedHandle);
+};
+
+WatcherDispatcher::WatcherDispatcher(MojoWatcherCallback callback)
+ : callback_(callback) {}
+
+void WatcherDispatcher::NotifyHandleState(Dispatcher* dispatcher,
+ const HandleSignalsState& state) {
+ base::AutoLock lock(lock_);
+ auto it = watched_handles_.find(dispatcher);
+ if (it == watched_handles_.end())
+ return;
+
+ // Maybe fire a notification to one or more watches assoicated with the
+ // dispatcher, provided we're armed and at least one of the watches cares
+ // about the new state.
+ if (it->second->NotifyState(state, armed_)) {
+ ready_handles_.insert(it->second.get());
+
+ // If we were armed, we also notified watchers. Ensure we're disarmed now.
+ armed_ = false;
+ } else {
+ ready_handles_.erase(it->second.get());
+ }
+}
+
+void WatcherDispatcher::NotifyHandleClosed(Dispatcher* dispatcher) {
+ std::unique_ptr<WatchedHandle> handle;
+ {
+ base::AutoLock lock(lock_);
+ auto it = watched_handles_.find(dispatcher);
+ if (it == watched_handles_.end())
+ return;
+
+ handle = std::move(it->second);
+
+ // Wipe out all watches associated with the closed dispatcher.
+ for (auto& entry : handle->watches())
+ watches_.erase(entry.first);
+ ready_handles_.erase(handle.get());
+ watched_handles_.erase(it);
+ }
+
+ // NOTE: It's important that this is called outside of |lock_| since it
+ // acquires internal Watch locks.
+ handle->CancelAllWatches();
+}
+
+void WatcherDispatcher::InvokeWatchCallback(
+ uintptr_t context,
+ MojoResult result,
+ const HandleSignalsState& state,
+ MojoWatcherNotificationFlags flags) {
+ {
+ // We avoid holding the lock during dispatch. It's OK for notification
+ // callbacks to close this watcher, and it's OK for notifications to race
+ // with closure, if for example the watcher is closed from another thread
+ // between this test and the invocation of |callback_| below.
+ //
+ // Because cancellation synchronously blocks all future notifications, and
+ // because notifications themselves are mutually exclusive for any given
+ // context, we still guarantee that a single MOJO_RESULT_CANCELLED result
+ // is the last notification received for any given context.
+ //
+ // This guarantee is sufficient to make safe, synchronized, per-context
+ // state management possible in user code.
+ base::AutoLock lock(lock_);
+ if (closed_ && result != MOJO_RESULT_CANCELLED)
+ return;
+ }
+
+ callback_(context, result, static_cast<MojoHandleSignalsState>(state), flags);
+}
+
+Dispatcher::Type WatcherDispatcher::GetType() const {
+ return Type::WATCHER;
+}
+
+MojoResult WatcherDispatcher::Close() {
+ // We swap out all the watched handle information onto the stack so we can
+ // call into their dispatchers without our own lock held.
+ std::map<uintptr_t, scoped_refptr<Watch>> watches;
+ {
+ base::AutoLock lock(lock_);
+ DCHECK(!closed_);
+ closed_ = true;
+ std::swap(watches, watches_);
+ watched_handles_.clear();
+ }
+
+ // Remove all refs from our watched dispatchers and fire cancellations.
+ for (auto& entry : watches) {
+ entry.second->dispatcher()->RemoveWatcherRef(this, entry.first);
+ entry.second->Cancel();
+ }
+
+ return MOJO_RESULT_OK;
+}
+
+MojoResult WatcherDispatcher::WatchDispatcher(
+ scoped_refptr<Dispatcher> dispatcher,
+ MojoHandleSignals signals,
+ uintptr_t context) {
+ // NOTE: Because it's critical to avoid acquiring any other dispatcher locks
+ // while |lock_| is held, we defer adding oursevles to the dispatcher until
+ // after we've updated all our own relevant state and released |lock_|.
+ {
+ base::AutoLock lock(lock_);
+ if (watches_.find(context) != watches_.end())
+ return MOJO_RESULT_ALREADY_EXISTS;
+
+ auto it = watched_handles_.find(dispatcher.get());
+ if (it == watched_handles_.end()) {
+ auto result = watched_handles_.insert(std::make_pair(
+ dispatcher.get(), base::MakeUnique<WatchedHandle>(dispatcher)));
+ DCHECK(result.second);
+ it = result.first;
+ }
+
+ scoped_refptr<Watch> watch = new Watch(this, dispatcher, context, signals);
+ watches_.insert({context, watch});
+ it->second->AddWatch(context, watch);
+ }
+
+ MojoResult rv = dispatcher->AddWatcherRef(this, context);
+ if (rv != MOJO_RESULT_OK) {
+ // Oops. This was not a valid handle to watch. Undo the above work and
+ // fail gracefully. Note that other watches may have been added for the
+ // handle since we last held the lock, so we can't necessarily just wipe
+ // out the |watched_handles_| entry.
+ base::AutoLock lock(lock_);
+ watches_.erase(context);
+
+ // Also note that it's possible for someone to have closed this
+ // WatcherDispatcher since we last held |lock_|, so the entry might already
+ // be gone.
+ auto it = watched_handles_.find(dispatcher.get());
+ if (it != watched_handles_.end()) {
+ it->second->RemoveWatch(context);
+ if (it->second->watches().empty())
+ watched_handles_.erase(dispatcher.get());
+ }
+
+ return rv;
+ }
+
+ return MOJO_RESULT_OK;
+}
+
+MojoResult WatcherDispatcher::CancelWatch(uintptr_t context) {
+ // We may remove the last stored ref to the Watch below, so we retain
+ // a reference on the stack.
+ scoped_refptr<Watch> watch;
+ {
+ base::AutoLock lock(lock_);
+ auto it = watches_.find(context);
+ if (it == watches_.end())
+ return MOJO_RESULT_NOT_FOUND;
+ watch = it->second;
+ watches_.erase(it);
+ }
+
+ // Mark the watch as cancelled so no further notifications get through.
+ watch->Cancel();
+
+ // We remove the watcher ref for this context before updating any more
+ // internal watcher state, ensuring that we don't receiving further
+ // notifications for this context.
+ watch->dispatcher()->RemoveWatcherRef(this, context);
+
+ {
+ base::AutoLock lock(lock_);
+ auto handle_it = watched_handles_.find(watch->dispatcher().get());
+ DCHECK(handle_it != watched_handles_.end());
+
+ std::unique_ptr<WatchedHandle::WatchEntry> entry =
+ handle_it->second->RemoveWatch(context);
+
+ if (handle_it->second->watches().empty()) {
+ // This was the only remaining watch we had for its dispatcher, so we
+ // can wipe out all knowledge of that dispatcher.
+ ready_handles_.erase(handle_it->second.get());
+ watched_handles_.erase(handle_it);
+ } else if (entry->ready() && !handle_it->second->IsAnyWatchReady()) {
+ // There are other watches for this dispatcher, but this was the only one
+ // keeping it in a "ready" state. Remove the handle from the ready-set.
+ ready_handles_.erase(handle_it->second.get());
+ }
+ }
+
+ return MOJO_RESULT_OK;
+}
+
+MojoResult WatcherDispatcher::Arm(
+ uint32_t* num_ready_contexts,
+ uintptr_t* ready_contexts,
+ MojoResult* ready_results,
+ MojoHandleSignalsState* ready_signals_states) {
+ base::AutoLock lock(lock_);
+ if (num_ready_contexts &&
+ (!ready_contexts || !ready_results || !ready_signals_states)) {
+ return MOJO_RESULT_INVALID_ARGUMENT;
+ }
+
+ if (watched_handles_.empty())
+ return MOJO_RESULT_NOT_FOUND;
+
+ if (ready_handles_.empty()) {
+ // Fast path: No handles are ready to notify, so we're done.
+ armed_ = true;
+ return MOJO_RESULT_OK;
+ }
+
+ if (num_ready_contexts) {
+ uint32_t max_ready_contexts = *num_ready_contexts;
+ *num_ready_contexts = 0;
+ for (auto& handles : watched_handles_) {
yzshen1 2017/03/11 00:44:58 nit: handles -> handle?
Ken Rockot(use gerrit already) 2017/03/12 22:24:13 Done
+ if (max_ready_contexts == 0)
+ break;
+ uint32_t n = handles.second->StoreReadyContextState(
+ max_ready_contexts, ready_contexts, ready_results,
+ ready_signals_states);
+ DCHECK_GE(max_ready_contexts, n);
+
+ *num_ready_contexts += n;
+ ready_contexts += n;
+ ready_results += n;
+ ready_signals_states += n;
+ max_ready_contexts -= n;
+ }
+ }
+
+ return MOJO_RESULT_FAILED_PRECONDITION;
+}
+
+WatcherDispatcher::~WatcherDispatcher() {}
+
+} // namespace edk
+} // namespace mojo

Powered by Google App Engine
This is Rietveld 408576698