| OLD | NEW |
| 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "ipc/ipc_sync_channel.h" | 5 #include "ipc/ipc_sync_channel.h" |
| 6 | 6 |
| 7 #include <stddef.h> | 7 #include <stddef.h> |
| 8 #include <stdint.h> | 8 #include <stdint.h> |
| 9 | 9 |
| 10 #include <utility> | 10 #include <utility> |
| 11 | 11 |
| 12 #include "base/bind.h" | 12 #include "base/bind.h" |
| 13 #include "base/lazy_instance.h" | 13 #include "base/lazy_instance.h" |
| 14 #include "base/location.h" | 14 #include "base/location.h" |
| 15 #include "base/logging.h" | 15 #include "base/logging.h" |
| 16 #include "base/macros.h" | 16 #include "base/macros.h" |
| 17 #include "base/memory/ptr_util.h" | 17 #include "base/memory/ptr_util.h" |
| 18 #include "base/run_loop.h" | 18 #include "base/run_loop.h" |
| 19 #include "base/synchronization/waitable_event.h" | 19 #include "base/synchronization/waitable_event.h" |
| 20 #include "base/threading/thread_local.h" | 20 #include "base/threading/thread_local.h" |
| 21 #include "base/threading/thread_task_runner_handle.h" | 21 #include "base/threading/thread_task_runner_handle.h" |
| 22 #include "base/trace_event/trace_event.h" | 22 #include "base/trace_event/trace_event.h" |
| 23 #include "ipc/ipc_channel_factory.h" | 23 #include "ipc/ipc_channel_factory.h" |
| 24 #include "ipc/ipc_logging.h" | 24 #include "ipc/ipc_logging.h" |
| 25 #include "ipc/ipc_message_macros.h" | 25 #include "ipc/ipc_message_macros.h" |
| 26 #include "ipc/ipc_sync_message.h" | 26 #include "ipc/ipc_sync_message.h" |
| 27 #include "ipc/mojo_event.h" | 27 #include "mojo/public/cpp/bindings/sync_event_watcher.h" |
| 28 #include "mojo/public/cpp/bindings/sync_handle_registry.h" | |
| 29 #include "mojo/public/cpp/bindings/sync_handle_watcher.h" | |
| 30 | 28 |
| 31 using base::WaitableEvent; | 29 using base::WaitableEvent; |
| 32 | 30 |
| 33 namespace IPC { | 31 namespace IPC { |
| 34 | 32 |
| 35 namespace { | 33 namespace { |
| 36 | 34 |
| 37 // A generic callback used when watching handles synchronously. Sets |*signal| | 35 // A generic callback used when watching handles synchronously. Sets |*signal| |
| 38 // to true. Also sets |*error| to true in case of an error. | 36 // to true. |
| 39 void OnSyncHandleReady(bool* signal, bool* error, MojoResult result) { | 37 void OnEventReady(bool* signal) { |
| 40 *signal = true; | 38 *signal = true; |
| 41 *error = result != MOJO_RESULT_OK; | |
| 42 } | 39 } |
| 43 | 40 |
| 44 // A ReadyCallback for use with mojo::SimpleWatcher. Ignores the result | 41 base::LazyInstance<std::unique_ptr<base::WaitableEvent>>::Leaky |
| 45 // (DCHECKs, but is only used in cases where failure should be impossible) and | 42 g_pump_messages_event = LAZY_INSTANCE_INITIALIZER; |
| 46 // runs |callback|. | |
| 47 void RunOnHandleReady(const base::Closure& callback, MojoResult result) { | |
| 48 DCHECK_EQ(result, MOJO_RESULT_OK); | |
| 49 callback.Run(); | |
| 50 } | |
| 51 | |
| 52 class PumpMessagesEvent { | |
| 53 public: | |
| 54 PumpMessagesEvent() { event_.Signal(); } | |
| 55 ~PumpMessagesEvent() {} | |
| 56 | |
| 57 const MojoEvent* event() const { return &event_; } | |
| 58 | |
| 59 private: | |
| 60 MojoEvent event_; | |
| 61 | |
| 62 DISALLOW_COPY_AND_ASSIGN(PumpMessagesEvent); | |
| 63 }; | |
| 64 | |
| 65 base::LazyInstance<PumpMessagesEvent>::Leaky g_pump_messages_event = | |
| 66 LAZY_INSTANCE_INITIALIZER; | |
| 67 | 43 |
| 68 } // namespace | 44 } // namespace |
| 69 | 45 |
| 70 // When we're blocked in a Send(), we need to process incoming synchronous | 46 // When we're blocked in a Send(), we need to process incoming synchronous |
| 71 // messages right away because it could be blocking our reply (either | 47 // messages right away because it could be blocking our reply (either |
| 72 // directly from the same object we're calling, or indirectly through one or | 48 // directly from the same object we're calling, or indirectly through one or |
| 73 // more other channels). That means that in SyncContext's OnMessageReceived, | 49 // more other channels). That means that in SyncContext's OnMessageReceived, |
| 74 // we need to process sync message right away if we're blocked. However a | 50 // we need to process sync message right away if we're blocked. However a |
| 75 // simple check isn't sufficient, because the listener thread can be in the | 51 // simple check isn't sufficient, because the listener thread can be in the |
| 76 // process of calling Send. | 52 // process of calling Send. |
| 77 // To work around this, when SyncChannel filters a sync message, it sets | 53 // To work around this, when SyncChannel filters a sync message, it sets |
| 78 // an event that the listener thread waits on during its Send() call. This | 54 // an event that the listener thread waits on during its Send() call. This |
| 79 // allows us to dispatch incoming sync messages when blocked. The race | 55 // allows us to dispatch incoming sync messages when blocked. The race |
| 80 // condition is handled because if Send is in the process of being called, it | 56 // condition is handled because if Send is in the process of being called, it |
| 81 // will check the event. In case the listener thread isn't sending a message, | 57 // will check the event. In case the listener thread isn't sending a message, |
| 82 // we queue a task on the listener thread to dispatch the received messages. | 58 // we queue a task on the listener thread to dispatch the received messages. |
| 83 // The messages are stored in this queue object that's shared among all | 59 // The messages are stored in this queue object that's shared among all |
| 84 // SyncChannel objects on the same thread (since one object can receive a | 60 // SyncChannel objects on the same thread (since one object can receive a |
| 85 // sync message while another one is blocked). | 61 // sync message while another one is blocked). |
| 86 | 62 |
| 87 class SyncChannel::ReceivedSyncMsgQueue : | 63 class SyncChannel::ReceivedSyncMsgQueue : |
| 88 public base::RefCountedThreadSafe<ReceivedSyncMsgQueue> { | 64 public base::RefCountedThreadSafe<ReceivedSyncMsgQueue> { |
| 89 public: | 65 public: |
| 66 // SyncChannel::WaitForReplyWithNestedMessageLoop may be re-entered, i.e. we |
| 67 // may nest waiting message loops arbitrarily deep on the SyncChannel's |
| 68 // thread. Every such operation has a corresponding WaitableEvent to be |
| 69 // watched which, when signalled for IPC completion, breaks out of the loop. |
| 70 // A reference to the innermost (i.e. topmost) watcher is held in |
| 71 // |ReceivedSyncMsgQueue::top_send_done_event_watcher_|. |
| 72 // |
| 73 // NestedSendDoneWatcher provides a simple scoper which is used by |
| 74 // WaitForReplyWithNestedMessageLoop to begin watching a new local "send done" |
| 75 // event, preserving the previous topmost state on the local stack until the |
| 76 // new inner loop is broken. If yet another subsequent nested loop is started |
| 77 // therein the process is repeated again in the new inner stack frame, and so |
| 78 // on. |
| 79 // |
| 80 // When this object is destroyed on stack unwind, the previous topmost state |
| 81 // is swapped back into |ReceivedSyncMsgQueue::top_send_done_event_watcher_|, |
| 82 // and its watch is resumed immediately. |
| 83 class NestedSendDoneWatcher { |
| 84 public: |
| 85 NestedSendDoneWatcher(SyncChannel::SyncContext* context, |
| 86 base::RunLoop* run_loop) |
| 87 : sync_msg_queue_(context->received_sync_msgs()), |
| 88 outer_state_(sync_msg_queue_->top_send_done_event_watcher_), |
| 89 event_(context->GetSendDoneEvent()), |
| 90 callback_( |
| 91 base::Bind(&SyncChannel::SyncContext::OnSendDoneEventSignaled, |
| 92 context, |
| 93 run_loop)) { |
| 94 sync_msg_queue_->top_send_done_event_watcher_ = this; |
| 95 if (outer_state_) |
| 96 outer_state_->StopWatching(); |
| 97 StartWatching(); |
| 98 } |
| 99 |
| 100 ~NestedSendDoneWatcher() { |
| 101 sync_msg_queue_->top_send_done_event_watcher_ = outer_state_; |
| 102 if (outer_state_) |
| 103 outer_state_->StartWatching(); |
| 104 } |
| 105 |
| 106 private: |
| 107 void StartWatching() { watcher_.StartWatching(event_, callback_); } |
| 108 void StopWatching() { watcher_.StopWatching(); } |
| 109 |
| 110 ReceivedSyncMsgQueue* const sync_msg_queue_; |
| 111 NestedSendDoneWatcher* const outer_state_; |
| 112 |
| 113 base::WaitableEvent* const event_; |
| 114 const base::WaitableEventWatcher::EventCallback callback_; |
| 115 base::WaitableEventWatcher watcher_; |
| 116 |
| 117 DISALLOW_COPY_AND_ASSIGN(NestedSendDoneWatcher); |
| 118 }; |
| 119 |
| 90 // Returns the ReceivedSyncMsgQueue instance for this thread, creating one | 120 // Returns the ReceivedSyncMsgQueue instance for this thread, creating one |
| 91 // if necessary. Call RemoveContext on the same thread when done. | 121 // if necessary. Call RemoveContext on the same thread when done. |
| 92 static ReceivedSyncMsgQueue* AddContext() { | 122 static ReceivedSyncMsgQueue* AddContext() { |
| 93 // We want one ReceivedSyncMsgQueue per listener thread (i.e. since multiple | 123 // We want one ReceivedSyncMsgQueue per listener thread (i.e. since multiple |
| 94 // SyncChannel objects can block the same thread). | 124 // SyncChannel objects can block the same thread). |
| 95 ReceivedSyncMsgQueue* rv = lazy_tls_ptr_.Pointer()->Get(); | 125 ReceivedSyncMsgQueue* rv = lazy_tls_ptr_.Pointer()->Get(); |
| 96 if (!rv) { | 126 if (!rv) { |
| 97 rv = new ReceivedSyncMsgQueue(); | 127 rv = new ReceivedSyncMsgQueue(); |
| 98 ReceivedSyncMsgQueue::lazy_tls_ptr_.Pointer()->Set(rv); | 128 ReceivedSyncMsgQueue::lazy_tls_ptr_.Pointer()->Set(rv); |
| 99 } | 129 } |
| (...skipping 101 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 201 } | 231 } |
| 202 } | 232 } |
| 203 | 233 |
| 204 if (--listener_count_ == 0) { | 234 if (--listener_count_ == 0) { |
| 205 DCHECK(lazy_tls_ptr_.Pointer()->Get()); | 235 DCHECK(lazy_tls_ptr_.Pointer()->Get()); |
| 206 lazy_tls_ptr_.Pointer()->Set(nullptr); | 236 lazy_tls_ptr_.Pointer()->Set(nullptr); |
| 207 sync_dispatch_watcher_.reset(); | 237 sync_dispatch_watcher_.reset(); |
| 208 } | 238 } |
| 209 } | 239 } |
| 210 | 240 |
| 211 MojoEvent* dispatch_event() { return &dispatch_event_; } | 241 base::WaitableEvent* dispatch_event() { return &dispatch_event_; } |
| 212 base::SingleThreadTaskRunner* listener_task_runner() { | 242 base::SingleThreadTaskRunner* listener_task_runner() { |
| 213 return listener_task_runner_.get(); | 243 return listener_task_runner_.get(); |
| 214 } | 244 } |
| 215 | 245 |
| 216 // Holds a pointer to the per-thread ReceivedSyncMsgQueue object. | 246 // Holds a pointer to the per-thread ReceivedSyncMsgQueue object. |
| 217 static base::LazyInstance<base::ThreadLocalPointer<ReceivedSyncMsgQueue>>:: | 247 static base::LazyInstance<base::ThreadLocalPointer<ReceivedSyncMsgQueue>>:: |
| 218 DestructorAtExit lazy_tls_ptr_; | 248 DestructorAtExit lazy_tls_ptr_; |
| 219 | 249 |
| 220 // Called on the ipc thread to check if we can unblock any current Send() | 250 // Called on the ipc thread to check if we can unblock any current Send() |
| 221 // calls based on a queued reply. | 251 // calls based on a queued reply. |
| 222 void DispatchReplies() { | 252 void DispatchReplies() { |
| 223 for (size_t i = 0; i < received_replies_.size(); ++i) { | 253 for (size_t i = 0; i < received_replies_.size(); ++i) { |
| 224 Message* message = received_replies_[i].message; | 254 Message* message = received_replies_[i].message; |
| 225 if (received_replies_[i].context->TryToUnblockListener(message)) { | 255 if (received_replies_[i].context->TryToUnblockListener(message)) { |
| 226 delete message; | 256 delete message; |
| 227 received_replies_.erase(received_replies_.begin() + i); | 257 received_replies_.erase(received_replies_.begin() + i); |
| 228 return; | 258 return; |
| 229 } | 259 } |
| 230 } | 260 } |
| 231 } | 261 } |
| 232 | 262 |
| 233 mojo::SimpleWatcher* top_send_done_watcher() { | |
| 234 return top_send_done_watcher_; | |
| 235 } | |
| 236 | |
| 237 void set_top_send_done_watcher(mojo::SimpleWatcher* watcher) { | |
| 238 top_send_done_watcher_ = watcher; | |
| 239 } | |
| 240 | |
| 241 private: | 263 private: |
| 242 friend class base::RefCountedThreadSafe<ReceivedSyncMsgQueue>; | 264 friend class base::RefCountedThreadSafe<ReceivedSyncMsgQueue>; |
| 243 | 265 |
| 244 // See the comment in SyncChannel::SyncChannel for why this event is created | 266 // See the comment in SyncChannel::SyncChannel for why this event is created |
| 245 // as manual reset. | 267 // as manual reset. |
| 246 ReceivedSyncMsgQueue() | 268 ReceivedSyncMsgQueue() |
| 247 : message_queue_version_(0), | 269 : message_queue_version_(0), |
| 270 dispatch_event_(base::WaitableEvent::ResetPolicy::MANUAL, |
| 271 base::WaitableEvent::InitialState::NOT_SIGNALED), |
| 248 listener_task_runner_(base::ThreadTaskRunnerHandle::Get()), | 272 listener_task_runner_(base::ThreadTaskRunnerHandle::Get()), |
| 249 task_pending_(false), | 273 sync_dispatch_watcher_(base::MakeUnique<mojo::SyncEventWatcher>( |
| 250 listener_count_(0), | 274 &dispatch_event_, |
| 251 top_send_done_watcher_(nullptr) { | 275 base::Bind(&ReceivedSyncMsgQueue::OnDispatchEventReady, |
| 252 sync_dispatch_watcher_.reset(new mojo::SyncHandleWatcher( | 276 base::Unretained(this)))) { |
| 253 dispatch_event_.GetHandle(), MOJO_HANDLE_SIGNAL_READABLE, | |
| 254 base::Bind(&ReceivedSyncMsgQueue::OnDispatchHandleReady, | |
| 255 base::Unretained(this)))); | |
| 256 sync_dispatch_watcher_->AllowWokenUpBySyncWatchOnSameThread(); | 277 sync_dispatch_watcher_->AllowWokenUpBySyncWatchOnSameThread(); |
| 257 } | 278 } |
| 258 | 279 |
| 259 ~ReceivedSyncMsgQueue() {} | 280 ~ReceivedSyncMsgQueue() {} |
| 260 | 281 |
| 261 void OnDispatchHandleReady(MojoResult result) { | 282 void OnDispatchEventReady() { |
| 262 if (result != MOJO_RESULT_OK) | |
| 263 return; | |
| 264 | |
| 265 if (dispatch_flag_) { | 283 if (dispatch_flag_) { |
| 266 *dispatch_flag_ = true; | 284 *dispatch_flag_ = true; |
| 267 return; | 285 return; |
| 268 } | 286 } |
| 269 | 287 |
| 270 // We were woken up during a sync wait, but no specific SyncChannel is | 288 // We were woken up during a sync wait, but no specific SyncChannel is |
| 271 // currently waiting. i.e., some other Mojo interface on this thread is | 289 // currently waiting. i.e., some other Mojo interface on this thread is |
| 272 // waiting for a response. Since we don't support anything analogous to | 290 // waiting for a response. Since we don't support anything analogous to |
| 273 // restricted dispatch on Mojo interfaces, in this case it's safe to | 291 // restricted dispatch on Mojo interfaces, in this case it's safe to |
| 274 // dispatch sync messages for any context. | 292 // dispatch sync messages for any context. |
| 275 DispatchMessages(nullptr); | 293 DispatchMessages(nullptr); |
| 276 } | 294 } |
| 277 | 295 |
| 278 // Holds information about a queued synchronous message or reply. | 296 // Holds information about a queued synchronous message or reply. |
| 279 struct QueuedMessage { | 297 struct QueuedMessage { |
| 280 QueuedMessage(Message* m, SyncContext* c) : message(m), context(c) { } | 298 QueuedMessage(Message* m, SyncContext* c) : message(m), context(c) { } |
| 281 Message* message; | 299 Message* message; |
| 282 scoped_refptr<SyncChannel::SyncContext> context; | 300 scoped_refptr<SyncChannel::SyncContext> context; |
| 283 }; | 301 }; |
| 284 | 302 |
| 285 typedef std::list<QueuedMessage> SyncMessageQueue; | 303 typedef std::list<QueuedMessage> SyncMessageQueue; |
| 286 SyncMessageQueue message_queue_; | 304 SyncMessageQueue message_queue_; |
| 287 uint32_t message_queue_version_; // Used to signal DispatchMessages to rescan | 305 |
| 306 // Used to signal DispatchMessages to rescan |
| 307 uint32_t message_queue_version_ = 0; |
| 288 | 308 |
| 289 std::vector<QueuedMessage> received_replies_; | 309 std::vector<QueuedMessage> received_replies_; |
| 290 | 310 |
| 291 // Signaled when we get a synchronous message that we must respond to, as the | 311 // Signaled when we get a synchronous message that we must respond to, as the |
| 292 // sender needs its reply before it can reply to our original synchronous | 312 // sender needs its reply before it can reply to our original synchronous |
| 293 // message. | 313 // message. |
| 294 MojoEvent dispatch_event_; | 314 base::WaitableEvent dispatch_event_; |
| 295 scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner_; | 315 scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner_; |
| 296 base::Lock message_lock_; | 316 base::Lock message_lock_; |
| 297 bool task_pending_; | 317 bool task_pending_ = false; |
| 298 int listener_count_; | 318 int listener_count_ = 0; |
| 299 | 319 |
| 300 // The current send done handle watcher for this thread. Used to maintain | 320 // The current NestedSendDoneWatcher for this thread, if we're currently |
| 301 // a thread-local stack of send done watchers to ensure that nested sync | 321 // in a SyncChannel::WaitForReplyWithNestedMessageLoop. See |
| 302 // message loops complete correctly. | 322 // NestedSendDoneWatcher comments for more details. |
| 303 mojo::SimpleWatcher* top_send_done_watcher_; | 323 NestedSendDoneWatcher* top_send_done_event_watcher_ = nullptr; |
| 304 | 324 |
| 305 // If not null, the address of a flag to set when the dispatch event signals, | 325 // If not null, the address of a flag to set when the dispatch event signals, |
| 306 // in lieu of actually dispatching messages. This is used by | 326 // in lieu of actually dispatching messages. This is used by |
| 307 // SyncChannel::WaitForReply to restrict the scope of queued messages we're | 327 // SyncChannel::WaitForReply to restrict the scope of queued messages we're |
| 308 // allowed to process while it's waiting. | 328 // allowed to process while it's waiting. |
| 309 bool* dispatch_flag_ = nullptr; | 329 bool* dispatch_flag_ = nullptr; |
| 310 | 330 |
| 311 // Watches |dispatch_event_| during all sync handle watches on this thread. | 331 // Watches |dispatch_event_| during all sync handle watches on this thread. |
| 312 std::unique_ptr<mojo::SyncHandleWatcher> sync_dispatch_watcher_; | 332 std::unique_ptr<mojo::SyncEventWatcher> sync_dispatch_watcher_; |
| 313 }; | 333 }; |
| 314 | 334 |
| 315 base::LazyInstance<base::ThreadLocalPointer< | 335 base::LazyInstance<base::ThreadLocalPointer< |
| 316 SyncChannel::ReceivedSyncMsgQueue>>::DestructorAtExit | 336 SyncChannel::ReceivedSyncMsgQueue>>::DestructorAtExit |
| 317 SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_ = | 337 SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_ = |
| 318 LAZY_INSTANCE_INITIALIZER; | 338 LAZY_INSTANCE_INITIALIZER; |
| 319 | 339 |
| 320 SyncChannel::SyncContext::SyncContext( | 340 SyncChannel::SyncContext::SyncContext( |
| 321 Listener* listener, | 341 Listener* listener, |
| 322 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, | 342 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, |
| 323 WaitableEvent* shutdown_event) | 343 WaitableEvent* shutdown_event) |
| 324 : ChannelProxy::Context(listener, ipc_task_runner), | 344 : ChannelProxy::Context(listener, ipc_task_runner), |
| 325 received_sync_msgs_(ReceivedSyncMsgQueue::AddContext()), | 345 received_sync_msgs_(ReceivedSyncMsgQueue::AddContext()), |
| 326 shutdown_event_(shutdown_event), | 346 shutdown_event_(shutdown_event), |
| 327 restrict_dispatch_group_(kRestrictDispatchGroup_None) { | 347 restrict_dispatch_group_(kRestrictDispatchGroup_None) { |
| 328 } | 348 } |
| 329 | 349 |
| 350 void SyncChannel::SyncContext::OnSendDoneEventSignaled( |
| 351 base::RunLoop* nested_loop, |
| 352 base::WaitableEvent* event) { |
| 353 DCHECK_EQ(GetSendDoneEvent(), event); |
| 354 nested_loop->Quit(); |
| 355 } |
| 356 |
| 330 SyncChannel::SyncContext::~SyncContext() { | 357 SyncChannel::SyncContext::~SyncContext() { |
| 331 while (!deserializers_.empty()) | 358 while (!deserializers_.empty()) |
| 332 Pop(); | 359 Pop(); |
| 333 } | 360 } |
| 334 | 361 |
| 335 // Adds information about an outgoing sync message to the context so that | 362 // Adds information about an outgoing sync message to the context so that |
| 336 // we know how to deserialize the reply. Returns |true| if the message was added | 363 // we know how to deserialize the reply. Returns |true| if the message was added |
| 337 // to the context or |false| if it was rejected (e.g. due to shutdown.) | 364 // to the context or |false| if it was rejected (e.g. due to shutdown.) |
| 338 bool SyncChannel::SyncContext::Push(SyncMessage* sync_msg) { | 365 bool SyncChannel::SyncContext::Push(SyncMessage* sync_msg) { |
| 339 // Create the tracking information for this message. This object is stored | 366 // Create the tracking information for this message. This object is stored |
| 340 // by value since all members are pointers that are cheap to copy. These | 367 // by value since all members are pointers that are cheap to copy. These |
| 341 // pointers are cleaned up in the Pop() function. | 368 // pointers are cleaned up in the Pop() function. |
| 342 // | 369 // |
| 343 // The event is created as manual reset because in between Signal and | 370 // The event is created as manual reset because in between Signal and |
| 344 // OnObjectSignalled, another Send can happen which would stop the watcher | 371 // OnObjectSignalled, another Send can happen which would stop the watcher |
| 345 // from being called. The event would get watched later, when the nested | 372 // from being called. The event would get watched later, when the nested |
| 346 // Send completes, so the event will need to remain set. | 373 // Send completes, so the event will need to remain set. |
| 347 base::AutoLock auto_lock(deserializers_lock_); | 374 base::AutoLock auto_lock(deserializers_lock_); |
| 348 if (reject_new_deserializers_) | 375 if (reject_new_deserializers_) |
| 349 return false; | 376 return false; |
| 350 PendingSyncMsg pending( | 377 PendingSyncMsg pending( |
| 351 SyncMessage::GetMessageId(*sync_msg), sync_msg->GetReplyDeserializer(), | 378 SyncMessage::GetMessageId(*sync_msg), sync_msg->GetReplyDeserializer(), |
| 352 new MojoEvent); | 379 new base::WaitableEvent(base::WaitableEvent::ResetPolicy::MANUAL, |
| 380 base::WaitableEvent::InitialState::NOT_SIGNALED)); |
| 353 deserializers_.push_back(pending); | 381 deserializers_.push_back(pending); |
| 354 return true; | 382 return true; |
| 355 } | 383 } |
| 356 | 384 |
| 357 bool SyncChannel::SyncContext::Pop() { | 385 bool SyncChannel::SyncContext::Pop() { |
| 358 bool result; | 386 bool result; |
| 359 { | 387 { |
| 360 base::AutoLock auto_lock(deserializers_lock_); | 388 base::AutoLock auto_lock(deserializers_lock_); |
| 361 PendingSyncMsg msg = deserializers_.back(); | 389 PendingSyncMsg msg = deserializers_.back(); |
| 362 delete msg.deserializer; | 390 delete msg.deserializer; |
| 363 delete msg.done_event; | 391 delete msg.done_event; |
| 364 msg.done_event = nullptr; | 392 msg.done_event = nullptr; |
| 365 deserializers_.pop_back(); | 393 deserializers_.pop_back(); |
| 366 result = msg.send_result; | 394 result = msg.send_result; |
| 367 } | 395 } |
| 368 | 396 |
| 369 // We got a reply to a synchronous Send() call that's blocking the listener | 397 // We got a reply to a synchronous Send() call that's blocking the listener |
| 370 // thread. However, further down the call stack there could be another | 398 // thread. However, further down the call stack there could be another |
| 371 // blocking Send() call, whose reply we received after we made this last | 399 // blocking Send() call, whose reply we received after we made this last |
| 372 // Send() call. So check if we have any queued replies available that | 400 // Send() call. So check if we have any queued replies available that |
| 373 // can now unblock the listener thread. | 401 // can now unblock the listener thread. |
| 374 ipc_task_runner()->PostTask( | 402 ipc_task_runner()->PostTask( |
| 375 FROM_HERE, base::Bind(&ReceivedSyncMsgQueue::DispatchReplies, | 403 FROM_HERE, base::Bind(&ReceivedSyncMsgQueue::DispatchReplies, |
| 376 received_sync_msgs_)); | 404 received_sync_msgs_)); |
| 377 | 405 |
| 378 return result; | 406 return result; |
| 379 } | 407 } |
| 380 | 408 |
| 381 MojoEvent* SyncChannel::SyncContext::GetSendDoneEvent() { | 409 base::WaitableEvent* SyncChannel::SyncContext::GetSendDoneEvent() { |
| 382 base::AutoLock auto_lock(deserializers_lock_); | 410 base::AutoLock auto_lock(deserializers_lock_); |
| 383 return deserializers_.back().done_event; | 411 return deserializers_.back().done_event; |
| 384 } | 412 } |
| 385 | 413 |
| 386 MojoEvent* SyncChannel::SyncContext::GetDispatchEvent() { | 414 base::WaitableEvent* SyncChannel::SyncContext::GetDispatchEvent() { |
| 387 return received_sync_msgs_->dispatch_event(); | 415 return received_sync_msgs_->dispatch_event(); |
| 388 } | 416 } |
| 389 | 417 |
| 390 void SyncChannel::SyncContext::DispatchMessages() { | 418 void SyncChannel::SyncContext::DispatchMessages() { |
| 391 received_sync_msgs_->DispatchMessages(this); | 419 received_sync_msgs_->DispatchMessages(this); |
| 392 } | 420 } |
| 393 | 421 |
| 394 bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) { | 422 bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) { |
| 395 base::AutoLock auto_lock(deserializers_lock_); | 423 base::AutoLock auto_lock(deserializers_lock_); |
| 396 if (deserializers_.empty() || | 424 if (deserializers_.empty() || |
| 397 !SyncMessage::IsMessageReplyTo(*msg, deserializers_.back().id)) { | 425 !SyncMessage::IsMessageReplyTo(*msg, deserializers_.back().id)) { |
| 398 return false; | 426 return false; |
| 399 } | 427 } |
| 400 | 428 |
| 401 if (!msg->is_reply_error()) { | 429 if (!msg->is_reply_error()) { |
| 402 bool send_result = deserializers_.back().deserializer-> | 430 bool send_result = deserializers_.back().deserializer-> |
| 403 SerializeOutputParameters(*msg); | 431 SerializeOutputParameters(*msg); |
| 404 deserializers_.back().send_result = send_result; | 432 deserializers_.back().send_result = send_result; |
| 405 DVLOG_IF(1, !send_result) << "Couldn't deserialize reply message"; | 433 DVLOG_IF(1, !send_result) << "Couldn't deserialize reply message"; |
| 406 } else { | 434 } else { |
| 407 DVLOG(1) << "Received error reply"; | 435 DVLOG(1) << "Received error reply"; |
| 408 } | 436 } |
| 409 | 437 |
| 410 MojoEvent* done_event = deserializers_.back().done_event; | 438 base::WaitableEvent* done_event = deserializers_.back().done_event; |
| 411 TRACE_EVENT_FLOW_BEGIN0( | 439 TRACE_EVENT_FLOW_BEGIN0( |
| 412 TRACE_DISABLED_BY_DEFAULT("ipc.flow"), | 440 TRACE_DISABLED_BY_DEFAULT("ipc.flow"), |
| 413 "SyncChannel::SyncContext::TryToUnblockListener", done_event); | 441 "SyncChannel::SyncContext::TryToUnblockListener", done_event); |
| 414 | 442 |
| 415 done_event->Signal(); | 443 done_event->Signal(); |
| 416 | 444 |
| 417 return true; | 445 return true; |
| 418 } | 446 } |
| 419 | 447 |
| 420 void SyncChannel::SyncContext::Clear() { | 448 void SyncChannel::SyncContext::Clear() { |
| (...skipping 98 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 519 WaitableEvent* shutdown_event) { | 547 WaitableEvent* shutdown_event) { |
| 520 return base::WrapUnique( | 548 return base::WrapUnique( |
| 521 new SyncChannel(listener, ipc_task_runner, shutdown_event)); | 549 new SyncChannel(listener, ipc_task_runner, shutdown_event)); |
| 522 } | 550 } |
| 523 | 551 |
| 524 SyncChannel::SyncChannel( | 552 SyncChannel::SyncChannel( |
| 525 Listener* listener, | 553 Listener* listener, |
| 526 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, | 554 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, |
| 527 WaitableEvent* shutdown_event) | 555 WaitableEvent* shutdown_event) |
| 528 : ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)), | 556 : ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)), |
| 529 sync_handle_registry_(mojo::SyncHandleRegistry::current()), | 557 sync_handle_registry_(mojo::SyncHandleRegistry::current()) { |
| 530 dispatch_watcher_(FROM_HERE, | |
| 531 mojo::SimpleWatcher::ArmingPolicy::AUTOMATIC) { | |
| 532 // The current (listener) thread must be distinct from the IPC thread, or else | 558 // The current (listener) thread must be distinct from the IPC thread, or else |
| 533 // sending synchronous messages will deadlock. | 559 // sending synchronous messages will deadlock. |
| 534 DCHECK_NE(ipc_task_runner.get(), base::ThreadTaskRunnerHandle::Get().get()); | 560 DCHECK_NE(ipc_task_runner.get(), base::ThreadTaskRunnerHandle::Get().get()); |
| 535 StartWatching(); | 561 StartWatching(); |
| 536 } | 562 } |
| 537 | 563 |
| 538 SyncChannel::~SyncChannel() { | 564 SyncChannel::~SyncChannel() { |
| 539 } | 565 } |
| 540 | 566 |
| 541 void SyncChannel::SetRestrictDispatchChannelGroup(int group) { | 567 void SyncChannel::SetRestrictDispatchChannelGroup(int group) { |
| (...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 589 "SyncChannel::Send", context->GetSendDoneEvent()); | 615 "SyncChannel::Send", context->GetSendDoneEvent()); |
| 590 | 616 |
| 591 return context->Pop(); | 617 return context->Pop(); |
| 592 } | 618 } |
| 593 | 619 |
| 594 void SyncChannel::WaitForReply(mojo::SyncHandleRegistry* registry, | 620 void SyncChannel::WaitForReply(mojo::SyncHandleRegistry* registry, |
| 595 SyncContext* context, | 621 SyncContext* context, |
| 596 bool pump_messages) { | 622 bool pump_messages) { |
| 597 context->DispatchMessages(); | 623 context->DispatchMessages(); |
| 598 | 624 |
| 599 const MojoEvent* pump_messages_event = nullptr; | 625 base::WaitableEvent* pump_messages_event = nullptr; |
| 600 if (pump_messages) | 626 if (pump_messages) { |
| 601 pump_messages_event = g_pump_messages_event.Get().event(); | 627 if (!g_pump_messages_event.Get()) { |
| 628 g_pump_messages_event.Get() = base::MakeUnique<base::WaitableEvent>( |
| 629 base::WaitableEvent::ResetPolicy::MANUAL, |
| 630 base::WaitableEvent::InitialState::SIGNALED); |
| 631 } |
| 632 pump_messages_event = g_pump_messages_event.Get().get(); |
| 633 } |
| 602 | 634 |
| 603 while (true) { | 635 while (true) { |
| 604 bool dispatch = false; | 636 bool dispatch = false; |
| 605 bool send_done = false; | 637 bool send_done = false; |
| 606 bool should_pump_messages = false; | 638 bool should_pump_messages = false; |
| 607 bool error = false; | 639 bool registered = registry->RegisterEvent( |
| 608 bool registered = registry->RegisterHandle( | 640 context->GetSendDoneEvent(), base::Bind(&OnEventReady, &send_done)); |
| 609 context->GetSendDoneEvent()->GetHandle(), | |
| 610 MOJO_HANDLE_SIGNAL_READABLE, | |
| 611 base::Bind(&OnSyncHandleReady, &send_done, &error)); | |
| 612 DCHECK(registered); | 641 DCHECK(registered); |
| 642 |
| 613 if (pump_messages_event) { | 643 if (pump_messages_event) { |
| 614 registered = registry->RegisterHandle( | 644 registered = registry->RegisterEvent( |
| 615 pump_messages_event->GetHandle(), MOJO_HANDLE_SIGNAL_READABLE, | 645 pump_messages_event, |
| 616 base::Bind(&OnSyncHandleReady, &should_pump_messages, &error)); | 646 base::Bind(&OnEventReady, &should_pump_messages)); |
| 617 DCHECK(registered); | 647 DCHECK(registered); |
| 618 } | 648 } |
| 619 | 649 |
| 620 const bool* stop_flags[] = { &dispatch, &send_done, &should_pump_messages }; | 650 const bool* stop_flags[] = { &dispatch, &send_done, &should_pump_messages }; |
| 621 context->received_sync_msgs()->BlockDispatch(&dispatch); | 651 context->received_sync_msgs()->BlockDispatch(&dispatch); |
| 622 registry->WatchAllHandles(stop_flags, 3); | 652 registry->Wait(stop_flags, 3); |
| 623 context->received_sync_msgs()->UnblockDispatch(); | 653 context->received_sync_msgs()->UnblockDispatch(); |
| 624 DCHECK(!error); | |
| 625 | 654 |
| 626 registry->UnregisterHandle(context->GetSendDoneEvent()->GetHandle()); | 655 registry->UnregisterEvent(context->GetSendDoneEvent()); |
| 627 if (pump_messages_event) | 656 if (pump_messages_event) |
| 628 registry->UnregisterHandle(pump_messages_event->GetHandle()); | 657 registry->UnregisterEvent(pump_messages_event); |
| 629 | 658 |
| 630 if (dispatch) { | 659 if (dispatch) { |
| 631 // We're waiting for a reply, but we received a blocking synchronous call. | 660 // We're waiting for a reply, but we received a blocking synchronous call. |
| 632 // We must process it to avoid potential deadlocks. | 661 // We must process it to avoid potential deadlocks. |
| 633 context->GetDispatchEvent()->Reset(); | 662 context->GetDispatchEvent()->Reset(); |
| 634 context->DispatchMessages(); | 663 context->DispatchMessages(); |
| 635 continue; | 664 continue; |
| 636 } | 665 } |
| 637 | 666 |
| 638 if (should_pump_messages) | 667 if (should_pump_messages) |
| 639 WaitForReplyWithNestedMessageLoop(context); // Run a nested message loop. | 668 WaitForReplyWithNestedMessageLoop(context); // Run a nested message loop. |
| 640 | 669 |
| 641 break; | 670 break; |
| 642 } | 671 } |
| 643 } | 672 } |
| 644 | 673 |
| 645 void SyncChannel::WaitForReplyWithNestedMessageLoop(SyncContext* context) { | 674 void SyncChannel::WaitForReplyWithNestedMessageLoop(SyncContext* context) { |
| 646 mojo::SimpleWatcher send_done_watcher( | 675 base::MessageLoop::ScopedNestableTaskAllower allow( |
| 647 FROM_HERE, mojo::SimpleWatcher::ArmingPolicy::AUTOMATIC); | 676 base::MessageLoop::current()); |
| 648 | 677 base::RunLoop nested_loop; |
| 649 ReceivedSyncMsgQueue* sync_msg_queue = context->received_sync_msgs(); | 678 ReceivedSyncMsgQueue::NestedSendDoneWatcher watcher(context, &nested_loop); |
| 650 DCHECK_NE(sync_msg_queue, nullptr); | 679 nested_loop.Run(); |
| 651 | |
| 652 mojo::SimpleWatcher* old_watcher = sync_msg_queue->top_send_done_watcher(); | |
| 653 mojo::Handle old_handle(mojo::kInvalidHandleValue); | |
| 654 mojo::SimpleWatcher::ReadyCallback old_callback; | |
| 655 | |
| 656 // Maintain a thread-local stack of watchers to ensure nested calls complete | |
| 657 // in the correct sequence, i.e. the outermost call completes first, etc. | |
| 658 if (old_watcher) { | |
| 659 old_callback = old_watcher->ready_callback(); | |
| 660 old_handle = old_watcher->handle(); | |
| 661 old_watcher->Cancel(); | |
| 662 } | |
| 663 | |
| 664 sync_msg_queue->set_top_send_done_watcher(&send_done_watcher); | |
| 665 | |
| 666 { | |
| 667 base::RunLoop nested_loop; | |
| 668 send_done_watcher.Watch( | |
| 669 context->GetSendDoneEvent()->GetHandle(), MOJO_HANDLE_SIGNAL_READABLE, | |
| 670 base::Bind(&RunOnHandleReady, nested_loop.QuitClosure())); | |
| 671 | |
| 672 base::MessageLoop::ScopedNestableTaskAllower allow( | |
| 673 base::MessageLoop::current()); | |
| 674 nested_loop.Run(); | |
| 675 send_done_watcher.Cancel(); | |
| 676 } | |
| 677 | |
| 678 sync_msg_queue->set_top_send_done_watcher(old_watcher); | |
| 679 if (old_watcher) | |
| 680 old_watcher->Watch(old_handle, MOJO_HANDLE_SIGNAL_READABLE, old_callback); | |
| 681 } | 680 } |
| 682 | 681 |
| 683 void SyncChannel::OnDispatchHandleReady(MojoResult result) { | 682 void SyncChannel::OnDispatchEventSignaled(base::WaitableEvent* event) { |
| 684 DCHECK_EQ(result, MOJO_RESULT_OK); | 683 DCHECK_EQ(sync_context()->GetDispatchEvent(), event); |
| 685 sync_context()->GetDispatchEvent()->Reset(); | 684 sync_context()->GetDispatchEvent()->Reset(); |
| 685 |
| 686 StartWatching(); |
| 687 |
| 688 // NOTE: May delete |this|. |
| 686 sync_context()->DispatchMessages(); | 689 sync_context()->DispatchMessages(); |
| 687 } | 690 } |
| 688 | 691 |
| 689 void SyncChannel::StartWatching() { | 692 void SyncChannel::StartWatching() { |
| 690 // |dispatch_watcher_| watches the event asynchronously, only dispatching | 693 // |dispatch_watcher_| watches the event asynchronously, only dispatching |
| 691 // messages once the listener thread is unblocked and pumping its task queue. | 694 // messages once the listener thread is unblocked and pumping its task queue. |
| 692 // The ReceivedSyncMsgQueue also watches this event and may dispatch | 695 // The ReceivedSyncMsgQueue also watches this event and may dispatch |
| 693 // immediately if woken up by a message which it's allowed to dispatch. | 696 // immediately if woken up by a message which it's allowed to dispatch. |
| 694 dispatch_watcher_.Watch( | 697 dispatch_watcher_.StartWatching( |
| 695 sync_context()->GetDispatchEvent()->GetHandle(), | 698 sync_context()->GetDispatchEvent(), |
| 696 MOJO_HANDLE_SIGNAL_READABLE, | 699 base::Bind(&SyncChannel::OnDispatchEventSignaled, |
| 697 base::Bind(&SyncChannel::OnDispatchHandleReady, base::Unretained(this))); | 700 base::Unretained(this))); |
| 698 } | 701 } |
| 699 | 702 |
| 700 void SyncChannel::OnChannelInit() { | 703 void SyncChannel::OnChannelInit() { |
| 701 pre_init_sync_message_filters_.clear(); | 704 pre_init_sync_message_filters_.clear(); |
| 702 } | 705 } |
| 703 | 706 |
| 704 } // namespace IPC | 707 } // namespace IPC |
| OLD | NEW |