OLD | NEW |
1 // Copyright 2017 The Chromium Authors. All rights reserved. | 1 // Copyright 2017 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/public/cpp/system/wait_set.h" | 5 #include "mojo/public/cpp/system/wait_set.h" |
6 | 6 |
7 #include <algorithm> | 7 #include <algorithm> |
8 #include <limits> | 8 #include <limits> |
9 #include <map> | 9 #include <map> |
| 10 #include <set> |
10 #include <vector> | 11 #include <vector> |
11 | 12 |
12 #include "base/containers/stack_container.h" | 13 #include "base/containers/stack_container.h" |
13 #include "base/logging.h" | 14 #include "base/logging.h" |
14 #include "base/macros.h" | 15 #include "base/macros.h" |
15 #include "base/memory/ptr_util.h" | 16 #include "base/memory/ptr_util.h" |
16 #include "base/synchronization/lock.h" | 17 #include "base/synchronization/lock.h" |
17 #include "base/synchronization/waitable_event.h" | 18 #include "base/synchronization/waitable_event.h" |
18 #include "mojo/public/cpp/system/watcher.h" | 19 #include "mojo/public/cpp/system/watcher.h" |
19 | 20 |
20 namespace mojo { | 21 namespace mojo { |
21 | 22 |
22 class WaitSet::State : public base::RefCountedThreadSafe<State> { | 23 class WaitSet::State : public base::RefCountedThreadSafe<State> { |
23 public: | 24 public: |
24 State() | 25 State() |
25 : handle_event_(base::WaitableEvent::ResetPolicy::MANUAL, | 26 : handle_event_(base::WaitableEvent::ResetPolicy::MANUAL, |
26 base::WaitableEvent::InitialState::NOT_SIGNALED) { | 27 base::WaitableEvent::InitialState::NOT_SIGNALED) { |
27 MojoResult rv = CreateWatcher(&Context::OnNotification, &watcher_handle_); | 28 MojoResult rv = CreateWatcher(&Context::OnNotification, &watcher_handle_); |
28 DCHECK_EQ(MOJO_RESULT_OK, rv); | 29 DCHECK_EQ(MOJO_RESULT_OK, rv); |
29 } | 30 } |
30 | 31 |
31 void ShutDown() { | 32 void ShutDown() { |
32 // NOTE: This may immediately invoke Notify for every context. | 33 // NOTE: This may immediately invoke Notify for every context. |
33 watcher_handle_.reset(); | 34 watcher_handle_.reset(); |
34 } | 35 } |
35 | 36 |
| 37 MojoResult AddEvent(base::WaitableEvent* event) { |
| 38 auto result = user_events_.insert(event); |
| 39 if (result.second) |
| 40 return MOJO_RESULT_OK; |
| 41 return MOJO_RESULT_ALREADY_EXISTS; |
| 42 } |
| 43 |
| 44 MojoResult RemoveEvent(base::WaitableEvent* event) { |
| 45 auto it = user_events_.find(event); |
| 46 if (it == user_events_.end()) |
| 47 return MOJO_RESULT_NOT_FOUND; |
| 48 user_events_.erase(it); |
| 49 return MOJO_RESULT_OK; |
| 50 } |
| 51 |
36 MojoResult AddHandle(Handle handle, MojoHandleSignals signals) { | 52 MojoResult AddHandle(Handle handle, MojoHandleSignals signals) { |
37 DCHECK(watcher_handle_.is_valid()); | 53 DCHECK(watcher_handle_.is_valid()); |
38 | 54 |
39 scoped_refptr<Context> context = new Context(this, handle); | 55 scoped_refptr<Context> context = new Context(this, handle); |
40 | 56 |
41 { | 57 { |
42 base::AutoLock lock(lock_); | 58 base::AutoLock lock(lock_); |
43 | 59 |
44 if (handle_to_context_.count(handle)) | 60 if (handle_to_context_.count(handle)) |
45 return MOJO_RESULT_ALREADY_EXISTS; | 61 return MOJO_RESULT_ALREADY_EXISTS; |
(...skipping 57 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
103 { | 119 { |
104 // Always clear |cancelled_contexts_| in case it's accumulated any more | 120 // Always clear |cancelled_contexts_| in case it's accumulated any more |
105 // entries since the last time we ran. | 121 // entries since the last time we ran. |
106 base::AutoLock lock(lock_); | 122 base::AutoLock lock(lock_); |
107 cancelled_contexts_.clear(); | 123 cancelled_contexts_.clear(); |
108 } | 124 } |
109 | 125 |
110 return rv; | 126 return rv; |
111 } | 127 } |
112 | 128 |
113 void Wait(size_t* num_ready_handles, | 129 void Wait(base::WaitableEvent** ready_event, |
| 130 size_t* num_ready_handles, |
114 Handle* ready_handles, | 131 Handle* ready_handles, |
115 MojoResult* ready_results, | 132 MojoResult* ready_results, |
116 MojoHandleSignalsState* signals_states) { | 133 MojoHandleSignalsState* signals_states) { |
117 DCHECK(watcher_handle_.is_valid()); | 134 DCHECK(watcher_handle_.is_valid()); |
118 DCHECK(num_ready_handles); | 135 DCHECK(num_ready_handles); |
119 DCHECK(ready_handles); | 136 DCHECK(ready_handles); |
120 DCHECK(ready_results); | 137 DCHECK(ready_results); |
121 bool should_wait = false; | |
122 { | 138 { |
123 base::AutoLock lock(lock_); | 139 base::AutoLock lock(lock_); |
124 if (ready_handles_.empty()) { | 140 if (ready_handles_.empty()) { |
125 // No handles are currently in the ready set. Make sure the event is | 141 // No handles are currently in the ready set. Make sure the event is |
126 // reset and try to arm the watcher. | 142 // reset and try to arm the watcher. |
127 handle_event_.Reset(); | 143 handle_event_.Reset(); |
128 | 144 |
129 DCHECK_LE(*num_ready_handles, std::numeric_limits<uint32_t>::max()); | 145 DCHECK_LE(*num_ready_handles, std::numeric_limits<uint32_t>::max()); |
130 uint32_t num_ready_contexts = static_cast<uint32_t>(*num_ready_handles); | 146 uint32_t num_ready_contexts = static_cast<uint32_t>(*num_ready_handles); |
| 147 |
131 base::StackVector<uintptr_t, 4> ready_contexts; | 148 base::StackVector<uintptr_t, 4> ready_contexts; |
132 ready_contexts.container().resize(num_ready_contexts); | 149 ready_contexts.container().resize(num_ready_contexts); |
133 base::StackVector<MojoHandleSignalsState, 4> ready_states; | 150 base::StackVector<MojoHandleSignalsState, 4> ready_states; |
134 MojoHandleSignalsState* out_states = signals_states; | 151 MojoHandleSignalsState* out_states = signals_states; |
135 if (!out_states) { | 152 if (!out_states) { |
136 // If the caller didn't provide a buffer for signal states, we provide | 153 // If the caller didn't provide a buffer for signal states, we provide |
137 // our own locally. MojoArmWatcher() requires one if we want to handle | 154 // our own locally. MojoArmWatcher() requires one if we want to handle |
138 // arming failure properly. | 155 // arming failure properly. |
139 ready_states.container().resize(num_ready_contexts); | 156 ready_states.container().resize(num_ready_contexts); |
140 out_states = ready_states.container().data(); | 157 out_states = ready_states.container().data(); |
141 } | 158 } |
142 MojoResult rv = MojoArmWatcher( | 159 MojoResult rv = MojoArmWatcher( |
143 watcher_handle_.get().value(), &num_ready_contexts, | 160 watcher_handle_.get().value(), &num_ready_contexts, |
144 ready_contexts.container().data(), ready_results, out_states); | 161 ready_contexts.container().data(), ready_results, out_states); |
145 | 162 |
146 if (rv == MOJO_RESULT_FAILED_PRECONDITION) { | 163 if (rv == MOJO_RESULT_FAILED_PRECONDITION) { |
147 // Can't arm because one or more handles is already ready. | 164 // Simulate the handles becoming ready. We do this in lieu of |
148 *num_ready_handles = num_ready_contexts; | 165 // returning the results immediately so as to avoid potentially |
| 166 // starving user events. i.e., we always want to call WaitMany() |
| 167 // below. |
| 168 handle_event_.Signal(); |
149 for (size_t i = 0; i < num_ready_contexts; ++i) { | 169 for (size_t i = 0; i < num_ready_contexts; ++i) { |
150 auto it = contexts_.find(ready_contexts.container()[i]); | 170 auto it = contexts_.find(ready_contexts.container()[i]); |
151 DCHECK(it != contexts_.end()); | 171 DCHECK(it != contexts_.end()); |
152 ready_handles[i] = it->second->handle(); | 172 ready_handles_[it->second->handle()] = {ready_results[i], |
| 173 out_states[i]}; |
153 } | 174 } |
154 return; | 175 } else if (rv == MOJO_RESULT_NOT_FOUND) { |
| 176 // Nothing to watch. If there are no user events, always signal to |
| 177 // avoid deadlock. |
| 178 if (user_events_.empty()) |
| 179 handle_event_.Signal(); |
| 180 } else { |
| 181 // Watcher must be armed now. No need to manually signal. |
| 182 DCHECK_EQ(MOJO_RESULT_OK, rv); |
155 } | 183 } |
156 | |
157 if (rv == MOJO_RESULT_NOT_FOUND) { | |
158 // There are no handles in the set. Nothing to watch. | |
159 *num_ready_handles = 0; | |
160 return; | |
161 } | |
162 | |
163 // Watcher is armed. We can go on waiting for an event to signal. | |
164 DCHECK(rv == MOJO_RESULT_OK || rv == MOJO_RESULT_ALREADY_EXISTS); | |
165 should_wait = true; | |
166 } | 184 } |
167 } | 185 } |
168 | 186 |
169 if (should_wait) | 187 // Build a local contiguous array of events to wait on. These are rotated |
170 handle_event_.Wait(); | 188 // across Wait() calls to avoid starvation, by virtue of the fact that |
| 189 // WaitMany guarantees left-to-right priority when multiple events are |
| 190 // signaled. |
171 | 191 |
| 192 base::StackVector<base::WaitableEvent*, 4> events; |
| 193 events.container().resize(user_events_.size() + 1); |
| 194 if (waitable_index_shift_ > user_events_.size()) |
| 195 waitable_index_shift_ = 0; |
| 196 |
| 197 size_t dest_index = waitable_index_shift_++; |
| 198 events.container()[dest_index] = &handle_event_; |
| 199 for (auto* e : user_events_) { |
| 200 dest_index = (dest_index + 1) % events.container().size(); |
| 201 events.container()[dest_index] = e; |
| 202 } |
| 203 |
| 204 size_t index = base::WaitableEvent::WaitMany(events.container().data(), |
| 205 events.container().size()); |
172 base::AutoLock lock(lock_); | 206 base::AutoLock lock(lock_); |
173 | 207 |
174 DCHECK(!ready_handles_.empty()); | 208 // Pop as many handles as we can out of the ready set and return them. Note |
175 | 209 // that we do this regardless of which event signaled, as there may be |
176 // Pop as many handles as we can out of the ready set and return them. | 210 // ready handles in any case and they may be interesting to the caller. |
177 *num_ready_handles = std::min(*num_ready_handles, ready_handles_.size()); | 211 *num_ready_handles = std::min(*num_ready_handles, ready_handles_.size()); |
178 for (size_t i = 0; i < *num_ready_handles; ++i) { | 212 for (size_t i = 0; i < *num_ready_handles; ++i) { |
179 auto it = ready_handles_.begin(); | 213 auto it = ready_handles_.begin(); |
180 ready_handles[i] = it->first; | 214 ready_handles[i] = it->first; |
181 ready_results[i] = it->second.result; | 215 ready_results[i] = it->second.result; |
182 if (signals_states) | 216 if (signals_states) |
183 signals_states[i] = it->second.signals_state; | 217 signals_states[i] = it->second.signals_state; |
184 ready_handles_.erase(it); | 218 ready_handles_.erase(it); |
185 } | 219 } |
| 220 |
| 221 // If the caller cares, let them know which user event unblocked us, if any. |
| 222 if (ready_event) { |
| 223 if (events.container()[index] == &handle_event_) |
| 224 *ready_event = nullptr; |
| 225 else |
| 226 *ready_event = events.container()[index]; |
| 227 } |
186 } | 228 } |
187 | 229 |
188 private: | 230 private: |
189 friend class base::RefCountedThreadSafe<State>; | 231 friend class base::RefCountedThreadSafe<State>; |
190 | 232 |
191 class Context : public base::RefCountedThreadSafe<Context> { | 233 class Context : public base::RefCountedThreadSafe<Context> { |
192 public: | 234 public: |
193 Context(scoped_refptr<State> state, Handle handle) | 235 Context(scoped_refptr<State> state, Handle handle) |
194 : state_(state), handle_(handle) {} | 236 : state_(state), handle_(handle) {} |
195 | 237 |
(...skipping 79 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
275 | 317 |
276 // Not guarded by lock. Must only be accessed from the WaitSet's owning | 318 // Not guarded by lock. Must only be accessed from the WaitSet's owning |
277 // thread. | 319 // thread. |
278 ScopedWatcherHandle watcher_handle_; | 320 ScopedWatcherHandle watcher_handle_; |
279 | 321 |
280 base::Lock lock_; | 322 base::Lock lock_; |
281 std::map<uintptr_t, scoped_refptr<Context>> contexts_; | 323 std::map<uintptr_t, scoped_refptr<Context>> contexts_; |
282 std::map<Handle, scoped_refptr<Context>> handle_to_context_; | 324 std::map<Handle, scoped_refptr<Context>> handle_to_context_; |
283 std::map<Handle, ReadyState> ready_handles_; | 325 std::map<Handle, ReadyState> ready_handles_; |
284 std::vector<scoped_refptr<Context>> cancelled_contexts_; | 326 std::vector<scoped_refptr<Context>> cancelled_contexts_; |
| 327 std::set<base::WaitableEvent*> user_events_; |
285 | 328 |
286 // Event signaled any time a handle notification is received. | 329 // Event signaled any time a handle notification is received. |
287 base::WaitableEvent handle_event_; | 330 base::WaitableEvent handle_event_; |
288 | 331 |
| 332 // Offset by which to rotate the current set of waitable objects. This is used |
| 333 // to guard against event starvation, as base::WaitableEvent::WaitMany gives |
| 334 // preference to events in left-to-right order. |
| 335 size_t waitable_index_shift_ = 0; |
| 336 |
289 DISALLOW_COPY_AND_ASSIGN(State); | 337 DISALLOW_COPY_AND_ASSIGN(State); |
290 }; | 338 }; |
291 | 339 |
292 WaitSet::WaitSet() : state_(new State) {} | 340 WaitSet::WaitSet() : state_(new State) {} |
293 | 341 |
294 WaitSet::~WaitSet() { | 342 WaitSet::~WaitSet() { |
295 state_->ShutDown(); | 343 state_->ShutDown(); |
296 } | 344 } |
297 | 345 |
| 346 MojoResult WaitSet::AddEvent(base::WaitableEvent* event) { |
| 347 return state_->AddEvent(event); |
| 348 } |
| 349 |
| 350 MojoResult WaitSet::RemoveEvent(base::WaitableEvent* event) { |
| 351 return state_->RemoveEvent(event); |
| 352 } |
| 353 |
298 MojoResult WaitSet::AddHandle(Handle handle, MojoHandleSignals signals) { | 354 MojoResult WaitSet::AddHandle(Handle handle, MojoHandleSignals signals) { |
299 return state_->AddHandle(handle, signals); | 355 return state_->AddHandle(handle, signals); |
300 } | 356 } |
301 | 357 |
302 MojoResult WaitSet::RemoveHandle(Handle handle) { | 358 MojoResult WaitSet::RemoveHandle(Handle handle) { |
303 return state_->RemoveHandle(handle); | 359 return state_->RemoveHandle(handle); |
304 } | 360 } |
305 | 361 |
306 void WaitSet::Wait(size_t* num_ready_handles, | 362 void WaitSet::Wait(base::WaitableEvent** ready_event, |
| 363 size_t* num_ready_handles, |
307 Handle* ready_handles, | 364 Handle* ready_handles, |
308 MojoResult* ready_results, | 365 MojoResult* ready_results, |
309 MojoHandleSignalsState* signals_states) { | 366 MojoHandleSignalsState* signals_states) { |
310 state_->Wait(num_ready_handles, ready_handles, ready_results, signals_states); | 367 state_->Wait(ready_event, num_ready_handles, ready_handles, ready_results, |
| 368 signals_states); |
311 } | 369 } |
312 | 370 |
313 } // namespace mojo | 371 } // namespace mojo |
OLD | NEW |