Index: sdk/lib/_internal/compiler/js_lib/js_helper.dart |
diff --git a/sdk/lib/_internal/compiler/js_lib/js_helper.dart b/sdk/lib/_internal/compiler/js_lib/js_helper.dart |
index 163833b6ffadbac2da7a19366980a6bcf9041896..2248dc6ef575f2e5c520a19db96778f89f5b9ee7 100644 |
--- a/sdk/lib/_internal/compiler/js_lib/js_helper.dart |
+++ b/sdk/lib/_internal/compiler/js_lib/js_helper.dart |
@@ -3642,10 +3642,11 @@ dynamic asyncHelper(dynamic object, |
future.then(_wrapJsFunctionForAsync(bodyFunctionOrErrorCode, |
async_error_codes.SUCCESS), |
onError: (dynamic error, StackTrace stackTrace) { |
- ExceptionAndStackTrace wrapped = |
+ ExceptionAndStackTrace wrappedException = |
new ExceptionAndStackTrace(error, stackTrace); |
- return _wrapJsFunctionForAsync(bodyFunctionOrErrorCode, |
- async_error_codes.ERROR)(wrapped); |
+ Function wrapped =_wrapJsFunctionForAsync(bodyFunctionOrErrorCode, |
+ async_error_codes.ERROR); |
+ wrapped(wrappedException); |
}); |
return completer.future; |
} |
@@ -3714,7 +3715,7 @@ void asyncStarHelper(dynamic object, |
AsyncStarStreamController controller) { |
if (identical(bodyFunctionOrErrorCode, async_error_codes.SUCCESS)) { |
// This happens on return from the async* function. |
- if (controller.cancelationCompleter != null) { |
+ if (controller.isCanceled) { |
controller.cancelationCompleter.complete(); |
} else { |
controller.close(); |
@@ -3722,7 +3723,7 @@ void asyncStarHelper(dynamic object, |
return; |
} else if (identical(bodyFunctionOrErrorCode, async_error_codes.ERROR)) { |
// The error is a js-error. |
- if (controller.cancelationCompleter != null) { |
+ if (controller.isCanceled) { |
controller.cancelationCompleter.completeError( |
unwrapException(object), |
getTraceFromException(object)); |
@@ -3735,34 +3736,44 @@ void asyncStarHelper(dynamic object, |
} |
if (object is IterationMarker) { |
- if (controller.cancelationCompleter != null) { |
- _wrapJsFunctionForAsync(bodyFunctionOrErrorCode, |
- async_error_codes.STREAM_WAS_CANCELED)(null); |
+ if (controller.isCanceled) { |
+ Function wrapped = _wrapJsFunctionForAsync(bodyFunctionOrErrorCode, |
+ async_error_codes.STREAM_WAS_CANCELED); |
+ wrapped(null); |
return; |
} |
if (object.state == IterationMarker.YIELD_SINGLE) { |
controller.add(object.value); |
- // If the controller is paused we stop producing more values. |
- if (controller.isPaused) { |
- return; |
- } |
- // TODO(sigurdm): We should not suspend here according to the spec. |
+ |
scheduleMicrotask(() { |
- _wrapJsFunctionForAsync(bodyFunctionOrErrorCode, |
- async_error_codes.SUCCESS) |
- (null); |
+ if (controller.isPaused) { |
+ // We only suspend the thread inside the microtask in order to allow |
+ // listeners on the output stream to pause in response to the just |
+ // output value, and have the stream immediately stop producing. |
+ controller.isSuspended = true; |
+ return; |
+ } |
+ Function wrapped = _wrapJsFunctionForAsync(bodyFunctionOrErrorCode, |
+ async_error_codes.SUCCESS); |
+ wrapped(null); |
}); |
return; |
} else if (object.state == IterationMarker.YIELD_STAR) { |
Stream stream = object.value; |
- controller.isAdding = true; |
// Errors of [stream] are passed though to the main stream. (see |
- // [AsyncStreamController.addStream]. |
+ // [AsyncStreamController.addStream]). |
// TODO(sigurdm): The spec is not very clear here. Clarify with Gilad. |
controller.addStream(stream).then((_) { |
- controller.isAdding = false; |
- _wrapJsFunctionForAsync(bodyFunctionOrErrorCode, |
- async_error_codes.SUCCESS)(null); |
+ // No check for isPaused here because the spec 17.16.2 only |
+ // demands checks *before* each element in [stream] not after the last |
+ // one. On the other hand we check for isCanceled, as that check happens |
+ // after insertion of each element. |
+ int errorCode = controller.isCanceled |
+ ? async_error_codes.STREAM_WAS_CANCELED |
+ : async_error_codes.SUCCESS; |
+ Function wrapped = _wrapJsFunctionForAsync(bodyFunctionOrErrorCode, |
+ errorCode); |
+ wrapped(null); |
}); |
return; |
} |
@@ -3772,11 +3783,11 @@ void asyncStarHelper(dynamic object, |
future.then(_wrapJsFunctionForAsync(bodyFunctionOrErrorCode, |
async_error_codes.SUCCESS), |
onError: (error, StackTrace stackTrace) { |
- ExceptionAndStackTrace wrapped = |
+ ExceptionAndStackTrace wrappedException = |
new ExceptionAndStackTrace(error, stackTrace); |
- return _wrapJsFunctionForAsync(bodyFunctionOrErrorCode, |
- async_error_codes.ERROR) |
- (wrapped); |
+ Function wrapped = _wrapJsFunctionForAsync( |
+ bodyFunctionOrErrorCode, async_error_codes.ERROR); |
+ return wrapped(wrappedException); |
}); |
} |
@@ -3784,46 +3795,79 @@ Stream streamOfController(AsyncStarStreamController controller) { |
return controller.stream; |
} |
-/// A wrapper around a [StreamController] that remembers if that controller |
-/// got a cancel. |
+/// A wrapper around a [StreamController] that keeps track of the state of |
+/// the execution of an async* function. |
+/// It can be in 1 of 3 states: |
/// |
-/// Also has a subSubscription that when not null will provide events for the |
-/// stream, and will be paused and resumed along with this controller. |
+/// - running/scheduled |
+/// - suspended |
+/// - canceled |
+/// |
+/// If yielding while the subscription is paused it will become suspended. And |
+/// only resume after the subscription is resumed or canceled. |
class AsyncStarStreamController { |
StreamController controller; |
Stream get stream => controller.stream; |
+ |
+ /// True when the async* function has yielded while being paused. |
+ /// When true execution will only resume after a `onResume` or `onCancel` |
+ /// event. |
+ bool isSuspended = false; |
+ |
+ bool get isPaused => controller.isPaused; |
+ |
Completer cancelationCompleter = null; |
+ |
+ /// True after the StreamSubscription has been cancelled. |
+ /// When this is true, errors thrown from the async* body should go to the |
+ /// [cancelationCompleter] instead of adding them to [controller], and |
+ /// returning from the async function should complete [cancelationCompleter]. |
bool get isCanceled => cancelationCompleter != null; |
- bool isAdding = false; |
- bool isPaused = false; |
+ |
add(event) => controller.add(event); |
+ |
addStream(Stream stream) { |
return controller.addStream(stream, cancelOnError: false); |
} |
+ |
addError(error, stackTrace) => controller.addError(error, stackTrace); |
+ |
close() => controller.close(); |
AsyncStarStreamController(body) { |
+ |
+ _resumeBody() { |
+ scheduleMicrotask(() { |
+ Function wrapped = |
+ _wrapJsFunctionForAsync(body, async_error_codes.SUCCESS); |
+ wrapped(null); |
+ }); |
+ } |
+ |
controller = new StreamController( |
onListen: () { |
- scheduleMicrotask(() { |
- Function wrapped = _wrapJsFunctionForAsync(body, |
- async_error_codes.SUCCESS); |
- wrapped(null); |
- }); |
- }, |
- onPause: () { |
- isPaused = true; |
+ _resumeBody(); |
}, onResume: () { |
- isPaused = false; |
- if (!isAdding) { |
- asyncStarHelper(null, body, this); |
+ // Only schedule again if the async* function actually is suspended. |
+ // Resume directly instead of scheduling, so that the sequence |
+ // `pause-resume-pause` will result in one extra event produced. |
+ if (isSuspended) { |
+ isSuspended = false; |
+ _resumeBody(); |
} |
}, onCancel: () { |
+ // If the async* is finished we ignore cancel events. |
if (!controller.isClosed) { |
cancelationCompleter = new Completer(); |
- if (isPaused) asyncStarHelper(null, body, this); |
- |
+ if (isSuspended) { |
+ // Resume the suspended async* function to run finalizers. |
+ isSuspended = false; |
+ scheduleMicrotask(() { |
+ Function wrapped =_wrapJsFunctionForAsync(body, |
+ async_error_codes.STREAM_WAS_CANCELED); |
+ wrapped(null); |
+ }); |
+ } |
return cancelationCompleter.future; |
} |
}); |