Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(203)

Side by Side Diff: ipc/ipc_sync_channel.cc

Issue 2097103002: Revert Mojo-based SyncChannel waiting again (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: . Created 4 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « ipc/ipc_sync_channel.h ('k') | ipc/ipc_sync_message.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "ipc/ipc_sync_channel.h" 5 #include "ipc/ipc_sync_channel.h"
6 6
7 #include <stddef.h> 7 #include <stddef.h>
8 #include <stdint.h> 8 #include <stdint.h>
9 9
10 #include <utility> 10 #include <utility>
11 11
12 #include "base/bind.h" 12 #include "base/bind.h"
13 #include "base/lazy_instance.h" 13 #include "base/lazy_instance.h"
14 #include "base/location.h" 14 #include "base/location.h"
15 #include "base/logging.h" 15 #include "base/logging.h"
16 #include "base/macros.h"
17 #include "base/memory/ptr_util.h" 16 #include "base/memory/ptr_util.h"
18 #include "base/run_loop.h"
19 #include "base/synchronization/waitable_event.h" 17 #include "base/synchronization/waitable_event.h"
18 #include "base/synchronization/waitable_event_watcher.h"
20 #include "base/threading/thread_local.h" 19 #include "base/threading/thread_local.h"
21 #include "base/threading/thread_task_runner_handle.h" 20 #include "base/threading/thread_task_runner_handle.h"
22 #include "base/trace_event/trace_event.h" 21 #include "base/trace_event/trace_event.h"
23 #include "ipc/ipc_channel_factory.h" 22 #include "ipc/ipc_channel_factory.h"
24 #include "ipc/ipc_logging.h" 23 #include "ipc/ipc_logging.h"
25 #include "ipc/ipc_message_macros.h" 24 #include "ipc/ipc_message_macros.h"
26 #include "ipc/ipc_sync_message.h" 25 #include "ipc/ipc_sync_message.h"
27 #include "ipc/mojo_event.h"
28 #include "mojo/public/cpp/bindings/sync_handle_registry.h"
29 26
27 using base::TimeDelta;
28 using base::TimeTicks;
30 using base::WaitableEvent; 29 using base::WaitableEvent;
31 30
32 namespace IPC { 31 namespace IPC {
33
34 namespace {
35
36 // A generic callback used when watching handles synchronously. Sets |*signal|
37 // to true. Also sets |*error| to true in case of an error.
38 void OnSyncHandleReady(bool* signal, bool* error, MojoResult result) {
39 *signal = true;
40 *error = result != MOJO_RESULT_OK;
41 }
42
43 // A ReadyCallback for use with mojo::Watcher. Ignores the result (DCHECKs, but
44 // is only used in cases where failure should be impossible) and runs
45 // |callback|.
46 void RunOnHandleReady(const base::Closure& callback, MojoResult result) {
47 DCHECK(result == MOJO_RESULT_OK || result == MOJO_RESULT_ABORTED);
48 if (result == MOJO_RESULT_OK)
49 callback.Run();
50 }
51
52 class PumpMessagesEvent {
53 public:
54 PumpMessagesEvent() { event_.Signal(); }
55 ~PumpMessagesEvent() {}
56
57 const MojoEvent* event() const { return &event_; }
58
59 private:
60 MojoEvent event_;
61
62 DISALLOW_COPY_AND_ASSIGN(PumpMessagesEvent);
63 };
64
65 base::LazyInstance<PumpMessagesEvent>::Leaky g_pump_messages_event =
66 LAZY_INSTANCE_INITIALIZER;
67
68 } // namespace
69
70 // When we're blocked in a Send(), we need to process incoming synchronous 32 // When we're blocked in a Send(), we need to process incoming synchronous
71 // messages right away because it could be blocking our reply (either 33 // messages right away because it could be blocking our reply (either
72 // directly from the same object we're calling, or indirectly through one or 34 // directly from the same object we're calling, or indirectly through one or
73 // more other channels). That means that in SyncContext's OnMessageReceived, 35 // more other channels). That means that in SyncContext's OnMessageReceived,
74 // we need to process sync message right away if we're blocked. However a 36 // we need to process sync message right away if we're blocked. However a
75 // simple check isn't sufficient, because the listener thread can be in the 37 // simple check isn't sufficient, because the listener thread can be in the
76 // process of calling Send. 38 // process of calling Send.
77 // To work around this, when SyncChannel filters a sync message, it sets 39 // To work around this, when SyncChannel filters a sync message, it sets
78 // an event that the listener thread waits on during its Send() call. This 40 // an event that the listener thread waits on during its Send() call. This
79 // allows us to dispatch incoming sync messages when blocked. The race 41 // allows us to dispatch incoming sync messages when blocked. The race
(...skipping 106 matching lines...) Expand 10 before | Expand all | Expand 10 after
186 iter++; 148 iter++;
187 } 149 }
188 } 150 }
189 151
190 if (--listener_count_ == 0) { 152 if (--listener_count_ == 0) {
191 DCHECK(lazy_tls_ptr_.Pointer()->Get()); 153 DCHECK(lazy_tls_ptr_.Pointer()->Get());
192 lazy_tls_ptr_.Pointer()->Set(NULL); 154 lazy_tls_ptr_.Pointer()->Set(NULL);
193 } 155 }
194 } 156 }
195 157
196 MojoEvent* dispatch_event() { return &dispatch_event_; } 158 WaitableEvent* dispatch_event() { return &dispatch_event_; }
197 base::SingleThreadTaskRunner* listener_task_runner() { 159 base::SingleThreadTaskRunner* listener_task_runner() {
198 return listener_task_runner_.get(); 160 return listener_task_runner_.get();
199 } 161 }
200 162
201 // Holds a pointer to the per-thread ReceivedSyncMsgQueue object. 163 // Holds a pointer to the per-thread ReceivedSyncMsgQueue object.
202 static base::LazyInstance<base::ThreadLocalPointer<ReceivedSyncMsgQueue> > 164 static base::LazyInstance<base::ThreadLocalPointer<ReceivedSyncMsgQueue> >
203 lazy_tls_ptr_; 165 lazy_tls_ptr_;
204 166
205 // Called on the ipc thread to check if we can unblock any current Send() 167 // Called on the ipc thread to check if we can unblock any current Send()
206 // calls based on a queued reply. 168 // calls based on a queued reply.
207 void DispatchReplies() { 169 void DispatchReplies() {
208 for (size_t i = 0; i < received_replies_.size(); ++i) { 170 for (size_t i = 0; i < received_replies_.size(); ++i) {
209 Message* message = received_replies_[i].message; 171 Message* message = received_replies_[i].message;
210 if (received_replies_[i].context->TryToUnblockListener(message)) { 172 if (received_replies_[i].context->TryToUnblockListener(message)) {
211 delete message; 173 delete message;
212 received_replies_.erase(received_replies_.begin() + i); 174 received_replies_.erase(received_replies_.begin() + i);
213 return; 175 return;
214 } 176 }
215 } 177 }
216 } 178 }
217 179
218 mojo::Watcher* top_send_done_watcher() { 180 base::WaitableEventWatcher* top_send_done_watcher() {
219 return top_send_done_watcher_; 181 return top_send_done_watcher_;
220 } 182 }
221 183
222 void set_top_send_done_watcher(mojo::Watcher* watcher) { 184 void set_top_send_done_watcher(base::WaitableEventWatcher* watcher) {
223 top_send_done_watcher_ = watcher; 185 top_send_done_watcher_ = watcher;
224 } 186 }
225 187
226 private: 188 private:
227 friend class base::RefCountedThreadSafe<ReceivedSyncMsgQueue>; 189 friend class base::RefCountedThreadSafe<ReceivedSyncMsgQueue>;
228 190
229 // See the comment in SyncChannel::SyncChannel for why this event is created 191 // See the comment in SyncChannel::SyncChannel for why this event is created
230 // as manual reset. 192 // as manual reset.
231 ReceivedSyncMsgQueue() 193 ReceivedSyncMsgQueue()
232 : message_queue_version_(0), 194 : message_queue_version_(0),
195 dispatch_event_(base::WaitableEvent::ResetPolicy::MANUAL,
196 base::WaitableEvent::InitialState::NOT_SIGNALED),
233 listener_task_runner_(base::ThreadTaskRunnerHandle::Get()), 197 listener_task_runner_(base::ThreadTaskRunnerHandle::Get()),
234 task_pending_(false), 198 task_pending_(false),
235 listener_count_(0), 199 listener_count_(0),
236 top_send_done_watcher_(NULL) {} 200 top_send_done_watcher_(NULL) {}
237 201
238 ~ReceivedSyncMsgQueue() {} 202 ~ReceivedSyncMsgQueue() {}
239 203
240 // Holds information about a queued synchronous message or reply. 204 // Holds information about a queued synchronous message or reply.
241 struct QueuedMessage { 205 struct QueuedMessage {
242 QueuedMessage(Message* m, SyncContext* c) : message(m), context(c) { } 206 QueuedMessage(Message* m, SyncContext* c) : message(m), context(c) { }
243 Message* message; 207 Message* message;
244 scoped_refptr<SyncChannel::SyncContext> context; 208 scoped_refptr<SyncChannel::SyncContext> context;
245 }; 209 };
246 210
247 typedef std::list<QueuedMessage> SyncMessageQueue; 211 typedef std::list<QueuedMessage> SyncMessageQueue;
248 SyncMessageQueue message_queue_; 212 SyncMessageQueue message_queue_;
249 uint32_t message_queue_version_; // Used to signal DispatchMessages to rescan 213 uint32_t message_queue_version_; // Used to signal DispatchMessages to rescan
250 214
251 std::vector<QueuedMessage> received_replies_; 215 std::vector<QueuedMessage> received_replies_;
252 216
253 // Signaled when we get a synchronous message that we must respond to, as the 217 // Set when we got a synchronous message that we must respond to as the
254 // sender needs its reply before it can reply to our original synchronous 218 // sender needs its reply before it can reply to our original synchronous
255 // message. 219 // message.
256 MojoEvent dispatch_event_; 220 WaitableEvent dispatch_event_;
257 scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner_; 221 scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner_;
258 base::Lock message_lock_; 222 base::Lock message_lock_;
259 bool task_pending_; 223 bool task_pending_;
260 int listener_count_; 224 int listener_count_;
261 225
262 // The current send done handle watcher for this thread. Used to maintain 226 // The current send done event watcher for this thread. Used to maintain
263 // a thread-local stack of send done watchers to ensure that nested sync 227 // a local global stack of send done watchers to ensure that nested sync
264 // message loops complete correctly. 228 // message loops complete correctly.
265 mojo::Watcher* top_send_done_watcher_; 229 base::WaitableEventWatcher* top_send_done_watcher_;
266 }; 230 };
267 231
268 base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> > 232 base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> >
269 SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_ = 233 SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_ =
270 LAZY_INSTANCE_INITIALIZER; 234 LAZY_INSTANCE_INITIALIZER;
271 235
272 SyncChannel::SyncContext::SyncContext( 236 SyncChannel::SyncContext::SyncContext(
273 Listener* listener, 237 Listener* listener,
274 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, 238 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
275 WaitableEvent* shutdown_event) 239 WaitableEvent* shutdown_event)
276 : ChannelProxy::Context(listener, ipc_task_runner), 240 : ChannelProxy::Context(listener, ipc_task_runner),
277 received_sync_msgs_(ReceivedSyncMsgQueue::AddContext()), 241 received_sync_msgs_(ReceivedSyncMsgQueue::AddContext()),
278 shutdown_event_(shutdown_event), 242 shutdown_event_(shutdown_event),
279 restrict_dispatch_group_(kRestrictDispatchGroup_None) { 243 restrict_dispatch_group_(kRestrictDispatchGroup_None) {
280 } 244 }
281 245
282 SyncChannel::SyncContext::~SyncContext() { 246 SyncChannel::SyncContext::~SyncContext() {
283 while (!deserializers_.empty()) 247 while (!deserializers_.empty())
284 Pop(); 248 Pop();
285 } 249 }
286 250
287 // Adds information about an outgoing sync message to the context so that 251 // Adds information about an outgoing sync message to the context so that
288 // we know how to deserialize the reply. Returns |true| if the message was added 252 // we know how to deserialize the reply. Returns a handle that's set when
289 // to the context or |false| if it was rejected (e.g. due to shutdown.) 253 // the reply has arrived.
290 bool SyncChannel::SyncContext::Push(SyncMessage* sync_msg) { 254 void SyncChannel::SyncContext::Push(SyncMessage* sync_msg) {
291 // Create the tracking information for this message. This object is stored 255 // Create the tracking information for this message. This object is stored
292 // by value since all members are pointers that are cheap to copy. These 256 // by value since all members are pointers that are cheap to copy. These
293 // pointers are cleaned up in the Pop() function. 257 // pointers are cleaned up in the Pop() function.
294 // 258 //
295 // The event is created as manual reset because in between Signal and 259 // The event is created as manual reset because in between Signal and
296 // OnObjectSignalled, another Send can happen which would stop the watcher 260 // OnObjectSignalled, another Send can happen which would stop the watcher
297 // from being called. The event would get watched later, when the nested 261 // from being called. The event would get watched later, when the nested
298 // Send completes, so the event will need to remain set. 262 // Send completes, so the event will need to remain set.
299 base::AutoLock auto_lock(deserializers_lock_);
300 if (reject_new_deserializers_)
301 return false;
302 PendingSyncMsg pending( 263 PendingSyncMsg pending(
303 SyncMessage::GetMessageId(*sync_msg), sync_msg->GetReplyDeserializer(), 264 SyncMessage::GetMessageId(*sync_msg), sync_msg->GetReplyDeserializer(),
304 new MojoEvent); 265 new WaitableEvent(base::WaitableEvent::ResetPolicy::MANUAL,
266 base::WaitableEvent::InitialState::NOT_SIGNALED));
267 base::AutoLock auto_lock(deserializers_lock_);
305 deserializers_.push_back(pending); 268 deserializers_.push_back(pending);
306 return true;
307 } 269 }
308 270
309 bool SyncChannel::SyncContext::Pop() { 271 bool SyncChannel::SyncContext::Pop() {
310 bool result; 272 bool result;
311 { 273 {
312 base::AutoLock auto_lock(deserializers_lock_); 274 base::AutoLock auto_lock(deserializers_lock_);
313 PendingSyncMsg msg = deserializers_.back(); 275 PendingSyncMsg msg = deserializers_.back();
314 delete msg.deserializer; 276 delete msg.deserializer;
315 delete msg.done_event; 277 delete msg.done_event;
316 msg.done_event = nullptr; 278 msg.done_event = NULL;
317 deserializers_.pop_back(); 279 deserializers_.pop_back();
318 result = msg.send_result; 280 result = msg.send_result;
319 } 281 }
320 282
321 // We got a reply to a synchronous Send() call that's blocking the listener 283 // We got a reply to a synchronous Send() call that's blocking the listener
322 // thread. However, further down the call stack there could be another 284 // thread. However, further down the call stack there could be another
323 // blocking Send() call, whose reply we received after we made this last 285 // blocking Send() call, whose reply we received after we made this last
324 // Send() call. So check if we have any queued replies available that 286 // Send() call. So check if we have any queued replies available that
325 // can now unblock the listener thread. 287 // can now unblock the listener thread.
326 ipc_task_runner()->PostTask( 288 ipc_task_runner()->PostTask(
327 FROM_HERE, base::Bind(&ReceivedSyncMsgQueue::DispatchReplies, 289 FROM_HERE, base::Bind(&ReceivedSyncMsgQueue::DispatchReplies,
328 received_sync_msgs_.get())); 290 received_sync_msgs_.get()));
329 291
330 return result; 292 return result;
331 } 293 }
332 294
333 MojoEvent* SyncChannel::SyncContext::GetSendDoneEvent() { 295 WaitableEvent* SyncChannel::SyncContext::GetSendDoneEvent() {
334 base::AutoLock auto_lock(deserializers_lock_); 296 base::AutoLock auto_lock(deserializers_lock_);
335 return deserializers_.back().done_event; 297 return deserializers_.back().done_event;
336 } 298 }
337 299
338 MojoEvent* SyncChannel::SyncContext::GetDispatchEvent() { 300 WaitableEvent* SyncChannel::SyncContext::GetDispatchEvent() {
339 return received_sync_msgs_->dispatch_event(); 301 return received_sync_msgs_->dispatch_event();
340 } 302 }
341 303
342 void SyncChannel::SyncContext::DispatchMessages() { 304 void SyncChannel::SyncContext::DispatchMessages() {
343 received_sync_msgs_->DispatchMessages(this); 305 received_sync_msgs_->DispatchMessages(this);
344 } 306 }
345 307
346 bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) { 308 bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) {
347 base::AutoLock auto_lock(deserializers_lock_); 309 base::AutoLock auto_lock(deserializers_lock_);
348 if (deserializers_.empty() || 310 if (deserializers_.empty() ||
349 !SyncMessage::IsMessageReplyTo(*msg, deserializers_.back().id)) { 311 !SyncMessage::IsMessageReplyTo(*msg, deserializers_.back().id)) {
350 return false; 312 return false;
351 } 313 }
352 314
353 if (!msg->is_reply_error()) { 315 if (!msg->is_reply_error()) {
354 bool send_result = deserializers_.back().deserializer-> 316 bool send_result = deserializers_.back().deserializer->
355 SerializeOutputParameters(*msg); 317 SerializeOutputParameters(*msg);
356 deserializers_.back().send_result = send_result; 318 deserializers_.back().send_result = send_result;
357 DVLOG_IF(1, !send_result) << "Couldn't deserialize reply message"; 319 DVLOG_IF(1, !send_result) << "Couldn't deserialize reply message";
358 } else { 320 } else {
359 DVLOG(1) << "Received error reply"; 321 DVLOG(1) << "Received error reply";
360 } 322 }
361 323
362 MojoEvent* done_event = deserializers_.back().done_event; 324 base::WaitableEvent* done_event = deserializers_.back().done_event;
363 TRACE_EVENT_FLOW_BEGIN0( 325 TRACE_EVENT_FLOW_BEGIN0(
364 TRACE_DISABLED_BY_DEFAULT("ipc.flow"), 326 TRACE_DISABLED_BY_DEFAULT("ipc.flow"),
365 "SyncChannel::SyncContext::TryToUnblockListener", done_event); 327 "SyncChannel::SyncContext::TryToUnblockListener", done_event);
366 328
367 done_event->Signal(); 329 done_event->Signal();
368 330
369 return true; 331 return true;
370 } 332 }
371 333
372 void SyncChannel::SyncContext::Clear() { 334 void SyncChannel::SyncContext::Clear() {
(...skipping 25 matching lines...) Expand all
398 360
399 void SyncChannel::SyncContext::OnChannelError() { 361 void SyncChannel::SyncContext::OnChannelError() {
400 CancelPendingSends(); 362 CancelPendingSends();
401 shutdown_watcher_.StopWatching(); 363 shutdown_watcher_.StopWatching();
402 Context::OnChannelError(); 364 Context::OnChannelError();
403 } 365 }
404 366
405 void SyncChannel::SyncContext::OnChannelOpened() { 367 void SyncChannel::SyncContext::OnChannelOpened() {
406 shutdown_watcher_.StartWatching( 368 shutdown_watcher_.StartWatching(
407 shutdown_event_, 369 shutdown_event_,
408 base::Bind(&SyncChannel::SyncContext::OnShutdownEventSignaled, 370 base::Bind(&SyncChannel::SyncContext::OnWaitableEventSignaled,
409 base::Unretained(this))); 371 base::Unretained(this)));
410 Context::OnChannelOpened(); 372 Context::OnChannelOpened();
411 } 373 }
412 374
413 void SyncChannel::SyncContext::OnChannelClosed() { 375 void SyncChannel::SyncContext::OnChannelClosed() {
414 CancelPendingSends(); 376 CancelPendingSends();
415 shutdown_watcher_.StopWatching(); 377 shutdown_watcher_.StopWatching();
416 Context::OnChannelClosed(); 378 Context::OnChannelClosed();
417 } 379 }
418 380
419 void SyncChannel::SyncContext::CancelPendingSends() { 381 void SyncChannel::SyncContext::CancelPendingSends() {
420 base::AutoLock auto_lock(deserializers_lock_); 382 base::AutoLock auto_lock(deserializers_lock_);
421 reject_new_deserializers_ = true;
422 PendingSyncMessageQueue::iterator iter; 383 PendingSyncMessageQueue::iterator iter;
423 DVLOG(1) << "Canceling pending sends"; 384 DVLOG(1) << "Canceling pending sends";
424 for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++) { 385 for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++) {
425 TRACE_EVENT_FLOW_BEGIN0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), 386 TRACE_EVENT_FLOW_BEGIN0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"),
426 "SyncChannel::SyncContext::CancelPendingSends", 387 "SyncChannel::SyncContext::CancelPendingSends",
427 iter->done_event); 388 iter->done_event);
428 iter->done_event->Signal(); 389 iter->done_event->Signal();
429 } 390 }
430 } 391 }
431 392
432 void SyncChannel::SyncContext::OnShutdownEventSignaled(WaitableEvent* event) { 393 void SyncChannel::SyncContext::OnWaitableEventSignaled(WaitableEvent* event) {
433 DCHECK_EQ(event, shutdown_event_); 394 if (event == shutdown_event_) {
395 // Process shut down before we can get a reply to a synchronous message.
396 // Cancel pending Send calls, which will end up setting the send done event.
397 CancelPendingSends();
398 } else {
399 // We got the reply, timed out or the process shutdown.
400 DCHECK_EQ(GetSendDoneEvent(), event);
401 base::MessageLoop::current()->QuitNow();
402 }
403 }
434 404
435 // Process shut down before we can get a reply to a synchronous message. 405 base::WaitableEventWatcher::EventCallback
436 // Cancel pending Send calls, which will end up setting the send done event. 406 SyncChannel::SyncContext::MakeWaitableEventCallback() {
437 CancelPendingSends(); 407 return base::Bind(&SyncChannel::SyncContext::OnWaitableEventSignaled, this);
438 } 408 }
439 409
440 // static 410 // static
441 std::unique_ptr<SyncChannel> SyncChannel::Create( 411 std::unique_ptr<SyncChannel> SyncChannel::Create(
442 const IPC::ChannelHandle& channel_handle, 412 const IPC::ChannelHandle& channel_handle,
443 Channel::Mode mode, 413 Channel::Mode mode,
444 Listener* listener, 414 Listener* listener,
445 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, 415 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
446 bool create_pipe_now, 416 bool create_pipe_now,
447 base::WaitableEvent* shutdown_event) { 417 base::WaitableEvent* shutdown_event) {
(...skipping 22 matching lines...) Expand all
470 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, 440 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
471 WaitableEvent* shutdown_event) { 441 WaitableEvent* shutdown_event) {
472 return base::WrapUnique( 442 return base::WrapUnique(
473 new SyncChannel(listener, ipc_task_runner, shutdown_event)); 443 new SyncChannel(listener, ipc_task_runner, shutdown_event));
474 } 444 }
475 445
476 SyncChannel::SyncChannel( 446 SyncChannel::SyncChannel(
477 Listener* listener, 447 Listener* listener,
478 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, 448 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
479 WaitableEvent* shutdown_event) 449 WaitableEvent* shutdown_event)
480 : ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)), 450 : ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)) {
481 sync_handle_registry_(mojo::SyncHandleRegistry::current()) {
482 // The current (listener) thread must be distinct from the IPC thread, or else 451 // The current (listener) thread must be distinct from the IPC thread, or else
483 // sending synchronous messages will deadlock. 452 // sending synchronous messages will deadlock.
484 DCHECK_NE(ipc_task_runner.get(), base::ThreadTaskRunnerHandle::Get().get()); 453 DCHECK_NE(ipc_task_runner.get(), base::ThreadTaskRunnerHandle::Get().get());
485 StartWatching(); 454 StartWatching();
486 } 455 }
487 456
488 SyncChannel::~SyncChannel() { 457 SyncChannel::~SyncChannel() {
489 } 458 }
490 459
491 void SyncChannel::SetRestrictDispatchChannelGroup(int group) { 460 void SyncChannel::SetRestrictDispatchChannelGroup(int group) {
(...skipping 18 matching lines...) Expand all
510 #else 479 #else
511 TRACE_EVENT2("ipc", "SyncChannel::Send", 480 TRACE_EVENT2("ipc", "SyncChannel::Send",
512 "class", IPC_MESSAGE_ID_CLASS(message->type()), 481 "class", IPC_MESSAGE_ID_CLASS(message->type()),
513 "line", IPC_MESSAGE_ID_LINE(message->type())); 482 "line", IPC_MESSAGE_ID_LINE(message->type()));
514 #endif 483 #endif
515 if (!message->is_sync()) { 484 if (!message->is_sync()) {
516 ChannelProxy::Send(message); 485 ChannelProxy::Send(message);
517 return true; 486 return true;
518 } 487 }
519 488
520 SyncMessage* sync_msg = static_cast<SyncMessage*>(message);
521 bool pump_messages = sync_msg->ShouldPumpMessages();
522
523 // *this* might get deleted in WaitForReply. 489 // *this* might get deleted in WaitForReply.
524 scoped_refptr<SyncContext> context(sync_context()); 490 scoped_refptr<SyncContext> context(sync_context());
525 if (!context->Push(sync_msg)) { 491 if (context->shutdown_event()->IsSignaled()) {
526 DVLOG(1) << "Channel is shutting down. Dropping sync message."; 492 DVLOG(1) << "shutdown event is signaled";
527 delete message; 493 delete message;
528 return false; 494 return false;
529 } 495 }
530 496
497 SyncMessage* sync_msg = static_cast<SyncMessage*>(message);
498 context->Push(sync_msg);
499 WaitableEvent* pump_messages_event = sync_msg->pump_messages_event();
500
531 ChannelProxy::Send(message); 501 ChannelProxy::Send(message);
532 502
533 // Wait for reply, or for any other incoming synchronous messages. 503 // Wait for reply, or for any other incoming synchronous messages.
534 // *this* might get deleted, so only call static functions at this point. 504 // *this* might get deleted, so only call static functions at this point.
535 scoped_refptr<mojo::SyncHandleRegistry> registry = sync_handle_registry_; 505 WaitForReply(context.get(), pump_messages_event);
536 WaitForReply(registry.get(), context.get(), pump_messages);
537 506
538 TRACE_EVENT_FLOW_END0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), 507 TRACE_EVENT_FLOW_END0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"),
539 "SyncChannel::Send", context->GetSendDoneEvent()); 508 "SyncChannel::Send", context->GetSendDoneEvent());
540 509
541 return context->Pop(); 510 return context->Pop();
542 } 511 }
543 512
544 void SyncChannel::WaitForReply(mojo::SyncHandleRegistry* registry, 513 void SyncChannel::WaitForReply(
545 SyncContext* context, 514 SyncContext* context, WaitableEvent* pump_messages_event) {
546 bool pump_messages) {
547 context->DispatchMessages(); 515 context->DispatchMessages();
516 while (true) {
517 WaitableEvent* objects[] = {
518 context->GetDispatchEvent(),
519 context->GetSendDoneEvent(),
520 pump_messages_event
521 };
548 522
549 const MojoEvent* pump_messages_event = nullptr; 523 unsigned count = pump_messages_event ? 3: 2;
550 if (pump_messages) 524 size_t result = WaitableEvent::WaitMany(objects, count);
551 pump_messages_event = g_pump_messages_event.Get().event(); 525 if (result == 0 /* dispatch event */) {
552
553 while (true) {
554 bool dispatch = false;
555 bool send_done = false;
556 bool should_pump_messages = false;
557 bool error = false;
558 registry->RegisterHandle(context->GetDispatchEvent()->GetHandle(),
559 MOJO_HANDLE_SIGNAL_READABLE,
560 base::Bind(&OnSyncHandleReady, &dispatch, &error));
561 registry->RegisterHandle(
562 context->GetSendDoneEvent()->GetHandle(),
563 MOJO_HANDLE_SIGNAL_READABLE,
564 base::Bind(&OnSyncHandleReady, &send_done, &error));
565 if (pump_messages_event) {
566 registry->RegisterHandle(
567 pump_messages_event->GetHandle(), MOJO_HANDLE_SIGNAL_READABLE,
568 base::Bind(&OnSyncHandleReady, &should_pump_messages, &error));
569 }
570
571 const bool* stop_flags[] = { &dispatch, &send_done, &should_pump_messages };
572 registry->WatchAllHandles(stop_flags, 3);
573 DCHECK(!error);
574
575 registry->UnregisterHandle(context->GetDispatchEvent()->GetHandle());
576 registry->UnregisterHandle(context->GetSendDoneEvent()->GetHandle());
577 if (pump_messages_event)
578 registry->UnregisterHandle(pump_messages_event->GetHandle());
579
580 if (dispatch) {
581 // We're waiting for a reply, but we received a blocking synchronous 526 // We're waiting for a reply, but we received a blocking synchronous
582 // call. We must process it or otherwise a deadlock might occur. 527 // call. We must process it or otherwise a deadlock might occur.
583 context->GetDispatchEvent()->Reset(); 528 context->GetDispatchEvent()->Reset();
584 context->DispatchMessages(); 529 context->DispatchMessages();
585 continue; 530 continue;
586 } 531 }
587 532
588 if (should_pump_messages) 533 if (result == 2 /* pump_messages_event */)
589 WaitForReplyWithNestedMessageLoop(context); // Run a nested message loop. 534 WaitForReplyWithNestedMessageLoop(context); // Run a nested message loop.
590 535
591 break; 536 break;
592 } 537 }
593 } 538 }
594 539
595 void SyncChannel::WaitForReplyWithNestedMessageLoop(SyncContext* context) { 540 void SyncChannel::WaitForReplyWithNestedMessageLoop(SyncContext* context) {
596 mojo::Watcher send_done_watcher; 541 base::WaitableEventWatcher send_done_watcher;
597 542
598 ReceivedSyncMsgQueue* sync_msg_queue = context->received_sync_msgs(); 543 ReceivedSyncMsgQueue* sync_msg_queue = context->received_sync_msgs();
599 DCHECK_NE(sync_msg_queue, nullptr); 544 DCHECK(sync_msg_queue != NULL);
600 545
601 mojo::Watcher* old_watcher = sync_msg_queue->top_send_done_watcher(); 546 base::WaitableEventWatcher* old_send_done_event_watcher =
602 mojo::Handle old_handle(mojo::kInvalidHandleValue); 547 sync_msg_queue->top_send_done_watcher();
603 mojo::Watcher::ReadyCallback old_callback;
604 548
605 // Maintain a thread-local stack of watchers to ensure nested calls complete 549 base::WaitableEventWatcher::EventCallback old_callback;
606 // in the correct sequence, i.e. the outermost call completes first, etc. 550 base::WaitableEvent* old_event = NULL;
607 if (old_watcher) { 551
608 old_callback = old_watcher->ready_callback(); 552 // Maintain a local global stack of send done delegates to ensure that
609 old_handle = old_watcher->handle(); 553 // nested sync calls complete in the correct sequence, i.e. the
610 old_watcher->Cancel(); 554 // outermost call completes first, etc.
555 if (old_send_done_event_watcher) {
556 old_callback = old_send_done_event_watcher->callback();
557 old_event = old_send_done_event_watcher->GetWatchedEvent();
558 old_send_done_event_watcher->StopWatching();
611 } 559 }
612 560
613 sync_msg_queue->set_top_send_done_watcher(&send_done_watcher); 561 sync_msg_queue->set_top_send_done_watcher(&send_done_watcher);
614 562
563 send_done_watcher.StartWatching(context->GetSendDoneEvent(),
564 context->MakeWaitableEventCallback());
565
615 { 566 {
616 base::RunLoop nested_loop;
617 send_done_watcher.Start(
618 context->GetSendDoneEvent()->GetHandle(), MOJO_HANDLE_SIGNAL_READABLE,
619 base::Bind(&RunOnHandleReady, nested_loop.QuitClosure()));
620
621 base::MessageLoop::ScopedNestableTaskAllower allow( 567 base::MessageLoop::ScopedNestableTaskAllower allow(
622 base::MessageLoop::current()); 568 base::MessageLoop::current());
623 nested_loop.Run(); 569 base::MessageLoop::current()->Run();
624 send_done_watcher.Cancel();
625 } 570 }
626 571
627 sync_msg_queue->set_top_send_done_watcher(old_watcher); 572 sync_msg_queue->set_top_send_done_watcher(old_send_done_event_watcher);
628 if (old_watcher) 573 if (old_send_done_event_watcher && old_event) {
629 old_watcher->Start(old_handle, MOJO_HANDLE_SIGNAL_READABLE, old_callback); 574 old_send_done_event_watcher->StartWatching(old_event, old_callback);
575 }
630 } 576 }
631 577
632 void SyncChannel::OnDispatchHandleReady(MojoResult result) { 578 void SyncChannel::OnWaitableEventSignaled(WaitableEvent* event) {
633 DCHECK(result == MOJO_RESULT_OK || result == MOJO_RESULT_ABORTED); 579 DCHECK(event == sync_context()->GetDispatchEvent());
634 if (result == MOJO_RESULT_OK) { 580 // The call to DispatchMessages might delete this object, so reregister
635 sync_context()->GetDispatchEvent()->Reset(); 581 // the object watcher first.
636 sync_context()->DispatchMessages(); 582 event->Reset();
637 } 583 dispatch_watcher_.StartWatching(event, dispatch_watcher_callback_);
584 sync_context()->DispatchMessages();
638 } 585 }
639 586
640 void SyncChannel::StartWatching() { 587 void SyncChannel::StartWatching() {
641 // Ideally we only want to watch this object when running a nested message 588 // Ideally we only want to watch this object when running a nested message
642 // loop. However, we don't know when it exits if there's another nested 589 // loop. However, we don't know when it exits if there's another nested
643 // message loop running under it or not, so we wouldn't know whether to 590 // message loop running under it or not, so we wouldn't know whether to
644 // stop or keep watching. So we always watch it. 591 // stop or keep watching. So we always watch it, and create the event as
645 dispatch_watcher_.Start(sync_context()->GetDispatchEvent()->GetHandle(), 592 // manual reset since the object watcher might otherwise reset the event
646 MOJO_HANDLE_SIGNAL_READABLE, 593 // when we're doing a WaitMany.
647 base::Bind(&SyncChannel::OnDispatchHandleReady, 594 dispatch_watcher_callback_ =
648 base::Unretained(this))); 595 base::Bind(&SyncChannel::OnWaitableEventSignaled,
596 base::Unretained(this));
597 dispatch_watcher_.StartWatching(sync_context()->GetDispatchEvent(),
598 dispatch_watcher_callback_);
649 } 599 }
650 600
651 void SyncChannel::OnChannelInit() { 601 void SyncChannel::OnChannelInit() {
652 for (const auto& filter : pre_init_sync_message_filters_) { 602 for (const auto& filter : pre_init_sync_message_filters_) {
653 filter->set_is_channel_send_thread_safe( 603 filter->set_is_channel_send_thread_safe(
654 context()->IsChannelSendThreadSafe()); 604 context()->IsChannelSendThreadSafe());
655 } 605 }
656 pre_init_sync_message_filters_.clear(); 606 pre_init_sync_message_filters_.clear();
657 } 607 }
658 608
659 } // namespace IPC 609 } // namespace IPC
OLDNEW
« no previous file with comments | « ipc/ipc_sync_channel.h ('k') | ipc/ipc_sync_message.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698