OLD | NEW |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "ipc/ipc_sync_channel.h" | 5 #include "ipc/ipc_sync_channel.h" |
6 | 6 |
7 #include <stddef.h> | 7 #include <stddef.h> |
8 #include <stdint.h> | 8 #include <stdint.h> |
9 | 9 |
10 #include <utility> | 10 #include <utility> |
11 | 11 |
12 #include "base/bind.h" | 12 #include "base/bind.h" |
13 #include "base/lazy_instance.h" | 13 #include "base/lazy_instance.h" |
14 #include "base/location.h" | 14 #include "base/location.h" |
15 #include "base/logging.h" | 15 #include "base/logging.h" |
16 #include "base/macros.h" | |
17 #include "base/memory/ptr_util.h" | 16 #include "base/memory/ptr_util.h" |
18 #include "base/run_loop.h" | |
19 #include "base/synchronization/waitable_event.h" | 17 #include "base/synchronization/waitable_event.h" |
| 18 #include "base/synchronization/waitable_event_watcher.h" |
20 #include "base/threading/thread_local.h" | 19 #include "base/threading/thread_local.h" |
21 #include "base/threading/thread_task_runner_handle.h" | 20 #include "base/threading/thread_task_runner_handle.h" |
22 #include "base/trace_event/trace_event.h" | 21 #include "base/trace_event/trace_event.h" |
23 #include "ipc/ipc_channel_factory.h" | 22 #include "ipc/ipc_channel_factory.h" |
24 #include "ipc/ipc_logging.h" | 23 #include "ipc/ipc_logging.h" |
25 #include "ipc/ipc_message_macros.h" | 24 #include "ipc/ipc_message_macros.h" |
26 #include "ipc/ipc_sync_message.h" | 25 #include "ipc/ipc_sync_message.h" |
27 #include "ipc/mojo_event.h" | |
28 #include "mojo/public/cpp/bindings/sync_handle_registry.h" | |
29 | 26 |
30 using base::TimeDelta; | 27 using base::TimeDelta; |
31 using base::TimeTicks; | 28 using base::TimeTicks; |
32 using base::WaitableEvent; | 29 using base::WaitableEvent; |
33 | 30 |
34 namespace IPC { | 31 namespace IPC { |
35 | |
36 namespace { | |
37 | |
38 // A lazy thread-local Mojo Event which is always signaled. Used to wake up the | |
39 // sync waiter when a SyncMessage requires the MessageLoop to be pumped while | |
40 // waiting for a reply. This object is created lazily and ref-counted so it can | |
41 // be cleaned up when no longer in use. | |
42 class PumpMessagesEvent { | |
43 public: | |
44 // Acquires the event for this thread. Creates a new instance if necessary. | |
45 static PumpMessagesEvent* Acquire() { | |
46 PumpMessagesEvent* pump_messages_event = g_event_.Pointer()->Get(); | |
47 if (!pump_messages_event) { | |
48 pump_messages_event = new PumpMessagesEvent; | |
49 pump_messages_event->event_.Signal(); | |
50 g_event_.Pointer()->Set(pump_messages_event); | |
51 } | |
52 pump_messages_event->ref_count_++; | |
53 return pump_messages_event; | |
54 } | |
55 | |
56 // Releases a handle to this event. There must be a 1:1 correspondence between | |
57 // calls to Acquire() and calls to Release(). | |
58 static void Release() { | |
59 PumpMessagesEvent* pump_messages_event = g_event_.Pointer()->Get(); | |
60 DCHECK(pump_messages_event); | |
61 DCHECK_GT(pump_messages_event->ref_count_, 0); | |
62 pump_messages_event->ref_count_--; | |
63 if (!pump_messages_event->ref_count_) { | |
64 g_event_.Pointer()->Set(nullptr); | |
65 delete pump_messages_event; | |
66 } | |
67 } | |
68 | |
69 const mojo::Handle& GetHandle() const { return event_.GetHandle(); } | |
70 | |
71 private: | |
72 PumpMessagesEvent() {} | |
73 ~PumpMessagesEvent() {} | |
74 | |
75 int ref_count_ = 0; | |
76 MojoEvent event_; | |
77 | |
78 static base::LazyInstance<base::ThreadLocalPointer<PumpMessagesEvent>> | |
79 g_event_; | |
80 | |
81 DISALLOW_COPY_AND_ASSIGN(PumpMessagesEvent); | |
82 }; | |
83 | |
84 // A generic callback used when watching handles synchronously. Sets |*signal| | |
85 // to true. Also sets |*error| to true in case of an error. | |
86 void OnSyncHandleReady(bool* signal, bool* error, MojoResult result) { | |
87 *signal = true; | |
88 *error = result != MOJO_RESULT_OK; | |
89 } | |
90 | |
91 // A ReadyCallback for use with mojo::Watcher. Ignores the result (DCHECKs, but | |
92 // is only used in cases where failure should be impossible) and runs | |
93 // |callback|. | |
94 void RunOnHandleReady(const base::Closure& callback, MojoResult result) { | |
95 DCHECK(result == MOJO_RESULT_OK || result == MOJO_RESULT_ABORTED); | |
96 if (result == MOJO_RESULT_OK) | |
97 callback.Run(); | |
98 } | |
99 | |
100 } // namespace | |
101 | |
102 base::LazyInstance<base::ThreadLocalPointer<PumpMessagesEvent>> | |
103 PumpMessagesEvent::g_event_ = LAZY_INSTANCE_INITIALIZER; | |
104 | |
105 // When we're blocked in a Send(), we need to process incoming synchronous | 32 // When we're blocked in a Send(), we need to process incoming synchronous |
106 // messages right away because it could be blocking our reply (either | 33 // messages right away because it could be blocking our reply (either |
107 // directly from the same object we're calling, or indirectly through one or | 34 // directly from the same object we're calling, or indirectly through one or |
108 // more other channels). That means that in SyncContext's OnMessageReceived, | 35 // more other channels). That means that in SyncContext's OnMessageReceived, |
109 // we need to process sync message right away if we're blocked. However a | 36 // we need to process sync message right away if we're blocked. However a |
110 // simple check isn't sufficient, because the listener thread can be in the | 37 // simple check isn't sufficient, because the listener thread can be in the |
111 // process of calling Send. | 38 // process of calling Send. |
112 // To work around this, when SyncChannel filters a sync message, it sets | 39 // To work around this, when SyncChannel filters a sync message, it sets |
113 // an event that the listener thread waits on during its Send() call. This | 40 // an event that the listener thread waits on during its Send() call. This |
114 // allows us to dispatch incoming sync messages when blocked. The race | 41 // allows us to dispatch incoming sync messages when blocked. The race |
(...skipping 106 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
221 iter++; | 148 iter++; |
222 } | 149 } |
223 } | 150 } |
224 | 151 |
225 if (--listener_count_ == 0) { | 152 if (--listener_count_ == 0) { |
226 DCHECK(lazy_tls_ptr_.Pointer()->Get()); | 153 DCHECK(lazy_tls_ptr_.Pointer()->Get()); |
227 lazy_tls_ptr_.Pointer()->Set(NULL); | 154 lazy_tls_ptr_.Pointer()->Set(NULL); |
228 } | 155 } |
229 } | 156 } |
230 | 157 |
231 MojoEvent* dispatch_event() { return &dispatch_event_; } | 158 WaitableEvent* dispatch_event() { return &dispatch_event_; } |
232 base::SingleThreadTaskRunner* listener_task_runner() { | 159 base::SingleThreadTaskRunner* listener_task_runner() { |
233 return listener_task_runner_.get(); | 160 return listener_task_runner_.get(); |
234 } | 161 } |
235 | 162 |
236 // Holds a pointer to the per-thread ReceivedSyncMsgQueue object. | 163 // Holds a pointer to the per-thread ReceivedSyncMsgQueue object. |
237 static base::LazyInstance<base::ThreadLocalPointer<ReceivedSyncMsgQueue> > | 164 static base::LazyInstance<base::ThreadLocalPointer<ReceivedSyncMsgQueue> > |
238 lazy_tls_ptr_; | 165 lazy_tls_ptr_; |
239 | 166 |
240 // Called on the ipc thread to check if we can unblock any current Send() | 167 // Called on the ipc thread to check if we can unblock any current Send() |
241 // calls based on a queued reply. | 168 // calls based on a queued reply. |
242 void DispatchReplies() { | 169 void DispatchReplies() { |
243 for (size_t i = 0; i < received_replies_.size(); ++i) { | 170 for (size_t i = 0; i < received_replies_.size(); ++i) { |
244 Message* message = received_replies_[i].message; | 171 Message* message = received_replies_[i].message; |
245 if (received_replies_[i].context->TryToUnblockListener(message)) { | 172 if (received_replies_[i].context->TryToUnblockListener(message)) { |
246 delete message; | 173 delete message; |
247 received_replies_.erase(received_replies_.begin() + i); | 174 received_replies_.erase(received_replies_.begin() + i); |
248 return; | 175 return; |
249 } | 176 } |
250 } | 177 } |
251 } | 178 } |
252 | 179 |
253 mojo::Watcher* top_send_done_watcher() { | 180 base::WaitableEventWatcher* top_send_done_watcher() { |
254 return top_send_done_watcher_; | 181 return top_send_done_watcher_; |
255 } | 182 } |
256 | 183 |
257 void set_top_send_done_watcher(mojo::Watcher* watcher) { | 184 void set_top_send_done_watcher(base::WaitableEventWatcher* watcher) { |
258 top_send_done_watcher_ = watcher; | 185 top_send_done_watcher_ = watcher; |
259 } | 186 } |
260 | 187 |
261 private: | 188 private: |
262 friend class base::RefCountedThreadSafe<ReceivedSyncMsgQueue>; | 189 friend class base::RefCountedThreadSafe<ReceivedSyncMsgQueue>; |
263 | 190 |
264 // See the comment in SyncChannel::SyncChannel for why this event is created | 191 // See the comment in SyncChannel::SyncChannel for why this event is created |
265 // as manual reset. | 192 // as manual reset. |
266 ReceivedSyncMsgQueue() | 193 ReceivedSyncMsgQueue() |
267 : message_queue_version_(0), | 194 : message_queue_version_(0), |
| 195 dispatch_event_(base::WaitableEvent::ResetPolicy::MANUAL, |
| 196 base::WaitableEvent::InitialState::NOT_SIGNALED), |
268 listener_task_runner_(base::ThreadTaskRunnerHandle::Get()), | 197 listener_task_runner_(base::ThreadTaskRunnerHandle::Get()), |
269 task_pending_(false), | 198 task_pending_(false), |
270 listener_count_(0), | 199 listener_count_(0), |
271 top_send_done_watcher_(NULL) {} | 200 top_send_done_watcher_(NULL) {} |
272 | 201 |
273 ~ReceivedSyncMsgQueue() {} | 202 ~ReceivedSyncMsgQueue() {} |
274 | 203 |
275 // Holds information about a queued synchronous message or reply. | 204 // Holds information about a queued synchronous message or reply. |
276 struct QueuedMessage { | 205 struct QueuedMessage { |
277 QueuedMessage(Message* m, SyncContext* c) : message(m), context(c) { } | 206 QueuedMessage(Message* m, SyncContext* c) : message(m), context(c) { } |
278 Message* message; | 207 Message* message; |
279 scoped_refptr<SyncChannel::SyncContext> context; | 208 scoped_refptr<SyncChannel::SyncContext> context; |
280 }; | 209 }; |
281 | 210 |
282 typedef std::list<QueuedMessage> SyncMessageQueue; | 211 typedef std::list<QueuedMessage> SyncMessageQueue; |
283 SyncMessageQueue message_queue_; | 212 SyncMessageQueue message_queue_; |
284 uint32_t message_queue_version_; // Used to signal DispatchMessages to rescan | 213 uint32_t message_queue_version_; // Used to signal DispatchMessages to rescan |
285 | 214 |
286 std::vector<QueuedMessage> received_replies_; | 215 std::vector<QueuedMessage> received_replies_; |
287 | 216 |
288 // Signaled when we get a synchronous message that we must respond to, as the | 217 // Set when we got a synchronous message that we must respond to as the |
289 // sender needs its reply before it can reply to our original synchronous | 218 // sender needs its reply before it can reply to our original synchronous |
290 // message. | 219 // message. |
291 MojoEvent dispatch_event_; | 220 WaitableEvent dispatch_event_; |
292 scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner_; | 221 scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner_; |
293 base::Lock message_lock_; | 222 base::Lock message_lock_; |
294 bool task_pending_; | 223 bool task_pending_; |
295 int listener_count_; | 224 int listener_count_; |
296 | 225 |
297 // The current send done handle watcher for this thread. Used to maintain | 226 // The current send done event watcher for this thread. Used to maintain |
298 // a thread-local stack of send done watchers to ensure that nested sync | 227 // a local global stack of send done watchers to ensure that nested sync |
299 // message loops complete correctly. | 228 // message loops complete correctly. |
300 mojo::Watcher* top_send_done_watcher_; | 229 base::WaitableEventWatcher* top_send_done_watcher_; |
301 }; | 230 }; |
302 | 231 |
303 base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> > | 232 base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> > |
304 SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_ = | 233 SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_ = |
305 LAZY_INSTANCE_INITIALIZER; | 234 LAZY_INSTANCE_INITIALIZER; |
306 | 235 |
307 SyncChannel::SyncContext::SyncContext( | 236 SyncChannel::SyncContext::SyncContext( |
308 Listener* listener, | 237 Listener* listener, |
309 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, | 238 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, |
310 WaitableEvent* shutdown_event) | 239 WaitableEvent* shutdown_event) |
(...skipping 15 matching lines...) Expand all Loading... |
326 // Create the tracking information for this message. This object is stored | 255 // Create the tracking information for this message. This object is stored |
327 // by value since all members are pointers that are cheap to copy. These | 256 // by value since all members are pointers that are cheap to copy. These |
328 // pointers are cleaned up in the Pop() function. | 257 // pointers are cleaned up in the Pop() function. |
329 // | 258 // |
330 // The event is created as manual reset because in between Signal and | 259 // The event is created as manual reset because in between Signal and |
331 // OnObjectSignalled, another Send can happen which would stop the watcher | 260 // OnObjectSignalled, another Send can happen which would stop the watcher |
332 // from being called. The event would get watched later, when the nested | 261 // from being called. The event would get watched later, when the nested |
333 // Send completes, so the event will need to remain set. | 262 // Send completes, so the event will need to remain set. |
334 PendingSyncMsg pending( | 263 PendingSyncMsg pending( |
335 SyncMessage::GetMessageId(*sync_msg), sync_msg->GetReplyDeserializer(), | 264 SyncMessage::GetMessageId(*sync_msg), sync_msg->GetReplyDeserializer(), |
336 new MojoEvent); | 265 new WaitableEvent(base::WaitableEvent::ResetPolicy::MANUAL, |
| 266 base::WaitableEvent::InitialState::NOT_SIGNALED)); |
337 base::AutoLock auto_lock(deserializers_lock_); | 267 base::AutoLock auto_lock(deserializers_lock_); |
338 deserializers_.push_back(pending); | 268 deserializers_.push_back(pending); |
339 } | 269 } |
340 | 270 |
341 bool SyncChannel::SyncContext::Pop() { | 271 bool SyncChannel::SyncContext::Pop() { |
342 bool result; | 272 bool result; |
343 { | 273 { |
344 base::AutoLock auto_lock(deserializers_lock_); | 274 base::AutoLock auto_lock(deserializers_lock_); |
345 PendingSyncMsg msg = deserializers_.back(); | 275 PendingSyncMsg msg = deserializers_.back(); |
346 delete msg.deserializer; | 276 delete msg.deserializer; |
347 delete msg.done_event; | 277 delete msg.done_event; |
348 msg.done_event = nullptr; | 278 msg.done_event = NULL; |
349 deserializers_.pop_back(); | 279 deserializers_.pop_back(); |
350 result = msg.send_result; | 280 result = msg.send_result; |
351 } | 281 } |
352 | 282 |
353 // We got a reply to a synchronous Send() call that's blocking the listener | 283 // We got a reply to a synchronous Send() call that's blocking the listener |
354 // thread. However, further down the call stack there could be another | 284 // thread. However, further down the call stack there could be another |
355 // blocking Send() call, whose reply we received after we made this last | 285 // blocking Send() call, whose reply we received after we made this last |
356 // Send() call. So check if we have any queued replies available that | 286 // Send() call. So check if we have any queued replies available that |
357 // can now unblock the listener thread. | 287 // can now unblock the listener thread. |
358 ipc_task_runner()->PostTask( | 288 ipc_task_runner()->PostTask( |
359 FROM_HERE, base::Bind(&ReceivedSyncMsgQueue::DispatchReplies, | 289 FROM_HERE, base::Bind(&ReceivedSyncMsgQueue::DispatchReplies, |
360 received_sync_msgs_.get())); | 290 received_sync_msgs_.get())); |
361 | 291 |
362 return result; | 292 return result; |
363 } | 293 } |
364 | 294 |
365 MojoEvent* SyncChannel::SyncContext::GetSendDoneEvent() { | 295 WaitableEvent* SyncChannel::SyncContext::GetSendDoneEvent() { |
366 base::AutoLock auto_lock(deserializers_lock_); | 296 base::AutoLock auto_lock(deserializers_lock_); |
367 return deserializers_.back().done_event; | 297 return deserializers_.back().done_event; |
368 } | 298 } |
369 | 299 |
370 MojoEvent* SyncChannel::SyncContext::GetDispatchEvent() { | 300 WaitableEvent* SyncChannel::SyncContext::GetDispatchEvent() { |
371 return received_sync_msgs_->dispatch_event(); | 301 return received_sync_msgs_->dispatch_event(); |
372 } | 302 } |
373 | 303 |
374 void SyncChannel::SyncContext::DispatchMessages() { | 304 void SyncChannel::SyncContext::DispatchMessages() { |
375 received_sync_msgs_->DispatchMessages(this); | 305 received_sync_msgs_->DispatchMessages(this); |
376 } | 306 } |
377 | 307 |
378 bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) { | 308 bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) { |
379 base::AutoLock auto_lock(deserializers_lock_); | 309 base::AutoLock auto_lock(deserializers_lock_); |
380 if (deserializers_.empty() || | 310 if (deserializers_.empty() || |
381 !SyncMessage::IsMessageReplyTo(*msg, deserializers_.back().id)) { | 311 !SyncMessage::IsMessageReplyTo(*msg, deserializers_.back().id)) { |
382 return false; | 312 return false; |
383 } | 313 } |
384 | 314 |
385 if (!msg->is_reply_error()) { | 315 if (!msg->is_reply_error()) { |
386 bool send_result = deserializers_.back().deserializer-> | 316 bool send_result = deserializers_.back().deserializer-> |
387 SerializeOutputParameters(*msg); | 317 SerializeOutputParameters(*msg); |
388 deserializers_.back().send_result = send_result; | 318 deserializers_.back().send_result = send_result; |
389 DVLOG_IF(1, !send_result) << "Couldn't deserialize reply message"; | 319 DVLOG_IF(1, !send_result) << "Couldn't deserialize reply message"; |
390 } else { | 320 } else { |
391 DVLOG(1) << "Received error reply"; | 321 DVLOG(1) << "Received error reply"; |
392 } | 322 } |
393 | 323 |
394 MojoEvent* done_event = deserializers_.back().done_event; | 324 base::WaitableEvent* done_event = deserializers_.back().done_event; |
395 TRACE_EVENT_FLOW_BEGIN0( | 325 TRACE_EVENT_FLOW_BEGIN0( |
396 TRACE_DISABLED_BY_DEFAULT("ipc.flow"), | 326 TRACE_DISABLED_BY_DEFAULT("ipc.flow"), |
397 "SyncChannel::SyncContext::TryToUnblockListener", done_event); | 327 "SyncChannel::SyncContext::TryToUnblockListener", done_event); |
398 | 328 |
399 done_event->Signal(); | 329 done_event->Signal(); |
400 | 330 |
401 return true; | 331 return true; |
402 } | 332 } |
403 | 333 |
404 void SyncChannel::SyncContext::Clear() { | 334 void SyncChannel::SyncContext::Clear() { |
(...skipping 25 matching lines...) Expand all Loading... |
430 | 360 |
431 void SyncChannel::SyncContext::OnChannelError() { | 361 void SyncChannel::SyncContext::OnChannelError() { |
432 CancelPendingSends(); | 362 CancelPendingSends(); |
433 shutdown_watcher_.StopWatching(); | 363 shutdown_watcher_.StopWatching(); |
434 Context::OnChannelError(); | 364 Context::OnChannelError(); |
435 } | 365 } |
436 | 366 |
437 void SyncChannel::SyncContext::OnChannelOpened() { | 367 void SyncChannel::SyncContext::OnChannelOpened() { |
438 shutdown_watcher_.StartWatching( | 368 shutdown_watcher_.StartWatching( |
439 shutdown_event_, | 369 shutdown_event_, |
440 base::Bind(&SyncChannel::SyncContext::OnShutdownEventSignaled, | 370 base::Bind(&SyncChannel::SyncContext::OnWaitableEventSignaled, |
441 base::Unretained(this))); | 371 base::Unretained(this))); |
442 Context::OnChannelOpened(); | 372 Context::OnChannelOpened(); |
443 } | 373 } |
444 | 374 |
445 void SyncChannel::SyncContext::OnChannelClosed() { | 375 void SyncChannel::SyncContext::OnChannelClosed() { |
446 CancelPendingSends(); | 376 CancelPendingSends(); |
447 shutdown_watcher_.StopWatching(); | 377 shutdown_watcher_.StopWatching(); |
448 Context::OnChannelClosed(); | 378 Context::OnChannelClosed(); |
449 } | 379 } |
450 | 380 |
451 void SyncChannel::SyncContext::CancelPendingSends() { | 381 void SyncChannel::SyncContext::CancelPendingSends() { |
452 base::AutoLock auto_lock(deserializers_lock_); | 382 base::AutoLock auto_lock(deserializers_lock_); |
453 PendingSyncMessageQueue::iterator iter; | 383 PendingSyncMessageQueue::iterator iter; |
454 DVLOG(1) << "Canceling pending sends"; | 384 DVLOG(1) << "Canceling pending sends"; |
455 for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++) { | 385 for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++) { |
456 TRACE_EVENT_FLOW_BEGIN0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), | 386 TRACE_EVENT_FLOW_BEGIN0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), |
457 "SyncChannel::SyncContext::CancelPendingSends", | 387 "SyncChannel::SyncContext::CancelPendingSends", |
458 iter->done_event); | 388 iter->done_event); |
459 iter->done_event->Signal(); | 389 iter->done_event->Signal(); |
460 } | 390 } |
461 } | 391 } |
462 | 392 |
463 void SyncChannel::SyncContext::OnShutdownEventSignaled(WaitableEvent* event) { | 393 void SyncChannel::SyncContext::OnWaitableEventSignaled(WaitableEvent* event) { |
464 DCHECK_EQ(event, shutdown_event_); | 394 if (event == shutdown_event_) { |
| 395 // Process shut down before we can get a reply to a synchronous message. |
| 396 // Cancel pending Send calls, which will end up setting the send done event. |
| 397 CancelPendingSends(); |
| 398 } else { |
| 399 // We got the reply, timed out or the process shutdown. |
| 400 DCHECK_EQ(GetSendDoneEvent(), event); |
| 401 base::MessageLoop::current()->QuitNow(); |
| 402 } |
| 403 } |
465 | 404 |
466 // Process shut down before we can get a reply to a synchronous message. | 405 base::WaitableEventWatcher::EventCallback |
467 // Cancel pending Send calls, which will end up setting the send done event. | 406 SyncChannel::SyncContext::MakeWaitableEventCallback() { |
468 CancelPendingSends(); | 407 return base::Bind(&SyncChannel::SyncContext::OnWaitableEventSignaled, this); |
469 } | 408 } |
470 | 409 |
471 // static | 410 // static |
472 std::unique_ptr<SyncChannel> SyncChannel::Create( | 411 std::unique_ptr<SyncChannel> SyncChannel::Create( |
473 const IPC::ChannelHandle& channel_handle, | 412 const IPC::ChannelHandle& channel_handle, |
474 Channel::Mode mode, | 413 Channel::Mode mode, |
475 Listener* listener, | 414 Listener* listener, |
476 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, | 415 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, |
477 bool create_pipe_now, | 416 bool create_pipe_now, |
478 base::WaitableEvent* shutdown_event) { | 417 base::WaitableEvent* shutdown_event) { |
(...skipping 23 matching lines...) Expand all Loading... |
502 WaitableEvent* shutdown_event) { | 441 WaitableEvent* shutdown_event) { |
503 return base::WrapUnique( | 442 return base::WrapUnique( |
504 new SyncChannel(listener, ipc_task_runner, shutdown_event)); | 443 new SyncChannel(listener, ipc_task_runner, shutdown_event)); |
505 } | 444 } |
506 | 445 |
507 SyncChannel::SyncChannel( | 446 SyncChannel::SyncChannel( |
508 Listener* listener, | 447 Listener* listener, |
509 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, | 448 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, |
510 WaitableEvent* shutdown_event) | 449 WaitableEvent* shutdown_event) |
511 : ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)) { | 450 : ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)) { |
512 // Keep a thread-local PumpMessagesEvent alive at least as long as any | |
513 // SyncChannel exists. This is balanced in the SyncChannel destructor below. | |
514 PumpMessagesEvent::Acquire(); | |
515 | |
516 // The current (listener) thread must be distinct from the IPC thread, or else | 451 // The current (listener) thread must be distinct from the IPC thread, or else |
517 // sending synchronous messages will deadlock. | 452 // sending synchronous messages will deadlock. |
518 DCHECK_NE(ipc_task_runner.get(), base::ThreadTaskRunnerHandle::Get().get()); | 453 DCHECK_NE(ipc_task_runner.get(), base::ThreadTaskRunnerHandle::Get().get()); |
519 StartWatching(); | 454 StartWatching(); |
520 } | 455 } |
521 | 456 |
522 SyncChannel::~SyncChannel() { | 457 SyncChannel::~SyncChannel() { |
523 PumpMessagesEvent::Release(); | |
524 } | 458 } |
525 | 459 |
526 void SyncChannel::SetRestrictDispatchChannelGroup(int group) { | 460 void SyncChannel::SetRestrictDispatchChannelGroup(int group) { |
527 sync_context()->set_restrict_dispatch_group(group); | 461 sync_context()->set_restrict_dispatch_group(group); |
528 } | 462 } |
529 | 463 |
530 scoped_refptr<SyncMessageFilter> SyncChannel::CreateSyncMessageFilter() { | 464 scoped_refptr<SyncMessageFilter> SyncChannel::CreateSyncMessageFilter() { |
531 scoped_refptr<SyncMessageFilter> filter = new SyncMessageFilter( | 465 scoped_refptr<SyncMessageFilter> filter = new SyncMessageFilter( |
532 sync_context()->shutdown_event(), | 466 sync_context()->shutdown_event(), |
533 sync_context()->IsChannelSendThreadSafe()); | 467 sync_context()->IsChannelSendThreadSafe()); |
(...skipping 20 matching lines...) Expand all Loading... |
554 | 488 |
555 // *this* might get deleted in WaitForReply. | 489 // *this* might get deleted in WaitForReply. |
556 scoped_refptr<SyncContext> context(sync_context()); | 490 scoped_refptr<SyncContext> context(sync_context()); |
557 if (context->shutdown_event()->IsSignaled()) { | 491 if (context->shutdown_event()->IsSignaled()) { |
558 DVLOG(1) << "shutdown event is signaled"; | 492 DVLOG(1) << "shutdown event is signaled"; |
559 delete message; | 493 delete message; |
560 return false; | 494 return false; |
561 } | 495 } |
562 | 496 |
563 SyncMessage* sync_msg = static_cast<SyncMessage*>(message); | 497 SyncMessage* sync_msg = static_cast<SyncMessage*>(message); |
564 bool pump_messages = sync_msg->ShouldPumpMessages(); | |
565 context->Push(sync_msg); | 498 context->Push(sync_msg); |
| 499 WaitableEvent* pump_messages_event = sync_msg->pump_messages_event(); |
566 | 500 |
567 ChannelProxy::Send(message); | 501 ChannelProxy::Send(message); |
568 | 502 |
569 // Wait for reply, or for any other incoming synchronous messages. | 503 // Wait for reply, or for any other incoming synchronous messages. |
570 // *this* might get deleted, so only call static functions at this point. | 504 // *this* might get deleted, so only call static functions at this point. |
571 WaitForReply(context.get(), pump_messages); | 505 WaitForReply(context.get(), pump_messages_event); |
572 | 506 |
573 TRACE_EVENT_FLOW_END0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), | 507 TRACE_EVENT_FLOW_END0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), |
574 "SyncChannel::Send", context->GetSendDoneEvent()); | 508 "SyncChannel::Send", context->GetSendDoneEvent()); |
575 | 509 |
576 return context->Pop(); | 510 return context->Pop(); |
577 } | 511 } |
578 | 512 |
579 void SyncChannel::WaitForReply(SyncContext* context, bool pump_messages) { | 513 void SyncChannel::WaitForReply( |
| 514 SyncContext* context, WaitableEvent* pump_messages_event) { |
580 context->DispatchMessages(); | 515 context->DispatchMessages(); |
| 516 while (true) { |
| 517 WaitableEvent* objects[] = { |
| 518 context->GetDispatchEvent(), |
| 519 context->GetSendDoneEvent(), |
| 520 pump_messages_event |
| 521 }; |
581 | 522 |
582 PumpMessagesEvent* pump_messages_event = nullptr; | 523 unsigned count = pump_messages_event ? 3: 2; |
583 if (pump_messages) | 524 size_t result = WaitableEvent::WaitMany(objects, count); |
584 pump_messages_event = PumpMessagesEvent::Acquire(); | 525 if (result == 0 /* dispatch event */) { |
585 | |
586 scoped_refptr<mojo::SyncHandleRegistry> registry = | |
587 mojo::SyncHandleRegistry::current(); | |
588 | |
589 while (true) { | |
590 bool dispatch = false; | |
591 bool send_done = false; | |
592 bool should_pump_messages = false; | |
593 bool error = false; | |
594 registry->RegisterHandle(context->GetDispatchEvent()->GetHandle(), | |
595 MOJO_HANDLE_SIGNAL_READABLE, | |
596 base::Bind(&OnSyncHandleReady, &dispatch, &error)); | |
597 registry->RegisterHandle( | |
598 context->GetSendDoneEvent()->GetHandle(), | |
599 MOJO_HANDLE_SIGNAL_READABLE, | |
600 base::Bind(&OnSyncHandleReady, &send_done, &error)); | |
601 if (pump_messages_event) { | |
602 registry->RegisterHandle( | |
603 pump_messages_event->GetHandle(), MOJO_HANDLE_SIGNAL_READABLE, | |
604 base::Bind(&OnSyncHandleReady, &should_pump_messages, &error)); | |
605 } | |
606 | |
607 const bool* stop_flags[] = { &dispatch, &send_done, &should_pump_messages }; | |
608 bool result = registry->WatchAllHandles(stop_flags, 3); | |
609 DCHECK(result); | |
610 DCHECK(!error); | |
611 | |
612 registry->UnregisterHandle(context->GetDispatchEvent()->GetHandle()); | |
613 registry->UnregisterHandle(context->GetSendDoneEvent()->GetHandle()); | |
614 if (pump_messages_event) | |
615 registry->UnregisterHandle(pump_messages_event->GetHandle()); | |
616 | |
617 if (dispatch) { | |
618 // We're waiting for a reply, but we received a blocking synchronous | 526 // We're waiting for a reply, but we received a blocking synchronous |
619 // call. We must process it or otherwise a deadlock might occur. | 527 // call. We must process it or otherwise a deadlock might occur. |
620 context->GetDispatchEvent()->Reset(); | 528 context->GetDispatchEvent()->Reset(); |
621 context->DispatchMessages(); | 529 context->DispatchMessages(); |
622 continue; | 530 continue; |
623 } | 531 } |
624 | 532 |
625 DCHECK(send_done || should_pump_messages); | 533 if (result == 2 /* pump_messages_event */) |
626 | |
627 if (should_pump_messages) | |
628 WaitForReplyWithNestedMessageLoop(context); // Run a nested message loop. | 534 WaitForReplyWithNestedMessageLoop(context); // Run a nested message loop. |
629 | 535 |
630 break; | 536 break; |
631 } | 537 } |
632 | |
633 if (pump_messages_event) | |
634 PumpMessagesEvent::Release(); | |
635 } | 538 } |
636 | 539 |
637 void SyncChannel::WaitForReplyWithNestedMessageLoop(SyncContext* context) { | 540 void SyncChannel::WaitForReplyWithNestedMessageLoop(SyncContext* context) { |
638 mojo::Watcher send_done_watcher; | 541 base::WaitableEventWatcher send_done_watcher; |
639 | 542 |
640 ReceivedSyncMsgQueue* sync_msg_queue = context->received_sync_msgs(); | 543 ReceivedSyncMsgQueue* sync_msg_queue = context->received_sync_msgs(); |
641 DCHECK_NE(sync_msg_queue, nullptr); | 544 DCHECK(sync_msg_queue != NULL); |
642 | 545 |
643 mojo::Watcher* old_watcher = sync_msg_queue->top_send_done_watcher(); | 546 base::WaitableEventWatcher* old_send_done_event_watcher = |
644 mojo::Handle old_handle(mojo::kInvalidHandleValue); | 547 sync_msg_queue->top_send_done_watcher(); |
645 mojo::Watcher::ReadyCallback old_callback; | |
646 | 548 |
647 // Maintain a thread-local stack of watchers to ensure nested calls complete | 549 base::WaitableEventWatcher::EventCallback old_callback; |
648 // in the correct sequence, i.e. the outermost call completes first, etc. | 550 base::WaitableEvent* old_event = NULL; |
649 if (old_watcher) { | 551 |
650 old_callback = old_watcher->ready_callback(); | 552 // Maintain a local global stack of send done delegates to ensure that |
651 old_handle = old_watcher->handle(); | 553 // nested sync calls complete in the correct sequence, i.e. the |
652 old_watcher->Cancel(); | 554 // outermost call completes first, etc. |
| 555 if (old_send_done_event_watcher) { |
| 556 old_callback = old_send_done_event_watcher->callback(); |
| 557 old_event = old_send_done_event_watcher->GetWatchedEvent(); |
| 558 old_send_done_event_watcher->StopWatching(); |
653 } | 559 } |
654 | 560 |
655 sync_msg_queue->set_top_send_done_watcher(&send_done_watcher); | 561 sync_msg_queue->set_top_send_done_watcher(&send_done_watcher); |
656 | 562 |
| 563 send_done_watcher.StartWatching(context->GetSendDoneEvent(), |
| 564 context->MakeWaitableEventCallback()); |
| 565 |
657 { | 566 { |
658 base::RunLoop nested_loop; | |
659 send_done_watcher.Start( | |
660 context->GetSendDoneEvent()->GetHandle(), MOJO_HANDLE_SIGNAL_READABLE, | |
661 base::Bind(&RunOnHandleReady, nested_loop.QuitClosure())); | |
662 | |
663 base::MessageLoop::ScopedNestableTaskAllower allow( | 567 base::MessageLoop::ScopedNestableTaskAllower allow( |
664 base::MessageLoop::current()); | 568 base::MessageLoop::current()); |
665 nested_loop.Run(); | 569 base::MessageLoop::current()->Run(); |
666 send_done_watcher.Cancel(); | |
667 } | 570 } |
668 | 571 |
669 sync_msg_queue->set_top_send_done_watcher(old_watcher); | 572 sync_msg_queue->set_top_send_done_watcher(old_send_done_event_watcher); |
670 if (old_watcher) | 573 if (old_send_done_event_watcher && old_event) { |
671 old_watcher->Start(old_handle, MOJO_HANDLE_SIGNAL_READABLE, old_callback); | 574 old_send_done_event_watcher->StartWatching(old_event, old_callback); |
| 575 } |
672 } | 576 } |
673 | 577 |
674 void SyncChannel::OnDispatchHandleReady(MojoResult result) { | 578 void SyncChannel::OnWaitableEventSignaled(WaitableEvent* event) { |
675 DCHECK(result == MOJO_RESULT_OK || result == MOJO_RESULT_ABORTED); | 579 DCHECK(event == sync_context()->GetDispatchEvent()); |
676 if (result == MOJO_RESULT_OK) { | 580 // The call to DispatchMessages might delete this object, so reregister |
677 sync_context()->GetDispatchEvent()->Reset(); | 581 // the object watcher first. |
678 sync_context()->DispatchMessages(); | 582 event->Reset(); |
679 } | 583 dispatch_watcher_.StartWatching(event, dispatch_watcher_callback_); |
| 584 sync_context()->DispatchMessages(); |
680 } | 585 } |
681 | 586 |
682 void SyncChannel::StartWatching() { | 587 void SyncChannel::StartWatching() { |
683 // Ideally we only want to watch this object when running a nested message | 588 // Ideally we only want to watch this object when running a nested message |
684 // loop. However, we don't know when it exits if there's another nested | 589 // loop. However, we don't know when it exits if there's another nested |
685 // message loop running under it or not, so we wouldn't know whether to | 590 // message loop running under it or not, so we wouldn't know whether to |
686 // stop or keep watching. So we always watch it. | 591 // stop or keep watching. So we always watch it, and create the event as |
687 dispatch_watcher_.Start(sync_context()->GetDispatchEvent()->GetHandle(), | 592 // manual reset since the object watcher might otherwise reset the event |
688 MOJO_HANDLE_SIGNAL_READABLE, | 593 // when we're doing a WaitMany. |
689 base::Bind(&SyncChannel::OnDispatchHandleReady, | 594 dispatch_watcher_callback_ = |
690 base::Unretained(this))); | 595 base::Bind(&SyncChannel::OnWaitableEventSignaled, |
| 596 base::Unretained(this)); |
| 597 dispatch_watcher_.StartWatching(sync_context()->GetDispatchEvent(), |
| 598 dispatch_watcher_callback_); |
691 } | 599 } |
692 | 600 |
693 void SyncChannel::OnChannelInit() { | 601 void SyncChannel::OnChannelInit() { |
694 for (const auto& filter : pre_init_sync_message_filters_) { | 602 for (const auto& filter : pre_init_sync_message_filters_) { |
695 filter->set_is_channel_send_thread_safe( | 603 filter->set_is_channel_send_thread_safe( |
696 context()->IsChannelSendThreadSafe()); | 604 context()->IsChannelSendThreadSafe()); |
697 } | 605 } |
698 pre_init_sync_message_filters_.clear(); | 606 pre_init_sync_message_filters_.clear(); |
699 } | 607 } |
700 | 608 |
(...skipping 23 matching lines...) Expand all Loading... |
724 TRACE_EVENT2("ipc", "SyncChannel::SendOnIPCThread", | 632 TRACE_EVENT2("ipc", "SyncChannel::SendOnIPCThread", |
725 "class", IPC_MESSAGE_ID_CLASS(message->type()), | 633 "class", IPC_MESSAGE_ID_CLASS(message->type()), |
726 "line", IPC_MESSAGE_ID_LINE(message->type())); | 634 "line", IPC_MESSAGE_ID_LINE(message->type())); |
727 #endif | 635 #endif |
728 if (!message->is_sync()) | 636 if (!message->is_sync()) |
729 return ChannelProxy::SendOnIPCThread(std::move(message)); | 637 return ChannelProxy::SendOnIPCThread(std::move(message)); |
730 return Send(message.release()); | 638 return Send(message.release()); |
731 } | 639 } |
732 | 640 |
733 } // namespace IPC | 641 } // namespace IPC |
OLD | NEW |