Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(418)

Side by Side Diff: mojo/message_pump/message_pump_mojo.cc

Issue 1467953002: Implement MessagePumpMojo using WaitSet. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@mojo-waitset-implementation
Patch Set: Review reade. Created 5 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/message_pump/message_pump_mojo.h ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/message_pump/message_pump_mojo.h" 5 #include "mojo/message_pump/message_pump_mojo.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 #include <map>
8 #include <vector> 9 #include <vector>
9 10
10 #include "base/debug/alias.h" 11 #include "base/debug/alias.h"
11 #include "base/lazy_instance.h" 12 #include "base/lazy_instance.h"
12 #include "base/logging.h" 13 #include "base/logging.h"
13 #include "base/threading/thread_local.h" 14 #include "base/threading/thread_local.h"
14 #include "base/time/time.h" 15 #include "base/time/time.h"
15 #include "mojo/message_pump/message_pump_mojo_handler.h" 16 #include "mojo/message_pump/message_pump_mojo_handler.h"
16 #include "mojo/message_pump/time_helper.h" 17 #include "mojo/message_pump/time_helper.h"
18 #include "mojo/public/c/system/wait_set.h"
sky 2015/12/08 18:26:14 It's a bit hard to review this patch without knowi
Anand Mistry (off Chromium) 2015/12/09 00:01:21 Will do. The API is defined in https://codereview
17 19
18 namespace mojo { 20 namespace mojo {
19 namespace common { 21 namespace common {
20 namespace { 22 namespace {
21 23
22 base::LazyInstance<base::ThreadLocalPointer<MessagePumpMojo> >::Leaky 24 base::LazyInstance<base::ThreadLocalPointer<MessagePumpMojo> >::Leaky
23 g_tls_current_pump = LAZY_INSTANCE_INITIALIZER; 25 g_tls_current_pump = LAZY_INSTANCE_INITIALIZER;
24 26
25 MojoDeadline TimeTicksToMojoDeadline(base::TimeTicks time_ticks, 27 MojoDeadline TimeTicksToMojoDeadline(base::TimeTicks time_ticks,
26 base::TimeTicks now) { 28 base::TimeTicks now) {
27 // The is_null() check matches that of HandleWatcher as well as how 29 // The is_null() check matches that of HandleWatcher as well as how
28 // |delayed_work_time| is used. 30 // |delayed_work_time| is used.
29 if (time_ticks.is_null()) 31 if (time_ticks.is_null())
30 return MOJO_DEADLINE_INDEFINITE; 32 return MOJO_DEADLINE_INDEFINITE;
31 const int64_t delta = (time_ticks - now).InMicroseconds(); 33 const int64_t delta = (time_ticks - now).InMicroseconds();
32 return delta < 0 ? static_cast<MojoDeadline>(0) : 34 return delta < 0 ? static_cast<MojoDeadline>(0) :
33 static_cast<MojoDeadline>(delta); 35 static_cast<MojoDeadline>(delta);
34 } 36 }
35 37
36 } // namespace 38 } // namespace
37 39
38 // State needed for one iteration of WaitMany. The first handle and flags
39 // corresponds to that of the control pipe.
40 struct MessagePumpMojo::WaitState {
41 std::vector<Handle> handles;
42 std::vector<MojoHandleSignals> wait_signals;
43 };
44
45 struct MessagePumpMojo::RunState { 40 struct MessagePumpMojo::RunState {
46 RunState() : should_quit(false) {} 41 RunState() : should_quit(false) {}
47 42
48 base::TimeTicks delayed_work_time; 43 base::TimeTicks delayed_work_time;
49 44
50 bool should_quit; 45 bool should_quit;
51 }; 46 };
52 47
53 MessagePumpMojo::MessagePumpMojo() : run_state_(NULL), next_handler_id_(0) { 48 MessagePumpMojo::MessagePumpMojo() : run_state_(NULL), next_handler_id_(0) {
54 DCHECK(!current()) 49 DCHECK(!current())
55 << "There is already a MessagePumpMojo instance on this thread."; 50 << "There is already a MessagePumpMojo instance on this thread.";
56 g_tls_current_pump.Pointer()->Set(this); 51 g_tls_current_pump.Pointer()->Set(this);
57 52
58 MojoResult result = CreateMessagePipe(nullptr, &read_handle_, &write_handle_); 53 MojoResult result = CreateMessagePipe(nullptr, &read_handle_, &write_handle_);
59 CHECK_EQ(result, MOJO_RESULT_OK); 54 CHECK_EQ(result, MOJO_RESULT_OK);
60 CHECK(read_handle_.is_valid()); 55 CHECK(read_handle_.is_valid());
61 CHECK(write_handle_.is_valid()); 56 CHECK(write_handle_.is_valid());
57
58 MojoHandle handle;
59 result = MojoCreateWaitSet(&handle);
60 CHECK_EQ(result, MOJO_RESULT_OK);
61 wait_set_handle_.reset(Handle(handle));
62 CHECK(wait_set_handle_.is_valid());
63
64 result = MojoAddHandle(wait_set_handle_.get().value(),
65 read_handle_.get().value(),
66 MOJO_HANDLE_SIGNAL_READABLE);
67 CHECK_EQ(result, MOJO_RESULT_OK);
62 } 68 }
63 69
64 MessagePumpMojo::~MessagePumpMojo() { 70 MessagePumpMojo::~MessagePumpMojo() {
65 DCHECK_EQ(this, current()); 71 DCHECK_EQ(this, current());
66 g_tls_current_pump.Pointer()->Set(NULL); 72 g_tls_current_pump.Pointer()->Set(NULL);
67 } 73 }
68 74
69 // static 75 // static
70 scoped_ptr<base::MessagePump> MessagePumpMojo::Create() { 76 scoped_ptr<base::MessagePump> MessagePumpMojo::Create() {
71 return scoped_ptr<MessagePump>(new MessagePumpMojo()); 77 return scoped_ptr<MessagePump>(new MessagePumpMojo());
(...skipping 15 matching lines...) Expand all
87 Handler handler_data; 93 Handler handler_data;
88 handler_data.handler = handler; 94 handler_data.handler = handler;
89 handler_data.wait_signals = wait_signals; 95 handler_data.wait_signals = wait_signals;
90 handler_data.deadline = deadline; 96 handler_data.deadline = deadline;
91 handler_data.id = next_handler_id_++; 97 handler_data.id = next_handler_id_++;
92 handlers_[handle] = handler_data; 98 handlers_[handle] = handler_data;
93 if (!deadline.is_null()) { 99 if (!deadline.is_null()) {
94 bool inserted = deadline_handles_.insert(handle).second; 100 bool inserted = deadline_handles_.insert(handle).second;
95 DCHECK(inserted); 101 DCHECK(inserted);
96 } 102 }
103
104 MojoResult result = MojoAddHandle(wait_set_handle_.get().value(),
105 handle.value(), wait_signals);
106 // Because stopping a HandleWatcher is now asynchrnous, it's possible for the
107 // handle to no longer be open at this point.
108 CHECK(result == MOJO_RESULT_OK || result == MOJO_RESULT_INVALID_ARGUMENT);
97 } 109 }
98 110
99 void MessagePumpMojo::RemoveHandler(const Handle& handle) { 111 void MessagePumpMojo::RemoveHandler(const Handle& handle) {
112 MojoResult result = MojoRemoveHandle(wait_set_handle_.get().value(),
113 handle.value());
114 // At this point, it's possible that the handle has been closed, which would
115 // cause MojoRemoveHandle() to return MOJO_RESULT_INVALID_ARGUMENT. It's also
116 // possible for the handle to have already been removed, so all of the
117 // possible error codes are valid here.
118 CHECK(result == MOJO_RESULT_OK || result == MOJO_RESULT_NOT_FOUND ||
119 result == MOJO_RESULT_INVALID_ARGUMENT);
120
100 handlers_.erase(handle); 121 handlers_.erase(handle);
101 deadline_handles_.erase(handle); 122 deadline_handles_.erase(handle);
102 } 123 }
103 124
104 void MessagePumpMojo::AddObserver(Observer* observer) { 125 void MessagePumpMojo::AddObserver(Observer* observer) {
105 observers_.AddObserver(observer); 126 observers_.AddObserver(observer);
106 } 127 }
107 128
108 void MessagePumpMojo::RemoveObserver(Observer* observer) { 129 void MessagePumpMojo::RemoveObserver(Observer* observer) {
109 observers_.RemoveObserver(observer); 130 observers_.RemoveObserver(observer);
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after
163 if (more_work_is_plausible) 184 if (more_work_is_plausible)
164 continue; 185 continue;
165 186
166 more_work_is_plausible = delegate->DoIdleWork(); 187 more_work_is_plausible = delegate->DoIdleWork();
167 if (run_state->should_quit) 188 if (run_state->should_quit)
168 break; 189 break;
169 } 190 }
170 } 191 }
171 192
172 bool MessagePumpMojo::DoInternalWork(const RunState& run_state, bool block) { 193 bool MessagePumpMojo::DoInternalWork(const RunState& run_state, bool block) {
173 const MojoDeadline deadline = block ? GetDeadlineForWait(run_state) : 0; 194 bool did_work = block;
174 const WaitState wait_state = GetWaitState(); 195 if (block) {
196 const MojoDeadline deadline = block ? GetDeadlineForWait(run_state) : 0;
175 197
176 std::vector<MojoHandleSignalsState> states(wait_state.handles.size()); 198
177 const WaitManyResult wait_many_result = 199 const MojoResult wait_result = Wait(wait_set_handle_.get(),
178 WaitMany(wait_state.handles, wait_state.wait_signals, deadline, &states); 200 MOJO_HANDLE_SIGNAL_READABLE,
179 const MojoResult result = wait_many_result.result; 201 deadline, nullptr);
180 bool did_work = true; 202 DVLOG(1) << "Wait result: " << wait_result << ", block: " << block;
181 if (result == MOJO_RESULT_OK) { 203 if (wait_result == MOJO_RESULT_OK) {
182 if (wait_many_result.index == 0) { 204 } else if (wait_result == MOJO_RESULT_DEADLINE_EXCEEDED) {
183 if (states[0].satisfied_signals & MOJO_HANDLE_SIGNAL_PEER_CLOSED) { 205 DVLOG(1) << "Deadline expired, block: " << block;
184 // The Mojo EDK is shutting down. The ThreadQuitHelper task in 206 did_work = false;
185 // base::Thread won't get run since the control pipe depends on the EDK
186 // staying alive. So quit manually to avoid this thread hanging.
187 Quit();
188 } else {
189 // Control pipe was written to.
190 ReadMessageRaw(read_handle_.get(), NULL, NULL, NULL, NULL,
191 MOJO_READ_MESSAGE_FLAG_MAY_DISCARD);
192 }
193 } else { 207 } else {
194 DCHECK(handlers_.find(wait_state.handles[wait_many_result.index]) != 208 base::debug::Alias(&wait_result);
195 handlers_.end()); 209 // Unexpected result is likely fatal, crash so we can determine cause.
196 WillSignalHandler(); 210 CHECK(false);
197 handlers_[wait_state.handles[wait_many_result.index]]
198 .handler->OnHandleReady(wait_state.handles[wait_many_result.index]);
199 DidSignalHandler();
200 }
201 } else {
202 switch (result) {
203 case MOJO_RESULT_CANCELLED:
204 case MOJO_RESULT_FAILED_PRECONDITION:
205 case MOJO_RESULT_INVALID_ARGUMENT:
206 RemoveInvalidHandle(wait_state, result, wait_many_result.index);
207 break;
208 case MOJO_RESULT_DEADLINE_EXCEEDED:
209 did_work = false;
210 break;
211 default:
212 base::debug::Alias(&result);
213 // Unexpected result is likely fatal, crash so we can determine cause.
214 CHECK(false);
215 } 211 }
216 } 212 }
217 213
214 const uint32_t kMaxServiced = 8;
215 uint32_t count = kMaxServiced;
216 MojoResult handle_results[kMaxServiced];
217 MojoHandle handles[kMaxServiced] = {MOJO_HANDLE_INVALID};
218
219 const MojoResult get_result = MojoGetReadyHandles(
220 wait_set_handle_.get().value(),
221 &count, handles, handle_results, nullptr);
222 CHECK(get_result == MOJO_RESULT_OK || get_result == MOJO_RESULT_SHOULD_WAIT);
223 if (get_result == MOJO_RESULT_OK) {
224 DCHECK(count);
225 DCHECK_LE(count, kMaxServiced);
226 // Do this in two steps, because notifying a handler may remove/add other
227 // handles that may have also been woken up.
228 // First, enumerate the IDs of the ready handles. Then, iterate over the
229 // handles and only taking action if the ID hasn't changed.
230 std::map<Handle, int> ready_handles;
231 for (uint32_t i = 0 ; i < count; i++) {
232 const Handle handle = Handle(handles[i]);
233 // Skip the control handle. It's special.
234 if (handle.value() == read_handle_.get().value())
235 continue;
236 CHECK(handle.is_valid());
237 const auto it = handlers_.find(handle);
238 // Skip handles that have been removed.
239 if (it == handlers_.end())
240 continue;
241 ready_handles[handle] = it->second.id;
242 }
243
244 for (uint32_t i = 0 ; i < count; i++) {
245 const Handle handle = Handle(handles[i]);
246
247 // If the handle has been removed, or it's ID has changed, skip over it.
248 // If the handle's ID has changed, and it still satisfied its signals,
249 // then it'll be caught in the next message pump iteration.
250 const auto it = handlers_.find(handle);
251 if ((handle.value() != read_handle_.get().value()) &&
252 (it == handlers_.end() || it->second.id != ready_handles[handle])) {
253 continue;
254 }
255
256 CHECK(handle.is_valid());
257 switch (handle_results[i]) {
258 case MOJO_RESULT_CANCELLED:
259 case MOJO_RESULT_FAILED_PRECONDITION:
260 DVLOG(1) << "Error: " << handle_results[i]
261 << " handle: " << handle.value();
262 if (handle.value() == read_handle_.get().value()) {
263 // The Mojo EDK is shutting down. The ThreadQuitHelper task in
264 // base::Thread won't get run since the control pipe depends on the
265 // EDK staying alive. So quit manually to avoid this thread hanging.
266 Quit();
267 } else {
268 RemoveInvalidHandle(handle_results[i], handle);
269 }
270 break;
271 case MOJO_RESULT_OK:
272 if (handle.value() == read_handle_.get().value()) {
273 DVLOG(1) << "Signaled control pipe";
274 // Control pipe was written to.
275 ReadMessageRaw(read_handle_.get(), NULL, NULL, NULL, NULL,
276 MOJO_READ_MESSAGE_FLAG_MAY_DISCARD);
277 } else {
278 DVLOG(1) << "Handle ready: " << handle.value();
279 SignalHandleReady(handle);
280 }
281 break;
282 default:
283 base::debug::Alias(&i);
284 base::debug::Alias(&handle_results[i]);
285 // Unexpected result is likely fatal, crash so we can determine cause.
286 CHECK(false);
287 }
288 }
289 did_work = true;
290 } else {
291 DVLOG(1) << "No handles ready, block: " << block;
292 }
293
218 // Notify and remove any handlers whose time has expired. First, iterate over 294 // Notify and remove any handlers whose time has expired. First, iterate over
219 // the set of handles that have a deadline, and add the expired handles to a 295 // the set of handles that have a deadline, and add the expired handles to a
220 // map of <Handle, id>. Then, iterate over those expired handles and remove 296 // map of <Handle, id>. Then, iterate over those expired handles and remove
221 // them. The two-step process is because a handler can add/remove new 297 // them. The two-step process is because a handler can add/remove new
222 // handlers. 298 // handlers.
223 std::map<Handle, int> expired_handles; 299 std::map<Handle, int> expired_handles;
224 const base::TimeTicks now(internal::NowTicks()); 300 const base::TimeTicks now(internal::NowTicks());
225 for (const Handle handle : deadline_handles_) { 301 for (const Handle handle : deadline_handles_) {
226 const auto it = handlers_.find(handle); 302 const auto it = handlers_.find(handle);
227 // Expect any handle in |deadline_handles_| to also be in |handlers_| since 303 // Expect any handle in |deadline_handles_| to also be in |handlers_| since
(...skipping 11 matching lines...) Expand all
239 RemoveHandler(pair.first); 315 RemoveHandler(pair.first);
240 WillSignalHandler(); 316 WillSignalHandler();
241 handler->OnHandleError(pair.first, MOJO_RESULT_DEADLINE_EXCEEDED); 317 handler->OnHandleError(pair.first, MOJO_RESULT_DEADLINE_EXCEEDED);
242 DidSignalHandler(); 318 DidSignalHandler();
243 did_work = true; 319 did_work = true;
244 } 320 }
245 } 321 }
246 return did_work; 322 return did_work;
247 } 323 }
248 324
249 void MessagePumpMojo::RemoveInvalidHandle(const WaitState& wait_state, 325 void MessagePumpMojo::RemoveInvalidHandle(MojoResult result, Handle handle) {
250 MojoResult result,
251 uint32_t index) {
252 // TODO(sky): deal with control pipe going bad. 326 // TODO(sky): deal with control pipe going bad.
253 CHECK(result == MOJO_RESULT_INVALID_ARGUMENT || 327 CHECK(result == MOJO_RESULT_FAILED_PRECONDITION ||
254 result == MOJO_RESULT_FAILED_PRECONDITION || 328 result == MOJO_RESULT_CANCELLED ||
255 result == MOJO_RESULT_CANCELLED); 329 result == MOJO_RESULT_DEADLINE_EXCEEDED);
256 CHECK_NE(index, 0u); // Indicates the control pipe went bad. 330 // Indicates the control pipe went bad.
331 CHECK_NE(handle.value(), read_handle_.get().value());
257 332
258 // Remove the handle first, this way if OnHandleError() tries to remove the 333 // Remove the handle first, this way if OnHandleError() tries to remove the
259 // handle our iterator isn't invalidated. 334 // handle our iterator isn't invalidated.
260 Handle handle = wait_state.handles[index]; 335 if (handlers_.find(handle) == handlers_.end())
261 CHECK(handlers_.find(handle) != handlers_.end()); 336 return;
262 MessagePumpMojoHandler* handler = handlers_[handle].handler; 337 MessagePumpMojoHandler* handler = handlers_[handle].handler;
263 RemoveHandler(handle); 338 RemoveHandler(handle);
264 WillSignalHandler(); 339 WillSignalHandler();
265 handler->OnHandleError(handle, result); 340 handler->OnHandleError(handle, result);
266 DidSignalHandler(); 341 DidSignalHandler();
267 } 342 }
268 343
269 void MessagePumpMojo::SignalControlPipe() { 344 void MessagePumpMojo::SignalControlPipe() {
270 const MojoResult result = 345 const MojoResult result =
271 WriteMessageRaw(write_handle_.get(), NULL, 0, NULL, 0, 346 WriteMessageRaw(write_handle_.get(), NULL, 0, NULL, 0,
272 MOJO_WRITE_MESSAGE_FLAG_NONE); 347 MOJO_WRITE_MESSAGE_FLAG_NONE);
273 if (result == MOJO_RESULT_FAILED_PRECONDITION) { 348 if (result == MOJO_RESULT_FAILED_PRECONDITION) {
274 // Mojo EDK is shutting down. 349 // Mojo EDK is shutting down.
275 return; 350 return;
276 } 351 }
277 352
278 // If we can't write we likely won't wake up the thread and there is a strong 353 // If we can't write we likely won't wake up the thread and there is a strong
279 // chance we'll deadlock. 354 // chance we'll deadlock.
280 CHECK_EQ(MOJO_RESULT_OK, result); 355 CHECK_EQ(MOJO_RESULT_OK, result);
281 } 356 }
282 357
283 MessagePumpMojo::WaitState MessagePumpMojo::GetWaitState() const {
284 WaitState wait_state;
285 wait_state.handles.push_back(read_handle_.get());
286 wait_state.wait_signals.push_back(
287 MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED);
288
289 for (HandleToHandler::const_iterator i = handlers_.begin();
290 i != handlers_.end(); ++i) {
291 wait_state.handles.push_back(i->first);
292 wait_state.wait_signals.push_back(i->second.wait_signals);
293 }
294 return wait_state;
295 }
296
297 MojoDeadline MessagePumpMojo::GetDeadlineForWait( 358 MojoDeadline MessagePumpMojo::GetDeadlineForWait(
298 const RunState& run_state) const { 359 const RunState& run_state) const {
299 const base::TimeTicks now(internal::NowTicks()); 360 const base::TimeTicks now(internal::NowTicks());
300 MojoDeadline deadline = TimeTicksToMojoDeadline(run_state.delayed_work_time, 361 MojoDeadline deadline = TimeTicksToMojoDeadline(run_state.delayed_work_time,
301 now); 362 now);
302 for (const Handle handle : deadline_handles_) { 363 for (const Handle handle : deadline_handles_) {
303 auto it = handlers_.find(handle); 364 auto it = handlers_.find(handle);
304 DCHECK(it != handlers_.end()); 365 DCHECK(it != handlers_.end());
305 deadline = std::min( 366 deadline = std::min(
306 TimeTicksToMojoDeadline(it->second.deadline, now), deadline); 367 TimeTicksToMojoDeadline(it->second.deadline, now), deadline);
307 } 368 }
308 return deadline; 369 return deadline;
309 } 370 }
310 371
372 void MessagePumpMojo::SignalHandleReady(Handle handle) {
373 CHECK(handlers_.find(handle) != handlers_.end());
374 WillSignalHandler();
375 handlers_[handle].handler->OnHandleReady(handle);
376 DidSignalHandler();
377 }
378
311 void MessagePumpMojo::WillSignalHandler() { 379 void MessagePumpMojo::WillSignalHandler() {
312 FOR_EACH_OBSERVER(Observer, observers_, WillSignalHandler()); 380 FOR_EACH_OBSERVER(Observer, observers_, WillSignalHandler());
313 } 381 }
314 382
315 void MessagePumpMojo::DidSignalHandler() { 383 void MessagePumpMojo::DidSignalHandler() {
316 FOR_EACH_OBSERVER(Observer, observers_, DidSignalHandler()); 384 FOR_EACH_OBSERVER(Observer, observers_, DidSignalHandler());
317 } 385 }
318 386
319 } // namespace common 387 } // namespace common
320 } // namespace mojo 388 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/message_pump/message_pump_mojo.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698