OLD | NEW |
1 // Copyright (c) 2006-2008 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2006-2008 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include <windows.h> | |
6 | |
7 #include "chrome/common/ipc_sync_channel.h" | 5 #include "chrome/common/ipc_sync_channel.h" |
8 | 6 |
9 #include "base/lazy_instance.h" | 7 #include "base/lazy_instance.h" |
10 #include "base/logging.h" | 8 #include "base/logging.h" |
11 #include "base/thread_local.h" | 9 #include "base/thread_local.h" |
12 #include "chrome/common/child_process.h" | 10 #include "base/message_loop.h" |
| 11 #include "base/waitable_event.h" |
| 12 #include "base/waitable_event_watcher.h" |
13 #include "chrome/common/ipc_logging.h" | 13 #include "chrome/common/ipc_logging.h" |
14 #include "chrome/common/ipc_sync_message.h" | 14 #include "chrome/common/ipc_sync_message.h" |
15 | 15 |
| 16 #if !defined(OS_WIN) |
| 17 #define INFINITE -1 |
| 18 #endif |
| 19 |
16 using base::TimeDelta; | 20 using base::TimeDelta; |
17 using base::TimeTicks; | 21 using base::TimeTicks; |
| 22 using base::WaitableEvent; |
18 | 23 |
19 namespace IPC { | 24 namespace IPC { |
20 // When we're blocked in a Send(), we need to process incoming synchronous | 25 // When we're blocked in a Send(), we need to process incoming synchronous |
21 // messages right away because it could be blocking our reply (either | 26 // messages right away because it could be blocking our reply (either |
22 // directly from the same object we're calling, or indirectly through one or | 27 // directly from the same object we're calling, or indirectly through one or |
23 // more other channels). That means that in SyncContext's OnMessageReceived, | 28 // more other channels). That means that in SyncContext's OnMessageReceived, |
24 // we need to process sync message right away if we're blocked. However a | 29 // we need to process sync message right away if we're blocked. However a |
25 // simple check isn't sufficient, because the listener thread can be in the | 30 // simple check isn't sufficient, because the listener thread can be in the |
26 // process of calling Send. | 31 // process of calling Send. |
27 // To work around this, when SyncChannel filters a sync message, it sets | 32 // To work around this, when SyncChannel filters a sync message, it sets |
28 // an event that the listener thread waits on during its Send() call. This | 33 // an event that the listener thread waits on during its Send() call. This |
29 // allows us to dispatch incoming sync messages when blocked. The race | 34 // allows us to dispatch incoming sync messages when blocked. The race |
30 // condition is handled because if Send is in the process of being called, it | 35 // condition is handled because if Send is in the process of being called, it |
31 // will check the event. In case the listener thread isn't sending a message, | 36 // will check the event. In case the listener thread isn't sending a message, |
32 // we queue a task on the listener thread to dispatch the received messages. | 37 // we queue a task on the listener thread to dispatch the received messages. |
33 // The messages are stored in this queue object that's shared among all | 38 // The messages are stored in this queue object that's shared among all |
34 // SyncChannel objects on the same thread (since one object can receive a | 39 // SyncChannel objects on the same thread (since one object can receive a |
35 // sync message while another one is blocked). | 40 // sync message while another one is blocked). |
36 | 41 |
37 class SyncChannel::ReceivedSyncMsgQueue; | |
38 | |
39 class SyncChannel::ReceivedSyncMsgQueue : | 42 class SyncChannel::ReceivedSyncMsgQueue : |
40 public base::RefCountedThreadSafe<ReceivedSyncMsgQueue> { | 43 public base::RefCountedThreadSafe<ReceivedSyncMsgQueue> { |
41 public: | 44 public: |
42 // Returns the ReceivedSyncMsgQueue instance for this thread, creating one | 45 // Returns the ReceivedSyncMsgQueue instance for this thread, creating one |
43 // if necessary. Call RemoveContext on the same thread when done. | 46 // if necessary. Call RemoveContext on the same thread when done. |
44 static ReceivedSyncMsgQueue* AddContext() { | 47 static ReceivedSyncMsgQueue* AddContext() { |
45 // We want one ReceivedSyncMsgQueue per listener thread (i.e. since multiple | 48 // We want one ReceivedSyncMsgQueue per listener thread (i.e. since multiple |
46 // SyncChannel objects can block the same thread). | 49 // SyncChannel objects can block the same thread). |
47 ReceivedSyncMsgQueue* rv = lazy_tls_ptr_.Pointer()->Get(); | 50 ReceivedSyncMsgQueue* rv = lazy_tls_ptr_.Pointer()->Get(); |
48 if (!rv) { | 51 if (!rv) { |
(...skipping 14 matching lines...) Expand all Loading... |
63 AutoLock auto_lock(message_lock_); | 66 AutoLock auto_lock(message_lock_); |
64 | 67 |
65 was_task_pending = task_pending_; | 68 was_task_pending = task_pending_; |
66 task_pending_ = true; | 69 task_pending_ = true; |
67 | 70 |
68 // We set the event in case the listener thread is blocked (or is about | 71 // We set the event in case the listener thread is blocked (or is about |
69 // to). In case it's not, the PostTask dispatches the messages. | 72 // to). In case it's not, the PostTask dispatches the messages. |
70 message_queue_.push_back(QueuedMessage(new Message(msg), context)); | 73 message_queue_.push_back(QueuedMessage(new Message(msg), context)); |
71 } | 74 } |
72 | 75 |
73 SetEvent(dispatch_event_); | 76 dispatch_event_.Signal(); |
74 if (!was_task_pending) { | 77 if (!was_task_pending) { |
75 listener_message_loop_->PostTask(FROM_HERE, NewRunnableMethod( | 78 listener_message_loop_->PostTask(FROM_HERE, NewRunnableMethod( |
76 this, &ReceivedSyncMsgQueue::DispatchMessagesTask)); | 79 this, &ReceivedSyncMsgQueue::DispatchMessagesTask)); |
77 } | 80 } |
78 } | 81 } |
79 | 82 |
80 void QueueReply(const Message &msg, SyncChannel::SyncContext* context) { | 83 void QueueReply(const Message &msg, SyncChannel::SyncContext* context) { |
81 received_replies_.push_back(QueuedMessage(new Message(msg), context)); | 84 received_replies_.push_back(QueuedMessage(new Message(msg), context)); |
82 } | 85 } |
83 | 86 |
(...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
136 iter++; | 139 iter++; |
137 } | 140 } |
138 } | 141 } |
139 | 142 |
140 if (--listener_count_ == 0) { | 143 if (--listener_count_ == 0) { |
141 DCHECK(lazy_tls_ptr_.Pointer()->Get()); | 144 DCHECK(lazy_tls_ptr_.Pointer()->Get()); |
142 lazy_tls_ptr_.Pointer()->Set(NULL); | 145 lazy_tls_ptr_.Pointer()->Set(NULL); |
143 } | 146 } |
144 } | 147 } |
145 | 148 |
146 HANDLE dispatch_event() { return dispatch_event_; } | 149 WaitableEvent* dispatch_event() { return &dispatch_event_; } |
147 MessageLoop* listener_message_loop() { return listener_message_loop_; } | 150 MessageLoop* listener_message_loop() { return listener_message_loop_; } |
148 | 151 |
149 // Holds a pointer to the per-thread ReceivedSyncMsgQueue object. | 152 // Holds a pointer to the per-thread ReceivedSyncMsgQueue object. |
150 static base::LazyInstance<base::ThreadLocalPointer<ReceivedSyncMsgQueue> > | 153 static base::LazyInstance<base::ThreadLocalPointer<ReceivedSyncMsgQueue> > |
151 lazy_tls_ptr_; | 154 lazy_tls_ptr_; |
152 | 155 |
153 // Called on the ipc thread to check if we can unblock any current Send() | 156 // Called on the ipc thread to check if we can unblock any current Send() |
154 // calls based on a queued reply. | 157 // calls based on a queued reply. |
155 void DispatchReplies() { | 158 void DispatchReplies() { |
156 for (size_t i = 0; i < received_replies_.size(); ++i) { | 159 for (size_t i = 0; i < received_replies_.size(); ++i) { |
157 Message* message = received_replies_[i].message; | 160 Message* message = received_replies_[i].message; |
158 if (received_replies_[i].context->TryToUnblockListener(message)) { | 161 if (received_replies_[i].context->TryToUnblockListener(message)) { |
159 delete message; | 162 delete message; |
160 received_replies_.erase(received_replies_.begin() + i); | 163 received_replies_.erase(received_replies_.begin() + i); |
161 return; | 164 return; |
162 } | 165 } |
163 } | 166 } |
164 } | 167 } |
165 | 168 |
166 private: | 169 private: |
167 // See the comment in SyncChannel::SyncChannel for why this event is created | 170 // See the comment in SyncChannel::SyncChannel for why this event is created |
168 // as manual reset. | 171 // as manual reset. |
169 ReceivedSyncMsgQueue() : | 172 ReceivedSyncMsgQueue() : |
170 dispatch_event_(CreateEvent(NULL, TRUE, FALSE, NULL)), | 173 dispatch_event_(true, false), |
| 174 listener_message_loop_(MessageLoop::current()), |
171 task_pending_(false), | 175 task_pending_(false), |
172 listener_message_loop_(MessageLoop::current()), | |
173 listener_count_(0) { | 176 listener_count_(0) { |
174 } | 177 } |
175 | 178 |
176 // Holds information about a queued synchronous message or reply. | 179 // Holds information about a queued synchronous message or reply. |
177 struct QueuedMessage { | 180 struct QueuedMessage { |
178 QueuedMessage(Message* m, SyncContext* c) : message(m), context(c) { } | 181 QueuedMessage(Message* m, SyncContext* c) : message(m), context(c) { } |
179 Message* message; | 182 Message* message; |
180 scoped_refptr<SyncChannel::SyncContext> context; | 183 scoped_refptr<SyncChannel::SyncContext> context; |
181 }; | 184 }; |
182 | 185 |
183 typedef std::deque<QueuedMessage> SyncMessageQueue; | 186 typedef std::deque<QueuedMessage> SyncMessageQueue; |
184 SyncMessageQueue message_queue_; | 187 SyncMessageQueue message_queue_; |
185 | 188 |
186 std::vector<QueuedMessage> received_replies_; | 189 std::vector<QueuedMessage> received_replies_; |
187 | 190 |
188 // Set when we got a synchronous message that we must respond to as the | 191 // Set when we got a synchronous message that we must respond to as the |
189 // sender needs its reply before it can reply to our original synchronous | 192 // sender needs its reply before it can reply to our original synchronous |
190 // message. | 193 // message. |
191 ScopedHandle dispatch_event_; | 194 WaitableEvent dispatch_event_; |
192 MessageLoop* listener_message_loop_; | 195 MessageLoop* listener_message_loop_; |
193 Lock message_lock_; | 196 Lock message_lock_; |
194 bool task_pending_; | 197 bool task_pending_; |
195 int listener_count_; | 198 int listener_count_; |
196 }; | 199 }; |
197 | 200 |
198 base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> > | 201 base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> > |
199 SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_(base::LINKER_INITIALIZED); | 202 SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_(base::LINKER_INITIALIZED); |
200 | 203 |
201 SyncChannel::SyncContext::SyncContext( | 204 SyncChannel::SyncContext::SyncContext( |
202 Channel::Listener* listener, | 205 Channel::Listener* listener, |
203 MessageFilter* filter, | 206 MessageFilter* filter, |
204 MessageLoop* ipc_thread, | 207 MessageLoop* ipc_thread, |
205 HANDLE shutdown_event) | 208 WaitableEvent* shutdown_event) |
206 : ChannelProxy::Context(listener, filter, ipc_thread), | 209 : ChannelProxy::Context(listener, filter, ipc_thread), |
207 shutdown_event_(shutdown_event), | 210 received_sync_msgs_(ReceivedSyncMsgQueue::AddContext()), |
208 received_sync_msgs_(ReceivedSyncMsgQueue::AddContext()){ | 211 shutdown_event_(shutdown_event) { |
209 } | 212 } |
210 | 213 |
211 SyncChannel::SyncContext::~SyncContext() { | 214 SyncChannel::SyncContext::~SyncContext() { |
212 while (!deserializers_.empty()) | 215 while (!deserializers_.empty()) |
213 Pop(); | 216 Pop(); |
214 } | 217 } |
215 | 218 |
216 // Adds information about an outgoing sync message to the context so that | 219 // Adds information about an outgoing sync message to the context so that |
217 // we know how to deserialize the reply. Returns a handle that's set when | 220 // we know how to deserialize the reply. Returns a handle that's set when |
218 // the reply has arrived. | 221 // the reply has arrived. |
219 void SyncChannel::SyncContext::Push(SyncMessage* sync_msg) { | 222 void SyncChannel::SyncContext::Push(SyncMessage* sync_msg) { |
220 // The event is created as manual reset because in between SetEvent and | 223 // The event is created as manual reset because in between Signal and |
221 // OnObjectSignalled, another Send can happen which would stop the watcher | 224 // OnObjectSignalled, another Send can happen which would stop the watcher |
222 // from being called. The event would get watched later, when the nested | 225 // from being called. The event would get watched later, when the nested |
223 // Send completes, so the event will need to remain set. | 226 // Send completes, so the event will need to remain set. |
224 PendingSyncMsg pending(SyncMessage::GetMessageId(*sync_msg), | 227 PendingSyncMsg pending(SyncMessage::GetMessageId(*sync_msg), |
225 sync_msg->GetReplyDeserializer(), | 228 sync_msg->GetReplyDeserializer(), |
226 CreateEvent(NULL, TRUE, FALSE, NULL)); | 229 new WaitableEvent(true, false)); |
227 AutoLock auto_lock(deserializers_lock_); | 230 AutoLock auto_lock(deserializers_lock_); |
228 deserializers_.push_back(pending); | 231 deserializers_.push_back(pending); |
229 } | 232 } |
230 | 233 |
231 bool SyncChannel::SyncContext::Pop() { | 234 bool SyncChannel::SyncContext::Pop() { |
232 bool result; | 235 bool result; |
233 { | 236 { |
234 AutoLock auto_lock(deserializers_lock_); | 237 AutoLock auto_lock(deserializers_lock_); |
235 PendingSyncMsg msg = deserializers_.back(); | 238 PendingSyncMsg msg = deserializers_.back(); |
236 delete msg.deserializer; | 239 delete msg.deserializer; |
237 CloseHandle(msg.done_event); | 240 delete msg.done_event; |
| 241 msg.done_event = NULL; |
238 deserializers_.pop_back(); | 242 deserializers_.pop_back(); |
239 result = msg.send_result; | 243 result = msg.send_result; |
240 } | 244 } |
241 | 245 |
242 // We got a reply to a synchronous Send() call that's blocking the listener | 246 // We got a reply to a synchronous Send() call that's blocking the listener |
243 // thread. However, further down the call stack there could be another | 247 // thread. However, further down the call stack there could be another |
244 // blocking Send() call, whose reply we received after we made this last | 248 // blocking Send() call, whose reply we received after we made this last |
245 // Send() call. So check if we have any queued replies available that | 249 // Send() call. So check if we have any queued replies available that |
246 // can now unblock the listener thread. | 250 // can now unblock the listener thread. |
247 ipc_message_loop()->PostTask(FROM_HERE, NewRunnableMethod( | 251 ipc_message_loop()->PostTask(FROM_HERE, NewRunnableMethod( |
248 received_sync_msgs_.get(), &ReceivedSyncMsgQueue::DispatchReplies)); | 252 received_sync_msgs_.get(), &ReceivedSyncMsgQueue::DispatchReplies)); |
249 | 253 |
250 return result; | 254 return result; |
251 } | 255 } |
252 | 256 |
253 HANDLE SyncChannel::SyncContext::GetSendDoneEvent() { | 257 WaitableEvent* SyncChannel::SyncContext::GetSendDoneEvent() { |
254 AutoLock auto_lock(deserializers_lock_); | 258 AutoLock auto_lock(deserializers_lock_); |
255 return deserializers_.back().done_event; | 259 return deserializers_.back().done_event; |
256 } | 260 } |
257 | 261 |
258 HANDLE SyncChannel::SyncContext::GetDispatchEvent() { | 262 WaitableEvent* SyncChannel::SyncContext::GetDispatchEvent() { |
259 return received_sync_msgs_->dispatch_event(); | 263 return received_sync_msgs_->dispatch_event(); |
260 } | 264 } |
261 | 265 |
262 void SyncChannel::SyncContext::DispatchMessages() { | 266 void SyncChannel::SyncContext::DispatchMessages() { |
263 received_sync_msgs_->DispatchMessages(); | 267 received_sync_msgs_->DispatchMessages(); |
264 } | 268 } |
265 | 269 |
266 bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) { | 270 bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) { |
267 AutoLock auto_lock(deserializers_lock_); | 271 AutoLock auto_lock(deserializers_lock_); |
268 if (deserializers_.empty() || | 272 if (deserializers_.empty() || |
269 !SyncMessage::IsMessageReplyTo(*msg, deserializers_.back().id)) { | 273 !SyncMessage::IsMessageReplyTo(*msg, deserializers_.back().id)) { |
270 return false; | 274 return false; |
271 } | 275 } |
272 | 276 |
273 if (!msg->is_reply_error()) { | 277 if (!msg->is_reply_error()) { |
274 deserializers_.back().send_result = deserializers_.back().deserializer-> | 278 deserializers_.back().send_result = deserializers_.back().deserializer-> |
275 SerializeOutputParameters(*msg); | 279 SerializeOutputParameters(*msg); |
276 } | 280 } |
277 SetEvent(deserializers_.back().done_event); | 281 deserializers_.back().done_event->Signal(); |
278 | 282 |
279 return true; | 283 return true; |
280 } | 284 } |
281 | 285 |
282 void SyncChannel::SyncContext::Clear() { | 286 void SyncChannel::SyncContext::Clear() { |
283 CancelPendingSends(); | 287 CancelPendingSends(); |
284 received_sync_msgs_->RemoveContext(this); | 288 received_sync_msgs_->RemoveContext(this); |
285 | 289 |
286 Context::Clear(); | 290 Context::Clear(); |
287 } | 291 } |
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
320 void SyncChannel::SyncContext::OnChannelClosed() { | 324 void SyncChannel::SyncContext::OnChannelClosed() { |
321 shutdown_watcher_.StopWatching(); | 325 shutdown_watcher_.StopWatching(); |
322 Context::OnChannelClosed(); | 326 Context::OnChannelClosed(); |
323 } | 327 } |
324 | 328 |
325 void SyncChannel::SyncContext::OnSendTimeout(int message_id) { | 329 void SyncChannel::SyncContext::OnSendTimeout(int message_id) { |
326 AutoLock auto_lock(deserializers_lock_); | 330 AutoLock auto_lock(deserializers_lock_); |
327 PendingSyncMessageQueue::iterator iter; | 331 PendingSyncMessageQueue::iterator iter; |
328 for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++) { | 332 for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++) { |
329 if (iter->id == message_id) { | 333 if (iter->id == message_id) { |
330 SetEvent(iter->done_event); | 334 iter->done_event->Signal(); |
331 break; | 335 break; |
332 } | 336 } |
333 } | 337 } |
334 } | 338 } |
335 | 339 |
336 void SyncChannel::SyncContext::CancelPendingSends() { | 340 void SyncChannel::SyncContext::CancelPendingSends() { |
337 AutoLock auto_lock(deserializers_lock_); | 341 AutoLock auto_lock(deserializers_lock_); |
338 PendingSyncMessageQueue::iterator iter; | 342 PendingSyncMessageQueue::iterator iter; |
339 for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++) | 343 for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++) |
340 SetEvent(iter->done_event); | 344 iter->done_event->Signal(); |
341 } | 345 } |
342 | 346 |
343 void SyncChannel::SyncContext::OnObjectSignaled(HANDLE object) { | 347 void SyncChannel::SyncContext::OnWaitableEventSignaled(WaitableEvent* event) { |
344 DCHECK(object == shutdown_event_); | 348 DCHECK(event == shutdown_event_); |
345 // Process shut down before we can get a reply to a synchronous message. | 349 // Process shut down before we can get a reply to a synchronous message. |
346 // Cancel pending Send calls, which will end up setting the send done event. | 350 // Cancel pending Send calls, which will end up setting the send done event. |
347 CancelPendingSends(); | 351 CancelPendingSends(); |
348 } | 352 } |
349 | 353 |
350 | 354 |
351 SyncChannel::SyncChannel( | 355 SyncChannel::SyncChannel( |
352 const std::wstring& channel_id, Channel::Mode mode, | 356 const std::wstring& channel_id, Channel::Mode mode, |
353 Channel::Listener* listener, MessageFilter* filter, | 357 Channel::Listener* listener, MessageFilter* filter, |
354 MessageLoop* ipc_message_loop, bool create_pipe_now, HANDLE shutdown_event) | 358 MessageLoop* ipc_message_loop, bool create_pipe_now, |
| 359 WaitableEvent* shutdown_event) |
355 : ChannelProxy( | 360 : ChannelProxy( |
356 channel_id, mode, ipc_message_loop, | 361 channel_id, mode, ipc_message_loop, |
357 new SyncContext(listener, filter, ipc_message_loop, shutdown_event), | 362 new SyncContext(listener, filter, ipc_message_loop, shutdown_event), |
358 create_pipe_now), | 363 create_pipe_now), |
359 sync_messages_with_no_timeout_allowed_(true) { | 364 sync_messages_with_no_timeout_allowed_(true) { |
360 // Ideally we only want to watch this object when running a nested message | 365 // Ideally we only want to watch this object when running a nested message |
361 // loop. However, we don't know when it exits if there's another nested | 366 // loop. However, we don't know when it exits if there's another nested |
362 // message loop running under it or not, so we wouldn't know whether to | 367 // message loop running under it or not, so we wouldn't know whether to |
363 // stop or keep watching. So we always watch it, and create the event as | 368 // stop or keep watching. So we always watch it, and create the event as |
364 // manual reset since the object watcher might otherwise reset the event | 369 // manual reset since the object watcher might otherwise reset the event |
365 // when we're doing a WaitForMultipleObjects. | 370 // when we're doing a WaitMany. |
366 dispatch_watcher_.StartWatching(sync_context()->GetDispatchEvent(), this); | 371 dispatch_watcher_.StartWatching(sync_context()->GetDispatchEvent(), this); |
367 } | 372 } |
368 | 373 |
369 SyncChannel::~SyncChannel() { | 374 SyncChannel::~SyncChannel() { |
370 } | 375 } |
371 | 376 |
372 bool SyncChannel::Send(Message* message) { | 377 bool SyncChannel::Send(Message* message) { |
373 return SendWithTimeout(message, INFINITE); | 378 return SendWithTimeout(message, INFINITE); |
374 } | 379 } |
375 | 380 |
376 bool SyncChannel::SendWithTimeout(Message* message, int timeout_ms) { | 381 bool SyncChannel::SendWithTimeout(Message* message, int timeout_ms) { |
377 if (!message->is_sync()) { | 382 if (!message->is_sync()) { |
378 ChannelProxy::Send(message); | 383 ChannelProxy::Send(message); |
379 return true; | 384 return true; |
380 } | 385 } |
381 | 386 |
382 // *this* might get deleted in WaitForReply. | 387 // *this* might get deleted in WaitForReply. |
383 scoped_refptr<SyncContext> context(sync_context()); | 388 scoped_refptr<SyncContext> context(sync_context()); |
384 if (WaitForSingleObject(context->shutdown_event(), 0) == WAIT_OBJECT_0) { | 389 if (context->shutdown_event()->IsSignaled()) { |
385 delete message; | 390 delete message; |
386 return false; | 391 return false; |
387 } | 392 } |
388 | 393 |
389 DCHECK(sync_messages_with_no_timeout_allowed_ || timeout_ms != INFINITE); | 394 DCHECK(sync_messages_with_no_timeout_allowed_ || timeout_ms != INFINITE); |
390 SyncMessage* sync_msg = static_cast<SyncMessage*>(message); | 395 SyncMessage* sync_msg = static_cast<SyncMessage*>(message); |
391 context->Push(sync_msg); | 396 context->Push(sync_msg); |
392 int message_id = SyncMessage::GetMessageId(*sync_msg); | 397 int message_id = SyncMessage::GetMessageId(*sync_msg); |
393 HANDLE pump_messages_event = sync_msg->pump_messages_event(); | 398 WaitableEvent* pump_messages_event = sync_msg->pump_messages_event(); |
394 | 399 |
395 ChannelProxy::Send(message); | 400 ChannelProxy::Send(message); |
396 | 401 |
397 if (timeout_ms != INFINITE) { | 402 if (timeout_ms != INFINITE) { |
398 // We use the sync message id so that when a message times out, we don't | 403 // We use the sync message id so that when a message times out, we don't |
399 // confuse it with another send that is either above/below this Send in | 404 // confuse it with another send that is either above/below this Send in |
400 // the call stack. | 405 // the call stack. |
401 context->ipc_message_loop()->PostDelayedTask(FROM_HERE, | 406 context->ipc_message_loop()->PostDelayedTask(FROM_HERE, |
402 NewRunnableMethod(context.get(), | 407 NewRunnableMethod(context.get(), |
403 &SyncContext::OnSendTimeout, message_id), timeout_ms); | 408 &SyncContext::OnSendTimeout, message_id), timeout_ms); |
404 } | 409 } |
405 | 410 |
406 // Wait for reply, or for any other incoming synchronous messages. | 411 // Wait for reply, or for any other incoming synchronous messages. |
407 WaitForReply(pump_messages_event); | 412 WaitForReply(pump_messages_event); |
408 | 413 |
409 return context->Pop(); | 414 return context->Pop(); |
410 } | 415 } |
411 | 416 |
412 void SyncChannel::WaitForReply(HANDLE pump_messages_event) { | 417 void SyncChannel::WaitForReply(WaitableEvent* pump_messages_event) { |
413 while (true) { | 418 while (true) { |
414 HANDLE objects[] = { sync_context()->GetDispatchEvent(), | 419 WaitableEvent* objects[] = { |
415 sync_context()->GetSendDoneEvent(), | 420 sync_context()->GetDispatchEvent(), |
416 pump_messages_event }; | 421 sync_context()->GetSendDoneEvent(), |
417 uint32 count = pump_messages_event ? 3: 2; | 422 pump_messages_event |
418 DWORD result = WaitForMultipleObjects(count, objects, FALSE, INFINITE); | 423 }; |
419 if (result == WAIT_OBJECT_0) { | 424 |
| 425 unsigned count = pump_messages_event ? 3: 2; |
| 426 unsigned result = WaitableEvent::WaitMany(objects, count); |
| 427 if (result == 0 /* dispatch event */) { |
420 // We're waiting for a reply, but we received a blocking synchronous | 428 // We're waiting for a reply, but we received a blocking synchronous |
421 // call. We must process it or otherwise a deadlock might occur. | 429 // call. We must process it or otherwise a deadlock might occur. |
422 ResetEvent(sync_context()->GetDispatchEvent()); | 430 sync_context()->GetDispatchEvent()->Reset(); |
423 sync_context()->DispatchMessages(); | 431 sync_context()->DispatchMessages(); |
424 continue; | 432 continue; |
425 } | 433 } |
426 | 434 |
427 if (result == WAIT_OBJECT_0 + 2) | 435 if (result == 2 /* pump_messages_event */) |
428 WaitForReplyWithNestedMessageLoop(); // Start a nested message loop. | 436 WaitForReplyWithNestedMessageLoop(); // Start a nested message loop. |
429 | 437 |
430 break; | 438 break; |
431 } | 439 } |
432 } | 440 } |
433 | 441 |
434 void SyncChannel::WaitForReplyWithNestedMessageLoop() { | 442 void SyncChannel::WaitForReplyWithNestedMessageLoop() { |
435 HANDLE old_done_event = send_done_watcher_.GetWatchedObject(); | 443 WaitableEvent* old_done_event = send_done_watcher_.GetWatchedEvent(); |
436 send_done_watcher_.StopWatching(); | 444 send_done_watcher_.StopWatching(); |
437 send_done_watcher_.StartWatching(sync_context()->GetSendDoneEvent(), this); | 445 send_done_watcher_.StartWatching(sync_context()->GetSendDoneEvent(), this); |
438 bool old_state = MessageLoop::current()->NestableTasksAllowed(); | 446 bool old_state = MessageLoop::current()->NestableTasksAllowed(); |
439 MessageLoop::current()->SetNestableTasksAllowed(true); | 447 MessageLoop::current()->SetNestableTasksAllowed(true); |
440 MessageLoop::current()->Run(); | 448 MessageLoop::current()->Run(); |
441 MessageLoop::current()->SetNestableTasksAllowed(old_state); | 449 MessageLoop::current()->SetNestableTasksAllowed(old_state); |
442 if (old_done_event) | 450 if (old_done_event) |
443 send_done_watcher_.StartWatching(old_done_event, this); | 451 send_done_watcher_.StartWatching(old_done_event, this); |
444 } | 452 } |
445 | 453 |
446 void SyncChannel::OnObjectSignaled(HANDLE object) { | 454 void SyncChannel::OnWaitableEventSignaled(WaitableEvent* event) { |
447 HANDLE dispatch_event = sync_context()->GetDispatchEvent(); | 455 WaitableEvent* dispatch_event = sync_context()->GetDispatchEvent(); |
448 if (object == dispatch_event) { | 456 if (event == dispatch_event) { |
449 // The call to DispatchMessages might delete this object, so reregister | 457 // The call to DispatchMessages might delete this object, so reregister |
450 // the object watcher first. | 458 // the object watcher first. |
451 ResetEvent(dispatch_event); | 459 dispatch_event->Reset(); |
452 dispatch_watcher_.StartWatching(dispatch_event, this); | 460 dispatch_watcher_.StartWatching(dispatch_event, this); |
453 sync_context()->DispatchMessages(); | 461 sync_context()->DispatchMessages(); |
454 } else { | 462 } else { |
455 // We got the reply, timed out or the process shutdown. | 463 // We got the reply, timed out or the process shutdown. |
456 DCHECK(object == sync_context()->GetSendDoneEvent()); | 464 DCHECK(event == sync_context()->GetSendDoneEvent()); |
457 MessageLoop::current()->Quit(); | 465 MessageLoop::current()->Quit(); |
458 } | 466 } |
459 } | 467 } |
460 | 468 |
461 } // namespace IPC | 469 } // namespace IPC |
OLD | NEW |