OLD | NEW |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "ipc/ipc_sync_message_filter.h" | 5 #include "ipc/ipc_sync_message_filter.h" |
6 | 6 |
7 #include "base/bind.h" | 7 #include "base/bind.h" |
8 #include "base/location.h" | 8 #include "base/location.h" |
9 #include "base/logging.h" | 9 #include "base/logging.h" |
10 #include "base/macros.h" | |
11 #include "base/memory/ptr_util.h" | 10 #include "base/memory/ptr_util.h" |
12 #include "base/memory/ref_counted.h" | 11 #include "base/memory/ref_counted.h" |
13 #include "base/message_loop/message_loop.h" | |
14 #include "base/single_thread_task_runner.h" | |
15 #include "base/synchronization/waitable_event.h" | 12 #include "base/synchronization/waitable_event.h" |
16 #include "base/threading/thread_task_runner_handle.h" | 13 #include "base/threading/thread_task_runner_handle.h" |
17 #include "ipc/ipc_channel.h" | 14 #include "ipc/ipc_channel.h" |
18 #include "ipc/ipc_sync_message.h" | 15 #include "ipc/ipc_sync_message.h" |
19 #include "ipc/mojo_event.h" | |
20 #include "mojo/public/cpp/bindings/sync_handle_registry.h" | 16 #include "mojo/public/cpp/bindings/sync_handle_registry.h" |
21 | 17 |
22 namespace IPC { | 18 namespace IPC { |
23 | 19 |
24 namespace { | 20 namespace { |
25 | 21 |
26 // A generic callback used when watching handles synchronously. Sets |*signal| | 22 // A generic callback used when watching handles synchronously. Sets |*signal| |
27 // to true. Also sets |*error| to true in case of an error. | 23 // to true. |
28 void OnSyncHandleReady(bool* signal, bool* error, MojoResult result) { | 24 void OnEventReady(bool* signal) { |
29 *signal = true; | 25 *signal = true; |
30 *error = result != MOJO_RESULT_OK; | |
31 } | 26 } |
32 | 27 |
33 } // namespace | 28 } // namespace |
34 | 29 |
35 // A helper class created by SyncMessageFilter to watch the lifetime of the IO | |
36 // MessageLoop. This holds a weak ref to the SyncMessageFilter and notifies it | |
37 // on its own thread if the SyncMessageFilter is still alive at the time of | |
38 // IO MessageLoop destruction. | |
39 class SyncMessageFilter::IOMessageLoopObserver | |
40 : public base::MessageLoop::DestructionObserver, | |
41 public base::RefCountedThreadSafe<IOMessageLoopObserver> { | |
42 public: | |
43 IOMessageLoopObserver( | |
44 base::WeakPtr<SyncMessageFilter> weak_filter, | |
45 scoped_refptr<base::SingleThreadTaskRunner> filter_task_runner) | |
46 : weak_filter_(weak_filter), filter_task_runner_(filter_task_runner) {} | |
47 | |
48 void StartOnIOThread() { | |
49 DCHECK(!watching_); | |
50 watching_ = true; | |
51 io_task_runner_ = base::ThreadTaskRunnerHandle::Get(); | |
52 base::MessageLoop::current()->AddDestructionObserver(this); | |
53 } | |
54 | |
55 void Stop() { | |
56 if (!io_task_runner_) | |
57 return; | |
58 | |
59 if (io_task_runner_->BelongsToCurrentThread()) { | |
60 StopOnIOThread(); | |
61 } else { | |
62 io_task_runner_->PostTask( | |
63 FROM_HERE, base::Bind(&IOMessageLoopObserver::StopOnIOThread, this)); | |
64 } | |
65 } | |
66 | |
67 private: | |
68 void StopOnIOThread() { | |
69 DCHECK(io_task_runner_->BelongsToCurrentThread()); | |
70 if (!watching_) | |
71 return; | |
72 watching_ = false; | |
73 base::MessageLoop::current()->RemoveDestructionObserver(this); | |
74 } | |
75 | |
76 // base::MessageLoop::DestructionObserver: | |
77 void WillDestroyCurrentMessageLoop() override { | |
78 DCHECK(io_task_runner_ && io_task_runner_->BelongsToCurrentThread()); | |
79 DCHECK(watching_); | |
80 StopOnIOThread(); | |
81 filter_task_runner_->PostTask( | |
82 FROM_HERE, | |
83 base::Bind(&SyncMessageFilter::OnIOMessageLoopDestroyed, weak_filter_)); | |
84 } | |
85 | |
86 friend class base::RefCountedThreadSafe<IOMessageLoopObserver>; | |
87 | |
88 ~IOMessageLoopObserver() override {} | |
89 | |
90 bool watching_ = false; | |
91 base::WeakPtr<SyncMessageFilter> weak_filter_; | |
92 scoped_refptr<base::SingleThreadTaskRunner> filter_task_runner_; | |
93 scoped_refptr<base::SingleThreadTaskRunner> io_task_runner_; | |
94 | |
95 DISALLOW_COPY_AND_ASSIGN(IOMessageLoopObserver); | |
96 }; | |
97 | |
98 bool SyncMessageFilter::Send(Message* message) { | 30 bool SyncMessageFilter::Send(Message* message) { |
99 if (!message->is_sync()) { | 31 if (!message->is_sync()) { |
100 { | 32 { |
101 base::AutoLock auto_lock(lock_); | 33 base::AutoLock auto_lock(lock_); |
102 if (!io_task_runner_.get()) { | 34 if (!io_task_runner_.get()) { |
103 pending_messages_.emplace_back(base::WrapUnique(message)); | 35 pending_messages_.emplace_back(base::WrapUnique(message)); |
104 return true; | 36 return true; |
105 } | 37 } |
106 } | 38 } |
107 io_task_runner_->PostTask( | 39 io_task_runner_->PostTask( |
108 FROM_HERE, | 40 FROM_HERE, |
109 base::Bind(&SyncMessageFilter::SendOnIOThread, this, message)); | 41 base::Bind(&SyncMessageFilter::SendOnIOThread, this, message)); |
110 return true; | 42 return true; |
111 } | 43 } |
112 | 44 |
113 MojoEvent done_event; | 45 base::WaitableEvent done_event( |
| 46 base::WaitableEvent::ResetPolicy::MANUAL, |
| 47 base::WaitableEvent::InitialState::NOT_SIGNALED); |
114 PendingSyncMsg pending_message( | 48 PendingSyncMsg pending_message( |
115 SyncMessage::GetMessageId(*message), | 49 SyncMessage::GetMessageId(*message), |
116 static_cast<SyncMessage*>(message)->GetReplyDeserializer(), | 50 static_cast<SyncMessage*>(message)->GetReplyDeserializer(), |
117 &done_event); | 51 &done_event); |
118 | 52 |
119 { | 53 { |
120 base::AutoLock auto_lock(lock_); | 54 base::AutoLock auto_lock(lock_); |
121 // Can't use this class on the main thread or else it can lead to deadlocks. | 55 // Can't use this class on the main thread or else it can lead to deadlocks. |
122 // Also by definition, can't use this on IO thread since we're blocking it. | 56 // Also by definition, can't use this on IO thread since we're blocking it. |
123 if (base::ThreadTaskRunnerHandle::IsSet()) { | 57 if (base::ThreadTaskRunnerHandle::IsSet()) { |
124 DCHECK(base::ThreadTaskRunnerHandle::Get() != listener_task_runner_); | 58 DCHECK(base::ThreadTaskRunnerHandle::Get() != listener_task_runner_); |
125 DCHECK(base::ThreadTaskRunnerHandle::Get() != io_task_runner_); | 59 DCHECK(base::ThreadTaskRunnerHandle::Get() != io_task_runner_); |
126 } | 60 } |
127 pending_sync_messages_.insert(&pending_message); | 61 pending_sync_messages_.insert(&pending_message); |
128 | 62 |
129 if (io_task_runner_.get()) { | 63 if (io_task_runner_.get()) { |
130 io_task_runner_->PostTask( | 64 io_task_runner_->PostTask( |
131 FROM_HERE, | 65 FROM_HERE, |
132 base::Bind(&SyncMessageFilter::SendOnIOThread, this, message)); | 66 base::Bind(&SyncMessageFilter::SendOnIOThread, this, message)); |
133 } else { | 67 } else { |
134 pending_messages_.emplace_back(base::WrapUnique(message)); | 68 pending_messages_.emplace_back(base::WrapUnique(message)); |
135 } | 69 } |
136 } | 70 } |
137 | 71 |
138 bool done = false; | 72 bool done = false; |
139 bool shutdown = false; | 73 bool shutdown = false; |
140 bool error = false; | |
141 scoped_refptr<mojo::SyncHandleRegistry> registry = | 74 scoped_refptr<mojo::SyncHandleRegistry> registry = |
142 mojo::SyncHandleRegistry::current(); | 75 mojo::SyncHandleRegistry::current(); |
143 registry->RegisterHandle(shutdown_mojo_event_.GetHandle(), | 76 registry->RegisterEvent(shutdown_event_, |
144 MOJO_HANDLE_SIGNAL_READABLE, | 77 base::Bind(&OnEventReady, &shutdown)); |
145 base::Bind(&OnSyncHandleReady, &shutdown, &error)); | 78 registry->RegisterEvent(&done_event, base::Bind(&OnEventReady, &done)); |
146 registry->RegisterHandle(done_event.GetHandle(), | |
147 MOJO_HANDLE_SIGNAL_READABLE, | |
148 base::Bind(&OnSyncHandleReady, &done, &error)); | |
149 | 79 |
150 const bool* stop_flags[] = { &done, &shutdown }; | 80 const bool* stop_flags[] = { &done, &shutdown }; |
151 registry->WatchAllHandles(stop_flags, 2); | 81 registry->Wait(stop_flags, 2); |
152 DCHECK(!error); | |
153 | |
154 if (done) { | 82 if (done) { |
155 TRACE_EVENT_FLOW_END0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), | 83 TRACE_EVENT_FLOW_END0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), |
156 "SyncMessageFilter::Send", &done_event); | 84 "SyncMessageFilter::Send", &done_event); |
157 } | 85 } |
158 registry->UnregisterHandle(shutdown_mojo_event_.GetHandle()); | 86 |
159 registry->UnregisterHandle(done_event.GetHandle()); | 87 registry->UnregisterEvent(shutdown_event_); |
| 88 registry->UnregisterEvent(&done_event); |
160 | 89 |
161 { | 90 { |
162 base::AutoLock auto_lock(lock_); | 91 base::AutoLock auto_lock(lock_); |
163 delete pending_message.deserializer; | 92 delete pending_message.deserializer; |
164 pending_sync_messages_.erase(&pending_message); | 93 pending_sync_messages_.erase(&pending_message); |
165 } | 94 } |
166 | 95 |
167 return pending_message.send_result; | 96 return pending_message.send_result; |
168 } | 97 } |
169 | 98 |
170 void SyncMessageFilter::OnFilterAdded(Channel* channel) { | 99 void SyncMessageFilter::OnFilterAdded(Channel* channel) { |
171 std::vector<std::unique_ptr<Message>> pending_messages; | 100 std::vector<std::unique_ptr<Message>> pending_messages; |
172 { | 101 { |
173 base::AutoLock auto_lock(lock_); | 102 base::AutoLock auto_lock(lock_); |
174 channel_ = channel; | 103 channel_ = channel; |
175 | 104 |
176 io_task_runner_ = base::ThreadTaskRunnerHandle::Get(); | 105 io_task_runner_ = base::ThreadTaskRunnerHandle::Get(); |
177 shutdown_watcher_.StartWatching( | |
178 shutdown_event_, | |
179 base::Bind(&SyncMessageFilter::OnShutdownEventSignaled, this)); | |
180 io_message_loop_observer_->StartOnIOThread(); | |
181 std::swap(pending_messages_, pending_messages); | 106 std::swap(pending_messages_, pending_messages); |
182 } | 107 } |
183 for (auto& msg : pending_messages) | 108 for (auto& msg : pending_messages) |
184 SendOnIOThread(msg.release()); | 109 SendOnIOThread(msg.release()); |
185 } | 110 } |
186 | 111 |
187 void SyncMessageFilter::OnChannelError() { | 112 void SyncMessageFilter::OnChannelError() { |
188 base::AutoLock auto_lock(lock_); | 113 base::AutoLock auto_lock(lock_); |
189 channel_ = nullptr; | 114 channel_ = nullptr; |
190 shutdown_watcher_.StopWatching(); | |
191 SignalAllEvents(); | 115 SignalAllEvents(); |
192 } | 116 } |
193 | 117 |
194 void SyncMessageFilter::OnChannelClosing() { | 118 void SyncMessageFilter::OnChannelClosing() { |
195 base::AutoLock auto_lock(lock_); | 119 base::AutoLock auto_lock(lock_); |
196 channel_ = nullptr; | 120 channel_ = nullptr; |
197 shutdown_watcher_.StopWatching(); | |
198 SignalAllEvents(); | 121 SignalAllEvents(); |
199 } | 122 } |
200 | 123 |
201 bool SyncMessageFilter::OnMessageReceived(const Message& message) { | 124 bool SyncMessageFilter::OnMessageReceived(const Message& message) { |
202 base::AutoLock auto_lock(lock_); | 125 base::AutoLock auto_lock(lock_); |
203 for (PendingSyncMessages::iterator iter = pending_sync_messages_.begin(); | 126 for (PendingSyncMessages::iterator iter = pending_sync_messages_.begin(); |
204 iter != pending_sync_messages_.end(); ++iter) { | 127 iter != pending_sync_messages_.end(); ++iter) { |
205 if (SyncMessage::IsMessageReplyTo(message, (*iter)->id)) { | 128 if (SyncMessage::IsMessageReplyTo(message, (*iter)->id)) { |
206 if (!message.is_reply_error()) { | 129 if (!message.is_reply_error()) { |
207 (*iter)->send_result = | 130 (*iter)->send_result = |
208 (*iter)->deserializer->SerializeOutputParameters(message); | 131 (*iter)->deserializer->SerializeOutputParameters(message); |
209 } | 132 } |
210 TRACE_EVENT_FLOW_BEGIN0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), | 133 TRACE_EVENT_FLOW_BEGIN0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), |
211 "SyncMessageFilter::OnMessageReceived", | 134 "SyncMessageFilter::OnMessageReceived", |
212 (*iter)->done_event); | 135 (*iter)->done_event); |
213 (*iter)->done_event->Signal(); | 136 (*iter)->done_event->Signal(); |
214 return true; | 137 return true; |
215 } | 138 } |
216 } | 139 } |
217 | 140 |
218 return false; | 141 return false; |
219 } | 142 } |
220 | 143 |
221 SyncMessageFilter::SyncMessageFilter(base::WaitableEvent* shutdown_event) | 144 SyncMessageFilter::SyncMessageFilter(base::WaitableEvent* shutdown_event) |
222 : channel_(nullptr), | 145 : channel_(nullptr), |
223 listener_task_runner_(base::ThreadTaskRunnerHandle::Get()), | 146 listener_task_runner_(base::ThreadTaskRunnerHandle::Get()), |
224 shutdown_event_(shutdown_event), | 147 shutdown_event_(shutdown_event) {} |
225 weak_factory_(this) { | |
226 io_message_loop_observer_ = new IOMessageLoopObserver( | |
227 weak_factory_.GetWeakPtr(), listener_task_runner_); | |
228 } | |
229 | 148 |
230 SyncMessageFilter::~SyncMessageFilter() { | 149 SyncMessageFilter::~SyncMessageFilter() {} |
231 io_message_loop_observer_->Stop(); | |
232 } | |
233 | 150 |
234 void SyncMessageFilter::SendOnIOThread(Message* message) { | 151 void SyncMessageFilter::SendOnIOThread(Message* message) { |
235 if (channel_) { | 152 if (channel_) { |
236 channel_->Send(message); | 153 channel_->Send(message); |
237 return; | 154 return; |
238 } | 155 } |
239 | 156 |
240 if (message->is_sync()) { | 157 if (message->is_sync()) { |
241 // We don't know which thread sent it, but it doesn't matter, just signal | 158 // We don't know which thread sent it, but it doesn't matter, just signal |
242 // them all. | 159 // them all. |
243 base::AutoLock auto_lock(lock_); | 160 base::AutoLock auto_lock(lock_); |
244 SignalAllEvents(); | 161 SignalAllEvents(); |
245 } | 162 } |
246 | 163 |
247 delete message; | 164 delete message; |
248 } | 165 } |
249 | 166 |
250 void SyncMessageFilter::SignalAllEvents() { | 167 void SyncMessageFilter::SignalAllEvents() { |
251 lock_.AssertAcquired(); | 168 lock_.AssertAcquired(); |
252 for (PendingSyncMessages::iterator iter = pending_sync_messages_.begin(); | 169 for (PendingSyncMessages::iterator iter = pending_sync_messages_.begin(); |
253 iter != pending_sync_messages_.end(); ++iter) { | 170 iter != pending_sync_messages_.end(); ++iter) { |
254 TRACE_EVENT_FLOW_BEGIN0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), | 171 TRACE_EVENT_FLOW_BEGIN0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), |
255 "SyncMessageFilter::SignalAllEvents", | 172 "SyncMessageFilter::SignalAllEvents", |
256 (*iter)->done_event); | 173 (*iter)->done_event); |
257 (*iter)->done_event->Signal(); | 174 (*iter)->done_event->Signal(); |
258 } | 175 } |
259 } | 176 } |
260 | 177 |
261 void SyncMessageFilter::OnShutdownEventSignaled(base::WaitableEvent* event) { | |
262 DCHECK_EQ(event, shutdown_event_); | |
263 shutdown_mojo_event_.Signal(); | |
264 } | |
265 | |
266 void SyncMessageFilter::OnIOMessageLoopDestroyed() { | |
267 // Since we use an async WaitableEventWatcher to watch the shutdown event | |
268 // from the IO thread, we can't forward the shutdown signal after the IO | |
269 // message loop is destroyed. Since that destruction indicates shutdown | |
270 // anyway, we manually signal the shutdown event in this case. | |
271 shutdown_mojo_event_.Signal(); | |
272 } | |
273 | |
274 void SyncMessageFilter::GetGenericRemoteAssociatedInterface( | 178 void SyncMessageFilter::GetGenericRemoteAssociatedInterface( |
275 const std::string& interface_name, | 179 const std::string& interface_name, |
276 mojo::ScopedInterfaceEndpointHandle handle) { | 180 mojo::ScopedInterfaceEndpointHandle handle) { |
277 base::AutoLock auto_lock(lock_); | 181 base::AutoLock auto_lock(lock_); |
278 DCHECK(io_task_runner_ && io_task_runner_->BelongsToCurrentThread()); | 182 DCHECK(io_task_runner_ && io_task_runner_->BelongsToCurrentThread()); |
279 if (!channel_) | 183 if (!channel_) |
280 return; | 184 return; |
281 | 185 |
282 Channel::AssociatedInterfaceSupport* support = | 186 Channel::AssociatedInterfaceSupport* support = |
283 channel_->GetAssociatedInterfaceSupport(); | 187 channel_->GetAssociatedInterfaceSupport(); |
284 support->GetGenericRemoteAssociatedInterface( | 188 support->GetGenericRemoteAssociatedInterface( |
285 interface_name, std::move(handle)); | 189 interface_name, std::move(handle)); |
286 } | 190 } |
287 | 191 |
288 } // namespace IPC | 192 } // namespace IPC |
OLD | NEW |