| OLD | NEW |
| 1 // Copyright (c) 2006-2008 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2006-2008 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "ipc/ipc_sync_channel.h" | 5 #include "ipc/ipc_sync_channel.h" |
| 6 | 6 |
| 7 #include "base/lazy_instance.h" | 7 #include "base/lazy_instance.h" |
| 8 #include "base/logging.h" | 8 #include "base/logging.h" |
| 9 #include "base/message_loop.h" | 9 #include "base/message_loop.h" |
| 10 #include "base/threading/thread_local.h" | 10 #include "base/threading/thread_local.h" |
| (...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 48 ReceivedSyncMsgQueue::lazy_tls_ptr_.Pointer()->Set(rv); | 48 ReceivedSyncMsgQueue::lazy_tls_ptr_.Pointer()->Set(rv); |
| 49 } | 49 } |
| 50 rv->listener_count_++; | 50 rv->listener_count_++; |
| 51 return rv; | 51 return rv; |
| 52 } | 52 } |
| 53 | 53 |
| 54 // Called on IPC thread when a synchronous message or reply arrives. | 54 // Called on IPC thread when a synchronous message or reply arrives. |
| 55 void QueueMessage(const Message& msg, SyncChannel::SyncContext* context) { | 55 void QueueMessage(const Message& msg, SyncChannel::SyncContext* context) { |
| 56 bool was_task_pending; | 56 bool was_task_pending; |
| 57 { | 57 { |
| 58 AutoLock auto_lock(message_lock_); | 58 base::AutoLock auto_lock(message_lock_); |
| 59 | 59 |
| 60 was_task_pending = task_pending_; | 60 was_task_pending = task_pending_; |
| 61 task_pending_ = true; | 61 task_pending_ = true; |
| 62 | 62 |
| 63 // We set the event in case the listener thread is blocked (or is about | 63 // We set the event in case the listener thread is blocked (or is about |
| 64 // to). In case it's not, the PostTask dispatches the messages. | 64 // to). In case it's not, the PostTask dispatches the messages. |
| 65 message_queue_.push_back(QueuedMessage(new Message(msg), context)); | 65 message_queue_.push_back(QueuedMessage(new Message(msg), context)); |
| 66 } | 66 } |
| 67 | 67 |
| 68 dispatch_event_.Signal(); | 68 dispatch_event_.Signal(); |
| 69 if (!was_task_pending) { | 69 if (!was_task_pending) { |
| 70 listener_message_loop_->PostTask(FROM_HERE, NewRunnableMethod( | 70 listener_message_loop_->PostTask(FROM_HERE, NewRunnableMethod( |
| 71 this, &ReceivedSyncMsgQueue::DispatchMessagesTask)); | 71 this, &ReceivedSyncMsgQueue::DispatchMessagesTask)); |
| 72 } | 72 } |
| 73 } | 73 } |
| 74 | 74 |
| 75 void QueueReply(const Message &msg, SyncChannel::SyncContext* context) { | 75 void QueueReply(const Message &msg, SyncChannel::SyncContext* context) { |
| 76 received_replies_.push_back(QueuedMessage(new Message(msg), context)); | 76 received_replies_.push_back(QueuedMessage(new Message(msg), context)); |
| 77 } | 77 } |
| 78 | 78 |
| 79 // Called on the listener's thread to process any queues synchronous | 79 // Called on the listener's thread to process any queues synchronous |
| 80 // messages. | 80 // messages. |
| 81 void DispatchMessagesTask() { | 81 void DispatchMessagesTask() { |
| 82 { | 82 { |
| 83 AutoLock auto_lock(message_lock_); | 83 base::AutoLock auto_lock(message_lock_); |
| 84 task_pending_ = false; | 84 task_pending_ = false; |
| 85 } | 85 } |
| 86 DispatchMessages(); | 86 DispatchMessages(); |
| 87 } | 87 } |
| 88 | 88 |
| 89 void DispatchMessages() { | 89 void DispatchMessages() { |
| 90 while (true) { | 90 while (true) { |
| 91 Message* message; | 91 Message* message; |
| 92 scoped_refptr<SyncChannel::SyncContext> context; | 92 scoped_refptr<SyncChannel::SyncContext> context; |
| 93 { | 93 { |
| 94 AutoLock auto_lock(message_lock_); | 94 base::AutoLock auto_lock(message_lock_); |
| 95 if (message_queue_.empty()) | 95 if (message_queue_.empty()) |
| 96 break; | 96 break; |
| 97 | 97 |
| 98 message = message_queue_.front().message; | 98 message = message_queue_.front().message; |
| 99 context = message_queue_.front().context; | 99 context = message_queue_.front().context; |
| 100 message_queue_.pop_front(); | 100 message_queue_.pop_front(); |
| 101 } | 101 } |
| 102 | 102 |
| 103 context->OnDispatchMessage(*message); | 103 context->OnDispatchMessage(*message); |
| 104 delete message; | 104 delete message; |
| 105 } | 105 } |
| 106 } | 106 } |
| 107 | 107 |
| 108 // SyncChannel calls this in its destructor. | 108 // SyncChannel calls this in its destructor. |
| 109 void RemoveContext(SyncContext* context) { | 109 void RemoveContext(SyncContext* context) { |
| 110 AutoLock auto_lock(message_lock_); | 110 base::AutoLock auto_lock(message_lock_); |
| 111 | 111 |
| 112 SyncMessageQueue::iterator iter = message_queue_.begin(); | 112 SyncMessageQueue::iterator iter = message_queue_.begin(); |
| 113 while (iter != message_queue_.end()) { | 113 while (iter != message_queue_.end()) { |
| 114 if (iter->context == context) { | 114 if (iter->context == context) { |
| 115 delete iter->message; | 115 delete iter->message; |
| 116 iter = message_queue_.erase(iter); | 116 iter = message_queue_.erase(iter); |
| 117 } else { | 117 } else { |
| 118 iter++; | 118 iter++; |
| 119 } | 119 } |
| 120 } | 120 } |
| (...skipping 57 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 178 typedef std::deque<QueuedMessage> SyncMessageQueue; | 178 typedef std::deque<QueuedMessage> SyncMessageQueue; |
| 179 SyncMessageQueue message_queue_; | 179 SyncMessageQueue message_queue_; |
| 180 | 180 |
| 181 std::vector<QueuedMessage> received_replies_; | 181 std::vector<QueuedMessage> received_replies_; |
| 182 | 182 |
| 183 // Set when we got a synchronous message that we must respond to as the | 183 // Set when we got a synchronous message that we must respond to as the |
| 184 // sender needs its reply before it can reply to our original synchronous | 184 // sender needs its reply before it can reply to our original synchronous |
| 185 // message. | 185 // message. |
| 186 WaitableEvent dispatch_event_; | 186 WaitableEvent dispatch_event_; |
| 187 MessageLoop* listener_message_loop_; | 187 MessageLoop* listener_message_loop_; |
| 188 Lock message_lock_; | 188 base::Lock message_lock_; |
| 189 bool task_pending_; | 189 bool task_pending_; |
| 190 int listener_count_; | 190 int listener_count_; |
| 191 | 191 |
| 192 // The current send done event watcher for this thread. Used to maintain | 192 // The current send done event watcher for this thread. Used to maintain |
| 193 // a local global stack of send done watchers to ensure that nested sync | 193 // a local global stack of send done watchers to ensure that nested sync |
| 194 // message loops complete correctly. | 194 // message loops complete correctly. |
| 195 base::WaitableEventWatcher* top_send_done_watcher_; | 195 base::WaitableEventWatcher* top_send_done_watcher_; |
| 196 }; | 196 }; |
| 197 | 197 |
| 198 base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> > | 198 base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> > |
| (...skipping 17 matching lines...) Expand all Loading... |
| 216 // we know how to deserialize the reply. Returns a handle that's set when | 216 // we know how to deserialize the reply. Returns a handle that's set when |
| 217 // the reply has arrived. | 217 // the reply has arrived. |
| 218 void SyncChannel::SyncContext::Push(SyncMessage* sync_msg) { | 218 void SyncChannel::SyncContext::Push(SyncMessage* sync_msg) { |
| 219 // The event is created as manual reset because in between Signal and | 219 // The event is created as manual reset because in between Signal and |
| 220 // OnObjectSignalled, another Send can happen which would stop the watcher | 220 // OnObjectSignalled, another Send can happen which would stop the watcher |
| 221 // from being called. The event would get watched later, when the nested | 221 // from being called. The event would get watched later, when the nested |
| 222 // Send completes, so the event will need to remain set. | 222 // Send completes, so the event will need to remain set. |
| 223 PendingSyncMsg pending(SyncMessage::GetMessageId(*sync_msg), | 223 PendingSyncMsg pending(SyncMessage::GetMessageId(*sync_msg), |
| 224 sync_msg->GetReplyDeserializer(), | 224 sync_msg->GetReplyDeserializer(), |
| 225 new WaitableEvent(true, false)); | 225 new WaitableEvent(true, false)); |
| 226 AutoLock auto_lock(deserializers_lock_); | 226 base::AutoLock auto_lock(deserializers_lock_); |
| 227 deserializers_.push_back(pending); | 227 deserializers_.push_back(pending); |
| 228 } | 228 } |
| 229 | 229 |
| 230 bool SyncChannel::SyncContext::Pop() { | 230 bool SyncChannel::SyncContext::Pop() { |
| 231 bool result; | 231 bool result; |
| 232 { | 232 { |
| 233 AutoLock auto_lock(deserializers_lock_); | 233 base::AutoLock auto_lock(deserializers_lock_); |
| 234 PendingSyncMsg msg = deserializers_.back(); | 234 PendingSyncMsg msg = deserializers_.back(); |
| 235 delete msg.deserializer; | 235 delete msg.deserializer; |
| 236 delete msg.done_event; | 236 delete msg.done_event; |
| 237 msg.done_event = NULL; | 237 msg.done_event = NULL; |
| 238 deserializers_.pop_back(); | 238 deserializers_.pop_back(); |
| 239 result = msg.send_result; | 239 result = msg.send_result; |
| 240 } | 240 } |
| 241 | 241 |
| 242 // We got a reply to a synchronous Send() call that's blocking the listener | 242 // We got a reply to a synchronous Send() call that's blocking the listener |
| 243 // thread. However, further down the call stack there could be another | 243 // thread. However, further down the call stack there could be another |
| 244 // blocking Send() call, whose reply we received after we made this last | 244 // blocking Send() call, whose reply we received after we made this last |
| 245 // Send() call. So check if we have any queued replies available that | 245 // Send() call. So check if we have any queued replies available that |
| 246 // can now unblock the listener thread. | 246 // can now unblock the listener thread. |
| 247 ipc_message_loop()->PostTask(FROM_HERE, NewRunnableMethod( | 247 ipc_message_loop()->PostTask(FROM_HERE, NewRunnableMethod( |
| 248 received_sync_msgs_.get(), &ReceivedSyncMsgQueue::DispatchReplies)); | 248 received_sync_msgs_.get(), &ReceivedSyncMsgQueue::DispatchReplies)); |
| 249 | 249 |
| 250 return result; | 250 return result; |
| 251 } | 251 } |
| 252 | 252 |
| 253 WaitableEvent* SyncChannel::SyncContext::GetSendDoneEvent() { | 253 WaitableEvent* SyncChannel::SyncContext::GetSendDoneEvent() { |
| 254 AutoLock auto_lock(deserializers_lock_); | 254 base::AutoLock auto_lock(deserializers_lock_); |
| 255 return deserializers_.back().done_event; | 255 return deserializers_.back().done_event; |
| 256 } | 256 } |
| 257 | 257 |
| 258 WaitableEvent* SyncChannel::SyncContext::GetDispatchEvent() { | 258 WaitableEvent* SyncChannel::SyncContext::GetDispatchEvent() { |
| 259 return received_sync_msgs_->dispatch_event(); | 259 return received_sync_msgs_->dispatch_event(); |
| 260 } | 260 } |
| 261 | 261 |
| 262 void SyncChannel::SyncContext::DispatchMessages() { | 262 void SyncChannel::SyncContext::DispatchMessages() { |
| 263 received_sync_msgs_->DispatchMessages(); | 263 received_sync_msgs_->DispatchMessages(); |
| 264 } | 264 } |
| 265 | 265 |
| 266 bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) { | 266 bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) { |
| 267 AutoLock auto_lock(deserializers_lock_); | 267 base::AutoLock auto_lock(deserializers_lock_); |
| 268 if (deserializers_.empty() || | 268 if (deserializers_.empty() || |
| 269 !SyncMessage::IsMessageReplyTo(*msg, deserializers_.back().id)) { | 269 !SyncMessage::IsMessageReplyTo(*msg, deserializers_.back().id)) { |
| 270 return false; | 270 return false; |
| 271 } | 271 } |
| 272 | 272 |
| 273 if (!msg->is_reply_error()) { | 273 if (!msg->is_reply_error()) { |
| 274 deserializers_.back().send_result = deserializers_.back().deserializer-> | 274 deserializers_.back().send_result = deserializers_.back().deserializer-> |
| 275 SerializeOutputParameters(*msg); | 275 SerializeOutputParameters(*msg); |
| 276 } | 276 } |
| 277 deserializers_.back().done_event->Signal(); | 277 deserializers_.back().done_event->Signal(); |
| (...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 317 Context::OnChannelOpened(); | 317 Context::OnChannelOpened(); |
| 318 } | 318 } |
| 319 | 319 |
| 320 void SyncChannel::SyncContext::OnChannelClosed() { | 320 void SyncChannel::SyncContext::OnChannelClosed() { |
| 321 CancelPendingSends(); | 321 CancelPendingSends(); |
| 322 shutdown_watcher_.StopWatching(); | 322 shutdown_watcher_.StopWatching(); |
| 323 Context::OnChannelClosed(); | 323 Context::OnChannelClosed(); |
| 324 } | 324 } |
| 325 | 325 |
| 326 void SyncChannel::SyncContext::OnSendTimeout(int message_id) { | 326 void SyncChannel::SyncContext::OnSendTimeout(int message_id) { |
| 327 AutoLock auto_lock(deserializers_lock_); | 327 base::AutoLock auto_lock(deserializers_lock_); |
| 328 PendingSyncMessageQueue::iterator iter; | 328 PendingSyncMessageQueue::iterator iter; |
| 329 for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++) { | 329 for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++) { |
| 330 if (iter->id == message_id) { | 330 if (iter->id == message_id) { |
| 331 iter->done_event->Signal(); | 331 iter->done_event->Signal(); |
| 332 break; | 332 break; |
| 333 } | 333 } |
| 334 } | 334 } |
| 335 } | 335 } |
| 336 | 336 |
| 337 void SyncChannel::SyncContext::CancelPendingSends() { | 337 void SyncChannel::SyncContext::CancelPendingSends() { |
| 338 AutoLock auto_lock(deserializers_lock_); | 338 base::AutoLock auto_lock(deserializers_lock_); |
| 339 PendingSyncMessageQueue::iterator iter; | 339 PendingSyncMessageQueue::iterator iter; |
| 340 for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++) | 340 for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++) |
| 341 iter->done_event->Signal(); | 341 iter->done_event->Signal(); |
| 342 } | 342 } |
| 343 | 343 |
| 344 void SyncChannel::SyncContext::OnWaitableEventSignaled(WaitableEvent* event) { | 344 void SyncChannel::SyncContext::OnWaitableEventSignaled(WaitableEvent* event) { |
| 345 if (event == shutdown_event_) { | 345 if (event == shutdown_event_) { |
| 346 // Process shut down before we can get a reply to a synchronous message. | 346 // Process shut down before we can get a reply to a synchronous message. |
| 347 // Cancel pending Send calls, which will end up setting the send done event. | 347 // Cancel pending Send calls, which will end up setting the send done event. |
| 348 CancelPendingSends(); | 348 CancelPendingSends(); |
| (...skipping 136 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 485 void SyncChannel::OnWaitableEventSignaled(WaitableEvent* event) { | 485 void SyncChannel::OnWaitableEventSignaled(WaitableEvent* event) { |
| 486 DCHECK(event == sync_context()->GetDispatchEvent()); | 486 DCHECK(event == sync_context()->GetDispatchEvent()); |
| 487 // The call to DispatchMessages might delete this object, so reregister | 487 // The call to DispatchMessages might delete this object, so reregister |
| 488 // the object watcher first. | 488 // the object watcher first. |
| 489 event->Reset(); | 489 event->Reset(); |
| 490 dispatch_watcher_.StartWatching(event, this); | 490 dispatch_watcher_.StartWatching(event, this); |
| 491 sync_context()->DispatchMessages(); | 491 sync_context()->DispatchMessages(); |
| 492 } | 492 } |
| 493 | 493 |
| 494 } // namespace IPC | 494 } // namespace IPC |
| OLD | NEW |