OLD | NEW |
1 // Copyright (c) 2006-2008 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2006-2008 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include <windows.h> | 5 #include <windows.h> |
6 | 6 |
7 #include "chrome/common/ipc_sync_channel.h" | 7 #include "chrome/common/ipc_sync_channel.h" |
8 | 8 |
9 #include "base/lazy_instance.h" | 9 #include "base/lazy_instance.h" |
10 #include "base/logging.h" | 10 #include "base/logging.h" |
(...skipping 19 matching lines...) Expand all Loading... |
30 // we queue a task on the listener thread to dispatch the received messages. | 30 // we queue a task on the listener thread to dispatch the received messages. |
31 // The messages are stored in this queue object that's shared among all | 31 // The messages are stored in this queue object that's shared among all |
32 // SyncChannel objects on the same thread (since one object can receive a | 32 // SyncChannel objects on the same thread (since one object can receive a |
33 // sync message while another one is blocked). | 33 // sync message while another one is blocked). |
34 | 34 |
35 class SyncChannel::ReceivedSyncMsgQueue; | 35 class SyncChannel::ReceivedSyncMsgQueue; |
36 | 36 |
37 class SyncChannel::ReceivedSyncMsgQueue : | 37 class SyncChannel::ReceivedSyncMsgQueue : |
38 public base::RefCountedThreadSafe<ReceivedSyncMsgQueue> { | 38 public base::RefCountedThreadSafe<ReceivedSyncMsgQueue> { |
39 public: | 39 public: |
40 ReceivedSyncMsgQueue() : | 40 // Returns the ReceivedSyncMsgQueue instance for this thread, creating one |
41 blocking_event_(CreateEvent(NULL, FALSE, FALSE, NULL)), | 41 // if necessary. Call RemoveListener on the same thread when done. |
42 task_pending_(false), | 42 static ReceivedSyncMsgQueue* AddListener() { |
43 listener_message_loop_(MessageLoop::current()) { | 43 // We want one ReceivedSyncMsgQueue per listener thread (i.e. since multiple |
| 44 // SyncChannel objects can block the same thread). |
| 45 ReceivedSyncMsgQueue* rv = lazy_tls_ptr_.Pointer()->Get(); |
| 46 if (!rv) { |
| 47 rv = new ReceivedSyncMsgQueue(); |
| 48 ReceivedSyncMsgQueue::lazy_tls_ptr_.Pointer()->Set(rv); |
| 49 } |
| 50 rv->listener_count_++; |
| 51 return rv; |
44 } | 52 } |
45 | 53 |
46 ~ReceivedSyncMsgQueue() { | 54 ~ReceivedSyncMsgQueue() { |
47 DCHECK(lazy_tls_ptr_.Pointer()->Get()); | |
48 DCHECK(MessageLoop::current() == listener_message_loop_); | |
49 CloseHandle(blocking_event_); | |
50 lazy_tls_ptr_.Pointer()->Set(NULL); | |
51 } | 55 } |
52 | 56 |
53 // Called on IPC thread when a synchronous message or reply arrives. | 57 // Called on IPC thread when a synchronous message or reply arrives. |
54 void QueueMessage(const Message& msg, Channel::Listener* listener, | 58 void QueueMessage(const Message& msg, Channel::Listener* listener, |
55 const std::wstring& channel_id) { | 59 const std::wstring& channel_id) { |
56 bool was_task_pending; | 60 bool was_task_pending; |
57 { | 61 { |
58 AutoLock auto_lock(message_lock_); | 62 AutoLock auto_lock(message_lock_); |
59 | 63 |
60 was_task_pending = task_pending_; | 64 was_task_pending = task_pending_; |
61 task_pending_ = true; | 65 task_pending_ = true; |
62 | 66 |
63 // We set the event in case the listener thread is blocked (or is about | 67 // We set the event in case the listener thread is blocked (or is about |
64 // to). In case it's not, the PostTask dispatches the messages. | 68 // to). In case it's not, the PostTask dispatches the messages. |
65 message_queue_.push(ReceivedMessage(new Message(msg), listener, | 69 message_queue_.push(ReceivedMessage(new Message(msg), listener, |
66 channel_id)); | 70 channel_id)); |
67 } | 71 } |
68 | 72 |
69 SetEvent(blocking_event_); | 73 SetEvent(dispatch_event_); |
70 if (!was_task_pending) { | 74 if (!was_task_pending) { |
71 listener_message_loop_->PostTask(FROM_HERE, NewRunnableMethod( | 75 listener_message_loop_->PostTask(FROM_HERE, NewRunnableMethod( |
72 this, &ReceivedSyncMsgQueue::DispatchMessagesTask)); | 76 this, &ReceivedSyncMsgQueue::DispatchMessagesTask)); |
73 } | 77 } |
74 } | 78 } |
75 | 79 |
76 void QueueReply(const Message &msg, SyncChannel::SyncContext* context) { | 80 void QueueReply(const Message &msg, SyncChannel::SyncContext* context) { |
77 received_replies_.push_back(Reply(new Message(msg), context)); | 81 received_replies_.push_back(Reply(new Message(msg), context)); |
78 } | 82 } |
79 | 83 |
(...skipping 18 matching lines...) Expand all Loading... |
98 break; | 102 break; |
99 | 103 |
100 ReceivedMessage& blocking_msg = message_queue_.front(); | 104 ReceivedMessage& blocking_msg = message_queue_.front(); |
101 message = blocking_msg.message; | 105 message = blocking_msg.message; |
102 listener = blocking_msg.listener; | 106 listener = blocking_msg.listener; |
103 channel_id = blocking_msg.channel_id; | 107 channel_id = blocking_msg.channel_id; |
104 message_queue_.pop(); | 108 message_queue_.pop(); |
105 } | 109 } |
106 | 110 |
107 #ifdef IPC_MESSAGE_LOG_ENABLED | 111 #ifdef IPC_MESSAGE_LOG_ENABLED |
108 IPC::Logging* logger = IPC::Logging::current(); | 112 Logging* logger = Logging::current(); |
109 if (logger->Enabled()) | 113 if (logger->Enabled()) |
110 logger->OnPreDispatchMessage(*message); | 114 logger->OnPreDispatchMessage(*message); |
111 #endif | 115 #endif |
112 | 116 |
113 if (listener) | 117 if (listener) |
114 listener->OnMessageReceived(*message); | 118 listener->OnMessageReceived(*message); |
115 | 119 |
116 #ifdef IPC_MESSAGE_LOG_ENABLED | 120 #ifdef IPC_MESSAGE_LOG_ENABLED |
117 if (logger->Enabled()) | 121 if (logger->Enabled()) |
118 logger->OnPostDispatchMessage(*message, channel_id); | 122 logger->OnPostDispatchMessage(*message, channel_id); |
119 #endif | 123 #endif |
120 | 124 |
121 delete message; | 125 delete message; |
122 } | 126 } |
123 } | 127 } |
124 | 128 |
125 // Called on the IPC thread when the current sync Send() call is unblocked. | 129 // Called on the IPC thread when the current sync Send() call is unblocked. |
126 void OnUnblock() { | 130 void DidUnblock() { |
127 if (!received_replies_.empty()) { | 131 if (!received_replies_.empty()) { |
128 MessageLoop::current()->PostTask(FROM_HERE, NewRunnableMethod( | 132 MessageLoop::current()->PostTask(FROM_HERE, NewRunnableMethod( |
129 this, &ReceivedSyncMsgQueue::DispatchReplies)); | 133 this, &ReceivedSyncMsgQueue::DispatchReplies)); |
130 } | 134 } |
131 } | 135 } |
132 | 136 |
133 // SyncChannel calls this in its destructor. | 137 // SyncChannel calls this in its destructor. |
134 void RemoveListener(Channel::Listener* listener) { | 138 void RemoveListener(Channel::Listener* listener) { |
135 AutoLock auto_lock(message_lock_); | 139 AutoLock auto_lock(message_lock_); |
136 | 140 |
137 SyncMessageQueue temp_queue; | 141 SyncMessageQueue temp_queue; |
138 while (!message_queue_.empty()) { | 142 while (!message_queue_.empty()) { |
139 if (message_queue_.front().listener != listener) { | 143 if (message_queue_.front().listener != listener) { |
140 temp_queue.push(message_queue_.front()); | 144 temp_queue.push(message_queue_.front()); |
141 } else { | 145 } else { |
142 delete message_queue_.front().message; | 146 delete message_queue_.front().message; |
143 } | 147 } |
144 | 148 |
145 message_queue_.pop(); | 149 message_queue_.pop(); |
146 } | 150 } |
147 | 151 |
148 while (!temp_queue.empty()) { | 152 while (!temp_queue.empty()) { |
149 message_queue_.push(temp_queue.front()); | 153 message_queue_.push(temp_queue.front()); |
150 temp_queue.pop(); | 154 temp_queue.pop(); |
151 } | 155 } |
| 156 |
| 157 if (--listener_count_ == 0) { |
| 158 DCHECK(lazy_tls_ptr_.Pointer()->Get()); |
| 159 lazy_tls_ptr_.Pointer()->Set(NULL); |
| 160 } |
152 } | 161 } |
153 | 162 |
154 HANDLE blocking_event() { return blocking_event_; } | 163 HANDLE dispatch_event() { return dispatch_event_; } |
155 MessageLoop* listener_message_loop() { return listener_message_loop_; } | 164 MessageLoop* listener_message_loop() { return listener_message_loop_; } |
156 | 165 |
157 // Holds a pointer to the per-thread ReceivedSyncMsgQueue object. | 166 // Holds a pointer to the per-thread ReceivedSyncMsgQueue object. |
158 static base::LazyInstance<base::ThreadLocalPointer<ReceivedSyncMsgQueue> > | 167 static base::LazyInstance<base::ThreadLocalPointer<ReceivedSyncMsgQueue> > |
159 lazy_tls_ptr_; | 168 lazy_tls_ptr_; |
160 | 169 |
161 private: | 170 private: |
| 171 ReceivedSyncMsgQueue() : |
| 172 dispatch_event_(CreateEvent(NULL, TRUE, FALSE, NULL)), |
| 173 task_pending_(false), |
| 174 listener_message_loop_(MessageLoop::current()), |
| 175 listener_count_(0) { |
| 176 } |
| 177 |
162 // Called on the ipc thread to check if we can unblock any current Send() | 178 // Called on the ipc thread to check if we can unblock any current Send() |
163 // calls based on a queued reply. | 179 // calls based on a queued reply. |
164 void DispatchReplies() { | 180 void DispatchReplies() { |
165 for (size_t i = 0; i < received_replies_.size(); ++i) { | 181 for (size_t i = 0; i < received_replies_.size(); ++i) { |
166 Message* message = received_replies_[i].message; | 182 Message* message = received_replies_[i].message; |
167 if (received_replies_[i].context->UnblockListener(message)) { | 183 if (received_replies_[i].context->TryToUnblockListener(message)) { |
168 delete message; | 184 delete message; |
169 received_replies_.erase(received_replies_.begin() + i); | 185 received_replies_.erase(received_replies_.begin() + i); |
170 return; | 186 return; |
171 } | 187 } |
172 } | 188 } |
173 } | 189 } |
174 | 190 |
175 // Set when we got a synchronous message that we must respond to as the | |
176 // sender needs its reply before it can reply to our original synchronous | |
177 // message. | |
178 HANDLE blocking_event_; | |
179 | |
180 MessageLoop* listener_message_loop_; | |
181 | |
182 // Holds information about a queued synchronous message. | 191 // Holds information about a queued synchronous message. |
183 struct ReceivedMessage { | 192 struct ReceivedMessage { |
184 ReceivedMessage(Message* m, Channel::Listener* l, const std::wstring& i) | 193 ReceivedMessage(Message* m, Channel::Listener* l, const std::wstring& i) |
185 : message(m), listener(l), channel_id(i) { } | 194 : message(m), listener(l), channel_id(i) { } |
186 Message* message; | 195 Message* message; |
187 Channel::Listener* listener; | 196 Channel::Listener* listener; |
188 std::wstring channel_id; | 197 std::wstring channel_id; |
189 }; | 198 }; |
190 | 199 |
191 typedef std::queue<ReceivedMessage> SyncMessageQueue; | 200 typedef std::queue<ReceivedMessage> SyncMessageQueue; |
192 SyncMessageQueue message_queue_; | 201 SyncMessageQueue message_queue_; |
193 Lock message_lock_; | |
194 bool task_pending_; | |
195 | 202 |
196 // Holds information about a queued reply message. | 203 // Holds information about a queued reply message. |
197 struct Reply { | 204 struct Reply { |
198 Reply(Message* m, SyncChannel::SyncContext* c) | 205 Reply(Message* m, SyncChannel::SyncContext* c) |
199 : message(m), | 206 : message(m), |
200 context(c) { } | 207 context(c) { } |
201 | 208 |
202 Message* message; | 209 Message* message; |
203 scoped_refptr<SyncChannel::SyncContext> context; | 210 scoped_refptr<SyncChannel::SyncContext> context; |
204 }; | 211 }; |
205 | 212 |
206 std::vector<Reply> received_replies_; | 213 std::vector<Reply> received_replies_; |
| 214 |
| 215 // Set when we got a synchronous message that we must respond to as the |
| 216 // sender needs its reply before it can reply to our original synchronous |
| 217 // message. |
| 218 ScopedHandle dispatch_event_; |
| 219 MessageLoop* listener_message_loop_; |
| 220 Lock message_lock_; |
| 221 bool task_pending_; |
| 222 int listener_count_; |
207 }; | 223 }; |
208 | 224 |
209 base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> > | 225 base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> > |
210 SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_(base::LINKER_INITIALIZED); | 226 SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_(base::LINKER_INITIALIZED); |
211 | 227 |
212 SyncChannel::SyncContext::SyncContext( | 228 SyncChannel::SyncContext::SyncContext( |
213 Channel::Listener* listener, | 229 Channel::Listener* listener, |
214 MessageFilter* filter, | 230 MessageFilter* filter, |
215 MessageLoop* ipc_thread) | 231 MessageLoop* ipc_thread, |
| 232 HANDLE shutdown_event) |
216 : ChannelProxy::Context(listener, filter, ipc_thread), | 233 : ChannelProxy::Context(listener, filter, ipc_thread), |
217 channel_closed_(false), | 234 shutdown_event_(shutdown_event), |
218 reply_deserialize_result_(false) { | 235 received_sync_msgs_(ReceivedSyncMsgQueue::AddListener()){ |
219 // We want one ReceivedSyncMsgQueue per listener thread (i.e. since multiple | |
220 // SyncChannel objects that can block the same thread). | |
221 received_sync_msgs_ = ReceivedSyncMsgQueue::lazy_tls_ptr_.Pointer()->Get(); | |
222 | |
223 if (!received_sync_msgs_) { | |
224 // Stash a pointer to the listener thread's ReceivedSyncMsgQueue, as we | |
225 // need to be able to access it in the IPC thread. | |
226 received_sync_msgs_ = new ReceivedSyncMsgQueue(); | |
227 ReceivedSyncMsgQueue::lazy_tls_ptr_.Pointer()->Set(received_sync_msgs_); | |
228 } | |
229 | |
230 // Addref manually so that we can ensure destruction on the listener thread | |
231 // (so that the TLS object is NULLd). | |
232 received_sync_msgs_->AddRef(); | |
233 } | 236 } |
234 | 237 |
235 SyncChannel::SyncContext::~SyncContext() { | 238 SyncChannel::SyncContext::~SyncContext() { |
236 while (!deserializers_.empty()) | 239 while (!deserializers_.empty()) |
237 PopDeserializer(true); | 240 Pop(); |
238 | |
239 received_sync_msgs_->listener_message_loop()->ReleaseSoon( | |
240 FROM_HERE, received_sync_msgs_); | |
241 } | 241 } |
242 | 242 |
243 // Adds information about an outgoing sync message to the context so that | 243 // Adds information about an outgoing sync message to the context so that |
244 // we know how to deserialize the reply. Returns a handle that's set when | 244 // we know how to deserialize the reply. Returns a handle that's set when |
245 // the reply has arrived. | 245 // the reply has arrived. |
246 HANDLE SyncChannel::SyncContext::Push(IPC::SyncMessage* sync_msg) { | 246 void SyncChannel::SyncContext::Push(SyncMessage* sync_msg) { |
247 PendingSyncMsg pending(IPC::SyncMessage::GetMessageId(*sync_msg), | 247 PendingSyncMsg pending(SyncMessage::GetMessageId(*sync_msg), |
248 sync_msg->GetReplyDeserializer(), | 248 sync_msg->GetReplyDeserializer(), |
249 CreateEvent(NULL, FALSE, FALSE, NULL)); | 249 CreateEvent(NULL, FALSE, FALSE, NULL)); |
250 AutoLock auto_lock(deserializers_lock_); | 250 AutoLock auto_lock(deserializers_lock_); |
251 deserializers_.push(pending); | 251 deserializers_.push_back(pending); |
252 | |
253 return pending.reply_event; | |
254 } | 252 } |
255 | 253 |
256 HANDLE SyncChannel::SyncContext::blocking_event() { | 254 bool SyncChannel::SyncContext::Pop() { |
257 return received_sync_msgs_->blocking_event(); | 255 AutoLock auto_lock(deserializers_lock_); |
| 256 PendingSyncMsg msg = deserializers_.back(); |
| 257 delete msg.deserializer; |
| 258 CloseHandle(msg.done_event); |
| 259 deserializers_.pop_back(); |
| 260 return msg.send_result; |
| 261 } |
| 262 |
| 263 HANDLE SyncChannel::SyncContext::GetSendDoneEvent() { |
| 264 AutoLock auto_lock(deserializers_lock_); |
| 265 return deserializers_.back().done_event; |
| 266 } |
| 267 |
| 268 HANDLE SyncChannel::SyncContext::GetDispatchEvent() { |
| 269 return received_sync_msgs_->dispatch_event(); |
258 } | 270 } |
259 | 271 |
260 void SyncChannel::SyncContext::DispatchMessages() { | 272 void SyncChannel::SyncContext::DispatchMessages() { |
261 received_sync_msgs_->DispatchMessages(); | 273 received_sync_msgs_->DispatchMessages(); |
262 } | 274 } |
263 | 275 |
264 void SyncChannel::SyncContext::RemoveListener(Channel::Listener* listener) { | 276 bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) { |
265 received_sync_msgs_->RemoveListener(listener); | 277 { |
266 } | 278 AutoLock auto_lock(deserializers_lock_); |
| 279 if (deserializers_.empty() || |
| 280 !SyncMessage::IsMessageReplyTo(*msg, deserializers_.back().id)) { |
| 281 return false; |
| 282 } |
267 | 283 |
268 bool SyncChannel::SyncContext::UnblockListener(const Message* msg) { | 284 if (!msg->is_reply_error()) { |
269 bool rv = false; | 285 deserializers_.back().send_result = deserializers_.back().deserializer-> |
270 HANDLE reply_event = NULL; | 286 SerializeOutputParameters(*msg); |
271 { | |
272 if (channel_closed_) { | |
273 // The channel is closed, or we couldn't connect, so cancel all Send() | |
274 // calls. | |
275 reply_deserialize_result_ = false; | |
276 { | |
277 AutoLock auto_lock(deserializers_lock_); | |
278 if (!deserializers_.empty()) | |
279 reply_event = deserializers_.top().reply_event; | |
280 } | |
281 | |
282 if (reply_event) | |
283 PopDeserializer(false); | |
284 } else { | |
285 { | |
286 AutoLock auto_lock(deserializers_lock_); | |
287 if (deserializers_.empty()) | |
288 return false; | |
289 | |
290 if (!IPC::SyncMessage::IsMessageReplyTo(*msg, deserializers_.top().id)) | |
291 return false; | |
292 | |
293 rv = true; | |
294 if (msg->is_reply_error()) { | |
295 reply_deserialize_result_ = false; | |
296 } else { | |
297 reply_deserialize_result_ = deserializers_.top().deserializer-> | |
298 SerializeOutputParameters(*msg); | |
299 } | |
300 | |
301 // Can't CloseHandle the event just yet, since doing so might cause the | |
302 // Wait call above to never return. | |
303 reply_event = deserializers_.top().reply_event; | |
304 } | |
305 PopDeserializer(false); | |
306 } | 287 } |
| 288 SetEvent(deserializers_.back().done_event); |
307 } | 289 } |
308 | 290 |
309 if (reply_event) | |
310 SetEvent(reply_event); | |
311 | |
312 // We got a reply to a synchronous Send() call that's blocking the listener | 291 // We got a reply to a synchronous Send() call that's blocking the listener |
313 // thread. However, further down the call stack there could be another | 292 // thread. However, further down the call stack there could be another |
314 // blocking Send() call, whose reply we received after we made this last | 293 // blocking Send() call, whose reply we received after we made this last |
315 // Send() call. So check if we have any queued replies available that | 294 // Send() call. So check if we have any queued replies available that |
316 // can now unblock the listener thread. | 295 // can now unblock the listener thread. |
317 received_sync_msgs_->OnUnblock(); | 296 received_sync_msgs_->DidUnblock(); |
318 | 297 |
319 return rv; | 298 return true; |
320 } | 299 } |
321 | 300 |
322 // Called on the IPC thread. | 301 void SyncChannel::SyncContext::Clear() { |
| 302 CancelPendingSends(); |
| 303 received_sync_msgs_->RemoveListener(listener()); |
| 304 |
| 305 Context::Clear(); |
| 306 } |
| 307 |
323 void SyncChannel::SyncContext::OnMessageReceived(const Message& msg) { | 308 void SyncChannel::SyncContext::OnMessageReceived(const Message& msg) { |
324 // Give the filters a chance at processing this message. | 309 // Give the filters a chance at processing this message. |
325 if (TryFilters(msg)) | 310 if (TryFilters(msg)) |
326 return; | 311 return; |
327 | 312 |
328 if (UnblockListener(&msg)) | 313 if (TryToUnblockListener(&msg)) |
329 return; | 314 return; |
330 | 315 |
331 if (msg.should_unblock()) { | 316 if (msg.should_unblock()) { |
332 received_sync_msgs_->QueueMessage(msg, listener(), channel_id()); | 317 received_sync_msgs_->QueueMessage(msg, listener(), channel_id()); |
333 return; | 318 return; |
334 } | 319 } |
335 | 320 |
336 if (msg.is_reply()) { | 321 if (msg.is_reply()) { |
337 received_sync_msgs_->QueueReply(msg, this); | 322 received_sync_msgs_->QueueReply(msg, this); |
338 return; | 323 return; |
339 } | 324 } |
340 | 325 |
341 return Context::OnMessageReceived(msg); | 326 return Context::OnMessageReceivedNoFilter(msg); |
342 } | 327 } |
343 | 328 |
344 // Called on the IPC thread. | |
345 void SyncChannel::SyncContext::OnChannelError() { | 329 void SyncChannel::SyncContext::OnChannelError() { |
346 channel_closed_ = true; | 330 CancelPendingSends(); |
347 UnblockListener(NULL); | |
348 | |
349 Context::OnChannelError(); | 331 Context::OnChannelError(); |
350 } | 332 } |
351 | 333 |
352 void SyncChannel::SyncContext::PopDeserializer(bool close_reply_event) { | 334 void SyncChannel::SyncContext::OnChannelOpened() { |
353 PendingSyncMsg msg = deserializers_.top(); | 335 shutdown_watcher_.StartWatching(shutdown_event_, this); |
354 delete msg.deserializer; | 336 Context::OnChannelOpened(); |
355 if (close_reply_event) | |
356 CloseHandle(msg.reply_event); | |
357 deserializers_.pop(); | |
358 } | 337 } |
359 | 338 |
360 SyncChannel::SyncChannel(const std::wstring& channel_id, Channel::Mode mode, | 339 void SyncChannel::SyncContext::OnChannelClosed() { |
361 Channel::Listener* listener, MessageFilter* filter, | 340 shutdown_watcher_.StopWatching(); |
362 MessageLoop* ipc_message_loop, | 341 Context::OnChannelClosed(); |
363 bool create_pipe_now, HANDLE shutdown_event) | 342 } |
364 : ChannelProxy(channel_id, mode, ipc_message_loop, | 343 |
365 new SyncContext(listener, filter, ipc_message_loop), | 344 void SyncChannel::SyncContext::OnSendTimeout(int message_id) { |
366 create_pipe_now), | 345 AutoLock auto_lock(deserializers_lock_); |
367 shutdown_event_(shutdown_event), | 346 PendingSyncMessageQueue::iterator iter; |
| 347 for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++) { |
| 348 if ((*iter).id == message_id) { |
| 349 SetEvent((*iter).done_event); |
| 350 break; |
| 351 } |
| 352 } |
| 353 } |
| 354 |
| 355 void SyncChannel::SyncContext::CancelPendingSends() { |
| 356 AutoLock auto_lock(deserializers_lock_); |
| 357 PendingSyncMessageQueue::iterator iter; |
| 358 for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++) |
| 359 SetEvent((*iter).done_event); |
| 360 } |
| 361 |
| 362 void SyncChannel::SyncContext::OnObjectSignaled(HANDLE object) { |
| 363 DCHECK(object == shutdown_event_); |
| 364 // Process shut down before we can get a reply to a synchronous message. |
| 365 // Cancel pending Send calls, which will end up setting the send done event. |
| 366 CancelPendingSends(); |
| 367 } |
| 368 |
| 369 |
| 370 SyncChannel::SyncChannel( |
| 371 const std::wstring& channel_id, Channel::Mode mode, |
| 372 Channel::Listener* listener, MessageFilter* filter, |
| 373 MessageLoop* ipc_message_loop, bool create_pipe_now, HANDLE shutdown_event) |
| 374 : ChannelProxy( |
| 375 channel_id, mode, ipc_message_loop, |
| 376 new SyncContext(listener, filter, ipc_message_loop, shutdown_event), |
| 377 create_pipe_now), |
368 sync_messages_with_no_timeout_allowed_(true) { | 378 sync_messages_with_no_timeout_allowed_(true) { |
369 DCHECK(shutdown_event_ != NULL); | 379 // Ideally we only want to watch this object when running a nested message |
| 380 // loop. However, we don't know when it exits if there's another nested |
| 381 // message loop running under it or not, so we wouldn't know whether to |
| 382 // stop or keep watching. So we always watch it, and create the event as |
| 383 // manual reset since the object watcher might otherwise reset the event |
| 384 // when we're doing a WaitForMultipleObjects. |
| 385 dispatch_watcher_.StartWatching(sync_context()->GetDispatchEvent(), this); |
370 } | 386 } |
371 | 387 |
372 SyncChannel::~SyncChannel() { | 388 SyncChannel::~SyncChannel() { |
373 // The listener ensures that its lifetime is greater than SyncChannel. But | |
374 // after SyncChannel is destructed there's no guarantee that the listener is | |
375 // still around, so we wouldn't want ReceivedSyncMsgQueue to call the | |
376 // listener. | |
377 sync_context()->RemoveListener(listener()); | |
378 } | 389 } |
379 | 390 |
380 bool SyncChannel::Send(IPC::Message* message) { | 391 bool SyncChannel::Send(Message* message) { |
381 return SendWithTimeout(message, INFINITE); | 392 return SendWithTimeout(message, INFINITE); |
382 } | 393 } |
383 | 394 |
384 bool SyncChannel::SendWithTimeout(IPC::Message* message, int timeout_ms) { | 395 bool SyncChannel::SendWithTimeout(Message* message, int timeout_ms) { |
385 bool message_is_sync = message->is_sync(); | 396 if (!message->is_sync()) { |
386 HANDLE pump_messages_event = NULL; | 397 ChannelProxy::Send(message); |
387 | 398 return true; |
388 HANDLE reply_event = NULL; | |
389 if (message_is_sync) { | |
390 DCHECK(sync_messages_with_no_timeout_allowed_ || timeout_ms != INFINITE); | |
391 IPC::SyncMessage* sync_msg = static_cast<IPC::SyncMessage*>(message); | |
392 reply_event = sync_context()->Push(sync_msg); | |
393 pump_messages_event = sync_msg->pump_messages_event(); | |
394 } | 399 } |
395 | 400 |
396 // Send the message using the ChannelProxy | 401 // *this* might get deleted in WaitForReply. |
| 402 scoped_refptr<SyncContext> context(sync_context()); |
| 403 if (WaitForSingleObject(context->shutdown_event(), 0) == WAIT_OBJECT_0) { |
| 404 delete message; |
| 405 return false; |
| 406 } |
| 407 |
| 408 DCHECK(sync_messages_with_no_timeout_allowed_ || timeout_ms != INFINITE); |
| 409 SyncMessage* sync_msg = static_cast<SyncMessage*>(message); |
| 410 context->Push(sync_msg); |
| 411 int message_id = SyncMessage::GetMessageId(*sync_msg); |
| 412 HANDLE pump_messages_event = sync_msg->pump_messages_event(); |
| 413 |
397 ChannelProxy::Send(message); | 414 ChannelProxy::Send(message); |
398 if (!message_is_sync) | |
399 return true; | |
400 | 415 |
401 do { | 416 if (timeout_ms != INFINITE) { |
402 // Wait for reply, or for any other incoming synchronous message. | 417 // We use the sync message id so that when a message times out, we don't |
403 DCHECK(reply_event != NULL); | 418 // confuse it with another send that is either above/below this Send in |
404 HANDLE objects[] = { shutdown_event_, | 419 // the call stack. |
405 reply_event, | 420 context->ipc_message_loop()->PostDelayedTask(FROM_HERE, |
406 sync_context()->blocking_event(), | 421 NewRunnableMethod(context.get(), |
407 pump_messages_event}; | 422 &SyncContext::OnSendTimeout, message_id), timeout_ms); |
| 423 } |
408 | 424 |
409 DWORD result; | 425 // Wait for reply, or for any other incoming synchronous messages. |
410 TimeTicks before = TimeTicks::Now(); | 426 WaitForReply(pump_messages_event); |
411 if (pump_messages_event == NULL) { | 427 |
412 // No need to pump messages since we didn't get an event to check. | 428 return context->Pop(); |
413 result = WaitForMultipleObjects(3, objects, FALSE, timeout_ms); | 429 } |
414 } else { | 430 |
415 // If the event is set, then we pump messages. Otherwise we also wait on | 431 void SyncChannel::WaitForReply(HANDLE pump_messages_event) { |
416 // it so that if it gets set we start pumping messages. | 432 while (true) { |
417 if (WaitForSingleObject(pump_messages_event, 0) == WAIT_OBJECT_0) { | 433 HANDLE objects[] = { sync_context()->GetDispatchEvent(), |
418 // Before calling MsgWaitForMultipleObjects() we check that our events | 434 sync_context()->GetSendDoneEvent(), |
419 // are not signaled. The windows message queue might always have events | 435 pump_messages_event }; |
420 // starving the checking of our events otherwise. | 436 uint32 count = pump_messages_event ? 3: 2; |
421 result = WaitForMultipleObjects(3, objects, FALSE, 0); | 437 DWORD result = WaitForMultipleObjects(count, objects, FALSE, INFINITE); |
422 if (result == WAIT_TIMEOUT) { | 438 if (result == WAIT_OBJECT_0) { |
423 result = MsgWaitForMultipleObjects(3, objects, FALSE, timeout_ms, | 439 // We're waiting for a reply, but we received a blocking synchronous |
424 QS_ALLINPUT); | 440 // call. We must process it or otherwise a deadlock might occur. |
425 } | 441 ResetEvent(sync_context()->GetDispatchEvent()); |
426 } else { | 442 sync_context()->DispatchMessages(); |
427 result = WaitForMultipleObjects(4, objects, FALSE, timeout_ms); | 443 continue; |
428 } | |
429 } | 444 } |
430 | 445 |
431 if (result == WAIT_OBJECT_0 || result == WAIT_TIMEOUT) { | 446 if (result == WAIT_OBJECT_0 + 2) |
432 // Process shut down before we can get a reply to a synchronous message, | 447 WaitForReplyWithNestedMessageLoop(); // Start a nested message loop. |
433 // or timed-out. Unblock the thread. | |
434 sync_context()->PopDeserializer(true); | |
435 return false; | |
436 } | |
437 | 448 |
438 if (result == WAIT_OBJECT_0 + 1) { | 449 break; |
439 // We got the reply to our synchronous message. | 450 } |
440 CloseHandle(reply_event); | |
441 return sync_context()->reply_deserialize_result(); | |
442 } | |
443 | |
444 if (result == WAIT_OBJECT_0 + 2) { | |
445 // We're waiting for a reply, but we received a blocking synchronous | |
446 // call. We must process it or otherwise a deadlock might occur. | |
447 sync_context()->DispatchMessages(); | |
448 } else if (result == WAIT_OBJECT_0 + 3) { | |
449 // Run a nested messsage loop to pump all the thread's messages. We | |
450 // shutdown the nested loop when there are no more messages. | |
451 pump_messages_events_.push(pump_messages_event); | |
452 bool old_state = MessageLoop::current()->NestableTasksAllowed(); | |
453 MessageLoop::current()->SetNestableTasksAllowed(true); | |
454 // Process a message, but come right back out of the MessageLoop (don't | |
455 // loop, sleep, or wait for a kMsgQuit). | |
456 MessageLoop::current()->RunAllPending(); | |
457 MessageLoop::current()->SetNestableTasksAllowed(old_state); | |
458 pump_messages_events_.pop(); | |
459 } else { | |
460 DCHECK(result == WAIT_OBJECT_0 + 4); | |
461 // We were doing a WaitForMultipleObjects, but now the pump messages | |
462 // event is set, so the next time we loop we'll use | |
463 // MsgWaitForMultipleObjects instead. | |
464 } | |
465 | |
466 if (timeout_ms != INFINITE) { | |
467 TimeDelta time_delta = TimeTicks::Now() - before; | |
468 timeout_ms -= static_cast<int>(time_delta.InMilliseconds()); | |
469 if (timeout_ms <= 0) { | |
470 // We timed-out while processing messages. | |
471 sync_context()->PopDeserializer(true); | |
472 return false; | |
473 } | |
474 } | |
475 | |
476 // Continue looping until we either get the reply to our synchronous message | |
477 // or we time-out. | |
478 } while (true); | |
479 } | 451 } |
480 | 452 |
481 bool SyncChannel::UnblockListener(Message* message) { | 453 void SyncChannel::WaitForReplyWithNestedMessageLoop() { |
482 return sync_context()->UnblockListener(message); | 454 HANDLE old_done_event = send_done_watcher_.GetWatchedObject(); |
| 455 send_done_watcher_.StopWatching(); |
| 456 send_done_watcher_.StartWatching(sync_context()->GetSendDoneEvent(), this); |
| 457 bool old_state = MessageLoop::current()->NestableTasksAllowed(); |
| 458 MessageLoop::current()->SetNestableTasksAllowed(true); |
| 459 MessageLoop::current()->Run(); |
| 460 MessageLoop::current()->SetNestableTasksAllowed(old_state); |
| 461 if (old_done_event) |
| 462 send_done_watcher_.StartWatching(old_done_event, this); |
| 463 } |
| 464 |
| 465 void SyncChannel::OnObjectSignaled(HANDLE object) { |
| 466 HANDLE dispatch_event = sync_context()->GetDispatchEvent(); |
| 467 if (object == dispatch_event) { |
| 468 // The call to DispatchMessages might delete this object, so reregister |
| 469 // the object watcher first. |
| 470 ResetEvent(dispatch_event); |
| 471 dispatch_watcher_.StartWatching(dispatch_event, this); |
| 472 sync_context()->DispatchMessages(); |
| 473 } else { |
| 474 // We got the reply, timed out or the process shutdown. |
| 475 DCHECK(object == sync_context()->GetSendDoneEvent()); |
| 476 MessageLoop::current()->Quit(); |
| 477 } |
483 } | 478 } |
484 | 479 |
485 } // namespace IPC | 480 } // namespace IPC |
486 | |
OLD | NEW |