OLD | NEW |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "ipc/ipc_sync_channel.h" | 5 #include "ipc/ipc_sync_channel.h" |
6 | 6 |
7 #include <stddef.h> | 7 #include <stddef.h> |
8 #include <stdint.h> | 8 #include <stdint.h> |
9 | 9 |
10 #include <utility> | 10 #include <utility> |
(...skipping 23 matching lines...) Expand all Loading... |
34 | 34 |
35 namespace { | 35 namespace { |
36 | 36 |
37 // A generic callback used when watching handles synchronously. Sets |*signal| | 37 // A generic callback used when watching handles synchronously. Sets |*signal| |
38 // to true. Also sets |*error| to true in case of an error. | 38 // to true. Also sets |*error| to true in case of an error. |
39 void OnSyncHandleReady(bool* signal, bool* error, MojoResult result) { | 39 void OnSyncHandleReady(bool* signal, bool* error, MojoResult result) { |
40 *signal = true; | 40 *signal = true; |
41 *error = result != MOJO_RESULT_OK; | 41 *error = result != MOJO_RESULT_OK; |
42 } | 42 } |
43 | 43 |
44 // A ReadyCallback for use with mojo::SimpleWatcher. Ignores the result | 44 // A ReadyCallback for use with mojo::Watcher. Ignores the result (DCHECKs, but |
45 // (DCHECKs, but is only used in cases where failure should be impossible) and | 45 // is only used in cases where failure should be impossible) and runs |
46 // runs |callback|. | 46 // |callback|. |
47 void RunOnHandleReady(const base::Closure& callback, MojoResult result) { | 47 void RunOnHandleReady(const base::Closure& callback, MojoResult result) { |
48 DCHECK_EQ(result, MOJO_RESULT_OK); | 48 DCHECK_EQ(result, MOJO_RESULT_OK); |
49 callback.Run(); | 49 callback.Run(); |
50 } | 50 } |
51 | 51 |
52 class PumpMessagesEvent { | 52 class PumpMessagesEvent { |
53 public: | 53 public: |
54 PumpMessagesEvent() { event_.Signal(); } | 54 PumpMessagesEvent() { event_.Signal(); } |
55 ~PumpMessagesEvent() {} | 55 ~PumpMessagesEvent() {} |
56 | 56 |
(...skipping 166 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
223 for (size_t i = 0; i < received_replies_.size(); ++i) { | 223 for (size_t i = 0; i < received_replies_.size(); ++i) { |
224 Message* message = received_replies_[i].message; | 224 Message* message = received_replies_[i].message; |
225 if (received_replies_[i].context->TryToUnblockListener(message)) { | 225 if (received_replies_[i].context->TryToUnblockListener(message)) { |
226 delete message; | 226 delete message; |
227 received_replies_.erase(received_replies_.begin() + i); | 227 received_replies_.erase(received_replies_.begin() + i); |
228 return; | 228 return; |
229 } | 229 } |
230 } | 230 } |
231 } | 231 } |
232 | 232 |
233 mojo::SimpleWatcher* top_send_done_watcher() { | 233 mojo::Watcher* top_send_done_watcher() { |
234 return top_send_done_watcher_; | 234 return top_send_done_watcher_; |
235 } | 235 } |
236 | 236 |
237 void set_top_send_done_watcher(mojo::SimpleWatcher* watcher) { | 237 void set_top_send_done_watcher(mojo::Watcher* watcher) { |
238 top_send_done_watcher_ = watcher; | 238 top_send_done_watcher_ = watcher; |
239 } | 239 } |
240 | 240 |
241 private: | 241 private: |
242 friend class base::RefCountedThreadSafe<ReceivedSyncMsgQueue>; | 242 friend class base::RefCountedThreadSafe<ReceivedSyncMsgQueue>; |
243 | 243 |
244 // See the comment in SyncChannel::SyncChannel for why this event is created | 244 // See the comment in SyncChannel::SyncChannel for why this event is created |
245 // as manual reset. | 245 // as manual reset. |
246 ReceivedSyncMsgQueue() | 246 ReceivedSyncMsgQueue() |
247 : message_queue_version_(0), | 247 : message_queue_version_(0), |
(...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
293 // message. | 293 // message. |
294 MojoEvent dispatch_event_; | 294 MojoEvent dispatch_event_; |
295 scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner_; | 295 scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner_; |
296 base::Lock message_lock_; | 296 base::Lock message_lock_; |
297 bool task_pending_; | 297 bool task_pending_; |
298 int listener_count_; | 298 int listener_count_; |
299 | 299 |
300 // The current send done handle watcher for this thread. Used to maintain | 300 // The current send done handle watcher for this thread. Used to maintain |
301 // a thread-local stack of send done watchers to ensure that nested sync | 301 // a thread-local stack of send done watchers to ensure that nested sync |
302 // message loops complete correctly. | 302 // message loops complete correctly. |
303 mojo::SimpleWatcher* top_send_done_watcher_; | 303 mojo::Watcher* top_send_done_watcher_; |
304 | 304 |
305 // If not null, the address of a flag to set when the dispatch event signals, | 305 // If not null, the address of a flag to set when the dispatch event signals, |
306 // in lieu of actually dispatching messages. This is used by | 306 // in lieu of actually dispatching messages. This is used by |
307 // SyncChannel::WaitForReply to restrict the scope of queued messages we're | 307 // SyncChannel::WaitForReply to restrict the scope of queued messages we're |
308 // allowed to process while it's waiting. | 308 // allowed to process while it's waiting. |
309 bool* dispatch_flag_ = nullptr; | 309 bool* dispatch_flag_ = nullptr; |
310 | 310 |
311 // Watches |dispatch_event_| during all sync handle watches on this thread. | 311 // Watches |dispatch_event_| during all sync handle watches on this thread. |
312 std::unique_ptr<mojo::SyncHandleWatcher> sync_dispatch_watcher_; | 312 std::unique_ptr<mojo::SyncHandleWatcher> sync_dispatch_watcher_; |
313 }; | 313 }; |
(...skipping 206 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
520 return base::WrapUnique( | 520 return base::WrapUnique( |
521 new SyncChannel(listener, ipc_task_runner, shutdown_event)); | 521 new SyncChannel(listener, ipc_task_runner, shutdown_event)); |
522 } | 522 } |
523 | 523 |
524 SyncChannel::SyncChannel( | 524 SyncChannel::SyncChannel( |
525 Listener* listener, | 525 Listener* listener, |
526 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, | 526 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, |
527 WaitableEvent* shutdown_event) | 527 WaitableEvent* shutdown_event) |
528 : ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)), | 528 : ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)), |
529 sync_handle_registry_(mojo::SyncHandleRegistry::current()), | 529 sync_handle_registry_(mojo::SyncHandleRegistry::current()), |
530 dispatch_watcher_(FROM_HERE, | 530 dispatch_watcher_(FROM_HERE) { |
531 mojo::SimpleWatcher::ArmingPolicy::AUTOMATIC) { | |
532 // The current (listener) thread must be distinct from the IPC thread, or else | 531 // The current (listener) thread must be distinct from the IPC thread, or else |
533 // sending synchronous messages will deadlock. | 532 // sending synchronous messages will deadlock. |
534 DCHECK_NE(ipc_task_runner.get(), base::ThreadTaskRunnerHandle::Get().get()); | 533 DCHECK_NE(ipc_task_runner.get(), base::ThreadTaskRunnerHandle::Get().get()); |
535 StartWatching(); | 534 StartWatching(); |
536 } | 535 } |
537 | 536 |
538 SyncChannel::~SyncChannel() { | 537 SyncChannel::~SyncChannel() { |
539 } | 538 } |
540 | 539 |
541 void SyncChannel::SetRestrictDispatchChannelGroup(int group) { | 540 void SyncChannel::SetRestrictDispatchChannelGroup(int group) { |
(...skipping 74 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
616 base::Bind(&OnSyncHandleReady, &should_pump_messages, &error)); | 615 base::Bind(&OnSyncHandleReady, &should_pump_messages, &error)); |
617 DCHECK(registered); | 616 DCHECK(registered); |
618 } | 617 } |
619 | 618 |
620 const bool* stop_flags[] = { &dispatch, &send_done, &should_pump_messages }; | 619 const bool* stop_flags[] = { &dispatch, &send_done, &should_pump_messages }; |
621 context->received_sync_msgs()->BlockDispatch(&dispatch); | 620 context->received_sync_msgs()->BlockDispatch(&dispatch); |
622 registry->WatchAllHandles(stop_flags, 3); | 621 registry->WatchAllHandles(stop_flags, 3); |
623 context->received_sync_msgs()->UnblockDispatch(); | 622 context->received_sync_msgs()->UnblockDispatch(); |
624 DCHECK(!error); | 623 DCHECK(!error); |
625 | 624 |
| 625 |
626 registry->UnregisterHandle(context->GetSendDoneEvent()->GetHandle()); | 626 registry->UnregisterHandle(context->GetSendDoneEvent()->GetHandle()); |
627 if (pump_messages_event) | 627 if (pump_messages_event) |
628 registry->UnregisterHandle(pump_messages_event->GetHandle()); | 628 registry->UnregisterHandle(pump_messages_event->GetHandle()); |
629 | 629 |
630 if (dispatch) { | 630 if (dispatch) { |
631 // We're waiting for a reply, but we received a blocking synchronous call. | 631 // We're waiting for a reply, but we received a blocking synchronous call. |
632 // We must process it to avoid potential deadlocks. | 632 // We must process it to avoid potential deadlocks. |
633 context->GetDispatchEvent()->Reset(); | 633 context->GetDispatchEvent()->Reset(); |
634 context->DispatchMessages(); | 634 context->DispatchMessages(); |
635 continue; | 635 continue; |
636 } | 636 } |
637 | 637 |
638 if (should_pump_messages) | 638 if (should_pump_messages) |
639 WaitForReplyWithNestedMessageLoop(context); // Run a nested message loop. | 639 WaitForReplyWithNestedMessageLoop(context); // Run a nested message loop. |
640 | 640 |
641 break; | 641 break; |
642 } | 642 } |
643 } | 643 } |
644 | 644 |
645 void SyncChannel::WaitForReplyWithNestedMessageLoop(SyncContext* context) { | 645 void SyncChannel::WaitForReplyWithNestedMessageLoop(SyncContext* context) { |
646 mojo::SimpleWatcher send_done_watcher( | 646 mojo::Watcher send_done_watcher(FROM_HERE); |
647 FROM_HERE, mojo::SimpleWatcher::ArmingPolicy::AUTOMATIC); | |
648 | 647 |
649 ReceivedSyncMsgQueue* sync_msg_queue = context->received_sync_msgs(); | 648 ReceivedSyncMsgQueue* sync_msg_queue = context->received_sync_msgs(); |
650 DCHECK_NE(sync_msg_queue, nullptr); | 649 DCHECK_NE(sync_msg_queue, nullptr); |
651 | 650 |
652 mojo::SimpleWatcher* old_watcher = sync_msg_queue->top_send_done_watcher(); | 651 mojo::Watcher* old_watcher = sync_msg_queue->top_send_done_watcher(); |
653 mojo::Handle old_handle(mojo::kInvalidHandleValue); | 652 mojo::Handle old_handle(mojo::kInvalidHandleValue); |
654 mojo::SimpleWatcher::ReadyCallback old_callback; | 653 mojo::Watcher::ReadyCallback old_callback; |
655 | 654 |
656 // Maintain a thread-local stack of watchers to ensure nested calls complete | 655 // Maintain a thread-local stack of watchers to ensure nested calls complete |
657 // in the correct sequence, i.e. the outermost call completes first, etc. | 656 // in the correct sequence, i.e. the outermost call completes first, etc. |
658 if (old_watcher) { | 657 if (old_watcher) { |
659 old_callback = old_watcher->ready_callback(); | 658 old_callback = old_watcher->ready_callback(); |
660 old_handle = old_watcher->handle(); | 659 old_handle = old_watcher->handle(); |
661 old_watcher->Cancel(); | 660 old_watcher->Cancel(); |
662 } | 661 } |
663 | 662 |
664 sync_msg_queue->set_top_send_done_watcher(&send_done_watcher); | 663 sync_msg_queue->set_top_send_done_watcher(&send_done_watcher); |
665 | 664 |
666 { | 665 { |
667 base::RunLoop nested_loop; | 666 base::RunLoop nested_loop; |
668 send_done_watcher.Watch( | 667 send_done_watcher.Start( |
669 context->GetSendDoneEvent()->GetHandle(), MOJO_HANDLE_SIGNAL_READABLE, | 668 context->GetSendDoneEvent()->GetHandle(), MOJO_HANDLE_SIGNAL_READABLE, |
670 base::Bind(&RunOnHandleReady, nested_loop.QuitClosure())); | 669 base::Bind(&RunOnHandleReady, nested_loop.QuitClosure())); |
671 | 670 |
672 base::MessageLoop::ScopedNestableTaskAllower allow( | 671 base::MessageLoop::ScopedNestableTaskAllower allow( |
673 base::MessageLoop::current()); | 672 base::MessageLoop::current()); |
674 nested_loop.Run(); | 673 nested_loop.Run(); |
675 send_done_watcher.Cancel(); | 674 send_done_watcher.Cancel(); |
676 } | 675 } |
677 | 676 |
678 sync_msg_queue->set_top_send_done_watcher(old_watcher); | 677 sync_msg_queue->set_top_send_done_watcher(old_watcher); |
679 if (old_watcher) | 678 if (old_watcher) |
680 old_watcher->Watch(old_handle, MOJO_HANDLE_SIGNAL_READABLE, old_callback); | 679 old_watcher->Start(old_handle, MOJO_HANDLE_SIGNAL_READABLE, old_callback); |
681 } | 680 } |
682 | 681 |
683 void SyncChannel::OnDispatchHandleReady(MojoResult result) { | 682 void SyncChannel::OnDispatchHandleReady(MojoResult result) { |
684 DCHECK_EQ(result, MOJO_RESULT_OK); | 683 DCHECK_EQ(result, MOJO_RESULT_OK); |
685 sync_context()->GetDispatchEvent()->Reset(); | 684 sync_context()->GetDispatchEvent()->Reset(); |
686 sync_context()->DispatchMessages(); | 685 sync_context()->DispatchMessages(); |
687 } | 686 } |
688 | 687 |
689 void SyncChannel::StartWatching() { | 688 void SyncChannel::StartWatching() { |
690 // |dispatch_watcher_| watches the event asynchronously, only dispatching | 689 // |dispatch_watcher_| watches the event asynchronously, only dispatching |
691 // messages once the listener thread is unblocked and pumping its task queue. | 690 // messages once the listener thread is unblocked and pumping its task queue. |
692 // The ReceivedSyncMsgQueue also watches this event and may dispatch | 691 // The ReceivedSyncMsgQueue also watches this event and may dispatch |
693 // immediately if woken up by a message which it's allowed to dispatch. | 692 // immediately if woken up by a message which it's allowed to dispatch. |
694 dispatch_watcher_.Watch( | 693 dispatch_watcher_.Start(sync_context()->GetDispatchEvent()->GetHandle(), |
695 sync_context()->GetDispatchEvent()->GetHandle(), | 694 MOJO_HANDLE_SIGNAL_READABLE, |
696 MOJO_HANDLE_SIGNAL_READABLE, | 695 base::Bind(&SyncChannel::OnDispatchHandleReady, |
697 base::Bind(&SyncChannel::OnDispatchHandleReady, base::Unretained(this))); | 696 base::Unretained(this))); |
698 } | 697 } |
699 | 698 |
700 void SyncChannel::OnChannelInit() { | 699 void SyncChannel::OnChannelInit() { |
701 pre_init_sync_message_filters_.clear(); | 700 pre_init_sync_message_filters_.clear(); |
702 } | 701 } |
703 | 702 |
704 } // namespace IPC | 703 } // namespace IPC |
OLD | NEW |