OLD | NEW |
1 // Copyright (c) 2006-2008 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2011 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "ipc/ipc_sync_channel.h" | 5 #include "ipc/ipc_sync_channel.h" |
6 | 6 |
7 #include "base/lazy_instance.h" | 7 #include "base/lazy_instance.h" |
8 #include "base/logging.h" | 8 #include "base/logging.h" |
9 #include "base/message_loop.h" | 9 #include "base/message_loop.h" |
10 #include "base/threading/thread_local.h" | 10 #include "base/threading/thread_local.h" |
11 #include "base/synchronization/waitable_event.h" | 11 #include "base/synchronization/waitable_event.h" |
(...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
61 task_pending_ = true; | 61 task_pending_ = true; |
62 | 62 |
63 // We set the event in case the listener thread is blocked (or is about | 63 // We set the event in case the listener thread is blocked (or is about |
64 // to). In case it's not, the PostTask dispatches the messages. | 64 // to). In case it's not, the PostTask dispatches the messages. |
65 message_queue_.push_back(QueuedMessage(new Message(msg), context)); | 65 message_queue_.push_back(QueuedMessage(new Message(msg), context)); |
66 } | 66 } |
67 | 67 |
68 dispatch_event_.Signal(); | 68 dispatch_event_.Signal(); |
69 if (!was_task_pending) { | 69 if (!was_task_pending) { |
70 listener_message_loop_->PostTask(FROM_HERE, NewRunnableMethod( | 70 listener_message_loop_->PostTask(FROM_HERE, NewRunnableMethod( |
71 this, &ReceivedSyncMsgQueue::DispatchMessagesTask)); | 71 this, |
| 72 &ReceivedSyncMsgQueue::DispatchMessagesTask, |
| 73 scoped_refptr<SyncContext>(context))); |
72 } | 74 } |
73 } | 75 } |
74 | 76 |
75 void QueueReply(const Message &msg, SyncChannel::SyncContext* context) { | 77 void QueueReply(const Message &msg, SyncChannel::SyncContext* context) { |
76 received_replies_.push_back(QueuedMessage(new Message(msg), context)); | 78 received_replies_.push_back(QueuedMessage(new Message(msg), context)); |
77 } | 79 } |
78 | 80 |
79 // Called on the listener's thread to process any queues synchronous | 81 // Called on the listener's thread to process any queues synchronous |
80 // messages. | 82 // messages. |
81 void DispatchMessagesTask() { | 83 void DispatchMessagesTask(SyncContext* context) { |
82 { | 84 { |
83 base::AutoLock auto_lock(message_lock_); | 85 base::AutoLock auto_lock(message_lock_); |
84 task_pending_ = false; | 86 task_pending_ = false; |
85 } | 87 } |
86 DispatchMessages(); | 88 context->DispatchMessages(); |
87 } | 89 } |
88 | 90 |
89 void DispatchMessages() { | 91 void DispatchMessages(SyncContext* dispatching_context) { |
| 92 SyncMessageQueue delayed_queue; |
90 while (true) { | 93 while (true) { |
91 Message* message; | 94 Message* message; |
92 scoped_refptr<SyncChannel::SyncContext> context; | 95 scoped_refptr<SyncChannel::SyncContext> context; |
93 { | 96 { |
94 base::AutoLock auto_lock(message_lock_); | 97 base::AutoLock auto_lock(message_lock_); |
95 if (message_queue_.empty()) | 98 if (message_queue_.empty()) { |
| 99 message_queue_ = delayed_queue; |
96 break; | 100 break; |
| 101 } |
97 | 102 |
98 message = message_queue_.front().message; | 103 message = message_queue_.front().message; |
99 context = message_queue_.front().context; | 104 context = message_queue_.front().context; |
100 message_queue_.pop_front(); | 105 message_queue_.pop_front(); |
101 } | 106 } |
102 | 107 if (context->restrict_dispatch() && context != dispatching_context) { |
103 context->OnDispatchMessage(*message); | 108 delayed_queue.push_back(QueuedMessage(message, context)); |
104 delete message; | 109 } else { |
| 110 context->OnDispatchMessage(*message); |
| 111 delete message; |
| 112 } |
105 } | 113 } |
106 } | 114 } |
107 | 115 |
108 // SyncChannel calls this in its destructor. | 116 // SyncChannel calls this in its destructor. |
109 void RemoveContext(SyncContext* context) { | 117 void RemoveContext(SyncContext* context) { |
110 base::AutoLock auto_lock(message_lock_); | 118 base::AutoLock auto_lock(message_lock_); |
111 | 119 |
112 SyncMessageQueue::iterator iter = message_queue_.begin(); | 120 SyncMessageQueue::iterator iter = message_queue_.begin(); |
113 while (iter != message_queue_.end()) { | 121 while (iter != message_queue_.end()) { |
114 if (iter->context == context) { | 122 if (iter->context == context) { |
(...skipping 82 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
197 | 205 |
198 base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> > | 206 base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> > |
199 SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_(base::LINKER_INITIALIZED); | 207 SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_(base::LINKER_INITIALIZED); |
200 | 208 |
201 SyncChannel::SyncContext::SyncContext( | 209 SyncChannel::SyncContext::SyncContext( |
202 Channel::Listener* listener, | 210 Channel::Listener* listener, |
203 MessageLoop* ipc_thread, | 211 MessageLoop* ipc_thread, |
204 WaitableEvent* shutdown_event) | 212 WaitableEvent* shutdown_event) |
205 : ChannelProxy::Context(listener, ipc_thread), | 213 : ChannelProxy::Context(listener, ipc_thread), |
206 received_sync_msgs_(ReceivedSyncMsgQueue::AddContext()), | 214 received_sync_msgs_(ReceivedSyncMsgQueue::AddContext()), |
207 shutdown_event_(shutdown_event) { | 215 shutdown_event_(shutdown_event), |
| 216 restrict_dispatch_(false) { |
208 } | 217 } |
209 | 218 |
210 SyncChannel::SyncContext::~SyncContext() { | 219 SyncChannel::SyncContext::~SyncContext() { |
211 while (!deserializers_.empty()) | 220 while (!deserializers_.empty()) |
212 Pop(); | 221 Pop(); |
213 } | 222 } |
214 | 223 |
215 // Adds information about an outgoing sync message to the context so that | 224 // Adds information about an outgoing sync message to the context so that |
216 // we know how to deserialize the reply. Returns a handle that's set when | 225 // we know how to deserialize the reply. Returns a handle that's set when |
217 // the reply has arrived. | 226 // the reply has arrived. |
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
253 WaitableEvent* SyncChannel::SyncContext::GetSendDoneEvent() { | 262 WaitableEvent* SyncChannel::SyncContext::GetSendDoneEvent() { |
254 base::AutoLock auto_lock(deserializers_lock_); | 263 base::AutoLock auto_lock(deserializers_lock_); |
255 return deserializers_.back().done_event; | 264 return deserializers_.back().done_event; |
256 } | 265 } |
257 | 266 |
258 WaitableEvent* SyncChannel::SyncContext::GetDispatchEvent() { | 267 WaitableEvent* SyncChannel::SyncContext::GetDispatchEvent() { |
259 return received_sync_msgs_->dispatch_event(); | 268 return received_sync_msgs_->dispatch_event(); |
260 } | 269 } |
261 | 270 |
262 void SyncChannel::SyncContext::DispatchMessages() { | 271 void SyncChannel::SyncContext::DispatchMessages() { |
263 received_sync_msgs_->DispatchMessages(); | 272 received_sync_msgs_->DispatchMessages(this); |
264 } | 273 } |
265 | 274 |
266 bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) { | 275 bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) { |
267 base::AutoLock auto_lock(deserializers_lock_); | 276 base::AutoLock auto_lock(deserializers_lock_); |
268 if (deserializers_.empty() || | 277 if (deserializers_.empty() || |
269 !SyncMessage::IsMessageReplyTo(*msg, deserializers_.back().id)) { | 278 !SyncMessage::IsMessageReplyTo(*msg, deserializers_.back().id)) { |
270 return false; | 279 return false; |
271 } | 280 } |
272 | 281 |
273 if (!msg->is_reply_error()) { | 282 if (!msg->is_reply_error()) { |
(...skipping 97 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
371 // message loop running under it or not, so we wouldn't know whether to | 380 // message loop running under it or not, so we wouldn't know whether to |
372 // stop or keep watching. So we always watch it, and create the event as | 381 // stop or keep watching. So we always watch it, and create the event as |
373 // manual reset since the object watcher might otherwise reset the event | 382 // manual reset since the object watcher might otherwise reset the event |
374 // when we're doing a WaitMany. | 383 // when we're doing a WaitMany. |
375 dispatch_watcher_.StartWatching(sync_context()->GetDispatchEvent(), this); | 384 dispatch_watcher_.StartWatching(sync_context()->GetDispatchEvent(), this); |
376 } | 385 } |
377 | 386 |
378 SyncChannel::~SyncChannel() { | 387 SyncChannel::~SyncChannel() { |
379 } | 388 } |
380 | 389 |
| 390 void SyncChannel::SetRestrictDispatchToSameChannel(bool value) { |
| 391 sync_context()->set_restrict_dispatch(value); |
| 392 } |
| 393 |
381 bool SyncChannel::Send(Message* message) { | 394 bool SyncChannel::Send(Message* message) { |
382 return SendWithTimeout(message, base::kNoTimeout); | 395 return SendWithTimeout(message, base::kNoTimeout); |
383 } | 396 } |
384 | 397 |
385 bool SyncChannel::SendWithTimeout(Message* message, int timeout_ms) { | 398 bool SyncChannel::SendWithTimeout(Message* message, int timeout_ms) { |
386 if (!message->is_sync()) { | 399 if (!message->is_sync()) { |
387 ChannelProxy::Send(message); | 400 ChannelProxy::Send(message); |
388 return true; | 401 return true; |
389 } | 402 } |
390 | 403 |
(...skipping 24 matching lines...) Expand all Loading... |
415 | 428 |
416 // Wait for reply, or for any other incoming synchronous messages. | 429 // Wait for reply, or for any other incoming synchronous messages. |
417 // *this* might get deleted, so only call static functions at this point. | 430 // *this* might get deleted, so only call static functions at this point. |
418 WaitForReply(context, pump_messages_event); | 431 WaitForReply(context, pump_messages_event); |
419 | 432 |
420 return context->Pop(); | 433 return context->Pop(); |
421 } | 434 } |
422 | 435 |
423 void SyncChannel::WaitForReply( | 436 void SyncChannel::WaitForReply( |
424 SyncContext* context, WaitableEvent* pump_messages_event) { | 437 SyncContext* context, WaitableEvent* pump_messages_event) { |
| 438 context->DispatchMessages(); |
425 while (true) { | 439 while (true) { |
426 WaitableEvent* objects[] = { | 440 WaitableEvent* objects[] = { |
427 context->GetDispatchEvent(), | 441 context->GetDispatchEvent(), |
428 context->GetSendDoneEvent(), | 442 context->GetSendDoneEvent(), |
429 pump_messages_event | 443 pump_messages_event |
430 }; | 444 }; |
431 | 445 |
432 unsigned count = pump_messages_event ? 3: 2; | 446 unsigned count = pump_messages_event ? 3: 2; |
433 size_t result = WaitableEvent::WaitMany(objects, count); | 447 size_t result = WaitableEvent::WaitMany(objects, count); |
434 if (result == 0 /* dispatch event */) { | 448 if (result == 0 /* dispatch event */) { |
(...skipping 50 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
485 void SyncChannel::OnWaitableEventSignaled(WaitableEvent* event) { | 499 void SyncChannel::OnWaitableEventSignaled(WaitableEvent* event) { |
486 DCHECK(event == sync_context()->GetDispatchEvent()); | 500 DCHECK(event == sync_context()->GetDispatchEvent()); |
487 // The call to DispatchMessages might delete this object, so reregister | 501 // The call to DispatchMessages might delete this object, so reregister |
488 // the object watcher first. | 502 // the object watcher first. |
489 event->Reset(); | 503 event->Reset(); |
490 dispatch_watcher_.StartWatching(event, this); | 504 dispatch_watcher_.StartWatching(event, this); |
491 sync_context()->DispatchMessages(); | 505 sync_context()->DispatchMessages(); |
492 } | 506 } |
493 | 507 |
494 } // namespace IPC | 508 } // namespace IPC |
OLD | NEW |