| OLD | NEW |
| 1 // Copyright (c) 2006-2008 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2006-2008 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include <windows.h> | 5 #include <windows.h> |
| 6 | 6 |
| 7 #include "chrome/common/ipc_sync_channel.h" | 7 #include "chrome/common/ipc_sync_channel.h" |
| 8 | 8 |
| 9 #include "base/lazy_instance.h" | 9 #include "base/lazy_instance.h" |
| 10 #include "base/logging.h" | 10 #include "base/logging.h" |
| (...skipping 19 matching lines...) Expand all Loading... |
| 30 // we queue a task on the listener thread to dispatch the received messages. | 30 // we queue a task on the listener thread to dispatch the received messages. |
| 31 // The messages are stored in this queue object that's shared among all | 31 // The messages are stored in this queue object that's shared among all |
| 32 // SyncChannel objects on the same thread (since one object can receive a | 32 // SyncChannel objects on the same thread (since one object can receive a |
| 33 // sync message while another one is blocked). | 33 // sync message while another one is blocked). |
| 34 | 34 |
| 35 class SyncChannel::ReceivedSyncMsgQueue; | 35 class SyncChannel::ReceivedSyncMsgQueue; |
| 36 | 36 |
| 37 class SyncChannel::ReceivedSyncMsgQueue : | 37 class SyncChannel::ReceivedSyncMsgQueue : |
| 38 public base::RefCountedThreadSafe<ReceivedSyncMsgQueue> { | 38 public base::RefCountedThreadSafe<ReceivedSyncMsgQueue> { |
| 39 public: | 39 public: |
| 40 ReceivedSyncMsgQueue() : | 40 // Returns the ReceivedSyncMsgQueue instance for this thread, creating one |
| 41 blocking_event_(CreateEvent(NULL, FALSE, FALSE, NULL)), | 41 // if necessary. Call RemoveListener on the same thread when done. |
| 42 task_pending_(false), | 42 static ReceivedSyncMsgQueue* AddListener() { |
| 43 listener_message_loop_(MessageLoop::current()) { | 43 // We want one ReceivedSyncMsgQueue per listener thread (i.e. since multiple |
| 44 // SyncChannel objects can block the same thread). |
| 45 ReceivedSyncMsgQueue* rv = lazy_tls_ptr_.Pointer()->Get(); |
| 46 if (!rv) { |
| 47 rv = new ReceivedSyncMsgQueue(); |
| 48 ReceivedSyncMsgQueue::lazy_tls_ptr_.Pointer()->Set(rv); |
| 49 } |
| 50 rv->listener_count_++; |
| 51 return rv; |
| 44 } | 52 } |
| 45 | 53 |
| 46 ~ReceivedSyncMsgQueue() { | 54 ~ReceivedSyncMsgQueue() { |
| 47 DCHECK(lazy_tls_ptr_.Pointer()->Get()); | |
| 48 DCHECK(MessageLoop::current() == listener_message_loop_); | |
| 49 CloseHandle(blocking_event_); | |
| 50 lazy_tls_ptr_.Pointer()->Set(NULL); | |
| 51 } | 55 } |
| 52 | 56 |
| 53 // Called on IPC thread when a synchronous message or reply arrives. | 57 // Called on IPC thread when a synchronous message or reply arrives. |
| 54 void QueueMessage(const Message& msg, Channel::Listener* listener, | 58 void QueueMessage(const Message& msg, Channel::Listener* listener, |
| 55 const std::wstring& channel_id) { | 59 const std::wstring& channel_id) { |
| 56 bool was_task_pending; | 60 bool was_task_pending; |
| 57 { | 61 { |
| 58 AutoLock auto_lock(message_lock_); | 62 AutoLock auto_lock(message_lock_); |
| 59 | 63 |
| 60 was_task_pending = task_pending_; | 64 was_task_pending = task_pending_; |
| 61 task_pending_ = true; | 65 task_pending_ = true; |
| 62 | 66 |
| 63 // We set the event in case the listener thread is blocked (or is about | 67 // We set the event in case the listener thread is blocked (or is about |
| 64 // to). In case it's not, the PostTask dispatches the messages. | 68 // to). In case it's not, the PostTask dispatches the messages. |
| 65 message_queue_.push(ReceivedMessage(new Message(msg), listener, | 69 message_queue_.push(ReceivedMessage(new Message(msg), listener, |
| 66 channel_id)); | 70 channel_id)); |
| 67 } | 71 } |
| 68 | 72 |
| 69 SetEvent(blocking_event_); | 73 SetEvent(dispatch_event_); |
| 70 if (!was_task_pending) { | 74 if (!was_task_pending) { |
| 71 listener_message_loop_->PostTask(FROM_HERE, NewRunnableMethod( | 75 listener_message_loop_->PostTask(FROM_HERE, NewRunnableMethod( |
| 72 this, &ReceivedSyncMsgQueue::DispatchMessagesTask)); | 76 this, &ReceivedSyncMsgQueue::DispatchMessagesTask)); |
| 73 } | 77 } |
| 74 } | 78 } |
| 75 | 79 |
| 76 void QueueReply(const Message &msg, SyncChannel::SyncContext* context) { | 80 void QueueReply(const Message &msg, SyncChannel::SyncContext* context) { |
| 77 received_replies_.push_back(Reply(new Message(msg), context)); | 81 received_replies_.push_back(Reply(new Message(msg), context)); |
| 78 } | 82 } |
| 79 | 83 |
| (...skipping 18 matching lines...) Expand all Loading... |
| 98 break; | 102 break; |
| 99 | 103 |
| 100 ReceivedMessage& blocking_msg = message_queue_.front(); | 104 ReceivedMessage& blocking_msg = message_queue_.front(); |
| 101 message = blocking_msg.message; | 105 message = blocking_msg.message; |
| 102 listener = blocking_msg.listener; | 106 listener = blocking_msg.listener; |
| 103 channel_id = blocking_msg.channel_id; | 107 channel_id = blocking_msg.channel_id; |
| 104 message_queue_.pop(); | 108 message_queue_.pop(); |
| 105 } | 109 } |
| 106 | 110 |
| 107 #ifdef IPC_MESSAGE_LOG_ENABLED | 111 #ifdef IPC_MESSAGE_LOG_ENABLED |
| 108 IPC::Logging* logger = IPC::Logging::current(); | 112 Logging* logger = Logging::current(); |
| 109 if (logger->Enabled()) | 113 if (logger->Enabled()) |
| 110 logger->OnPreDispatchMessage(*message); | 114 logger->OnPreDispatchMessage(*message); |
| 111 #endif | 115 #endif |
| 112 | 116 |
| 113 if (listener) | 117 if (listener) |
| 114 listener->OnMessageReceived(*message); | 118 listener->OnMessageReceived(*message); |
| 115 | 119 |
| 116 #ifdef IPC_MESSAGE_LOG_ENABLED | 120 #ifdef IPC_MESSAGE_LOG_ENABLED |
| 117 if (logger->Enabled()) | 121 if (logger->Enabled()) |
| 118 logger->OnPostDispatchMessage(*message, channel_id); | 122 logger->OnPostDispatchMessage(*message, channel_id); |
| 119 #endif | 123 #endif |
| 120 | 124 |
| 121 delete message; | 125 delete message; |
| 122 } | 126 } |
| 123 } | 127 } |
| 124 | 128 |
| 125 // Called on the IPC thread when the current sync Send() call is unblocked. | 129 // Called on the IPC thread when the current sync Send() call is unblocked. |
| 126 void OnUnblock() { | 130 void DidUnblock() { |
| 127 if (!received_replies_.empty()) { | 131 if (!received_replies_.empty()) { |
| 128 MessageLoop::current()->PostTask(FROM_HERE, NewRunnableMethod( | 132 MessageLoop::current()->PostTask(FROM_HERE, NewRunnableMethod( |
| 129 this, &ReceivedSyncMsgQueue::DispatchReplies)); | 133 this, &ReceivedSyncMsgQueue::DispatchReplies)); |
| 130 } | 134 } |
| 131 } | 135 } |
| 132 | 136 |
| 133 // SyncChannel calls this in its destructor. | 137 // SyncChannel calls this in its destructor. |
| 134 void RemoveListener(Channel::Listener* listener) { | 138 void RemoveListener(Channel::Listener* listener) { |
| 135 AutoLock auto_lock(message_lock_); | 139 AutoLock auto_lock(message_lock_); |
| 136 | 140 |
| 137 SyncMessageQueue temp_queue; | 141 SyncMessageQueue temp_queue; |
| 138 while (!message_queue_.empty()) { | 142 while (!message_queue_.empty()) { |
| 139 if (message_queue_.front().listener != listener) { | 143 if (message_queue_.front().listener != listener) { |
| 140 temp_queue.push(message_queue_.front()); | 144 temp_queue.push(message_queue_.front()); |
| 141 } else { | 145 } else { |
| 142 delete message_queue_.front().message; | 146 delete message_queue_.front().message; |
| 143 } | 147 } |
| 144 | 148 |
| 145 message_queue_.pop(); | 149 message_queue_.pop(); |
| 146 } | 150 } |
| 147 | 151 |
| 148 while (!temp_queue.empty()) { | 152 while (!temp_queue.empty()) { |
| 149 message_queue_.push(temp_queue.front()); | 153 message_queue_.push(temp_queue.front()); |
| 150 temp_queue.pop(); | 154 temp_queue.pop(); |
| 151 } | 155 } |
| 156 |
| 157 if (--listener_count_ == 0) { |
| 158 DCHECK(lazy_tls_ptr_.Pointer()->Get()); |
| 159 lazy_tls_ptr_.Pointer()->Set(NULL); |
| 160 } |
| 152 } | 161 } |
| 153 | 162 |
| 154 HANDLE blocking_event() { return blocking_event_; } | 163 HANDLE dispatch_event() { return dispatch_event_; } |
| 155 MessageLoop* listener_message_loop() { return listener_message_loop_; } | 164 MessageLoop* listener_message_loop() { return listener_message_loop_; } |
| 156 | 165 |
| 157 // Holds a pointer to the per-thread ReceivedSyncMsgQueue object. | 166 // Holds a pointer to the per-thread ReceivedSyncMsgQueue object. |
| 158 static base::LazyInstance<base::ThreadLocalPointer<ReceivedSyncMsgQueue> > | 167 static base::LazyInstance<base::ThreadLocalPointer<ReceivedSyncMsgQueue> > |
| 159 lazy_tls_ptr_; | 168 lazy_tls_ptr_; |
| 160 | 169 |
| 161 private: | 170 private: |
| 171 ReceivedSyncMsgQueue() : |
| 172 dispatch_event_(CreateEvent(NULL, TRUE, FALSE, NULL)), |
| 173 task_pending_(false), |
| 174 listener_message_loop_(MessageLoop::current()), |
| 175 listener_count_(0) { |
| 176 } |
| 177 |
| 162 // Called on the ipc thread to check if we can unblock any current Send() | 178 // Called on the ipc thread to check if we can unblock any current Send() |
| 163 // calls based on a queued reply. | 179 // calls based on a queued reply. |
| 164 void DispatchReplies() { | 180 void DispatchReplies() { |
| 165 for (size_t i = 0; i < received_replies_.size(); ++i) { | 181 for (size_t i = 0; i < received_replies_.size(); ++i) { |
| 166 Message* message = received_replies_[i].message; | 182 Message* message = received_replies_[i].message; |
| 167 if (received_replies_[i].context->UnblockListener(message)) { | 183 if (received_replies_[i].context->TryToUnblockListener(message)) { |
| 168 delete message; | 184 delete message; |
| 169 received_replies_.erase(received_replies_.begin() + i); | 185 received_replies_.erase(received_replies_.begin() + i); |
| 170 return; | 186 return; |
| 171 } | 187 } |
| 172 } | 188 } |
| 173 } | 189 } |
| 174 | 190 |
| 175 // Set when we got a synchronous message that we must respond to as the | |
| 176 // sender needs its reply before it can reply to our original synchronous | |
| 177 // message. | |
| 178 HANDLE blocking_event_; | |
| 179 | |
| 180 MessageLoop* listener_message_loop_; | |
| 181 | |
| 182 // Holds information about a queued synchronous message. | 191 // Holds information about a queued synchronous message. |
| 183 struct ReceivedMessage { | 192 struct ReceivedMessage { |
| 184 ReceivedMessage(Message* m, Channel::Listener* l, const std::wstring& i) | 193 ReceivedMessage(Message* m, Channel::Listener* l, const std::wstring& i) |
| 185 : message(m), listener(l), channel_id(i) { } | 194 : message(m), listener(l), channel_id(i) { } |
| 186 Message* message; | 195 Message* message; |
| 187 Channel::Listener* listener; | 196 Channel::Listener* listener; |
| 188 std::wstring channel_id; | 197 std::wstring channel_id; |
| 189 }; | 198 }; |
| 190 | 199 |
| 191 typedef std::queue<ReceivedMessage> SyncMessageQueue; | 200 typedef std::queue<ReceivedMessage> SyncMessageQueue; |
| 192 SyncMessageQueue message_queue_; | 201 SyncMessageQueue message_queue_; |
| 193 Lock message_lock_; | |
| 194 bool task_pending_; | |
| 195 | 202 |
| 196 // Holds information about a queued reply message. | 203 // Holds information about a queued reply message. |
| 197 struct Reply { | 204 struct Reply { |
| 198 Reply(Message* m, SyncChannel::SyncContext* c) | 205 Reply(Message* m, SyncChannel::SyncContext* c) |
| 199 : message(m), | 206 : message(m), |
| 200 context(c) { } | 207 context(c) { } |
| 201 | 208 |
| 202 Message* message; | 209 Message* message; |
| 203 scoped_refptr<SyncChannel::SyncContext> context; | 210 scoped_refptr<SyncChannel::SyncContext> context; |
| 204 }; | 211 }; |
| 205 | 212 |
| 206 std::vector<Reply> received_replies_; | 213 std::vector<Reply> received_replies_; |
| 214 |
| 215 // Set when we got a synchronous message that we must respond to as the |
| 216 // sender needs its reply before it can reply to our original synchronous |
| 217 // message. |
| 218 ScopedHandle dispatch_event_; |
| 219 MessageLoop* listener_message_loop_; |
| 220 Lock message_lock_; |
| 221 bool task_pending_; |
| 222 int listener_count_; |
| 207 }; | 223 }; |
| 208 | 224 |
| 209 base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> > | 225 base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> > |
| 210 SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_(base::LINKER_INITIALIZED); | 226 SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_(base::LINKER_INITIALIZED); |
| 211 | 227 |
| 212 SyncChannel::SyncContext::SyncContext( | 228 SyncChannel::SyncContext::SyncContext( |
| 213 Channel::Listener* listener, | 229 Channel::Listener* listener, |
| 214 MessageFilter* filter, | 230 MessageFilter* filter, |
| 215 MessageLoop* ipc_thread) | 231 MessageLoop* ipc_thread, |
| 232 HANDLE shutdown_event) |
| 216 : ChannelProxy::Context(listener, filter, ipc_thread), | 233 : ChannelProxy::Context(listener, filter, ipc_thread), |
| 217 channel_closed_(false), | 234 shutdown_event_(shutdown_event), |
| 218 reply_deserialize_result_(false) { | 235 received_sync_msgs_(ReceivedSyncMsgQueue::AddListener()){ |
| 219 // We want one ReceivedSyncMsgQueue per listener thread (i.e. since multiple | |
| 220 // SyncChannel objects that can block the same thread). | |
| 221 received_sync_msgs_ = ReceivedSyncMsgQueue::lazy_tls_ptr_.Pointer()->Get(); | |
| 222 | |
| 223 if (!received_sync_msgs_) { | |
| 224 // Stash a pointer to the listener thread's ReceivedSyncMsgQueue, as we | |
| 225 // need to be able to access it in the IPC thread. | |
| 226 received_sync_msgs_ = new ReceivedSyncMsgQueue(); | |
| 227 ReceivedSyncMsgQueue::lazy_tls_ptr_.Pointer()->Set(received_sync_msgs_); | |
| 228 } | |
| 229 | |
| 230 // Addref manually so that we can ensure destruction on the listener thread | |
| 231 // (so that the TLS object is NULLd). | |
| 232 received_sync_msgs_->AddRef(); | |
| 233 } | 236 } |
| 234 | 237 |
| 235 SyncChannel::SyncContext::~SyncContext() { | 238 SyncChannel::SyncContext::~SyncContext() { |
| 236 while (!deserializers_.empty()) | 239 while (!deserializers_.empty()) |
| 237 PopDeserializer(true); | 240 Pop(); |
| 238 | |
| 239 received_sync_msgs_->listener_message_loop()->ReleaseSoon( | |
| 240 FROM_HERE, received_sync_msgs_); | |
| 241 } | 241 } |
| 242 | 242 |
| 243 // Adds information about an outgoing sync message to the context so that | 243 // Adds information about an outgoing sync message to the context so that |
| 244 // we know how to deserialize the reply. Returns a handle that's set when | 244 // we know how to deserialize the reply. Returns a handle that's set when |
| 245 // the reply has arrived. | 245 // the reply has arrived. |
| 246 HANDLE SyncChannel::SyncContext::Push(IPC::SyncMessage* sync_msg) { | 246 void SyncChannel::SyncContext::Push(SyncMessage* sync_msg) { |
| 247 PendingSyncMsg pending(IPC::SyncMessage::GetMessageId(*sync_msg), | 247 PendingSyncMsg pending(SyncMessage::GetMessageId(*sync_msg), |
| 248 sync_msg->GetReplyDeserializer(), | 248 sync_msg->GetReplyDeserializer(), |
| 249 CreateEvent(NULL, FALSE, FALSE, NULL)); | 249 CreateEvent(NULL, FALSE, FALSE, NULL)); |
| 250 AutoLock auto_lock(deserializers_lock_); | 250 AutoLock auto_lock(deserializers_lock_); |
| 251 deserializers_.push(pending); | 251 deserializers_.push_back(pending); |
| 252 | |
| 253 return pending.reply_event; | |
| 254 } | 252 } |
| 255 | 253 |
| 256 HANDLE SyncChannel::SyncContext::blocking_event() { | 254 bool SyncChannel::SyncContext::Pop() { |
| 257 return received_sync_msgs_->blocking_event(); | 255 AutoLock auto_lock(deserializers_lock_); |
| 256 PendingSyncMsg msg = deserializers_.back(); |
| 257 delete msg.deserializer; |
| 258 CloseHandle(msg.done_event); |
| 259 deserializers_.pop_back(); |
| 260 return msg.send_result; |
| 261 } |
| 262 |
| 263 HANDLE SyncChannel::SyncContext::GetSendDoneEvent() { |
| 264 AutoLock auto_lock(deserializers_lock_); |
| 265 return deserializers_.back().done_event; |
| 266 } |
| 267 |
| 268 HANDLE SyncChannel::SyncContext::GetDispatchEvent() { |
| 269 return received_sync_msgs_->dispatch_event(); |
| 258 } | 270 } |
| 259 | 271 |
| 260 void SyncChannel::SyncContext::DispatchMessages() { | 272 void SyncChannel::SyncContext::DispatchMessages() { |
| 261 received_sync_msgs_->DispatchMessages(); | 273 received_sync_msgs_->DispatchMessages(); |
| 262 } | 274 } |
| 263 | 275 |
| 264 void SyncChannel::SyncContext::RemoveListener(Channel::Listener* listener) { | 276 bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) { |
| 265 received_sync_msgs_->RemoveListener(listener); | 277 { |
| 266 } | 278 AutoLock auto_lock(deserializers_lock_); |
| 279 if (deserializers_.empty() || |
| 280 !SyncMessage::IsMessageReplyTo(*msg, deserializers_.back().id)) { |
| 281 return false; |
| 282 } |
| 267 | 283 |
| 268 bool SyncChannel::SyncContext::UnblockListener(const Message* msg) { | 284 if (!msg->is_reply_error()) { |
| 269 bool rv = false; | 285 deserializers_.back().send_result = deserializers_.back().deserializer-> |
| 270 HANDLE reply_event = NULL; | 286 SerializeOutputParameters(*msg); |
| 271 { | |
| 272 if (channel_closed_) { | |
| 273 // The channel is closed, or we couldn't connect, so cancel all Send() | |
| 274 // calls. | |
| 275 reply_deserialize_result_ = false; | |
| 276 { | |
| 277 AutoLock auto_lock(deserializers_lock_); | |
| 278 if (!deserializers_.empty()) | |
| 279 reply_event = deserializers_.top().reply_event; | |
| 280 } | |
| 281 | |
| 282 if (reply_event) | |
| 283 PopDeserializer(false); | |
| 284 } else { | |
| 285 { | |
| 286 AutoLock auto_lock(deserializers_lock_); | |
| 287 if (deserializers_.empty()) | |
| 288 return false; | |
| 289 | |
| 290 if (!IPC::SyncMessage::IsMessageReplyTo(*msg, deserializers_.top().id)) | |
| 291 return false; | |
| 292 | |
| 293 rv = true; | |
| 294 if (msg->is_reply_error()) { | |
| 295 reply_deserialize_result_ = false; | |
| 296 } else { | |
| 297 reply_deserialize_result_ = deserializers_.top().deserializer-> | |
| 298 SerializeOutputParameters(*msg); | |
| 299 } | |
| 300 | |
| 301 // Can't CloseHandle the event just yet, since doing so might cause the | |
| 302 // Wait call above to never return. | |
| 303 reply_event = deserializers_.top().reply_event; | |
| 304 } | |
| 305 PopDeserializer(false); | |
| 306 } | 287 } |
| 288 SetEvent(deserializers_.back().done_event); |
| 307 } | 289 } |
| 308 | 290 |
| 309 if (reply_event) | |
| 310 SetEvent(reply_event); | |
| 311 | |
| 312 // We got a reply to a synchronous Send() call that's blocking the listener | 291 // We got a reply to a synchronous Send() call that's blocking the listener |
| 313 // thread. However, further down the call stack there could be another | 292 // thread. However, further down the call stack there could be another |
| 314 // blocking Send() call, whose reply we received after we made this last | 293 // blocking Send() call, whose reply we received after we made this last |
| 315 // Send() call. So check if we have any queued replies available that | 294 // Send() call. So check if we have any queued replies available that |
| 316 // can now unblock the listener thread. | 295 // can now unblock the listener thread. |
| 317 received_sync_msgs_->OnUnblock(); | 296 received_sync_msgs_->DidUnblock(); |
| 318 | 297 |
| 319 return rv; | 298 return true; |
| 320 } | 299 } |
| 321 | 300 |
| 322 // Called on the IPC thread. | 301 void SyncChannel::SyncContext::Clear() { |
| 302 CancelPendingSends(); |
| 303 received_sync_msgs_->RemoveListener(listener()); |
| 304 |
| 305 Context::Clear(); |
| 306 } |
| 307 |
| 323 void SyncChannel::SyncContext::OnMessageReceived(const Message& msg) { | 308 void SyncChannel::SyncContext::OnMessageReceived(const Message& msg) { |
| 324 // Give the filters a chance at processing this message. | 309 // Give the filters a chance at processing this message. |
| 325 if (TryFilters(msg)) | 310 if (TryFilters(msg)) |
| 326 return; | 311 return; |
| 327 | 312 |
| 328 if (UnblockListener(&msg)) | 313 if (TryToUnblockListener(&msg)) |
| 329 return; | 314 return; |
| 330 | 315 |
| 331 if (msg.should_unblock()) { | 316 if (msg.should_unblock()) { |
| 332 received_sync_msgs_->QueueMessage(msg, listener(), channel_id()); | 317 received_sync_msgs_->QueueMessage(msg, listener(), channel_id()); |
| 333 return; | 318 return; |
| 334 } | 319 } |
| 335 | 320 |
| 336 if (msg.is_reply()) { | 321 if (msg.is_reply()) { |
| 337 received_sync_msgs_->QueueReply(msg, this); | 322 received_sync_msgs_->QueueReply(msg, this); |
| 338 return; | 323 return; |
| 339 } | 324 } |
| 340 | 325 |
| 341 return Context::OnMessageReceived(msg); | 326 return Context::OnMessageReceivedNoFilter(msg); |
| 342 } | 327 } |
| 343 | 328 |
| 344 // Called on the IPC thread. | |
| 345 void SyncChannel::SyncContext::OnChannelError() { | 329 void SyncChannel::SyncContext::OnChannelError() { |
| 346 channel_closed_ = true; | 330 CancelPendingSends(); |
| 347 UnblockListener(NULL); | |
| 348 | |
| 349 Context::OnChannelError(); | 331 Context::OnChannelError(); |
| 350 } | 332 } |
| 351 | 333 |
| 352 void SyncChannel::SyncContext::PopDeserializer(bool close_reply_event) { | 334 void SyncChannel::SyncContext::OnChannelOpened() { |
| 353 PendingSyncMsg msg = deserializers_.top(); | 335 shutdown_watcher_.StartWatching(shutdown_event_, this); |
| 354 delete msg.deserializer; | 336 Context::OnChannelOpened(); |
| 355 if (close_reply_event) | |
| 356 CloseHandle(msg.reply_event); | |
| 357 deserializers_.pop(); | |
| 358 } | 337 } |
| 359 | 338 |
| 360 SyncChannel::SyncChannel(const std::wstring& channel_id, Channel::Mode mode, | 339 void SyncChannel::SyncContext::OnChannelClosed() { |
| 361 Channel::Listener* listener, MessageFilter* filter, | 340 shutdown_watcher_.StopWatching(); |
| 362 MessageLoop* ipc_message_loop, | 341 Context::OnChannelClosed(); |
| 363 bool create_pipe_now, HANDLE shutdown_event) | 342 } |
| 364 : ChannelProxy(channel_id, mode, ipc_message_loop, | 343 |
| 365 new SyncContext(listener, filter, ipc_message_loop), | 344 void SyncChannel::SyncContext::OnSendTimeout(int message_id) { |
| 366 create_pipe_now), | 345 AutoLock auto_lock(deserializers_lock_); |
| 367 shutdown_event_(shutdown_event), | 346 PendingSyncMessageQueue::iterator iter; |
| 347 for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++) { |
| 348 if ((*iter).id == message_id) { |
| 349 SetEvent((*iter).done_event); |
| 350 break; |
| 351 } |
| 352 } |
| 353 } |
| 354 |
| 355 void SyncChannel::SyncContext::CancelPendingSends() { |
| 356 AutoLock auto_lock(deserializers_lock_); |
| 357 PendingSyncMessageQueue::iterator iter; |
| 358 for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++) |
| 359 SetEvent((*iter).done_event); |
| 360 } |
| 361 |
| 362 void SyncChannel::SyncContext::OnObjectSignaled(HANDLE object) { |
| 363 DCHECK(object == shutdown_event_); |
| 364 // Process shut down before we can get a reply to a synchronous message. |
| 365 // Cancel pending Send calls, which will end up setting the send done event. |
| 366 CancelPendingSends(); |
| 367 } |
| 368 |
| 369 |
| 370 SyncChannel::SyncChannel( |
| 371 const std::wstring& channel_id, Channel::Mode mode, |
| 372 Channel::Listener* listener, MessageFilter* filter, |
| 373 MessageLoop* ipc_message_loop, bool create_pipe_now, HANDLE shutdown_event) |
| 374 : ChannelProxy( |
| 375 channel_id, mode, ipc_message_loop, |
| 376 new SyncContext(listener, filter, ipc_message_loop, shutdown_event), |
| 377 create_pipe_now), |
| 368 sync_messages_with_no_timeout_allowed_(true) { | 378 sync_messages_with_no_timeout_allowed_(true) { |
| 369 DCHECK(shutdown_event_ != NULL); | 379 // Ideally we only want to watch this object when running a nested message |
| 380 // loop. However, we don't know when it exits if there's another nested |
| 381 // message loop running under it or not, so we wouldn't know whether to |
| 382 // stop or keep watching. So we always watch it, and create the event as |
| 383 // manual reset since the object watcher might otherwise reset the event |
| 384 // when we're doing a WaitForMultipleObjects. |
| 385 dispatch_watcher_.StartWatching(sync_context()->GetDispatchEvent(), this); |
| 370 } | 386 } |
| 371 | 387 |
| 372 SyncChannel::~SyncChannel() { | 388 SyncChannel::~SyncChannel() { |
| 373 // The listener ensures that its lifetime is greater than SyncChannel. But | |
| 374 // after SyncChannel is destructed there's no guarantee that the listener is | |
| 375 // still around, so we wouldn't want ReceivedSyncMsgQueue to call the | |
| 376 // listener. | |
| 377 sync_context()->RemoveListener(listener()); | |
| 378 } | 389 } |
| 379 | 390 |
| 380 bool SyncChannel::Send(IPC::Message* message) { | 391 bool SyncChannel::Send(Message* message) { |
| 381 return SendWithTimeout(message, INFINITE); | 392 return SendWithTimeout(message, INFINITE); |
| 382 } | 393 } |
| 383 | 394 |
| 384 bool SyncChannel::SendWithTimeout(IPC::Message* message, int timeout_ms) { | 395 bool SyncChannel::SendWithTimeout(Message* message, int timeout_ms) { |
| 385 bool message_is_sync = message->is_sync(); | 396 if (!message->is_sync()) { |
| 386 HANDLE pump_messages_event = NULL; | 397 ChannelProxy::Send(message); |
| 387 | 398 return true; |
| 388 HANDLE reply_event = NULL; | |
| 389 if (message_is_sync) { | |
| 390 DCHECK(sync_messages_with_no_timeout_allowed_ || timeout_ms != INFINITE); | |
| 391 IPC::SyncMessage* sync_msg = static_cast<IPC::SyncMessage*>(message); | |
| 392 reply_event = sync_context()->Push(sync_msg); | |
| 393 pump_messages_event = sync_msg->pump_messages_event(); | |
| 394 } | 399 } |
| 395 | 400 |
| 396 // Send the message using the ChannelProxy | 401 // *this* might get deleted in WaitForReply. |
| 402 scoped_refptr<SyncContext> context(sync_context()); |
| 403 if (WaitForSingleObject(context->shutdown_event(), 0) == WAIT_OBJECT_0) { |
| 404 delete message; |
| 405 return false; |
| 406 } |
| 407 |
| 408 DCHECK(sync_messages_with_no_timeout_allowed_ || timeout_ms != INFINITE); |
| 409 SyncMessage* sync_msg = static_cast<SyncMessage*>(message); |
| 410 context->Push(sync_msg); |
| 411 int message_id = SyncMessage::GetMessageId(*sync_msg); |
| 412 HANDLE pump_messages_event = sync_msg->pump_messages_event(); |
| 413 |
| 397 ChannelProxy::Send(message); | 414 ChannelProxy::Send(message); |
| 398 if (!message_is_sync) | |
| 399 return true; | |
| 400 | 415 |
| 401 do { | 416 if (timeout_ms != INFINITE) { |
| 402 // Wait for reply, or for any other incoming synchronous message. | 417 // We use the sync message id so that when a message times out, we don't |
| 403 DCHECK(reply_event != NULL); | 418 // confuse it with another send that is either above/below this Send in |
| 404 HANDLE objects[] = { shutdown_event_, | 419 // the call stack. |
| 405 reply_event, | 420 context->ipc_message_loop()->PostDelayedTask(FROM_HERE, |
| 406 sync_context()->blocking_event(), | 421 NewRunnableMethod(context.get(), |
| 407 pump_messages_event}; | 422 &SyncContext::OnSendTimeout, message_id), timeout_ms); |
| 423 } |
| 408 | 424 |
| 409 DWORD result; | 425 // Wait for reply, or for any other incoming synchronous messages. |
| 410 TimeTicks before = TimeTicks::Now(); | 426 WaitForReply(pump_messages_event); |
| 411 if (pump_messages_event == NULL) { | 427 |
| 412 // No need to pump messages since we didn't get an event to check. | 428 return context->Pop(); |
| 413 result = WaitForMultipleObjects(3, objects, FALSE, timeout_ms); | 429 } |
| 414 } else { | 430 |
| 415 // If the event is set, then we pump messages. Otherwise we also wait on | 431 void SyncChannel::WaitForReply(HANDLE pump_messages_event) { |
| 416 // it so that if it gets set we start pumping messages. | 432 while (true) { |
| 417 if (WaitForSingleObject(pump_messages_event, 0) == WAIT_OBJECT_0) { | 433 HANDLE objects[] = { sync_context()->GetDispatchEvent(), |
| 418 // Before calling MsgWaitForMultipleObjects() we check that our events | 434 sync_context()->GetSendDoneEvent(), |
| 419 // are not signaled. The windows message queue might always have events | 435 pump_messages_event }; |
| 420 // starving the checking of our events otherwise. | 436 uint32 count = pump_messages_event ? 3: 2; |
| 421 result = WaitForMultipleObjects(3, objects, FALSE, 0); | 437 DWORD result = WaitForMultipleObjects(count, objects, FALSE, INFINITE); |
| 422 if (result == WAIT_TIMEOUT) { | 438 if (result == WAIT_OBJECT_0) { |
| 423 result = MsgWaitForMultipleObjects(3, objects, FALSE, timeout_ms, | 439 // We're waiting for a reply, but we received a blocking synchronous |
| 424 QS_ALLINPUT); | 440 // call. We must process it or otherwise a deadlock might occur. |
| 425 } | 441 ResetEvent(sync_context()->GetDispatchEvent()); |
| 426 } else { | 442 sync_context()->DispatchMessages(); |
| 427 result = WaitForMultipleObjects(4, objects, FALSE, timeout_ms); | 443 continue; |
| 428 } | |
| 429 } | 444 } |
| 430 | 445 |
| 431 if (result == WAIT_OBJECT_0 || result == WAIT_TIMEOUT) { | 446 if (result == WAIT_OBJECT_0 + 2) |
| 432 // Process shut down before we can get a reply to a synchronous message, | 447 WaitForReplyWithNestedMessageLoop(); // Start a nested message loop. |
| 433 // or timed-out. Unblock the thread. | |
| 434 sync_context()->PopDeserializer(true); | |
| 435 return false; | |
| 436 } | |
| 437 | 448 |
| 438 if (result == WAIT_OBJECT_0 + 1) { | 449 break; |
| 439 // We got the reply to our synchronous message. | 450 } |
| 440 CloseHandle(reply_event); | |
| 441 return sync_context()->reply_deserialize_result(); | |
| 442 } | |
| 443 | |
| 444 if (result == WAIT_OBJECT_0 + 2) { | |
| 445 // We're waiting for a reply, but we received a blocking synchronous | |
| 446 // call. We must process it or otherwise a deadlock might occur. | |
| 447 sync_context()->DispatchMessages(); | |
| 448 } else if (result == WAIT_OBJECT_0 + 3) { | |
| 449 // Run a nested messsage loop to pump all the thread's messages. We | |
| 450 // shutdown the nested loop when there are no more messages. | |
| 451 pump_messages_events_.push(pump_messages_event); | |
| 452 bool old_state = MessageLoop::current()->NestableTasksAllowed(); | |
| 453 MessageLoop::current()->SetNestableTasksAllowed(true); | |
| 454 // Process a message, but come right back out of the MessageLoop (don't | |
| 455 // loop, sleep, or wait for a kMsgQuit). | |
| 456 MessageLoop::current()->RunAllPending(); | |
| 457 MessageLoop::current()->SetNestableTasksAllowed(old_state); | |
| 458 pump_messages_events_.pop(); | |
| 459 } else { | |
| 460 DCHECK(result == WAIT_OBJECT_0 + 4); | |
| 461 // We were doing a WaitForMultipleObjects, but now the pump messages | |
| 462 // event is set, so the next time we loop we'll use | |
| 463 // MsgWaitForMultipleObjects instead. | |
| 464 } | |
| 465 | |
| 466 if (timeout_ms != INFINITE) { | |
| 467 TimeDelta time_delta = TimeTicks::Now() - before; | |
| 468 timeout_ms -= static_cast<int>(time_delta.InMilliseconds()); | |
| 469 if (timeout_ms <= 0) { | |
| 470 // We timed-out while processing messages. | |
| 471 sync_context()->PopDeserializer(true); | |
| 472 return false; | |
| 473 } | |
| 474 } | |
| 475 | |
| 476 // Continue looping until we either get the reply to our synchronous message | |
| 477 // or we time-out. | |
| 478 } while (true); | |
| 479 } | 451 } |
| 480 | 452 |
| 481 bool SyncChannel::UnblockListener(Message* message) { | 453 void SyncChannel::WaitForReplyWithNestedMessageLoop() { |
| 482 return sync_context()->UnblockListener(message); | 454 HANDLE old_done_event = send_done_watcher_.GetWatchedObject(); |
| 455 send_done_watcher_.StopWatching(); |
| 456 send_done_watcher_.StartWatching(sync_context()->GetSendDoneEvent(), this); |
| 457 bool old_state = MessageLoop::current()->NestableTasksAllowed(); |
| 458 MessageLoop::current()->SetNestableTasksAllowed(true); |
| 459 MessageLoop::current()->Run(); |
| 460 MessageLoop::current()->SetNestableTasksAllowed(old_state); |
| 461 if (old_done_event) |
| 462 send_done_watcher_.StartWatching(old_done_event, this); |
| 463 } |
| 464 |
| 465 void SyncChannel::OnObjectSignaled(HANDLE object) { |
| 466 HANDLE dispatch_event = sync_context()->GetDispatchEvent(); |
| 467 if (object == dispatch_event) { |
| 468 // The call to DispatchMessages might delete this object, so reregister |
| 469 // the object watcher first. |
| 470 ResetEvent(dispatch_event); |
| 471 dispatch_watcher_.StartWatching(dispatch_event, this); |
| 472 sync_context()->DispatchMessages(); |
| 473 } else { |
| 474 // We got the reply, timed out or the process shutdown. |
| 475 DCHECK(object == sync_context()->GetSendDoneEvent()); |
| 476 MessageLoop::current()->Quit(); |
| 477 } |
| 483 } | 478 } |
| 484 | 479 |
| 485 } // namespace IPC | 480 } // namespace IPC |
| 486 | |
| OLD | NEW |