OLD | NEW |
| (Empty) |
1 // Copyright 2013 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "mojo/message_pump/message_pump_mojo.h" | |
6 | |
7 #include <algorithm> | |
8 #include <vector> | |
9 | |
10 #include "base/debug/alias.h" | |
11 #include "base/lazy_instance.h" | |
12 #include "base/logging.h" | |
13 #include "base/threading/thread_local.h" | |
14 #include "base/time/time.h" | |
15 #include "mojo/message_pump/message_pump_mojo_handler.h" | |
16 #include "mojo/message_pump/time_helper.h" | |
17 | |
18 namespace mojo { | |
19 namespace common { | |
20 namespace { | |
21 | |
22 base::LazyInstance<base::ThreadLocalPointer<MessagePumpMojo> >::Leaky | |
23 g_tls_current_pump = LAZY_INSTANCE_INITIALIZER; | |
24 | |
25 MojoDeadline TimeTicksToMojoDeadline(base::TimeTicks time_ticks, | |
26 base::TimeTicks now) { | |
27 // The is_null() check matches that of HandleWatcher as well as how | |
28 // |delayed_work_time| is used. | |
29 if (time_ticks.is_null()) | |
30 return MOJO_DEADLINE_INDEFINITE; | |
31 const int64_t delta = (time_ticks - now).InMicroseconds(); | |
32 return delta < 0 ? static_cast<MojoDeadline>(0) : | |
33 static_cast<MojoDeadline>(delta); | |
34 } | |
35 | |
36 } // namespace | |
37 | |
38 // State needed for one iteration of WaitMany. The first handle and flags | |
39 // corresponds to that of the control pipe. | |
40 struct MessagePumpMojo::WaitState { | |
41 std::vector<Handle> handles; | |
42 std::vector<MojoHandleSignals> wait_signals; | |
43 }; | |
44 | |
45 struct MessagePumpMojo::RunState { | |
46 RunState() : should_quit(false) { | |
47 CreateMessagePipe(NULL, &read_handle, &write_handle); | |
48 } | |
49 | |
50 base::TimeTicks delayed_work_time; | |
51 | |
52 // Used to wake up WaitForWork(). | |
53 ScopedMessagePipeHandle read_handle; | |
54 ScopedMessagePipeHandle write_handle; | |
55 | |
56 bool should_quit; | |
57 }; | |
58 | |
59 MessagePumpMojo::MessagePumpMojo() : run_state_(NULL), next_handler_id_(0) { | |
60 DCHECK(!current()) | |
61 << "There is already a MessagePumpMojo instance on this thread."; | |
62 g_tls_current_pump.Pointer()->Set(this); | |
63 } | |
64 | |
65 MessagePumpMojo::~MessagePumpMojo() { | |
66 DCHECK_EQ(this, current()); | |
67 g_tls_current_pump.Pointer()->Set(NULL); | |
68 } | |
69 | |
70 // static | |
71 scoped_ptr<base::MessagePump> MessagePumpMojo::Create() { | |
72 return scoped_ptr<MessagePump>(new MessagePumpMojo()); | |
73 } | |
74 | |
75 // static | |
76 MessagePumpMojo* MessagePumpMojo::current() { | |
77 return g_tls_current_pump.Pointer()->Get(); | |
78 } | |
79 | |
80 void MessagePumpMojo::AddHandler(MessagePumpMojoHandler* handler, | |
81 const Handle& handle, | |
82 MojoHandleSignals wait_signals, | |
83 base::TimeTicks deadline) { | |
84 CHECK(handler); | |
85 DCHECK(handle.is_valid()); | |
86 // Assume it's an error if someone tries to reregister an existing handle. | |
87 CHECK_EQ(0u, handlers_.count(handle)); | |
88 Handler handler_data; | |
89 handler_data.handler = handler; | |
90 handler_data.wait_signals = wait_signals; | |
91 handler_data.deadline = deadline; | |
92 handler_data.id = next_handler_id_++; | |
93 handlers_[handle] = handler_data; | |
94 } | |
95 | |
96 void MessagePumpMojo::RemoveHandler(const Handle& handle) { | |
97 handlers_.erase(handle); | |
98 } | |
99 | |
100 void MessagePumpMojo::AddObserver(Observer* observer) { | |
101 observers_.AddObserver(observer); | |
102 } | |
103 | |
104 void MessagePumpMojo::RemoveObserver(Observer* observer) { | |
105 observers_.RemoveObserver(observer); | |
106 } | |
107 | |
108 void MessagePumpMojo::Run(Delegate* delegate) { | |
109 RunState run_state; | |
110 // TODO: better deal with error handling. | |
111 CHECK(run_state.read_handle.is_valid()); | |
112 CHECK(run_state.write_handle.is_valid()); | |
113 RunState* old_state = NULL; | |
114 { | |
115 base::AutoLock auto_lock(run_state_lock_); | |
116 old_state = run_state_; | |
117 run_state_ = &run_state; | |
118 } | |
119 DoRunLoop(&run_state, delegate); | |
120 { | |
121 base::AutoLock auto_lock(run_state_lock_); | |
122 run_state_ = old_state; | |
123 } | |
124 } | |
125 | |
126 void MessagePumpMojo::Quit() { | |
127 base::AutoLock auto_lock(run_state_lock_); | |
128 if (run_state_) | |
129 run_state_->should_quit = true; | |
130 } | |
131 | |
132 void MessagePumpMojo::ScheduleWork() { | |
133 base::AutoLock auto_lock(run_state_lock_); | |
134 if (run_state_) | |
135 SignalControlPipe(*run_state_); | |
136 } | |
137 | |
138 void MessagePumpMojo::ScheduleDelayedWork( | |
139 const base::TimeTicks& delayed_work_time) { | |
140 base::AutoLock auto_lock(run_state_lock_); | |
141 if (!run_state_) | |
142 return; | |
143 run_state_->delayed_work_time = delayed_work_time; | |
144 } | |
145 | |
146 void MessagePumpMojo::DoRunLoop(RunState* run_state, Delegate* delegate) { | |
147 bool more_work_is_plausible = true; | |
148 for (;;) { | |
149 const bool block = !more_work_is_plausible; | |
150 more_work_is_plausible = DoInternalWork(*run_state, block); | |
151 | |
152 if (run_state->should_quit) | |
153 break; | |
154 | |
155 more_work_is_plausible |= delegate->DoWork(); | |
156 if (run_state->should_quit) | |
157 break; | |
158 | |
159 more_work_is_plausible |= delegate->DoDelayedWork( | |
160 &run_state->delayed_work_time); | |
161 if (run_state->should_quit) | |
162 break; | |
163 | |
164 if (more_work_is_plausible) | |
165 continue; | |
166 | |
167 more_work_is_plausible = delegate->DoIdleWork(); | |
168 if (run_state->should_quit) | |
169 break; | |
170 } | |
171 } | |
172 | |
173 bool MessagePumpMojo::DoInternalWork(const RunState& run_state, bool block) { | |
174 const MojoDeadline deadline = block ? GetDeadlineForWait(run_state) : 0; | |
175 const WaitState wait_state = GetWaitState(run_state); | |
176 | |
177 const WaitManyResult wait_many_result = | |
178 WaitMany(wait_state.handles, wait_state.wait_signals, deadline, nullptr); | |
179 const MojoResult result = wait_many_result.result; | |
180 bool did_work = true; | |
181 if (result == MOJO_RESULT_OK) { | |
182 if (wait_many_result.index == 0) { | |
183 // Control pipe was written to. | |
184 ReadMessageRaw(run_state.read_handle.get(), NULL, NULL, NULL, NULL, | |
185 MOJO_READ_MESSAGE_FLAG_MAY_DISCARD); | |
186 } else { | |
187 DCHECK(handlers_.find(wait_state.handles[wait_many_result.index]) != | |
188 handlers_.end()); | |
189 WillSignalHandler(); | |
190 handlers_[wait_state.handles[wait_many_result.index]] | |
191 .handler->OnHandleReady(wait_state.handles[wait_many_result.index]); | |
192 DidSignalHandler(); | |
193 } | |
194 } else { | |
195 switch (result) { | |
196 case MOJO_RESULT_CANCELLED: | |
197 case MOJO_RESULT_FAILED_PRECONDITION: | |
198 RemoveInvalidHandle(wait_state, result, wait_many_result.index); | |
199 break; | |
200 case MOJO_RESULT_DEADLINE_EXCEEDED: | |
201 did_work = false; | |
202 break; | |
203 default: | |
204 base::debug::Alias(&result); | |
205 // Unexpected result is likely fatal, crash so we can determine cause. | |
206 CHECK(false); | |
207 } | |
208 } | |
209 | |
210 // Notify and remove any handlers whose time has expired. Make a copy in case | |
211 // someone tries to add/remove new handlers from notification. | |
212 const HandleToHandler cloned_handlers(handlers_); | |
213 const base::TimeTicks now(internal::NowTicks()); | |
214 for (HandleToHandler::const_iterator i = cloned_handlers.begin(); | |
215 i != cloned_handlers.end(); ++i) { | |
216 // Since we're iterating over a clone of the handlers, verify the handler is | |
217 // still valid before notifying. | |
218 if (!i->second.deadline.is_null() && i->second.deadline < now && | |
219 handlers_.find(i->first) != handlers_.end() && | |
220 handlers_[i->first].id == i->second.id) { | |
221 WillSignalHandler(); | |
222 i->second.handler->OnHandleError(i->first, MOJO_RESULT_DEADLINE_EXCEEDED); | |
223 DidSignalHandler(); | |
224 handlers_.erase(i->first); | |
225 did_work = true; | |
226 } | |
227 } | |
228 return did_work; | |
229 } | |
230 | |
231 void MessagePumpMojo::RemoveInvalidHandle(const WaitState& wait_state, | |
232 MojoResult result, | |
233 uint32_t index) { | |
234 // TODO(sky): deal with control pipe going bad. | |
235 CHECK(result == MOJO_RESULT_FAILED_PRECONDITION || | |
236 result == MOJO_RESULT_CANCELLED); | |
237 CHECK_NE(index, 0u); // Indicates the control pipe went bad. | |
238 | |
239 // Remove the handle first, this way if OnHandleError() tries to remove the | |
240 // handle our iterator isn't invalidated. | |
241 CHECK(handlers_.find(wait_state.handles[index]) != handlers_.end()); | |
242 MessagePumpMojoHandler* handler = | |
243 handlers_[wait_state.handles[index]].handler; | |
244 handlers_.erase(wait_state.handles[index]); | |
245 WillSignalHandler(); | |
246 handler->OnHandleError(wait_state.handles[index], result); | |
247 DidSignalHandler(); | |
248 } | |
249 | |
250 void MessagePumpMojo::SignalControlPipe(const RunState& run_state) { | |
251 const MojoResult result = | |
252 WriteMessageRaw(run_state.write_handle.get(), NULL, 0, NULL, 0, | |
253 MOJO_WRITE_MESSAGE_FLAG_NONE); | |
254 // If we can't write we likely won't wake up the thread and there is a strong | |
255 // chance we'll deadlock. | |
256 CHECK_EQ(MOJO_RESULT_OK, result); | |
257 } | |
258 | |
259 MessagePumpMojo::WaitState MessagePumpMojo::GetWaitState( | |
260 const RunState& run_state) const { | |
261 WaitState wait_state; | |
262 wait_state.handles.push_back(run_state.read_handle.get()); | |
263 wait_state.wait_signals.push_back(MOJO_HANDLE_SIGNAL_READABLE); | |
264 | |
265 for (HandleToHandler::const_iterator i = handlers_.begin(); | |
266 i != handlers_.end(); ++i) { | |
267 wait_state.handles.push_back(i->first); | |
268 wait_state.wait_signals.push_back(i->second.wait_signals); | |
269 } | |
270 return wait_state; | |
271 } | |
272 | |
273 MojoDeadline MessagePumpMojo::GetDeadlineForWait( | |
274 const RunState& run_state) const { | |
275 const base::TimeTicks now(internal::NowTicks()); | |
276 MojoDeadline deadline = TimeTicksToMojoDeadline(run_state.delayed_work_time, | |
277 now); | |
278 for (HandleToHandler::const_iterator i = handlers_.begin(); | |
279 i != handlers_.end(); ++i) { | |
280 deadline = std::min( | |
281 TimeTicksToMojoDeadline(i->second.deadline, now), deadline); | |
282 } | |
283 return deadline; | |
284 } | |
285 | |
286 void MessagePumpMojo::WillSignalHandler() { | |
287 FOR_EACH_OBSERVER(Observer, observers_, WillSignalHandler()); | |
288 } | |
289 | |
290 void MessagePumpMojo::DidSignalHandler() { | |
291 FOR_EACH_OBSERVER(Observer, observers_, DidSignalHandler()); | |
292 } | |
293 | |
294 } // namespace common | |
295 } // namespace mojo | |
OLD | NEW |