Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(490)

Side by Side Diff: mojo/message_pump/message_pump_mojo.cc

Issue 1841863002: Update monet. (Closed) Base URL: https://github.com/domokit/monet.git@master
Patch Set: Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/message_pump/message_pump_mojo.h ('k') | mojo/message_pump/message_pump_mojo_handler.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/common/message_pump_mojo.h" 5 #include "mojo/message_pump/message_pump_mojo.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 #include <vector> 8 #include <vector>
9 9
10 #include "base/debug/alias.h" 10 #include "base/debug/alias.h"
11 #include "base/lazy_instance.h"
12 #include "base/logging.h" 11 #include "base/logging.h"
13 #include "base/threading/thread_local.h" 12 #include "base/threading/thread_local.h"
14 #include "base/time/time.h" 13 #include "base/time/time.h"
15 #include "mojo/common/message_pump_mojo_handler.h" 14 #include "mojo/message_pump/message_pump_mojo_handler.h"
16 #include "mojo/common/time_helper.h" 15 #include "mojo/message_pump/time_helper.h"
16 #include "mojo/public/cpp/system/message_pipe.h"
17 #include "mojo/public/cpp/system/wait.h"
17 18
18 namespace mojo { 19 namespace mojo {
19 namespace common { 20 namespace common {
20 namespace { 21 namespace {
21 22
22 base::LazyInstance<base::ThreadLocalPointer<MessagePumpMojo> >::Leaky 23 base::ThreadLocalPointer<MessagePumpMojo>* CurrentPump() {
23 g_tls_current_pump = LAZY_INSTANCE_INITIALIZER; 24 static auto* tls = new base::ThreadLocalPointer<MessagePumpMojo>;
25 return tls;
26 }
24 27
25 MojoDeadline TimeTicksToMojoDeadline(base::TimeTicks time_ticks, 28 MojoDeadline TimeTicksToMojoDeadline(base::TimeTicks time_ticks,
26 base::TimeTicks now) { 29 base::TimeTicks now) {
27 // The is_null() check matches that of HandleWatcher as well as how 30 // The is_null() check matches that of HandleWatcher as well as how
28 // |delayed_work_time| is used. 31 // |delayed_work_time| is used.
29 if (time_ticks.is_null()) 32 if (time_ticks.is_null())
30 return MOJO_DEADLINE_INDEFINITE; 33 return MOJO_DEADLINE_INDEFINITE;
31 const int64_t delta = (time_ticks - now).InMicroseconds(); 34 const int64_t delta = (time_ticks - now).InMicroseconds();
32 return delta < 0 ? static_cast<MojoDeadline>(0) : 35 return delta < 0 ? static_cast<MojoDeadline>(0) :
33 static_cast<MojoDeadline>(delta); 36 static_cast<MojoDeadline>(delta);
34 } 37 }
35 38
36 } // namespace 39 } // namespace
37 40
38 // State needed for one iteration of WaitMany. The first handle and flags 41 // State needed for one iteration of WaitMany. The first handle and flags
39 // corresponds to that of the control pipe. 42 // corresponds to that of the control pipe.
40 struct MessagePumpMojo::WaitState { 43 struct MessagePumpMojo::WaitState {
41 std::vector<Handle> handles; 44 std::vector<Handle> handles;
42 std::vector<MojoHandleSignals> wait_signals; 45 std::vector<MojoHandleSignals> wait_signals;
43 }; 46 };
44 47
45 struct MessagePumpMojo::RunState { 48 struct MessagePumpMojo::RunState {
46 RunState() : should_quit(false) { 49 RunState() : should_quit(false) {
47 CreateMessagePipe(NULL, &read_handle, &write_handle); 50 CreateMessagePipe(nullptr, &read_handle, &write_handle);
48 } 51 }
49 52
50 base::TimeTicks delayed_work_time; 53 base::TimeTicks delayed_work_time;
51 54
52 // Used to wake up WaitForWork(). 55 // Used to wake up WaitForWork().
53 ScopedMessagePipeHandle read_handle; 56 ScopedMessagePipeHandle read_handle;
54 ScopedMessagePipeHandle write_handle; 57 ScopedMessagePipeHandle write_handle;
55 58
59 // Cached structures to avoid the heap allocation cost of std::vector<>.
60 scoped_ptr<WaitState> wait_state;
61 scoped_ptr<HandleToHandlerList> cloned_handlers;
62
56 bool should_quit; 63 bool should_quit;
57 }; 64 };
58 65
59 MessagePumpMojo::MessagePumpMojo() : run_state_(NULL), next_handler_id_(0) { 66 MessagePumpMojo::MessagePumpMojo() : run_state_(nullptr), next_handler_id_(0) {
60 DCHECK(!current()) 67 DCHECK(!current())
61 << "There is already a MessagePumpMojo instance on this thread."; 68 << "There is already a MessagePumpMojo instance on this thread.";
62 g_tls_current_pump.Pointer()->Set(this); 69 CurrentPump()->Set(this);
63 } 70 }
64 71
65 MessagePumpMojo::~MessagePumpMojo() { 72 MessagePumpMojo::~MessagePumpMojo() {
66 DCHECK_EQ(this, current()); 73 DCHECK_EQ(this, current());
67 g_tls_current_pump.Pointer()->Set(NULL); 74 CurrentPump()->Set(nullptr);
68 } 75 }
69 76
70 // static 77 // static
71 scoped_ptr<base::MessagePump> MessagePumpMojo::Create() { 78 scoped_ptr<base::MessagePump> MessagePumpMojo::Create() {
72 return scoped_ptr<MessagePump>(new MessagePumpMojo()); 79 return scoped_ptr<MessagePump>(new MessagePumpMojo());
73 } 80 }
74 81
75 // static 82 // static
76 MessagePumpMojo* MessagePumpMojo::current() { 83 MessagePumpMojo* MessagePumpMojo::current() {
77 return g_tls_current_pump.Pointer()->Get(); 84 return CurrentPump()->Get();
78 } 85 }
79 86
80 void MessagePumpMojo::AddHandler(MessagePumpMojoHandler* handler, 87 void MessagePumpMojo::AddHandler(MessagePumpMojoHandler* handler,
81 const Handle& handle, 88 const Handle& handle,
82 MojoHandleSignals wait_signals, 89 MojoHandleSignals wait_signals,
83 base::TimeTicks deadline) { 90 base::TimeTicks deadline) {
84 CHECK(handler); 91 CHECK(handler);
85 DCHECK(handle.is_valid()); 92 DCHECK(handle.is_valid());
86 // Assume it's an error if someone tries to reregister an existing handle. 93 // Assume it's an error if someone tries to reregister an existing handle.
87 CHECK_EQ(0u, handlers_.count(handle)); 94 CHECK_EQ(0u, handlers_.count(handle));
(...skipping 15 matching lines...) Expand all
103 110
104 void MessagePumpMojo::RemoveObserver(Observer* observer) { 111 void MessagePumpMojo::RemoveObserver(Observer* observer) {
105 observers_.RemoveObserver(observer); 112 observers_.RemoveObserver(observer);
106 } 113 }
107 114
108 void MessagePumpMojo::Run(Delegate* delegate) { 115 void MessagePumpMojo::Run(Delegate* delegate) {
109 RunState run_state; 116 RunState run_state;
110 // TODO: better deal with error handling. 117 // TODO: better deal with error handling.
111 CHECK(run_state.read_handle.is_valid()); 118 CHECK(run_state.read_handle.is_valid());
112 CHECK(run_state.write_handle.is_valid()); 119 CHECK(run_state.write_handle.is_valid());
113 RunState* old_state = NULL; 120 RunState* old_state = nullptr;
114 { 121 {
115 base::AutoLock auto_lock(run_state_lock_); 122 base::AutoLock auto_lock(run_state_lock_);
116 old_state = run_state_; 123 old_state = run_state_;
117 run_state_ = &run_state; 124 run_state_ = &run_state;
118 } 125 }
119 DoRunLoop(&run_state, delegate); 126 DoRunLoop(&run_state, delegate);
120 { 127 {
121 base::AutoLock auto_lock(run_state_lock_); 128 base::AutoLock auto_lock(run_state_lock_);
122 run_state_ = old_state; 129 run_state_ = old_state;
123 } 130 }
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after
165 continue; 172 continue;
166 173
167 more_work_is_plausible = delegate->DoIdleWork(); 174 more_work_is_plausible = delegate->DoIdleWork();
168 if (run_state->should_quit) 175 if (run_state->should_quit)
169 break; 176 break;
170 } 177 }
171 } 178 }
172 179
173 bool MessagePumpMojo::DoInternalWork(const RunState& run_state, bool block) { 180 bool MessagePumpMojo::DoInternalWork(const RunState& run_state, bool block) {
174 const MojoDeadline deadline = block ? GetDeadlineForWait(run_state) : 0; 181 const MojoDeadline deadline = block ? GetDeadlineForWait(run_state) : 0;
175 const WaitState wait_state = GetWaitState(run_state); 182 if (!run_state_->wait_state)
183 run_state_->wait_state.reset(new WaitState);
184 GetWaitState(run_state, run_state_->wait_state.get());
176 185
177 const WaitManyResult wait_many_result = 186 const WaitManyResult wait_many_result =
178 WaitMany(wait_state.handles, wait_state.wait_signals, deadline, nullptr); 187 WaitMany(run_state_->wait_state->handles,
188 run_state_->wait_state->wait_signals, deadline, nullptr);
179 const MojoResult result = wait_many_result.result; 189 const MojoResult result = wait_many_result.result;
180 bool did_work = true; 190 bool did_work = true;
181 if (result == MOJO_RESULT_OK) { 191 if (result == MOJO_RESULT_OK) {
182 if (wait_many_result.index == 0) { 192 if (wait_many_result.index == 0) {
183 // Control pipe was written to. 193 // Control pipe was written to.
184 ReadMessageRaw(run_state.read_handle.get(), NULL, NULL, NULL, NULL, 194 ReadMessageRaw(run_state.read_handle.get(), nullptr, nullptr, nullptr,
185 MOJO_READ_MESSAGE_FLAG_MAY_DISCARD); 195 nullptr, MOJO_READ_MESSAGE_FLAG_MAY_DISCARD);
186 } else { 196 } else {
187 DCHECK(handlers_.find(wait_state.handles[wait_many_result.index]) != 197 DCHECK(handlers_.find(
198 run_state_->wait_state->handles[wait_many_result.index]) !=
188 handlers_.end()); 199 handlers_.end());
189 WillSignalHandler(); 200 WillSignalHandler();
190 handlers_[wait_state.handles[wait_many_result.index]] 201 handlers_[run_state_->wait_state->handles[wait_many_result.index]]
191 .handler->OnHandleReady(wait_state.handles[wait_many_result.index]); 202 .handler->OnHandleReady(
203 run_state_->wait_state->handles[wait_many_result.index]);
192 DidSignalHandler(); 204 DidSignalHandler();
193 } 205 }
194 } else { 206 } else {
195 switch (result) { 207 switch (result) {
196 case MOJO_RESULT_CANCELLED:
197 case MOJO_RESULT_FAILED_PRECONDITION: 208 case MOJO_RESULT_FAILED_PRECONDITION:
198 RemoveInvalidHandle(wait_state, result, wait_many_result.index); 209 RemoveInvalidHandle(*run_state_->wait_state, result,
210 wait_many_result.index);
199 break; 211 break;
200 case MOJO_RESULT_DEADLINE_EXCEEDED: 212 case MOJO_RESULT_DEADLINE_EXCEEDED:
201 did_work = false; 213 did_work = false;
202 break; 214 break;
215 case MOJO_RESULT_INVALID_ARGUMENT:
216 case MOJO_RESULT_CANCELLED:
217 case MOJO_RESULT_BUSY:
218 // These results indicate a bug in "our" code (e.g., race conditions).
219 // Fall through.
203 default: 220 default:
204 base::debug::Alias(&result); 221 base::debug::Alias(&result);
205 // Unexpected result is likely fatal, crash so we can determine cause. 222 // Unexpected result is likely fatal, crash so we can determine cause.
206 CHECK(false); 223 CHECK(false);
207 } 224 }
208 } 225 }
226 // To keep memory usage under control, delete the WaitState object at the end
227 // if it's vectors are too big by a factor of 2. Pre-C++11 doesn't have a way
228 // to shrink vectors, so just get rid of them and re-create on the next round.
229 if (run_state_->wait_state->handles.capacity() >
230 2 * run_state_->wait_state->handles.size()) {
231 // NOTE: |handles| and |wait_signals| are always in sync, so it's reasonable
232 // to only check one of those.
233 run_state_->wait_state.reset();
234 }
209 235
210 // Notify and remove any handlers whose time has expired. Make a copy in case 236 // Notify and remove any handlers whose time has expired. Make a copy in case
211 // someone tries to add/remove new handlers from notification. 237 // someone tries to add/remove new handlers from notification.
212 const HandleToHandler cloned_handlers(handlers_); 238 if (!run_state_->cloned_handlers) {
239 run_state_->cloned_handlers.reset(new HandleToHandlerList);
240 } else {
241 run_state_->cloned_handlers->clear();
242 }
243 run_state_->cloned_handlers->reserve(handlers_.size());
244 for (const auto& handler : handlers_) {
245 run_state_->cloned_handlers->push_back(handler);
246 }
213 const base::TimeTicks now(internal::NowTicks()); 247 const base::TimeTicks now(internal::NowTicks());
214 for (HandleToHandler::const_iterator i = cloned_handlers.begin(); 248 for (HandleToHandlerList::const_iterator i =
215 i != cloned_handlers.end(); ++i) { 249 run_state_->cloned_handlers->begin();
250 i != run_state_->cloned_handlers->end(); ++i) {
216 // Since we're iterating over a clone of the handlers, verify the handler is 251 // Since we're iterating over a clone of the handlers, verify the handler is
217 // still valid before notifying. 252 // still valid before notifying.
218 if (!i->second.deadline.is_null() && i->second.deadline < now && 253 if (!i->second.deadline.is_null() && i->second.deadline < now &&
219 handlers_.find(i->first) != handlers_.end() && 254 handlers_.find(i->first) != handlers_.end() &&
220 handlers_[i->first].id == i->second.id) { 255 handlers_[i->first].id == i->second.id) {
221 WillSignalHandler(); 256 WillSignalHandler();
222 i->second.handler->OnHandleError(i->first, MOJO_RESULT_DEADLINE_EXCEEDED); 257 i->second.handler->OnHandleError(i->first, MOJO_RESULT_DEADLINE_EXCEEDED);
223 DidSignalHandler(); 258 DidSignalHandler();
224 handlers_.erase(i->first); 259 handlers_.erase(i->first);
225 did_work = true; 260 did_work = true;
226 } 261 }
227 } 262 }
263 if (run_state_->cloned_handlers->capacity() >
264 2 * run_state_->cloned_handlers->size()) {
265 run_state_->cloned_handlers.reset();
266 }
228 return did_work; 267 return did_work;
229 } 268 }
230 269
231 void MessagePumpMojo::RemoveInvalidHandle(const WaitState& wait_state, 270 void MessagePumpMojo::RemoveInvalidHandle(const WaitState& wait_state,
232 MojoResult result, 271 MojoResult result,
233 uint32_t index) { 272 uint32_t index) {
234 // TODO(sky): deal with control pipe going bad. 273 // TODO(sky): deal with control pipe going bad.
235 CHECK(result == MOJO_RESULT_FAILED_PRECONDITION || 274 CHECK(result == MOJO_RESULT_FAILED_PRECONDITION ||
236 result == MOJO_RESULT_CANCELLED); 275 result == MOJO_RESULT_CANCELLED);
237 CHECK_NE(index, 0u); // Indicates the control pipe went bad. 276 CHECK_NE(index, 0u); // Indicates the control pipe went bad.
238 277
239 // Remove the handle first, this way if OnHandleError() tries to remove the 278 // Remove the handle first, this way if OnHandleError() tries to remove the
240 // handle our iterator isn't invalidated. 279 // handle our iterator isn't invalidated.
241 CHECK(handlers_.find(wait_state.handles[index]) != handlers_.end()); 280 CHECK(handlers_.find(wait_state.handles[index]) != handlers_.end());
242 MessagePumpMojoHandler* handler = 281 MessagePumpMojoHandler* handler =
243 handlers_[wait_state.handles[index]].handler; 282 handlers_[wait_state.handles[index]].handler;
244 handlers_.erase(wait_state.handles[index]); 283 handlers_.erase(wait_state.handles[index]);
245 WillSignalHandler(); 284 WillSignalHandler();
246 handler->OnHandleError(wait_state.handles[index], result); 285 handler->OnHandleError(wait_state.handles[index], result);
247 DidSignalHandler(); 286 DidSignalHandler();
248 } 287 }
249 288
250 void MessagePumpMojo::SignalControlPipe(const RunState& run_state) { 289 void MessagePumpMojo::SignalControlPipe(const RunState& run_state) {
251 const MojoResult result = 290 const MojoResult result =
252 WriteMessageRaw(run_state.write_handle.get(), NULL, 0, NULL, 0, 291 WriteMessageRaw(run_state.write_handle.get(), nullptr, 0, nullptr, 0,
253 MOJO_WRITE_MESSAGE_FLAG_NONE); 292 MOJO_WRITE_MESSAGE_FLAG_NONE);
254 // If we can't write we likely won't wake up the thread and there is a strong 293 // If we can't write we likely won't wake up the thread and there is a strong
255 // chance we'll deadlock. 294 // chance we'll deadlock.
256 CHECK_EQ(MOJO_RESULT_OK, result); 295 CHECK_EQ(MOJO_RESULT_OK, result);
257 } 296 }
258 297
259 MessagePumpMojo::WaitState MessagePumpMojo::GetWaitState( 298 void MessagePumpMojo::GetWaitState(
260 const RunState& run_state) const { 299 const RunState& run_state,
261 WaitState wait_state; 300 MessagePumpMojo::WaitState* wait_state) const {
262 wait_state.handles.push_back(run_state.read_handle.get()); 301 const size_t num_handles = handlers_.size() + 1;
263 wait_state.wait_signals.push_back(MOJO_HANDLE_SIGNAL_READABLE); 302 wait_state->handles.clear();
303 wait_state->handles.reserve(num_handles);
304 wait_state->wait_signals.clear();
305 wait_state->wait_signals.reserve(num_handles);
306 wait_state->handles.push_back(run_state.read_handle.get());
307 wait_state->wait_signals.push_back(MOJO_HANDLE_SIGNAL_READABLE);
264 308
265 for (HandleToHandler::const_iterator i = handlers_.begin(); 309 for (HandleToHandler::const_iterator i = handlers_.begin();
266 i != handlers_.end(); ++i) { 310 i != handlers_.end(); ++i) {
267 wait_state.handles.push_back(i->first); 311 wait_state->handles.push_back(i->first);
268 wait_state.wait_signals.push_back(i->second.wait_signals); 312 wait_state->wait_signals.push_back(i->second.wait_signals);
269 } 313 }
270 return wait_state;
271 } 314 }
272 315
273 MojoDeadline MessagePumpMojo::GetDeadlineForWait( 316 MojoDeadline MessagePumpMojo::GetDeadlineForWait(
274 const RunState& run_state) const { 317 const RunState& run_state) const {
275 const base::TimeTicks now(internal::NowTicks()); 318 const base::TimeTicks now(internal::NowTicks());
276 MojoDeadline deadline = TimeTicksToMojoDeadline(run_state.delayed_work_time, 319 MojoDeadline deadline = TimeTicksToMojoDeadline(run_state.delayed_work_time,
277 now); 320 now);
278 for (HandleToHandler::const_iterator i = handlers_.begin(); 321 for (HandleToHandler::const_iterator i = handlers_.begin();
279 i != handlers_.end(); ++i) { 322 i != handlers_.end(); ++i) {
280 deadline = std::min( 323 deadline = std::min(
281 TimeTicksToMojoDeadline(i->second.deadline, now), deadline); 324 TimeTicksToMojoDeadline(i->second.deadline, now), deadline);
282 } 325 }
283 return deadline; 326 return deadline;
284 } 327 }
285 328
286 void MessagePumpMojo::WillSignalHandler() { 329 void MessagePumpMojo::WillSignalHandler() {
287 FOR_EACH_OBSERVER(Observer, observers_, WillSignalHandler()); 330 FOR_EACH_OBSERVER(Observer, observers_, WillSignalHandler());
288 } 331 }
289 332
290 void MessagePumpMojo::DidSignalHandler() { 333 void MessagePumpMojo::DidSignalHandler() {
291 FOR_EACH_OBSERVER(Observer, observers_, DidSignalHandler()); 334 FOR_EACH_OBSERVER(Observer, observers_, DidSignalHandler());
292 } 335 }
293 336
294 } // namespace common 337 } // namespace common
295 } // namespace mojo 338 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/message_pump/message_pump_mojo.h ('k') | mojo/message_pump/message_pump_mojo_handler.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698