Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(140)

Side by Side Diff: mojo/message_pump/message_pump_mojo.cc

Issue 1467953002: Implement MessagePumpMojo using WaitSet. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@mojo-waitset-implementation
Patch Set: Rebase. Created 5 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/message_pump/message_pump_mojo.h ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/message_pump/message_pump_mojo.h" 5 #include "mojo/message_pump/message_pump_mojo.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 #include <map>
8 #include <vector> 9 #include <vector>
9 10
10 #include "base/debug/alias.h" 11 #include "base/debug/alias.h"
11 #include "base/lazy_instance.h" 12 #include "base/lazy_instance.h"
12 #include "base/logging.h" 13 #include "base/logging.h"
13 #include "base/threading/thread_local.h" 14 #include "base/threading/thread_local.h"
14 #include "base/time/time.h" 15 #include "base/time/time.h"
15 #include "mojo/message_pump/message_pump_mojo_handler.h" 16 #include "mojo/message_pump/message_pump_mojo_handler.h"
16 #include "mojo/message_pump/time_helper.h" 17 #include "mojo/message_pump/time_helper.h"
18 #include "mojo/public/c/system/wait_set.h"
17 19
18 namespace mojo { 20 namespace mojo {
19 namespace common { 21 namespace common {
20 namespace { 22 namespace {
21 23
22 base::LazyInstance<base::ThreadLocalPointer<MessagePumpMojo> >::Leaky 24 base::LazyInstance<base::ThreadLocalPointer<MessagePumpMojo> >::Leaky
23 g_tls_current_pump = LAZY_INSTANCE_INITIALIZER; 25 g_tls_current_pump = LAZY_INSTANCE_INITIALIZER;
24 26
25 MojoDeadline TimeTicksToMojoDeadline(base::TimeTicks time_ticks, 27 MojoDeadline TimeTicksToMojoDeadline(base::TimeTicks time_ticks,
26 base::TimeTicks now) { 28 base::TimeTicks now) {
27 // The is_null() check matches that of HandleWatcher as well as how 29 // The is_null() check matches that of HandleWatcher as well as how
28 // |delayed_work_time| is used. 30 // |delayed_work_time| is used.
29 if (time_ticks.is_null()) 31 if (time_ticks.is_null())
30 return MOJO_DEADLINE_INDEFINITE; 32 return MOJO_DEADLINE_INDEFINITE;
31 const int64_t delta = (time_ticks - now).InMicroseconds(); 33 const int64_t delta = (time_ticks - now).InMicroseconds();
32 return delta < 0 ? static_cast<MojoDeadline>(0) : 34 return delta < 0 ? static_cast<MojoDeadline>(0) :
33 static_cast<MojoDeadline>(delta); 35 static_cast<MojoDeadline>(delta);
34 } 36 }
35 37
36 } // namespace 38 } // namespace
37 39
38 // State needed for one iteration of WaitMany. The first handle and flags
39 // corresponds to that of the control pipe.
40 struct MessagePumpMojo::WaitState {
41 std::vector<Handle> handles;
42 std::vector<MojoHandleSignals> wait_signals;
43 };
44
45 struct MessagePumpMojo::RunState { 40 struct MessagePumpMojo::RunState {
46 RunState() : should_quit(false) {} 41 RunState() : should_quit(false) {}
47 42
48 base::TimeTicks delayed_work_time; 43 base::TimeTicks delayed_work_time;
49 44
50 bool should_quit; 45 bool should_quit;
51 }; 46 };
52 47
53 MessagePumpMojo::MessagePumpMojo() : run_state_(NULL), next_handler_id_(0) { 48 MessagePumpMojo::MessagePumpMojo() : run_state_(NULL), next_handler_id_(0) {
54 DCHECK(!current()) 49 DCHECK(!current())
55 << "There is already a MessagePumpMojo instance on this thread."; 50 << "There is already a MessagePumpMojo instance on this thread.";
56 g_tls_current_pump.Pointer()->Set(this); 51 g_tls_current_pump.Pointer()->Set(this);
57 52
58 MojoResult result = CreateMessagePipe(nullptr, &read_handle_, &write_handle_); 53 MojoResult result = CreateMessagePipe(nullptr, &read_handle_, &write_handle_);
59 CHECK_EQ(result, MOJO_RESULT_OK); 54 CHECK_EQ(result, MOJO_RESULT_OK);
60 CHECK(read_handle_.is_valid()); 55 CHECK(read_handle_.is_valid());
61 CHECK(write_handle_.is_valid()); 56 CHECK(write_handle_.is_valid());
57
58 MojoHandle handle;
59 result = MojoCreateWaitSet(&handle);
60 CHECK_EQ(result, MOJO_RESULT_OK);
61 wait_set_handle_.reset(Handle(handle));
62 CHECK(wait_set_handle_.is_valid());
63
64 result = MojoAddHandle(wait_set_handle_.get().value(),
65 read_handle_.get().value(),
66 MOJO_HANDLE_SIGNAL_READABLE);
67 CHECK_EQ(result, MOJO_RESULT_OK);
62 } 68 }
63 69
64 MessagePumpMojo::~MessagePumpMojo() { 70 MessagePumpMojo::~MessagePumpMojo() {
65 DCHECK_EQ(this, current()); 71 DCHECK_EQ(this, current());
66 g_tls_current_pump.Pointer()->Set(NULL); 72 g_tls_current_pump.Pointer()->Set(NULL);
67 } 73 }
68 74
69 // static 75 // static
70 scoped_ptr<base::MessagePump> MessagePumpMojo::Create() { 76 scoped_ptr<base::MessagePump> MessagePumpMojo::Create() {
71 return scoped_ptr<MessagePump>(new MessagePumpMojo()); 77 return scoped_ptr<MessagePump>(new MessagePumpMojo());
(...skipping 15 matching lines...) Expand all
87 Handler handler_data; 93 Handler handler_data;
88 handler_data.handler = handler; 94 handler_data.handler = handler;
89 handler_data.wait_signals = wait_signals; 95 handler_data.wait_signals = wait_signals;
90 handler_data.deadline = deadline; 96 handler_data.deadline = deadline;
91 handler_data.id = next_handler_id_++; 97 handler_data.id = next_handler_id_++;
92 handlers_[handle] = handler_data; 98 handlers_[handle] = handler_data;
93 if (!deadline.is_null()) { 99 if (!deadline.is_null()) {
94 bool inserted = deadline_handles_.insert(handle).second; 100 bool inserted = deadline_handles_.insert(handle).second;
95 DCHECK(inserted); 101 DCHECK(inserted);
96 } 102 }
103
104 MojoResult result = MojoAddHandle(wait_set_handle_.get().value(),
105 handle.value(), wait_signals);
106 // Because stopping a HandleWatcher is now asynchronous, it's possible for the
107 // handle to no longer be open at this point.
108 CHECK(result == MOJO_RESULT_OK || result == MOJO_RESULT_INVALID_ARGUMENT);
sky 2015/12/11 17:49:01 Can you add test coverage of the add with closed p
Anand Mistry (off Chromium) 2015/12/14 03:16:20 Hm. Here's a question for you. If you add a closed
sky 2015/12/28 17:29:44 Is there a reason you don't want to notify immedia
97 } 109 }
98 110
99 void MessagePumpMojo::RemoveHandler(const Handle& handle) { 111 void MessagePumpMojo::RemoveHandler(const Handle& handle) {
112 MojoResult result = MojoRemoveHandle(wait_set_handle_.get().value(),
113 handle.value());
114 // At this point, it's possible that the handle has been closed, which would
115 // cause MojoRemoveHandle() to return MOJO_RESULT_INVALID_ARGUMENT. It's also
116 // possible for the handle to have already been removed, so all of the
117 // possible error codes are valid here.
118 CHECK(result == MOJO_RESULT_OK || result == MOJO_RESULT_NOT_FOUND ||
119 result == MOJO_RESULT_INVALID_ARGUMENT);
120
100 handlers_.erase(handle); 121 handlers_.erase(handle);
101 deadline_handles_.erase(handle); 122 deadline_handles_.erase(handle);
102 } 123 }
103 124
104 void MessagePumpMojo::AddObserver(Observer* observer) { 125 void MessagePumpMojo::AddObserver(Observer* observer) {
105 observers_.AddObserver(observer); 126 observers_.AddObserver(observer);
106 } 127 }
107 128
108 void MessagePumpMojo::RemoveObserver(Observer* observer) { 129 void MessagePumpMojo::RemoveObserver(Observer* observer) {
109 observers_.RemoveObserver(observer); 130 observers_.RemoveObserver(observer);
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after
163 if (more_work_is_plausible) 184 if (more_work_is_plausible)
164 continue; 185 continue;
165 186
166 more_work_is_plausible = delegate->DoIdleWork(); 187 more_work_is_plausible = delegate->DoIdleWork();
167 if (run_state->should_quit) 188 if (run_state->should_quit)
168 break; 189 break;
169 } 190 }
170 } 191 }
171 192
172 bool MessagePumpMojo::DoInternalWork(const RunState& run_state, bool block) { 193 bool MessagePumpMojo::DoInternalWork(const RunState& run_state, bool block) {
173 const MojoDeadline deadline = block ? GetDeadlineForWait(run_state) : 0; 194 bool did_work = block;
174 const WaitState wait_state = GetWaitState(); 195 if (block) {
175 196 const MojoDeadline deadline = GetDeadlineForWait(run_state);
176 std::vector<MojoHandleSignalsState> states(wait_state.handles.size()); 197 const MojoResult wait_result = Wait(wait_set_handle_.get(),
sky 2015/12/11 17:49:01 How come you don't always call wait?
Anand Mistry (off Chromium) 2015/12/14 03:16:20 Added a comment to address.
177 const WaitManyResult wait_many_result = 198 MOJO_HANDLE_SIGNAL_READABLE,
178 WaitMany(wait_state.handles, wait_state.wait_signals, deadline, &states); 199 deadline, nullptr);
179 const MojoResult result = wait_many_result.result; 200 if (wait_result == MOJO_RESULT_OK) {
180 bool did_work = true; 201 // Handles may be ready. Or not since wake-ups can be spurious in certain
181 if (result == MOJO_RESULT_OK) { 202 // circumstances.
182 if (wait_many_result.index == 0) { 203 } else if (wait_result == MOJO_RESULT_DEADLINE_EXCEEDED) {
183 if (states[0].satisfied_signals & MOJO_HANDLE_SIGNAL_PEER_CLOSED) { 204 did_work = false;
184 // The Mojo EDK is shutting down. The ThreadQuitHelper task in
185 // base::Thread won't get run since the control pipe depends on the EDK
186 // staying alive. So quit manually to avoid this thread hanging.
187 Quit();
188 } else {
189 // Control pipe was written to.
190 ReadMessageRaw(read_handle_.get(), NULL, NULL, NULL, NULL,
191 MOJO_READ_MESSAGE_FLAG_MAY_DISCARD);
192 }
193 } else { 205 } else {
194 DCHECK(handlers_.find(wait_state.handles[wait_many_result.index]) != 206 base::debug::Alias(&wait_result);
195 handlers_.end()); 207 // Unexpected result is likely fatal, crash so we can determine cause.
196 WillSignalHandler(); 208 CHECK(false);
197 handlers_[wait_state.handles[wait_many_result.index]]
198 .handler->OnHandleReady(wait_state.handles[wait_many_result.index]);
199 DidSignalHandler();
200 }
201 } else {
202 switch (result) {
203 case MOJO_RESULT_CANCELLED:
204 case MOJO_RESULT_FAILED_PRECONDITION:
205 case MOJO_RESULT_INVALID_ARGUMENT:
206 RemoveInvalidHandle(wait_state, result, wait_many_result.index);
207 break;
208 case MOJO_RESULT_DEADLINE_EXCEEDED:
209 did_work = false;
210 break;
211 default:
212 base::debug::Alias(&result);
213 // Unexpected result is likely fatal, crash so we can determine cause.
214 CHECK(false);
215 } 209 }
216 } 210 }
217 211
212 const uint32_t kMaxServiced = 8;
sky 2015/12/11 17:49:01 This function is rather lengthy. How about breakin
Anand Mistry (off Chromium) 2015/12/14 03:16:20 Done.
213 uint32_t count = kMaxServiced;
214 MojoResult handle_results[kMaxServiced];
215 MojoHandle handles[kMaxServiced] = {MOJO_HANDLE_INVALID};
216
217 const MojoResult get_result = MojoGetReadyHandles(
218 wait_set_handle_.get().value(),
219 &count, handles, handle_results, nullptr);
220 CHECK(get_result == MOJO_RESULT_OK || get_result == MOJO_RESULT_SHOULD_WAIT);
221 if (get_result == MOJO_RESULT_OK) {
222 DCHECK(count);
223 DCHECK_LE(count, kMaxServiced);
224 // Do this in two steps, because notifying a handler may remove/add other
225 // handles that may have also been woken up.
226 // First, enumerate the IDs of the ready handles. Then, iterate over the
227 // handles and only take action if the ID hasn't changed.
228 std::map<Handle, int> ready_handles;
229 for (uint32_t i = 0 ; i < count; i++) {
230 const Handle handle = Handle(handles[i]);
231 // Skip the control handle. It's special.
232 if (handle.value() == read_handle_.get().value())
233 continue;
234 DCHECK(handle.is_valid());
235 const auto it = handlers_.find(handle);
236 // Skip handles that have been removed.
sky 2015/12/11 17:49:01 Under what conditions could this happen? I'm wonde
Anand Mistry (off Chromium) 2015/12/14 03:16:20 Added comment to explain why this is possible. It'
237 if (it == handlers_.end())
238 continue;
239 ready_handles[handle] = it->second.id;
240 }
241
242 for (uint32_t i = 0 ; i < count; i++) {
243 const Handle handle = Handle(handles[i]);
244
245 // If the handle has been removed, or it's ID has changed, skip over it.
246 // If the handle's ID has changed, and it still satisfied its signals,
247 // then it'll be caught in the next message pump iteration.
248 const auto it = handlers_.find(handle);
249 if ((handle.value() != read_handle_.get().value()) &&
250 (it == handlers_.end() || it->second.id != ready_handles[handle])) {
251 continue;
252 }
253
254 switch (handle_results[i]) {
255 case MOJO_RESULT_CANCELLED:
256 case MOJO_RESULT_FAILED_PRECONDITION:
257 DVLOG(1) << "Error: " << handle_results[i]
258 << " handle: " << handle.value();
259 if (handle.value() == read_handle_.get().value()) {
260 // The Mojo EDK is shutting down. The ThreadQuitHelper task in
261 // base::Thread won't get run since the control pipe depends on the
262 // EDK staying alive. So quit manually to avoid this thread hanging.
263 Quit();
264 } else {
265 RemoveInvalidHandle(handle_results[i], handle);
266 }
267 break;
268 case MOJO_RESULT_OK:
269 if (handle.value() == read_handle_.get().value()) {
270 DVLOG(1) << "Signaled control pipe";
271 // Control pipe was written to.
272 ReadMessageRaw(read_handle_.get(), NULL, NULL, NULL, NULL,
sky 2015/12/11 17:49:01 nullptr on these?
Anand Mistry (off Chromium) 2015/12/14 03:16:20 Done.
273 MOJO_READ_MESSAGE_FLAG_MAY_DISCARD);
274 } else {
275 DVLOG(1) << "Handle ready: " << handle.value();
276 SignalHandleReady(handle);
277 }
278 break;
279 default:
280 base::debug::Alias(&i);
281 base::debug::Alias(&handle_results[i]);
282 // Unexpected result is likely fatal, crash so we can determine cause.
283 CHECK(false);
284 }
285 }
286 did_work = true;
287 }
288
218 // Notify and remove any handlers whose time has expired. First, iterate over 289 // Notify and remove any handlers whose time has expired. First, iterate over
219 // the set of handles that have a deadline, and add the expired handles to a 290 // the set of handles that have a deadline, and add the expired handles to a
220 // map of <Handle, id>. Then, iterate over those expired handles and remove 291 // map of <Handle, id>. Then, iterate over those expired handles and remove
221 // them. The two-step process is because a handler can add/remove new 292 // them. The two-step process is because a handler can add/remove new
222 // handlers. 293 // handlers.
223 std::map<Handle, int> expired_handles; 294 std::map<Handle, int> expired_handles;
224 const base::TimeTicks now(internal::NowTicks()); 295 const base::TimeTicks now(internal::NowTicks());
225 for (const Handle handle : deadline_handles_) { 296 for (const Handle handle : deadline_handles_) {
226 const auto it = handlers_.find(handle); 297 const auto it = handlers_.find(handle);
227 // Expect any handle in |deadline_handles_| to also be in |handlers_| since 298 // Expect any handle in |deadline_handles_| to also be in |handlers_| since
(...skipping 11 matching lines...) Expand all
239 RemoveHandler(pair.first); 310 RemoveHandler(pair.first);
240 WillSignalHandler(); 311 WillSignalHandler();
241 handler->OnHandleError(pair.first, MOJO_RESULT_DEADLINE_EXCEEDED); 312 handler->OnHandleError(pair.first, MOJO_RESULT_DEADLINE_EXCEEDED);
242 DidSignalHandler(); 313 DidSignalHandler();
243 did_work = true; 314 did_work = true;
244 } 315 }
245 } 316 }
246 return did_work; 317 return did_work;
247 } 318 }
248 319
249 void MessagePumpMojo::RemoveInvalidHandle(const WaitState& wait_state, 320 void MessagePumpMojo::RemoveInvalidHandle(MojoResult result, Handle handle) {
250 MojoResult result,
251 uint32_t index) {
252 // TODO(sky): deal with control pipe going bad. 321 // TODO(sky): deal with control pipe going bad.
253 CHECK(result == MOJO_RESULT_INVALID_ARGUMENT || 322 CHECK(result == MOJO_RESULT_FAILED_PRECONDITION ||
254 result == MOJO_RESULT_FAILED_PRECONDITION || 323 result == MOJO_RESULT_CANCELLED ||
255 result == MOJO_RESULT_CANCELLED); 324 result == MOJO_RESULT_DEADLINE_EXCEEDED);
256 CHECK_NE(index, 0u); // Indicates the control pipe went bad. 325 // Indicates the control pipe went bad.
326 CHECK_NE(handle.value(), read_handle_.get().value());
257 327
258 // Remove the handle first, this way if OnHandleError() tries to remove the 328 // Remove the handle first, this way if OnHandleError() tries to remove the
259 // handle our iterator isn't invalidated. 329 // handle our iterator isn't invalidated.
260 Handle handle = wait_state.handles[index]; 330 if (handlers_.find(handle) == handlers_.end())
sky 2015/12/11 17:49:01 Under what conditions would this hit?
Anand Mistry (off Chromium) 2015/12/14 03:16:20 It did in a previous iteration, but not any more.
261 CHECK(handlers_.find(handle) != handlers_.end()); 331 return;
262 MessagePumpMojoHandler* handler = handlers_[handle].handler; 332 MessagePumpMojoHandler* handler = handlers_[handle].handler;
263 RemoveHandler(handle); 333 RemoveHandler(handle);
264 WillSignalHandler(); 334 WillSignalHandler();
265 handler->OnHandleError(handle, result); 335 handler->OnHandleError(handle, result);
266 DidSignalHandler(); 336 DidSignalHandler();
267 } 337 }
268 338
269 void MessagePumpMojo::SignalControlPipe() { 339 void MessagePumpMojo::SignalControlPipe() {
270 const MojoResult result = 340 const MojoResult result =
271 WriteMessageRaw(write_handle_.get(), NULL, 0, NULL, 0, 341 WriteMessageRaw(write_handle_.get(), NULL, 0, NULL, 0,
272 MOJO_WRITE_MESSAGE_FLAG_NONE); 342 MOJO_WRITE_MESSAGE_FLAG_NONE);
273 if (result == MOJO_RESULT_FAILED_PRECONDITION) { 343 if (result == MOJO_RESULT_FAILED_PRECONDITION) {
274 // Mojo EDK is shutting down. 344 // Mojo EDK is shutting down.
275 return; 345 return;
276 } 346 }
277 347
278 // If we can't write we likely won't wake up the thread and there is a strong 348 // If we can't write we likely won't wake up the thread and there is a strong
279 // chance we'll deadlock. 349 // chance we'll deadlock.
280 CHECK_EQ(MOJO_RESULT_OK, result); 350 CHECK_EQ(MOJO_RESULT_OK, result);
281 } 351 }
282 352
283 MessagePumpMojo::WaitState MessagePumpMojo::GetWaitState() const {
284 WaitState wait_state;
285 wait_state.handles.push_back(read_handle_.get());
286 wait_state.wait_signals.push_back(
287 MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED);
288
289 for (HandleToHandler::const_iterator i = handlers_.begin();
290 i != handlers_.end(); ++i) {
291 wait_state.handles.push_back(i->first);
292 wait_state.wait_signals.push_back(i->second.wait_signals);
293 }
294 return wait_state;
295 }
296
297 MojoDeadline MessagePumpMojo::GetDeadlineForWait( 353 MojoDeadline MessagePumpMojo::GetDeadlineForWait(
298 const RunState& run_state) const { 354 const RunState& run_state) const {
299 const base::TimeTicks now(internal::NowTicks()); 355 const base::TimeTicks now(internal::NowTicks());
300 MojoDeadline deadline = TimeTicksToMojoDeadline(run_state.delayed_work_time, 356 MojoDeadline deadline = TimeTicksToMojoDeadline(run_state.delayed_work_time,
301 now); 357 now);
302 for (const Handle handle : deadline_handles_) { 358 for (const Handle handle : deadline_handles_) {
303 auto it = handlers_.find(handle); 359 auto it = handlers_.find(handle);
304 DCHECK(it != handlers_.end()); 360 DCHECK(it != handlers_.end());
305 deadline = std::min( 361 deadline = std::min(
306 TimeTicksToMojoDeadline(it->second.deadline, now), deadline); 362 TimeTicksToMojoDeadline(it->second.deadline, now), deadline);
307 } 363 }
308 return deadline; 364 return deadline;
309 } 365 }
310 366
367 void MessagePumpMojo::SignalHandleReady(Handle handle) {
368 DCHECK(handlers_.find(handle) != handlers_.end());
369 WillSignalHandler();
370 handlers_[handle].handler->OnHandleReady(handle);
371 DidSignalHandler();
372 }
373
311 void MessagePumpMojo::WillSignalHandler() { 374 void MessagePumpMojo::WillSignalHandler() {
312 FOR_EACH_OBSERVER(Observer, observers_, WillSignalHandler()); 375 FOR_EACH_OBSERVER(Observer, observers_, WillSignalHandler());
313 } 376 }
314 377
315 void MessagePumpMojo::DidSignalHandler() { 378 void MessagePumpMojo::DidSignalHandler() {
316 FOR_EACH_OBSERVER(Observer, observers_, DidSignalHandler()); 379 FOR_EACH_OBSERVER(Observer, observers_, DidSignalHandler());
317 } 380 }
318 381
319 } // namespace common 382 } // namespace common
320 } // namespace mojo 383 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/message_pump/message_pump_mojo.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698