Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(538)

Side by Side Diff: ipc/ipc_sync_channel.cc

Issue 2725133002: Mojo: Armed Watchers (Closed)
Patch Set: rebase Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "ipc/ipc_sync_channel.h" 5 #include "ipc/ipc_sync_channel.h"
6 6
7 #include <stddef.h> 7 #include <stddef.h>
8 #include <stdint.h> 8 #include <stdint.h>
9 9
10 #include <utility> 10 #include <utility>
(...skipping 23 matching lines...) Expand all
34 34
35 namespace { 35 namespace {
36 36
37 // A generic callback used when watching handles synchronously. Sets |*signal| 37 // A generic callback used when watching handles synchronously. Sets |*signal|
38 // to true. Also sets |*error| to true in case of an error. 38 // to true. Also sets |*error| to true in case of an error.
39 void OnSyncHandleReady(bool* signal, bool* error, MojoResult result) { 39 void OnSyncHandleReady(bool* signal, bool* error, MojoResult result) {
40 *signal = true; 40 *signal = true;
41 *error = result != MOJO_RESULT_OK; 41 *error = result != MOJO_RESULT_OK;
42 } 42 }
43 43
44 // A ReadyCallback for use with mojo::Watcher. Ignores the result (DCHECKs, but 44 // A ReadyCallback for use with mojo::SimpleWatcher. Ignores the result
45 // is only used in cases where failure should be impossible) and runs 45 // (DCHECKs, but is only used in cases where failure should be impossible) and
46 // |callback|. 46 // runs |callback|.
47 void RunOnHandleReady(const base::Closure& callback, MojoResult result) { 47 void RunOnHandleReady(const base::Closure& callback, MojoResult result) {
48 DCHECK_EQ(result, MOJO_RESULT_OK); 48 DCHECK_EQ(result, MOJO_RESULT_OK);
49 callback.Run(); 49 callback.Run();
50 } 50 }
51 51
52 class PumpMessagesEvent { 52 class PumpMessagesEvent {
53 public: 53 public:
54 PumpMessagesEvent() { event_.Signal(); } 54 PumpMessagesEvent() { event_.Signal(); }
55 ~PumpMessagesEvent() {} 55 ~PumpMessagesEvent() {}
56 56
(...skipping 166 matching lines...) Expand 10 before | Expand all | Expand 10 after
223 for (size_t i = 0; i < received_replies_.size(); ++i) { 223 for (size_t i = 0; i < received_replies_.size(); ++i) {
224 Message* message = received_replies_[i].message; 224 Message* message = received_replies_[i].message;
225 if (received_replies_[i].context->TryToUnblockListener(message)) { 225 if (received_replies_[i].context->TryToUnblockListener(message)) {
226 delete message; 226 delete message;
227 received_replies_.erase(received_replies_.begin() + i); 227 received_replies_.erase(received_replies_.begin() + i);
228 return; 228 return;
229 } 229 }
230 } 230 }
231 } 231 }
232 232
233 mojo::Watcher* top_send_done_watcher() { 233 mojo::SimpleWatcher* top_send_done_watcher() {
234 return top_send_done_watcher_; 234 return top_send_done_watcher_;
235 } 235 }
236 236
237 void set_top_send_done_watcher(mojo::Watcher* watcher) { 237 void set_top_send_done_watcher(mojo::SimpleWatcher* watcher) {
238 top_send_done_watcher_ = watcher; 238 top_send_done_watcher_ = watcher;
239 } 239 }
240 240
241 private: 241 private:
242 friend class base::RefCountedThreadSafe<ReceivedSyncMsgQueue>; 242 friend class base::RefCountedThreadSafe<ReceivedSyncMsgQueue>;
243 243
244 // See the comment in SyncChannel::SyncChannel for why this event is created 244 // See the comment in SyncChannel::SyncChannel for why this event is created
245 // as manual reset. 245 // as manual reset.
246 ReceivedSyncMsgQueue() 246 ReceivedSyncMsgQueue()
247 : message_queue_version_(0), 247 : message_queue_version_(0),
(...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after
293 // message. 293 // message.
294 MojoEvent dispatch_event_; 294 MojoEvent dispatch_event_;
295 scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner_; 295 scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner_;
296 base::Lock message_lock_; 296 base::Lock message_lock_;
297 bool task_pending_; 297 bool task_pending_;
298 int listener_count_; 298 int listener_count_;
299 299
300 // The current send done handle watcher for this thread. Used to maintain 300 // The current send done handle watcher for this thread. Used to maintain
301 // a thread-local stack of send done watchers to ensure that nested sync 301 // a thread-local stack of send done watchers to ensure that nested sync
302 // message loops complete correctly. 302 // message loops complete correctly.
303 mojo::Watcher* top_send_done_watcher_; 303 mojo::SimpleWatcher* top_send_done_watcher_;
304 304
305 // If not null, the address of a flag to set when the dispatch event signals, 305 // If not null, the address of a flag to set when the dispatch event signals,
306 // in lieu of actually dispatching messages. This is used by 306 // in lieu of actually dispatching messages. This is used by
307 // SyncChannel::WaitForReply to restrict the scope of queued messages we're 307 // SyncChannel::WaitForReply to restrict the scope of queued messages we're
308 // allowed to process while it's waiting. 308 // allowed to process while it's waiting.
309 bool* dispatch_flag_ = nullptr; 309 bool* dispatch_flag_ = nullptr;
310 310
311 // Watches |dispatch_event_| during all sync handle watches on this thread. 311 // Watches |dispatch_event_| during all sync handle watches on this thread.
312 std::unique_ptr<mojo::SyncHandleWatcher> sync_dispatch_watcher_; 312 std::unique_ptr<mojo::SyncHandleWatcher> sync_dispatch_watcher_;
313 }; 313 };
(...skipping 206 matching lines...) Expand 10 before | Expand all | Expand 10 after
520 return base::WrapUnique( 520 return base::WrapUnique(
521 new SyncChannel(listener, ipc_task_runner, shutdown_event)); 521 new SyncChannel(listener, ipc_task_runner, shutdown_event));
522 } 522 }
523 523
524 SyncChannel::SyncChannel( 524 SyncChannel::SyncChannel(
525 Listener* listener, 525 Listener* listener,
526 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, 526 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
527 WaitableEvent* shutdown_event) 527 WaitableEvent* shutdown_event)
528 : ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)), 528 : ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)),
529 sync_handle_registry_(mojo::SyncHandleRegistry::current()), 529 sync_handle_registry_(mojo::SyncHandleRegistry::current()),
530 dispatch_watcher_(FROM_HERE) { 530 dispatch_watcher_(FROM_HERE,
531 mojo::SimpleWatcher::ArmingPolicy::AUTOMATIC) {
531 // The current (listener) thread must be distinct from the IPC thread, or else 532 // The current (listener) thread must be distinct from the IPC thread, or else
532 // sending synchronous messages will deadlock. 533 // sending synchronous messages will deadlock.
533 DCHECK_NE(ipc_task_runner.get(), base::ThreadTaskRunnerHandle::Get().get()); 534 DCHECK_NE(ipc_task_runner.get(), base::ThreadTaskRunnerHandle::Get().get());
534 StartWatching(); 535 StartWatching();
535 } 536 }
536 537
537 SyncChannel::~SyncChannel() { 538 SyncChannel::~SyncChannel() {
538 } 539 }
539 540
540 void SyncChannel::SetRestrictDispatchChannelGroup(int group) { 541 void SyncChannel::SetRestrictDispatchChannelGroup(int group) {
(...skipping 74 matching lines...) Expand 10 before | Expand all | Expand 10 after
615 base::Bind(&OnSyncHandleReady, &should_pump_messages, &error)); 616 base::Bind(&OnSyncHandleReady, &should_pump_messages, &error));
616 DCHECK(registered); 617 DCHECK(registered);
617 } 618 }
618 619
619 const bool* stop_flags[] = { &dispatch, &send_done, &should_pump_messages }; 620 const bool* stop_flags[] = { &dispatch, &send_done, &should_pump_messages };
620 context->received_sync_msgs()->BlockDispatch(&dispatch); 621 context->received_sync_msgs()->BlockDispatch(&dispatch);
621 registry->WatchAllHandles(stop_flags, 3); 622 registry->WatchAllHandles(stop_flags, 3);
622 context->received_sync_msgs()->UnblockDispatch(); 623 context->received_sync_msgs()->UnblockDispatch();
623 DCHECK(!error); 624 DCHECK(!error);
624 625
625
626 registry->UnregisterHandle(context->GetSendDoneEvent()->GetHandle()); 626 registry->UnregisterHandle(context->GetSendDoneEvent()->GetHandle());
627 if (pump_messages_event) 627 if (pump_messages_event)
628 registry->UnregisterHandle(pump_messages_event->GetHandle()); 628 registry->UnregisterHandle(pump_messages_event->GetHandle());
629 629
630 if (dispatch) { 630 if (dispatch) {
631 // We're waiting for a reply, but we received a blocking synchronous call. 631 // We're waiting for a reply, but we received a blocking synchronous call.
632 // We must process it to avoid potential deadlocks. 632 // We must process it to avoid potential deadlocks.
633 context->GetDispatchEvent()->Reset(); 633 context->GetDispatchEvent()->Reset();
634 context->DispatchMessages(); 634 context->DispatchMessages();
635 continue; 635 continue;
636 } 636 }
637 637
638 if (should_pump_messages) 638 if (should_pump_messages)
639 WaitForReplyWithNestedMessageLoop(context); // Run a nested message loop. 639 WaitForReplyWithNestedMessageLoop(context); // Run a nested message loop.
640 640
641 break; 641 break;
642 } 642 }
643 } 643 }
644 644
645 void SyncChannel::WaitForReplyWithNestedMessageLoop(SyncContext* context) { 645 void SyncChannel::WaitForReplyWithNestedMessageLoop(SyncContext* context) {
646 mojo::Watcher send_done_watcher(FROM_HERE); 646 mojo::SimpleWatcher send_done_watcher(
647 FROM_HERE, mojo::SimpleWatcher::ArmingPolicy::AUTOMATIC);
647 648
648 ReceivedSyncMsgQueue* sync_msg_queue = context->received_sync_msgs(); 649 ReceivedSyncMsgQueue* sync_msg_queue = context->received_sync_msgs();
649 DCHECK_NE(sync_msg_queue, nullptr); 650 DCHECK_NE(sync_msg_queue, nullptr);
650 651
651 mojo::Watcher* old_watcher = sync_msg_queue->top_send_done_watcher(); 652 mojo::SimpleWatcher* old_watcher = sync_msg_queue->top_send_done_watcher();
652 mojo::Handle old_handle(mojo::kInvalidHandleValue); 653 mojo::Handle old_handle(mojo::kInvalidHandleValue);
653 mojo::Watcher::ReadyCallback old_callback; 654 mojo::SimpleWatcher::ReadyCallback old_callback;
654 655
655 // Maintain a thread-local stack of watchers to ensure nested calls complete 656 // Maintain a thread-local stack of watchers to ensure nested calls complete
656 // in the correct sequence, i.e. the outermost call completes first, etc. 657 // in the correct sequence, i.e. the outermost call completes first, etc.
657 if (old_watcher) { 658 if (old_watcher) {
658 old_callback = old_watcher->ready_callback(); 659 old_callback = old_watcher->ready_callback();
659 old_handle = old_watcher->handle(); 660 old_handle = old_watcher->handle();
660 old_watcher->Cancel(); 661 old_watcher->Cancel();
661 } 662 }
662 663
663 sync_msg_queue->set_top_send_done_watcher(&send_done_watcher); 664 sync_msg_queue->set_top_send_done_watcher(&send_done_watcher);
664 665
665 { 666 {
666 base::RunLoop nested_loop; 667 base::RunLoop nested_loop;
667 send_done_watcher.Start( 668 send_done_watcher.Watch(
668 context->GetSendDoneEvent()->GetHandle(), MOJO_HANDLE_SIGNAL_READABLE, 669 context->GetSendDoneEvent()->GetHandle(), MOJO_HANDLE_SIGNAL_READABLE,
669 base::Bind(&RunOnHandleReady, nested_loop.QuitClosure())); 670 base::Bind(&RunOnHandleReady, nested_loop.QuitClosure()));
670 671
671 base::MessageLoop::ScopedNestableTaskAllower allow( 672 base::MessageLoop::ScopedNestableTaskAllower allow(
672 base::MessageLoop::current()); 673 base::MessageLoop::current());
673 nested_loop.Run(); 674 nested_loop.Run();
674 send_done_watcher.Cancel(); 675 send_done_watcher.Cancel();
675 } 676 }
676 677
677 sync_msg_queue->set_top_send_done_watcher(old_watcher); 678 sync_msg_queue->set_top_send_done_watcher(old_watcher);
678 if (old_watcher) 679 if (old_watcher)
679 old_watcher->Start(old_handle, MOJO_HANDLE_SIGNAL_READABLE, old_callback); 680 old_watcher->Watch(old_handle, MOJO_HANDLE_SIGNAL_READABLE, old_callback);
680 } 681 }
681 682
682 void SyncChannel::OnDispatchHandleReady(MojoResult result) { 683 void SyncChannel::OnDispatchHandleReady(MojoResult result) {
683 DCHECK_EQ(result, MOJO_RESULT_OK); 684 DCHECK_EQ(result, MOJO_RESULT_OK);
684 sync_context()->GetDispatchEvent()->Reset(); 685 sync_context()->GetDispatchEvent()->Reset();
685 sync_context()->DispatchMessages(); 686 sync_context()->DispatchMessages();
686 } 687 }
687 688
688 void SyncChannel::StartWatching() { 689 void SyncChannel::StartWatching() {
689 // |dispatch_watcher_| watches the event asynchronously, only dispatching 690 // |dispatch_watcher_| watches the event asynchronously, only dispatching
690 // messages once the listener thread is unblocked and pumping its task queue. 691 // messages once the listener thread is unblocked and pumping its task queue.
691 // The ReceivedSyncMsgQueue also watches this event and may dispatch 692 // The ReceivedSyncMsgQueue also watches this event and may dispatch
692 // immediately if woken up by a message which it's allowed to dispatch. 693 // immediately if woken up by a message which it's allowed to dispatch.
693 dispatch_watcher_.Start(sync_context()->GetDispatchEvent()->GetHandle(), 694 dispatch_watcher_.Watch(
694 MOJO_HANDLE_SIGNAL_READABLE, 695 sync_context()->GetDispatchEvent()->GetHandle(),
695 base::Bind(&SyncChannel::OnDispatchHandleReady, 696 MOJO_HANDLE_SIGNAL_READABLE,
696 base::Unretained(this))); 697 base::Bind(&SyncChannel::OnDispatchHandleReady, base::Unretained(this)));
697 } 698 }
698 699
699 void SyncChannel::OnChannelInit() { 700 void SyncChannel::OnChannelInit() {
700 pre_init_sync_message_filters_.clear(); 701 pre_init_sync_message_filters_.clear();
701 } 702 }
702 703
703 } // namespace IPC 704 } // namespace IPC
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698