Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(10)

Side by Side Diff: ipc/ipc_sync_message_filter.cc

Issue 2754143005: Use WaitableEvents to wake up sync IPC waiting (Closed)
Patch Set: . Created 3 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « ipc/ipc_sync_message_filter.h ('k') | ipc/mojo_event.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "ipc/ipc_sync_message_filter.h" 5 #include "ipc/ipc_sync_message_filter.h"
6 6
7 #include "base/bind.h" 7 #include "base/bind.h"
8 #include "base/location.h" 8 #include "base/location.h"
9 #include "base/logging.h" 9 #include "base/logging.h"
10 #include "base/macros.h"
11 #include "base/memory/ptr_util.h" 10 #include "base/memory/ptr_util.h"
12 #include "base/memory/ref_counted.h" 11 #include "base/memory/ref_counted.h"
13 #include "base/message_loop/message_loop.h"
14 #include "base/single_thread_task_runner.h"
15 #include "base/synchronization/waitable_event.h" 12 #include "base/synchronization/waitable_event.h"
16 #include "base/threading/thread_task_runner_handle.h" 13 #include "base/threading/thread_task_runner_handle.h"
17 #include "ipc/ipc_channel.h" 14 #include "ipc/ipc_channel.h"
18 #include "ipc/ipc_sync_message.h" 15 #include "ipc/ipc_sync_message.h"
19 #include "ipc/mojo_event.h"
20 #include "mojo/public/cpp/bindings/sync_handle_registry.h" 16 #include "mojo/public/cpp/bindings/sync_handle_registry.h"
21 17
22 namespace IPC { 18 namespace IPC {
23 19
24 namespace { 20 namespace {
25 21
26 // A generic callback used when watching handles synchronously. Sets |*signal| 22 // A generic callback used when watching handles synchronously. Sets |*signal|
27 // to true. Also sets |*error| to true in case of an error. 23 // to true.
28 void OnSyncHandleReady(bool* signal, bool* error, MojoResult result) { 24 void OnEventReady(bool* signal) {
29 *signal = true; 25 *signal = true;
30 *error = result != MOJO_RESULT_OK;
31 } 26 }
32 27
33 } // namespace 28 } // namespace
34 29
35 // A helper class created by SyncMessageFilter to watch the lifetime of the IO
36 // MessageLoop. This holds a weak ref to the SyncMessageFilter and notifies it
37 // on its own thread if the SyncMessageFilter is still alive at the time of
38 // IO MessageLoop destruction.
39 class SyncMessageFilter::IOMessageLoopObserver
40 : public base::MessageLoop::DestructionObserver,
41 public base::RefCountedThreadSafe<IOMessageLoopObserver> {
42 public:
43 IOMessageLoopObserver(
44 base::WeakPtr<SyncMessageFilter> weak_filter,
45 scoped_refptr<base::SingleThreadTaskRunner> filter_task_runner)
46 : weak_filter_(weak_filter), filter_task_runner_(filter_task_runner) {}
47
48 void StartOnIOThread() {
49 DCHECK(!watching_);
50 watching_ = true;
51 io_task_runner_ = base::ThreadTaskRunnerHandle::Get();
52 base::MessageLoop::current()->AddDestructionObserver(this);
53 }
54
55 void Stop() {
56 if (!io_task_runner_)
57 return;
58
59 if (io_task_runner_->BelongsToCurrentThread()) {
60 StopOnIOThread();
61 } else {
62 io_task_runner_->PostTask(
63 FROM_HERE, base::Bind(&IOMessageLoopObserver::StopOnIOThread, this));
64 }
65 }
66
67 private:
68 void StopOnIOThread() {
69 DCHECK(io_task_runner_->BelongsToCurrentThread());
70 if (!watching_)
71 return;
72 watching_ = false;
73 base::MessageLoop::current()->RemoveDestructionObserver(this);
74 }
75
76 // base::MessageLoop::DestructionObserver:
77 void WillDestroyCurrentMessageLoop() override {
78 DCHECK(io_task_runner_ && io_task_runner_->BelongsToCurrentThread());
79 DCHECK(watching_);
80 StopOnIOThread();
81 filter_task_runner_->PostTask(
82 FROM_HERE,
83 base::Bind(&SyncMessageFilter::OnIOMessageLoopDestroyed, weak_filter_));
84 }
85
86 friend class base::RefCountedThreadSafe<IOMessageLoopObserver>;
87
88 ~IOMessageLoopObserver() override {}
89
90 bool watching_ = false;
91 base::WeakPtr<SyncMessageFilter> weak_filter_;
92 scoped_refptr<base::SingleThreadTaskRunner> filter_task_runner_;
93 scoped_refptr<base::SingleThreadTaskRunner> io_task_runner_;
94
95 DISALLOW_COPY_AND_ASSIGN(IOMessageLoopObserver);
96 };
97
98 bool SyncMessageFilter::Send(Message* message) { 30 bool SyncMessageFilter::Send(Message* message) {
99 if (!message->is_sync()) { 31 if (!message->is_sync()) {
100 { 32 {
101 base::AutoLock auto_lock(lock_); 33 base::AutoLock auto_lock(lock_);
102 if (!io_task_runner_.get()) { 34 if (!io_task_runner_.get()) {
103 pending_messages_.emplace_back(base::WrapUnique(message)); 35 pending_messages_.emplace_back(base::WrapUnique(message));
104 return true; 36 return true;
105 } 37 }
106 } 38 }
107 io_task_runner_->PostTask( 39 io_task_runner_->PostTask(
108 FROM_HERE, 40 FROM_HERE,
109 base::Bind(&SyncMessageFilter::SendOnIOThread, this, message)); 41 base::Bind(&SyncMessageFilter::SendOnIOThread, this, message));
110 return true; 42 return true;
111 } 43 }
112 44
113 MojoEvent done_event; 45 base::WaitableEvent done_event(
46 base::WaitableEvent::ResetPolicy::MANUAL,
47 base::WaitableEvent::InitialState::NOT_SIGNALED);
114 PendingSyncMsg pending_message( 48 PendingSyncMsg pending_message(
115 SyncMessage::GetMessageId(*message), 49 SyncMessage::GetMessageId(*message),
116 static_cast<SyncMessage*>(message)->GetReplyDeserializer(), 50 static_cast<SyncMessage*>(message)->GetReplyDeserializer(),
117 &done_event); 51 &done_event);
118 52
119 { 53 {
120 base::AutoLock auto_lock(lock_); 54 base::AutoLock auto_lock(lock_);
121 // Can't use this class on the main thread or else it can lead to deadlocks. 55 // Can't use this class on the main thread or else it can lead to deadlocks.
122 // Also by definition, can't use this on IO thread since we're blocking it. 56 // Also by definition, can't use this on IO thread since we're blocking it.
123 if (base::ThreadTaskRunnerHandle::IsSet()) { 57 if (base::ThreadTaskRunnerHandle::IsSet()) {
124 DCHECK(base::ThreadTaskRunnerHandle::Get() != listener_task_runner_); 58 DCHECK(base::ThreadTaskRunnerHandle::Get() != listener_task_runner_);
125 DCHECK(base::ThreadTaskRunnerHandle::Get() != io_task_runner_); 59 DCHECK(base::ThreadTaskRunnerHandle::Get() != io_task_runner_);
126 } 60 }
127 pending_sync_messages_.insert(&pending_message); 61 pending_sync_messages_.insert(&pending_message);
128 62
129 if (io_task_runner_.get()) { 63 if (io_task_runner_.get()) {
130 io_task_runner_->PostTask( 64 io_task_runner_->PostTask(
131 FROM_HERE, 65 FROM_HERE,
132 base::Bind(&SyncMessageFilter::SendOnIOThread, this, message)); 66 base::Bind(&SyncMessageFilter::SendOnIOThread, this, message));
133 } else { 67 } else {
134 pending_messages_.emplace_back(base::WrapUnique(message)); 68 pending_messages_.emplace_back(base::WrapUnique(message));
135 } 69 }
136 } 70 }
137 71
138 bool done = false; 72 bool done = false;
139 bool shutdown = false; 73 bool shutdown = false;
140 bool error = false;
141 scoped_refptr<mojo::SyncHandleRegistry> registry = 74 scoped_refptr<mojo::SyncHandleRegistry> registry =
142 mojo::SyncHandleRegistry::current(); 75 mojo::SyncHandleRegistry::current();
143 registry->RegisterHandle(shutdown_mojo_event_.GetHandle(), 76 registry->RegisterEvent(shutdown_event_,
144 MOJO_HANDLE_SIGNAL_READABLE, 77 base::Bind(&OnEventReady, &shutdown));
145 base::Bind(&OnSyncHandleReady, &shutdown, &error)); 78 registry->RegisterEvent(&done_event, base::Bind(&OnEventReady, &done));
146 registry->RegisterHandle(done_event.GetHandle(),
147 MOJO_HANDLE_SIGNAL_READABLE,
148 base::Bind(&OnSyncHandleReady, &done, &error));
149 79
150 const bool* stop_flags[] = { &done, &shutdown }; 80 const bool* stop_flags[] = { &done, &shutdown };
151 registry->WatchAllHandles(stop_flags, 2); 81 registry->Wait(stop_flags, 2);
152 DCHECK(!error);
153
154 if (done) { 82 if (done) {
155 TRACE_EVENT_FLOW_END0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), 83 TRACE_EVENT_FLOW_END0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"),
156 "SyncMessageFilter::Send", &done_event); 84 "SyncMessageFilter::Send", &done_event);
157 } 85 }
158 registry->UnregisterHandle(shutdown_mojo_event_.GetHandle()); 86
159 registry->UnregisterHandle(done_event.GetHandle()); 87 registry->UnregisterEvent(shutdown_event_);
88 registry->UnregisterEvent(&done_event);
160 89
161 { 90 {
162 base::AutoLock auto_lock(lock_); 91 base::AutoLock auto_lock(lock_);
163 delete pending_message.deserializer; 92 delete pending_message.deserializer;
164 pending_sync_messages_.erase(&pending_message); 93 pending_sync_messages_.erase(&pending_message);
165 } 94 }
166 95
167 return pending_message.send_result; 96 return pending_message.send_result;
168 } 97 }
169 98
170 void SyncMessageFilter::OnFilterAdded(Channel* channel) { 99 void SyncMessageFilter::OnFilterAdded(Channel* channel) {
171 std::vector<std::unique_ptr<Message>> pending_messages; 100 std::vector<std::unique_ptr<Message>> pending_messages;
172 { 101 {
173 base::AutoLock auto_lock(lock_); 102 base::AutoLock auto_lock(lock_);
174 channel_ = channel; 103 channel_ = channel;
175 104
176 io_task_runner_ = base::ThreadTaskRunnerHandle::Get(); 105 io_task_runner_ = base::ThreadTaskRunnerHandle::Get();
177 shutdown_watcher_.StartWatching(
178 shutdown_event_,
179 base::Bind(&SyncMessageFilter::OnShutdownEventSignaled, this));
180 io_message_loop_observer_->StartOnIOThread();
181 std::swap(pending_messages_, pending_messages); 106 std::swap(pending_messages_, pending_messages);
182 } 107 }
183 for (auto& msg : pending_messages) 108 for (auto& msg : pending_messages)
184 SendOnIOThread(msg.release()); 109 SendOnIOThread(msg.release());
185 } 110 }
186 111
187 void SyncMessageFilter::OnChannelError() { 112 void SyncMessageFilter::OnChannelError() {
188 base::AutoLock auto_lock(lock_); 113 base::AutoLock auto_lock(lock_);
189 channel_ = nullptr; 114 channel_ = nullptr;
190 shutdown_watcher_.StopWatching();
191 SignalAllEvents(); 115 SignalAllEvents();
192 } 116 }
193 117
194 void SyncMessageFilter::OnChannelClosing() { 118 void SyncMessageFilter::OnChannelClosing() {
195 base::AutoLock auto_lock(lock_); 119 base::AutoLock auto_lock(lock_);
196 channel_ = nullptr; 120 channel_ = nullptr;
197 shutdown_watcher_.StopWatching();
198 SignalAllEvents(); 121 SignalAllEvents();
199 } 122 }
200 123
201 bool SyncMessageFilter::OnMessageReceived(const Message& message) { 124 bool SyncMessageFilter::OnMessageReceived(const Message& message) {
202 base::AutoLock auto_lock(lock_); 125 base::AutoLock auto_lock(lock_);
203 for (PendingSyncMessages::iterator iter = pending_sync_messages_.begin(); 126 for (PendingSyncMessages::iterator iter = pending_sync_messages_.begin();
204 iter != pending_sync_messages_.end(); ++iter) { 127 iter != pending_sync_messages_.end(); ++iter) {
205 if (SyncMessage::IsMessageReplyTo(message, (*iter)->id)) { 128 if (SyncMessage::IsMessageReplyTo(message, (*iter)->id)) {
206 if (!message.is_reply_error()) { 129 if (!message.is_reply_error()) {
207 (*iter)->send_result = 130 (*iter)->send_result =
208 (*iter)->deserializer->SerializeOutputParameters(message); 131 (*iter)->deserializer->SerializeOutputParameters(message);
209 } 132 }
210 TRACE_EVENT_FLOW_BEGIN0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), 133 TRACE_EVENT_FLOW_BEGIN0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"),
211 "SyncMessageFilter::OnMessageReceived", 134 "SyncMessageFilter::OnMessageReceived",
212 (*iter)->done_event); 135 (*iter)->done_event);
213 (*iter)->done_event->Signal(); 136 (*iter)->done_event->Signal();
214 return true; 137 return true;
215 } 138 }
216 } 139 }
217 140
218 return false; 141 return false;
219 } 142 }
220 143
221 SyncMessageFilter::SyncMessageFilter(base::WaitableEvent* shutdown_event) 144 SyncMessageFilter::SyncMessageFilter(base::WaitableEvent* shutdown_event)
222 : channel_(nullptr), 145 : channel_(nullptr),
223 listener_task_runner_(base::ThreadTaskRunnerHandle::Get()), 146 listener_task_runner_(base::ThreadTaskRunnerHandle::Get()),
224 shutdown_event_(shutdown_event), 147 shutdown_event_(shutdown_event) {}
225 weak_factory_(this) {
226 io_message_loop_observer_ = new IOMessageLoopObserver(
227 weak_factory_.GetWeakPtr(), listener_task_runner_);
228 }
229 148
230 SyncMessageFilter::~SyncMessageFilter() { 149 SyncMessageFilter::~SyncMessageFilter() {}
231 io_message_loop_observer_->Stop();
232 }
233 150
234 void SyncMessageFilter::SendOnIOThread(Message* message) { 151 void SyncMessageFilter::SendOnIOThread(Message* message) {
235 if (channel_) { 152 if (channel_) {
236 channel_->Send(message); 153 channel_->Send(message);
237 return; 154 return;
238 } 155 }
239 156
240 if (message->is_sync()) { 157 if (message->is_sync()) {
241 // We don't know which thread sent it, but it doesn't matter, just signal 158 // We don't know which thread sent it, but it doesn't matter, just signal
242 // them all. 159 // them all.
243 base::AutoLock auto_lock(lock_); 160 base::AutoLock auto_lock(lock_);
244 SignalAllEvents(); 161 SignalAllEvents();
245 } 162 }
246 163
247 delete message; 164 delete message;
248 } 165 }
249 166
250 void SyncMessageFilter::SignalAllEvents() { 167 void SyncMessageFilter::SignalAllEvents() {
251 lock_.AssertAcquired(); 168 lock_.AssertAcquired();
252 for (PendingSyncMessages::iterator iter = pending_sync_messages_.begin(); 169 for (PendingSyncMessages::iterator iter = pending_sync_messages_.begin();
253 iter != pending_sync_messages_.end(); ++iter) { 170 iter != pending_sync_messages_.end(); ++iter) {
254 TRACE_EVENT_FLOW_BEGIN0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), 171 TRACE_EVENT_FLOW_BEGIN0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"),
255 "SyncMessageFilter::SignalAllEvents", 172 "SyncMessageFilter::SignalAllEvents",
256 (*iter)->done_event); 173 (*iter)->done_event);
257 (*iter)->done_event->Signal(); 174 (*iter)->done_event->Signal();
258 } 175 }
259 } 176 }
260 177
261 void SyncMessageFilter::OnShutdownEventSignaled(base::WaitableEvent* event) {
262 DCHECK_EQ(event, shutdown_event_);
263 shutdown_mojo_event_.Signal();
264 }
265
266 void SyncMessageFilter::OnIOMessageLoopDestroyed() {
267 // Since we use an async WaitableEventWatcher to watch the shutdown event
268 // from the IO thread, we can't forward the shutdown signal after the IO
269 // message loop is destroyed. Since that destruction indicates shutdown
270 // anyway, we manually signal the shutdown event in this case.
271 shutdown_mojo_event_.Signal();
272 }
273
274 void SyncMessageFilter::GetGenericRemoteAssociatedInterface( 178 void SyncMessageFilter::GetGenericRemoteAssociatedInterface(
275 const std::string& interface_name, 179 const std::string& interface_name,
276 mojo::ScopedInterfaceEndpointHandle handle) { 180 mojo::ScopedInterfaceEndpointHandle handle) {
277 base::AutoLock auto_lock(lock_); 181 base::AutoLock auto_lock(lock_);
278 DCHECK(io_task_runner_ && io_task_runner_->BelongsToCurrentThread()); 182 DCHECK(io_task_runner_ && io_task_runner_->BelongsToCurrentThread());
279 if (!channel_) 183 if (!channel_)
280 return; 184 return;
281 185
282 Channel::AssociatedInterfaceSupport* support = 186 Channel::AssociatedInterfaceSupport* support =
283 channel_->GetAssociatedInterfaceSupport(); 187 channel_->GetAssociatedInterfaceSupport();
284 support->GetGenericRemoteAssociatedInterface( 188 support->GetGenericRemoteAssociatedInterface(
285 interface_name, std::move(handle)); 189 interface_name, std::move(handle));
286 } 190 }
287 191
288 } // namespace IPC 192 } // namespace IPC
OLDNEW
« no previous file with comments | « ipc/ipc_sync_message_filter.h ('k') | ipc/mojo_event.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698