Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(40)

Side by Side Diff: mojo/public/cpp/system/wait_set.cc

Issue 2754143005: Use WaitableEvents to wake up sync IPC waiting (Closed)
Patch Set: . Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/public/cpp/system/wait_set.h ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2017 The Chromium Authors. All rights reserved. 1 // Copyright 2017 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/public/cpp/system/wait_set.h" 5 #include "mojo/public/cpp/system/wait_set.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 #include <limits> 8 #include <limits>
9 #include <map> 9 #include <map>
10 #include <set>
10 #include <vector> 11 #include <vector>
11 12
12 #include "base/containers/stack_container.h" 13 #include "base/containers/stack_container.h"
13 #include "base/logging.h" 14 #include "base/logging.h"
14 #include "base/macros.h" 15 #include "base/macros.h"
15 #include "base/memory/ptr_util.h" 16 #include "base/memory/ptr_util.h"
16 #include "base/synchronization/lock.h" 17 #include "base/synchronization/lock.h"
17 #include "base/synchronization/waitable_event.h" 18 #include "base/synchronization/waitable_event.h"
18 #include "mojo/public/cpp/system/watcher.h" 19 #include "mojo/public/cpp/system/watcher.h"
19 20
20 namespace mojo { 21 namespace mojo {
21 22
22 class WaitSet::State : public base::RefCountedThreadSafe<State> { 23 class WaitSet::State : public base::RefCountedThreadSafe<State> {
23 public: 24 public:
24 State() 25 State()
25 : handle_event_(base::WaitableEvent::ResetPolicy::MANUAL, 26 : handle_event_(base::WaitableEvent::ResetPolicy::MANUAL,
26 base::WaitableEvent::InitialState::NOT_SIGNALED) { 27 base::WaitableEvent::InitialState::NOT_SIGNALED) {
27 MojoResult rv = CreateWatcher(&Context::OnNotification, &watcher_handle_); 28 MojoResult rv = CreateWatcher(&Context::OnNotification, &watcher_handle_);
28 DCHECK_EQ(MOJO_RESULT_OK, rv); 29 DCHECK_EQ(MOJO_RESULT_OK, rv);
29 } 30 }
30 31
31 void ShutDown() { 32 void ShutDown() {
32 // NOTE: This may immediately invoke Notify for every context. 33 // NOTE: This may immediately invoke Notify for every context.
33 watcher_handle_.reset(); 34 watcher_handle_.reset();
34 } 35 }
35 36
37 MojoResult AddEvent(base::WaitableEvent* event) {
38 auto result = user_events_.insert(event);
39 if (result.second)
40 return MOJO_RESULT_OK;
41 return MOJO_RESULT_ALREADY_EXISTS;
42 }
43
44 MojoResult RemoveEvent(base::WaitableEvent* event) {
45 auto it = user_events_.find(event);
46 if (it == user_events_.end())
47 return MOJO_RESULT_NOT_FOUND;
48 user_events_.erase(it);
49 return MOJO_RESULT_OK;
50 }
51
36 MojoResult AddHandle(Handle handle, MojoHandleSignals signals) { 52 MojoResult AddHandle(Handle handle, MojoHandleSignals signals) {
37 DCHECK(watcher_handle_.is_valid()); 53 DCHECK(watcher_handle_.is_valid());
38 54
39 scoped_refptr<Context> context = new Context(this, handle); 55 scoped_refptr<Context> context = new Context(this, handle);
40 56
41 { 57 {
42 base::AutoLock lock(lock_); 58 base::AutoLock lock(lock_);
43 59
44 if (handle_to_context_.count(handle)) 60 if (handle_to_context_.count(handle))
45 return MOJO_RESULT_ALREADY_EXISTS; 61 return MOJO_RESULT_ALREADY_EXISTS;
(...skipping 57 matching lines...) Expand 10 before | Expand all | Expand 10 after
103 { 119 {
104 // Always clear |cancelled_contexts_| in case it's accumulated any more 120 // Always clear |cancelled_contexts_| in case it's accumulated any more
105 // entries since the last time we ran. 121 // entries since the last time we ran.
106 base::AutoLock lock(lock_); 122 base::AutoLock lock(lock_);
107 cancelled_contexts_.clear(); 123 cancelled_contexts_.clear();
108 } 124 }
109 125
110 return rv; 126 return rv;
111 } 127 }
112 128
113 void Wait(size_t* num_ready_handles, 129 void Wait(base::WaitableEvent** ready_event,
130 size_t* num_ready_handles,
114 Handle* ready_handles, 131 Handle* ready_handles,
115 MojoResult* ready_results, 132 MojoResult* ready_results,
116 MojoHandleSignalsState* signals_states) { 133 MojoHandleSignalsState* signals_states) {
117 DCHECK(watcher_handle_.is_valid()); 134 DCHECK(watcher_handle_.is_valid());
118 DCHECK(num_ready_handles); 135 DCHECK(num_ready_handles);
119 DCHECK(ready_handles); 136 DCHECK(ready_handles);
120 DCHECK(ready_results); 137 DCHECK(ready_results);
121 bool should_wait = false;
122 { 138 {
123 base::AutoLock lock(lock_); 139 base::AutoLock lock(lock_);
124 if (ready_handles_.empty()) { 140 if (ready_handles_.empty()) {
125 // No handles are currently in the ready set. Make sure the event is 141 // No handles are currently in the ready set. Make sure the event is
126 // reset and try to arm the watcher. 142 // reset and try to arm the watcher.
127 handle_event_.Reset(); 143 handle_event_.Reset();
128 144
129 DCHECK_LE(*num_ready_handles, std::numeric_limits<uint32_t>::max()); 145 DCHECK_LE(*num_ready_handles, std::numeric_limits<uint32_t>::max());
130 uint32_t num_ready_contexts = static_cast<uint32_t>(*num_ready_handles); 146 uint32_t num_ready_contexts = static_cast<uint32_t>(*num_ready_handles);
147
131 base::StackVector<uintptr_t, 4> ready_contexts; 148 base::StackVector<uintptr_t, 4> ready_contexts;
132 ready_contexts.container().resize(num_ready_contexts); 149 ready_contexts.container().resize(num_ready_contexts);
133 base::StackVector<MojoHandleSignalsState, 4> ready_states; 150 base::StackVector<MojoHandleSignalsState, 4> ready_states;
134 MojoHandleSignalsState* out_states = signals_states; 151 MojoHandleSignalsState* out_states = signals_states;
135 if (!out_states) { 152 if (!out_states) {
136 // If the caller didn't provide a buffer for signal states, we provide 153 // If the caller didn't provide a buffer for signal states, we provide
137 // our own locally. MojoArmWatcher() requires one if we want to handle 154 // our own locally. MojoArmWatcher() requires one if we want to handle
138 // arming failure properly. 155 // arming failure properly.
139 ready_states.container().resize(num_ready_contexts); 156 ready_states.container().resize(num_ready_contexts);
140 out_states = ready_states.container().data(); 157 out_states = ready_states.container().data();
141 } 158 }
142 MojoResult rv = MojoArmWatcher( 159 MojoResult rv = MojoArmWatcher(
143 watcher_handle_.get().value(), &num_ready_contexts, 160 watcher_handle_.get().value(), &num_ready_contexts,
144 ready_contexts.container().data(), ready_results, out_states); 161 ready_contexts.container().data(), ready_results, out_states);
145 162
146 if (rv == MOJO_RESULT_FAILED_PRECONDITION) { 163 if (rv == MOJO_RESULT_FAILED_PRECONDITION) {
147 // Can't arm because one or more handles is already ready. 164 // Simulate the handles becoming ready. We do this in lieu of
148 *num_ready_handles = num_ready_contexts; 165 // returning the results immediately so as to avoid potentially
166 // starving user events. i.e., we always want to call WaitMany()
167 // below.
168 handle_event_.Signal();
149 for (size_t i = 0; i < num_ready_contexts; ++i) { 169 for (size_t i = 0; i < num_ready_contexts; ++i) {
150 auto it = contexts_.find(ready_contexts.container()[i]); 170 auto it = contexts_.find(ready_contexts.container()[i]);
151 DCHECK(it != contexts_.end()); 171 DCHECK(it != contexts_.end());
152 ready_handles[i] = it->second->handle(); 172 ready_handles_[it->second->handle()] = {ready_results[i],
173 out_states[i]};
153 } 174 }
154 return; 175 } else if (rv == MOJO_RESULT_NOT_FOUND) {
176 // Nothing to watch. If there are no user events, always signal to
177 // avoid deadlock.
178 if (user_events_.empty())
179 handle_event_.Signal();
180 } else {
181 // Watcher must be armed now. No need to manually signal.
182 DCHECK_EQ(MOJO_RESULT_OK, rv);
155 } 183 }
156
157 if (rv == MOJO_RESULT_NOT_FOUND) {
158 // There are no handles in the set. Nothing to watch.
159 *num_ready_handles = 0;
160 return;
161 }
162
163 // Watcher is armed. We can go on waiting for an event to signal.
164 DCHECK(rv == MOJO_RESULT_OK || rv == MOJO_RESULT_ALREADY_EXISTS);
165 should_wait = true;
166 } 184 }
167 } 185 }
168 186
169 if (should_wait) 187 // Build a local contiguous array of events to wait on. These are rotated
170 handle_event_.Wait(); 188 // across Wait() calls to avoid starvation, by virtue of the fact that
189 // WaitMany guarantees left-to-right priority when multiple events are
190 // signaled.
171 191
192 base::StackVector<base::WaitableEvent*, 4> events;
193 events.container().resize(user_events_.size() + 1);
194 if (waitable_index_shift_ > user_events_.size())
195 waitable_index_shift_ = 0;
196
197 size_t dest_index = waitable_index_shift_++;
198 events.container()[dest_index] = &handle_event_;
199 for (auto* e : user_events_) {
200 dest_index = (dest_index + 1) % events.container().size();
201 events.container()[dest_index] = e;
202 }
203
204 size_t index = base::WaitableEvent::WaitMany(events.container().data(),
205 events.container().size());
172 base::AutoLock lock(lock_); 206 base::AutoLock lock(lock_);
173 207
174 DCHECK(!ready_handles_.empty()); 208 // Pop as many handles as we can out of the ready set and return them. Note
175 209 // that we do this regardless of which event signaled, as there may be
176 // Pop as many handles as we can out of the ready set and return them. 210 // ready handles in any case and they may be interesting to the caller.
177 *num_ready_handles = std::min(*num_ready_handles, ready_handles_.size()); 211 *num_ready_handles = std::min(*num_ready_handles, ready_handles_.size());
178 for (size_t i = 0; i < *num_ready_handles; ++i) { 212 for (size_t i = 0; i < *num_ready_handles; ++i) {
179 auto it = ready_handles_.begin(); 213 auto it = ready_handles_.begin();
180 ready_handles[i] = it->first; 214 ready_handles[i] = it->first;
181 ready_results[i] = it->second.result; 215 ready_results[i] = it->second.result;
182 if (signals_states) 216 if (signals_states)
183 signals_states[i] = it->second.signals_state; 217 signals_states[i] = it->second.signals_state;
184 ready_handles_.erase(it); 218 ready_handles_.erase(it);
185 } 219 }
220
221 // If the caller cares, let them know which user event unblocked us, if any.
222 if (ready_event) {
223 if (events.container()[index] == &handle_event_)
224 *ready_event = nullptr;
225 else
226 *ready_event = events.container()[index];
227 }
186 } 228 }
187 229
188 private: 230 private:
189 friend class base::RefCountedThreadSafe<State>; 231 friend class base::RefCountedThreadSafe<State>;
190 232
191 class Context : public base::RefCountedThreadSafe<Context> { 233 class Context : public base::RefCountedThreadSafe<Context> {
192 public: 234 public:
193 Context(scoped_refptr<State> state, Handle handle) 235 Context(scoped_refptr<State> state, Handle handle)
194 : state_(state), handle_(handle) {} 236 : state_(state), handle_(handle) {}
195 237
(...skipping 79 matching lines...) Expand 10 before | Expand all | Expand 10 after
275 317
276 // Not guarded by lock. Must only be accessed from the WaitSet's owning 318 // Not guarded by lock. Must only be accessed from the WaitSet's owning
277 // thread. 319 // thread.
278 ScopedWatcherHandle watcher_handle_; 320 ScopedWatcherHandle watcher_handle_;
279 321
280 base::Lock lock_; 322 base::Lock lock_;
281 std::map<uintptr_t, scoped_refptr<Context>> contexts_; 323 std::map<uintptr_t, scoped_refptr<Context>> contexts_;
282 std::map<Handle, scoped_refptr<Context>> handle_to_context_; 324 std::map<Handle, scoped_refptr<Context>> handle_to_context_;
283 std::map<Handle, ReadyState> ready_handles_; 325 std::map<Handle, ReadyState> ready_handles_;
284 std::vector<scoped_refptr<Context>> cancelled_contexts_; 326 std::vector<scoped_refptr<Context>> cancelled_contexts_;
327 std::set<base::WaitableEvent*> user_events_;
285 328
286 // Event signaled any time a handle notification is received. 329 // Event signaled any time a handle notification is received.
287 base::WaitableEvent handle_event_; 330 base::WaitableEvent handle_event_;
288 331
332 // Offset by which to rotate the current set of waitable objects. This is used
333 // to guard against event starvation, as base::WaitableEvent::WaitMany gives
334 // preference to events in left-to-right order.
335 size_t waitable_index_shift_ = 0;
336
289 DISALLOW_COPY_AND_ASSIGN(State); 337 DISALLOW_COPY_AND_ASSIGN(State);
290 }; 338 };
291 339
292 WaitSet::WaitSet() : state_(new State) {} 340 WaitSet::WaitSet() : state_(new State) {}
293 341
294 WaitSet::~WaitSet() { 342 WaitSet::~WaitSet() {
295 state_->ShutDown(); 343 state_->ShutDown();
296 } 344 }
297 345
346 MojoResult WaitSet::AddEvent(base::WaitableEvent* event) {
347 return state_->AddEvent(event);
348 }
349
350 MojoResult WaitSet::RemoveEvent(base::WaitableEvent* event) {
351 return state_->RemoveEvent(event);
352 }
353
298 MojoResult WaitSet::AddHandle(Handle handle, MojoHandleSignals signals) { 354 MojoResult WaitSet::AddHandle(Handle handle, MojoHandleSignals signals) {
299 return state_->AddHandle(handle, signals); 355 return state_->AddHandle(handle, signals);
300 } 356 }
301 357
302 MojoResult WaitSet::RemoveHandle(Handle handle) { 358 MojoResult WaitSet::RemoveHandle(Handle handle) {
303 return state_->RemoveHandle(handle); 359 return state_->RemoveHandle(handle);
304 } 360 }
305 361
306 void WaitSet::Wait(size_t* num_ready_handles, 362 void WaitSet::Wait(base::WaitableEvent** ready_event,
363 size_t* num_ready_handles,
307 Handle* ready_handles, 364 Handle* ready_handles,
308 MojoResult* ready_results, 365 MojoResult* ready_results,
309 MojoHandleSignalsState* signals_states) { 366 MojoHandleSignalsState* signals_states) {
310 state_->Wait(num_ready_handles, ready_handles, ready_results, signals_states); 367 state_->Wait(ready_event, num_ready_handles, ready_handles, ready_results,
368 signals_states);
311 } 369 }
312 370
313 } // namespace mojo 371 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/public/cpp/system/wait_set.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698