OLD | NEW |
---|---|
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "ipc/ipc_sync_channel.h" | 5 #include "ipc/ipc_sync_channel.h" |
6 | 6 |
7 #include <stddef.h> | 7 #include <stddef.h> |
8 #include <stdint.h> | 8 #include <stdint.h> |
9 | 9 |
10 #include <utility> | 10 #include <utility> |
11 | 11 |
12 #include "base/bind.h" | 12 #include "base/bind.h" |
13 #include "base/lazy_instance.h" | 13 #include "base/lazy_instance.h" |
14 #include "base/location.h" | 14 #include "base/location.h" |
15 #include "base/logging.h" | 15 #include "base/logging.h" |
16 #include "base/macros.h" | 16 #include "base/macros.h" |
17 #include "base/memory/ptr_util.h" | 17 #include "base/memory/ptr_util.h" |
18 #include "base/run_loop.h" | 18 #include "base/run_loop.h" |
19 #include "base/synchronization/waitable_event.h" | 19 #include "base/synchronization/waitable_event.h" |
20 #include "base/threading/thread_local.h" | 20 #include "base/threading/thread_local.h" |
21 #include "base/threading/thread_task_runner_handle.h" | 21 #include "base/threading/thread_task_runner_handle.h" |
22 #include "base/trace_event/trace_event.h" | 22 #include "base/trace_event/trace_event.h" |
23 #include "ipc/ipc_channel_factory.h" | 23 #include "ipc/ipc_channel_factory.h" |
24 #include "ipc/ipc_logging.h" | 24 #include "ipc/ipc_logging.h" |
25 #include "ipc/ipc_message_macros.h" | 25 #include "ipc/ipc_message_macros.h" |
26 #include "ipc/ipc_sync_message.h" | 26 #include "ipc/ipc_sync_message.h" |
27 #include "ipc/mojo_event.h" | 27 #include "mojo/public/cpp/bindings/sync_event_watcher.h" |
28 #include "mojo/public/cpp/bindings/sync_handle_registry.h" | |
29 #include "mojo/public/cpp/bindings/sync_handle_watcher.h" | |
30 | 28 |
31 using base::WaitableEvent; | 29 using base::WaitableEvent; |
32 | 30 |
33 namespace IPC { | 31 namespace IPC { |
34 | 32 |
35 namespace { | 33 namespace { |
36 | 34 |
37 // A generic callback used when watching handles synchronously. Sets |*signal| | 35 // A generic callback used when watching handles synchronously. Sets |*signal| |
38 // to true. Also sets |*error| to true in case of an error. | 36 // to true. |
39 void OnSyncHandleReady(bool* signal, bool* error, MojoResult result) { | 37 void OnEventReady(bool* signal) { |
40 *signal = true; | 38 *signal = true; |
41 *error = result != MOJO_RESULT_OK; | |
42 } | 39 } |
43 | 40 |
44 // A ReadyCallback for use with mojo::SimpleWatcher. Ignores the result | 41 base::LazyInstance<std::unique_ptr<base::WaitableEvent>>::Leaky |
45 // (DCHECKs, but is only used in cases where failure should be impossible) and | 42 g_pump_messages_event = LAZY_INSTANCE_INITIALIZER; |
46 // runs |callback|. | |
47 void RunOnHandleReady(const base::Closure& callback, MojoResult result) { | |
48 DCHECK_EQ(result, MOJO_RESULT_OK); | |
49 callback.Run(); | |
50 } | |
51 | |
52 class PumpMessagesEvent { | |
53 public: | |
54 PumpMessagesEvent() { event_.Signal(); } | |
55 ~PumpMessagesEvent() {} | |
56 | |
57 const MojoEvent* event() const { return &event_; } | |
58 | |
59 private: | |
60 MojoEvent event_; | |
61 | |
62 DISALLOW_COPY_AND_ASSIGN(PumpMessagesEvent); | |
63 }; | |
64 | |
65 base::LazyInstance<PumpMessagesEvent>::Leaky g_pump_messages_event = | |
66 LAZY_INSTANCE_INITIALIZER; | |
67 | 43 |
68 } // namespace | 44 } // namespace |
69 | 45 |
70 // When we're blocked in a Send(), we need to process incoming synchronous | 46 // When we're blocked in a Send(), we need to process incoming synchronous |
71 // messages right away because it could be blocking our reply (either | 47 // messages right away because it could be blocking our reply (either |
72 // directly from the same object we're calling, or indirectly through one or | 48 // directly from the same object we're calling, or indirectly through one or |
73 // more other channels). That means that in SyncContext's OnMessageReceived, | 49 // more other channels). That means that in SyncContext's OnMessageReceived, |
74 // we need to process sync message right away if we're blocked. However a | 50 // we need to process sync message right away if we're blocked. However a |
75 // simple check isn't sufficient, because the listener thread can be in the | 51 // simple check isn't sufficient, because the listener thread can be in the |
76 // process of calling Send. | 52 // process of calling Send. |
(...skipping 124 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
201 } | 177 } |
202 } | 178 } |
203 | 179 |
204 if (--listener_count_ == 0) { | 180 if (--listener_count_ == 0) { |
205 DCHECK(lazy_tls_ptr_.Pointer()->Get()); | 181 DCHECK(lazy_tls_ptr_.Pointer()->Get()); |
206 lazy_tls_ptr_.Pointer()->Set(nullptr); | 182 lazy_tls_ptr_.Pointer()->Set(nullptr); |
207 sync_dispatch_watcher_.reset(); | 183 sync_dispatch_watcher_.reset(); |
208 } | 184 } |
209 } | 185 } |
210 | 186 |
211 MojoEvent* dispatch_event() { return &dispatch_event_; } | 187 base::WaitableEvent* dispatch_event() { return &dispatch_event_; } |
212 base::SingleThreadTaskRunner* listener_task_runner() { | 188 base::SingleThreadTaskRunner* listener_task_runner() { |
213 return listener_task_runner_.get(); | 189 return listener_task_runner_.get(); |
214 } | 190 } |
215 | 191 |
216 // Holds a pointer to the per-thread ReceivedSyncMsgQueue object. | 192 // Holds a pointer to the per-thread ReceivedSyncMsgQueue object. |
217 static base::LazyInstance<base::ThreadLocalPointer<ReceivedSyncMsgQueue>>:: | 193 static base::LazyInstance<base::ThreadLocalPointer<ReceivedSyncMsgQueue>>:: |
218 DestructorAtExit lazy_tls_ptr_; | 194 DestructorAtExit lazy_tls_ptr_; |
219 | 195 |
220 // Called on the ipc thread to check if we can unblock any current Send() | 196 // Called on the ipc thread to check if we can unblock any current Send() |
221 // calls based on a queued reply. | 197 // calls based on a queued reply. |
222 void DispatchReplies() { | 198 void DispatchReplies() { |
223 for (size_t i = 0; i < received_replies_.size(); ++i) { | 199 for (size_t i = 0; i < received_replies_.size(); ++i) { |
224 Message* message = received_replies_[i].message; | 200 Message* message = received_replies_[i].message; |
225 if (received_replies_[i].context->TryToUnblockListener(message)) { | 201 if (received_replies_[i].context->TryToUnblockListener(message)) { |
226 delete message; | 202 delete message; |
227 received_replies_.erase(received_replies_.begin() + i); | 203 received_replies_.erase(received_replies_.begin() + i); |
228 return; | 204 return; |
229 } | 205 } |
230 } | 206 } |
231 } | 207 } |
232 | 208 |
233 mojo::SimpleWatcher* top_send_done_watcher() { | 209 // See SyncChannel::WaitForReplyWithNestedMessageLoop for details. |
234 return top_send_done_watcher_; | 210 void SetTopSendDoneState( |
235 } | 211 base::WaitableEventWatcher* watcher, |
236 | 212 base::WaitableEvent* event, |
237 void set_top_send_done_watcher(mojo::SimpleWatcher* watcher) { | 213 const base::WaitableEventWatcher::EventCallback& callback, |
214 base::WaitableEventWatcher** outer_watcher, | |
215 base::WaitableEvent** outer_event, | |
216 base::WaitableEventWatcher::EventCallback* outer_callback) { | |
217 if (outer_watcher) { | |
218 DCHECK(outer_event && outer_callback); | |
219 *outer_watcher = top_send_done_watcher_; | |
220 *outer_event = top_send_done_event_; | |
221 *outer_callback = top_send_done_callback_; | |
222 } | |
238 top_send_done_watcher_ = watcher; | 223 top_send_done_watcher_ = watcher; |
224 top_send_done_event_ = event; | |
225 top_send_done_callback_ = callback; | |
239 } | 226 } |
240 | 227 |
241 private: | 228 private: |
242 friend class base::RefCountedThreadSafe<ReceivedSyncMsgQueue>; | 229 friend class base::RefCountedThreadSafe<ReceivedSyncMsgQueue>; |
243 | 230 |
244 // See the comment in SyncChannel::SyncChannel for why this event is created | 231 // See the comment in SyncChannel::SyncChannel for why this event is created |
245 // as manual reset. | 232 // as manual reset. |
246 ReceivedSyncMsgQueue() | 233 ReceivedSyncMsgQueue() |
247 : message_queue_version_(0), | 234 : message_queue_version_(0), |
235 dispatch_event_(base::WaitableEvent::ResetPolicy::MANUAL, | |
236 base::WaitableEvent::InitialState::NOT_SIGNALED), | |
248 listener_task_runner_(base::ThreadTaskRunnerHandle::Get()), | 237 listener_task_runner_(base::ThreadTaskRunnerHandle::Get()), |
249 task_pending_(false), | 238 sync_dispatch_watcher_(base::MakeUnique<mojo::SyncEventWatcher>( |
250 listener_count_(0), | 239 &dispatch_event_, |
251 top_send_done_watcher_(nullptr) { | 240 base::Bind(&ReceivedSyncMsgQueue::OnDispatchEventReady, |
252 sync_dispatch_watcher_.reset(new mojo::SyncHandleWatcher( | 241 base::Unretained(this)))) { |
253 dispatch_event_.GetHandle(), MOJO_HANDLE_SIGNAL_READABLE, | |
254 base::Bind(&ReceivedSyncMsgQueue::OnDispatchHandleReady, | |
255 base::Unretained(this)))); | |
256 sync_dispatch_watcher_->AllowWokenUpBySyncWatchOnSameThread(); | 242 sync_dispatch_watcher_->AllowWokenUpBySyncWatchOnSameThread(); |
257 } | 243 } |
258 | 244 |
259 ~ReceivedSyncMsgQueue() {} | 245 ~ReceivedSyncMsgQueue() {} |
260 | 246 |
261 void OnDispatchHandleReady(MojoResult result) { | 247 void OnDispatchEventReady() { |
262 if (result != MOJO_RESULT_OK) | |
263 return; | |
264 | |
265 if (dispatch_flag_) { | 248 if (dispatch_flag_) { |
266 *dispatch_flag_ = true; | 249 *dispatch_flag_ = true; |
267 return; | 250 return; |
268 } | 251 } |
269 | 252 |
270 // We were woken up during a sync wait, but no specific SyncChannel is | 253 // We were woken up during a sync wait, but no specific SyncChannel is |
271 // currently waiting. i.e., some other Mojo interface on this thread is | 254 // currently waiting. i.e., some other Mojo interface on this thread is |
272 // waiting for a response. Since we don't support anything analogous to | 255 // waiting for a response. Since we don't support anything analogous to |
273 // restricted dispatch on Mojo interfaces, in this case it's safe to | 256 // restricted dispatch on Mojo interfaces, in this case it's safe to |
274 // dispatch sync messages for any context. | 257 // dispatch sync messages for any context. |
275 DispatchMessages(nullptr); | 258 DispatchMessages(nullptr); |
276 } | 259 } |
277 | 260 |
278 // Holds information about a queued synchronous message or reply. | 261 // Holds information about a queued synchronous message or reply. |
279 struct QueuedMessage { | 262 struct QueuedMessage { |
280 QueuedMessage(Message* m, SyncContext* c) : message(m), context(c) { } | 263 QueuedMessage(Message* m, SyncContext* c) : message(m), context(c) { } |
281 Message* message; | 264 Message* message; |
282 scoped_refptr<SyncChannel::SyncContext> context; | 265 scoped_refptr<SyncChannel::SyncContext> context; |
283 }; | 266 }; |
284 | 267 |
285 typedef std::list<QueuedMessage> SyncMessageQueue; | 268 typedef std::list<QueuedMessage> SyncMessageQueue; |
286 SyncMessageQueue message_queue_; | 269 SyncMessageQueue message_queue_; |
287 uint32_t message_queue_version_; // Used to signal DispatchMessages to rescan | 270 |
271 // Used to signal DispatchMessages to rescan | |
272 uint32_t message_queue_version_ = 0; | |
288 | 273 |
289 std::vector<QueuedMessage> received_replies_; | 274 std::vector<QueuedMessage> received_replies_; |
290 | 275 |
291 // Signaled when we get a synchronous message that we must respond to, as the | 276 // Signaled when we get a synchronous message that we must respond to, as the |
292 // sender needs its reply before it can reply to our original synchronous | 277 // sender needs its reply before it can reply to our original synchronous |
293 // message. | 278 // message. |
294 MojoEvent dispatch_event_; | 279 base::WaitableEvent dispatch_event_; |
295 scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner_; | 280 scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner_; |
296 base::Lock message_lock_; | 281 base::Lock message_lock_; |
297 bool task_pending_; | 282 bool task_pending_ = false; |
298 int listener_count_; | 283 int listener_count_ = 0; |
299 | 284 |
300 // The current send done handle watcher for this thread. Used to maintain | 285 // The current send-done state for this thread. Used to maintain a thread- |
301 // a thread-local stack of send done watchers to ensure that nested sync | 286 // local stack of state to ensure that nested sync message loops complete |
302 // message loops complete correctly. | 287 // correctly. |
303 mojo::SimpleWatcher* top_send_done_watcher_; | 288 base::WaitableEventWatcher* top_send_done_watcher_ = nullptr; |
289 base::WaitableEvent* top_send_done_event_ = nullptr; | |
290 base::WaitableEventWatcher::EventCallback top_send_done_callback_; | |
304 | 291 |
305 // If not null, the address of a flag to set when the dispatch event signals, | 292 // If not null, the address of a flag to set when the dispatch event signals, |
306 // in lieu of actually dispatching messages. This is used by | 293 // in lieu of actually dispatching messages. This is used by |
307 // SyncChannel::WaitForReply to restrict the scope of queued messages we're | 294 // SyncChannel::WaitForReply to restrict the scope of queued messages we're |
308 // allowed to process while it's waiting. | 295 // allowed to process while it's waiting. |
309 bool* dispatch_flag_ = nullptr; | 296 bool* dispatch_flag_ = nullptr; |
310 | 297 |
311 // Watches |dispatch_event_| during all sync handle watches on this thread. | 298 // Watches |dispatch_event_| during all sync handle watches on this thread. |
312 std::unique_ptr<mojo::SyncHandleWatcher> sync_dispatch_watcher_; | 299 std::unique_ptr<mojo::SyncEventWatcher> sync_dispatch_watcher_; |
313 }; | 300 }; |
314 | 301 |
315 base::LazyInstance<base::ThreadLocalPointer< | 302 base::LazyInstance<base::ThreadLocalPointer< |
316 SyncChannel::ReceivedSyncMsgQueue>>::DestructorAtExit | 303 SyncChannel::ReceivedSyncMsgQueue>>::DestructorAtExit |
317 SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_ = | 304 SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_ = |
318 LAZY_INSTANCE_INITIALIZER; | 305 LAZY_INSTANCE_INITIALIZER; |
319 | 306 |
320 SyncChannel::SyncContext::SyncContext( | 307 SyncChannel::SyncContext::SyncContext( |
321 Listener* listener, | 308 Listener* listener, |
322 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, | 309 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, |
323 WaitableEvent* shutdown_event) | 310 WaitableEvent* shutdown_event) |
324 : ChannelProxy::Context(listener, ipc_task_runner), | 311 : ChannelProxy::Context(listener, ipc_task_runner), |
325 received_sync_msgs_(ReceivedSyncMsgQueue::AddContext()), | 312 received_sync_msgs_(ReceivedSyncMsgQueue::AddContext()), |
326 shutdown_event_(shutdown_event), | 313 shutdown_event_(shutdown_event), |
327 restrict_dispatch_group_(kRestrictDispatchGroup_None) { | 314 restrict_dispatch_group_(kRestrictDispatchGroup_None) { |
328 } | 315 } |
329 | 316 |
317 void SyncChannel::SyncContext::OnSendDoneEventSignaled( | |
318 base::RunLoop* nested_loop, | |
319 base::WaitableEvent* event) { | |
320 DCHECK_EQ(GetSendDoneEvent(), event); | |
321 nested_loop->Quit(); | |
322 } | |
323 | |
330 SyncChannel::SyncContext::~SyncContext() { | 324 SyncChannel::SyncContext::~SyncContext() { |
331 while (!deserializers_.empty()) | 325 while (!deserializers_.empty()) |
332 Pop(); | 326 Pop(); |
333 } | 327 } |
334 | 328 |
335 // Adds information about an outgoing sync message to the context so that | 329 // Adds information about an outgoing sync message to the context so that |
336 // we know how to deserialize the reply. Returns |true| if the message was added | 330 // we know how to deserialize the reply. Returns |true| if the message was added |
337 // to the context or |false| if it was rejected (e.g. due to shutdown.) | 331 // to the context or |false| if it was rejected (e.g. due to shutdown.) |
338 bool SyncChannel::SyncContext::Push(SyncMessage* sync_msg) { | 332 bool SyncChannel::SyncContext::Push(SyncMessage* sync_msg) { |
339 // Create the tracking information for this message. This object is stored | 333 // Create the tracking information for this message. This object is stored |
340 // by value since all members are pointers that are cheap to copy. These | 334 // by value since all members are pointers that are cheap to copy. These |
341 // pointers are cleaned up in the Pop() function. | 335 // pointers are cleaned up in the Pop() function. |
342 // | 336 // |
343 // The event is created as manual reset because in between Signal and | 337 // The event is created as manual reset because in between Signal and |
344 // OnObjectSignalled, another Send can happen which would stop the watcher | 338 // OnObjectSignalled, another Send can happen which would stop the watcher |
345 // from being called. The event would get watched later, when the nested | 339 // from being called. The event would get watched later, when the nested |
346 // Send completes, so the event will need to remain set. | 340 // Send completes, so the event will need to remain set. |
347 base::AutoLock auto_lock(deserializers_lock_); | 341 base::AutoLock auto_lock(deserializers_lock_); |
348 if (reject_new_deserializers_) | 342 if (reject_new_deserializers_) |
349 return false; | 343 return false; |
350 PendingSyncMsg pending( | 344 PendingSyncMsg pending( |
351 SyncMessage::GetMessageId(*sync_msg), sync_msg->GetReplyDeserializer(), | 345 SyncMessage::GetMessageId(*sync_msg), sync_msg->GetReplyDeserializer(), |
352 new MojoEvent); | 346 new base::WaitableEvent(base::WaitableEvent::ResetPolicy::MANUAL, |
347 base::WaitableEvent::InitialState::NOT_SIGNALED)); | |
353 deserializers_.push_back(pending); | 348 deserializers_.push_back(pending); |
354 return true; | 349 return true; |
355 } | 350 } |
356 | 351 |
357 bool SyncChannel::SyncContext::Pop() { | 352 bool SyncChannel::SyncContext::Pop() { |
358 bool result; | 353 bool result; |
359 { | 354 { |
360 base::AutoLock auto_lock(deserializers_lock_); | 355 base::AutoLock auto_lock(deserializers_lock_); |
361 PendingSyncMsg msg = deserializers_.back(); | 356 PendingSyncMsg msg = deserializers_.back(); |
362 delete msg.deserializer; | 357 delete msg.deserializer; |
363 delete msg.done_event; | 358 delete msg.done_event; |
364 msg.done_event = nullptr; | 359 msg.done_event = nullptr; |
365 deserializers_.pop_back(); | 360 deserializers_.pop_back(); |
366 result = msg.send_result; | 361 result = msg.send_result; |
367 } | 362 } |
368 | 363 |
369 // We got a reply to a synchronous Send() call that's blocking the listener | 364 // We got a reply to a synchronous Send() call that's blocking the listener |
370 // thread. However, further down the call stack there could be another | 365 // thread. However, further down the call stack there could be another |
371 // blocking Send() call, whose reply we received after we made this last | 366 // blocking Send() call, whose reply we received after we made this last |
372 // Send() call. So check if we have any queued replies available that | 367 // Send() call. So check if we have any queued replies available that |
373 // can now unblock the listener thread. | 368 // can now unblock the listener thread. |
374 ipc_task_runner()->PostTask( | 369 ipc_task_runner()->PostTask( |
375 FROM_HERE, base::Bind(&ReceivedSyncMsgQueue::DispatchReplies, | 370 FROM_HERE, base::Bind(&ReceivedSyncMsgQueue::DispatchReplies, |
376 received_sync_msgs_)); | 371 received_sync_msgs_)); |
377 | 372 |
378 return result; | 373 return result; |
379 } | 374 } |
380 | 375 |
381 MojoEvent* SyncChannel::SyncContext::GetSendDoneEvent() { | 376 base::WaitableEvent* SyncChannel::SyncContext::GetSendDoneEvent() { |
382 base::AutoLock auto_lock(deserializers_lock_); | 377 base::AutoLock auto_lock(deserializers_lock_); |
383 return deserializers_.back().done_event; | 378 return deserializers_.back().done_event; |
384 } | 379 } |
385 | 380 |
386 MojoEvent* SyncChannel::SyncContext::GetDispatchEvent() { | 381 base::WaitableEvent* SyncChannel::SyncContext::GetDispatchEvent() { |
387 return received_sync_msgs_->dispatch_event(); | 382 return received_sync_msgs_->dispatch_event(); |
388 } | 383 } |
389 | 384 |
390 void SyncChannel::SyncContext::DispatchMessages() { | 385 void SyncChannel::SyncContext::DispatchMessages() { |
391 received_sync_msgs_->DispatchMessages(this); | 386 received_sync_msgs_->DispatchMessages(this); |
392 } | 387 } |
393 | 388 |
394 bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) { | 389 bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) { |
395 base::AutoLock auto_lock(deserializers_lock_); | 390 base::AutoLock auto_lock(deserializers_lock_); |
396 if (deserializers_.empty() || | 391 if (deserializers_.empty() || |
397 !SyncMessage::IsMessageReplyTo(*msg, deserializers_.back().id)) { | 392 !SyncMessage::IsMessageReplyTo(*msg, deserializers_.back().id)) { |
398 return false; | 393 return false; |
399 } | 394 } |
400 | 395 |
401 if (!msg->is_reply_error()) { | 396 if (!msg->is_reply_error()) { |
402 bool send_result = deserializers_.back().deserializer-> | 397 bool send_result = deserializers_.back().deserializer-> |
403 SerializeOutputParameters(*msg); | 398 SerializeOutputParameters(*msg); |
404 deserializers_.back().send_result = send_result; | 399 deserializers_.back().send_result = send_result; |
405 DVLOG_IF(1, !send_result) << "Couldn't deserialize reply message"; | 400 DVLOG_IF(1, !send_result) << "Couldn't deserialize reply message"; |
406 } else { | 401 } else { |
407 DVLOG(1) << "Received error reply"; | 402 DVLOG(1) << "Received error reply"; |
408 } | 403 } |
409 | 404 |
410 MojoEvent* done_event = deserializers_.back().done_event; | 405 base::WaitableEvent* done_event = deserializers_.back().done_event; |
411 TRACE_EVENT_FLOW_BEGIN0( | 406 TRACE_EVENT_FLOW_BEGIN0( |
412 TRACE_DISABLED_BY_DEFAULT("ipc.flow"), | 407 TRACE_DISABLED_BY_DEFAULT("ipc.flow"), |
413 "SyncChannel::SyncContext::TryToUnblockListener", done_event); | 408 "SyncChannel::SyncContext::TryToUnblockListener", done_event); |
414 | 409 |
415 done_event->Signal(); | 410 done_event->Signal(); |
416 | 411 |
417 return true; | 412 return true; |
418 } | 413 } |
419 | 414 |
420 void SyncChannel::SyncContext::Clear() { | 415 void SyncChannel::SyncContext::Clear() { |
(...skipping 98 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
519 WaitableEvent* shutdown_event) { | 514 WaitableEvent* shutdown_event) { |
520 return base::WrapUnique( | 515 return base::WrapUnique( |
521 new SyncChannel(listener, ipc_task_runner, shutdown_event)); | 516 new SyncChannel(listener, ipc_task_runner, shutdown_event)); |
522 } | 517 } |
523 | 518 |
524 SyncChannel::SyncChannel( | 519 SyncChannel::SyncChannel( |
525 Listener* listener, | 520 Listener* listener, |
526 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, | 521 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, |
527 WaitableEvent* shutdown_event) | 522 WaitableEvent* shutdown_event) |
528 : ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)), | 523 : ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)), |
529 sync_handle_registry_(mojo::SyncHandleRegistry::current()), | 524 sync_handle_registry_(mojo::SyncHandleRegistry::current()) { |
530 dispatch_watcher_(FROM_HERE, | |
531 mojo::SimpleWatcher::ArmingPolicy::AUTOMATIC) { | |
532 // The current (listener) thread must be distinct from the IPC thread, or else | 525 // The current (listener) thread must be distinct from the IPC thread, or else |
533 // sending synchronous messages will deadlock. | 526 // sending synchronous messages will deadlock. |
534 DCHECK_NE(ipc_task_runner.get(), base::ThreadTaskRunnerHandle::Get().get()); | 527 DCHECK_NE(ipc_task_runner.get(), base::ThreadTaskRunnerHandle::Get().get()); |
535 StartWatching(); | 528 StartWatching(); |
536 } | 529 } |
537 | 530 |
538 SyncChannel::~SyncChannel() { | 531 SyncChannel::~SyncChannel() { |
539 } | 532 } |
540 | 533 |
541 void SyncChannel::SetRestrictDispatchChannelGroup(int group) { | 534 void SyncChannel::SetRestrictDispatchChannelGroup(int group) { |
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
589 "SyncChannel::Send", context->GetSendDoneEvent()); | 582 "SyncChannel::Send", context->GetSendDoneEvent()); |
590 | 583 |
591 return context->Pop(); | 584 return context->Pop(); |
592 } | 585 } |
593 | 586 |
594 void SyncChannel::WaitForReply(mojo::SyncHandleRegistry* registry, | 587 void SyncChannel::WaitForReply(mojo::SyncHandleRegistry* registry, |
595 SyncContext* context, | 588 SyncContext* context, |
596 bool pump_messages) { | 589 bool pump_messages) { |
597 context->DispatchMessages(); | 590 context->DispatchMessages(); |
598 | 591 |
599 const MojoEvent* pump_messages_event = nullptr; | 592 base::WaitableEvent* pump_messages_event = nullptr; |
600 if (pump_messages) | 593 if (pump_messages) { |
601 pump_messages_event = g_pump_messages_event.Get().event(); | 594 if (!g_pump_messages_event.Get()) { |
595 g_pump_messages_event.Get() = base::MakeUnique<base::WaitableEvent>( | |
596 base::WaitableEvent::ResetPolicy::MANUAL, | |
597 base::WaitableEvent::InitialState::SIGNALED); | |
598 } | |
599 pump_messages_event = g_pump_messages_event.Get().get(); | |
600 } | |
602 | 601 |
603 while (true) { | 602 while (true) { |
604 bool dispatch = false; | 603 bool dispatch = false; |
605 bool send_done = false; | 604 bool send_done = false; |
606 bool should_pump_messages = false; | 605 bool should_pump_messages = false; |
607 bool error = false; | 606 bool registered = registry->RegisterEvent( |
608 bool registered = registry->RegisterHandle( | 607 context->GetSendDoneEvent(), base::Bind(&OnEventReady, &send_done)); |
609 context->GetSendDoneEvent()->GetHandle(), | |
610 MOJO_HANDLE_SIGNAL_READABLE, | |
611 base::Bind(&OnSyncHandleReady, &send_done, &error)); | |
612 DCHECK(registered); | 608 DCHECK(registered); |
609 | |
613 if (pump_messages_event) { | 610 if (pump_messages_event) { |
614 registered = registry->RegisterHandle( | 611 registered = registry->RegisterEvent( |
615 pump_messages_event->GetHandle(), MOJO_HANDLE_SIGNAL_READABLE, | 612 pump_messages_event, |
616 base::Bind(&OnSyncHandleReady, &should_pump_messages, &error)); | 613 base::Bind(&OnEventReady, &should_pump_messages)); |
617 DCHECK(registered); | 614 DCHECK(registered); |
618 } | 615 } |
619 | 616 |
620 const bool* stop_flags[] = { &dispatch, &send_done, &should_pump_messages }; | 617 const bool* stop_flags[] = { &dispatch, &send_done, &should_pump_messages }; |
621 context->received_sync_msgs()->BlockDispatch(&dispatch); | 618 context->received_sync_msgs()->BlockDispatch(&dispatch); |
622 registry->WatchAllHandles(stop_flags, 3); | 619 registry->WatchAllHandles(stop_flags, 3); |
623 context->received_sync_msgs()->UnblockDispatch(); | 620 context->received_sync_msgs()->UnblockDispatch(); |
624 DCHECK(!error); | |
625 | 621 |
626 registry->UnregisterHandle(context->GetSendDoneEvent()->GetHandle()); | 622 registry->UnregisterEvent(context->GetSendDoneEvent()); |
627 if (pump_messages_event) | 623 if (pump_messages_event) |
628 registry->UnregisterHandle(pump_messages_event->GetHandle()); | 624 registry->UnregisterEvent(pump_messages_event); |
629 | 625 |
630 if (dispatch) { | 626 if (dispatch) { |
631 // We're waiting for a reply, but we received a blocking synchronous call. | 627 // We're waiting for a reply, but we received a blocking synchronous call. |
632 // We must process it to avoid potential deadlocks. | 628 // We must process it to avoid potential deadlocks. |
633 context->GetDispatchEvent()->Reset(); | 629 context->GetDispatchEvent()->Reset(); |
634 context->DispatchMessages(); | 630 context->DispatchMessages(); |
635 continue; | 631 continue; |
636 } | 632 } |
637 | 633 |
638 if (should_pump_messages) | 634 if (should_pump_messages) |
639 WaitForReplyWithNestedMessageLoop(context); // Run a nested message loop. | 635 WaitForReplyWithNestedMessageLoop(context); // Run a nested message loop. |
640 | 636 |
641 break; | 637 break; |
642 } | 638 } |
643 } | 639 } |
644 | 640 |
645 void SyncChannel::WaitForReplyWithNestedMessageLoop(SyncContext* context) { | 641 void SyncChannel::WaitForReplyWithNestedMessageLoop(SyncContext* context) { |
646 mojo::SimpleWatcher send_done_watcher( | 642 ReceivedSyncMsgQueue* sync_msg_queue = context->received_sync_msgs(); |
647 FROM_HERE, mojo::SimpleWatcher::ArmingPolicy::AUTOMATIC); | 643 DCHECK(sync_msg_queue); |
648 | 644 |
649 ReceivedSyncMsgQueue* sync_msg_queue = context->received_sync_msgs(); | 645 base::RunLoop nested_loop; |
650 DCHECK_NE(sync_msg_queue, nullptr); | |
651 | 646 |
652 mojo::SimpleWatcher* old_watcher = sync_msg_queue->top_send_done_watcher(); | 647 // WaitForReplyWithNestedMessageLoop may be re-entered, i.e. we can nest |
653 mojo::Handle old_handle(mojo::kInvalidHandleValue); | 648 // waiting message loops arbitrarily deep on the SyncChannel's thread. Every |
654 mojo::SimpleWatcher::ReadyCallback old_callback; | 649 // such operation has a corresponding WaitableEvent to be watched which, when |
650 // signalled for IPC completion, breaks out of the loop. The innermost (i.e. | |
651 // topmost) such event is stored in |sync_msg_queue| state. | |
652 // | |
653 // Here we preserve the current top-of-stack event state (if any) within the | |
654 // local stack frame. We then replace the event state in |sync_msg_queue| with | |
655 // our own and run a nested loop. If a subsequent nested loop is started | |
656 // therein the process is repeated in that stack frame, and so on. | |
657 // | |
658 // Once the innermost nested loop is broken, the locally preserved event state | |
659 // is swapped back into |sync_msg_queue| before unwinding the stack. | |
660 base::WaitableEventWatcher* outer_watcher = nullptr; | |
yzshen1
2017/03/23 20:15:50
Does it make sense to group these three things int
Ken Rockot(use gerrit already)
2017/03/23 22:04:20
Sure, that sounds nice. In fact I've gone and move
| |
661 base::WaitableEvent* outer_event = nullptr; | |
662 base::WaitableEventWatcher::EventCallback outer_callback; | |
663 base::WaitableEventWatcher send_done_watcher; | |
664 base::WaitableEvent* event = context->GetSendDoneEvent(); | |
665 const base::WaitableEventWatcher::EventCallback callback = | |
666 base::Bind(&SyncContext::OnSendDoneEventSignaled, context, &nested_loop); | |
667 sync_msg_queue->SetTopSendDoneState(&send_done_watcher, event, callback, | |
668 &outer_watcher, &outer_event, | |
669 &outer_callback); | |
670 if (outer_watcher) | |
671 outer_watcher->StopWatching(); | |
672 send_done_watcher.StartWatching(event, callback); | |
655 | 673 |
656 // Maintain a thread-local stack of watchers to ensure nested calls complete | 674 base::MessageLoop::ScopedNestableTaskAllower allow( |
657 // in the correct sequence, i.e. the outermost call completes first, etc. | 675 base::MessageLoop::current()); |
658 if (old_watcher) { | 676 nested_loop.Run(); |
659 old_callback = old_watcher->ready_callback(); | |
660 old_handle = old_watcher->handle(); | |
661 old_watcher->Cancel(); | |
662 } | |
663 | 677 |
664 sync_msg_queue->set_top_send_done_watcher(&send_done_watcher); | 678 sync_msg_queue->SetTopSendDoneState( |
665 | 679 outer_watcher, outer_event, outer_callback, nullptr, nullptr, nullptr); |
666 { | 680 if (outer_watcher) |
667 base::RunLoop nested_loop; | 681 outer_watcher->StartWatching(outer_event, outer_callback); |
668 send_done_watcher.Watch( | |
669 context->GetSendDoneEvent()->GetHandle(), MOJO_HANDLE_SIGNAL_READABLE, | |
670 base::Bind(&RunOnHandleReady, nested_loop.QuitClosure())); | |
671 | |
672 base::MessageLoop::ScopedNestableTaskAllower allow( | |
673 base::MessageLoop::current()); | |
674 nested_loop.Run(); | |
675 send_done_watcher.Cancel(); | |
676 } | |
677 | |
678 sync_msg_queue->set_top_send_done_watcher(old_watcher); | |
679 if (old_watcher) | |
680 old_watcher->Watch(old_handle, MOJO_HANDLE_SIGNAL_READABLE, old_callback); | |
681 } | 682 } |
682 | 683 |
683 void SyncChannel::OnDispatchHandleReady(MojoResult result) { | 684 void SyncChannel::OnDispatchEventSignaled(base::WaitableEvent* event) { |
684 DCHECK_EQ(result, MOJO_RESULT_OK); | 685 DCHECK_EQ(sync_context()->GetDispatchEvent(), event); |
685 sync_context()->GetDispatchEvent()->Reset(); | 686 sync_context()->GetDispatchEvent()->Reset(); |
687 | |
688 StartWatching(); | |
689 | |
690 // NOTE: May delete |this|. | |
686 sync_context()->DispatchMessages(); | 691 sync_context()->DispatchMessages(); |
687 } | 692 } |
688 | 693 |
689 void SyncChannel::StartWatching() { | 694 void SyncChannel::StartWatching() { |
690 // |dispatch_watcher_| watches the event asynchronously, only dispatching | 695 // |dispatch_watcher_| watches the event asynchronously, only dispatching |
691 // messages once the listener thread is unblocked and pumping its task queue. | 696 // messages once the listener thread is unblocked and pumping its task queue. |
692 // The ReceivedSyncMsgQueue also watches this event and may dispatch | 697 // The ReceivedSyncMsgQueue also watches this event and may dispatch |
693 // immediately if woken up by a message which it's allowed to dispatch. | 698 // immediately if woken up by a message which it's allowed to dispatch. |
694 dispatch_watcher_.Watch( | 699 dispatch_watcher_.StartWatching( |
695 sync_context()->GetDispatchEvent()->GetHandle(), | 700 sync_context()->GetDispatchEvent(), |
696 MOJO_HANDLE_SIGNAL_READABLE, | 701 base::Bind(&SyncChannel::OnDispatchEventSignaled, |
697 base::Bind(&SyncChannel::OnDispatchHandleReady, base::Unretained(this))); | 702 base::Unretained(this))); |
698 } | 703 } |
699 | 704 |
700 void SyncChannel::OnChannelInit() { | 705 void SyncChannel::OnChannelInit() { |
701 pre_init_sync_message_filters_.clear(); | 706 pre_init_sync_message_filters_.clear(); |
702 } | 707 } |
703 | 708 |
704 } // namespace IPC | 709 } // namespace IPC |
OLD | NEW |