Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(314)

Side by Side Diff: mojo/public/cpp/system/wait_set.cc

Issue 2744943002: Mojo: Move waiting APIs to public library (Closed)
Patch Set: . Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2017 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/public/cpp/system/wait_set.h"
6
7 #include <algorithm>
8 #include <limits>
9 #include <map>
10 #include <vector>
11
12 #include "base/containers/stack_container.h"
13 #include "base/logging.h"
14 #include "base/macros.h"
15 #include "base/memory/ptr_util.h"
16 #include "base/synchronization/lock.h"
17 #include "base/synchronization/waitable_event.h"
18 #include "mojo/public/cpp/system/watcher.h"
19
20 namespace mojo {
21
22 class WaitSet::State : public base::RefCountedThreadSafe<State> {
23 public:
24 State()
25 : handle_event_(base::WaitableEvent::ResetPolicy::MANUAL,
26 base::WaitableEvent::InitialState::NOT_SIGNALED) {
27 MojoResult rv = CreateWatcher(&Context::OnNotification, &watcher_handle_);
28 DCHECK_EQ(MOJO_RESULT_OK, rv);
29 }
30
31 void ShutDown() {
32 // NOTE: This may immediately invoke Notify for every context.
33 watcher_handle_.reset();
34 }
35
36 MojoResult AddHandle(Handle handle, MojoHandleSignals signals) {
37 DCHECK(watcher_handle_.is_valid());
38
39 scoped_refptr<Context> context = new Context(this, handle);
40
41 {
42 base::AutoLock lock(lock_);
43
44 if (handle_to_context_.count(handle))
45 return MOJO_RESULT_ALREADY_EXISTS;
46 DCHECK(!contexts_.count(context->context_value()));
47
48 handle_to_context_[handle] = context;
49 contexts_[context->context_value()] = context;
50 }
51
52 // Balanced in State::Notify() with MOJO_RESULT_CANCELLED if
53 // MojoWatch() succeeds. Otherwise balanced immediately below.
54 context->AddRef();
55
56 // This can notify immediately if the watcher is already armed. Don't hold
57 // |lock_| while calling it.
58 MojoResult rv = MojoWatch(watcher_handle_.get().value(), handle.value(),
59 signals, context->context_value());
60 if (rv == MOJO_RESULT_INVALID_ARGUMENT) {
61 base::AutoLock lock(lock_);
62 handle_to_context_.erase(handle);
63 contexts_.erase(context->context_value());
64
65 // Balanced above.
66 context->Release();
67 return rv;
68 }
69 DCHECK_EQ(MOJO_RESULT_OK, rv);
70
71 return rv;
72 }
73
74 MojoResult RemoveHandle(Handle handle) {
75 DCHECK(watcher_handle_.is_valid());
76
77 scoped_refptr<Context> context;
78 {
79 base::AutoLock lock(lock_);
80 auto it = handle_to_context_.find(handle);
81 if (it == handle_to_context_.end())
82 return MOJO_RESULT_NOT_FOUND;
83
84 context = std::move(it->second);
85 handle_to_context_.erase(it);
86
87 // Ensure that we never return this handle as a ready result again. Note
88 // that it's removal from |handle_to_context_| above ensures it will never
89 // be added back to this map.
90 ready_handles_.erase(handle);
91 }
92
93 // NOTE: This may enter the notification callback immediately, so don't hold
94 // |lock_| while calling it.
95 MojoResult rv = MojoCancelWatch(watcher_handle_.get().value(),
96 context->context_value());
97
98 // We don't really care whether or not this succeeds. In either case, the
99 // context was or will imminently be cancelled and moved from |contexts_|
100 // to |cancelled_contexts_|.
101 DCHECK(rv == MOJO_RESULT_OK || rv == MOJO_RESULT_NOT_FOUND);
102
103 {
104 // Always clear |cancelled_contexts_| in case it's accumulated any more
105 // entries since the last time we ran.
106 base::AutoLock lock(lock_);
107 cancelled_contexts_.clear();
108 }
109
110 return rv;
111 }
112
113 void Wait(size_t* num_ready_handles,
114 Handle* ready_handles,
115 MojoResult* ready_results,
116 MojoHandleSignalsState* signals_states) {
117 DCHECK(watcher_handle_.is_valid());
118 DCHECK(num_ready_handles);
119 DCHECK(ready_handles);
120 DCHECK(ready_results);
121 bool should_wait = false;
122 {
123 base::AutoLock lock(lock_);
124 if (ready_handles_.empty()) {
125 // No handles are currently in the ready set. Make sure the event is
126 // reset and try to arm the watcher.
127 handle_event_.Reset();
128
129 DCHECK_LE(*num_ready_handles, std::numeric_limits<uint32_t>::max());
130 uint32_t num_ready_contexts = static_cast<uint32_t>(*num_ready_handles);
131 base::StackVector<uintptr_t, 4> ready_contexts;
132 ready_contexts.container().resize(num_ready_contexts);
133 base::StackVector<MojoHandleSignalsState, 4> ready_states;
134 if (!signals_states) {
135 // If the caller didn't provide a buffer for signal states, we provide
136 // our own locally. MojoArmWatcher() requires one if we want to handle
137 // arming failure properly.
138 ready_states.container().resize(num_ready_contexts);
139 signals_states = ready_states.container().data();
140 }
141 MojoResult rv = MojoArmWatcher(
142 watcher_handle_.get().value(), &num_ready_contexts,
143 ready_contexts.container().data(), ready_results, signals_states);
144
145 if (rv == MOJO_RESULT_FAILED_PRECONDITION) {
146 // Can't arm because one or more handles is already ready.
147 *num_ready_handles = num_ready_contexts;
148 for (size_t i = 0; i < num_ready_contexts; ++i) {
149 auto it = contexts_.find(ready_contexts.container()[i]);
150 DCHECK(it != contexts_.end());
151 ready_handles[i] = it->second->handle();
152 }
153 return;
154 }
155
156 if (rv == MOJO_RESULT_NOT_FOUND) {
157 // There are no handles in the set. Nothing to watch.
158 *num_ready_handles = 0;
159 return;
160 }
161
162 // Watcher is armed. We can go on waiting for an event to signal.
163 DCHECK(rv == MOJO_RESULT_OK || rv == MOJO_RESULT_ALREADY_EXISTS);
164 should_wait = true;
165 }
166 }
167
168 if (should_wait)
169 handle_event_.Wait();
170
171 base::AutoLock lock(lock_);
172
173 DCHECK(!ready_handles_.empty());
174
175 // Pop as many handles as we can out of the ready set and return them.
176 *num_ready_handles = std::min(*num_ready_handles, ready_handles_.size());
177 for (size_t i = 0; i < *num_ready_handles; ++i) {
178 auto it = ready_handles_.begin();
179 ready_handles[i] = it->first;
180 ready_results[i] = it->second.result;
181 if (signals_states)
182 signals_states[i] = it->second.signals_state;
183 ready_handles_.erase(it);
184 }
185 }
186
187 private:
188 friend class base::RefCountedThreadSafe<State>;
189
190 class Context : public base::RefCountedThreadSafe<Context> {
191 public:
192 Context(scoped_refptr<State> state, Handle handle)
193 : state_(state), handle_(handle) {}
194
195 Handle handle() const { return handle_; }
196
197 uintptr_t context_value() const {
198 return reinterpret_cast<uintptr_t>(this);
199 }
200
201 static void OnNotification(uintptr_t context,
202 MojoResult result,
203 MojoHandleSignalsState signals_state,
204 MojoWatcherNotificationFlags flags) {
205 reinterpret_cast<Context*>(context)->Notify(result, signals_state);
206 }
207
208 private:
209 friend class base::RefCountedThreadSafe<Context>;
210
211 ~Context() {}
212
213 void Notify(MojoResult result, MojoHandleSignalsState signals_state) {
214 state_->Notify(handle_, result, signals_state, this);
215 }
216
217 const scoped_refptr<State> state_;
218 const Handle handle_;
219
220 DISALLOW_COPY_AND_ASSIGN(Context);
221 };
222
223 ~State() {}
224
225 void Notify(Handle handle,
226 MojoResult result,
227 MojoHandleSignalsState signals_state,
228 Context* context) {
229 base::AutoLock lock(lock_);
230
231 // This could be a cancellation notification following an explicit
232 // RemoveHandle(), in which case we really don't care and don't want to
233 // add it to the ready set. Only update and signal if that's not the case.
234 if (!handle_to_context_.count(handle)) {
235 DCHECK_EQ(MOJO_RESULT_CANCELLED, result);
236 } else {
237 ready_handles_[handle] = {result, signals_state};
238 handle_event_.Signal();
239 }
240
241 // Whether it's an implicit or explicit cancellation, erase from |contexts_|
242 // and append to |cancelled_contexts_|.
243 if (result == MOJO_RESULT_CANCELLED) {
244 contexts_.erase(context->context_value());
245 handle_to_context_.erase(handle);
246
247 // NOTE: We retain a context ref in |cancelled_contexts_| to ensure that
248 // this Context's heap address is not reused too soon. For example, it
249 // would otherwise be possible for the user to call AddHandle() from the
250 // WaitSet's thread immediately after this notification has fired on
251 // another thread, potentially reusing the same heap address for the newly
252 // added Context; and then they may call RemoveHandle() for this handle
253 // (not knowing its context has just been implicitly cancelled) and
254 // cause the new Context to be incorrectly removed from |contexts_|.
255 //
256 // This vector is cleared on the WaitSet's own thread every time
257 // RemoveHandle is called.
258 cancelled_contexts_.emplace_back(make_scoped_refptr(context));
259
260 // Balanced in State::AddHandle().
261 context->Release();
262 }
263 }
264
265 struct ReadyState {
266 ReadyState() = default;
267 ReadyState(MojoResult result, MojoHandleSignalsState signals_state)
268 : result(result), signals_state(signals_state) {}
269 ~ReadyState() = default;
270
271 MojoResult result = MOJO_RESULT_UNKNOWN;
272 MojoHandleSignalsState signals_state = {0, 0};
273 };
274
275 // Not guarded by lock. Must only be accessed from the WaitSet's owning
276 // thread.
277 ScopedWatcherHandle watcher_handle_;
278
279 base::Lock lock_;
280 std::map<uintptr_t, scoped_refptr<Context>> contexts_;
281 std::map<Handle, scoped_refptr<Context>> handle_to_context_;
282 std::map<Handle, ReadyState> ready_handles_;
283 std::vector<scoped_refptr<Context>> cancelled_contexts_;
284
285 // Event signaled any time a handle notification is received.
286 base::WaitableEvent handle_event_;
287
288 DISALLOW_COPY_AND_ASSIGN(State);
289 };
290
291 WaitSet::WaitSet() : state_(new State) {}
292
293 WaitSet::~WaitSet() {
294 state_->ShutDown();
295 }
296
297 MojoResult WaitSet::AddHandle(Handle handle, MojoHandleSignals signals) {
298 return state_->AddHandle(handle, signals);
299 }
300
301 MojoResult WaitSet::RemoveHandle(Handle handle) {
302 return state_->RemoveHandle(handle);
303 }
304
305 void WaitSet::Wait(size_t* num_ready_handles,
306 Handle* ready_handles,
307 MojoResult* ready_results,
308 MojoHandleSignalsState* signals_states) {
309 state_->Wait(num_ready_handles, ready_handles, ready_results, signals_states);
310 }
311
312 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698