// Copyright (c) 2015, the Dartino project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE.md file.

library dart.fletch;

import 'dart:fletch._system' as fletch;
/// Fibers are lightweight co-operative multitask units of execution. They
/// are scheduled on top of OS-level threads, but they are cheap to create
/// and block.
class Fiber {

  // We keep track of the top of the coroutine stack and
  // the list of other fibers that are waiting for this
  // fiber to exit.
  Coroutine _coroutine;
  List<Fiber> _joiners;

  // When a fiber exits, we keep the result of running
  // its code around so we can return to other fibers
  // that join it.
  bool _isDone = false;
  var _result;

  // The ready fibers are linked together in a circular doubly-linked
  // list through [_previous] and [_next].
  Fiber _previous;
  Fiber _next;

  // We do not use an initializer for [_current] to ensure we can treeshake
  // the [Fiber] class.
  static Fiber _current;     // Head of the ready ring; the running fiber.
  static Fiber _idleFibers;  // Ring of fibers suspended without exiting.

  // Creates the very first fiber, which forms a one-element ready ring
  // on its own.
  Fiber._initial() {
    _previous = this;
    _next = this;
  }

  // Creates a fiber that will run [entry] on a fresh coroutine stack.
  Fiber._forked(entry) {
    current; // Force initialization of fiber sub-system.
    _coroutine = new Coroutine((ignore) {
      fletch.runToEnd(entry);
    });
  }

  /// The currently running fiber. Lazily creates the initial fiber on
  /// first access so the class can be treeshaken when unused.
  static Fiber get current {
    var current = _current;
    if (current != null) return current;
    return _current = new Fiber._initial();
  }

  /// Starts a new fiber running [entry] and puts it on the ready ring.
  static Fiber fork(entry) {
    Fiber fiber = new Fiber._forked(entry);
    _markReady(fiber);
    return fiber;
  }

  /// Voluntarily gives up control so the next ready fiber can run.
  static void yield() {
    // Handle messages so that fibers that are blocked on receiving
    // messages can wake up.
    Process._handleMessages();
    _current._coroutine = Coroutine._coroutineCurrent();
    _schedule(_current._next);
  }

  /// Terminates the current fiber with [value] as its result.
  static void exit([value]) {
    // If we never created a fiber, we can just go ahead and halt now.
    if (_current == null) fletch.halt();

    _current._exit(value);
  }

  /// Blocks the calling fiber until [this] fiber has exited, then returns
  /// the exited fiber's result.
  join() {
    // If the fiber is already done, we just return the result.
    if (_isDone) return _result;

    // Add the current fiber to the list of fibers waiting
    // to join [this] fiber.
    Fiber fiber = _current;
    if (_joiners == null) {
      _joiners = [ fiber ];
    } else {
      _joiners.add(fiber);
    }

    // Suspend the current fiber and change to the scheduler.
    // When we get back, the [this] fiber has exited and we
    // can go ahead and return the result.
    Fiber next = _suspendFiber(fiber, false);
    fiber._coroutine = Coroutine._coroutineCurrent();
    _schedule(next);
    return _result;
  }

  // Exit protocol: wake every joiner, record the result, and suspend this
  // fiber for good before switching to the next ready fiber.
  void _exit(value) {
    Fiber fiber = this;
    List<Fiber> joiners = fiber._joiners;
    if (joiners != null) {
      for (Fiber joiner in joiners) _resumeFiber(joiner);
      joiners.clear();
    }

    fiber._isDone = true;
    fiber._result = value;

    // Suspend the current fiber. It will never wake up again.
    Fiber next = _suspendFiber(fiber, true);
    fiber._coroutine = null;
    _schedule(next);
  }

  // Moves [fiber] from the idle ring back onto the ready ring.
  static void _resumeFiber(Fiber fiber) {
    _idleFibers = _unlink(fiber);
    _markReady(fiber);
  }

  // Inserts [fiber] into the ready ring headed by [_current].
  static void _markReady(Fiber fiber) {
    _current = _link(fiber, _current);
  }

