Index: lib/isolaterunner.dart |
diff --git a/lib/isolaterunner.dart b/lib/isolaterunner.dart |
new file mode 100644 |
index 0000000000000000000000000000000000000000..7e15a406abe2bec1c5c4dd0d33e92172eba92e40 |
--- /dev/null |
+++ b/lib/isolaterunner.dart |
@@ -0,0 +1,320 @@ |
+// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
+// for details. All rights reserved. Use of this source code is governed by a |
+// BSD-style license that can be found in the LICENSE file. |
+ |
+library dart.pkg.isolate.isolaterunner; |
+ |
+import "dart:isolate"; |
+import "dart:async"; |
+import "runner.dart"; |
+import "ports.dart"; |
+import "src/functionref.dart"; |
+import "src/lists.dart"; |
+ |
+// Command tags. Shared between IsolateRunner and IsolateRunnerRemote. |
+const int _SHUTDOWN = 0; |
+const int _RUN = 1; |
+ |
+/** |
+ * An easier to use interface on top of an [Isolate]. |
+ * |
+ * Wraps an `Isolate` and allows pausing, killing and inspecting |
+ * the isolate more conveniently than the raw `Isolate` methods. |
+ * |
+ * Also allows running simple functions in the other isolate, and get back |
+ * the result. |
+ */ |
+class IsolateRunner implements Runner { |
+ /** The underlying [Isolate] object of the isolate being controlled. */ |
+ final Isolate isolate; |
+ |
+ /** Command port for the [IsolateRunnerRemote]. */ |
+ final SendPort _commandPort; |
+ |
+ /** Future returned by [onExit]. Set when [onExit] is first read. */ |
+ Future _onExitFuture; |
+ |
+ /** |
+ * Create an [IsolateRunner] wrapper for [isolate] |
+ * |
+ * The preferred way to create an `IsolateRunner` is to use [spawn] |
+ * to create a new isolate and a runner for it. |
+ * |
+ * This constructor allows creating a runner for an already existing |
+ * isolate. |
+ * The [commandPort] must be the [IsolateRunnerRemote.commandPort] of |
+ * a remote running in that isolate. |
+ */ |
+ IsolateRunner(this.isolate, SendPort commandPort) |
+ : _commandPort = commandPort; |
+ |
+ /** |
+ * Create a new [Isolate], as by [Isolate.spawn] and wrap that. |
+ * |
+ * The returned [IsolateRunner] forwards operations to the new isolate, |
+ * and keeps a port open in the new isolate that receives commands |
+ * from the `IsolateRunner`. Remember to [close] the `IsolateRunner` when |
+ * it's no longer needed. |
+ */ |
+ static Future<IsolateRunner> spawn() { |
+ Completer portCompleter = new Completer.sync(); |
+ SendPort initPort = singleCompletePort(portCompleter); |
+ return Isolate.spawn(IsolateRunnerRemote._create, initPort) |
+ .then((Isolate isolate) { |
+ // TODO: Add when VM supports it. |
+ // isolate.setErrorsFatal(false); |
+ return portCompleter.future.then((SendPort commandPort) { |
+ var result = new IsolateRunner(isolate, commandPort); |
+ // Guarantees that setErrorsFatal has completed. |
+ return result.ping().then((_) => result); |
+ }); |
+ }); |
+ } |
+ |
+ /** |
+ * Closes the `IsolateRunner` communication down. |
+ * |
+ * If the isolate isn't running something else to keep it alive, |
+ * this will also make the isolate shut down. |
+ * |
+ * Can be used to create an isolate, use [run] to start a service, and |
+ * then drop the connection and let the service control the isolate's |
+ * life cycle. |
+ */ |
+ Future close() { |
+ Completer portCompleter = new Completer.sync(); |
+ SendPort closePort = singleCallbackPort(portCompleter.complete); |
+ _commandPort.send(list2(_SHUTDOWN, closePort)); |
+ return portCompleter.future; |
+ } |
+ |
+ /** |
+ * Kills the isolate. |
+ * |
+ * Starts by calling [close], but if that doesn't cause the isolate to |
+ * shut down in a timely manner, as given by [timeout], it follows up |
+ * with [Isolate.kill], with increasing urgency if necessary. |
+ * |
+ * If [timeout] is a zero duration, it goes directly to the most urgent |
+ * kill. |
+ * |
+ * If the isolate is already dead, the returned future will not complete. |
+ * If that may be the case, use [Future.timeout] on the returned future |
+ * to take extra action after a while. Example: |
+ * |
+ * var f = isolate.kill(); |
+ * f.then((_) => print('Dead') |
+ * .timeout(new Duration(...), onTimeout: () => print('No response')); |
+ */ |
+ Future kill({Duration timeout: const Duration(seconds: 1)}) { |
+ Future onExit = singleResponseFuture(isolate.addOnExitListener); |
+ if (Duration.ZERO == timeout) { |
+ isolate.kill(Isolate.IMMEDIATE); |
+ return onExit; |
+ } else { |
+ // Try a more gentle shutdown sequence. |
+ _commandPort.send(list1(_SHUTDOWN)); |
+ return onExit.timeout(timeout, onTimeout: () { |
+ isolate.kill(Isolate.IMMEDIATE); |
+ return onExit; |
+ }); |
+ } |
+ } |
+ |
+ /** |
+ * Queries the isolate on whether it's alive. |
+ * |
+ * If the isolate is alive and responding to commands, the |
+ * returned future completes with `true`. |
+ * |
+ * If the other isolate is not alive (like after calling [kill]), |
+ * or doesn't answer within [timeout] for any other reason, |
+ * the returned future completes with `false`. |
+ * |
+ * Guaranteed to only complete after all previous sent isolate commands |
+ * (like pause and resume) have been handled. |
+ * Paused isolates do respond to ping requests. |
+ */ |
+ Future<bool> ping({Duration timeout: const Duration(seconds: 1)}) { |
+ Completer completer = new Completer<bool>(); |
+ SendPort port = singleCompletePort(completer, |
+ callback: _kTrue, |
+ timeout: timeout, |
+ onTimeout: _kFalse); |
+ isolate.ping(port); |
+ return completer.future; |
+ } |
+ |
+ static bool _kTrue(_) => true; |
+ static bool _kFalse() => false; |
+ |
+ /** |
+ * Pauses the isolate. |
+ * |
+ * While paused, no normal messages are processed, and calls to [run] will |
+ * be delayed until the isolate is resumed. |
+ * |
+ * Commands like [kill] and [ping] are still executed while the isolate is |
+ * paused. |
+ * |
+ * If [resumeCapability] is omitted, it defaults to the [isolate]'s |
+ * [Isolate.pauseCapability]. |
+ * |
+ * Calling pause more than once with the same `resumeCapability` |
+ * has no further effect. Only a single call to [resume] is needed |
+ * to resume the isolate. |
+ */ |
+ void pause([Capability resumeCapability]) { |
+ if (resumeCapability == null) resumeCapability = isolate.pauseCapability; |
+ isolate.pause(resumeCapability); |
+ } |
+ |
+ /** |
+ * Resumes after a [pause]. |
+ * |
+ * If [resumeCapability] is omitted, it defaults to the isolate's |
+ * [Isolate.pauseCapability]. |
+ * |
+ * Even if `pause` has been called more than once with the same |
+ * `resumeCapability`, a single resume call with stop the pause. |
+ */ |
+ void resume([Capability resumeCapability]) { |
+ if (resumeCapability == null) resumeCapability = isolate.pauseCapability; |
+ isolate.resume(resumeCapability); |
+ } |
+ |
+ /** |
+ * Execute `function(argument)` in the isolate and return the result. |
+ * |
+ * Sends [function] and [argument] to the isolate, runs the call, and |
+ * returns the result, whether it returned a value or threw. |
+ * If the call returns a [Future], the final result of that future |
+ * will be returned. |
+ * |
+ * This works similar to the arguments to [Isolate.spawn], except that |
+ * it runs in the existing isolate and the return value is returned to |
+ * the caller. |
+ * |
+ * Example: |
+ * |
+ * IsolateRunner iso = await IsolateRunner.spawn(); |
+ * try { |
+ * return await iso.run(heavyComputation, argument); |
+ * } finally { |
+ * await iso.close(); |
+ * } |
+ */ |
+ Future run(function(argument), argument, |
+ {Duration timeout, onTimeout()}) { |
+ return singleResultFuture((SendPort port) { |
+ _commandPort.send( |
+ list4(_RUN, FunctionRef.from(function), argument, port)); |
+ }, timeout: timeout, onTimeout: onTimeout); |
+ } |
+ |
+ /** |
+ * A broadcast stream of uncaught errors from the isolate. |
+ * |
+ * When listening on the stream, errors from the isolate will be reported |
+ * as errors in the stream. Be ready to handle the errors. |
+ * |
+ * The stream closes when the isolate shuts down. |
+ */ |
+ Stream get errors { |
+ StreamController controller; |
+ RawReceivePort port; |
+ void handleError(message) { |
+ if (message == null) { |
+ // Isolate shutdown. |
+ port.close(); |
+ controller.close(); |
+ } else { |
+ // Uncaught error. |
+ String errorDescription = message[0]; |
+ String stackDescription = message[1]; |
+ var error = new RemoteError(errorDescription, stackDescription); |
+ controller.addError(error, error.stackTrace); |
+ } |
+ } |
+ controller = new StreamController.broadcast( |
+ sync: true, |
+ onListen: () { |
+ port = new RawReceivePort(handleError); |
+ // TODO: When supported, uncomment this. |
+ // isolate.addErrorListener(port.sendPort); |
+ // isolate.addOnExitListener(port.sendPort); |
+ // And remove the send below, which acts as an immediate close. |
+ port.sendPort.send(null); |
+ }, |
+ onCancel: () { |
+ port.close(); |
+ // this.removeErrorListener(port.sendPort); |
+ // this.removeOnExitListener(port.sendPort); |
+ port = null; |
+ }); |
+ return controller.stream; |
+ } |
+ |
+ /** |
+ * Waits for the [isolate] to terminate. |
+ * |
+ * Completes the returned future when the isolate terminates. |
+ * |
+ * If the isolate has already stopped responding to commands, |
+ * the returned future will never terminate. |
+ */ |
+ Future get onExit { |
+ // TODO(lrn): Is there a way to see if an isolate is dead |
+ // so we can close the receive port for this future? |
+ if (_onExitFuture == null) { |
+ _onExitFuture = singleResponseFuture(isolate.addOnExitListener); |
+ } |
+ return _onExitFuture; |
+ } |
+} |
+ |
+/** |
+ * The remote part of an [IsolateRunner]. |
+ * |
+ * The `IsolateRunner` sends commands to the controlled isolate through |
+ * the `IsolateRunnerRemote` [commandPort]. |
+ * |
+ * Only use this class if you need to set up the isolate manually |
+ * instead of relying on [IsolateRunner.spawn]. |
+ */ |
+class IsolateRunnerRemote { |
+ final RawReceivePort _commandPort = new RawReceivePort(); |
+ IsolateRunnerRemote() { |
+ _commandPort.handler = _handleCommand; |
+ } |
+ |
+ /** |
+ * The command port that can be used to send commands to this remote. |
+ * |
+ * Use this as argument to [new IsolateRunner] if creating the link |
+ * manually, otherwise it's handled by [IsolateRunner.spawn]. |
+ */ |
+ SendPort get commandPort => _commandPort.sendPort; |
+ |
+ static void _create(SendPort initPort) { |
+ var remote = new IsolateRunnerRemote(); |
+ initPort.send(remote.commandPort); |
+ } |
+ |
+ void _handleCommand(List command) { |
+ switch (command[0]) { |
+ case _SHUTDOWN: |
+ SendPort responsePort = command[1]; |
+ _commandPort.close(); |
+ responsePort.send(null); |
+ return; |
+ case _RUN: |
+ Function function = command[1].function; |
+ var argument = command[2]; |
+ SendPort responsePort = command[3]; |
+ sendFutureResult(new Future.sync(() => function(argument)), |
+ responsePort); |
+ return; |
+ } |
+ } |
+} |