| Index: sdk/lib/_internal/compiler/js_lib/js_helper.dart
|
| diff --git a/sdk/lib/_internal/compiler/js_lib/js_helper.dart b/sdk/lib/_internal/compiler/js_lib/js_helper.dart
|
| index 163833b6ffadbac2da7a19366980a6bcf9041896..2248dc6ef575f2e5c520a19db96778f89f5b9ee7 100644
|
| --- a/sdk/lib/_internal/compiler/js_lib/js_helper.dart
|
| +++ b/sdk/lib/_internal/compiler/js_lib/js_helper.dart
|
| @@ -3642,10 +3642,11 @@ dynamic asyncHelper(dynamic object,
|
| future.then(_wrapJsFunctionForAsync(bodyFunctionOrErrorCode,
|
| async_error_codes.SUCCESS),
|
| onError: (dynamic error, StackTrace stackTrace) {
|
| - ExceptionAndStackTrace wrapped =
|
| + ExceptionAndStackTrace wrappedException =
|
| new ExceptionAndStackTrace(error, stackTrace);
|
| - return _wrapJsFunctionForAsync(bodyFunctionOrErrorCode,
|
| - async_error_codes.ERROR)(wrapped);
|
| + Function wrapped =_wrapJsFunctionForAsync(bodyFunctionOrErrorCode,
|
| + async_error_codes.ERROR);
|
| + wrapped(wrappedException);
|
| });
|
| return completer.future;
|
| }
|
| @@ -3714,7 +3715,7 @@ void asyncStarHelper(dynamic object,
|
| AsyncStarStreamController controller) {
|
| if (identical(bodyFunctionOrErrorCode, async_error_codes.SUCCESS)) {
|
| // This happens on return from the async* function.
|
| - if (controller.cancelationCompleter != null) {
|
| + if (controller.isCanceled) {
|
| controller.cancelationCompleter.complete();
|
| } else {
|
| controller.close();
|
| @@ -3722,7 +3723,7 @@ void asyncStarHelper(dynamic object,
|
| return;
|
| } else if (identical(bodyFunctionOrErrorCode, async_error_codes.ERROR)) {
|
| // The error is a js-error.
|
| - if (controller.cancelationCompleter != null) {
|
| + if (controller.isCanceled) {
|
| controller.cancelationCompleter.completeError(
|
| unwrapException(object),
|
| getTraceFromException(object));
|
| @@ -3735,34 +3736,44 @@ void asyncStarHelper(dynamic object,
|
| }
|
|
|
| if (object is IterationMarker) {
|
| - if (controller.cancelationCompleter != null) {
|
| - _wrapJsFunctionForAsync(bodyFunctionOrErrorCode,
|
| - async_error_codes.STREAM_WAS_CANCELED)(null);
|
| + if (controller.isCanceled) {
|
| + Function wrapped = _wrapJsFunctionForAsync(bodyFunctionOrErrorCode,
|
| + async_error_codes.STREAM_WAS_CANCELED);
|
| + wrapped(null);
|
| return;
|
| }
|
| if (object.state == IterationMarker.YIELD_SINGLE) {
|
| controller.add(object.value);
|
| - // If the controller is paused we stop producing more values.
|
| - if (controller.isPaused) {
|
| - return;
|
| - }
|
| - // TODO(sigurdm): We should not suspend here according to the spec.
|
| +
|
| scheduleMicrotask(() {
|
| - _wrapJsFunctionForAsync(bodyFunctionOrErrorCode,
|
| - async_error_codes.SUCCESS)
|
| - (null);
|
| + if (controller.isPaused) {
|
| + // We only suspend the thread inside the microtask in order to allow
|
| + // listeners on the output stream to pause in response to the just
|
| + // output value, and have the stream immediately stop producing.
|
| + controller.isSuspended = true;
|
| + return;
|
| + }
|
| + Function wrapped = _wrapJsFunctionForAsync(bodyFunctionOrErrorCode,
|
| + async_error_codes.SUCCESS);
|
| + wrapped(null);
|
| });
|
| return;
|
| } else if (object.state == IterationMarker.YIELD_STAR) {
|
| Stream stream = object.value;
|
| - controller.isAdding = true;
|
| // Errors of [stream] are passed though to the main stream. (see
|
| - // [AsyncStreamController.addStream].
|
| + // [AsyncStreamController.addStream]).
|
| // TODO(sigurdm): The spec is not very clear here. Clarify with Gilad.
|
| controller.addStream(stream).then((_) {
|
| - controller.isAdding = false;
|
| - _wrapJsFunctionForAsync(bodyFunctionOrErrorCode,
|
| - async_error_codes.SUCCESS)(null);
|
| + // No check for isPaused here because the spec 17.16.2 only
|
| + // demands checks *before* each element in [stream] not after the last
|
| + // one. On the other hand we check for isCanceled, as that check happens
|
| + // after insertion of each element.
|
| + int errorCode = controller.isCanceled
|
| + ? async_error_codes.STREAM_WAS_CANCELED
|
| + : async_error_codes.SUCCESS;
|
| + Function wrapped = _wrapJsFunctionForAsync(bodyFunctionOrErrorCode,
|
| + errorCode);
|
| + wrapped(null);
|
| });
|
| return;
|
| }
|
| @@ -3772,11 +3783,11 @@ void asyncStarHelper(dynamic object,
|
| future.then(_wrapJsFunctionForAsync(bodyFunctionOrErrorCode,
|
| async_error_codes.SUCCESS),
|
| onError: (error, StackTrace stackTrace) {
|
| - ExceptionAndStackTrace wrapped =
|
| + ExceptionAndStackTrace wrappedException =
|
| new ExceptionAndStackTrace(error, stackTrace);
|
| - return _wrapJsFunctionForAsync(bodyFunctionOrErrorCode,
|
| - async_error_codes.ERROR)
|
| - (wrapped);
|
| + Function wrapped = _wrapJsFunctionForAsync(
|
| + bodyFunctionOrErrorCode, async_error_codes.ERROR);
|
| + return wrapped(wrappedException);
|
| });
|
| }
|
|
|
| @@ -3784,46 +3795,79 @@ Stream streamOfController(AsyncStarStreamController controller) {
|
| return controller.stream;
|
| }
|
|
|
| -/// A wrapper around a [StreamController] that remembers if that controller
|
| -/// got a cancel.
|
| +/// A wrapper around a [StreamController] that keeps track of the state of
|
| +/// the execution of an async* function.
|
| +/// It can be in 1 of 3 states:
|
| ///
|
| -/// Also has a subSubscription that when not null will provide events for the
|
| -/// stream, and will be paused and resumed along with this controller.
|
| +/// - running/scheduled
|
| +/// - suspended
|
| +/// - canceled
|
| +///
|
| +/// If yielding while the subscription is paused it will become suspended. And
|
| +/// only resume after the subscription is resumed or canceled.
|
| class AsyncStarStreamController {
|
| StreamController controller;
|
| Stream get stream => controller.stream;
|
| +
|
| + /// True when the async* function has yielded while being paused.
|
| + /// When true execution will only resume after a `onResume` or `onCancel`
|
| + /// event.
|
| + bool isSuspended = false;
|
| +
|
| + bool get isPaused => controller.isPaused;
|
| +
|
| Completer cancelationCompleter = null;
|
| +
|
| + /// True after the StreamSubscription has been cancelled.
|
| + /// When this is true, errors thrown from the async* body should go to the
|
| + /// [cancelationCompleter] instead of adding them to [controller], and
|
| + /// returning from the async function should complete [cancelationCompleter].
|
| bool get isCanceled => cancelationCompleter != null;
|
| - bool isAdding = false;
|
| - bool isPaused = false;
|
| +
|
| add(event) => controller.add(event);
|
| +
|
| addStream(Stream stream) {
|
| return controller.addStream(stream, cancelOnError: false);
|
| }
|
| +
|
| addError(error, stackTrace) => controller.addError(error, stackTrace);
|
| +
|
| close() => controller.close();
|
|
|
| AsyncStarStreamController(body) {
|
| +
|
| + _resumeBody() {
|
| + scheduleMicrotask(() {
|
| + Function wrapped =
|
| + _wrapJsFunctionForAsync(body, async_error_codes.SUCCESS);
|
| + wrapped(null);
|
| + });
|
| + }
|
| +
|
| controller = new StreamController(
|
| onListen: () {
|
| - scheduleMicrotask(() {
|
| - Function wrapped = _wrapJsFunctionForAsync(body,
|
| - async_error_codes.SUCCESS);
|
| - wrapped(null);
|
| - });
|
| - },
|
| - onPause: () {
|
| - isPaused = true;
|
| + _resumeBody();
|
| }, onResume: () {
|
| - isPaused = false;
|
| - if (!isAdding) {
|
| - asyncStarHelper(null, body, this);
|
| + // Only schedule again if the async* function actually is suspended.
|
| + // Resume directly instead of scheduling, so that the sequence
|
| + // `pause-resume-pause` will result in one extra event produced.
|
| + if (isSuspended) {
|
| + isSuspended = false;
|
| + _resumeBody();
|
| }
|
| }, onCancel: () {
|
| + // If the async* is finished we ignore cancel events.
|
| if (!controller.isClosed) {
|
| cancelationCompleter = new Completer();
|
| - if (isPaused) asyncStarHelper(null, body, this);
|
| -
|
| + if (isSuspended) {
|
| + // Resume the suspended async* function to run finalizers.
|
| + isSuspended = false;
|
| + scheduleMicrotask(() {
|
| + Function wrapped =_wrapJsFunctionForAsync(body,
|
| + async_error_codes.STREAM_WAS_CANCELED);
|
| + wrapped(null);
|
| + });
|
| + }
|
| return cancelationCompleter.future;
|
| }
|
| });
|
|
|