  // Inserts [fiber] into the circular ring [list], right after [list].
  // Returns the head of the ring; if [list] was empty, [fiber] becomes a
  // singleton ring.
  static Fiber _link(Fiber fiber, Fiber list) {
    if (list == null) {
      fiber._next = fiber;
      fiber._previous = fiber;
      return fiber;
    }

    Fiber next = list._next;
    list._next = fiber;
    next._previous = fiber;
    fiber._previous = list;
    fiber._next = next;
    return list;
  }

  // Removes [fiber] from its ring. Returns another element of the ring,
  // or null if [fiber] was the only element.
  static Fiber _unlink(Fiber fiber) {
    Fiber next = fiber._next;
    if (identical(fiber, next)) {
      return null;
    }

    Fiber previous = fiber._previous;
    previous._next = next;
    next._previous = previous;
    return next;
  }

  // Takes [fiber] off the ready ring. If [exiting], the fiber is detached
  // entirely; otherwise it is parked on the idle ring. Returns the next
  // fiber to run, polling the VM for messages while no fiber is ready.
  static Fiber _suspendFiber(Fiber fiber, bool exiting) {
    Fiber current = _current = _unlink(fiber);

    if (exiting) {
      fiber._next = null;
      fiber._previous = null;
    } else {
      _idleFibers = _link(fiber, _idleFibers);
    }

    if (current != null) return current;

    // If we don't have any idle fibers, halt.
    if (exiting && _idleFibers == null) fletch.halt();

    while (true) {
      Process._handleMessages();
      // A call to _handleMessages can handle more messages than signaled, so
      // we can get a following false-positive wakeup. If no new _current is
      // set, simply yield again.
      current = _current;
      if (current != null) return current;
      fletch.yield(fletch.InterruptKind.yield.index);
    }
  }

  // Saves [from]'s continuation and transfers control to [to].
  static void _yieldTo(Fiber from, Fiber to) {
    from._coroutine = Coroutine._coroutineCurrent();
    _schedule(to);
  }

  // TODO(kasperl): This is temporary debugging support. We
  // should probably replace this support for passing in an
  // id of some sort when forking a fiber.
  static int _count = 0;
  int _index = _count++;
  toString() => "fiber:$_index";

  // Makes [to] the current fiber and switches to its coroutine stack.
  static void _schedule(Fiber to) {
    _current = to;
    fletch.coroutineChange(to._coroutine, null);
  }
}
| 192 | |
/// A coroutine owns a separate execution stack and can be suspended and
/// resumed explicitly. Fibers are scheduled on top of coroutines.
class Coroutine {

  // TODO(kasperl): The VM has assumptions about the layout
  // of coroutine fields. We should validate that the code
  // agrees with those assumptions.
  var _stack;   // Native stack object; cleared once the coroutine is done.
  var _caller;  // Coroutine that resumed us; null when suspended, this when done.

  Coroutine(entry) {
    _stack = _coroutineNewStack(this, entry);
  }

  /// True when the coroutine is parked and can be resumed with [call].
  bool get isSuspended => identical(_caller, null);
  bool get isRunning => !isSuspended && !isDone;
  // A finished coroutine marks itself by pointing [_caller] at itself.
  bool get isDone => identical(_caller, this);

  /// Resumes the coroutine, passing [argument] to it. Returns the value
  /// the coroutine yields or returns when control comes back.
  call(argument) {
    if (!isSuspended) throw "Cannot call non-suspended coroutine";
    _caller = _coroutineCurrent();
    var result = fletch.coroutineChange(this, argument);

    // If the called coroutine is done now, we clear the
    // stack reference in it so the memory can be reclaimed.
    if (isDone) {
      _stack = null;
    } else {
      _caller = null;
    }
    return result;
  }

  /// Suspends the running coroutine, handing [value] back to its caller.
  static yield(value) {
    Coroutine caller = _coroutineCurrent()._caller;
    if (caller == null) throw "Cannot yield outside coroutine";
    return fletch.coroutineChange(caller, value);
  }

