Index: sdk/lib/_internal/compiler/js_lib/js_helper.dart |
diff --git a/sdk/lib/_internal/compiler/js_lib/js_helper.dart b/sdk/lib/_internal/compiler/js_lib/js_helper.dart |
index 163833b6ffadbac2da7a19366980a6bcf9041896..5d0c45230dc6667a6aa329d42db4c776966a9d0f 100644 |
--- a/sdk/lib/_internal/compiler/js_lib/js_helper.dart |
+++ b/sdk/lib/_internal/compiler/js_lib/js_helper.dart |
@@ -3714,7 +3714,7 @@ void asyncStarHelper(dynamic object, |
AsyncStarStreamController controller) { |
if (identical(bodyFunctionOrErrorCode, async_error_codes.SUCCESS)) { |
// This happens on return from the async* function. |
- if (controller.cancelationCompleter != null) { |
+ if (controller.isCanceled) { |
controller.cancelationCompleter.complete(); |
} else { |
controller.close(); |
@@ -3722,7 +3722,7 @@ void asyncStarHelper(dynamic object, |
return; |
} else if (identical(bodyFunctionOrErrorCode, async_error_codes.ERROR)) { |
// The error is a js-error. |
- if (controller.cancelationCompleter != null) { |
+ if (controller.isCanceled) { |
controller.cancelationCompleter.completeError( |
unwrapException(object), |
getTraceFromException(object)); |
@@ -3737,17 +3737,21 @@ void asyncStarHelper(dynamic object, |
if (object is IterationMarker) { |
if (controller.cancelationCompleter != null) { |
Lasse Reichstein Nielsen
2015/04/09 12:22:09
if (controller.isCanceled) {
?
sigurdm
2015/04/10 08:13:24
Done.
|
_wrapJsFunctionForAsync(bodyFunctionOrErrorCode, |
- async_error_codes.STREAM_WAS_CANCELED)(null); |
+ async_error_codes.STREAM_WAS_CANCELED) |
+ (null); |
Lasse Reichstein Nielsen
2015/04/09 12:22:09
Odd linebreak.
Could (null) be on the previous lin
sigurdm
2015/04/10 08:13:25
It is to emphasize the call of the returned functi
|
return; |
} |
if (object.state == IterationMarker.YIELD_SINGLE) { |
controller.add(object.value); |
- // If the controller is paused we stop producing more values. |
- if (controller.isPaused) { |
- return; |
- } |
- // TODO(sigurdm): We should not suspend here according to the spec. |
+ |
scheduleMicrotask(() { |
+ if (controller.isPaused) { |
+ // We only suspend the thread inside the microtask in order to allow |
+ // listeners on the output stream to pause in response to the just |
+ // output value, and have the stream immediately stop producing. |
+ controller.isSuspended = true; |
+ return; |
+ } |
_wrapJsFunctionForAsync(bodyFunctionOrErrorCode, |
async_error_codes.SUCCESS) |
(null); |
@@ -3755,14 +3759,15 @@ void asyncStarHelper(dynamic object, |
return; |
} else if (object.state == IterationMarker.YIELD_STAR) { |
Stream stream = object.value; |
- controller.isAdding = true; |
// Errors of [stream] are passed though to the main stream. (see |
// [AsyncStreamController.addStream]. |
// TODO(sigurdm): The spec is not very clear here. Clarify with Gilad. |
controller.addStream(stream).then((_) { |
- controller.isAdding = false; |
+ // We do not check for isPaused here because the spec 17.16.2 only |
+ // demands checks for each element in [stream] not after the last one. |
Lasse Reichstein Nielsen
2015/04/09 12:22:09
... demands checks *before* each element ...
sigurdm
2015/04/10 08:13:25
Done.
|
_wrapJsFunctionForAsync(bodyFunctionOrErrorCode, |
- async_error_codes.SUCCESS)(null); |
+ async_error_codes.SUCCESS) |
+ (null); |
}); |
return; |
} |
@@ -3784,46 +3789,74 @@ Stream streamOfController(AsyncStarStreamController controller) { |
return controller.stream; |
} |
-/// A wrapper around a [StreamController] that remembers if that controller |
-/// got a cancel. |
+/// A wrapper around a [StreamController] that keeps track of the state of |
+/// the execution of an async* function. |
+/// It can be in 1 of 3 states: |
+/// |
+/// - running/scheduled |
+/// - suspended |
+/// - canceled |
Lasse Reichstein Nielsen
2015/04/09 12:22:09
I guess the "done" state is implicit.
sigurdm
2015/04/10 08:13:25
Yes - it corresponds to controller.isClosed
|
/// |
-/// Also has a subSubscription that when not null will provide events for the |
-/// stream, and will be paused and resumed along with this controller. |
+/// If yielding while the subscription is paused it will become suspended. And |
+/// only resume after the subscription is resumed or canceled. |
class AsyncStarStreamController { |
StreamController controller; |
Stream get stream => controller.stream; |
+ |
+ /// True when the async* function has yielded while being paused. |
+ /// Or if it has yielded a |
Lasse Reichstein Nielsen
2015/04/09 12:22:09
incomplete sentence.
sigurdm
2015/04/10 08:13:25
Thanks
|
+ /// When true execution will only resume after a `onResume` or `onCancel` |
+ /// event. |
+ bool isSuspended = false; |
+ |
+ bool get isPaused => controller.isPaused; |
+ |
Completer cancelationCompleter = null; |
+ |
+ /// True after the StreamSubscription has been cancelled. |
+ /// When this is true, errors thrown from the async* body should go to the |
+ /// [cancelationCompleter] instead of adding them to [controller], and |
+ /// returning from the async function should complete [cancelationCompleter]. |
bool get isCanceled => cancelationCompleter != null; |
- bool isAdding = false; |
- bool isPaused = false; |
+ |
add(event) => controller.add(event); |
+ |
addStream(Stream stream) { |
return controller.addStream(stream, cancelOnError: false); |
} |
+ |
addError(error, stackTrace) => controller.addError(error, stackTrace); |
+ |
close() => controller.close(); |
AsyncStarStreamController(body) { |
+ |
+ _resumeBody() { |
+ Function wrapped = |
+ _wrapJsFunctionForAsync(body, async_error_codes.SUCCESS); |
+ wrapped(null); |
+ } |
+ |
controller = new StreamController( |
onListen: () { |
- scheduleMicrotask(() { |
- Function wrapped = _wrapJsFunctionForAsync(body, |
- async_error_codes.SUCCESS); |
- wrapped(null); |
- }); |
- }, |
- onPause: () { |
- isPaused = true; |
+ scheduleMicrotask(_resumeBody); |
}, onResume: () { |
- isPaused = false; |
- if (!isAdding) { |
- asyncStarHelper(null, body, this); |
+ // Only schedule again if the async* function actually os suspended. |
Lasse Reichstein Nielsen
2015/04/09 12:22:09
os -> is
sigurdm
2015/04/10 08:13:25
Done.
|
+ // Resume directly instead of scheduling, so that the sequence |
+ // `pause-resume-pause` will result in one extra event produced. |
+ if (isSuspended) { |
+ isSuspended = false; |
+ _resumeBody(); |
Lasse Reichstein Nielsen
2015/04/09 12:22:09
I think you still need to do a scheduleMicrotask (
sigurdm
2015/04/10 08:13:25
Ah - now I understand what you meant. I thought it
|
} |
}, onCancel: () { |
+ // If the async* is finished we ignore cancel events. |
if (!controller.isClosed) { |
cancelationCompleter = new Completer(); |
- if (isPaused) asyncStarHelper(null, body, this); |
- |
+ if (isSuspended) { |
+ // Resume the suspended async* function to run finalizers. |
+ isSuspended = false; |
+ scheduleMicrotask(_resumeBody); |
Lasse Reichstein Nielsen
2015/04/09 12:22:09
Shouldn't the body be called with async_error_code
sigurdm
2015/04/10 08:13:25
Good catch - Done
|
+ } |
return cancelationCompleter.future; |
} |
}); |