Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(640)

Side by Side Diff: ipc/ipc_sync_channel.cc

Issue 2754143005: Use WaitableEvents to wake up sync IPC waiting (Closed)
Patch Set: docs Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "ipc/ipc_sync_channel.h" 5 #include "ipc/ipc_sync_channel.h"
6 6
7 #include <stddef.h> 7 #include <stddef.h>
8 #include <stdint.h> 8 #include <stdint.h>
9 9
10 #include <utility> 10 #include <utility>
11 11
12 #include "base/bind.h" 12 #include "base/bind.h"
13 #include "base/lazy_instance.h" 13 #include "base/lazy_instance.h"
14 #include "base/location.h" 14 #include "base/location.h"
15 #include "base/logging.h" 15 #include "base/logging.h"
16 #include "base/macros.h" 16 #include "base/macros.h"
17 #include "base/memory/ptr_util.h" 17 #include "base/memory/ptr_util.h"
18 #include "base/run_loop.h" 18 #include "base/run_loop.h"
19 #include "base/synchronization/waitable_event.h" 19 #include "base/synchronization/waitable_event.h"
20 #include "base/threading/thread_local.h" 20 #include "base/threading/thread_local.h"
21 #include "base/threading/thread_task_runner_handle.h" 21 #include "base/threading/thread_task_runner_handle.h"
22 #include "base/trace_event/trace_event.h" 22 #include "base/trace_event/trace_event.h"
23 #include "ipc/ipc_channel_factory.h" 23 #include "ipc/ipc_channel_factory.h"
24 #include "ipc/ipc_logging.h" 24 #include "ipc/ipc_logging.h"
25 #include "ipc/ipc_message_macros.h" 25 #include "ipc/ipc_message_macros.h"
26 #include "ipc/ipc_sync_message.h" 26 #include "ipc/ipc_sync_message.h"
27 #include "ipc/mojo_event.h" 27 #include "mojo/public/cpp/bindings/sync_event_watcher.h"
28 #include "mojo/public/cpp/bindings/sync_handle_registry.h"
29 #include "mojo/public/cpp/bindings/sync_handle_watcher.h"
30 28
31 using base::WaitableEvent; 29 using base::WaitableEvent;
32 30
33 namespace IPC { 31 namespace IPC {
34 32
35 namespace { 33 namespace {
36 34
37 // A generic callback used when watching handles synchronously. Sets |*signal| 35 // A generic callback used when watching handles synchronously. Sets |*signal|
38 // to true. Also sets |*error| to true in case of an error. 36 // to true.
39 void OnSyncHandleReady(bool* signal, bool* error, MojoResult result) { 37 void OnEventReady(bool* signal) {
40 *signal = true; 38 *signal = true;
41 *error = result != MOJO_RESULT_OK;
42 } 39 }
43 40
44 // A ReadyCallback for use with mojo::SimpleWatcher. Ignores the result 41 base::LazyInstance<std::unique_ptr<base::WaitableEvent>>::Leaky
45 // (DCHECKs, but is only used in cases where failure should be impossible) and 42 g_pump_messages_event = LAZY_INSTANCE_INITIALIZER;
46 // runs |callback|.
47 void RunOnHandleReady(const base::Closure& callback, MojoResult result) {
48 DCHECK_EQ(result, MOJO_RESULT_OK);
49 callback.Run();
50 }
51
52 class PumpMessagesEvent {
53 public:
54 PumpMessagesEvent() { event_.Signal(); }
55 ~PumpMessagesEvent() {}
56
57 const MojoEvent* event() const { return &event_; }
58
59 private:
60 MojoEvent event_;
61
62 DISALLOW_COPY_AND_ASSIGN(PumpMessagesEvent);
63 };
64
65 base::LazyInstance<PumpMessagesEvent>::Leaky g_pump_messages_event =
66 LAZY_INSTANCE_INITIALIZER;
67 43
68 } // namespace 44 } // namespace
69 45
70 // When we're blocked in a Send(), we need to process incoming synchronous 46 // When we're blocked in a Send(), we need to process incoming synchronous
71 // messages right away because it could be blocking our reply (either 47 // messages right away because it could be blocking our reply (either
72 // directly from the same object we're calling, or indirectly through one or 48 // directly from the same object we're calling, or indirectly through one or
73 // more other channels). That means that in SyncContext's OnMessageReceived, 49 // more other channels). That means that in SyncContext's OnMessageReceived,
74 // we need to process sync message right away if we're blocked. However a 50 // we need to process sync message right away if we're blocked. However a
75 // simple check isn't sufficient, because the listener thread can be in the 51 // simple check isn't sufficient, because the listener thread can be in the
76 // process of calling Send. 52 // process of calling Send.
(...skipping 124 matching lines...) Expand 10 before | Expand all | Expand 10 after
201 } 177 }
202 } 178 }
203 179
204 if (--listener_count_ == 0) { 180 if (--listener_count_ == 0) {
205 DCHECK(lazy_tls_ptr_.Pointer()->Get()); 181 DCHECK(lazy_tls_ptr_.Pointer()->Get());
206 lazy_tls_ptr_.Pointer()->Set(nullptr); 182 lazy_tls_ptr_.Pointer()->Set(nullptr);
207 sync_dispatch_watcher_.reset(); 183 sync_dispatch_watcher_.reset();
208 } 184 }
209 } 185 }
210 186
211 MojoEvent* dispatch_event() { return &dispatch_event_; } 187 base::WaitableEvent* dispatch_event() { return &dispatch_event_; }
212 base::SingleThreadTaskRunner* listener_task_runner() { 188 base::SingleThreadTaskRunner* listener_task_runner() {
213 return listener_task_runner_.get(); 189 return listener_task_runner_.get();
214 } 190 }
215 191
216 // Holds a pointer to the per-thread ReceivedSyncMsgQueue object. 192 // Holds a pointer to the per-thread ReceivedSyncMsgQueue object.
217 static base::LazyInstance<base::ThreadLocalPointer<ReceivedSyncMsgQueue>>:: 193 static base::LazyInstance<base::ThreadLocalPointer<ReceivedSyncMsgQueue>>::
218 DestructorAtExit lazy_tls_ptr_; 194 DestructorAtExit lazy_tls_ptr_;
219 195
220 // Called on the ipc thread to check if we can unblock any current Send() 196 // Called on the ipc thread to check if we can unblock any current Send()
221 // calls based on a queued reply. 197 // calls based on a queued reply.
222 void DispatchReplies() { 198 void DispatchReplies() {
223 for (size_t i = 0; i < received_replies_.size(); ++i) { 199 for (size_t i = 0; i < received_replies_.size(); ++i) {
224 Message* message = received_replies_[i].message; 200 Message* message = received_replies_[i].message;
225 if (received_replies_[i].context->TryToUnblockListener(message)) { 201 if (received_replies_[i].context->TryToUnblockListener(message)) {
226 delete message; 202 delete message;
227 received_replies_.erase(received_replies_.begin() + i); 203 received_replies_.erase(received_replies_.begin() + i);
228 return; 204 return;
229 } 205 }
230 } 206 }
231 } 207 }
232 208
233 mojo::SimpleWatcher* top_send_done_watcher() { 209 // See SyncChannel::WaitForReplyWithNestedMessageLoop for details.
234 return top_send_done_watcher_; 210 void SetTopSendDoneState(
235 } 211 base::WaitableEventWatcher* watcher,
236 212 base::WaitableEvent* event,
237 void set_top_send_done_watcher(mojo::SimpleWatcher* watcher) { 213 const base::WaitableEventWatcher::EventCallback& callback,
214 base::WaitableEventWatcher** outer_watcher,
215 base::WaitableEvent** outer_event,
216 base::WaitableEventWatcher::EventCallback* outer_callback) {
217 if (outer_watcher) {
218 DCHECK(outer_event && outer_callback);
219 *outer_watcher = top_send_done_watcher_;
220 *outer_event = top_send_done_event_;
221 *outer_callback = top_send_done_callback_;
222 }
238 top_send_done_watcher_ = watcher; 223 top_send_done_watcher_ = watcher;
224 top_send_done_event_ = event;
225 top_send_done_callback_ = callback;
239 } 226 }
240 227
241 private: 228 private:
242 friend class base::RefCountedThreadSafe<ReceivedSyncMsgQueue>; 229 friend class base::RefCountedThreadSafe<ReceivedSyncMsgQueue>;
243 230
244 // See the comment in SyncChannel::SyncChannel for why this event is created 231 // See the comment in SyncChannel::SyncChannel for why this event is created
245 // as manual reset. 232 // as manual reset.
246 ReceivedSyncMsgQueue() 233 ReceivedSyncMsgQueue()
247 : message_queue_version_(0), 234 : message_queue_version_(0),
235 dispatch_event_(base::WaitableEvent::ResetPolicy::MANUAL,
236 base::WaitableEvent::InitialState::NOT_SIGNALED),
248 listener_task_runner_(base::ThreadTaskRunnerHandle::Get()), 237 listener_task_runner_(base::ThreadTaskRunnerHandle::Get()),
249 task_pending_(false), 238 sync_dispatch_watcher_(base::MakeUnique<mojo::SyncEventWatcher>(
250 listener_count_(0), 239 &dispatch_event_,
251 top_send_done_watcher_(nullptr) { 240 base::Bind(&ReceivedSyncMsgQueue::OnDispatchEventReady,
252 sync_dispatch_watcher_.reset(new mojo::SyncHandleWatcher( 241 base::Unretained(this)))) {
253 dispatch_event_.GetHandle(), MOJO_HANDLE_SIGNAL_READABLE,
254 base::Bind(&ReceivedSyncMsgQueue::OnDispatchHandleReady,
255 base::Unretained(this))));
256 sync_dispatch_watcher_->AllowWokenUpBySyncWatchOnSameThread(); 242 sync_dispatch_watcher_->AllowWokenUpBySyncWatchOnSameThread();
257 } 243 }
258 244
259 ~ReceivedSyncMsgQueue() {} 245 ~ReceivedSyncMsgQueue() {}
260 246
261 void OnDispatchHandleReady(MojoResult result) { 247 void OnDispatchEventReady() {
262 if (result != MOJO_RESULT_OK)
263 return;
264
265 if (dispatch_flag_) { 248 if (dispatch_flag_) {
266 *dispatch_flag_ = true; 249 *dispatch_flag_ = true;
267 return; 250 return;
268 } 251 }
269 252
270 // We were woken up during a sync wait, but no specific SyncChannel is 253 // We were woken up during a sync wait, but no specific SyncChannel is
271 // currently waiting. i.e., some other Mojo interface on this thread is 254 // currently waiting. i.e., some other Mojo interface on this thread is
272 // waiting for a response. Since we don't support anything analogous to 255 // waiting for a response. Since we don't support anything analogous to
273 // restricted dispatch on Mojo interfaces, in this case it's safe to 256 // restricted dispatch on Mojo interfaces, in this case it's safe to
274 // dispatch sync messages for any context. 257 // dispatch sync messages for any context.
275 DispatchMessages(nullptr); 258 DispatchMessages(nullptr);
276 } 259 }
277 260
278 // Holds information about a queued synchronous message or reply. 261 // Holds information about a queued synchronous message or reply.
279 struct QueuedMessage { 262 struct QueuedMessage {
280 QueuedMessage(Message* m, SyncContext* c) : message(m), context(c) { } 263 QueuedMessage(Message* m, SyncContext* c) : message(m), context(c) { }
281 Message* message; 264 Message* message;
282 scoped_refptr<SyncChannel::SyncContext> context; 265 scoped_refptr<SyncChannel::SyncContext> context;
283 }; 266 };
284 267
285 typedef std::list<QueuedMessage> SyncMessageQueue; 268 typedef std::list<QueuedMessage> SyncMessageQueue;
286 SyncMessageQueue message_queue_; 269 SyncMessageQueue message_queue_;
287 uint32_t message_queue_version_; // Used to signal DispatchMessages to rescan 270
271 // Used to signal DispatchMessages to rescan
272 uint32_t message_queue_version_ = 0;
288 273
289 std::vector<QueuedMessage> received_replies_; 274 std::vector<QueuedMessage> received_replies_;
290 275
291 // Signaled when we get a synchronous message that we must respond to, as the 276 // Signaled when we get a synchronous message that we must respond to, as the
292 // sender needs its reply before it can reply to our original synchronous 277 // sender needs its reply before it can reply to our original synchronous
293 // message. 278 // message.
294 MojoEvent dispatch_event_; 279 base::WaitableEvent dispatch_event_;
295 scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner_; 280 scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner_;
296 base::Lock message_lock_; 281 base::Lock message_lock_;
297 bool task_pending_; 282 bool task_pending_ = false;
298 int listener_count_; 283 int listener_count_ = 0;
299 284
300 // The current send done handle watcher for this thread. Used to maintain 285 // The current send-done state for this thread. Used to maintain a thread-
301 // a thread-local stack of send done watchers to ensure that nested sync 286 // local stack of state to ensure that nested sync message loops complete
302 // message loops complete correctly. 287 // correctly.
303 mojo::SimpleWatcher* top_send_done_watcher_; 288 base::WaitableEventWatcher* top_send_done_watcher_ = nullptr;
289 base::WaitableEvent* top_send_done_event_ = nullptr;
290 base::WaitableEventWatcher::EventCallback top_send_done_callback_;
304 291
305 // If not null, the address of a flag to set when the dispatch event signals, 292 // If not null, the address of a flag to set when the dispatch event signals,
306 // in lieu of actually dispatching messages. This is used by 293 // in lieu of actually dispatching messages. This is used by
307 // SyncChannel::WaitForReply to restrict the scope of queued messages we're 294 // SyncChannel::WaitForReply to restrict the scope of queued messages we're
308 // allowed to process while it's waiting. 295 // allowed to process while it's waiting.
309 bool* dispatch_flag_ = nullptr; 296 bool* dispatch_flag_ = nullptr;
310 297
311 // Watches |dispatch_event_| during all sync handle watches on this thread. 298 // Watches |dispatch_event_| during all sync handle watches on this thread.
312 std::unique_ptr<mojo::SyncHandleWatcher> sync_dispatch_watcher_; 299 std::unique_ptr<mojo::SyncEventWatcher> sync_dispatch_watcher_;
313 }; 300 };
314 301
315 base::LazyInstance<base::ThreadLocalPointer< 302 base::LazyInstance<base::ThreadLocalPointer<
316 SyncChannel::ReceivedSyncMsgQueue>>::DestructorAtExit 303 SyncChannel::ReceivedSyncMsgQueue>>::DestructorAtExit
317 SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_ = 304 SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_ =
318 LAZY_INSTANCE_INITIALIZER; 305 LAZY_INSTANCE_INITIALIZER;
319 306
320 SyncChannel::SyncContext::SyncContext( 307 SyncChannel::SyncContext::SyncContext(
321 Listener* listener, 308 Listener* listener,
322 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, 309 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
323 WaitableEvent* shutdown_event) 310 WaitableEvent* shutdown_event)
324 : ChannelProxy::Context(listener, ipc_task_runner), 311 : ChannelProxy::Context(listener, ipc_task_runner),
325 received_sync_msgs_(ReceivedSyncMsgQueue::AddContext()), 312 received_sync_msgs_(ReceivedSyncMsgQueue::AddContext()),
326 shutdown_event_(shutdown_event), 313 shutdown_event_(shutdown_event),
327 restrict_dispatch_group_(kRestrictDispatchGroup_None) { 314 restrict_dispatch_group_(kRestrictDispatchGroup_None) {
328 } 315 }
329 316
317 void SyncChannel::SyncContext::OnSendDoneEventSignaled(
318 base::RunLoop* nested_loop,
319 base::WaitableEvent* event) {
320 DCHECK_EQ(GetSendDoneEvent(), event);
321 nested_loop->Quit();
322 }
323
330 SyncChannel::SyncContext::~SyncContext() { 324 SyncChannel::SyncContext::~SyncContext() {
331 while (!deserializers_.empty()) 325 while (!deserializers_.empty())
332 Pop(); 326 Pop();
333 } 327 }
334 328
335 // Adds information about an outgoing sync message to the context so that 329 // Adds information about an outgoing sync message to the context so that
336 // we know how to deserialize the reply. Returns |true| if the message was added 330 // we know how to deserialize the reply. Returns |true| if the message was added
337 // to the context or |false| if it was rejected (e.g. due to shutdown.) 331 // to the context or |false| if it was rejected (e.g. due to shutdown.)
338 bool SyncChannel::SyncContext::Push(SyncMessage* sync_msg) { 332 bool SyncChannel::SyncContext::Push(SyncMessage* sync_msg) {
339 // Create the tracking information for this message. This object is stored 333 // Create the tracking information for this message. This object is stored
340 // by value since all members are pointers that are cheap to copy. These 334 // by value since all members are pointers that are cheap to copy. These
341 // pointers are cleaned up in the Pop() function. 335 // pointers are cleaned up in the Pop() function.
342 // 336 //
343 // The event is created as manual reset because in between Signal and 337 // The event is created as manual reset because in between Signal and
344 // OnObjectSignalled, another Send can happen which would stop the watcher 338 // OnObjectSignalled, another Send can happen which would stop the watcher
345 // from being called. The event would get watched later, when the nested 339 // from being called. The event would get watched later, when the nested
346 // Send completes, so the event will need to remain set. 340 // Send completes, so the event will need to remain set.
347 base::AutoLock auto_lock(deserializers_lock_); 341 base::AutoLock auto_lock(deserializers_lock_);
348 if (reject_new_deserializers_) 342 if (reject_new_deserializers_)
349 return false; 343 return false;
350 PendingSyncMsg pending( 344 PendingSyncMsg pending(
351 SyncMessage::GetMessageId(*sync_msg), sync_msg->GetReplyDeserializer(), 345 SyncMessage::GetMessageId(*sync_msg), sync_msg->GetReplyDeserializer(),
352 new MojoEvent); 346 new base::WaitableEvent(base::WaitableEvent::ResetPolicy::MANUAL,
347 base::WaitableEvent::InitialState::NOT_SIGNALED));
353 deserializers_.push_back(pending); 348 deserializers_.push_back(pending);
354 return true; 349 return true;
355 } 350 }
356 351
357 bool SyncChannel::SyncContext::Pop() { 352 bool SyncChannel::SyncContext::Pop() {
358 bool result; 353 bool result;
359 { 354 {
360 base::AutoLock auto_lock(deserializers_lock_); 355 base::AutoLock auto_lock(deserializers_lock_);
361 PendingSyncMsg msg = deserializers_.back(); 356 PendingSyncMsg msg = deserializers_.back();
362 delete msg.deserializer; 357 delete msg.deserializer;
363 delete msg.done_event; 358 delete msg.done_event;
364 msg.done_event = nullptr; 359 msg.done_event = nullptr;
365 deserializers_.pop_back(); 360 deserializers_.pop_back();
366 result = msg.send_result; 361 result = msg.send_result;
367 } 362 }
368 363
369 // We got a reply to a synchronous Send() call that's blocking the listener 364 // We got a reply to a synchronous Send() call that's blocking the listener
370 // thread. However, further down the call stack there could be another 365 // thread. However, further down the call stack there could be another
371 // blocking Send() call, whose reply we received after we made this last 366 // blocking Send() call, whose reply we received after we made this last
372 // Send() call. So check if we have any queued replies available that 367 // Send() call. So check if we have any queued replies available that
373 // can now unblock the listener thread. 368 // can now unblock the listener thread.
374 ipc_task_runner()->PostTask( 369 ipc_task_runner()->PostTask(
375 FROM_HERE, base::Bind(&ReceivedSyncMsgQueue::DispatchReplies, 370 FROM_HERE, base::Bind(&ReceivedSyncMsgQueue::DispatchReplies,
376 received_sync_msgs_)); 371 received_sync_msgs_));
377 372
378 return result; 373 return result;
379 } 374 }
380 375
381 MojoEvent* SyncChannel::SyncContext::GetSendDoneEvent() { 376 base::WaitableEvent* SyncChannel::SyncContext::GetSendDoneEvent() {
382 base::AutoLock auto_lock(deserializers_lock_); 377 base::AutoLock auto_lock(deserializers_lock_);
383 return deserializers_.back().done_event; 378 return deserializers_.back().done_event;
384 } 379 }
385 380
386 MojoEvent* SyncChannel::SyncContext::GetDispatchEvent() { 381 base::WaitableEvent* SyncChannel::SyncContext::GetDispatchEvent() {
387 return received_sync_msgs_->dispatch_event(); 382 return received_sync_msgs_->dispatch_event();
388 } 383 }
389 384
390 void SyncChannel::SyncContext::DispatchMessages() { 385 void SyncChannel::SyncContext::DispatchMessages() {
391 received_sync_msgs_->DispatchMessages(this); 386 received_sync_msgs_->DispatchMessages(this);
392 } 387 }
393 388
394 bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) { 389 bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) {
395 base::AutoLock auto_lock(deserializers_lock_); 390 base::AutoLock auto_lock(deserializers_lock_);
396 if (deserializers_.empty() || 391 if (deserializers_.empty() ||
397 !SyncMessage::IsMessageReplyTo(*msg, deserializers_.back().id)) { 392 !SyncMessage::IsMessageReplyTo(*msg, deserializers_.back().id)) {
398 return false; 393 return false;
399 } 394 }
400 395
401 if (!msg->is_reply_error()) { 396 if (!msg->is_reply_error()) {
402 bool send_result = deserializers_.back().deserializer-> 397 bool send_result = deserializers_.back().deserializer->
403 SerializeOutputParameters(*msg); 398 SerializeOutputParameters(*msg);
404 deserializers_.back().send_result = send_result; 399 deserializers_.back().send_result = send_result;
405 DVLOG_IF(1, !send_result) << "Couldn't deserialize reply message"; 400 DVLOG_IF(1, !send_result) << "Couldn't deserialize reply message";
406 } else { 401 } else {
407 DVLOG(1) << "Received error reply"; 402 DVLOG(1) << "Received error reply";
408 } 403 }
409 404
410 MojoEvent* done_event = deserializers_.back().done_event; 405 base::WaitableEvent* done_event = deserializers_.back().done_event;
411 TRACE_EVENT_FLOW_BEGIN0( 406 TRACE_EVENT_FLOW_BEGIN0(
412 TRACE_DISABLED_BY_DEFAULT("ipc.flow"), 407 TRACE_DISABLED_BY_DEFAULT("ipc.flow"),
413 "SyncChannel::SyncContext::TryToUnblockListener", done_event); 408 "SyncChannel::SyncContext::TryToUnblockListener", done_event);
414 409
415 done_event->Signal(); 410 done_event->Signal();
416 411
417 return true; 412 return true;
418 } 413 }
419 414
420 void SyncChannel::SyncContext::Clear() { 415 void SyncChannel::SyncContext::Clear() {
(...skipping 98 matching lines...) Expand 10 before | Expand all | Expand 10 after
519 WaitableEvent* shutdown_event) { 514 WaitableEvent* shutdown_event) {
520 return base::WrapUnique( 515 return base::WrapUnique(
521 new SyncChannel(listener, ipc_task_runner, shutdown_event)); 516 new SyncChannel(listener, ipc_task_runner, shutdown_event));
522 } 517 }
523 518
524 SyncChannel::SyncChannel( 519 SyncChannel::SyncChannel(
525 Listener* listener, 520 Listener* listener,
526 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, 521 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
527 WaitableEvent* shutdown_event) 522 WaitableEvent* shutdown_event)
528 : ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)), 523 : ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)),
529 sync_handle_registry_(mojo::SyncHandleRegistry::current()), 524 sync_handle_registry_(mojo::SyncHandleRegistry::current()) {
530 dispatch_watcher_(FROM_HERE,
531 mojo::SimpleWatcher::ArmingPolicy::AUTOMATIC) {
532 // The current (listener) thread must be distinct from the IPC thread, or else 525 // The current (listener) thread must be distinct from the IPC thread, or else
533 // sending synchronous messages will deadlock. 526 // sending synchronous messages will deadlock.
534 DCHECK_NE(ipc_task_runner.get(), base::ThreadTaskRunnerHandle::Get().get()); 527 DCHECK_NE(ipc_task_runner.get(), base::ThreadTaskRunnerHandle::Get().get());
535 StartWatching(); 528 StartWatching();
536 } 529 }
537 530
538 SyncChannel::~SyncChannel() { 531 SyncChannel::~SyncChannel() {
539 } 532 }
540 533
541 void SyncChannel::SetRestrictDispatchChannelGroup(int group) { 534 void SyncChannel::SetRestrictDispatchChannelGroup(int group) {
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after
589 "SyncChannel::Send", context->GetSendDoneEvent()); 582 "SyncChannel::Send", context->GetSendDoneEvent());
590 583
591 return context->Pop(); 584 return context->Pop();
592 } 585 }
593 586
594 void SyncChannel::WaitForReply(mojo::SyncHandleRegistry* registry, 587 void SyncChannel::WaitForReply(mojo::SyncHandleRegistry* registry,
595 SyncContext* context, 588 SyncContext* context,
596 bool pump_messages) { 589 bool pump_messages) {
597 context->DispatchMessages(); 590 context->DispatchMessages();
598 591
599 const MojoEvent* pump_messages_event = nullptr; 592 base::WaitableEvent* pump_messages_event = nullptr;
600 if (pump_messages) 593 if (pump_messages) {
601 pump_messages_event = g_pump_messages_event.Get().event(); 594 if (!g_pump_messages_event.Get()) {
595 g_pump_messages_event.Get() = base::MakeUnique<base::WaitableEvent>(
596 base::WaitableEvent::ResetPolicy::MANUAL,
597 base::WaitableEvent::InitialState::SIGNALED);
598 }
599 pump_messages_event = g_pump_messages_event.Get().get();
600 }
602 601
603 while (true) { 602 while (true) {
604 bool dispatch = false; 603 bool dispatch = false;
605 bool send_done = false; 604 bool send_done = false;
606 bool should_pump_messages = false; 605 bool should_pump_messages = false;
607 bool error = false; 606 bool registered = registry->RegisterEvent(
608 bool registered = registry->RegisterHandle( 607 context->GetSendDoneEvent(), base::Bind(&OnEventReady, &send_done));
609 context->GetSendDoneEvent()->GetHandle(),
610 MOJO_HANDLE_SIGNAL_READABLE,
611 base::Bind(&OnSyncHandleReady, &send_done, &error));
612 DCHECK(registered); 608 DCHECK(registered);
609
613 if (pump_messages_event) { 610 if (pump_messages_event) {
614 registered = registry->RegisterHandle( 611 registered = registry->RegisterEvent(
615 pump_messages_event->GetHandle(), MOJO_HANDLE_SIGNAL_READABLE, 612 pump_messages_event,
616 base::Bind(&OnSyncHandleReady, &should_pump_messages, &error)); 613 base::Bind(&OnEventReady, &should_pump_messages));
617 DCHECK(registered); 614 DCHECK(registered);
618 } 615 }
619 616
620 const bool* stop_flags[] = { &dispatch, &send_done, &should_pump_messages }; 617 const bool* stop_flags[] = { &dispatch, &send_done, &should_pump_messages };
621 context->received_sync_msgs()->BlockDispatch(&dispatch); 618 context->received_sync_msgs()->BlockDispatch(&dispatch);
622 registry->WatchAllHandles(stop_flags, 3); 619 registry->WatchAllHandles(stop_flags, 3);
623 context->received_sync_msgs()->UnblockDispatch(); 620 context->received_sync_msgs()->UnblockDispatch();
624 DCHECK(!error);
625 621
626 registry->UnregisterHandle(context->GetSendDoneEvent()->GetHandle()); 622 registry->UnregisterEvent(context->GetSendDoneEvent());
627 if (pump_messages_event) 623 if (pump_messages_event)
628 registry->UnregisterHandle(pump_messages_event->GetHandle()); 624 registry->UnregisterEvent(pump_messages_event);
629 625
630 if (dispatch) { 626 if (dispatch) {
631 // We're waiting for a reply, but we received a blocking synchronous call. 627 // We're waiting for a reply, but we received a blocking synchronous call.
632 // We must process it to avoid potential deadlocks. 628 // We must process it to avoid potential deadlocks.
633 context->GetDispatchEvent()->Reset(); 629 context->GetDispatchEvent()->Reset();
634 context->DispatchMessages(); 630 context->DispatchMessages();
635 continue; 631 continue;
636 } 632 }
637 633
638 if (should_pump_messages) 634 if (should_pump_messages)
639 WaitForReplyWithNestedMessageLoop(context); // Run a nested message loop. 635 WaitForReplyWithNestedMessageLoop(context); // Run a nested message loop.
640 636
641 break; 637 break;
642 } 638 }
643 } 639 }
644 640
645 void SyncChannel::WaitForReplyWithNestedMessageLoop(SyncContext* context) { 641 void SyncChannel::WaitForReplyWithNestedMessageLoop(SyncContext* context) {
646 mojo::SimpleWatcher send_done_watcher( 642 ReceivedSyncMsgQueue* sync_msg_queue = context->received_sync_msgs();
647 FROM_HERE, mojo::SimpleWatcher::ArmingPolicy::AUTOMATIC); 643 DCHECK(sync_msg_queue);
648 644
649 ReceivedSyncMsgQueue* sync_msg_queue = context->received_sync_msgs(); 645 base::RunLoop nested_loop;
650 DCHECK_NE(sync_msg_queue, nullptr);
651 646
652 mojo::SimpleWatcher* old_watcher = sync_msg_queue->top_send_done_watcher(); 647 // WaitForReplyWithNestedMessageLoop may be re-entered, i.e. we can nest
653 mojo::Handle old_handle(mojo::kInvalidHandleValue); 648 // waiting message loops arbitrarily deep on the SyncChannel's thread. Every
654 mojo::SimpleWatcher::ReadyCallback old_callback; 649 // such operation has a corresponding WaitableEvent to be watched which, when
650 // signalled for IPC completion, breaks out of the loop. The innermost (i.e.
651 // topmost) such event is stored in |sync_msg_queue| state.
652 //
653 // Here we preserve the current top-of-stack event state (if any) within the
654 // local stack frame. We then replace the event state in |sync_msg_queue| with
655 // our own and run a nested loop. If a subsequent nested loop is started
656 // therein the process is repeated in that stack frame, and so on.
657 //
658 // Once the innermost nested loop is broken, the locally preserved event state
659 // is swapped back into |sync_msg_queue| before unwinding the stack.
660 base::WaitableEventWatcher* outer_watcher = nullptr;
yzshen1 2017/03/23 20:15:50 Does it make sense to group these three things int
Ken Rockot(use gerrit already) 2017/03/23 22:04:20 Sure, that sounds nice. In fact I've gone and move
661 base::WaitableEvent* outer_event = nullptr;
662 base::WaitableEventWatcher::EventCallback outer_callback;
663 base::WaitableEventWatcher send_done_watcher;
664 base::WaitableEvent* event = context->GetSendDoneEvent();
665 const base::WaitableEventWatcher::EventCallback callback =
666 base::Bind(&SyncContext::OnSendDoneEventSignaled, context, &nested_loop);
667 sync_msg_queue->SetTopSendDoneState(&send_done_watcher, event, callback,
668 &outer_watcher, &outer_event,
669 &outer_callback);
670 if (outer_watcher)
671 outer_watcher->StopWatching();
672 send_done_watcher.StartWatching(event, callback);
655 673
656 // Maintain a thread-local stack of watchers to ensure nested calls complete 674 base::MessageLoop::ScopedNestableTaskAllower allow(
657 // in the correct sequence, i.e. the outermost call completes first, etc. 675 base::MessageLoop::current());
658 if (old_watcher) { 676 nested_loop.Run();
659 old_callback = old_watcher->ready_callback();
660 old_handle = old_watcher->handle();
661 old_watcher->Cancel();
662 }
663 677
664 sync_msg_queue->set_top_send_done_watcher(&send_done_watcher); 678 sync_msg_queue->SetTopSendDoneState(
665 679 outer_watcher, outer_event, outer_callback, nullptr, nullptr, nullptr);
666 { 680 if (outer_watcher)
667 base::RunLoop nested_loop; 681 outer_watcher->StartWatching(outer_event, outer_callback);
668 send_done_watcher.Watch(
669 context->GetSendDoneEvent()->GetHandle(), MOJO_HANDLE_SIGNAL_READABLE,
670 base::Bind(&RunOnHandleReady, nested_loop.QuitClosure()));
671
672 base::MessageLoop::ScopedNestableTaskAllower allow(
673 base::MessageLoop::current());
674 nested_loop.Run();
675 send_done_watcher.Cancel();
676 }
677
678 sync_msg_queue->set_top_send_done_watcher(old_watcher);
679 if (old_watcher)
680 old_watcher->Watch(old_handle, MOJO_HANDLE_SIGNAL_READABLE, old_callback);
681 } 682 }
682 683
683 void SyncChannel::OnDispatchHandleReady(MojoResult result) { 684 void SyncChannel::OnDispatchEventSignaled(base::WaitableEvent* event) {
684 DCHECK_EQ(result, MOJO_RESULT_OK); 685 DCHECK_EQ(sync_context()->GetDispatchEvent(), event);
685 sync_context()->GetDispatchEvent()->Reset(); 686 sync_context()->GetDispatchEvent()->Reset();
687
688 StartWatching();
689
690 // NOTE: May delete |this|.
686 sync_context()->DispatchMessages(); 691 sync_context()->DispatchMessages();
687 } 692 }
688 693
689 void SyncChannel::StartWatching() { 694 void SyncChannel::StartWatching() {
690 // |dispatch_watcher_| watches the event asynchronously, only dispatching 695 // |dispatch_watcher_| watches the event asynchronously, only dispatching
691 // messages once the listener thread is unblocked and pumping its task queue. 696 // messages once the listener thread is unblocked and pumping its task queue.
692 // The ReceivedSyncMsgQueue also watches this event and may dispatch 697 // The ReceivedSyncMsgQueue also watches this event and may dispatch
693 // immediately if woken up by a message which it's allowed to dispatch. 698 // immediately if woken up by a message which it's allowed to dispatch.
694 dispatch_watcher_.Watch( 699 dispatch_watcher_.StartWatching(
695 sync_context()->GetDispatchEvent()->GetHandle(), 700 sync_context()->GetDispatchEvent(),
696 MOJO_HANDLE_SIGNAL_READABLE, 701 base::Bind(&SyncChannel::OnDispatchEventSignaled,
697 base::Bind(&SyncChannel::OnDispatchHandleReady, base::Unretained(this))); 702 base::Unretained(this)));
698 } 703 }
699 704
700 void SyncChannel::OnChannelInit() { 705 void SyncChannel::OnChannelInit() {
701 pre_init_sync_message_filters_.clear(); 706 pre_init_sync_message_filters_.clear();
702 } 707 }
703 708
704 } // namespace IPC 709 } // namespace IPC
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698