Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(537)

Side by Side Diff: mojo/public/platform/dart/dart_handle_watcher.cc

Issue 2250183003: Make the fuchsia mojo/public repo the source of truth. (Closed) Base URL: https://github.com/domokit/mojo.git@master
Patch Set: Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include <mojo/result.h>
6 #include <mojo/system/handle.h>
7 #include <mojo/system/message_pipe.h>
8 #include <mojo/system/time.h>
9 #include <mojo/system/wait.h>
10 #include <stdio.h>
11 #include <string.h>
12 #include <sys/time.h>
13
14 #include <memory>
15 #include <mutex>
16 #include <set>
17 #include <unordered_map>
18 #include <vector>
19
20 #include "mojo/public/platform/dart/dart_handle_watcher.h"
21
22 #include "dart/runtime/include/dart_api.h"
23 #include "dart/runtime/include/dart_native_api.h"
24
25 namespace mojo {
26 namespace dart {
27
28 #define CONTROL_HANDLE_INDEX 0
29
30 static void PostNull(Dart_Port port) {
31 if (port == ILLEGAL_PORT) {
32 return;
33 }
34 Dart_CObject message;
35 message.type = Dart_CObject_kNull;
36 Dart_PostCObject(port, &message);
37 }
38
39 static void PostSignal(Dart_Port port, int32_t signalled) {
40 if (port == ILLEGAL_PORT) {
41 return;
42 }
43 Dart_PostInteger(port, signalled);
44 }
45
46 // The internal state of the handle watcher thread.
47 class HandleWatcherThreadState {
48 public:
49 HandleWatcherThreadState(MojoHandle control_pipe_consumer_handle);
50
51 ~HandleWatcherThreadState();
52
53 void Run();
54
55 private:
56 struct HandleWatcherTimer {
57 int64_t deadline;
58 Dart_Port port;
59
60 // Sort on deadline.
61 friend bool operator<(const HandleWatcherTimer& l,
62 const HandleWatcherTimer& r) {
63 return l.deadline < r.deadline;
64 }
65 };
66
67 void AddHandle(MojoHandle handle,
68 MojoHandleSignals signals,
69 Dart_Port port);
70
71 void RemoveHandle(MojoHandle handle);
72
73 void CloseHandle(MojoHandle handle, Dart_Port port, bool pruning = false);
74
75 void UpdateTimer(int64_t deadline, Dart_Port port);
76
77 void Shutdown();
78
79 void RemoveHandleAtIndex(intptr_t i);
80
81 void ProcessControlMessage();
82
83 void ProcessTimers();
84
85 void ProcessWaitManyResults(MojoResult result, uint32_t result_index);
86
87 void PruneClosedHandles(bool signals_state_is_valid);
88
89 void CompleteNextTimer();
90
91 bool HasTimers();
92
93 int64_t NextTimerDeadline();
94
95 int64_t WaitDeadline();
96
97 bool shutdown_;
98
99 MojoHandle control_pipe_consumer_handle_;
100
101 // All of these vectors are indexed together.
102 std::vector<MojoHandle> wait_many_handles_;
103 std::vector<MojoHandleSignals> wait_many_signals_;
104 std::vector<MojoHandleSignalsState> wait_many_signals_state_;
105 std::vector<Dart_Port> handle_ports_;
106
107 // Map from MojoHandle -> index into above arrays.
108 std::unordered_map<MojoHandle, intptr_t> handle_to_index_map_;
109
110 // Set of timers sorted by earliest deadline.
111 std::set<HandleWatcherTimer> timers_;
112
113 MOJO_DISALLOW_COPY_AND_ASSIGN(HandleWatcherThreadState);
114 };
115
116 HandleWatcherThreadState::HandleWatcherThreadState(
117 MojoHandle control_pipe_consumer_handle)
118 : shutdown_(false),
119 control_pipe_consumer_handle_(control_pipe_consumer_handle) {
120 MOJO_CHECK(control_pipe_consumer_handle_ != MOJO_HANDLE_INVALID);
121 // Add the control handle.
122 AddHandle(control_pipe_consumer_handle_,
123 MOJO_HANDLE_SIGNAL_READABLE,
124 ILLEGAL_PORT);
125 }
126
127 HandleWatcherThreadState::~HandleWatcherThreadState() {
128 if (control_pipe_consumer_handle_ != MOJO_HANDLE_INVALID) {
129 MojoClose(control_pipe_consumer_handle_);
130 control_pipe_consumer_handle_ = MOJO_HANDLE_INVALID;
131 }
132 }
133
134 void HandleWatcherThreadState::AddHandle(MojoHandle handle,
135 MojoHandleSignals signals,
136 Dart_Port port) {
137 const size_t index = wait_many_handles_.size();
138 MojoHandleSignalsState signals_state =
139 { MOJO_HANDLE_SIGNAL_NONE, MOJO_HANDLE_SIGNAL_NONE};
140
141 auto it = handle_to_index_map_.find(handle);
142 if (it != handle_to_index_map_.end()) {
143 intptr_t index = it->second;
144 // Sanity check.
145 MOJO_CHECK(wait_many_handles_[index] == handle);
146 // We only support 1:1 mapping from handles to ports.
147 if (handle_ports_[index] != port) {
148 MOJO_LOG(ERROR) << "(Dart Handle Watcher) "
149 << "Handle " << handle << " is already bound!";
150 PostSignal(port, MOJO_HANDLE_SIGNAL_PEER_CLOSED);
151 return;
152 }
153 // Adjust the signals for this handle.
154 wait_many_signals_[index] |= signals;
155 } else {
156 // New handle.
157 wait_many_handles_.push_back(handle);
158 wait_many_signals_.push_back(signals);
159 wait_many_signals_state_.push_back(signals_state);
160 handle_ports_.push_back(port);
161 handle_to_index_map_[handle] = index;
162 }
163
164 // Sanity check.
165 MOJO_CHECK(wait_many_handles_.size() == handle_to_index_map_.size());
166 }
167
168 void HandleWatcherThreadState::RemoveHandle(MojoHandle handle) {
169 auto it = handle_to_index_map_.find(handle);
170
171 // Removal of a handle for an incoming event can race with the removal of
172 // a handle for an unsubscribe() call on the Dart MojoEventSubscription.
173 // This is not an error, so we ignore attempts to remove a handle that is not
174 // in the map.
175 if (it == handle_to_index_map_.end()) {
176 return;
177 }
178 const intptr_t index = it->second;
179 // We should never be removing the control handle.
180 MOJO_CHECK(index != CONTROL_HANDLE_INDEX);
181 RemoveHandleAtIndex(index);
182 }
183
184 void HandleWatcherThreadState::CloseHandle(MojoHandle handle,
185 Dart_Port port,
186 bool pruning) {
187 MOJO_CHECK(!pruning || (port == ILLEGAL_PORT));
188 auto it = handle_to_index_map_.find(handle);
189 if (it == handle_to_index_map_.end()) {
190 // An app isolate may request that the handle watcher close a handle that
191 // has already been pruned. This happens when the app isolate has not yet
192 // received the PEER_CLOSED event. The app isolate will not close the
193 // handle, so we must do so here.
194 MojoClose(handle);
195 if (port != ILLEGAL_PORT) {
196 // Notify that close is done.
197 PostNull(port);
198 }
199 return;
200 }
201 MojoClose(handle);
202 if (port != ILLEGAL_PORT) {
203 // Notify that close is done.
204 PostNull(port);
205 }
206 const intptr_t index = it->second;
207 MOJO_CHECK(index != CONTROL_HANDLE_INDEX);
208 if (pruning) {
209 // If this handle is being pruned, notify the application isolate
210 // by sending PEER_CLOSED;
211 PostSignal(handle_ports_[index], MOJO_HANDLE_SIGNAL_PEER_CLOSED);
212 }
213 // Remove the handle.
214 RemoveHandle(handle);
215 }
216
217 void HandleWatcherThreadState::UpdateTimer(int64_t deadline, Dart_Port port) {
218 // Scan the timers to see if we have a timer with |port|.
219 auto it = timers_.begin();
220 while (it != timers_.end()) {
221 if (it->port == port) {
222 break;
223 }
224 it++;
225 }
226
227 // We found an existing timer with |port|. Remove it.
228 if (it != timers_.end()) {
229 timers_.erase(it);
230 }
231
232 if (deadline < 0) {
233 // A negative deadline means we should cancel this timer completely.
234 return;
235 }
236
237 // Create a new timer with the current deadline.
238 HandleWatcherTimer timer;
239 timer.deadline = deadline;
240 timer.port = port;
241
242 timers_.insert(timer);
243 }
244
245 void HandleWatcherThreadState::Shutdown() {
246 // Break out of the loop by setting the shutdown_ to true.
247 shutdown_ = true;
248 }
249
250 void HandleWatcherThreadState::RemoveHandleAtIndex(intptr_t index) {
251 MOJO_CHECK(index != CONTROL_HANDLE_INDEX);
252 const intptr_t last_index = wait_many_handles_.size() - 1;
253
254 // Remove handle from handle map.
255 handle_to_index_map_.erase(wait_many_handles_[index]);
256
257 if (index != last_index) {
258 // We should never be overwriting CONTROL_HANDLE_INDEX.
259
260 MojoHandle handle = wait_many_handles_[last_index];
261
262 // Replace |index| with |last_index|.
263 wait_many_handles_[index] = wait_many_handles_[last_index];
264 wait_many_signals_[index] = wait_many_signals_[last_index];
265 wait_many_signals_state_[index] = wait_many_signals_state_[last_index];
266 handle_ports_[index] = handle_ports_[last_index];
267
268 // Update handle map.
269 handle_to_index_map_[handle] = index;
270 }
271
272 wait_many_handles_.pop_back();
273 wait_many_signals_.pop_back();
274 wait_many_signals_state_.pop_back();
275 handle_ports_.pop_back();
276 MOJO_CHECK(wait_many_handles_.size() >= 1);
277
278 // Sanity check.
279 MOJO_CHECK(wait_many_handles_.size() == handle_to_index_map_.size());
280 }
281
282 void HandleWatcherThreadState::ProcessControlMessage() {
283 HandleWatcherCommand command = HandleWatcherCommand::Empty();
284 uint32_t num_bytes = sizeof(command);
285 uint32_t num_handles = 0;
286 MojoResult res = MojoReadMessage(control_pipe_consumer_handle_,
287 reinterpret_cast<void*>(&command),
288 &num_bytes,
289 nullptr,
290 &num_handles,
291 0);
292 // Sanity check that we received the expected amount of data.
293 MOJO_CHECK(res == MOJO_RESULT_OK);
294 MOJO_CHECK(num_bytes == sizeof(command));
295 MOJO_CHECK(num_handles == 0);
296 switch (command.command()) {
297 case HandleWatcherCommand::kCommandAddHandle:
298 AddHandle(command.handle(), command.signals(), command.port());
299 break;
300 case HandleWatcherCommand::kCommandRemoveHandle:
301 RemoveHandle(command.handle());
302 break;
303 case HandleWatcherCommand::kCommandCloseHandle:
304 CloseHandle(command.handle(), command.port());
305 break;
306 case HandleWatcherCommand::kCommandAddTimer:
307 UpdateTimer(command.deadline(), command.port());
308 break;
309 case HandleWatcherCommand::kCommandShutdownHandleWatcher:
310 Shutdown();
311 break;
312 default:
313 MOJO_CHECK(false);
314 break;
315 }
316 }
317
318 // Dart's Timer class uses MojoCoreNatives.timerMillisecondClock(), which
319 // calls MojoGetTimeTicksNow() and divides by 1000;
320 static int64_t GetDartTimeInMillis() {
321 MojoTimeTicks ticks = MojoGetTimeTicksNow();
322 return static_cast<int64_t>(ticks) / 1000;
323 }
324
325 void HandleWatcherThreadState::ProcessTimers() {
326 int64_t now = GetDartTimeInMillis();
327 while (HasTimers() && now >= NextTimerDeadline()) {
328 CompleteNextTimer();
329 now = GetDartTimeInMillis();
330 }
331 }
332
333 void HandleWatcherThreadState::CompleteNextTimer() {
334 auto it = timers_.begin();
335 MOJO_CHECK(it != timers_.end());
336 // Notify that the timer is complete.
337 PostNull(it->port);
338 // Remove it from the timer set.
339 timers_.erase(it);
340 }
341
342 bool HandleWatcherThreadState::HasTimers() {
343 return !timers_.empty();
344 }
345
346 int64_t HandleWatcherThreadState::NextTimerDeadline() {
347 auto it = timers_.begin();
348 MOJO_CHECK(it != timers_.end());
349 return it->deadline;
350 }
351
352 int64_t HandleWatcherThreadState::WaitDeadline() {
353 if (!HasTimers()) {
354 // No pending timers. Wait indefinitely.
355 return MOJO_DEADLINE_INDEFINITE;
356 }
357 int64_t now = GetDartTimeInMillis();
358 return (NextTimerDeadline() - now) * 1000;
359 }
360
361 static bool ShouldCloseHandle(MojoHandle handle) {
362 if (handle == MOJO_HANDLE_INVALID) {
363 return false;
364 }
365 // Call wait with a deadline of 0. If the result of this is OK or
366 // DEADLINE_EXCEEDED, the handle is still open.
367 MojoResult result = MojoWait(handle, MOJO_HANDLE_SIGNAL_ALL, 0, NULL);
368 return (result != MOJO_RESULT_OK) &&
369 (result != MOJO_RESULT_DEADLINE_EXCEEDED);
370 }
371
372 void HandleWatcherThreadState::PruneClosedHandles(bool signals_state_is_valid) {
373 std::vector<MojoHandle> closed_handles;
374 const intptr_t num_handles = wait_many_handles_.size();
375 if (signals_state_is_valid) {
376 // We can rely on |wait_many_signals_state_| having valid data.
377 for (intptr_t i = 0; i < num_handles; i++) {
378 // Check if the handle at index |i| has been closed.
379 MojoHandleSignals satisfied_signals =
380 wait_many_signals_state_[i].satisfied_signals;
381 if ((satisfied_signals & MOJO_HANDLE_SIGNAL_PEER_CLOSED) != 0) {
382 closed_handles.push_back(wait_many_handles_[i]);
383 }
384 }
385 } else {
386 // We can't rely on |wait_many_signals_state_| having valid data. So
387 // we call Wait on each handle and check the status.
388 for (intptr_t i = 0; i < num_handles; i++) {
389 MojoHandle handle = wait_many_handles_[i];
390 if (ShouldCloseHandle(handle)) {
391 closed_handles.push_back(handle);
392 }
393 }
394 }
395
396 // Process all closed handles and notify their ports.
397 for (size_t i = 0; i < closed_handles.size(); i++) {
398 MojoHandle handle = closed_handles[i];
399 CloseHandle(handle, ILLEGAL_PORT, true);
400 }
401 }
402
403 void HandleWatcherThreadState::ProcessWaitManyResults(MojoResult result,
404 uint32_t result_index) {
405 MOJO_CHECK(result != MOJO_RESULT_DEADLINE_EXCEEDED);
406 if (result != MOJO_RESULT_OK) {
407 // The WaitMany call failed. We need to prune closed handles from our
408 // wait many set and try again.
409 //
410 // If the result is an invalid argument |wait_many_signals_state_| is
411 // meaningless.
412 PruneClosedHandles(result != MOJO_RESULT_INVALID_ARGUMENT);
413 return;
414 }
415 MOJO_CHECK(result == MOJO_RESULT_OK);
416
417 // Indexes of handles that we are done with.
418 std::vector<intptr_t> to_remove;
419
420 const intptr_t num_handles = wait_many_handles_.size();
421
422 // Loop over all handles except for the control handle.
423 // The order of the looping matters because we call RemoveHandleAtIndex
424 // and need the handle indexes to start at the highest and decrease.
425 for (intptr_t i = num_handles - 1; i > 0; i--) {
426 MojoHandleSignals signals = wait_many_signals_[i];
427 MojoHandleSignals satisfied_signals =
428 wait_many_signals_state_[i].satisfied_signals;
429 satisfied_signals &= signals;
430 if (satisfied_signals != 0) {
431 // Something happened to this handle.
432
433 // Notify the port.
434 PostSignal(handle_ports_[i], satisfied_signals);
435
436 // Now that we have notified the waiting Dart program, remove this handle
437 // from the wait many set until we are requested to add it again.
438 to_remove.push_back(i);
439 }
440 }
441
442 // Remove any handles we are finished with.
443 const intptr_t num_to_remove = to_remove.size();
444 for (intptr_t i = 0; i < num_to_remove; i++) {
445 RemoveHandleAtIndex(to_remove[i]);
446 }
447
448 // Now check for control messages.
449 {
450 MojoHandleSignals signals = wait_many_signals_[CONTROL_HANDLE_INDEX];
451 MojoHandleSignals satisfied_signals =
452 wait_many_signals_state_[CONTROL_HANDLE_INDEX].satisfied_signals;
453 satisfied_signals &= signals;
454 if (satisfied_signals != 0) {
455 // We have a control message.
456 ProcessControlMessage();
457 }
458 }
459 }
460
461 void HandleWatcherThreadState::Run() {
462 while (!shutdown_) {
463 // Process timers.
464 ProcessTimers();
465 // Wait for the next timer or an event on a handle.
466 uint32_t result_index = -1;
467 uint32_t num_handles = wait_many_handles_.size();
468 MOJO_CHECK(wait_many_signals_.size() == num_handles);
469 MojoResult result = MojoWaitMany(wait_many_handles_.data(),
470 wait_many_signals_.data(),
471 num_handles,
472 WaitDeadline(),
473 &result_index,
474 wait_many_signals_state_.data());
475
476 if (result == MOJO_RESULT_DEADLINE_EXCEEDED) {
477 // Timers are ready.
478 continue;
479 }
480
481 // Process wait results.
482 ProcessWaitManyResults(result, result_index);
483 }
484
485 // Close our end of the message pipe.
486 MojoClose(control_pipe_consumer_handle_);
487 }
488
489 std::unordered_map<MojoHandle, std::thread*>
490 HandleWatcher::handle_watcher_threads_;
491 std::mutex HandleWatcher::handle_watcher_threads_mutex_;
492
493 // Create a message pipe for communication and spawns a handle watcher thread.
494 MojoHandle HandleWatcher::Start() {
495 MojoCreateMessagePipeOptions options;
496 options.struct_size = sizeof(MojoCreateMessagePipeOptions);
497 options.flags = MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE;
498
499 MojoHandle control_pipe_consumer_handle = MOJO_HANDLE_INVALID;
500 MojoHandle control_pipe_producer_handle = MOJO_HANDLE_INVALID;
501 MojoResult res = MojoCreateMessagePipe(&options,
502 &control_pipe_consumer_handle,
503 &control_pipe_producer_handle);
504 if (res != MOJO_RESULT_OK) {
505 return MOJO_HANDLE_INVALID;
506 }
507
508 // Spawn thread and pass both ends of the pipe to it.
509 std::thread* thread = new std::thread(
510 ThreadMain, control_pipe_consumer_handle);
511
512 {
513 std::lock_guard<std::mutex> lock(handle_watcher_threads_mutex_);
514 // Record the thread object so that we can join on it during shutdown.
515 MOJO_CHECK(handle_watcher_threads_.find(control_pipe_producer_handle) ==
516 handle_watcher_threads_.end());
517 handle_watcher_threads_[control_pipe_producer_handle] = thread;
518 }
519
520 // Return producer end of pipe to caller.
521 return control_pipe_producer_handle;
522 }
523
524 void HandleWatcher::ThreadMain(MojoHandle control_pipe_consumer_handle) {
525 HandleWatcherThreadState state(control_pipe_consumer_handle);
526
527 // Run the main loop. When this returns the handle watcher has exited.
528 state.Run();
529 }
530
531 MojoResult HandleWatcher::SendCommand(MojoHandle control_pipe_producer_handle,
532 const HandleWatcherCommand& command) {
533 return MojoWriteMessage(control_pipe_producer_handle,
534 reinterpret_cast<const void*>(&command),
535 sizeof(command),
536 nullptr,
537 0,
538 0);
539 }
540
541 std::thread* HandleWatcher::RemoveLocked(MojoHandle handle) {
542 std::thread* t;
543 auto mapping = handle_watcher_threads_.find(handle);
544 if (mapping == handle_watcher_threads_.end()) {
545 return nullptr;
546 }
547 t = mapping->second;
548 handle_watcher_threads_.erase(handle);
549 return t;
550 }
551
552 void HandleWatcher::Stop(MojoHandle control_pipe_producer_handle) {
553 std::thread *t;
554 {
555 std::lock_guard<std::mutex> lock(handle_watcher_threads_mutex_);
556 t = RemoveLocked(control_pipe_producer_handle);
557 }
558
559 if (t == nullptr) {
560 return;
561 }
562
563 SendCommand(control_pipe_producer_handle, HandleWatcherCommand::Shutdown());
564 t->join();
565
566 MojoClose(control_pipe_producer_handle);
567 delete t;
568 }
569
570 void HandleWatcher::StopLocked(MojoHandle handle) {
571 std::thread *t = RemoveLocked(handle);
572 MOJO_CHECK(t != nullptr);
573
574 SendCommand(handle, HandleWatcherCommand::Shutdown());
575 t->join();
576
577 MojoClose(handle);
578 delete t;
579 }
580
581 void HandleWatcher::StopAll() {
582 std::lock_guard<std::mutex> lock(handle_watcher_threads_mutex_);
583
584 std::vector<MojoHandle> control_handles;
585 control_handles.reserve(handle_watcher_threads_.size());
586
587 for (const auto& it : handle_watcher_threads_) {
588 control_handles.push_back(it.first);
589 }
590
591 for (auto it : control_handles) {
592 StopLocked(it);
593 }
594 }
595
596 } // namespace dart
597 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/public/platform/dart/dart_handle_watcher.h ('k') | mojo/public/platform/dart/mojo_natives.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698