Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(303)

Side by Side Diff: packages/isolate/lib/ports.dart

Issue 2990843002: Removed fixed dependencies (Closed)
Patch Set: Created 3 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « packages/isolate/lib/load_balancer.dart ('k') | packages/isolate/lib/registry.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 /// Utility functions for setting up ports and sending data.
6 ///
7 /// This library contains a number of functions that handle the
8 /// boiler-plate of setting up a receive port and receiving a
9 /// single message on the port.
10 ///
11 /// There are different functions that offer different ways to
12 /// handle the incoming message.
13 ///
14 /// The simplest function, [singleCallbackPort], takes a callback
15 /// and returns a port, and then calls the callback for the first
16 /// message sent on the port.
17 ///
18 /// Other functions intercept the returned value and either
19 /// does something with it, or puts it into a [Future] or [Completer].
20 library isolate.ports;
21
22 import "dart:async";
23 import "dart:isolate";
24
25 import "src/lists.dart";
26
27 /// Create a [SendPort] that accepts only one message.
28 ///
29 /// The [callback] function is called once, with the first message
30 /// received by the receive port.
31 /// All further messages are ignored.
32 ///
33 /// If [timeout] is supplied, it is used as a limit on how
34 /// long it can take before the message is received. If a
35 /// message isn't received in time, the `callback` function
36 /// is called once with the [timeoutValue] instead.
37 ///
38 /// Returns the `SendPort` expecting the single message.
39 ///
40 /// Equivalent to:
41 ///
42 /// (new ReceivePort()
43 /// ..first.timeout(duration, () => timeoutValue).then(callback))
44 /// .sendPort
45 SendPort singleCallbackPort(void callback(response),
46 {Duration timeout,
47 var timeoutValue}) {
48 RawReceivePort responsePort = new RawReceivePort();
49 Zone zone = Zone.current;
50 callback = zone.registerUnaryCallback(callback);
51 var timer;
52 responsePort.handler = (response) {
53 responsePort.close();
54 if (timer != null) timer.cancel();
55 zone.runUnary(callback, response);
56 };
57 if (timeout != null) {
58 timer = new Timer(timeout, () {
59 responsePort.close();
60 callback(timeoutValue);
61 });
62 }
63 return responsePort.sendPort;
64 }
65
66 /// Create a [SendPort] that accepts only one message.
67 ///
68 /// When the first message is received, the [callback] function is
69 /// called with the message as argument,
70 /// and the [completer] is completed with the result of that call.
71 /// All further messages are ignored.
72 ///
73 /// If `callback` is omitted, it defaults to an identity function.
74 /// The `callback` call may return a future, and the completer will
75 /// wait for that future to complete.
76 ///
77 /// If [timeout] is supplied, it is used as a limit on how
78 /// long it can take before the message is received. If a
79 /// message isn't received in time, the [onTimeout] is called,
80 /// and `completer` is completed with the result of that call
81 /// instead.
82 /// The [callback] function will not be interrupted by the time-out,
83 /// as long as the initial message is received in time.
84 /// If `onTimeout` is omitted, it defaults to completing the `completer` with
85 /// a [TimeoutException].
86 ///
87 /// The [completer] may be a synchronous completer. It is only
88 /// completed in response to another event, either a port message or a timer.
89 ///
90 /// Returns the `SendPort` expecting the single message.
91 SendPort singleCompletePort(Completer completer,
92 {callback(message),
93 Duration timeout,
94 onTimeout()}) {
95 if (callback == null && timeout == null) {
96 return singleCallbackPort(completer.complete);
97 }
98 RawReceivePort responsePort = new RawReceivePort();
99 var timer;
100 if (callback == null) {
101 responsePort.handler = (response) {
102 responsePort.close();
103 if (timer != null) timer.cancel();
104 completer.complete(response);
105 };
106 } else {
107 Zone zone = Zone.current;
108 Function action = zone.registerUnaryCallback((response) {
109 completer.complete(new Future.sync(() => callback(response)));
110 });
111 responsePort.handler = (response) {
112 responsePort.close();
113 if (timer != null) timer.cancel();
114 zone.runUnary(action, response);
115 };
116 }
117 if (timeout != null) {
118 timer = new Timer(timeout, () {
119 responsePort.close();
120 if (onTimeout != null) {
121 completer.complete(new Future.sync(onTimeout));
122 } else {
123 completer.completeError(
124 new TimeoutException("Future not completed", timeout));
125 }
126 });
127 }
128 return responsePort.sendPort;
129 }
130
131 /// Creates a [Future], and a [SendPort] that can be used to complete that
132 /// future.
133 ///
134 /// Calls [action] with the response `SendPort`, then waits for someone
135 /// to send a value on that port
136 /// The returned `Future` is completed with the value sent on the port.
137 ///
138 /// If [action] throws, which it shouldn't,
139 /// the returned future is completed with that error.
140 /// Any return value of `action` is ignored, and if it is asynchronous,
141 /// it should handle its own errors.
142 ///
143 /// If [timeout] is supplied, it is used as a limit on how
144 /// long it can take before the message is received. If a
145 /// message isn't received in time, the [timeoutValue] used
146 /// as the returned future's value instead.
147 ///
148 /// If you want a timeout on the returned future, it's recommended to
149 /// use the [timeout] parameter, and not [Future.timeout] on the result.
150 /// The `Future` method won't be able to close the underlying [ReceivePort].
151 Future singleResponseFuture(void action(SendPort responsePort),
152 {Duration timeout,
153 var timeoutValue}) {
154 Completer completer = new Completer.sync();
155 RawReceivePort responsePort = new RawReceivePort();
156 Timer timer;
157 Zone zone = Zone.current;
158 responsePort.handler = (v) {
159 responsePort.close();
160 if (timer != null) timer.cancel();
161 zone.run(() {
162 completer.complete(v);
163 });
164 };
165 if (timeout != null) {
166 timer = new Timer(timeout, () {
167 responsePort.close();
168 completer.complete(timeoutValue);
169 });
170 }
171 try {
172 action(responsePort.sendPort);
173 } catch (e, s) {
174 responsePort.close();
175 if (timer != null) timer.cancel();
176 // Delay completion because completer is sync.
177 scheduleMicrotask(() {
178 completer.completeError(e, s);
179 });
180 }
181 return completer.future;
182 }
183
184
185 /// Send the result of a future, either value or error, as a message.
186 ///
187 /// The result of [future] is sent on [resultPort] in a form expected by
188 /// either [receiveFutureResult], [completeFutureResult], or
189 /// by the port of [singleResultFuture].
190 void sendFutureResult(Future future, SendPort resultPort) {
191 future.then(
192 (v) { resultPort.send(list1(v));
193 }, onError: (e, s) {
194 resultPort.send(list2("$e", "$s"));
195 });
196 }
197
198
199 /// Creates a [Future], and a [SendPort] that can be used to complete that
200 /// future.
201 ///
202 /// Calls [action] with the response `SendPort`, then waits for someone
203 /// to send a future result on that port using [sendFutureResult].
204 /// The returned `Future` is completed with the future result sent on the port.
205 ///
206 /// If [action] throws, which it shouldn't,
207 /// the returned future is completed with that error,
208 /// unless someone manages to send a message on the port before `action` throws.
209 ///
210 /// If [timeout] is supplied, it is used as a limit on how
211 /// long it can take before the message is received. If a
212 /// message isn't received in time, the [onTimeout] is called,
213 /// and the future is completed with the result of that call
214 /// instead.
215 /// If `onTimeout` is omitted, it defaults to throwing
216 /// a [TimeoutException].
217 Future singleResultFuture(void action(SendPort responsePort),
218 {Duration timeout,
219 onTimeout()}) {
220 Completer completer = new Completer.sync();
221 SendPort port = singleCompletePort(completer,
222 callback: receiveFutureResult,
223 timeout: timeout,
224 onTimeout: onTimeout);
225 try {
226 action(port);
227 } catch (e, s) {
228 // This should not happen.
229 sendFutureResult(new Future.error(e, s), port);
230 }
231 return completer.future;
232 }
233
234 /// Completes a completer with a message created by [sendFutureResult]
235 ///
236 /// The [response] must be a message on the format sent by [sendFutureResult].
237 void completeFutureResult(var response, Completer completer) {
238 if (response.length == 2) {
239 var error = new RemoteError(response[0], response[1]);
240 completer.completeError(error, error.stackTrace);
241 } else {
242 var result = response[0];
243 completer.complete(result);
244 }
245 }
246
247
248 /// Converts a received message created by [sendFutureResult] to a future
249 /// result.
250 ///
251 /// The [response] must be a message on the format sent by [sendFutureResult].
252 Future receiveFutureResult(var response) {
253 if (response.length == 2) {
254 var error = new RemoteError(response[0], response[1]);
255 return new Future.error(error, error.stackTrace);
256 }
257 var result = response[0];
258 return new Future.value(result);
259 }
260
261 /// A [Future] and a [SendPort] that can be used to complete the future.
262 ///
263 /// The first value sent to [port] is used to complete the [result].
264 /// All following values sent to `port` are ignored.
265 class SingleResponseChannel {
266 Zone _zone;
267 final RawReceivePort _receivePort;
268 final Completer _completer;
269 final Function _callback;
270 Timer _timer;
271
272 /// Creates a response channel.
273 ///
274 /// The [result] is completed with the first value sent to [port].
275 ///
276 /// If [callback] is provided, the value sent to [port] is first passed
277 /// to `callback`, and the result of that is used to complete `result`.
278 ///
279 /// If [timeout] is provided, the future is completed after that
280 /// duration if it hasn't received a value from the port earlier.
281 /// If [throwOnTimeout] is true, the the future is completed with a
282 /// [TimeoutException] as an error if it times out.
283 /// Otherwise, if [onTimeout] is provided,
284 /// the future is completed with the result of running `onTimeout()`.
285 /// If `onTimeout` is not provided either,
286 /// the future is completed with `timeoutValue`, which defaults to `null`.
287 SingleResponseChannel({callback(value),
288 Duration timeout,
289 bool throwOnTimeout: false,
290 onTimeout(),
291 var timeoutValue})
292 : _receivePort = new RawReceivePort(),
293 _completer = new Completer.sync(),
294 _callback = callback,
295 _zone = Zone.current {
296 _receivePort.handler = _handleResponse;
297 if (timeout != null) {
298 _timer = new Timer(timeout, () {
299 // Executed as a timer event.
300 _receivePort.close();
301 if (!_completer.isCompleted) {
302 if (throwOnTimeout) {
303 _completer.completeError(
304 new TimeoutException('Timeout waiting for response', timeout));
305 } else if (onTimeout != null) {
306 _completer.complete(new Future.sync(onTimeout));
307 } else {
308 _completer.complete(timeoutValue);
309 }
310 }
311 });
312 }
313 }
314
315 /// The port expecting a value that will complete [result].
316 SendPort get port => _receivePort.sendPort;
317
318 /// Future completed by the first value sent to [port].
319 Future get result => _completer.future;
320
321 /// If the channel hasn't completed yet, interrupt it and complete the result.
322 ///
323 /// If the channel hasn't received a value yet, or timed out, it is stopped
324 /// (like by a timeout) and the [SingleResponseChannel.result]
325 /// is completed with [result].
326 void interrupt([result]) {
327 _receivePort.close();
328 _cancelTimer();
329 if (!_completer.isCompleted) {
330 // Not in event tail position, so complete the sync completer later.
331 _completer.complete(new Future.microtask(() => result));
332 }
333 }
334
335 void _cancelTimer() {
336 if (_timer != null) {
337 _timer.cancel();
338 _timer = null;
339 }
340 }
341
342 void _handleResponse(v) {
343 // Executed as a port event.
344 _receivePort.close();
345 _cancelTimer();
346 if (_callback == null) {
347 _completer.complete(v);
348 } else {
349 // The _handleResponse function is the handler of a RawReceivePort.
350 // As such, it runs in the root zone.
351 // The _callback should be run in the original zone, both because it's
352 // what the user expects, and because it may create an error that needs
353 // to be propagated to the original completer. If that completer was
354 // created in a different error zone, an error from the root zone
355 // would become uncaught.
356 _zone.run(() {
357 _completer.complete(new Future.sync(() => _callback(v)));
358 });
359 }
360 }
361 }
OLDNEW
« no previous file with comments | « packages/isolate/lib/load_balancer.dart ('k') | packages/isolate/lib/registry.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698