OLD | NEW |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "ipc/ipc_sync_message_filter.h" | 5 #include "ipc/ipc_sync_message_filter.h" |
6 | 6 |
7 #include "base/bind.h" | 7 #include "base/bind.h" |
8 #include "base/location.h" | 8 #include "base/location.h" |
9 #include "base/logging.h" | 9 #include "base/logging.h" |
| 10 #include "base/macros.h" |
| 11 #include "base/memory/ptr_util.h" |
| 12 #include "base/memory/ref_counted.h" |
| 13 #include "base/message_loop/message_loop.h" |
10 #include "base/single_thread_task_runner.h" | 14 #include "base/single_thread_task_runner.h" |
11 #include "base/synchronization/waitable_event.h" | 15 #include "base/synchronization/waitable_event.h" |
12 #include "base/threading/thread_task_runner_handle.h" | 16 #include "base/threading/thread_task_runner_handle.h" |
13 #include "ipc/ipc_channel.h" | 17 #include "ipc/ipc_channel.h" |
14 #include "ipc/ipc_sync_message.h" | 18 #include "ipc/ipc_sync_message.h" |
| 19 #include "ipc/mojo_event.h" |
| 20 #include "mojo/public/cpp/bindings/sync_handle_registry.h" |
15 | 21 |
16 namespace IPC { | 22 namespace IPC { |
17 | 23 |
| 24 namespace { |
| 25 |
| 26 // A generic callback used when watching handles synchronously. Sets |*signal| |
| 27 // to true. Also sets |*error| to true in case of an error. |
| 28 void OnSyncHandleReady(bool* signal, bool* error, MojoResult result) { |
| 29 *signal = true; |
| 30 *error = result != MOJO_RESULT_OK; |
| 31 } |
| 32 |
| 33 } // namespace |
| 34 |
| 35 // A helper class created by SyncMessageFilter to watch the lifetime of the IO |
| 36 // MessageLoop. This holds a weak ref to the SyncMessageFilter and notifies it |
| 37 // on its own thread if the SyncMessageFilter is still alive at the time of |
| 38 // IO MessageLoop destruction. |
| 39 class SyncMessageFilter::IOMessageLoopObserver |
| 40 : public base::MessageLoop::DestructionObserver, |
| 41 public base::RefCountedThreadSafe<IOMessageLoopObserver> { |
| 42 public: |
| 43 IOMessageLoopObserver( |
| 44 base::WeakPtr<SyncMessageFilter> weak_filter, |
| 45 scoped_refptr<base::SingleThreadTaskRunner> filter_task_runner) |
| 46 : weak_filter_(weak_filter), filter_task_runner_(filter_task_runner) {} |
| 47 |
| 48 void StartOnIOThread() { |
| 49 DCHECK(!watching_); |
| 50 watching_ = true; |
| 51 io_task_runner_ = base::ThreadTaskRunnerHandle::Get(); |
| 52 base::MessageLoop::current()->AddDestructionObserver(this); |
| 53 } |
| 54 |
| 55 void Stop() { |
| 56 if (!io_task_runner_) |
| 57 return; |
| 58 |
| 59 if (io_task_runner_->BelongsToCurrentThread()) { |
| 60 StopOnIOThread(); |
| 61 } else { |
| 62 io_task_runner_->PostTask( |
| 63 FROM_HERE, base::Bind(&IOMessageLoopObserver::StopOnIOThread, this)); |
| 64 } |
| 65 } |
| 66 |
| 67 private: |
| 68 void StopOnIOThread() { |
| 69 DCHECK(io_task_runner_->BelongsToCurrentThread()); |
| 70 if (!watching_) |
| 71 return; |
| 72 watching_ = false; |
| 73 base::MessageLoop::current()->RemoveDestructionObserver(this); |
| 74 } |
| 75 |
| 76 // base::MessageLoop::DestructionObserver: |
| 77 void WillDestroyCurrentMessageLoop() override { |
| 78 DCHECK(io_task_runner_ && io_task_runner_->BelongsToCurrentThread()); |
| 79 DCHECK(watching_); |
| 80 StopOnIOThread(); |
| 81 filter_task_runner_->PostTask( |
| 82 FROM_HERE, |
| 83 base::Bind(&SyncMessageFilter::OnIOMessageLoopDestroyed, weak_filter_)); |
| 84 } |
| 85 |
| 86 friend class base::RefCountedThreadSafe<IOMessageLoopObserver>; |
| 87 |
| 88 ~IOMessageLoopObserver() override {} |
| 89 |
| 90 bool watching_ = false; |
| 91 base::WeakPtr<SyncMessageFilter> weak_filter_; |
| 92 scoped_refptr<base::SingleThreadTaskRunner> filter_task_runner_; |
| 93 scoped_refptr<base::SingleThreadTaskRunner> io_task_runner_; |
| 94 |
| 95 DISALLOW_COPY_AND_ASSIGN(IOMessageLoopObserver); |
| 96 }; |
| 97 |
18 bool SyncMessageFilter::Send(Message* message) { | 98 bool SyncMessageFilter::Send(Message* message) { |
19 if (!message->is_sync()) { | 99 if (!message->is_sync()) { |
20 { | 100 { |
21 base::AutoLock auto_lock(lock_); | 101 base::AutoLock auto_lock(lock_); |
22 if (sender_ && is_channel_send_thread_safe_) { | 102 if (sender_ && is_channel_send_thread_safe_) { |
23 sender_->Send(message); | 103 sender_->Send(message); |
24 return true; | 104 return true; |
25 } else if (!io_task_runner_.get()) { | 105 } else if (!io_task_runner_.get()) { |
26 pending_messages_.push_back(message); | 106 pending_messages_.emplace_back(base::WrapUnique(message)); |
27 return true; | 107 return true; |
28 } | 108 } |
29 } | 109 } |
30 io_task_runner_->PostTask( | 110 io_task_runner_->PostTask( |
31 FROM_HERE, | 111 FROM_HERE, |
32 base::Bind(&SyncMessageFilter::SendOnIOThread, this, message)); | 112 base::Bind(&SyncMessageFilter::SendOnIOThread, this, message)); |
33 return true; | 113 return true; |
34 } | 114 } |
35 | 115 |
36 base::WaitableEvent done_event( | 116 MojoEvent done_event; |
37 base::WaitableEvent::ResetPolicy::MANUAL, | |
38 base::WaitableEvent::InitialState::NOT_SIGNALED); | |
39 PendingSyncMsg pending_message( | 117 PendingSyncMsg pending_message( |
40 SyncMessage::GetMessageId(*message), | 118 SyncMessage::GetMessageId(*message), |
41 static_cast<SyncMessage*>(message)->GetReplyDeserializer(), | 119 static_cast<SyncMessage*>(message)->GetReplyDeserializer(), |
42 &done_event); | 120 &done_event); |
43 | 121 |
44 { | 122 { |
45 base::AutoLock auto_lock(lock_); | 123 base::AutoLock auto_lock(lock_); |
46 // Can't use this class on the main thread or else it can lead to deadlocks. | 124 // Can't use this class on the main thread or else it can lead to deadlocks. |
47 // Also by definition, can't use this on IO thread since we're blocking it. | 125 // Also by definition, can't use this on IO thread since we're blocking it. |
48 if (base::ThreadTaskRunnerHandle::IsSet()) { | 126 if (base::ThreadTaskRunnerHandle::IsSet()) { |
49 DCHECK(base::ThreadTaskRunnerHandle::Get() != listener_task_runner_); | 127 DCHECK(base::ThreadTaskRunnerHandle::Get() != listener_task_runner_); |
50 DCHECK(base::ThreadTaskRunnerHandle::Get() != io_task_runner_); | 128 DCHECK(base::ThreadTaskRunnerHandle::Get() != io_task_runner_); |
51 } | 129 } |
52 pending_sync_messages_.insert(&pending_message); | 130 pending_sync_messages_.insert(&pending_message); |
53 | 131 |
54 if (io_task_runner_.get()) { | 132 if (io_task_runner_.get()) { |
55 io_task_runner_->PostTask( | 133 io_task_runner_->PostTask( |
56 FROM_HERE, | 134 FROM_HERE, |
57 base::Bind(&SyncMessageFilter::SendOnIOThread, this, message)); | 135 base::Bind(&SyncMessageFilter::SendOnIOThread, this, message)); |
58 } else { | 136 } else { |
59 pending_messages_.push_back(message); | 137 pending_messages_.emplace_back(base::WrapUnique(message)); |
60 } | 138 } |
61 } | 139 } |
62 | 140 |
63 base::WaitableEvent* events[2] = { shutdown_event_, &done_event }; | 141 bool done = false; |
64 if (base::WaitableEvent::WaitMany(events, 2) == 1) { | 142 bool shutdown = false; |
| 143 bool error = false; |
| 144 scoped_refptr<mojo::SyncHandleRegistry> registry = |
| 145 mojo::SyncHandleRegistry::current(); |
| 146 registry->RegisterHandle(shutdown_mojo_event_.GetHandle(), |
| 147 MOJO_HANDLE_SIGNAL_READABLE, |
| 148 base::Bind(&OnSyncHandleReady, &shutdown, &error)); |
| 149 registry->RegisterHandle(done_event.GetHandle(), |
| 150 MOJO_HANDLE_SIGNAL_READABLE, |
| 151 base::Bind(&OnSyncHandleReady, &done, &error)); |
| 152 |
| 153 const bool* stop_flags[] = { &done, &shutdown }; |
| 154 registry->WatchAllHandles(stop_flags, 2); |
| 155 DCHECK(!error); |
| 156 |
| 157 if (done) { |
65 TRACE_EVENT_FLOW_END0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), | 158 TRACE_EVENT_FLOW_END0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), |
66 "SyncMessageFilter::Send", &done_event); | 159 "SyncMessageFilter::Send", &done_event); |
67 } | 160 } |
| 161 registry->UnregisterHandle(shutdown_mojo_event_.GetHandle()); |
| 162 registry->UnregisterHandle(done_event.GetHandle()); |
68 | 163 |
69 { | 164 { |
70 base::AutoLock auto_lock(lock_); | 165 base::AutoLock auto_lock(lock_); |
71 delete pending_message.deserializer; | 166 delete pending_message.deserializer; |
72 pending_sync_messages_.erase(&pending_message); | 167 pending_sync_messages_.erase(&pending_message); |
73 } | 168 } |
74 | 169 |
75 return pending_message.send_result; | 170 return pending_message.send_result; |
76 } | 171 } |
77 | 172 |
78 void SyncMessageFilter::OnFilterAdded(Sender* sender) { | 173 void SyncMessageFilter::OnFilterAdded(Sender* sender) { |
79 std::vector<Message*> pending_messages; | 174 std::vector<std::unique_ptr<Message>> pending_messages; |
80 { | 175 { |
81 base::AutoLock auto_lock(lock_); | 176 base::AutoLock auto_lock(lock_); |
82 sender_ = sender; | 177 sender_ = sender; |
83 io_task_runner_ = base::ThreadTaskRunnerHandle::Get(); | 178 io_task_runner_ = base::ThreadTaskRunnerHandle::Get(); |
84 pending_messages_.release(&pending_messages); | 179 shutdown_watcher_.StartWatching( |
| 180 shutdown_event_, |
| 181 base::Bind(&SyncMessageFilter::OnShutdownEventSignaled, this)); |
| 182 io_message_loop_observer_->StartOnIOThread(); |
| 183 std::swap(pending_messages_, pending_messages); |
85 } | 184 } |
86 for (auto* msg : pending_messages) | 185 for (auto& msg : pending_messages) |
87 SendOnIOThread(msg); | 186 SendOnIOThread(msg.release()); |
88 } | 187 } |
89 | 188 |
90 void SyncMessageFilter::OnChannelError() { | 189 void SyncMessageFilter::OnChannelError() { |
91 base::AutoLock auto_lock(lock_); | 190 base::AutoLock auto_lock(lock_); |
92 sender_ = NULL; | 191 sender_ = NULL; |
| 192 shutdown_watcher_.StopWatching(); |
93 SignalAllEvents(); | 193 SignalAllEvents(); |
94 } | 194 } |
95 | 195 |
96 void SyncMessageFilter::OnChannelClosing() { | 196 void SyncMessageFilter::OnChannelClosing() { |
97 base::AutoLock auto_lock(lock_); | 197 base::AutoLock auto_lock(lock_); |
98 sender_ = NULL; | 198 sender_ = NULL; |
| 199 shutdown_watcher_.StopWatching(); |
99 SignalAllEvents(); | 200 SignalAllEvents(); |
100 } | 201 } |
101 | 202 |
102 bool SyncMessageFilter::OnMessageReceived(const Message& message) { | 203 bool SyncMessageFilter::OnMessageReceived(const Message& message) { |
103 base::AutoLock auto_lock(lock_); | 204 base::AutoLock auto_lock(lock_); |
104 for (PendingSyncMessages::iterator iter = pending_sync_messages_.begin(); | 205 for (PendingSyncMessages::iterator iter = pending_sync_messages_.begin(); |
105 iter != pending_sync_messages_.end(); ++iter) { | 206 iter != pending_sync_messages_.end(); ++iter) { |
106 if (SyncMessage::IsMessageReplyTo(message, (*iter)->id)) { | 207 if (SyncMessage::IsMessageReplyTo(message, (*iter)->id)) { |
107 if (!message.is_reply_error()) { | 208 if (!message.is_reply_error()) { |
108 (*iter)->send_result = | 209 (*iter)->send_result = |
109 (*iter)->deserializer->SerializeOutputParameters(message); | 210 (*iter)->deserializer->SerializeOutputParameters(message); |
110 } | 211 } |
111 TRACE_EVENT_FLOW_BEGIN0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), | 212 TRACE_EVENT_FLOW_BEGIN0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), |
112 "SyncMessageFilter::OnMessageReceived", | 213 "SyncMessageFilter::OnMessageReceived", |
113 (*iter)->done_event); | 214 (*iter)->done_event); |
114 (*iter)->done_event->Signal(); | 215 (*iter)->done_event->Signal(); |
115 return true; | 216 return true; |
116 } | 217 } |
117 } | 218 } |
118 | 219 |
119 return false; | 220 return false; |
120 } | 221 } |
121 | 222 |
122 SyncMessageFilter::SyncMessageFilter(base::WaitableEvent* shutdown_event, | 223 SyncMessageFilter::SyncMessageFilter(base::WaitableEvent* shutdown_event, |
123 bool is_channel_send_thread_safe) | 224 bool is_channel_send_thread_safe) |
124 : sender_(NULL), | 225 : sender_(NULL), |
125 is_channel_send_thread_safe_(is_channel_send_thread_safe), | 226 is_channel_send_thread_safe_(is_channel_send_thread_safe), |
126 listener_task_runner_(base::ThreadTaskRunnerHandle::Get()), | 227 listener_task_runner_(base::ThreadTaskRunnerHandle::Get()), |
127 shutdown_event_(shutdown_event) { | 228 shutdown_event_(shutdown_event), |
| 229 weak_factory_(this) { |
| 230 io_message_loop_observer_ = new IOMessageLoopObserver( |
| 231 weak_factory_.GetWeakPtr(), listener_task_runner_); |
128 } | 232 } |
129 | 233 |
130 SyncMessageFilter::~SyncMessageFilter() { | 234 SyncMessageFilter::~SyncMessageFilter() { |
| 235 io_message_loop_observer_->Stop(); |
131 } | 236 } |
132 | 237 |
133 void SyncMessageFilter::SendOnIOThread(Message* message) { | 238 void SyncMessageFilter::SendOnIOThread(Message* message) { |
134 if (sender_) { | 239 if (sender_) { |
135 sender_->Send(message); | 240 sender_->Send(message); |
136 return; | 241 return; |
137 } | 242 } |
138 | 243 |
139 if (message->is_sync()) { | 244 if (message->is_sync()) { |
140 // We don't know which thread sent it, but it doesn't matter, just signal | 245 // We don't know which thread sent it, but it doesn't matter, just signal |
141 // them all. | 246 // them all. |
142 base::AutoLock auto_lock(lock_); | 247 base::AutoLock auto_lock(lock_); |
143 SignalAllEvents(); | 248 SignalAllEvents(); |
144 } | 249 } |
145 | 250 |
146 delete message; | 251 delete message; |
147 } | 252 } |
148 | 253 |
149 void SyncMessageFilter::SignalAllEvents() { | 254 void SyncMessageFilter::SignalAllEvents() { |
150 lock_.AssertAcquired(); | 255 lock_.AssertAcquired(); |
151 for (PendingSyncMessages::iterator iter = pending_sync_messages_.begin(); | 256 for (PendingSyncMessages::iterator iter = pending_sync_messages_.begin(); |
152 iter != pending_sync_messages_.end(); ++iter) { | 257 iter != pending_sync_messages_.end(); ++iter) { |
153 TRACE_EVENT_FLOW_BEGIN0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), | 258 TRACE_EVENT_FLOW_BEGIN0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), |
154 "SyncMessageFilter::SignalAllEvents", | 259 "SyncMessageFilter::SignalAllEvents", |
155 (*iter)->done_event); | 260 (*iter)->done_event); |
156 (*iter)->done_event->Signal(); | 261 (*iter)->done_event->Signal(); |
157 } | 262 } |
158 } | 263 } |
159 | 264 |
| 265 void SyncMessageFilter::OnShutdownEventSignaled(base::WaitableEvent* event) { |
| 266 DCHECK_EQ(event, shutdown_event_); |
| 267 shutdown_mojo_event_.Signal(); |
| 268 } |
| 269 |
| 270 void SyncMessageFilter::OnIOMessageLoopDestroyed() { |
| 271 // Since we use an async WaitableEventWatcher to watch the shutdown event |
| 272 // from the IO thread, we can't forward the shutdown signal after the IO |
| 273 // message loop is destroyed. Since that destruction indicates shutdown |
| 274 // anyway, we manually signal the shutdown event in this case. |
| 275 shutdown_mojo_event_.Signal(); |
| 276 } |
| 277 |
160 } // namespace IPC | 278 } // namespace IPC |
OLD | NEW |