Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(389)

Side by Side Diff: ipc/ipc_sync_channel.cc

Issue 2754143005: Use WaitableEvents to wake up sync IPC waiting (Closed)
Patch Set: . Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « ipc/ipc_sync_channel.h ('k') | ipc/ipc_sync_message.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "ipc/ipc_sync_channel.h" 5 #include "ipc/ipc_sync_channel.h"
6 6
7 #include <stddef.h> 7 #include <stddef.h>
8 #include <stdint.h> 8 #include <stdint.h>
9 9
10 #include <utility> 10 #include <utility>
11 11
12 #include "base/bind.h" 12 #include "base/bind.h"
13 #include "base/lazy_instance.h" 13 #include "base/lazy_instance.h"
14 #include "base/location.h" 14 #include "base/location.h"
15 #include "base/logging.h" 15 #include "base/logging.h"
16 #include "base/macros.h" 16 #include "base/macros.h"
17 #include "base/memory/ptr_util.h" 17 #include "base/memory/ptr_util.h"
18 #include "base/run_loop.h" 18 #include "base/run_loop.h"
19 #include "base/synchronization/waitable_event.h" 19 #include "base/synchronization/waitable_event.h"
20 #include "base/threading/thread_local.h" 20 #include "base/threading/thread_local.h"
21 #include "base/threading/thread_task_runner_handle.h" 21 #include "base/threading/thread_task_runner_handle.h"
22 #include "base/trace_event/trace_event.h" 22 #include "base/trace_event/trace_event.h"
23 #include "ipc/ipc_channel_factory.h" 23 #include "ipc/ipc_channel_factory.h"
24 #include "ipc/ipc_logging.h" 24 #include "ipc/ipc_logging.h"
25 #include "ipc/ipc_message_macros.h" 25 #include "ipc/ipc_message_macros.h"
26 #include "ipc/ipc_sync_message.h" 26 #include "ipc/ipc_sync_message.h"
27 #include "ipc/mojo_event.h" 27 #include "mojo/public/cpp/bindings/sync_event_watcher.h"
28 #include "mojo/public/cpp/bindings/sync_handle_registry.h"
29 #include "mojo/public/cpp/bindings/sync_handle_watcher.h"
30 28
31 using base::WaitableEvent; 29 using base::WaitableEvent;
32 30
33 namespace IPC { 31 namespace IPC {
34 32
35 namespace { 33 namespace {
36 34
37 // A generic callback used when watching handles synchronously. Sets |*signal| 35 // A generic callback used when watching handles synchronously. Sets |*signal|
38 // to true. Also sets |*error| to true in case of an error. 36 // to true.
39 void OnSyncHandleReady(bool* signal, bool* error, MojoResult result) { 37 void OnEventReady(bool* signal) {
40 *signal = true; 38 *signal = true;
41 *error = result != MOJO_RESULT_OK;
42 } 39 }
43 40
44 // A ReadyCallback for use with mojo::SimpleWatcher. Ignores the result 41 base::LazyInstance<std::unique_ptr<base::WaitableEvent>>::Leaky
45 // (DCHECKs, but is only used in cases where failure should be impossible) and 42 g_pump_messages_event = LAZY_INSTANCE_INITIALIZER;
46 // runs |callback|.
47 void RunOnHandleReady(const base::Closure& callback, MojoResult result) {
48 DCHECK_EQ(result, MOJO_RESULT_OK);
49 callback.Run();
50 }
51
52 class PumpMessagesEvent {
53 public:
54 PumpMessagesEvent() { event_.Signal(); }
55 ~PumpMessagesEvent() {}
56
57 const MojoEvent* event() const { return &event_; }
58
59 private:
60 MojoEvent event_;
61
62 DISALLOW_COPY_AND_ASSIGN(PumpMessagesEvent);
63 };
64
65 base::LazyInstance<PumpMessagesEvent>::Leaky g_pump_messages_event =
66 LAZY_INSTANCE_INITIALIZER;
67 43
68 } // namespace 44 } // namespace
69 45
70 // When we're blocked in a Send(), we need to process incoming synchronous 46 // When we're blocked in a Send(), we need to process incoming synchronous
71 // messages right away because it could be blocking our reply (either 47 // messages right away because it could be blocking our reply (either
72 // directly from the same object we're calling, or indirectly through one or 48 // directly from the same object we're calling, or indirectly through one or
73 // more other channels). That means that in SyncContext's OnMessageReceived, 49 // more other channels). That means that in SyncContext's OnMessageReceived,
74 // we need to process sync message right away if we're blocked. However a 50 // we need to process sync message right away if we're blocked. However a
75 // simple check isn't sufficient, because the listener thread can be in the 51 // simple check isn't sufficient, because the listener thread can be in the
76 // process of calling Send. 52 // process of calling Send.
77 // To work around this, when SyncChannel filters a sync message, it sets 53 // To work around this, when SyncChannel filters a sync message, it sets
78 // an event that the listener thread waits on during its Send() call. This 54 // an event that the listener thread waits on during its Send() call. This
79 // allows us to dispatch incoming sync messages when blocked. The race 55 // allows us to dispatch incoming sync messages when blocked. The race
80 // condition is handled because if Send is in the process of being called, it 56 // condition is handled because if Send is in the process of being called, it
81 // will check the event. In case the listener thread isn't sending a message, 57 // will check the event. In case the listener thread isn't sending a message,
82 // we queue a task on the listener thread to dispatch the received messages. 58 // we queue a task on the listener thread to dispatch the received messages.
83 // The messages are stored in this queue object that's shared among all 59 // The messages are stored in this queue object that's shared among all
84 // SyncChannel objects on the same thread (since one object can receive a 60 // SyncChannel objects on the same thread (since one object can receive a
85 // sync message while another one is blocked). 61 // sync message while another one is blocked).
86 62
87 class SyncChannel::ReceivedSyncMsgQueue : 63 class SyncChannel::ReceivedSyncMsgQueue :
88 public base::RefCountedThreadSafe<ReceivedSyncMsgQueue> { 64 public base::RefCountedThreadSafe<ReceivedSyncMsgQueue> {
89 public: 65 public:
66 // SyncChannel::WaitForReplyWithNestedMessageLoop may be re-entered, i.e. we
67 // may nest waiting message loops arbitrarily deep on the SyncChannel's
68 // thread. Every such operation has a corresponding WaitableEvent to be
69 // watched which, when signalled for IPC completion, breaks out of the loop.
70 // A reference to the innermost (i.e. topmost) watcher is held in
71 // |ReceivedSyncMsgQueue::top_send_done_event_watcher_|.
72 //
73 // NestedSendDoneWatcher provides a simple scoper which is used by
74 // WaitForReplyWithNestedMessageLoop to begin watching a new local "send done"
75 // event, preserving the previous topmost state on the local stack until the
76 // new inner loop is broken. If yet another subsequent nested loop is started
77 // therein the process is repeated again in the new inner stack frame, and so
78 // on.
79 //
80 // When this object is destroyed on stack unwind, the previous topmost state
81 // is swapped back into |ReceivedSyncMsgQueue::top_send_done_event_watcher_|,
82 // and its watch is resumed immediately.
83 class NestedSendDoneWatcher {
84 public:
85 NestedSendDoneWatcher(SyncChannel::SyncContext* context,
86 base::RunLoop* run_loop)
87 : sync_msg_queue_(context->received_sync_msgs()),
88 outer_state_(sync_msg_queue_->top_send_done_event_watcher_),
89 event_(context->GetSendDoneEvent()),
90 callback_(
91 base::Bind(&SyncChannel::SyncContext::OnSendDoneEventSignaled,
92 context,
93 run_loop)) {
94 sync_msg_queue_->top_send_done_event_watcher_ = this;
95 if (outer_state_)
96 outer_state_->StopWatching();
97 StartWatching();
98 }
99
100 ~NestedSendDoneWatcher() {
101 sync_msg_queue_->top_send_done_event_watcher_ = outer_state_;
102 if (outer_state_)
103 outer_state_->StartWatching();
104 }
105
106 private:
107 void StartWatching() { watcher_.StartWatching(event_, callback_); }
108 void StopWatching() { watcher_.StopWatching(); }
109
110 ReceivedSyncMsgQueue* const sync_msg_queue_;
111 NestedSendDoneWatcher* const outer_state_;
112
113 base::WaitableEvent* const event_;
114 const base::WaitableEventWatcher::EventCallback callback_;
115 base::WaitableEventWatcher watcher_;
116
117 DISALLOW_COPY_AND_ASSIGN(NestedSendDoneWatcher);
118 };
119
90 // Returns the ReceivedSyncMsgQueue instance for this thread, creating one 120 // Returns the ReceivedSyncMsgQueue instance for this thread, creating one
91 // if necessary. Call RemoveContext on the same thread when done. 121 // if necessary. Call RemoveContext on the same thread when done.
92 static ReceivedSyncMsgQueue* AddContext() { 122 static ReceivedSyncMsgQueue* AddContext() {
93 // We want one ReceivedSyncMsgQueue per listener thread (i.e. since multiple 123 // We want one ReceivedSyncMsgQueue per listener thread (i.e. since multiple
94 // SyncChannel objects can block the same thread). 124 // SyncChannel objects can block the same thread).
95 ReceivedSyncMsgQueue* rv = lazy_tls_ptr_.Pointer()->Get(); 125 ReceivedSyncMsgQueue* rv = lazy_tls_ptr_.Pointer()->Get();
96 if (!rv) { 126 if (!rv) {
97 rv = new ReceivedSyncMsgQueue(); 127 rv = new ReceivedSyncMsgQueue();
98 ReceivedSyncMsgQueue::lazy_tls_ptr_.Pointer()->Set(rv); 128 ReceivedSyncMsgQueue::lazy_tls_ptr_.Pointer()->Set(rv);
99 } 129 }
(...skipping 101 matching lines...) Expand 10 before | Expand all | Expand 10 after
201 } 231 }
202 } 232 }
203 233
204 if (--listener_count_ == 0) { 234 if (--listener_count_ == 0) {
205 DCHECK(lazy_tls_ptr_.Pointer()->Get()); 235 DCHECK(lazy_tls_ptr_.Pointer()->Get());
206 lazy_tls_ptr_.Pointer()->Set(nullptr); 236 lazy_tls_ptr_.Pointer()->Set(nullptr);
207 sync_dispatch_watcher_.reset(); 237 sync_dispatch_watcher_.reset();
208 } 238 }
209 } 239 }
210 240
211 MojoEvent* dispatch_event() { return &dispatch_event_; } 241 base::WaitableEvent* dispatch_event() { return &dispatch_event_; }
212 base::SingleThreadTaskRunner* listener_task_runner() { 242 base::SingleThreadTaskRunner* listener_task_runner() {
213 return listener_task_runner_.get(); 243 return listener_task_runner_.get();
214 } 244 }
215 245
216 // Holds a pointer to the per-thread ReceivedSyncMsgQueue object. 246 // Holds a pointer to the per-thread ReceivedSyncMsgQueue object.
217 static base::LazyInstance<base::ThreadLocalPointer<ReceivedSyncMsgQueue>>:: 247 static base::LazyInstance<base::ThreadLocalPointer<ReceivedSyncMsgQueue>>::
218 DestructorAtExit lazy_tls_ptr_; 248 DestructorAtExit lazy_tls_ptr_;
219 249
220 // Called on the ipc thread to check if we can unblock any current Send() 250 // Called on the ipc thread to check if we can unblock any current Send()
221 // calls based on a queued reply. 251 // calls based on a queued reply.
222 void DispatchReplies() { 252 void DispatchReplies() {
223 for (size_t i = 0; i < received_replies_.size(); ++i) { 253 for (size_t i = 0; i < received_replies_.size(); ++i) {
224 Message* message = received_replies_[i].message; 254 Message* message = received_replies_[i].message;
225 if (received_replies_[i].context->TryToUnblockListener(message)) { 255 if (received_replies_[i].context->TryToUnblockListener(message)) {
226 delete message; 256 delete message;
227 received_replies_.erase(received_replies_.begin() + i); 257 received_replies_.erase(received_replies_.begin() + i);
228 return; 258 return;
229 } 259 }
230 } 260 }
231 } 261 }
232 262
233 mojo::SimpleWatcher* top_send_done_watcher() {
234 return top_send_done_watcher_;
235 }
236
237 void set_top_send_done_watcher(mojo::SimpleWatcher* watcher) {
238 top_send_done_watcher_ = watcher;
239 }
240
241 private: 263 private:
242 friend class base::RefCountedThreadSafe<ReceivedSyncMsgQueue>; 264 friend class base::RefCountedThreadSafe<ReceivedSyncMsgQueue>;
243 265
244 // See the comment in SyncChannel::SyncChannel for why this event is created 266 // See the comment in SyncChannel::SyncChannel for why this event is created
245 // as manual reset. 267 // as manual reset.
246 ReceivedSyncMsgQueue() 268 ReceivedSyncMsgQueue()
247 : message_queue_version_(0), 269 : message_queue_version_(0),
270 dispatch_event_(base::WaitableEvent::ResetPolicy::MANUAL,
271 base::WaitableEvent::InitialState::NOT_SIGNALED),
248 listener_task_runner_(base::ThreadTaskRunnerHandle::Get()), 272 listener_task_runner_(base::ThreadTaskRunnerHandle::Get()),
249 task_pending_(false), 273 sync_dispatch_watcher_(base::MakeUnique<mojo::SyncEventWatcher>(
250 listener_count_(0), 274 &dispatch_event_,
251 top_send_done_watcher_(nullptr) { 275 base::Bind(&ReceivedSyncMsgQueue::OnDispatchEventReady,
252 sync_dispatch_watcher_.reset(new mojo::SyncHandleWatcher( 276 base::Unretained(this)))) {
253 dispatch_event_.GetHandle(), MOJO_HANDLE_SIGNAL_READABLE,
254 base::Bind(&ReceivedSyncMsgQueue::OnDispatchHandleReady,
255 base::Unretained(this))));
256 sync_dispatch_watcher_->AllowWokenUpBySyncWatchOnSameThread(); 277 sync_dispatch_watcher_->AllowWokenUpBySyncWatchOnSameThread();
257 } 278 }
258 279
259 ~ReceivedSyncMsgQueue() {} 280 ~ReceivedSyncMsgQueue() {}
260 281
261 void OnDispatchHandleReady(MojoResult result) { 282 void OnDispatchEventReady() {
262 if (result != MOJO_RESULT_OK)
263 return;
264
265 if (dispatch_flag_) { 283 if (dispatch_flag_) {
266 *dispatch_flag_ = true; 284 *dispatch_flag_ = true;
267 return; 285 return;
268 } 286 }
269 287
270 // We were woken up during a sync wait, but no specific SyncChannel is 288 // We were woken up during a sync wait, but no specific SyncChannel is
271 // currently waiting. i.e., some other Mojo interface on this thread is 289 // currently waiting. i.e., some other Mojo interface on this thread is
272 // waiting for a response. Since we don't support anything analogous to 290 // waiting for a response. Since we don't support anything analogous to
273 // restricted dispatch on Mojo interfaces, in this case it's safe to 291 // restricted dispatch on Mojo interfaces, in this case it's safe to
274 // dispatch sync messages for any context. 292 // dispatch sync messages for any context.
275 DispatchMessages(nullptr); 293 DispatchMessages(nullptr);
276 } 294 }
277 295
278 // Holds information about a queued synchronous message or reply. 296 // Holds information about a queued synchronous message or reply.
279 struct QueuedMessage { 297 struct QueuedMessage {
280 QueuedMessage(Message* m, SyncContext* c) : message(m), context(c) { } 298 QueuedMessage(Message* m, SyncContext* c) : message(m), context(c) { }
281 Message* message; 299 Message* message;
282 scoped_refptr<SyncChannel::SyncContext> context; 300 scoped_refptr<SyncChannel::SyncContext> context;
283 }; 301 };
284 302
285 typedef std::list<QueuedMessage> SyncMessageQueue; 303 typedef std::list<QueuedMessage> SyncMessageQueue;
286 SyncMessageQueue message_queue_; 304 SyncMessageQueue message_queue_;
287 uint32_t message_queue_version_; // Used to signal DispatchMessages to rescan 305
306 // Used to signal DispatchMessages to rescan
307 uint32_t message_queue_version_ = 0;
288 308
289 std::vector<QueuedMessage> received_replies_; 309 std::vector<QueuedMessage> received_replies_;
290 310
291 // Signaled when we get a synchronous message that we must respond to, as the 311 // Signaled when we get a synchronous message that we must respond to, as the
292 // sender needs its reply before it can reply to our original synchronous 312 // sender needs its reply before it can reply to our original synchronous
293 // message. 313 // message.
294 MojoEvent dispatch_event_; 314 base::WaitableEvent dispatch_event_;
295 scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner_; 315 scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner_;
296 base::Lock message_lock_; 316 base::Lock message_lock_;
297 bool task_pending_; 317 bool task_pending_ = false;
298 int listener_count_; 318 int listener_count_ = 0;
299 319
300 // The current send done handle watcher for this thread. Used to maintain 320 // The current NestedSendDoneWatcher for this thread, if we're currently
301 // a thread-local stack of send done watchers to ensure that nested sync 321 // in a SyncChannel::WaitForReplyWithNestedMessageLoop. See
302 // message loops complete correctly. 322 // NestedSendDoneWatcher comments for more details.
303 mojo::SimpleWatcher* top_send_done_watcher_; 323 NestedSendDoneWatcher* top_send_done_event_watcher_ = nullptr;
304 324
305 // If not null, the address of a flag to set when the dispatch event signals, 325 // If not null, the address of a flag to set when the dispatch event signals,
306 // in lieu of actually dispatching messages. This is used by 326 // in lieu of actually dispatching messages. This is used by
307 // SyncChannel::WaitForReply to restrict the scope of queued messages we're 327 // SyncChannel::WaitForReply to restrict the scope of queued messages we're
308 // allowed to process while it's waiting. 328 // allowed to process while it's waiting.
309 bool* dispatch_flag_ = nullptr; 329 bool* dispatch_flag_ = nullptr;
310 330
311 // Watches |dispatch_event_| during all sync handle watches on this thread. 331 // Watches |dispatch_event_| during all sync handle watches on this thread.
312 std::unique_ptr<mojo::SyncHandleWatcher> sync_dispatch_watcher_; 332 std::unique_ptr<mojo::SyncEventWatcher> sync_dispatch_watcher_;
313 }; 333 };
314 334
315 base::LazyInstance<base::ThreadLocalPointer< 335 base::LazyInstance<base::ThreadLocalPointer<
316 SyncChannel::ReceivedSyncMsgQueue>>::DestructorAtExit 336 SyncChannel::ReceivedSyncMsgQueue>>::DestructorAtExit
317 SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_ = 337 SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_ =
318 LAZY_INSTANCE_INITIALIZER; 338 LAZY_INSTANCE_INITIALIZER;
319 339
320 SyncChannel::SyncContext::SyncContext( 340 SyncChannel::SyncContext::SyncContext(
321 Listener* listener, 341 Listener* listener,
322 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, 342 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
323 WaitableEvent* shutdown_event) 343 WaitableEvent* shutdown_event)
324 : ChannelProxy::Context(listener, ipc_task_runner), 344 : ChannelProxy::Context(listener, ipc_task_runner),
325 received_sync_msgs_(ReceivedSyncMsgQueue::AddContext()), 345 received_sync_msgs_(ReceivedSyncMsgQueue::AddContext()),
326 shutdown_event_(shutdown_event), 346 shutdown_event_(shutdown_event),
327 restrict_dispatch_group_(kRestrictDispatchGroup_None) { 347 restrict_dispatch_group_(kRestrictDispatchGroup_None) {
328 } 348 }
329 349
350 void SyncChannel::SyncContext::OnSendDoneEventSignaled(
351 base::RunLoop* nested_loop,
352 base::WaitableEvent* event) {
353 DCHECK_EQ(GetSendDoneEvent(), event);
354 nested_loop->Quit();
355 }
356
330 SyncChannel::SyncContext::~SyncContext() { 357 SyncChannel::SyncContext::~SyncContext() {
331 while (!deserializers_.empty()) 358 while (!deserializers_.empty())
332 Pop(); 359 Pop();
333 } 360 }
334 361
335 // Adds information about an outgoing sync message to the context so that 362 // Adds information about an outgoing sync message to the context so that
336 // we know how to deserialize the reply. Returns |true| if the message was added 363 // we know how to deserialize the reply. Returns |true| if the message was added
337 // to the context or |false| if it was rejected (e.g. due to shutdown.) 364 // to the context or |false| if it was rejected (e.g. due to shutdown.)
338 bool SyncChannel::SyncContext::Push(SyncMessage* sync_msg) { 365 bool SyncChannel::SyncContext::Push(SyncMessage* sync_msg) {
339 // Create the tracking information for this message. This object is stored 366 // Create the tracking information for this message. This object is stored
340 // by value since all members are pointers that are cheap to copy. These 367 // by value since all members are pointers that are cheap to copy. These
341 // pointers are cleaned up in the Pop() function. 368 // pointers are cleaned up in the Pop() function.
342 // 369 //
343 // The event is created as manual reset because in between Signal and 370 // The event is created as manual reset because in between Signal and
344 // OnObjectSignalled, another Send can happen which would stop the watcher 371 // OnObjectSignalled, another Send can happen which would stop the watcher
345 // from being called. The event would get watched later, when the nested 372 // from being called. The event would get watched later, when the nested
346 // Send completes, so the event will need to remain set. 373 // Send completes, so the event will need to remain set.
347 base::AutoLock auto_lock(deserializers_lock_); 374 base::AutoLock auto_lock(deserializers_lock_);
348 if (reject_new_deserializers_) 375 if (reject_new_deserializers_)
349 return false; 376 return false;
350 PendingSyncMsg pending( 377 PendingSyncMsg pending(
351 SyncMessage::GetMessageId(*sync_msg), sync_msg->GetReplyDeserializer(), 378 SyncMessage::GetMessageId(*sync_msg), sync_msg->GetReplyDeserializer(),
352 new MojoEvent); 379 new base::WaitableEvent(base::WaitableEvent::ResetPolicy::MANUAL,
380 base::WaitableEvent::InitialState::NOT_SIGNALED));
353 deserializers_.push_back(pending); 381 deserializers_.push_back(pending);
354 return true; 382 return true;
355 } 383 }
356 384
357 bool SyncChannel::SyncContext::Pop() { 385 bool SyncChannel::SyncContext::Pop() {
358 bool result; 386 bool result;
359 { 387 {
360 base::AutoLock auto_lock(deserializers_lock_); 388 base::AutoLock auto_lock(deserializers_lock_);
361 PendingSyncMsg msg = deserializers_.back(); 389 PendingSyncMsg msg = deserializers_.back();
362 delete msg.deserializer; 390 delete msg.deserializer;
363 delete msg.done_event; 391 delete msg.done_event;
364 msg.done_event = nullptr; 392 msg.done_event = nullptr;
365 deserializers_.pop_back(); 393 deserializers_.pop_back();
366 result = msg.send_result; 394 result = msg.send_result;
367 } 395 }
368 396
369 // We got a reply to a synchronous Send() call that's blocking the listener 397 // We got a reply to a synchronous Send() call that's blocking the listener
370 // thread. However, further down the call stack there could be another 398 // thread. However, further down the call stack there could be another
371 // blocking Send() call, whose reply we received after we made this last 399 // blocking Send() call, whose reply we received after we made this last
372 // Send() call. So check if we have any queued replies available that 400 // Send() call. So check if we have any queued replies available that
373 // can now unblock the listener thread. 401 // can now unblock the listener thread.
374 ipc_task_runner()->PostTask( 402 ipc_task_runner()->PostTask(
375 FROM_HERE, base::Bind(&ReceivedSyncMsgQueue::DispatchReplies, 403 FROM_HERE, base::Bind(&ReceivedSyncMsgQueue::DispatchReplies,
376 received_sync_msgs_)); 404 received_sync_msgs_));
377 405
378 return result; 406 return result;
379 } 407 }
380 408
381 MojoEvent* SyncChannel::SyncContext::GetSendDoneEvent() { 409 base::WaitableEvent* SyncChannel::SyncContext::GetSendDoneEvent() {
382 base::AutoLock auto_lock(deserializers_lock_); 410 base::AutoLock auto_lock(deserializers_lock_);
383 return deserializers_.back().done_event; 411 return deserializers_.back().done_event;
384 } 412 }
385 413
386 MojoEvent* SyncChannel::SyncContext::GetDispatchEvent() { 414 base::WaitableEvent* SyncChannel::SyncContext::GetDispatchEvent() {
387 return received_sync_msgs_->dispatch_event(); 415 return received_sync_msgs_->dispatch_event();
388 } 416 }
389 417
390 void SyncChannel::SyncContext::DispatchMessages() { 418 void SyncChannel::SyncContext::DispatchMessages() {
391 received_sync_msgs_->DispatchMessages(this); 419 received_sync_msgs_->DispatchMessages(this);
392 } 420 }
393 421
394 bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) { 422 bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) {
395 base::AutoLock auto_lock(deserializers_lock_); 423 base::AutoLock auto_lock(deserializers_lock_);
396 if (deserializers_.empty() || 424 if (deserializers_.empty() ||
397 !SyncMessage::IsMessageReplyTo(*msg, deserializers_.back().id)) { 425 !SyncMessage::IsMessageReplyTo(*msg, deserializers_.back().id)) {
398 return false; 426 return false;
399 } 427 }
400 428
401 if (!msg->is_reply_error()) { 429 if (!msg->is_reply_error()) {
402 bool send_result = deserializers_.back().deserializer-> 430 bool send_result = deserializers_.back().deserializer->
403 SerializeOutputParameters(*msg); 431 SerializeOutputParameters(*msg);
404 deserializers_.back().send_result = send_result; 432 deserializers_.back().send_result = send_result;
405 DVLOG_IF(1, !send_result) << "Couldn't deserialize reply message"; 433 DVLOG_IF(1, !send_result) << "Couldn't deserialize reply message";
406 } else { 434 } else {
407 DVLOG(1) << "Received error reply"; 435 DVLOG(1) << "Received error reply";
408 } 436 }
409 437
410 MojoEvent* done_event = deserializers_.back().done_event; 438 base::WaitableEvent* done_event = deserializers_.back().done_event;
411 TRACE_EVENT_FLOW_BEGIN0( 439 TRACE_EVENT_FLOW_BEGIN0(
412 TRACE_DISABLED_BY_DEFAULT("ipc.flow"), 440 TRACE_DISABLED_BY_DEFAULT("ipc.flow"),
413 "SyncChannel::SyncContext::TryToUnblockListener", done_event); 441 "SyncChannel::SyncContext::TryToUnblockListener", done_event);
414 442
415 done_event->Signal(); 443 done_event->Signal();
416 444
417 return true; 445 return true;
418 } 446 }
419 447
420 void SyncChannel::SyncContext::Clear() { 448 void SyncChannel::SyncContext::Clear() {
(...skipping 98 matching lines...) Expand 10 before | Expand all | Expand 10 after
519 WaitableEvent* shutdown_event) { 547 WaitableEvent* shutdown_event) {
520 return base::WrapUnique( 548 return base::WrapUnique(
521 new SyncChannel(listener, ipc_task_runner, shutdown_event)); 549 new SyncChannel(listener, ipc_task_runner, shutdown_event));
522 } 550 }
523 551
524 SyncChannel::SyncChannel( 552 SyncChannel::SyncChannel(
525 Listener* listener, 553 Listener* listener,
526 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, 554 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
527 WaitableEvent* shutdown_event) 555 WaitableEvent* shutdown_event)
528 : ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)), 556 : ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)),
529 sync_handle_registry_(mojo::SyncHandleRegistry::current()), 557 sync_handle_registry_(mojo::SyncHandleRegistry::current()) {
530 dispatch_watcher_(FROM_HERE,
531 mojo::SimpleWatcher::ArmingPolicy::AUTOMATIC) {
532 // The current (listener) thread must be distinct from the IPC thread, or else 558 // The current (listener) thread must be distinct from the IPC thread, or else
533 // sending synchronous messages will deadlock. 559 // sending synchronous messages will deadlock.
534 DCHECK_NE(ipc_task_runner.get(), base::ThreadTaskRunnerHandle::Get().get()); 560 DCHECK_NE(ipc_task_runner.get(), base::ThreadTaskRunnerHandle::Get().get());
535 StartWatching(); 561 StartWatching();
536 } 562 }
537 563
538 SyncChannel::~SyncChannel() { 564 SyncChannel::~SyncChannel() {
539 } 565 }
540 566
541 void SyncChannel::SetRestrictDispatchChannelGroup(int group) { 567 void SyncChannel::SetRestrictDispatchChannelGroup(int group) {
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after
589 "SyncChannel::Send", context->GetSendDoneEvent()); 615 "SyncChannel::Send", context->GetSendDoneEvent());
590 616
591 return context->Pop(); 617 return context->Pop();
592 } 618 }
593 619
594 void SyncChannel::WaitForReply(mojo::SyncHandleRegistry* registry, 620 void SyncChannel::WaitForReply(mojo::SyncHandleRegistry* registry,
595 SyncContext* context, 621 SyncContext* context,
596 bool pump_messages) { 622 bool pump_messages) {
597 context->DispatchMessages(); 623 context->DispatchMessages();
598 624
599 const MojoEvent* pump_messages_event = nullptr; 625 base::WaitableEvent* pump_messages_event = nullptr;
600 if (pump_messages) 626 if (pump_messages) {
601 pump_messages_event = g_pump_messages_event.Get().event(); 627 if (!g_pump_messages_event.Get()) {
628 g_pump_messages_event.Get() = base::MakeUnique<base::WaitableEvent>(
629 base::WaitableEvent::ResetPolicy::MANUAL,
630 base::WaitableEvent::InitialState::SIGNALED);
631 }
632 pump_messages_event = g_pump_messages_event.Get().get();
633 }
602 634
603 while (true) { 635 while (true) {
604 bool dispatch = false; 636 bool dispatch = false;
605 bool send_done = false; 637 bool send_done = false;
606 bool should_pump_messages = false; 638 bool should_pump_messages = false;
607 bool error = false; 639 bool registered = registry->RegisterEvent(
608 bool registered = registry->RegisterHandle( 640 context->GetSendDoneEvent(), base::Bind(&OnEventReady, &send_done));
609 context->GetSendDoneEvent()->GetHandle(),
610 MOJO_HANDLE_SIGNAL_READABLE,
611 base::Bind(&OnSyncHandleReady, &send_done, &error));
612 DCHECK(registered); 641 DCHECK(registered);
642
613 if (pump_messages_event) { 643 if (pump_messages_event) {
614 registered = registry->RegisterHandle( 644 registered = registry->RegisterEvent(
615 pump_messages_event->GetHandle(), MOJO_HANDLE_SIGNAL_READABLE, 645 pump_messages_event,
616 base::Bind(&OnSyncHandleReady, &should_pump_messages, &error)); 646 base::Bind(&OnEventReady, &should_pump_messages));
617 DCHECK(registered); 647 DCHECK(registered);
618 } 648 }
619 649
620 const bool* stop_flags[] = { &dispatch, &send_done, &should_pump_messages }; 650 const bool* stop_flags[] = { &dispatch, &send_done, &should_pump_messages };
621 context->received_sync_msgs()->BlockDispatch(&dispatch); 651 context->received_sync_msgs()->BlockDispatch(&dispatch);
622 registry->WatchAllHandles(stop_flags, 3); 652 registry->Wait(stop_flags, 3);
623 context->received_sync_msgs()->UnblockDispatch(); 653 context->received_sync_msgs()->UnblockDispatch();
624 DCHECK(!error);
625 654
626 registry->UnregisterHandle(context->GetSendDoneEvent()->GetHandle()); 655 registry->UnregisterEvent(context->GetSendDoneEvent());
627 if (pump_messages_event) 656 if (pump_messages_event)
628 registry->UnregisterHandle(pump_messages_event->GetHandle()); 657 registry->UnregisterEvent(pump_messages_event);
629 658
630 if (dispatch) { 659 if (dispatch) {
631 // We're waiting for a reply, but we received a blocking synchronous call. 660 // We're waiting for a reply, but we received a blocking synchronous call.
632 // We must process it to avoid potential deadlocks. 661 // We must process it to avoid potential deadlocks.
633 context->GetDispatchEvent()->Reset(); 662 context->GetDispatchEvent()->Reset();
634 context->DispatchMessages(); 663 context->DispatchMessages();
635 continue; 664 continue;
636 } 665 }
637 666
638 if (should_pump_messages) 667 if (should_pump_messages)
639 WaitForReplyWithNestedMessageLoop(context); // Run a nested message loop. 668 WaitForReplyWithNestedMessageLoop(context); // Run a nested message loop.
640 669
641 break; 670 break;
642 } 671 }
643 } 672 }
644 673
645 void SyncChannel::WaitForReplyWithNestedMessageLoop(SyncContext* context) { 674 void SyncChannel::WaitForReplyWithNestedMessageLoop(SyncContext* context) {
646 mojo::SimpleWatcher send_done_watcher( 675 base::MessageLoop::ScopedNestableTaskAllower allow(
647 FROM_HERE, mojo::SimpleWatcher::ArmingPolicy::AUTOMATIC); 676 base::MessageLoop::current());
648 677 base::RunLoop nested_loop;
649 ReceivedSyncMsgQueue* sync_msg_queue = context->received_sync_msgs(); 678 ReceivedSyncMsgQueue::NestedSendDoneWatcher watcher(context, &nested_loop);
650 DCHECK_NE(sync_msg_queue, nullptr); 679 nested_loop.Run();
651
652 mojo::SimpleWatcher* old_watcher = sync_msg_queue->top_send_done_watcher();
653 mojo::Handle old_handle(mojo::kInvalidHandleValue);
654 mojo::SimpleWatcher::ReadyCallback old_callback;
655
656 // Maintain a thread-local stack of watchers to ensure nested calls complete
657 // in the correct sequence, i.e. the outermost call completes first, etc.
658 if (old_watcher) {
659 old_callback = old_watcher->ready_callback();
660 old_handle = old_watcher->handle();
661 old_watcher->Cancel();
662 }
663
664 sync_msg_queue->set_top_send_done_watcher(&send_done_watcher);
665
666 {
667 base::RunLoop nested_loop;
668 send_done_watcher.Watch(
669 context->GetSendDoneEvent()->GetHandle(), MOJO_HANDLE_SIGNAL_READABLE,
670 base::Bind(&RunOnHandleReady, nested_loop.QuitClosure()));
671
672 base::MessageLoop::ScopedNestableTaskAllower allow(
673 base::MessageLoop::current());
674 nested_loop.Run();
675 send_done_watcher.Cancel();
676 }
677
678 sync_msg_queue->set_top_send_done_watcher(old_watcher);
679 if (old_watcher)
680 old_watcher->Watch(old_handle, MOJO_HANDLE_SIGNAL_READABLE, old_callback);
681 } 680 }
682 681
683 void SyncChannel::OnDispatchHandleReady(MojoResult result) { 682 void SyncChannel::OnDispatchEventSignaled(base::WaitableEvent* event) {
684 DCHECK_EQ(result, MOJO_RESULT_OK); 683 DCHECK_EQ(sync_context()->GetDispatchEvent(), event);
685 sync_context()->GetDispatchEvent()->Reset(); 684 sync_context()->GetDispatchEvent()->Reset();
685
686 StartWatching();
687
688 // NOTE: May delete |this|.
686 sync_context()->DispatchMessages(); 689 sync_context()->DispatchMessages();
687 } 690 }
688 691
689 void SyncChannel::StartWatching() { 692 void SyncChannel::StartWatching() {
690 // |dispatch_watcher_| watches the event asynchronously, only dispatching 693 // |dispatch_watcher_| watches the event asynchronously, only dispatching
691 // messages once the listener thread is unblocked and pumping its task queue. 694 // messages once the listener thread is unblocked and pumping its task queue.
692 // The ReceivedSyncMsgQueue also watches this event and may dispatch 695 // The ReceivedSyncMsgQueue also watches this event and may dispatch
693 // immediately if woken up by a message which it's allowed to dispatch. 696 // immediately if woken up by a message which it's allowed to dispatch.
694 dispatch_watcher_.Watch( 697 dispatch_watcher_.StartWatching(
695 sync_context()->GetDispatchEvent()->GetHandle(), 698 sync_context()->GetDispatchEvent(),
696 MOJO_HANDLE_SIGNAL_READABLE, 699 base::Bind(&SyncChannel::OnDispatchEventSignaled,
697 base::Bind(&SyncChannel::OnDispatchHandleReady, base::Unretained(this))); 700 base::Unretained(this)));
698 } 701 }
699 702
700 void SyncChannel::OnChannelInit() { 703 void SyncChannel::OnChannelInit() {
701 pre_init_sync_message_filters_.clear(); 704 pre_init_sync_message_filters_.clear();
702 } 705 }
703 706
704 } // namespace IPC 707 } // namespace IPC
OLDNEW
« no previous file with comments | « ipc/ipc_sync_channel.h ('k') | ipc/ipc_sync_message.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698