Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(95)

Side by Side Diff: mojo/edk/system/wait_set_dispatcher.cc

Issue 2744943002: Mojo: Move waiting APIs to public library (Closed)
Patch Set: . Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/edk/system/wait_set_dispatcher.h ('k') | mojo/edk/system/wait_set_dispatcher_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/edk/system/wait_set_dispatcher.h"
6
7 #include <stdint.h>
8
9 #include <algorithm>
10 #include <utility>
11
12 #include "base/logging.h"
13 #include "mojo/edk/system/awakable.h"
14
15 namespace mojo {
16 namespace edk {
17
18 class WaitSetDispatcher::Waiter final : public Awakable {
19 public:
20 explicit Waiter(WaitSetDispatcher* dispatcher) : dispatcher_(dispatcher) {}
21 ~Waiter() {}
22
23 // |Awakable| implementation.
24 bool Awake(MojoResult result, uintptr_t context) override {
25 // Note: This is called with various Mojo locks held.
26 dispatcher_->WakeDispatcher(result, context);
27 // Removes |this| from the dispatcher's list of waiters.
28 return false;
29 }
30
31 private:
32 WaitSetDispatcher* const dispatcher_;
33 };
34
35 WaitSetDispatcher::WaitState::WaitState() {}
36
37 WaitSetDispatcher::WaitState::WaitState(const WaitState& other) = default;
38
39 WaitSetDispatcher::WaitState::~WaitState() {}
40
41 WaitSetDispatcher::WaitSetDispatcher()
42 : waiter_(new WaitSetDispatcher::Waiter(this)) {}
43
44 Dispatcher::Type WaitSetDispatcher::GetType() const {
45 return Type::WAIT_SET;
46 }
47
48 MojoResult WaitSetDispatcher::Close() {
49 base::AutoLock lock(lock_);
50
51 if (is_closed_)
52 return MOJO_RESULT_INVALID_ARGUMENT;
53 is_closed_ = true;
54
55 {
56 base::AutoLock locker(awakable_lock_);
57 awakable_list_.CancelAll();
58 }
59
60 for (const auto& entry : waiting_dispatchers_)
61 entry.second.dispatcher->RemoveAwakable(waiter_.get(), nullptr);
62 waiting_dispatchers_.clear();
63
64 base::AutoLock locker(awoken_lock_);
65 awoken_queue_.clear();
66 processed_dispatchers_.clear();
67
68 return MOJO_RESULT_OK;
69 }
70
71 MojoResult WaitSetDispatcher::AddWaitingDispatcher(
72 const scoped_refptr<Dispatcher>& dispatcher,
73 MojoHandleSignals signals,
74 uintptr_t context) {
75 if (dispatcher == this)
76 return MOJO_RESULT_INVALID_ARGUMENT;
77
78 base::AutoLock lock(lock_);
79
80 if (is_closed_)
81 return MOJO_RESULT_INVALID_ARGUMENT;
82
83 uintptr_t dispatcher_handle = reinterpret_cast<uintptr_t>(dispatcher.get());
84 auto it = waiting_dispatchers_.find(dispatcher_handle);
85 if (it != waiting_dispatchers_.end()) {
86 return MOJO_RESULT_ALREADY_EXISTS;
87 }
88
89 const MojoResult result = dispatcher->AddAwakable(waiter_.get(), signals,
90 dispatcher_handle, nullptr);
91 if (result == MOJO_RESULT_INVALID_ARGUMENT) {
92 // Dispatcher is closed.
93 return result;
94 } else if (result != MOJO_RESULT_OK) {
95 WakeDispatcher(result, dispatcher_handle);
96 }
97
98 WaitState state;
99 state.dispatcher = dispatcher;
100 state.context = context;
101 state.signals = signals;
102 bool inserted = waiting_dispatchers_.insert(
103 std::make_pair(dispatcher_handle, state)).second;
104 DCHECK(inserted);
105
106 return MOJO_RESULT_OK;
107 }
108
109 MojoResult WaitSetDispatcher::RemoveWaitingDispatcher(
110 const scoped_refptr<Dispatcher>& dispatcher) {
111 uintptr_t dispatcher_handle = reinterpret_cast<uintptr_t>(dispatcher.get());
112
113 base::AutoLock lock(lock_);
114 if (is_closed_)
115 return MOJO_RESULT_INVALID_ARGUMENT;
116
117 auto it = waiting_dispatchers_.find(dispatcher_handle);
118 if (it == waiting_dispatchers_.end())
119 return MOJO_RESULT_NOT_FOUND;
120
121 dispatcher->RemoveAwakable(waiter_.get(), nullptr);
122 // At this point, it should not be possible for |waiter_| to be woken with
123 // |dispatcher|.
124 waiting_dispatchers_.erase(it);
125
126 base::AutoLock locker(awoken_lock_);
127 int num_erased = 0;
128 for (auto it = awoken_queue_.begin(); it != awoken_queue_.end();) {
129 if (it->first == dispatcher_handle) {
130 it = awoken_queue_.erase(it);
131 num_erased++;
132 } else {
133 ++it;
134 }
135 }
136 // The dispatcher should only exist in the queue once.
137 DCHECK_LE(num_erased, 1);
138 processed_dispatchers_.erase(
139 std::remove(processed_dispatchers_.begin(), processed_dispatchers_.end(),
140 dispatcher_handle),
141 processed_dispatchers_.end());
142
143 return MOJO_RESULT_OK;
144 }
145
146 MojoResult WaitSetDispatcher::GetReadyDispatchers(
147 uint32_t* count,
148 DispatcherVector* dispatchers,
149 MojoResult* results,
150 uintptr_t* contexts) {
151 base::AutoLock lock(lock_);
152
153 if (is_closed_)
154 return MOJO_RESULT_INVALID_ARGUMENT;
155
156 dispatchers->clear();
157
158 // Re-queue any already retrieved dispatchers. These should be the dispatchers
159 // that were returned on the last call to this function. This loop is
160 // necessary to preserve the logically level-triggering behaviour of waiting
161 // in Mojo. In particular, if no action is taken on a signal, that signal
162 // continues to be satisfied, and therefore a |MojoWait()| on that
163 // handle/signal continues to return immediately.
164 std::deque<uintptr_t> pending;
165 {
166 base::AutoLock locker(awoken_lock_);
167 pending.swap(processed_dispatchers_);
168 }
169 for (uintptr_t d : pending) {
170 auto it = waiting_dispatchers_.find(d);
171 // Anything in |processed_dispatchers_| should also be in
172 // |waiting_dispatchers_| since dispatchers are removed from both in
173 // |RemoveWaitingDispatcherImplNoLock()|.
174 DCHECK(it != waiting_dispatchers_.end());
175
176 // |awoken_mutex_| cannot be held here because
177 // |Dispatcher::AddAwakable()| acquires the Dispatcher's mutex. This
178 // mutex is held while running |WakeDispatcher()| below, which needs to
179 // acquire |awoken_mutex_|. Holding |awoken_mutex_| here would result in
180 // a deadlock.
181 const MojoResult result = it->second.dispatcher->AddAwakable(
182 waiter_.get(), it->second.signals, d, nullptr);
183
184 if (result == MOJO_RESULT_INVALID_ARGUMENT) {
185 // Dispatcher is closed. Implicitly remove it from the wait set since
186 // it may be impossible to remove using |MojoRemoveHandle()|.
187 waiting_dispatchers_.erase(it);
188 } else if (result != MOJO_RESULT_OK) {
189 WakeDispatcher(result, d);
190 }
191 }
192
193 const uint32_t max_woken = *count;
194 uint32_t num_woken = 0;
195
196 base::AutoLock locker(awoken_lock_);
197 while (!awoken_queue_.empty() && num_woken < max_woken) {
198 uintptr_t d = awoken_queue_.front().first;
199 MojoResult result = awoken_queue_.front().second;
200 awoken_queue_.pop_front();
201
202 auto it = waiting_dispatchers_.find(d);
203 DCHECK(it != waiting_dispatchers_.end());
204
205 results[num_woken] = result;
206 dispatchers->push_back(it->second.dispatcher);
207 if (contexts)
208 contexts[num_woken] = it->second.context;
209
210 if (result != MOJO_RESULT_CANCELLED) {
211 processed_dispatchers_.push_back(d);
212 } else {
213 // |MOJO_RESULT_CANCELLED| indicates that the dispatcher was closed.
214 // Return it, but also implcitly remove it from the wait set.
215 waiting_dispatchers_.erase(it);
216 }
217
218 num_woken++;
219 }
220
221 *count = num_woken;
222 if (!num_woken)
223 return MOJO_RESULT_SHOULD_WAIT;
224
225 return MOJO_RESULT_OK;
226 }
227
228 HandleSignalsState WaitSetDispatcher::GetHandleSignalsState() const {
229 base::AutoLock lock(lock_);
230 return GetHandleSignalsStateNoLock();
231 }
232
233 HandleSignalsState WaitSetDispatcher::GetHandleSignalsStateNoLock() const {
234 lock_.AssertAcquired();
235 if (is_closed_)
236 return HandleSignalsState();
237
238 HandleSignalsState rv;
239 rv.satisfiable_signals = MOJO_HANDLE_SIGNAL_READABLE;
240 base::AutoLock locker(awoken_lock_);
241 if (!awoken_queue_.empty() || !processed_dispatchers_.empty())
242 rv.satisfied_signals = MOJO_HANDLE_SIGNAL_READABLE;
243 return rv;
244 }
245
246 MojoResult WaitSetDispatcher::AddAwakable(Awakable* awakable,
247 MojoHandleSignals signals,
248 uintptr_t context,
249 HandleSignalsState* signals_state) {
250 base::AutoLock lock(lock_);
251 // |awakable_lock_| is acquired here instead of immediately before adding to
252 // |awakable_list_| because we need to check the signals state and add to
253 // |awakable_list_| as an atomic operation. If the pair isn't atomic, it is
254 // possible for the signals state to change after it is checked, but before
255 // the awakable is added. In that case, the added awakable won't be signalled.
256 base::AutoLock awakable_locker(awakable_lock_);
257 HandleSignalsState state(GetHandleSignalsStateNoLock());
258 if (state.satisfies(signals)) {
259 if (signals_state)
260 *signals_state = state;
261 return MOJO_RESULT_ALREADY_EXISTS;
262 }
263 if (!state.can_satisfy(signals)) {
264 if (signals_state)
265 *signals_state = state;
266 return MOJO_RESULT_FAILED_PRECONDITION;
267 }
268
269 awakable_list_.Add(awakable, signals, context);
270 return MOJO_RESULT_OK;
271 }
272
273 void WaitSetDispatcher::RemoveAwakable(Awakable* awakable,
274 HandleSignalsState* signals_state) {
275 {
276 base::AutoLock locker(awakable_lock_);
277 awakable_list_.Remove(awakable);
278 }
279 if (signals_state)
280 *signals_state = GetHandleSignalsState();
281 }
282
283 bool WaitSetDispatcher::BeginTransit() {
284 // You can't transfer wait sets!
285 return false;
286 }
287
288 WaitSetDispatcher::~WaitSetDispatcher() {
289 DCHECK(waiting_dispatchers_.empty());
290 DCHECK(awoken_queue_.empty());
291 DCHECK(processed_dispatchers_.empty());
292 }
293
294 void WaitSetDispatcher::WakeDispatcher(MojoResult result, uintptr_t context) {
295 {
296 base::AutoLock locker(awoken_lock_);
297
298 if (result == MOJO_RESULT_ALREADY_EXISTS)
299 result = MOJO_RESULT_OK;
300
301 awoken_queue_.push_back(std::make_pair(context, result));
302 }
303
304 base::AutoLock locker(awakable_lock_);
305 HandleSignalsState signals_state;
306 signals_state.satisfiable_signals = MOJO_HANDLE_SIGNAL_READABLE;
307 signals_state.satisfied_signals = MOJO_HANDLE_SIGNAL_READABLE;
308 awakable_list_.AwakeForStateChange(signals_state);
309 }
310
311 } // namespace edk
312 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/edk/system/wait_set_dispatcher.h ('k') | mojo/edk/system/wait_set_dispatcher_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698