OLD | NEW |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "ipc/ipc_sync_channel.h" | 5 #include "ipc/ipc_sync_channel.h" |
6 | 6 |
7 #include "base/bind.h" | 7 #include "base/bind.h" |
8 #include "base/lazy_instance.h" | 8 #include "base/lazy_instance.h" |
9 #include "base/location.h" | 9 #include "base/location.h" |
10 #include "base/logging.h" | 10 #include "base/logging.h" |
11 #include "base/threading/thread_local.h" | |
12 #include "base/synchronization/waitable_event.h" | 11 #include "base/synchronization/waitable_event.h" |
13 #include "base/synchronization/waitable_event_watcher.h" | 12 #include "base/synchronization/waitable_event_watcher.h" |
| 13 #include "base/thread_task_runner_handle.h" |
| 14 #include "base/threading/thread_local.h" |
14 #include "ipc/ipc_sync_message.h" | 15 #include "ipc/ipc_sync_message.h" |
15 | 16 |
16 using base::TimeDelta; | 17 using base::TimeDelta; |
17 using base::TimeTicks; | 18 using base::TimeTicks; |
18 using base::WaitableEvent; | 19 using base::WaitableEvent; |
19 | 20 |
20 namespace IPC { | 21 namespace IPC { |
21 // When we're blocked in a Send(), we need to process incoming synchronous | 22 // When we're blocked in a Send(), we need to process incoming synchronous |
22 // messages right away because it could be blocking our reply (either | 23 // messages right away because it could be blocking our reply (either |
23 // directly from the same object we're calling, or indirectly through one or | 24 // directly from the same object we're calling, or indirectly through one or |
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
62 task_pending_ = true; | 63 task_pending_ = true; |
63 | 64 |
64 // We set the event in case the listener thread is blocked (or is about | 65 // We set the event in case the listener thread is blocked (or is about |
65 // to). In case it's not, the PostTask dispatches the messages. | 66 // to). In case it's not, the PostTask dispatches the messages. |
66 message_queue_.push_back(QueuedMessage(new Message(msg), context)); | 67 message_queue_.push_back(QueuedMessage(new Message(msg), context)); |
67 message_queue_version_++; | 68 message_queue_version_++; |
68 } | 69 } |
69 | 70 |
70 dispatch_event_.Signal(); | 71 dispatch_event_.Signal(); |
71 if (!was_task_pending) { | 72 if (!was_task_pending) { |
72 listener_message_loop_->PostTask( | 73 listener_task_runner_->PostTask( |
73 FROM_HERE, base::Bind(&ReceivedSyncMsgQueue::DispatchMessagesTask, | 74 FROM_HERE, base::Bind(&ReceivedSyncMsgQueue::DispatchMessagesTask, |
74 this, scoped_refptr<SyncContext>(context))); | 75 this, scoped_refptr<SyncContext>(context))); |
75 } | 76 } |
76 } | 77 } |
77 | 78 |
78 void QueueReply(const Message &msg, SyncChannel::SyncContext* context) { | 79 void QueueReply(const Message &msg, SyncChannel::SyncContext* context) { |
79 received_replies_.push_back(QueuedMessage(new Message(msg), context)); | 80 received_replies_.push_back(QueuedMessage(new Message(msg), context)); |
80 } | 81 } |
81 | 82 |
82 // Called on the listener's thread to process any queues synchronous | 83 // Called on the listener's thread to process any queues synchronous |
(...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
138 } | 139 } |
139 } | 140 } |
140 | 141 |
141 if (--listener_count_ == 0) { | 142 if (--listener_count_ == 0) { |
142 DCHECK(lazy_tls_ptr_.Pointer()->Get()); | 143 DCHECK(lazy_tls_ptr_.Pointer()->Get()); |
143 lazy_tls_ptr_.Pointer()->Set(NULL); | 144 lazy_tls_ptr_.Pointer()->Set(NULL); |
144 } | 145 } |
145 } | 146 } |
146 | 147 |
147 WaitableEvent* dispatch_event() { return &dispatch_event_; } | 148 WaitableEvent* dispatch_event() { return &dispatch_event_; } |
148 base::MessageLoopProxy* listener_message_loop() { | 149 base::SingleThreadTaskRunner* listener_task_runner() { |
149 return listener_message_loop_; | 150 return listener_task_runner_; |
150 } | 151 } |
151 | 152 |
152 // Holds a pointer to the per-thread ReceivedSyncMsgQueue object. | 153 // Holds a pointer to the per-thread ReceivedSyncMsgQueue object. |
153 static base::LazyInstance<base::ThreadLocalPointer<ReceivedSyncMsgQueue> > | 154 static base::LazyInstance<base::ThreadLocalPointer<ReceivedSyncMsgQueue> > |
154 lazy_tls_ptr_; | 155 lazy_tls_ptr_; |
155 | 156 |
156 // Called on the ipc thread to check if we can unblock any current Send() | 157 // Called on the ipc thread to check if we can unblock any current Send() |
157 // calls based on a queued reply. | 158 // calls based on a queued reply. |
158 void DispatchReplies() { | 159 void DispatchReplies() { |
159 for (size_t i = 0; i < received_replies_.size(); ++i) { | 160 for (size_t i = 0; i < received_replies_.size(); ++i) { |
(...skipping 15 matching lines...) Expand all Loading... |
175 } | 176 } |
176 | 177 |
177 private: | 178 private: |
178 friend class base::RefCountedThreadSafe<ReceivedSyncMsgQueue>; | 179 friend class base::RefCountedThreadSafe<ReceivedSyncMsgQueue>; |
179 | 180 |
180 // See the comment in SyncChannel::SyncChannel for why this event is created | 181 // See the comment in SyncChannel::SyncChannel for why this event is created |
181 // as manual reset. | 182 // as manual reset. |
182 ReceivedSyncMsgQueue() : | 183 ReceivedSyncMsgQueue() : |
183 message_queue_version_(0), | 184 message_queue_version_(0), |
184 dispatch_event_(true, false), | 185 dispatch_event_(true, false), |
185 listener_message_loop_(base::MessageLoopProxy::current()), | 186 listener_task_runner_(base::ThreadTaskRunnerHandle::Get()), |
186 task_pending_(false), | 187 task_pending_(false), |
187 listener_count_(0), | 188 listener_count_(0), |
188 top_send_done_watcher_(NULL) { | 189 top_send_done_watcher_(NULL) { |
189 } | 190 } |
190 | 191 |
191 ~ReceivedSyncMsgQueue() {} | 192 ~ReceivedSyncMsgQueue() {} |
192 | 193 |
193 // Holds information about a queued synchronous message or reply. | 194 // Holds information about a queued synchronous message or reply. |
194 struct QueuedMessage { | 195 struct QueuedMessage { |
195 QueuedMessage(Message* m, SyncContext* c) : message(m), context(c) { } | 196 QueuedMessage(Message* m, SyncContext* c) : message(m), context(c) { } |
196 Message* message; | 197 Message* message; |
197 scoped_refptr<SyncChannel::SyncContext> context; | 198 scoped_refptr<SyncChannel::SyncContext> context; |
198 }; | 199 }; |
199 | 200 |
200 typedef std::list<QueuedMessage> SyncMessageQueue; | 201 typedef std::list<QueuedMessage> SyncMessageQueue; |
201 SyncMessageQueue message_queue_; | 202 SyncMessageQueue message_queue_; |
202 uint32 message_queue_version_; // Used to signal DispatchMessages to rescan | 203 uint32 message_queue_version_; // Used to signal DispatchMessages to rescan |
203 | 204 |
204 std::vector<QueuedMessage> received_replies_; | 205 std::vector<QueuedMessage> received_replies_; |
205 | 206 |
206 // Set when we got a synchronous message that we must respond to as the | 207 // Set when we got a synchronous message that we must respond to as the |
207 // sender needs its reply before it can reply to our original synchronous | 208 // sender needs its reply before it can reply to our original synchronous |
208 // message. | 209 // message. |
209 WaitableEvent dispatch_event_; | 210 WaitableEvent dispatch_event_; |
210 scoped_refptr<base::MessageLoopProxy> listener_message_loop_; | 211 scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner_; |
211 base::Lock message_lock_; | 212 base::Lock message_lock_; |
212 bool task_pending_; | 213 bool task_pending_; |
213 int listener_count_; | 214 int listener_count_; |
214 | 215 |
215 // The current send done event watcher for this thread. Used to maintain | 216 // The current send done event watcher for this thread. Used to maintain |
216 // a local global stack of send done watchers to ensure that nested sync | 217 // a local global stack of send done watchers to ensure that nested sync |
217 // message loops complete correctly. | 218 // message loops complete correctly. |
218 base::WaitableEventWatcher* top_send_done_watcher_; | 219 base::WaitableEventWatcher* top_send_done_watcher_; |
219 }; | 220 }; |
220 | 221 |
221 base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> > | 222 base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> > |
222 SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_ = | 223 SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_ = |
223 LAZY_INSTANCE_INITIALIZER; | 224 LAZY_INSTANCE_INITIALIZER; |
224 | 225 |
225 SyncChannel::SyncContext::SyncContext( | 226 SyncChannel::SyncContext::SyncContext( |
226 Listener* listener, | 227 Listener* listener, |
227 base::MessageLoopProxy* ipc_thread, | 228 base::SingleThreadTaskRunner* ipc_task_runner, |
228 WaitableEvent* shutdown_event) | 229 WaitableEvent* shutdown_event) |
229 : ChannelProxy::Context(listener, ipc_thread), | 230 : ChannelProxy::Context(listener, ipc_task_runner), |
230 received_sync_msgs_(ReceivedSyncMsgQueue::AddContext()), | 231 received_sync_msgs_(ReceivedSyncMsgQueue::AddContext()), |
231 shutdown_event_(shutdown_event), | 232 shutdown_event_(shutdown_event), |
232 restrict_dispatch_group_(kRestrictDispatchGroup_None) { | 233 restrict_dispatch_group_(kRestrictDispatchGroup_None) { |
233 } | 234 } |
234 | 235 |
235 SyncChannel::SyncContext::~SyncContext() { | 236 SyncChannel::SyncContext::~SyncContext() { |
236 while (!deserializers_.empty()) | 237 while (!deserializers_.empty()) |
237 Pop(); | 238 Pop(); |
238 } | 239 } |
239 | 240 |
(...skipping 26 matching lines...) Expand all Loading... |
266 msg.done_event = NULL; | 267 msg.done_event = NULL; |
267 deserializers_.pop_back(); | 268 deserializers_.pop_back(); |
268 result = msg.send_result; | 269 result = msg.send_result; |
269 } | 270 } |
270 | 271 |
271 // We got a reply to a synchronous Send() call that's blocking the listener | 272 // We got a reply to a synchronous Send() call that's blocking the listener |
272 // thread. However, further down the call stack there could be another | 273 // thread. However, further down the call stack there could be another |
273 // blocking Send() call, whose reply we received after we made this last | 274 // blocking Send() call, whose reply we received after we made this last |
274 // Send() call. So check if we have any queued replies available that | 275 // Send() call. So check if we have any queued replies available that |
275 // can now unblock the listener thread. | 276 // can now unblock the listener thread. |
276 ipc_message_loop()->PostTask( | 277 ipc_task_runner()->PostTask( |
277 FROM_HERE, base::Bind(&ReceivedSyncMsgQueue::DispatchReplies, | 278 FROM_HERE, base::Bind(&ReceivedSyncMsgQueue::DispatchReplies, |
278 received_sync_msgs_.get())); | 279 received_sync_msgs_.get())); |
279 | 280 |
280 return result; | 281 return result; |
281 } | 282 } |
282 | 283 |
283 WaitableEvent* SyncChannel::SyncContext::GetSendDoneEvent() { | 284 WaitableEvent* SyncChannel::SyncContext::GetSendDoneEvent() { |
284 base::AutoLock auto_lock(deserializers_lock_); | 285 base::AutoLock auto_lock(deserializers_lock_); |
285 return deserializers_.back().done_event; | 286 return deserializers_.back().done_event; |
286 } | 287 } |
(...skipping 94 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
381 DCHECK_EQ(GetSendDoneEvent(), event); | 382 DCHECK_EQ(GetSendDoneEvent(), event); |
382 MessageLoop::current()->QuitNow(); | 383 MessageLoop::current()->QuitNow(); |
383 } | 384 } |
384 } | 385 } |
385 | 386 |
386 | 387 |
387 SyncChannel::SyncChannel( | 388 SyncChannel::SyncChannel( |
388 const IPC::ChannelHandle& channel_handle, | 389 const IPC::ChannelHandle& channel_handle, |
389 Channel::Mode mode, | 390 Channel::Mode mode, |
390 Listener* listener, | 391 Listener* listener, |
391 base::MessageLoopProxy* ipc_message_loop, | 392 base::SingleThreadTaskRunner* ipc_task_runner, |
392 bool create_pipe_now, | 393 bool create_pipe_now, |
393 WaitableEvent* shutdown_event) | 394 WaitableEvent* shutdown_event) |
394 : ChannelProxy(new SyncContext(listener, ipc_message_loop, shutdown_event)), | 395 : ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)), |
395 sync_messages_with_no_timeout_allowed_(true) { | 396 sync_messages_with_no_timeout_allowed_(true) { |
396 ChannelProxy::Init(channel_handle, mode, create_pipe_now); | 397 ChannelProxy::Init(channel_handle, mode, create_pipe_now); |
397 StartWatching(); | 398 StartWatching(); |
398 } | 399 } |
399 | 400 |
400 SyncChannel::SyncChannel( | 401 SyncChannel::SyncChannel( |
401 Listener* listener, | 402 Listener* listener, |
402 base::MessageLoopProxy* ipc_message_loop, | 403 base::SingleThreadTaskRunner* ipc_task_runner, |
403 WaitableEvent* shutdown_event) | 404 WaitableEvent* shutdown_event) |
404 : ChannelProxy(new SyncContext(listener, ipc_message_loop, shutdown_event)), | 405 : ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)), |
405 sync_messages_with_no_timeout_allowed_(true) { | 406 sync_messages_with_no_timeout_allowed_(true) { |
406 StartWatching(); | 407 StartWatching(); |
407 } | 408 } |
408 | 409 |
409 SyncChannel::~SyncChannel() { | 410 SyncChannel::~SyncChannel() { |
410 } | 411 } |
411 | 412 |
412 void SyncChannel::SetRestrictDispatchChannelGroup(int group) { | 413 void SyncChannel::SetRestrictDispatchChannelGroup(int group) { |
413 sync_context()->set_restrict_dispatch_group(group); | 414 sync_context()->set_restrict_dispatch_group(group); |
414 } | 415 } |
(...skipping 21 matching lines...) Expand all Loading... |
436 context->Push(sync_msg); | 437 context->Push(sync_msg); |
437 int message_id = SyncMessage::GetMessageId(*sync_msg); | 438 int message_id = SyncMessage::GetMessageId(*sync_msg); |
438 WaitableEvent* pump_messages_event = sync_msg->pump_messages_event(); | 439 WaitableEvent* pump_messages_event = sync_msg->pump_messages_event(); |
439 | 440 |
440 ChannelProxy::Send(message); | 441 ChannelProxy::Send(message); |
441 | 442 |
442 if (timeout_ms != base::kNoTimeout) { | 443 if (timeout_ms != base::kNoTimeout) { |
443 // We use the sync message id so that when a message times out, we don't | 444 // We use the sync message id so that when a message times out, we don't |
444 // confuse it with another send that is either above/below this Send in | 445 // confuse it with another send that is either above/below this Send in |
445 // the call stack. | 446 // the call stack. |
446 context->ipc_message_loop()->PostDelayedTask( | 447 context->ipc_task_runner()->PostDelayedTask( |
447 FROM_HERE, | 448 FROM_HERE, |
448 base::Bind(&SyncContext::OnSendTimeout, context.get(), message_id), | 449 base::Bind(&SyncContext::OnSendTimeout, context.get(), message_id), |
449 base::TimeDelta::FromMilliseconds(timeout_ms)); | 450 base::TimeDelta::FromMilliseconds(timeout_ms)); |
450 } | 451 } |
451 | 452 |
452 // Wait for reply, or for any other incoming synchronous messages. | 453 // Wait for reply, or for any other incoming synchronous messages. |
453 // *this* might get deleted, so only call static functions at this point. | 454 // *this* might get deleted, so only call static functions at this point. |
454 WaitForReply(context, pump_messages_event); | 455 WaitForReply(context, pump_messages_event); |
455 | 456 |
456 return context->Pop(); | 457 return context->Pop(); |
(...skipping 75 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
532 // Ideally we only want to watch this object when running a nested message | 533 // Ideally we only want to watch this object when running a nested message |
533 // loop. However, we don't know when it exits if there's another nested | 534 // loop. However, we don't know when it exits if there's another nested |
534 // message loop running under it or not, so we wouldn't know whether to | 535 // message loop running under it or not, so we wouldn't know whether to |
535 // stop or keep watching. So we always watch it, and create the event as | 536 // stop or keep watching. So we always watch it, and create the event as |
536 // manual reset since the object watcher might otherwise reset the event | 537 // manual reset since the object watcher might otherwise reset the event |
537 // when we're doing a WaitMany. | 538 // when we're doing a WaitMany. |
538 dispatch_watcher_.StartWatching(sync_context()->GetDispatchEvent(), this); | 539 dispatch_watcher_.StartWatching(sync_context()->GetDispatchEvent(), this); |
539 } | 540 } |
540 | 541 |
541 } // namespace IPC | 542 } // namespace IPC |
OLD | NEW |