OLD | NEW |
1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/common/message_pump_mojo.h" | 5 #include "mojo/message_pump/message_pump_mojo.h" |
6 | 6 |
7 #include <algorithm> | 7 #include <algorithm> |
8 #include <vector> | 8 #include <vector> |
9 | 9 |
10 #include "base/debug/alias.h" | 10 #include "base/debug/alias.h" |
11 #include "base/lazy_instance.h" | |
12 #include "base/logging.h" | 11 #include "base/logging.h" |
13 #include "base/threading/thread_local.h" | 12 #include "base/threading/thread_local.h" |
14 #include "base/time/time.h" | 13 #include "base/time/time.h" |
15 #include "mojo/common/message_pump_mojo_handler.h" | 14 #include "mojo/message_pump/message_pump_mojo_handler.h" |
16 #include "mojo/common/time_helper.h" | 15 #include "mojo/message_pump/time_helper.h" |
| 16 #include "mojo/public/cpp/system/message_pipe.h" |
| 17 #include "mojo/public/cpp/system/wait.h" |
17 | 18 |
18 namespace mojo { | 19 namespace mojo { |
19 namespace common { | 20 namespace common { |
20 namespace { | 21 namespace { |
21 | 22 |
22 base::LazyInstance<base::ThreadLocalPointer<MessagePumpMojo> >::Leaky | 23 base::ThreadLocalPointer<MessagePumpMojo>* CurrentPump() { |
23 g_tls_current_pump = LAZY_INSTANCE_INITIALIZER; | 24 static auto* tls = new base::ThreadLocalPointer<MessagePumpMojo>; |
| 25 return tls; |
| 26 } |
24 | 27 |
25 MojoDeadline TimeTicksToMojoDeadline(base::TimeTicks time_ticks, | 28 MojoDeadline TimeTicksToMojoDeadline(base::TimeTicks time_ticks, |
26 base::TimeTicks now) { | 29 base::TimeTicks now) { |
27 // The is_null() check matches that of HandleWatcher as well as how | 30 // The is_null() check matches that of HandleWatcher as well as how |
28 // |delayed_work_time| is used. | 31 // |delayed_work_time| is used. |
29 if (time_ticks.is_null()) | 32 if (time_ticks.is_null()) |
30 return MOJO_DEADLINE_INDEFINITE; | 33 return MOJO_DEADLINE_INDEFINITE; |
31 const int64_t delta = (time_ticks - now).InMicroseconds(); | 34 const int64_t delta = (time_ticks - now).InMicroseconds(); |
32 return delta < 0 ? static_cast<MojoDeadline>(0) : | 35 return delta < 0 ? static_cast<MojoDeadline>(0) : |
33 static_cast<MojoDeadline>(delta); | 36 static_cast<MojoDeadline>(delta); |
34 } | 37 } |
35 | 38 |
36 } // namespace | 39 } // namespace |
37 | 40 |
38 // State needed for one iteration of WaitMany. The first handle and flags | 41 // State needed for one iteration of WaitMany. The first handle and flags |
39 // corresponds to that of the control pipe. | 42 // corresponds to that of the control pipe. |
40 struct MessagePumpMojo::WaitState { | 43 struct MessagePumpMojo::WaitState { |
41 std::vector<Handle> handles; | 44 std::vector<Handle> handles; |
42 std::vector<MojoHandleSignals> wait_signals; | 45 std::vector<MojoHandleSignals> wait_signals; |
43 }; | 46 }; |
44 | 47 |
45 struct MessagePumpMojo::RunState { | 48 struct MessagePumpMojo::RunState { |
46 RunState() : should_quit(false) { | 49 RunState() : should_quit(false) { |
47 CreateMessagePipe(NULL, &read_handle, &write_handle); | 50 CreateMessagePipe(nullptr, &read_handle, &write_handle); |
48 } | 51 } |
49 | 52 |
50 base::TimeTicks delayed_work_time; | 53 base::TimeTicks delayed_work_time; |
51 | 54 |
52 // Used to wake up WaitForWork(). | 55 // Used to wake up WaitForWork(). |
53 ScopedMessagePipeHandle read_handle; | 56 ScopedMessagePipeHandle read_handle; |
54 ScopedMessagePipeHandle write_handle; | 57 ScopedMessagePipeHandle write_handle; |
55 | 58 |
| 59 // Cached structures to avoid the heap allocation cost of std::vector<>. |
| 60 scoped_ptr<WaitState> wait_state; |
| 61 scoped_ptr<HandleToHandlerList> cloned_handlers; |
| 62 |
56 bool should_quit; | 63 bool should_quit; |
57 }; | 64 }; |
58 | 65 |
59 MessagePumpMojo::MessagePumpMojo() : run_state_(NULL), next_handler_id_(0) { | 66 MessagePumpMojo::MessagePumpMojo() : run_state_(nullptr), next_handler_id_(0) { |
60 DCHECK(!current()) | 67 DCHECK(!current()) |
61 << "There is already a MessagePumpMojo instance on this thread."; | 68 << "There is already a MessagePumpMojo instance on this thread."; |
62 g_tls_current_pump.Pointer()->Set(this); | 69 CurrentPump()->Set(this); |
63 } | 70 } |
64 | 71 |
65 MessagePumpMojo::~MessagePumpMojo() { | 72 MessagePumpMojo::~MessagePumpMojo() { |
66 DCHECK_EQ(this, current()); | 73 DCHECK_EQ(this, current()); |
67 g_tls_current_pump.Pointer()->Set(NULL); | 74 CurrentPump()->Set(nullptr); |
68 } | 75 } |
69 | 76 |
70 // static | 77 // static |
71 scoped_ptr<base::MessagePump> MessagePumpMojo::Create() { | 78 scoped_ptr<base::MessagePump> MessagePumpMojo::Create() { |
72 return scoped_ptr<MessagePump>(new MessagePumpMojo()); | 79 return scoped_ptr<MessagePump>(new MessagePumpMojo()); |
73 } | 80 } |
74 | 81 |
75 // static | 82 // static |
76 MessagePumpMojo* MessagePumpMojo::current() { | 83 MessagePumpMojo* MessagePumpMojo::current() { |
77 return g_tls_current_pump.Pointer()->Get(); | 84 return CurrentPump()->Get(); |
78 } | 85 } |
79 | 86 |
80 void MessagePumpMojo::AddHandler(MessagePumpMojoHandler* handler, | 87 void MessagePumpMojo::AddHandler(MessagePumpMojoHandler* handler, |
81 const Handle& handle, | 88 const Handle& handle, |
82 MojoHandleSignals wait_signals, | 89 MojoHandleSignals wait_signals, |
83 base::TimeTicks deadline) { | 90 base::TimeTicks deadline) { |
84 CHECK(handler); | 91 CHECK(handler); |
85 DCHECK(handle.is_valid()); | 92 DCHECK(handle.is_valid()); |
86 // Assume it's an error if someone tries to reregister an existing handle. | 93 // Assume it's an error if someone tries to reregister an existing handle. |
87 CHECK_EQ(0u, handlers_.count(handle)); | 94 CHECK_EQ(0u, handlers_.count(handle)); |
(...skipping 15 matching lines...) Expand all Loading... |
103 | 110 |
104 void MessagePumpMojo::RemoveObserver(Observer* observer) { | 111 void MessagePumpMojo::RemoveObserver(Observer* observer) { |
105 observers_.RemoveObserver(observer); | 112 observers_.RemoveObserver(observer); |
106 } | 113 } |
107 | 114 |
108 void MessagePumpMojo::Run(Delegate* delegate) { | 115 void MessagePumpMojo::Run(Delegate* delegate) { |
109 RunState run_state; | 116 RunState run_state; |
110 // TODO: better deal with error handling. | 117 // TODO: better deal with error handling. |
111 CHECK(run_state.read_handle.is_valid()); | 118 CHECK(run_state.read_handle.is_valid()); |
112 CHECK(run_state.write_handle.is_valid()); | 119 CHECK(run_state.write_handle.is_valid()); |
113 RunState* old_state = NULL; | 120 RunState* old_state = nullptr; |
114 { | 121 { |
115 base::AutoLock auto_lock(run_state_lock_); | 122 base::AutoLock auto_lock(run_state_lock_); |
116 old_state = run_state_; | 123 old_state = run_state_; |
117 run_state_ = &run_state; | 124 run_state_ = &run_state; |
118 } | 125 } |
119 DoRunLoop(&run_state, delegate); | 126 DoRunLoop(&run_state, delegate); |
120 { | 127 { |
121 base::AutoLock auto_lock(run_state_lock_); | 128 base::AutoLock auto_lock(run_state_lock_); |
122 run_state_ = old_state; | 129 run_state_ = old_state; |
123 } | 130 } |
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
165 continue; | 172 continue; |
166 | 173 |
167 more_work_is_plausible = delegate->DoIdleWork(); | 174 more_work_is_plausible = delegate->DoIdleWork(); |
168 if (run_state->should_quit) | 175 if (run_state->should_quit) |
169 break; | 176 break; |
170 } | 177 } |
171 } | 178 } |
172 | 179 |
173 bool MessagePumpMojo::DoInternalWork(const RunState& run_state, bool block) { | 180 bool MessagePumpMojo::DoInternalWork(const RunState& run_state, bool block) { |
174 const MojoDeadline deadline = block ? GetDeadlineForWait(run_state) : 0; | 181 const MojoDeadline deadline = block ? GetDeadlineForWait(run_state) : 0; |
175 const WaitState wait_state = GetWaitState(run_state); | 182 if (!run_state_->wait_state) |
| 183 run_state_->wait_state.reset(new WaitState); |
| 184 GetWaitState(run_state, run_state_->wait_state.get()); |
176 | 185 |
177 const WaitManyResult wait_many_result = | 186 const WaitManyResult wait_many_result = |
178 WaitMany(wait_state.handles, wait_state.wait_signals, deadline, nullptr); | 187 WaitMany(run_state_->wait_state->handles, |
| 188 run_state_->wait_state->wait_signals, deadline, nullptr); |
179 const MojoResult result = wait_many_result.result; | 189 const MojoResult result = wait_many_result.result; |
180 bool did_work = true; | 190 bool did_work = true; |
181 if (result == MOJO_RESULT_OK) { | 191 if (result == MOJO_RESULT_OK) { |
182 if (wait_many_result.index == 0) { | 192 if (wait_many_result.index == 0) { |
183 // Control pipe was written to. | 193 // Control pipe was written to. |
184 ReadMessageRaw(run_state.read_handle.get(), NULL, NULL, NULL, NULL, | 194 ReadMessageRaw(run_state.read_handle.get(), nullptr, nullptr, nullptr, |
185 MOJO_READ_MESSAGE_FLAG_MAY_DISCARD); | 195 nullptr, MOJO_READ_MESSAGE_FLAG_MAY_DISCARD); |
186 } else { | 196 } else { |
187 DCHECK(handlers_.find(wait_state.handles[wait_many_result.index]) != | 197 DCHECK(handlers_.find( |
| 198 run_state_->wait_state->handles[wait_many_result.index]) != |
188 handlers_.end()); | 199 handlers_.end()); |
189 WillSignalHandler(); | 200 WillSignalHandler(); |
190 handlers_[wait_state.handles[wait_many_result.index]] | 201 handlers_[run_state_->wait_state->handles[wait_many_result.index]] |
191 .handler->OnHandleReady(wait_state.handles[wait_many_result.index]); | 202 .handler->OnHandleReady( |
| 203 run_state_->wait_state->handles[wait_many_result.index]); |
192 DidSignalHandler(); | 204 DidSignalHandler(); |
193 } | 205 } |
194 } else { | 206 } else { |
195 switch (result) { | 207 switch (result) { |
196 case MOJO_RESULT_CANCELLED: | |
197 case MOJO_RESULT_FAILED_PRECONDITION: | 208 case MOJO_RESULT_FAILED_PRECONDITION: |
198 RemoveInvalidHandle(wait_state, result, wait_many_result.index); | 209 RemoveInvalidHandle(*run_state_->wait_state, result, |
| 210 wait_many_result.index); |
199 break; | 211 break; |
200 case MOJO_RESULT_DEADLINE_EXCEEDED: | 212 case MOJO_RESULT_DEADLINE_EXCEEDED: |
201 did_work = false; | 213 did_work = false; |
202 break; | 214 break; |
| 215 case MOJO_RESULT_INVALID_ARGUMENT: |
| 216 case MOJO_RESULT_CANCELLED: |
| 217 case MOJO_RESULT_BUSY: |
| 218 // These results indicate a bug in "our" code (e.g., race conditions). |
| 219 // Fall through. |
203 default: | 220 default: |
204 base::debug::Alias(&result); | 221 base::debug::Alias(&result); |
205 // Unexpected result is likely fatal, crash so we can determine cause. | 222 // Unexpected result is likely fatal, crash so we can determine cause. |
206 CHECK(false); | 223 CHECK(false); |
207 } | 224 } |
208 } | 225 } |
| 226 // To keep memory usage under control, delete the WaitState object at the end |
| 227 // if it's vectors are too big by a factor of 2. Pre-C++11 doesn't have a way |
| 228 // to shrink vectors, so just get rid of them and re-create on the next round. |
| 229 if (run_state_->wait_state->handles.capacity() > |
| 230 2 * run_state_->wait_state->handles.size()) { |
| 231 // NOTE: |handles| and |wait_signals| are always in sync, so it's reasonable |
| 232 // to only check one of those. |
| 233 run_state_->wait_state.reset(); |
| 234 } |
209 | 235 |
210 // Notify and remove any handlers whose time has expired. Make a copy in case | 236 // Notify and remove any handlers whose time has expired. Make a copy in case |
211 // someone tries to add/remove new handlers from notification. | 237 // someone tries to add/remove new handlers from notification. |
212 const HandleToHandler cloned_handlers(handlers_); | 238 if (!run_state_->cloned_handlers) { |
| 239 run_state_->cloned_handlers.reset(new HandleToHandlerList); |
| 240 } else { |
| 241 run_state_->cloned_handlers->clear(); |
| 242 } |
| 243 run_state_->cloned_handlers->reserve(handlers_.size()); |
| 244 for (const auto& handler : handlers_) { |
| 245 run_state_->cloned_handlers->push_back(handler); |
| 246 } |
213 const base::TimeTicks now(internal::NowTicks()); | 247 const base::TimeTicks now(internal::NowTicks()); |
214 for (HandleToHandler::const_iterator i = cloned_handlers.begin(); | 248 for (HandleToHandlerList::const_iterator i = |
215 i != cloned_handlers.end(); ++i) { | 249 run_state_->cloned_handlers->begin(); |
| 250 i != run_state_->cloned_handlers->end(); ++i) { |
216 // Since we're iterating over a clone of the handlers, verify the handler is | 251 // Since we're iterating over a clone of the handlers, verify the handler is |
217 // still valid before notifying. | 252 // still valid before notifying. |
218 if (!i->second.deadline.is_null() && i->second.deadline < now && | 253 if (!i->second.deadline.is_null() && i->second.deadline < now && |
219 handlers_.find(i->first) != handlers_.end() && | 254 handlers_.find(i->first) != handlers_.end() && |
220 handlers_[i->first].id == i->second.id) { | 255 handlers_[i->first].id == i->second.id) { |
221 WillSignalHandler(); | 256 WillSignalHandler(); |
222 i->second.handler->OnHandleError(i->first, MOJO_RESULT_DEADLINE_EXCEEDED); | 257 i->second.handler->OnHandleError(i->first, MOJO_RESULT_DEADLINE_EXCEEDED); |
223 DidSignalHandler(); | 258 DidSignalHandler(); |
224 handlers_.erase(i->first); | 259 handlers_.erase(i->first); |
225 did_work = true; | 260 did_work = true; |
226 } | 261 } |
227 } | 262 } |
| 263 if (run_state_->cloned_handlers->capacity() > |
| 264 2 * run_state_->cloned_handlers->size()) { |
| 265 run_state_->cloned_handlers.reset(); |
| 266 } |
228 return did_work; | 267 return did_work; |
229 } | 268 } |
230 | 269 |
231 void MessagePumpMojo::RemoveInvalidHandle(const WaitState& wait_state, | 270 void MessagePumpMojo::RemoveInvalidHandle(const WaitState& wait_state, |
232 MojoResult result, | 271 MojoResult result, |
233 uint32_t index) { | 272 uint32_t index) { |
234 // TODO(sky): deal with control pipe going bad. | 273 // TODO(sky): deal with control pipe going bad. |
235 CHECK(result == MOJO_RESULT_FAILED_PRECONDITION || | 274 CHECK(result == MOJO_RESULT_FAILED_PRECONDITION || |
236 result == MOJO_RESULT_CANCELLED); | 275 result == MOJO_RESULT_CANCELLED); |
237 CHECK_NE(index, 0u); // Indicates the control pipe went bad. | 276 CHECK_NE(index, 0u); // Indicates the control pipe went bad. |
238 | 277 |
239 // Remove the handle first, this way if OnHandleError() tries to remove the | 278 // Remove the handle first, this way if OnHandleError() tries to remove the |
240 // handle our iterator isn't invalidated. | 279 // handle our iterator isn't invalidated. |
241 CHECK(handlers_.find(wait_state.handles[index]) != handlers_.end()); | 280 CHECK(handlers_.find(wait_state.handles[index]) != handlers_.end()); |
242 MessagePumpMojoHandler* handler = | 281 MessagePumpMojoHandler* handler = |
243 handlers_[wait_state.handles[index]].handler; | 282 handlers_[wait_state.handles[index]].handler; |
244 handlers_.erase(wait_state.handles[index]); | 283 handlers_.erase(wait_state.handles[index]); |
245 WillSignalHandler(); | 284 WillSignalHandler(); |
246 handler->OnHandleError(wait_state.handles[index], result); | 285 handler->OnHandleError(wait_state.handles[index], result); |
247 DidSignalHandler(); | 286 DidSignalHandler(); |
248 } | 287 } |
249 | 288 |
250 void MessagePumpMojo::SignalControlPipe(const RunState& run_state) { | 289 void MessagePumpMojo::SignalControlPipe(const RunState& run_state) { |
251 const MojoResult result = | 290 const MojoResult result = |
252 WriteMessageRaw(run_state.write_handle.get(), NULL, 0, NULL, 0, | 291 WriteMessageRaw(run_state.write_handle.get(), nullptr, 0, nullptr, 0, |
253 MOJO_WRITE_MESSAGE_FLAG_NONE); | 292 MOJO_WRITE_MESSAGE_FLAG_NONE); |
254 // If we can't write we likely won't wake up the thread and there is a strong | 293 // If we can't write we likely won't wake up the thread and there is a strong |
255 // chance we'll deadlock. | 294 // chance we'll deadlock. |
256 CHECK_EQ(MOJO_RESULT_OK, result); | 295 CHECK_EQ(MOJO_RESULT_OK, result); |
257 } | 296 } |
258 | 297 |
259 MessagePumpMojo::WaitState MessagePumpMojo::GetWaitState( | 298 void MessagePumpMojo::GetWaitState( |
260 const RunState& run_state) const { | 299 const RunState& run_state, |
261 WaitState wait_state; | 300 MessagePumpMojo::WaitState* wait_state) const { |
262 wait_state.handles.push_back(run_state.read_handle.get()); | 301 const size_t num_handles = handlers_.size() + 1; |
263 wait_state.wait_signals.push_back(MOJO_HANDLE_SIGNAL_READABLE); | 302 wait_state->handles.clear(); |
| 303 wait_state->handles.reserve(num_handles); |
| 304 wait_state->wait_signals.clear(); |
| 305 wait_state->wait_signals.reserve(num_handles); |
| 306 wait_state->handles.push_back(run_state.read_handle.get()); |
| 307 wait_state->wait_signals.push_back(MOJO_HANDLE_SIGNAL_READABLE); |
264 | 308 |
265 for (HandleToHandler::const_iterator i = handlers_.begin(); | 309 for (HandleToHandler::const_iterator i = handlers_.begin(); |
266 i != handlers_.end(); ++i) { | 310 i != handlers_.end(); ++i) { |
267 wait_state.handles.push_back(i->first); | 311 wait_state->handles.push_back(i->first); |
268 wait_state.wait_signals.push_back(i->second.wait_signals); | 312 wait_state->wait_signals.push_back(i->second.wait_signals); |
269 } | 313 } |
270 return wait_state; | |
271 } | 314 } |
272 | 315 |
273 MojoDeadline MessagePumpMojo::GetDeadlineForWait( | 316 MojoDeadline MessagePumpMojo::GetDeadlineForWait( |
274 const RunState& run_state) const { | 317 const RunState& run_state) const { |
275 const base::TimeTicks now(internal::NowTicks()); | 318 const base::TimeTicks now(internal::NowTicks()); |
276 MojoDeadline deadline = TimeTicksToMojoDeadline(run_state.delayed_work_time, | 319 MojoDeadline deadline = TimeTicksToMojoDeadline(run_state.delayed_work_time, |
277 now); | 320 now); |
278 for (HandleToHandler::const_iterator i = handlers_.begin(); | 321 for (HandleToHandler::const_iterator i = handlers_.begin(); |
279 i != handlers_.end(); ++i) { | 322 i != handlers_.end(); ++i) { |
280 deadline = std::min( | 323 deadline = std::min( |
281 TimeTicksToMojoDeadline(i->second.deadline, now), deadline); | 324 TimeTicksToMojoDeadline(i->second.deadline, now), deadline); |
282 } | 325 } |
283 return deadline; | 326 return deadline; |
284 } | 327 } |
285 | 328 |
286 void MessagePumpMojo::WillSignalHandler() { | 329 void MessagePumpMojo::WillSignalHandler() { |
287 FOR_EACH_OBSERVER(Observer, observers_, WillSignalHandler()); | 330 FOR_EACH_OBSERVER(Observer, observers_, WillSignalHandler()); |
288 } | 331 } |
289 | 332 |
290 void MessagePumpMojo::DidSignalHandler() { | 333 void MessagePumpMojo::DidSignalHandler() { |
291 FOR_EACH_OBSERVER(Observer, observers_, DidSignalHandler()); | 334 FOR_EACH_OBSERVER(Observer, observers_, DidSignalHandler()); |
292 } | 335 } |
293 | 336 |
294 } // namespace common | 337 } // namespace common |
295 } // namespace mojo | 338 } // namespace mojo |
OLD | NEW |