OLD | NEW |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "ipc/ipc_sync_channel.h" | 5 #include "ipc/ipc_sync_channel.h" |
6 | 6 |
7 #include <stddef.h> | 7 #include <stddef.h> |
8 #include <stdint.h> | 8 #include <stdint.h> |
9 | 9 |
10 #include <utility> | 10 #include <utility> |
(...skipping 23 matching lines...) Expand all Loading... |
34 | 34 |
35 namespace { | 35 namespace { |
36 | 36 |
37 // A generic callback used when watching handles synchronously. Sets |*signal| | 37 // A generic callback used when watching handles synchronously. Sets |*signal| |
38 // to true. Also sets |*error| to true in case of an error. | 38 // to true. Also sets |*error| to true in case of an error. |
39 void OnSyncHandleReady(bool* signal, bool* error, MojoResult result) { | 39 void OnSyncHandleReady(bool* signal, bool* error, MojoResult result) { |
40 *signal = true; | 40 *signal = true; |
41 *error = result != MOJO_RESULT_OK; | 41 *error = result != MOJO_RESULT_OK; |
42 } | 42 } |
43 | 43 |
44 // A ReadyCallback for use with mojo::Watcher. Ignores the result (DCHECKs, but | 44 // A ReadyCallback for use with mojo::SimpleWatcher. Ignores the result |
45 // is only used in cases where failure should be impossible) and runs | 45 // (DCHECKs, but is only used in cases where failure should be impossible) and |
46 // |callback|. | 46 // runs |callback|. |
47 void RunOnHandleReady(const base::Closure& callback, MojoResult result) { | 47 void RunOnHandleReady(const base::Closure& callback, MojoResult result) { |
48 DCHECK_EQ(result, MOJO_RESULT_OK); | 48 DCHECK_EQ(result, MOJO_RESULT_OK); |
49 callback.Run(); | 49 callback.Run(); |
50 } | 50 } |
51 | 51 |
52 class PumpMessagesEvent { | 52 class PumpMessagesEvent { |
53 public: | 53 public: |
54 PumpMessagesEvent() { event_.Signal(); } | 54 PumpMessagesEvent() { event_.Signal(); } |
55 ~PumpMessagesEvent() {} | 55 ~PumpMessagesEvent() {} |
56 | 56 |
(...skipping 166 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
223 for (size_t i = 0; i < received_replies_.size(); ++i) { | 223 for (size_t i = 0; i < received_replies_.size(); ++i) { |
224 Message* message = received_replies_[i].message; | 224 Message* message = received_replies_[i].message; |
225 if (received_replies_[i].context->TryToUnblockListener(message)) { | 225 if (received_replies_[i].context->TryToUnblockListener(message)) { |
226 delete message; | 226 delete message; |
227 received_replies_.erase(received_replies_.begin() + i); | 227 received_replies_.erase(received_replies_.begin() + i); |
228 return; | 228 return; |
229 } | 229 } |
230 } | 230 } |
231 } | 231 } |
232 | 232 |
233 mojo::Watcher* top_send_done_watcher() { | 233 mojo::SimpleWatcher* top_send_done_watcher() { |
234 return top_send_done_watcher_; | 234 return top_send_done_watcher_; |
235 } | 235 } |
236 | 236 |
237 void set_top_send_done_watcher(mojo::Watcher* watcher) { | 237 void set_top_send_done_watcher(mojo::SimpleWatcher* watcher) { |
238 top_send_done_watcher_ = watcher; | 238 top_send_done_watcher_ = watcher; |
239 } | 239 } |
240 | 240 |
241 private: | 241 private: |
242 friend class base::RefCountedThreadSafe<ReceivedSyncMsgQueue>; | 242 friend class base::RefCountedThreadSafe<ReceivedSyncMsgQueue>; |
243 | 243 |
244 // See the comment in SyncChannel::SyncChannel for why this event is created | 244 // See the comment in SyncChannel::SyncChannel for why this event is created |
245 // as manual reset. | 245 // as manual reset. |
246 ReceivedSyncMsgQueue() | 246 ReceivedSyncMsgQueue() |
247 : message_queue_version_(0), | 247 : message_queue_version_(0), |
(...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
293 // message. | 293 // message. |
294 MojoEvent dispatch_event_; | 294 MojoEvent dispatch_event_; |
295 scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner_; | 295 scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner_; |
296 base::Lock message_lock_; | 296 base::Lock message_lock_; |
297 bool task_pending_; | 297 bool task_pending_; |
298 int listener_count_; | 298 int listener_count_; |
299 | 299 |
300 // The current send done handle watcher for this thread. Used to maintain | 300 // The current send done handle watcher for this thread. Used to maintain |
301 // a thread-local stack of send done watchers to ensure that nested sync | 301 // a thread-local stack of send done watchers to ensure that nested sync |
302 // message loops complete correctly. | 302 // message loops complete correctly. |
303 mojo::Watcher* top_send_done_watcher_; | 303 mojo::SimpleWatcher* top_send_done_watcher_; |
304 | 304 |
305 // If not null, the address of a flag to set when the dispatch event signals, | 305 // If not null, the address of a flag to set when the dispatch event signals, |
306 // in lieu of actually dispatching messages. This is used by | 306 // in lieu of actually dispatching messages. This is used by |
307 // SyncChannel::WaitForReply to restrict the scope of queued messages we're | 307 // SyncChannel::WaitForReply to restrict the scope of queued messages we're |
308 // allowed to process while it's waiting. | 308 // allowed to process while it's waiting. |
309 bool* dispatch_flag_ = nullptr; | 309 bool* dispatch_flag_ = nullptr; |
310 | 310 |
311 // Watches |dispatch_event_| during all sync handle watches on this thread. | 311 // Watches |dispatch_event_| during all sync handle watches on this thread. |
312 std::unique_ptr<mojo::SyncHandleWatcher> sync_dispatch_watcher_; | 312 std::unique_ptr<mojo::SyncHandleWatcher> sync_dispatch_watcher_; |
313 }; | 313 }; |
(...skipping 206 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
520 return base::WrapUnique( | 520 return base::WrapUnique( |
521 new SyncChannel(listener, ipc_task_runner, shutdown_event)); | 521 new SyncChannel(listener, ipc_task_runner, shutdown_event)); |
522 } | 522 } |
523 | 523 |
524 SyncChannel::SyncChannel( | 524 SyncChannel::SyncChannel( |
525 Listener* listener, | 525 Listener* listener, |
526 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, | 526 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, |
527 WaitableEvent* shutdown_event) | 527 WaitableEvent* shutdown_event) |
528 : ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)), | 528 : ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)), |
529 sync_handle_registry_(mojo::SyncHandleRegistry::current()), | 529 sync_handle_registry_(mojo::SyncHandleRegistry::current()), |
530 dispatch_watcher_(FROM_HERE) { | 530 dispatch_watcher_(FROM_HERE, |
| 531 mojo::SimpleWatcher::ArmingPolicy::AUTOMATIC) { |
531 // The current (listener) thread must be distinct from the IPC thread, or else | 532 // The current (listener) thread must be distinct from the IPC thread, or else |
532 // sending synchronous messages will deadlock. | 533 // sending synchronous messages will deadlock. |
533 DCHECK_NE(ipc_task_runner.get(), base::ThreadTaskRunnerHandle::Get().get()); | 534 DCHECK_NE(ipc_task_runner.get(), base::ThreadTaskRunnerHandle::Get().get()); |
534 StartWatching(); | 535 StartWatching(); |
535 } | 536 } |
536 | 537 |
537 SyncChannel::~SyncChannel() { | 538 SyncChannel::~SyncChannel() { |
538 } | 539 } |
539 | 540 |
540 void SyncChannel::SetRestrictDispatchChannelGroup(int group) { | 541 void SyncChannel::SetRestrictDispatchChannelGroup(int group) { |
(...skipping 74 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
615 base::Bind(&OnSyncHandleReady, &should_pump_messages, &error)); | 616 base::Bind(&OnSyncHandleReady, &should_pump_messages, &error)); |
616 DCHECK(registered); | 617 DCHECK(registered); |
617 } | 618 } |
618 | 619 |
619 const bool* stop_flags[] = { &dispatch, &send_done, &should_pump_messages }; | 620 const bool* stop_flags[] = { &dispatch, &send_done, &should_pump_messages }; |
620 context->received_sync_msgs()->BlockDispatch(&dispatch); | 621 context->received_sync_msgs()->BlockDispatch(&dispatch); |
621 registry->WatchAllHandles(stop_flags, 3); | 622 registry->WatchAllHandles(stop_flags, 3); |
622 context->received_sync_msgs()->UnblockDispatch(); | 623 context->received_sync_msgs()->UnblockDispatch(); |
623 DCHECK(!error); | 624 DCHECK(!error); |
624 | 625 |
625 | |
626 registry->UnregisterHandle(context->GetSendDoneEvent()->GetHandle()); | 626 registry->UnregisterHandle(context->GetSendDoneEvent()->GetHandle()); |
627 if (pump_messages_event) | 627 if (pump_messages_event) |
628 registry->UnregisterHandle(pump_messages_event->GetHandle()); | 628 registry->UnregisterHandle(pump_messages_event->GetHandle()); |
629 | 629 |
630 if (dispatch) { | 630 if (dispatch) { |
631 // We're waiting for a reply, but we received a blocking synchronous call. | 631 // We're waiting for a reply, but we received a blocking synchronous call. |
632 // We must process it to avoid potential deadlocks. | 632 // We must process it to avoid potential deadlocks. |
633 context->GetDispatchEvent()->Reset(); | 633 context->GetDispatchEvent()->Reset(); |
634 context->DispatchMessages(); | 634 context->DispatchMessages(); |
635 continue; | 635 continue; |
636 } | 636 } |
637 | 637 |
638 if (should_pump_messages) | 638 if (should_pump_messages) |
639 WaitForReplyWithNestedMessageLoop(context); // Run a nested message loop. | 639 WaitForReplyWithNestedMessageLoop(context); // Run a nested message loop. |
640 | 640 |
641 break; | 641 break; |
642 } | 642 } |
643 } | 643 } |
644 | 644 |
645 void SyncChannel::WaitForReplyWithNestedMessageLoop(SyncContext* context) { | 645 void SyncChannel::WaitForReplyWithNestedMessageLoop(SyncContext* context) { |
646 mojo::Watcher send_done_watcher(FROM_HERE); | 646 mojo::SimpleWatcher send_done_watcher( |
| 647 FROM_HERE, mojo::SimpleWatcher::ArmingPolicy::AUTOMATIC); |
647 | 648 |
648 ReceivedSyncMsgQueue* sync_msg_queue = context->received_sync_msgs(); | 649 ReceivedSyncMsgQueue* sync_msg_queue = context->received_sync_msgs(); |
649 DCHECK_NE(sync_msg_queue, nullptr); | 650 DCHECK_NE(sync_msg_queue, nullptr); |
650 | 651 |
651 mojo::Watcher* old_watcher = sync_msg_queue->top_send_done_watcher(); | 652 mojo::SimpleWatcher* old_watcher = sync_msg_queue->top_send_done_watcher(); |
652 mojo::Handle old_handle(mojo::kInvalidHandleValue); | 653 mojo::Handle old_handle(mojo::kInvalidHandleValue); |
653 mojo::Watcher::ReadyCallback old_callback; | 654 mojo::SimpleWatcher::ReadyCallback old_callback; |
654 | 655 |
655 // Maintain a thread-local stack of watchers to ensure nested calls complete | 656 // Maintain a thread-local stack of watchers to ensure nested calls complete |
656 // in the correct sequence, i.e. the outermost call completes first, etc. | 657 // in the correct sequence, i.e. the outermost call completes first, etc. |
657 if (old_watcher) { | 658 if (old_watcher) { |
658 old_callback = old_watcher->ready_callback(); | 659 old_callback = old_watcher->ready_callback(); |
659 old_handle = old_watcher->handle(); | 660 old_handle = old_watcher->handle(); |
660 old_watcher->Cancel(); | 661 old_watcher->Cancel(); |
661 } | 662 } |
662 | 663 |
663 sync_msg_queue->set_top_send_done_watcher(&send_done_watcher); | 664 sync_msg_queue->set_top_send_done_watcher(&send_done_watcher); |
664 | 665 |
665 { | 666 { |
666 base::RunLoop nested_loop; | 667 base::RunLoop nested_loop; |
667 send_done_watcher.Start( | 668 send_done_watcher.Watch( |
668 context->GetSendDoneEvent()->GetHandle(), MOJO_HANDLE_SIGNAL_READABLE, | 669 context->GetSendDoneEvent()->GetHandle(), MOJO_HANDLE_SIGNAL_READABLE, |
669 base::Bind(&RunOnHandleReady, nested_loop.QuitClosure())); | 670 base::Bind(&RunOnHandleReady, nested_loop.QuitClosure())); |
670 | 671 |
671 base::MessageLoop::ScopedNestableTaskAllower allow( | 672 base::MessageLoop::ScopedNestableTaskAllower allow( |
672 base::MessageLoop::current()); | 673 base::MessageLoop::current()); |
673 nested_loop.Run(); | 674 nested_loop.Run(); |
674 send_done_watcher.Cancel(); | 675 send_done_watcher.Cancel(); |
675 } | 676 } |
676 | 677 |
677 sync_msg_queue->set_top_send_done_watcher(old_watcher); | 678 sync_msg_queue->set_top_send_done_watcher(old_watcher); |
678 if (old_watcher) | 679 if (old_watcher) |
679 old_watcher->Start(old_handle, MOJO_HANDLE_SIGNAL_READABLE, old_callback); | 680 old_watcher->Watch(old_handle, MOJO_HANDLE_SIGNAL_READABLE, old_callback); |
680 } | 681 } |
681 | 682 |
682 void SyncChannel::OnDispatchHandleReady(MojoResult result) { | 683 void SyncChannel::OnDispatchHandleReady(MojoResult result) { |
683 DCHECK_EQ(result, MOJO_RESULT_OK); | 684 DCHECK_EQ(result, MOJO_RESULT_OK); |
684 sync_context()->GetDispatchEvent()->Reset(); | 685 sync_context()->GetDispatchEvent()->Reset(); |
685 sync_context()->DispatchMessages(); | 686 sync_context()->DispatchMessages(); |
686 } | 687 } |
687 | 688 |
688 void SyncChannel::StartWatching() { | 689 void SyncChannel::StartWatching() { |
689 // |dispatch_watcher_| watches the event asynchronously, only dispatching | 690 // |dispatch_watcher_| watches the event asynchronously, only dispatching |
690 // messages once the listener thread is unblocked and pumping its task queue. | 691 // messages once the listener thread is unblocked and pumping its task queue. |
691 // The ReceivedSyncMsgQueue also watches this event and may dispatch | 692 // The ReceivedSyncMsgQueue also watches this event and may dispatch |
692 // immediately if woken up by a message which it's allowed to dispatch. | 693 // immediately if woken up by a message which it's allowed to dispatch. |
693 dispatch_watcher_.Start(sync_context()->GetDispatchEvent()->GetHandle(), | 694 dispatch_watcher_.Watch( |
694 MOJO_HANDLE_SIGNAL_READABLE, | 695 sync_context()->GetDispatchEvent()->GetHandle(), |
695 base::Bind(&SyncChannel::OnDispatchHandleReady, | 696 MOJO_HANDLE_SIGNAL_READABLE, |
696 base::Unretained(this))); | 697 base::Bind(&SyncChannel::OnDispatchHandleReady, base::Unretained(this))); |
697 } | 698 } |
698 | 699 |
699 void SyncChannel::OnChannelInit() { | 700 void SyncChannel::OnChannelInit() { |
700 pre_init_sync_message_filters_.clear(); | 701 pre_init_sync_message_filters_.clear(); |
701 } | 702 } |
702 | 703 |
703 } // namespace IPC | 704 } // namespace IPC |
OLD | NEW |