| OLD | NEW |
| 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "ipc/ipc_sync_message_filter.h" | 5 #include "ipc/ipc_sync_message_filter.h" |
| 6 | 6 |
| 7 #include "base/bind.h" | 7 #include "base/bind.h" |
| 8 #include "base/location.h" | 8 #include "base/location.h" |
| 9 #include "base/logging.h" | 9 #include "base/logging.h" |
| 10 #include "base/macros.h" | |
| 11 #include "base/memory/ptr_util.h" | |
| 12 #include "base/memory/ref_counted.h" | |
| 13 #include "base/message_loop/message_loop.h" | |
| 14 #include "base/single_thread_task_runner.h" | 10 #include "base/single_thread_task_runner.h" |
| 15 #include "base/synchronization/waitable_event.h" | 11 #include "base/synchronization/waitable_event.h" |
| 16 #include "base/threading/thread_task_runner_handle.h" | 12 #include "base/threading/thread_task_runner_handle.h" |
| 17 #include "ipc/ipc_channel.h" | 13 #include "ipc/ipc_channel.h" |
| 18 #include "ipc/ipc_sync_message.h" | 14 #include "ipc/ipc_sync_message.h" |
| 19 #include "ipc/mojo_event.h" | |
| 20 #include "mojo/public/cpp/bindings/sync_handle_registry.h" | |
| 21 | 15 |
| 22 namespace IPC { | 16 namespace IPC { |
| 23 | 17 |
| 24 namespace { | |
| 25 | |
| 26 // A generic callback used when watching handles synchronously. Sets |*signal| | |
| 27 // to true. Also sets |*error| to true in case of an error. | |
| 28 void OnSyncHandleReady(bool* signal, bool* error, MojoResult result) { | |
| 29 *signal = true; | |
| 30 *error = result != MOJO_RESULT_OK; | |
| 31 } | |
| 32 | |
| 33 } // namespace | |
| 34 | |
| 35 // A helper class created by SyncMessageFilter to watch the lifetime of the IO | |
| 36 // MessageLoop. This holds a weak ref to the SyncMessageFilter and notifies it | |
| 37 // on its own thread if the SyncMessageFilter is still alive at the time of | |
| 38 // IO MessageLoop destruction. | |
| 39 class SyncMessageFilter::IOMessageLoopObserver | |
| 40 : public base::MessageLoop::DestructionObserver, | |
| 41 public base::RefCountedThreadSafe<IOMessageLoopObserver> { | |
| 42 public: | |
| 43 IOMessageLoopObserver( | |
| 44 base::WeakPtr<SyncMessageFilter> weak_filter, | |
| 45 scoped_refptr<base::SingleThreadTaskRunner> filter_task_runner) | |
| 46 : weak_filter_(weak_filter), filter_task_runner_(filter_task_runner) {} | |
| 47 | |
| 48 void StartOnIOThread() { | |
| 49 DCHECK(!watching_); | |
| 50 watching_ = true; | |
| 51 io_task_runner_ = base::ThreadTaskRunnerHandle::Get(); | |
| 52 base::MessageLoop::current()->AddDestructionObserver(this); | |
| 53 } | |
| 54 | |
| 55 void Stop() { | |
| 56 if (!io_task_runner_) | |
| 57 return; | |
| 58 | |
| 59 if (io_task_runner_->BelongsToCurrentThread()) { | |
| 60 StopOnIOThread(); | |
| 61 } else { | |
| 62 io_task_runner_->PostTask( | |
| 63 FROM_HERE, base::Bind(&IOMessageLoopObserver::StopOnIOThread, this)); | |
| 64 } | |
| 65 } | |
| 66 | |
| 67 private: | |
| 68 void StopOnIOThread() { | |
| 69 DCHECK(io_task_runner_->BelongsToCurrentThread()); | |
| 70 if (!watching_) | |
| 71 return; | |
| 72 watching_ = false; | |
| 73 base::MessageLoop::current()->RemoveDestructionObserver(this); | |
| 74 } | |
| 75 | |
| 76 // base::MessageLoop::DestructionObserver: | |
| 77 void WillDestroyCurrentMessageLoop() override { | |
| 78 DCHECK(io_task_runner_ && io_task_runner_->BelongsToCurrentThread()); | |
| 79 DCHECK(watching_); | |
| 80 StopOnIOThread(); | |
| 81 filter_task_runner_->PostTask( | |
| 82 FROM_HERE, | |
| 83 base::Bind(&SyncMessageFilter::OnIOMessageLoopDestroyed, weak_filter_)); | |
| 84 } | |
| 85 | |
| 86 friend class base::RefCountedThreadSafe<IOMessageLoopObserver>; | |
| 87 | |
| 88 ~IOMessageLoopObserver() override {} | |
| 89 | |
| 90 bool watching_ = false; | |
| 91 base::WeakPtr<SyncMessageFilter> weak_filter_; | |
| 92 scoped_refptr<base::SingleThreadTaskRunner> filter_task_runner_; | |
| 93 scoped_refptr<base::SingleThreadTaskRunner> io_task_runner_; | |
| 94 | |
| 95 DISALLOW_COPY_AND_ASSIGN(IOMessageLoopObserver); | |
| 96 }; | |
| 97 | |
| 98 bool SyncMessageFilter::Send(Message* message) { | 18 bool SyncMessageFilter::Send(Message* message) { |
| 99 if (!message->is_sync()) { | 19 if (!message->is_sync()) { |
| 100 { | 20 { |
| 101 base::AutoLock auto_lock(lock_); | 21 base::AutoLock auto_lock(lock_); |
| 102 if (sender_ && is_channel_send_thread_safe_) { | 22 if (sender_ && is_channel_send_thread_safe_) { |
| 103 sender_->Send(message); | 23 sender_->Send(message); |
| 104 return true; | 24 return true; |
| 105 } else if (!io_task_runner_.get()) { | 25 } else if (!io_task_runner_.get()) { |
| 106 pending_messages_.emplace_back(base::WrapUnique(message)); | 26 pending_messages_.push_back(message); |
| 107 return true; | 27 return true; |
| 108 } | 28 } |
| 109 } | 29 } |
| 110 io_task_runner_->PostTask( | 30 io_task_runner_->PostTask( |
| 111 FROM_HERE, | 31 FROM_HERE, |
| 112 base::Bind(&SyncMessageFilter::SendOnIOThread, this, message)); | 32 base::Bind(&SyncMessageFilter::SendOnIOThread, this, message)); |
| 113 return true; | 33 return true; |
| 114 } | 34 } |
| 115 | 35 |
| 116 MojoEvent done_event; | 36 base::WaitableEvent done_event( |
| 37 base::WaitableEvent::ResetPolicy::MANUAL, |
| 38 base::WaitableEvent::InitialState::NOT_SIGNALED); |
| 117 PendingSyncMsg pending_message( | 39 PendingSyncMsg pending_message( |
| 118 SyncMessage::GetMessageId(*message), | 40 SyncMessage::GetMessageId(*message), |
| 119 static_cast<SyncMessage*>(message)->GetReplyDeserializer(), | 41 static_cast<SyncMessage*>(message)->GetReplyDeserializer(), |
| 120 &done_event); | 42 &done_event); |
| 121 | 43 |
| 122 { | 44 { |
| 123 base::AutoLock auto_lock(lock_); | 45 base::AutoLock auto_lock(lock_); |
| 124 // Can't use this class on the main thread or else it can lead to deadlocks. | 46 // Can't use this class on the main thread or else it can lead to deadlocks. |
| 125 // Also by definition, can't use this on IO thread since we're blocking it. | 47 // Also by definition, can't use this on IO thread since we're blocking it. |
| 126 if (base::ThreadTaskRunnerHandle::IsSet()) { | 48 if (base::ThreadTaskRunnerHandle::IsSet()) { |
| 127 DCHECK(base::ThreadTaskRunnerHandle::Get() != listener_task_runner_); | 49 DCHECK(base::ThreadTaskRunnerHandle::Get() != listener_task_runner_); |
| 128 DCHECK(base::ThreadTaskRunnerHandle::Get() != io_task_runner_); | 50 DCHECK(base::ThreadTaskRunnerHandle::Get() != io_task_runner_); |
| 129 } | 51 } |
| 130 pending_sync_messages_.insert(&pending_message); | 52 pending_sync_messages_.insert(&pending_message); |
| 131 | 53 |
| 132 if (io_task_runner_.get()) { | 54 if (io_task_runner_.get()) { |
| 133 io_task_runner_->PostTask( | 55 io_task_runner_->PostTask( |
| 134 FROM_HERE, | 56 FROM_HERE, |
| 135 base::Bind(&SyncMessageFilter::SendOnIOThread, this, message)); | 57 base::Bind(&SyncMessageFilter::SendOnIOThread, this, message)); |
| 136 } else { | 58 } else { |
| 137 pending_messages_.emplace_back(base::WrapUnique(message)); | 59 pending_messages_.push_back(message); |
| 138 } | 60 } |
| 139 } | 61 } |
| 140 | 62 |
| 141 bool done = false; | 63 base::WaitableEvent* events[2] = { shutdown_event_, &done_event }; |
| 142 bool shutdown = false; | 64 if (base::WaitableEvent::WaitMany(events, 2) == 1) { |
| 143 bool error = false; | |
| 144 scoped_refptr<mojo::SyncHandleRegistry> registry = | |
| 145 mojo::SyncHandleRegistry::current(); | |
| 146 registry->RegisterHandle(shutdown_mojo_event_.GetHandle(), | |
| 147 MOJO_HANDLE_SIGNAL_READABLE, | |
| 148 base::Bind(&OnSyncHandleReady, &shutdown, &error)); | |
| 149 registry->RegisterHandle(done_event.GetHandle(), | |
| 150 MOJO_HANDLE_SIGNAL_READABLE, | |
| 151 base::Bind(&OnSyncHandleReady, &done, &error)); | |
| 152 | |
| 153 const bool* stop_flags[] = { &done, &shutdown }; | |
| 154 registry->WatchAllHandles(stop_flags, 2); | |
| 155 DCHECK(!error); | |
| 156 | |
| 157 if (done) { | |
| 158 TRACE_EVENT_FLOW_END0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), | 65 TRACE_EVENT_FLOW_END0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), |
| 159 "SyncMessageFilter::Send", &done_event); | 66 "SyncMessageFilter::Send", &done_event); |
| 160 } | 67 } |
| 161 registry->UnregisterHandle(shutdown_mojo_event_.GetHandle()); | |
| 162 registry->UnregisterHandle(done_event.GetHandle()); | |
| 163 | 68 |
| 164 { | 69 { |
| 165 base::AutoLock auto_lock(lock_); | 70 base::AutoLock auto_lock(lock_); |
| 166 delete pending_message.deserializer; | 71 delete pending_message.deserializer; |
| 167 pending_sync_messages_.erase(&pending_message); | 72 pending_sync_messages_.erase(&pending_message); |
| 168 } | 73 } |
| 169 | 74 |
| 170 return pending_message.send_result; | 75 return pending_message.send_result; |
| 171 } | 76 } |
| 172 | 77 |
| 173 void SyncMessageFilter::OnFilterAdded(Sender* sender) { | 78 void SyncMessageFilter::OnFilterAdded(Sender* sender) { |
| 174 std::vector<std::unique_ptr<Message>> pending_messages; | 79 std::vector<Message*> pending_messages; |
| 175 { | 80 { |
| 176 base::AutoLock auto_lock(lock_); | 81 base::AutoLock auto_lock(lock_); |
| 177 sender_ = sender; | 82 sender_ = sender; |
| 178 io_task_runner_ = base::ThreadTaskRunnerHandle::Get(); | 83 io_task_runner_ = base::ThreadTaskRunnerHandle::Get(); |
| 179 shutdown_watcher_.StartWatching( | 84 pending_messages_.release(&pending_messages); |
| 180 shutdown_event_, | |
| 181 base::Bind(&SyncMessageFilter::OnShutdownEventSignaled, this)); | |
| 182 io_message_loop_observer_->StartOnIOThread(); | |
| 183 std::swap(pending_messages_, pending_messages); | |
| 184 } | 85 } |
| 185 for (auto& msg : pending_messages) | 86 for (auto* msg : pending_messages) |
| 186 SendOnIOThread(msg.release()); | 87 SendOnIOThread(msg); |
| 187 } | 88 } |
| 188 | 89 |
| 189 void SyncMessageFilter::OnChannelError() { | 90 void SyncMessageFilter::OnChannelError() { |
| 190 base::AutoLock auto_lock(lock_); | 91 base::AutoLock auto_lock(lock_); |
| 191 sender_ = NULL; | 92 sender_ = NULL; |
| 192 shutdown_watcher_.StopWatching(); | |
| 193 SignalAllEvents(); | 93 SignalAllEvents(); |
| 194 } | 94 } |
| 195 | 95 |
| 196 void SyncMessageFilter::OnChannelClosing() { | 96 void SyncMessageFilter::OnChannelClosing() { |
| 197 base::AutoLock auto_lock(lock_); | 97 base::AutoLock auto_lock(lock_); |
| 198 sender_ = NULL; | 98 sender_ = NULL; |
| 199 shutdown_watcher_.StopWatching(); | |
| 200 SignalAllEvents(); | 99 SignalAllEvents(); |
| 201 } | 100 } |
| 202 | 101 |
| 203 bool SyncMessageFilter::OnMessageReceived(const Message& message) { | 102 bool SyncMessageFilter::OnMessageReceived(const Message& message) { |
| 204 base::AutoLock auto_lock(lock_); | 103 base::AutoLock auto_lock(lock_); |
| 205 for (PendingSyncMessages::iterator iter = pending_sync_messages_.begin(); | 104 for (PendingSyncMessages::iterator iter = pending_sync_messages_.begin(); |
| 206 iter != pending_sync_messages_.end(); ++iter) { | 105 iter != pending_sync_messages_.end(); ++iter) { |
| 207 if (SyncMessage::IsMessageReplyTo(message, (*iter)->id)) { | 106 if (SyncMessage::IsMessageReplyTo(message, (*iter)->id)) { |
| 208 if (!message.is_reply_error()) { | 107 if (!message.is_reply_error()) { |
| 209 (*iter)->send_result = | 108 (*iter)->send_result = |
| 210 (*iter)->deserializer->SerializeOutputParameters(message); | 109 (*iter)->deserializer->SerializeOutputParameters(message); |
| 211 } | 110 } |
| 212 TRACE_EVENT_FLOW_BEGIN0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), | 111 TRACE_EVENT_FLOW_BEGIN0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), |
| 213 "SyncMessageFilter::OnMessageReceived", | 112 "SyncMessageFilter::OnMessageReceived", |
| 214 (*iter)->done_event); | 113 (*iter)->done_event); |
| 215 (*iter)->done_event->Signal(); | 114 (*iter)->done_event->Signal(); |
| 216 return true; | 115 return true; |
| 217 } | 116 } |
| 218 } | 117 } |
| 219 | 118 |
| 220 return false; | 119 return false; |
| 221 } | 120 } |
| 222 | 121 |
| 223 SyncMessageFilter::SyncMessageFilter(base::WaitableEvent* shutdown_event, | 122 SyncMessageFilter::SyncMessageFilter(base::WaitableEvent* shutdown_event, |
| 224 bool is_channel_send_thread_safe) | 123 bool is_channel_send_thread_safe) |
| 225 : sender_(NULL), | 124 : sender_(NULL), |
| 226 is_channel_send_thread_safe_(is_channel_send_thread_safe), | 125 is_channel_send_thread_safe_(is_channel_send_thread_safe), |
| 227 listener_task_runner_(base::ThreadTaskRunnerHandle::Get()), | 126 listener_task_runner_(base::ThreadTaskRunnerHandle::Get()), |
| 228 shutdown_event_(shutdown_event), | 127 shutdown_event_(shutdown_event) { |
| 229 weak_factory_(this) { | |
| 230 io_message_loop_observer_ = new IOMessageLoopObserver( | |
| 231 weak_factory_.GetWeakPtr(), listener_task_runner_); | |
| 232 } | 128 } |
| 233 | 129 |
| 234 SyncMessageFilter::~SyncMessageFilter() { | 130 SyncMessageFilter::~SyncMessageFilter() { |
| 235 io_message_loop_observer_->Stop(); | |
| 236 } | 131 } |
| 237 | 132 |
| 238 void SyncMessageFilter::SendOnIOThread(Message* message) { | 133 void SyncMessageFilter::SendOnIOThread(Message* message) { |
| 239 if (sender_) { | 134 if (sender_) { |
| 240 sender_->Send(message); | 135 sender_->Send(message); |
| 241 return; | 136 return; |
| 242 } | 137 } |
| 243 | 138 |
| 244 if (message->is_sync()) { | 139 if (message->is_sync()) { |
| 245 // We don't know which thread sent it, but it doesn't matter, just signal | 140 // We don't know which thread sent it, but it doesn't matter, just signal |
| 246 // them all. | 141 // them all. |
| 247 base::AutoLock auto_lock(lock_); | 142 base::AutoLock auto_lock(lock_); |
| 248 SignalAllEvents(); | 143 SignalAllEvents(); |
| 249 } | 144 } |
| 250 | 145 |
| 251 delete message; | 146 delete message; |
| 252 } | 147 } |
| 253 | 148 |
| 254 void SyncMessageFilter::SignalAllEvents() { | 149 void SyncMessageFilter::SignalAllEvents() { |
| 255 lock_.AssertAcquired(); | 150 lock_.AssertAcquired(); |
| 256 for (PendingSyncMessages::iterator iter = pending_sync_messages_.begin(); | 151 for (PendingSyncMessages::iterator iter = pending_sync_messages_.begin(); |
| 257 iter != pending_sync_messages_.end(); ++iter) { | 152 iter != pending_sync_messages_.end(); ++iter) { |
| 258 TRACE_EVENT_FLOW_BEGIN0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), | 153 TRACE_EVENT_FLOW_BEGIN0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), |
| 259 "SyncMessageFilter::SignalAllEvents", | 154 "SyncMessageFilter::SignalAllEvents", |
| 260 (*iter)->done_event); | 155 (*iter)->done_event); |
| 261 (*iter)->done_event->Signal(); | 156 (*iter)->done_event->Signal(); |
| 262 } | 157 } |
| 263 } | 158 } |
| 264 | 159 |
| 265 void SyncMessageFilter::OnShutdownEventSignaled(base::WaitableEvent* event) { | |
| 266 DCHECK_EQ(event, shutdown_event_); | |
| 267 shutdown_mojo_event_.Signal(); | |
| 268 } | |
| 269 | |
| 270 void SyncMessageFilter::OnIOMessageLoopDestroyed() { | |
| 271 // Since we use an async WaitableEventWatcher to watch the shutdown event | |
| 272 // from the IO thread, we can't forward the shutdown signal after the IO | |
| 273 // message loop is destroyed. Since that destruction indicates shutdown | |
| 274 // anyway, we manually signal the shutdown event in this case. | |
| 275 shutdown_mojo_event_.Signal(); | |
| 276 } | |
| 277 | |
| 278 } // namespace IPC | 160 } // namespace IPC |
| OLD | NEW |