| OLD | NEW |
| 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "ipc/ipc_sync_channel.h" | 5 #include "ipc/ipc_sync_channel.h" |
| 6 | 6 |
| 7 #include <stddef.h> | 7 #include <stddef.h> |
| 8 #include <stdint.h> | 8 #include <stdint.h> |
| 9 | 9 |
| 10 #include <utility> | 10 #include <utility> |
| (...skipping 23 matching lines...) Expand all Loading... |
| 34 | 34 |
| 35 namespace { | 35 namespace { |
| 36 | 36 |
| 37 // A generic callback used when watching handles synchronously. Sets |*signal| | 37 // A generic callback used when watching handles synchronously. Sets |*signal| |
| 38 // to true. Also sets |*error| to true in case of an error. | 38 // to true. Also sets |*error| to true in case of an error. |
| 39 void OnSyncHandleReady(bool* signal, bool* error, MojoResult result) { | 39 void OnSyncHandleReady(bool* signal, bool* error, MojoResult result) { |
| 40 *signal = true; | 40 *signal = true; |
| 41 *error = result != MOJO_RESULT_OK; | 41 *error = result != MOJO_RESULT_OK; |
| 42 } | 42 } |
| 43 | 43 |
| 44 // A ReadyCallback for use with mojo::Watcher. Ignores the result (DCHECKs, but | 44 // A ReadyCallback for use with mojo::SimpleWatcher. Ignores the result |
| 45 // is only used in cases where failure should be impossible) and runs | 45 // (DCHECKs, but is only used in cases where failure should be impossible) and |
| 46 // |callback|. | 46 // runs |callback|. |
| 47 void RunOnHandleReady(const base::Closure& callback, MojoResult result) { | 47 void RunOnHandleReady(const base::Closure& callback, MojoResult result) { |
| 48 DCHECK_EQ(result, MOJO_RESULT_OK); | 48 DCHECK_EQ(result, MOJO_RESULT_OK); |
| 49 callback.Run(); | 49 callback.Run(); |
| 50 } | 50 } |
| 51 | 51 |
| 52 class PumpMessagesEvent { | 52 class PumpMessagesEvent { |
| 53 public: | 53 public: |
| 54 PumpMessagesEvent() { event_.Signal(); } | 54 PumpMessagesEvent() { event_.Signal(); } |
| 55 ~PumpMessagesEvent() {} | 55 ~PumpMessagesEvent() {} |
| 56 | 56 |
| (...skipping 166 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 223 for (size_t i = 0; i < received_replies_.size(); ++i) { | 223 for (size_t i = 0; i < received_replies_.size(); ++i) { |
| 224 Message* message = received_replies_[i].message; | 224 Message* message = received_replies_[i].message; |
| 225 if (received_replies_[i].context->TryToUnblockListener(message)) { | 225 if (received_replies_[i].context->TryToUnblockListener(message)) { |
| 226 delete message; | 226 delete message; |
| 227 received_replies_.erase(received_replies_.begin() + i); | 227 received_replies_.erase(received_replies_.begin() + i); |
| 228 return; | 228 return; |
| 229 } | 229 } |
| 230 } | 230 } |
| 231 } | 231 } |
| 232 | 232 |
| 233 mojo::Watcher* top_send_done_watcher() { | 233 mojo::SimpleWatcher* top_send_done_watcher() { |
| 234 return top_send_done_watcher_; | 234 return top_send_done_watcher_; |
| 235 } | 235 } |
| 236 | 236 |
| 237 void set_top_send_done_watcher(mojo::Watcher* watcher) { | 237 void set_top_send_done_watcher(mojo::SimpleWatcher* watcher) { |
| 238 top_send_done_watcher_ = watcher; | 238 top_send_done_watcher_ = watcher; |
| 239 } | 239 } |
| 240 | 240 |
| 241 private: | 241 private: |
| 242 friend class base::RefCountedThreadSafe<ReceivedSyncMsgQueue>; | 242 friend class base::RefCountedThreadSafe<ReceivedSyncMsgQueue>; |
| 243 | 243 |
| 244 // See the comment in SyncChannel::SyncChannel for why this event is created | 244 // See the comment in SyncChannel::SyncChannel for why this event is created |
| 245 // as manual reset. | 245 // as manual reset. |
| 246 ReceivedSyncMsgQueue() | 246 ReceivedSyncMsgQueue() |
| 247 : message_queue_version_(0), | 247 : message_queue_version_(0), |
| (...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 293 // message. | 293 // message. |
| 294 MojoEvent dispatch_event_; | 294 MojoEvent dispatch_event_; |
| 295 scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner_; | 295 scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner_; |
| 296 base::Lock message_lock_; | 296 base::Lock message_lock_; |
| 297 bool task_pending_; | 297 bool task_pending_; |
| 298 int listener_count_; | 298 int listener_count_; |
| 299 | 299 |
| 300 // The current send done handle watcher for this thread. Used to maintain | 300 // The current send done handle watcher for this thread. Used to maintain |
| 301 // a thread-local stack of send done watchers to ensure that nested sync | 301 // a thread-local stack of send done watchers to ensure that nested sync |
| 302 // message loops complete correctly. | 302 // message loops complete correctly. |
| 303 mojo::Watcher* top_send_done_watcher_; | 303 mojo::SimpleWatcher* top_send_done_watcher_; |
| 304 | 304 |
| 305 // If not null, the address of a flag to set when the dispatch event signals, | 305 // If not null, the address of a flag to set when the dispatch event signals, |
| 306 // in lieu of actually dispatching messages. This is used by | 306 // in lieu of actually dispatching messages. This is used by |
| 307 // SyncChannel::WaitForReply to restrict the scope of queued messages we're | 307 // SyncChannel::WaitForReply to restrict the scope of queued messages we're |
| 308 // allowed to process while it's waiting. | 308 // allowed to process while it's waiting. |
| 309 bool* dispatch_flag_ = nullptr; | 309 bool* dispatch_flag_ = nullptr; |
| 310 | 310 |
| 311 // Watches |dispatch_event_| during all sync handle watches on this thread. | 311 // Watches |dispatch_event_| during all sync handle watches on this thread. |
| 312 std::unique_ptr<mojo::SyncHandleWatcher> sync_dispatch_watcher_; | 312 std::unique_ptr<mojo::SyncHandleWatcher> sync_dispatch_watcher_; |
| 313 }; | 313 }; |
| (...skipping 206 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 520 return base::WrapUnique( | 520 return base::WrapUnique( |
| 521 new SyncChannel(listener, ipc_task_runner, shutdown_event)); | 521 new SyncChannel(listener, ipc_task_runner, shutdown_event)); |
| 522 } | 522 } |
| 523 | 523 |
| 524 SyncChannel::SyncChannel( | 524 SyncChannel::SyncChannel( |
| 525 Listener* listener, | 525 Listener* listener, |
| 526 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, | 526 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, |
| 527 WaitableEvent* shutdown_event) | 527 WaitableEvent* shutdown_event) |
| 528 : ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)), | 528 : ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)), |
| 529 sync_handle_registry_(mojo::SyncHandleRegistry::current()), | 529 sync_handle_registry_(mojo::SyncHandleRegistry::current()), |
| 530 dispatch_watcher_(FROM_HERE) { | 530 dispatch_watcher_(FROM_HERE, |
| 531 mojo::SimpleWatcher::ArmingPolicy::AUTOMATIC) { |
| 531 // The current (listener) thread must be distinct from the IPC thread, or else | 532 // The current (listener) thread must be distinct from the IPC thread, or else |
| 532 // sending synchronous messages will deadlock. | 533 // sending synchronous messages will deadlock. |
| 533 DCHECK_NE(ipc_task_runner.get(), base::ThreadTaskRunnerHandle::Get().get()); | 534 DCHECK_NE(ipc_task_runner.get(), base::ThreadTaskRunnerHandle::Get().get()); |
| 534 StartWatching(); | 535 StartWatching(); |
| 535 } | 536 } |
| 536 | 537 |
| 537 SyncChannel::~SyncChannel() { | 538 SyncChannel::~SyncChannel() { |
| 538 } | 539 } |
| 539 | 540 |
| 540 void SyncChannel::SetRestrictDispatchChannelGroup(int group) { | 541 void SyncChannel::SetRestrictDispatchChannelGroup(int group) { |
| (...skipping 74 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 615 base::Bind(&OnSyncHandleReady, &should_pump_messages, &error)); | 616 base::Bind(&OnSyncHandleReady, &should_pump_messages, &error)); |
| 616 DCHECK(registered); | 617 DCHECK(registered); |
| 617 } | 618 } |
| 618 | 619 |
| 619 const bool* stop_flags[] = { &dispatch, &send_done, &should_pump_messages }; | 620 const bool* stop_flags[] = { &dispatch, &send_done, &should_pump_messages }; |
| 620 context->received_sync_msgs()->BlockDispatch(&dispatch); | 621 context->received_sync_msgs()->BlockDispatch(&dispatch); |
| 621 registry->WatchAllHandles(stop_flags, 3); | 622 registry->WatchAllHandles(stop_flags, 3); |
| 622 context->received_sync_msgs()->UnblockDispatch(); | 623 context->received_sync_msgs()->UnblockDispatch(); |
| 623 DCHECK(!error); | 624 DCHECK(!error); |
| 624 | 625 |
| 625 | |
| 626 registry->UnregisterHandle(context->GetSendDoneEvent()->GetHandle()); | 626 registry->UnregisterHandle(context->GetSendDoneEvent()->GetHandle()); |
| 627 if (pump_messages_event) | 627 if (pump_messages_event) |
| 628 registry->UnregisterHandle(pump_messages_event->GetHandle()); | 628 registry->UnregisterHandle(pump_messages_event->GetHandle()); |
| 629 | 629 |
| 630 if (dispatch) { | 630 if (dispatch) { |
| 631 // We're waiting for a reply, but we received a blocking synchronous call. | 631 // We're waiting for a reply, but we received a blocking synchronous call. |
| 632 // We must process it to avoid potential deadlocks. | 632 // We must process it to avoid potential deadlocks. |
| 633 context->GetDispatchEvent()->Reset(); | 633 context->GetDispatchEvent()->Reset(); |
| 634 context->DispatchMessages(); | 634 context->DispatchMessages(); |
| 635 continue; | 635 continue; |
| 636 } | 636 } |
| 637 | 637 |
| 638 if (should_pump_messages) | 638 if (should_pump_messages) |
| 639 WaitForReplyWithNestedMessageLoop(context); // Run a nested message loop. | 639 WaitForReplyWithNestedMessageLoop(context); // Run a nested message loop. |
| 640 | 640 |
| 641 break; | 641 break; |
| 642 } | 642 } |
| 643 } | 643 } |
| 644 | 644 |
| 645 void SyncChannel::WaitForReplyWithNestedMessageLoop(SyncContext* context) { | 645 void SyncChannel::WaitForReplyWithNestedMessageLoop(SyncContext* context) { |
| 646 mojo::Watcher send_done_watcher(FROM_HERE); | 646 mojo::SimpleWatcher send_done_watcher( |
| 647 FROM_HERE, mojo::SimpleWatcher::ArmingPolicy::AUTOMATIC); |
| 647 | 648 |
| 648 ReceivedSyncMsgQueue* sync_msg_queue = context->received_sync_msgs(); | 649 ReceivedSyncMsgQueue* sync_msg_queue = context->received_sync_msgs(); |
| 649 DCHECK_NE(sync_msg_queue, nullptr); | 650 DCHECK_NE(sync_msg_queue, nullptr); |
| 650 | 651 |
| 651 mojo::Watcher* old_watcher = sync_msg_queue->top_send_done_watcher(); | 652 mojo::SimpleWatcher* old_watcher = sync_msg_queue->top_send_done_watcher(); |
| 652 mojo::Handle old_handle(mojo::kInvalidHandleValue); | 653 mojo::Handle old_handle(mojo::kInvalidHandleValue); |
| 653 mojo::Watcher::ReadyCallback old_callback; | 654 mojo::SimpleWatcher::ReadyCallback old_callback; |
| 654 | 655 |
| 655 // Maintain a thread-local stack of watchers to ensure nested calls complete | 656 // Maintain a thread-local stack of watchers to ensure nested calls complete |
| 656 // in the correct sequence, i.e. the outermost call completes first, etc. | 657 // in the correct sequence, i.e. the outermost call completes first, etc. |
| 657 if (old_watcher) { | 658 if (old_watcher) { |
| 658 old_callback = old_watcher->ready_callback(); | 659 old_callback = old_watcher->ready_callback(); |
| 659 old_handle = old_watcher->handle(); | 660 old_handle = old_watcher->handle(); |
| 660 old_watcher->Cancel(); | 661 old_watcher->Cancel(); |
| 661 } | 662 } |
| 662 | 663 |
| 663 sync_msg_queue->set_top_send_done_watcher(&send_done_watcher); | 664 sync_msg_queue->set_top_send_done_watcher(&send_done_watcher); |
| 664 | 665 |
| 665 { | 666 { |
| 666 base::RunLoop nested_loop; | 667 base::RunLoop nested_loop; |
| 667 send_done_watcher.Start( | 668 send_done_watcher.Watch( |
| 668 context->GetSendDoneEvent()->GetHandle(), MOJO_HANDLE_SIGNAL_READABLE, | 669 context->GetSendDoneEvent()->GetHandle(), MOJO_HANDLE_SIGNAL_READABLE, |
| 669 base::Bind(&RunOnHandleReady, nested_loop.QuitClosure())); | 670 base::Bind(&RunOnHandleReady, nested_loop.QuitClosure())); |
| 670 | 671 |
| 671 base::MessageLoop::ScopedNestableTaskAllower allow( | 672 base::MessageLoop::ScopedNestableTaskAllower allow( |
| 672 base::MessageLoop::current()); | 673 base::MessageLoop::current()); |
| 673 nested_loop.Run(); | 674 nested_loop.Run(); |
| 674 send_done_watcher.Cancel(); | 675 send_done_watcher.Cancel(); |
| 675 } | 676 } |
| 676 | 677 |
| 677 sync_msg_queue->set_top_send_done_watcher(old_watcher); | 678 sync_msg_queue->set_top_send_done_watcher(old_watcher); |
| 678 if (old_watcher) | 679 if (old_watcher) |
| 679 old_watcher->Start(old_handle, MOJO_HANDLE_SIGNAL_READABLE, old_callback); | 680 old_watcher->Watch(old_handle, MOJO_HANDLE_SIGNAL_READABLE, old_callback); |
| 680 } | 681 } |
| 681 | 682 |
| 682 void SyncChannel::OnDispatchHandleReady(MojoResult result) { | 683 void SyncChannel::OnDispatchHandleReady(MojoResult result) { |
| 683 DCHECK_EQ(result, MOJO_RESULT_OK); | 684 DCHECK_EQ(result, MOJO_RESULT_OK); |
| 684 sync_context()->GetDispatchEvent()->Reset(); | 685 sync_context()->GetDispatchEvent()->Reset(); |
| 685 sync_context()->DispatchMessages(); | 686 sync_context()->DispatchMessages(); |
| 686 } | 687 } |
| 687 | 688 |
| 688 void SyncChannel::StartWatching() { | 689 void SyncChannel::StartWatching() { |
| 689 // |dispatch_watcher_| watches the event asynchronously, only dispatching | 690 // |dispatch_watcher_| watches the event asynchronously, only dispatching |
| 690 // messages once the listener thread is unblocked and pumping its task queue. | 691 // messages once the listener thread is unblocked and pumping its task queue. |
| 691 // The ReceivedSyncMsgQueue also watches this event and may dispatch | 692 // The ReceivedSyncMsgQueue also watches this event and may dispatch |
| 692 // immediately if woken up by a message which it's allowed to dispatch. | 693 // immediately if woken up by a message which it's allowed to dispatch. |
| 693 dispatch_watcher_.Start(sync_context()->GetDispatchEvent()->GetHandle(), | 694 dispatch_watcher_.Watch( |
| 694 MOJO_HANDLE_SIGNAL_READABLE, | 695 sync_context()->GetDispatchEvent()->GetHandle(), |
| 695 base::Bind(&SyncChannel::OnDispatchHandleReady, | 696 MOJO_HANDLE_SIGNAL_READABLE, |
| 696 base::Unretained(this))); | 697 base::Bind(&SyncChannel::OnDispatchHandleReady, base::Unretained(this))); |
| 697 } | 698 } |
| 698 | 699 |
| 699 void SyncChannel::OnChannelInit() { | 700 void SyncChannel::OnChannelInit() { |
| 700 pre_init_sync_message_filters_.clear(); | 701 pre_init_sync_message_filters_.clear(); |
| 701 } | 702 } |
| 702 | 703 |
| 703 } // namespace IPC | 704 } // namespace IPC |
| OLD | NEW |