OLD | NEW |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "ipc/ipc_sync_channel.h" | 5 #include "ipc/ipc_sync_channel.h" |
6 | 6 |
7 #include <stddef.h> | 7 #include <stddef.h> |
8 #include <stdint.h> | 8 #include <stdint.h> |
9 | 9 |
10 #include <utility> | 10 #include <utility> |
11 | 11 |
12 #include "base/bind.h" | 12 #include "base/bind.h" |
13 #include "base/lazy_instance.h" | 13 #include "base/lazy_instance.h" |
14 #include "base/location.h" | 14 #include "base/location.h" |
15 #include "base/logging.h" | 15 #include "base/logging.h" |
16 #include "base/macros.h" | 16 #include "base/macros.h" |
17 #include "base/memory/ptr_util.h" | 17 #include "base/memory/ptr_util.h" |
18 #include "base/run_loop.h" | 18 #include "base/run_loop.h" |
19 #include "base/synchronization/waitable_event.h" | 19 #include "base/synchronization/waitable_event.h" |
20 #include "base/threading/thread_local.h" | 20 #include "base/threading/thread_local.h" |
21 #include "base/threading/thread_task_runner_handle.h" | 21 #include "base/threading/thread_task_runner_handle.h" |
22 #include "base/trace_event/trace_event.h" | 22 #include "base/trace_event/trace_event.h" |
23 #include "ipc/ipc_channel_factory.h" | 23 #include "ipc/ipc_channel_factory.h" |
24 #include "ipc/ipc_logging.h" | 24 #include "ipc/ipc_logging.h" |
25 #include "ipc/ipc_message_macros.h" | 25 #include "ipc/ipc_message_macros.h" |
26 #include "ipc/ipc_sync_message.h" | 26 #include "ipc/ipc_sync_message.h" |
27 #include "ipc/mojo_event.h" | 27 #include "mojo/public/cpp/bindings/sync_event_watcher.h" |
28 #include "mojo/public/cpp/bindings/sync_handle_registry.h" | |
29 #include "mojo/public/cpp/bindings/sync_handle_watcher.h" | |
30 | 28 |
31 using base::WaitableEvent; | 29 using base::WaitableEvent; |
32 | 30 |
33 namespace IPC { | 31 namespace IPC { |
34 | 32 |
35 namespace { | 33 namespace { |
36 | 34 |
37 // A generic callback used when watching handles synchronously. Sets |*signal| | 35 // A generic callback used when watching handles synchronously. Sets |*signal| |
38 // to true. Also sets |*error| to true in case of an error. | 36 // to true. |
39 void OnSyncHandleReady(bool* signal, bool* error, MojoResult result) { | 37 void OnEventReady(bool* signal) { |
40 *signal = true; | 38 *signal = true; |
41 *error = result != MOJO_RESULT_OK; | |
42 } | 39 } |
43 | 40 |
44 // A ReadyCallback for use with mojo::SimpleWatcher. Ignores the result | 41 base::LazyInstance<std::unique_ptr<base::WaitableEvent>>::Leaky |
45 // (DCHECKs, but is only used in cases where failure should be impossible) and | 42 g_pump_messages_event = LAZY_INSTANCE_INITIALIZER; |
46 // runs |callback|. | |
47 void RunOnHandleReady(const base::Closure& callback, MojoResult result) { | |
48 DCHECK_EQ(result, MOJO_RESULT_OK); | |
49 callback.Run(); | |
50 } | |
51 | |
52 class PumpMessagesEvent { | |
53 public: | |
54 PumpMessagesEvent() { event_.Signal(); } | |
55 ~PumpMessagesEvent() {} | |
56 | |
57 const MojoEvent* event() const { return &event_; } | |
58 | |
59 private: | |
60 MojoEvent event_; | |
61 | |
62 DISALLOW_COPY_AND_ASSIGN(PumpMessagesEvent); | |
63 }; | |
64 | |
65 base::LazyInstance<PumpMessagesEvent>::Leaky g_pump_messages_event = | |
66 LAZY_INSTANCE_INITIALIZER; | |
67 | 43 |
68 } // namespace | 44 } // namespace |
69 | 45 |
70 // When we're blocked in a Send(), we need to process incoming synchronous | 46 // When we're blocked in a Send(), we need to process incoming synchronous |
71 // messages right away because it could be blocking our reply (either | 47 // messages right away because it could be blocking our reply (either |
72 // directly from the same object we're calling, or indirectly through one or | 48 // directly from the same object we're calling, or indirectly through one or |
73 // more other channels). That means that in SyncContext's OnMessageReceived, | 49 // more other channels). That means that in SyncContext's OnMessageReceived, |
74 // we need to process sync message right away if we're blocked. However a | 50 // we need to process sync message right away if we're blocked. However a |
75 // simple check isn't sufficient, because the listener thread can be in the | 51 // simple check isn't sufficient, because the listener thread can be in the |
76 // process of calling Send. | 52 // process of calling Send. |
77 // To work around this, when SyncChannel filters a sync message, it sets | 53 // To work around this, when SyncChannel filters a sync message, it sets |
78 // an event that the listener thread waits on during its Send() call. This | 54 // an event that the listener thread waits on during its Send() call. This |
79 // allows us to dispatch incoming sync messages when blocked. The race | 55 // allows us to dispatch incoming sync messages when blocked. The race |
80 // condition is handled because if Send is in the process of being called, it | 56 // condition is handled because if Send is in the process of being called, it |
81 // will check the event. In case the listener thread isn't sending a message, | 57 // will check the event. In case the listener thread isn't sending a message, |
82 // we queue a task on the listener thread to dispatch the received messages. | 58 // we queue a task on the listener thread to dispatch the received messages. |
83 // The messages are stored in this queue object that's shared among all | 59 // The messages are stored in this queue object that's shared among all |
84 // SyncChannel objects on the same thread (since one object can receive a | 60 // SyncChannel objects on the same thread (since one object can receive a |
85 // sync message while another one is blocked). | 61 // sync message while another one is blocked). |
86 | 62 |
87 class SyncChannel::ReceivedSyncMsgQueue : | 63 class SyncChannel::ReceivedSyncMsgQueue : |
88 public base::RefCountedThreadSafe<ReceivedSyncMsgQueue> { | 64 public base::RefCountedThreadSafe<ReceivedSyncMsgQueue> { |
89 public: | 65 public: |
| 66 // SyncChannel::WaitForReplyWithNestedMessageLoop may be re-entered, i.e. we |
| 67 // may nest waiting message loops arbitrarily deep on the SyncChannel's |
| 68 // thread. Every such operation has a corresponding WaitableEvent to be |
| 69 // watched which, when signalled for IPC completion, breaks out of the loop. |
| 70 // A reference to the innermost (i.e. topmost) watcher is held in |
| 71 // |ReceivedSyncMsgQueue::top_send_done_event_watcher_|. |
| 72 // |
| 73 // NestedSendDoneWatcher provides a simple scoper which is used by |
| 74 // WaitForReplyWithNestedMessageLoop to begin watching a new local "send done" |
| 75 // event, preserving the previous topmost state on the local stack until the |
| 76 // new inner loop is broken. If yet another subsequent nested loop is started |
| 77 // therein the process is repeated again in the new inner stack frame, and so |
| 78 // on. |
| 79 // |
| 80 // When this object is destroyed on stack unwind, the previous topmost state |
| 81 // is swapped back into |ReceivedSyncMsgQueue::top_send_done_event_watcher_|, |
| 82 // and its watch is resumed immediately. |
| 83 class NestedSendDoneWatcher { |
| 84 public: |
| 85 NestedSendDoneWatcher(SyncChannel::SyncContext* context, |
| 86 base::RunLoop* run_loop) |
| 87 : sync_msg_queue_(context->received_sync_msgs()), |
| 88 outer_state_(sync_msg_queue_->top_send_done_event_watcher_), |
| 89 event_(context->GetSendDoneEvent()), |
| 90 callback_( |
| 91 base::Bind(&SyncChannel::SyncContext::OnSendDoneEventSignaled, |
| 92 context, |
| 93 run_loop)) { |
| 94 sync_msg_queue_->top_send_done_event_watcher_ = this; |
| 95 if (outer_state_) |
| 96 outer_state_->StopWatching(); |
| 97 StartWatching(); |
| 98 } |
| 99 |
| 100 ~NestedSendDoneWatcher() { |
| 101 sync_msg_queue_->top_send_done_event_watcher_ = outer_state_; |
| 102 if (outer_state_) |
| 103 outer_state_->StartWatching(); |
| 104 } |
| 105 |
| 106 private: |
| 107 void StartWatching() { watcher_.StartWatching(event_, callback_); } |
| 108 void StopWatching() { watcher_.StopWatching(); } |
| 109 |
| 110 ReceivedSyncMsgQueue* const sync_msg_queue_; |
| 111 NestedSendDoneWatcher* const outer_state_; |
| 112 |
| 113 base::WaitableEvent* const event_; |
| 114 const base::WaitableEventWatcher::EventCallback callback_; |
| 115 base::WaitableEventWatcher watcher_; |
| 116 |
| 117 DISALLOW_COPY_AND_ASSIGN(NestedSendDoneWatcher); |
| 118 }; |
| 119 |
90 // Returns the ReceivedSyncMsgQueue instance for this thread, creating one | 120 // Returns the ReceivedSyncMsgQueue instance for this thread, creating one |
91 // if necessary. Call RemoveContext on the same thread when done. | 121 // if necessary. Call RemoveContext on the same thread when done. |
92 static ReceivedSyncMsgQueue* AddContext() { | 122 static ReceivedSyncMsgQueue* AddContext() { |
93 // We want one ReceivedSyncMsgQueue per listener thread (i.e. since multiple | 123 // We want one ReceivedSyncMsgQueue per listener thread (i.e. since multiple |
94 // SyncChannel objects can block the same thread). | 124 // SyncChannel objects can block the same thread). |
95 ReceivedSyncMsgQueue* rv = lazy_tls_ptr_.Pointer()->Get(); | 125 ReceivedSyncMsgQueue* rv = lazy_tls_ptr_.Pointer()->Get(); |
96 if (!rv) { | 126 if (!rv) { |
97 rv = new ReceivedSyncMsgQueue(); | 127 rv = new ReceivedSyncMsgQueue(); |
98 ReceivedSyncMsgQueue::lazy_tls_ptr_.Pointer()->Set(rv); | 128 ReceivedSyncMsgQueue::lazy_tls_ptr_.Pointer()->Set(rv); |
99 } | 129 } |
(...skipping 101 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
201 } | 231 } |
202 } | 232 } |
203 | 233 |
204 if (--listener_count_ == 0) { | 234 if (--listener_count_ == 0) { |
205 DCHECK(lazy_tls_ptr_.Pointer()->Get()); | 235 DCHECK(lazy_tls_ptr_.Pointer()->Get()); |
206 lazy_tls_ptr_.Pointer()->Set(nullptr); | 236 lazy_tls_ptr_.Pointer()->Set(nullptr); |
207 sync_dispatch_watcher_.reset(); | 237 sync_dispatch_watcher_.reset(); |
208 } | 238 } |
209 } | 239 } |
210 | 240 |
211 MojoEvent* dispatch_event() { return &dispatch_event_; } | 241 base::WaitableEvent* dispatch_event() { return &dispatch_event_; } |
212 base::SingleThreadTaskRunner* listener_task_runner() { | 242 base::SingleThreadTaskRunner* listener_task_runner() { |
213 return listener_task_runner_.get(); | 243 return listener_task_runner_.get(); |
214 } | 244 } |
215 | 245 |
216 // Holds a pointer to the per-thread ReceivedSyncMsgQueue object. | 246 // Holds a pointer to the per-thread ReceivedSyncMsgQueue object. |
217 static base::LazyInstance<base::ThreadLocalPointer<ReceivedSyncMsgQueue>>:: | 247 static base::LazyInstance<base::ThreadLocalPointer<ReceivedSyncMsgQueue>>:: |
218 DestructorAtExit lazy_tls_ptr_; | 248 DestructorAtExit lazy_tls_ptr_; |
219 | 249 |
220 // Called on the ipc thread to check if we can unblock any current Send() | 250 // Called on the ipc thread to check if we can unblock any current Send() |
221 // calls based on a queued reply. | 251 // calls based on a queued reply. |
222 void DispatchReplies() { | 252 void DispatchReplies() { |
223 for (size_t i = 0; i < received_replies_.size(); ++i) { | 253 for (size_t i = 0; i < received_replies_.size(); ++i) { |
224 Message* message = received_replies_[i].message; | 254 Message* message = received_replies_[i].message; |
225 if (received_replies_[i].context->TryToUnblockListener(message)) { | 255 if (received_replies_[i].context->TryToUnblockListener(message)) { |
226 delete message; | 256 delete message; |
227 received_replies_.erase(received_replies_.begin() + i); | 257 received_replies_.erase(received_replies_.begin() + i); |
228 return; | 258 return; |
229 } | 259 } |
230 } | 260 } |
231 } | 261 } |
232 | 262 |
233 mojo::SimpleWatcher* top_send_done_watcher() { | |
234 return top_send_done_watcher_; | |
235 } | |
236 | |
237 void set_top_send_done_watcher(mojo::SimpleWatcher* watcher) { | |
238 top_send_done_watcher_ = watcher; | |
239 } | |
240 | |
241 private: | 263 private: |
242 friend class base::RefCountedThreadSafe<ReceivedSyncMsgQueue>; | 264 friend class base::RefCountedThreadSafe<ReceivedSyncMsgQueue>; |
243 | 265 |
244 // See the comment in SyncChannel::SyncChannel for why this event is created | 266 // See the comment in SyncChannel::SyncChannel for why this event is created |
245 // as manual reset. | 267 // as manual reset. |
246 ReceivedSyncMsgQueue() | 268 ReceivedSyncMsgQueue() |
247 : message_queue_version_(0), | 269 : message_queue_version_(0), |
| 270 dispatch_event_(base::WaitableEvent::ResetPolicy::MANUAL, |
| 271 base::WaitableEvent::InitialState::NOT_SIGNALED), |
248 listener_task_runner_(base::ThreadTaskRunnerHandle::Get()), | 272 listener_task_runner_(base::ThreadTaskRunnerHandle::Get()), |
249 task_pending_(false), | 273 sync_dispatch_watcher_(base::MakeUnique<mojo::SyncEventWatcher>( |
250 listener_count_(0), | 274 &dispatch_event_, |
251 top_send_done_watcher_(nullptr) { | 275 base::Bind(&ReceivedSyncMsgQueue::OnDispatchEventReady, |
252 sync_dispatch_watcher_.reset(new mojo::SyncHandleWatcher( | 276 base::Unretained(this)))) { |
253 dispatch_event_.GetHandle(), MOJO_HANDLE_SIGNAL_READABLE, | |
254 base::Bind(&ReceivedSyncMsgQueue::OnDispatchHandleReady, | |
255 base::Unretained(this)))); | |
256 sync_dispatch_watcher_->AllowWokenUpBySyncWatchOnSameThread(); | 277 sync_dispatch_watcher_->AllowWokenUpBySyncWatchOnSameThread(); |
257 } | 278 } |
258 | 279 |
259 ~ReceivedSyncMsgQueue() {} | 280 ~ReceivedSyncMsgQueue() {} |
260 | 281 |
261 void OnDispatchHandleReady(MojoResult result) { | 282 void OnDispatchEventReady() { |
262 if (result != MOJO_RESULT_OK) | |
263 return; | |
264 | |
265 if (dispatch_flag_) { | 283 if (dispatch_flag_) { |
266 *dispatch_flag_ = true; | 284 *dispatch_flag_ = true; |
267 return; | 285 return; |
268 } | 286 } |
269 | 287 |
270 // We were woken up during a sync wait, but no specific SyncChannel is | 288 // We were woken up during a sync wait, but no specific SyncChannel is |
271 // currently waiting. i.e., some other Mojo interface on this thread is | 289 // currently waiting. i.e., some other Mojo interface on this thread is |
272 // waiting for a response. Since we don't support anything analogous to | 290 // waiting for a response. Since we don't support anything analogous to |
273 // restricted dispatch on Mojo interfaces, in this case it's safe to | 291 // restricted dispatch on Mojo interfaces, in this case it's safe to |
274 // dispatch sync messages for any context. | 292 // dispatch sync messages for any context. |
275 DispatchMessages(nullptr); | 293 DispatchMessages(nullptr); |
276 } | 294 } |
277 | 295 |
278 // Holds information about a queued synchronous message or reply. | 296 // Holds information about a queued synchronous message or reply. |
279 struct QueuedMessage { | 297 struct QueuedMessage { |
280 QueuedMessage(Message* m, SyncContext* c) : message(m), context(c) { } | 298 QueuedMessage(Message* m, SyncContext* c) : message(m), context(c) { } |
281 Message* message; | 299 Message* message; |
282 scoped_refptr<SyncChannel::SyncContext> context; | 300 scoped_refptr<SyncChannel::SyncContext> context; |
283 }; | 301 }; |
284 | 302 |
285 typedef std::list<QueuedMessage> SyncMessageQueue; | 303 typedef std::list<QueuedMessage> SyncMessageQueue; |
286 SyncMessageQueue message_queue_; | 304 SyncMessageQueue message_queue_; |
287 uint32_t message_queue_version_; // Used to signal DispatchMessages to rescan | 305 |
| 306 // Used to signal DispatchMessages to rescan |
| 307 uint32_t message_queue_version_ = 0; |
288 | 308 |
289 std::vector<QueuedMessage> received_replies_; | 309 std::vector<QueuedMessage> received_replies_; |
290 | 310 |
291 // Signaled when we get a synchronous message that we must respond to, as the | 311 // Signaled when we get a synchronous message that we must respond to, as the |
292 // sender needs its reply before it can reply to our original synchronous | 312 // sender needs its reply before it can reply to our original synchronous |
293 // message. | 313 // message. |
294 MojoEvent dispatch_event_; | 314 base::WaitableEvent dispatch_event_; |
295 scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner_; | 315 scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner_; |
296 base::Lock message_lock_; | 316 base::Lock message_lock_; |
297 bool task_pending_; | 317 bool task_pending_ = false; |
298 int listener_count_; | 318 int listener_count_ = 0; |
299 | 319 |
300 // The current send done handle watcher for this thread. Used to maintain | 320 // The current NestedSendDoneWatcher for this thread, if we're currently |
301 // a thread-local stack of send done watchers to ensure that nested sync | 321 // in a SyncChannel::WaitForReplyWithNestedMessageLoop. See |
302 // message loops complete correctly. | 322 // NestedSendDoneWatcher comments for more details. |
303 mojo::SimpleWatcher* top_send_done_watcher_; | 323 NestedSendDoneWatcher* top_send_done_event_watcher_ = nullptr; |
304 | 324 |
305 // If not null, the address of a flag to set when the dispatch event signals, | 325 // If not null, the address of a flag to set when the dispatch event signals, |
306 // in lieu of actually dispatching messages. This is used by | 326 // in lieu of actually dispatching messages. This is used by |
307 // SyncChannel::WaitForReply to restrict the scope of queued messages we're | 327 // SyncChannel::WaitForReply to restrict the scope of queued messages we're |
308 // allowed to process while it's waiting. | 328 // allowed to process while it's waiting. |
309 bool* dispatch_flag_ = nullptr; | 329 bool* dispatch_flag_ = nullptr; |
310 | 330 |
311 // Watches |dispatch_event_| during all sync handle watches on this thread. | 331 // Watches |dispatch_event_| during all sync handle watches on this thread. |
312 std::unique_ptr<mojo::SyncHandleWatcher> sync_dispatch_watcher_; | 332 std::unique_ptr<mojo::SyncEventWatcher> sync_dispatch_watcher_; |
313 }; | 333 }; |
314 | 334 |
315 base::LazyInstance<base::ThreadLocalPointer< | 335 base::LazyInstance<base::ThreadLocalPointer< |
316 SyncChannel::ReceivedSyncMsgQueue>>::DestructorAtExit | 336 SyncChannel::ReceivedSyncMsgQueue>>::DestructorAtExit |
317 SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_ = | 337 SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_ = |
318 LAZY_INSTANCE_INITIALIZER; | 338 LAZY_INSTANCE_INITIALIZER; |
319 | 339 |
320 SyncChannel::SyncContext::SyncContext( | 340 SyncChannel::SyncContext::SyncContext( |
321 Listener* listener, | 341 Listener* listener, |
322 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, | 342 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, |
323 WaitableEvent* shutdown_event) | 343 WaitableEvent* shutdown_event) |
324 : ChannelProxy::Context(listener, ipc_task_runner), | 344 : ChannelProxy::Context(listener, ipc_task_runner), |
325 received_sync_msgs_(ReceivedSyncMsgQueue::AddContext()), | 345 received_sync_msgs_(ReceivedSyncMsgQueue::AddContext()), |
326 shutdown_event_(shutdown_event), | 346 shutdown_event_(shutdown_event), |
327 restrict_dispatch_group_(kRestrictDispatchGroup_None) { | 347 restrict_dispatch_group_(kRestrictDispatchGroup_None) { |
328 } | 348 } |
329 | 349 |
| 350 void SyncChannel::SyncContext::OnSendDoneEventSignaled( |
| 351 base::RunLoop* nested_loop, |
| 352 base::WaitableEvent* event) { |
| 353 DCHECK_EQ(GetSendDoneEvent(), event); |
| 354 nested_loop->Quit(); |
| 355 } |
| 356 |
330 SyncChannel::SyncContext::~SyncContext() { | 357 SyncChannel::SyncContext::~SyncContext() { |
331 while (!deserializers_.empty()) | 358 while (!deserializers_.empty()) |
332 Pop(); | 359 Pop(); |
333 } | 360 } |
334 | 361 |
335 // Adds information about an outgoing sync message to the context so that | 362 // Adds information about an outgoing sync message to the context so that |
336 // we know how to deserialize the reply. Returns |true| if the message was added | 363 // we know how to deserialize the reply. Returns |true| if the message was added |
337 // to the context or |false| if it was rejected (e.g. due to shutdown.) | 364 // to the context or |false| if it was rejected (e.g. due to shutdown.) |
338 bool SyncChannel::SyncContext::Push(SyncMessage* sync_msg) { | 365 bool SyncChannel::SyncContext::Push(SyncMessage* sync_msg) { |
339 // Create the tracking information for this message. This object is stored | 366 // Create the tracking information for this message. This object is stored |
340 // by value since all members are pointers that are cheap to copy. These | 367 // by value since all members are pointers that are cheap to copy. These |
341 // pointers are cleaned up in the Pop() function. | 368 // pointers are cleaned up in the Pop() function. |
342 // | 369 // |
343 // The event is created as manual reset because in between Signal and | 370 // The event is created as manual reset because in between Signal and |
344 // OnObjectSignalled, another Send can happen which would stop the watcher | 371 // OnObjectSignalled, another Send can happen which would stop the watcher |
345 // from being called. The event would get watched later, when the nested | 372 // from being called. The event would get watched later, when the nested |
346 // Send completes, so the event will need to remain set. | 373 // Send completes, so the event will need to remain set. |
347 base::AutoLock auto_lock(deserializers_lock_); | 374 base::AutoLock auto_lock(deserializers_lock_); |
348 if (reject_new_deserializers_) | 375 if (reject_new_deserializers_) |
349 return false; | 376 return false; |
350 PendingSyncMsg pending( | 377 PendingSyncMsg pending( |
351 SyncMessage::GetMessageId(*sync_msg), sync_msg->GetReplyDeserializer(), | 378 SyncMessage::GetMessageId(*sync_msg), sync_msg->GetReplyDeserializer(), |
352 new MojoEvent); | 379 new base::WaitableEvent(base::WaitableEvent::ResetPolicy::MANUAL, |
| 380 base::WaitableEvent::InitialState::NOT_SIGNALED)); |
353 deserializers_.push_back(pending); | 381 deserializers_.push_back(pending); |
354 return true; | 382 return true; |
355 } | 383 } |
356 | 384 |
357 bool SyncChannel::SyncContext::Pop() { | 385 bool SyncChannel::SyncContext::Pop() { |
358 bool result; | 386 bool result; |
359 { | 387 { |
360 base::AutoLock auto_lock(deserializers_lock_); | 388 base::AutoLock auto_lock(deserializers_lock_); |
361 PendingSyncMsg msg = deserializers_.back(); | 389 PendingSyncMsg msg = deserializers_.back(); |
362 delete msg.deserializer; | 390 delete msg.deserializer; |
363 delete msg.done_event; | 391 delete msg.done_event; |
364 msg.done_event = nullptr; | 392 msg.done_event = nullptr; |
365 deserializers_.pop_back(); | 393 deserializers_.pop_back(); |
366 result = msg.send_result; | 394 result = msg.send_result; |
367 } | 395 } |
368 | 396 |
369 // We got a reply to a synchronous Send() call that's blocking the listener | 397 // We got a reply to a synchronous Send() call that's blocking the listener |
370 // thread. However, further down the call stack there could be another | 398 // thread. However, further down the call stack there could be another |
371 // blocking Send() call, whose reply we received after we made this last | 399 // blocking Send() call, whose reply we received after we made this last |
372 // Send() call. So check if we have any queued replies available that | 400 // Send() call. So check if we have any queued replies available that |
373 // can now unblock the listener thread. | 401 // can now unblock the listener thread. |
374 ipc_task_runner()->PostTask( | 402 ipc_task_runner()->PostTask( |
375 FROM_HERE, base::Bind(&ReceivedSyncMsgQueue::DispatchReplies, | 403 FROM_HERE, base::Bind(&ReceivedSyncMsgQueue::DispatchReplies, |
376 received_sync_msgs_)); | 404 received_sync_msgs_)); |
377 | 405 |
378 return result; | 406 return result; |
379 } | 407 } |
380 | 408 |
381 MojoEvent* SyncChannel::SyncContext::GetSendDoneEvent() { | 409 base::WaitableEvent* SyncChannel::SyncContext::GetSendDoneEvent() { |
382 base::AutoLock auto_lock(deserializers_lock_); | 410 base::AutoLock auto_lock(deserializers_lock_); |
383 return deserializers_.back().done_event; | 411 return deserializers_.back().done_event; |
384 } | 412 } |
385 | 413 |
386 MojoEvent* SyncChannel::SyncContext::GetDispatchEvent() { | 414 base::WaitableEvent* SyncChannel::SyncContext::GetDispatchEvent() { |
387 return received_sync_msgs_->dispatch_event(); | 415 return received_sync_msgs_->dispatch_event(); |
388 } | 416 } |
389 | 417 |
390 void SyncChannel::SyncContext::DispatchMessages() { | 418 void SyncChannel::SyncContext::DispatchMessages() { |
391 received_sync_msgs_->DispatchMessages(this); | 419 received_sync_msgs_->DispatchMessages(this); |
392 } | 420 } |
393 | 421 |
394 bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) { | 422 bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) { |
395 base::AutoLock auto_lock(deserializers_lock_); | 423 base::AutoLock auto_lock(deserializers_lock_); |
396 if (deserializers_.empty() || | 424 if (deserializers_.empty() || |
397 !SyncMessage::IsMessageReplyTo(*msg, deserializers_.back().id)) { | 425 !SyncMessage::IsMessageReplyTo(*msg, deserializers_.back().id)) { |
398 return false; | 426 return false; |
399 } | 427 } |
400 | 428 |
401 if (!msg->is_reply_error()) { | 429 if (!msg->is_reply_error()) { |
402 bool send_result = deserializers_.back().deserializer-> | 430 bool send_result = deserializers_.back().deserializer-> |
403 SerializeOutputParameters(*msg); | 431 SerializeOutputParameters(*msg); |
404 deserializers_.back().send_result = send_result; | 432 deserializers_.back().send_result = send_result; |
405 DVLOG_IF(1, !send_result) << "Couldn't deserialize reply message"; | 433 DVLOG_IF(1, !send_result) << "Couldn't deserialize reply message"; |
406 } else { | 434 } else { |
407 DVLOG(1) << "Received error reply"; | 435 DVLOG(1) << "Received error reply"; |
408 } | 436 } |
409 | 437 |
410 MojoEvent* done_event = deserializers_.back().done_event; | 438 base::WaitableEvent* done_event = deserializers_.back().done_event; |
411 TRACE_EVENT_FLOW_BEGIN0( | 439 TRACE_EVENT_FLOW_BEGIN0( |
412 TRACE_DISABLED_BY_DEFAULT("ipc.flow"), | 440 TRACE_DISABLED_BY_DEFAULT("ipc.flow"), |
413 "SyncChannel::SyncContext::TryToUnblockListener", done_event); | 441 "SyncChannel::SyncContext::TryToUnblockListener", done_event); |
414 | 442 |
415 done_event->Signal(); | 443 done_event->Signal(); |
416 | 444 |
417 return true; | 445 return true; |
418 } | 446 } |
419 | 447 |
420 void SyncChannel::SyncContext::Clear() { | 448 void SyncChannel::SyncContext::Clear() { |
(...skipping 98 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
519 WaitableEvent* shutdown_event) { | 547 WaitableEvent* shutdown_event) { |
520 return base::WrapUnique( | 548 return base::WrapUnique( |
521 new SyncChannel(listener, ipc_task_runner, shutdown_event)); | 549 new SyncChannel(listener, ipc_task_runner, shutdown_event)); |
522 } | 550 } |
523 | 551 |
524 SyncChannel::SyncChannel( | 552 SyncChannel::SyncChannel( |
525 Listener* listener, | 553 Listener* listener, |
526 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, | 554 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, |
527 WaitableEvent* shutdown_event) | 555 WaitableEvent* shutdown_event) |
528 : ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)), | 556 : ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)), |
529 sync_handle_registry_(mojo::SyncHandleRegistry::current()), | 557 sync_handle_registry_(mojo::SyncHandleRegistry::current()) { |
530 dispatch_watcher_(FROM_HERE, | |
531 mojo::SimpleWatcher::ArmingPolicy::AUTOMATIC) { | |
532 // The current (listener) thread must be distinct from the IPC thread, or else | 558 // The current (listener) thread must be distinct from the IPC thread, or else |
533 // sending synchronous messages will deadlock. | 559 // sending synchronous messages will deadlock. |
534 DCHECK_NE(ipc_task_runner.get(), base::ThreadTaskRunnerHandle::Get().get()); | 560 DCHECK_NE(ipc_task_runner.get(), base::ThreadTaskRunnerHandle::Get().get()); |
535 StartWatching(); | 561 StartWatching(); |
536 } | 562 } |
537 | 563 |
538 SyncChannel::~SyncChannel() { | 564 SyncChannel::~SyncChannel() { |
539 } | 565 } |
540 | 566 |
541 void SyncChannel::SetRestrictDispatchChannelGroup(int group) { | 567 void SyncChannel::SetRestrictDispatchChannelGroup(int group) { |
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
589 "SyncChannel::Send", context->GetSendDoneEvent()); | 615 "SyncChannel::Send", context->GetSendDoneEvent()); |
590 | 616 |
591 return context->Pop(); | 617 return context->Pop(); |
592 } | 618 } |
593 | 619 |
594 void SyncChannel::WaitForReply(mojo::SyncHandleRegistry* registry, | 620 void SyncChannel::WaitForReply(mojo::SyncHandleRegistry* registry, |
595 SyncContext* context, | 621 SyncContext* context, |
596 bool pump_messages) { | 622 bool pump_messages) { |
597 context->DispatchMessages(); | 623 context->DispatchMessages(); |
598 | 624 |
599 const MojoEvent* pump_messages_event = nullptr; | 625 base::WaitableEvent* pump_messages_event = nullptr; |
600 if (pump_messages) | 626 if (pump_messages) { |
601 pump_messages_event = g_pump_messages_event.Get().event(); | 627 if (!g_pump_messages_event.Get()) { |
| 628 g_pump_messages_event.Get() = base::MakeUnique<base::WaitableEvent>( |
| 629 base::WaitableEvent::ResetPolicy::MANUAL, |
| 630 base::WaitableEvent::InitialState::SIGNALED); |
| 631 } |
| 632 pump_messages_event = g_pump_messages_event.Get().get(); |
| 633 } |
602 | 634 |
603 while (true) { | 635 while (true) { |
604 bool dispatch = false; | 636 bool dispatch = false; |
605 bool send_done = false; | 637 bool send_done = false; |
606 bool should_pump_messages = false; | 638 bool should_pump_messages = false; |
607 bool error = false; | 639 bool registered = registry->RegisterEvent( |
608 bool registered = registry->RegisterHandle( | 640 context->GetSendDoneEvent(), base::Bind(&OnEventReady, &send_done)); |
609 context->GetSendDoneEvent()->GetHandle(), | |
610 MOJO_HANDLE_SIGNAL_READABLE, | |
611 base::Bind(&OnSyncHandleReady, &send_done, &error)); | |
612 DCHECK(registered); | 641 DCHECK(registered); |
| 642 |
613 if (pump_messages_event) { | 643 if (pump_messages_event) { |
614 registered = registry->RegisterHandle( | 644 registered = registry->RegisterEvent( |
615 pump_messages_event->GetHandle(), MOJO_HANDLE_SIGNAL_READABLE, | 645 pump_messages_event, |
616 base::Bind(&OnSyncHandleReady, &should_pump_messages, &error)); | 646 base::Bind(&OnEventReady, &should_pump_messages)); |
617 DCHECK(registered); | 647 DCHECK(registered); |
618 } | 648 } |
619 | 649 |
620 const bool* stop_flags[] = { &dispatch, &send_done, &should_pump_messages }; | 650 const bool* stop_flags[] = { &dispatch, &send_done, &should_pump_messages }; |
621 context->received_sync_msgs()->BlockDispatch(&dispatch); | 651 context->received_sync_msgs()->BlockDispatch(&dispatch); |
622 registry->WatchAllHandles(stop_flags, 3); | 652 registry->Wait(stop_flags, 3); |
623 context->received_sync_msgs()->UnblockDispatch(); | 653 context->received_sync_msgs()->UnblockDispatch(); |
624 DCHECK(!error); | |
625 | 654 |
626 registry->UnregisterHandle(context->GetSendDoneEvent()->GetHandle()); | 655 registry->UnregisterEvent(context->GetSendDoneEvent()); |
627 if (pump_messages_event) | 656 if (pump_messages_event) |
628 registry->UnregisterHandle(pump_messages_event->GetHandle()); | 657 registry->UnregisterEvent(pump_messages_event); |
629 | 658 |
630 if (dispatch) { | 659 if (dispatch) { |
631 // We're waiting for a reply, but we received a blocking synchronous call. | 660 // We're waiting for a reply, but we received a blocking synchronous call. |
632 // We must process it to avoid potential deadlocks. | 661 // We must process it to avoid potential deadlocks. |
633 context->GetDispatchEvent()->Reset(); | 662 context->GetDispatchEvent()->Reset(); |
634 context->DispatchMessages(); | 663 context->DispatchMessages(); |
635 continue; | 664 continue; |
636 } | 665 } |
637 | 666 |
638 if (should_pump_messages) | 667 if (should_pump_messages) |
639 WaitForReplyWithNestedMessageLoop(context); // Run a nested message loop. | 668 WaitForReplyWithNestedMessageLoop(context); // Run a nested message loop. |
640 | 669 |
641 break; | 670 break; |
642 } | 671 } |
643 } | 672 } |
644 | 673 |
645 void SyncChannel::WaitForReplyWithNestedMessageLoop(SyncContext* context) { | 674 void SyncChannel::WaitForReplyWithNestedMessageLoop(SyncContext* context) { |
646 mojo::SimpleWatcher send_done_watcher( | 675 base::MessageLoop::ScopedNestableTaskAllower allow( |
647 FROM_HERE, mojo::SimpleWatcher::ArmingPolicy::AUTOMATIC); | 676 base::MessageLoop::current()); |
648 | 677 base::RunLoop nested_loop; |
649 ReceivedSyncMsgQueue* sync_msg_queue = context->received_sync_msgs(); | 678 ReceivedSyncMsgQueue::NestedSendDoneWatcher watcher(context, &nested_loop); |
650 DCHECK_NE(sync_msg_queue, nullptr); | 679 nested_loop.Run(); |
651 | |
652 mojo::SimpleWatcher* old_watcher = sync_msg_queue->top_send_done_watcher(); | |
653 mojo::Handle old_handle(mojo::kInvalidHandleValue); | |
654 mojo::SimpleWatcher::ReadyCallback old_callback; | |
655 | |
656 // Maintain a thread-local stack of watchers to ensure nested calls complete | |
657 // in the correct sequence, i.e. the outermost call completes first, etc. | |
658 if (old_watcher) { | |
659 old_callback = old_watcher->ready_callback(); | |
660 old_handle = old_watcher->handle(); | |
661 old_watcher->Cancel(); | |
662 } | |
663 | |
664 sync_msg_queue->set_top_send_done_watcher(&send_done_watcher); | |
665 | |
666 { | |
667 base::RunLoop nested_loop; | |
668 send_done_watcher.Watch( | |
669 context->GetSendDoneEvent()->GetHandle(), MOJO_HANDLE_SIGNAL_READABLE, | |
670 base::Bind(&RunOnHandleReady, nested_loop.QuitClosure())); | |
671 | |
672 base::MessageLoop::ScopedNestableTaskAllower allow( | |
673 base::MessageLoop::current()); | |
674 nested_loop.Run(); | |
675 send_done_watcher.Cancel(); | |
676 } | |
677 | |
678 sync_msg_queue->set_top_send_done_watcher(old_watcher); | |
679 if (old_watcher) | |
680 old_watcher->Watch(old_handle, MOJO_HANDLE_SIGNAL_READABLE, old_callback); | |
681 } | 680 } |
682 | 681 |
683 void SyncChannel::OnDispatchHandleReady(MojoResult result) { | 682 void SyncChannel::OnDispatchEventSignaled(base::WaitableEvent* event) { |
684 DCHECK_EQ(result, MOJO_RESULT_OK); | 683 DCHECK_EQ(sync_context()->GetDispatchEvent(), event); |
685 sync_context()->GetDispatchEvent()->Reset(); | 684 sync_context()->GetDispatchEvent()->Reset(); |
| 685 |
| 686 StartWatching(); |
| 687 |
| 688 // NOTE: May delete |this|. |
686 sync_context()->DispatchMessages(); | 689 sync_context()->DispatchMessages(); |
687 } | 690 } |
688 | 691 |
689 void SyncChannel::StartWatching() { | 692 void SyncChannel::StartWatching() { |
690 // |dispatch_watcher_| watches the event asynchronously, only dispatching | 693 // |dispatch_watcher_| watches the event asynchronously, only dispatching |
691 // messages once the listener thread is unblocked and pumping its task queue. | 694 // messages once the listener thread is unblocked and pumping its task queue. |
692 // The ReceivedSyncMsgQueue also watches this event and may dispatch | 695 // The ReceivedSyncMsgQueue also watches this event and may dispatch |
693 // immediately if woken up by a message which it's allowed to dispatch. | 696 // immediately if woken up by a message which it's allowed to dispatch. |
694 dispatch_watcher_.Watch( | 697 dispatch_watcher_.StartWatching( |
695 sync_context()->GetDispatchEvent()->GetHandle(), | 698 sync_context()->GetDispatchEvent(), |
696 MOJO_HANDLE_SIGNAL_READABLE, | 699 base::Bind(&SyncChannel::OnDispatchEventSignaled, |
697 base::Bind(&SyncChannel::OnDispatchHandleReady, base::Unretained(this))); | 700 base::Unretained(this))); |
698 } | 701 } |
699 | 702 |
700 void SyncChannel::OnChannelInit() { | 703 void SyncChannel::OnChannelInit() { |
701 pre_init_sync_message_filters_.clear(); | 704 pre_init_sync_message_filters_.clear(); |
702 } | 705 } |
703 | 706 |
704 } // namespace IPC | 707 } // namespace IPC |
OLD | NEW |