OLD | NEW |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "ipc/ipc_sync_channel.h" | 5 #include "ipc/ipc_sync_channel.h" |
6 | 6 |
7 #include <stddef.h> | 7 #include <stddef.h> |
8 #include <stdint.h> | 8 #include <stdint.h> |
9 | 9 |
10 #include <utility> | 10 #include <utility> |
11 | 11 |
12 #include "base/bind.h" | 12 #include "base/bind.h" |
13 #include "base/lazy_instance.h" | 13 #include "base/lazy_instance.h" |
14 #include "base/location.h" | 14 #include "base/location.h" |
15 #include "base/logging.h" | 15 #include "base/logging.h" |
| 16 #include "base/macros.h" |
16 #include "base/memory/ptr_util.h" | 17 #include "base/memory/ptr_util.h" |
17 #include "base/run_loop.h" | 18 #include "base/run_loop.h" |
18 #include "base/synchronization/waitable_event.h" | 19 #include "base/synchronization/waitable_event.h" |
19 #include "base/synchronization/waitable_event_watcher.h" | |
20 #include "base/threading/thread_local.h" | 20 #include "base/threading/thread_local.h" |
21 #include "base/threading/thread_task_runner_handle.h" | 21 #include "base/threading/thread_task_runner_handle.h" |
22 #include "base/trace_event/trace_event.h" | 22 #include "base/trace_event/trace_event.h" |
23 #include "ipc/ipc_channel_factory.h" | 23 #include "ipc/ipc_channel_factory.h" |
24 #include "ipc/ipc_logging.h" | 24 #include "ipc/ipc_logging.h" |
25 #include "ipc/ipc_message_macros.h" | 25 #include "ipc/ipc_message_macros.h" |
26 #include "ipc/ipc_sync_message.h" | 26 #include "ipc/ipc_sync_message.h" |
| 27 #include "ipc/mojo_event.h" |
| 28 #include "mojo/public/cpp/bindings/sync_handle_registry.h" |
27 | 29 |
28 using base::TimeDelta; | |
29 using base::TimeTicks; | |
30 using base::WaitableEvent; | 30 using base::WaitableEvent; |
31 | 31 |
32 namespace IPC { | 32 namespace IPC { |
| 33 |
| 34 namespace { |
| 35 |
| 36 // A generic callback used when watching handles synchronously. Sets |*signal| |
| 37 // to true. Also sets |*error| to true in case of an error. |
| 38 void OnSyncHandleReady(bool* signal, bool* error, MojoResult result) { |
| 39 *signal = true; |
| 40 *error = result != MOJO_RESULT_OK; |
| 41 } |
| 42 |
| 43 // A ReadyCallback for use with mojo::Watcher. Ignores the result (DCHECKs, but |
| 44 // is only used in cases where failure should be impossible) and runs |
| 45 // |callback|. |
| 46 void RunOnHandleReady(const base::Closure& callback, MojoResult result) { |
| 47 DCHECK(result == MOJO_RESULT_OK || result == MOJO_RESULT_ABORTED); |
| 48 if (result == MOJO_RESULT_OK) |
| 49 callback.Run(); |
| 50 } |
| 51 |
| 52 class PumpMessagesEvent { |
| 53 public: |
| 54 PumpMessagesEvent() { event_.Signal(); } |
| 55 ~PumpMessagesEvent() {} |
| 56 |
| 57 const MojoEvent* event() const { return &event_; } |
| 58 |
| 59 private: |
| 60 MojoEvent event_; |
| 61 |
| 62 DISALLOW_COPY_AND_ASSIGN(PumpMessagesEvent); |
| 63 }; |
| 64 |
| 65 base::LazyInstance<PumpMessagesEvent>::Leaky g_pump_messages_event = |
| 66 LAZY_INSTANCE_INITIALIZER; |
| 67 |
| 68 } // namespace |
| 69 |
33 // When we're blocked in a Send(), we need to process incoming synchronous | 70 // When we're blocked in a Send(), we need to process incoming synchronous |
34 // messages right away because it could be blocking our reply (either | 71 // messages right away because it could be blocking our reply (either |
35 // directly from the same object we're calling, or indirectly through one or | 72 // directly from the same object we're calling, or indirectly through one or |
36 // more other channels). That means that in SyncContext's OnMessageReceived, | 73 // more other channels). That means that in SyncContext's OnMessageReceived, |
37 // we need to process sync message right away if we're blocked. However a | 74 // we need to process sync message right away if we're blocked. However a |
38 // simple check isn't sufficient, because the listener thread can be in the | 75 // simple check isn't sufficient, because the listener thread can be in the |
39 // process of calling Send. | 76 // process of calling Send. |
40 // To work around this, when SyncChannel filters a sync message, it sets | 77 // To work around this, when SyncChannel filters a sync message, it sets |
41 // an event that the listener thread waits on during its Send() call. This | 78 // an event that the listener thread waits on during its Send() call. This |
42 // allows us to dispatch incoming sync messages when blocked. The race | 79 // allows us to dispatch incoming sync messages when blocked. The race |
(...skipping 106 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
149 iter++; | 186 iter++; |
150 } | 187 } |
151 } | 188 } |
152 | 189 |
153 if (--listener_count_ == 0) { | 190 if (--listener_count_ == 0) { |
154 DCHECK(lazy_tls_ptr_.Pointer()->Get()); | 191 DCHECK(lazy_tls_ptr_.Pointer()->Get()); |
155 lazy_tls_ptr_.Pointer()->Set(NULL); | 192 lazy_tls_ptr_.Pointer()->Set(NULL); |
156 } | 193 } |
157 } | 194 } |
158 | 195 |
159 WaitableEvent* dispatch_event() { return &dispatch_event_; } | 196 MojoEvent* dispatch_event() { return &dispatch_event_; } |
160 base::SingleThreadTaskRunner* listener_task_runner() { | 197 base::SingleThreadTaskRunner* listener_task_runner() { |
161 return listener_task_runner_.get(); | 198 return listener_task_runner_.get(); |
162 } | 199 } |
163 | 200 |
164 // Holds a pointer to the per-thread ReceivedSyncMsgQueue object. | 201 // Holds a pointer to the per-thread ReceivedSyncMsgQueue object. |
165 static base::LazyInstance<base::ThreadLocalPointer<ReceivedSyncMsgQueue> > | 202 static base::LazyInstance<base::ThreadLocalPointer<ReceivedSyncMsgQueue> > |
166 lazy_tls_ptr_; | 203 lazy_tls_ptr_; |
167 | 204 |
168 // Called on the ipc thread to check if we can unblock any current Send() | 205 // Called on the ipc thread to check if we can unblock any current Send() |
169 // calls based on a queued reply. | 206 // calls based on a queued reply. |
170 void DispatchReplies() { | 207 void DispatchReplies() { |
171 for (size_t i = 0; i < received_replies_.size(); ++i) { | 208 for (size_t i = 0; i < received_replies_.size(); ++i) { |
172 Message* message = received_replies_[i].message; | 209 Message* message = received_replies_[i].message; |
173 if (received_replies_[i].context->TryToUnblockListener(message)) { | 210 if (received_replies_[i].context->TryToUnblockListener(message)) { |
174 delete message; | 211 delete message; |
175 received_replies_.erase(received_replies_.begin() + i); | 212 received_replies_.erase(received_replies_.begin() + i); |
176 return; | 213 return; |
177 } | 214 } |
178 } | 215 } |
179 } | 216 } |
180 | 217 |
181 base::WaitableEventWatcher* top_send_done_watcher() { | 218 mojo::Watcher* top_send_done_watcher() { |
182 return top_send_done_watcher_; | 219 return top_send_done_watcher_; |
183 } | 220 } |
184 | 221 |
185 void set_top_send_done_watcher(base::WaitableEventWatcher* watcher) { | 222 void set_top_send_done_watcher(mojo::Watcher* watcher) { |
186 top_send_done_watcher_ = watcher; | 223 top_send_done_watcher_ = watcher; |
187 } | 224 } |
188 | 225 |
189 private: | 226 private: |
190 friend class base::RefCountedThreadSafe<ReceivedSyncMsgQueue>; | 227 friend class base::RefCountedThreadSafe<ReceivedSyncMsgQueue>; |
191 | 228 |
192 // See the comment in SyncChannel::SyncChannel for why this event is created | 229 // See the comment in SyncChannel::SyncChannel for why this event is created |
193 // as manual reset. | 230 // as manual reset. |
194 ReceivedSyncMsgQueue() | 231 ReceivedSyncMsgQueue() |
195 : message_queue_version_(0), | 232 : message_queue_version_(0), |
196 dispatch_event_(base::WaitableEvent::ResetPolicy::MANUAL, | |
197 base::WaitableEvent::InitialState::NOT_SIGNALED), | |
198 listener_task_runner_(base::ThreadTaskRunnerHandle::Get()), | 233 listener_task_runner_(base::ThreadTaskRunnerHandle::Get()), |
199 task_pending_(false), | 234 task_pending_(false), |
200 listener_count_(0), | 235 listener_count_(0), |
201 top_send_done_watcher_(NULL) {} | 236 top_send_done_watcher_(NULL) {} |
202 | 237 |
203 ~ReceivedSyncMsgQueue() {} | 238 ~ReceivedSyncMsgQueue() {} |
204 | 239 |
205 // Holds information about a queued synchronous message or reply. | 240 // Holds information about a queued synchronous message or reply. |
206 struct QueuedMessage { | 241 struct QueuedMessage { |
207 QueuedMessage(Message* m, SyncContext* c) : message(m), context(c) { } | 242 QueuedMessage(Message* m, SyncContext* c) : message(m), context(c) { } |
208 Message* message; | 243 Message* message; |
209 scoped_refptr<SyncChannel::SyncContext> context; | 244 scoped_refptr<SyncChannel::SyncContext> context; |
210 }; | 245 }; |
211 | 246 |
212 typedef std::list<QueuedMessage> SyncMessageQueue; | 247 typedef std::list<QueuedMessage> SyncMessageQueue; |
213 SyncMessageQueue message_queue_; | 248 SyncMessageQueue message_queue_; |
214 uint32_t message_queue_version_; // Used to signal DispatchMessages to rescan | 249 uint32_t message_queue_version_; // Used to signal DispatchMessages to rescan |
215 | 250 |
216 std::vector<QueuedMessage> received_replies_; | 251 std::vector<QueuedMessage> received_replies_; |
217 | 252 |
218 // Set when we got a synchronous message that we must respond to as the | 253 // Signaled when we get a synchronous message that we must respond to, as the |
219 // sender needs its reply before it can reply to our original synchronous | 254 // sender needs its reply before it can reply to our original synchronous |
220 // message. | 255 // message. |
221 WaitableEvent dispatch_event_; | 256 MojoEvent dispatch_event_; |
222 scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner_; | 257 scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner_; |
223 base::Lock message_lock_; | 258 base::Lock message_lock_; |
224 bool task_pending_; | 259 bool task_pending_; |
225 int listener_count_; | 260 int listener_count_; |
226 | 261 |
227 // The current send done event watcher for this thread. Used to maintain | 262 // The current send done handle watcher for this thread. Used to maintain |
228 // a local global stack of send done watchers to ensure that nested sync | 263 // a thread-local stack of send done watchers to ensure that nested sync |
229 // message loops complete correctly. | 264 // message loops complete correctly. |
230 base::WaitableEventWatcher* top_send_done_watcher_; | 265 mojo::Watcher* top_send_done_watcher_; |
231 }; | 266 }; |
232 | 267 |
233 base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> > | 268 base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> > |
234 SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_ = | 269 SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_ = |
235 LAZY_INSTANCE_INITIALIZER; | 270 LAZY_INSTANCE_INITIALIZER; |
236 | 271 |
237 SyncChannel::SyncContext::SyncContext( | 272 SyncChannel::SyncContext::SyncContext( |
238 Listener* listener, | 273 Listener* listener, |
239 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, | 274 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, |
240 WaitableEvent* shutdown_event) | 275 WaitableEvent* shutdown_event) |
241 : ChannelProxy::Context(listener, ipc_task_runner), | 276 : ChannelProxy::Context(listener, ipc_task_runner), |
242 received_sync_msgs_(ReceivedSyncMsgQueue::AddContext()), | 277 received_sync_msgs_(ReceivedSyncMsgQueue::AddContext()), |
243 shutdown_event_(shutdown_event), | 278 shutdown_event_(shutdown_event), |
244 restrict_dispatch_group_(kRestrictDispatchGroup_None) { | 279 restrict_dispatch_group_(kRestrictDispatchGroup_None) { |
245 } | 280 } |
246 | 281 |
247 SyncChannel::SyncContext::~SyncContext() { | 282 SyncChannel::SyncContext::~SyncContext() { |
248 while (!deserializers_.empty()) | 283 while (!deserializers_.empty()) |
249 Pop(); | 284 Pop(); |
250 } | 285 } |
251 | 286 |
252 // Adds information about an outgoing sync message to the context so that | 287 // Adds information about an outgoing sync message to the context so that |
253 // we know how to deserialize the reply. Returns a handle that's set when | 288 // we know how to deserialize the reply. Returns |true| if the message was added |
254 // the reply has arrived. | 289 // to the context or |false| if it was rejected (e.g. due to shutdown.) |
255 void SyncChannel::SyncContext::Push(SyncMessage* sync_msg) { | 290 bool SyncChannel::SyncContext::Push(SyncMessage* sync_msg) { |
256 // Create the tracking information for this message. This object is stored | 291 // Create the tracking information for this message. This object is stored |
257 // by value since all members are pointers that are cheap to copy. These | 292 // by value since all members are pointers that are cheap to copy. These |
258 // pointers are cleaned up in the Pop() function. | 293 // pointers are cleaned up in the Pop() function. |
259 // | 294 // |
260 // The event is created as manual reset because in between Signal and | 295 // The event is created as manual reset because in between Signal and |
261 // OnObjectSignalled, another Send can happen which would stop the watcher | 296 // OnObjectSignalled, another Send can happen which would stop the watcher |
262 // from being called. The event would get watched later, when the nested | 297 // from being called. The event would get watched later, when the nested |
263 // Send completes, so the event will need to remain set. | 298 // Send completes, so the event will need to remain set. |
| 299 base::AutoLock auto_lock(deserializers_lock_); |
| 300 if (reject_new_deserializers_) |
| 301 return false; |
264 PendingSyncMsg pending( | 302 PendingSyncMsg pending( |
265 SyncMessage::GetMessageId(*sync_msg), sync_msg->GetReplyDeserializer(), | 303 SyncMessage::GetMessageId(*sync_msg), sync_msg->GetReplyDeserializer(), |
266 new WaitableEvent(base::WaitableEvent::ResetPolicy::MANUAL, | 304 new MojoEvent); |
267 base::WaitableEvent::InitialState::NOT_SIGNALED)); | |
268 base::AutoLock auto_lock(deserializers_lock_); | |
269 deserializers_.push_back(pending); | 305 deserializers_.push_back(pending); |
| 306 return true; |
270 } | 307 } |
271 | 308 |
272 bool SyncChannel::SyncContext::Pop() { | 309 bool SyncChannel::SyncContext::Pop() { |
273 bool result; | 310 bool result; |
274 { | 311 { |
275 base::AutoLock auto_lock(deserializers_lock_); | 312 base::AutoLock auto_lock(deserializers_lock_); |
276 PendingSyncMsg msg = deserializers_.back(); | 313 PendingSyncMsg msg = deserializers_.back(); |
277 delete msg.deserializer; | 314 delete msg.deserializer; |
278 delete msg.done_event; | 315 delete msg.done_event; |
279 msg.done_event = NULL; | 316 msg.done_event = nullptr; |
280 deserializers_.pop_back(); | 317 deserializers_.pop_back(); |
281 result = msg.send_result; | 318 result = msg.send_result; |
282 } | 319 } |
283 | 320 |
284 // We got a reply to a synchronous Send() call that's blocking the listener | 321 // We got a reply to a synchronous Send() call that's blocking the listener |
285 // thread. However, further down the call stack there could be another | 322 // thread. However, further down the call stack there could be another |
286 // blocking Send() call, whose reply we received after we made this last | 323 // blocking Send() call, whose reply we received after we made this last |
287 // Send() call. So check if we have any queued replies available that | 324 // Send() call. So check if we have any queued replies available that |
288 // can now unblock the listener thread. | 325 // can now unblock the listener thread. |
289 ipc_task_runner()->PostTask( | 326 ipc_task_runner()->PostTask( |
290 FROM_HERE, base::Bind(&ReceivedSyncMsgQueue::DispatchReplies, | 327 FROM_HERE, base::Bind(&ReceivedSyncMsgQueue::DispatchReplies, |
291 received_sync_msgs_.get())); | 328 received_sync_msgs_.get())); |
292 | 329 |
293 return result; | 330 return result; |
294 } | 331 } |
295 | 332 |
296 WaitableEvent* SyncChannel::SyncContext::GetSendDoneEvent() { | 333 MojoEvent* SyncChannel::SyncContext::GetSendDoneEvent() { |
297 base::AutoLock auto_lock(deserializers_lock_); | 334 base::AutoLock auto_lock(deserializers_lock_); |
298 return deserializers_.back().done_event; | 335 return deserializers_.back().done_event; |
299 } | 336 } |
300 | 337 |
301 WaitableEvent* SyncChannel::SyncContext::GetDispatchEvent() { | 338 MojoEvent* SyncChannel::SyncContext::GetDispatchEvent() { |
302 return received_sync_msgs_->dispatch_event(); | 339 return received_sync_msgs_->dispatch_event(); |
303 } | 340 } |
304 | 341 |
305 void SyncChannel::SyncContext::DispatchMessages() { | 342 void SyncChannel::SyncContext::DispatchMessages() { |
306 received_sync_msgs_->DispatchMessages(this); | 343 received_sync_msgs_->DispatchMessages(this); |
307 } | 344 } |
308 | 345 |
309 bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) { | 346 bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) { |
310 base::AutoLock auto_lock(deserializers_lock_); | 347 base::AutoLock auto_lock(deserializers_lock_); |
311 if (deserializers_.empty() || | 348 if (deserializers_.empty() || |
312 !SyncMessage::IsMessageReplyTo(*msg, deserializers_.back().id)) { | 349 !SyncMessage::IsMessageReplyTo(*msg, deserializers_.back().id)) { |
313 return false; | 350 return false; |
314 } | 351 } |
315 | 352 |
316 if (!msg->is_reply_error()) { | 353 if (!msg->is_reply_error()) { |
317 bool send_result = deserializers_.back().deserializer-> | 354 bool send_result = deserializers_.back().deserializer-> |
318 SerializeOutputParameters(*msg); | 355 SerializeOutputParameters(*msg); |
319 deserializers_.back().send_result = send_result; | 356 deserializers_.back().send_result = send_result; |
320 DVLOG_IF(1, !send_result) << "Couldn't deserialize reply message"; | 357 DVLOG_IF(1, !send_result) << "Couldn't deserialize reply message"; |
321 } else { | 358 } else { |
322 DVLOG(1) << "Received error reply"; | 359 DVLOG(1) << "Received error reply"; |
323 } | 360 } |
324 | 361 |
325 base::WaitableEvent* done_event = deserializers_.back().done_event; | 362 MojoEvent* done_event = deserializers_.back().done_event; |
326 TRACE_EVENT_FLOW_BEGIN0( | 363 TRACE_EVENT_FLOW_BEGIN0( |
327 TRACE_DISABLED_BY_DEFAULT("ipc.flow"), | 364 TRACE_DISABLED_BY_DEFAULT("ipc.flow"), |
328 "SyncChannel::SyncContext::TryToUnblockListener", done_event); | 365 "SyncChannel::SyncContext::TryToUnblockListener", done_event); |
329 | 366 |
330 done_event->Signal(); | 367 done_event->Signal(); |
331 | 368 |
332 return true; | 369 return true; |
333 } | 370 } |
334 | 371 |
335 void SyncChannel::SyncContext::Clear() { | 372 void SyncChannel::SyncContext::Clear() { |
(...skipping 25 matching lines...) Expand all Loading... |
361 | 398 |
362 void SyncChannel::SyncContext::OnChannelError() { | 399 void SyncChannel::SyncContext::OnChannelError() { |
363 CancelPendingSends(); | 400 CancelPendingSends(); |
364 shutdown_watcher_.StopWatching(); | 401 shutdown_watcher_.StopWatching(); |
365 Context::OnChannelError(); | 402 Context::OnChannelError(); |
366 } | 403 } |
367 | 404 |
368 void SyncChannel::SyncContext::OnChannelOpened() { | 405 void SyncChannel::SyncContext::OnChannelOpened() { |
369 shutdown_watcher_.StartWatching( | 406 shutdown_watcher_.StartWatching( |
370 shutdown_event_, | 407 shutdown_event_, |
371 base::Bind(&SyncChannel::SyncContext::OnWaitableEventSignaled, | 408 base::Bind(&SyncChannel::SyncContext::OnShutdownEventSignaled, |
372 base::Unretained(this))); | 409 base::Unretained(this))); |
373 Context::OnChannelOpened(); | 410 Context::OnChannelOpened(); |
374 } | 411 } |
375 | 412 |
376 void SyncChannel::SyncContext::OnChannelClosed() { | 413 void SyncChannel::SyncContext::OnChannelClosed() { |
377 CancelPendingSends(); | 414 CancelPendingSends(); |
378 shutdown_watcher_.StopWatching(); | 415 shutdown_watcher_.StopWatching(); |
379 Context::OnChannelClosed(); | 416 Context::OnChannelClosed(); |
380 } | 417 } |
381 | 418 |
382 void SyncChannel::SyncContext::CancelPendingSends() { | 419 void SyncChannel::SyncContext::CancelPendingSends() { |
383 base::AutoLock auto_lock(deserializers_lock_); | 420 base::AutoLock auto_lock(deserializers_lock_); |
| 421 reject_new_deserializers_ = true; |
384 PendingSyncMessageQueue::iterator iter; | 422 PendingSyncMessageQueue::iterator iter; |
385 DVLOG(1) << "Canceling pending sends"; | 423 DVLOG(1) << "Canceling pending sends"; |
386 for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++) { | 424 for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++) { |
387 TRACE_EVENT_FLOW_BEGIN0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), | 425 TRACE_EVENT_FLOW_BEGIN0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), |
388 "SyncChannel::SyncContext::CancelPendingSends", | 426 "SyncChannel::SyncContext::CancelPendingSends", |
389 iter->done_event); | 427 iter->done_event); |
390 iter->done_event->Signal(); | 428 iter->done_event->Signal(); |
391 } | 429 } |
392 } | 430 } |
393 | 431 |
394 void SyncChannel::SyncContext::OnWaitableEventSignaled(WaitableEvent* event) { | 432 void SyncChannel::SyncContext::OnShutdownEventSignaled(WaitableEvent* event) { |
395 if (event == shutdown_event_) { | 433 DCHECK_EQ(event, shutdown_event_); |
396 // Process shut down before we can get a reply to a synchronous message. | |
397 // Cancel pending Send calls, which will end up setting the send done event. | |
398 CancelPendingSends(); | |
399 } else { | |
400 // We got the reply, timed out or the process shutdown. | |
401 DCHECK_EQ(GetSendDoneEvent(), event); | |
402 base::MessageLoop::current()->QuitNow(); | |
403 } | |
404 } | |
405 | 434 |
406 base::WaitableEventWatcher::EventCallback | 435 // Process shut down before we can get a reply to a synchronous message. |
407 SyncChannel::SyncContext::MakeWaitableEventCallback() { | 436 // Cancel pending Send calls, which will end up setting the send done event. |
408 return base::Bind(&SyncChannel::SyncContext::OnWaitableEventSignaled, this); | 437 CancelPendingSends(); |
409 } | 438 } |
410 | 439 |
411 // static | 440 // static |
412 std::unique_ptr<SyncChannel> SyncChannel::Create( | 441 std::unique_ptr<SyncChannel> SyncChannel::Create( |
413 const IPC::ChannelHandle& channel_handle, | 442 const IPC::ChannelHandle& channel_handle, |
414 Channel::Mode mode, | 443 Channel::Mode mode, |
415 Listener* listener, | 444 Listener* listener, |
416 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, | 445 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, |
417 bool create_pipe_now, | 446 bool create_pipe_now, |
418 base::WaitableEvent* shutdown_event) { | 447 base::WaitableEvent* shutdown_event) { |
(...skipping 22 matching lines...) Expand all Loading... |
441 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, | 470 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, |
442 WaitableEvent* shutdown_event) { | 471 WaitableEvent* shutdown_event) { |
443 return base::WrapUnique( | 472 return base::WrapUnique( |
444 new SyncChannel(listener, ipc_task_runner, shutdown_event)); | 473 new SyncChannel(listener, ipc_task_runner, shutdown_event)); |
445 } | 474 } |
446 | 475 |
447 SyncChannel::SyncChannel( | 476 SyncChannel::SyncChannel( |
448 Listener* listener, | 477 Listener* listener, |
449 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, | 478 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner, |
450 WaitableEvent* shutdown_event) | 479 WaitableEvent* shutdown_event) |
451 : ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)) { | 480 : ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)), |
| 481 sync_handle_registry_(mojo::SyncHandleRegistry::current()) { |
452 // The current (listener) thread must be distinct from the IPC thread, or else | 482 // The current (listener) thread must be distinct from the IPC thread, or else |
453 // sending synchronous messages will deadlock. | 483 // sending synchronous messages will deadlock. |
454 DCHECK_NE(ipc_task_runner.get(), base::ThreadTaskRunnerHandle::Get().get()); | 484 DCHECK_NE(ipc_task_runner.get(), base::ThreadTaskRunnerHandle::Get().get()); |
455 StartWatching(); | 485 StartWatching(); |
456 } | 486 } |
457 | 487 |
458 SyncChannel::~SyncChannel() { | 488 SyncChannel::~SyncChannel() { |
459 } | 489 } |
460 | 490 |
461 void SyncChannel::SetRestrictDispatchChannelGroup(int group) { | 491 void SyncChannel::SetRestrictDispatchChannelGroup(int group) { |
(...skipping 18 matching lines...) Expand all Loading... |
480 #else | 510 #else |
481 TRACE_EVENT2("ipc", "SyncChannel::Send", | 511 TRACE_EVENT2("ipc", "SyncChannel::Send", |
482 "class", IPC_MESSAGE_ID_CLASS(message->type()), | 512 "class", IPC_MESSAGE_ID_CLASS(message->type()), |
483 "line", IPC_MESSAGE_ID_LINE(message->type())); | 513 "line", IPC_MESSAGE_ID_LINE(message->type())); |
484 #endif | 514 #endif |
485 if (!message->is_sync()) { | 515 if (!message->is_sync()) { |
486 ChannelProxy::Send(message); | 516 ChannelProxy::Send(message); |
487 return true; | 517 return true; |
488 } | 518 } |
489 | 519 |
| 520 SyncMessage* sync_msg = static_cast<SyncMessage*>(message); |
| 521 bool pump_messages = sync_msg->ShouldPumpMessages(); |
| 522 |
490 // *this* might get deleted in WaitForReply. | 523 // *this* might get deleted in WaitForReply. |
491 scoped_refptr<SyncContext> context(sync_context()); | 524 scoped_refptr<SyncContext> context(sync_context()); |
492 if (context->shutdown_event()->IsSignaled()) { | 525 if (!context->Push(sync_msg)) { |
493 DVLOG(1) << "shutdown event is signaled"; | 526 DVLOG(1) << "Channel is shutting down. Dropping sync message."; |
494 delete message; | 527 delete message; |
495 return false; | 528 return false; |
496 } | 529 } |
497 | 530 |
498 SyncMessage* sync_msg = static_cast<SyncMessage*>(message); | |
499 context->Push(sync_msg); | |
500 WaitableEvent* pump_messages_event = sync_msg->pump_messages_event(); | |
501 | |
502 ChannelProxy::Send(message); | 531 ChannelProxy::Send(message); |
503 | 532 |
504 // Wait for reply, or for any other incoming synchronous messages. | 533 // Wait for reply, or for any other incoming synchronous messages. |
505 // *this* might get deleted, so only call static functions at this point. | 534 // *this* might get deleted, so only call static functions at this point. |
506 WaitForReply(context.get(), pump_messages_event); | 535 scoped_refptr<mojo::SyncHandleRegistry> registry = sync_handle_registry_; |
| 536 WaitForReply(registry.get(), context.get(), pump_messages); |
507 | 537 |
508 TRACE_EVENT_FLOW_END0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), | 538 TRACE_EVENT_FLOW_END0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"), |
509 "SyncChannel::Send", context->GetSendDoneEvent()); | 539 "SyncChannel::Send", context->GetSendDoneEvent()); |
510 | 540 |
511 return context->Pop(); | 541 return context->Pop(); |
512 } | 542 } |
513 | 543 |
514 void SyncChannel::WaitForReply( | 544 void SyncChannel::WaitForReply(mojo::SyncHandleRegistry* registry, |
515 SyncContext* context, WaitableEvent* pump_messages_event) { | 545 SyncContext* context, |
| 546 bool pump_messages) { |
516 context->DispatchMessages(); | 547 context->DispatchMessages(); |
| 548 |
| 549 const MojoEvent* pump_messages_event = nullptr; |
| 550 if (pump_messages) |
| 551 pump_messages_event = g_pump_messages_event.Get().event(); |
| 552 |
517 while (true) { | 553 while (true) { |
518 WaitableEvent* objects[] = { | 554 bool dispatch = false; |
519 context->GetDispatchEvent(), | 555 bool send_done = false; |
520 context->GetSendDoneEvent(), | 556 bool should_pump_messages = false; |
521 pump_messages_event | 557 bool error = false; |
522 }; | 558 registry->RegisterHandle(context->GetDispatchEvent()->GetHandle(), |
| 559 MOJO_HANDLE_SIGNAL_READABLE, |
| 560 base::Bind(&OnSyncHandleReady, &dispatch, &error)); |
| 561 registry->RegisterHandle( |
| 562 context->GetSendDoneEvent()->GetHandle(), |
| 563 MOJO_HANDLE_SIGNAL_READABLE, |
| 564 base::Bind(&OnSyncHandleReady, &send_done, &error)); |
| 565 if (pump_messages_event) { |
| 566 registry->RegisterHandle( |
| 567 pump_messages_event->GetHandle(), MOJO_HANDLE_SIGNAL_READABLE, |
| 568 base::Bind(&OnSyncHandleReady, &should_pump_messages, &error)); |
| 569 } |
523 | 570 |
524 unsigned count = pump_messages_event ? 3: 2; | 571 const bool* stop_flags[] = { &dispatch, &send_done, &should_pump_messages }; |
525 size_t result = WaitableEvent::WaitMany(objects, count); | 572 registry->WatchAllHandles(stop_flags, 3); |
526 if (result == 0 /* dispatch event */) { | 573 DCHECK(!error); |
| 574 |
| 575 registry->UnregisterHandle(context->GetDispatchEvent()->GetHandle()); |
| 576 registry->UnregisterHandle(context->GetSendDoneEvent()->GetHandle()); |
| 577 if (pump_messages_event) |
| 578 registry->UnregisterHandle(pump_messages_event->GetHandle()); |
| 579 |
| 580 if (dispatch) { |
527 // We're waiting for a reply, but we received a blocking synchronous | 581 // We're waiting for a reply, but we received a blocking synchronous |
528 // call. We must process it or otherwise a deadlock might occur. | 582 // call. We must process it or otherwise a deadlock might occur. |
529 context->GetDispatchEvent()->Reset(); | 583 context->GetDispatchEvent()->Reset(); |
530 context->DispatchMessages(); | 584 context->DispatchMessages(); |
531 continue; | 585 continue; |
532 } | 586 } |
533 | 587 |
534 if (result == 2 /* pump_messages_event */) | 588 if (should_pump_messages) |
535 WaitForReplyWithNestedMessageLoop(context); // Run a nested message loop. | 589 WaitForReplyWithNestedMessageLoop(context); // Run a nested message loop. |
536 | 590 |
537 break; | 591 break; |
538 } | 592 } |
539 } | 593 } |
540 | 594 |
541 void SyncChannel::WaitForReplyWithNestedMessageLoop(SyncContext* context) { | 595 void SyncChannel::WaitForReplyWithNestedMessageLoop(SyncContext* context) { |
542 base::WaitableEventWatcher send_done_watcher; | 596 mojo::Watcher send_done_watcher; |
543 | 597 |
544 ReceivedSyncMsgQueue* sync_msg_queue = context->received_sync_msgs(); | 598 ReceivedSyncMsgQueue* sync_msg_queue = context->received_sync_msgs(); |
545 DCHECK(sync_msg_queue != NULL); | 599 DCHECK_NE(sync_msg_queue, nullptr); |
546 | 600 |
547 base::WaitableEventWatcher* old_send_done_event_watcher = | 601 mojo::Watcher* old_watcher = sync_msg_queue->top_send_done_watcher(); |
548 sync_msg_queue->top_send_done_watcher(); | 602 mojo::Handle old_handle(mojo::kInvalidHandleValue); |
| 603 mojo::Watcher::ReadyCallback old_callback; |
549 | 604 |
550 base::WaitableEventWatcher::EventCallback old_callback; | 605 // Maintain a thread-local stack of watchers to ensure nested calls complete |
551 base::WaitableEvent* old_event = NULL; | 606 // in the correct sequence, i.e. the outermost call completes first, etc. |
552 | 607 if (old_watcher) { |
553 // Maintain a local global stack of send done delegates to ensure that | 608 old_callback = old_watcher->ready_callback(); |
554 // nested sync calls complete in the correct sequence, i.e. the | 609 old_handle = old_watcher->handle(); |
555 // outermost call completes first, etc. | 610 old_watcher->Cancel(); |
556 if (old_send_done_event_watcher) { | |
557 old_callback = old_send_done_event_watcher->callback(); | |
558 old_event = old_send_done_event_watcher->GetWatchedEvent(); | |
559 old_send_done_event_watcher->StopWatching(); | |
560 } | 611 } |
561 | 612 |
562 sync_msg_queue->set_top_send_done_watcher(&send_done_watcher); | 613 sync_msg_queue->set_top_send_done_watcher(&send_done_watcher); |
563 | 614 |
564 send_done_watcher.StartWatching(context->GetSendDoneEvent(), | 615 { |
565 context->MakeWaitableEventCallback()); | 616 base::RunLoop nested_loop; |
| 617 send_done_watcher.Start( |
| 618 context->GetSendDoneEvent()->GetHandle(), MOJO_HANDLE_SIGNAL_READABLE, |
| 619 base::Bind(&RunOnHandleReady, nested_loop.QuitClosure())); |
566 | 620 |
567 { | |
568 base::MessageLoop::ScopedNestableTaskAllower allow( | 621 base::MessageLoop::ScopedNestableTaskAllower allow( |
569 base::MessageLoop::current()); | 622 base::MessageLoop::current()); |
570 base::RunLoop().Run(); | 623 nested_loop.Run(); |
| 624 send_done_watcher.Cancel(); |
571 } | 625 } |
572 | 626 |
573 sync_msg_queue->set_top_send_done_watcher(old_send_done_event_watcher); | 627 sync_msg_queue->set_top_send_done_watcher(old_watcher); |
574 if (old_send_done_event_watcher && old_event) { | 628 if (old_watcher) |
575 old_send_done_event_watcher->StartWatching(old_event, old_callback); | 629 old_watcher->Start(old_handle, MOJO_HANDLE_SIGNAL_READABLE, old_callback); |
576 } | |
577 } | 630 } |
578 | 631 |
579 void SyncChannel::OnWaitableEventSignaled(WaitableEvent* event) { | 632 void SyncChannel::OnDispatchHandleReady(MojoResult result) { |
580 DCHECK(event == sync_context()->GetDispatchEvent()); | 633 DCHECK(result == MOJO_RESULT_OK || result == MOJO_RESULT_ABORTED); |
581 // The call to DispatchMessages might delete this object, so reregister | 634 if (result == MOJO_RESULT_OK) { |
582 // the object watcher first. | 635 sync_context()->GetDispatchEvent()->Reset(); |
583 event->Reset(); | 636 sync_context()->DispatchMessages(); |
584 dispatch_watcher_.StartWatching(event, dispatch_watcher_callback_); | 637 } |
585 sync_context()->DispatchMessages(); | |
586 } | 638 } |
587 | 639 |
588 void SyncChannel::StartWatching() { | 640 void SyncChannel::StartWatching() { |
589 // Ideally we only want to watch this object when running a nested message | 641 // Ideally we only want to watch this object when running a nested message |
590 // loop. However, we don't know when it exits if there's another nested | 642 // loop. However, we don't know when it exits if there's another nested |
591 // message loop running under it or not, so we wouldn't know whether to | 643 // message loop running under it or not, so we wouldn't know whether to |
592 // stop or keep watching. So we always watch it, and create the event as | 644 // stop or keep watching. So we always watch it. |
593 // manual reset since the object watcher might otherwise reset the event | 645 dispatch_watcher_.Start(sync_context()->GetDispatchEvent()->GetHandle(), |
594 // when we're doing a WaitMany. | 646 MOJO_HANDLE_SIGNAL_READABLE, |
595 dispatch_watcher_callback_ = | 647 base::Bind(&SyncChannel::OnDispatchHandleReady, |
596 base::Bind(&SyncChannel::OnWaitableEventSignaled, | 648 base::Unretained(this))); |
597 base::Unretained(this)); | |
598 dispatch_watcher_.StartWatching(sync_context()->GetDispatchEvent(), | |
599 dispatch_watcher_callback_); | |
600 } | 649 } |
601 | 650 |
602 void SyncChannel::OnChannelInit() { | 651 void SyncChannel::OnChannelInit() { |
603 for (const auto& filter : pre_init_sync_message_filters_) { | 652 for (const auto& filter : pre_init_sync_message_filters_) { |
604 filter->set_is_channel_send_thread_safe( | 653 filter->set_is_channel_send_thread_safe( |
605 context()->IsChannelSendThreadSafe()); | 654 context()->IsChannelSendThreadSafe()); |
606 } | 655 } |
607 pre_init_sync_message_filters_.clear(); | 656 pre_init_sync_message_filters_.clear(); |
608 } | 657 } |
609 | 658 |
610 } // namespace IPC | 659 } // namespace IPC |
OLD | NEW |