| OLD | NEW |
| 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "ipc/ipc_sync_channel.h" | 5 #include "ipc/ipc_sync_channel.h" |
| 6 | 6 |
| 7 #include "base/bind.h" | 7 #include "base/bind.h" |
| 8 #include "base/lazy_instance.h" | 8 #include "base/lazy_instance.h" |
| 9 #include "base/location.h" | 9 #include "base/location.h" |
| 10 #include "base/logging.h" | 10 #include "base/logging.h" |
| 11 #include "base/threading/thread_local.h" | |
| 12 #include "base/synchronization/waitable_event.h" | 11 #include "base/synchronization/waitable_event.h" |
| 13 #include "base/synchronization/waitable_event_watcher.h" | 12 #include "base/synchronization/waitable_event_watcher.h" |
| 13 #include "base/thread_task_runner_handle.h" |
| 14 #include "base/threading/thread_local.h" |
| 14 #include "ipc/ipc_sync_message.h" | 15 #include "ipc/ipc_sync_message.h" |
| 15 | 16 |
| 16 using base::TimeDelta; | 17 using base::TimeDelta; |
| 17 using base::TimeTicks; | 18 using base::TimeTicks; |
| 18 using base::WaitableEvent; | 19 using base::WaitableEvent; |
| 19 | 20 |
| 20 namespace IPC { | 21 namespace IPC { |
| 21 // When we're blocked in a Send(), we need to process incoming synchronous | 22 // When we're blocked in a Send(), we need to process incoming synchronous |
| 22 // messages right away because it could be blocking our reply (either | 23 // messages right away because it could be blocking our reply (either |
| 23 // directly from the same object we're calling, or indirectly through one or | 24 // directly from the same object we're calling, or indirectly through one or |
| (...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 62 task_pending_ = true; | 63 task_pending_ = true; |
| 63 | 64 |
| 64 // We set the event in case the listener thread is blocked (or is about | 65 // We set the event in case the listener thread is blocked (or is about |
| 65 // to). In case it's not, the PostTask dispatches the messages. | 66 // to). In case it's not, the PostTask dispatches the messages. |
| 66 message_queue_.push_back(QueuedMessage(new Message(msg), context)); | 67 message_queue_.push_back(QueuedMessage(new Message(msg), context)); |
| 67 message_queue_version_++; | 68 message_queue_version_++; |
| 68 } | 69 } |
| 69 | 70 |
| 70 dispatch_event_.Signal(); | 71 dispatch_event_.Signal(); |
| 71 if (!was_task_pending) { | 72 if (!was_task_pending) { |
| 72 listener_message_loop_->PostTask( | 73 listener_task_runner_->PostTask( |
| 73 FROM_HERE, base::Bind(&ReceivedSyncMsgQueue::DispatchMessagesTask, | 74 FROM_HERE, base::Bind(&ReceivedSyncMsgQueue::DispatchMessagesTask, |
| 74 this, scoped_refptr<SyncContext>(context))); | 75 this, scoped_refptr<SyncContext>(context))); |
| 75 } | 76 } |
| 76 } | 77 } |
| 77 | 78 |
| 78 void QueueReply(const Message &msg, SyncChannel::SyncContext* context) { | 79 void QueueReply(const Message &msg, SyncChannel::SyncContext* context) { |
| 79 received_replies_.push_back(QueuedMessage(new Message(msg), context)); | 80 received_replies_.push_back(QueuedMessage(new Message(msg), context)); |
| 80 } | 81 } |
| 81 | 82 |
| 82 // Called on the listener's thread to process any queues synchronous | 83 // Called on the listener's thread to process any queues synchronous |
| (...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 138 } | 139 } |
| 139 } | 140 } |
| 140 | 141 |
| 141 if (--listener_count_ == 0) { | 142 if (--listener_count_ == 0) { |
| 142 DCHECK(lazy_tls_ptr_.Pointer()->Get()); | 143 DCHECK(lazy_tls_ptr_.Pointer()->Get()); |
| 143 lazy_tls_ptr_.Pointer()->Set(NULL); | 144 lazy_tls_ptr_.Pointer()->Set(NULL); |
| 144 } | 145 } |
| 145 } | 146 } |
| 146 | 147 |
| 147 WaitableEvent* dispatch_event() { return &dispatch_event_; } | 148 WaitableEvent* dispatch_event() { return &dispatch_event_; } |
| 148 base::MessageLoopProxy* listener_message_loop() { | 149 base::SingleThreadTaskRunner* listener_task_runner() { |
| 149 return listener_message_loop_; | 150 return listener_task_runner_; |
| 150 } | 151 } |
| 151 | 152 |
| 152 // Holds a pointer to the per-thread ReceivedSyncMsgQueue object. | 153 // Holds a pointer to the per-thread ReceivedSyncMsgQueue object. |
| 153 static base::LazyInstance<base::ThreadLocalPointer<ReceivedSyncMsgQueue> > | 154 static base::LazyInstance<base::ThreadLocalPointer<ReceivedSyncMsgQueue> > |
| 154 lazy_tls_ptr_; | 155 lazy_tls_ptr_; |
| 155 | 156 |
| 156 // Called on the ipc thread to check if we can unblock any current Send() | 157 // Called on the ipc thread to check if we can unblock any current Send() |
| 157 // calls based on a queued reply. | 158 // calls based on a queued reply. |
| 158 void DispatchReplies() { | 159 void DispatchReplies() { |
| 159 for (size_t i = 0; i < received_replies_.size(); ++i) { | 160 for (size_t i = 0; i < received_replies_.size(); ++i) { |
| (...skipping 15 matching lines...) Expand all Loading... |
| 175 } | 176 } |
| 176 | 177 |
| 177 private: | 178 private: |
| 178 friend class base::RefCountedThreadSafe<ReceivedSyncMsgQueue>; | 179 friend class base::RefCountedThreadSafe<ReceivedSyncMsgQueue>; |
| 179 | 180 |
| 180 // See the comment in SyncChannel::SyncChannel for why this event is created | 181 // See the comment in SyncChannel::SyncChannel for why this event is created |
| 181 // as manual reset. | 182 // as manual reset. |
| 182 ReceivedSyncMsgQueue() : | 183 ReceivedSyncMsgQueue() : |
| 183 message_queue_version_(0), | 184 message_queue_version_(0), |
| 184 dispatch_event_(true, false), | 185 dispatch_event_(true, false), |
| 185 listener_message_loop_(base::MessageLoopProxy::current()), | 186 listener_task_runner_(base::ThreadTaskRunnerHandle::Get()), |
| 186 task_pending_(false), | 187 task_pending_(false), |
| 187 listener_count_(0), | 188 listener_count_(0), |
| 188 top_send_done_watcher_(NULL) { | 189 top_send_done_watcher_(NULL) { |
| 189 } | 190 } |
| 190 | 191 |
| 191 ~ReceivedSyncMsgQueue() {} | 192 ~ReceivedSyncMsgQueue() {} |
| 192 | 193 |
| 193 // Holds information about a queued synchronous message or reply. | 194 // Holds information about a queued synchronous message or reply. |
| 194 struct QueuedMessage { | 195 struct QueuedMessage { |
| 195 QueuedMessage(Message* m, SyncContext* c) : message(m), context(c) { } | 196 QueuedMessage(Message* m, SyncContext* c) : message(m), context(c) { } |
| 196 Message* message; | 197 Message* message; |
| 197 scoped_refptr<SyncChannel::SyncContext> context; | 198 scoped_refptr<SyncChannel::SyncContext> context; |
| 198 }; | 199 }; |
| 199 | 200 |
| 200 typedef std::list<QueuedMessage> SyncMessageQueue; | 201 typedef std::list<QueuedMessage> SyncMessageQueue; |
| 201 SyncMessageQueue message_queue_; | 202 SyncMessageQueue message_queue_; |
| 202 uint32 message_queue_version_; // Used to signal DispatchMessages to rescan | 203 uint32 message_queue_version_; // Used to signal DispatchMessages to rescan |
| 203 | 204 |
| 204 std::vector<QueuedMessage> received_replies_; | 205 std::vector<QueuedMessage> received_replies_; |
| 205 | 206 |
| 206 // Set when we got a synchronous message that we must respond to as the | 207 // Set when we got a synchronous message that we must respond to as the |
| 207 // sender needs its reply before it can reply to our original synchronous | 208 // sender needs its reply before it can reply to our original synchronous |
| 208 // message. | 209 // message. |
| 209 WaitableEvent dispatch_event_; | 210 WaitableEvent dispatch_event_; |
| 210 scoped_refptr<base::MessageLoopProxy> listener_message_loop_; | 211 scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner_; |
| 211 base::Lock message_lock_; | 212 base::Lock message_lock_; |
| 212 bool task_pending_; | 213 bool task_pending_; |
| 213 int listener_count_; | 214 int listener_count_; |
| 214 | 215 |
| 215 // The current send done event watcher for this thread. Used to maintain | 216 // The current send done event watcher for this thread. Used to maintain |
| 216 // a local global stack of send done watchers to ensure that nested sync | 217 // a local global stack of send done watchers to ensure that nested sync |
| 217 // message loops complete correctly. | 218 // message loops complete correctly. |
| 218 base::WaitableEventWatcher* top_send_done_watcher_; | 219 base::WaitableEventWatcher* top_send_done_watcher_; |
| 219 }; | 220 }; |
| 220 | 221 |
| 221 base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> > | 222 base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> > |
| 222 SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_ = | 223 SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_ = |
| 223 LAZY_INSTANCE_INITIALIZER; | 224 LAZY_INSTANCE_INITIALIZER; |
| 224 | 225 |
| 225 SyncChannel::SyncContext::SyncContext( | 226 SyncChannel::SyncContext::SyncContext( |
| 226 Listener* listener, | 227 Listener* listener, |
| 227 base::MessageLoopProxy* ipc_thread, | 228 base::SingleThreadTaskRunner* ipc_task_runner, |
| 228 WaitableEvent* shutdown_event) | 229 WaitableEvent* shutdown_event) |
| 229 : ChannelProxy::Context(listener, ipc_thread), | 230 : ChannelProxy::Context(listener, ipc_task_runner), |
| 230 received_sync_msgs_(ReceivedSyncMsgQueue::AddContext()), | 231 received_sync_msgs_(ReceivedSyncMsgQueue::AddContext()), |
| 231 shutdown_event_(shutdown_event), | 232 shutdown_event_(shutdown_event), |
| 232 restrict_dispatch_group_(kRestrictDispatchGroup_None) { | 233 restrict_dispatch_group_(kRestrictDispatchGroup_None) { |
| 233 } | 234 } |
| 234 | 235 |
| 235 SyncChannel::SyncContext::~SyncContext() { | 236 SyncChannel::SyncContext::~SyncContext() { |
| 236 while (!deserializers_.empty()) | 237 while (!deserializers_.empty()) |
| 237 Pop(); | 238 Pop(); |
| 238 } | 239 } |
| 239 | 240 |
| (...skipping 26 matching lines...) Expand all Loading... |
| 266 msg.done_event = NULL; | 267 msg.done_event = NULL; |
| 267 deserializers_.pop_back(); | 268 deserializers_.pop_back(); |
| 268 result = msg.send_result; | 269 result = msg.send_result; |
| 269 } | 270 } |
| 270 | 271 |
| 271 // We got a reply to a synchronous Send() call that's blocking the listener | 272 // We got a reply to a synchronous Send() call that's blocking the listener |
| 272 // thread. However, further down the call stack there could be another | 273 // thread. However, further down the call stack there could be another |
| 273 // blocking Send() call, whose reply we received after we made this last | 274 // blocking Send() call, whose reply we received after we made this last |
| 274 // Send() call. So check if we have any queued replies available that | 275 // Send() call. So check if we have any queued replies available that |
| 275 // can now unblock the listener thread. | 276 // can now unblock the listener thread. |
| 276 ipc_message_loop()->PostTask( | 277 ipc_task_runner()->PostTask( |
| 277 FROM_HERE, base::Bind(&ReceivedSyncMsgQueue::DispatchReplies, | 278 FROM_HERE, base::Bind(&ReceivedSyncMsgQueue::DispatchReplies, |
| 278 received_sync_msgs_.get())); | 279 received_sync_msgs_.get())); |
| 279 | 280 |
| 280 return result; | 281 return result; |
| 281 } | 282 } |
| 282 | 283 |
| 283 WaitableEvent* SyncChannel::SyncContext::GetSendDoneEvent() { | 284 WaitableEvent* SyncChannel::SyncContext::GetSendDoneEvent() { |
| 284 base::AutoLock auto_lock(deserializers_lock_); | 285 base::AutoLock auto_lock(deserializers_lock_); |
| 285 return deserializers_.back().done_event; | 286 return deserializers_.back().done_event; |
| 286 } | 287 } |
| (...skipping 94 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 381 DCHECK_EQ(GetSendDoneEvent(), event); | 382 DCHECK_EQ(GetSendDoneEvent(), event); |
| 382 MessageLoop::current()->QuitNow(); | 383 MessageLoop::current()->QuitNow(); |
| 383 } | 384 } |
| 384 } | 385 } |
| 385 | 386 |
| 386 | 387 |
| 387 SyncChannel::SyncChannel( | 388 SyncChannel::SyncChannel( |
| 388 const IPC::ChannelHandle& channel_handle, | 389 const IPC::ChannelHandle& channel_handle, |
| 389 Channel::Mode mode, | 390 Channel::Mode mode, |
| 390 Listener* listener, | 391 Listener* listener, |
| 391 base::MessageLoopProxy* ipc_message_loop, | 392 base::SingleThreadTaskRunner* ipc_task_runner, |
| 392 bool create_pipe_now, | 393 bool create_pipe_now, |
| 393 WaitableEvent* shutdown_event) | 394 WaitableEvent* shutdown_event) |
| 394 : ChannelProxy(new SyncContext(listener, ipc_message_loop, shutdown_event)), | 395 : ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)), |
| 395 sync_messages_with_no_timeout_allowed_(true) { | 396 sync_messages_with_no_timeout_allowed_(true) { |
| 396 ChannelProxy::Init(channel_handle, mode, create_pipe_now); | 397 ChannelProxy::Init(channel_handle, mode, create_pipe_now); |
| 397 StartWatching(); | 398 StartWatching(); |
| 398 } | 399 } |
| 399 | 400 |
| 400 SyncChannel::SyncChannel( | 401 SyncChannel::SyncChannel( |
| 401 Listener* listener, | 402 Listener* listener, |
| 402 base::MessageLoopProxy* ipc_message_loop, | 403 base::SingleThreadTaskRunner* ipc_task_runner, |
| 403 WaitableEvent* shutdown_event) | 404 WaitableEvent* shutdown_event) |
| 404 : ChannelProxy(new SyncContext(listener, ipc_message_loop, shutdown_event)), | 405 : ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)), |
| 405 sync_messages_with_no_timeout_allowed_(true) { | 406 sync_messages_with_no_timeout_allowed_(true) { |
| 406 StartWatching(); | 407 StartWatching(); |
| 407 } | 408 } |
| 408 | 409 |
| 409 SyncChannel::~SyncChannel() { | 410 SyncChannel::~SyncChannel() { |
| 410 } | 411 } |
| 411 | 412 |
| 412 void SyncChannel::SetRestrictDispatchChannelGroup(int group) { | 413 void SyncChannel::SetRestrictDispatchChannelGroup(int group) { |
| 413 sync_context()->set_restrict_dispatch_group(group); | 414 sync_context()->set_restrict_dispatch_group(group); |
| 414 } | 415 } |
| (...skipping 21 matching lines...) Expand all Loading... |
| 436 context->Push(sync_msg); | 437 context->Push(sync_msg); |
| 437 int message_id = SyncMessage::GetMessageId(*sync_msg); | 438 int message_id = SyncMessage::GetMessageId(*sync_msg); |
| 438 WaitableEvent* pump_messages_event = sync_msg->pump_messages_event(); | 439 WaitableEvent* pump_messages_event = sync_msg->pump_messages_event(); |
| 439 | 440 |
| 440 ChannelProxy::Send(message); | 441 ChannelProxy::Send(message); |
| 441 | 442 |
| 442 if (timeout_ms != base::kNoTimeout) { | 443 if (timeout_ms != base::kNoTimeout) { |
| 443 // We use the sync message id so that when a message times out, we don't | 444 // We use the sync message id so that when a message times out, we don't |
| 444 // confuse it with another send that is either above/below this Send in | 445 // confuse it with another send that is either above/below this Send in |
| 445 // the call stack. | 446 // the call stack. |
| 446 context->ipc_message_loop()->PostDelayedTask( | 447 context->ipc_task_runner()->PostDelayedTask( |
| 447 FROM_HERE, | 448 FROM_HERE, |
| 448 base::Bind(&SyncContext::OnSendTimeout, context.get(), message_id), | 449 base::Bind(&SyncContext::OnSendTimeout, context.get(), message_id), |
| 449 base::TimeDelta::FromMilliseconds(timeout_ms)); | 450 base::TimeDelta::FromMilliseconds(timeout_ms)); |
| 450 } | 451 } |
| 451 | 452 |
| 452 // Wait for reply, or for any other incoming synchronous messages. | 453 // Wait for reply, or for any other incoming synchronous messages. |
| 453 // *this* might get deleted, so only call static functions at this point. | 454 // *this* might get deleted, so only call static functions at this point. |
| 454 WaitForReply(context, pump_messages_event); | 455 WaitForReply(context, pump_messages_event); |
| 455 | 456 |
| 456 return context->Pop(); | 457 return context->Pop(); |
| (...skipping 75 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 532 // Ideally we only want to watch this object when running a nested message | 533 // Ideally we only want to watch this object when running a nested message |
| 533 // loop. However, we don't know when it exits if there's another nested | 534 // loop. However, we don't know when it exits if there's another nested |
| 534 // message loop running under it or not, so we wouldn't know whether to | 535 // message loop running under it or not, so we wouldn't know whether to |
| 535 // stop or keep watching. So we always watch it, and create the event as | 536 // stop or keep watching. So we always watch it, and create the event as |
| 536 // manual reset since the object watcher might otherwise reset the event | 537 // manual reset since the object watcher might otherwise reset the event |
| 537 // when we're doing a WaitMany. | 538 // when we're doing a WaitMany. |
| 538 dispatch_watcher_.StartWatching(sync_context()->GetDispatchEvent(), this); | 539 dispatch_watcher_.StartWatching(sync_context()->GetDispatchEvent(), this); |
| 539 } | 540 } |
| 540 | 541 |
| 541 } // namespace IPC | 542 } // namespace IPC |
| OLD | NEW |