Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(11)

Side by Side Diff: chrome/common/ipc_sync_channel.cc

Issue 16554: WaitableEvent (Closed)
Patch Set: Addresssing darin's comments (round 2) Created 11 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « chrome/common/ipc_sync_channel.h ('k') | chrome/common/ipc_sync_message.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2006-2008 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2006-2008 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include <windows.h>
6
7 #include "chrome/common/ipc_sync_channel.h" 5 #include "chrome/common/ipc_sync_channel.h"
8 6
9 #include "base/lazy_instance.h" 7 #include "base/lazy_instance.h"
10 #include "base/logging.h" 8 #include "base/logging.h"
11 #include "base/thread_local.h" 9 #include "base/thread_local.h"
12 #include "chrome/common/child_process.h" 10 #include "base/message_loop.h"
11 #include "base/waitable_event.h"
12 #include "base/waitable_event_watcher.h"
13 #include "chrome/common/ipc_logging.h" 13 #include "chrome/common/ipc_logging.h"
14 #include "chrome/common/ipc_sync_message.h" 14 #include "chrome/common/ipc_sync_message.h"
15 15
16 #if !defined(OS_WIN)
17 #define INFINITE -1
18 #endif
19
16 using base::TimeDelta; 20 using base::TimeDelta;
17 using base::TimeTicks; 21 using base::TimeTicks;
22 using base::WaitableEvent;
18 23
19 namespace IPC { 24 namespace IPC {
20 // When we're blocked in a Send(), we need to process incoming synchronous 25 // When we're blocked in a Send(), we need to process incoming synchronous
21 // messages right away because it could be blocking our reply (either 26 // messages right away because it could be blocking our reply (either
22 // directly from the same object we're calling, or indirectly through one or 27 // directly from the same object we're calling, or indirectly through one or
23 // more other channels). That means that in SyncContext's OnMessageReceived, 28 // more other channels). That means that in SyncContext's OnMessageReceived,
24 // we need to process sync message right away if we're blocked. However a 29 // we need to process sync message right away if we're blocked. However a
25 // simple check isn't sufficient, because the listener thread can be in the 30 // simple check isn't sufficient, because the listener thread can be in the
26 // process of calling Send. 31 // process of calling Send.
27 // To work around this, when SyncChannel filters a sync message, it sets 32 // To work around this, when SyncChannel filters a sync message, it sets
28 // an event that the listener thread waits on during its Send() call. This 33 // an event that the listener thread waits on during its Send() call. This
29 // allows us to dispatch incoming sync messages when blocked. The race 34 // allows us to dispatch incoming sync messages when blocked. The race
30 // condition is handled because if Send is in the process of being called, it 35 // condition is handled because if Send is in the process of being called, it
31 // will check the event. In case the listener thread isn't sending a message, 36 // will check the event. In case the listener thread isn't sending a message,
32 // we queue a task on the listener thread to dispatch the received messages. 37 // we queue a task on the listener thread to dispatch the received messages.
33 // The messages are stored in this queue object that's shared among all 38 // The messages are stored in this queue object that's shared among all
34 // SyncChannel objects on the same thread (since one object can receive a 39 // SyncChannel objects on the same thread (since one object can receive a
35 // sync message while another one is blocked). 40 // sync message while another one is blocked).
36 41
37 class SyncChannel::ReceivedSyncMsgQueue;
38
39 class SyncChannel::ReceivedSyncMsgQueue : 42 class SyncChannel::ReceivedSyncMsgQueue :
40 public base::RefCountedThreadSafe<ReceivedSyncMsgQueue> { 43 public base::RefCountedThreadSafe<ReceivedSyncMsgQueue> {
41 public: 44 public:
42 // Returns the ReceivedSyncMsgQueue instance for this thread, creating one 45 // Returns the ReceivedSyncMsgQueue instance for this thread, creating one
43 // if necessary. Call RemoveContext on the same thread when done. 46 // if necessary. Call RemoveContext on the same thread when done.
44 static ReceivedSyncMsgQueue* AddContext() { 47 static ReceivedSyncMsgQueue* AddContext() {
45 // We want one ReceivedSyncMsgQueue per listener thread (i.e. since multiple 48 // We want one ReceivedSyncMsgQueue per listener thread (i.e. since multiple
46 // SyncChannel objects can block the same thread). 49 // SyncChannel objects can block the same thread).
47 ReceivedSyncMsgQueue* rv = lazy_tls_ptr_.Pointer()->Get(); 50 ReceivedSyncMsgQueue* rv = lazy_tls_ptr_.Pointer()->Get();
48 if (!rv) { 51 if (!rv) {
(...skipping 14 matching lines...) Expand all
63 AutoLock auto_lock(message_lock_); 66 AutoLock auto_lock(message_lock_);
64 67
65 was_task_pending = task_pending_; 68 was_task_pending = task_pending_;
66 task_pending_ = true; 69 task_pending_ = true;
67 70
68 // We set the event in case the listener thread is blocked (or is about 71 // We set the event in case the listener thread is blocked (or is about
69 // to). In case it's not, the PostTask dispatches the messages. 72 // to). In case it's not, the PostTask dispatches the messages.
70 message_queue_.push_back(QueuedMessage(new Message(msg), context)); 73 message_queue_.push_back(QueuedMessage(new Message(msg), context));
71 } 74 }
72 75
73 SetEvent(dispatch_event_); 76 dispatch_event_.Signal();
74 if (!was_task_pending) { 77 if (!was_task_pending) {
75 listener_message_loop_->PostTask(FROM_HERE, NewRunnableMethod( 78 listener_message_loop_->PostTask(FROM_HERE, NewRunnableMethod(
76 this, &ReceivedSyncMsgQueue::DispatchMessagesTask)); 79 this, &ReceivedSyncMsgQueue::DispatchMessagesTask));
77 } 80 }
78 } 81 }
79 82
80 void QueueReply(const Message &msg, SyncChannel::SyncContext* context) { 83 void QueueReply(const Message &msg, SyncChannel::SyncContext* context) {
81 received_replies_.push_back(QueuedMessage(new Message(msg), context)); 84 received_replies_.push_back(QueuedMessage(new Message(msg), context));
82 } 85 }
83 86
(...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after
136 iter++; 139 iter++;
137 } 140 }
138 } 141 }
139 142
140 if (--listener_count_ == 0) { 143 if (--listener_count_ == 0) {
141 DCHECK(lazy_tls_ptr_.Pointer()->Get()); 144 DCHECK(lazy_tls_ptr_.Pointer()->Get());
142 lazy_tls_ptr_.Pointer()->Set(NULL); 145 lazy_tls_ptr_.Pointer()->Set(NULL);
143 } 146 }
144 } 147 }
145 148
146 HANDLE dispatch_event() { return dispatch_event_; } 149 WaitableEvent* dispatch_event() { return &dispatch_event_; }
147 MessageLoop* listener_message_loop() { return listener_message_loop_; } 150 MessageLoop* listener_message_loop() { return listener_message_loop_; }
148 151
149 // Holds a pointer to the per-thread ReceivedSyncMsgQueue object. 152 // Holds a pointer to the per-thread ReceivedSyncMsgQueue object.
150 static base::LazyInstance<base::ThreadLocalPointer<ReceivedSyncMsgQueue> > 153 static base::LazyInstance<base::ThreadLocalPointer<ReceivedSyncMsgQueue> >
151 lazy_tls_ptr_; 154 lazy_tls_ptr_;
152 155
153 // Called on the ipc thread to check if we can unblock any current Send() 156 // Called on the ipc thread to check if we can unblock any current Send()
154 // calls based on a queued reply. 157 // calls based on a queued reply.
155 void DispatchReplies() { 158 void DispatchReplies() {
156 for (size_t i = 0; i < received_replies_.size(); ++i) { 159 for (size_t i = 0; i < received_replies_.size(); ++i) {
157 Message* message = received_replies_[i].message; 160 Message* message = received_replies_[i].message;
158 if (received_replies_[i].context->TryToUnblockListener(message)) { 161 if (received_replies_[i].context->TryToUnblockListener(message)) {
159 delete message; 162 delete message;
160 received_replies_.erase(received_replies_.begin() + i); 163 received_replies_.erase(received_replies_.begin() + i);
161 return; 164 return;
162 } 165 }
163 } 166 }
164 } 167 }
165 168
166 private: 169 private:
167 // See the comment in SyncChannel::SyncChannel for why this event is created 170 // See the comment in SyncChannel::SyncChannel for why this event is created
168 // as manual reset. 171 // as manual reset.
169 ReceivedSyncMsgQueue() : 172 ReceivedSyncMsgQueue() :
170 dispatch_event_(CreateEvent(NULL, TRUE, FALSE, NULL)), 173 dispatch_event_(true, false),
174 listener_message_loop_(MessageLoop::current()),
171 task_pending_(false), 175 task_pending_(false),
172 listener_message_loop_(MessageLoop::current()),
173 listener_count_(0) { 176 listener_count_(0) {
174 } 177 }
175 178
176 // Holds information about a queued synchronous message or reply. 179 // Holds information about a queued synchronous message or reply.
177 struct QueuedMessage { 180 struct QueuedMessage {
178 QueuedMessage(Message* m, SyncContext* c) : message(m), context(c) { } 181 QueuedMessage(Message* m, SyncContext* c) : message(m), context(c) { }
179 Message* message; 182 Message* message;
180 scoped_refptr<SyncChannel::SyncContext> context; 183 scoped_refptr<SyncChannel::SyncContext> context;
181 }; 184 };
182 185
183 typedef std::deque<QueuedMessage> SyncMessageQueue; 186 typedef std::deque<QueuedMessage> SyncMessageQueue;
184 SyncMessageQueue message_queue_; 187 SyncMessageQueue message_queue_;
185 188
186 std::vector<QueuedMessage> received_replies_; 189 std::vector<QueuedMessage> received_replies_;
187 190
188 // Set when we got a synchronous message that we must respond to as the 191 // Set when we got a synchronous message that we must respond to as the
189 // sender needs its reply before it can reply to our original synchronous 192 // sender needs its reply before it can reply to our original synchronous
190 // message. 193 // message.
191 ScopedHandle dispatch_event_; 194 WaitableEvent dispatch_event_;
192 MessageLoop* listener_message_loop_; 195 MessageLoop* listener_message_loop_;
193 Lock message_lock_; 196 Lock message_lock_;
194 bool task_pending_; 197 bool task_pending_;
195 int listener_count_; 198 int listener_count_;
196 }; 199 };
197 200
198 base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> > 201 base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> >
199 SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_(base::LINKER_INITIALIZED); 202 SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_(base::LINKER_INITIALIZED);
200 203
201 SyncChannel::SyncContext::SyncContext( 204 SyncChannel::SyncContext::SyncContext(
202 Channel::Listener* listener, 205 Channel::Listener* listener,
203 MessageFilter* filter, 206 MessageFilter* filter,
204 MessageLoop* ipc_thread, 207 MessageLoop* ipc_thread,
205 HANDLE shutdown_event) 208 WaitableEvent* shutdown_event)
206 : ChannelProxy::Context(listener, filter, ipc_thread), 209 : ChannelProxy::Context(listener, filter, ipc_thread),
207 shutdown_event_(shutdown_event), 210 received_sync_msgs_(ReceivedSyncMsgQueue::AddContext()),
208 received_sync_msgs_(ReceivedSyncMsgQueue::AddContext()){ 211 shutdown_event_(shutdown_event) {
209 } 212 }
210 213
211 SyncChannel::SyncContext::~SyncContext() { 214 SyncChannel::SyncContext::~SyncContext() {
212 while (!deserializers_.empty()) 215 while (!deserializers_.empty())
213 Pop(); 216 Pop();
214 } 217 }
215 218
216 // Adds information about an outgoing sync message to the context so that 219 // Adds information about an outgoing sync message to the context so that
217 // we know how to deserialize the reply. Returns a handle that's set when 220 // we know how to deserialize the reply. Returns a handle that's set when
218 // the reply has arrived. 221 // the reply has arrived.
219 void SyncChannel::SyncContext::Push(SyncMessage* sync_msg) { 222 void SyncChannel::SyncContext::Push(SyncMessage* sync_msg) {
220 // The event is created as manual reset because in between SetEvent and 223 // The event is created as manual reset because in between Signal and
221 // OnObjectSignalled, another Send can happen which would stop the watcher 224 // OnObjectSignalled, another Send can happen which would stop the watcher
222 // from being called. The event would get watched later, when the nested 225 // from being called. The event would get watched later, when the nested
223 // Send completes, so the event will need to remain set. 226 // Send completes, so the event will need to remain set.
224 PendingSyncMsg pending(SyncMessage::GetMessageId(*sync_msg), 227 PendingSyncMsg pending(SyncMessage::GetMessageId(*sync_msg),
225 sync_msg->GetReplyDeserializer(), 228 sync_msg->GetReplyDeserializer(),
226 CreateEvent(NULL, TRUE, FALSE, NULL)); 229 new WaitableEvent(true, false));
227 AutoLock auto_lock(deserializers_lock_); 230 AutoLock auto_lock(deserializers_lock_);
228 deserializers_.push_back(pending); 231 deserializers_.push_back(pending);
229 } 232 }
230 233
231 bool SyncChannel::SyncContext::Pop() { 234 bool SyncChannel::SyncContext::Pop() {
232 bool result; 235 bool result;
233 { 236 {
234 AutoLock auto_lock(deserializers_lock_); 237 AutoLock auto_lock(deserializers_lock_);
235 PendingSyncMsg msg = deserializers_.back(); 238 PendingSyncMsg msg = deserializers_.back();
236 delete msg.deserializer; 239 delete msg.deserializer;
237 CloseHandle(msg.done_event); 240 delete msg.done_event;
241 msg.done_event = NULL;
238 deserializers_.pop_back(); 242 deserializers_.pop_back();
239 result = msg.send_result; 243 result = msg.send_result;
240 } 244 }
241 245
242 // We got a reply to a synchronous Send() call that's blocking the listener 246 // We got a reply to a synchronous Send() call that's blocking the listener
243 // thread. However, further down the call stack there could be another 247 // thread. However, further down the call stack there could be another
244 // blocking Send() call, whose reply we received after we made this last 248 // blocking Send() call, whose reply we received after we made this last
245 // Send() call. So check if we have any queued replies available that 249 // Send() call. So check if we have any queued replies available that
246 // can now unblock the listener thread. 250 // can now unblock the listener thread.
247 ipc_message_loop()->PostTask(FROM_HERE, NewRunnableMethod( 251 ipc_message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
248 received_sync_msgs_.get(), &ReceivedSyncMsgQueue::DispatchReplies)); 252 received_sync_msgs_.get(), &ReceivedSyncMsgQueue::DispatchReplies));
249 253
250 return result; 254 return result;
251 } 255 }
252 256
253 HANDLE SyncChannel::SyncContext::GetSendDoneEvent() { 257 WaitableEvent* SyncChannel::SyncContext::GetSendDoneEvent() {
254 AutoLock auto_lock(deserializers_lock_); 258 AutoLock auto_lock(deserializers_lock_);
255 return deserializers_.back().done_event; 259 return deserializers_.back().done_event;
256 } 260 }
257 261
258 HANDLE SyncChannel::SyncContext::GetDispatchEvent() { 262 WaitableEvent* SyncChannel::SyncContext::GetDispatchEvent() {
259 return received_sync_msgs_->dispatch_event(); 263 return received_sync_msgs_->dispatch_event();
260 } 264 }
261 265
262 void SyncChannel::SyncContext::DispatchMessages() { 266 void SyncChannel::SyncContext::DispatchMessages() {
263 received_sync_msgs_->DispatchMessages(); 267 received_sync_msgs_->DispatchMessages();
264 } 268 }
265 269
266 bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) { 270 bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) {
267 AutoLock auto_lock(deserializers_lock_); 271 AutoLock auto_lock(deserializers_lock_);
268 if (deserializers_.empty() || 272 if (deserializers_.empty() ||
269 !SyncMessage::IsMessageReplyTo(*msg, deserializers_.back().id)) { 273 !SyncMessage::IsMessageReplyTo(*msg, deserializers_.back().id)) {
270 return false; 274 return false;
271 } 275 }
272 276
273 if (!msg->is_reply_error()) { 277 if (!msg->is_reply_error()) {
274 deserializers_.back().send_result = deserializers_.back().deserializer-> 278 deserializers_.back().send_result = deserializers_.back().deserializer->
275 SerializeOutputParameters(*msg); 279 SerializeOutputParameters(*msg);
276 } 280 }
277 SetEvent(deserializers_.back().done_event); 281 deserializers_.back().done_event->Signal();
278 282
279 return true; 283 return true;
280 } 284 }
281 285
282 void SyncChannel::SyncContext::Clear() { 286 void SyncChannel::SyncContext::Clear() {
283 CancelPendingSends(); 287 CancelPendingSends();
284 received_sync_msgs_->RemoveContext(this); 288 received_sync_msgs_->RemoveContext(this);
285 289
286 Context::Clear(); 290 Context::Clear();
287 } 291 }
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after
320 void SyncChannel::SyncContext::OnChannelClosed() { 324 void SyncChannel::SyncContext::OnChannelClosed() {
321 shutdown_watcher_.StopWatching(); 325 shutdown_watcher_.StopWatching();
322 Context::OnChannelClosed(); 326 Context::OnChannelClosed();
323 } 327 }
324 328
325 void SyncChannel::SyncContext::OnSendTimeout(int message_id) { 329 void SyncChannel::SyncContext::OnSendTimeout(int message_id) {
326 AutoLock auto_lock(deserializers_lock_); 330 AutoLock auto_lock(deserializers_lock_);
327 PendingSyncMessageQueue::iterator iter; 331 PendingSyncMessageQueue::iterator iter;
328 for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++) { 332 for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++) {
329 if (iter->id == message_id) { 333 if (iter->id == message_id) {
330 SetEvent(iter->done_event); 334 iter->done_event->Signal();
331 break; 335 break;
332 } 336 }
333 } 337 }
334 } 338 }
335 339
336 void SyncChannel::SyncContext::CancelPendingSends() { 340 void SyncChannel::SyncContext::CancelPendingSends() {
337 AutoLock auto_lock(deserializers_lock_); 341 AutoLock auto_lock(deserializers_lock_);
338 PendingSyncMessageQueue::iterator iter; 342 PendingSyncMessageQueue::iterator iter;
339 for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++) 343 for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++)
340 SetEvent(iter->done_event); 344 iter->done_event->Signal();
341 } 345 }
342 346
343 void SyncChannel::SyncContext::OnObjectSignaled(HANDLE object) { 347 void SyncChannel::SyncContext::OnWaitableEventSignaled(WaitableEvent* event) {
344 DCHECK(object == shutdown_event_); 348 DCHECK(event == shutdown_event_);
345 // Process shut down before we can get a reply to a synchronous message. 349 // Process shut down before we can get a reply to a synchronous message.
346 // Cancel pending Send calls, which will end up setting the send done event. 350 // Cancel pending Send calls, which will end up setting the send done event.
347 CancelPendingSends(); 351 CancelPendingSends();
348 } 352 }
349 353
350 354
351 SyncChannel::SyncChannel( 355 SyncChannel::SyncChannel(
352 const std::wstring& channel_id, Channel::Mode mode, 356 const std::wstring& channel_id, Channel::Mode mode,
353 Channel::Listener* listener, MessageFilter* filter, 357 Channel::Listener* listener, MessageFilter* filter,
354 MessageLoop* ipc_message_loop, bool create_pipe_now, HANDLE shutdown_event) 358 MessageLoop* ipc_message_loop, bool create_pipe_now,
359 WaitableEvent* shutdown_event)
355 : ChannelProxy( 360 : ChannelProxy(
356 channel_id, mode, ipc_message_loop, 361 channel_id, mode, ipc_message_loop,
357 new SyncContext(listener, filter, ipc_message_loop, shutdown_event), 362 new SyncContext(listener, filter, ipc_message_loop, shutdown_event),
358 create_pipe_now), 363 create_pipe_now),
359 sync_messages_with_no_timeout_allowed_(true) { 364 sync_messages_with_no_timeout_allowed_(true) {
360 // Ideally we only want to watch this object when running a nested message 365 // Ideally we only want to watch this object when running a nested message
361 // loop. However, we don't know when it exits if there's another nested 366 // loop. However, we don't know when it exits if there's another nested
362 // message loop running under it or not, so we wouldn't know whether to 367 // message loop running under it or not, so we wouldn't know whether to
363 // stop or keep watching. So we always watch it, and create the event as 368 // stop or keep watching. So we always watch it, and create the event as
364 // manual reset since the object watcher might otherwise reset the event 369 // manual reset since the object watcher might otherwise reset the event
365 // when we're doing a WaitForMultipleObjects. 370 // when we're doing a WaitMany.
366 dispatch_watcher_.StartWatching(sync_context()->GetDispatchEvent(), this); 371 dispatch_watcher_.StartWatching(sync_context()->GetDispatchEvent(), this);
367 } 372 }
368 373
369 SyncChannel::~SyncChannel() { 374 SyncChannel::~SyncChannel() {
370 } 375 }
371 376
372 bool SyncChannel::Send(Message* message) { 377 bool SyncChannel::Send(Message* message) {
373 return SendWithTimeout(message, INFINITE); 378 return SendWithTimeout(message, INFINITE);
374 } 379 }
375 380
376 bool SyncChannel::SendWithTimeout(Message* message, int timeout_ms) { 381 bool SyncChannel::SendWithTimeout(Message* message, int timeout_ms) {
377 if (!message->is_sync()) { 382 if (!message->is_sync()) {
378 ChannelProxy::Send(message); 383 ChannelProxy::Send(message);
379 return true; 384 return true;
380 } 385 }
381 386
382 // *this* might get deleted in WaitForReply. 387 // *this* might get deleted in WaitForReply.
383 scoped_refptr<SyncContext> context(sync_context()); 388 scoped_refptr<SyncContext> context(sync_context());
384 if (WaitForSingleObject(context->shutdown_event(), 0) == WAIT_OBJECT_0) { 389 if (context->shutdown_event()->IsSignaled()) {
385 delete message; 390 delete message;
386 return false; 391 return false;
387 } 392 }
388 393
389 DCHECK(sync_messages_with_no_timeout_allowed_ || timeout_ms != INFINITE); 394 DCHECK(sync_messages_with_no_timeout_allowed_ || timeout_ms != INFINITE);
390 SyncMessage* sync_msg = static_cast<SyncMessage*>(message); 395 SyncMessage* sync_msg = static_cast<SyncMessage*>(message);
391 context->Push(sync_msg); 396 context->Push(sync_msg);
392 int message_id = SyncMessage::GetMessageId(*sync_msg); 397 int message_id = SyncMessage::GetMessageId(*sync_msg);
393 HANDLE pump_messages_event = sync_msg->pump_messages_event(); 398 WaitableEvent* pump_messages_event = sync_msg->pump_messages_event();
394 399
395 ChannelProxy::Send(message); 400 ChannelProxy::Send(message);
396 401
397 if (timeout_ms != INFINITE) { 402 if (timeout_ms != INFINITE) {
398 // We use the sync message id so that when a message times out, we don't 403 // We use the sync message id so that when a message times out, we don't
399 // confuse it with another send that is either above/below this Send in 404 // confuse it with another send that is either above/below this Send in
400 // the call stack. 405 // the call stack.
401 context->ipc_message_loop()->PostDelayedTask(FROM_HERE, 406 context->ipc_message_loop()->PostDelayedTask(FROM_HERE,
402 NewRunnableMethod(context.get(), 407 NewRunnableMethod(context.get(),
403 &SyncContext::OnSendTimeout, message_id), timeout_ms); 408 &SyncContext::OnSendTimeout, message_id), timeout_ms);
404 } 409 }
405 410
406 // Wait for reply, or for any other incoming synchronous messages. 411 // Wait for reply, or for any other incoming synchronous messages.
407 WaitForReply(pump_messages_event); 412 WaitForReply(pump_messages_event);
408 413
409 return context->Pop(); 414 return context->Pop();
410 } 415 }
411 416
412 void SyncChannel::WaitForReply(HANDLE pump_messages_event) { 417 void SyncChannel::WaitForReply(WaitableEvent* pump_messages_event) {
413 while (true) { 418 while (true) {
414 HANDLE objects[] = { sync_context()->GetDispatchEvent(), 419 WaitableEvent* objects[] = {
415 sync_context()->GetSendDoneEvent(), 420 sync_context()->GetDispatchEvent(),
416 pump_messages_event }; 421 sync_context()->GetSendDoneEvent(),
417 uint32 count = pump_messages_event ? 3: 2; 422 pump_messages_event
418 DWORD result = WaitForMultipleObjects(count, objects, FALSE, INFINITE); 423 };
419 if (result == WAIT_OBJECT_0) { 424
425 unsigned count = pump_messages_event ? 3: 2;
426 unsigned result = WaitableEvent::WaitMany(objects, count);
427 if (result == 0 /* dispatch event */) {
420 // We're waiting for a reply, but we received a blocking synchronous 428 // We're waiting for a reply, but we received a blocking synchronous
421 // call. We must process it or otherwise a deadlock might occur. 429 // call. We must process it or otherwise a deadlock might occur.
422 ResetEvent(sync_context()->GetDispatchEvent()); 430 sync_context()->GetDispatchEvent()->Reset();
423 sync_context()->DispatchMessages(); 431 sync_context()->DispatchMessages();
424 continue; 432 continue;
425 } 433 }
426 434
427 if (result == WAIT_OBJECT_0 + 2) 435 if (result == 2 /* pump_messages_event */)
428 WaitForReplyWithNestedMessageLoop(); // Start a nested message loop. 436 WaitForReplyWithNestedMessageLoop(); // Start a nested message loop.
429 437
430 break; 438 break;
431 } 439 }
432 } 440 }
433 441
434 void SyncChannel::WaitForReplyWithNestedMessageLoop() { 442 void SyncChannel::WaitForReplyWithNestedMessageLoop() {
435 HANDLE old_done_event = send_done_watcher_.GetWatchedObject(); 443 WaitableEvent* old_done_event = send_done_watcher_.GetWatchedEvent();
436 send_done_watcher_.StopWatching(); 444 send_done_watcher_.StopWatching();
437 send_done_watcher_.StartWatching(sync_context()->GetSendDoneEvent(), this); 445 send_done_watcher_.StartWatching(sync_context()->GetSendDoneEvent(), this);
438 bool old_state = MessageLoop::current()->NestableTasksAllowed(); 446 bool old_state = MessageLoop::current()->NestableTasksAllowed();
439 MessageLoop::current()->SetNestableTasksAllowed(true); 447 MessageLoop::current()->SetNestableTasksAllowed(true);
440 MessageLoop::current()->Run(); 448 MessageLoop::current()->Run();
441 MessageLoop::current()->SetNestableTasksAllowed(old_state); 449 MessageLoop::current()->SetNestableTasksAllowed(old_state);
442 if (old_done_event) 450 if (old_done_event)
443 send_done_watcher_.StartWatching(old_done_event, this); 451 send_done_watcher_.StartWatching(old_done_event, this);
444 } 452 }
445 453
446 void SyncChannel::OnObjectSignaled(HANDLE object) { 454 void SyncChannel::OnWaitableEventSignaled(WaitableEvent* event) {
447 HANDLE dispatch_event = sync_context()->GetDispatchEvent(); 455 WaitableEvent* dispatch_event = sync_context()->GetDispatchEvent();
448 if (object == dispatch_event) { 456 if (event == dispatch_event) {
449 // The call to DispatchMessages might delete this object, so reregister 457 // The call to DispatchMessages might delete this object, so reregister
450 // the object watcher first. 458 // the object watcher first.
451 ResetEvent(dispatch_event); 459 dispatch_event->Reset();
452 dispatch_watcher_.StartWatching(dispatch_event, this); 460 dispatch_watcher_.StartWatching(dispatch_event, this);
453 sync_context()->DispatchMessages(); 461 sync_context()->DispatchMessages();
454 } else { 462 } else {
455 // We got the reply, timed out or the process shutdown. 463 // We got the reply, timed out or the process shutdown.
456 DCHECK(object == sync_context()->GetSendDoneEvent()); 464 DCHECK(event == sync_context()->GetSendDoneEvent());
457 MessageLoop::current()->Quit(); 465 MessageLoop::current()->Quit();
458 } 466 }
459 } 467 }
460 468
461 } // namespace IPC 469 } // namespace IPC
OLDNEW
« no previous file with comments | « chrome/common/ipc_sync_channel.h ('k') | chrome/common/ipc_sync_message.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698