| OLD | NEW |
| 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "ipc/ipc_sync_channel.h" | 5 #include "ipc/ipc_sync_channel.h" |
| 6 | 6 |
| 7 #include <stddef.h> | 7 #include <stddef.h> |
| 8 #include <stdint.h> | 8 #include <stdint.h> |
| 9 | 9 |
| 10 #include <utility> | 10 #include <utility> |
| 11 | 11 |
| 12 #include "base/bind.h" | 12 #include "base/bind.h" |
| 13 #include "base/lazy_instance.h" | 13 #include "base/lazy_instance.h" |
| 14 #include "base/location.h" | 14 #include "base/location.h" |
| 15 #include "base/logging.h" | 15 #include "base/logging.h" |
| 16 #include "base/macros.h" |
| 16 #include "base/memory/ptr_util.h" | 17 #include "base/memory/ptr_util.h" |
| 18 #include "base/run_loop.h" |
| 17 #include "base/synchronization/waitable_event.h" | 19 #include "base/synchronization/waitable_event.h" |
| 18 #include "base/synchronization/waitable_event_watcher.h" | |
| 19 #include "base/threading/thread_local.h" | 20 #include "base/threading/thread_local.h" |
| 20 #include "base/threading/thread_task_runner_handle.h" | 21 #include "base/threading/thread_task_runner_handle.h" |
| 21 #include "base/trace_event/trace_event.h" | 22 #include "base/trace_event/trace_event.h" |
| 22 #include "ipc/ipc_channel_factory.h" | 23 #include "ipc/ipc_channel_factory.h" |
| 23 #include "ipc/ipc_logging.h" | 24 #include "ipc/ipc_logging.h" |
| 24 #include "ipc/ipc_message_macros.h" | 25 #include "ipc/ipc_message_macros.h" |
| 25 #include "ipc/ipc_sync_message.h" | 26 #include "ipc/ipc_sync_message.h" |
| 27 #include "ipc/mojo_event.h" |
| 28 #include "mojo/public/cpp/bindings/sync_handle_registry.h" |
| 26 | 29 |
| 27 using base::TimeDelta; | 30 using base::TimeDelta; |
| 28 using base::TimeTicks; | 31 using base::TimeTicks; |
| 29 using base::WaitableEvent; | 32 using base::WaitableEvent; |
| 30 | 33 |
| 31 namespace IPC { | 34 namespace IPC { |
| 35 |
| 36 namespace { |
| 37 |
| 38 // A lazy thread-local Mojo Event which is always signaled. Used to wake up the |
| 39 // sync waiter when a SyncMessage requires the MessageLoop to be pumped while |
| 40 // waiting for a reply. This object is created lazily and ref-counted so it can |
| 41 // be cleaned up when no longer in use. |
| 42 class PumpMessagesEvent { |
| 43 public: |
| 44 // Acquires the event for this thread. Creates a new instance if necessary. |
| 45 static PumpMessagesEvent* Acquire() { |
| 46 PumpMessagesEvent* pump_messages_event = g_event_.Pointer()->Get(); |
| 47 if (!pump_messages_event) { |
| 48 pump_messages_event = new PumpMessagesEvent; |
| 49 pump_messages_event->event_.Signal(); |
| 50 g_event_.Pointer()->Set(pump_messages_event); |
| 51 } |
| 52 pump_messages_event->ref_count_++; |
| 53 return pump_messages_event; |
| 54 } |
| 55 |
| 56 // Releases a handle to this event. There must be a 1:1 correspondence between |
| 57 // calls to Acquire() and calls to Release(). |
| 58 static void Release() { |
| 59 PumpMessagesEvent* pump_messages_event = g_event_.Pointer()->Get(); |
| 60 DCHECK(pump_messages_event); |
| 61 DCHECK_GT(pump_messages_event->ref_count_, 0); |
| 62 pump_messages_event->ref_count_--; |
| 63 if (!pump_messages_event->ref_count_) { |
| 64 g_event_.Pointer()->Set(nullptr); |
| 65 delete pump_messages_event; |
| 66 } |
| 67 } |
| 68 |
| 69 const mojo::Handle& GetHandle() const { return event_.GetHandle(); } |
| 70 |
| 71 private: |
| 72 PumpMessagesEvent() {} |
| 73 ~PumpMessagesEvent() {} |
| 74 |
| 75 int ref_count_ = 0; |
| 76 MojoEvent event_; |
| 77 |
| 78 static base::LazyInstance<base::ThreadLocalPointer<PumpMessagesEvent>> |
| 79 g_event_; |
| 80 |
| 81 DISALLOW_COPY_AND_ASSIGN(PumpMessagesEvent); |
| 82 }; |
| 83 |
| 84 // A generic callback used when watching handles synchronously. Sets |*signal| |
| 85 // to true. Also sets |*error| to true in case of an error. |
| 86 void OnSyncHandleReady(bool* signal, bool* error, MojoResult result) { |
| 87 *signal = true; |
| 88 *error = result != MOJO_RESULT_OK; |
| 89 } |
| 90 |
| 91 // A ReadyCallback for use with mojo::Watcher. Ignores the result (DCHECKs, but |
| 92 // is only used in cases where failure should be impossible) and runs |
| 93 // |callback|. |
| 94 void RunOnHandleReady(const base::Closure& callback, MojoResult result) { |
| 95 DCHECK_EQ(result, MOJO_RESULT_OK); |
| 96 callback.Run(); |
| 97 } |
| 98 |
| 99 } // namespace |
| 100 |
| 101 base::LazyInstance<base::ThreadLocalPointer<PumpMessagesEvent>> |
| 102 PumpMessagesEvent::g_event_ = LAZY_INSTANCE_INITIALIZER; |
| 103 |
| 32 // When we're blocked in a Send(), we need to process incoming synchronous | 104 // When we're blocked in a Send(), we need to process incoming synchronous |
| 33 // messages right away because it could be blocking our reply (either | 105 // messages right away because it could be blocking our reply (either |
| 34 // directly from the same object we're calling, or indirectly through one or | 106 // directly from the same object we're calling, or indirectly through one or |
| 35 // more other channels). That means that in SyncContext's OnMessageReceived, | 107 // more other channels). That means that in SyncContext's OnMessageReceived, |
| 36 // we need to process sync message right away if we're blocked. However a | 108 // we need to process sync message right away if we're blocked. However a |
| 37 // simple check isn't sufficient, because the listener thread can be in the | 109 // simple check isn't sufficient, because the listener thread can be in the |
| 38 // process of calling Send. | 110 // process of calling Send. |
| 39 // To work around this, when SyncChannel filters a sync message, it sets | 111 // To work around this, when SyncChannel filters a sync message, it sets |
| 40 // an event that the listener thread waits on during its Send() call. This | 112 // an event that the listener thread waits on during its Send() call. This |
| 41 // allows us to dispatch incoming sync messages when blocked. The race | 113 // allows us to dispatch incoming sync messages when blocked. The race |
| (...skipping 106 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 148 iter++; | 220 iter++; |
| 149 } | 221 } |
| 150 } | 222 } |
| 151 | 223 |
| 152 if (--listener_count_ == 0) { | 224 if (--listener_count_ == 0) { |
| 153 DCHECK(lazy_tls_ptr_.Pointer()->Get()); | 225 DCHECK(lazy_tls_ptr_.Pointer()->Get()); |
| 154 lazy_tls_ptr_.Pointer()->Set(NULL); | 226 lazy_tls_ptr_.Pointer()->Set(NULL); |
| 155 } | 227 } |
| 156 } | 228 } |
| 157 | 229 |
| 158 WaitableEvent* dispatch_event() { return &dispatch_event_; } | 230 MojoEvent* dispatch_event() { return &dispatch_event_; } |
| 159 base::SingleThreadTaskRunner* listener_task_runner() { | 231 base::SingleThreadTaskRunner* listener_task_runner() { |
| 160 return listener_task_runner_.get(); | 232 return listener_task_runner_.get(); |
| 161 } | 233 } |
| 162 | 234 |
| 163 // Holds a pointer to the per-thread ReceivedSyncMsgQueue object. | 235 // Holds a pointer to the per-thread ReceivedSyncMsgQueue object. |
| 164 static base::LazyInstance<base::ThreadLocalPointer<ReceivedSyncMsgQueue> > | 236 static base::LazyInstance<base::ThreadLocalPointer<ReceivedSyncMsgQueue> > |
| 165 lazy_tls_ptr_; | 237 lazy_tls_ptr_; |
| 166 | 238 |
| 167 // Called on the ipc thread to check if we can unblock any current Send() | 239 // Called on the ipc thread to check if we can unblock any current Send() |
| 168 // calls based on a queued reply. | 240 // calls based on a queued reply. |
| 169 void DispatchReplies() { | 241 void DispatchReplies() { |
| 170 for (size_t i = 0; i < received_replies_.size(); ++i) { | 242 for (size_t i = 0; i < received_replies_.size(); ++i) { |
| 171 Message* message = received_replies_[i].message; | 243 Message* message = received_replies_[i].message; |
| 172 if (received_replies_[i].context->TryToUnblockListener(message)) { | 244 if (received_replies_[i].context->TryToUnblockListener(message)) { |
| 173 delete message; | 245 delete message; |
| 174 received_replies_.erase(received_replies_.begin() + i); | 246 received_replies_.erase(received_replies_.begin() + i); |
| 175 return; | 247 return; |
| 176 } | 248 } |
| 177 } | 249 } |
| 178 } | 250 } |
| 179 | 251 |
| 180 base::WaitableEventWatcher* top_send_done_watcher() { | 252 mojo::Watcher* top_send_done_watcher() { |
| 181 return top_send_done_watcher_; | 253 return top_send_done_watcher_; |
| 182 } | 254 } |
| 183 | 255 |
| 184 void set_top_send_done_watcher(base::WaitableEventWatcher* watcher) { | 256 void set_top_send_done_watcher(mojo::Watcher* watcher) { |
| 185 top_send_done_watcher_ = watcher; | 257 top_send_done_watcher_ = watcher; |
| 186 } | 258 } |
| 187 | 259 |
| 188 private: | 260 private: |
| 189 friend class base::RefCountedThreadSafe<ReceivedSyncMsgQueue>; | 261 friend class base::RefCountedThreadSafe<ReceivedSyncMsgQueue>; |
| 190 | 262 |
| 191 // See the comment in SyncChannel::SyncChannel for why this event is created | 263 // See the comment in SyncChannel::SyncChannel for why this event is created |
| 192 // as manual reset. | 264 // as manual reset. |
| 193 ReceivedSyncMsgQueue() | 265 ReceivedSyncMsgQueue() |
| 194 : message_queue_version_(0), | 266 : message_queue_version_(0), |
| 195 dispatch_event_(base::WaitableEvent::ResetPolicy::MANUAL, | |
| 196 base::WaitableEvent::InitialState::NOT_SIGNALED), | |
| 197 listener_task_runner_(base::ThreadTaskRunnerHandle::Get()), | 267 listener_task_runner_(base::ThreadTaskRunnerHandle::Get()), |
| 198 task_pending_(false), | 268 task_pending_(false), |
| 199 listener_count_(0), | 269 listener_count_(0), |
| 200 top_send_done_watcher_(NULL) {} | 270 top_send_done_watcher_(NULL) {} |
| 201 | 271 |
| 202 ~ReceivedSyncMsgQueue() {} | 272 ~ReceivedSyncMsgQueue() {} |
| 203 | 273 |
| 204 // Holds information about a queued synchronous message or reply. | 274 // Holds information about a queued synchronous message or reply. |
| 205 struct QueuedMessage { | 275 struct QueuedMessage { |
| 206 QueuedMessage(Message* m, SyncContext* c) : message(m), context(c) { } | 276 QueuedMessage(Message* m, SyncContext* c) : message(m), context(c) { } |
| 207 Message* message; | 277 Message* message; |
| 208 scoped_refptr<SyncChannel::SyncContext> context; | 278 scoped_refptr<SyncChannel::SyncContext> context; |
| 209 }; | 279 }; |
| 210 | 280 |
| 211 typedef std::list<QueuedMessage> SyncMessageQueue; | 281 typedef std::list<QueuedMessage> SyncMessageQueue; |
| 212 SyncMessageQueue message_queue_; | 282 SyncMessageQueue message_queue_; |
| 213 uint32_t message_queue_version_; // Used to signal DispatchMessages to rescan | 283 uint32_t message_queue_version_; // Used to signal DispatchMessages to rescan |
| 214 | 284 |
| 215 std::vector<QueuedMessage> received_replies_; | 285 std::vector<QueuedMessage> received_replies_; |
| 216 | 286 |
| 217 // Set when we got a synchronous message that we must respond to as the | 287 // Signaled when we get a synchronous message that we must respond to, as the |
| 218 // sender needs its reply before it can reply to our original synchronous | 288 // sender needs its reply before it can reply to our original synchronous |
| 219 // message. | 289 // message. |
| 220 WaitableEvent dispatch_event_; | 290 MojoEvent dispatch_event_; |
| 221 scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner_; | 291 scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner_; |
| 222 base::Lock message_lock_; | 292 base::Lock message_lock_; |
| 223 bool task_pending_; | 293 bool task_pending_; |
| 224 int listener_count_; | 294 int listener_count_; |
| 225 | 295 |
| 226 // The current send done event watcher for this thread. Used to maintain | 296 // The current send done handle watcher for this thread. Used to maintain |
| 227 // a local global stack of send done watchers to ensure that nested sync | 297 // a thread-local stack of send done watchers to ensure that nested sync |
| 228 // message loops complete correctly. | 298 // message loops complete correctly. |
| 229 base::WaitableEventWatcher* top_send_done_watcher_; | 299 mojo::Watcher* top_send_done_watcher_; |
| 230 }; | 300 }; |
| 231 | 301 |
| 232 base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> > | 302 base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> > |
| 233 SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_ = | 303 SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_ = |
| 234 LAZY_INSTANCE_INITIALIZER; | 304 LAZY_INSTANCE_INITIALIZER; |
| 235 | 305 |
| 236 SyncChannel::SyncContext::SyncContext( | 306 SyncChannel::SyncContext::SyncContext( |
| 237 Listener* listener, | 307 Listener* listener, |
| 238 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, | 308 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, |
| 239 WaitableEvent* shutdown_event) | 309 WaitableEvent* shutdown_event) |
| (...skipping 15 matching lines...) Expand all Loading... |
| 255 // Create the tracking information for this message. This object is stored | 325 // Create the tracking information for this message. This object is stored |
| 256 // by value since all members are pointers that are cheap to copy. These | 326 // by value since all members are pointers that are cheap to copy. These |
| 257 // pointers are cleaned up in the Pop() function. | 327 // pointers are cleaned up in the Pop() function. |
| 258 // | 328 // |
| 259 // The event is created as manual reset because in between Signal and | 329 // The event is created as manual reset because in between Signal and |
| 260 // OnObjectSignalled, another Send can happen which would stop the watcher | 330 // OnObjectSignalled, another Send can happen which would stop the watcher |
| 261 // from being called. The event would get watched later, when the nested | 331 // from being called. The event would get watched later, when the nested |
| 262 // Send completes, so the event will need to remain set. | 332 // Send completes, so the event will need to remain set. |
| 263 PendingSyncMsg pending( | 333 PendingSyncMsg pending( |
| 264 SyncMessage::GetMessageId(*sync_msg), sync_msg->GetReplyDeserializer(), | 334 SyncMessage::GetMessageId(*sync_msg), sync_msg->GetReplyDeserializer(), |
| 265 new WaitableEvent(base::WaitableEvent::ResetPolicy::MANUAL, | 335 new MojoEvent); |
| 266 base::WaitableEvent::InitialState::NOT_SIGNALED)); | |
| 267 base::AutoLock auto_lock(deserializers_lock_); | 336 base::AutoLock auto_lock(deserializers_lock_); |
| 268 deserializers_.push_back(pending); | 337 deserializers_.push_back(pending); |
| 269 } | 338 } |
| 270 | 339 |
| 271 bool SyncChannel::SyncContext::Pop() { | 340 bool SyncChannel::SyncContext::Pop() { |
| 272 bool result; | 341 bool result; |
| 273 { | 342 { |
| 274 base::AutoLock auto_lock(deserializers_lock_); | 343 base::AutoLock auto_lock(deserializers_lock_); |
| 275 PendingSyncMsg msg = deserializers_.back(); | 344 PendingSyncMsg msg = deserializers_.back(); |
| 276 delete msg.deserializer; | 345 delete msg.deserializer; |
| 277 delete msg.done_event; | 346 delete msg.done_event; |
| 278 msg.done_event = NULL; | 347 msg.done_event = nullptr; |
| 279 deserializers_.pop_back(); | 348 deserializers_.pop_back(); |
| 280 result = msg.send_result; | 349 result = msg.send_result; |
| 281 } | 350 } |
| 282 | 351 |
| 283 // We got a reply to a synchronous Send() call that's blocking the listener | 352 // We got a reply to a synchronous Send() call that's blocking the listener |
| 284 // thread. However, further down the call stack there could be another | 353 // thread. However, further down the call stack there could be another |
| 285 // blocking Send() call, whose reply we received after we made this last | 354 // blocking Send() call, whose reply we received after we made this last |
| 286 // Send() call. So check if we have any queued replies available that | 355 // Send() call. So check if we have any queued replies available that |
| 287 // can now unblock the listener thread. | 356 // can now unblock the listener thread. |
| 288 ipc_task_runner()->PostTask( | 357 ipc_task_runner()->PostTask( |
| 289 FROM_HERE, base::Bind(&ReceivedSyncMsgQueue::DispatchReplies, | 358 FROM_HERE, base::Bind(&ReceivedSyncMsgQueue::DispatchReplies, |
| 290 received_sync_msgs_.get())); | 359 received_sync_msgs_.get())); |
| 291 | 360 |
| 292 return result; | 361 return result; |
| 293 } | 362 } |
| 294 | 363 |
| 295 WaitableEvent* SyncChannel::SyncContext::GetSendDoneEvent() { | 364 MojoEvent* SyncChannel::SyncContext::GetSendDoneEvent() { |
| 296 base::AutoLock auto_lock(deserializers_lock_); | 365 base::AutoLock auto_lock(deserializers_lock_); |
| 297 return deserializers_.back().done_event; | 366 return deserializers_.back().done_event; |
| 298 } | 367 } |
| 299 | 368 |
| 300 WaitableEvent* SyncChannel::SyncContext::GetDispatchEvent() { | 369 MojoEvent* SyncChannel::SyncContext::GetDispatchEvent() { |
| 301 return received_sync_msgs_->dispatch_event(); | 370 return received_sync_msgs_->dispatch_event(); |
| 302 } | 371 } |
| 303 | 372 |
| 304 void SyncChannel::SyncContext::DispatchMessages() { | 373 void SyncChannel::SyncContext::DispatchMessages() { |
| 305 received_sync_msgs_->DispatchMessages(this); | 374 received_sync_msgs_->DispatchMessages(this); |
| 306 } | 375 } |
| 307 | 376 |
| 308 bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) { | 377 bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) { |
| 309 base::AutoLock auto_lock(deserializers_lock_); | 378 base::AutoLock auto_lock(deserializers_lock_); |
| 310 if (deserializers_.empty() || | 379 if (deserializers_.empty() || |
| 311 !SyncMessage::IsMessageReplyTo(*msg, deserializers_.back().id)) { | 380 !SyncMessage::IsMessageReplyTo(*msg, deserializers_.back().id)) { |
| 312 return false; | 381 return false; |
| 313 } | 382 } |
| 314 | 383 |
| 315 if (!msg->is_reply_error()) { | 384 if (!msg->is_reply_error()) { |
| 316 bool send_result = deserializers_.back().deserializer-> | 385 bool send_result = deserializers_.back().deserializer-> |
| 317 SerializeOutputParameters(*msg); | 386 SerializeOutputParameters(*msg); |
| 318 deserializers_.back().send_result = send_result; | 387 deserializers_.back().send_result = send_result; |
| 319 DVLOG_IF(1, !send_result) << "Couldn't deserialize reply message"; | 388 DVLOG_IF(1, !send_result) << "Couldn't deserialize reply message"; |
| 320 } else { | 389 } else { |
| 321 DVLOG(1) << "Received error reply"; | 390 DVLOG(1) << "Received error reply"; |
| 322 } | 391 } |
| 323 | 392 |
| 324 base::WaitableEvent* done_event = deserializers_.back().done_event; | 393 MojoEvent* done_event = deserializers_.back().done_event; |
| 325 TRACE_EVENT_FLOW_BEGIN0( | 394 TRACE_EVENT_FLOW_BEGIN0( |
| 326 TRACE_DISABLED_BY_DEFAULT("ipc.flow"), | 395 TRACE_DISABLED_BY_DEFAULT("ipc.flow"), |
| 327 "SyncChannel::SyncContext::TryToUnblockListener", done_event); | 396 "SyncChannel::SyncContext::TryToUnblockListener", done_event); |
| 328 | 397 |
| 329 done_event->Signal(); | 398 done_event->Signal(); |
| 330 | 399 |
| 331 return true; | 400 return true; |
| 332 } | 401 } |
| 333 | 402 |
| 334 void SyncChannel::SyncContext::Clear() { | 403 void SyncChannel::SyncContext::Clear() { |
| (...skipping 25 matching lines...) Expand all Loading... |
| 360 | 429 |
| 361 void SyncChannel::SyncContext::OnChannelError() { | 430 void SyncChannel::SyncContext::OnChannelError() { |
| 362 CancelPendingSends(); | 431 CancelPendingSends(); |
| 363 shutdown_watcher_.StopWatching(); | 432 shutdown_watcher_.StopWatching(); |
| 364 Context::OnChannelError(); | 433 Context::OnChannelError(); |
| 365 } | 434 } |
| 366 | 435 |
| 367 void SyncChannel::SyncContext::OnChannelOpened() { | 436 void SyncChannel::SyncContext::OnChannelOpened() { |
| 368 shutdown_watcher_.StartWatching( | 437 shutdown_watcher_.StartWatching( |
| 369 shutdown_event_, | 438 shutdown_event_, |
| 370 base::Bind(&SyncChannel::SyncContext::OnWaitableEventSignaled, | 439 base::Bind(&SyncChannel::SyncContext::OnShutdownEventSignaled, |
| 371 base::Unretained(this))); | 440 base::Unretained(this))); |
| 372 Context::OnChannelOpened(); | 441 Context::OnChannelOpened(); |
| 373 } | 442 } |
| 374 | 443 |
| 375 void SyncChannel::SyncContext::OnChannelClosed() { | 444 void SyncChannel::SyncContext::OnChannelClosed() { |
| 376 CancelPendingSends(); | 445 CancelPendingSends(); |
| 377 shutdown_watcher_.StopWatching(); | 446 shutdown_watcher_.StopWatching(); |
| 378 Context::OnChannelClosed(); | 447 Context::OnChannelClosed(); |
| 379 } | 448 } |
| 380 | 449 |
| 381 void SyncChannel::SyncContext::CancelPendingSends() { | 450 void SyncChannel::SyncContext::CancelPendingSends() { |
| 382 base::AutoLock auto_lock(deserializers_lock_); | 451 base::AutoLock auto_lock(deserializers_lock_); |
| 383 PendingSyncMessageQueue::iterator iter; | 452 PendingSyncMessageQueue::iterator iter; |
| 384 DVLOG(1) << "Canceling pending sends"; | 453 DVLOG(1) << "Canceling pending sends"; |
| 385 for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++) { | 454 for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++) { |
| 386 TRACE_EVENT_FLOW_BEGIN0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), | 455 TRACE_EVENT_FLOW_BEGIN0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), |
| 387 "SyncChannel::SyncContext::CancelPendingSends", | 456 "SyncChannel::SyncContext::CancelPendingSends", |
| 388 iter->done_event); | 457 iter->done_event); |
| 389 iter->done_event->Signal(); | 458 iter->done_event->Signal(); |
| 390 } | 459 } |
| 391 } | 460 } |
| 392 | 461 |
| 393 void SyncChannel::SyncContext::OnWaitableEventSignaled(WaitableEvent* event) { | 462 void SyncChannel::SyncContext::OnShutdownEventSignaled(WaitableEvent* event) { |
| 394 if (event == shutdown_event_) { | 463 DCHECK_EQ(event, shutdown_event_); |
| 395 // Process shut down before we can get a reply to a synchronous message. | |
| 396 // Cancel pending Send calls, which will end up setting the send done event. | |
| 397 CancelPendingSends(); | |
| 398 } else { | |
| 399 // We got the reply, timed out or the process shutdown. | |
| 400 DCHECK_EQ(GetSendDoneEvent(), event); | |
| 401 base::MessageLoop::current()->QuitNow(); | |
| 402 } | |
| 403 } | |
| 404 | 464 |
| 405 base::WaitableEventWatcher::EventCallback | 465 // Process shut down before we can get a reply to a synchronous message. |
| 406 SyncChannel::SyncContext::MakeWaitableEventCallback() { | 466 // Cancel pending Send calls, which will end up setting the send done event. |
| 407 return base::Bind(&SyncChannel::SyncContext::OnWaitableEventSignaled, this); | 467 CancelPendingSends(); |
| 408 } | 468 } |
| 409 | 469 |
| 410 // static | 470 // static |
| 411 std::unique_ptr<SyncChannel> SyncChannel::Create( | 471 std::unique_ptr<SyncChannel> SyncChannel::Create( |
| 412 const IPC::ChannelHandle& channel_handle, | 472 const IPC::ChannelHandle& channel_handle, |
| 413 Channel::Mode mode, | 473 Channel::Mode mode, |
| 414 Listener* listener, | 474 Listener* listener, |
| 415 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, | 475 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, |
| 416 bool create_pipe_now, | 476 bool create_pipe_now, |
| 417 base::WaitableEvent* shutdown_event) { | 477 base::WaitableEvent* shutdown_event) { |
| (...skipping 23 matching lines...) Expand all Loading... |
| 441 WaitableEvent* shutdown_event) { | 501 WaitableEvent* shutdown_event) { |
| 442 return base::WrapUnique( | 502 return base::WrapUnique( |
| 443 new SyncChannel(listener, ipc_task_runner, shutdown_event)); | 503 new SyncChannel(listener, ipc_task_runner, shutdown_event)); |
| 444 } | 504 } |
| 445 | 505 |
| 446 SyncChannel::SyncChannel( | 506 SyncChannel::SyncChannel( |
| 447 Listener* listener, | 507 Listener* listener, |
| 448 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, | 508 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, |
| 449 WaitableEvent* shutdown_event) | 509 WaitableEvent* shutdown_event) |
| 450 : ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)) { | 510 : ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)) { |
| 511 // Keep a thread-local PumpMessagesEvent alive at least as long as any |
| 512 // SyncChannel exists. This is balanced in the SyncChannel destructor below. |
| 513 PumpMessagesEvent::Acquire(); |
| 514 |
| 451 // The current (listener) thread must be distinct from the IPC thread, or else | 515 // The current (listener) thread must be distinct from the IPC thread, or else |
| 452 // sending synchronous messages will deadlock. | 516 // sending synchronous messages will deadlock. |
| 453 DCHECK_NE(ipc_task_runner.get(), base::ThreadTaskRunnerHandle::Get().get()); | 517 DCHECK_NE(ipc_task_runner.get(), base::ThreadTaskRunnerHandle::Get().get()); |
| 454 StartWatching(); | 518 StartWatching(); |
| 455 } | 519 } |
| 456 | 520 |
| 457 SyncChannel::~SyncChannel() { | 521 SyncChannel::~SyncChannel() { |
| 522 PumpMessagesEvent::Release(); |
| 458 } | 523 } |
| 459 | 524 |
| 460 void SyncChannel::SetRestrictDispatchChannelGroup(int group) { | 525 void SyncChannel::SetRestrictDispatchChannelGroup(int group) { |
| 461 sync_context()->set_restrict_dispatch_group(group); | 526 sync_context()->set_restrict_dispatch_group(group); |
| 462 } | 527 } |
| 463 | 528 |
| 464 scoped_refptr<SyncMessageFilter> SyncChannel::CreateSyncMessageFilter() { | 529 scoped_refptr<SyncMessageFilter> SyncChannel::CreateSyncMessageFilter() { |
| 465 scoped_refptr<SyncMessageFilter> filter = new SyncMessageFilter( | 530 scoped_refptr<SyncMessageFilter> filter = new SyncMessageFilter( |
| 466 sync_context()->shutdown_event(), | 531 sync_context()->shutdown_event(), |
| 467 sync_context()->IsChannelSendThreadSafe()); | 532 sync_context()->IsChannelSendThreadSafe()); |
| (...skipping 20 matching lines...) Expand all Loading... |
| 488 | 553 |
| 489 // *this* might get deleted in WaitForReply. | 554 // *this* might get deleted in WaitForReply. |
| 490 scoped_refptr<SyncContext> context(sync_context()); | 555 scoped_refptr<SyncContext> context(sync_context()); |
| 491 if (context->shutdown_event()->IsSignaled()) { | 556 if (context->shutdown_event()->IsSignaled()) { |
| 492 DVLOG(1) << "shutdown event is signaled"; | 557 DVLOG(1) << "shutdown event is signaled"; |
| 493 delete message; | 558 delete message; |
| 494 return false; | 559 return false; |
| 495 } | 560 } |
| 496 | 561 |
| 497 SyncMessage* sync_msg = static_cast<SyncMessage*>(message); | 562 SyncMessage* sync_msg = static_cast<SyncMessage*>(message); |
| 563 bool pump_messages = sync_msg->ShouldPumpMessages(); |
| 498 context->Push(sync_msg); | 564 context->Push(sync_msg); |
| 499 WaitableEvent* pump_messages_event = sync_msg->pump_messages_event(); | |
| 500 | 565 |
| 501 ChannelProxy::Send(message); | 566 ChannelProxy::Send(message); |
| 502 | 567 |
| 503 // Wait for reply, or for any other incoming synchronous messages. | 568 // Wait for reply, or for any other incoming synchronous messages. |
| 504 // *this* might get deleted, so only call static functions at this point. | 569 // *this* might get deleted, so only call static functions at this point. |
| 505 WaitForReply(context.get(), pump_messages_event); | 570 WaitForReply(context.get(), pump_messages); |
| 506 | 571 |
| 507 TRACE_EVENT_FLOW_END0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), | 572 TRACE_EVENT_FLOW_END0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), |
| 508 "SyncChannel::Send", context->GetSendDoneEvent()); | 573 "SyncChannel::Send", context->GetSendDoneEvent()); |
| 509 | 574 |
| 510 return context->Pop(); | 575 return context->Pop(); |
| 511 } | 576 } |
| 512 | 577 |
| 513 void SyncChannel::WaitForReply( | 578 void SyncChannel::WaitForReply(SyncContext* context, bool pump_messages) { |
| 514 SyncContext* context, WaitableEvent* pump_messages_event) { | |
| 515 context->DispatchMessages(); | 579 context->DispatchMessages(); |
| 580 |
| 581 PumpMessagesEvent* pump_messages_event = nullptr; |
| 582 if (pump_messages) |
| 583 pump_messages_event = PumpMessagesEvent::Acquire(); |
| 584 |
| 585 scoped_refptr<mojo::SyncHandleRegistry> registry = |
| 586 mojo::SyncHandleRegistry::current(); |
| 587 |
| 516 while (true) { | 588 while (true) { |
| 517 WaitableEvent* objects[] = { | 589 bool dispatch = false; |
| 518 context->GetDispatchEvent(), | 590 bool send_done = false; |
| 519 context->GetSendDoneEvent(), | 591 bool should_pump_messages = false; |
| 520 pump_messages_event | 592 bool error = false; |
| 521 }; | 593 registry->RegisterHandle(context->GetDispatchEvent()->GetHandle(), |
| 594 MOJO_HANDLE_SIGNAL_READABLE, |
| 595 base::Bind(&OnSyncHandleReady, &dispatch, &error)); |
| 596 registry->RegisterHandle( |
| 597 context->GetSendDoneEvent()->GetHandle(), |
| 598 MOJO_HANDLE_SIGNAL_READABLE, |
| 599 base::Bind(&OnSyncHandleReady, &send_done, &error)); |
| 600 if (pump_messages_event) { |
| 601 registry->RegisterHandle( |
| 602 pump_messages_event->GetHandle(), MOJO_HANDLE_SIGNAL_READABLE, |
| 603 base::Bind(&OnSyncHandleReady, &should_pump_messages, &error)); |
| 604 } |
| 522 | 605 |
| 523 unsigned count = pump_messages_event ? 3: 2; | 606 const bool* stop_flags[] = { &dispatch, &send_done, &should_pump_messages }; |
| 524 size_t result = WaitableEvent::WaitMany(objects, count); | 607 bool result = registry->WatchAllHandles(stop_flags, 3); |
| 525 if (result == 0 /* dispatch event */) { | 608 DCHECK(result); |
| 609 DCHECK(!error); |
| 610 |
| 611 registry->UnregisterHandle(context->GetDispatchEvent()->GetHandle()); |
| 612 registry->UnregisterHandle(context->GetSendDoneEvent()->GetHandle()); |
| 613 if (pump_messages_event) |
| 614 registry->UnregisterHandle(pump_messages_event->GetHandle()); |
| 615 |
| 616 if (dispatch) { |
| 526 // We're waiting for a reply, but we received a blocking synchronous | 617 // We're waiting for a reply, but we received a blocking synchronous |
| 527 // call. We must process it or otherwise a deadlock might occur. | 618 // call. We must process it or otherwise a deadlock might occur. |
| 528 context->GetDispatchEvent()->Reset(); | 619 context->GetDispatchEvent()->Reset(); |
| 529 context->DispatchMessages(); | 620 context->DispatchMessages(); |
| 530 continue; | 621 continue; |
| 531 } | 622 } |
| 532 | 623 |
| 533 if (result == 2 /* pump_messages_event */) | 624 DCHECK(send_done || should_pump_messages); |
| 625 |
| 626 if (should_pump_messages) |
| 534 WaitForReplyWithNestedMessageLoop(context); // Run a nested message loop. | 627 WaitForReplyWithNestedMessageLoop(context); // Run a nested message loop. |
| 535 | 628 |
| 536 break; | 629 break; |
| 537 } | 630 } |
| 631 |
| 632 if (pump_messages_event) |
| 633 PumpMessagesEvent::Release(); |
| 538 } | 634 } |
| 539 | 635 |
| 540 void SyncChannel::WaitForReplyWithNestedMessageLoop(SyncContext* context) { | 636 void SyncChannel::WaitForReplyWithNestedMessageLoop(SyncContext* context) { |
| 541 base::WaitableEventWatcher send_done_watcher; | 637 mojo::Watcher send_done_watcher; |
| 542 | 638 |
| 543 ReceivedSyncMsgQueue* sync_msg_queue = context->received_sync_msgs(); | 639 ReceivedSyncMsgQueue* sync_msg_queue = context->received_sync_msgs(); |
| 544 DCHECK(sync_msg_queue != NULL); | 640 DCHECK_NE(sync_msg_queue, nullptr); |
| 545 | 641 |
| 546 base::WaitableEventWatcher* old_send_done_event_watcher = | 642 mojo::Watcher* old_watcher = sync_msg_queue->top_send_done_watcher(); |
| 547 sync_msg_queue->top_send_done_watcher(); | 643 mojo::Handle old_handle(mojo::kInvalidHandleValue); |
| 644 mojo::Watcher::ReadyCallback old_callback; |
| 548 | 645 |
| 549 base::WaitableEventWatcher::EventCallback old_callback; | 646 // Maintain a thread-local stack of watchers to ensure nested calls complete |
| 550 base::WaitableEvent* old_event = NULL; | 647 // in the correct sequence, i.e. the outermost call completes first, etc. |
| 551 | 648 if (old_watcher) { |
| 552 // Maintain a local global stack of send done delegates to ensure that | 649 old_callback = old_watcher->ready_callback(); |
| 553 // nested sync calls complete in the correct sequence, i.e. the | 650 old_handle = old_watcher->handle(); |
| 554 // outermost call completes first, etc. | 651 old_watcher->Cancel(); |
| 555 if (old_send_done_event_watcher) { | |
| 556 old_callback = old_send_done_event_watcher->callback(); | |
| 557 old_event = old_send_done_event_watcher->GetWatchedEvent(); | |
| 558 old_send_done_event_watcher->StopWatching(); | |
| 559 } | 652 } |
| 560 | 653 |
| 561 sync_msg_queue->set_top_send_done_watcher(&send_done_watcher); | 654 sync_msg_queue->set_top_send_done_watcher(&send_done_watcher); |
| 562 | 655 |
| 563 send_done_watcher.StartWatching(context->GetSendDoneEvent(), | 656 { |
| 564 context->MakeWaitableEventCallback()); | 657 base::RunLoop nested_loop; |
| 658 send_done_watcher.Start( |
| 659 context->GetSendDoneEvent()->GetHandle(), MOJO_HANDLE_SIGNAL_READABLE, |
| 660 base::Bind(&RunOnHandleReady, nested_loop.QuitClosure())); |
| 565 | 661 |
| 566 { | |
| 567 base::MessageLoop::ScopedNestableTaskAllower allow( | 662 base::MessageLoop::ScopedNestableTaskAllower allow( |
| 568 base::MessageLoop::current()); | 663 base::MessageLoop::current()); |
| 569 base::MessageLoop::current()->Run(); | 664 nested_loop.Run(); |
| 665 send_done_watcher.Cancel(); |
| 570 } | 666 } |
| 571 | 667 |
| 572 sync_msg_queue->set_top_send_done_watcher(old_send_done_event_watcher); | 668 sync_msg_queue->set_top_send_done_watcher(old_watcher); |
| 573 if (old_send_done_event_watcher && old_event) { | 669 if (old_watcher) |
| 574 old_send_done_event_watcher->StartWatching(old_event, old_callback); | 670 old_watcher->Start(old_handle, MOJO_HANDLE_SIGNAL_READABLE, old_callback); |
| 575 } | |
| 576 } | 671 } |
| 577 | 672 |
| 578 void SyncChannel::OnWaitableEventSignaled(WaitableEvent* event) { | 673 void SyncChannel::OnDispatchHandleReady(MojoResult result) { |
| 579 DCHECK(event == sync_context()->GetDispatchEvent()); | 674 DCHECK_EQ(result, MOJO_RESULT_OK); |
| 580 // The call to DispatchMessages might delete this object, so reregister | 675 sync_context()->GetDispatchEvent()->Reset(); |
| 581 // the object watcher first. | |
| 582 event->Reset(); | |
| 583 dispatch_watcher_.StartWatching(event, dispatch_watcher_callback_); | |
| 584 sync_context()->DispatchMessages(); | 676 sync_context()->DispatchMessages(); |
| 585 } | 677 } |
| 586 | 678 |
| 587 void SyncChannel::StartWatching() { | 679 void SyncChannel::StartWatching() { |
| 588 // Ideally we only want to watch this object when running a nested message | 680 // Ideally we only want to watch this object when running a nested message |
| 589 // loop. However, we don't know when it exits if there's another nested | 681 // loop. However, we don't know when it exits if there's another nested |
| 590 // message loop running under it or not, so we wouldn't know whether to | 682 // message loop running under it or not, so we wouldn't know whether to |
| 591 // stop or keep watching. So we always watch it, and create the event as | 683 // stop or keep watching. So we always watch it. |
| 592 // manual reset since the object watcher might otherwise reset the event | 684 dispatch_watcher_.Start(sync_context()->GetDispatchEvent()->GetHandle(), |
| 593 // when we're doing a WaitMany. | 685 MOJO_HANDLE_SIGNAL_READABLE, |
| 594 dispatch_watcher_callback_ = | 686 base::Bind(&SyncChannel::OnDispatchHandleReady, |
| 595 base::Bind(&SyncChannel::OnWaitableEventSignaled, | 687 base::Unretained(this))); |
| 596 base::Unretained(this)); | |
| 597 dispatch_watcher_.StartWatching(sync_context()->GetDispatchEvent(), | |
| 598 dispatch_watcher_callback_); | |
| 599 } | 688 } |
| 600 | 689 |
| 601 void SyncChannel::OnChannelInit() { | 690 void SyncChannel::OnChannelInit() { |
| 602 for (const auto& filter : pre_init_sync_message_filters_) { | 691 for (const auto& filter : pre_init_sync_message_filters_) { |
| 603 filter->set_is_channel_send_thread_safe( | 692 filter->set_is_channel_send_thread_safe( |
| 604 context()->IsChannelSendThreadSafe()); | 693 context()->IsChannelSendThreadSafe()); |
| 605 } | 694 } |
| 606 pre_init_sync_message_filters_.clear(); | 695 pre_init_sync_message_filters_.clear(); |
| 607 } | 696 } |
| 608 | 697 |
| (...skipping 23 matching lines...) Expand all Loading... |
| 632 TRACE_EVENT2("ipc", "SyncChannel::SendOnIPCThread", | 721 TRACE_EVENT2("ipc", "SyncChannel::SendOnIPCThread", |
| 633 "class", IPC_MESSAGE_ID_CLASS(message->type()), | 722 "class", IPC_MESSAGE_ID_CLASS(message->type()), |
| 634 "line", IPC_MESSAGE_ID_LINE(message->type())); | 723 "line", IPC_MESSAGE_ID_LINE(message->type())); |
| 635 #endif | 724 #endif |
| 636 if (!message->is_sync()) | 725 if (!message->is_sync()) |
| 637 return ChannelProxy::SendOnIPCThread(std::move(message)); | 726 return ChannelProxy::SendOnIPCThread(std::move(message)); |
| 638 return Send(message.release()); | 727 return Send(message.release()); |
| 639 } | 728 } |
| 640 | 729 |
| 641 } // namespace IPC | 730 } // namespace IPC |
| OLD | NEW |