Chromium Code Reviews| Index: sdk/lib/_internal/compiler/js_lib/js_helper.dart |
| diff --git a/sdk/lib/_internal/compiler/js_lib/js_helper.dart b/sdk/lib/_internal/compiler/js_lib/js_helper.dart |
| index 163833b6ffadbac2da7a19366980a6bcf9041896..5d0c45230dc6667a6aa329d42db4c776966a9d0f 100644 |
| --- a/sdk/lib/_internal/compiler/js_lib/js_helper.dart |
| +++ b/sdk/lib/_internal/compiler/js_lib/js_helper.dart |
| @@ -3714,7 +3714,7 @@ void asyncStarHelper(dynamic object, |
| AsyncStarStreamController controller) { |
| if (identical(bodyFunctionOrErrorCode, async_error_codes.SUCCESS)) { |
| // This happens on return from the async* function. |
| - if (controller.cancelationCompleter != null) { |
| + if (controller.isCanceled) { |
| controller.cancelationCompleter.complete(); |
| } else { |
| controller.close(); |
| @@ -3722,7 +3722,7 @@ void asyncStarHelper(dynamic object, |
| return; |
| } else if (identical(bodyFunctionOrErrorCode, async_error_codes.ERROR)) { |
| // The error is a js-error. |
| - if (controller.cancelationCompleter != null) { |
| + if (controller.isCanceled) { |
| controller.cancelationCompleter.completeError( |
| unwrapException(object), |
| getTraceFromException(object)); |
| @@ -3737,17 +3737,21 @@ void asyncStarHelper(dynamic object, |
| if (object is IterationMarker) { |
| if (controller.cancelationCompleter != null) { |
|
Lasse Reichstein Nielsen
2015/04/09 12:22:09
if (controller.isCanceled) {
?
sigurdm
2015/04/10 08:13:24
Done.
|
| _wrapJsFunctionForAsync(bodyFunctionOrErrorCode, |
| - async_error_codes.STREAM_WAS_CANCELED)(null); |
| + async_error_codes.STREAM_WAS_CANCELED) |
| + (null); |
|
Lasse Reichstein Nielsen
2015/04/09 12:22:09
Odd linebreak.
Could (null) be on the previous lin
sigurdm
2015/04/10 08:13:25
It is to emphasize the call of the returned functi
|
| return; |
| } |
| if (object.state == IterationMarker.YIELD_SINGLE) { |
| controller.add(object.value); |
| - // If the controller is paused we stop producing more values. |
| - if (controller.isPaused) { |
| - return; |
| - } |
| - // TODO(sigurdm): We should not suspend here according to the spec. |
| + |
| scheduleMicrotask(() { |
| + if (controller.isPaused) { |
| + // We only suspend the thread inside the microtask in order to allow |
| + // listeners on the output stream to pause in response to the just |
| + // output value, and have the stream immediately stop producing. |
| + controller.isSuspended = true; |
| + return; |
| + } |
| _wrapJsFunctionForAsync(bodyFunctionOrErrorCode, |
| async_error_codes.SUCCESS) |
| (null); |
| @@ -3755,14 +3759,15 @@ void asyncStarHelper(dynamic object, |
| return; |
| } else if (object.state == IterationMarker.YIELD_STAR) { |
| Stream stream = object.value; |
| - controller.isAdding = true; |
| // Errors of [stream] are passed though to the main stream. (see |
| // [AsyncStreamController.addStream]. |
| // TODO(sigurdm): The spec is not very clear here. Clarify with Gilad. |
| controller.addStream(stream).then((_) { |
| - controller.isAdding = false; |
| + // We do not check for isPaused here because the spec 17.16.2 only |
| + // demands checks for each element in [stream] not after the last one. |
|
Lasse Reichstein Nielsen
2015/04/09 12:22:09
... demands checks *before* each element ...
sigurdm
2015/04/10 08:13:25
Done.
|
| _wrapJsFunctionForAsync(bodyFunctionOrErrorCode, |
| - async_error_codes.SUCCESS)(null); |
| + async_error_codes.SUCCESS) |
| + (null); |
| }); |
| return; |
| } |
| @@ -3784,46 +3789,74 @@ Stream streamOfController(AsyncStarStreamController controller) { |
| return controller.stream; |
| } |
| -/// A wrapper around a [StreamController] that remembers if that controller |
| -/// got a cancel. |
| +/// A wrapper around a [StreamController] that keeps track of the state of |
| +/// the execution of an async* function. |
| +/// It can be in 1 of 3 states: |
| +/// |
| +/// - running/scheduled |
| +/// - suspended |
| +/// - canceled |
|
Lasse Reichstein Nielsen
2015/04/09 12:22:09
I guess the "done" state is implicit.
sigurdm
2015/04/10 08:13:25
Yes - it corresponds to controller.isClosed
|
| /// |
| -/// Also has a subSubscription that when not null will provide events for the |
| -/// stream, and will be paused and resumed along with this controller. |
| +/// If yielding while the subscription is paused it will become suspended. And |
| +/// only resume after the subscription is resumed or canceled. |
| class AsyncStarStreamController { |
| StreamController controller; |
| Stream get stream => controller.stream; |
| + |
| + /// True when the async* function has yielded while being paused. |
| + /// Or if it has yielded a |
|
Lasse Reichstein Nielsen
2015/04/09 12:22:09
incomplete sentence.
sigurdm
2015/04/10 08:13:25
Thanks
|
| + /// When true execution will only resume after a `onResume` or `onCancel` |
| + /// event. |
| + bool isSuspended = false; |
| + |
| + bool get isPaused => controller.isPaused; |
| + |
| Completer cancelationCompleter = null; |
| + |
| + /// True after the StreamSubscription has been cancelled. |
| + /// When this is true, errors thrown from the async* body should go to the |
| + /// [cancelationCompleter] instead of adding them to [controller], and |
| + /// returning from the async function should complete [cancelationCompleter]. |
| bool get isCanceled => cancelationCompleter != null; |
| - bool isAdding = false; |
| - bool isPaused = false; |
| + |
| add(event) => controller.add(event); |
| + |
| addStream(Stream stream) { |
| return controller.addStream(stream, cancelOnError: false); |
| } |
| + |
| addError(error, stackTrace) => controller.addError(error, stackTrace); |
| + |
| close() => controller.close(); |
| AsyncStarStreamController(body) { |
| + |
| + _resumeBody() { |
| + Function wrapped = |
| + _wrapJsFunctionForAsync(body, async_error_codes.SUCCESS); |
| + wrapped(null); |
| + } |
| + |
| controller = new StreamController( |
| onListen: () { |
| - scheduleMicrotask(() { |
| - Function wrapped = _wrapJsFunctionForAsync(body, |
| - async_error_codes.SUCCESS); |
| - wrapped(null); |
| - }); |
| - }, |
| - onPause: () { |
| - isPaused = true; |
| + scheduleMicrotask(_resumeBody); |
| }, onResume: () { |
| - isPaused = false; |
| - if (!isAdding) { |
| - asyncStarHelper(null, body, this); |
| + // Only schedule again if the async* function actually os suspended. |
|
Lasse Reichstein Nielsen
2015/04/09 12:22:09
os -> is
sigurdm
2015/04/10 08:13:25
Done.
|
| + // Resume directly instead of scheduling, so that the sequence |
| + // `pause-resume-pause` will result in one extra event produced. |
| + if (isSuspended) { |
| + isSuspended = false; |
| + _resumeBody(); |
|
Lasse Reichstein Nielsen
2015/04/09 12:22:09
I think you still need to do a scheduleMicrotask (
sigurdm
2015/04/10 08:13:25
Ah - now I understand what you meant. I thought it
|
| } |
| }, onCancel: () { |
| + // If the async* is finished we ignore cancel events. |
| if (!controller.isClosed) { |
| cancelationCompleter = new Completer(); |
| - if (isPaused) asyncStarHelper(null, body, this); |
| - |
| + if (isSuspended) { |
| + // Resume the suspended async* function to run finalizers. |
| + isSuspended = false; |
| + scheduleMicrotask(_resumeBody); |
|
Lasse Reichstein Nielsen
2015/04/09 12:22:09
Shouldn't the body be called with async_error_code
sigurdm
2015/04/10 08:13:25
Good catch - Done
|
| + } |
| return cancelationCompleter.future; |
| } |
| }); |