Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(57)

Side by Side Diff: lib/ports.dart

Issue 928663003: Add IsolateRunner as a helper around Isolate. (Closed) Base URL: https://github.com/dart-lang/isolate.git@master
Patch Set: Add .status. Created 5 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « lib/loadbalancer.dart ('k') | lib/registry.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 /**
6 * Utility functions for setting up ports and sending data.
7 *
8 * This library contains a number of functions that handle the
9 * boiler-plate of setting up a receive port and receiving a
10 * single message on the port.
11 *
12 * There are different functions that offer different ways to
13 * handle the incoming message.
14 *
15 * The simplest function, [singleCallbackPort], takes a callback
16 * and returns a port, and then calls the callback for the first
17 * message sent on the port.
18 *
19 * Other functions intercept the returned value and either
20 * does something with it, or puts it into a [Future] or [Completer].
21 */
22 library dart.pkg.isolate.ports;
23
24 import "dart:isolate";
25 import "dart:async";
26 import "src/lists.dart";
27
28 /**
29 * Create a [SendPort] that accepts only one message.
30 *
31 * The [callback] function is called once, with the first message
32 * received by the receive port.
33 * All futher messages are ignored.
34 *
35 * If [timeout] is supplied, it is used as a limit on how
36 * long it can take before the message is received. If a
37 * message isn't received in time, the `callback` function
38 * is called once with the [timeoutValue] instead.
39 *
40 * Returns the `SendPort` expecting the single message.
41 *
42 * Equivalent to:
43 *
44 * (new ReceivePort()
45 * ..first.timeout(duration, () => timeoutValue).then(callback))
46 * .sendPort
47 */
48 SendPort singleCallbackPort(void callback(response),
49 {Duration timeout,
50 var timeoutValue}) {
51 RawReceivePort responsePort = new RawReceivePort();
52 Zone zone = Zone.current;
53 callback = zone.registerUnaryCallback(callback);
54 var timer;
55 responsePort.handler = (response) {
56 responsePort.close();
57 if (timer != null) timer.cancel();
58 zone.runUnary(callback, response);
59 };
60 if (timeout != null) {
61 timer = new Timer(timeout, () {
62 responsePort.close();
63 callback(timeoutValue);
64 });
65 }
66 return responsePort.sendPort;
67 }
68
69 /**
70 * Create a [SendPort] that accepts only one message.
71 *
72 * When the first message is received, the [callback] function is
73 * called with the message as argument,
74 * and the [completer] is completed with the result of that call.
75 * All further messages are ignored.
76 *
77 * If `callback` is omitted, it defaults to an identity function.
78 * The `callback` call may return a future, and the completer will
79 * wait for that future to complete.
80 *
81 * If [timeout] is supplied, it is used as a limit on how
82 * long it can take before the message is received. If a
83 * message isn't received in time, the [onTimeout] is called,
84 * and `completer` is completed with the result of that call
85 * instead.
86 * The [callback] function will not be interrupted by the time-out,
87 * as long as the initial message is received in time.
88 * If `onTimeout` is omitted, it defaults to completing the `completer` with
89 * a [TimeoutException].
90 *
91 * The [completer] may be a synchronous completer. It is only
92 * completed in response to another event, either a port message or a timer.
93 *
94 * Returns the `SendPort` expecting the single message.
95 */
96 SendPort singleCompletePort(Completer completer,
97 {callback(message),
98 Duration timeout,
99 onTimeout()}) {
100 if (callback == null && timeout == null) {
101 return singleCallbackPort(completer.complete);
102 }
103 RawReceivePort responsePort = new RawReceivePort();
104 var timer;
105 if (callback == null) {
106 responsePort.handler = (response) {
107 responsePort.close();
108 if (timer != null) timer.cancel();
109 completer.complete(response);
110 };
111 } else {
112 Zone zone = Zone.current;
113 Function action = zone.registerUnaryCallback((response) {
114 completer.complete(new Future.sync(() => callback(response)));
115 });
116 responsePort.handler = (response) {
117 responsePort.close();
118 if (timer != null) timer.cancel();
119 zone.runUnary(action, response);
120 };
121 }
122 if (timeout != null) {
123 timer = new Timer(timeout, () {
124 responsePort.close();
125 if (onTimeout != null) {
126 completer.complete(new Future.sync(onTimeout));
127 } else {
128 completer.completeError(new TimeoutException("Future not completed",
129 timeout));
130 }
131 });
132 }
133 return responsePort.sendPort;
134 }
135
136 /**
137 * Creates a [Future], and a [SendPort] that can be used to complete that
138 * future.
139 *
140 * Calls [action] with the response `SendPort`, then waits for someone
141 * to send a value on that port
142 * The returned `Future` is completed with the value sent on the port.
143 *
144 * If [action] throws, which it shouldn't,
145 * the returned future is completed with that error.
146 * Any return value of `action` is ignored, and if it is asynchronous,
147 * it should handle its own errors.
148 *
149 * If [timeout] is supplied, it is used as a limit on how
150 * long it can take before the message is received. If a
151 * message isn't received in time, the [timeoutValue] used
152 * as the returned future's value instead.
153 *
154 * If you want a timeout on the returned future, it's recommended to
155 * use the [timeout] parameter, and not [Future.timeout] on the result.
156 * The `Future` method won't be able to close the underlying [ReceivePort].
157 */
158 Future singleResponseFuture(void action(SendPort responsePort),
159 {Duration timeout,
160 var timeoutValue}) {
161 Completer completer = new Completer.sync();
162 RawReceivePort responsePort = new RawReceivePort();
163 Timer timer;
164 Zone zone = Zone.current;
165 responsePort.handler = (v) {
166 responsePort.close();
167 if (timer != null) timer.cancel();
168 zone.run(() {
169 completer.complete(v);
170 });
171 };
172 if (timeout != null) {
173 timer = new Timer(timeout, () {
174 responsePort.close();
175 completer.complete(timeoutValue);
176 });
177 }
178 try {
179 action(responsePort.sendPort);
180 } catch (e, s) {
181 responsePort.close();
182 if (timer != null) timer.cancel();
183 // Delay completion because completer is sync.
184 scheduleMicrotask(() { completer.completeError(e, s); });
185 }
186 return completer.future;
187 }
188
189
190 /**
191 * Send the result of a future, either value or error, as a message.
192 *
193 * The result of [future] is sent on [resultPort] in a form expected by
194 * either [receiveFutureResult], [completeFutureResult], or
195 * by the port of [singleResultFuture].
196 */
197 void sendFutureResult(Future future, SendPort resultPort) {
198 future.then((v) { resultPort.send(list1(v)); },
199 onError: (e, s) { resultPort.send(list2("$e", "$s")); });
200 }
201
202
203 /**
204 * Creates a [Future], and a [SendPort] that can be used to complete that
205 * future.
206 *
207 * Calls [action] with the response `SendPort`, then waits for someone
208 * to send a future result on that port using [sendFutureResult].
209 * The returned `Future` is completed with the future result sent on the port.
210 *
211 * If [action] throws, which it shouldn't,
212 * the returned future is completed with that error,
213 * unless someone manages to send a message on the port before `action` throws.
214 *
215 * If [timeout] is supplied, it is used as a limit on how
216 * long it can take before the message is received. If a
217 * message isn't received in time, the [onTimeout] is called,
218 * and the future is completed with the result of that call
219 * instead.
220 * If `onTimeout` is omitted, it defaults to throwing
221 * a [TimeoutException].
222 */
223 Future singleResultFuture(void action(SendPort responsePort),
224 {Duration timeout,
225 onTimeout()}) {
226 Completer completer = new Completer.sync();
227 SendPort port = singleCompletePort(completer,
228 callback: receiveFutureResult,
229 timeout: timeout,
230 onTimeout: onTimeout);
231 try {
232 action(port);
233 } catch (e, s) {
234 // This should not happen.
235 sendFutureResult(new Future.error(e, s), port);
236 }
237 return completer.future;
238 }
239
240 /**
241 * Completes a completer with a message created by [sendFutureResult]
242 *
243 * The [response] must be a message on the format sent by [sendFutureResult].
244 */
245 void completeFutureResult(var response, Completer completer) {
246 if (response.length == 2) {
247 var error = new RemoteError(response[0], response[1]);
248 completer.completeError(error, error.stackTrace);
249 } else {
250 var result = response[0];
251 completer.complete(result);
252 }
253 }
254
255
256 /**
257 * Convertes a received message created by [sendFutureResult] to a future
258 * result.
259 *
260 * The [response] must be a message on the format sent by [sendFutureResult].
261 */
262 Future receiveFutureResult(var response) {
263 if (response.length == 2) {
264 var error = new RemoteError(response[0], response[1]);
265 return new Future.error(error, error.stackTrace);
266 }
267 var result = response[0];
268 return new Future.value(result);
269 }
OLDNEW
« no previous file with comments | « lib/loadbalancer.dart ('k') | lib/registry.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698