Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(188)

Side by Side Diff: lib/isolaterunner.dart

Issue 928663003: Add IsolateRunner as a helper around Isolate. (Closed) Base URL: https://github.com/dart-lang/isolate.git@master
Patch Set: Add .status. Created 5 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « lib/isolate.dart ('k') | lib/loadbalancer.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 library dart.pkg.isolate.isolaterunner;
6
7 import "dart:isolate";
8 import "dart:async";
9 import "runner.dart";
10 import "ports.dart";
11 import "src/functionref.dart";
12 import "src/lists.dart";
13
14 // Command tags. Shared between IsolateRunner and IsolateRunnerRemote.
15 const int _SHUTDOWN = 0;
16 const int _RUN = 1;
17
18 /**
19 * An easier to use interface on top of an [Isolate].
20 *
21 * Wraps an `Isolate` and allows pausing, killing and inspecting
22 * the isolate more conveniently than the raw `Isolate` methods.
23 *
24 * Also allows running simple functions in the other isolate, and get back
25 * the result.
26 */
27 class IsolateRunner implements Runner {
28 /** The underlying [Isolate] object of the isolate being controlled. */
29 final Isolate isolate;
30
31 /** Command port for the [IsolateRunnerRemote]. */
32 final SendPort _commandPort;
33
34 /** Future returned by [onExit]. Set when [onExit] is first read. */
35 Future _onExitFuture;
36
37 /**
38 * Create an [IsolateRunner] wrapper for [isolate]
39 *
40 * The preferred way to create an `IsolateRunner` is to use [spawn]
41 * to create a new isolate and a runner for it.
42 *
43 * This constructor allows creating a runner for an already existing
44 * isolate.
45 * The [commandPort] must be the [IsolateRunnerRemote.commandPort] of
46 * a remote running in that isolate.
47 */
48 IsolateRunner(this.isolate, SendPort commandPort)
49 : _commandPort = commandPort;
50
51 /**
52 * Create a new [Isolate], as by [Isolate.spawn] and wrap that.
53 *
54 * The returned [IsolateRunner] forwards operations to the new isolate,
55 * and keeps a port open in the new isolate that receives commands
56 * from the `IsolateRunner`. Remember to [close] the `IsolateRunner` when
57 * it's no longer needed.
58 */
59 static Future<IsolateRunner> spawn() {
60 Completer portCompleter = new Completer.sync();
61 SendPort initPort = singleCompletePort(portCompleter);
62 return Isolate.spawn(IsolateRunnerRemote._create, initPort)
63 .then((Isolate isolate) {
64 // TODO: Add when VM supports it.
65 // isolate.setErrorsFatal(false);
66 return portCompleter.future.then((SendPort commandPort) {
67 var result = new IsolateRunner(isolate, commandPort);
68 // Guarantees that setErrorsFatal has completed.
69 return result.ping().then((_) => result);
70 });
71 });
72 }
73
74 /**
75 * Closes the `IsolateRunner` communication down.
76 *
77 * If the isolate isn't running something else to keep it alive,
78 * this will also make the isolate shut down.
79 *
80 * Can be used to create an isolate, use [run] to start a service, and
81 * then drop the connection and let the service control the isolate's
82 * life cycle.
83 */
84 Future close() {
85 Completer portCompleter = new Completer.sync();
86 SendPort closePort = singleCallbackPort(portCompleter.complete);
87 _commandPort.send(list2(_SHUTDOWN, closePort));
88 return portCompleter.future;
89 }
90
91 /**
92 * Kills the isolate.
93 *
94 * Starts by calling [close], but if that doesn't cause the isolate to
95 * shut down in a timely manner, as given by [timeout], it follows up
96 * with [Isolate.kill], with increasing urgency if necessary.
97 *
98 * If [timeout] is a zero duration, it goes directly to the most urgent
99 * kill.
100 *
101 * If the isolate is already dead, the returned future will not complete.
102 * If that may be the case, use [Future.timeout] on the returned future
103 * to take extra action after a while. Example:
104 *
105 * var f = isolate.kill();
106 * f.then((_) => print('Dead')
107 * .timeout(new Duration(...), onTimeout: () => print('No response'));
108 */
109 Future kill({Duration timeout: const Duration(seconds: 1)}) {
110 Future onExit = singleResponseFuture(isolate.addOnExitListener);
111 if (Duration.ZERO == timeout) {
112 isolate.kill(Isolate.IMMEDIATE);
113 return onExit;
114 } else {
115 // Try a more gentle shutdown sequence.
116 _commandPort.send(list1(_SHUTDOWN));
117 return onExit.timeout(timeout, onTimeout: () {
118 isolate.kill(Isolate.IMMEDIATE);
119 return onExit;
120 });
121 }
122 }
123
124 /**
125 * Queries the isolate on whether it's alive.
126 *
127 * If the isolate is alive and responding to commands, the
128 * returned future completes with `true`.
129 *
130 * If the other isolate is not alive (like after calling [kill]),
131 * or doesn't answer within [timeout] for any other reason,
132 * the returned future completes with `false`.
133 *
134 * Guaranteed to only complete after all previous sent isolate commands
135 * (like pause and resume) have been handled.
136 * Paused isolates do respond to ping requests.
137 */
138 Future<bool> ping({Duration timeout: const Duration(seconds: 1)}) {
139 Completer completer = new Completer<bool>();
140 SendPort port = singleCompletePort(completer,
141 callback: _kTrue,
142 timeout: timeout,
143 onTimeout: _kFalse);
144 isolate.ping(port);
145 return completer.future;
146 }
147
148 static bool _kTrue(_) => true;
149 static bool _kFalse() => false;
150
151 /**
152 * Pauses the isolate.
153 *
154 * While paused, no normal messages are processed, and calls to [run] will
155 * be delayed until the isolate is resumed.
156 *
157 * Commands like [kill] and [ping] are still executed while the isolate is
158 * paused.
159 *
160 * If [resumeCapability] is omitted, it defaults to the [isolate]'s
161 * [Isolate.pauseCapability].
162 *
163 * Calling pause more than once with the same `resumeCapability`
164 * has no further effect. Only a single call to [resume] is needed
165 * to resume the isolate.
166 */
167 void pause([Capability resumeCapability]) {
168 if (resumeCapability == null) resumeCapability = isolate.pauseCapability;
169 isolate.pause(resumeCapability);
170 }
171
172 /**
173 * Resumes after a [pause].
174 *
175 * If [resumeCapability] is omitted, it defaults to the isolate's
176 * [Isolate.pauseCapability].
177 *
178 * Even if `pause` has been called more than once with the same
179 * `resumeCapability`, a single resume call with stop the pause.
180 */
181 void resume([Capability resumeCapability]) {
182 if (resumeCapability == null) resumeCapability = isolate.pauseCapability;
183 isolate.resume(resumeCapability);
184 }
185
186 /**
187 * Execute `function(argument)` in the isolate and return the result.
188 *
189 * Sends [function] and [argument] to the isolate, runs the call, and
190 * returns the result, whether it returned a value or threw.
191 * If the call returns a [Future], the final result of that future
192 * will be returned.
193 *
194 * This works similar to the arguments to [Isolate.spawn], except that
195 * it runs in the existing isolate and the return value is returned to
196 * the caller.
197 *
198 * Example:
199 *
200 * IsolateRunner iso = await IsolateRunner.spawn();
201 * try {
202 * return await iso.run(heavyComputation, argument);
203 * } finally {
204 * await iso.close();
205 * }
206 */
207 Future run(function(argument), argument,
208 {Duration timeout, onTimeout()}) {
209 return singleResultFuture((SendPort port) {
210 _commandPort.send(
211 list4(_RUN, FunctionRef.from(function), argument, port));
212 }, timeout: timeout, onTimeout: onTimeout);
213 }
214
215 /**
216 * A broadcast stream of uncaught errors from the isolate.
217 *
218 * When listening on the stream, errors from the isolate will be reported
219 * as errors in the stream. Be ready to handle the errors.
220 *
221 * The stream closes when the isolate shuts down.
222 */
223 Stream get errors {
224 StreamController controller;
225 RawReceivePort port;
226 void handleError(message) {
227 if (message == null) {
228 // Isolate shutdown.
229 port.close();
230 controller.close();
231 } else {
232 // Uncaught error.
233 String errorDescription = message[0];
234 String stackDescription = message[1];
235 var error = new RemoteError(errorDescription, stackDescription);
236 controller.addError(error, error.stackTrace);
237 }
238 }
239 controller = new StreamController.broadcast(
240 sync: true,
241 onListen: () {
242 port = new RawReceivePort(handleError);
243 // TODO: When supported, uncomment this.
244 // isolate.addErrorListener(port.sendPort);
245 // isolate.addOnExitListener(port.sendPort);
246 // And remove the send below, which acts as an immediate close.
247 port.sendPort.send(null);
248 },
249 onCancel: () {
250 port.close();
251 // this.removeErrorListener(port.sendPort);
252 // this.removeOnExitListener(port.sendPort);
253 port = null;
254 });
255 return controller.stream;
256 }
257
258 /**
259 * Waits for the [isolate] to terminate.
260 *
261 * Completes the returned future when the isolate terminates.
262 *
263 * If the isolate has already stopped responding to commands,
264 * the returned future will never terminate.
265 */
266 Future get onExit {
267 // TODO(lrn): Is there a way to see if an isolate is dead
268 // so we can close the receive port for this future?
269 if (_onExitFuture == null) {
270 _onExitFuture = singleResponseFuture(isolate.addOnExitListener);
271 }
272 return _onExitFuture;
273 }
274 }
275
276 /**
277 * The remote part of an [IsolateRunner].
278 *
279 * The `IsolateRunner` sends commands to the controlled isolate through
280 * the `IsolateRunnerRemote` [commandPort].
281 *
282 * Only use this class if you need to set up the isolate manually
283 * instead of relying on [IsolateRunner.spawn].
284 */
285 class IsolateRunnerRemote {
286 final RawReceivePort _commandPort = new RawReceivePort();
287 IsolateRunnerRemote() {
288 _commandPort.handler = _handleCommand;
289 }
290
291 /**
292 * The command port that can be used to send commands to this remote.
293 *
294 * Use this as argument to [new IsolateRunner] if creating the link
295 * manually, otherwise it's handled by [IsolateRunner.spawn].
296 */
297 SendPort get commandPort => _commandPort.sendPort;
298
299 static void _create(SendPort initPort) {
300 var remote = new IsolateRunnerRemote();
301 initPort.send(remote.commandPort);
302 }
303
304 void _handleCommand(List command) {
305 switch (command[0]) {
306 case _SHUTDOWN:
307 SendPort responsePort = command[1];
308 _commandPort.close();
309 responsePort.send(null);
310 return;
311 case _RUN:
312 Function function = command[1].function;
313 var argument = command[2];
314 SendPort responsePort = command[3];
315 sendFutureResult(new Future.sync(() => function(argument)),
316 responsePort);
317 return;
318 }
319 }
320 }
OLDNEW
« no previous file with comments | « lib/isolate.dart ('k') | lib/loadbalancer.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698