Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(228)

Side by Side Diff: third_party/mojo/src/mojo/edk/system/wait_set_dispatcher.cc

Issue 1461213002: WaitSet implementation for old EDK. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@mojo-waitset-skeleton
Patch Set: Fix windows build and style issues. Created 5 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "third_party/mojo/src/mojo/edk/system/wait_set_dispatcher.h"
6
7 #include <algorithm>
8 #include <utility>
9
10 #include "base/logging.h"
11 #include "third_party/mojo/src/mojo/edk/system/awakable.h"
12
13 namespace mojo {
14 namespace system {
15
16 class WaitSetDispatcher::Waiter final : public Awakable {
17 public:
18 explicit Waiter(WaitSetDispatcher* dispatcher) : dispatcher_(dispatcher) {}
19 ~Waiter() {}
20
21 // |Awakable| implementation.
22 bool Awake(MojoResult result, uintptr_t context) override {
23 // Note: This is called with various Mojo locks held.
24 dispatcher_->WakeDispatcher(result, context);
25 // Removes |this| from the dispatcher's list of waiters.
26 return false;
27 }
28
29 private:
30 WaitSetDispatcher* const dispatcher_;
31 };
32
33 WaitSetDispatcher::WaitSetDispatcher()
34 : waiter_(new WaitSetDispatcher::Waiter(this)) {}
35
36 WaitSetDispatcher::~WaitSetDispatcher() {
37 DCHECK(waiting_dispatchers_.empty());
38 DCHECK(awoken_queue_.empty());
39 DCHECK(processed_dispatchers_.empty());
40 }
41
42 Dispatcher::Type WaitSetDispatcher::GetType() const {
43 return Type::WAIT_SET;
44 }
45
46 void WaitSetDispatcher::CloseImplNoLock() {
47 mutex().AssertHeld();
48 for (const auto& entry : waiting_dispatchers_)
49 entry.second.dispatcher->RemoveAwakable(waiter_.get(), nullptr);
50 waiting_dispatchers_.clear();
51
52 MutexLocker locker(&awoken_mutex_);
53 awoken_queue_.clear();
54 processed_dispatchers_.clear();
55 }
56
57 MojoResult WaitSetDispatcher::AddWaitingDispatcherImplNoLock(
58 const scoped_refptr<Dispatcher>& dispatcher,
59 MojoHandleSignals signals,
60 uintptr_t context) {
61 mutex().AssertHeld();
62 if (dispatcher == this)
63 return MOJO_RESULT_INVALID_ARGUMENT;
64
65 uintptr_t dispatcher_handle = reinterpret_cast<uintptr_t>(dispatcher.get());
66 auto it = waiting_dispatchers_.find(dispatcher_handle);
67 if (it != waiting_dispatchers_.end()) {
68 return MOJO_RESULT_ALREADY_EXISTS;
69 }
70
71 const MojoResult result = dispatcher->AddAwakable(waiter_.get(), signals,
72 dispatcher_handle, nullptr);
73 if (result == MOJO_RESULT_INVALID_ARGUMENT) {
74 // Dispatcher is closed.
75 return result;
76 } else if (result != MOJO_RESULT_OK) {
77 WakeDispatcher(result, dispatcher_handle);
78 }
79
80 WaitState state;
81 state.dispatcher = dispatcher;
82 state.context = context;
83 state.signals = signals;
84 bool inserted =
85 waiting_dispatchers_.insert(std::make_pair(dispatcher_handle, state))
86 .second;
87 DCHECK(inserted);
88
89 return MOJO_RESULT_OK;
90 }
91
92 MojoResult WaitSetDispatcher::RemoveWaitingDispatcherImplNoLock(
93 const scoped_refptr<Dispatcher>& dispatcher) {
94 mutex().AssertHeld();
95 uintptr_t dispatcher_handle = reinterpret_cast<uintptr_t>(dispatcher.get());
96 auto it = waiting_dispatchers_.find(dispatcher_handle);
97 if (it == waiting_dispatchers_.end())
98 return MOJO_RESULT_NOT_FOUND;
99
100 dispatcher->RemoveAwakable(waiter_.get(), nullptr);
101 // At this point, it should not be possible for |waiter_| to be woken with
102 // |dispatcher|.
103 waiting_dispatchers_.erase(it);
104
105 MutexLocker locker(&awoken_mutex_);
106 int num_erased = 0;
107 for (auto it = awoken_queue_.begin(); it != awoken_queue_.end();) {
108 if (it->first == dispatcher_handle) {
109 it = awoken_queue_.erase(it);
110 num_erased++;
111 } else {
112 ++it;
113 }
114 }
115 // The dispatcher should only exist in the queue once.
116 DCHECK_LE(num_erased, 1);
117 processed_dispatchers_.erase(
118 std::remove(processed_dispatchers_.begin(), processed_dispatchers_.end(),
119 dispatcher_handle),
120 processed_dispatchers_.end());
121
122 return MOJO_RESULT_OK;
123 }
124
125 MojoResult WaitSetDispatcher::GetReadyDispatchersImplNoLock(
126 UserPointer<uint32_t> count,
127 DispatcherVector* dispatchers,
128 UserPointer<MojoResult> results,
129 UserPointer<uintptr_t> contexts) {
130 mutex().AssertHeld();
131 dispatchers->clear();
132
133 // Re-queue any already retrieved dispatchers. These should be the dispatchers
134 // that were returned on the last call to this function. This loop is
135 // necessary to preserve the logically level-triggering behaviour of waiting
136 // in Mojo. In particular, if no action is taken on a signal, that signal
137 // continues to be satisfied, and therefore a |MojoWait()| on that
138 // handle/signal continues to return immediately.
139 std::deque<uintptr_t> pending;
140 {
141 MutexLocker locker(&awoken_mutex_);
142 pending.swap(processed_dispatchers_);
143 }
144 for (uintptr_t d : pending) {
145 auto it = waiting_dispatchers_.find(d);
146 // Anything in |processed_dispatchers_| should also be in
147 // |waiting_dispatchers_| since dispatchers are removed from both in
148 // |RemoveWaitingDispatcherImplNoLock()|.
149 DCHECK(it != waiting_dispatchers_.end());
150
151 // |awoken_mutex_| cannot be held here because
152 // |Dispatcher::AddAwakable()| acquires the Dispatcher's mutex. This
153 // mutex is held while running |WakeDispatcher()| below, which needs to
154 // acquire |awoken_mutex_|. Holding |awoken_mutex_| here would result in
155 // a deadlock.
156 const MojoResult result = it->second.dispatcher->AddAwakable(
157 waiter_.get(), it->second.signals, d, nullptr);
158
159 if (result == MOJO_RESULT_INVALID_ARGUMENT) {
160 // Dispatcher is closed. Implicitly remove it from the wait set since
161 // it may be impossible to remove using |MojoRemoveHandle()|.
162 waiting_dispatchers_.erase(it);
163 } else if (result != MOJO_RESULT_OK) {
164 WakeDispatcher(result, d);
165 }
166 }
167
168 const uint32_t max_woken = count.Get();
169 uint32_t num_woken = 0;
170
171 MutexLocker locker(&awoken_mutex_);
172 while (!awoken_queue_.empty() && num_woken < max_woken) {
173 uintptr_t d = awoken_queue_.front().first;
174 MojoResult result = awoken_queue_.front().second;
175 awoken_queue_.pop_front();
176
177 auto it = waiting_dispatchers_.find(d);
178 DCHECK(it != waiting_dispatchers_.end());
179
180 results.At(num_woken).Put(result);
181 dispatchers->push_back(it->second.dispatcher);
182 if (!contexts.IsNull())
183 contexts.At(num_woken).Put(it->second.context);
184
185 if (result != MOJO_RESULT_CANCELLED) {
186 processed_dispatchers_.push_back(d);
187 } else {
188 waiting_dispatchers_.erase(it);
189 }
190
191 num_woken++;
192 }
193
194 count.Put(num_woken);
195 if (!num_woken)
196 return MOJO_RESULT_SHOULD_WAIT;
197
198 return MOJO_RESULT_OK;
199 }
200
201 void WaitSetDispatcher::CancelAllAwakablesNoLock() {
202 mutex().AssertHeld();
203 MutexLocker locker(&awakable_mutex_);
204 awakable_list_.CancelAll();
205 }
206
207 MojoResult WaitSetDispatcher::AddAwakableImplNoLock(
208 Awakable* awakable,
209 MojoHandleSignals signals,
210 uintptr_t context,
211 HandleSignalsState* signals_state) {
212 mutex().AssertHeld();
213
214 HandleSignalsState state(GetHandleSignalsStateImplNoLock());
215 if (state.satisfies(signals)) {
216 if (signals_state)
217 *signals_state = state;
218 return MOJO_RESULT_ALREADY_EXISTS;
219 }
220 if (!state.can_satisfy(signals)) {
221 if (signals_state)
222 *signals_state = state;
223 return MOJO_RESULT_FAILED_PRECONDITION;
224 }
225
226 MutexLocker locker(&awakable_mutex_);
227 awakable_list_.Add(awakable, signals, context);
228 return MOJO_RESULT_OK;
229 }
230
231 void WaitSetDispatcher::RemoveAwakableImplNoLock(
232 Awakable* awakable,
233 HandleSignalsState* signals_state) {
234 mutex().AssertHeld();
235 MutexLocker locker(&awakable_mutex_);
236 awakable_list_.Remove(awakable);
237 if (signals_state)
238 *signals_state = GetHandleSignalsStateImplNoLock();
239 }
240
241 HandleSignalsState WaitSetDispatcher::GetHandleSignalsStateImplNoLock() const {
242 mutex().AssertHeld();
243 HandleSignalsState rv;
244 rv.satisfiable_signals = MOJO_HANDLE_SIGNAL_READABLE;
245 MutexLocker locker(&awoken_mutex_);
246 if (!awoken_queue_.empty() || !processed_dispatchers_.empty())
247 rv.satisfied_signals = MOJO_HANDLE_SIGNAL_READABLE;
248 return rv;
249 }
250
251 scoped_refptr<Dispatcher>
252 WaitSetDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() {
253 mutex().AssertHeld();
254 LOG(ERROR) << "Attempting to serialize WaitSet";
255 CloseImplNoLock();
256 return new WaitSetDispatcher();
257 }
258
259 void WaitSetDispatcher::WakeDispatcher(MojoResult result, uintptr_t context) {
260 {
261 MutexLocker locker(&awoken_mutex_);
262
263 if (result == MOJO_RESULT_ALREADY_EXISTS)
264 result = MOJO_RESULT_OK;
265
266 awoken_queue_.push_back(std::make_pair(context, result));
267 }
268
269 MutexLocker locker(&awakable_mutex_);
270 HandleSignalsState signals_state;
271 signals_state.satisfiable_signals = MOJO_HANDLE_SIGNAL_READABLE;
272 signals_state.satisfied_signals = MOJO_HANDLE_SIGNAL_READABLE;
273 awakable_list_.AwakeForStateChange(signals_state);
274 }
275
276 } // namespace system
277 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698