Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(310)

Side by Side Diff: ipc/ipc_sync_channel.cc

Issue 10694014: Cleanup IPC::ChannelProxy to use SingleThreadTaskRunner (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Created 8 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « ipc/ipc_sync_channel.h ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "ipc/ipc_sync_channel.h" 5 #include "ipc/ipc_sync_channel.h"
6 6
7 #include "base/bind.h" 7 #include "base/bind.h"
8 #include "base/lazy_instance.h" 8 #include "base/lazy_instance.h"
9 #include "base/location.h" 9 #include "base/location.h"
10 #include "base/logging.h" 10 #include "base/logging.h"
11 #include "base/threading/thread_local.h"
12 #include "base/synchronization/waitable_event.h" 11 #include "base/synchronization/waitable_event.h"
13 #include "base/synchronization/waitable_event_watcher.h" 12 #include "base/synchronization/waitable_event_watcher.h"
13 #include "base/thread_task_runner_handle.h"
14 #include "base/threading/thread_local.h"
14 #include "ipc/ipc_sync_message.h" 15 #include "ipc/ipc_sync_message.h"
15 16
16 using base::TimeDelta; 17 using base::TimeDelta;
17 using base::TimeTicks; 18 using base::TimeTicks;
18 using base::WaitableEvent; 19 using base::WaitableEvent;
19 20
20 namespace IPC { 21 namespace IPC {
21 // When we're blocked in a Send(), we need to process incoming synchronous 22 // When we're blocked in a Send(), we need to process incoming synchronous
22 // messages right away because it could be blocking our reply (either 23 // messages right away because it could be blocking our reply (either
23 // directly from the same object we're calling, or indirectly through one or 24 // directly from the same object we're calling, or indirectly through one or
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after
62 task_pending_ = true; 63 task_pending_ = true;
63 64
64 // We set the event in case the listener thread is blocked (or is about 65 // We set the event in case the listener thread is blocked (or is about
65 // to). In case it's not, the PostTask dispatches the messages. 66 // to). In case it's not, the PostTask dispatches the messages.
66 message_queue_.push_back(QueuedMessage(new Message(msg), context)); 67 message_queue_.push_back(QueuedMessage(new Message(msg), context));
67 message_queue_version_++; 68 message_queue_version_++;
68 } 69 }
69 70
70 dispatch_event_.Signal(); 71 dispatch_event_.Signal();
71 if (!was_task_pending) { 72 if (!was_task_pending) {
72 listener_message_loop_->PostTask( 73 listener_task_runner_->PostTask(
73 FROM_HERE, base::Bind(&ReceivedSyncMsgQueue::DispatchMessagesTask, 74 FROM_HERE, base::Bind(&ReceivedSyncMsgQueue::DispatchMessagesTask,
74 this, scoped_refptr<SyncContext>(context))); 75 this, scoped_refptr<SyncContext>(context)));
75 } 76 }
76 } 77 }
77 78
78 void QueueReply(const Message &msg, SyncChannel::SyncContext* context) { 79 void QueueReply(const Message &msg, SyncChannel::SyncContext* context) {
79 received_replies_.push_back(QueuedMessage(new Message(msg), context)); 80 received_replies_.push_back(QueuedMessage(new Message(msg), context));
80 } 81 }
81 82
82 // Called on the listener's thread to process any queues synchronous 83 // Called on the listener's thread to process any queues synchronous
(...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after
138 } 139 }
139 } 140 }
140 141
141 if (--listener_count_ == 0) { 142 if (--listener_count_ == 0) {
142 DCHECK(lazy_tls_ptr_.Pointer()->Get()); 143 DCHECK(lazy_tls_ptr_.Pointer()->Get());
143 lazy_tls_ptr_.Pointer()->Set(NULL); 144 lazy_tls_ptr_.Pointer()->Set(NULL);
144 } 145 }
145 } 146 }
146 147
147 WaitableEvent* dispatch_event() { return &dispatch_event_; } 148 WaitableEvent* dispatch_event() { return &dispatch_event_; }
148 base::MessageLoopProxy* listener_message_loop() { 149 base::SingleThreadTaskRunner* listener_task_runner() {
149 return listener_message_loop_; 150 return listener_task_runner_;
150 } 151 }
151 152
152 // Holds a pointer to the per-thread ReceivedSyncMsgQueue object. 153 // Holds a pointer to the per-thread ReceivedSyncMsgQueue object.
153 static base::LazyInstance<base::ThreadLocalPointer<ReceivedSyncMsgQueue> > 154 static base::LazyInstance<base::ThreadLocalPointer<ReceivedSyncMsgQueue> >
154 lazy_tls_ptr_; 155 lazy_tls_ptr_;
155 156
156 // Called on the ipc thread to check if we can unblock any current Send() 157 // Called on the ipc thread to check if we can unblock any current Send()
157 // calls based on a queued reply. 158 // calls based on a queued reply.
158 void DispatchReplies() { 159 void DispatchReplies() {
159 for (size_t i = 0; i < received_replies_.size(); ++i) { 160 for (size_t i = 0; i < received_replies_.size(); ++i) {
(...skipping 15 matching lines...) Expand all
175 } 176 }
176 177
177 private: 178 private:
178 friend class base::RefCountedThreadSafe<ReceivedSyncMsgQueue>; 179 friend class base::RefCountedThreadSafe<ReceivedSyncMsgQueue>;
179 180
180 // See the comment in SyncChannel::SyncChannel for why this event is created 181 // See the comment in SyncChannel::SyncChannel for why this event is created
181 // as manual reset. 182 // as manual reset.
182 ReceivedSyncMsgQueue() : 183 ReceivedSyncMsgQueue() :
183 message_queue_version_(0), 184 message_queue_version_(0),
184 dispatch_event_(true, false), 185 dispatch_event_(true, false),
185 listener_message_loop_(base::MessageLoopProxy::current()), 186 listener_task_runner_(base::ThreadTaskRunnerHandle::Get()),
186 task_pending_(false), 187 task_pending_(false),
187 listener_count_(0), 188 listener_count_(0),
188 top_send_done_watcher_(NULL) { 189 top_send_done_watcher_(NULL) {
189 } 190 }
190 191
191 ~ReceivedSyncMsgQueue() {} 192 ~ReceivedSyncMsgQueue() {}
192 193
193 // Holds information about a queued synchronous message or reply. 194 // Holds information about a queued synchronous message or reply.
194 struct QueuedMessage { 195 struct QueuedMessage {
195 QueuedMessage(Message* m, SyncContext* c) : message(m), context(c) { } 196 QueuedMessage(Message* m, SyncContext* c) : message(m), context(c) { }
196 Message* message; 197 Message* message;
197 scoped_refptr<SyncChannel::SyncContext> context; 198 scoped_refptr<SyncChannel::SyncContext> context;
198 }; 199 };
199 200
200 typedef std::list<QueuedMessage> SyncMessageQueue; 201 typedef std::list<QueuedMessage> SyncMessageQueue;
201 SyncMessageQueue message_queue_; 202 SyncMessageQueue message_queue_;
202 uint32 message_queue_version_; // Used to signal DispatchMessages to rescan 203 uint32 message_queue_version_; // Used to signal DispatchMessages to rescan
203 204
204 std::vector<QueuedMessage> received_replies_; 205 std::vector<QueuedMessage> received_replies_;
205 206
206 // Set when we got a synchronous message that we must respond to as the 207 // Set when we got a synchronous message that we must respond to as the
207 // sender needs its reply before it can reply to our original synchronous 208 // sender needs its reply before it can reply to our original synchronous
208 // message. 209 // message.
209 WaitableEvent dispatch_event_; 210 WaitableEvent dispatch_event_;
210 scoped_refptr<base::MessageLoopProxy> listener_message_loop_; 211 scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner_;
211 base::Lock message_lock_; 212 base::Lock message_lock_;
212 bool task_pending_; 213 bool task_pending_;
213 int listener_count_; 214 int listener_count_;
214 215
215 // The current send done event watcher for this thread. Used to maintain 216 // The current send done event watcher for this thread. Used to maintain
216 // a local global stack of send done watchers to ensure that nested sync 217 // a local global stack of send done watchers to ensure that nested sync
217 // message loops complete correctly. 218 // message loops complete correctly.
218 base::WaitableEventWatcher* top_send_done_watcher_; 219 base::WaitableEventWatcher* top_send_done_watcher_;
219 }; 220 };
220 221
221 base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> > 222 base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> >
222 SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_ = 223 SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_ =
223 LAZY_INSTANCE_INITIALIZER; 224 LAZY_INSTANCE_INITIALIZER;
224 225
225 SyncChannel::SyncContext::SyncContext( 226 SyncChannel::SyncContext::SyncContext(
226 Listener* listener, 227 Listener* listener,
227 base::MessageLoopProxy* ipc_thread, 228 base::SingleThreadTaskRunner* ipc_task_runner,
228 WaitableEvent* shutdown_event) 229 WaitableEvent* shutdown_event)
229 : ChannelProxy::Context(listener, ipc_thread), 230 : ChannelProxy::Context(listener, ipc_task_runner),
230 received_sync_msgs_(ReceivedSyncMsgQueue::AddContext()), 231 received_sync_msgs_(ReceivedSyncMsgQueue::AddContext()),
231 shutdown_event_(shutdown_event), 232 shutdown_event_(shutdown_event),
232 restrict_dispatch_group_(kRestrictDispatchGroup_None) { 233 restrict_dispatch_group_(kRestrictDispatchGroup_None) {
233 } 234 }
234 235
235 SyncChannel::SyncContext::~SyncContext() { 236 SyncChannel::SyncContext::~SyncContext() {
236 while (!deserializers_.empty()) 237 while (!deserializers_.empty())
237 Pop(); 238 Pop();
238 } 239 }
239 240
(...skipping 26 matching lines...) Expand all
266 msg.done_event = NULL; 267 msg.done_event = NULL;
267 deserializers_.pop_back(); 268 deserializers_.pop_back();
268 result = msg.send_result; 269 result = msg.send_result;
269 } 270 }
270 271
271 // We got a reply to a synchronous Send() call that's blocking the listener 272 // We got a reply to a synchronous Send() call that's blocking the listener
272 // thread. However, further down the call stack there could be another 273 // thread. However, further down the call stack there could be another
273 // blocking Send() call, whose reply we received after we made this last 274 // blocking Send() call, whose reply we received after we made this last
274 // Send() call. So check if we have any queued replies available that 275 // Send() call. So check if we have any queued replies available that
275 // can now unblock the listener thread. 276 // can now unblock the listener thread.
276 ipc_message_loop()->PostTask( 277 ipc_task_runner()->PostTask(
277 FROM_HERE, base::Bind(&ReceivedSyncMsgQueue::DispatchReplies, 278 FROM_HERE, base::Bind(&ReceivedSyncMsgQueue::DispatchReplies,
278 received_sync_msgs_.get())); 279 received_sync_msgs_.get()));
279 280
280 return result; 281 return result;
281 } 282 }
282 283
283 WaitableEvent* SyncChannel::SyncContext::GetSendDoneEvent() { 284 WaitableEvent* SyncChannel::SyncContext::GetSendDoneEvent() {
284 base::AutoLock auto_lock(deserializers_lock_); 285 base::AutoLock auto_lock(deserializers_lock_);
285 return deserializers_.back().done_event; 286 return deserializers_.back().done_event;
286 } 287 }
(...skipping 94 matching lines...) Expand 10 before | Expand all | Expand 10 after
381 DCHECK_EQ(GetSendDoneEvent(), event); 382 DCHECK_EQ(GetSendDoneEvent(), event);
382 MessageLoop::current()->QuitNow(); 383 MessageLoop::current()->QuitNow();
383 } 384 }
384 } 385 }
385 386
386 387
387 SyncChannel::SyncChannel( 388 SyncChannel::SyncChannel(
388 const IPC::ChannelHandle& channel_handle, 389 const IPC::ChannelHandle& channel_handle,
389 Channel::Mode mode, 390 Channel::Mode mode,
390 Listener* listener, 391 Listener* listener,
391 base::MessageLoopProxy* ipc_message_loop, 392 base::SingleThreadTaskRunner* ipc_task_runner,
392 bool create_pipe_now, 393 bool create_pipe_now,
393 WaitableEvent* shutdown_event) 394 WaitableEvent* shutdown_event)
394 : ChannelProxy(new SyncContext(listener, ipc_message_loop, shutdown_event)), 395 : ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)),
395 sync_messages_with_no_timeout_allowed_(true) { 396 sync_messages_with_no_timeout_allowed_(true) {
396 ChannelProxy::Init(channel_handle, mode, create_pipe_now); 397 ChannelProxy::Init(channel_handle, mode, create_pipe_now);
397 StartWatching(); 398 StartWatching();
398 } 399 }
399 400
400 SyncChannel::SyncChannel( 401 SyncChannel::SyncChannel(
401 Listener* listener, 402 Listener* listener,
402 base::MessageLoopProxy* ipc_message_loop, 403 base::SingleThreadTaskRunner* ipc_task_runner,
403 WaitableEvent* shutdown_event) 404 WaitableEvent* shutdown_event)
404 : ChannelProxy(new SyncContext(listener, ipc_message_loop, shutdown_event)), 405 : ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)),
405 sync_messages_with_no_timeout_allowed_(true) { 406 sync_messages_with_no_timeout_allowed_(true) {
406 StartWatching(); 407 StartWatching();
407 } 408 }
408 409
409 SyncChannel::~SyncChannel() { 410 SyncChannel::~SyncChannel() {
410 } 411 }
411 412
412 void SyncChannel::SetRestrictDispatchChannelGroup(int group) { 413 void SyncChannel::SetRestrictDispatchChannelGroup(int group) {
413 sync_context()->set_restrict_dispatch_group(group); 414 sync_context()->set_restrict_dispatch_group(group);
414 } 415 }
(...skipping 21 matching lines...) Expand all
436 context->Push(sync_msg); 437 context->Push(sync_msg);
437 int message_id = SyncMessage::GetMessageId(*sync_msg); 438 int message_id = SyncMessage::GetMessageId(*sync_msg);
438 WaitableEvent* pump_messages_event = sync_msg->pump_messages_event(); 439 WaitableEvent* pump_messages_event = sync_msg->pump_messages_event();
439 440
440 ChannelProxy::Send(message); 441 ChannelProxy::Send(message);
441 442
442 if (timeout_ms != base::kNoTimeout) { 443 if (timeout_ms != base::kNoTimeout) {
443 // We use the sync message id so that when a message times out, we don't 444 // We use the sync message id so that when a message times out, we don't
444 // confuse it with another send that is either above/below this Send in 445 // confuse it with another send that is either above/below this Send in
445 // the call stack. 446 // the call stack.
446 context->ipc_message_loop()->PostDelayedTask( 447 context->ipc_task_runner()->PostDelayedTask(
447 FROM_HERE, 448 FROM_HERE,
448 base::Bind(&SyncContext::OnSendTimeout, context.get(), message_id), 449 base::Bind(&SyncContext::OnSendTimeout, context.get(), message_id),
449 base::TimeDelta::FromMilliseconds(timeout_ms)); 450 base::TimeDelta::FromMilliseconds(timeout_ms));
450 } 451 }
451 452
452 // Wait for reply, or for any other incoming synchronous messages. 453 // Wait for reply, or for any other incoming synchronous messages.
453 // *this* might get deleted, so only call static functions at this point. 454 // *this* might get deleted, so only call static functions at this point.
454 WaitForReply(context, pump_messages_event); 455 WaitForReply(context, pump_messages_event);
455 456
456 return context->Pop(); 457 return context->Pop();
(...skipping 75 matching lines...) Expand 10 before | Expand all | Expand 10 after
532 // Ideally we only want to watch this object when running a nested message 533 // Ideally we only want to watch this object when running a nested message
533 // loop. However, we don't know when it exits if there's another nested 534 // loop. However, we don't know when it exits if there's another nested
534 // message loop running under it or not, so we wouldn't know whether to 535 // message loop running under it or not, so we wouldn't know whether to
535 // stop or keep watching. So we always watch it, and create the event as 536 // stop or keep watching. So we always watch it, and create the event as
536 // manual reset since the object watcher might otherwise reset the event 537 // manual reset since the object watcher might otherwise reset the event
537 // when we're doing a WaitMany. 538 // when we're doing a WaitMany.
538 dispatch_watcher_.StartWatching(sync_context()->GetDispatchEvent(), this); 539 dispatch_watcher_.StartWatching(sync_context()->GetDispatchEvent(), this);
539 } 540 }
540 541
541 } // namespace IPC 542 } // namespace IPC
OLDNEW
« no previous file with comments | « ipc/ipc_sync_channel.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698