Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(106)

Side by Side Diff: ipc/ipc_sync_channel.cc

Issue 2072393003: Implements Mojo-based waiting for IPC::SyncChannel (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: . Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « ipc/ipc_sync_channel.h ('k') | ipc/ipc_sync_message.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "ipc/ipc_sync_channel.h" 5 #include "ipc/ipc_sync_channel.h"
6 6
7 #include <stddef.h> 7 #include <stddef.h>
8 #include <stdint.h> 8 #include <stdint.h>
9 9
10 #include <utility> 10 #include <utility>
11 11
12 #include "base/bind.h" 12 #include "base/bind.h"
13 #include "base/lazy_instance.h" 13 #include "base/lazy_instance.h"
14 #include "base/location.h" 14 #include "base/location.h"
15 #include "base/logging.h" 15 #include "base/logging.h"
16 #include "base/macros.h"
16 #include "base/memory/ptr_util.h" 17 #include "base/memory/ptr_util.h"
18 #include "base/run_loop.h"
17 #include "base/synchronization/waitable_event.h" 19 #include "base/synchronization/waitable_event.h"
18 #include "base/synchronization/waitable_event_watcher.h"
19 #include "base/threading/thread_local.h" 20 #include "base/threading/thread_local.h"
20 #include "base/threading/thread_task_runner_handle.h" 21 #include "base/threading/thread_task_runner_handle.h"
21 #include "base/trace_event/trace_event.h" 22 #include "base/trace_event/trace_event.h"
22 #include "ipc/ipc_channel_factory.h" 23 #include "ipc/ipc_channel_factory.h"
23 #include "ipc/ipc_logging.h" 24 #include "ipc/ipc_logging.h"
24 #include "ipc/ipc_message_macros.h" 25 #include "ipc/ipc_message_macros.h"
25 #include "ipc/ipc_sync_message.h" 26 #include "ipc/ipc_sync_message.h"
27 #include "ipc/mojo_event.h"
28 #include "mojo/public/cpp/bindings/sync_handle_registry.h"
26 29
27 using base::TimeDelta;
28 using base::TimeTicks;
29 using base::WaitableEvent; 30 using base::WaitableEvent;
30 31
31 namespace IPC { 32 namespace IPC {
33
34 namespace {
35
36 // A generic callback used when watching handles synchronously. Sets |*signal|
37 // to true. Also sets |*error| to true in case of an error.
38 void OnSyncHandleReady(bool* signal, bool* error, MojoResult result) {
39 *signal = true;
40 *error = result != MOJO_RESULT_OK;
41 }
42
43 // A ReadyCallback for use with mojo::Watcher. Ignores the result (DCHECKs, but
44 // is only used in cases where failure should be impossible) and runs
45 // |callback|.
46 void RunOnHandleReady(const base::Closure& callback, MojoResult result) {
47 DCHECK(result == MOJO_RESULT_OK || result == MOJO_RESULT_ABORTED);
48 if (result == MOJO_RESULT_OK)
49 callback.Run();
50 }
51
52 } // namespace
53
54 // A lazy thread-local Mojo Event which is always signaled. Used to wake up the
55 // sync waiter when a SyncMessage requires the MessageLoop to be pumped while
56 // waiting for a reply. This object is created lazily and ref-counted so it can
57 // be cleaned up when no longer in use.
jam 2016/06/21 20:09:05 is cleaning up really necessary? i.e. is this prem
58 class SyncChannel::PumpMessagesEvent
59 : public base::RefCountedThreadSafe<PumpMessagesEvent> {
60 public:
61 static scoped_refptr<PumpMessagesEvent> current() {
62 scoped_refptr<PumpMessagesEvent> current = current_event_.Pointer()->Get();
63 if (!current) {
64 current = new PumpMessagesEvent;
65 current_event_.Pointer()->Set(current.get());
66 }
67 return current;
68 }
69
70 const MojoEvent* event() const { return &event_; }
71
72 private:
73 friend class base::RefCountedThreadSafe<PumpMessagesEvent>;
74
75 PumpMessagesEvent() { event_.Signal(); }
76
77 ~PumpMessagesEvent() {
78 DCHECK_EQ(current_event_.Pointer()->Get(), this);
79 current_event_.Pointer()->Set(nullptr);
80 }
81
82 MojoEvent event_;
83
84 static base::LazyInstance<base::ThreadLocalPointer<
85 SyncChannel::PumpMessagesEvent>> current_event_;
86
87 DISALLOW_COPY_AND_ASSIGN(PumpMessagesEvent);
88 };
89
90 base::LazyInstance<base::ThreadLocalPointer<SyncChannel::PumpMessagesEvent>>
91 SyncChannel::PumpMessagesEvent::current_event_ =
92 LAZY_INSTANCE_INITIALIZER;
93
94
32 // When we're blocked in a Send(), we need to process incoming synchronous 95 // When we're blocked in a Send(), we need to process incoming synchronous
33 // messages right away because it could be blocking our reply (either 96 // messages right away because it could be blocking our reply (either
34 // directly from the same object we're calling, or indirectly through one or 97 // directly from the same object we're calling, or indirectly through one or
35 // more other channels). That means that in SyncContext's OnMessageReceived, 98 // more other channels). That means that in SyncContext's OnMessageReceived,
36 // we need to process sync message right away if we're blocked. However a 99 // we need to process sync message right away if we're blocked. However a
37 // simple check isn't sufficient, because the listener thread can be in the 100 // simple check isn't sufficient, because the listener thread can be in the
38 // process of calling Send. 101 // process of calling Send.
39 // To work around this, when SyncChannel filters a sync message, it sets 102 // To work around this, when SyncChannel filters a sync message, it sets
40 // an event that the listener thread waits on during its Send() call. This 103 // an event that the listener thread waits on during its Send() call. This
41 // allows us to dispatch incoming sync messages when blocked. The race 104 // allows us to dispatch incoming sync messages when blocked. The race
(...skipping 106 matching lines...) Expand 10 before | Expand all | Expand 10 after
148 iter++; 211 iter++;
149 } 212 }
150 } 213 }
151 214
152 if (--listener_count_ == 0) { 215 if (--listener_count_ == 0) {
153 DCHECK(lazy_tls_ptr_.Pointer()->Get()); 216 DCHECK(lazy_tls_ptr_.Pointer()->Get());
154 lazy_tls_ptr_.Pointer()->Set(NULL); 217 lazy_tls_ptr_.Pointer()->Set(NULL);
155 } 218 }
156 } 219 }
157 220
158 WaitableEvent* dispatch_event() { return &dispatch_event_; } 221 MojoEvent* dispatch_event() { return &dispatch_event_; }
159 base::SingleThreadTaskRunner* listener_task_runner() { 222 base::SingleThreadTaskRunner* listener_task_runner() {
160 return listener_task_runner_.get(); 223 return listener_task_runner_.get();
161 } 224 }
162 225
163 // Holds a pointer to the per-thread ReceivedSyncMsgQueue object. 226 // Holds a pointer to the per-thread ReceivedSyncMsgQueue object.
164 static base::LazyInstance<base::ThreadLocalPointer<ReceivedSyncMsgQueue> > 227 static base::LazyInstance<base::ThreadLocalPointer<ReceivedSyncMsgQueue> >
165 lazy_tls_ptr_; 228 lazy_tls_ptr_;
166 229
167 // Called on the ipc thread to check if we can unblock any current Send() 230 // Called on the ipc thread to check if we can unblock any current Send()
168 // calls based on a queued reply. 231 // calls based on a queued reply.
169 void DispatchReplies() { 232 void DispatchReplies() {
170 for (size_t i = 0; i < received_replies_.size(); ++i) { 233 for (size_t i = 0; i < received_replies_.size(); ++i) {
171 Message* message = received_replies_[i].message; 234 Message* message = received_replies_[i].message;
172 if (received_replies_[i].context->TryToUnblockListener(message)) { 235 if (received_replies_[i].context->TryToUnblockListener(message)) {
173 delete message; 236 delete message;
174 received_replies_.erase(received_replies_.begin() + i); 237 received_replies_.erase(received_replies_.begin() + i);
175 return; 238 return;
176 } 239 }
177 } 240 }
178 } 241 }
179 242
180 base::WaitableEventWatcher* top_send_done_watcher() { 243 mojo::Watcher* top_send_done_watcher() {
181 return top_send_done_watcher_; 244 return top_send_done_watcher_;
182 } 245 }
183 246
184 void set_top_send_done_watcher(base::WaitableEventWatcher* watcher) { 247 void set_top_send_done_watcher(mojo::Watcher* watcher) {
185 top_send_done_watcher_ = watcher; 248 top_send_done_watcher_ = watcher;
186 } 249 }
187 250
188 private: 251 private:
189 friend class base::RefCountedThreadSafe<ReceivedSyncMsgQueue>; 252 friend class base::RefCountedThreadSafe<ReceivedSyncMsgQueue>;
190 253
191 // See the comment in SyncChannel::SyncChannel for why this event is created 254 // See the comment in SyncChannel::SyncChannel for why this event is created
192 // as manual reset. 255 // as manual reset.
193 ReceivedSyncMsgQueue() 256 ReceivedSyncMsgQueue()
194 : message_queue_version_(0), 257 : message_queue_version_(0),
195 dispatch_event_(base::WaitableEvent::ResetPolicy::MANUAL,
196 base::WaitableEvent::InitialState::NOT_SIGNALED),
197 listener_task_runner_(base::ThreadTaskRunnerHandle::Get()), 258 listener_task_runner_(base::ThreadTaskRunnerHandle::Get()),
198 task_pending_(false), 259 task_pending_(false),
199 listener_count_(0), 260 listener_count_(0),
200 top_send_done_watcher_(NULL) {} 261 top_send_done_watcher_(NULL) {}
201 262
202 ~ReceivedSyncMsgQueue() {} 263 ~ReceivedSyncMsgQueue() {}
203 264
204 // Holds information about a queued synchronous message or reply. 265 // Holds information about a queued synchronous message or reply.
205 struct QueuedMessage { 266 struct QueuedMessage {
206 QueuedMessage(Message* m, SyncContext* c) : message(m), context(c) { } 267 QueuedMessage(Message* m, SyncContext* c) : message(m), context(c) { }
207 Message* message; 268 Message* message;
208 scoped_refptr<SyncChannel::SyncContext> context; 269 scoped_refptr<SyncChannel::SyncContext> context;
209 }; 270 };
210 271
211 typedef std::list<QueuedMessage> SyncMessageQueue; 272 typedef std::list<QueuedMessage> SyncMessageQueue;
212 SyncMessageQueue message_queue_; 273 SyncMessageQueue message_queue_;
213 uint32_t message_queue_version_; // Used to signal DispatchMessages to rescan 274 uint32_t message_queue_version_; // Used to signal DispatchMessages to rescan
214 275
215 std::vector<QueuedMessage> received_replies_; 276 std::vector<QueuedMessage> received_replies_;
216 277
217 // Set when we got a synchronous message that we must respond to as the 278 // Signaled when we get a synchronous message that we must respond to, as the
218 // sender needs its reply before it can reply to our original synchronous 279 // sender needs its reply before it can reply to our original synchronous
219 // message. 280 // message.
220 WaitableEvent dispatch_event_; 281 MojoEvent dispatch_event_;
221 scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner_; 282 scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner_;
222 base::Lock message_lock_; 283 base::Lock message_lock_;
223 bool task_pending_; 284 bool task_pending_;
224 int listener_count_; 285 int listener_count_;
225 286
226 // The current send done event watcher for this thread. Used to maintain 287 // The current send done handle watcher for this thread. Used to maintain
227 // a local global stack of send done watchers to ensure that nested sync 288 // a thread-local stack of send done watchers to ensure that nested sync
228 // message loops complete correctly. 289 // message loops complete correctly.
229 base::WaitableEventWatcher* top_send_done_watcher_; 290 mojo::Watcher* top_send_done_watcher_;
230 }; 291 };
231 292
232 base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> > 293 base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> >
233 SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_ = 294 SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_ =
234 LAZY_INSTANCE_INITIALIZER; 295 LAZY_INSTANCE_INITIALIZER;
235 296
236 SyncChannel::SyncContext::SyncContext( 297 SyncChannel::SyncContext::SyncContext(
237 Listener* listener, 298 Listener* listener,
238 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, 299 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
239 WaitableEvent* shutdown_event) 300 WaitableEvent* shutdown_event)
240 : ChannelProxy::Context(listener, ipc_task_runner), 301 : ChannelProxy::Context(listener, ipc_task_runner),
241 received_sync_msgs_(ReceivedSyncMsgQueue::AddContext()), 302 received_sync_msgs_(ReceivedSyncMsgQueue::AddContext()),
242 shutdown_event_(shutdown_event), 303 shutdown_event_(shutdown_event),
243 restrict_dispatch_group_(kRestrictDispatchGroup_None) { 304 restrict_dispatch_group_(kRestrictDispatchGroup_None) {
244 } 305 }
245 306
246 SyncChannel::SyncContext::~SyncContext() { 307 SyncChannel::SyncContext::~SyncContext() {
247 while (!deserializers_.empty()) 308 while (!deserializers_.empty())
248 Pop(); 309 Pop();
249 } 310 }
250 311
251 // Adds information about an outgoing sync message to the context so that 312 // Adds information about an outgoing sync message to the context so that
252 // we know how to deserialize the reply. Returns a handle that's set when 313 // we know how to deserialize the reply. Returns |true| if the message was added
253 // the reply has arrived. 314 // to the context or |false| if it was rejected (e.g. due to shutdown.)
254 void SyncChannel::SyncContext::Push(SyncMessage* sync_msg) { 315 bool SyncChannel::SyncContext::Push(SyncMessage* sync_msg) {
255 // Create the tracking information for this message. This object is stored 316 // Create the tracking information for this message. This object is stored
256 // by value since all members are pointers that are cheap to copy. These 317 // by value since all members are pointers that are cheap to copy. These
257 // pointers are cleaned up in the Pop() function. 318 // pointers are cleaned up in the Pop() function.
258 // 319 //
259 // The event is created as manual reset because in between Signal and 320 // The event is created as manual reset because in between Signal and
260 // OnObjectSignalled, another Send can happen which would stop the watcher 321 // OnObjectSignalled, another Send can happen which would stop the watcher
261 // from being called. The event would get watched later, when the nested 322 // from being called. The event would get watched later, when the nested
262 // Send completes, so the event will need to remain set. 323 // Send completes, so the event will need to remain set.
324 base::AutoLock auto_lock(deserializers_lock_);
325 if (reject_new_deserializers_)
326 return false;
263 PendingSyncMsg pending( 327 PendingSyncMsg pending(
264 SyncMessage::GetMessageId(*sync_msg), sync_msg->GetReplyDeserializer(), 328 SyncMessage::GetMessageId(*sync_msg), sync_msg->GetReplyDeserializer(),
265 new WaitableEvent(base::WaitableEvent::ResetPolicy::MANUAL, 329 new MojoEvent);
266 base::WaitableEvent::InitialState::NOT_SIGNALED));
267 base::AutoLock auto_lock(deserializers_lock_);
268 deserializers_.push_back(pending); 330 deserializers_.push_back(pending);
331 return true;
269 } 332 }
270 333
271 bool SyncChannel::SyncContext::Pop() { 334 bool SyncChannel::SyncContext::Pop() {
272 bool result; 335 bool result;
273 { 336 {
274 base::AutoLock auto_lock(deserializers_lock_); 337 base::AutoLock auto_lock(deserializers_lock_);
275 PendingSyncMsg msg = deserializers_.back(); 338 PendingSyncMsg msg = deserializers_.back();
276 delete msg.deserializer; 339 delete msg.deserializer;
277 delete msg.done_event; 340 delete msg.done_event;
278 msg.done_event = NULL; 341 msg.done_event = nullptr;
279 deserializers_.pop_back(); 342 deserializers_.pop_back();
280 result = msg.send_result; 343 result = msg.send_result;
281 } 344 }
282 345
283 // We got a reply to a synchronous Send() call that's blocking the listener 346 // We got a reply to a synchronous Send() call that's blocking the listener
284 // thread. However, further down the call stack there could be another 347 // thread. However, further down the call stack there could be another
285 // blocking Send() call, whose reply we received after we made this last 348 // blocking Send() call, whose reply we received after we made this last
286 // Send() call. So check if we have any queued replies available that 349 // Send() call. So check if we have any queued replies available that
287 // can now unblock the listener thread. 350 // can now unblock the listener thread.
288 ipc_task_runner()->PostTask( 351 ipc_task_runner()->PostTask(
289 FROM_HERE, base::Bind(&ReceivedSyncMsgQueue::DispatchReplies, 352 FROM_HERE, base::Bind(&ReceivedSyncMsgQueue::DispatchReplies,
290 received_sync_msgs_.get())); 353 received_sync_msgs_.get()));
291 354
292 return result; 355 return result;
293 } 356 }
294 357
295 WaitableEvent* SyncChannel::SyncContext::GetSendDoneEvent() { 358 MojoEvent* SyncChannel::SyncContext::GetSendDoneEvent() {
296 base::AutoLock auto_lock(deserializers_lock_); 359 base::AutoLock auto_lock(deserializers_lock_);
297 return deserializers_.back().done_event; 360 return deserializers_.back().done_event;
298 } 361 }
299 362
300 WaitableEvent* SyncChannel::SyncContext::GetDispatchEvent() { 363 MojoEvent* SyncChannel::SyncContext::GetDispatchEvent() {
301 return received_sync_msgs_->dispatch_event(); 364 return received_sync_msgs_->dispatch_event();
302 } 365 }
303 366
304 void SyncChannel::SyncContext::DispatchMessages() { 367 void SyncChannel::SyncContext::DispatchMessages() {
305 received_sync_msgs_->DispatchMessages(this); 368 received_sync_msgs_->DispatchMessages(this);
306 } 369 }
307 370
308 bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) { 371 bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) {
309 base::AutoLock auto_lock(deserializers_lock_); 372 base::AutoLock auto_lock(deserializers_lock_);
310 if (deserializers_.empty() || 373 if (deserializers_.empty() ||
311 !SyncMessage::IsMessageReplyTo(*msg, deserializers_.back().id)) { 374 !SyncMessage::IsMessageReplyTo(*msg, deserializers_.back().id)) {
312 return false; 375 return false;
313 } 376 }
314 377
315 if (!msg->is_reply_error()) { 378 if (!msg->is_reply_error()) {
316 bool send_result = deserializers_.back().deserializer-> 379 bool send_result = deserializers_.back().deserializer->
317 SerializeOutputParameters(*msg); 380 SerializeOutputParameters(*msg);
318 deserializers_.back().send_result = send_result; 381 deserializers_.back().send_result = send_result;
319 DVLOG_IF(1, !send_result) << "Couldn't deserialize reply message"; 382 DVLOG_IF(1, !send_result) << "Couldn't deserialize reply message";
320 } else { 383 } else {
321 DVLOG(1) << "Received error reply"; 384 DVLOG(1) << "Received error reply";
322 } 385 }
323 386
324 base::WaitableEvent* done_event = deserializers_.back().done_event; 387 MojoEvent* done_event = deserializers_.back().done_event;
325 TRACE_EVENT_FLOW_BEGIN0( 388 TRACE_EVENT_FLOW_BEGIN0(
326 TRACE_DISABLED_BY_DEFAULT("ipc.flow"), 389 TRACE_DISABLED_BY_DEFAULT("ipc.flow"),
327 "SyncChannel::SyncContext::TryToUnblockListener", done_event); 390 "SyncChannel::SyncContext::TryToUnblockListener", done_event);
328 391
329 done_event->Signal(); 392 done_event->Signal();
330 393
331 return true; 394 return true;
332 } 395 }
333 396
334 void SyncChannel::SyncContext::Clear() { 397 void SyncChannel::SyncContext::Clear() {
(...skipping 25 matching lines...) Expand all
360 423
361 void SyncChannel::SyncContext::OnChannelError() { 424 void SyncChannel::SyncContext::OnChannelError() {
362 CancelPendingSends(); 425 CancelPendingSends();
363 shutdown_watcher_.StopWatching(); 426 shutdown_watcher_.StopWatching();
364 Context::OnChannelError(); 427 Context::OnChannelError();
365 } 428 }
366 429
367 void SyncChannel::SyncContext::OnChannelOpened() { 430 void SyncChannel::SyncContext::OnChannelOpened() {
368 shutdown_watcher_.StartWatching( 431 shutdown_watcher_.StartWatching(
369 shutdown_event_, 432 shutdown_event_,
370 base::Bind(&SyncChannel::SyncContext::OnWaitableEventSignaled, 433 base::Bind(&SyncChannel::SyncContext::OnShutdownEventSignaled,
371 base::Unretained(this))); 434 base::Unretained(this)));
372 Context::OnChannelOpened(); 435 Context::OnChannelOpened();
373 } 436 }
374 437
375 void SyncChannel::SyncContext::OnChannelClosed() { 438 void SyncChannel::SyncContext::OnChannelClosed() {
376 CancelPendingSends(); 439 CancelPendingSends();
377 shutdown_watcher_.StopWatching(); 440 shutdown_watcher_.StopWatching();
378 Context::OnChannelClosed(); 441 Context::OnChannelClosed();
379 } 442 }
380 443
381 void SyncChannel::SyncContext::CancelPendingSends() { 444 void SyncChannel::SyncContext::CancelPendingSends() {
382 base::AutoLock auto_lock(deserializers_lock_); 445 base::AutoLock auto_lock(deserializers_lock_);
446 reject_new_deserializers_ = true;
383 PendingSyncMessageQueue::iterator iter; 447 PendingSyncMessageQueue::iterator iter;
384 DVLOG(1) << "Canceling pending sends"; 448 DVLOG(1) << "Canceling pending sends";
385 for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++) { 449 for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++) {
386 TRACE_EVENT_FLOW_BEGIN0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), 450 TRACE_EVENT_FLOW_BEGIN0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"),
387 "SyncChannel::SyncContext::CancelPendingSends", 451 "SyncChannel::SyncContext::CancelPendingSends",
388 iter->done_event); 452 iter->done_event);
389 iter->done_event->Signal(); 453 iter->done_event->Signal();
390 } 454 }
391 } 455 }
392 456
393 void SyncChannel::SyncContext::OnWaitableEventSignaled(WaitableEvent* event) { 457 void SyncChannel::SyncContext::OnShutdownEventSignaled(WaitableEvent* event) {
394 if (event == shutdown_event_) { 458 DCHECK_EQ(event, shutdown_event_);
395 // Process shut down before we can get a reply to a synchronous message.
396 // Cancel pending Send calls, which will end up setting the send done event.
397 CancelPendingSends();
398 } else {
399 // We got the reply, timed out or the process shutdown.
400 DCHECK_EQ(GetSendDoneEvent(), event);
401 base::MessageLoop::current()->QuitNow();
402 }
403 }
404 459
405 base::WaitableEventWatcher::EventCallback 460 // Process shut down before we can get a reply to a synchronous message.
406 SyncChannel::SyncContext::MakeWaitableEventCallback() { 461 // Cancel pending Send calls, which will end up setting the send done event.
407 return base::Bind(&SyncChannel::SyncContext::OnWaitableEventSignaled, this); 462 CancelPendingSends();
408 } 463 }
409 464
410 // static 465 // static
411 std::unique_ptr<SyncChannel> SyncChannel::Create( 466 std::unique_ptr<SyncChannel> SyncChannel::Create(
412 const IPC::ChannelHandle& channel_handle, 467 const IPC::ChannelHandle& channel_handle,
413 Channel::Mode mode, 468 Channel::Mode mode,
414 Listener* listener, 469 Listener* listener,
415 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, 470 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
416 bool create_pipe_now, 471 bool create_pipe_now,
417 base::WaitableEvent* shutdown_event) { 472 base::WaitableEvent* shutdown_event) {
(...skipping 22 matching lines...) Expand all
440 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, 495 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
441 WaitableEvent* shutdown_event) { 496 WaitableEvent* shutdown_event) {
442 return base::WrapUnique( 497 return base::WrapUnique(
443 new SyncChannel(listener, ipc_task_runner, shutdown_event)); 498 new SyncChannel(listener, ipc_task_runner, shutdown_event));
444 } 499 }
445 500
446 SyncChannel::SyncChannel( 501 SyncChannel::SyncChannel(
447 Listener* listener, 502 Listener* listener,
448 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, 503 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
449 WaitableEvent* shutdown_event) 504 WaitableEvent* shutdown_event)
450 : ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)) { 505 : ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)),
506 pump_messages_event_(PumpMessagesEvent::current()),
507 sync_handle_registry_(mojo::SyncHandleRegistry::current()) {
451 // The current (listener) thread must be distinct from the IPC thread, or else 508 // The current (listener) thread must be distinct from the IPC thread, or else
452 // sending synchronous messages will deadlock. 509 // sending synchronous messages will deadlock.
453 DCHECK_NE(ipc_task_runner.get(), base::ThreadTaskRunnerHandle::Get().get()); 510 DCHECK_NE(ipc_task_runner.get(), base::ThreadTaskRunnerHandle::Get().get());
454 StartWatching(); 511 StartWatching();
455 } 512 }
456 513
457 SyncChannel::~SyncChannel() { 514 SyncChannel::~SyncChannel() {
458 } 515 }
459 516
460 void SyncChannel::SetRestrictDispatchChannelGroup(int group) { 517 void SyncChannel::SetRestrictDispatchChannelGroup(int group) {
(...skipping 18 matching lines...) Expand all
479 #else 536 #else
480 TRACE_EVENT2("ipc", "SyncChannel::Send", 537 TRACE_EVENT2("ipc", "SyncChannel::Send",
481 "class", IPC_MESSAGE_ID_CLASS(message->type()), 538 "class", IPC_MESSAGE_ID_CLASS(message->type()),
482 "line", IPC_MESSAGE_ID_LINE(message->type())); 539 "line", IPC_MESSAGE_ID_LINE(message->type()));
483 #endif 540 #endif
484 if (!message->is_sync()) { 541 if (!message->is_sync()) {
485 ChannelProxy::Send(message); 542 ChannelProxy::Send(message);
486 return true; 543 return true;
487 } 544 }
488 545
546 SyncMessage* sync_msg = static_cast<SyncMessage*>(message);
547 bool pump_messages = sync_msg->ShouldPumpMessages();
548
489 // *this* might get deleted in WaitForReply. 549 // *this* might get deleted in WaitForReply.
490 scoped_refptr<SyncContext> context(sync_context()); 550 scoped_refptr<SyncContext> context(sync_context());
491 if (context->shutdown_event()->IsSignaled()) { 551 if (!context->Push(sync_msg)) {
492 DVLOG(1) << "shutdown event is signaled"; 552 DVLOG(1) << "Channel is shutting down. Dropping sync message.";
493 delete message; 553 delete message;
494 return false; 554 return false;
495 } 555 }
496 556
497 SyncMessage* sync_msg = static_cast<SyncMessage*>(message);
498 context->Push(sync_msg);
499 WaitableEvent* pump_messages_event = sync_msg->pump_messages_event();
500
501 ChannelProxy::Send(message); 557 ChannelProxy::Send(message);
502 558
503 // Wait for reply, or for any other incoming synchronous messages. 559 // Wait for reply, or for any other incoming synchronous messages.
504 // *this* might get deleted, so only call static functions at this point. 560 // *this* might get deleted, so only call static functions at this point.
505 WaitForReply(context.get(), pump_messages_event); 561 scoped_refptr<mojo::SyncHandleRegistry> registry = sync_handle_registry_;
562 scoped_refptr<PumpMessagesEvent> pump_messages_event = pump_messages_event_;
563 WaitForReply(registry.get(), context.get(),
564 pump_messages ? pump_messages_event->event() : nullptr);
506 565
507 TRACE_EVENT_FLOW_END0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), 566 TRACE_EVENT_FLOW_END0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"),
508 "SyncChannel::Send", context->GetSendDoneEvent()); 567 "SyncChannel::Send", context->GetSendDoneEvent());
509 568
510 return context->Pop(); 569 return context->Pop();
511 } 570 }
512 571
513 void SyncChannel::WaitForReply( 572 void SyncChannel::WaitForReply(mojo::SyncHandleRegistry* registry,
514 SyncContext* context, WaitableEvent* pump_messages_event) { 573 SyncContext* context,
574 const MojoEvent* pump_messages_event) {
515 context->DispatchMessages(); 575 context->DispatchMessages();
576
516 while (true) { 577 while (true) {
517 WaitableEvent* objects[] = { 578 bool dispatch = false;
518 context->GetDispatchEvent(), 579 bool send_done = false;
519 context->GetSendDoneEvent(), 580 bool should_pump_messages = false;
520 pump_messages_event 581 bool error = false;
521 }; 582 registry->RegisterHandle(context->GetDispatchEvent()->GetHandle(),
583 MOJO_HANDLE_SIGNAL_READABLE,
584 base::Bind(&OnSyncHandleReady, &dispatch, &error));
585 registry->RegisterHandle(
586 context->GetSendDoneEvent()->GetHandle(),
587 MOJO_HANDLE_SIGNAL_READABLE,
588 base::Bind(&OnSyncHandleReady, &send_done, &error));
589 if (pump_messages_event) {
590 registry->RegisterHandle(
591 pump_messages_event->GetHandle(), MOJO_HANDLE_SIGNAL_READABLE,
592 base::Bind(&OnSyncHandleReady, &should_pump_messages, &error));
593 }
522 594
523 unsigned count = pump_messages_event ? 3: 2; 595 const bool* stop_flags[] = { &dispatch, &send_done, &should_pump_messages };
524 size_t result = WaitableEvent::WaitMany(objects, count); 596 registry->WatchAllHandles(stop_flags, 3);
525 if (result == 0 /* dispatch event */) { 597 DCHECK(!error);
598
599 registry->UnregisterHandle(context->GetDispatchEvent()->GetHandle());
600 registry->UnregisterHandle(context->GetSendDoneEvent()->GetHandle());
601 if (pump_messages_event)
602 registry->UnregisterHandle(pump_messages_event->GetHandle());
603
604 if (dispatch) {
526 // We're waiting for a reply, but we received a blocking synchronous 605 // We're waiting for a reply, but we received a blocking synchronous
527 // call. We must process it or otherwise a deadlock might occur. 606 // call. We must process it or otherwise a deadlock might occur.
528 context->GetDispatchEvent()->Reset(); 607 context->GetDispatchEvent()->Reset();
529 context->DispatchMessages(); 608 context->DispatchMessages();
530 continue; 609 continue;
531 } 610 }
532 611
533 if (result == 2 /* pump_messages_event */) 612 if (should_pump_messages)
534 WaitForReplyWithNestedMessageLoop(context); // Run a nested message loop. 613 WaitForReplyWithNestedMessageLoop(context); // Run a nested message loop.
535 614
536 break; 615 break;
537 } 616 }
538 } 617 }
539 618
540 void SyncChannel::WaitForReplyWithNestedMessageLoop(SyncContext* context) { 619 void SyncChannel::WaitForReplyWithNestedMessageLoop(SyncContext* context) {
541 base::WaitableEventWatcher send_done_watcher; 620 mojo::Watcher send_done_watcher;
542 621
543 ReceivedSyncMsgQueue* sync_msg_queue = context->received_sync_msgs(); 622 ReceivedSyncMsgQueue* sync_msg_queue = context->received_sync_msgs();
544 DCHECK(sync_msg_queue != NULL); 623 DCHECK_NE(sync_msg_queue, nullptr);
545 624
546 base::WaitableEventWatcher* old_send_done_event_watcher = 625 mojo::Watcher* old_watcher = sync_msg_queue->top_send_done_watcher();
547 sync_msg_queue->top_send_done_watcher(); 626 mojo::Handle old_handle(mojo::kInvalidHandleValue);
627 mojo::Watcher::ReadyCallback old_callback;
548 628
549 base::WaitableEventWatcher::EventCallback old_callback; 629 // Maintain a thread-local stack of watchers to ensure nested calls complete
550 base::WaitableEvent* old_event = NULL; 630 // in the correct sequence, i.e. the outermost call completes first, etc.
551 631 if (old_watcher) {
552 // Maintain a local global stack of send done delegates to ensure that 632 old_callback = old_watcher->ready_callback();
553 // nested sync calls complete in the correct sequence, i.e. the 633 old_handle = old_watcher->handle();
554 // outermost call completes first, etc. 634 old_watcher->Cancel();
555 if (old_send_done_event_watcher) {
556 old_callback = old_send_done_event_watcher->callback();
557 old_event = old_send_done_event_watcher->GetWatchedEvent();
558 old_send_done_event_watcher->StopWatching();
559 } 635 }
560 636
561 sync_msg_queue->set_top_send_done_watcher(&send_done_watcher); 637 sync_msg_queue->set_top_send_done_watcher(&send_done_watcher);
562 638
563 send_done_watcher.StartWatching(context->GetSendDoneEvent(), 639 {
564 context->MakeWaitableEventCallback()); 640 base::RunLoop nested_loop;
641 send_done_watcher.Start(
642 context->GetSendDoneEvent()->GetHandle(), MOJO_HANDLE_SIGNAL_READABLE,
643 base::Bind(&RunOnHandleReady, nested_loop.QuitClosure()));
565 644
566 {
567 base::MessageLoop::ScopedNestableTaskAllower allow( 645 base::MessageLoop::ScopedNestableTaskAllower allow(
568 base::MessageLoop::current()); 646 base::MessageLoop::current());
569 base::MessageLoop::current()->Run(); 647 nested_loop.Run();
648 send_done_watcher.Cancel();
570 } 649 }
571 650
572 sync_msg_queue->set_top_send_done_watcher(old_send_done_event_watcher); 651 sync_msg_queue->set_top_send_done_watcher(old_watcher);
573 if (old_send_done_event_watcher && old_event) { 652 if (old_watcher)
574 old_send_done_event_watcher->StartWatching(old_event, old_callback); 653 old_watcher->Start(old_handle, MOJO_HANDLE_SIGNAL_READABLE, old_callback);
575 }
576 } 654 }
577 655
578 void SyncChannel::OnWaitableEventSignaled(WaitableEvent* event) { 656 void SyncChannel::OnDispatchHandleReady(MojoResult result) {
579 DCHECK(event == sync_context()->GetDispatchEvent()); 657 DCHECK(result == MOJO_RESULT_OK || result == MOJO_RESULT_ABORTED);
580 // The call to DispatchMessages might delete this object, so reregister 658 if (result == MOJO_RESULT_OK) {
581 // the object watcher first. 659 sync_context()->GetDispatchEvent()->Reset();
582 event->Reset(); 660 sync_context()->DispatchMessages();
583 dispatch_watcher_.StartWatching(event, dispatch_watcher_callback_); 661 }
584 sync_context()->DispatchMessages();
585 } 662 }
586 663
587 void SyncChannel::StartWatching() { 664 void SyncChannel::StartWatching() {
588 // Ideally we only want to watch this object when running a nested message 665 // Ideally we only want to watch this object when running a nested message
589 // loop. However, we don't know when it exits if there's another nested 666 // loop. However, we don't know when it exits if there's another nested
590 // message loop running under it or not, so we wouldn't know whether to 667 // message loop running under it or not, so we wouldn't know whether to
591 // stop or keep watching. So we always watch it, and create the event as 668 // stop or keep watching. So we always watch it.
592 // manual reset since the object watcher might otherwise reset the event 669 dispatch_watcher_.Start(sync_context()->GetDispatchEvent()->GetHandle(),
593 // when we're doing a WaitMany. 670 MOJO_HANDLE_SIGNAL_READABLE,
594 dispatch_watcher_callback_ = 671 base::Bind(&SyncChannel::OnDispatchHandleReady,
595 base::Bind(&SyncChannel::OnWaitableEventSignaled, 672 base::Unretained(this)));
596 base::Unretained(this));
597 dispatch_watcher_.StartWatching(sync_context()->GetDispatchEvent(),
598 dispatch_watcher_callback_);
599 } 673 }
600 674
601 void SyncChannel::OnChannelInit() { 675 void SyncChannel::OnChannelInit() {
602 for (const auto& filter : pre_init_sync_message_filters_) { 676 for (const auto& filter : pre_init_sync_message_filters_) {
603 filter->set_is_channel_send_thread_safe( 677 filter->set_is_channel_send_thread_safe(
604 context()->IsChannelSendThreadSafe()); 678 context()->IsChannelSendThreadSafe());
605 } 679 }
606 pre_init_sync_message_filters_.clear(); 680 pre_init_sync_message_filters_.clear();
607 } 681 }
608 682
609 } // namespace IPC 683 } // namespace IPC
OLDNEW
« no previous file with comments | « ipc/ipc_sync_channel.h ('k') | ipc/ipc_sync_message.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698