OLD | NEW |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "ipc/ipc_sync_channel.h" | 5 #include "ipc/ipc_sync_channel.h" |
6 | 6 |
7 #include <stddef.h> | 7 #include <stddef.h> |
8 #include <stdint.h> | 8 #include <stdint.h> |
9 | 9 |
10 #include <utility> | 10 #include <utility> |
11 | 11 |
12 #include "base/bind.h" | 12 #include "base/bind.h" |
13 #include "base/lazy_instance.h" | 13 #include "base/lazy_instance.h" |
14 #include "base/location.h" | 14 #include "base/location.h" |
15 #include "base/logging.h" | 15 #include "base/logging.h" |
| 16 #include "base/macros.h" |
16 #include "base/memory/ptr_util.h" | 17 #include "base/memory/ptr_util.h" |
| 18 #include "base/run_loop.h" |
17 #include "base/synchronization/waitable_event.h" | 19 #include "base/synchronization/waitable_event.h" |
18 #include "base/synchronization/waitable_event_watcher.h" | |
19 #include "base/threading/thread_local.h" | 20 #include "base/threading/thread_local.h" |
20 #include "base/threading/thread_task_runner_handle.h" | 21 #include "base/threading/thread_task_runner_handle.h" |
21 #include "base/trace_event/trace_event.h" | 22 #include "base/trace_event/trace_event.h" |
22 #include "ipc/ipc_channel_factory.h" | 23 #include "ipc/ipc_channel_factory.h" |
23 #include "ipc/ipc_logging.h" | 24 #include "ipc/ipc_logging.h" |
24 #include "ipc/ipc_message_macros.h" | 25 #include "ipc/ipc_message_macros.h" |
25 #include "ipc/ipc_sync_message.h" | 26 #include "ipc/ipc_sync_message.h" |
| 27 #include "ipc/mojo_event.h" |
| 28 #include "mojo/public/cpp/bindings/sync_handle_registry.h" |
26 | 29 |
27 using base::TimeDelta; | 30 using base::TimeDelta; |
28 using base::TimeTicks; | 31 using base::TimeTicks; |
29 using base::WaitableEvent; | 32 using base::WaitableEvent; |
30 | 33 |
31 namespace IPC { | 34 namespace IPC { |
| 35 |
| 36 namespace { |
| 37 |
| 38 // A lazy thread-local Mojo Event which is always signaled. Used to wake up the |
| 39 // sync waiter when a SyncMessage requires the MessageLoop to be pumped while |
| 40 // waiting for a reply. This object is created lazily and ref-counted so it can |
| 41 // be cleaned up when no longer in use. |
| 42 class PumpMessagesEvent { |
| 43 public: |
| 44 // Acquires the event for this thread. Creates a new instance if necessary. |
| 45 static PumpMessagesEvent* Acquire() { |
| 46 PumpMessagesEvent* pump_messages_event = g_event_.Pointer()->Get(); |
| 47 if (!pump_messages_event) { |
| 48 pump_messages_event = new PumpMessagesEvent; |
| 49 pump_messages_event->event_.Signal(); |
| 50 g_event_.Pointer()->Set(pump_messages_event); |
| 51 } |
| 52 pump_messages_event->ref_count_++; |
| 53 return pump_messages_event; |
| 54 } |
| 55 |
| 56 // Releases a handle to this event. There must be a 1:1 correspondence between |
| 57 // calls to Acquire() and calls to Release(). |
| 58 static void Release() { |
| 59 PumpMessagesEvent* pump_messages_event = g_event_.Pointer()->Get(); |
| 60 DCHECK(pump_messages_event); |
| 61 DCHECK_GT(pump_messages_event->ref_count_, 0); |
| 62 pump_messages_event->ref_count_--; |
| 63 if (!pump_messages_event->ref_count_) { |
| 64 g_event_.Pointer()->Set(nullptr); |
| 65 delete pump_messages_event; |
| 66 } |
| 67 } |
| 68 |
| 69 const mojo::Handle& GetHandle() const { return event_.GetHandle(); } |
| 70 |
| 71 private: |
| 72 PumpMessagesEvent() {} |
| 73 ~PumpMessagesEvent() {} |
| 74 |
| 75 int ref_count_ = 0; |
| 76 MojoEvent event_; |
| 77 |
| 78 static base::LazyInstance<base::ThreadLocalPointer<PumpMessagesEvent>> |
| 79 g_event_; |
| 80 |
| 81 DISALLOW_COPY_AND_ASSIGN(PumpMessagesEvent); |
| 82 }; |
| 83 |
| 84 // A generic callback used when watching handles synchronously. Sets |*signal| |
| 85 // to true. Also sets |*error| to true in case of an error. |
| 86 void OnSyncHandleReady(bool* signal, bool* error, MojoResult result) { |
| 87 *signal = true; |
| 88 *error = result != MOJO_RESULT_OK; |
| 89 } |
| 90 |
| 91 // A ReadyCallback for use with mojo::Watcher. Ignores the result (DCHECKs, but |
| 92 // is only used in cases where failure should be impossible) and runs |
| 93 // |callback|. |
| 94 void RunOnHandleReady(const base::Closure& callback, MojoResult result) { |
| 95 DCHECK(result == MOJO_RESULT_OK || result == MOJO_RESULT_ABORTED); |
| 96 if (result == MOJO_RESULT_OK) |
| 97 callback.Run(); |
| 98 } |
| 99 |
| 100 } // namespace |
| 101 |
| 102 base::LazyInstance<base::ThreadLocalPointer<PumpMessagesEvent>> |
| 103 PumpMessagesEvent::g_event_ = LAZY_INSTANCE_INITIALIZER; |
| 104 |
32 // When we're blocked in a Send(), we need to process incoming synchronous | 105 // When we're blocked in a Send(), we need to process incoming synchronous |
33 // messages right away because it could be blocking our reply (either | 106 // messages right away because it could be blocking our reply (either |
34 // directly from the same object we're calling, or indirectly through one or | 107 // directly from the same object we're calling, or indirectly through one or |
35 // more other channels). That means that in SyncContext's OnMessageReceived, | 108 // more other channels). That means that in SyncContext's OnMessageReceived, |
36 // we need to process sync message right away if we're blocked. However a | 109 // we need to process sync message right away if we're blocked. However a |
37 // simple check isn't sufficient, because the listener thread can be in the | 110 // simple check isn't sufficient, because the listener thread can be in the |
38 // process of calling Send. | 111 // process of calling Send. |
39 // To work around this, when SyncChannel filters a sync message, it sets | 112 // To work around this, when SyncChannel filters a sync message, it sets |
40 // an event that the listener thread waits on during its Send() call. This | 113 // an event that the listener thread waits on during its Send() call. This |
41 // allows us to dispatch incoming sync messages when blocked. The race | 114 // allows us to dispatch incoming sync messages when blocked. The race |
(...skipping 106 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
148 iter++; | 221 iter++; |
149 } | 222 } |
150 } | 223 } |
151 | 224 |
152 if (--listener_count_ == 0) { | 225 if (--listener_count_ == 0) { |
153 DCHECK(lazy_tls_ptr_.Pointer()->Get()); | 226 DCHECK(lazy_tls_ptr_.Pointer()->Get()); |
154 lazy_tls_ptr_.Pointer()->Set(NULL); | 227 lazy_tls_ptr_.Pointer()->Set(NULL); |
155 } | 228 } |
156 } | 229 } |
157 | 230 |
158 WaitableEvent* dispatch_event() { return &dispatch_event_; } | 231 MojoEvent* dispatch_event() { return &dispatch_event_; } |
159 base::SingleThreadTaskRunner* listener_task_runner() { | 232 base::SingleThreadTaskRunner* listener_task_runner() { |
160 return listener_task_runner_.get(); | 233 return listener_task_runner_.get(); |
161 } | 234 } |
162 | 235 |
163 // Holds a pointer to the per-thread ReceivedSyncMsgQueue object. | 236 // Holds a pointer to the per-thread ReceivedSyncMsgQueue object. |
164 static base::LazyInstance<base::ThreadLocalPointer<ReceivedSyncMsgQueue> > | 237 static base::LazyInstance<base::ThreadLocalPointer<ReceivedSyncMsgQueue> > |
165 lazy_tls_ptr_; | 238 lazy_tls_ptr_; |
166 | 239 |
167 // Called on the ipc thread to check if we can unblock any current Send() | 240 // Called on the ipc thread to check if we can unblock any current Send() |
168 // calls based on a queued reply. | 241 // calls based on a queued reply. |
169 void DispatchReplies() { | 242 void DispatchReplies() { |
170 for (size_t i = 0; i < received_replies_.size(); ++i) { | 243 for (size_t i = 0; i < received_replies_.size(); ++i) { |
171 Message* message = received_replies_[i].message; | 244 Message* message = received_replies_[i].message; |
172 if (received_replies_[i].context->TryToUnblockListener(message)) { | 245 if (received_replies_[i].context->TryToUnblockListener(message)) { |
173 delete message; | 246 delete message; |
174 received_replies_.erase(received_replies_.begin() + i); | 247 received_replies_.erase(received_replies_.begin() + i); |
175 return; | 248 return; |
176 } | 249 } |
177 } | 250 } |
178 } | 251 } |
179 | 252 |
180 base::WaitableEventWatcher* top_send_done_watcher() { | 253 mojo::Watcher* top_send_done_watcher() { |
181 return top_send_done_watcher_; | 254 return top_send_done_watcher_; |
182 } | 255 } |
183 | 256 |
184 void set_top_send_done_watcher(base::WaitableEventWatcher* watcher) { | 257 void set_top_send_done_watcher(mojo::Watcher* watcher) { |
185 top_send_done_watcher_ = watcher; | 258 top_send_done_watcher_ = watcher; |
186 } | 259 } |
187 | 260 |
188 private: | 261 private: |
189 friend class base::RefCountedThreadSafe<ReceivedSyncMsgQueue>; | 262 friend class base::RefCountedThreadSafe<ReceivedSyncMsgQueue>; |
190 | 263 |
191 // See the comment in SyncChannel::SyncChannel for why this event is created | 264 // See the comment in SyncChannel::SyncChannel for why this event is created |
192 // as manual reset. | 265 // as manual reset. |
193 ReceivedSyncMsgQueue() | 266 ReceivedSyncMsgQueue() |
194 : message_queue_version_(0), | 267 : message_queue_version_(0), |
195 dispatch_event_(base::WaitableEvent::ResetPolicy::MANUAL, | |
196 base::WaitableEvent::InitialState::NOT_SIGNALED), | |
197 listener_task_runner_(base::ThreadTaskRunnerHandle::Get()), | 268 listener_task_runner_(base::ThreadTaskRunnerHandle::Get()), |
198 task_pending_(false), | 269 task_pending_(false), |
199 listener_count_(0), | 270 listener_count_(0), |
200 top_send_done_watcher_(NULL) {} | 271 top_send_done_watcher_(NULL) {} |
201 | 272 |
202 ~ReceivedSyncMsgQueue() {} | 273 ~ReceivedSyncMsgQueue() {} |
203 | 274 |
204 // Holds information about a queued synchronous message or reply. | 275 // Holds information about a queued synchronous message or reply. |
205 struct QueuedMessage { | 276 struct QueuedMessage { |
206 QueuedMessage(Message* m, SyncContext* c) : message(m), context(c) { } | 277 QueuedMessage(Message* m, SyncContext* c) : message(m), context(c) { } |
207 Message* message; | 278 Message* message; |
208 scoped_refptr<SyncChannel::SyncContext> context; | 279 scoped_refptr<SyncChannel::SyncContext> context; |
209 }; | 280 }; |
210 | 281 |
211 typedef std::list<QueuedMessage> SyncMessageQueue; | 282 typedef std::list<QueuedMessage> SyncMessageQueue; |
212 SyncMessageQueue message_queue_; | 283 SyncMessageQueue message_queue_; |
213 uint32_t message_queue_version_; // Used to signal DispatchMessages to rescan | 284 uint32_t message_queue_version_; // Used to signal DispatchMessages to rescan |
214 | 285 |
215 std::vector<QueuedMessage> received_replies_; | 286 std::vector<QueuedMessage> received_replies_; |
216 | 287 |
217 // Set when we got a synchronous message that we must respond to as the | 288 // Signaled when we get a synchronous message that we must respond to, as the |
218 // sender needs its reply before it can reply to our original synchronous | 289 // sender needs its reply before it can reply to our original synchronous |
219 // message. | 290 // message. |
220 WaitableEvent dispatch_event_; | 291 MojoEvent dispatch_event_; |
221 scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner_; | 292 scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner_; |
222 base::Lock message_lock_; | 293 base::Lock message_lock_; |
223 bool task_pending_; | 294 bool task_pending_; |
224 int listener_count_; | 295 int listener_count_; |
225 | 296 |
226 // The current send done event watcher for this thread. Used to maintain | 297 // The current send done handle watcher for this thread. Used to maintain |
227 // a local global stack of send done watchers to ensure that nested sync | 298 // a thread-local stack of send done watchers to ensure that nested sync |
228 // message loops complete correctly. | 299 // message loops complete correctly. |
229 base::WaitableEventWatcher* top_send_done_watcher_; | 300 mojo::Watcher* top_send_done_watcher_; |
230 }; | 301 }; |
231 | 302 |
232 base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> > | 303 base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> > |
233 SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_ = | 304 SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_ = |
234 LAZY_INSTANCE_INITIALIZER; | 305 LAZY_INSTANCE_INITIALIZER; |
235 | 306 |
236 SyncChannel::SyncContext::SyncContext( | 307 SyncChannel::SyncContext::SyncContext( |
237 Listener* listener, | 308 Listener* listener, |
238 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, | 309 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, |
239 WaitableEvent* shutdown_event) | 310 WaitableEvent* shutdown_event) |
(...skipping 15 matching lines...) Expand all Loading... |
255 // Create the tracking information for this message. This object is stored | 326 // Create the tracking information for this message. This object is stored |
256 // by value since all members are pointers that are cheap to copy. These | 327 // by value since all members are pointers that are cheap to copy. These |
257 // pointers are cleaned up in the Pop() function. | 328 // pointers are cleaned up in the Pop() function. |
258 // | 329 // |
259 // The event is created as manual reset because in between Signal and | 330 // The event is created as manual reset because in between Signal and |
260 // OnObjectSignalled, another Send can happen which would stop the watcher | 331 // OnObjectSignalled, another Send can happen which would stop the watcher |
261 // from being called. The event would get watched later, when the nested | 332 // from being called. The event would get watched later, when the nested |
262 // Send completes, so the event will need to remain set. | 333 // Send completes, so the event will need to remain set. |
263 PendingSyncMsg pending( | 334 PendingSyncMsg pending( |
264 SyncMessage::GetMessageId(*sync_msg), sync_msg->GetReplyDeserializer(), | 335 SyncMessage::GetMessageId(*sync_msg), sync_msg->GetReplyDeserializer(), |
265 new WaitableEvent(base::WaitableEvent::ResetPolicy::MANUAL, | 336 new MojoEvent); |
266 base::WaitableEvent::InitialState::NOT_SIGNALED)); | |
267 base::AutoLock auto_lock(deserializers_lock_); | 337 base::AutoLock auto_lock(deserializers_lock_); |
268 deserializers_.push_back(pending); | 338 deserializers_.push_back(pending); |
269 } | 339 } |
270 | 340 |
271 bool SyncChannel::SyncContext::Pop() { | 341 bool SyncChannel::SyncContext::Pop() { |
272 bool result; | 342 bool result; |
273 { | 343 { |
274 base::AutoLock auto_lock(deserializers_lock_); | 344 base::AutoLock auto_lock(deserializers_lock_); |
275 PendingSyncMsg msg = deserializers_.back(); | 345 PendingSyncMsg msg = deserializers_.back(); |
276 delete msg.deserializer; | 346 delete msg.deserializer; |
277 delete msg.done_event; | 347 delete msg.done_event; |
278 msg.done_event = NULL; | 348 msg.done_event = nullptr; |
279 deserializers_.pop_back(); | 349 deserializers_.pop_back(); |
280 result = msg.send_result; | 350 result = msg.send_result; |
281 } | 351 } |
282 | 352 |
283 // We got a reply to a synchronous Send() call that's blocking the listener | 353 // We got a reply to a synchronous Send() call that's blocking the listener |
284 // thread. However, further down the call stack there could be another | 354 // thread. However, further down the call stack there could be another |
285 // blocking Send() call, whose reply we received after we made this last | 355 // blocking Send() call, whose reply we received after we made this last |
286 // Send() call. So check if we have any queued replies available that | 356 // Send() call. So check if we have any queued replies available that |
287 // can now unblock the listener thread. | 357 // can now unblock the listener thread. |
288 ipc_task_runner()->PostTask( | 358 ipc_task_runner()->PostTask( |
289 FROM_HERE, base::Bind(&ReceivedSyncMsgQueue::DispatchReplies, | 359 FROM_HERE, base::Bind(&ReceivedSyncMsgQueue::DispatchReplies, |
290 received_sync_msgs_.get())); | 360 received_sync_msgs_.get())); |
291 | 361 |
292 return result; | 362 return result; |
293 } | 363 } |
294 | 364 |
295 WaitableEvent* SyncChannel::SyncContext::GetSendDoneEvent() { | 365 MojoEvent* SyncChannel::SyncContext::GetSendDoneEvent() { |
296 base::AutoLock auto_lock(deserializers_lock_); | 366 base::AutoLock auto_lock(deserializers_lock_); |
297 return deserializers_.back().done_event; | 367 return deserializers_.back().done_event; |
298 } | 368 } |
299 | 369 |
300 WaitableEvent* SyncChannel::SyncContext::GetDispatchEvent() { | 370 MojoEvent* SyncChannel::SyncContext::GetDispatchEvent() { |
301 return received_sync_msgs_->dispatch_event(); | 371 return received_sync_msgs_->dispatch_event(); |
302 } | 372 } |
303 | 373 |
304 void SyncChannel::SyncContext::DispatchMessages() { | 374 void SyncChannel::SyncContext::DispatchMessages() { |
305 received_sync_msgs_->DispatchMessages(this); | 375 received_sync_msgs_->DispatchMessages(this); |
306 } | 376 } |
307 | 377 |
308 bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) { | 378 bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) { |
309 base::AutoLock auto_lock(deserializers_lock_); | 379 base::AutoLock auto_lock(deserializers_lock_); |
310 if (deserializers_.empty() || | 380 if (deserializers_.empty() || |
311 !SyncMessage::IsMessageReplyTo(*msg, deserializers_.back().id)) { | 381 !SyncMessage::IsMessageReplyTo(*msg, deserializers_.back().id)) { |
312 return false; | 382 return false; |
313 } | 383 } |
314 | 384 |
315 if (!msg->is_reply_error()) { | 385 if (!msg->is_reply_error()) { |
316 bool send_result = deserializers_.back().deserializer-> | 386 bool send_result = deserializers_.back().deserializer-> |
317 SerializeOutputParameters(*msg); | 387 SerializeOutputParameters(*msg); |
318 deserializers_.back().send_result = send_result; | 388 deserializers_.back().send_result = send_result; |
319 DVLOG_IF(1, !send_result) << "Couldn't deserialize reply message"; | 389 DVLOG_IF(1, !send_result) << "Couldn't deserialize reply message"; |
320 } else { | 390 } else { |
321 DVLOG(1) << "Received error reply"; | 391 DVLOG(1) << "Received error reply"; |
322 } | 392 } |
323 | 393 |
324 base::WaitableEvent* done_event = deserializers_.back().done_event; | 394 MojoEvent* done_event = deserializers_.back().done_event; |
325 TRACE_EVENT_FLOW_BEGIN0( | 395 TRACE_EVENT_FLOW_BEGIN0( |
326 TRACE_DISABLED_BY_DEFAULT("ipc.flow"), | 396 TRACE_DISABLED_BY_DEFAULT("ipc.flow"), |
327 "SyncChannel::SyncContext::TryToUnblockListener", done_event); | 397 "SyncChannel::SyncContext::TryToUnblockListener", done_event); |
328 | 398 |
329 done_event->Signal(); | 399 done_event->Signal(); |
330 | 400 |
331 return true; | 401 return true; |
332 } | 402 } |
333 | 403 |
334 void SyncChannel::SyncContext::Clear() { | 404 void SyncChannel::SyncContext::Clear() { |
(...skipping 25 matching lines...) Expand all Loading... |
360 | 430 |
361 void SyncChannel::SyncContext::OnChannelError() { | 431 void SyncChannel::SyncContext::OnChannelError() { |
362 CancelPendingSends(); | 432 CancelPendingSends(); |
363 shutdown_watcher_.StopWatching(); | 433 shutdown_watcher_.StopWatching(); |
364 Context::OnChannelError(); | 434 Context::OnChannelError(); |
365 } | 435 } |
366 | 436 |
367 void SyncChannel::SyncContext::OnChannelOpened() { | 437 void SyncChannel::SyncContext::OnChannelOpened() { |
368 shutdown_watcher_.StartWatching( | 438 shutdown_watcher_.StartWatching( |
369 shutdown_event_, | 439 shutdown_event_, |
370 base::Bind(&SyncChannel::SyncContext::OnWaitableEventSignaled, | 440 base::Bind(&SyncChannel::SyncContext::OnShutdownEventSignaled, |
371 base::Unretained(this))); | 441 base::Unretained(this))); |
372 Context::OnChannelOpened(); | 442 Context::OnChannelOpened(); |
373 } | 443 } |
374 | 444 |
375 void SyncChannel::SyncContext::OnChannelClosed() { | 445 void SyncChannel::SyncContext::OnChannelClosed() { |
376 CancelPendingSends(); | 446 CancelPendingSends(); |
377 shutdown_watcher_.StopWatching(); | 447 shutdown_watcher_.StopWatching(); |
378 Context::OnChannelClosed(); | 448 Context::OnChannelClosed(); |
379 } | 449 } |
380 | 450 |
381 void SyncChannel::SyncContext::CancelPendingSends() { | 451 void SyncChannel::SyncContext::CancelPendingSends() { |
382 base::AutoLock auto_lock(deserializers_lock_); | 452 base::AutoLock auto_lock(deserializers_lock_); |
383 PendingSyncMessageQueue::iterator iter; | 453 PendingSyncMessageQueue::iterator iter; |
384 DVLOG(1) << "Canceling pending sends"; | 454 DVLOG(1) << "Canceling pending sends"; |
385 for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++) { | 455 for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++) { |
386 TRACE_EVENT_FLOW_BEGIN0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), | 456 TRACE_EVENT_FLOW_BEGIN0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), |
387 "SyncChannel::SyncContext::CancelPendingSends", | 457 "SyncChannel::SyncContext::CancelPendingSends", |
388 iter->done_event); | 458 iter->done_event); |
389 iter->done_event->Signal(); | 459 iter->done_event->Signal(); |
390 } | 460 } |
391 } | 461 } |
392 | 462 |
393 void SyncChannel::SyncContext::OnWaitableEventSignaled(WaitableEvent* event) { | 463 void SyncChannel::SyncContext::OnShutdownEventSignaled(WaitableEvent* event) { |
394 if (event == shutdown_event_) { | 464 DCHECK_EQ(event, shutdown_event_); |
395 // Process shut down before we can get a reply to a synchronous message. | |
396 // Cancel pending Send calls, which will end up setting the send done event. | |
397 CancelPendingSends(); | |
398 } else { | |
399 // We got the reply, timed out or the process shutdown. | |
400 DCHECK_EQ(GetSendDoneEvent(), event); | |
401 base::MessageLoop::current()->QuitNow(); | |
402 } | |
403 } | |
404 | 465 |
405 base::WaitableEventWatcher::EventCallback | 466 // Process shut down before we can get a reply to a synchronous message. |
406 SyncChannel::SyncContext::MakeWaitableEventCallback() { | 467 // Cancel pending Send calls, which will end up setting the send done event. |
407 return base::Bind(&SyncChannel::SyncContext::OnWaitableEventSignaled, this); | 468 CancelPendingSends(); |
408 } | 469 } |
409 | 470 |
410 // static | 471 // static |
411 std::unique_ptr<SyncChannel> SyncChannel::Create( | 472 std::unique_ptr<SyncChannel> SyncChannel::Create( |
412 const IPC::ChannelHandle& channel_handle, | 473 const IPC::ChannelHandle& channel_handle, |
413 Channel::Mode mode, | 474 Channel::Mode mode, |
414 Listener* listener, | 475 Listener* listener, |
415 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, | 476 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, |
416 bool create_pipe_now, | 477 bool create_pipe_now, |
417 base::WaitableEvent* shutdown_event) { | 478 base::WaitableEvent* shutdown_event) { |
(...skipping 23 matching lines...) Expand all Loading... |
441 WaitableEvent* shutdown_event) { | 502 WaitableEvent* shutdown_event) { |
442 return base::WrapUnique( | 503 return base::WrapUnique( |
443 new SyncChannel(listener, ipc_task_runner, shutdown_event)); | 504 new SyncChannel(listener, ipc_task_runner, shutdown_event)); |
444 } | 505 } |
445 | 506 |
446 SyncChannel::SyncChannel( | 507 SyncChannel::SyncChannel( |
447 Listener* listener, | 508 Listener* listener, |
448 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, | 509 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, |
449 WaitableEvent* shutdown_event) | 510 WaitableEvent* shutdown_event) |
450 : ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)) { | 511 : ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)) { |
| 512 // Keep a thread-local PumpMessagesEvent alive at least as long as any |
| 513 // SyncChannel exists. This is balanced in the SyncChannel destructor below. |
| 514 PumpMessagesEvent::Acquire(); |
| 515 |
451 // The current (listener) thread must be distinct from the IPC thread, or else | 516 // The current (listener) thread must be distinct from the IPC thread, or else |
452 // sending synchronous messages will deadlock. | 517 // sending synchronous messages will deadlock. |
453 DCHECK_NE(ipc_task_runner.get(), base::ThreadTaskRunnerHandle::Get().get()); | 518 DCHECK_NE(ipc_task_runner.get(), base::ThreadTaskRunnerHandle::Get().get()); |
454 StartWatching(); | 519 StartWatching(); |
455 } | 520 } |
456 | 521 |
457 SyncChannel::~SyncChannel() { | 522 SyncChannel::~SyncChannel() { |
| 523 PumpMessagesEvent::Release(); |
458 } | 524 } |
459 | 525 |
460 void SyncChannel::SetRestrictDispatchChannelGroup(int group) { | 526 void SyncChannel::SetRestrictDispatchChannelGroup(int group) { |
461 sync_context()->set_restrict_dispatch_group(group); | 527 sync_context()->set_restrict_dispatch_group(group); |
462 } | 528 } |
463 | 529 |
464 scoped_refptr<SyncMessageFilter> SyncChannel::CreateSyncMessageFilter() { | 530 scoped_refptr<SyncMessageFilter> SyncChannel::CreateSyncMessageFilter() { |
465 scoped_refptr<SyncMessageFilter> filter = new SyncMessageFilter( | 531 scoped_refptr<SyncMessageFilter> filter = new SyncMessageFilter( |
466 sync_context()->shutdown_event(), | 532 sync_context()->shutdown_event(), |
467 sync_context()->IsChannelSendThreadSafe()); | 533 sync_context()->IsChannelSendThreadSafe()); |
(...skipping 20 matching lines...) Expand all Loading... |
488 | 554 |
489 // *this* might get deleted in WaitForReply. | 555 // *this* might get deleted in WaitForReply. |
490 scoped_refptr<SyncContext> context(sync_context()); | 556 scoped_refptr<SyncContext> context(sync_context()); |
491 if (context->shutdown_event()->IsSignaled()) { | 557 if (context->shutdown_event()->IsSignaled()) { |
492 DVLOG(1) << "shutdown event is signaled"; | 558 DVLOG(1) << "shutdown event is signaled"; |
493 delete message; | 559 delete message; |
494 return false; | 560 return false; |
495 } | 561 } |
496 | 562 |
497 SyncMessage* sync_msg = static_cast<SyncMessage*>(message); | 563 SyncMessage* sync_msg = static_cast<SyncMessage*>(message); |
| 564 bool pump_messages = sync_msg->ShouldPumpMessages(); |
498 context->Push(sync_msg); | 565 context->Push(sync_msg); |
499 WaitableEvent* pump_messages_event = sync_msg->pump_messages_event(); | |
500 | 566 |
501 ChannelProxy::Send(message); | 567 ChannelProxy::Send(message); |
502 | 568 |
503 // Wait for reply, or for any other incoming synchronous messages. | 569 // Wait for reply, or for any other incoming synchronous messages. |
504 // *this* might get deleted, so only call static functions at this point. | 570 // *this* might get deleted, so only call static functions at this point. |
505 WaitForReply(context.get(), pump_messages_event); | 571 WaitForReply(context.get(), pump_messages); |
506 | 572 |
507 TRACE_EVENT_FLOW_END0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), | 573 TRACE_EVENT_FLOW_END0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), |
508 "SyncChannel::Send", context->GetSendDoneEvent()); | 574 "SyncChannel::Send", context->GetSendDoneEvent()); |
509 | 575 |
510 return context->Pop(); | 576 return context->Pop(); |
511 } | 577 } |
512 | 578 |
513 void SyncChannel::WaitForReply( | 579 void SyncChannel::WaitForReply(SyncContext* context, bool pump_messages) { |
514 SyncContext* context, WaitableEvent* pump_messages_event) { | |
515 context->DispatchMessages(); | 580 context->DispatchMessages(); |
| 581 |
| 582 PumpMessagesEvent* pump_messages_event = nullptr; |
| 583 if (pump_messages) |
| 584 pump_messages_event = PumpMessagesEvent::Acquire(); |
| 585 |
| 586 scoped_refptr<mojo::SyncHandleRegistry> registry = |
| 587 mojo::SyncHandleRegistry::current(); |
| 588 |
516 while (true) { | 589 while (true) { |
517 WaitableEvent* objects[] = { | 590 bool dispatch = false; |
518 context->GetDispatchEvent(), | 591 bool send_done = false; |
519 context->GetSendDoneEvent(), | 592 bool should_pump_messages = false; |
520 pump_messages_event | 593 bool error = false; |
521 }; | 594 registry->RegisterHandle(context->GetDispatchEvent()->GetHandle(), |
| 595 MOJO_HANDLE_SIGNAL_READABLE, |
| 596 base::Bind(&OnSyncHandleReady, &dispatch, &error)); |
| 597 registry->RegisterHandle( |
| 598 context->GetSendDoneEvent()->GetHandle(), |
| 599 MOJO_HANDLE_SIGNAL_READABLE, |
| 600 base::Bind(&OnSyncHandleReady, &send_done, &error)); |
| 601 if (pump_messages_event) { |
| 602 registry->RegisterHandle( |
| 603 pump_messages_event->GetHandle(), MOJO_HANDLE_SIGNAL_READABLE, |
| 604 base::Bind(&OnSyncHandleReady, &should_pump_messages, &error)); |
| 605 } |
522 | 606 |
523 unsigned count = pump_messages_event ? 3: 2; | 607 const bool* stop_flags[] = { &dispatch, &send_done, &should_pump_messages }; |
524 size_t result = WaitableEvent::WaitMany(objects, count); | 608 bool result = registry->WatchAllHandles(stop_flags, 3); |
525 if (result == 0 /* dispatch event */) { | 609 DCHECK(result); |
| 610 DCHECK(!error); |
| 611 |
| 612 registry->UnregisterHandle(context->GetDispatchEvent()->GetHandle()); |
| 613 registry->UnregisterHandle(context->GetSendDoneEvent()->GetHandle()); |
| 614 if (pump_messages_event) |
| 615 registry->UnregisterHandle(pump_messages_event->GetHandle()); |
| 616 |
| 617 if (dispatch) { |
526 // We're waiting for a reply, but we received a blocking synchronous | 618 // We're waiting for a reply, but we received a blocking synchronous |
527 // call. We must process it or otherwise a deadlock might occur. | 619 // call. We must process it or otherwise a deadlock might occur. |
528 context->GetDispatchEvent()->Reset(); | 620 context->GetDispatchEvent()->Reset(); |
529 context->DispatchMessages(); | 621 context->DispatchMessages(); |
530 continue; | 622 continue; |
531 } | 623 } |
532 | 624 |
533 if (result == 2 /* pump_messages_event */) | 625 DCHECK(send_done || should_pump_messages); |
| 626 |
| 627 if (should_pump_messages) |
534 WaitForReplyWithNestedMessageLoop(context); // Run a nested message loop. | 628 WaitForReplyWithNestedMessageLoop(context); // Run a nested message loop. |
535 | 629 |
536 break; | 630 break; |
537 } | 631 } |
| 632 |
| 633 if (pump_messages_event) |
| 634 PumpMessagesEvent::Release(); |
538 } | 635 } |
539 | 636 |
540 void SyncChannel::WaitForReplyWithNestedMessageLoop(SyncContext* context) { | 637 void SyncChannel::WaitForReplyWithNestedMessageLoop(SyncContext* context) { |
541 base::WaitableEventWatcher send_done_watcher; | 638 mojo::Watcher send_done_watcher; |
542 | 639 |
543 ReceivedSyncMsgQueue* sync_msg_queue = context->received_sync_msgs(); | 640 ReceivedSyncMsgQueue* sync_msg_queue = context->received_sync_msgs(); |
544 DCHECK(sync_msg_queue != NULL); | 641 DCHECK_NE(sync_msg_queue, nullptr); |
545 | 642 |
546 base::WaitableEventWatcher* old_send_done_event_watcher = | 643 mojo::Watcher* old_watcher = sync_msg_queue->top_send_done_watcher(); |
547 sync_msg_queue->top_send_done_watcher(); | 644 mojo::Handle old_handle(mojo::kInvalidHandleValue); |
| 645 mojo::Watcher::ReadyCallback old_callback; |
548 | 646 |
549 base::WaitableEventWatcher::EventCallback old_callback; | 647 // Maintain a thread-local stack of watchers to ensure nested calls complete |
550 base::WaitableEvent* old_event = NULL; | 648 // in the correct sequence, i.e. the outermost call completes first, etc. |
551 | 649 if (old_watcher) { |
552 // Maintain a local global stack of send done delegates to ensure that | 650 old_callback = old_watcher->ready_callback(); |
553 // nested sync calls complete in the correct sequence, i.e. the | 651 old_handle = old_watcher->handle(); |
554 // outermost call completes first, etc. | 652 old_watcher->Cancel(); |
555 if (old_send_done_event_watcher) { | |
556 old_callback = old_send_done_event_watcher->callback(); | |
557 old_event = old_send_done_event_watcher->GetWatchedEvent(); | |
558 old_send_done_event_watcher->StopWatching(); | |
559 } | 653 } |
560 | 654 |
561 sync_msg_queue->set_top_send_done_watcher(&send_done_watcher); | 655 sync_msg_queue->set_top_send_done_watcher(&send_done_watcher); |
562 | 656 |
563 send_done_watcher.StartWatching(context->GetSendDoneEvent(), | 657 { |
564 context->MakeWaitableEventCallback()); | 658 base::RunLoop nested_loop; |
| 659 send_done_watcher.Start( |
| 660 context->GetSendDoneEvent()->GetHandle(), MOJO_HANDLE_SIGNAL_READABLE, |
| 661 base::Bind(&RunOnHandleReady, nested_loop.QuitClosure())); |
565 | 662 |
566 { | |
567 base::MessageLoop::ScopedNestableTaskAllower allow( | 663 base::MessageLoop::ScopedNestableTaskAllower allow( |
568 base::MessageLoop::current()); | 664 base::MessageLoop::current()); |
569 base::MessageLoop::current()->Run(); | 665 nested_loop.Run(); |
| 666 send_done_watcher.Cancel(); |
570 } | 667 } |
571 | 668 |
572 sync_msg_queue->set_top_send_done_watcher(old_send_done_event_watcher); | 669 sync_msg_queue->set_top_send_done_watcher(old_watcher); |
573 if (old_send_done_event_watcher && old_event) { | 670 if (old_watcher) |
574 old_send_done_event_watcher->StartWatching(old_event, old_callback); | 671 old_watcher->Start(old_handle, MOJO_HANDLE_SIGNAL_READABLE, old_callback); |
575 } | |
576 } | 672 } |
577 | 673 |
578 void SyncChannel::OnWaitableEventSignaled(WaitableEvent* event) { | 674 void SyncChannel::OnDispatchHandleReady(MojoResult result) { |
579 DCHECK(event == sync_context()->GetDispatchEvent()); | 675 DCHECK(result == MOJO_RESULT_OK || result == MOJO_RESULT_ABORTED); |
580 // The call to DispatchMessages might delete this object, so reregister | 676 if (result == MOJO_RESULT_OK) { |
581 // the object watcher first. | 677 sync_context()->GetDispatchEvent()->Reset(); |
582 event->Reset(); | 678 sync_context()->DispatchMessages(); |
583 dispatch_watcher_.StartWatching(event, dispatch_watcher_callback_); | 679 } |
584 sync_context()->DispatchMessages(); | |
585 } | 680 } |
586 | 681 |
587 void SyncChannel::StartWatching() { | 682 void SyncChannel::StartWatching() { |
588 // Ideally we only want to watch this object when running a nested message | 683 // Ideally we only want to watch this object when running a nested message |
589 // loop. However, we don't know when it exits if there's another nested | 684 // loop. However, we don't know when it exits if there's another nested |
590 // message loop running under it or not, so we wouldn't know whether to | 685 // message loop running under it or not, so we wouldn't know whether to |
591 // stop or keep watching. So we always watch it, and create the event as | 686 // stop or keep watching. So we always watch it. |
592 // manual reset since the object watcher might otherwise reset the event | 687 dispatch_watcher_.Start(sync_context()->GetDispatchEvent()->GetHandle(), |
593 // when we're doing a WaitMany. | 688 MOJO_HANDLE_SIGNAL_READABLE, |
594 dispatch_watcher_callback_ = | 689 base::Bind(&SyncChannel::OnDispatchHandleReady, |
595 base::Bind(&SyncChannel::OnWaitableEventSignaled, | 690 base::Unretained(this))); |
596 base::Unretained(this)); | |
597 dispatch_watcher_.StartWatching(sync_context()->GetDispatchEvent(), | |
598 dispatch_watcher_callback_); | |
599 } | 691 } |
600 | 692 |
601 void SyncChannel::OnChannelInit() { | 693 void SyncChannel::OnChannelInit() { |
602 for (const auto& filter : pre_init_sync_message_filters_) { | 694 for (const auto& filter : pre_init_sync_message_filters_) { |
603 filter->set_is_channel_send_thread_safe( | 695 filter->set_is_channel_send_thread_safe( |
604 context()->IsChannelSendThreadSafe()); | 696 context()->IsChannelSendThreadSafe()); |
605 } | 697 } |
606 pre_init_sync_message_filters_.clear(); | 698 pre_init_sync_message_filters_.clear(); |
607 } | 699 } |
608 | 700 |
(...skipping 23 matching lines...) Expand all Loading... |
632 TRACE_EVENT2("ipc", "SyncChannel::SendOnIPCThread", | 724 TRACE_EVENT2("ipc", "SyncChannel::SendOnIPCThread", |
633 "class", IPC_MESSAGE_ID_CLASS(message->type()), | 725 "class", IPC_MESSAGE_ID_CLASS(message->type()), |
634 "line", IPC_MESSAGE_ID_LINE(message->type())); | 726 "line", IPC_MESSAGE_ID_LINE(message->type())); |
635 #endif | 727 #endif |
636 if (!message->is_sync()) | 728 if (!message->is_sync()) |
637 return ChannelProxy::SendOnIPCThread(std::move(message)); | 729 return ChannelProxy::SendOnIPCThread(std::move(message)); |
638 return Send(message.release()); | 730 return Send(message.release()); |
639 } | 731 } |
640 | 732 |
641 } // namespace IPC | 733 } // namespace IPC |
OLD | NEW |