Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(613)

Side by Side Diff: ipc/ipc_sync_channel.cc

Issue 6810013: Add sync context dispatch restriction (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: fix flaky test Created 9 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « ipc/ipc_sync_channel.h ('k') | ipc/ipc_sync_channel_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2006-2008 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "ipc/ipc_sync_channel.h" 5 #include "ipc/ipc_sync_channel.h"
6 6
7 #include "base/lazy_instance.h" 7 #include "base/lazy_instance.h"
8 #include "base/logging.h" 8 #include "base/logging.h"
9 #include "base/message_loop.h" 9 #include "base/message_loop.h"
10 #include "base/threading/thread_local.h" 10 #include "base/threading/thread_local.h"
11 #include "base/synchronization/waitable_event.h" 11 #include "base/synchronization/waitable_event.h"
(...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after
61 task_pending_ = true; 61 task_pending_ = true;
62 62
63 // We set the event in case the listener thread is blocked (or is about 63 // We set the event in case the listener thread is blocked (or is about
64 // to). In case it's not, the PostTask dispatches the messages. 64 // to). In case it's not, the PostTask dispatches the messages.
65 message_queue_.push_back(QueuedMessage(new Message(msg), context)); 65 message_queue_.push_back(QueuedMessage(new Message(msg), context));
66 } 66 }
67 67
68 dispatch_event_.Signal(); 68 dispatch_event_.Signal();
69 if (!was_task_pending) { 69 if (!was_task_pending) {
70 listener_message_loop_->PostTask(FROM_HERE, NewRunnableMethod( 70 listener_message_loop_->PostTask(FROM_HERE, NewRunnableMethod(
71 this, &ReceivedSyncMsgQueue::DispatchMessagesTask)); 71 this,
72 &ReceivedSyncMsgQueue::DispatchMessagesTask,
73 scoped_refptr<SyncContext>(context)));
72 } 74 }
73 } 75 }
74 76
75 void QueueReply(const Message &msg, SyncChannel::SyncContext* context) { 77 void QueueReply(const Message &msg, SyncChannel::SyncContext* context) {
76 received_replies_.push_back(QueuedMessage(new Message(msg), context)); 78 received_replies_.push_back(QueuedMessage(new Message(msg), context));
77 } 79 }
78 80
79 // Called on the listener's thread to process any queues synchronous 81 // Called on the listener's thread to process any queues synchronous
80 // messages. 82 // messages.
81 void DispatchMessagesTask() { 83 void DispatchMessagesTask(SyncContext* context) {
82 { 84 {
83 base::AutoLock auto_lock(message_lock_); 85 base::AutoLock auto_lock(message_lock_);
84 task_pending_ = false; 86 task_pending_ = false;
85 } 87 }
86 DispatchMessages(); 88 context->DispatchMessages();
87 } 89 }
88 90
89 void DispatchMessages() { 91 void DispatchMessages(SyncContext* dispatching_context) {
92 SyncMessageQueue delayed_queue;
90 while (true) { 93 while (true) {
91 Message* message; 94 Message* message;
92 scoped_refptr<SyncChannel::SyncContext> context; 95 scoped_refptr<SyncChannel::SyncContext> context;
93 { 96 {
94 base::AutoLock auto_lock(message_lock_); 97 base::AutoLock auto_lock(message_lock_);
95 if (message_queue_.empty()) 98 if (message_queue_.empty()) {
99 message_queue_ = delayed_queue;
96 break; 100 break;
101 }
97 102
98 message = message_queue_.front().message; 103 message = message_queue_.front().message;
99 context = message_queue_.front().context; 104 context = message_queue_.front().context;
100 message_queue_.pop_front(); 105 message_queue_.pop_front();
101 } 106 }
102 107 if (context->restrict_dispatch() && context != dispatching_context) {
103 context->OnDispatchMessage(*message); 108 delayed_queue.push_back(QueuedMessage(message, context));
104 delete message; 109 } else {
110 context->OnDispatchMessage(*message);
111 delete message;
112 }
105 } 113 }
106 } 114 }
107 115
108 // SyncChannel calls this in its destructor. 116 // SyncChannel calls this in its destructor.
109 void RemoveContext(SyncContext* context) { 117 void RemoveContext(SyncContext* context) {
110 base::AutoLock auto_lock(message_lock_); 118 base::AutoLock auto_lock(message_lock_);
111 119
112 SyncMessageQueue::iterator iter = message_queue_.begin(); 120 SyncMessageQueue::iterator iter = message_queue_.begin();
113 while (iter != message_queue_.end()) { 121 while (iter != message_queue_.end()) {
114 if (iter->context == context) { 122 if (iter->context == context) {
(...skipping 82 matching lines...) Expand 10 before | Expand all | Expand 10 after
197 205
198 base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> > 206 base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> >
199 SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_(base::LINKER_INITIALIZED); 207 SyncChannel::ReceivedSyncMsgQueue::lazy_tls_ptr_(base::LINKER_INITIALIZED);
200 208
201 SyncChannel::SyncContext::SyncContext( 209 SyncChannel::SyncContext::SyncContext(
202 Channel::Listener* listener, 210 Channel::Listener* listener,
203 MessageLoop* ipc_thread, 211 MessageLoop* ipc_thread,
204 WaitableEvent* shutdown_event) 212 WaitableEvent* shutdown_event)
205 : ChannelProxy::Context(listener, ipc_thread), 213 : ChannelProxy::Context(listener, ipc_thread),
206 received_sync_msgs_(ReceivedSyncMsgQueue::AddContext()), 214 received_sync_msgs_(ReceivedSyncMsgQueue::AddContext()),
207 shutdown_event_(shutdown_event) { 215 shutdown_event_(shutdown_event),
216 restrict_dispatch_(false) {
208 } 217 }
209 218
210 SyncChannel::SyncContext::~SyncContext() { 219 SyncChannel::SyncContext::~SyncContext() {
211 while (!deserializers_.empty()) 220 while (!deserializers_.empty())
212 Pop(); 221 Pop();
213 } 222 }
214 223
215 // Adds information about an outgoing sync message to the context so that 224 // Adds information about an outgoing sync message to the context so that
216 // we know how to deserialize the reply. Returns a handle that's set when 225 // we know how to deserialize the reply. Returns a handle that's set when
217 // the reply has arrived. 226 // the reply has arrived.
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after
253 WaitableEvent* SyncChannel::SyncContext::GetSendDoneEvent() { 262 WaitableEvent* SyncChannel::SyncContext::GetSendDoneEvent() {
254 base::AutoLock auto_lock(deserializers_lock_); 263 base::AutoLock auto_lock(deserializers_lock_);
255 return deserializers_.back().done_event; 264 return deserializers_.back().done_event;
256 } 265 }
257 266
258 WaitableEvent* SyncChannel::SyncContext::GetDispatchEvent() { 267 WaitableEvent* SyncChannel::SyncContext::GetDispatchEvent() {
259 return received_sync_msgs_->dispatch_event(); 268 return received_sync_msgs_->dispatch_event();
260 } 269 }
261 270
262 void SyncChannel::SyncContext::DispatchMessages() { 271 void SyncChannel::SyncContext::DispatchMessages() {
263 received_sync_msgs_->DispatchMessages(); 272 received_sync_msgs_->DispatchMessages(this);
264 } 273 }
265 274
266 bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) { 275 bool SyncChannel::SyncContext::TryToUnblockListener(const Message* msg) {
267 base::AutoLock auto_lock(deserializers_lock_); 276 base::AutoLock auto_lock(deserializers_lock_);
268 if (deserializers_.empty() || 277 if (deserializers_.empty() ||
269 !SyncMessage::IsMessageReplyTo(*msg, deserializers_.back().id)) { 278 !SyncMessage::IsMessageReplyTo(*msg, deserializers_.back().id)) {
270 return false; 279 return false;
271 } 280 }
272 281
273 if (!msg->is_reply_error()) { 282 if (!msg->is_reply_error()) {
(...skipping 97 matching lines...) Expand 10 before | Expand all | Expand 10 after
371 // message loop running under it or not, so we wouldn't know whether to 380 // message loop running under it or not, so we wouldn't know whether to
372 // stop or keep watching. So we always watch it, and create the event as 381 // stop or keep watching. So we always watch it, and create the event as
373 // manual reset since the object watcher might otherwise reset the event 382 // manual reset since the object watcher might otherwise reset the event
374 // when we're doing a WaitMany. 383 // when we're doing a WaitMany.
375 dispatch_watcher_.StartWatching(sync_context()->GetDispatchEvent(), this); 384 dispatch_watcher_.StartWatching(sync_context()->GetDispatchEvent(), this);
376 } 385 }
377 386
378 SyncChannel::~SyncChannel() { 387 SyncChannel::~SyncChannel() {
379 } 388 }
380 389
390 void SyncChannel::SetRestrictDispatchToSameChannel(bool value) {
391 sync_context()->set_restrict_dispatch(value);
392 }
393
381 bool SyncChannel::Send(Message* message) { 394 bool SyncChannel::Send(Message* message) {
382 return SendWithTimeout(message, base::kNoTimeout); 395 return SendWithTimeout(message, base::kNoTimeout);
383 } 396 }
384 397
385 bool SyncChannel::SendWithTimeout(Message* message, int timeout_ms) { 398 bool SyncChannel::SendWithTimeout(Message* message, int timeout_ms) {
386 if (!message->is_sync()) { 399 if (!message->is_sync()) {
387 ChannelProxy::Send(message); 400 ChannelProxy::Send(message);
388 return true; 401 return true;
389 } 402 }
390 403
(...skipping 24 matching lines...) Expand all
415 428
416 // Wait for reply, or for any other incoming synchronous messages. 429 // Wait for reply, or for any other incoming synchronous messages.
417 // *this* might get deleted, so only call static functions at this point. 430 // *this* might get deleted, so only call static functions at this point.
418 WaitForReply(context, pump_messages_event); 431 WaitForReply(context, pump_messages_event);
419 432
420 return context->Pop(); 433 return context->Pop();
421 } 434 }
422 435
423 void SyncChannel::WaitForReply( 436 void SyncChannel::WaitForReply(
424 SyncContext* context, WaitableEvent* pump_messages_event) { 437 SyncContext* context, WaitableEvent* pump_messages_event) {
438 context->DispatchMessages();
425 while (true) { 439 while (true) {
426 WaitableEvent* objects[] = { 440 WaitableEvent* objects[] = {
427 context->GetDispatchEvent(), 441 context->GetDispatchEvent(),
428 context->GetSendDoneEvent(), 442 context->GetSendDoneEvent(),
429 pump_messages_event 443 pump_messages_event
430 }; 444 };
431 445
432 unsigned count = pump_messages_event ? 3: 2; 446 unsigned count = pump_messages_event ? 3: 2;
433 size_t result = WaitableEvent::WaitMany(objects, count); 447 size_t result = WaitableEvent::WaitMany(objects, count);
434 if (result == 0 /* dispatch event */) { 448 if (result == 0 /* dispatch event */) {
(...skipping 50 matching lines...) Expand 10 before | Expand all | Expand 10 after
485 void SyncChannel::OnWaitableEventSignaled(WaitableEvent* event) { 499 void SyncChannel::OnWaitableEventSignaled(WaitableEvent* event) {
486 DCHECK(event == sync_context()->GetDispatchEvent()); 500 DCHECK(event == sync_context()->GetDispatchEvent());
487 // The call to DispatchMessages might delete this object, so reregister 501 // The call to DispatchMessages might delete this object, so reregister
488 // the object watcher first. 502 // the object watcher first.
489 event->Reset(); 503 event->Reset();
490 dispatch_watcher_.StartWatching(event, this); 504 dispatch_watcher_.StartWatching(event, this);
491 sync_context()->DispatchMessages(); 505 sync_context()->DispatchMessages();
492 } 506 }
493 507
494 } // namespace IPC 508 } // namespace IPC
OLDNEW
« no previous file with comments | « ipc/ipc_sync_channel.h ('k') | ipc/ipc_sync_channel_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698