Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(625)

Side by Side Diff: ipc/ipc_sync_channel.cc

Issue 2101163002: Reland Mojo-based waiting for IPC::SyncChannel (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: rebase Created 4 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « ipc/ipc_sync_channel.h ('k') | ipc/ipc_sync_message.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "ipc/ipc_sync_channel.h" 5 #include "ipc/ipc_sync_channel.h"
6 6
7 #include <stddef.h> 7 #include <stddef.h>
8 #include <stdint.h> 8 #include <stdint.h>
9 9
10 #include <utility> 10 #include <utility>
11 11
12 #include "base/bind.h" 12 #include "base/bind.h"
13 #include "base/lazy_instance.h" 13 #include "base/lazy_instance.h"
14 #include "base/location.h" 14 #include "base/location.h"
15 #include "base/logging.h" 15 #include "base/logging.h"
16 #include "base/macros.h"
16 #include "base/memory/ptr_util.h" 17 #include "base/memory/ptr_util.h"
17 #include "base/run_loop.h" 18 #include "base/run_loop.h"
18 #include "base/synchronization/waitable_event.h" 19 #include "base/synchronization/waitable_event.h"
19 #include "base/synchronization/waitable_event_watcher.h"
20 #include "base/threading/thread_local.h" 20 #include "base/threading/thread_local.h"
21 #include "base/threading/thread_task_runner_handle.h" 21 #include "base/threading/thread_task_runner_handle.h"
22 #include "base/trace_event/trace_event.h" 22 #include "base/trace_event/trace_event.h"
23 #include "ipc/ipc_channel_factory.h" 23 #include "ipc/ipc_channel_factory.h"
24 #include "ipc/ipc_logging.h" 24 #include "ipc/ipc_logging.h"
25 #include "ipc/ipc_message_macros.h" 25 #include "ipc/ipc_message_macros.h"
26 #include "ipc/ipc_sync_message.h" 26 #include "ipc/ipc_sync_message.h"
27 #include "ipc/mojo_event.h"
28 #include "mojo/public/cpp/bindings/sync_handle_registry.h"
27 29
28 using base::TimeDelta;
29 using base::TimeTicks;
30 using base::WaitableEvent; 30 using base::WaitableEvent;
31 31
32 namespace IPC { 32 namespace IPC {
33
34 namespace {
35
36 // A generic callback used when watching handles synchronously. Sets |*signal|
37 // to true. Also sets |*error| to true in case of an error.
38 void OnSyncHandleReady(bool* signal, bool* error, MojoResult result) {
39 *signal = true;
40 *error = result != MOJO_RESULT_OK;
41 }
42
43 // A ReadyCallback for use with mojo::Watcher. Ignores the result (DCHECKs, but
44 // is only used in cases where failure should be impossible) and runs
45 // |callback|.
46 void RunOnHandleReady(const base::Closure& callback, MojoResult result) {
47 DCHECK(result == MOJO_RESULT_OK || result == MOJO_RESULT_ABORTED);
48 if (result == MOJO_RESULT_OK)
49 callback.Run();
50 }
51
52 class PumpMessagesEvent {
53 public:
54 PumpMessagesEvent() { event_.Signal(); }
55 ~PumpMessagesEvent() {}
56
57 const MojoEvent* event() const { return &event_; }
58
59 private:
60 MojoEvent event_;
61
62 DISALLOW_COPY_AND_ASSIGN(PumpMessagesEvent);
63 };
64
65 base::LazyInstance<PumpMessagesEvent>::Leaky g_pump_messages_event =
66 LAZY_INSTANCE_INITIALIZER;
67
68 } // namespace
69
33 // When we're blocked in a Send(), we need to process incoming synchronous 70 // When we're blocked in a Send(), we need to process incoming synchronous
34 // messages right away because it could be blocking our reply (either 71 // messages right away because it could be blocking our reply (either
35 // directly from the same object we're calling, or indirectly through one or 72 // directly from the same object we're calling, or indirectly through one or
36 // more other channels). That means that in SyncContext's OnMessageReceived, 73 // more other channels). That means that in SyncContext's OnMessageReceived,
37 // we need to process sync message right away if we're blocked. However a 74 // we need to process sync message right away if we're blocked. However a
38 // simple check isn't sufficient, because the listener thread can be in the 75 // simple check isn't sufficient, because the listener thread can be in the
39 // process of calling Send. 76 // process of calling Send.
40 // To work around this, when SyncChannel filters a sync message, it sets 77 // To work around this, when SyncChannel filters a sync message, it sets
41 // an event that the listener thread waits on during its Send() call. This 78 // an event that the listener thread waits on during its Send() call. This
42 // allows us to dispatch incoming sync messages when blocked. The race 79 // allows us to dispatch incoming sync messages when blocked. The race
(...skipping 106 matching lines...) Expand 10 before | Expand all | Expand 10 after
149 iter++; 186 iter++;
150 } 187 }
151 } 188 }
152 189
153 if (--listener_count_ == 0) { 190 if (--listener_count_ == 0) {
154 DCHECK(lazy_tls_ptr_.Pointer()->Get()); 191 DCHECK(lazy_tls_ptr_.Pointer()->Get());
155 lazy_tls_ptr_.Pointer()->Set(NULL); 192 lazy_tls_ptr_.Pointer()->Set(NULL);
156 } 193 }
157 } 194 }
158 195
159 WaitableEvent* dispatch_event() { return &dispatch_event_; } 196 MojoEvent* dispatch_event() { return &dispatch_event_; }
160 base::SingleThreadTaskRunner* listener_task_runner() { 197 base::SingleThreadTaskRunner* listener_task_runner() {
161 return listener_task_runner_.get(); 198 return listener_task_runner_.get();
162 } 199 }
163 200
164 // Holds a pointer to the per-thread ReceivedSyncMsgQueue object. 201 // Holds a pointer to the per-thread ReceivedSyncMsgQueue object.
165 static base::LazyInstance<base::ThreadLocalPointer<ReceivedSyncMsgQueue> > 202 static base::LazyInstance<base::ThreadLocalPointer<ReceivedSyncMsgQueue> >
166 lazy_tls_ptr_; 203 lazy_tls_ptr_;
167 204
168 // Called on the ipc thread to check if we can unblock any current Send() 205 // Called on the ipc thread to check if we can unblock any current Send()
169 // calls based on a queued reply. 206 // calls based on a queued reply.
170 void DispatchReplies() { 207 void DispatchReplies() {
171 for (size_t i = 0; i < received_replies_.size(); ++i) { 208 for (size_t i = 0; i < received_replies_.size(); ++i) {
172 Message* message = received_replies_[i].message; 209 Message* message = received_replies_[i].message;
173 if (received_replies_[i].context->TryToUnblockListener(message)) { 210 if (received_replies_[i].context->TryToUnblockListener(message)) {
174 delete message; 211 delete message;
175 received_replies_.erase(received_replies_.begin() + i); 212 received_replies_.erase(received_replies_.begin() + i);
176 return; 213 return;
177 } 214 }
178 } 215 }
179 } 216 }
180 217
181 base::WaitableEventWatcher* top_send_done_watcher() { 218 mojo::Watcher* top_send_done_watcher() {
182 return top_send_done_watcher_; 219 return top_send_done_watcher_;
183 } 220 }
184 221
185 void set_top_send_done_watcher(base::WaitableEventWatcher* watcher) { 222 void set_top_send_done_watcher(mojo::Watcher* watcher) {
186 top_send_done_watcher_ = watcher; 223 top_send_done_watcher_ = watcher;
187 } 224 }
188 225
189 private: 226 private:
190 friend class base::RefCountedThreadSafe<ReceivedSyncMsgQueue>; 227 friend class base::RefCountedThreadSafe<ReceivedSyncMsgQueue>;
191 228
192 // See the comment in SyncChannel::SyncChannel for why this event is created 229 // See the comment in SyncChannel::SyncChannel for why this event is created
193 // as manual reset. 230 // as manual reset.
194 ReceivedSyncMsgQueue() 231 ReceivedSyncMsgQueue()
195 : message_queue_version_(0), 232 : message_queue_version_(0),
196 dispatch_event_(base::WaitableEvent::ResetPolicy::MANUAL,
197 base::WaitableEvent::InitialState::NOT_SIGNALED),
198 listener_task_runner_(base::ThreadTaskRunnerHandle::Get()), 233 listener_task_runner_(base::ThreadTaskRunnerHandle::Get()),
199 task_pending_(false), 234 task_pending_(false),
200 listener_count_(0), 235 listener_count_(0),
201 top_send_done_watcher_(NULL) {} 236 top_send_done_watcher_(NULL) {}
202 237
203 ~ReceivedSyncMsgQueue() {} 238 ~ReceivedSyncMsgQueue() {}
204 239
205 // Holds information about a queued synchronous message or reply. 240 // Holds information about a queued synchronous message or reply.
206 struct QueuedMessage { 241 struct QueuedMessage {
207 QueuedMessage(Message* m, SyncContext* c) : message(m), context(c) { } 242 QueuedMessage(Message* m, SyncContext* c) : message(m), context(c) { }
208 Message* message; 243 Message* message;
209 scoped_refptr<SyncChannel::SyncContext> context; 244 scoped_refptr<SyncChannel::SyncContext> context;
210 }; 245 };
211 246
212 typedef std::list<QueuedMessage> SyncMessageQueue; 247 typedef std::list<QueuedMessage> SyncMessageQueue;
213 SyncMessageQueue message_queue_; 248 SyncMessageQueue message_queue_;
214 uint32_t message_queue_version_; // Used to signal DispatchMessages to rescan 249 uint32_t message_queue_version_; // Used to signal DispatchMessages to rescan
215 250
216 std::vector<QueuedMessage> received_replies_; 251 std::vector<QueuedMessage> received_replies_;
217 252
218 // Set when we got a synchronous message that we must respond to as the 253 // Signaled when we get a synchronous message that we must respond to, as the
219 // sender needs its reply before it can reply to our original synchronous 254 // sender needs its reply before it can reply to our original synchronous
220 // message. 255 // message.
221 WaitableEvent dispatch_event_; 256 MojoEvent dispatch_event_;
222 scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner_; 257 scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner_;
223 base::Lock message_lock_; 258 base::Lock message_lock_;
224 bool task_pending_; 259 bool task_pending_;
225 int listener_count_; 260 int listener_count_;
226 261
227 // The current send done event watcher for this thread. Used to maintain 262 // The current send done handle watcher for this thread. Used to maintain
228 // a local global stack of send done watchers to ensure that nested sync 263 // a thread-local stack of send done watchers to ensure that nested sync
229 // message loops complete correctly. 264 // message loops complete correctly.
230 base::WaitableEventWatcher* top_send_done_watcher_; 265 mojo::Watcher* top_send_done_watcher_;
231 }; 266 };
232 267
233 base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> > 268 base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> >
234 SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_ = 269 SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_ =
235 LAZY_INSTANCE_INITIALIZER; 270 LAZY_INSTANCE_INITIALIZER;
236 271
237 SyncChannel::SyncContext::SyncContext( 272 SyncChannel::SyncContext::SyncContext(
238 Listener* listener, 273 Listener* listener,
239 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, 274 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
240 WaitableEvent* shutdown_event) 275 WaitableEvent* shutdown_event)
241 : ChannelProxy::Context(listener, ipc_task_runner), 276 : ChannelProxy::Context(listener, ipc_task_runner),
242 received_sync_msgs_(ReceivedSyncMsgQueue::AddContext()), 277 received_sync_msgs_(ReceivedSyncMsgQueue::AddContext()),
243 shutdown_event_(shutdown_event), 278 shutdown_event_(shutdown_event),
244 restrict_dispatch_group_(kRestrictDispatchGroup_None) { 279 restrict_dispatch_group_(kRestrictDispatchGroup_None) {
245 } 280 }
246 281
247 SyncChannel::SyncContext::~SyncContext() { 282 SyncChannel::SyncContext::~SyncContext() {
248 while (!deserializers_.empty()) 283 while (!deserializers_.empty())
249 Pop(); 284 Pop();
250 } 285 }
251 286
252 // Adds information about an outgoing sync message to the context so that 287 // Adds information about an outgoing sync message to the context so that
253 // we know how to deserialize the reply. Returns a handle that's set when 288 // we know how to deserialize the reply. Returns |true| if the message was added
254 // the reply has arrived. 289 // to the context or |false| if it was rejected (e.g. due to shutdown.)
255 void SyncChannel::SyncContext::Push(SyncMessage* sync_msg) { 290 bool SyncChannel::SyncContext::Push(SyncMessage* sync_msg) {
256 // Create the tracking information for this message. This object is stored 291 // Create the tracking information for this message. This object is stored
257 // by value since all members are pointers that are cheap to copy. These 292 // by value since all members are pointers that are cheap to copy. These
258 // pointers are cleaned up in the Pop() function. 293 // pointers are cleaned up in the Pop() function.
259 // 294 //
260 // The event is created as manual reset because in between Signal and 295 // The event is created as manual reset because in between Signal and
261 // OnObjectSignalled, another Send can happen which would stop the watcher 296 // OnObjectSignalled, another Send can happen which would stop the watcher
262 // from being called. The event would get watched later, when the nested 297 // from being called. The event would get watched later, when the nested
263 // Send completes, so the event will need to remain set. 298 // Send completes, so the event will need to remain set.
299 base::AutoLock auto_lock(deserializers_lock_);
300 if (reject_new_deserializers_)
301 return false;
264 PendingSyncMsg pending( 302 PendingSyncMsg pending(
265 SyncMessage::GetMessageId(*sync_msg), sync_msg->GetReplyDeserializer(), 303 SyncMessage::GetMessageId(*sync_msg), sync_msg->GetReplyDeserializer(),
266 new WaitableEvent(base::WaitableEvent::ResetPolicy::MANUAL, 304 new MojoEvent);
267 base::WaitableEvent::InitialState::NOT_SIGNALED));
268 base::AutoLock auto_lock(deserializers_lock_);
269 deserializers_.push_back(pending); 305 deserializers_.push_back(pending);
306 return true;
270 } 307 }
271 308
272 bool SyncChannel::SyncContext::Pop() { 309 bool SyncChannel::SyncContext::Pop() {
273 bool result; 310 bool result;
274 { 311 {
275 base::AutoLock auto_lock(deserializers_lock_); 312 base::AutoLock auto_lock(deserializers_lock_);
276 PendingSyncMsg msg = deserializers_.back(); 313 PendingSyncMsg msg = deserializers_.back();
277 delete msg.deserializer; 314 delete msg.deserializer;
278 delete msg.done_event; 315 delete msg.done_event;
279 msg.done_event = NULL; 316 msg.done_event = nullptr;
280 deserializers_.pop_back(); 317 deserializers_.pop_back();
281 result = msg.send_result; 318 result = msg.send_result;
282 } 319 }
283 320
284 // We got a reply to a synchronous Send() call that's blocking the listener 321 // We got a reply to a synchronous Send() call that's blocking the listener
285 // thread. However, further down the call stack there could be another 322 // thread. However, further down the call stack there could be another
286 // blocking Send() call, whose reply we received after we made this last 323 // blocking Send() call, whose reply we received after we made this last
287 // Send() call. So check if we have any queued replies available that 324 // Send() call. So check if we have any queued replies available that
288 // can now unblock the listener thread. 325 // can now unblock the listener thread.
289 ipc_task_runner()->PostTask( 326 ipc_task_runner()->PostTask(
290 FROM_HERE, base::Bind(&ReceivedSyncMsgQueue::DispatchReplies, 327 FROM_HERE, base::Bind(&ReceivedSyncMsgQueue::DispatchReplies,
291 received_sync_msgs_.get())); 328 received_sync_msgs_.get()));
292 329
293 return result; 330 return result;
294 } 331 }
295 332
296 WaitableEvent* SyncChannel::SyncContext::GetSendDoneEvent() { 333 MojoEvent* SyncChannel::SyncContext::GetSendDoneEvent() {
297 base::AutoLock auto_lock(deserializers_lock_); 334 base::AutoLock auto_lock(deserializers_lock_);
298 return deserializers_.back().done_event; 335 return deserializers_.back().done_event;
299 } 336 }
300 337
301 WaitableEvent* SyncChannel::SyncContext::GetDispatchEvent() { 338 MojoEvent* SyncChannel::SyncContext::GetDispatchEvent() {
302 return received_sync_msgs_->dispatch_event(); 339 return received_sync_msgs_->dispatch_event();
303 } 340 }
304 341
305 void SyncChannel::SyncContext::DispatchMessages() { 342 void SyncChannel::SyncContext::DispatchMessages() {
306 received_sync_msgs_->DispatchMessages(this); 343 received_sync_msgs_->DispatchMessages(this);
307 } 344 }
308 345
309 bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) { 346 bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) {
310 base::AutoLock auto_lock(deserializers_lock_); 347 base::AutoLock auto_lock(deserializers_lock_);
311 if (deserializers_.empty() || 348 if (deserializers_.empty() ||
312 !SyncMessage::IsMessageReplyTo(*msg, deserializers_.back().id)) { 349 !SyncMessage::IsMessageReplyTo(*msg, deserializers_.back().id)) {
313 return false; 350 return false;
314 } 351 }
315 352
316 if (!msg->is_reply_error()) { 353 if (!msg->is_reply_error()) {
317 bool send_result = deserializers_.back().deserializer-> 354 bool send_result = deserializers_.back().deserializer->
318 SerializeOutputParameters(*msg); 355 SerializeOutputParameters(*msg);
319 deserializers_.back().send_result = send_result; 356 deserializers_.back().send_result = send_result;
320 DVLOG_IF(1, !send_result) << "Couldn't deserialize reply message"; 357 DVLOG_IF(1, !send_result) << "Couldn't deserialize reply message";
321 } else { 358 } else {
322 DVLOG(1) << "Received error reply"; 359 DVLOG(1) << "Received error reply";
323 } 360 }
324 361
325 base::WaitableEvent* done_event = deserializers_.back().done_event; 362 MojoEvent* done_event = deserializers_.back().done_event;
326 TRACE_EVENT_FLOW_BEGIN0( 363 TRACE_EVENT_FLOW_BEGIN0(
327 TRACE_DISABLED_BY_DEFAULT("ipc.flow"), 364 TRACE_DISABLED_BY_DEFAULT("ipc.flow"),
328 "SyncChannel::SyncContext::TryToUnblockListener", done_event); 365 "SyncChannel::SyncContext::TryToUnblockListener", done_event);
329 366
330 done_event->Signal(); 367 done_event->Signal();
331 368
332 return true; 369 return true;
333 } 370 }
334 371
335 void SyncChannel::SyncContext::Clear() { 372 void SyncChannel::SyncContext::Clear() {
(...skipping 25 matching lines...) Expand all
361 398
362 void SyncChannel::SyncContext::OnChannelError() { 399 void SyncChannel::SyncContext::OnChannelError() {
363 CancelPendingSends(); 400 CancelPendingSends();
364 shutdown_watcher_.StopWatching(); 401 shutdown_watcher_.StopWatching();
365 Context::OnChannelError(); 402 Context::OnChannelError();
366 } 403 }
367 404
368 void SyncChannel::SyncContext::OnChannelOpened() { 405 void SyncChannel::SyncContext::OnChannelOpened() {
369 shutdown_watcher_.StartWatching( 406 shutdown_watcher_.StartWatching(
370 shutdown_event_, 407 shutdown_event_,
371 base::Bind(&SyncChannel::SyncContext::OnWaitableEventSignaled, 408 base::Bind(&SyncChannel::SyncContext::OnShutdownEventSignaled,
372 base::Unretained(this))); 409 base::Unretained(this)));
373 Context::OnChannelOpened(); 410 Context::OnChannelOpened();
374 } 411 }
375 412
376 void SyncChannel::SyncContext::OnChannelClosed() { 413 void SyncChannel::SyncContext::OnChannelClosed() {
377 CancelPendingSends(); 414 CancelPendingSends();
378 shutdown_watcher_.StopWatching(); 415 shutdown_watcher_.StopWatching();
379 Context::OnChannelClosed(); 416 Context::OnChannelClosed();
380 } 417 }
381 418
382 void SyncChannel::SyncContext::CancelPendingSends() { 419 void SyncChannel::SyncContext::CancelPendingSends() {
383 base::AutoLock auto_lock(deserializers_lock_); 420 base::AutoLock auto_lock(deserializers_lock_);
421 reject_new_deserializers_ = true;
384 PendingSyncMessageQueue::iterator iter; 422 PendingSyncMessageQueue::iterator iter;
385 DVLOG(1) << "Canceling pending sends"; 423 DVLOG(1) << "Canceling pending sends";
386 for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++) { 424 for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++) {
387 TRACE_EVENT_FLOW_BEGIN0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), 425 TRACE_EVENT_FLOW_BEGIN0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"),
388 "SyncChannel::SyncContext::CancelPendingSends", 426 "SyncChannel::SyncContext::CancelPendingSends",
389 iter->done_event); 427 iter->done_event);
390 iter->done_event->Signal(); 428 iter->done_event->Signal();
391 } 429 }
392 } 430 }
393 431
394 void SyncChannel::SyncContext::OnWaitableEventSignaled(WaitableEvent* event) { 432 void SyncChannel::SyncContext::OnShutdownEventSignaled(WaitableEvent* event) {
395 if (event == shutdown_event_) { 433 DCHECK_EQ(event, shutdown_event_);
396 // Process shut down before we can get a reply to a synchronous message.
397 // Cancel pending Send calls, which will end up setting the send done event.
398 CancelPendingSends();
399 } else {
400 // We got the reply, timed out or the process shutdown.
401 DCHECK_EQ(GetSendDoneEvent(), event);
402 base::MessageLoop::current()->QuitNow();
403 }
404 }
405 434
406 base::WaitableEventWatcher::EventCallback 435 // Process shut down before we can get a reply to a synchronous message.
407 SyncChannel::SyncContext::MakeWaitableEventCallback() { 436 // Cancel pending Send calls, which will end up setting the send done event.
408 return base::Bind(&SyncChannel::SyncContext::OnWaitableEventSignaled, this); 437 CancelPendingSends();
409 } 438 }
410 439
411 // static 440 // static
412 std::unique_ptr<SyncChannel> SyncChannel::Create( 441 std::unique_ptr<SyncChannel> SyncChannel::Create(
413 const IPC::ChannelHandle& channel_handle, 442 const IPC::ChannelHandle& channel_handle,
414 Channel::Mode mode, 443 Channel::Mode mode,
415 Listener* listener, 444 Listener* listener,
416 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, 445 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
417 bool create_pipe_now, 446 bool create_pipe_now,
418 base::WaitableEvent* shutdown_event) { 447 base::WaitableEvent* shutdown_event) {
(...skipping 22 matching lines...) Expand all
441 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, 470 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
442 WaitableEvent* shutdown_event) { 471 WaitableEvent* shutdown_event) {
443 return base::WrapUnique( 472 return base::WrapUnique(
444 new SyncChannel(listener, ipc_task_runner, shutdown_event)); 473 new SyncChannel(listener, ipc_task_runner, shutdown_event));
445 } 474 }
446 475
447 SyncChannel::SyncChannel( 476 SyncChannel::SyncChannel(
448 Listener* listener, 477 Listener* listener,
449 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, 478 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
450 WaitableEvent* shutdown_event) 479 WaitableEvent* shutdown_event)
451 : ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)) { 480 : ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)),
481 sync_handle_registry_(mojo::SyncHandleRegistry::current()) {
452 // The current (listener) thread must be distinct from the IPC thread, or else 482 // The current (listener) thread must be distinct from the IPC thread, or else
453 // sending synchronous messages will deadlock. 483 // sending synchronous messages will deadlock.
454 DCHECK_NE(ipc_task_runner.get(), base::ThreadTaskRunnerHandle::Get().get()); 484 DCHECK_NE(ipc_task_runner.get(), base::ThreadTaskRunnerHandle::Get().get());
455 StartWatching(); 485 StartWatching();
456 } 486 }
457 487
458 SyncChannel::~SyncChannel() { 488 SyncChannel::~SyncChannel() {
459 } 489 }
460 490
461 void SyncChannel::SetRestrictDispatchChannelGroup(int group) { 491 void SyncChannel::SetRestrictDispatchChannelGroup(int group) {
(...skipping 18 matching lines...) Expand all
480 #else 510 #else
481 TRACE_EVENT2("ipc", "SyncChannel::Send", 511 TRACE_EVENT2("ipc", "SyncChannel::Send",
482 "class", IPC_MESSAGE_ID_CLASS(message->type()), 512 "class", IPC_MESSAGE_ID_CLASS(message->type()),
483 "line", IPC_MESSAGE_ID_LINE(message->type())); 513 "line", IPC_MESSAGE_ID_LINE(message->type()));
484 #endif 514 #endif
485 if (!message->is_sync()) { 515 if (!message->is_sync()) {
486 ChannelProxy::Send(message); 516 ChannelProxy::Send(message);
487 return true; 517 return true;
488 } 518 }
489 519
520 SyncMessage* sync_msg = static_cast<SyncMessage*>(message);
521 bool pump_messages = sync_msg->ShouldPumpMessages();
522
490 // *this* might get deleted in WaitForReply. 523 // *this* might get deleted in WaitForReply.
491 scoped_refptr<SyncContext> context(sync_context()); 524 scoped_refptr<SyncContext> context(sync_context());
492 if (context->shutdown_event()->IsSignaled()) { 525 if (!context->Push(sync_msg)) {
493 DVLOG(1) << "shutdown event is signaled"; 526 DVLOG(1) << "Channel is shutting down. Dropping sync message.";
494 delete message; 527 delete message;
495 return false; 528 return false;
496 } 529 }
497 530
498 SyncMessage* sync_msg = static_cast<SyncMessage*>(message);
499 context->Push(sync_msg);
500 WaitableEvent* pump_messages_event = sync_msg->pump_messages_event();
501
502 ChannelProxy::Send(message); 531 ChannelProxy::Send(message);
503 532
504 // Wait for reply, or for any other incoming synchronous messages. 533 // Wait for reply, or for any other incoming synchronous messages.
505 // *this* might get deleted, so only call static functions at this point. 534 // *this* might get deleted, so only call static functions at this point.
506 WaitForReply(context.get(), pump_messages_event); 535 scoped_refptr<mojo::SyncHandleRegistry> registry = sync_handle_registry_;
536 WaitForReply(registry.get(), context.get(), pump_messages);
507 537
508 TRACE_EVENT_FLOW_END0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), 538 TRACE_EVENT_FLOW_END0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"),
509 "SyncChannel::Send", context->GetSendDoneEvent()); 539 "SyncChannel::Send", context->GetSendDoneEvent());
510 540
511 return context->Pop(); 541 return context->Pop();
512 } 542 }
513 543
514 void SyncChannel::WaitForReply( 544 void SyncChannel::WaitForReply(mojo::SyncHandleRegistry* registry,
515 SyncContext* context, WaitableEvent* pump_messages_event) { 545 SyncContext* context,
546 bool pump_messages) {
516 context->DispatchMessages(); 547 context->DispatchMessages();
548
549 const MojoEvent* pump_messages_event = nullptr;
550 if (pump_messages)
551 pump_messages_event = g_pump_messages_event.Get().event();
552
517 while (true) { 553 while (true) {
518 WaitableEvent* objects[] = { 554 bool dispatch = false;
519 context->GetDispatchEvent(), 555 bool send_done = false;
520 context->GetSendDoneEvent(), 556 bool should_pump_messages = false;
521 pump_messages_event 557 bool error = false;
522 }; 558 registry->RegisterHandle(context->GetDispatchEvent()->GetHandle(),
559 MOJO_HANDLE_SIGNAL_READABLE,
560 base::Bind(&OnSyncHandleReady, &dispatch, &error));
561 registry->RegisterHandle(
562 context->GetSendDoneEvent()->GetHandle(),
563 MOJO_HANDLE_SIGNAL_READABLE,
564 base::Bind(&OnSyncHandleReady, &send_done, &error));
565 if (pump_messages_event) {
566 registry->RegisterHandle(
567 pump_messages_event->GetHandle(), MOJO_HANDLE_SIGNAL_READABLE,
568 base::Bind(&OnSyncHandleReady, &should_pump_messages, &error));
569 }
523 570
524 unsigned count = pump_messages_event ? 3: 2; 571 const bool* stop_flags[] = { &dispatch, &send_done, &should_pump_messages };
525 size_t result = WaitableEvent::WaitMany(objects, count); 572 registry->WatchAllHandles(stop_flags, 3);
526 if (result == 0 /* dispatch event */) { 573 DCHECK(!error);
574
575 registry->UnregisterHandle(context->GetDispatchEvent()->GetHandle());
576 registry->UnregisterHandle(context->GetSendDoneEvent()->GetHandle());
577 if (pump_messages_event)
578 registry->UnregisterHandle(pump_messages_event->GetHandle());
579
580 if (dispatch) {
527 // We're waiting for a reply, but we received a blocking synchronous 581 // We're waiting for a reply, but we received a blocking synchronous
528 // call. We must process it or otherwise a deadlock might occur. 582 // call. We must process it or otherwise a deadlock might occur.
529 context->GetDispatchEvent()->Reset(); 583 context->GetDispatchEvent()->Reset();
530 context->DispatchMessages(); 584 context->DispatchMessages();
531 continue; 585 continue;
532 } 586 }
533 587
534 if (result == 2 /* pump_messages_event */) 588 if (should_pump_messages)
535 WaitForReplyWithNestedMessageLoop(context); // Run a nested message loop. 589 WaitForReplyWithNestedMessageLoop(context); // Run a nested message loop.
536 590
537 break; 591 break;
538 } 592 }
539 } 593 }
540 594
541 void SyncChannel::WaitForReplyWithNestedMessageLoop(SyncContext* context) { 595 void SyncChannel::WaitForReplyWithNestedMessageLoop(SyncContext* context) {
542 base::WaitableEventWatcher send_done_watcher; 596 mojo::Watcher send_done_watcher;
543 597
544 ReceivedSyncMsgQueue* sync_msg_queue = context->received_sync_msgs(); 598 ReceivedSyncMsgQueue* sync_msg_queue = context->received_sync_msgs();
545 DCHECK(sync_msg_queue != NULL); 599 DCHECK_NE(sync_msg_queue, nullptr);
546 600
547 base::WaitableEventWatcher* old_send_done_event_watcher = 601 mojo::Watcher* old_watcher = sync_msg_queue->top_send_done_watcher();
548 sync_msg_queue->top_send_done_watcher(); 602 mojo::Handle old_handle(mojo::kInvalidHandleValue);
603 mojo::Watcher::ReadyCallback old_callback;
549 604
550 base::WaitableEventWatcher::EventCallback old_callback; 605 // Maintain a thread-local stack of watchers to ensure nested calls complete
551 base::WaitableEvent* old_event = NULL; 606 // in the correct sequence, i.e. the outermost call completes first, etc.
552 607 if (old_watcher) {
553 // Maintain a local global stack of send done delegates to ensure that 608 old_callback = old_watcher->ready_callback();
554 // nested sync calls complete in the correct sequence, i.e. the 609 old_handle = old_watcher->handle();
555 // outermost call completes first, etc. 610 old_watcher->Cancel();
556 if (old_send_done_event_watcher) {
557 old_callback = old_send_done_event_watcher->callback();
558 old_event = old_send_done_event_watcher->GetWatchedEvent();
559 old_send_done_event_watcher->StopWatching();
560 } 611 }
561 612
562 sync_msg_queue->set_top_send_done_watcher(&send_done_watcher); 613 sync_msg_queue->set_top_send_done_watcher(&send_done_watcher);
563 614
564 send_done_watcher.StartWatching(context->GetSendDoneEvent(), 615 {
565 context->MakeWaitableEventCallback()); 616 base::RunLoop nested_loop;
617 send_done_watcher.Start(
618 context->GetSendDoneEvent()->GetHandle(), MOJO_HANDLE_SIGNAL_READABLE,
619 base::Bind(&RunOnHandleReady, nested_loop.QuitClosure()));
566 620
567 {
568 base::MessageLoop::ScopedNestableTaskAllower allow( 621 base::MessageLoop::ScopedNestableTaskAllower allow(
569 base::MessageLoop::current()); 622 base::MessageLoop::current());
570 base::RunLoop().Run(); 623 nested_loop.Run();
624 send_done_watcher.Cancel();
571 } 625 }
572 626
573 sync_msg_queue->set_top_send_done_watcher(old_send_done_event_watcher); 627 sync_msg_queue->set_top_send_done_watcher(old_watcher);
574 if (old_send_done_event_watcher && old_event) { 628 if (old_watcher)
575 old_send_done_event_watcher->StartWatching(old_event, old_callback); 629 old_watcher->Start(old_handle, MOJO_HANDLE_SIGNAL_READABLE, old_callback);
576 }
577 } 630 }
578 631
579 void SyncChannel::OnWaitableEventSignaled(WaitableEvent* event) { 632 void SyncChannel::OnDispatchHandleReady(MojoResult result) {
580 DCHECK(event == sync_context()->GetDispatchEvent()); 633 DCHECK(result == MOJO_RESULT_OK || result == MOJO_RESULT_ABORTED);
581 // The call to DispatchMessages might delete this object, so reregister 634 if (result == MOJO_RESULT_OK) {
582 // the object watcher first. 635 sync_context()->GetDispatchEvent()->Reset();
583 event->Reset(); 636 sync_context()->DispatchMessages();
584 dispatch_watcher_.StartWatching(event, dispatch_watcher_callback_); 637 }
585 sync_context()->DispatchMessages();
586 } 638 }
587 639
588 void SyncChannel::StartWatching() { 640 void SyncChannel::StartWatching() {
589 // Ideally we only want to watch this object when running a nested message 641 // Ideally we only want to watch this object when running a nested message
590 // loop. However, we don't know when it exits if there's another nested 642 // loop. However, we don't know when it exits if there's another nested
591 // message loop running under it or not, so we wouldn't know whether to 643 // message loop running under it or not, so we wouldn't know whether to
592 // stop or keep watching. So we always watch it, and create the event as 644 // stop or keep watching. So we always watch it.
593 // manual reset since the object watcher might otherwise reset the event 645 dispatch_watcher_.Start(sync_context()->GetDispatchEvent()->GetHandle(),
594 // when we're doing a WaitMany. 646 MOJO_HANDLE_SIGNAL_READABLE,
595 dispatch_watcher_callback_ = 647 base::Bind(&SyncChannel::OnDispatchHandleReady,
596 base::Bind(&SyncChannel::OnWaitableEventSignaled, 648 base::Unretained(this)));
597 base::Unretained(this));
598 dispatch_watcher_.StartWatching(sync_context()->GetDispatchEvent(),
599 dispatch_watcher_callback_);
600 } 649 }
601 650
602 void SyncChannel::OnChannelInit() { 651 void SyncChannel::OnChannelInit() {
603 for (const auto& filter : pre_init_sync_message_filters_) { 652 for (const auto& filter : pre_init_sync_message_filters_) {
604 filter->set_is_channel_send_thread_safe( 653 filter->set_is_channel_send_thread_safe(
605 context()->IsChannelSendThreadSafe()); 654 context()->IsChannelSendThreadSafe());
606 } 655 }
607 pre_init_sync_message_filters_.clear(); 656 pre_init_sync_message_filters_.clear();
608 } 657 }
609 658
610 } // namespace IPC 659 } // namespace IPC
OLDNEW
« no previous file with comments | « ipc/ipc_sync_channel.h ('k') | ipc/ipc_sync_message.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698