Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(68)

Side by Side Diff: ipc/ipc_sync_message_filter.cc

Issue 2033243003: Use Mojo pipes to signal sync IPC events (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "ipc/ipc_sync_message_filter.h" 5 #include "ipc/ipc_sync_message_filter.h"
6 6
7 #include "base/bind.h" 7 #include "base/bind.h"
8 #include "base/location.h" 8 #include "base/location.h"
9 #include "base/logging.h" 9 #include "base/logging.h"
10 #include "base/memory/ptr_util.h"
11 #include "base/memory/ref_counted.h"
12 #include "base/message_loop/message_loop.h"
10 #include "base/single_thread_task_runner.h" 13 #include "base/single_thread_task_runner.h"
11 #include "base/synchronization/waitable_event.h" 14 #include "base/synchronization/waitable_event.h"
12 #include "base/threading/thread_task_runner_handle.h" 15 #include "base/threading/thread_task_runner_handle.h"
13 #include "ipc/ipc_channel.h" 16 #include "ipc/ipc_channel.h"
14 #include "ipc/ipc_sync_message.h" 17 #include "ipc/ipc_sync_message.h"
18 #include "mojo/public/cpp/bindings/sync_handle_registry.h"
15 19
16 namespace IPC { 20 namespace IPC {
17 21
22 namespace {
23
24 // A generic callback used when watching handles synchronously. Sets |*signal|
25 // to true. Also sets |*error| to true in case of an error.
26 void OnSyncHandleReady(bool* signal, bool* error, MojoResult result) {
27 *signal = true;
28 *error = result != MOJO_RESULT_OK;
29 }
30
31 } // namespace
32
18 bool SyncMessageFilter::Send(Message* message) { 33 bool SyncMessageFilter::Send(Message* message) {
19 if (!message->is_sync()) { 34 if (!message->is_sync()) {
20 { 35 {
21 base::AutoLock auto_lock(lock_); 36 base::AutoLock auto_lock(lock_);
22 if (!io_task_runner_.get()) { 37 if (!io_task_runner_.get()) {
23 pending_messages_.push_back(message); 38 pending_messages_.emplace_back(base::WrapUnique(message));
24 return true; 39 return true;
25 } 40 }
26 } 41 }
27 io_task_runner_->PostTask( 42 io_task_runner_->PostTask(
28 FROM_HERE, 43 FROM_HERE,
29 base::Bind(&SyncMessageFilter::SendOnIOThread, this, message)); 44 base::Bind(&SyncMessageFilter::SendOnIOThread, this, message));
30 return true; 45 return true;
31 } 46 }
32 47
33 base::WaitableEvent done_event( 48 mojo::Event done_event;
34 base::WaitableEvent::ResetPolicy::MANUAL,
35 base::WaitableEvent::InitialState::NOT_SIGNALED);
36 PendingSyncMsg pending_message( 49 PendingSyncMsg pending_message(
37 SyncMessage::GetMessageId(*message), 50 SyncMessage::GetMessageId(*message),
38 static_cast<SyncMessage*>(message)->GetReplyDeserializer(), 51 static_cast<SyncMessage*>(message)->GetReplyDeserializer(),
39 &done_event); 52 &done_event);
40 53
41 { 54 {
42 base::AutoLock auto_lock(lock_); 55 base::AutoLock auto_lock(lock_);
43 // Can't use this class on the main thread or else it can lead to deadlocks. 56 // Can't use this class on the main thread or else it can lead to deadlocks.
44 // Also by definition, can't use this on IO thread since we're blocking it. 57 // Also by definition, can't use this on IO thread since we're blocking it.
45 if (base::ThreadTaskRunnerHandle::IsSet()) { 58 if (base::ThreadTaskRunnerHandle::IsSet()) {
46 DCHECK(base::ThreadTaskRunnerHandle::Get() != listener_task_runner_); 59 DCHECK(base::ThreadTaskRunnerHandle::Get() != listener_task_runner_);
47 DCHECK(base::ThreadTaskRunnerHandle::Get() != io_task_runner_); 60 DCHECK(base::ThreadTaskRunnerHandle::Get() != io_task_runner_);
48 } 61 }
49 pending_sync_messages_.insert(&pending_message); 62 pending_sync_messages_.insert(&pending_message);
50 63
51 if (io_task_runner_.get()) { 64 if (io_task_runner_.get()) {
52 io_task_runner_->PostTask( 65 io_task_runner_->PostTask(
53 FROM_HERE, 66 FROM_HERE,
54 base::Bind(&SyncMessageFilter::SendOnIOThread, this, message)); 67 base::Bind(&SyncMessageFilter::SendOnIOThread, this, message));
55 } else { 68 } else {
56 pending_messages_.push_back(message); 69 pending_messages_.emplace_back(base::WrapUnique(message));
57 } 70 }
58 } 71 }
59 72
60 base::WaitableEvent* events[2] = { shutdown_event_, &done_event }; 73 bool done = false;
61 if (base::WaitableEvent::WaitMany(events, 2) == 1) { 74 bool shutdown = false;
75 bool error = false;
76 scoped_refptr<mojo::SyncHandleRegistry> registry =
77 mojo::SyncHandleRegistry::current();
78 registry->RegisterHandle(shutdown_mojo_event_.GetHandle(),
79 MOJO_HANDLE_SIGNAL_READABLE,
80 base::Bind(&OnSyncHandleReady, &shutdown, &error));
81 registry->RegisterHandle(done_event.GetHandle(),
82 MOJO_HANDLE_SIGNAL_READABLE,
83 base::Bind(&OnSyncHandleReady, &done, &error));
84
85 const bool* stop_flags[] = { &done, &shutdown };
86 bool result = registry->WatchAllHandles(stop_flags, 2);
87 DCHECK(result);
88 DCHECK(!error);
89
90 if (done) {
62 TRACE_EVENT_FLOW_END0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), 91 TRACE_EVENT_FLOW_END0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"),
63 "SyncMessageFilter::Send", &done_event); 92 "SyncMessageFilter::Send", &done_event);
64 } 93 }
94 registry->UnregisterHandle(shutdown_mojo_event_.GetHandle());
95 registry->UnregisterHandle(done_event.GetHandle());
65 96
66 { 97 {
67 base::AutoLock auto_lock(lock_); 98 base::AutoLock auto_lock(lock_);
68 delete pending_message.deserializer; 99 delete pending_message.deserializer;
69 pending_sync_messages_.erase(&pending_message); 100 pending_sync_messages_.erase(&pending_message);
70 } 101 }
71 102
72 return pending_message.send_result; 103 return pending_message.send_result;
73 } 104 }
74 105
75 void SyncMessageFilter::OnFilterAdded(Sender* sender) { 106 void SyncMessageFilter::OnFilterAdded(Sender* sender) {
76 std::vector<Message*> pending_messages; 107 std::vector<std::unique_ptr<Message>> pending_messages;
77 { 108 {
78 base::AutoLock auto_lock(lock_); 109 base::AutoLock auto_lock(lock_);
79 sender_ = sender; 110 sender_ = sender;
80 io_task_runner_ = base::ThreadTaskRunnerHandle::Get(); 111 io_task_runner_ = base::ThreadTaskRunnerHandle::Get();
81 pending_messages_.release(&pending_messages); 112 shutdown_watcher_.StartWatching(
113 shutdown_event_,
114 base::Bind(&SyncMessageFilter::OnShutdownEventSignaled, this));
115 std::swap(pending_messages_, pending_messages);
82 } 116 }
83 for (auto* msg : pending_messages) 117 for (auto& msg : pending_messages)
84 SendOnIOThread(msg); 118 SendOnIOThread(msg.release());
85 } 119 }
86 120
87 void SyncMessageFilter::OnChannelError() { 121 void SyncMessageFilter::OnChannelError() {
88 base::AutoLock auto_lock(lock_); 122 base::AutoLock auto_lock(lock_);
89 sender_ = NULL; 123 sender_ = NULL;
124 shutdown_watcher_.StopWatching();
90 SignalAllEvents(); 125 SignalAllEvents();
91 } 126 }
92 127
93 void SyncMessageFilter::OnChannelClosing() { 128 void SyncMessageFilter::OnChannelClosing() {
94 base::AutoLock auto_lock(lock_); 129 base::AutoLock auto_lock(lock_);
95 sender_ = NULL; 130 sender_ = NULL;
131 shutdown_watcher_.StopWatching();
96 SignalAllEvents(); 132 SignalAllEvents();
97 } 133 }
98 134
99 bool SyncMessageFilter::OnMessageReceived(const Message& message) { 135 bool SyncMessageFilter::OnMessageReceived(const Message& message) {
100 base::AutoLock auto_lock(lock_); 136 base::AutoLock auto_lock(lock_);
101 for (PendingSyncMessages::iterator iter = pending_sync_messages_.begin(); 137 for (PendingSyncMessages::iterator iter = pending_sync_messages_.begin();
102 iter != pending_sync_messages_.end(); ++iter) { 138 iter != pending_sync_messages_.end(); ++iter) {
103 if (SyncMessage::IsMessageReplyTo(message, (*iter)->id)) { 139 if (SyncMessage::IsMessageReplyTo(message, (*iter)->id)) {
104 if (!message.is_reply_error()) { 140 if (!message.is_reply_error()) {
105 (*iter)->send_result = 141 (*iter)->send_result =
(...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after
160 lock_.AssertAcquired(); 196 lock_.AssertAcquired();
161 for (PendingSyncMessages::iterator iter = pending_sync_messages_.begin(); 197 for (PendingSyncMessages::iterator iter = pending_sync_messages_.begin();
162 iter != pending_sync_messages_.end(); ++iter) { 198 iter != pending_sync_messages_.end(); ++iter) {
163 TRACE_EVENT_FLOW_BEGIN0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), 199 TRACE_EVENT_FLOW_BEGIN0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"),
164 "SyncMessageFilter::SignalAllEvents", 200 "SyncMessageFilter::SignalAllEvents",
165 (*iter)->done_event); 201 (*iter)->done_event);
166 (*iter)->done_event->Signal(); 202 (*iter)->done_event->Signal();
167 } 203 }
168 } 204 }
169 205
206 void SyncMessageFilter::OnShutdownEventSignaled(base::WaitableEvent* event) {
207 DCHECK_EQ(event, shutdown_event_);
208 shutdown_mojo_event_.Signal();
209 }
210
170 } // namespace IPC 211 } // namespace IPC
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698