Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(231)

Side by Side Diff: ipc/ipc_sync_channel.cc

Issue 2195953002: Adds sync message support to Channel-associated interfaces. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: . Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "ipc/ipc_sync_channel.h" 5 #include "ipc/ipc_sync_channel.h"
6 6
7 #include <stddef.h> 7 #include <stddef.h>
8 #include <stdint.h> 8 #include <stdint.h>
9 9
10 #include <utility> 10 #include <utility>
11 11
12 #include "base/bind.h" 12 #include "base/bind.h"
13 #include "base/lazy_instance.h" 13 #include "base/lazy_instance.h"
14 #include "base/location.h" 14 #include "base/location.h"
15 #include "base/logging.h" 15 #include "base/logging.h"
16 #include "base/macros.h" 16 #include "base/macros.h"
17 #include "base/memory/ptr_util.h" 17 #include "base/memory/ptr_util.h"
18 #include "base/run_loop.h" 18 #include "base/run_loop.h"
19 #include "base/synchronization/waitable_event.h" 19 #include "base/synchronization/waitable_event.h"
20 #include "base/threading/thread_local.h" 20 #include "base/threading/thread_local.h"
21 #include "base/threading/thread_task_runner_handle.h" 21 #include "base/threading/thread_task_runner_handle.h"
22 #include "base/trace_event/trace_event.h" 22 #include "base/trace_event/trace_event.h"
23 #include "ipc/ipc_channel_factory.h" 23 #include "ipc/ipc_channel_factory.h"
24 #include "ipc/ipc_logging.h" 24 #include "ipc/ipc_logging.h"
25 #include "ipc/ipc_message_macros.h" 25 #include "ipc/ipc_message_macros.h"
26 #include "ipc/ipc_sync_message.h" 26 #include "ipc/ipc_sync_message.h"
27 #include "ipc/mojo_event.h" 27 #include "ipc/mojo_event.h"
28 #include "mojo/public/cpp/bindings/sync_handle_registry.h" 28 #include "mojo/public/cpp/bindings/sync_handle_registry.h"
29 #include "mojo/public/cpp/bindings/sync_handle_watcher.h"
29 30
30 using base::WaitableEvent; 31 using base::WaitableEvent;
31 32
32 namespace IPC { 33 namespace IPC {
33 34
34 namespace { 35 namespace {
35 36
36 // A generic callback used when watching handles synchronously. Sets |*signal| 37 // A generic callback used when watching handles synchronously. Sets |*signal|
37 // to true. Also sets |*error| to true in case of an error. 38 // to true. Also sets |*error| to true in case of an error.
38 void OnSyncHandleReady(bool* signal, bool* error, MojoResult result) { 39 void OnSyncHandleReady(bool* signal, bool* error, MojoResult result) {
(...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after
94 // SyncChannel objects can block the same thread). 95 // SyncChannel objects can block the same thread).
95 ReceivedSyncMsgQueue* rv = lazy_tls_ptr_.Pointer()->Get(); 96 ReceivedSyncMsgQueue* rv = lazy_tls_ptr_.Pointer()->Get();
96 if (!rv) { 97 if (!rv) {
97 rv = new ReceivedSyncMsgQueue(); 98 rv = new ReceivedSyncMsgQueue();
98 ReceivedSyncMsgQueue::lazy_tls_ptr_.Pointer()->Set(rv); 99 ReceivedSyncMsgQueue::lazy_tls_ptr_.Pointer()->Set(rv);
99 } 100 }
100 rv->listener_count_++; 101 rv->listener_count_++;
101 return rv; 102 return rv;
102 } 103 }
103 104
105 // Prevents messages from being dispatched immediately when the dispatch event
106 // is signaled. Instead, |*dispatch_flag| will be set.
107 void BlockDispatch(bool *dispatch_flag) { dispatch_flag_ = dispatch_flag; }
yzshen1 2016/08/02 17:38:37 style nit: "*" should be next to "bool" instead of
Ken Rockot(use gerrit already) 2016/08/02 19:05:05 Done
108
109 // Allows messages to be dispatched immediately when the dispatch event is
110 // signaled.
111 void UnblockDispatch() { dispatch_flag_ = nullptr; }
112
104 // Called on IPC thread when a synchronous message or reply arrives. 113 // Called on IPC thread when a synchronous message or reply arrives.
105 void QueueMessage(const Message& msg, SyncChannel::SyncContext* context) { 114 void QueueMessage(const Message& msg, SyncChannel::SyncContext* context) {
106 bool was_task_pending; 115 bool was_task_pending;
107 { 116 {
108 base::AutoLock auto_lock(message_lock_); 117 base::AutoLock auto_lock(message_lock_);
109 118
110 was_task_pending = task_pending_; 119 was_task_pending = task_pending_;
111 task_pending_ = true; 120 task_pending_ = true;
112 121
113 // We set the event in case the listener thread is blocked (or is about 122 // We set the event in case the listener thread is blocked (or is about
(...skipping 17 matching lines...) Expand all
131 // Called on the listener's thread to process any queues synchronous 140 // Called on the listener's thread to process any queues synchronous
132 // messages. 141 // messages.
133 void DispatchMessagesTask(SyncContext* context) { 142 void DispatchMessagesTask(SyncContext* context) {
134 { 143 {
135 base::AutoLock auto_lock(message_lock_); 144 base::AutoLock auto_lock(message_lock_);
136 task_pending_ = false; 145 task_pending_ = false;
137 } 146 }
138 context->DispatchMessages(); 147 context->DispatchMessages();
139 } 148 }
140 149
150 // Dispatches any queued incoming sync messages. If |dispatching_context| is
151 // not null, messages which target a restricted dispatch channel will only be
152 // dispatched if |dispatching_context| belongs to the same restricted dispatch
153 // group as that channel. If |dispatching_context| is null, all queued
154 // messages are dispatched.
141 void DispatchMessages(SyncContext* dispatching_context) { 155 void DispatchMessages(SyncContext* dispatching_context) {
142 bool first_time = true; 156 bool first_time = true;
143 uint32_t expected_version = 0; 157 uint32_t expected_version = 0;
144 SyncMessageQueue::iterator it; 158 SyncMessageQueue::iterator it;
145 while (true) { 159 while (true) {
146 Message* message = NULL; 160 Message* message = nullptr;
147 scoped_refptr<SyncChannel::SyncContext> context; 161 scoped_refptr<SyncChannel::SyncContext> context;
148 { 162 {
149 base::AutoLock auto_lock(message_lock_); 163 base::AutoLock auto_lock(message_lock_);
150 if (first_time || message_queue_version_ != expected_version) { 164 if (first_time || message_queue_version_ != expected_version) {
151 it = message_queue_.begin(); 165 it = message_queue_.begin();
152 first_time = false; 166 first_time = false;
153 } 167 }
154 for (; it != message_queue_.end(); it++) { 168 for (; it != message_queue_.end(); it++) {
155 int message_group = it->context->restrict_dispatch_group(); 169 int message_group = it->context->restrict_dispatch_group();
156 if (message_group == kRestrictDispatchGroup_None || 170 if (!dispatching_context ||
171 message_group == kRestrictDispatchGroup_None ||
157 message_group == dispatching_context->restrict_dispatch_group()) { 172 message_group == dispatching_context->restrict_dispatch_group()) {
158 message = it->message; 173 message = it->message;
159 context = it->context; 174 context = it->context;
160 it = message_queue_.erase(it); 175 it = message_queue_.erase(it);
161 message_queue_version_++; 176 message_queue_version_++;
162 expected_version = message_queue_version_; 177 expected_version = message_queue_version_;
163 break; 178 break;
164 } 179 }
165 } 180 }
166 } 181 }
167 182
168 if (message == NULL) 183 if (message == nullptr)
169 break; 184 break;
170 context->OnDispatchMessage(*message); 185 context->OnDispatchMessage(*message);
171 delete message; 186 delete message;
172 } 187 }
173 } 188 }
174 189
175 // SyncChannel calls this in its destructor. 190 // SyncChannel calls this in its destructor.
176 void RemoveContext(SyncContext* context) { 191 void RemoveContext(SyncContext* context) {
177 base::AutoLock auto_lock(message_lock_); 192 base::AutoLock auto_lock(message_lock_);
178 193
179 SyncMessageQueue::iterator iter = message_queue_.begin(); 194 SyncMessageQueue::iterator iter = message_queue_.begin();
180 while (iter != message_queue_.end()) { 195 while (iter != message_queue_.end()) {
181 if (iter->context.get() == context) { 196 if (iter->context.get() == context) {
182 delete iter->message; 197 delete iter->message;
183 iter = message_queue_.erase(iter); 198 iter = message_queue_.erase(iter);
184 message_queue_version_++; 199 message_queue_version_++;
185 } else { 200 } else {
186 iter++; 201 iter++;
187 } 202 }
188 } 203 }
189 204
190 if (--listener_count_ == 0) { 205 if (--listener_count_ == 0) {
191 DCHECK(lazy_tls_ptr_.Pointer()->Get()); 206 DCHECK(lazy_tls_ptr_.Pointer()->Get());
192 lazy_tls_ptr_.Pointer()->Set(NULL); 207 lazy_tls_ptr_.Pointer()->Set(nullptr);
208 sync_dispatch_watcher_.reset();
193 } 209 }
194 } 210 }
195 211
196 MojoEvent* dispatch_event() { return &dispatch_event_; } 212 MojoEvent* dispatch_event() { return &dispatch_event_; }
197 base::SingleThreadTaskRunner* listener_task_runner() { 213 base::SingleThreadTaskRunner* listener_task_runner() {
198 return listener_task_runner_.get(); 214 return listener_task_runner_.get();
199 } 215 }
200 216
201 // Holds a pointer to the per-thread ReceivedSyncMsgQueue object. 217 // Holds a pointer to the per-thread ReceivedSyncMsgQueue object.
202 static base::LazyInstance<base::ThreadLocalPointer<ReceivedSyncMsgQueue> > 218 static base::LazyInstance<base::ThreadLocalPointer<ReceivedSyncMsgQueue> >
(...skipping 23 matching lines...) Expand all
226 private: 242 private:
227 friend class base::RefCountedThreadSafe<ReceivedSyncMsgQueue>; 243 friend class base::RefCountedThreadSafe<ReceivedSyncMsgQueue>;
228 244
229 // See the comment in SyncChannel::SyncChannel for why this event is created 245 // See the comment in SyncChannel::SyncChannel for why this event is created
230 // as manual reset. 246 // as manual reset.
231 ReceivedSyncMsgQueue() 247 ReceivedSyncMsgQueue()
232 : message_queue_version_(0), 248 : message_queue_version_(0),
233 listener_task_runner_(base::ThreadTaskRunnerHandle::Get()), 249 listener_task_runner_(base::ThreadTaskRunnerHandle::Get()),
234 task_pending_(false), 250 task_pending_(false),
235 listener_count_(0), 251 listener_count_(0),
236 top_send_done_watcher_(NULL) {} 252 top_send_done_watcher_(nullptr) {
253 sync_dispatch_watcher_.reset(new mojo::SyncHandleWatcher(
254 dispatch_event_.GetHandle(), MOJO_HANDLE_SIGNAL_READABLE,
255 base::Bind(&ReceivedSyncMsgQueue::OnDispatchHandleReady,
256 base::Unretained(this))));
257 sync_dispatch_watcher_->AllowWokenUpBySyncWatchOnSameThread();
258 }
237 259
238 ~ReceivedSyncMsgQueue() {} 260 ~ReceivedSyncMsgQueue() {}
239 261
262 void OnDispatchHandleReady(MojoResult result) {
263 if (result != MOJO_RESULT_OK)
264 return;
265
266 if (dispatch_flag_) {
267 *dispatch_flag_ = true;
268 return;
269 }
270
271 // We were woken up during a sync wait, but no specific SyncChannel is
272 // currently waiting. i.e., some other Mojo interface on this thread is
273 // waiting for a response. Since we don't support anything analogous to
274 // restricted dispatch on Mojo interfaces, in this case it's safe to
275 // dispatch sync messages for any context.
276 DispatchMessages(nullptr);
277 }
278
240 // Holds information about a queued synchronous message or reply. 279 // Holds information about a queued synchronous message or reply.
241 struct QueuedMessage { 280 struct QueuedMessage {
242 QueuedMessage(Message* m, SyncContext* c) : message(m), context(c) { } 281 QueuedMessage(Message* m, SyncContext* c) : message(m), context(c) { }
243 Message* message; 282 Message* message;
244 scoped_refptr<SyncChannel::SyncContext> context; 283 scoped_refptr<SyncChannel::SyncContext> context;
245 }; 284 };
246 285
247 typedef std::list<QueuedMessage> SyncMessageQueue; 286 typedef std::list<QueuedMessage> SyncMessageQueue;
248 SyncMessageQueue message_queue_; 287 SyncMessageQueue message_queue_;
249 uint32_t message_queue_version_; // Used to signal DispatchMessages to rescan 288 uint32_t message_queue_version_; // Used to signal DispatchMessages to rescan
250 289
251 std::vector<QueuedMessage> received_replies_; 290 std::vector<QueuedMessage> received_replies_;
252 291
253 // Signaled when we get a synchronous message that we must respond to, as the 292 // Signaled when we get a synchronous message that we must respond to, as the
254 // sender needs its reply before it can reply to our original synchronous 293 // sender needs its reply before it can reply to our original synchronous
255 // message. 294 // message.
256 MojoEvent dispatch_event_; 295 MojoEvent dispatch_event_;
257 scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner_; 296 scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner_;
258 base::Lock message_lock_; 297 base::Lock message_lock_;
259 bool task_pending_; 298 bool task_pending_;
260 int listener_count_; 299 int listener_count_;
261 300
262 // The current send done handle watcher for this thread. Used to maintain 301 // The current send done handle watcher for this thread. Used to maintain
263 // a thread-local stack of send done watchers to ensure that nested sync 302 // a thread-local stack of send done watchers to ensure that nested sync
264 // message loops complete correctly. 303 // message loops complete correctly.
265 mojo::Watcher* top_send_done_watcher_; 304 mojo::Watcher* top_send_done_watcher_;
305
306 // If not null, the address of a flag to set when the dispatch event signals,
307 // in lieu of actually dispatching messages. This is used by
308 // SyncChannel::WaitForReply to restrict the scope of queued messages we're
309 // allowed to process while it's waiting.
310 bool* dispatch_flag_ = nullptr;
311
312 // Watches |dispatch_event_| during all sync handle watches on this thread.
313 std::unique_ptr<mojo::SyncHandleWatcher> sync_dispatch_watcher_;
266 }; 314 };
267 315
268 base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> > 316 base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> >
269 SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_ = 317 SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_ =
270 LAZY_INSTANCE_INITIALIZER; 318 LAZY_INSTANCE_INITIALIZER;
271 319
272 SyncChannel::SyncContext::SyncContext( 320 SyncChannel::SyncContext::SyncContext(
273 Listener* listener, 321 Listener* listener,
274 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, 322 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
275 WaitableEvent* shutdown_event) 323 WaitableEvent* shutdown_event)
(...skipping 222 matching lines...) Expand 10 before | Expand all | Expand 10 after
498 sync_context()->IsChannelSendThreadSafe()); 546 sync_context()->IsChannelSendThreadSafe());
499 AddFilter(filter.get()); 547 AddFilter(filter.get());
500 if (!did_init()) 548 if (!did_init())
501 pre_init_sync_message_filters_.push_back(filter); 549 pre_init_sync_message_filters_.push_back(filter);
502 return filter; 550 return filter;
503 } 551 }
504 552
505 bool SyncChannel::Send(Message* message) { 553 bool SyncChannel::Send(Message* message) {
506 #ifdef IPC_MESSAGE_LOG_ENABLED 554 #ifdef IPC_MESSAGE_LOG_ENABLED
507 std::string name; 555 std::string name;
508 Logging::GetInstance()->GetMessageText(message->type(), &name, message, NULL); 556 Logging::GetInstance()->GetMessageText(
557 message->type(), &name, message, nullptr);
509 TRACE_EVENT1("ipc", "SyncChannel::Send", "name", name); 558 TRACE_EVENT1("ipc", "SyncChannel::Send", "name", name);
510 #else 559 #else
511 TRACE_EVENT2("ipc", "SyncChannel::Send", 560 TRACE_EVENT2("ipc", "SyncChannel::Send",
512 "class", IPC_MESSAGE_ID_CLASS(message->type()), 561 "class", IPC_MESSAGE_ID_CLASS(message->type()),
513 "line", IPC_MESSAGE_ID_LINE(message->type())); 562 "line", IPC_MESSAGE_ID_LINE(message->type()));
514 #endif 563 #endif
515 if (!message->is_sync()) { 564 if (!message->is_sync()) {
516 ChannelProxy::Send(message); 565 ChannelProxy::Send(message);
517 return true; 566 return true;
518 } 567 }
519 568
520 SyncMessage* sync_msg = static_cast<SyncMessage*>(message); 569 SyncMessage* sync_msg = static_cast<SyncMessage*>(message);
521 bool pump_messages = sync_msg->ShouldPumpMessages(); 570 bool pump_messages = sync_msg->ShouldPumpMessages();
522 571
523 // *this* might get deleted in WaitForReply. 572 // *this* might get deleted in WaitForReply.
524 scoped_refptr<SyncContext> context(sync_context()); 573 scoped_refptr<SyncContext> context(sync_context());
525 if (!context->Push(sync_msg)) { 574 if (!context->Push(sync_msg)) {
526 DVLOG(1) << "Channel is shutting down. Dropping sync message."; 575 DVLOG(1) << "Channel is shutting down. Dropping sync message.";
527 delete message; 576 delete message;
528 return false; 577 return false;
529 } 578 }
530 579
531 ChannelProxy::Send(message); 580 ChannelProxy::Send(message);
532 581
533 // Wait for reply, or for any other incoming synchronous messages. 582 // Wait for reply, or for any other incoming synchronous messages.
534 // *this* might get deleted, so only call static functions at this point. 583 // |this| might get deleted, so only call static functions at this point.
535 scoped_refptr<mojo::SyncHandleRegistry> registry = sync_handle_registry_; 584 scoped_refptr<mojo::SyncHandleRegistry> registry = sync_handle_registry_;
536 WaitForReply(registry.get(), context.get(), pump_messages); 585 WaitForReply(registry.get(), context.get(), pump_messages);
537 586
538 TRACE_EVENT_FLOW_END0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), 587 TRACE_EVENT_FLOW_END0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"),
539 "SyncChannel::Send", context->GetSendDoneEvent()); 588 "SyncChannel::Send", context->GetSendDoneEvent());
540 589
541 return context->Pop(); 590 return context->Pop();
542 } 591 }
543 592
544 void SyncChannel::WaitForReply(mojo::SyncHandleRegistry* registry, 593 void SyncChannel::WaitForReply(mojo::SyncHandleRegistry* registry,
545 SyncContext* context, 594 SyncContext* context,
546 bool pump_messages) { 595 bool pump_messages) {
547 context->DispatchMessages(); 596 context->DispatchMessages();
548 597
549 const MojoEvent* pump_messages_event = nullptr; 598 const MojoEvent* pump_messages_event = nullptr;
550 if (pump_messages) 599 if (pump_messages)
551 pump_messages_event = g_pump_messages_event.Get().event(); 600 pump_messages_event = g_pump_messages_event.Get().event();
552 601
553 while (true) { 602 while (true) {
554 bool dispatch = false; 603 bool dispatch = false;
555 bool send_done = false; 604 bool send_done = false;
556 bool should_pump_messages = false; 605 bool should_pump_messages = false;
557 bool error = false; 606 bool error = false;
558 registry->RegisterHandle(context->GetDispatchEvent()->GetHandle(), 607 bool registered = registry->RegisterHandle(
559 MOJO_HANDLE_SIGNAL_READABLE,
560 base::Bind(&OnSyncHandleReady, &dispatch, &error));
561 registry->RegisterHandle(
562 context->GetSendDoneEvent()->GetHandle(), 608 context->GetSendDoneEvent()->GetHandle(),
563 MOJO_HANDLE_SIGNAL_READABLE, 609 MOJO_HANDLE_SIGNAL_READABLE,
564 base::Bind(&OnSyncHandleReady, &send_done, &error)); 610 base::Bind(&OnSyncHandleReady, &send_done, &error));
611 DCHECK(registered);
565 if (pump_messages_event) { 612 if (pump_messages_event) {
566 registry->RegisterHandle( 613 registered = registry->RegisterHandle(
567 pump_messages_event->GetHandle(), MOJO_HANDLE_SIGNAL_READABLE, 614 pump_messages_event->GetHandle(), MOJO_HANDLE_SIGNAL_READABLE,
568 base::Bind(&OnSyncHandleReady, &should_pump_messages, &error)); 615 base::Bind(&OnSyncHandleReady, &should_pump_messages, &error));
616 DCHECK(registered);
569 } 617 }
570 618
571 const bool* stop_flags[] = { &dispatch, &send_done, &should_pump_messages }; 619 const bool* stop_flags[] = { &dispatch, &send_done, &should_pump_messages };
620 context->received_sync_msgs()->BlockDispatch(&dispatch);
572 registry->WatchAllHandles(stop_flags, 3); 621 registry->WatchAllHandles(stop_flags, 3);
622 context->received_sync_msgs()->UnblockDispatch();
573 DCHECK(!error); 623 DCHECK(!error);
574 624
575 registry->UnregisterHandle(context->GetDispatchEvent()->GetHandle()); 625
576 registry->UnregisterHandle(context->GetSendDoneEvent()->GetHandle()); 626 registry->UnregisterHandle(context->GetSendDoneEvent()->GetHandle());
577 if (pump_messages_event) 627 if (pump_messages_event)
578 registry->UnregisterHandle(pump_messages_event->GetHandle()); 628 registry->UnregisterHandle(pump_messages_event->GetHandle());
579 629
580 if (dispatch) { 630 if (dispatch) {
581 // We're waiting for a reply, but we received a blocking synchronous 631 // We're waiting for a reply, but we received a blocking synchronous call.
582 // call. We must process it or otherwise a deadlock might occur. 632 // We must process it to avoid potential deadlocks.
583 context->GetDispatchEvent()->Reset(); 633 context->GetDispatchEvent()->Reset();
584 context->DispatchMessages(); 634 context->DispatchMessages();
585 continue; 635 continue;
586 } 636 }
587 637
588 if (should_pump_messages) 638 if (should_pump_messages)
589 WaitForReplyWithNestedMessageLoop(context); // Run a nested message loop. 639 WaitForReplyWithNestedMessageLoop(context); // Run a nested message loop.
590 640
591 break; 641 break;
592 } 642 }
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after
631 681
632 void SyncChannel::OnDispatchHandleReady(MojoResult result) { 682 void SyncChannel::OnDispatchHandleReady(MojoResult result) {
633 DCHECK(result == MOJO_RESULT_OK || result == MOJO_RESULT_ABORTED); 683 DCHECK(result == MOJO_RESULT_OK || result == MOJO_RESULT_ABORTED);
634 if (result == MOJO_RESULT_OK) { 684 if (result == MOJO_RESULT_OK) {
635 sync_context()->GetDispatchEvent()->Reset(); 685 sync_context()->GetDispatchEvent()->Reset();
636 sync_context()->DispatchMessages(); 686 sync_context()->DispatchMessages();
637 } 687 }
638 } 688 }
639 689
640 void SyncChannel::StartWatching() { 690 void SyncChannel::StartWatching() {
641 // Ideally we only want to watch this object when running a nested message 691 // |dispatch_watcher_| watches the event asynchronously, only dispatching
642 // loop. However, we don't know when it exits if there's another nested 692 // messages once the listener thread is unblocked and pumping its task queue.
643 // message loop running under it or not, so we wouldn't know whether to 693 // The ReceivedSyncMsgQueue also watches this event and may dispatch
644 // stop or keep watching. So we always watch it. 694 // immediately if woken up by the same
yzshen1 2016/08/02 17:38:37 the comment doesn't seem to be complete.
Ken Rockot(use gerrit already) 2016/08/02 19:05:05 Yikes - Done
645 dispatch_watcher_.Start(sync_context()->GetDispatchEvent()->GetHandle(), 695 dispatch_watcher_.Start(sync_context()->GetDispatchEvent()->GetHandle(),
646 MOJO_HANDLE_SIGNAL_READABLE, 696 MOJO_HANDLE_SIGNAL_READABLE,
647 base::Bind(&SyncChannel::OnDispatchHandleReady, 697 base::Bind(&SyncChannel::OnDispatchHandleReady,
648 base::Unretained(this))); 698 base::Unretained(this)));
649 } 699 }
650 700
651 void SyncChannel::OnChannelInit() { 701 void SyncChannel::OnChannelInit() {
652 for (const auto& filter : pre_init_sync_message_filters_) { 702 for (const auto& filter : pre_init_sync_message_filters_) {
653 filter->set_is_channel_send_thread_safe( 703 filter->set_is_channel_send_thread_safe(
654 context()->IsChannelSendThreadSafe()); 704 context()->IsChannelSendThreadSafe());
655 } 705 }
656 pre_init_sync_message_filters_.clear(); 706 pre_init_sync_message_filters_.clear();
657 } 707 }
658 708
659 } // namespace IPC 709 } // namespace IPC
OLDNEW
« ipc/ipc_mojo_bootstrap.cc ('K') | « ipc/ipc_mojo_bootstrap.cc ('k') | ipc/ipc_test.mojom » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698