| OLD | NEW |
| (Empty) |
| 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | |
| 2 // for details. All rights reserved. Use of this source code is governed by a | |
| 3 // BSD-style license that can be found in the LICENSE file. | |
| 4 | |
| 5 library dart.pkg.isolate.isolaterunner; | |
| 6 | |
| 7 import "dart:isolate"; | |
| 8 import "dart:async"; | |
| 9 import "runner.dart"; | |
| 10 import "ports.dart"; | |
| 11 import "src/lists.dart"; | |
| 12 | |
| 13 // Command tags. Shared between IsolateRunner and IsolateRunnerRemote. | |
| 14 const int _SHUTDOWN = 0; | |
| 15 const int _RUN = 1; | |
| 16 | |
| 17 /// An easier to use interface on top of an [Isolate]. | |
| 18 /// | |
| 19 /// Wraps an `Isolate` and allows pausing, killing and inspecting | |
| 20 /// the isolate more conveniently than the raw `Isolate` methods. | |
| 21 /// | |
| 22 /// Also allows running simple functions in the other isolate, and get back | |
| 23 /// the result. | |
| 24 class IsolateRunner implements Runner { | |
| 25 /// The underlying [Isolate] object of the isolate being controlled. | |
| 26 final Isolate isolate; | |
| 27 | |
| 28 /// Command port for the [IsolateRunnerRemote]. | |
| 29 final SendPort _commandPort; | |
| 30 | |
| 31 /// Future returned by [onExit]. Set when [onExit] is first read. | |
| 32 Future _onExitFuture; | |
| 33 | |
| 34 /// Create an [IsolateRunner] wrapper for [isolate] | |
| 35 /// | |
| 36 /// The preferred way to create an `IsolateRunner` is to use [spawn] | |
| 37 /// to create a new isolate and a runner for it. | |
| 38 /// | |
| 39 /// This constructor allows creating a runner for an already existing | |
| 40 /// isolate. | |
| 41 /// The [commandPort] must be the [IsolateRunnerRemote.commandPort] of | |
| 42 /// a remote running in that isolate. | |
| 43 IsolateRunner(this.isolate, SendPort commandPort) | |
| 44 : _commandPort = commandPort; | |
| 45 | |
| 46 /// Create a new [Isolate], as by [Isolate.spawn] and wrap that. | |
| 47 /// | |
| 48 /// The returned [IsolateRunner] forwards operations to the new isolate, | |
| 49 /// and keeps a port open in the new isolate that receives commands | |
| 50 /// from the `IsolateRunner`. Remember to [close] the `IsolateRunner` when | |
| 51 /// it's no longer needed. | |
| 52 /// | |
| 53 /// The created isolate is set to have errors not be fatal. | |
| 54 static Future<IsolateRunner> spawn() async { | |
| 55 var channel = new SingleResponseChannel(); | |
| 56 var isolate = await Isolate.spawn(IsolateRunnerRemote._create, | |
| 57 channel.port); | |
| 58 // The runner can be used to run multiple independent functions. | |
| 59 // An accidentally uncaught error shouldn't ruin it for everybody else. | |
| 60 isolate.setErrorsFatal(false); | |
| 61 var pingChannel = new SingleResponseChannel(); | |
| 62 isolate.ping(pingChannel.port); | |
| 63 var commandPort = await channel.result; | |
| 64 var result = new IsolateRunner(isolate, commandPort); | |
| 65 // Guarantees that setErrorsFatal has completed. | |
| 66 await pingChannel.result; | |
| 67 return result; | |
| 68 } | |
| 69 | |
| 70 /// Closes the `IsolateRunner` communication down. | |
| 71 /// | |
| 72 /// If the isolate isn't running something else to keep it alive, | |
| 73 /// this will also make the isolate shut down. | |
| 74 /// | |
| 75 /// Can be used to create an isolate, use [run] to start a service, and | |
| 76 /// then drop the connection and let the service control the isolate's | |
| 77 /// life cycle. | |
| 78 Future close() { | |
| 79 var channel = new SingleResponseChannel(); | |
| 80 _commandPort.send(list2(_SHUTDOWN, channel.port)); | |
| 81 return channel.result; | |
| 82 } | |
| 83 | |
| 84 /// Kills the isolate. | |
| 85 /// | |
| 86 /// Starts by calling [close], but if that doesn't cause the isolate to | |
| 87 /// shut down in a timely manner, as given by [timeout], it follows up | |
| 88 /// with [Isolate.kill], with increasing urgency if necessary. | |
| 89 /// | |
| 90 /// If [timeout] is a zero duration, it goes directly to the most urgent | |
| 91 /// kill. | |
| 92 /// | |
| 93 /// If the isolate is already dead, the returned future will not complete. | |
| 94 /// If that may be the case, use [Future.timeout] on the returned future | |
| 95 /// to take extra action after a while. Example: | |
| 96 /// | |
| 97 /// var f = isolate.kill(); | |
| 98 /// f.then((_) => print('Dead') | |
| 99 /// .timeout(new Duration(...), onTimeout: () => print('No response')); | |
| 100 Future kill({Duration timeout: const Duration(seconds: 1)}) { | |
| 101 Future onExit = singleResponseFuture(isolate.addOnExitListener); | |
| 102 if (Duration.ZERO == timeout) { | |
| 103 isolate.kill(Isolate.IMMEDIATE); | |
| 104 return onExit; | |
| 105 } else { | |
| 106 // Try a more gentle shutdown sequence. | |
| 107 _commandPort.send(list1(_SHUTDOWN)); | |
| 108 return onExit.timeout(timeout, onTimeout: () { | |
| 109 isolate.kill(Isolate.IMMEDIATE); | |
| 110 return onExit; | |
| 111 }); | |
| 112 } | |
| 113 } | |
| 114 | |
| 115 /// Queries the isolate on whether it's alive. | |
| 116 /// | |
| 117 /// If the isolate is alive and responding to commands, the | |
| 118 /// returned future completes with `true`. | |
| 119 /// | |
| 120 /// If the other isolate is not alive (like after calling [kill]), | |
| 121 /// or doesn't answer within [timeout] for any other reason, | |
| 122 /// the returned future completes with `false`. | |
| 123 /// | |
| 124 /// Guaranteed to only complete after all previous sent isolate commands | |
| 125 /// (like pause and resume) have been handled. | |
| 126 /// Paused isolates do respond to ping requests. | |
| 127 Future<bool> ping({Duration timeout: const Duration(seconds: 1)}) { | |
| 128 var channel = new SingleResponseChannel(callback: _kTrue, | |
| 129 timeout: timeout, | |
| 130 timeoutValue: false); | |
| 131 isolate.ping(channel.port); | |
| 132 return channel.result; | |
| 133 } | |
| 134 | |
| 135 static bool _kTrue(_) => true; | |
| 136 static bool _kFalse() => false; | |
| 137 | |
| 138 /// Pauses the isolate. | |
| 139 /// | |
| 140 /// While paused, no normal messages are processed, and calls to [run] will | |
| 141 /// be delayed until the isolate is resumed. | |
| 142 /// | |
| 143 /// Commands like [kill] and [ping] are still executed while the isolate is | |
| 144 /// paused. | |
| 145 /// | |
| 146 /// If [resumeCapability] is omitted, it defaults to the [isolate]'s | |
| 147 /// [Isolate.pauseCapability]. | |
| 148 /// | |
| 149 /// Calling pause more than once with the same `resumeCapability` | |
| 150 /// has no further effect. Only a single call to [resume] is needed | |
| 151 /// to resume the isolate. | |
| 152 void pause([Capability resumeCapability]) { | |
| 153 if (resumeCapability == null) resumeCapability = isolate.pauseCapability; | |
| 154 isolate.pause(resumeCapability); | |
| 155 } | |
| 156 | |
| 157 /// Resumes after a [pause]. | |
| 158 /// | |
| 159 /// If [resumeCapability] is omitted, it defaults to the isolate's | |
| 160 /// [Isolate.pauseCapability]. | |
| 161 /// | |
| 162 /// Even if `pause` has been called more than once with the same | |
| 163 /// `resumeCapability`, a single resume call with stop the pause. | |
| 164 void resume([Capability resumeCapability]) { | |
| 165 if (resumeCapability == null) resumeCapability = isolate.pauseCapability; | |
| 166 isolate.resume(resumeCapability); | |
| 167 } | |
| 168 | |
| 169 /// Execute `function(argument)` in the isolate and return the result. | |
| 170 /// | |
| 171 /// Sends [function] and [argument] to the isolate, runs the call, and | |
| 172 /// returns the result, whether it returned a value or threw. | |
| 173 /// If the call returns a [Future], the final result of that future | |
| 174 /// will be returned. | |
| 175 /// | |
| 176 /// This works similar to the arguments to [Isolate.spawn], except that | |
| 177 /// it runs in the existing isolate and the return value is returned to | |
| 178 /// the caller. | |
| 179 /// | |
| 180 /// Example: | |
| 181 /// | |
| 182 /// IsolateRunner iso = await IsolateRunner.spawn(); | |
| 183 /// try { | |
| 184 /// return await iso.run(heavyComputation, argument); | |
| 185 /// } finally { | |
| 186 /// await iso.close(); | |
| 187 /// } | |
| 188 Future run(function(argument), argument, {Duration timeout, onTimeout()}) { | |
| 189 return singleResultFuture((SendPort port) { | |
| 190 _commandPort.send(list4(_RUN, function, argument, port)); | |
| 191 }, timeout: timeout, onTimeout: onTimeout); | |
| 192 } | |
| 193 | |
| 194 /// A broadcast stream of uncaught errors from the isolate. | |
| 195 /// | |
| 196 /// When listening on the stream, errors from the isolate will be reported | |
| 197 /// as errors in the stream. Be ready to handle the errors. | |
| 198 /// | |
| 199 /// The stream closes when the isolate shuts down. | |
| 200 Stream get errors { | |
| 201 StreamController controller; | |
| 202 RawReceivePort port; | |
| 203 void handleError(message) { | |
| 204 if (message == null) { | |
| 205 // Isolate shutdown. | |
| 206 port.close(); | |
| 207 controller.close(); | |
| 208 } else { | |
| 209 // Uncaught error. | |
| 210 String errorDescription = message[0]; | |
| 211 String stackDescription = message[1]; | |
| 212 var error = new RemoteError(errorDescription, stackDescription); | |
| 213 controller.addError(error, error.stackTrace); | |
| 214 } | |
| 215 } | |
| 216 controller = new StreamController.broadcast( | |
| 217 sync: true, | |
| 218 onListen: () { | |
| 219 port = new RawReceivePort(handleError); | |
| 220 isolate.addErrorListener(port.sendPort); | |
| 221 isolate.addOnExitListener(port.sendPort); | |
| 222 }, | |
| 223 onCancel: () { | |
| 224 isolate.removeErrorListener(port.sendPort); | |
| 225 isolate.removeOnExitListener(port.sendPort); | |
| 226 port.close(); | |
| 227 port = null; | |
| 228 }); | |
| 229 return controller.stream; | |
| 230 } | |
| 231 | |
| 232 /// Waits for the [isolate] to terminate. | |
| 233 /// | |
| 234 /// Completes the returned future when the isolate terminates. | |
| 235 /// | |
| 236 /// If the isolate has already stopped responding to commands, | |
| 237 /// the returned future will be completed after one second, | |
| 238 /// using [ping] to check if the isolate is still alive. | |
| 239 Future get onExit { | |
| 240 // TODO(lrn): Is there a way to see if an isolate is dead | |
| 241 // so we can close the receive port for this future? | |
| 242 if (_onExitFuture == null) { | |
| 243 var channel = new SingleResponseChannel(); | |
| 244 isolate.addOnExitListener(channel.port); | |
| 245 _onExitFuture = channel.result; | |
| 246 ping().then((bool alive) { | |
| 247 if (!alive) { | |
| 248 channel.interrupt(); | |
| 249 _onExitFuture = null; | |
| 250 } | |
| 251 }); | |
| 252 } | |
| 253 return _onExitFuture; | |
| 254 } | |
| 255 } | |
| 256 | |
| 257 /// The remote part of an [IsolateRunner]. | |
| 258 /// | |
| 259 /// The `IsolateRunner` sends commands to the controlled isolate through | |
| 260 /// the `IsolateRunnerRemote` [commandPort]. | |
| 261 /// | |
| 262 /// Only use this class if you need to set up the isolate manually | |
| 263 /// instead of relying on [IsolateRunner.spawn]. | |
| 264 class IsolateRunnerRemote { | |
| 265 final RawReceivePort _commandPort = new RawReceivePort(); | |
| 266 IsolateRunnerRemote() { | |
| 267 _commandPort.handler = _handleCommand; | |
| 268 } | |
| 269 | |
| 270 /// The command port that can be used to send commands to this remote. | |
| 271 /// | |
| 272 /// Use this as argument to [new IsolateRunner] if creating the link | |
| 273 /// manually, otherwise it's handled by [IsolateRunner.spawn]. | |
| 274 SendPort get commandPort => _commandPort.sendPort; | |
| 275 | |
| 276 static void _create(SendPort initPort) { | |
| 277 var remote = new IsolateRunnerRemote(); | |
| 278 initPort.send(remote.commandPort); | |
| 279 } | |
| 280 | |
| 281 void _handleCommand(List command) { | |
| 282 switch (command[0]) { | |
| 283 case _SHUTDOWN: | |
| 284 SendPort responsePort = command[1]; | |
| 285 _commandPort.close(); | |
| 286 responsePort.send(null); | |
| 287 return; | |
| 288 case _RUN: | |
| 289 Function function = command[1]; | |
| 290 var argument = command[2]; | |
| 291 SendPort responsePort = command[3]; | |
| 292 sendFutureResult(new Future.sync(() => function(argument)), | |
| 293 responsePort); | |
| 294 return; | |
| 295 } | |
| 296 } | |
| 297 } | |
| OLD | NEW |