Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(660)

Unified Diff: third_party/mojo/src/mojo/edk/system/wait_set_dispatcher.cc

Issue 1461213002: WaitSet implementation for old EDK. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@mojo-waitset-skeleton
Patch Set: Fix windows build and style issues. Created 5 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: third_party/mojo/src/mojo/edk/system/wait_set_dispatcher.cc
diff --git a/third_party/mojo/src/mojo/edk/system/wait_set_dispatcher.cc b/third_party/mojo/src/mojo/edk/system/wait_set_dispatcher.cc
new file mode 100644
index 0000000000000000000000000000000000000000..45518e7eefd14044f2696bca00dd8fce301cfc31
--- /dev/null
+++ b/third_party/mojo/src/mojo/edk/system/wait_set_dispatcher.cc
@@ -0,0 +1,277 @@
+// Copyright 2015 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "third_party/mojo/src/mojo/edk/system/wait_set_dispatcher.h"
+
+#include <algorithm>
+#include <utility>
+
+#include "base/logging.h"
+#include "third_party/mojo/src/mojo/edk/system/awakable.h"
+
+namespace mojo {
+namespace system {
+
+class WaitSetDispatcher::Waiter final : public Awakable {
+ public:
+ explicit Waiter(WaitSetDispatcher* dispatcher) : dispatcher_(dispatcher) {}
+ ~Waiter() {}
+
+ // |Awakable| implementation.
+ bool Awake(MojoResult result, uintptr_t context) override {
+ // Note: This is called with various Mojo locks held.
+ dispatcher_->WakeDispatcher(result, context);
+ // Removes |this| from the dispatcher's list of waiters.
+ return false;
+ }
+
+ private:
+ WaitSetDispatcher* const dispatcher_;
+};
+
+WaitSetDispatcher::WaitSetDispatcher()
+ : waiter_(new WaitSetDispatcher::Waiter(this)) {}
+
+WaitSetDispatcher::~WaitSetDispatcher() {
+ DCHECK(waiting_dispatchers_.empty());
+ DCHECK(awoken_queue_.empty());
+ DCHECK(processed_dispatchers_.empty());
+}
+
+Dispatcher::Type WaitSetDispatcher::GetType() const {
+ return Type::WAIT_SET;
+}
+
+void WaitSetDispatcher::CloseImplNoLock() {
+ mutex().AssertHeld();
+ for (const auto& entry : waiting_dispatchers_)
+ entry.second.dispatcher->RemoveAwakable(waiter_.get(), nullptr);
+ waiting_dispatchers_.clear();
+
+ MutexLocker locker(&awoken_mutex_);
+ awoken_queue_.clear();
+ processed_dispatchers_.clear();
+}
+
+MojoResult WaitSetDispatcher::AddWaitingDispatcherImplNoLock(
+ const scoped_refptr<Dispatcher>& dispatcher,
+ MojoHandleSignals signals,
+ uintptr_t context) {
+ mutex().AssertHeld();
+ if (dispatcher == this)
+ return MOJO_RESULT_INVALID_ARGUMENT;
+
+ uintptr_t dispatcher_handle = reinterpret_cast<uintptr_t>(dispatcher.get());
+ auto it = waiting_dispatchers_.find(dispatcher_handle);
+ if (it != waiting_dispatchers_.end()) {
+ return MOJO_RESULT_ALREADY_EXISTS;
+ }
+
+ const MojoResult result = dispatcher->AddAwakable(waiter_.get(), signals,
+ dispatcher_handle, nullptr);
+ if (result == MOJO_RESULT_INVALID_ARGUMENT) {
+ // Dispatcher is closed.
+ return result;
+ } else if (result != MOJO_RESULT_OK) {
+ WakeDispatcher(result, dispatcher_handle);
+ }
+
+ WaitState state;
+ state.dispatcher = dispatcher;
+ state.context = context;
+ state.signals = signals;
+ bool inserted =
+ waiting_dispatchers_.insert(std::make_pair(dispatcher_handle, state))
+ .second;
+ DCHECK(inserted);
+
+ return MOJO_RESULT_OK;
+}
+
+MojoResult WaitSetDispatcher::RemoveWaitingDispatcherImplNoLock(
+ const scoped_refptr<Dispatcher>& dispatcher) {
+ mutex().AssertHeld();
+ uintptr_t dispatcher_handle = reinterpret_cast<uintptr_t>(dispatcher.get());
+ auto it = waiting_dispatchers_.find(dispatcher_handle);
+ if (it == waiting_dispatchers_.end())
+ return MOJO_RESULT_NOT_FOUND;
+
+ dispatcher->RemoveAwakable(waiter_.get(), nullptr);
+ // At this point, it should not be possible for |waiter_| to be woken with
+ // |dispatcher|.
+ waiting_dispatchers_.erase(it);
+
+ MutexLocker locker(&awoken_mutex_);
+ int num_erased = 0;
+ for (auto it = awoken_queue_.begin(); it != awoken_queue_.end();) {
+ if (it->first == dispatcher_handle) {
+ it = awoken_queue_.erase(it);
+ num_erased++;
+ } else {
+ ++it;
+ }
+ }
+ // The dispatcher should only exist in the queue once.
+ DCHECK_LE(num_erased, 1);
+ processed_dispatchers_.erase(
+ std::remove(processed_dispatchers_.begin(), processed_dispatchers_.end(),
+ dispatcher_handle),
+ processed_dispatchers_.end());
+
+ return MOJO_RESULT_OK;
+}
+
+MojoResult WaitSetDispatcher::GetReadyDispatchersImplNoLock(
+ UserPointer<uint32_t> count,
+ DispatcherVector* dispatchers,
+ UserPointer<MojoResult> results,
+ UserPointer<uintptr_t> contexts) {
+ mutex().AssertHeld();
+ dispatchers->clear();
+
+ // Re-queue any already retrieved dispatchers. These should be the dispatchers
+ // that were returned on the last call to this function. This loop is
+ // necessary to preserve the logically level-triggering behaviour of waiting
+ // in Mojo. In particular, if no action is taken on a signal, that signal
+ // continues to be satisfied, and therefore a |MojoWait()| on that
+ // handle/signal continues to return immediately.
+ std::deque<uintptr_t> pending;
+ {
+ MutexLocker locker(&awoken_mutex_);
+ pending.swap(processed_dispatchers_);
+ }
+ for (uintptr_t d : pending) {
+ auto it = waiting_dispatchers_.find(d);
+ // Anything in |processed_dispatchers_| should also be in
+ // |waiting_dispatchers_| since dispatchers are removed from both in
+ // |RemoveWaitingDispatcherImplNoLock()|.
+ DCHECK(it != waiting_dispatchers_.end());
+
+ // |awoken_mutex_| cannot be held here because
+ // |Dispatcher::AddAwakable()| acquires the Dispatcher's mutex. This
+ // mutex is held while running |WakeDispatcher()| below, which needs to
+ // acquire |awoken_mutex_|. Holding |awoken_mutex_| here would result in
+ // a deadlock.
+ const MojoResult result = it->second.dispatcher->AddAwakable(
+ waiter_.get(), it->second.signals, d, nullptr);
+
+ if (result == MOJO_RESULT_INVALID_ARGUMENT) {
+ // Dispatcher is closed. Implicitly remove it from the wait set since
+ // it may be impossible to remove using |MojoRemoveHandle()|.
+ waiting_dispatchers_.erase(it);
+ } else if (result != MOJO_RESULT_OK) {
+ WakeDispatcher(result, d);
+ }
+ }
+
+ const uint32_t max_woken = count.Get();
+ uint32_t num_woken = 0;
+
+ MutexLocker locker(&awoken_mutex_);
+ while (!awoken_queue_.empty() && num_woken < max_woken) {
+ uintptr_t d = awoken_queue_.front().first;
+ MojoResult result = awoken_queue_.front().second;
+ awoken_queue_.pop_front();
+
+ auto it = waiting_dispatchers_.find(d);
+ DCHECK(it != waiting_dispatchers_.end());
+
+ results.At(num_woken).Put(result);
+ dispatchers->push_back(it->second.dispatcher);
+ if (!contexts.IsNull())
+ contexts.At(num_woken).Put(it->second.context);
+
+ if (result != MOJO_RESULT_CANCELLED) {
+ processed_dispatchers_.push_back(d);
+ } else {
+ waiting_dispatchers_.erase(it);
+ }
+
+ num_woken++;
+ }
+
+ count.Put(num_woken);
+ if (!num_woken)
+ return MOJO_RESULT_SHOULD_WAIT;
+
+ return MOJO_RESULT_OK;
+}
+
+void WaitSetDispatcher::CancelAllAwakablesNoLock() {
+ mutex().AssertHeld();
+ MutexLocker locker(&awakable_mutex_);
+ awakable_list_.CancelAll();
+}
+
+MojoResult WaitSetDispatcher::AddAwakableImplNoLock(
+ Awakable* awakable,
+ MojoHandleSignals signals,
+ uintptr_t context,
+ HandleSignalsState* signals_state) {
+ mutex().AssertHeld();
+
+ HandleSignalsState state(GetHandleSignalsStateImplNoLock());
+ if (state.satisfies(signals)) {
+ if (signals_state)
+ *signals_state = state;
+ return MOJO_RESULT_ALREADY_EXISTS;
+ }
+ if (!state.can_satisfy(signals)) {
+ if (signals_state)
+ *signals_state = state;
+ return MOJO_RESULT_FAILED_PRECONDITION;
+ }
+
+ MutexLocker locker(&awakable_mutex_);
+ awakable_list_.Add(awakable, signals, context);
+ return MOJO_RESULT_OK;
+}
+
+void WaitSetDispatcher::RemoveAwakableImplNoLock(
+ Awakable* awakable,
+ HandleSignalsState* signals_state) {
+ mutex().AssertHeld();
+ MutexLocker locker(&awakable_mutex_);
+ awakable_list_.Remove(awakable);
+ if (signals_state)
+ *signals_state = GetHandleSignalsStateImplNoLock();
+}
+
+HandleSignalsState WaitSetDispatcher::GetHandleSignalsStateImplNoLock() const {
+ mutex().AssertHeld();
+ HandleSignalsState rv;
+ rv.satisfiable_signals = MOJO_HANDLE_SIGNAL_READABLE;
+ MutexLocker locker(&awoken_mutex_);
+ if (!awoken_queue_.empty() || !processed_dispatchers_.empty())
+ rv.satisfied_signals = MOJO_HANDLE_SIGNAL_READABLE;
+ return rv;
+}
+
+scoped_refptr<Dispatcher>
+WaitSetDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() {
+ mutex().AssertHeld();
+ LOG(ERROR) << "Attempting to serialize WaitSet";
+ CloseImplNoLock();
+ return new WaitSetDispatcher();
+}
+
+void WaitSetDispatcher::WakeDispatcher(MojoResult result, uintptr_t context) {
+ {
+ MutexLocker locker(&awoken_mutex_);
+
+ if (result == MOJO_RESULT_ALREADY_EXISTS)
+ result = MOJO_RESULT_OK;
+
+ awoken_queue_.push_back(std::make_pair(context, result));
+ }
+
+ MutexLocker locker(&awakable_mutex_);
+ HandleSignalsState signals_state;
+ signals_state.satisfiable_signals = MOJO_HANDLE_SIGNAL_READABLE;
+ signals_state.satisfied_signals = MOJO_HANDLE_SIGNAL_READABLE;
+ awakable_list_.AwakeForStateChange(signals_state);
+}
+
+} // namespace system
+} // namespace mojo

Powered by Google App Engine
This is Rietveld 408576698