  // Trampoline the VM starts new coroutines in; runs [entry] and then
  // switches back to the caller with the result.
  _coroutineStart(entry) {
    // The first call to changeStack is actually skipped but we need
    // it to make sure the newly started coroutine is suspended in
    // exactly the same way as we do when yielding.
    var argument = fletch.coroutineChange(0, 0);
    var result = entry(argument);

    // Mark this coroutine as done and change back to the caller.
    Coroutine caller = _caller;
    _caller = this;
    fletch.coroutineChange(caller, result);
  }

  @fletch.native external static _coroutineCurrent();
  @fletch.native external static _coroutineNewStack(coroutine, entry);
}
| 246 | |
/// Message delivered to a monitoring port when a process terminates.
class ProcessDeath {
  /// The process that died.
  final Process process;

  // Raw signal-kind index; translated lazily into a [DeathReason].
  final int _reason;

  ProcessDeath._(this.process, this._reason);

  /// Why the process died.
  DeathReason get reason {
    return DeathReason.values[_reason];
  }
}
| 255 | |
// TODO: Keep these in sync with src/vm/process.h:Signal::Kind
/// The reason a process terminated, reported via [ProcessDeath].
enum DeathReason {
  CompileTimeError,
  Terminated,
  UncaughtException,
  UnhandledSignal,
  Killed,
}
| 264 | |
/// Handle to a Dartino process. Processes are isolated units of execution
/// that communicate through ports and channels.
class Process {
  // This is the address of the native process/4 so that it fits in a Smi.
  final int _nativeProcessHandle;

  // Two handles are equal when they refer to the same native process.
  bool operator==(other) {
    return
        other is Process &&
        other._nativeProcessHandle == _nativeProcessHandle;
  }

  int get hashCode => _nativeProcessHandle.hashCode;

  // Implemented natively; the Dart body is only reached when the native
  // call reports an error.
  @fletch.native bool link() {
    throw fletch.nativeError;
  }

  // Implemented natively; the Dart body only runs on native error and
  // translates it into a more descriptive exception.
  @fletch.native void unlink() {
    switch (fletch.nativeError) {
      case fletch.wrongArgumentType:
        throw new StateError("Cannot unlink from parent process.");
      default:
        throw fletch.nativeError;
    }
  }

  // Registers [port] to be notified when this process dies. Native; the
  // Dart body only runs on native error.
  @fletch.native bool monitor(Port port) {
    switch (fletch.nativeError) {
      case fletch.wrongArgumentType:
        throw new StateError("The argument to monitor must be a Port object.");
      default:
        throw fletch.nativeError;
    }
  }

  // Undoes a previous [monitor] registration. Native; the Dart body only
  // runs on native error.
  @fletch.native void unmonitor(Port port) {
    switch (fletch.nativeError) {
      case fletch.wrongArgumentType:
        throw new StateError(
            "The argument to unmonitor must be a Port object.");
      default:
        throw fletch.nativeError;
    }
  }

  // Forcibly terminates this process. Native; the Dart body is only
  // reached on native error.
  @fletch.native void kill() {
    throw fletch.nativeError;
  }

  /// Spawns a new process running [fn], optionally applied to [argument].
  /// Both must be immutable since they cross the process boundary. The
  /// spawned process is linked in both directions to the current one
  /// (the two `true` flags below).
  static Process spawn(Function fn, [argument]) {
    if (!isImmutable(fn)) {
      throw new ArgumentError(
          'The closure passed to Process.spawn() must be immutable.');
    }

    if (!isImmutable(argument)) {
      throw new ArgumentError(
          'The optional argument passed to Process.spawn() must be immutable.');
    }

    return _spawn(_entry, fn, argument, true, true, null);
  }

  /// Spawns a process without a link back from the child; an optional
  /// [monitor] port is notified when the child dies.
  static Process spawnDetached(Function fn, {Port monitor}) {
    if (!isImmutable(fn)) {
      throw new ArgumentError(
          'The closure passed to Process.spawnDetached() must be immutable.');
    }

    return _spawn(_entry, fn, null, true, false, monitor);
  }

