Index: runtime/lib/async_patch.dart |
diff --git a/runtime/lib/async_patch.dart b/runtime/lib/async_patch.dart |
index 682ee8b10a4ec2d6b8496a5f06496f428819a00e..d78534719656c37ca06fc3cdc7354a930bbeadad 100644 |
--- a/runtime/lib/async_patch.dart |
+++ b/runtime/lib/async_patch.dart |
@@ -4,3 +4,181 @@ |
import "dart:_internal"; |
+// We need to pass the value as first argument and leave the second and third |
+// arguments empty (used for error handling). |
+// See vm/ast_transformer.cc for usage. |
+Function _asyncThenWrapperHelper(continuation) { |
+ // Any function that is used as an asynchronous callback must be registered |
+ // in the current Zone. Normally, this is done by the future when a |
+ // callback is registered (for example with `.then` or `.catchError`). In our |
+ // case we want to reuse the same callback multiple times and therefore avoid |
+ // the multiple registrations. For our internal futures (`_Future`) we can |
+ // use the shortcut-version of `.then`, and skip the registration. However, |
+ // that means that the continuation must be registered by us. |
+ // |
+ // Furthermore, we know that the root-zone doesn't actually do anything and |
+ // we can therefore skip the registration call for it. |
+ // |
+ // Note, that the continuation accepts up to three arguments. If the current |
+ // zone is the root zone, we don't wrap the continuation, and a bad |
+ // `Future` implementation could potentially invoke the callback with the |
+ // wrong number of arguments. |
+ if (Zone.current == Zone.ROOT) return continuation; |
+ return Zone.current.registerUnaryCallback((x) => continuation(x, null, null)); |
+} |
+ |
+// We need to pass the exception and stack trace objects as second and third |
+// parameter to the continuation. See vm/ast_transformer.cc for usage. |
+Function _asyncErrorWrapperHelper(continuation) { |
+ // See comments of `_asyncThenWrapperHelper`. |
+ var errorCallback = (e, s) => continuation(null, e, s); |
+ if (Zone.current == Zone.ROOT) return errorCallback; |
+ return Zone.current.registerBinaryCallback(errorCallback); |
+} |
+ |
+/// Registers the [thenCallback] and [errorCallback] on the given [object]. |
+/// |
+/// If [object] is not a future, then it is wrapped into one. |
+/// |
+/// Returns the result of registering with `.then`. |
+Future _awaitHelper( |
+ var object, Function thenCallback, Function errorCallback) { |
+ if (object is! Future) { |
+ object = new _Future().._setValue(object); |
+ } else if (object is! _Future) { |
+ return object.then(thenCallback, onError: errorCallback); |
+ } |
+ // `object` is a `_Future`. |
+ // |
+ // Since the callbacks have been registered in the current zone (see |
+ // [_asyncThenWrapperHelper] and [_asyncErrorWrapperHelper]), we can avoid |
+ // another registration and directly invoke the no-zone-registration `.then`. |
+ // |
+ // We can only do this for our internal futures (the default implementation of |
+ // all futures that are constructed by the `dart:async` library). |
+ return object._thenNoZoneRegistration(thenCallback, errorCallback); |
+} |
+ |
+// _AsyncStarStreamController is used by the compiler to implement |
+// async* generator functions. |
+class _AsyncStarStreamController { |
+ StreamController controller; |
+ Function asyncStarBody; |
+ bool isAdding = false; |
+ bool onListenReceived = false; |
+ bool isScheduled = false; |
+ bool isSuspendedAtYield = false; |
+ Completer cancellationCompleter = null; |
+ |
+ Stream get stream => controller.stream; |
+ |
+ void runBody() { |
+ isScheduled = false; |
+ isSuspendedAtYield = false; |
+ asyncStarBody(); |
+ } |
+ |
+ void scheduleGenerator() { |
+ if (isScheduled || controller.isPaused || isAdding) { |
+ return; |
+ } |
+ isScheduled = true; |
+ scheduleMicrotask(runBody); |
+ } |
+ |
+ // Adds element to steam, returns true if the caller should terminate |
+ // execution of the generator. |
+ // |
+ // TODO(hausner): Per spec, the generator should be suspended before |
+ // exiting when the stream is closed. We could add a getter like this: |
+ // get isCancelled => controller.hasListener; |
+ // The generator would translate a 'yield e' statement to |
+ // controller.add(e); |
+ // suspend; |
+ // if (controller.isCancelled) return; |
+ bool add(event) { |
+ if (!onListenReceived) _fatal("yield before stream is listened to!"); |
+ if (isSuspendedAtYield) _fatal("unexpected yield"); |
+ // If stream is cancelled, tell caller to exit the async generator. |
+ if (!controller.hasListener) { |
+ return true; |
+ } |
+ controller.add(event); |
+ scheduleGenerator(); |
+ isSuspendedAtYield = true; |
+ return false; |
+ } |
+ |
+ // Adds the elements of stream into this controller's stream. |
+ // The generator will be scheduled again when all of the |
+ // elements of the added stream have been consumed. |
+ // Returns true if the caller should terminate |
+ // execution of the generator. |
+ bool addStream(Stream stream) { |
+ if (!onListenReceived) _fatal("yield before stream is listened to!"); |
+ // If stream is cancelled, tell caller to exit the async generator. |
+ if (!controller.hasListener) return true; |
+ isAdding = true; |
+ var whenDoneAdding = |
+ controller.addStream(stream as Stream, cancelOnError: false); |
+ whenDoneAdding.then((_) { |
+ isAdding = false; |
+ scheduleGenerator(); |
+ }); |
+ return false; |
+ } |
+ |
+ void addError(error, stackTrace) { |
+ if ((cancellationCompleter != null) && !cancellationCompleter.isCompleted) { |
+ // If the stream has been cancelled, complete the cancellation future |
+ // with the error. |
+ cancellationCompleter.completeError(error, stackTrace); |
+ return; |
+ } |
+ // If stream is cancelled, tell caller to exit the async generator. |
+ if (!controller.hasListener) return; |
+ controller.addError(error, stackTrace); |
+ // No need to schedule the generator body here. This code is only |
+ // called from the catch clause of the implicit try-catch-finally |
+ // around the generator body. That is, we are on the error path out |
+ // of the generator and do not need to run the generator again. |
+ } |
+ |
+ close() { |
+ if ((cancellationCompleter != null) && !cancellationCompleter.isCompleted) { |
+ // If the stream has been cancelled, complete the cancellation future |
+ // with the error. |
+ cancellationCompleter.complete(); |
+ } |
+ controller.close(); |
+ } |
+ |
+ _AsyncStarStreamController(this.asyncStarBody) { |
+ controller = new StreamController(onListen: this.onListen, |
+ onResume: this.onResume, |
+ onCancel: this.onCancel); |
+ } |
+ |
+ onListen() { |
+ assert(!onListenReceived); |
+ onListenReceived = true; |
+ scheduleGenerator(); |
+ } |
+ |
+ onResume() { |
+ if (isSuspendedAtYield) { |
+ scheduleGenerator(); |
+ } |
+ } |
+ |
+ onCancel() { |
+ if (controller.isClosed) { |
+ return null; |
+ } |
+ if (cancellationCompleter == null) { |
+ cancellationCompleter = new Completer(); |
+ scheduleGenerator(); |
+ } |
+ return cancellationCompleter.future; |
+ } |
+} |