Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(243)

Side by Side Diff: mojo/message_pump/message_pump_mojo.cc

Issue 1566573002: Fix race on mojo message pump shutdown. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@mojo-waitset-message-pump
Patch Set: Revert stuff. Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/message_pump/message_pump_mojo.h ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/message_pump/message_pump_mojo.h" 5 #include "mojo/message_pump/message_pump_mojo.h"
6 6
7 #include <stdint.h> 7 #include <stdint.h>
8 8
9 #include <algorithm> 9 #include <algorithm>
10 #include <map> 10 #include <map>
11 #include <vector> 11 #include <vector>
12 12
13 #include "base/containers/small_map.h" 13 #include "base/containers/small_map.h"
14 #include "base/debug/alias.h" 14 #include "base/debug/alias.h"
15 #include "base/lazy_instance.h" 15 #include "base/lazy_instance.h"
16 #include "base/logging.h" 16 #include "base/logging.h"
17 #include "base/threading/thread_local.h" 17 #include "base/threading/thread_local.h"
18 #include "base/threading/thread_restrictions.h"
18 #include "base/time/time.h" 19 #include "base/time/time.h"
19 #include "mojo/message_pump/message_pump_mojo_handler.h" 20 #include "mojo/message_pump/message_pump_mojo_handler.h"
20 #include "mojo/message_pump/time_helper.h" 21 #include "mojo/message_pump/time_helper.h"
21 #include "mojo/public/c/system/wait_set.h" 22 #include "mojo/public/c/system/wait_set.h"
22 23
23 namespace mojo { 24 namespace mojo {
24 namespace common { 25 namespace common {
25 namespace { 26 namespace {
26 27
27 base::LazyInstance<base::ThreadLocalPointer<MessagePumpMojo> >::Leaky 28 base::LazyInstance<base::ThreadLocalPointer<MessagePumpMojo> >::Leaky
(...skipping 13 matching lines...) Expand all
41 } // namespace 42 } // namespace
42 43
43 struct MessagePumpMojo::RunState { 44 struct MessagePumpMojo::RunState {
44 RunState() : should_quit(false) {} 45 RunState() : should_quit(false) {}
45 46
46 base::TimeTicks delayed_work_time; 47 base::TimeTicks delayed_work_time;
47 48
48 bool should_quit; 49 bool should_quit;
49 }; 50 };
50 51
51 MessagePumpMojo::MessagePumpMojo() : run_state_(NULL), next_handler_id_(0) { 52 MessagePumpMojo::MessagePumpMojo()
53 : run_state_(NULL), next_handler_id_(0), event_(false, false) {
52 DCHECK(!current()) 54 DCHECK(!current())
53 << "There is already a MessagePumpMojo instance on this thread."; 55 << "There is already a MessagePumpMojo instance on this thread.";
54 g_tls_current_pump.Pointer()->Set(this); 56 g_tls_current_pump.Pointer()->Set(this);
55 57
56 MojoResult result = CreateMessagePipe(nullptr, &read_handle_, &write_handle_); 58 MojoResult result = CreateMessagePipe(nullptr, &read_handle_, &write_handle_);
57 CHECK_EQ(result, MOJO_RESULT_OK); 59 CHECK_EQ(result, MOJO_RESULT_OK);
58 CHECK(read_handle_.is_valid()); 60 CHECK(read_handle_.is_valid());
59 CHECK(write_handle_.is_valid()); 61 CHECK(write_handle_.is_valid());
60 62
61 MojoHandle handle; 63 MojoHandle handle;
(...skipping 101 matching lines...) Expand 10 before | Expand all | Expand 10 after
163 base::AutoLock auto_lock(run_state_lock_); 165 base::AutoLock auto_lock(run_state_lock_);
164 if (!run_state_) 166 if (!run_state_)
165 return; 167 return;
166 run_state_->delayed_work_time = delayed_work_time; 168 run_state_->delayed_work_time = delayed_work_time;
167 } 169 }
168 170
169 void MessagePumpMojo::DoRunLoop(RunState* run_state, Delegate* delegate) { 171 void MessagePumpMojo::DoRunLoop(RunState* run_state, Delegate* delegate) {
170 bool more_work_is_plausible = true; 172 bool more_work_is_plausible = true;
171 for (;;) { 173 for (;;) {
172 const bool block = !more_work_is_plausible; 174 const bool block = !more_work_is_plausible;
173 more_work_is_plausible = DoInternalWork(*run_state, block); 175 if (read_handle_.is_valid()) {
176 more_work_is_plausible = DoInternalWork(*run_state, block);
177 } else {
178 more_work_is_plausible = DoNonMojoWork(*run_state, block);
179 }
174 180
175 if (run_state->should_quit) 181 if (run_state->should_quit)
176 break; 182 break;
177 183
178 more_work_is_plausible |= delegate->DoWork(); 184 more_work_is_plausible |= delegate->DoWork();
179 if (run_state->should_quit) 185 if (run_state->should_quit)
180 break; 186 break;
181 187
182 more_work_is_plausible |= delegate->DoDelayedWork( 188 more_work_is_plausible |= delegate->DoDelayedWork(
183 &run_state->delayed_work_time); 189 &run_state->delayed_work_time);
(...skipping 18 matching lines...) Expand all
202 // unnecessary work. 208 // unnecessary work.
203 did_work = WaitForReadyHandles(run_state); 209 did_work = WaitForReadyHandles(run_state);
204 } 210 }
205 211
206 did_work |= ProcessReadyHandles(); 212 did_work |= ProcessReadyHandles();
207 did_work |= RemoveExpiredHandles(); 213 did_work |= RemoveExpiredHandles();
208 214
209 return did_work; 215 return did_work;
210 } 216 }
211 217
218 bool MessagePumpMojo::DoNonMojoWork(const RunState& run_state, bool block) {
219 bool did_work = block;
220 if (block) {
221 const MojoDeadline deadline = GetDeadlineForWait(run_state);
222 // Stolen from base/message_loop/message_pump_default.cc
223 base::ThreadRestrictions::ScopedAllowWait allow_wait;
224 if (deadline == MOJO_DEADLINE_INDEFINITE) {
225 event_.Wait();
226 } else {
227 if (deadline > 0) {
228 event_.TimedWait(base::TimeDelta::FromMicroseconds(deadline));
229 } else {
230 did_work = false;
231 }
232 }
233 // Since event_ is auto-reset, we don't need to do anything special here
234 // other than service each delegate method.
235 }
236
237 did_work |= RemoveExpiredHandles();
238
239 return did_work;
240 }
241
212 bool MessagePumpMojo::WaitForReadyHandles(const RunState& run_state) const { 242 bool MessagePumpMojo::WaitForReadyHandles(const RunState& run_state) const {
213 const MojoDeadline deadline = GetDeadlineForWait(run_state); 243 const MojoDeadline deadline = GetDeadlineForWait(run_state);
214 const MojoResult wait_result = Wait( 244 const MojoResult wait_result = Wait(
215 wait_set_handle_.get(), MOJO_HANDLE_SIGNAL_READABLE, deadline, nullptr); 245 wait_set_handle_.get(), MOJO_HANDLE_SIGNAL_READABLE, deadline, nullptr);
216 if (wait_result == MOJO_RESULT_OK) { 246 if (wait_result == MOJO_RESULT_OK) {
217 // Handles may be ready. Or not since wake-ups can be spurious in certain 247 // Handles may be ready. Or not since wake-ups can be spurious in certain
218 // circumstances. 248 // circumstances.
219 return true; 249 return true;
220 } else if (wait_result == MOJO_RESULT_DEADLINE_EXCEEDED) { 250 } else if (wait_result == MOJO_RESULT_DEADLINE_EXCEEDED) {
221 return false; 251 return false;
(...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after
283 (it == handlers_.end() || it->second.id != ready_handles[handle])) { 313 (it == handlers_.end() || it->second.id != ready_handles[handle])) {
284 continue; 314 continue;
285 } 315 }
286 316
287 switch (handle_results[i]) { 317 switch (handle_results[i]) {
288 case MOJO_RESULT_CANCELLED: 318 case MOJO_RESULT_CANCELLED:
289 case MOJO_RESULT_FAILED_PRECONDITION: 319 case MOJO_RESULT_FAILED_PRECONDITION:
290 DVLOG(1) << "Error: " << handle_results[i] 320 DVLOG(1) << "Error: " << handle_results[i]
291 << " handle: " << handle.value(); 321 << " handle: " << handle.value();
292 if (handle.value() == read_handle_.get().value()) { 322 if (handle.value() == read_handle_.get().value()) {
293 // The Mojo EDK is shutting down. The ThreadQuitHelper task in 323 // The Mojo EDK is shutting down. We can't just quit the message pump
294 // base::Thread won't get run since the control pipe depends on the 324 // because that may cause the thread to quit, which causes the
295 // EDK staying alive. So quit manually to avoid this thread hanging. 325 // thread's MessageLoop to be destroyed, which races with any use of
296 Quit(); 326 // |Thread::task_runner()|. So instead, we enter a "dumb" mode which
327 // bypasses Mojo and just acts like a trivial message pump. That way,
328 // we can wait for the usual thread exiting mechanism to happen.
329 // The dumb mode is indicated by releasing the control pipe's read
330 // handle.
331 read_handle_.reset();
297 } else { 332 } else {
298 RemoveInvalidHandle(handle_results[i], handle); 333 RemoveInvalidHandle(handle_results[i], handle);
299 } 334 }
300 break; 335 break;
301 case MOJO_RESULT_OK: 336 case MOJO_RESULT_OK:
302 if (handle.value() == read_handle_.get().value()) { 337 if (handle.value() == read_handle_.get().value()) {
303 DVLOG(1) << "Signaled control pipe"; 338 DVLOG(1) << "Signaled control pipe";
304 // Control pipe was written to. 339 // Control pipe was written to.
305 ReadMessageRaw(read_handle_.get(), nullptr, nullptr, nullptr, nullptr, 340 ReadMessageRaw(read_handle_.get(), nullptr, nullptr, nullptr, nullptr,
306 MOJO_READ_MESSAGE_FLAG_MAY_DISCARD); 341 MOJO_READ_MESSAGE_FLAG_MAY_DISCARD);
(...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after
368 } 403 }
369 return removed; 404 return removed;
370 } 405 }
371 406
372 void MessagePumpMojo::SignalControlPipe() { 407 void MessagePumpMojo::SignalControlPipe() {
373 const MojoResult result = 408 const MojoResult result =
374 WriteMessageRaw(write_handle_.get(), NULL, 0, NULL, 0, 409 WriteMessageRaw(write_handle_.get(), NULL, 0, NULL, 0,
375 MOJO_WRITE_MESSAGE_FLAG_NONE); 410 MOJO_WRITE_MESSAGE_FLAG_NONE);
376 if (result == MOJO_RESULT_FAILED_PRECONDITION) { 411 if (result == MOJO_RESULT_FAILED_PRECONDITION) {
377 // Mojo EDK is shutting down. 412 // Mojo EDK is shutting down.
413 event_.Signal();
378 return; 414 return;
379 } 415 }
380 416
381 // If we can't write we likely won't wake up the thread and there is a strong 417 // If we can't write we likely won't wake up the thread and there is a strong
382 // chance we'll deadlock. 418 // chance we'll deadlock.
383 CHECK_EQ(MOJO_RESULT_OK, result); 419 CHECK_EQ(MOJO_RESULT_OK, result);
384 } 420 }
385 421
386 MojoDeadline MessagePumpMojo::GetDeadlineForWait( 422 MojoDeadline MessagePumpMojo::GetDeadlineForWait(
387 const RunState& run_state) const { 423 const RunState& run_state) const {
(...skipping 19 matching lines...) Expand all
407 void MessagePumpMojo::WillSignalHandler() { 443 void MessagePumpMojo::WillSignalHandler() {
408 FOR_EACH_OBSERVER(Observer, observers_, WillSignalHandler()); 444 FOR_EACH_OBSERVER(Observer, observers_, WillSignalHandler());
409 } 445 }
410 446
411 void MessagePumpMojo::DidSignalHandler() { 447 void MessagePumpMojo::DidSignalHandler() {
412 FOR_EACH_OBSERVER(Observer, observers_, DidSignalHandler()); 448 FOR_EACH_OBSERVER(Observer, observers_, DidSignalHandler());
413 } 449 }
414 450
415 } // namespace common 451 } // namespace common
416 } // namespace mojo 452 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/message_pump/message_pump_mojo.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698