Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(171)

Unified Diff: mojo/edk/system/wait_set_dispatcher.cc

Issue 2093763002: Implement WaitSetDispatcher::WaitSetWaitImpl(). (Closed) Base URL: https://github.com/domokit/mojo.git@master
Patch Set: doh Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « mojo/edk/system/wait_set_dispatcher.h ('k') | mojo/edk/system/wait_set_dispatcher_unittest.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: mojo/edk/system/wait_set_dispatcher.cc
diff --git a/mojo/edk/system/wait_set_dispatcher.cc b/mojo/edk/system/wait_set_dispatcher.cc
index 68ace5443302537d87d8aab6fee523a2868b12fb..cfd911f905845af1e4de39b153a7facec0b3ce0e 100644
--- a/mojo/edk/system/wait_set_dispatcher.cc
+++ b/mojo/edk/system/wait_set_dispatcher.cc
@@ -4,12 +4,18 @@
#include "mojo/edk/system/wait_set_dispatcher.h"
+#include <string.h>
+
+#include <algorithm>
+#include <limits>
#include <utility>
#include "base/logging.h"
+#include "mojo/edk/platform/time_ticks.h"
#include "mojo/edk/system/configuration.h"
#include "mojo/edk/system/options_validation.h"
+using mojo::platform::GetTimeTicks;
using mojo::util::MutexLocker;
using mojo::util::RefPtr;
@@ -113,6 +119,8 @@ void WaitSetDispatcher::CloseImplNoLock() {
triggered_tail_ = nullptr;
triggered_count_ = 0u;
+ cv_.Signal();
+
// We want to remove the awakables outside the lock, so we have to unlock
// |mutex()|. Note that while unlocked, |Awake()| may get called.
// TODO(vtl): This is pretty terrible, but changing it would require pretty
@@ -230,20 +238,36 @@ MojoResult WaitSetDispatcher::WaitSetRemoveImpl(uint64_t cookie) {
// the entry with this cookie hasn't been added yet.
return MOJO_RESULT_NOT_FOUND;
}
+ if (entry->is_being_removed) {
+ // This entry is being removed on another thread!
+ return MOJO_RESULT_NOT_FOUND;
+ }
+
+ entry->is_being_removed = true;
// We'll remove ourself from the target dispatcher's awakable list outside
// the lock.
- dispatcher = std::move(entry->dispatcher);
+ dispatcher = entry->dispatcher;
+ }
+
+ if (dispatcher)
+ dispatcher->RemoveAwakable(true, this, cookie, nullptr);
+ {
+ MutexLocker locker(&mutex());
+
+ if (is_closed_no_lock())
+ return MOJO_RESULT_OK;
+
+ auto it = entries_.find(cookie);
+ DCHECK(it != entries_.end());
+ Entry* entry = it->second.get();
+ DCHECK(entry->is_being_removed);
if (entry->is_triggered)
RemoveTriggeredNoLock(entry);
-
- // Note: This invalidates |entry|.
- entries_.erase(it);
+ entries_.erase(cookie);
}
- if (dispatcher)
- dispatcher->RemoveAwakable(true, this, cookie, nullptr);
return MOJO_RESULT_OK;
}
@@ -256,9 +280,93 @@ MojoResult WaitSetDispatcher::WaitSetWaitImpl(
if (is_closed_no_lock())
return MOJO_RESULT_INVALID_ARGUMENT;
- // TODO(vtl)
- NOTIMPLEMENTED();
- return MOJO_RESULT_UNIMPLEMENTED;
+ // Read this before waiting. (If we're going to crash due to reading input
+ // values, we'd like to do so before waiting.)
+ uint32_t num_results_in = num_results.Get();
+
+ if (deadline == MOJO_DEADLINE_INDEFINITE) {
+ while (!is_closed_no_lock() && triggered_count_ == 0u)
+ cv_.Wait(&mutex());
+ } else {
+ // We may get spurious wakeups, so record the start time and track the
+ // remaining timeout.
+ uint64_t wait_remaining = deadline;
+ MojoTimeTicks start = GetTimeTicks();
+ while (!is_closed_no_lock() && triggered_count_ == 0u) {
+ // NOTE(vtl): Possibly, we should add a version of |WaitWithTimeout()|
+ // that takes an absolute deadline, since that's what pthreads takes.
+ if (cv_.WaitWithTimeout(&mutex(), wait_remaining))
+ return MOJO_RESULT_DEADLINE_EXCEEDED; // Definitely timed out.
+
+ MojoTimeTicks now = GetTimeTicks();
+ DCHECK_GE(now, start);
+ uint64_t elapsed = static_cast<uint64_t>(now - start);
+ // It's possible that the deadline has passed anyway.
+ if (elapsed >= deadline)
+ return MOJO_RESULT_DEADLINE_EXCEEDED;
+
+ // Otherwise, recalculate the amount that we have left to wait.
+ wait_remaining = deadline - elapsed;
+ }
+ }
+ if (is_closed_no_lock())
+ return MOJO_RESULT_CANCELLED;
+ DCHECK_GT(triggered_count_, 0u);
+
+ uint32_t num_results_out =
+ static_cast<uint32_t>(std::min<size_t>(num_results_in, triggered_count_));
+ std::vector<MojoWaitSetResult> results_out(num_results_out);
+ if (num_results_out > 0u) {
+ // We're going to copy out all this memory, so we should make sure it's all
+ // been zeroed.
+ memset(results_out.data(), 0,
+ results_out.size() * sizeof(MojoWaitSetResult));
+
+ const Entry* entry = triggered_head_;
+ for (auto& wait_set_result : results_out) {
+ DCHECK(entry);
+ DCHECK(entry->is_triggered);
+
+ wait_set_result.cookie = entry->cookie;
+ // |wait_set_result.reserved| has already been zeroed, as has
+ // |wait_set_result.signals_state| (for the cases below where we don't set
+ // it explicitly).
+ wait_set_result.reserved = 0u;
+ if (!entry->dispatcher) {
+ wait_set_result.wait_result = MOJO_RESULT_CANCELLED;
+ } else if (entry->signals_state.satisfies(entry->signals)) {
+ wait_set_result.wait_result = MOJO_RESULT_OK;
+ wait_set_result.signals_state = entry->signals_state;
+ } else if (!entry->signals_state.can_satisfy(entry->signals)) {
+ wait_set_result.wait_result = MOJO_RESULT_FAILED_PRECONDITION;
+ wait_set_result.signals_state = entry->signals_state;
+ } else {
+ NOTREACHED();
+ wait_set_result.wait_result = MOJO_RESULT_INTERNAL;
+ }
+ // TODO(vtl): The comment in mojo/public/c/system/wait_set.h indicates
+ // that we may have to provide |MOJO_RESULT_BUSY|, but we never do that
+ // here. Is that right or am I missing something?
+
+ entry = entry->triggered_next;
+ }
+ }
+ uint32_t max_results_out = static_cast<uint32_t>(
+ std::min<size_t>(std::numeric_limits<uint32_t>::max(), triggered_count_));
+
+ DCHECK_LE(num_results_out, num_results_in);
+ num_results.Put(num_results_out);
+ if (num_results_out > 0u) {
+ DCHECK_EQ(num_results_out, results_out.size());
+ results.PutArray(results_out.data(), results_out.size());
+ } else {
+ // We were awoken and didn't time out, so the only reason we should be
+ // providing no results is if none were requested.
+ DCHECK_EQ(num_results_in, 0u);
+ }
+ if (!max_results.IsNull())
+ max_results.Put(max_results_out);
+ return MOJO_RESULT_OK;
}
void WaitSetDispatcher::Awake(uint64_t context,
@@ -278,6 +386,9 @@ void WaitSetDispatcher::Awake(uint64_t context,
const auto& entry = it->second;
// Once we get "cancelled", we should never be awoken again.
DCHECK(entry->dispatcher);
+ if (entry->is_being_removed)
+ return;
+
switch (reason) {
case AwakeReason::CANCELLED:
if (!entry->is_triggered)
@@ -313,6 +424,8 @@ void WaitSetDispatcher::AddTriggeredNoLock(Entry* entry) {
DCHECK(!entry->triggered_next);
entry->is_triggered = true;
+ if (!triggered_count_)
+ cv_.Signal();
triggered_count_++;
if (!triggered_tail_) {
« no previous file with comments | « mojo/edk/system/wait_set_dispatcher.h ('k') | mojo/edk/system/wait_set_dispatcher_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698