  /**
   * Divide the elements in [arguments] into a matching number of processes
   * with [fn] as entry. The current process blocks until all processes have
   * terminated.
   *
   * The elements in [arguments] can be any immutable (see [isImmutable])
   * object.
   *
   * The function [fn] must be a top-level or static function.
   */
  static List divide(fn(argument), List arguments) {
    // Validate everything up front so no process is spawned if any
    // argument is invalid.
    if (fn == null) {
      throw new ArgumentError.notNull("fn");
    }
    if (!isImmutable(fn)) {
      throw new ArgumentError.value(
          fn, "fn", "Closure passed to Process.divide must be immutable.");
    }
    if (arguments == null) {
      throw new ArgumentError.notNull("arguments");
    }

    int length = arguments.length;
    for (int i = 0; i < length; i++) {
      if (!isImmutable(arguments[i])) {
        throw new ArgumentError.value(
            arguments[i], "@$i",
            "Cannot pass mutable arguments to subprocess via Process.divide.");
      }
    }

    // One channel per subprocess; each child sends its result back to the
    // corresponding port as its final action.
    List channels = new List(length);
    for (int i = 0; i < length; i++) {
      channels[i] = new Channel();

      final argument = arguments[i];
      final port = new Port(channels[i]);
      Process.spawnDetached(() {
        try {
          Process.exit(value: fn(argument), to: port);
        } finally {
          // TODO(kustermann): Handle error properly. Once we do this, we can
          // remove the 'fn == null' check above.
          Process.exit(to: port);
        }
      });
    }
    // Collect the results in order, replacing each channel by the value
    // it received.
    for (int i = 0; i < length; i++) {
      channels[i] = channels[i].receive();
    }
    return channels;
  }

  /**
   * Exit the current process. If a non-null [to] port is provided,
   * the process will send the provided [value] to the [to] port as
   * its final action.
   */
  static void exit({value, Port to}) {
    try {
      if (to != null) to._sendExit(value);
    } finally {
      fletch.yield(fletch.InterruptKind.terminate.index);
    }
  }

  // Low-level entry for spawned processes.
  static void _entry(fn, argument) {
    if (argument == null) {
      fletch.runToEnd(fn);
    } else {
      fletch.runToEnd(() => fn(argument));
    }
  }

  // Low-level helper function for spawning. Native; the Dart body is only
  // reached when the native call fails.
  @fletch.native static Process _spawn(Function entry,
                                       Function fn,
                                       argument,
                                       bool linkToChild,
                                       bool linkFromChild,
                                       Port monitor) {
    throw new ArgumentError();
  }

  // Drains the process message queue, delivering each pending message to
  // its target channel. ProcessDeath messages need extra native setup
  // before they can be handed to Dart code.
  static void _handleMessages() {
    Channel channel;
    while ((channel = _queueGetChannel()) != null) {
      var message = _queueGetMessage();
      if (message is ProcessDeath) {
        message = _queueSetupProcessDeath(message);
      }
      channel.send(message);
    }
  }

  @fletch.native external static Process get current;
  @fletch.native external static _queueGetMessage();
  @fletch.native external static _queueSetupProcessDeath(ProcessDeath message);
  @fletch.native external static Channel _queueGetChannel();
}
| 437 | |
// Ports allow you to send messages to a channel. Ports are
// transferable and can be sent between processes.
class Port {
  // A Smi stores the aligned pointer to the C++ port object.
  final int _port;

  factory Port(Channel channel) => Port._create(channel);

  // TODO(kasperl): Temporary debugging aid.
  int get id => _port;

  // Send a message to the channel. Not blocking. Implemented natively;
  // this body only runs when the native call reports an error.
  @fletch.native void send(message) {
    var error = fletch.nativeError;
    if (error == fletch.wrongArgumentType) {
      throw new ArgumentError();
    }
    if (error == fletch.illegalState) {
      throw new StateError("Port is closed.");
    }
    throw error;
  }

