Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(4430)

Unified Diff: mojo/public/cpp/system/wait_set.cc

Issue 2744943002: Mojo: Move waiting APIs to public library (Closed)
Patch Set: . Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « mojo/public/cpp/system/wait_set.h ('k') | mojo/public/cpp/test_support/lib/test_utils.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: mojo/public/cpp/system/wait_set.cc
diff --git a/mojo/public/cpp/system/wait_set.cc b/mojo/public/cpp/system/wait_set.cc
new file mode 100644
index 0000000000000000000000000000000000000000..410ec7490706c7b3738521edebf1508cfe6e0d79
--- /dev/null
+++ b/mojo/public/cpp/system/wait_set.cc
@@ -0,0 +1,313 @@
+// Copyright 2017 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/public/cpp/system/wait_set.h"
+
+#include <algorithm>
+#include <limits>
+#include <map>
+#include <vector>
+
+#include "base/containers/stack_container.h"
+#include "base/logging.h"
+#include "base/macros.h"
+#include "base/memory/ptr_util.h"
+#include "base/synchronization/lock.h"
+#include "base/synchronization/waitable_event.h"
+#include "mojo/public/cpp/system/watcher.h"
+
+namespace mojo {
+
+class WaitSet::State : public base::RefCountedThreadSafe<State> {
+ public:
+ State()
+ : handle_event_(base::WaitableEvent::ResetPolicy::MANUAL,
+ base::WaitableEvent::InitialState::NOT_SIGNALED) {
+ MojoResult rv = CreateWatcher(&Context::OnNotification, &watcher_handle_);
+ DCHECK_EQ(MOJO_RESULT_OK, rv);
+ }
+
+ void ShutDown() {
+ // NOTE: This may immediately invoke Notify for every context.
+ watcher_handle_.reset();
+ }
+
+ MojoResult AddHandle(Handle handle, MojoHandleSignals signals) {
+ DCHECK(watcher_handle_.is_valid());
+
+ scoped_refptr<Context> context = new Context(this, handle);
+
+ {
+ base::AutoLock lock(lock_);
+
+ if (handle_to_context_.count(handle))
+ return MOJO_RESULT_ALREADY_EXISTS;
+ DCHECK(!contexts_.count(context->context_value()));
+
+ handle_to_context_[handle] = context;
+ contexts_[context->context_value()] = context;
+ }
+
+ // Balanced in State::Notify() with MOJO_RESULT_CANCELLED if
+ // MojoWatch() succeeds. Otherwise balanced immediately below.
+ context->AddRef();
+
+ // This can notify immediately if the watcher is already armed. Don't hold
+ // |lock_| while calling it.
+ MojoResult rv = MojoWatch(watcher_handle_.get().value(), handle.value(),
+ signals, context->context_value());
+ if (rv == MOJO_RESULT_INVALID_ARGUMENT) {
+ base::AutoLock lock(lock_);
+ handle_to_context_.erase(handle);
+ contexts_.erase(context->context_value());
+
+ // Balanced above.
+ context->Release();
+ return rv;
+ }
+ DCHECK_EQ(MOJO_RESULT_OK, rv);
+
+ return rv;
+ }
+
+ MojoResult RemoveHandle(Handle handle) {
+ DCHECK(watcher_handle_.is_valid());
+
+ scoped_refptr<Context> context;
+ {
+ base::AutoLock lock(lock_);
+ auto it = handle_to_context_.find(handle);
+ if (it == handle_to_context_.end())
+ return MOJO_RESULT_NOT_FOUND;
+
+ context = std::move(it->second);
+ handle_to_context_.erase(it);
+
+ // Ensure that we never return this handle as a ready result again. Note
+ // that it's removal from |handle_to_context_| above ensures it will never
+ // be added back to this map.
+ ready_handles_.erase(handle);
+ }
+
+ // NOTE: This may enter the notification callback immediately, so don't hold
+ // |lock_| while calling it.
+ MojoResult rv = MojoCancelWatch(watcher_handle_.get().value(),
+ context->context_value());
+
+ // We don't really care whether or not this succeeds. In either case, the
+ // context was or will imminently be cancelled and moved from |contexts_|
+ // to |cancelled_contexts_|.
+ DCHECK(rv == MOJO_RESULT_OK || rv == MOJO_RESULT_NOT_FOUND);
+
+ {
+ // Always clear |cancelled_contexts_| in case it's accumulated any more
+ // entries since the last time we ran.
+ base::AutoLock lock(lock_);
+ cancelled_contexts_.clear();
+ }
+
+ return rv;
+ }
+
+ void Wait(size_t* num_ready_handles,
+ Handle* ready_handles,
+ MojoResult* ready_results,
+ MojoHandleSignalsState* signals_states) {
+ DCHECK(watcher_handle_.is_valid());
+ DCHECK(num_ready_handles);
+ DCHECK(ready_handles);
+ DCHECK(ready_results);
+ bool should_wait = false;
+ {
+ base::AutoLock lock(lock_);
+ if (ready_handles_.empty()) {
+ // No handles are currently in the ready set. Make sure the event is
+ // reset and try to arm the watcher.
+ handle_event_.Reset();
+
+ DCHECK_LE(*num_ready_handles, std::numeric_limits<uint32_t>::max());
+ uint32_t num_ready_contexts = static_cast<uint32_t>(*num_ready_handles);
+ base::StackVector<uintptr_t, 4> ready_contexts;
+ ready_contexts.container().resize(num_ready_contexts);
+ base::StackVector<MojoHandleSignalsState, 4> ready_states;
+ MojoHandleSignalsState* out_states = signals_states;
+ if (!out_states) {
+ // If the caller didn't provide a buffer for signal states, we provide
+ // our own locally. MojoArmWatcher() requires one if we want to handle
+ // arming failure properly.
+ ready_states.container().resize(num_ready_contexts);
+ out_states = ready_states.container().data();
+ }
+ MojoResult rv = MojoArmWatcher(
+ watcher_handle_.get().value(), &num_ready_contexts,
+ ready_contexts.container().data(), ready_results, out_states);
+
+ if (rv == MOJO_RESULT_FAILED_PRECONDITION) {
+ // Can't arm because one or more handles is already ready.
+ *num_ready_handles = num_ready_contexts;
+ for (size_t i = 0; i < num_ready_contexts; ++i) {
+ auto it = contexts_.find(ready_contexts.container()[i]);
+ DCHECK(it != contexts_.end());
+ ready_handles[i] = it->second->handle();
+ }
+ return;
+ }
+
+ if (rv == MOJO_RESULT_NOT_FOUND) {
+ // There are no handles in the set. Nothing to watch.
+ *num_ready_handles = 0;
+ return;
+ }
+
+ // Watcher is armed. We can go on waiting for an event to signal.
+ DCHECK(rv == MOJO_RESULT_OK || rv == MOJO_RESULT_ALREADY_EXISTS);
+ should_wait = true;
+ }
+ }
+
+ if (should_wait)
+ handle_event_.Wait();
+
+ base::AutoLock lock(lock_);
+
+ DCHECK(!ready_handles_.empty());
+
+ // Pop as many handles as we can out of the ready set and return them.
+ *num_ready_handles = std::min(*num_ready_handles, ready_handles_.size());
+ for (size_t i = 0; i < *num_ready_handles; ++i) {
+ auto it = ready_handles_.begin();
+ ready_handles[i] = it->first;
+ ready_results[i] = it->second.result;
+ if (signals_states)
+ signals_states[i] = it->second.signals_state;
+ ready_handles_.erase(it);
+ }
+ }
+
+ private:
+ friend class base::RefCountedThreadSafe<State>;
+
+ class Context : public base::RefCountedThreadSafe<Context> {
+ public:
+ Context(scoped_refptr<State> state, Handle handle)
+ : state_(state), handle_(handle) {}
+
+ Handle handle() const { return handle_; }
+
+ uintptr_t context_value() const {
+ return reinterpret_cast<uintptr_t>(this);
+ }
+
+ static void OnNotification(uintptr_t context,
+ MojoResult result,
+ MojoHandleSignalsState signals_state,
+ MojoWatcherNotificationFlags flags) {
+ reinterpret_cast<Context*>(context)->Notify(result, signals_state);
+ }
+
+ private:
+ friend class base::RefCountedThreadSafe<Context>;
+
+ ~Context() {}
+
+ void Notify(MojoResult result, MojoHandleSignalsState signals_state) {
+ state_->Notify(handle_, result, signals_state, this);
+ }
+
+ const scoped_refptr<State> state_;
+ const Handle handle_;
+
+ DISALLOW_COPY_AND_ASSIGN(Context);
+ };
+
+ ~State() {}
+
+ void Notify(Handle handle,
+ MojoResult result,
+ MojoHandleSignalsState signals_state,
+ Context* context) {
+ base::AutoLock lock(lock_);
+
+ // This could be a cancellation notification following an explicit
+ // RemoveHandle(), in which case we really don't care and don't want to
+ // add it to the ready set. Only update and signal if that's not the case.
+ if (!handle_to_context_.count(handle)) {
+ DCHECK_EQ(MOJO_RESULT_CANCELLED, result);
+ } else {
+ ready_handles_[handle] = {result, signals_state};
+ handle_event_.Signal();
+ }
+
+ // Whether it's an implicit or explicit cancellation, erase from |contexts_|
+ // and append to |cancelled_contexts_|.
+ if (result == MOJO_RESULT_CANCELLED) {
+ contexts_.erase(context->context_value());
+ handle_to_context_.erase(handle);
+
+ // NOTE: We retain a context ref in |cancelled_contexts_| to ensure that
+ // this Context's heap address is not reused too soon. For example, it
+ // would otherwise be possible for the user to call AddHandle() from the
+ // WaitSet's thread immediately after this notification has fired on
+ // another thread, potentially reusing the same heap address for the newly
+ // added Context; and then they may call RemoveHandle() for this handle
+ // (not knowing its context has just been implicitly cancelled) and
+ // cause the new Context to be incorrectly removed from |contexts_|.
+ //
+ // This vector is cleared on the WaitSet's own thread every time
+ // RemoveHandle is called.
+ cancelled_contexts_.emplace_back(make_scoped_refptr(context));
+
+ // Balanced in State::AddHandle().
+ context->Release();
+ }
+ }
+
+ struct ReadyState {
+ ReadyState() = default;
+ ReadyState(MojoResult result, MojoHandleSignalsState signals_state)
+ : result(result), signals_state(signals_state) {}
+ ~ReadyState() = default;
+
+ MojoResult result = MOJO_RESULT_UNKNOWN;
+ MojoHandleSignalsState signals_state = {0, 0};
+ };
+
+ // Not guarded by lock. Must only be accessed from the WaitSet's owning
+ // thread.
+ ScopedWatcherHandle watcher_handle_;
+
+ base::Lock lock_;
+ std::map<uintptr_t, scoped_refptr<Context>> contexts_;
+ std::map<Handle, scoped_refptr<Context>> handle_to_context_;
+ std::map<Handle, ReadyState> ready_handles_;
+ std::vector<scoped_refptr<Context>> cancelled_contexts_;
+
+ // Event signaled any time a handle notification is received.
+ base::WaitableEvent handle_event_;
+
+ DISALLOW_COPY_AND_ASSIGN(State);
+};
+
+WaitSet::WaitSet() : state_(new State) {}
+
+WaitSet::~WaitSet() {
+ state_->ShutDown();
+}
+
+MojoResult WaitSet::AddHandle(Handle handle, MojoHandleSignals signals) {
+ return state_->AddHandle(handle, signals);
+}
+
+MojoResult WaitSet::RemoveHandle(Handle handle) {
+ return state_->RemoveHandle(handle);
+}
+
+void WaitSet::Wait(size_t* num_ready_handles,
+ Handle* ready_handles,
+ MojoResult* ready_results,
+ MojoHandleSignalsState* signals_states) {
+ state_->Wait(num_ready_handles, ready_handles, ready_results, signals_states);
+}
+
+} // namespace mojo
« no previous file with comments | « mojo/public/cpp/system/wait_set.h ('k') | mojo/public/cpp/test_support/lib/test_utils.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698