| OLD | NEW |
| 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "ipc/ipc_sync_message_filter.h" | 5 #include "ipc/ipc_sync_message_filter.h" |
| 6 | 6 |
| 7 #include "base/bind.h" | 7 #include "base/bind.h" |
| 8 #include "base/location.h" | 8 #include "base/location.h" |
| 9 #include "base/logging.h" | 9 #include "base/logging.h" |
| 10 #include "base/macros.h" | |
| 11 #include "base/memory/ptr_util.h" | 10 #include "base/memory/ptr_util.h" |
| 12 #include "base/memory/ref_counted.h" | 11 #include "base/memory/ref_counted.h" |
| 13 #include "base/message_loop/message_loop.h" | |
| 14 #include "base/single_thread_task_runner.h" | |
| 15 #include "base/synchronization/waitable_event.h" | 12 #include "base/synchronization/waitable_event.h" |
| 16 #include "base/threading/thread_task_runner_handle.h" | 13 #include "base/threading/thread_task_runner_handle.h" |
| 17 #include "ipc/ipc_channel.h" | 14 #include "ipc/ipc_channel.h" |
| 18 #include "ipc/ipc_sync_message.h" | 15 #include "ipc/ipc_sync_message.h" |
| 19 #include "ipc/mojo_event.h" | |
| 20 #include "mojo/public/cpp/bindings/sync_handle_registry.h" | 16 #include "mojo/public/cpp/bindings/sync_handle_registry.h" |
| 21 | 17 |
| 22 namespace IPC { | 18 namespace IPC { |
| 23 | 19 |
| 24 namespace { | 20 namespace { |
| 25 | 21 |
| 26 // A generic callback used when watching handles synchronously. Sets |*signal| | 22 // A generic callback used when watching handles synchronously. Sets |*signal| |
| 27 // to true. Also sets |*error| to true in case of an error. | 23 // to true. |
| 28 void OnSyncHandleReady(bool* signal, bool* error, MojoResult result) { | 24 void OnEventReady(bool* signal) { |
| 29 *signal = true; | 25 *signal = true; |
| 30 *error = result != MOJO_RESULT_OK; | |
| 31 } | 26 } |
| 32 | 27 |
| 33 } // namespace | 28 } // namespace |
| 34 | 29 |
| 35 // A helper class created by SyncMessageFilter to watch the lifetime of the IO | |
| 36 // MessageLoop. This holds a weak ref to the SyncMessageFilter and notifies it | |
| 37 // on its own thread if the SyncMessageFilter is still alive at the time of | |
| 38 // IO MessageLoop destruction. | |
| 39 class SyncMessageFilter::IOMessageLoopObserver | |
| 40 : public base::MessageLoop::DestructionObserver, | |
| 41 public base::RefCountedThreadSafe<IOMessageLoopObserver> { | |
| 42 public: | |
| 43 IOMessageLoopObserver( | |
| 44 base::WeakPtr<SyncMessageFilter> weak_filter, | |
| 45 scoped_refptr<base::SingleThreadTaskRunner> filter_task_runner) | |
| 46 : weak_filter_(weak_filter), filter_task_runner_(filter_task_runner) {} | |
| 47 | |
| 48 void StartOnIOThread() { | |
| 49 DCHECK(!watching_); | |
| 50 watching_ = true; | |
| 51 io_task_runner_ = base::ThreadTaskRunnerHandle::Get(); | |
| 52 base::MessageLoop::current()->AddDestructionObserver(this); | |
| 53 } | |
| 54 | |
| 55 void Stop() { | |
| 56 if (!io_task_runner_) | |
| 57 return; | |
| 58 | |
| 59 if (io_task_runner_->BelongsToCurrentThread()) { | |
| 60 StopOnIOThread(); | |
| 61 } else { | |
| 62 io_task_runner_->PostTask( | |
| 63 FROM_HERE, base::Bind(&IOMessageLoopObserver::StopOnIOThread, this)); | |
| 64 } | |
| 65 } | |
| 66 | |
| 67 private: | |
| 68 void StopOnIOThread() { | |
| 69 DCHECK(io_task_runner_->BelongsToCurrentThread()); | |
| 70 if (!watching_) | |
| 71 return; | |
| 72 watching_ = false; | |
| 73 base::MessageLoop::current()->RemoveDestructionObserver(this); | |
| 74 } | |
| 75 | |
| 76 // base::MessageLoop::DestructionObserver: | |
| 77 void WillDestroyCurrentMessageLoop() override { | |
| 78 DCHECK(io_task_runner_ && io_task_runner_->BelongsToCurrentThread()); | |
| 79 DCHECK(watching_); | |
| 80 StopOnIOThread(); | |
| 81 filter_task_runner_->PostTask( | |
| 82 FROM_HERE, | |
| 83 base::Bind(&SyncMessageFilter::OnIOMessageLoopDestroyed, weak_filter_)); | |
| 84 } | |
| 85 | |
| 86 friend class base::RefCountedThreadSafe<IOMessageLoopObserver>; | |
| 87 | |
| 88 ~IOMessageLoopObserver() override {} | |
| 89 | |
| 90 bool watching_ = false; | |
| 91 base::WeakPtr<SyncMessageFilter> weak_filter_; | |
| 92 scoped_refptr<base::SingleThreadTaskRunner> filter_task_runner_; | |
| 93 scoped_refptr<base::SingleThreadTaskRunner> io_task_runner_; | |
| 94 | |
| 95 DISALLOW_COPY_AND_ASSIGN(IOMessageLoopObserver); | |
| 96 }; | |
| 97 | |
| 98 bool SyncMessageFilter::Send(Message* message) { | 30 bool SyncMessageFilter::Send(Message* message) { |
| 99 if (!message->is_sync()) { | 31 if (!message->is_sync()) { |
| 100 { | 32 { |
| 101 base::AutoLock auto_lock(lock_); | 33 base::AutoLock auto_lock(lock_); |
| 102 if (!io_task_runner_.get()) { | 34 if (!io_task_runner_.get()) { |
| 103 pending_messages_.emplace_back(base::WrapUnique(message)); | 35 pending_messages_.emplace_back(base::WrapUnique(message)); |
| 104 return true; | 36 return true; |
| 105 } | 37 } |
| 106 } | 38 } |
| 107 io_task_runner_->PostTask( | 39 io_task_runner_->PostTask( |
| 108 FROM_HERE, | 40 FROM_HERE, |
| 109 base::Bind(&SyncMessageFilter::SendOnIOThread, this, message)); | 41 base::Bind(&SyncMessageFilter::SendOnIOThread, this, message)); |
| 110 return true; | 42 return true; |
| 111 } | 43 } |
| 112 | 44 |
| 113 MojoEvent done_event; | 45 base::WaitableEvent done_event( |
| 46 base::WaitableEvent::ResetPolicy::MANUAL, |
| 47 base::WaitableEvent::InitialState::NOT_SIGNALED); |
| 114 PendingSyncMsg pending_message( | 48 PendingSyncMsg pending_message( |
| 115 SyncMessage::GetMessageId(*message), | 49 SyncMessage::GetMessageId(*message), |
| 116 static_cast<SyncMessage*>(message)->GetReplyDeserializer(), | 50 static_cast<SyncMessage*>(message)->GetReplyDeserializer(), |
| 117 &done_event); | 51 &done_event); |
| 118 | 52 |
| 119 { | 53 { |
| 120 base::AutoLock auto_lock(lock_); | 54 base::AutoLock auto_lock(lock_); |
| 121 // Can't use this class on the main thread or else it can lead to deadlocks. | 55 // Can't use this class on the main thread or else it can lead to deadlocks. |
| 122 // Also by definition, can't use this on IO thread since we're blocking it. | 56 // Also by definition, can't use this on IO thread since we're blocking it. |
| 123 if (base::ThreadTaskRunnerHandle::IsSet()) { | 57 if (base::ThreadTaskRunnerHandle::IsSet()) { |
| 124 DCHECK(base::ThreadTaskRunnerHandle::Get() != listener_task_runner_); | 58 DCHECK(base::ThreadTaskRunnerHandle::Get() != listener_task_runner_); |
| 125 DCHECK(base::ThreadTaskRunnerHandle::Get() != io_task_runner_); | 59 DCHECK(base::ThreadTaskRunnerHandle::Get() != io_task_runner_); |
| 126 } | 60 } |
| 127 pending_sync_messages_.insert(&pending_message); | 61 pending_sync_messages_.insert(&pending_message); |
| 128 | 62 |
| 129 if (io_task_runner_.get()) { | 63 if (io_task_runner_.get()) { |
| 130 io_task_runner_->PostTask( | 64 io_task_runner_->PostTask( |
| 131 FROM_HERE, | 65 FROM_HERE, |
| 132 base::Bind(&SyncMessageFilter::SendOnIOThread, this, message)); | 66 base::Bind(&SyncMessageFilter::SendOnIOThread, this, message)); |
| 133 } else { | 67 } else { |
| 134 pending_messages_.emplace_back(base::WrapUnique(message)); | 68 pending_messages_.emplace_back(base::WrapUnique(message)); |
| 135 } | 69 } |
| 136 } | 70 } |
| 137 | 71 |
| 138 bool done = false; | 72 bool done = false; |
| 139 bool shutdown = false; | 73 bool shutdown = false; |
| 140 bool error = false; | |
| 141 scoped_refptr<mojo::SyncHandleRegistry> registry = | 74 scoped_refptr<mojo::SyncHandleRegistry> registry = |
| 142 mojo::SyncHandleRegistry::current(); | 75 mojo::SyncHandleRegistry::current(); |
| 143 registry->RegisterHandle(shutdown_mojo_event_.GetHandle(), | 76 registry->RegisterEvent(shutdown_event_, |
| 144 MOJO_HANDLE_SIGNAL_READABLE, | 77 base::Bind(&OnEventReady, &shutdown)); |
| 145 base::Bind(&OnSyncHandleReady, &shutdown, &error)); | 78 registry->RegisterEvent(&done_event, base::Bind(&OnEventReady, &done)); |
| 146 registry->RegisterHandle(done_event.GetHandle(), | |
| 147 MOJO_HANDLE_SIGNAL_READABLE, | |
| 148 base::Bind(&OnSyncHandleReady, &done, &error)); | |
| 149 | 79 |
| 150 const bool* stop_flags[] = { &done, &shutdown }; | 80 const bool* stop_flags[] = { &done, &shutdown }; |
| 151 registry->WatchAllHandles(stop_flags, 2); | 81 registry->Wait(stop_flags, 2); |
| 152 DCHECK(!error); | |
| 153 | |
| 154 if (done) { | 82 if (done) { |
| 155 TRACE_EVENT_FLOW_END0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), | 83 TRACE_EVENT_FLOW_END0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), |
| 156 "SyncMessageFilter::Send", &done_event); | 84 "SyncMessageFilter::Send", &done_event); |
| 157 } | 85 } |
| 158 registry->UnregisterHandle(shutdown_mojo_event_.GetHandle()); | 86 |
| 159 registry->UnregisterHandle(done_event.GetHandle()); | 87 registry->UnregisterEvent(shutdown_event_); |
| 88 registry->UnregisterEvent(&done_event); |
| 160 | 89 |
| 161 { | 90 { |
| 162 base::AutoLock auto_lock(lock_); | 91 base::AutoLock auto_lock(lock_); |
| 163 delete pending_message.deserializer; | 92 delete pending_message.deserializer; |
| 164 pending_sync_messages_.erase(&pending_message); | 93 pending_sync_messages_.erase(&pending_message); |
| 165 } | 94 } |
| 166 | 95 |
| 167 return pending_message.send_result; | 96 return pending_message.send_result; |
| 168 } | 97 } |
| 169 | 98 |
| 170 void SyncMessageFilter::OnFilterAdded(Channel* channel) { | 99 void SyncMessageFilter::OnFilterAdded(Channel* channel) { |
| 171 std::vector<std::unique_ptr<Message>> pending_messages; | 100 std::vector<std::unique_ptr<Message>> pending_messages; |
| 172 { | 101 { |
| 173 base::AutoLock auto_lock(lock_); | 102 base::AutoLock auto_lock(lock_); |
| 174 channel_ = channel; | 103 channel_ = channel; |
| 175 | 104 |
| 176 io_task_runner_ = base::ThreadTaskRunnerHandle::Get(); | 105 io_task_runner_ = base::ThreadTaskRunnerHandle::Get(); |
| 177 shutdown_watcher_.StartWatching( | |
| 178 shutdown_event_, | |
| 179 base::Bind(&SyncMessageFilter::OnShutdownEventSignaled, this)); | |
| 180 io_message_loop_observer_->StartOnIOThread(); | |
| 181 std::swap(pending_messages_, pending_messages); | 106 std::swap(pending_messages_, pending_messages); |
| 182 } | 107 } |
| 183 for (auto& msg : pending_messages) | 108 for (auto& msg : pending_messages) |
| 184 SendOnIOThread(msg.release()); | 109 SendOnIOThread(msg.release()); |
| 185 } | 110 } |
| 186 | 111 |
| 187 void SyncMessageFilter::OnChannelError() { | 112 void SyncMessageFilter::OnChannelError() { |
| 188 base::AutoLock auto_lock(lock_); | 113 base::AutoLock auto_lock(lock_); |
| 189 channel_ = nullptr; | 114 channel_ = nullptr; |
| 190 shutdown_watcher_.StopWatching(); | |
| 191 SignalAllEvents(); | 115 SignalAllEvents(); |
| 192 } | 116 } |
| 193 | 117 |
| 194 void SyncMessageFilter::OnChannelClosing() { | 118 void SyncMessageFilter::OnChannelClosing() { |
| 195 base::AutoLock auto_lock(lock_); | 119 base::AutoLock auto_lock(lock_); |
| 196 channel_ = nullptr; | 120 channel_ = nullptr; |
| 197 shutdown_watcher_.StopWatching(); | |
| 198 SignalAllEvents(); | 121 SignalAllEvents(); |
| 199 } | 122 } |
| 200 | 123 |
| 201 bool SyncMessageFilter::OnMessageReceived(const Message& message) { | 124 bool SyncMessageFilter::OnMessageReceived(const Message& message) { |
| 202 base::AutoLock auto_lock(lock_); | 125 base::AutoLock auto_lock(lock_); |
| 203 for (PendingSyncMessages::iterator iter = pending_sync_messages_.begin(); | 126 for (PendingSyncMessages::iterator iter = pending_sync_messages_.begin(); |
| 204 iter != pending_sync_messages_.end(); ++iter) { | 127 iter != pending_sync_messages_.end(); ++iter) { |
| 205 if (SyncMessage::IsMessageReplyTo(message, (*iter)->id)) { | 128 if (SyncMessage::IsMessageReplyTo(message, (*iter)->id)) { |
| 206 if (!message.is_reply_error()) { | 129 if (!message.is_reply_error()) { |
| 207 (*iter)->send_result = | 130 (*iter)->send_result = |
| 208 (*iter)->deserializer->SerializeOutputParameters(message); | 131 (*iter)->deserializer->SerializeOutputParameters(message); |
| 209 } | 132 } |
| 210 TRACE_EVENT_FLOW_BEGIN0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), | 133 TRACE_EVENT_FLOW_BEGIN0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), |
| 211 "SyncMessageFilter::OnMessageReceived", | 134 "SyncMessageFilter::OnMessageReceived", |
| 212 (*iter)->done_event); | 135 (*iter)->done_event); |
| 213 (*iter)->done_event->Signal(); | 136 (*iter)->done_event->Signal(); |
| 214 return true; | 137 return true; |
| 215 } | 138 } |
| 216 } | 139 } |
| 217 | 140 |
| 218 return false; | 141 return false; |
| 219 } | 142 } |
| 220 | 143 |
| 221 SyncMessageFilter::SyncMessageFilter(base::WaitableEvent* shutdown_event) | 144 SyncMessageFilter::SyncMessageFilter(base::WaitableEvent* shutdown_event) |
| 222 : channel_(nullptr), | 145 : channel_(nullptr), |
| 223 listener_task_runner_(base::ThreadTaskRunnerHandle::Get()), | 146 listener_task_runner_(base::ThreadTaskRunnerHandle::Get()), |
| 224 shutdown_event_(shutdown_event), | 147 shutdown_event_(shutdown_event) {} |
| 225 weak_factory_(this) { | |
| 226 io_message_loop_observer_ = new IOMessageLoopObserver( | |
| 227 weak_factory_.GetWeakPtr(), listener_task_runner_); | |
| 228 } | |
| 229 | 148 |
| 230 SyncMessageFilter::~SyncMessageFilter() { | 149 SyncMessageFilter::~SyncMessageFilter() {} |
| 231 io_message_loop_observer_->Stop(); | |
| 232 } | |
| 233 | 150 |
| 234 void SyncMessageFilter::SendOnIOThread(Message* message) { | 151 void SyncMessageFilter::SendOnIOThread(Message* message) { |
| 235 if (channel_) { | 152 if (channel_) { |
| 236 channel_->Send(message); | 153 channel_->Send(message); |
| 237 return; | 154 return; |
| 238 } | 155 } |
| 239 | 156 |
| 240 if (message->is_sync()) { | 157 if (message->is_sync()) { |
| 241 // We don't know which thread sent it, but it doesn't matter, just signal | 158 // We don't know which thread sent it, but it doesn't matter, just signal |
| 242 // them all. | 159 // them all. |
| 243 base::AutoLock auto_lock(lock_); | 160 base::AutoLock auto_lock(lock_); |
| 244 SignalAllEvents(); | 161 SignalAllEvents(); |
| 245 } | 162 } |
| 246 | 163 |
| 247 delete message; | 164 delete message; |
| 248 } | 165 } |
| 249 | 166 |
| 250 void SyncMessageFilter::SignalAllEvents() { | 167 void SyncMessageFilter::SignalAllEvents() { |
| 251 lock_.AssertAcquired(); | 168 lock_.AssertAcquired(); |
| 252 for (PendingSyncMessages::iterator iter = pending_sync_messages_.begin(); | 169 for (PendingSyncMessages::iterator iter = pending_sync_messages_.begin(); |
| 253 iter != pending_sync_messages_.end(); ++iter) { | 170 iter != pending_sync_messages_.end(); ++iter) { |
| 254 TRACE_EVENT_FLOW_BEGIN0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), | 171 TRACE_EVENT_FLOW_BEGIN0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), |
| 255 "SyncMessageFilter::SignalAllEvents", | 172 "SyncMessageFilter::SignalAllEvents", |
| 256 (*iter)->done_event); | 173 (*iter)->done_event); |
| 257 (*iter)->done_event->Signal(); | 174 (*iter)->done_event->Signal(); |
| 258 } | 175 } |
| 259 } | 176 } |
| 260 | 177 |
| 261 void SyncMessageFilter::OnShutdownEventSignaled(base::WaitableEvent* event) { | |
| 262 DCHECK_EQ(event, shutdown_event_); | |
| 263 shutdown_mojo_event_.Signal(); | |
| 264 } | |
| 265 | |
| 266 void SyncMessageFilter::OnIOMessageLoopDestroyed() { | |
| 267 // Since we use an async WaitableEventWatcher to watch the shutdown event | |
| 268 // from the IO thread, we can't forward the shutdown signal after the IO | |
| 269 // message loop is destroyed. Since that destruction indicates shutdown | |
| 270 // anyway, we manually signal the shutdown event in this case. | |
| 271 shutdown_mojo_event_.Signal(); | |
| 272 } | |
| 273 | |
| 274 void SyncMessageFilter::GetGenericRemoteAssociatedInterface( | 178 void SyncMessageFilter::GetGenericRemoteAssociatedInterface( |
| 275 const std::string& interface_name, | 179 const std::string& interface_name, |
| 276 mojo::ScopedInterfaceEndpointHandle handle) { | 180 mojo::ScopedInterfaceEndpointHandle handle) { |
| 277 base::AutoLock auto_lock(lock_); | 181 base::AutoLock auto_lock(lock_); |
| 278 DCHECK(io_task_runner_ && io_task_runner_->BelongsToCurrentThread()); | 182 DCHECK(io_task_runner_ && io_task_runner_->BelongsToCurrentThread()); |
| 279 if (!channel_) | 183 if (!channel_) |
| 280 return; | 184 return; |
| 281 | 185 |
| 282 Channel::AssociatedInterfaceSupport* support = | 186 Channel::AssociatedInterfaceSupport* support = |
| 283 channel_->GetAssociatedInterfaceSupport(); | 187 channel_->GetAssociatedInterfaceSupport(); |
| 284 support->GetGenericRemoteAssociatedInterface( | 188 support->GetGenericRemoteAssociatedInterface( |
| 285 interface_name, std::move(handle)); | 189 interface_name, std::move(handle)); |
| 286 } | 190 } |
| 287 | 191 |
| 288 } // namespace IPC | 192 } // namespace IPC |
| OLD | NEW |