Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(141)

Side by Side Diff: chrome/common/ipc_sync_channel.cc

Issue 8001: Make IPC::SyncChannel not duplicate the underlying MessageLoop implementation... (Closed) Base URL: svn://chrome-svn/chrome/trunk/src/
Patch Set: '' Created 12 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « chrome/common/ipc_sync_channel.h ('k') | chrome/common/ipc_sync_channel_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2006-2008 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2006-2008 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include <windows.h> 5 #include <windows.h>
6 6
7 #include "chrome/common/ipc_sync_channel.h" 7 #include "chrome/common/ipc_sync_channel.h"
8 8
9 #include "base/lazy_instance.h" 9 #include "base/lazy_instance.h"
10 #include "base/logging.h" 10 #include "base/logging.h"
(...skipping 19 matching lines...) Expand all
30 // we queue a task on the listener thread to dispatch the received messages. 30 // we queue a task on the listener thread to dispatch the received messages.
31 // The messages are stored in this queue object that's shared among all 31 // The messages are stored in this queue object that's shared among all
32 // SyncChannel objects on the same thread (since one object can receive a 32 // SyncChannel objects on the same thread (since one object can receive a
33 // sync message while another one is blocked). 33 // sync message while another one is blocked).
34 34
35 class SyncChannel::ReceivedSyncMsgQueue; 35 class SyncChannel::ReceivedSyncMsgQueue;
36 36
37 class SyncChannel::ReceivedSyncMsgQueue : 37 class SyncChannel::ReceivedSyncMsgQueue :
38 public base::RefCountedThreadSafe<ReceivedSyncMsgQueue> { 38 public base::RefCountedThreadSafe<ReceivedSyncMsgQueue> {
39 public: 39 public:
40 ReceivedSyncMsgQueue() : 40 // Returns the ReceivedSyncMsgQueue instance for this thread, creating one
41 blocking_event_(CreateEvent(NULL, FALSE, FALSE, NULL)), 41 // if necessary. Call RemoveListener on the same thread when done.
42 task_pending_(false), 42 static ReceivedSyncMsgQueue* AddListener() {
43 listener_message_loop_(MessageLoop::current()) { 43 // We want one ReceivedSyncMsgQueue per listener thread (i.e. since multiple
44 // SyncChannel objects can block the same thread).
45 ReceivedSyncMsgQueue* rv = lazy_tls_ptr_.Pointer()->Get();
46 if (!rv) {
47 rv = new ReceivedSyncMsgQueue();
48 ReceivedSyncMsgQueue::lazy_tls_ptr_.Pointer()->Set(rv);
49 }
50 rv->listener_count_++;
51 return rv;
44 } 52 }
45 53
46 ~ReceivedSyncMsgQueue() { 54 ~ReceivedSyncMsgQueue() {
47 DCHECK(lazy_tls_ptr_.Pointer()->Get());
48 DCHECK(MessageLoop::current() == listener_message_loop_);
49 CloseHandle(blocking_event_);
50 lazy_tls_ptr_.Pointer()->Set(NULL);
51 } 55 }
52 56
53 // Called on IPC thread when a synchronous message or reply arrives. 57 // Called on IPC thread when a synchronous message or reply arrives.
54 void QueueMessage(const Message& msg, Channel::Listener* listener, 58 void QueueMessage(const Message& msg, Channel::Listener* listener,
55 const std::wstring& channel_id) { 59 const std::wstring& channel_id) {
56 bool was_task_pending; 60 bool was_task_pending;
57 { 61 {
58 AutoLock auto_lock(message_lock_); 62 AutoLock auto_lock(message_lock_);
59 63
60 was_task_pending = task_pending_; 64 was_task_pending = task_pending_;
61 task_pending_ = true; 65 task_pending_ = true;
62 66
63 // We set the event in case the listener thread is blocked (or is about 67 // We set the event in case the listener thread is blocked (or is about
64 // to). In case it's not, the PostTask dispatches the messages. 68 // to). In case it's not, the PostTask dispatches the messages.
65 message_queue_.push(ReceivedMessage(new Message(msg), listener, 69 message_queue_.push(ReceivedMessage(new Message(msg), listener,
66 channel_id)); 70 channel_id));
67 } 71 }
68 72
69 SetEvent(blocking_event_); 73 SetEvent(dispatch_event_);
70 if (!was_task_pending) { 74 if (!was_task_pending) {
71 listener_message_loop_->PostTask(FROM_HERE, NewRunnableMethod( 75 listener_message_loop_->PostTask(FROM_HERE, NewRunnableMethod(
72 this, &ReceivedSyncMsgQueue::DispatchMessagesTask)); 76 this, &ReceivedSyncMsgQueue::DispatchMessagesTask));
73 } 77 }
74 } 78 }
75 79
76 void QueueReply(const Message &msg, SyncChannel::SyncContext* context) { 80 void QueueReply(const Message &msg, SyncChannel::SyncContext* context) {
77 received_replies_.push_back(Reply(new Message(msg), context)); 81 received_replies_.push_back(Reply(new Message(msg), context));
78 } 82 }
79 83
(...skipping 18 matching lines...) Expand all
98 break; 102 break;
99 103
100 ReceivedMessage& blocking_msg = message_queue_.front(); 104 ReceivedMessage& blocking_msg = message_queue_.front();
101 message = blocking_msg.message; 105 message = blocking_msg.message;
102 listener = blocking_msg.listener; 106 listener = blocking_msg.listener;
103 channel_id = blocking_msg.channel_id; 107 channel_id = blocking_msg.channel_id;
104 message_queue_.pop(); 108 message_queue_.pop();
105 } 109 }
106 110
107 #ifdef IPC_MESSAGE_LOG_ENABLED 111 #ifdef IPC_MESSAGE_LOG_ENABLED
108 IPC::Logging* logger = IPC::Logging::current(); 112 Logging* logger = Logging::current();
109 if (logger->Enabled()) 113 if (logger->Enabled())
110 logger->OnPreDispatchMessage(*message); 114 logger->OnPreDispatchMessage(*message);
111 #endif 115 #endif
112 116
113 if (listener) 117 if (listener)
114 listener->OnMessageReceived(*message); 118 listener->OnMessageReceived(*message);
115 119
116 #ifdef IPC_MESSAGE_LOG_ENABLED 120 #ifdef IPC_MESSAGE_LOG_ENABLED
117 if (logger->Enabled()) 121 if (logger->Enabled())
118 logger->OnPostDispatchMessage(*message, channel_id); 122 logger->OnPostDispatchMessage(*message, channel_id);
119 #endif 123 #endif
120 124
121 delete message; 125 delete message;
122 } 126 }
123 } 127 }
124 128
125 // Called on the IPC thread when the current sync Send() call is unblocked. 129 // Called on the IPC thread when the current sync Send() call is unblocked.
126 void OnUnblock() { 130 void DidUnblock() {
127 if (!received_replies_.empty()) { 131 if (!received_replies_.empty()) {
128 MessageLoop::current()->PostTask(FROM_HERE, NewRunnableMethod( 132 MessageLoop::current()->PostTask(FROM_HERE, NewRunnableMethod(
129 this, &ReceivedSyncMsgQueue::DispatchReplies)); 133 this, &ReceivedSyncMsgQueue::DispatchReplies));
130 } 134 }
131 } 135 }
132 136
133 // SyncChannel calls this in its destructor. 137 // SyncChannel calls this in its destructor.
134 void RemoveListener(Channel::Listener* listener) { 138 void RemoveListener(Channel::Listener* listener) {
135 AutoLock auto_lock(message_lock_); 139 AutoLock auto_lock(message_lock_);
136 140
137 SyncMessageQueue temp_queue; 141 SyncMessageQueue temp_queue;
138 while (!message_queue_.empty()) { 142 while (!message_queue_.empty()) {
139 if (message_queue_.front().listener != listener) { 143 if (message_queue_.front().listener != listener) {
140 temp_queue.push(message_queue_.front()); 144 temp_queue.push(message_queue_.front());
141 } else { 145 } else {
142 delete message_queue_.front().message; 146 delete message_queue_.front().message;
143 } 147 }
144 148
145 message_queue_.pop(); 149 message_queue_.pop();
146 } 150 }
147 151
148 while (!temp_queue.empty()) { 152 while (!temp_queue.empty()) {
149 message_queue_.push(temp_queue.front()); 153 message_queue_.push(temp_queue.front());
150 temp_queue.pop(); 154 temp_queue.pop();
151 } 155 }
156
157 if (--listener_count_ == 0) {
158 DCHECK(lazy_tls_ptr_.Pointer()->Get());
159 lazy_tls_ptr_.Pointer()->Set(NULL);
160 }
152 } 161 }
153 162
154 HANDLE blocking_event() { return blocking_event_; } 163 HANDLE dispatch_event() { return dispatch_event_; }
155 MessageLoop* listener_message_loop() { return listener_message_loop_; } 164 MessageLoop* listener_message_loop() { return listener_message_loop_; }
156 165
157 // Holds a pointer to the per-thread ReceivedSyncMsgQueue object. 166 // Holds a pointer to the per-thread ReceivedSyncMsgQueue object.
158 static base::LazyInstance<base::ThreadLocalPointer<ReceivedSyncMsgQueue> > 167 static base::LazyInstance<base::ThreadLocalPointer<ReceivedSyncMsgQueue> >
159 lazy_tls_ptr_; 168 lazy_tls_ptr_;
160 169
161 private: 170 private:
171 ReceivedSyncMsgQueue() :
172 dispatch_event_(CreateEvent(NULL, TRUE, FALSE, NULL)),
173 task_pending_(false),
174 listener_message_loop_(MessageLoop::current()),
175 listener_count_(0) {
176 }
177
162 // Called on the ipc thread to check if we can unblock any current Send() 178 // Called on the ipc thread to check if we can unblock any current Send()
163 // calls based on a queued reply. 179 // calls based on a queued reply.
164 void DispatchReplies() { 180 void DispatchReplies() {
165 for (size_t i = 0; i < received_replies_.size(); ++i) { 181 for (size_t i = 0; i < received_replies_.size(); ++i) {
166 Message* message = received_replies_[i].message; 182 Message* message = received_replies_[i].message;
167 if (received_replies_[i].context->UnblockListener(message)) { 183 if (received_replies_[i].context->TryToUnblockListener(message)) {
168 delete message; 184 delete message;
169 received_replies_.erase(received_replies_.begin() + i); 185 received_replies_.erase(received_replies_.begin() + i);
170 return; 186 return;
171 } 187 }
172 } 188 }
173 } 189 }
174 190
175 // Set when we got a synchronous message that we must respond to as the
176 // sender needs its reply before it can reply to our original synchronous
177 // message.
178 HANDLE blocking_event_;
179
180 MessageLoop* listener_message_loop_;
181
182 // Holds information about a queued synchronous message. 191 // Holds information about a queued synchronous message.
183 struct ReceivedMessage { 192 struct ReceivedMessage {
184 ReceivedMessage(Message* m, Channel::Listener* l, const std::wstring& i) 193 ReceivedMessage(Message* m, Channel::Listener* l, const std::wstring& i)
185 : message(m), listener(l), channel_id(i) { } 194 : message(m), listener(l), channel_id(i) { }
186 Message* message; 195 Message* message;
187 Channel::Listener* listener; 196 Channel::Listener* listener;
188 std::wstring channel_id; 197 std::wstring channel_id;
189 }; 198 };
190 199
191 typedef std::queue<ReceivedMessage> SyncMessageQueue; 200 typedef std::queue<ReceivedMessage> SyncMessageQueue;
192 SyncMessageQueue message_queue_; 201 SyncMessageQueue message_queue_;
193 Lock message_lock_;
194 bool task_pending_;
195 202
196 // Holds information about a queued reply message. 203 // Holds information about a queued reply message.
197 struct Reply { 204 struct Reply {
198 Reply(Message* m, SyncChannel::SyncContext* c) 205 Reply(Message* m, SyncChannel::SyncContext* c)
199 : message(m), 206 : message(m),
200 context(c) { } 207 context(c) { }
201 208
202 Message* message; 209 Message* message;
203 scoped_refptr<SyncChannel::SyncContext> context; 210 scoped_refptr<SyncChannel::SyncContext> context;
204 }; 211 };
205 212
206 std::vector<Reply> received_replies_; 213 std::vector<Reply> received_replies_;
214
215 // Set when we got a synchronous message that we must respond to as the
216 // sender needs its reply before it can reply to our original synchronous
217 // message.
218 ScopedHandle dispatch_event_;
219 MessageLoop* listener_message_loop_;
220 Lock message_lock_;
221 bool task_pending_;
222 int listener_count_;
207 }; 223 };
208 224
209 base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> > 225 base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> >
210 SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_(base::LINKER_INITIALIZED); 226 SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_(base::LINKER_INITIALIZED);
211 227
212 SyncChannel::SyncContext::SyncContext( 228 SyncChannel::SyncContext::SyncContext(
213 Channel::Listener* listener, 229 Channel::Listener* listener,
214 MessageFilter* filter, 230 MessageFilter* filter,
215 MessageLoop* ipc_thread) 231 MessageLoop* ipc_thread,
232 HANDLE shutdown_event)
216 : ChannelProxy::Context(listener, filter, ipc_thread), 233 : ChannelProxy::Context(listener, filter, ipc_thread),
217 channel_closed_(false), 234 shutdown_event_(shutdown_event),
218 reply_deserialize_result_(false) { 235 received_sync_msgs_(ReceivedSyncMsgQueue::AddListener()){
219 // We want one ReceivedSyncMsgQueue per listener thread (i.e. since multiple
220 // SyncChannel objects that can block the same thread).
221 received_sync_msgs_ = ReceivedSyncMsgQueue::lazy_tls_ptr_.Pointer()->Get();
222
223 if (!received_sync_msgs_) {
224 // Stash a pointer to the listener thread's ReceivedSyncMsgQueue, as we
225 // need to be able to access it in the IPC thread.
226 received_sync_msgs_ = new ReceivedSyncMsgQueue();
227 ReceivedSyncMsgQueue::lazy_tls_ptr_.Pointer()->Set(received_sync_msgs_);
228 }
229
230 // Addref manually so that we can ensure destruction on the listener thread
231 // (so that the TLS object is NULLd).
232 received_sync_msgs_->AddRef();
233 } 236 }
234 237
235 SyncChannel::SyncContext::~SyncContext() { 238 SyncChannel::SyncContext::~SyncContext() {
236 while (!deserializers_.empty()) 239 while (!deserializers_.empty())
237 PopDeserializer(true); 240 Pop();
238
239 received_sync_msgs_->listener_message_loop()->ReleaseSoon(
240 FROM_HERE, received_sync_msgs_);
241 } 241 }
242 242
243 // Adds information about an outgoing sync message to the context so that 243 // Adds information about an outgoing sync message to the context so that
244 // we know how to deserialize the reply. Returns a handle that's set when 244 // we know how to deserialize the reply. Returns a handle that's set when
245 // the reply has arrived. 245 // the reply has arrived.
246 HANDLE SyncChannel::SyncContext::Push(IPC::SyncMessage* sync_msg) { 246 void SyncChannel::SyncContext::Push(SyncMessage* sync_msg) {
247 PendingSyncMsg pending(IPC::SyncMessage::GetMessageId(*sync_msg), 247 PendingSyncMsg pending(SyncMessage::GetMessageId(*sync_msg),
248 sync_msg->GetReplyDeserializer(), 248 sync_msg->GetReplyDeserializer(),
249 CreateEvent(NULL, FALSE, FALSE, NULL)); 249 CreateEvent(NULL, FALSE, FALSE, NULL));
250 AutoLock auto_lock(deserializers_lock_); 250 AutoLock auto_lock(deserializers_lock_);
251 deserializers_.push(pending); 251 deserializers_.push_back(pending);
252
253 return pending.reply_event;
254 } 252 }
255 253
256 HANDLE SyncChannel::SyncContext::blocking_event() { 254 bool SyncChannel::SyncContext::Pop() {
257 return received_sync_msgs_->blocking_event(); 255 AutoLock auto_lock(deserializers_lock_);
256 PendingSyncMsg msg = deserializers_.back();
257 delete msg.deserializer;
258 CloseHandle(msg.done_event);
259 deserializers_.pop_back();
260 return msg.send_result;
261 }
262
263 HANDLE SyncChannel::SyncContext::GetSendDoneEvent() {
264 AutoLock auto_lock(deserializers_lock_);
265 return deserializers_.back().done_event;
266 }
267
268 HANDLE SyncChannel::SyncContext::GetDispatchEvent() {
269 return received_sync_msgs_->dispatch_event();
258 } 270 }
259 271
260 void SyncChannel::SyncContext::DispatchMessages() { 272 void SyncChannel::SyncContext::DispatchMessages() {
261 received_sync_msgs_->DispatchMessages(); 273 received_sync_msgs_->DispatchMessages();
262 } 274 }
263 275
264 void SyncChannel::SyncContext::RemoveListener(Channel::Listener* listener) { 276 bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) {
265 received_sync_msgs_->RemoveListener(listener); 277 {
266 } 278 AutoLock auto_lock(deserializers_lock_);
279 if (deserializers_.empty() ||
280 !SyncMessage::IsMessageReplyTo(*msg, deserializers_.back().id)) {
281 return false;
282 }
267 283
268 bool SyncChannel::SyncContext::UnblockListener(const Message* msg) { 284 if (!msg->is_reply_error()) {
269 bool rv = false; 285 deserializers_.back().send_result = deserializers_.back().deserializer->
270 HANDLE reply_event = NULL; 286 SerializeOutputParameters(*msg);
271 {
272 if (channel_closed_) {
273 // The channel is closed, or we couldn't connect, so cancel all Send()
274 // calls.
275 reply_deserialize_result_ = false;
276 {
277 AutoLock auto_lock(deserializers_lock_);
278 if (!deserializers_.empty())
279 reply_event = deserializers_.top().reply_event;
280 }
281
282 if (reply_event)
283 PopDeserializer(false);
284 } else {
285 {
286 AutoLock auto_lock(deserializers_lock_);
287 if (deserializers_.empty())
288 return false;
289
290 if (!IPC::SyncMessage::IsMessageReplyTo(*msg, deserializers_.top().id))
291 return false;
292
293 rv = true;
294 if (msg->is_reply_error()) {
295 reply_deserialize_result_ = false;
296 } else {
297 reply_deserialize_result_ = deserializers_.top().deserializer->
298 SerializeOutputParameters(*msg);
299 }
300
301 // Can't CloseHandle the event just yet, since doing so might cause the
302 // Wait call above to never return.
303 reply_event = deserializers_.top().reply_event;
304 }
305 PopDeserializer(false);
306 } 287 }
288 SetEvent(deserializers_.back().done_event);
307 } 289 }
308 290
309 if (reply_event)
310 SetEvent(reply_event);
311
312 // We got a reply to a synchronous Send() call that's blocking the listener 291 // We got a reply to a synchronous Send() call that's blocking the listener
313 // thread. However, further down the call stack there could be another 292 // thread. However, further down the call stack there could be another
314 // blocking Send() call, whose reply we received after we made this last 293 // blocking Send() call, whose reply we received after we made this last
315 // Send() call. So check if we have any queued replies available that 294 // Send() call. So check if we have any queued replies available that
316 // can now unblock the listener thread. 295 // can now unblock the listener thread.
317 received_sync_msgs_->OnUnblock(); 296 received_sync_msgs_->DidUnblock();
318 297
319 return rv; 298 return true;
320 } 299 }
321 300
322 // Called on the IPC thread. 301 void SyncChannel::SyncContext::Clear() {
302 CancelPendingSends();
303 received_sync_msgs_->RemoveListener(listener());
304
305 Context::Clear();
306 }
307
323 void SyncChannel::SyncContext::OnMessageReceived(const Message& msg) { 308 void SyncChannel::SyncContext::OnMessageReceived(const Message& msg) {
324 // Give the filters a chance at processing this message. 309 // Give the filters a chance at processing this message.
325 if (TryFilters(msg)) 310 if (TryFilters(msg))
326 return; 311 return;
327 312
328 if (UnblockListener(&msg)) 313 if (TryToUnblockListener(&msg))
329 return; 314 return;
330 315
331 if (msg.should_unblock()) { 316 if (msg.should_unblock()) {
332 received_sync_msgs_->QueueMessage(msg, listener(), channel_id()); 317 received_sync_msgs_->QueueMessage(msg, listener(), channel_id());
333 return; 318 return;
334 } 319 }
335 320
336 if (msg.is_reply()) { 321 if (msg.is_reply()) {
337 received_sync_msgs_->QueueReply(msg, this); 322 received_sync_msgs_->QueueReply(msg, this);
338 return; 323 return;
339 } 324 }
340 325
341 return Context::OnMessageReceived(msg); 326 return Context::OnMessageReceivedNoFilter(msg);
342 } 327 }
343 328
344 // Called on the IPC thread.
345 void SyncChannel::SyncContext::OnChannelError() { 329 void SyncChannel::SyncContext::OnChannelError() {
346 channel_closed_ = true; 330 CancelPendingSends();
347 UnblockListener(NULL);
348
349 Context::OnChannelError(); 331 Context::OnChannelError();
350 } 332 }
351 333
352 void SyncChannel::SyncContext::PopDeserializer(bool close_reply_event) { 334 void SyncChannel::SyncContext::OnChannelOpened() {
353 PendingSyncMsg msg = deserializers_.top(); 335 shutdown_watcher_.StartWatching(shutdown_event_, this);
354 delete msg.deserializer; 336 Context::OnChannelOpened();
355 if (close_reply_event)
356 CloseHandle(msg.reply_event);
357 deserializers_.pop();
358 } 337 }
359 338
360 SyncChannel::SyncChannel(const std::wstring& channel_id, Channel::Mode mode, 339 void SyncChannel::SyncContext::OnChannelClosed() {
361 Channel::Listener* listener, MessageFilter* filter, 340 shutdown_watcher_.StopWatching();
362 MessageLoop* ipc_message_loop, 341 Context::OnChannelClosed();
363 bool create_pipe_now, HANDLE shutdown_event) 342 }
364 : ChannelProxy(channel_id, mode, ipc_message_loop, 343
365 new SyncContext(listener, filter, ipc_message_loop), 344 void SyncChannel::SyncContext::OnSendTimeout(int message_id) {
366 create_pipe_now), 345 AutoLock auto_lock(deserializers_lock_);
367 shutdown_event_(shutdown_event), 346 PendingSyncMessageQueue::iterator iter;
347 for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++) {
348 if ((*iter).id == message_id) {
349 SetEvent((*iter).done_event);
350 break;
351 }
352 }
353 }
354
355 void SyncChannel::SyncContext::CancelPendingSends() {
356 AutoLock auto_lock(deserializers_lock_);
357 PendingSyncMessageQueue::iterator iter;
358 for (iter = deserializers_.begin(); iter != deserializers_.end(); iter++)
359 SetEvent((*iter).done_event);
360 }
361
362 void SyncChannel::SyncContext::OnObjectSignaled(HANDLE object) {
363 DCHECK(object == shutdown_event_);
364 // Process shut down before we can get a reply to a synchronous message.
365 // Cancel pending Send calls, which will end up setting the send done event.
366 CancelPendingSends();
367 }
368
369
370 SyncChannel::SyncChannel(
371 const std::wstring& channel_id, Channel::Mode mode,
372 Channel::Listener* listener, MessageFilter* filter,
373 MessageLoop* ipc_message_loop, bool create_pipe_now, HANDLE shutdown_event)
374 : ChannelProxy(
375 channel_id, mode, ipc_message_loop,
376 new SyncContext(listener, filter, ipc_message_loop, shutdown_event),
377 create_pipe_now),
368 sync_messages_with_no_timeout_allowed_(true) { 378 sync_messages_with_no_timeout_allowed_(true) {
369 DCHECK(shutdown_event_ != NULL); 379 // Ideally we only want to watch this object when running a nested message
380 // loop. However, we don't know when it exits if there's another nested
381 // message loop running under it or not, so we wouldn't know whether to
382 // stop or keep watching. So we always watch it, and create the event as
383 // manual reset since the object watcher might otherwise reset the event
384 // when we're doing a WaitForMultipleObjects.
385 dispatch_watcher_.StartWatching(sync_context()->GetDispatchEvent(), this);
370 } 386 }
371 387
372 SyncChannel::~SyncChannel() { 388 SyncChannel::~SyncChannel() {
373 // The listener ensures that its lifetime is greater than SyncChannel. But
374 // after SyncChannel is destructed there's no guarantee that the listener is
375 // still around, so we wouldn't want ReceivedSyncMsgQueue to call the
376 // listener.
377 sync_context()->RemoveListener(listener());
378 } 389 }
379 390
380 bool SyncChannel::Send(IPC::Message* message) { 391 bool SyncChannel::Send(Message* message) {
381 return SendWithTimeout(message, INFINITE); 392 return SendWithTimeout(message, INFINITE);
382 } 393 }
383 394
384 bool SyncChannel::SendWithTimeout(IPC::Message* message, int timeout_ms) { 395 bool SyncChannel::SendWithTimeout(Message* message, int timeout_ms) {
385 bool message_is_sync = message->is_sync(); 396 if (!message->is_sync()) {
386 HANDLE pump_messages_event = NULL; 397 ChannelProxy::Send(message);
387 398 return true;
388 HANDLE reply_event = NULL;
389 if (message_is_sync) {
390 DCHECK(sync_messages_with_no_timeout_allowed_ || timeout_ms != INFINITE);
391 IPC::SyncMessage* sync_msg = static_cast<IPC::SyncMessage*>(message);
392 reply_event = sync_context()->Push(sync_msg);
393 pump_messages_event = sync_msg->pump_messages_event();
394 } 399 }
395 400
396 // Send the message using the ChannelProxy 401 // *this* might get deleted in WaitForReply.
402 scoped_refptr<SyncContext> context(sync_context());
403 if (WaitForSingleObject(context->shutdown_event(), 0) == WAIT_OBJECT_0) {
404 delete message;
405 return false;
406 }
407
408 DCHECK(sync_messages_with_no_timeout_allowed_ || timeout_ms != INFINITE);
409 SyncMessage* sync_msg = static_cast<SyncMessage*>(message);
410 context->Push(sync_msg);
411 int message_id = SyncMessage::GetMessageId(*sync_msg);
412 HANDLE pump_messages_event = sync_msg->pump_messages_event();
413
397 ChannelProxy::Send(message); 414 ChannelProxy::Send(message);
398 if (!message_is_sync)
399 return true;
400 415
401 do { 416 if (timeout_ms != INFINITE) {
402 // Wait for reply, or for any other incoming synchronous message. 417 // We use the sync message id so that when a message times out, we don't
403 DCHECK(reply_event != NULL); 418 // confuse it with another send that is either above/below this Send in
404 HANDLE objects[] = { shutdown_event_, 419 // the call stack.
405 reply_event, 420 context->ipc_message_loop()->PostDelayedTask(FROM_HERE,
406 sync_context()->blocking_event(), 421 NewRunnableMethod(context.get(),
407 pump_messages_event}; 422 &SyncContext::OnSendTimeout, message_id), timeout_ms);
423 }
408 424
409 DWORD result; 425 // Wait for reply, or for any other incoming synchronous messages.
410 TimeTicks before = TimeTicks::Now(); 426 WaitForReply(pump_messages_event);
411 if (pump_messages_event == NULL) { 427
412 // No need to pump messages since we didn't get an event to check. 428 return context->Pop();
413 result = WaitForMultipleObjects(3, objects, FALSE, timeout_ms); 429 }
414 } else { 430
415 // If the event is set, then we pump messages. Otherwise we also wait on 431 void SyncChannel::WaitForReply(HANDLE pump_messages_event) {
416 // it so that if it gets set we start pumping messages. 432 while (true) {
417 if (WaitForSingleObject(pump_messages_event, 0) == WAIT_OBJECT_0) { 433 HANDLE objects[] = { sync_context()->GetDispatchEvent(),
418 // Before calling MsgWaitForMultipleObjects() we check that our events 434 sync_context()->GetSendDoneEvent(),
419 // are not signaled. The windows message queue might always have events 435 pump_messages_event };
420 // starving the checking of our events otherwise. 436 uint32 count = pump_messages_event ? 3: 2;
421 result = WaitForMultipleObjects(3, objects, FALSE, 0); 437 DWORD result = WaitForMultipleObjects(count, objects, FALSE, INFINITE);
422 if (result == WAIT_TIMEOUT) { 438 if (result == WAIT_OBJECT_0) {
423 result = MsgWaitForMultipleObjects(3, objects, FALSE, timeout_ms, 439 // We're waiting for a reply, but we received a blocking synchronous
424 QS_ALLINPUT); 440 // call. We must process it or otherwise a deadlock might occur.
425 } 441 ResetEvent(sync_context()->GetDispatchEvent());
426 } else { 442 sync_context()->DispatchMessages();
427 result = WaitForMultipleObjects(4, objects, FALSE, timeout_ms); 443 continue;
428 }
429 } 444 }
430 445
431 if (result == WAIT_OBJECT_0 || result == WAIT_TIMEOUT) { 446 if (result == WAIT_OBJECT_0 + 2)
432 // Process shut down before we can get a reply to a synchronous message, 447 WaitForReplyWithNestedMessageLoop(); // Start a nested message loop.
433 // or timed-out. Unblock the thread.
434 sync_context()->PopDeserializer(true);
435 return false;
436 }
437 448
438 if (result == WAIT_OBJECT_0 + 1) { 449 break;
439 // We got the reply to our synchronous message. 450 }
440 CloseHandle(reply_event);
441 return sync_context()->reply_deserialize_result();
442 }
443
444 if (result == WAIT_OBJECT_0 + 2) {
445 // We're waiting for a reply, but we received a blocking synchronous
446 // call. We must process it or otherwise a deadlock might occur.
447 sync_context()->DispatchMessages();
448 } else if (result == WAIT_OBJECT_0 + 3) {
449 // Run a nested messsage loop to pump all the thread's messages. We
450 // shutdown the nested loop when there are no more messages.
451 pump_messages_events_.push(pump_messages_event);
452 bool old_state = MessageLoop::current()->NestableTasksAllowed();
453 MessageLoop::current()->SetNestableTasksAllowed(true);
454 // Process a message, but come right back out of the MessageLoop (don't
455 // loop, sleep, or wait for a kMsgQuit).
456 MessageLoop::current()->RunAllPending();
457 MessageLoop::current()->SetNestableTasksAllowed(old_state);
458 pump_messages_events_.pop();
459 } else {
460 DCHECK(result == WAIT_OBJECT_0 + 4);
461 // We were doing a WaitForMultipleObjects, but now the pump messages
462 // event is set, so the next time we loop we'll use
463 // MsgWaitForMultipleObjects instead.
464 }
465
466 if (timeout_ms != INFINITE) {
467 TimeDelta time_delta = TimeTicks::Now() - before;
468 timeout_ms -= static_cast<int>(time_delta.InMilliseconds());
469 if (timeout_ms <= 0) {
470 // We timed-out while processing messages.
471 sync_context()->PopDeserializer(true);
472 return false;
473 }
474 }
475
476 // Continue looping until we either get the reply to our synchronous message
477 // or we time-out.
478 } while (true);
479 } 451 }
480 452
481 bool SyncChannel::UnblockListener(Message* message) { 453 void SyncChannel::WaitForReplyWithNestedMessageLoop() {
482 return sync_context()->UnblockListener(message); 454 HANDLE old_done_event = send_done_watcher_.GetWatchedObject();
455 send_done_watcher_.StopWatching();
456 send_done_watcher_.StartWatching(sync_context()->GetSendDoneEvent(), this);
457 bool old_state = MessageLoop::current()->NestableTasksAllowed();
458 MessageLoop::current()->SetNestableTasksAllowed(true);
459 MessageLoop::current()->Run();
460 MessageLoop::current()->SetNestableTasksAllowed(old_state);
461 if (old_done_event)
462 send_done_watcher_.StartWatching(old_done_event, this);
463 }
464
465 void SyncChannel::OnObjectSignaled(HANDLE object) {
466 HANDLE dispatch_event = sync_context()->GetDispatchEvent();
467 if (object == dispatch_event) {
468 // The call to DispatchMessages might delete this object, so reregister
469 // the object watcher first.
470 ResetEvent(dispatch_event);
471 dispatch_watcher_.StartWatching(dispatch_event, this);
472 sync_context()->DispatchMessages();
473 } else {
474 // We got the reply, timed out or the process shutdown.
475 DCHECK(object == sync_context()->GetSendDoneEvent());
476 MessageLoop::current()->Quit();
477 }
483 } 478 }
484 479
485 } // namespace IPC 480 } // namespace IPC
486
OLDNEW
« no previous file with comments | « chrome/common/ipc_sync_channel.h ('k') | chrome/common/ipc_sync_channel_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698