Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "ipc/ipc_sync_channel.h" | 5 #include "ipc/ipc_sync_channel.h" |
| 6 | 6 |
| 7 #include <stddef.h> | 7 #include <stddef.h> |
| 8 #include <stdint.h> | 8 #include <stdint.h> |
| 9 | 9 |
| 10 #include <utility> | 10 #include <utility> |
| 11 | 11 |
| 12 #include "base/bind.h" | 12 #include "base/bind.h" |
| 13 #include "base/lazy_instance.h" | 13 #include "base/lazy_instance.h" |
| 14 #include "base/location.h" | 14 #include "base/location.h" |
| 15 #include "base/logging.h" | 15 #include "base/logging.h" |
| 16 #include "base/macros.h" | 16 #include "base/macros.h" |
| 17 #include "base/memory/ptr_util.h" | 17 #include "base/memory/ptr_util.h" |
| 18 #include "base/run_loop.h" | 18 #include "base/run_loop.h" |
| 19 #include "base/synchronization/waitable_event.h" | 19 #include "base/synchronization/waitable_event.h" |
| 20 #include "base/threading/thread_local.h" | 20 #include "base/threading/thread_local.h" |
| 21 #include "base/threading/thread_task_runner_handle.h" | 21 #include "base/threading/thread_task_runner_handle.h" |
| 22 #include "base/trace_event/trace_event.h" | 22 #include "base/trace_event/trace_event.h" |
| 23 #include "ipc/ipc_channel_factory.h" | 23 #include "ipc/ipc_channel_factory.h" |
| 24 #include "ipc/ipc_logging.h" | 24 #include "ipc/ipc_logging.h" |
| 25 #include "ipc/ipc_message_macros.h" | 25 #include "ipc/ipc_message_macros.h" |
| 26 #include "ipc/ipc_sync_message.h" | 26 #include "ipc/ipc_sync_message.h" |
| 27 #include "ipc/mojo_event.h" | 27 #include "ipc/mojo_event.h" |
| 28 #include "mojo/public/cpp/bindings/sync_handle_registry.h" | 28 #include "mojo/public/cpp/bindings/sync_handle_registry.h" |
| 29 #include "mojo/public/cpp/bindings/sync_handle_watcher.h" | |
| 29 | 30 |
| 30 using base::WaitableEvent; | 31 using base::WaitableEvent; |
| 31 | 32 |
| 32 namespace IPC { | 33 namespace IPC { |
| 33 | 34 |
| 34 namespace { | 35 namespace { |
| 35 | 36 |
| 36 // A generic callback used when watching handles synchronously. Sets |*signal| | 37 // A generic callback used when watching handles synchronously. Sets |*signal| |
| 37 // to true. Also sets |*error| to true in case of an error. | 38 // to true. Also sets |*error| to true in case of an error. |
| 38 void OnSyncHandleReady(bool* signal, bool* error, MojoResult result) { | 39 void OnSyncHandleReady(bool* signal, bool* error, MojoResult result) { |
| (...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 94 // SyncChannel objects can block the same thread). | 95 // SyncChannel objects can block the same thread). |
| 95 ReceivedSyncMsgQueue* rv = lazy_tls_ptr_.Pointer()->Get(); | 96 ReceivedSyncMsgQueue* rv = lazy_tls_ptr_.Pointer()->Get(); |
| 96 if (!rv) { | 97 if (!rv) { |
| 97 rv = new ReceivedSyncMsgQueue(); | 98 rv = new ReceivedSyncMsgQueue(); |
| 98 ReceivedSyncMsgQueue::lazy_tls_ptr_.Pointer()->Set(rv); | 99 ReceivedSyncMsgQueue::lazy_tls_ptr_.Pointer()->Set(rv); |
| 99 } | 100 } |
| 100 rv->listener_count_++; | 101 rv->listener_count_++; |
| 101 return rv; | 102 return rv; |
| 102 } | 103 } |
| 103 | 104 |
| 105 // Prevents messages from being dispatched immediately when the dispatch event | |
| 106 // is signaled. Instead, |*dispatch_flag| will be set. | |
| 107 void BlockDispatch(bool *dispatch_flag) { dispatch_flag_ = dispatch_flag; } | |
|
yzshen1
2016/08/02 17:38:37
style nit: "*" should be next to "bool" instead of
Ken Rockot(use gerrit already)
2016/08/02 19:05:05
Done
| |
| 108 | |
| 109 // Allows messages to be dispatched immediately when the dispatch event is | |
| 110 // signaled. | |
| 111 void UnblockDispatch() { dispatch_flag_ = nullptr; } | |
| 112 | |
| 104 // Called on IPC thread when a synchronous message or reply arrives. | 113 // Called on IPC thread when a synchronous message or reply arrives. |
| 105 void QueueMessage(const Message& msg, SyncChannel::SyncContext* context) { | 114 void QueueMessage(const Message& msg, SyncChannel::SyncContext* context) { |
| 106 bool was_task_pending; | 115 bool was_task_pending; |
| 107 { | 116 { |
| 108 base::AutoLock auto_lock(message_lock_); | 117 base::AutoLock auto_lock(message_lock_); |
| 109 | 118 |
| 110 was_task_pending = task_pending_; | 119 was_task_pending = task_pending_; |
| 111 task_pending_ = true; | 120 task_pending_ = true; |
| 112 | 121 |
| 113 // We set the event in case the listener thread is blocked (or is about | 122 // We set the event in case the listener thread is blocked (or is about |
| (...skipping 17 matching lines...) Expand all Loading... | |
| 131 // Called on the listener's thread to process any queues synchronous | 140 // Called on the listener's thread to process any queues synchronous |
| 132 // messages. | 141 // messages. |
| 133 void DispatchMessagesTask(SyncContext* context) { | 142 void DispatchMessagesTask(SyncContext* context) { |
| 134 { | 143 { |
| 135 base::AutoLock auto_lock(message_lock_); | 144 base::AutoLock auto_lock(message_lock_); |
| 136 task_pending_ = false; | 145 task_pending_ = false; |
| 137 } | 146 } |
| 138 context->DispatchMessages(); | 147 context->DispatchMessages(); |
| 139 } | 148 } |
| 140 | 149 |
| 150 // Dispatches any queued incoming sync messages. If |dispatching_context| is | |
| 151 // not null, messages which target a restricted dispatch channel will only be | |
| 152 // dispatched if |dispatching_context| belongs to the same restricted dispatch | |
| 153 // group as that channel. If |dispatching_context| is null, all queued | |
| 154 // messages are dispatched. | |
| 141 void DispatchMessages(SyncContext* dispatching_context) { | 155 void DispatchMessages(SyncContext* dispatching_context) { |
| 142 bool first_time = true; | 156 bool first_time = true; |
| 143 uint32_t expected_version = 0; | 157 uint32_t expected_version = 0; |
| 144 SyncMessageQueue::iterator it; | 158 SyncMessageQueue::iterator it; |
| 145 while (true) { | 159 while (true) { |
| 146 Message* message = NULL; | 160 Message* message = nullptr; |
| 147 scoped_refptr<SyncChannel::SyncContext> context; | 161 scoped_refptr<SyncChannel::SyncContext> context; |
| 148 { | 162 { |
| 149 base::AutoLock auto_lock(message_lock_); | 163 base::AutoLock auto_lock(message_lock_); |
| 150 if (first_time || message_queue_version_ != expected_version) { | 164 if (first_time || message_queue_version_ != expected_version) { |
| 151 it = message_queue_.begin(); | 165 it = message_queue_.begin(); |
| 152 first_time = false; | 166 first_time = false; |
| 153 } | 167 } |
| 154 for (; it != message_queue_.end(); it++) { | 168 for (; it != message_queue_.end(); it++) { |
| 155 int message_group = it->context->restrict_dispatch_group(); | 169 int message_group = it->context->restrict_dispatch_group(); |
| 156 if (message_group == kRestrictDispatchGroup_None || | 170 if (!dispatching_context || |
| 171 message_group == kRestrictDispatchGroup_None || | |
| 157 message_group == dispatching_context->restrict_dispatch_group()) { | 172 message_group == dispatching_context->restrict_dispatch_group()) { |
| 158 message = it->message; | 173 message = it->message; |
| 159 context = it->context; | 174 context = it->context; |
| 160 it = message_queue_.erase(it); | 175 it = message_queue_.erase(it); |
| 161 message_queue_version_++; | 176 message_queue_version_++; |
| 162 expected_version = message_queue_version_; | 177 expected_version = message_queue_version_; |
| 163 break; | 178 break; |
| 164 } | 179 } |
| 165 } | 180 } |
| 166 } | 181 } |
| 167 | 182 |
| 168 if (message == NULL) | 183 if (message == nullptr) |
| 169 break; | 184 break; |
| 170 context->OnDispatchMessage(*message); | 185 context->OnDispatchMessage(*message); |
| 171 delete message; | 186 delete message; |
| 172 } | 187 } |
| 173 } | 188 } |
| 174 | 189 |
| 175 // SyncChannel calls this in its destructor. | 190 // SyncChannel calls this in its destructor. |
| 176 void RemoveContext(SyncContext* context) { | 191 void RemoveContext(SyncContext* context) { |
| 177 base::AutoLock auto_lock(message_lock_); | 192 base::AutoLock auto_lock(message_lock_); |
| 178 | 193 |
| 179 SyncMessageQueue::iterator iter = message_queue_.begin(); | 194 SyncMessageQueue::iterator iter = message_queue_.begin(); |
| 180 while (iter != message_queue_.end()) { | 195 while (iter != message_queue_.end()) { |
| 181 if (iter->context.get() == context) { | 196 if (iter->context.get() == context) { |
| 182 delete iter->message; | 197 delete iter->message; |
| 183 iter = message_queue_.erase(iter); | 198 iter = message_queue_.erase(iter); |
| 184 message_queue_version_++; | 199 message_queue_version_++; |
| 185 } else { | 200 } else { |
| 186 iter++; | 201 iter++; |
| 187 } | 202 } |
| 188 } | 203 } |
| 189 | 204 |
| 190 if (--listener_count_ == 0) { | 205 if (--listener_count_ == 0) { |
| 191 DCHECK(lazy_tls_ptr_.Pointer()->Get()); | 206 DCHECK(lazy_tls_ptr_.Pointer()->Get()); |
| 192 lazy_tls_ptr_.Pointer()->Set(NULL); | 207 lazy_tls_ptr_.Pointer()->Set(nullptr); |
| 208 sync_dispatch_watcher_.reset(); | |
| 193 } | 209 } |
| 194 } | 210 } |
| 195 | 211 |
| 196 MojoEvent* dispatch_event() { return &dispatch_event_; } | 212 MojoEvent* dispatch_event() { return &dispatch_event_; } |
| 197 base::SingleThreadTaskRunner* listener_task_runner() { | 213 base::SingleThreadTaskRunner* listener_task_runner() { |
| 198 return listener_task_runner_.get(); | 214 return listener_task_runner_.get(); |
| 199 } | 215 } |
| 200 | 216 |
| 201 // Holds a pointer to the per-thread ReceivedSyncMsgQueue object. | 217 // Holds a pointer to the per-thread ReceivedSyncMsgQueue object. |
| 202 static base::LazyInstance<base::ThreadLocalPointer<ReceivedSyncMsgQueue> > | 218 static base::LazyInstance<base::ThreadLocalPointer<ReceivedSyncMsgQueue> > |
| (...skipping 23 matching lines...) Expand all Loading... | |
| 226 private: | 242 private: |
| 227 friend class base::RefCountedThreadSafe<ReceivedSyncMsgQueue>; | 243 friend class base::RefCountedThreadSafe<ReceivedSyncMsgQueue>; |
| 228 | 244 |
| 229 // See the comment in SyncChannel::SyncChannel for why this event is created | 245 // See the comment in SyncChannel::SyncChannel for why this event is created |
| 230 // as manual reset. | 246 // as manual reset. |
| 231 ReceivedSyncMsgQueue() | 247 ReceivedSyncMsgQueue() |
| 232 : message_queue_version_(0), | 248 : message_queue_version_(0), |
| 233 listener_task_runner_(base::ThreadTaskRunnerHandle::Get()), | 249 listener_task_runner_(base::ThreadTaskRunnerHandle::Get()), |
| 234 task_pending_(false), | 250 task_pending_(false), |
| 235 listener_count_(0), | 251 listener_count_(0), |
| 236 top_send_done_watcher_(NULL) {} | 252 top_send_done_watcher_(nullptr) { |
| 253 sync_dispatch_watcher_.reset(new mojo::SyncHandleWatcher( | |
| 254 dispatch_event_.GetHandle(), MOJO_HANDLE_SIGNAL_READABLE, | |
| 255 base::Bind(&ReceivedSyncMsgQueue::OnDispatchHandleReady, | |
| 256 base::Unretained(this)))); | |
| 257 sync_dispatch_watcher_->AllowWokenUpBySyncWatchOnSameThread(); | |
| 258 } | |
| 237 | 259 |
| 238 ~ReceivedSyncMsgQueue() {} | 260 ~ReceivedSyncMsgQueue() {} |
| 239 | 261 |
| 262 void OnDispatchHandleReady(MojoResult result) { | |
| 263 if (result != MOJO_RESULT_OK) | |
| 264 return; | |
| 265 | |
| 266 if (dispatch_flag_) { | |
| 267 *dispatch_flag_ = true; | |
| 268 return; | |
| 269 } | |
| 270 | |
| 271 // We were woken up during a sync wait, but no specific SyncChannel is | |
| 272 // currently waiting. i.e., some other Mojo interface on this thread is | |
| 273 // waiting for a response. Since we don't support anything analogous to | |
| 274 // restricted dispatch on Mojo interfaces, in this case it's safe to | |
| 275 // dispatch sync messages for any context. | |
| 276 DispatchMessages(nullptr); | |
| 277 } | |
| 278 | |
| 240 // Holds information about a queued synchronous message or reply. | 279 // Holds information about a queued synchronous message or reply. |
| 241 struct QueuedMessage { | 280 struct QueuedMessage { |
| 242 QueuedMessage(Message* m, SyncContext* c) : message(m), context(c) { } | 281 QueuedMessage(Message* m, SyncContext* c) : message(m), context(c) { } |
| 243 Message* message; | 282 Message* message; |
| 244 scoped_refptr<SyncChannel::SyncContext> context; | 283 scoped_refptr<SyncChannel::SyncContext> context; |
| 245 }; | 284 }; |
| 246 | 285 |
| 247 typedef std::list<QueuedMessage> SyncMessageQueue; | 286 typedef std::list<QueuedMessage> SyncMessageQueue; |
| 248 SyncMessageQueue message_queue_; | 287 SyncMessageQueue message_queue_; |
| 249 uint32_t message_queue_version_; // Used to signal DispatchMessages to rescan | 288 uint32_t message_queue_version_; // Used to signal DispatchMessages to rescan |
| 250 | 289 |
| 251 std::vector<QueuedMessage> received_replies_; | 290 std::vector<QueuedMessage> received_replies_; |
| 252 | 291 |
| 253 // Signaled when we get a synchronous message that we must respond to, as the | 292 // Signaled when we get a synchronous message that we must respond to, as the |
| 254 // sender needs its reply before it can reply to our original synchronous | 293 // sender needs its reply before it can reply to our original synchronous |
| 255 // message. | 294 // message. |
| 256 MojoEvent dispatch_event_; | 295 MojoEvent dispatch_event_; |
| 257 scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner_; | 296 scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner_; |
| 258 base::Lock message_lock_; | 297 base::Lock message_lock_; |
| 259 bool task_pending_; | 298 bool task_pending_; |
| 260 int listener_count_; | 299 int listener_count_; |
| 261 | 300 |
| 262 // The current send done handle watcher for this thread. Used to maintain | 301 // The current send done handle watcher for this thread. Used to maintain |
| 263 // a thread-local stack of send done watchers to ensure that nested sync | 302 // a thread-local stack of send done watchers to ensure that nested sync |
| 264 // message loops complete correctly. | 303 // message loops complete correctly. |
| 265 mojo::Watcher* top_send_done_watcher_; | 304 mojo::Watcher* top_send_done_watcher_; |
| 305 | |
| 306 // If not null, the address of a flag to set when the dispatch event signals, | |
| 307 // in lieu of actually dispatching messages. This is used by | |
| 308 // SyncChannel::WaitForReply to restrict the scope of queued messages we're | |
| 309 // allowed to process while it's waiting. | |
| 310 bool* dispatch_flag_ = nullptr; | |
| 311 | |
| 312 // Watches |dispatch_event_| during all sync handle watches on this thread. | |
| 313 std::unique_ptr<mojo::SyncHandleWatcher> sync_dispatch_watcher_; | |
| 266 }; | 314 }; |
| 267 | 315 |
| 268 base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> > | 316 base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> > |
| 269 SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_ = | 317 SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_ = |
| 270 LAZY_INSTANCE_INITIALIZER; | 318 LAZY_INSTANCE_INITIALIZER; |
| 271 | 319 |
| 272 SyncChannel::SyncContext::SyncContext( | 320 SyncChannel::SyncContext::SyncContext( |
| 273 Listener* listener, | 321 Listener* listener, |
| 274 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, | 322 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, |
| 275 WaitableEvent* shutdown_event) | 323 WaitableEvent* shutdown_event) |
| (...skipping 222 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 498 sync_context()->IsChannelSendThreadSafe()); | 546 sync_context()->IsChannelSendThreadSafe()); |
| 499 AddFilter(filter.get()); | 547 AddFilter(filter.get()); |
| 500 if (!did_init()) | 548 if (!did_init()) |
| 501 pre_init_sync_message_filters_.push_back(filter); | 549 pre_init_sync_message_filters_.push_back(filter); |
| 502 return filter; | 550 return filter; |
| 503 } | 551 } |
| 504 | 552 |
| 505 bool SyncChannel::Send(Message* message) { | 553 bool SyncChannel::Send(Message* message) { |
| 506 #ifdef IPC_MESSAGE_LOG_ENABLED | 554 #ifdef IPC_MESSAGE_LOG_ENABLED |
| 507 std::string name; | 555 std::string name; |
| 508 Logging::GetInstance()->GetMessageText(message->type(), &name, message, NULL); | 556 Logging::GetInstance()->GetMessageText( |
| 557 message->type(), &name, message, nullptr); | |
| 509 TRACE_EVENT1("ipc", "SyncChannel::Send", "name", name); | 558 TRACE_EVENT1("ipc", "SyncChannel::Send", "name", name); |
| 510 #else | 559 #else |
| 511 TRACE_EVENT2("ipc", "SyncChannel::Send", | 560 TRACE_EVENT2("ipc", "SyncChannel::Send", |
| 512 "class", IPC_MESSAGE_ID_CLASS(message->type()), | 561 "class", IPC_MESSAGE_ID_CLASS(message->type()), |
| 513 "line", IPC_MESSAGE_ID_LINE(message->type())); | 562 "line", IPC_MESSAGE_ID_LINE(message->type())); |
| 514 #endif | 563 #endif |
| 515 if (!message->is_sync()) { | 564 if (!message->is_sync()) { |
| 516 ChannelProxy::Send(message); | 565 ChannelProxy::Send(message); |
| 517 return true; | 566 return true; |
| 518 } | 567 } |
| 519 | 568 |
| 520 SyncMessage* sync_msg = static_cast<SyncMessage*>(message); | 569 SyncMessage* sync_msg = static_cast<SyncMessage*>(message); |
| 521 bool pump_messages = sync_msg->ShouldPumpMessages(); | 570 bool pump_messages = sync_msg->ShouldPumpMessages(); |
| 522 | 571 |
| 523 // *this* might get deleted in WaitForReply. | 572 // *this* might get deleted in WaitForReply. |
| 524 scoped_refptr<SyncContext> context(sync_context()); | 573 scoped_refptr<SyncContext> context(sync_context()); |
| 525 if (!context->Push(sync_msg)) { | 574 if (!context->Push(sync_msg)) { |
| 526 DVLOG(1) << "Channel is shutting down. Dropping sync message."; | 575 DVLOG(1) << "Channel is shutting down. Dropping sync message."; |
| 527 delete message; | 576 delete message; |
| 528 return false; | 577 return false; |
| 529 } | 578 } |
| 530 | 579 |
| 531 ChannelProxy::Send(message); | 580 ChannelProxy::Send(message); |
| 532 | 581 |
| 533 // Wait for reply, or for any other incoming synchronous messages. | 582 // Wait for reply, or for any other incoming synchronous messages. |
| 534 // *this* might get deleted, so only call static functions at this point. | 583 // |this| might get deleted, so only call static functions at this point. |
| 535 scoped_refptr<mojo::SyncHandleRegistry> registry = sync_handle_registry_; | 584 scoped_refptr<mojo::SyncHandleRegistry> registry = sync_handle_registry_; |
| 536 WaitForReply(registry.get(), context.get(), pump_messages); | 585 WaitForReply(registry.get(), context.get(), pump_messages); |
| 537 | 586 |
| 538 TRACE_EVENT_FLOW_END0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), | 587 TRACE_EVENT_FLOW_END0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), |
| 539 "SyncChannel::Send", context->GetSendDoneEvent()); | 588 "SyncChannel::Send", context->GetSendDoneEvent()); |
| 540 | 589 |
| 541 return context->Pop(); | 590 return context->Pop(); |
| 542 } | 591 } |
| 543 | 592 |
| 544 void SyncChannel::WaitForReply(mojo::SyncHandleRegistry* registry, | 593 void SyncChannel::WaitForReply(mojo::SyncHandleRegistry* registry, |
| 545 SyncContext* context, | 594 SyncContext* context, |
| 546 bool pump_messages) { | 595 bool pump_messages) { |
| 547 context->DispatchMessages(); | 596 context->DispatchMessages(); |
| 548 | 597 |
| 549 const MojoEvent* pump_messages_event = nullptr; | 598 const MojoEvent* pump_messages_event = nullptr; |
| 550 if (pump_messages) | 599 if (pump_messages) |
| 551 pump_messages_event = g_pump_messages_event.Get().event(); | 600 pump_messages_event = g_pump_messages_event.Get().event(); |
| 552 | 601 |
| 553 while (true) { | 602 while (true) { |
| 554 bool dispatch = false; | 603 bool dispatch = false; |
| 555 bool send_done = false; | 604 bool send_done = false; |
| 556 bool should_pump_messages = false; | 605 bool should_pump_messages = false; |
| 557 bool error = false; | 606 bool error = false; |
| 558 registry->RegisterHandle(context->GetDispatchEvent()->GetHandle(), | 607 bool registered = registry->RegisterHandle( |
| 559 MOJO_HANDLE_SIGNAL_READABLE, | |
| 560 base::Bind(&OnSyncHandleReady, &dispatch, &error)); | |
| 561 registry->RegisterHandle( | |
| 562 context->GetSendDoneEvent()->GetHandle(), | 608 context->GetSendDoneEvent()->GetHandle(), |
| 563 MOJO_HANDLE_SIGNAL_READABLE, | 609 MOJO_HANDLE_SIGNAL_READABLE, |
| 564 base::Bind(&OnSyncHandleReady, &send_done, &error)); | 610 base::Bind(&OnSyncHandleReady, &send_done, &error)); |
| 611 DCHECK(registered); | |
| 565 if (pump_messages_event) { | 612 if (pump_messages_event) { |
| 566 registry->RegisterHandle( | 613 registered = registry->RegisterHandle( |
| 567 pump_messages_event->GetHandle(), MOJO_HANDLE_SIGNAL_READABLE, | 614 pump_messages_event->GetHandle(), MOJO_HANDLE_SIGNAL_READABLE, |
| 568 base::Bind(&OnSyncHandleReady, &should_pump_messages, &error)); | 615 base::Bind(&OnSyncHandleReady, &should_pump_messages, &error)); |
| 616 DCHECK(registered); | |
| 569 } | 617 } |
| 570 | 618 |
| 571 const bool* stop_flags[] = { &dispatch, &send_done, &should_pump_messages }; | 619 const bool* stop_flags[] = { &dispatch, &send_done, &should_pump_messages }; |
| 620 context->received_sync_msgs()->BlockDispatch(&dispatch); | |
| 572 registry->WatchAllHandles(stop_flags, 3); | 621 registry->WatchAllHandles(stop_flags, 3); |
| 622 context->received_sync_msgs()->UnblockDispatch(); | |
| 573 DCHECK(!error); | 623 DCHECK(!error); |
| 574 | 624 |
| 575 registry->UnregisterHandle(context->GetDispatchEvent()->GetHandle()); | 625 |
| 576 registry->UnregisterHandle(context->GetSendDoneEvent()->GetHandle()); | 626 registry->UnregisterHandle(context->GetSendDoneEvent()->GetHandle()); |
| 577 if (pump_messages_event) | 627 if (pump_messages_event) |
| 578 registry->UnregisterHandle(pump_messages_event->GetHandle()); | 628 registry->UnregisterHandle(pump_messages_event->GetHandle()); |
| 579 | 629 |
| 580 if (dispatch) { | 630 if (dispatch) { |
| 581 // We're waiting for a reply, but we received a blocking synchronous | 631 // We're waiting for a reply, but we received a blocking synchronous call. |
| 582 // call. We must process it or otherwise a deadlock might occur. | 632 // We must process it to avoid potential deadlocks. |
| 583 context->GetDispatchEvent()->Reset(); | 633 context->GetDispatchEvent()->Reset(); |
| 584 context->DispatchMessages(); | 634 context->DispatchMessages(); |
| 585 continue; | 635 continue; |
| 586 } | 636 } |
| 587 | 637 |
| 588 if (should_pump_messages) | 638 if (should_pump_messages) |
| 589 WaitForReplyWithNestedMessageLoop(context); // Run a nested message loop. | 639 WaitForReplyWithNestedMessageLoop(context); // Run a nested message loop. |
| 590 | 640 |
| 591 break; | 641 break; |
| 592 } | 642 } |
| (...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 631 | 681 |
| 632 void SyncChannel::OnDispatchHandleReady(MojoResult result) { | 682 void SyncChannel::OnDispatchHandleReady(MojoResult result) { |
| 633 DCHECK(result == MOJO_RESULT_OK || result == MOJO_RESULT_ABORTED); | 683 DCHECK(result == MOJO_RESULT_OK || result == MOJO_RESULT_ABORTED); |
| 634 if (result == MOJO_RESULT_OK) { | 684 if (result == MOJO_RESULT_OK) { |
| 635 sync_context()->GetDispatchEvent()->Reset(); | 685 sync_context()->GetDispatchEvent()->Reset(); |
| 636 sync_context()->DispatchMessages(); | 686 sync_context()->DispatchMessages(); |
| 637 } | 687 } |
| 638 } | 688 } |
| 639 | 689 |
| 640 void SyncChannel::StartWatching() { | 690 void SyncChannel::StartWatching() { |
| 641 // Ideally we only want to watch this object when running a nested message | 691 // |dispatch_watcher_| watches the event asynchronously, only dispatching |
| 642 // loop. However, we don't know when it exits if there's another nested | 692 // messages once the listener thread is unblocked and pumping its task queue. |
| 643 // message loop running under it or not, so we wouldn't know whether to | 693 // The ReceivedSyncMsgQueue also watches this event and may dispatch |
| 644 // stop or keep watching. So we always watch it. | 694 // immediately if woken up by the same |
|
yzshen1
2016/08/02 17:38:37
the comment doesn't seem to be complete.
Ken Rockot(use gerrit already)
2016/08/02 19:05:05
Yikes - Done
| |
| 645 dispatch_watcher_.Start(sync_context()->GetDispatchEvent()->GetHandle(), | 695 dispatch_watcher_.Start(sync_context()->GetDispatchEvent()->GetHandle(), |
| 646 MOJO_HANDLE_SIGNAL_READABLE, | 696 MOJO_HANDLE_SIGNAL_READABLE, |
| 647 base::Bind(&SyncChannel::OnDispatchHandleReady, | 697 base::Bind(&SyncChannel::OnDispatchHandleReady, |
| 648 base::Unretained(this))); | 698 base::Unretained(this))); |
| 649 } | 699 } |
| 650 | 700 |
| 651 void SyncChannel::OnChannelInit() { | 701 void SyncChannel::OnChannelInit() { |
| 652 for (const auto& filter : pre_init_sync_message_filters_) { | 702 for (const auto& filter : pre_init_sync_message_filters_) { |
| 653 filter->set_is_channel_send_thread_safe( | 703 filter->set_is_channel_send_thread_safe( |
| 654 context()->IsChannelSendThreadSafe()); | 704 context()->IsChannelSendThreadSafe()); |
| 655 } | 705 } |
| 656 pre_init_sync_message_filters_.clear(); | 706 pre_init_sync_message_filters_.clear(); |
| 657 } | 707 } |
| 658 | 708 |
| 659 } // namespace IPC | 709 } // namespace IPC |
| OLD | NEW |