Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(225)

Unified Diff: lib/isolaterunner.dart

Issue 928663003: Add IsolateRunner as a helper around Isolate. (Closed) Base URL: https://github.com/dart-lang/isolate.git@master
Patch Set: Add .status. Created 5 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « lib/isolate.dart ('k') | lib/loadbalancer.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: lib/isolaterunner.dart
diff --git a/lib/isolaterunner.dart b/lib/isolaterunner.dart
new file mode 100644
index 0000000000000000000000000000000000000000..7e15a406abe2bec1c5c4dd0d33e92172eba92e40
--- /dev/null
+++ b/lib/isolaterunner.dart
@@ -0,0 +1,320 @@
+// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library dart.pkg.isolate.isolaterunner;
+
+import "dart:isolate";
+import "dart:async";
+import "runner.dart";
+import "ports.dart";
+import "src/functionref.dart";
+import "src/lists.dart";
+
+// Command tags. Shared between IsolateRunner and IsolateRunnerRemote.
+const int _SHUTDOWN = 0;
+const int _RUN = 1;
+
+/**
+ * An easier to use interface on top of an [Isolate].
+ *
+ * Wraps an `Isolate` and allows pausing, killing and inspecting
+ * the isolate more conveniently than the raw `Isolate` methods.
+ *
+ * Also allows running simple functions in the other isolate, and get back
+ * the result.
+ */
+class IsolateRunner implements Runner {
+ /** The underlying [Isolate] object of the isolate being controlled. */
+ final Isolate isolate;
+
+ /** Command port for the [IsolateRunnerRemote]. */
+ final SendPort _commandPort;
+
+ /** Future returned by [onExit]. Set when [onExit] is first read. */
+ Future _onExitFuture;
+
+ /**
+ * Create an [IsolateRunner] wrapper for [isolate]
+ *
+ * The preferred way to create an `IsolateRunner` is to use [spawn]
+ * to create a new isolate and a runner for it.
+ *
+ * This constructor allows creating a runner for an already existing
+ * isolate.
+ * The [commandPort] must be the [IsolateRunnerRemote.commandPort] of
+ * a remote running in that isolate.
+ */
+ IsolateRunner(this.isolate, SendPort commandPort)
+ : _commandPort = commandPort;
+
+ /**
+ * Create a new [Isolate], as by [Isolate.spawn] and wrap that.
+ *
+ * The returned [IsolateRunner] forwards operations to the new isolate,
+ * and keeps a port open in the new isolate that receives commands
+ * from the `IsolateRunner`. Remember to [close] the `IsolateRunner` when
+ * it's no longer needed.
+ */
+ static Future<IsolateRunner> spawn() {
+ Completer portCompleter = new Completer.sync();
+ SendPort initPort = singleCompletePort(portCompleter);
+ return Isolate.spawn(IsolateRunnerRemote._create, initPort)
+ .then((Isolate isolate) {
+ // TODO: Add when VM supports it.
+ // isolate.setErrorsFatal(false);
+ return portCompleter.future.then((SendPort commandPort) {
+ var result = new IsolateRunner(isolate, commandPort);
+ // Guarantees that setErrorsFatal has completed.
+ return result.ping().then((_) => result);
+ });
+ });
+ }
+
+ /**
+ * Closes the `IsolateRunner` communication down.
+ *
+ * If the isolate isn't running something else to keep it alive,
+ * this will also make the isolate shut down.
+ *
+ * Can be used to create an isolate, use [run] to start a service, and
+ * then drop the connection and let the service control the isolate's
+ * life cycle.
+ */
+ Future close() {
+ Completer portCompleter = new Completer.sync();
+ SendPort closePort = singleCallbackPort(portCompleter.complete);
+ _commandPort.send(list2(_SHUTDOWN, closePort));
+ return portCompleter.future;
+ }
+
+ /**
+ * Kills the isolate.
+ *
+ * Starts by calling [close], but if that doesn't cause the isolate to
+ * shut down in a timely manner, as given by [timeout], it follows up
+ * with [Isolate.kill], with increasing urgency if necessary.
+ *
+ * If [timeout] is a zero duration, it goes directly to the most urgent
+ * kill.
+ *
+ * If the isolate is already dead, the returned future will not complete.
+ * If that may be the case, use [Future.timeout] on the returned future
+ * to take extra action after a while. Example:
+ *
+ * var f = isolate.kill();
+ * f.then((_) => print('Dead')
+ * .timeout(new Duration(...), onTimeout: () => print('No response'));
+ */
+ Future kill({Duration timeout: const Duration(seconds: 1)}) {
+ Future onExit = singleResponseFuture(isolate.addOnExitListener);
+ if (Duration.ZERO == timeout) {
+ isolate.kill(Isolate.IMMEDIATE);
+ return onExit;
+ } else {
+ // Try a more gentle shutdown sequence.
+ _commandPort.send(list1(_SHUTDOWN));
+ return onExit.timeout(timeout, onTimeout: () {
+ isolate.kill(Isolate.IMMEDIATE);
+ return onExit;
+ });
+ }
+ }
+
+ /**
+ * Queries the isolate on whether it's alive.
+ *
+ * If the isolate is alive and responding to commands, the
+ * returned future completes with `true`.
+ *
+ * If the other isolate is not alive (like after calling [kill]),
+ * or doesn't answer within [timeout] for any other reason,
+ * the returned future completes with `false`.
+ *
+ * Guaranteed to only complete after all previous sent isolate commands
+ * (like pause and resume) have been handled.
+ * Paused isolates do respond to ping requests.
+ */
+ Future<bool> ping({Duration timeout: const Duration(seconds: 1)}) {
+ Completer completer = new Completer<bool>();
+ SendPort port = singleCompletePort(completer,
+ callback: _kTrue,
+ timeout: timeout,
+ onTimeout: _kFalse);
+ isolate.ping(port);
+ return completer.future;
+ }
+
+ static bool _kTrue(_) => true;
+ static bool _kFalse() => false;
+
+ /**
+ * Pauses the isolate.
+ *
+ * While paused, no normal messages are processed, and calls to [run] will
+ * be delayed until the isolate is resumed.
+ *
+ * Commands like [kill] and [ping] are still executed while the isolate is
+ * paused.
+ *
+ * If [resumeCapability] is omitted, it defaults to the [isolate]'s
+ * [Isolate.pauseCapability].
+ *
+ * Calling pause more than once with the same `resumeCapability`
+ * has no further effect. Only a single call to [resume] is needed
+ * to resume the isolate.
+ */
+ void pause([Capability resumeCapability]) {
+ if (resumeCapability == null) resumeCapability = isolate.pauseCapability;
+ isolate.pause(resumeCapability);
+ }
+
+ /**
+ * Resumes after a [pause].
+ *
+ * If [resumeCapability] is omitted, it defaults to the isolate's
+ * [Isolate.pauseCapability].
+ *
+ * Even if `pause` has been called more than once with the same
+ * `resumeCapability`, a single resume call with stop the pause.
+ */
+ void resume([Capability resumeCapability]) {
+ if (resumeCapability == null) resumeCapability = isolate.pauseCapability;
+ isolate.resume(resumeCapability);
+ }
+
+ /**
+ * Execute `function(argument)` in the isolate and return the result.
+ *
+ * Sends [function] and [argument] to the isolate, runs the call, and
+ * returns the result, whether it returned a value or threw.
+ * If the call returns a [Future], the final result of that future
+ * will be returned.
+ *
+ * This works similar to the arguments to [Isolate.spawn], except that
+ * it runs in the existing isolate and the return value is returned to
+ * the caller.
+ *
+ * Example:
+ *
+ * IsolateRunner iso = await IsolateRunner.spawn();
+ * try {
+ * return await iso.run(heavyComputation, argument);
+ * } finally {
+ * await iso.close();
+ * }
+ */
+ Future run(function(argument), argument,
+ {Duration timeout, onTimeout()}) {
+ return singleResultFuture((SendPort port) {
+ _commandPort.send(
+ list4(_RUN, FunctionRef.from(function), argument, port));
+ }, timeout: timeout, onTimeout: onTimeout);
+ }
+
+ /**
+ * A broadcast stream of uncaught errors from the isolate.
+ *
+ * When listening on the stream, errors from the isolate will be reported
+ * as errors in the stream. Be ready to handle the errors.
+ *
+ * The stream closes when the isolate shuts down.
+ */
+ Stream get errors {
+ StreamController controller;
+ RawReceivePort port;
+ void handleError(message) {
+ if (message == null) {
+ // Isolate shutdown.
+ port.close();
+ controller.close();
+ } else {
+ // Uncaught error.
+ String errorDescription = message[0];
+ String stackDescription = message[1];
+ var error = new RemoteError(errorDescription, stackDescription);
+ controller.addError(error, error.stackTrace);
+ }
+ }
+ controller = new StreamController.broadcast(
+ sync: true,
+ onListen: () {
+ port = new RawReceivePort(handleError);
+ // TODO: When supported, uncomment this.
+ // isolate.addErrorListener(port.sendPort);
+ // isolate.addOnExitListener(port.sendPort);
+ // And remove the send below, which acts as an immediate close.
+ port.sendPort.send(null);
+ },
+ onCancel: () {
+ port.close();
+ // this.removeErrorListener(port.sendPort);
+ // this.removeOnExitListener(port.sendPort);
+ port = null;
+ });
+ return controller.stream;
+ }
+
+ /**
+ * Waits for the [isolate] to terminate.
+ *
+ * Completes the returned future when the isolate terminates.
+ *
+ * If the isolate has already stopped responding to commands,
+ * the returned future will never terminate.
+ */
+ Future get onExit {
+ // TODO(lrn): Is there a way to see if an isolate is dead
+ // so we can close the receive port for this future?
+ if (_onExitFuture == null) {
+ _onExitFuture = singleResponseFuture(isolate.addOnExitListener);
+ }
+ return _onExitFuture;
+ }
+}
+
+/**
+ * The remote part of an [IsolateRunner].
+ *
+ * The `IsolateRunner` sends commands to the controlled isolate through
+ * the `IsolateRunnerRemote` [commandPort].
+ *
+ * Only use this class if you need to set up the isolate manually
+ * instead of relying on [IsolateRunner.spawn].
+ */
+class IsolateRunnerRemote {
+ final RawReceivePort _commandPort = new RawReceivePort();
+ IsolateRunnerRemote() {
+ _commandPort.handler = _handleCommand;
+ }
+
+ /**
+ * The command port that can be used to send commands to this remote.
+ *
+ * Use this as argument to [new IsolateRunner] if creating the link
+ * manually, otherwise it's handled by [IsolateRunner.spawn].
+ */
+ SendPort get commandPort => _commandPort.sendPort;
+
+ static void _create(SendPort initPort) {
+ var remote = new IsolateRunnerRemote();
+ initPort.send(remote.commandPort);
+ }
+
+ void _handleCommand(List command) {
+ switch (command[0]) {
+ case _SHUTDOWN:
+ SendPort responsePort = command[1];
+ _commandPort.close();
+ responsePort.send(null);
+ return;
+ case _RUN:
+ Function function = command[1].function;
+ var argument = command[2];
+ SendPort responsePort = command[3];
+ sendFutureResult(new Future.sync(() => function(argument)),
+ responsePort);
+ return;
+ }
+ }
+}
« no previous file with comments | « lib/isolate.dart ('k') | lib/loadbalancer.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698