Index: third_party/mojo/src/mojo/edk/system/wait_set_dispatcher.cc |
diff --git a/third_party/mojo/src/mojo/edk/system/wait_set_dispatcher.cc b/third_party/mojo/src/mojo/edk/system/wait_set_dispatcher.cc |
new file mode 100644 |
index 0000000000000000000000000000000000000000..45518e7eefd14044f2696bca00dd8fce301cfc31 |
--- /dev/null |
+++ b/third_party/mojo/src/mojo/edk/system/wait_set_dispatcher.cc |
@@ -0,0 +1,277 @@ |
+// Copyright 2015 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+#include "third_party/mojo/src/mojo/edk/system/wait_set_dispatcher.h" |
+ |
+#include <algorithm> |
+#include <utility> |
+ |
+#include "base/logging.h" |
+#include "third_party/mojo/src/mojo/edk/system/awakable.h" |
+ |
+namespace mojo { |
+namespace system { |
+ |
+class WaitSetDispatcher::Waiter final : public Awakable { |
+ public: |
+ explicit Waiter(WaitSetDispatcher* dispatcher) : dispatcher_(dispatcher) {} |
+ ~Waiter() {} |
+ |
+ // |Awakable| implementation. |
+ bool Awake(MojoResult result, uintptr_t context) override { |
+ // Note: This is called with various Mojo locks held. |
+ dispatcher_->WakeDispatcher(result, context); |
+ // Removes |this| from the dispatcher's list of waiters. |
+ return false; |
+ } |
+ |
+ private: |
+ WaitSetDispatcher* const dispatcher_; |
+}; |
+ |
+WaitSetDispatcher::WaitSetDispatcher() |
+ : waiter_(new WaitSetDispatcher::Waiter(this)) {} |
+ |
+WaitSetDispatcher::~WaitSetDispatcher() { |
+ DCHECK(waiting_dispatchers_.empty()); |
+ DCHECK(awoken_queue_.empty()); |
+ DCHECK(processed_dispatchers_.empty()); |
+} |
+ |
+Dispatcher::Type WaitSetDispatcher::GetType() const { |
+ return Type::WAIT_SET; |
+} |
+ |
+void WaitSetDispatcher::CloseImplNoLock() { |
+ mutex().AssertHeld(); |
+ for (const auto& entry : waiting_dispatchers_) |
+ entry.second.dispatcher->RemoveAwakable(waiter_.get(), nullptr); |
+ waiting_dispatchers_.clear(); |
+ |
+ MutexLocker locker(&awoken_mutex_); |
+ awoken_queue_.clear(); |
+ processed_dispatchers_.clear(); |
+} |
+ |
+MojoResult WaitSetDispatcher::AddWaitingDispatcherImplNoLock( |
+ const scoped_refptr<Dispatcher>& dispatcher, |
+ MojoHandleSignals signals, |
+ uintptr_t context) { |
+ mutex().AssertHeld(); |
+ if (dispatcher == this) |
+ return MOJO_RESULT_INVALID_ARGUMENT; |
+ |
+ uintptr_t dispatcher_handle = reinterpret_cast<uintptr_t>(dispatcher.get()); |
+ auto it = waiting_dispatchers_.find(dispatcher_handle); |
+ if (it != waiting_dispatchers_.end()) { |
+ return MOJO_RESULT_ALREADY_EXISTS; |
+ } |
+ |
+ const MojoResult result = dispatcher->AddAwakable(waiter_.get(), signals, |
+ dispatcher_handle, nullptr); |
+ if (result == MOJO_RESULT_INVALID_ARGUMENT) { |
+ // Dispatcher is closed. |
+ return result; |
+ } else if (result != MOJO_RESULT_OK) { |
+ WakeDispatcher(result, dispatcher_handle); |
+ } |
+ |
+ WaitState state; |
+ state.dispatcher = dispatcher; |
+ state.context = context; |
+ state.signals = signals; |
+ bool inserted = |
+ waiting_dispatchers_.insert(std::make_pair(dispatcher_handle, state)) |
+ .second; |
+ DCHECK(inserted); |
+ |
+ return MOJO_RESULT_OK; |
+} |
+ |
+MojoResult WaitSetDispatcher::RemoveWaitingDispatcherImplNoLock( |
+ const scoped_refptr<Dispatcher>& dispatcher) { |
+ mutex().AssertHeld(); |
+ uintptr_t dispatcher_handle = reinterpret_cast<uintptr_t>(dispatcher.get()); |
+ auto it = waiting_dispatchers_.find(dispatcher_handle); |
+ if (it == waiting_dispatchers_.end()) |
+ return MOJO_RESULT_NOT_FOUND; |
+ |
+ dispatcher->RemoveAwakable(waiter_.get(), nullptr); |
+ // At this point, it should not be possible for |waiter_| to be woken with |
+ // |dispatcher|. |
+ waiting_dispatchers_.erase(it); |
+ |
+ MutexLocker locker(&awoken_mutex_); |
+ int num_erased = 0; |
+ for (auto it = awoken_queue_.begin(); it != awoken_queue_.end();) { |
+ if (it->first == dispatcher_handle) { |
+ it = awoken_queue_.erase(it); |
+ num_erased++; |
+ } else { |
+ ++it; |
+ } |
+ } |
+ // The dispatcher should only exist in the queue once. |
+ DCHECK_LE(num_erased, 1); |
+ processed_dispatchers_.erase( |
+ std::remove(processed_dispatchers_.begin(), processed_dispatchers_.end(), |
+ dispatcher_handle), |
+ processed_dispatchers_.end()); |
+ |
+ return MOJO_RESULT_OK; |
+} |
+ |
+MojoResult WaitSetDispatcher::GetReadyDispatchersImplNoLock( |
+ UserPointer<uint32_t> count, |
+ DispatcherVector* dispatchers, |
+ UserPointer<MojoResult> results, |
+ UserPointer<uintptr_t> contexts) { |
+ mutex().AssertHeld(); |
+ dispatchers->clear(); |
+ |
+ // Re-queue any already retrieved dispatchers. These should be the dispatchers |
+ // that were returned on the last call to this function. This loop is |
+ // necessary to preserve the logically level-triggering behaviour of waiting |
+ // in Mojo. In particular, if no action is taken on a signal, that signal |
+ // continues to be satisfied, and therefore a |MojoWait()| on that |
+ // handle/signal continues to return immediately. |
+ std::deque<uintptr_t> pending; |
+ { |
+ MutexLocker locker(&awoken_mutex_); |
+ pending.swap(processed_dispatchers_); |
+ } |
+ for (uintptr_t d : pending) { |
+ auto it = waiting_dispatchers_.find(d); |
+ // Anything in |processed_dispatchers_| should also be in |
+ // |waiting_dispatchers_| since dispatchers are removed from both in |
+ // |RemoveWaitingDispatcherImplNoLock()|. |
+ DCHECK(it != waiting_dispatchers_.end()); |
+ |
+ // |awoken_mutex_| cannot be held here because |
+ // |Dispatcher::AddAwakable()| acquires the Dispatcher's mutex. This |
+ // mutex is held while running |WakeDispatcher()| below, which needs to |
+ // acquire |awoken_mutex_|. Holding |awoken_mutex_| here would result in |
+ // a deadlock. |
+ const MojoResult result = it->second.dispatcher->AddAwakable( |
+ waiter_.get(), it->second.signals, d, nullptr); |
+ |
+ if (result == MOJO_RESULT_INVALID_ARGUMENT) { |
+ // Dispatcher is closed. Implicitly remove it from the wait set since |
+ // it may be impossible to remove using |MojoRemoveHandle()|. |
+ waiting_dispatchers_.erase(it); |
+ } else if (result != MOJO_RESULT_OK) { |
+ WakeDispatcher(result, d); |
+ } |
+ } |
+ |
+ const uint32_t max_woken = count.Get(); |
+ uint32_t num_woken = 0; |
+ |
+ MutexLocker locker(&awoken_mutex_); |
+ while (!awoken_queue_.empty() && num_woken < max_woken) { |
+ uintptr_t d = awoken_queue_.front().first; |
+ MojoResult result = awoken_queue_.front().second; |
+ awoken_queue_.pop_front(); |
+ |
+ auto it = waiting_dispatchers_.find(d); |
+ DCHECK(it != waiting_dispatchers_.end()); |
+ |
+ results.At(num_woken).Put(result); |
+ dispatchers->push_back(it->second.dispatcher); |
+ if (!contexts.IsNull()) |
+ contexts.At(num_woken).Put(it->second.context); |
+ |
+ if (result != MOJO_RESULT_CANCELLED) { |
+ processed_dispatchers_.push_back(d); |
+ } else { |
+ waiting_dispatchers_.erase(it); |
+ } |
+ |
+ num_woken++; |
+ } |
+ |
+ count.Put(num_woken); |
+ if (!num_woken) |
+ return MOJO_RESULT_SHOULD_WAIT; |
+ |
+ return MOJO_RESULT_OK; |
+} |
+ |
+void WaitSetDispatcher::CancelAllAwakablesNoLock() { |
+ mutex().AssertHeld(); |
+ MutexLocker locker(&awakable_mutex_); |
+ awakable_list_.CancelAll(); |
+} |
+ |
+MojoResult WaitSetDispatcher::AddAwakableImplNoLock( |
+ Awakable* awakable, |
+ MojoHandleSignals signals, |
+ uintptr_t context, |
+ HandleSignalsState* signals_state) { |
+ mutex().AssertHeld(); |
+ |
+ HandleSignalsState state(GetHandleSignalsStateImplNoLock()); |
+ if (state.satisfies(signals)) { |
+ if (signals_state) |
+ *signals_state = state; |
+ return MOJO_RESULT_ALREADY_EXISTS; |
+ } |
+ if (!state.can_satisfy(signals)) { |
+ if (signals_state) |
+ *signals_state = state; |
+ return MOJO_RESULT_FAILED_PRECONDITION; |
+ } |
+ |
+ MutexLocker locker(&awakable_mutex_); |
+ awakable_list_.Add(awakable, signals, context); |
+ return MOJO_RESULT_OK; |
+} |
+ |
+void WaitSetDispatcher::RemoveAwakableImplNoLock( |
+ Awakable* awakable, |
+ HandleSignalsState* signals_state) { |
+ mutex().AssertHeld(); |
+ MutexLocker locker(&awakable_mutex_); |
+ awakable_list_.Remove(awakable); |
+ if (signals_state) |
+ *signals_state = GetHandleSignalsStateImplNoLock(); |
+} |
+ |
+HandleSignalsState WaitSetDispatcher::GetHandleSignalsStateImplNoLock() const { |
+ mutex().AssertHeld(); |
+ HandleSignalsState rv; |
+ rv.satisfiable_signals = MOJO_HANDLE_SIGNAL_READABLE; |
+ MutexLocker locker(&awoken_mutex_); |
+ if (!awoken_queue_.empty() || !processed_dispatchers_.empty()) |
+ rv.satisfied_signals = MOJO_HANDLE_SIGNAL_READABLE; |
+ return rv; |
+} |
+ |
+scoped_refptr<Dispatcher> |
+WaitSetDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() { |
+ mutex().AssertHeld(); |
+ LOG(ERROR) << "Attempting to serialize WaitSet"; |
+ CloseImplNoLock(); |
+ return new WaitSetDispatcher(); |
+} |
+ |
+void WaitSetDispatcher::WakeDispatcher(MojoResult result, uintptr_t context) { |
+ { |
+ MutexLocker locker(&awoken_mutex_); |
+ |
+ if (result == MOJO_RESULT_ALREADY_EXISTS) |
+ result = MOJO_RESULT_OK; |
+ |
+ awoken_queue_.push_back(std::make_pair(context, result)); |
+ } |
+ |
+ MutexLocker locker(&awakable_mutex_); |
+ HandleSignalsState signals_state; |
+ signals_state.satisfiable_signals = MOJO_HANDLE_SIGNAL_READABLE; |
+ signals_state.satisfied_signals = MOJO_HANDLE_SIGNAL_READABLE; |
+ awakable_list_.AwakeForStateChange(signals_state); |
+} |
+ |
+} // namespace system |
+} // namespace mojo |