Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(635)

Side by Side Diff: mojo/public/cpp/system/wait_set.cc

Issue 2744943002: Mojo: Move waiting APIs to public library (Closed)
Patch Set: . Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/public/cpp/system/wait_set.h ('k') | mojo/public/cpp/test_support/lib/test_utils.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2017 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/public/cpp/system/wait_set.h"
6
7 #include <algorithm>
8 #include <limits>
9 #include <map>
10 #include <vector>
11
12 #include "base/containers/stack_container.h"
13 #include "base/logging.h"
14 #include "base/macros.h"
15 #include "base/memory/ptr_util.h"
16 #include "base/synchronization/lock.h"
17 #include "base/synchronization/waitable_event.h"
18 #include "mojo/public/cpp/system/watcher.h"
19
20 namespace mojo {
21
22 class WaitSet::State : public base::RefCountedThreadSafe<State> {
23 public:
24 State()
25 : handle_event_(base::WaitableEvent::ResetPolicy::MANUAL,
26 base::WaitableEvent::InitialState::NOT_SIGNALED) {
27 MojoResult rv = CreateWatcher(&Context::OnNotification, &watcher_handle_);
28 DCHECK_EQ(MOJO_RESULT_OK, rv);
29 }
30
31 void ShutDown() {
32 // NOTE: This may immediately invoke Notify for every context.
33 watcher_handle_.reset();
34 }
35
36 MojoResult AddHandle(Handle handle, MojoHandleSignals signals) {
37 DCHECK(watcher_handle_.is_valid());
38
39 scoped_refptr<Context> context = new Context(this, handle);
40
41 {
42 base::AutoLock lock(lock_);
43
44 if (handle_to_context_.count(handle))
45 return MOJO_RESULT_ALREADY_EXISTS;
46 DCHECK(!contexts_.count(context->context_value()));
47
48 handle_to_context_[handle] = context;
49 contexts_[context->context_value()] = context;
50 }
51
52 // Balanced in State::Notify() with MOJO_RESULT_CANCELLED if
53 // MojoWatch() succeeds. Otherwise balanced immediately below.
54 context->AddRef();
55
56 // This can notify immediately if the watcher is already armed. Don't hold
57 // |lock_| while calling it.
58 MojoResult rv = MojoWatch(watcher_handle_.get().value(), handle.value(),
59 signals, context->context_value());
60 if (rv == MOJO_RESULT_INVALID_ARGUMENT) {
61 base::AutoLock lock(lock_);
62 handle_to_context_.erase(handle);
63 contexts_.erase(context->context_value());
64
65 // Balanced above.
66 context->Release();
67 return rv;
68 }
69 DCHECK_EQ(MOJO_RESULT_OK, rv);
70
71 return rv;
72 }
73
74 MojoResult RemoveHandle(Handle handle) {
75 DCHECK(watcher_handle_.is_valid());
76
77 scoped_refptr<Context> context;
78 {
79 base::AutoLock lock(lock_);
80 auto it = handle_to_context_.find(handle);
81 if (it == handle_to_context_.end())
82 return MOJO_RESULT_NOT_FOUND;
83
84 context = std::move(it->second);
85 handle_to_context_.erase(it);
86
87 // Ensure that we never return this handle as a ready result again. Note
88 // that it's removal from |handle_to_context_| above ensures it will never
89 // be added back to this map.
90 ready_handles_.erase(handle);
91 }
92
93 // NOTE: This may enter the notification callback immediately, so don't hold
94 // |lock_| while calling it.
95 MojoResult rv = MojoCancelWatch(watcher_handle_.get().value(),
96 context->context_value());
97
98 // We don't really care whether or not this succeeds. In either case, the
99 // context was or will imminently be cancelled and moved from |contexts_|
100 // to |cancelled_contexts_|.
101 DCHECK(rv == MOJO_RESULT_OK || rv == MOJO_RESULT_NOT_FOUND);
102
103 {
104 // Always clear |cancelled_contexts_| in case it's accumulated any more
105 // entries since the last time we ran.
106 base::AutoLock lock(lock_);
107 cancelled_contexts_.clear();
108 }
109
110 return rv;
111 }
112
113 void Wait(size_t* num_ready_handles,
114 Handle* ready_handles,
115 MojoResult* ready_results,
116 MojoHandleSignalsState* signals_states) {
117 DCHECK(watcher_handle_.is_valid());
118 DCHECK(num_ready_handles);
119 DCHECK(ready_handles);
120 DCHECK(ready_results);
121 bool should_wait = false;
122 {
123 base::AutoLock lock(lock_);
124 if (ready_handles_.empty()) {
125 // No handles are currently in the ready set. Make sure the event is
126 // reset and try to arm the watcher.
127 handle_event_.Reset();
128
129 DCHECK_LE(*num_ready_handles, std::numeric_limits<uint32_t>::max());
130 uint32_t num_ready_contexts = static_cast<uint32_t>(*num_ready_handles);
131 base::StackVector<uintptr_t, 4> ready_contexts;
132 ready_contexts.container().resize(num_ready_contexts);
133 base::StackVector<MojoHandleSignalsState, 4> ready_states;
134 MojoHandleSignalsState* out_states = signals_states;
135 if (!out_states) {
136 // If the caller didn't provide a buffer for signal states, we provide
137 // our own locally. MojoArmWatcher() requires one if we want to handle
138 // arming failure properly.
139 ready_states.container().resize(num_ready_contexts);
140 out_states = ready_states.container().data();
141 }
142 MojoResult rv = MojoArmWatcher(
143 watcher_handle_.get().value(), &num_ready_contexts,
144 ready_contexts.container().data(), ready_results, out_states);
145
146 if (rv == MOJO_RESULT_FAILED_PRECONDITION) {
147 // Can't arm because one or more handles is already ready.
148 *num_ready_handles = num_ready_contexts;
149 for (size_t i = 0; i < num_ready_contexts; ++i) {
150 auto it = contexts_.find(ready_contexts.container()[i]);
151 DCHECK(it != contexts_.end());
152 ready_handles[i] = it->second->handle();
153 }
154 return;
155 }
156
157 if (rv == MOJO_RESULT_NOT_FOUND) {
158 // There are no handles in the set. Nothing to watch.
159 *num_ready_handles = 0;
160 return;
161 }
162
163 // Watcher is armed. We can go on waiting for an event to signal.
164 DCHECK(rv == MOJO_RESULT_OK || rv == MOJO_RESULT_ALREADY_EXISTS);
165 should_wait = true;
166 }
167 }
168
169 if (should_wait)
170 handle_event_.Wait();
171
172 base::AutoLock lock(lock_);
173
174 DCHECK(!ready_handles_.empty());
175
176 // Pop as many handles as we can out of the ready set and return them.
177 *num_ready_handles = std::min(*num_ready_handles, ready_handles_.size());
178 for (size_t i = 0; i < *num_ready_handles; ++i) {
179 auto it = ready_handles_.begin();
180 ready_handles[i] = it->first;
181 ready_results[i] = it->second.result;
182 if (signals_states)
183 signals_states[i] = it->second.signals_state;
184 ready_handles_.erase(it);
185 }
186 }
187
188 private:
189 friend class base::RefCountedThreadSafe<State>;
190
191 class Context : public base::RefCountedThreadSafe<Context> {
192 public:
193 Context(scoped_refptr<State> state, Handle handle)
194 : state_(state), handle_(handle) {}
195
196 Handle handle() const { return handle_; }
197
198 uintptr_t context_value() const {
199 return reinterpret_cast<uintptr_t>(this);
200 }
201
202 static void OnNotification(uintptr_t context,
203 MojoResult result,
204 MojoHandleSignalsState signals_state,
205 MojoWatcherNotificationFlags flags) {
206 reinterpret_cast<Context*>(context)->Notify(result, signals_state);
207 }
208
209 private:
210 friend class base::RefCountedThreadSafe<Context>;
211
212 ~Context() {}
213
214 void Notify(MojoResult result, MojoHandleSignalsState signals_state) {
215 state_->Notify(handle_, result, signals_state, this);
216 }
217
218 const scoped_refptr<State> state_;
219 const Handle handle_;
220
221 DISALLOW_COPY_AND_ASSIGN(Context);
222 };
223
224 ~State() {}
225
226 void Notify(Handle handle,
227 MojoResult result,
228 MojoHandleSignalsState signals_state,
229 Context* context) {
230 base::AutoLock lock(lock_);
231
232 // This could be a cancellation notification following an explicit
233 // RemoveHandle(), in which case we really don't care and don't want to
234 // add it to the ready set. Only update and signal if that's not the case.
235 if (!handle_to_context_.count(handle)) {
236 DCHECK_EQ(MOJO_RESULT_CANCELLED, result);
237 } else {
238 ready_handles_[handle] = {result, signals_state};
239 handle_event_.Signal();
240 }
241
242 // Whether it's an implicit or explicit cancellation, erase from |contexts_|
243 // and append to |cancelled_contexts_|.
244 if (result == MOJO_RESULT_CANCELLED) {
245 contexts_.erase(context->context_value());
246 handle_to_context_.erase(handle);
247
248 // NOTE: We retain a context ref in |cancelled_contexts_| to ensure that
249 // this Context's heap address is not reused too soon. For example, it
250 // would otherwise be possible for the user to call AddHandle() from the
251 // WaitSet's thread immediately after this notification has fired on
252 // another thread, potentially reusing the same heap address for the newly
253 // added Context; and then they may call RemoveHandle() for this handle
254 // (not knowing its context has just been implicitly cancelled) and
255 // cause the new Context to be incorrectly removed from |contexts_|.
256 //
257 // This vector is cleared on the WaitSet's own thread every time
258 // RemoveHandle is called.
259 cancelled_contexts_.emplace_back(make_scoped_refptr(context));
260
261 // Balanced in State::AddHandle().
262 context->Release();
263 }
264 }
265
266 struct ReadyState {
267 ReadyState() = default;
268 ReadyState(MojoResult result, MojoHandleSignalsState signals_state)
269 : result(result), signals_state(signals_state) {}
270 ~ReadyState() = default;
271
272 MojoResult result = MOJO_RESULT_UNKNOWN;
273 MojoHandleSignalsState signals_state = {0, 0};
274 };
275
276 // Not guarded by lock. Must only be accessed from the WaitSet's owning
277 // thread.
278 ScopedWatcherHandle watcher_handle_;
279
280 base::Lock lock_;
281 std::map<uintptr_t, scoped_refptr<Context>> contexts_;
282 std::map<Handle, scoped_refptr<Context>> handle_to_context_;
283 std::map<Handle, ReadyState> ready_handles_;
284 std::vector<scoped_refptr<Context>> cancelled_contexts_;
285
286 // Event signaled any time a handle notification is received.
287 base::WaitableEvent handle_event_;
288
289 DISALLOW_COPY_AND_ASSIGN(State);
290 };
291
292 WaitSet::WaitSet() : state_(new State) {}
293
294 WaitSet::~WaitSet() {
295 state_->ShutDown();
296 }
297
298 MojoResult WaitSet::AddHandle(Handle handle, MojoHandleSignals signals) {
299 return state_->AddHandle(handle, signals);
300 }
301
302 MojoResult WaitSet::RemoveHandle(Handle handle) {
303 return state_->RemoveHandle(handle);
304 }
305
306 void WaitSet::Wait(size_t* num_ready_handles,
307 Handle* ready_handles,
308 MojoResult* ready_results,
309 MojoHandleSignalsState* signals_states) {
310 state_->Wait(num_ready_handles, ready_handles, ready_results, signals_states);
311 }
312
313 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/public/cpp/system/wait_set.h ('k') | mojo/public/cpp/test_support/lib/test_utils.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698