OLD | NEW |
1 // Copyright (c) 2011 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2011 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "ipc/ipc_sync_channel.h" | 5 #include "ipc/ipc_sync_channel.h" |
6 | 6 |
7 #include "base/bind.h" | 7 #include "base/bind.h" |
8 #include "base/lazy_instance.h" | 8 #include "base/lazy_instance.h" |
9 #include "base/location.h" | 9 #include "base/location.h" |
10 #include "base/logging.h" | 10 #include "base/logging.h" |
(...skipping 46 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
57 bool was_task_pending; | 57 bool was_task_pending; |
58 { | 58 { |
59 base::AutoLock auto_lock(message_lock_); | 59 base::AutoLock auto_lock(message_lock_); |
60 | 60 |
61 was_task_pending = task_pending_; | 61 was_task_pending = task_pending_; |
62 task_pending_ = true; | 62 task_pending_ = true; |
63 | 63 |
64 // We set the event in case the listener thread is blocked (or is about | 64 // We set the event in case the listener thread is blocked (or is about |
65 // to). In case it's not, the PostTask dispatches the messages. | 65 // to). In case it's not, the PostTask dispatches the messages. |
66 message_queue_.push_back(QueuedMessage(new Message(msg), context)); | 66 message_queue_.push_back(QueuedMessage(new Message(msg), context)); |
| 67 message_queue_version_++; |
67 } | 68 } |
68 | 69 |
69 dispatch_event_.Signal(); | 70 dispatch_event_.Signal(); |
70 if (!was_task_pending) { | 71 if (!was_task_pending) { |
71 listener_message_loop_->PostTask( | 72 listener_message_loop_->PostTask( |
72 FROM_HERE, base::Bind(&ReceivedSyncMsgQueue::DispatchMessagesTask, | 73 FROM_HERE, base::Bind(&ReceivedSyncMsgQueue::DispatchMessagesTask, |
73 this, scoped_refptr<SyncContext>(context))); | 74 this, scoped_refptr<SyncContext>(context))); |
74 } | 75 } |
75 } | 76 } |
76 | 77 |
77 void QueueReply(const Message &msg, SyncChannel::SyncContext* context) { | 78 void QueueReply(const Message &msg, SyncChannel::SyncContext* context) { |
78 received_replies_.push_back(QueuedMessage(new Message(msg), context)); | 79 received_replies_.push_back(QueuedMessage(new Message(msg), context)); |
79 } | 80 } |
80 | 81 |
81 // Called on the listener's thread to process any queues synchronous | 82 // Called on the listener's thread to process any queues synchronous |
82 // messages. | 83 // messages. |
83 void DispatchMessagesTask(SyncContext* context) { | 84 void DispatchMessagesTask(SyncContext* context) { |
84 { | 85 { |
85 base::AutoLock auto_lock(message_lock_); | 86 base::AutoLock auto_lock(message_lock_); |
86 task_pending_ = false; | 87 task_pending_ = false; |
87 } | 88 } |
88 context->DispatchMessages(); | 89 context->DispatchMessages(); |
89 } | 90 } |
90 | 91 |
91 void DispatchMessages(SyncContext* dispatching_context) { | 92 void DispatchMessages(SyncContext* dispatching_context) { |
92 SyncMessageQueue delayed_queue; | 93 bool first_time = true; |
| 94 uint32 expected_version = 0; |
| 95 SyncMessageQueue::iterator it; |
93 while (true) { | 96 while (true) { |
94 Message* message; | 97 Message* message = NULL; |
95 scoped_refptr<SyncChannel::SyncContext> context; | 98 scoped_refptr<SyncChannel::SyncContext> context; |
96 { | 99 { |
97 base::AutoLock auto_lock(message_lock_); | 100 base::AutoLock auto_lock(message_lock_); |
98 if (message_queue_.empty()) { | 101 if (first_time || message_queue_version_ != expected_version) { |
99 message_queue_ = delayed_queue; | 102 it = message_queue_.begin(); |
100 break; | 103 first_time = false; |
101 } | 104 } |
| 105 for (; it != message_queue_.end(); it++) { |
| 106 if (!it->context->restrict_dispatch() || |
| 107 it->context == dispatching_context) { |
| 108 message = it->message; |
| 109 context = it->context; |
| 110 it = message_queue_.erase(it); |
| 111 message_queue_version_++; |
| 112 expected_version = message_queue_version_; |
| 113 break; |
| 114 } |
| 115 } |
| 116 } |
102 | 117 |
103 message = message_queue_.front().message; | 118 if (message == NULL) |
104 context = message_queue_.front().context; | 119 break; |
105 message_queue_.pop_front(); | 120 context->OnDispatchMessage(*message); |
106 } | 121 delete message; |
107 if (context->restrict_dispatch() && context != dispatching_context) { | |
108 delayed_queue.push_back(QueuedMessage(message, context)); | |
109 } else { | |
110 context->OnDispatchMessage(*message); | |
111 delete message; | |
112 } | |
113 } | 122 } |
114 } | 123 } |
115 | 124 |
116 // SyncChannel calls this in its destructor. | 125 // SyncChannel calls this in its destructor. |
117 void RemoveContext(SyncContext* context) { | 126 void RemoveContext(SyncContext* context) { |
118 base::AutoLock auto_lock(message_lock_); | 127 base::AutoLock auto_lock(message_lock_); |
119 | 128 |
120 SyncMessageQueue::iterator iter = message_queue_.begin(); | 129 SyncMessageQueue::iterator iter = message_queue_.begin(); |
121 while (iter != message_queue_.end()) { | 130 while (iter != message_queue_.end()) { |
122 if (iter->context == context) { | 131 if (iter->context == context) { |
123 delete iter->message; | 132 delete iter->message; |
124 iter = message_queue_.erase(iter); | 133 iter = message_queue_.erase(iter); |
| 134 message_queue_version_++; |
125 } else { | 135 } else { |
126 iter++; | 136 iter++; |
127 } | 137 } |
128 } | 138 } |
129 | 139 |
130 if (--listener_count_ == 0) { | 140 if (--listener_count_ == 0) { |
131 DCHECK(lazy_tls_ptr_.Pointer()->Get()); | 141 DCHECK(lazy_tls_ptr_.Pointer()->Get()); |
132 lazy_tls_ptr_.Pointer()->Set(NULL); | 142 lazy_tls_ptr_.Pointer()->Set(NULL); |
133 } | 143 } |
134 } | 144 } |
(...skipping 27 matching lines...) Expand all Loading... |
162 void set_top_send_done_watcher(base::WaitableEventWatcher* watcher) { | 172 void set_top_send_done_watcher(base::WaitableEventWatcher* watcher) { |
163 top_send_done_watcher_ = watcher; | 173 top_send_done_watcher_ = watcher; |
164 } | 174 } |
165 | 175 |
166 private: | 176 private: |
167 friend class base::RefCountedThreadSafe<ReceivedSyncMsgQueue>; | 177 friend class base::RefCountedThreadSafe<ReceivedSyncMsgQueue>; |
168 | 178 |
169 // See the comment in SyncChannel::SyncChannel for why this event is created | 179 // See the comment in SyncChannel::SyncChannel for why this event is created |
170 // as manual reset. | 180 // as manual reset. |
171 ReceivedSyncMsgQueue() : | 181 ReceivedSyncMsgQueue() : |
| 182 message_queue_version_(0), |
172 dispatch_event_(true, false), | 183 dispatch_event_(true, false), |
173 listener_message_loop_(base::MessageLoopProxy::current()), | 184 listener_message_loop_(base::MessageLoopProxy::current()), |
174 task_pending_(false), | 185 task_pending_(false), |
175 listener_count_(0), | 186 listener_count_(0), |
176 top_send_done_watcher_(NULL) { | 187 top_send_done_watcher_(NULL) { |
177 } | 188 } |
178 | 189 |
179 ~ReceivedSyncMsgQueue() {} | 190 ~ReceivedSyncMsgQueue() {} |
180 | 191 |
181 // Holds information about a queued synchronous message or reply. | 192 // Holds information about a queued synchronous message or reply. |
182 struct QueuedMessage { | 193 struct QueuedMessage { |
183 QueuedMessage(Message* m, SyncContext* c) : message(m), context(c) { } | 194 QueuedMessage(Message* m, SyncContext* c) : message(m), context(c) { } |
184 Message* message; | 195 Message* message; |
185 scoped_refptr<SyncChannel::SyncContext> context; | 196 scoped_refptr<SyncChannel::SyncContext> context; |
186 }; | 197 }; |
187 | 198 |
188 typedef std::deque<QueuedMessage> SyncMessageQueue; | 199 typedef std::list<QueuedMessage> SyncMessageQueue; |
189 SyncMessageQueue message_queue_; | 200 SyncMessageQueue message_queue_; |
| 201 uint32 message_queue_version_; // Used to signal DispatchMessages to rescan |
190 | 202 |
191 std::vector<QueuedMessage> received_replies_; | 203 std::vector<QueuedMessage> received_replies_; |
192 | 204 |
193 // Set when we got a synchronous message that we must respond to as the | 205 // Set when we got a synchronous message that we must respond to as the |
194 // sender needs its reply before it can reply to our original synchronous | 206 // sender needs its reply before it can reply to our original synchronous |
195 // message. | 207 // message. |
196 WaitableEvent dispatch_event_; | 208 WaitableEvent dispatch_event_; |
197 scoped_refptr<base::MessageLoopProxy> listener_message_loop_; | 209 scoped_refptr<base::MessageLoopProxy> listener_message_loop_; |
198 base::Lock message_lock_; | 210 base::Lock message_lock_; |
199 bool task_pending_; | 211 bool task_pending_; |
(...skipping 319 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
519 // Ideally we only want to watch this object when running a nested message | 531 // Ideally we only want to watch this object when running a nested message |
520 // loop. However, we don't know when it exits if there's another nested | 532 // loop. However, we don't know when it exits if there's another nested |
521 // message loop running under it or not, so we wouldn't know whether to | 533 // message loop running under it or not, so we wouldn't know whether to |
522 // stop or keep watching. So we always watch it, and create the event as | 534 // stop or keep watching. So we always watch it, and create the event as |
523 // manual reset since the object watcher might otherwise reset the event | 535 // manual reset since the object watcher might otherwise reset the event |
524 // when we're doing a WaitMany. | 536 // when we're doing a WaitMany. |
525 dispatch_watcher_.StartWatching(sync_context()->GetDispatchEvent(), this); | 537 dispatch_watcher_.StartWatching(sync_context()->GetDispatchEvent(), this); |
526 } | 538 } |
527 | 539 |
528 } // namespace IPC | 540 } // namespace IPC |
OLD | NEW |