OLD | NEW |
(Empty) | |
| 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. |
| 4 |
| 5 library dart.pkg.isolate.isolaterunner; |
| 6 |
| 7 import "dart:isolate"; |
| 8 import "dart:async"; |
| 9 import "runner.dart"; |
| 10 import "ports.dart"; |
| 11 import "src/functionref.dart"; |
| 12 import "src/lists.dart"; |
| 13 |
| 14 // Command tags. Shared between IsolateRunner and IsolateRunnerRemote. |
| 15 const int _SHUTDOWN = 0; |
| 16 const int _RUN = 1; |
| 17 |
| 18 /** |
| 19 * An easier to use interface on top of an [Isolate]. |
| 20 * |
| 21 * Wraps an `Isolate` and allows pausing, killing and inspecting |
| 22 * the isolate more conveniently than the raw `Isolate` methods. |
| 23 * |
| 24 * Also allows running simple functions in the other isolate, and get back |
| 25 * the result. |
| 26 */ |
| 27 class IsolateRunner implements Runner { |
| 28 /** The underlying [Isolate] object of the isolate being controlled. */ |
| 29 final Isolate isolate; |
| 30 |
| 31 /** Command port for the [IsolateRunnerRemote]. */ |
| 32 final SendPort _commandPort; |
| 33 |
| 34 /** Future returned by [onExit]. Set when [onExit] is first read. */ |
| 35 Future _onExitFuture; |
| 36 |
| 37 /** |
| 38 * Create an [IsolateRunner] wrapper for [isolate] |
| 39 * |
| 40 * The preferred way to create an `IsolateRunner` is to use [spawn] |
| 41 * to create a new isolate and a runner for it. |
| 42 * |
| 43 * This constructor allows creating a runner for an already existing |
| 44 * isolate. |
| 45 * The [commandPort] must be the [IsolateRunnerRemote.commandPort] of |
| 46 * a remote running in that isolate. |
| 47 */ |
| 48 IsolateRunner(this.isolate, SendPort commandPort) |
| 49 : _commandPort = commandPort; |
| 50 |
| 51 /** |
| 52 * Create a new [Isolate], as by [Isolate.spawn] and wrap that. |
| 53 * |
| 54 * The returned [IsolateRunner] forwards operations to the new isolate, |
| 55 * and keeps a port open in the new isolate that receives commands |
| 56 * from the `IsolateRunner`. Remember to [close] the `IsolateRunner` when |
| 57 * it's no longer needed. |
| 58 */ |
| 59 static Future<IsolateRunner> spawn() { |
| 60 Completer portCompleter = new Completer.sync(); |
| 61 SendPort initPort = singleCompletePort(portCompleter); |
| 62 return Isolate.spawn(IsolateRunnerRemote._create, initPort) |
| 63 .then((Isolate isolate) { |
| 64 // TODO: Add when VM supports it. |
| 65 // isolate.setErrorsFatal(false); |
| 66 return portCompleter.future.then((SendPort commandPort) { |
| 67 var result = new IsolateRunner(isolate, commandPort); |
| 68 // Guarantees that setErrorsFatal has completed. |
| 69 return result.ping().then((_) => result); |
| 70 }); |
| 71 }); |
| 72 } |
| 73 |
| 74 /** |
| 75 * Closes the `IsolateRunner` communication down. |
| 76 * |
| 77 * If the isolate isn't running something else to keep it alive, |
| 78 * this will also make the isolate shut down. |
| 79 * |
| 80 * Can be used to create an isolate, use [run] to start a service, and |
| 81 * then drop the connection and let the service control the isolate's |
| 82 * life cycle. |
| 83 */ |
| 84 Future close() { |
| 85 Completer portCompleter = new Completer.sync(); |
| 86 SendPort closePort = singleCallbackPort(portCompleter.complete); |
| 87 _commandPort.send(list2(_SHUTDOWN, closePort)); |
| 88 return portCompleter.future; |
| 89 } |
| 90 |
| 91 /** |
| 92 * Kills the isolate. |
| 93 * |
| 94 * Starts by calling [close], but if that doesn't cause the isolate to |
| 95 * shut down in a timely manner, as given by [timeout], it follows up |
| 96 * with [Isolate.kill], with increasing urgency if necessary. |
| 97 * |
| 98 * If [timeout] is a zero duration, it goes directly to the most urgent |
| 99 * kill. |
| 100 * |
| 101 * If the isolate is already dead, the returned future will not complete. |
| 102 * If that may be the case, use [Future.timeout] on the returned future |
| 103 * to take extra action after a while. Example: |
| 104 * |
| 105 * var f = isolate.kill(); |
| 106 * f.then((_) => print('Dead') |
| 107 * .timeout(new Duration(...), onTimeout: () => print('No response')); |
| 108 */ |
| 109 Future kill({Duration timeout: const Duration(seconds: 1)}) { |
| 110 Future onExit = singleResponseFuture(isolate.addOnExitListener); |
| 111 if (Duration.ZERO == timeout) { |
| 112 isolate.kill(Isolate.IMMEDIATE); |
| 113 return onExit; |
| 114 } else { |
| 115 // Try a more gentle shutdown sequence. |
| 116 _commandPort.send(list1(_SHUTDOWN)); |
| 117 return onExit.timeout(timeout, onTimeout: () { |
| 118 isolate.kill(Isolate.IMMEDIATE); |
| 119 return onExit; |
| 120 }); |
| 121 } |
| 122 } |
| 123 |
| 124 /** |
| 125 * Queries the isolate on whether it's alive. |
| 126 * |
| 127 * If the isolate is alive and responding to commands, the |
| 128 * returned future completes with `true`. |
| 129 * |
| 130 * If the other isolate is not alive (like after calling [kill]), |
| 131 * or doesn't answer within [timeout] for any other reason, |
| 132 * the returned future completes with `false`. |
| 133 * |
| 134 * Guaranteed to only complete after all previous sent isolate commands |
| 135 * (like pause and resume) have been handled. |
| 136 * Paused isolates do respond to ping requests. |
| 137 */ |
| 138 Future<bool> ping({Duration timeout: const Duration(seconds: 1)}) { |
| 139 Completer completer = new Completer<bool>(); |
| 140 SendPort port = singleCompletePort(completer, |
| 141 callback: _kTrue, |
| 142 timeout: timeout, |
| 143 onTimeout: _kFalse); |
| 144 isolate.ping(port); |
| 145 return completer.future; |
| 146 } |
| 147 |
| 148 static bool _kTrue(_) => true; |
| 149 static bool _kFalse() => false; |
| 150 |
| 151 /** |
| 152 * Pauses the isolate. |
| 153 * |
| 154 * While paused, no normal messages are processed, and calls to [run] will |
| 155 * be delayed until the isolate is resumed. |
| 156 * |
| 157 * Commands like [kill] and [ping] are still executed while the isolate is |
| 158 * paused. |
| 159 * |
| 160 * If [resumeCapability] is omitted, it defaults to the [isolate]'s |
| 161 * [Isolate.pauseCapability]. |
| 162 * |
| 163 * Calling pause more than once with the same `resumeCapability` |
| 164 * has no further effect. Only a single call to [resume] is needed |
| 165 * to resume the isolate. |
| 166 */ |
| 167 void pause([Capability resumeCapability]) { |
| 168 if (resumeCapability == null) resumeCapability = isolate.pauseCapability; |
| 169 isolate.pause(resumeCapability); |
| 170 } |
| 171 |
| 172 /** |
| 173 * Resumes after a [pause]. |
| 174 * |
| 175 * If [resumeCapability] is omitted, it defaults to the isolate's |
| 176 * [Isolate.pauseCapability]. |
| 177 * |
| 178 * Even if `pause` has been called more than once with the same |
| 179 * `resumeCapability`, a single resume call with stop the pause. |
| 180 */ |
| 181 void resume([Capability resumeCapability]) { |
| 182 if (resumeCapability == null) resumeCapability = isolate.pauseCapability; |
| 183 isolate.resume(resumeCapability); |
| 184 } |
| 185 |
| 186 /** |
| 187 * Execute `function(argument)` in the isolate and return the result. |
| 188 * |
| 189 * Sends [function] and [argument] to the isolate, runs the call, and |
| 190 * returns the result, whether it returned a value or threw. |
| 191 * If the call returns a [Future], the final result of that future |
| 192 * will be returned. |
| 193 * |
| 194 * This works similar to the arguments to [Isolate.spawn], except that |
| 195 * it runs in the existing isolate and the return value is returned to |
| 196 * the caller. |
| 197 * |
| 198 * Example: |
| 199 * |
| 200 * IsolateRunner iso = await IsolateRunner.spawn(); |
| 201 * try { |
| 202 * return await iso.run(heavyComputation, argument); |
| 203 * } finally { |
| 204 * await iso.close(); |
| 205 * } |
| 206 */ |
| 207 Future run(function(argument), argument, |
| 208 {Duration timeout, onTimeout()}) { |
| 209 return singleResultFuture((SendPort port) { |
| 210 _commandPort.send( |
| 211 list4(_RUN, FunctionRef.from(function), argument, port)); |
| 212 }, timeout: timeout, onTimeout: onTimeout); |
| 213 } |
| 214 |
| 215 /** |
| 216 * A broadcast stream of uncaught errors from the isolate. |
| 217 * |
| 218 * When listening on the stream, errors from the isolate will be reported |
| 219 * as errors in the stream. Be ready to handle the errors. |
| 220 * |
| 221 * The stream closes when the isolate shuts down. |
| 222 */ |
| 223 Stream get errors { |
| 224 StreamController controller; |
| 225 RawReceivePort port; |
| 226 void handleError(message) { |
| 227 if (message == null) { |
| 228 // Isolate shutdown. |
| 229 port.close(); |
| 230 controller.close(); |
| 231 } else { |
| 232 // Uncaught error. |
| 233 String errorDescription = message[0]; |
| 234 String stackDescription = message[1]; |
| 235 var error = new RemoteError(errorDescription, stackDescription); |
| 236 controller.addError(error, error.stackTrace); |
| 237 } |
| 238 } |
| 239 controller = new StreamController.broadcast( |
| 240 sync: true, |
| 241 onListen: () { |
| 242 port = new RawReceivePort(handleError); |
| 243 // TODO: When supported, uncomment this. |
| 244 // isolate.addErrorListener(port.sendPort); |
| 245 // isolate.addOnExitListener(port.sendPort); |
| 246 // And remove the send below, which acts as an immediate close. |
| 247 port.sendPort.send(null); |
| 248 }, |
| 249 onCancel: () { |
| 250 port.close(); |
| 251 // this.removeErrorListener(port.sendPort); |
| 252 // this.removeOnExitListener(port.sendPort); |
| 253 port = null; |
| 254 }); |
| 255 return controller.stream; |
| 256 } |
| 257 |
| 258 /** |
| 259 * Waits for the [isolate] to terminate. |
| 260 * |
| 261 * Completes the returned future when the isolate terminates. |
| 262 * |
| 263 * If the isolate has already stopped responding to commands, |
| 264 * the returned future will never terminate. |
| 265 */ |
| 266 Future get onExit { |
| 267 // TODO(lrn): Is there a way to see if an isolate is dead |
| 268 // so we can close the receive port for this future? |
| 269 if (_onExitFuture == null) { |
| 270 _onExitFuture = singleResponseFuture(isolate.addOnExitListener); |
| 271 } |
| 272 return _onExitFuture; |
| 273 } |
| 274 } |
| 275 |
| 276 /** |
| 277 * The remote part of an [IsolateRunner]. |
| 278 * |
| 279 * The `IsolateRunner` sends commands to the controlled isolate through |
| 280 * the `IsolateRunnerRemote` [commandPort]. |
| 281 * |
| 282 * Only use this class if you need to set up the isolate manually |
| 283 * instead of relying on [IsolateRunner.spawn]. |
| 284 */ |
| 285 class IsolateRunnerRemote { |
| 286 final RawReceivePort _commandPort = new RawReceivePort(); |
| 287 IsolateRunnerRemote() { |
| 288 _commandPort.handler = _handleCommand; |
| 289 } |
| 290 |
| 291 /** |
| 292 * The command port that can be used to send commands to this remote. |
| 293 * |
| 294 * Use this as argument to [new IsolateRunner] if creating the link |
| 295 * manually, otherwise it's handled by [IsolateRunner.spawn]. |
| 296 */ |
| 297 SendPort get commandPort => _commandPort.sendPort; |
| 298 |
| 299 static void _create(SendPort initPort) { |
| 300 var remote = new IsolateRunnerRemote(); |
| 301 initPort.send(remote.commandPort); |
| 302 } |
| 303 |
| 304 void _handleCommand(List command) { |
| 305 switch (command[0]) { |
| 306 case _SHUTDOWN: |
| 307 SendPort responsePort = command[1]; |
| 308 _commandPort.close(); |
| 309 responsePort.send(null); |
| 310 return; |
| 311 case _RUN: |
| 312 Function function = command[1].function; |
| 313 var argument = command[2]; |
| 314 SendPort responsePort = command[3]; |
| 315 sendFutureResult(new Future.sync(() => function(argument)), |
| 316 responsePort); |
| 317 return; |
| 318 } |
| 319 } |
| 320 } |
OLD | NEW |