Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(636)

Side by Side Diff: mojo/edk/system/wait_set_dispatcher.cc

Issue 1504733002: Implementation of WaitSet for new EDK. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@mojo-waitset-implementation
Patch Set: Add gyp rules. Created 5 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/edk/system/wait_set_dispatcher.h ('k') | mojo/edk/system/wait_set_dispatcher_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "third_party/mojo/src/mojo/edk/system/wait_set_dispatcher.h" 5 #include "mojo/edk/system/wait_set_dispatcher.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 #include <utility> 8 #include <utility>
9 9
10 #include "base/logging.h" 10 #include "base/logging.h"
11 #include "third_party/mojo/src/mojo/edk/system/awakable.h" 11 #include "mojo/edk/system/awakable.h"
12 12
13 namespace mojo { 13 namespace mojo {
14 namespace system { 14 namespace edk {
15 15
16 class WaitSetDispatcher::Waiter final : public Awakable { 16 class WaitSetDispatcher::Waiter final : public Awakable {
17 public: 17 public:
18 explicit Waiter(WaitSetDispatcher* dispatcher) : dispatcher_(dispatcher) {} 18 explicit Waiter(WaitSetDispatcher* dispatcher) : dispatcher_(dispatcher) {}
19 ~Waiter() {} 19 ~Waiter() {}
20 20
21 // |Awakable| implementation. 21 // |Awakable| implementation.
22 bool Awake(MojoResult result, uintptr_t context) override { 22 bool Awake(MojoResult result, uintptr_t context) override {
23 // Note: This is called with various Mojo locks held. 23 // Note: This is called with various Mojo locks held.
24 dispatcher_->WakeDispatcher(result, context); 24 dispatcher_->WakeDispatcher(result, context);
25 // Removes |this| from the dispatcher's list of waiters. 25 // Removes |this| from the dispatcher's list of waiters.
26 return false; 26 return false;
27 } 27 }
28 28
29 private: 29 private:
30 WaitSetDispatcher* const dispatcher_; 30 WaitSetDispatcher* const dispatcher_;
31 }; 31 };
32 32
33 WaitSetDispatcher::WaitState::WaitState() {}
34
35 WaitSetDispatcher::WaitState::~WaitState() {}
36
33 WaitSetDispatcher::WaitSetDispatcher() 37 WaitSetDispatcher::WaitSetDispatcher()
34 : waiter_(new WaitSetDispatcher::Waiter(this)) {} 38 : waiter_(new WaitSetDispatcher::Waiter(this)) {}
35 39
36 WaitSetDispatcher::~WaitSetDispatcher() { 40 WaitSetDispatcher::~WaitSetDispatcher() {
37 DCHECK(waiting_dispatchers_.empty()); 41 DCHECK(waiting_dispatchers_.empty());
38 DCHECK(awoken_queue_.empty()); 42 DCHECK(awoken_queue_.empty());
39 DCHECK(processed_dispatchers_.empty()); 43 DCHECK(processed_dispatchers_.empty());
40 } 44 }
41 45
42 Dispatcher::Type WaitSetDispatcher::GetType() const { 46 Dispatcher::Type WaitSetDispatcher::GetType() const {
43 return Type::WAIT_SET; 47 return Type::WAIT_SET;
44 } 48 }
45 49
46 void WaitSetDispatcher::CloseImplNoLock() { 50 void WaitSetDispatcher::CloseImplNoLock() {
47 mutex().AssertHeld(); 51 lock().AssertAcquired();
48 for (const auto& entry : waiting_dispatchers_) 52 for (const auto& entry : waiting_dispatchers_)
49 entry.second.dispatcher->RemoveAwakable(waiter_.get(), nullptr); 53 entry.second.dispatcher->RemoveAwakable(waiter_.get(), nullptr);
50 waiting_dispatchers_.clear(); 54 waiting_dispatchers_.clear();
51 55
52 MutexLocker locker(&awoken_mutex_); 56 base::AutoLock locker(awoken_lock_);
53 awoken_queue_.clear(); 57 awoken_queue_.clear();
54 processed_dispatchers_.clear(); 58 processed_dispatchers_.clear();
55 } 59 }
56 60
57 MojoResult WaitSetDispatcher::AddWaitingDispatcherImplNoLock( 61 MojoResult WaitSetDispatcher::AddWaitingDispatcherImplNoLock(
58 const scoped_refptr<Dispatcher>& dispatcher, 62 const scoped_refptr<Dispatcher>& dispatcher,
59 MojoHandleSignals signals, 63 MojoHandleSignals signals,
60 uintptr_t context) { 64 uintptr_t context) {
61 mutex().AssertHeld(); 65 lock().AssertAcquired();
62 if (dispatcher == this) 66 if (dispatcher == this)
63 return MOJO_RESULT_INVALID_ARGUMENT; 67 return MOJO_RESULT_INVALID_ARGUMENT;
64 68
65 uintptr_t dispatcher_handle = reinterpret_cast<uintptr_t>(dispatcher.get()); 69 uintptr_t dispatcher_handle = reinterpret_cast<uintptr_t>(dispatcher.get());
66 auto it = waiting_dispatchers_.find(dispatcher_handle); 70 auto it = waiting_dispatchers_.find(dispatcher_handle);
67 if (it != waiting_dispatchers_.end()) { 71 if (it != waiting_dispatchers_.end()) {
68 return MOJO_RESULT_ALREADY_EXISTS; 72 return MOJO_RESULT_ALREADY_EXISTS;
69 } 73 }
70 74
71 const MojoResult result = dispatcher->AddAwakable(waiter_.get(), signals, 75 const MojoResult result = dispatcher->AddAwakable(waiter_.get(), signals,
72 dispatcher_handle, nullptr); 76 dispatcher_handle, nullptr);
73 if (result == MOJO_RESULT_INVALID_ARGUMENT) { 77 if (result == MOJO_RESULT_INVALID_ARGUMENT) {
74 // Dispatcher is closed. 78 // Dispatcher is closed.
75 return result; 79 return result;
76 } else if (result != MOJO_RESULT_OK) { 80 } else if (result != MOJO_RESULT_OK) {
77 WakeDispatcher(result, dispatcher_handle); 81 WakeDispatcher(result, dispatcher_handle);
78 } 82 }
79 83
80 WaitState state; 84 WaitState state;
81 state.dispatcher = dispatcher; 85 state.dispatcher = dispatcher;
82 state.context = context; 86 state.context = context;
83 state.signals = signals; 87 state.signals = signals;
84 bool inserted = 88 bool inserted = waiting_dispatchers_.insert(
85 waiting_dispatchers_.insert(std::make_pair(dispatcher_handle, state)) 89 std::make_pair(dispatcher_handle, state)).second;
86 .second;
87 DCHECK(inserted); 90 DCHECK(inserted);
88 91
89 return MOJO_RESULT_OK; 92 return MOJO_RESULT_OK;
90 } 93 }
91 94
92 MojoResult WaitSetDispatcher::RemoveWaitingDispatcherImplNoLock( 95 MojoResult WaitSetDispatcher::RemoveWaitingDispatcherImplNoLock(
93 const scoped_refptr<Dispatcher>& dispatcher) { 96 const scoped_refptr<Dispatcher>& dispatcher) {
94 mutex().AssertHeld(); 97 lock().AssertAcquired();
95 uintptr_t dispatcher_handle = reinterpret_cast<uintptr_t>(dispatcher.get()); 98 uintptr_t dispatcher_handle = reinterpret_cast<uintptr_t>(dispatcher.get());
96 auto it = waiting_dispatchers_.find(dispatcher_handle); 99 auto it = waiting_dispatchers_.find(dispatcher_handle);
97 if (it == waiting_dispatchers_.end()) 100 if (it == waiting_dispatchers_.end())
98 return MOJO_RESULT_NOT_FOUND; 101 return MOJO_RESULT_NOT_FOUND;
99 102
100 dispatcher->RemoveAwakable(waiter_.get(), nullptr); 103 dispatcher->RemoveAwakable(waiter_.get(), nullptr);
101 // At this point, it should not be possible for |waiter_| to be woken with 104 // At this point, it should not be possible for |waiter_| to be woken with
102 // |dispatcher|. 105 // |dispatcher|.
103 waiting_dispatchers_.erase(it); 106 waiting_dispatchers_.erase(it);
104 107
105 MutexLocker locker(&awoken_mutex_); 108 base::AutoLock locker(awoken_lock_);
106 int num_erased = 0; 109 int num_erased = 0;
107 for (auto it = awoken_queue_.begin(); it != awoken_queue_.end();) { 110 for (auto it = awoken_queue_.begin(); it != awoken_queue_.end();) {
108 if (it->first == dispatcher_handle) { 111 if (it->first == dispatcher_handle) {
109 it = awoken_queue_.erase(it); 112 it = awoken_queue_.erase(it);
110 num_erased++; 113 num_erased++;
111 } else { 114 } else {
112 ++it; 115 ++it;
113 } 116 }
114 } 117 }
115 // The dispatcher should only exist in the queue once. 118 // The dispatcher should only exist in the queue once.
116 DCHECK_LE(num_erased, 1); 119 DCHECK_LE(num_erased, 1);
117 processed_dispatchers_.erase( 120 processed_dispatchers_.erase(
118 std::remove(processed_dispatchers_.begin(), processed_dispatchers_.end(), 121 std::remove(processed_dispatchers_.begin(), processed_dispatchers_.end(),
119 dispatcher_handle), 122 dispatcher_handle),
120 processed_dispatchers_.end()); 123 processed_dispatchers_.end());
121 124
122 return MOJO_RESULT_OK; 125 return MOJO_RESULT_OK;
123 } 126 }
124 127
125 MojoResult WaitSetDispatcher::GetReadyDispatchersImplNoLock( 128 MojoResult WaitSetDispatcher::GetReadyDispatchersImplNoLock(
126 UserPointer<uint32_t> count, 129 uint32_t* count,
127 DispatcherVector* dispatchers, 130 DispatcherVector* dispatchers,
128 UserPointer<MojoResult> results, 131 MojoResult* results,
129 UserPointer<uintptr_t> contexts) { 132 uintptr_t* contexts) {
130 mutex().AssertHeld(); 133 lock().AssertAcquired();
131 dispatchers->clear(); 134 dispatchers->clear();
132 135
133 // Re-queue any already retrieved dispatchers. These should be the dispatchers 136 // Re-queue any already retrieved dispatchers. These should be the dispatchers
134 // that were returned on the last call to this function. This loop is 137 // that were returned on the last call to this function. This loop is
135 // necessary to preserve the logically level-triggering behaviour of waiting 138 // necessary to preserve the logically level-triggering behaviour of waiting
136 // in Mojo. In particular, if no action is taken on a signal, that signal 139 // in Mojo. In particular, if no action is taken on a signal, that signal
137 // continues to be satisfied, and therefore a |MojoWait()| on that 140 // continues to be satisfied, and therefore a |MojoWait()| on that
138 // handle/signal continues to return immediately. 141 // handle/signal continues to return immediately.
139 std::deque<uintptr_t> pending; 142 std::deque<uintptr_t> pending;
140 { 143 {
141 MutexLocker locker(&awoken_mutex_); 144 base::AutoLock locker(awoken_lock_);
142 pending.swap(processed_dispatchers_); 145 pending.swap(processed_dispatchers_);
143 } 146 }
144 for (uintptr_t d : pending) { 147 for (uintptr_t d : pending) {
145 auto it = waiting_dispatchers_.find(d); 148 auto it = waiting_dispatchers_.find(d);
146 // Anything in |processed_dispatchers_| should also be in 149 // Anything in |processed_dispatchers_| should also be in
147 // |waiting_dispatchers_| since dispatchers are removed from both in 150 // |waiting_dispatchers_| since dispatchers are removed from both in
148 // |RemoveWaitingDispatcherImplNoLock()|. 151 // |RemoveWaitingDispatcherImplNoLock()|.
149 DCHECK(it != waiting_dispatchers_.end()); 152 DCHECK(it != waiting_dispatchers_.end());
150 153
151 // |awoken_mutex_| cannot be held here because 154 // |awoken_mutex_| cannot be held here because
152 // |Dispatcher::AddAwakable()| acquires the Dispatcher's mutex. This 155 // |Dispatcher::AddAwakable()| acquires the Dispatcher's mutex. This
153 // mutex is held while running |WakeDispatcher()| below, which needs to 156 // mutex is held while running |WakeDispatcher()| below, which needs to
154 // acquire |awoken_mutex_|. Holding |awoken_mutex_| here would result in 157 // acquire |awoken_mutex_|. Holding |awoken_mutex_| here would result in
155 // a deadlock. 158 // a deadlock.
156 const MojoResult result = it->second.dispatcher->AddAwakable( 159 const MojoResult result = it->second.dispatcher->AddAwakable(
157 waiter_.get(), it->second.signals, d, nullptr); 160 waiter_.get(), it->second.signals, d, nullptr);
158 161
159 if (result == MOJO_RESULT_INVALID_ARGUMENT) { 162 if (result == MOJO_RESULT_INVALID_ARGUMENT) {
160 // Dispatcher is closed. Implicitly remove it from the wait set since 163 // Dispatcher is closed. Implicitly remove it from the wait set since
161 // it may be impossible to remove using |MojoRemoveHandle()|. 164 // it may be impossible to remove using |MojoRemoveHandle()|.
162 waiting_dispatchers_.erase(it); 165 waiting_dispatchers_.erase(it);
163 } else if (result != MOJO_RESULT_OK) { 166 } else if (result != MOJO_RESULT_OK) {
164 WakeDispatcher(result, d); 167 WakeDispatcher(result, d);
165 } 168 }
166 } 169 }
167 170
168 const uint32_t max_woken = count.Get(); 171 const uint32_t max_woken = *count;
169 uint32_t num_woken = 0; 172 uint32_t num_woken = 0;
170 173
171 MutexLocker locker(&awoken_mutex_); 174 base::AutoLock locker(awoken_lock_);
172 while (!awoken_queue_.empty() && num_woken < max_woken) { 175 while (!awoken_queue_.empty() && num_woken < max_woken) {
173 uintptr_t d = awoken_queue_.front().first; 176 uintptr_t d = awoken_queue_.front().first;
174 MojoResult result = awoken_queue_.front().second; 177 MojoResult result = awoken_queue_.front().second;
175 awoken_queue_.pop_front(); 178 awoken_queue_.pop_front();
176 179
177 auto it = waiting_dispatchers_.find(d); 180 auto it = waiting_dispatchers_.find(d);
178 DCHECK(it != waiting_dispatchers_.end()); 181 DCHECK(it != waiting_dispatchers_.end());
179 182
180 results.At(num_woken).Put(result); 183 results[num_woken] = result;
181 dispatchers->push_back(it->second.dispatcher); 184 dispatchers->push_back(it->second.dispatcher);
182 if (!contexts.IsNull()) 185 if (contexts)
183 contexts.At(num_woken).Put(it->second.context); 186 contexts[num_woken] = it->second.context;
184 187
185 if (result != MOJO_RESULT_CANCELLED) { 188 if (result != MOJO_RESULT_CANCELLED) {
186 processed_dispatchers_.push_back(d); 189 processed_dispatchers_.push_back(d);
187 } else { 190 } else {
191 // |MOJO_RESULT_CANCELLED| indicates that the dispatcher was closed.
192 // Return it, but also implcitly remove it from the wait set.
188 waiting_dispatchers_.erase(it); 193 waiting_dispatchers_.erase(it);
189 } 194 }
190 195
191 num_woken++; 196 num_woken++;
192 } 197 }
193 198
194 count.Put(num_woken); 199 *count = num_woken;
195 if (!num_woken) 200 if (!num_woken)
196 return MOJO_RESULT_SHOULD_WAIT; 201 return MOJO_RESULT_SHOULD_WAIT;
197 202
198 return MOJO_RESULT_OK; 203 return MOJO_RESULT_OK;
199 } 204 }
200 205
201 void WaitSetDispatcher::CancelAllAwakablesNoLock() { 206 void WaitSetDispatcher::CancelAllAwakablesNoLock() {
202 mutex().AssertHeld(); 207 lock().AssertAcquired();
203 MutexLocker locker(&awakable_mutex_); 208 base::AutoLock locker(awakable_lock_);
204 awakable_list_.CancelAll(); 209 awakable_list_.CancelAll();
205 } 210 }
206 211
207 MojoResult WaitSetDispatcher::AddAwakableImplNoLock( 212 MojoResult WaitSetDispatcher::AddAwakableImplNoLock(
208 Awakable* awakable, 213 Awakable* awakable,
209 MojoHandleSignals signals, 214 MojoHandleSignals signals,
210 uintptr_t context, 215 uintptr_t context,
211 HandleSignalsState* signals_state) { 216 HandleSignalsState* signals_state) {
212 mutex().AssertHeld(); 217 lock().AssertAcquired();
213 218
214 HandleSignalsState state(GetHandleSignalsStateImplNoLock()); 219 HandleSignalsState state(GetHandleSignalsStateImplNoLock());
215 if (state.satisfies(signals)) { 220 if (state.satisfies(signals)) {
216 if (signals_state) 221 if (signals_state)
217 *signals_state = state; 222 *signals_state = state;
218 return MOJO_RESULT_ALREADY_EXISTS; 223 return MOJO_RESULT_ALREADY_EXISTS;
219 } 224 }
220 if (!state.can_satisfy(signals)) { 225 if (!state.can_satisfy(signals)) {
221 if (signals_state) 226 if (signals_state)
222 *signals_state = state; 227 *signals_state = state;
223 return MOJO_RESULT_FAILED_PRECONDITION; 228 return MOJO_RESULT_FAILED_PRECONDITION;
224 } 229 }
225 230
226 MutexLocker locker(&awakable_mutex_); 231 base::AutoLock locker(awakable_lock_);
227 awakable_list_.Add(awakable, signals, context); 232 awakable_list_.Add(awakable, signals, context);
228 return MOJO_RESULT_OK; 233 return MOJO_RESULT_OK;
229 } 234 }
230 235
231 void WaitSetDispatcher::RemoveAwakableImplNoLock( 236 void WaitSetDispatcher::RemoveAwakableImplNoLock(
232 Awakable* awakable, 237 Awakable* awakable,
233 HandleSignalsState* signals_state) { 238 HandleSignalsState* signals_state) {
234 mutex().AssertHeld(); 239 lock().AssertAcquired();
235 MutexLocker locker(&awakable_mutex_); 240 base::AutoLock locker(awakable_lock_);
236 awakable_list_.Remove(awakable); 241 awakable_list_.Remove(awakable);
237 if (signals_state) 242 if (signals_state)
238 *signals_state = GetHandleSignalsStateImplNoLock(); 243 *signals_state = GetHandleSignalsStateImplNoLock();
239 } 244 }
240 245
241 HandleSignalsState WaitSetDispatcher::GetHandleSignalsStateImplNoLock() const { 246 HandleSignalsState WaitSetDispatcher::GetHandleSignalsStateImplNoLock() const {
242 mutex().AssertHeld(); 247 lock().AssertAcquired();
243 HandleSignalsState rv; 248 HandleSignalsState rv;
244 rv.satisfiable_signals = MOJO_HANDLE_SIGNAL_READABLE; 249 rv.satisfiable_signals = MOJO_HANDLE_SIGNAL_READABLE;
245 MutexLocker locker(&awoken_mutex_); 250 base::AutoLock locker(awoken_lock_);
246 if (!awoken_queue_.empty() || !processed_dispatchers_.empty()) 251 if (!awoken_queue_.empty() || !processed_dispatchers_.empty())
247 rv.satisfied_signals = MOJO_HANDLE_SIGNAL_READABLE; 252 rv.satisfied_signals = MOJO_HANDLE_SIGNAL_READABLE;
248 return rv; 253 return rv;
249 } 254 }
250 255
251 scoped_refptr<Dispatcher> 256 scoped_refptr<Dispatcher>
252 WaitSetDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() { 257 WaitSetDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() {
253 mutex().AssertHeld(); 258 lock().AssertAcquired();
254 LOG(ERROR) << "Attempting to serialize WaitSet"; 259 LOG(ERROR) << "Attempting to serialize WaitSet";
255 CloseImplNoLock(); 260 CloseImplNoLock();
256 return new WaitSetDispatcher(); 261 return new WaitSetDispatcher();
257 } 262 }
258 263
259 void WaitSetDispatcher::WakeDispatcher(MojoResult result, uintptr_t context) { 264 void WaitSetDispatcher::WakeDispatcher(MojoResult result, uintptr_t context) {
260 { 265 {
261 MutexLocker locker(&awoken_mutex_); 266 base::AutoLock locker(awoken_lock_);
262 267
263 if (result == MOJO_RESULT_ALREADY_EXISTS) 268 if (result == MOJO_RESULT_ALREADY_EXISTS)
264 result = MOJO_RESULT_OK; 269 result = MOJO_RESULT_OK;
265 270
266 awoken_queue_.push_back(std::make_pair(context, result)); 271 awoken_queue_.push_back(std::make_pair(context, result));
267 } 272 }
268 273
269 MutexLocker locker(&awakable_mutex_); 274 base::AutoLock locker(awakable_lock_);
270 HandleSignalsState signals_state; 275 HandleSignalsState signals_state;
271 signals_state.satisfiable_signals = MOJO_HANDLE_SIGNAL_READABLE; 276 signals_state.satisfiable_signals = MOJO_HANDLE_SIGNAL_READABLE;
272 signals_state.satisfied_signals = MOJO_HANDLE_SIGNAL_READABLE; 277 signals_state.satisfied_signals = MOJO_HANDLE_SIGNAL_READABLE;
273 awakable_list_.AwakeForStateChange(signals_state); 278 awakable_list_.AwakeForStateChange(signals_state);
274 } 279 }
275 280
276 } // namespace system 281 } // namespace edk
277 } // namespace mojo 282 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/edk/system/wait_set_dispatcher.h ('k') | mojo/edk/system/wait_set_dispatcher_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698