| OLD | NEW |
| 1 // Copyright (c) 2006-2008 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2011 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "ipc/ipc_sync_channel.h" | 5 #include "ipc/ipc_sync_channel.h" |
| 6 | 6 |
| 7 #include "base/lazy_instance.h" | 7 #include "base/lazy_instance.h" |
| 8 #include "base/logging.h" | 8 #include "base/logging.h" |
| 9 #include "base/message_loop.h" | 9 #include "base/message_loop.h" |
| 10 #include "base/threading/thread_local.h" | 10 #include "base/threading/thread_local.h" |
| 11 #include "base/synchronization/waitable_event.h" | 11 #include "base/synchronization/waitable_event.h" |
| (...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 61 task_pending_ = true; | 61 task_pending_ = true; |
| 62 | 62 |
| 63 // We set the event in case the listener thread is blocked (or is about | 63 // We set the event in case the listener thread is blocked (or is about |
| 64 // to). In case it's not, the PostTask dispatches the messages. | 64 // to). In case it's not, the PostTask dispatches the messages. |
| 65 message_queue_.push_back(QueuedMessage(new Message(msg), context)); | 65 message_queue_.push_back(QueuedMessage(new Message(msg), context)); |
| 66 } | 66 } |
| 67 | 67 |
| 68 dispatch_event_.Signal(); | 68 dispatch_event_.Signal(); |
| 69 if (!was_task_pending) { | 69 if (!was_task_pending) { |
| 70 listener_message_loop_->PostTask(FROM_HERE, NewRunnableMethod( | 70 listener_message_loop_->PostTask(FROM_HERE, NewRunnableMethod( |
| 71 this, &ReceivedSyncMsgQueue::DispatchMessagesTask)); | 71 this, |
| 72 &ReceivedSyncMsgQueue::DispatchMessagesTask, |
| 73 scoped_refptr<SyncContext>(context))); |
| 72 } | 74 } |
| 73 } | 75 } |
| 74 | 76 |
| 75 void QueueReply(const Message &msg, SyncChannel::SyncContext* context) { | 77 void QueueReply(const Message &msg, SyncChannel::SyncContext* context) { |
| 76 received_replies_.push_back(QueuedMessage(new Message(msg), context)); | 78 received_replies_.push_back(QueuedMessage(new Message(msg), context)); |
| 77 } | 79 } |
| 78 | 80 |
| 79 // Called on the listener's thread to process any queues synchronous | 81 // Called on the listener's thread to process any queues synchronous |
| 80 // messages. | 82 // messages. |
| 81 void DispatchMessagesTask() { | 83 void DispatchMessagesTask(SyncContext* context) { |
| 82 { | 84 { |
| 83 base::AutoLock auto_lock(message_lock_); | 85 base::AutoLock auto_lock(message_lock_); |
| 84 task_pending_ = false; | 86 task_pending_ = false; |
| 85 } | 87 } |
| 86 DispatchMessages(); | 88 context->DispatchMessages(); |
| 87 } | 89 } |
| 88 | 90 |
| 89 void DispatchMessages() { | 91 void DispatchMessages(SyncContext* dispatching_context) { |
| 92 SyncMessageQueue delayed_queue; |
| 90 while (true) { | 93 while (true) { |
| 91 Message* message; | 94 Message* message; |
| 92 scoped_refptr<SyncChannel::SyncContext> context; | 95 scoped_refptr<SyncChannel::SyncContext> context; |
| 93 { | 96 { |
| 94 base::AutoLock auto_lock(message_lock_); | 97 base::AutoLock auto_lock(message_lock_); |
| 95 if (message_queue_.empty()) | 98 if (message_queue_.empty()) { |
| 99 message_queue_ = delayed_queue; |
| 96 break; | 100 break; |
| 101 } |
| 97 | 102 |
| 98 message = message_queue_.front().message; | 103 message = message_queue_.front().message; |
| 99 context = message_queue_.front().context; | 104 context = message_queue_.front().context; |
| 100 message_queue_.pop_front(); | 105 message_queue_.pop_front(); |
| 101 } | 106 } |
| 102 | 107 if (context->restrict_dispatch() && context != dispatching_context) { |
| 103 context->OnDispatchMessage(*message); | 108 delayed_queue.push_back(QueuedMessage(message, context)); |
| 104 delete message; | 109 } else { |
| 110 context->OnDispatchMessage(*message); |
| 111 delete message; |
| 112 } |
| 105 } | 113 } |
| 106 } | 114 } |
| 107 | 115 |
| 108 // SyncChannel calls this in its destructor. | 116 // SyncChannel calls this in its destructor. |
| 109 void RemoveContext(SyncContext* context) { | 117 void RemoveContext(SyncContext* context) { |
| 110 base::AutoLock auto_lock(message_lock_); | 118 base::AutoLock auto_lock(message_lock_); |
| 111 | 119 |
| 112 SyncMessageQueue::iterator iter = message_queue_.begin(); | 120 SyncMessageQueue::iterator iter = message_queue_.begin(); |
| 113 while (iter != message_queue_.end()) { | 121 while (iter != message_queue_.end()) { |
| 114 if (iter->context == context) { | 122 if (iter->context == context) { |
| (...skipping 82 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 197 | 205 |
| 198 base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> > | 206 base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> > |
| 199 SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_(base::LINKER_INITIALIZED); | 207 SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_(base::LINKER_INITIALIZED); |
| 200 | 208 |
| 201 SyncChannel::SyncContext::SyncContext( | 209 SyncChannel::SyncContext::SyncContext( |
| 202 Channel::Listener* listener, | 210 Channel::Listener* listener, |
| 203 MessageLoop* ipc_thread, | 211 MessageLoop* ipc_thread, |
| 204 WaitableEvent* shutdown_event) | 212 WaitableEvent* shutdown_event) |
| 205 : ChannelProxy::Context(listener, ipc_thread), | 213 : ChannelProxy::Context(listener, ipc_thread), |
| 206 received_sync_msgs_(ReceivedSyncMsgQueue::AddContext()), | 214 received_sync_msgs_(ReceivedSyncMsgQueue::AddContext()), |
| 207 shutdown_event_(shutdown_event) { | 215 shutdown_event_(shutdown_event), |
| 216 restrict_dispatch_(false) { |
| 208 } | 217 } |
| 209 | 218 |
| 210 SyncChannel::SyncContext::~SyncContext() { | 219 SyncChannel::SyncContext::~SyncContext() { |
| 211 while (!deserializers_.empty()) | 220 while (!deserializers_.empty()) |
| 212 Pop(); | 221 Pop(); |
| 213 } | 222 } |
| 214 | 223 |
| 215 // Adds information about an outgoing sync message to the context so that | 224 // Adds information about an outgoing sync message to the context so that |
| 216 // we know how to deserialize the reply. Returns a handle that's set when | 225 // we know how to deserialize the reply. Returns a handle that's set when |
| 217 // the reply has arrived. | 226 // the reply has arrived. |
| (...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 253 WaitableEvent* SyncChannel::SyncContext::GetSendDoneEvent() { | 262 WaitableEvent* SyncChannel::SyncContext::GetSendDoneEvent() { |
| 254 base::AutoLock auto_lock(deserializers_lock_); | 263 base::AutoLock auto_lock(deserializers_lock_); |
| 255 return deserializers_.back().done_event; | 264 return deserializers_.back().done_event; |
| 256 } | 265 } |
| 257 | 266 |
| 258 WaitableEvent* SyncChannel::SyncContext::GetDispatchEvent() { | 267 WaitableEvent* SyncChannel::SyncContext::GetDispatchEvent() { |
| 259 return received_sync_msgs_->dispatch_event(); | 268 return received_sync_msgs_->dispatch_event(); |
| 260 } | 269 } |
| 261 | 270 |
| 262 void SyncChannel::SyncContext::DispatchMessages() { | 271 void SyncChannel::SyncContext::DispatchMessages() { |
| 263 received_sync_msgs_->DispatchMessages(); | 272 received_sync_msgs_->DispatchMessages(this); |
| 264 } | 273 } |
| 265 | 274 |
| 266 bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) { | 275 bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) { |
| 267 base::AutoLock auto_lock(deserializers_lock_); | 276 base::AutoLock auto_lock(deserializers_lock_); |
| 268 if (deserializers_.empty() || | 277 if (deserializers_.empty() || |
| 269 !SyncMessage::IsMessageReplyTo(*msg, deserializers_.back().id)) { | 278 !SyncMessage::IsMessageReplyTo(*msg, deserializers_.back().id)) { |
| 270 return false; | 279 return false; |
| 271 } | 280 } |
| 272 | 281 |
| 273 if (!msg->is_reply_error()) { | 282 if (!msg->is_reply_error()) { |
| (...skipping 97 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 371 // message loop running under it or not, so we wouldn't know whether to | 380 // message loop running under it or not, so we wouldn't know whether to |
| 372 // stop or keep watching. So we always watch it, and create the event as | 381 // stop or keep watching. So we always watch it, and create the event as |
| 373 // manual reset since the object watcher might otherwise reset the event | 382 // manual reset since the object watcher might otherwise reset the event |
| 374 // when we're doing a WaitMany. | 383 // when we're doing a WaitMany. |
| 375 dispatch_watcher_.StartWatching(sync_context()->GetDispatchEvent(), this); | 384 dispatch_watcher_.StartWatching(sync_context()->GetDispatchEvent(), this); |
| 376 } | 385 } |
| 377 | 386 |
| 378 SyncChannel::~SyncChannel() { | 387 SyncChannel::~SyncChannel() { |
| 379 } | 388 } |
| 380 | 389 |
| 390 void SyncChannel::SetRestrictDispatchToSameChannel(bool value) { |
| 391 sync_context()->set_restrict_dispatch(value); |
| 392 } |
| 393 |
| 381 bool SyncChannel::Send(Message* message) { | 394 bool SyncChannel::Send(Message* message) { |
| 382 return SendWithTimeout(message, base::kNoTimeout); | 395 return SendWithTimeout(message, base::kNoTimeout); |
| 383 } | 396 } |
| 384 | 397 |
| 385 bool SyncChannel::SendWithTimeout(Message* message, int timeout_ms) { | 398 bool SyncChannel::SendWithTimeout(Message* message, int timeout_ms) { |
| 386 if (!message->is_sync()) { | 399 if (!message->is_sync()) { |
| 387 ChannelProxy::Send(message); | 400 ChannelProxy::Send(message); |
| 388 return true; | 401 return true; |
| 389 } | 402 } |
| 390 | 403 |
| (...skipping 24 matching lines...) Expand all Loading... |
| 415 | 428 |
| 416 // Wait for reply, or for any other incoming synchronous messages. | 429 // Wait for reply, or for any other incoming synchronous messages. |
| 417 // *this* might get deleted, so only call static functions at this point. | 430 // *this* might get deleted, so only call static functions at this point. |
| 418 WaitForReply(context, pump_messages_event); | 431 WaitForReply(context, pump_messages_event); |
| 419 | 432 |
| 420 return context->Pop(); | 433 return context->Pop(); |
| 421 } | 434 } |
| 422 | 435 |
| 423 void SyncChannel::WaitForReply( | 436 void SyncChannel::WaitForReply( |
| 424 SyncContext* context, WaitableEvent* pump_messages_event) { | 437 SyncContext* context, WaitableEvent* pump_messages_event) { |
| 438 context->DispatchMessages(); |
| 425 while (true) { | 439 while (true) { |
| 426 WaitableEvent* objects[] = { | 440 WaitableEvent* objects[] = { |
| 427 context->GetDispatchEvent(), | 441 context->GetDispatchEvent(), |
| 428 context->GetSendDoneEvent(), | 442 context->GetSendDoneEvent(), |
| 429 pump_messages_event | 443 pump_messages_event |
| 430 }; | 444 }; |
| 431 | 445 |
| 432 unsigned count = pump_messages_event ? 3: 2; | 446 unsigned count = pump_messages_event ? 3: 2; |
| 433 size_t result = WaitableEvent::WaitMany(objects, count); | 447 size_t result = WaitableEvent::WaitMany(objects, count); |
| 434 if (result == 0 /* dispatch event */) { | 448 if (result == 0 /* dispatch event */) { |
| (...skipping 50 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 485 void SyncChannel::OnWaitableEventSignaled(WaitableEvent* event) { | 499 void SyncChannel::OnWaitableEventSignaled(WaitableEvent* event) { |
| 486 DCHECK(event == sync_context()->GetDispatchEvent()); | 500 DCHECK(event == sync_context()->GetDispatchEvent()); |
| 487 // The call to DispatchMessages might delete this object, so reregister | 501 // The call to DispatchMessages might delete this object, so reregister |
| 488 // the object watcher first. | 502 // the object watcher first. |
| 489 event->Reset(); | 503 event->Reset(); |
| 490 dispatch_watcher_.StartWatching(event, this); | 504 dispatch_watcher_.StartWatching(event, this); |
| 491 sync_context()->DispatchMessages(); | 505 sync_context()->DispatchMessages(); |
| 492 } | 506 } |
| 493 | 507 |
| 494 } // namespace IPC | 508 } // namespace IPC |
| OLD | NEW |