Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1910)

Unified Diff: packages/isolate/lib/ports.dart

Issue 2990843002: Removed fixed dependencies (Closed)
Patch Set: Created 3 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « packages/isolate/lib/load_balancer.dart ('k') | packages/isolate/lib/registry.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: packages/isolate/lib/ports.dart
diff --git a/packages/isolate/lib/ports.dart b/packages/isolate/lib/ports.dart
new file mode 100644
index 0000000000000000000000000000000000000000..4a185fe9e98d117c6d9ed8fbf0d48f8955d62c73
--- /dev/null
+++ b/packages/isolate/lib/ports.dart
@@ -0,0 +1,361 @@
+// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+/// Utility functions for setting up ports and sending data.
+///
+/// This library contains a number of functions that handle the
+/// boiler-plate of setting up a receive port and receiving a
+/// single message on the port.
+///
+/// There are different functions that offer different ways to
+/// handle the incoming message.
+///
+/// The simplest function, [singleCallbackPort], takes a callback
+/// and returns a port, and then calls the callback for the first
+/// message sent on the port.
+///
+/// Other functions intercept the returned value and either
+/// does something with it, or puts it into a [Future] or [Completer].
+library isolate.ports;
+
+import "dart:async";
+import "dart:isolate";
+
+import "src/lists.dart";
+
+/// Create a [SendPort] that accepts only one message.
+///
+/// The [callback] function is called once, with the first message
+/// received by the receive port.
+/// All further messages are ignored.
+///
+/// If [timeout] is supplied, it is used as a limit on how
+/// long it can take before the message is received. If a
+/// message isn't received in time, the `callback` function
+/// is called once with the [timeoutValue] instead.
+///
+/// Returns the `SendPort` expecting the single message.
+///
+/// Equivalent to:
+///
+/// (new ReceivePort()
+/// ..first.timeout(duration, () => timeoutValue).then(callback))
+/// .sendPort
+SendPort singleCallbackPort(void callback(response),
+ {Duration timeout,
+ var timeoutValue}) {
+ RawReceivePort responsePort = new RawReceivePort();
+ Zone zone = Zone.current;
+ callback = zone.registerUnaryCallback(callback);
+ var timer;
+ responsePort.handler = (response) {
+ responsePort.close();
+ if (timer != null) timer.cancel();
+ zone.runUnary(callback, response);
+ };
+ if (timeout != null) {
+ timer = new Timer(timeout, () {
+ responsePort.close();
+ callback(timeoutValue);
+ });
+ }
+ return responsePort.sendPort;
+}
+
+/// Create a [SendPort] that accepts only one message.
+///
+/// When the first message is received, the [callback] function is
+/// called with the message as argument,
+/// and the [completer] is completed with the result of that call.
+/// All further messages are ignored.
+///
+/// If `callback` is omitted, it defaults to an identity function.
+/// The `callback` call may return a future, and the completer will
+/// wait for that future to complete.
+///
+/// If [timeout] is supplied, it is used as a limit on how
+/// long it can take before the message is received. If a
+/// message isn't received in time, the [onTimeout] is called,
+/// and `completer` is completed with the result of that call
+/// instead.
+/// The [callback] function will not be interrupted by the time-out,
+/// as long as the initial message is received in time.
+/// If `onTimeout` is omitted, it defaults to completing the `completer` with
+/// a [TimeoutException].
+///
+/// The [completer] may be a synchronous completer. It is only
+/// completed in response to another event, either a port message or a timer.
+///
+/// Returns the `SendPort` expecting the single message.
+SendPort singleCompletePort(Completer completer,
+ {callback(message),
+ Duration timeout,
+ onTimeout()}) {
+ if (callback == null && timeout == null) {
+ return singleCallbackPort(completer.complete);
+ }
+ RawReceivePort responsePort = new RawReceivePort();
+ var timer;
+ if (callback == null) {
+ responsePort.handler = (response) {
+ responsePort.close();
+ if (timer != null) timer.cancel();
+ completer.complete(response);
+ };
+ } else {
+ Zone zone = Zone.current;
+ Function action = zone.registerUnaryCallback((response) {
+ completer.complete(new Future.sync(() => callback(response)));
+ });
+ responsePort.handler = (response) {
+ responsePort.close();
+ if (timer != null) timer.cancel();
+ zone.runUnary(action, response);
+ };
+ }
+ if (timeout != null) {
+ timer = new Timer(timeout, () {
+ responsePort.close();
+ if (onTimeout != null) {
+ completer.complete(new Future.sync(onTimeout));
+ } else {
+ completer.completeError(
+ new TimeoutException("Future not completed", timeout));
+ }
+ });
+ }
+ return responsePort.sendPort;
+}
+
+/// Creates a [Future], and a [SendPort] that can be used to complete that
+/// future.
+///
+/// Calls [action] with the response `SendPort`, then waits for someone
+/// to send a value on that port
+/// The returned `Future` is completed with the value sent on the port.
+///
+/// If [action] throws, which it shouldn't,
+/// the returned future is completed with that error.
+/// Any return value of `action` is ignored, and if it is asynchronous,
+/// it should handle its own errors.
+///
+/// If [timeout] is supplied, it is used as a limit on how
+/// long it can take before the message is received. If a
+/// message isn't received in time, the [timeoutValue] used
+/// as the returned future's value instead.
+///
+/// If you want a timeout on the returned future, it's recommended to
+/// use the [timeout] parameter, and not [Future.timeout] on the result.
+/// The `Future` method won't be able to close the underlying [ReceivePort].
+Future singleResponseFuture(void action(SendPort responsePort),
+ {Duration timeout,
+ var timeoutValue}) {
+ Completer completer = new Completer.sync();
+ RawReceivePort responsePort = new RawReceivePort();
+ Timer timer;
+ Zone zone = Zone.current;
+ responsePort.handler = (v) {
+ responsePort.close();
+ if (timer != null) timer.cancel();
+ zone.run(() {
+ completer.complete(v);
+ });
+ };
+ if (timeout != null) {
+ timer = new Timer(timeout, () {
+ responsePort.close();
+ completer.complete(timeoutValue);
+ });
+ }
+ try {
+ action(responsePort.sendPort);
+ } catch (e, s) {
+ responsePort.close();
+ if (timer != null) timer.cancel();
+ // Delay completion because completer is sync.
+ scheduleMicrotask(() {
+ completer.completeError(e, s);
+ });
+ }
+ return completer.future;
+}
+
+
+/// Send the result of a future, either value or error, as a message.
+///
+/// The result of [future] is sent on [resultPort] in a form expected by
+/// either [receiveFutureResult], [completeFutureResult], or
+/// by the port of [singleResultFuture].
+void sendFutureResult(Future future, SendPort resultPort) {
+ future.then(
+ (v) { resultPort.send(list1(v));
+ }, onError: (e, s) {
+ resultPort.send(list2("$e", "$s"));
+ });
+}
+
+
+/// Creates a [Future], and a [SendPort] that can be used to complete that
+/// future.
+///
+/// Calls [action] with the response `SendPort`, then waits for someone
+/// to send a future result on that port using [sendFutureResult].
+/// The returned `Future` is completed with the future result sent on the port.
+///
+/// If [action] throws, which it shouldn't,
+/// the returned future is completed with that error,
+/// unless someone manages to send a message on the port before `action` throws.
+///
+/// If [timeout] is supplied, it is used as a limit on how
+/// long it can take before the message is received. If a
+/// message isn't received in time, the [onTimeout] is called,
+/// and the future is completed with the result of that call
+/// instead.
+/// If `onTimeout` is omitted, it defaults to throwing
+/// a [TimeoutException].
+Future singleResultFuture(void action(SendPort responsePort),
+ {Duration timeout,
+ onTimeout()}) {
+ Completer completer = new Completer.sync();
+ SendPort port = singleCompletePort(completer,
+ callback: receiveFutureResult,
+ timeout: timeout,
+ onTimeout: onTimeout);
+ try {
+ action(port);
+ } catch (e, s) {
+ // This should not happen.
+ sendFutureResult(new Future.error(e, s), port);
+ }
+ return completer.future;
+}
+
+/// Completes a completer with a message created by [sendFutureResult]
+///
+/// The [response] must be a message on the format sent by [sendFutureResult].
+void completeFutureResult(var response, Completer completer) {
+ if (response.length == 2) {
+ var error = new RemoteError(response[0], response[1]);
+ completer.completeError(error, error.stackTrace);
+ } else {
+ var result = response[0];
+ completer.complete(result);
+ }
+}
+
+
+/// Converts a received message created by [sendFutureResult] to a future
+/// result.
+///
+/// The [response] must be a message on the format sent by [sendFutureResult].
+Future receiveFutureResult(var response) {
+ if (response.length == 2) {
+ var error = new RemoteError(response[0], response[1]);
+ return new Future.error(error, error.stackTrace);
+ }
+ var result = response[0];
+ return new Future.value(result);
+}
+
+/// A [Future] and a [SendPort] that can be used to complete the future.
+///
+/// The first value sent to [port] is used to complete the [result].
+/// All following values sent to `port` are ignored.
+class SingleResponseChannel {
+ Zone _zone;
+ final RawReceivePort _receivePort;
+ final Completer _completer;
+ final Function _callback;
+ Timer _timer;
+
+ /// Creates a response channel.
+ ///
+ /// The [result] is completed with the first value sent to [port].
+ ///
+ /// If [callback] is provided, the value sent to [port] is first passed
+ /// to `callback`, and the result of that is used to complete `result`.
+ ///
+ /// If [timeout] is provided, the future is completed after that
+ /// duration if it hasn't received a value from the port earlier.
+ /// If [throwOnTimeout] is true, the the future is completed with a
+ /// [TimeoutException] as an error if it times out.
+ /// Otherwise, if [onTimeout] is provided,
+ /// the future is completed with the result of running `onTimeout()`.
+ /// If `onTimeout` is not provided either,
+ /// the future is completed with `timeoutValue`, which defaults to `null`.
+ SingleResponseChannel({callback(value),
+ Duration timeout,
+ bool throwOnTimeout: false,
+ onTimeout(),
+ var timeoutValue})
+ : _receivePort = new RawReceivePort(),
+ _completer = new Completer.sync(),
+ _callback = callback,
+ _zone = Zone.current {
+ _receivePort.handler = _handleResponse;
+ if (timeout != null) {
+ _timer = new Timer(timeout, () {
+ // Executed as a timer event.
+ _receivePort.close();
+ if (!_completer.isCompleted) {
+ if (throwOnTimeout) {
+ _completer.completeError(
+ new TimeoutException('Timeout waiting for response', timeout));
+ } else if (onTimeout != null) {
+ _completer.complete(new Future.sync(onTimeout));
+ } else {
+ _completer.complete(timeoutValue);
+ }
+ }
+ });
+ }
+ }
+
+ /// The port expecting a value that will complete [result].
+ SendPort get port => _receivePort.sendPort;
+
+ /// Future completed by the first value sent to [port].
+ Future get result => _completer.future;
+
+ /// If the channel hasn't completed yet, interrupt it and complete the result.
+ ///
+ /// If the channel hasn't received a value yet, or timed out, it is stopped
+ /// (like by a timeout) and the [SingleResponseChannel.result]
+ /// is completed with [result].
+ void interrupt([result]) {
+ _receivePort.close();
+ _cancelTimer();
+ if (!_completer.isCompleted) {
+ // Not in event tail position, so complete the sync completer later.
+ _completer.complete(new Future.microtask(() => result));
+ }
+ }
+
+ void _cancelTimer() {
+ if (_timer != null) {
+ _timer.cancel();
+ _timer = null;
+ }
+ }
+
+ void _handleResponse(v) {
+ // Executed as a port event.
+ _receivePort.close();
+ _cancelTimer();
+ if (_callback == null) {
+ _completer.complete(v);
+ } else {
+ // The _handleResponse function is the handler of a RawReceivePort.
+ // As such, it runs in the root zone.
+ // The _callback should be run in the original zone, both because it's
+ // what the user expects, and because it may create an error that needs
+ // to be propagated to the original completer. If that completer was
+ // created in a different error zone, an error from the root zone
+ // would become uncaught.
+ _zone.run(() {
+ _completer.complete(new Future.sync(() => _callback(v)));
+ });
+ }
+ }
+}
« no previous file with comments | « packages/isolate/lib/load_balancer.dart ('k') | packages/isolate/lib/registry.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698