  // Native; this body only runs when the native call fails.
  @fletch.native void _sendExit(value) {
    throw new StateError("Port is closed.");
  }

  @fletch.native external static Port _create(Channel channel);
}
| 469 | |
/// A channel is an intra-process message queue that fibers can send to
/// and receive from, blocking the receiving fiber when it is empty.
class Channel {
  Fiber _receiver; // TODO(kasperl): Should this be a queue too?

  // TODO(kasperl): Maybe make this a bit smarter and store
  // the elements in a growable list? Consider allowing bounds
  // on the queue size.
  _ChannelEntry _head;
  _ChannelEntry _tail;

  // Deliver the message synchronously. If the receiver
  // isn't ready to receive yet, the sender blocks.
  void deliver(message) {
    Fiber sender = Fiber.current;
    _enqueue(new _ChannelEntry(message, sender));
    // Park the sender; it is resumed by _dequeue when the message is taken.
    Fiber next = Fiber._suspendFiber(sender, false);
    // TODO(kasperl): Should we yield to receiver if possible?
    Fiber._yieldTo(sender, next);
  }

  // Send a message to the channel. Not blocking.
  void send(message) {
    // A null sender means nobody waits to be resumed on dequeue.
    _enqueue(new _ChannelEntry(message, null));
  }

  // Receive a message. If no messages are available
  // the receiver blocks.
  receive() {
    if (_receiver != null) {
      throw new StateError("Channel cannot have multiple receivers (yet).");
    }

    if (_head == null) {
      // Queue is empty: register as the blocked receiver and suspend until
      // _enqueue wakes us up.
      Fiber receiver = Fiber.current;
      _receiver = receiver;
      Fiber next = Fiber._suspendFiber(receiver, false);
      Fiber._yieldTo(receiver, next);
    }

    return _dequeue();
  }

  // Appends [entry] to the queue and wakes the blocked receiver, if any.
  _enqueue(_ChannelEntry entry) {
    if (_tail == null) {
      _head = _tail = entry;
    } else {
      _tail = _tail.next = entry;
    }

    // Signal the receiver (if any).
    Fiber receiver = _receiver;
    if (receiver != null) {
      _receiver = null;
      Fiber._resumeFiber(receiver);
    }
  }

  // Pops the head entry, resuming its sender if it was delivered
  // synchronously via [deliver]. Assumes the queue is non-empty.
  _dequeue() {
    _ChannelEntry entry = _head;
    _ChannelEntry next = entry.next;
    _head = next;
    if (next == null) _tail = next;
    Fiber sender = entry.sender;
    if (sender != null) Fiber._resumeFiber(sender);
    return entry.message;
  }
}
| 536 | |
/// One queued message in a [Channel]; entries form a singly-linked list.
class _ChannelEntry {
  // The payload being transported.
  final message;

  // The fiber blocked in deliver() waiting for this entry to be taken,
  // or null for non-blocking sends.
  final Fiber sender;

  // Next entry in the channel's queue.
  _ChannelEntry next;

  _ChannelEntry(this.message, this.sender);
}
| 543 | |
/// Returns true if [object] is deeply immutable and therefore safe to
/// share across processes.
bool isImmutable(Object object) {
  return _isImmutable(object);
}

// NOTE(review): the parameter is declared as String but the wrapper above
// passes arbitrary objects — presumably the native ignores the static
// type; confirm against the VM implementation.
@fletch.native external bool _isImmutable(String string);
| 547 | |
/// Returns a channel that will receive a message in [milliseconds]
/// milliseconds.
// TODO(sigurdm): Move this function?
Channel sleep(int milliseconds) {
  // Guard against unchecked-mode callers passing a non-int.
  if (milliseconds is! int) throw new ArgumentError(milliseconds);
  var channel = new Channel();
  _sleep(milliseconds, new Port(channel));
  return channel;
}

@fletch.native external void _sleep(int milliseconds, Port port);