OLD | NEW |
| (Empty) |
1 // Copyright (c) 2015, the Dartino project authors. Please see the AUTHORS file | |
2 // for details. All rights reserved. Use of this source code is governed by a | |
3 // BSD-style license that can be found in the LICENSE.md file. | |
4 | |
5 library dart.fletch; | |
6 | |
7 import 'dart:fletch._system' as fletch; | |
8 | |
9 /// Fibers are lightweight co-operative multitask units of execution. They | |
10 /// are scheduled on top of OS-level threads, but they are cheap to create | |
11 /// and block. | |
12 class Fiber { | |
13 | |
14 // We keep track of the top of the coroutine stack and | |
15 // the list of other fibers that are waiting for this | |
16 // fiber to exit. | |
17 Coroutine _coroutine; | |
18 List<Fiber> _joiners; | |
19 | |
20 // When a fiber exits, we keep the result of running | |
21 // its code around so we can return to other fibers | |
22 // that join it. | |
23 bool _isDone = false; | |
24 var _result; | |
25 | |
26 // The ready fibers are linked together. | |
27 Fiber _previous; | |
28 Fiber _next; | |
29 | |
30 // We do not use an initializer for [_current] to ensure we can treeshake | |
31 // the [Fiber] class. | |
32 static Fiber _current; | |
33 static Fiber _idleFibers; | |
34 | |
35 Fiber._initial() { | |
36 _previous = this; | |
37 _next = this; | |
38 } | |
39 | |
40 Fiber._forked(entry) { | |
41 current; // Force initialization of fiber sub-system. | |
42 _coroutine = new Coroutine((ignore) { | |
43 fletch.runToEnd(entry); | |
44 }); | |
45 } | |
46 | |
47 static Fiber get current { | |
48 var current = _current; | |
49 if (current != null) return current; | |
50 return _current = new Fiber._initial(); | |
51 } | |
52 | |
53 static Fiber fork(entry) { | |
54 Fiber fiber = new Fiber._forked(entry); | |
55 _markReady(fiber); | |
56 return fiber; | |
57 } | |
58 | |
59 static void yield() { | |
60 // Handle messages so that fibers that are blocked on receiving | |
61 // messages can wake up. | |
62 Process._handleMessages(); | |
63 _current._coroutine = Coroutine._coroutineCurrent(); | |
64 _schedule(_current._next); | |
65 } | |
66 | |
67 static void exit([value]) { | |
68 // If we never created a fiber, we can just go ahead and halt now. | |
69 if (_current == null) fletch.halt(); | |
70 | |
71 _current._exit(value); | |
72 } | |
73 | |
74 join() { | |
75 // If the fiber is already done, we just return the result. | |
76 if (_isDone) return _result; | |
77 | |
78 // Add the current fiber to the list of fibers waiting | |
79 // to join [this] fiber. | |
80 Fiber fiber = _current; | |
81 if (_joiners == null) { | |
82 _joiners = [ fiber ]; | |
83 } else { | |
84 _joiners.add(fiber); | |
85 } | |
86 | |
87 // Suspend the current fiber and change to the scheduler. | |
88 // When we get back, the [this] fiber has exited and we | |
89 // can go ahead and return the result. | |
90 Fiber next = _suspendFiber(fiber, false); | |
91 fiber._coroutine = Coroutine._coroutineCurrent(); | |
92 _schedule(next); | |
93 return _result; | |
94 } | |
95 | |
96 void _exit(value) { | |
97 Fiber fiber = this; | |
98 List<Fiber> joiners = fiber._joiners; | |
99 if (joiners != null) { | |
100 for (Fiber joiner in joiners) _resumeFiber(joiner); | |
101 joiners.clear(); | |
102 } | |
103 | |
104 fiber._isDone = true; | |
105 fiber._result = value; | |
106 | |
107 // Suspend the current fiber. It will never wake up again. | |
108 Fiber next = _suspendFiber(fiber, true); | |
109 fiber._coroutine = null; | |
110 _schedule(next); | |
111 } | |
112 | |
113 static void _resumeFiber(Fiber fiber) { | |
114 _idleFibers = _unlink(fiber); | |
115 _markReady(fiber); | |
116 } | |
117 | |
118 static void _markReady(Fiber fiber) { | |
119 _current = _link(fiber, _current); | |
120 } | |
121 | |
122 static Fiber _link(Fiber fiber, Fiber list) { | |
123 if (list == null) { | |
124 fiber._next = fiber; | |
125 fiber._previous = fiber; | |
126 return fiber; | |
127 } | |
128 | |
129 Fiber next = list._next; | |
130 list._next = fiber; | |
131 next._previous = fiber; | |
132 fiber._previous = list; | |
133 fiber._next = next; | |
134 return list; | |
135 } | |
136 | |
137 static Fiber _unlink(Fiber fiber) { | |
138 Fiber next = fiber._next; | |
139 if (identical(fiber, next)) { | |
140 return null; | |
141 } | |
142 | |
143 Fiber previous = fiber._previous; | |
144 previous._next = next; | |
145 next._previous = previous; | |
146 return next; | |
147 } | |
148 | |
149 static Fiber _suspendFiber(Fiber fiber, bool exiting) { | |
150 Fiber current = _current = _unlink(fiber); | |
151 | |
152 if (exiting) { | |
153 fiber._next = null; | |
154 fiber._previous = null; | |
155 } else { | |
156 _idleFibers = _link(fiber, _idleFibers); | |
157 } | |
158 | |
159 if (current != null) return current; | |
160 | |
161 // If we don't have any idle fibers, halt. | |
162 if (exiting && _idleFibers == null) fletch.halt(); | |
163 | |
164 while (true) { | |
165 Process._handleMessages(); | |
166 // A call to _handleMessages can handle more messages than signaled, so | |
167 // we can get a following false-positive wakeup. If no new _current is | |
168 // set, simply yield again. | |
169 current = _current; | |
170 if (current != null) return current; | |
171 fletch.yield(fletch.InterruptKind.yield.index); | |
172 } | |
173 } | |
174 | |
175 static void _yieldTo(Fiber from, Fiber to) { | |
176 from._coroutine = Coroutine._coroutineCurrent(); | |
177 _schedule(to); | |
178 } | |
179 | |
180 // TODO(kasperl): This is temporary debugging support. We | |
181 // should probably replace this support for passing in an | |
182 // id of some sort when forking a fiber. | |
183 static int _count = 0; | |
184 int _index = _count++; | |
185 toString() => "fiber:$_index"; | |
186 | |
187 static void _schedule(Fiber to) { | |
188 _current = to; | |
189 fletch.coroutineChange(to._coroutine, null); | |
190 } | |
191 } | |
192 | |
193 class Coroutine { | |
194 | |
195 // TODO(kasperl): The VM has assumptions about the layout | |
196 // of coroutine fields. We should validate that the code | |
197 // agrees with those assumptions. | |
198 var _stack; | |
199 var _caller; | |
200 | |
201 Coroutine(entry) { | |
202 _stack = _coroutineNewStack(this, entry); | |
203 } | |
204 | |
205 bool get isSuspended => identical(_caller, null); | |
206 bool get isRunning => !isSuspended && !isDone; | |
207 bool get isDone => identical(_caller, this); | |
208 | |
209 call(argument) { | |
210 if (!isSuspended) throw "Cannot call non-suspended coroutine"; | |
211 _caller = _coroutineCurrent(); | |
212 var result = fletch.coroutineChange(this, argument); | |
213 | |
214 // If the called coroutine is done now, we clear the | |
215 // stack reference in it so the memory can be reclaimed. | |
216 if (isDone) { | |
217 _stack = null; | |
218 } else { | |
219 _caller = null; | |
220 } | |
221 return result; | |
222 } | |
223 | |
224 static yield(value) { | |
225 Coroutine caller = _coroutineCurrent()._caller; | |
226 if (caller == null) throw "Cannot yield outside coroutine"; | |
227 return fletch.coroutineChange(caller, value); | |
228 } | |
229 | |
230 _coroutineStart(entry) { | |
231 // The first call to changeStack is actually skipped but we need | |
232 // it to make sure the newly started coroutine is suspended in | |
233 // exactly the same way as we do when yielding. | |
234 var argument = fletch.coroutineChange(0, 0); | |
235 var result = entry(argument); | |
236 | |
237 // Mark this coroutine as done and change back to the caller. | |
238 Coroutine caller = _caller; | |
239 _caller = this; | |
240 fletch.coroutineChange(caller, result); | |
241 } | |
242 | |
243 @fletch.native external static _coroutineCurrent(); | |
244 @fletch.native external static _coroutineNewStack(coroutine, entry); | |
245 } | |
246 | |
247 class ProcessDeath { | |
248 final Process process; | |
249 final int _reason; | |
250 | |
251 ProcessDeath._(this.process, this._reason); | |
252 | |
253 DeathReason get reason => DeathReason.values[_reason]; | |
254 } | |
255 | |
256 // TODO: Keep these in sync with src/vm/process.h:Signal::Kind | |
257 enum DeathReason { | |
258 CompileTimeError, | |
259 Terminated, | |
260 UncaughtException, | |
261 UnhandledSignal, | |
262 Killed, | |
263 } | |
264 | |
265 class Process { | |
266 // This is the address of the native process/4 so that it fits in a Smi. | |
267 final int _nativeProcessHandle; | |
268 | |
269 bool operator==(other) { | |
270 return | |
271 other is Process && | |
272 other._nativeProcessHandle == _nativeProcessHandle; | |
273 } | |
274 | |
275 int get hashCode => _nativeProcessHandle.hashCode; | |
276 | |
277 @fletch.native bool link() { | |
278 throw fletch.nativeError; | |
279 } | |
280 | |
281 @fletch.native void unlink() { | |
282 switch (fletch.nativeError) { | |
283 case fletch.wrongArgumentType: | |
284 throw new StateError("Cannot unlink from parent process."); | |
285 default: | |
286 throw fletch.nativeError; | |
287 } | |
288 } | |
289 | |
290 @fletch.native bool monitor(Port port) { | |
291 switch (fletch.nativeError) { | |
292 case fletch.wrongArgumentType: | |
293 throw new StateError("The argument to monitor must be a Port object."); | |
294 default: | |
295 throw fletch.nativeError; | |
296 } | |
297 } | |
298 | |
299 @fletch.native void unmonitor(Port port) { | |
300 switch (fletch.nativeError) { | |
301 case fletch.wrongArgumentType: | |
302 throw new StateError( | |
303 "The argument to unmonitor must be a Port object."); | |
304 default: | |
305 throw fletch.nativeError; | |
306 } | |
307 } | |
308 | |
309 @fletch.native void kill() { | |
310 throw fletch.nativeError; | |
311 } | |
312 | |
313 static Process spawn(Function fn, [argument]) { | |
314 if (!isImmutable(fn)) { | |
315 throw new ArgumentError( | |
316 'The closure passed to Process.spawn() must be immutable.'); | |
317 } | |
318 | |
319 if (!isImmutable(argument)) { | |
320 throw new ArgumentError( | |
321 'The optional argument passed to Process.spawn() must be immutable.'); | |
322 } | |
323 | |
324 return _spawn(_entry, fn, argument, true, true, null); | |
325 } | |
326 | |
327 static Process spawnDetached(Function fn, {Port monitor}) { | |
328 if (!isImmutable(fn)) { | |
329 throw new ArgumentError( | |
330 'The closure passed to Process.spawnDetached() must be immutable.'); | |
331 } | |
332 | |
333 return _spawn(_entry, fn, null, true, false, monitor); | |
334 } | |
335 | |
336 /** | |
337 * Divide the elements in [arguments] into a matching number of processes | |
338 * with [fn] as entry. The current process blocks until all processes have | |
339 * terminated. | |
340 * | |
341 * The elements in [arguments] can be any immutable (see [isImmutable]) | |
342 * object. | |
343 * | |
344 * The function [fn] must be a top-level or static function. | |
345 */ | |
346 static List divide(fn(argument), List arguments) { | |
347 if (fn == null) { | |
348 throw new ArgumentError.notNull("fn"); | |
349 } | |
350 if (!isImmutable(fn)) { | |
351 throw new ArgumentError.value( | |
352 fn, "fn", "Closure passed to Process.divide must be immutable."); | |
353 } | |
354 if (arguments == null) { | |
355 throw new ArgumentError.notNull("arguments"); | |
356 } | |
357 | |
358 int length = arguments.length; | |
359 for (int i = 0; i < length; i++) { | |
360 if (!isImmutable(arguments[i])) { | |
361 throw new ArgumentError.value( | |
362 arguments[i], "@$i", | |
363 "Cannot pass mutable arguments to subprocess via Process.divide."); | |
364 } | |
365 } | |
366 | |
367 List channels = new List(length); | |
368 for (int i = 0; i < length; i++) { | |
369 channels[i] = new Channel(); | |
370 | |
371 final argument = arguments[i]; | |
372 final port = new Port(channels[i]); | |
373 Process.spawnDetached(() { | |
374 try { | |
375 Process.exit(value: fn(argument), to: port); | |
376 } finally { | |
377 // TODO(kustermann): Handle error properly. Once we do this, we can | |
378 // remove the 'fn == null' check above. | |
379 Process.exit(to: port); | |
380 } | |
381 }); | |
382 } | |
383 for (int i = 0; i < length; i++) { | |
384 channels[i] = channels[i].receive(); | |
385 } | |
386 return channels; | |
387 } | |
388 | |
389 /** | |
390 * Exit the current process. If a non-null [to] port is provided, | |
391 * the process will send the provided [value] to the [to] port as | |
392 * its final action. | |
393 */ | |
394 static void exit({value, Port to}) { | |
395 try { | |
396 if (to != null) to._sendExit(value); | |
397 } finally { | |
398 fletch.yield(fletch.InterruptKind.terminate.index); | |
399 } | |
400 } | |
401 | |
402 // Low-level entry for spawned processes. | |
403 static void _entry(fn, argument) { | |
404 if (argument == null) { | |
405 fletch.runToEnd(fn); | |
406 } else { | |
407 fletch.runToEnd(() => fn(argument)); | |
408 } | |
409 } | |
410 | |
411 // Low-level helper function for spawning. | |
412 @fletch.native static Process _spawn(Function entry, | |
413 Function fn, | |
414 argument, | |
415 bool linkToChild, | |
416 bool linkFromChild, | |
417 Port monitor) { | |
418 throw new ArgumentError(); | |
419 } | |
420 | |
421 static void _handleMessages() { | |
422 Channel channel; | |
423 while ((channel = _queueGetChannel()) != null) { | |
424 var message = _queueGetMessage(); | |
425 if (message is ProcessDeath) { | |
426 message = _queueSetupProcessDeath(message); | |
427 } | |
428 channel.send(message); | |
429 } | |
430 } | |
431 | |
432 @fletch.native external static Process get current; | |
433 @fletch.native external static _queueGetMessage(); | |
434 @fletch.native external static _queueSetupProcessDeath(ProcessDeath message); | |
435 @fletch.native external static Channel _queueGetChannel(); | |
436 } | |
437 | |
438 // Ports allow you to send messages to a channel. Ports are | |
439 // are transferable and can be sent between processes. | |
440 class Port { | |
441 // A Smi stores the aligned pointer to the C++ port object. | |
442 final int _port; | |
443 | |
444 factory Port(Channel channel) { | |
445 return Port._create(channel); | |
446 } | |
447 | |
448 // TODO(kasperl): Temporary debugging aid. | |
449 int get id => _port; | |
450 | |
451 // Send a message to the channel. Not blocking. | |
452 @fletch.native void send(message) { | |
453 switch (fletch.nativeError) { | |
454 case fletch.wrongArgumentType: | |
455 throw new ArgumentError(); | |
456 case fletch.illegalState: | |
457 throw new StateError("Port is closed."); | |
458 default: | |
459 throw fletch.nativeError; | |
460 } | |
461 } | |
462 | |
463 @fletch.native void _sendExit(value) { | |
464 throw new StateError("Port is closed."); | |
465 } | |
466 | |
467 @fletch.native external static Port _create(Channel channel); | |
468 } | |
469 | |
470 class Channel { | |
471 Fiber _receiver; // TODO(kasperl): Should this be a queue too? | |
472 | |
473 // TODO(kasperl): Maybe make this a bit smarter and store | |
474 // the elements in a growable list? Consider allowing bounds | |
475 // on the queue size. | |
476 _ChannelEntry _head; | |
477 _ChannelEntry _tail; | |
478 | |
479 // Deliver the message synchronously. If the receiver | |
480 // isn't ready to receive yet, the sender blocks. | |
481 void deliver(message) { | |
482 Fiber sender = Fiber.current; | |
483 _enqueue(new _ChannelEntry(message, sender)); | |
484 Fiber next = Fiber._suspendFiber(sender, false); | |
485 // TODO(kasperl): Should we yield to receiver if possible? | |
486 Fiber._yieldTo(sender, next); | |
487 } | |
488 | |
489 // Send a message to the channel. Not blocking. | |
490 void send(message) { | |
491 _enqueue(new _ChannelEntry(message, null)); | |
492 } | |
493 | |
494 // Receive a message. If no messages are available | |
495 // the receiver blocks. | |
496 receive() { | |
497 if (_receiver != null) { | |
498 throw new StateError("Channel cannot have multiple receivers (yet)."); | |
499 } | |
500 | |
501 if (_head == null) { | |
502 Fiber receiver = Fiber.current; | |
503 _receiver = receiver; | |
504 Fiber next = Fiber._suspendFiber(receiver, false); | |
505 Fiber._yieldTo(receiver, next); | |
506 } | |
507 | |
508 return _dequeue(); | |
509 } | |
510 | |
511 _enqueue(_ChannelEntry entry) { | |
512 if (_tail == null) { | |
513 _head = _tail = entry; | |
514 } else { | |
515 _tail = _tail.next = entry; | |
516 } | |
517 | |
518 // Signal the receiver (if any). | |
519 Fiber receiver = _receiver; | |
520 if (receiver != null) { | |
521 _receiver = null; | |
522 Fiber._resumeFiber(receiver); | |
523 } | |
524 } | |
525 | |
526 _dequeue() { | |
527 _ChannelEntry entry = _head; | |
528 _ChannelEntry next = entry.next; | |
529 _head = next; | |
530 if (next == null) _tail = next; | |
531 Fiber sender = entry.sender; | |
532 if (sender != null) Fiber._resumeFiber(sender); | |
533 return entry.message; | |
534 } | |
535 } | |
536 | |
537 class _ChannelEntry { | |
538 final message; | |
539 final Fiber sender; | |
540 _ChannelEntry next; | |
541 _ChannelEntry(this.message, this.sender); | |
542 } | |
543 | |
544 bool isImmutable(Object object) => _isImmutable(object); | |
545 | |
546 @fletch.native external bool _isImmutable(String string); | |
547 | |
548 /// Returns a channel that will receive a message in [milliseconds] | |
549 /// milliseconds. | |
550 // TODO(sigurdm): Move this function? | |
551 Channel sleep(int milliseconds) { | |
552 if (milliseconds is! int) throw new ArgumentError(milliseconds); | |
553 Channel channel = new Channel(); | |
554 Port port = new Port(channel); | |
555 _sleep(milliseconds, port); | |
556 return channel; | |
557 } | |
558 | |
559 @fletch.native external void _sleep(int milliseconds, Port port); | |
OLD | NEW |