Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(233)

Unified Diff: sdk/lib/_internal/compiler/js_lib/js_helper.dart

Issue 1070733002: Dart2js async* functions only resume execution of the body when suspended on yield. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Address review Created 5 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « no previous file | tests/language/async_star_regression_23116_test.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: sdk/lib/_internal/compiler/js_lib/js_helper.dart
diff --git a/sdk/lib/_internal/compiler/js_lib/js_helper.dart b/sdk/lib/_internal/compiler/js_lib/js_helper.dart
index 163833b6ffadbac2da7a19366980a6bcf9041896..2248dc6ef575f2e5c520a19db96778f89f5b9ee7 100644
--- a/sdk/lib/_internal/compiler/js_lib/js_helper.dart
+++ b/sdk/lib/_internal/compiler/js_lib/js_helper.dart
@@ -3642,10 +3642,11 @@ dynamic asyncHelper(dynamic object,
future.then(_wrapJsFunctionForAsync(bodyFunctionOrErrorCode,
async_error_codes.SUCCESS),
onError: (dynamic error, StackTrace stackTrace) {
- ExceptionAndStackTrace wrapped =
+ ExceptionAndStackTrace wrappedException =
new ExceptionAndStackTrace(error, stackTrace);
- return _wrapJsFunctionForAsync(bodyFunctionOrErrorCode,
- async_error_codes.ERROR)(wrapped);
+ Function wrapped =_wrapJsFunctionForAsync(bodyFunctionOrErrorCode,
+ async_error_codes.ERROR);
+ wrapped(wrappedException);
});
return completer.future;
}
@@ -3714,7 +3715,7 @@ void asyncStarHelper(dynamic object,
AsyncStarStreamController controller) {
if (identical(bodyFunctionOrErrorCode, async_error_codes.SUCCESS)) {
// This happens on return from the async* function.
- if (controller.cancelationCompleter != null) {
+ if (controller.isCanceled) {
controller.cancelationCompleter.complete();
} else {
controller.close();
@@ -3722,7 +3723,7 @@ void asyncStarHelper(dynamic object,
return;
} else if (identical(bodyFunctionOrErrorCode, async_error_codes.ERROR)) {
// The error is a js-error.
- if (controller.cancelationCompleter != null) {
+ if (controller.isCanceled) {
controller.cancelationCompleter.completeError(
unwrapException(object),
getTraceFromException(object));
@@ -3735,34 +3736,44 @@ void asyncStarHelper(dynamic object,
}
if (object is IterationMarker) {
- if (controller.cancelationCompleter != null) {
- _wrapJsFunctionForAsync(bodyFunctionOrErrorCode,
- async_error_codes.STREAM_WAS_CANCELED)(null);
+ if (controller.isCanceled) {
+ Function wrapped = _wrapJsFunctionForAsync(bodyFunctionOrErrorCode,
+ async_error_codes.STREAM_WAS_CANCELED);
+ wrapped(null);
return;
}
if (object.state == IterationMarker.YIELD_SINGLE) {
controller.add(object.value);
- // If the controller is paused we stop producing more values.
- if (controller.isPaused) {
- return;
- }
- // TODO(sigurdm): We should not suspend here according to the spec.
+
scheduleMicrotask(() {
- _wrapJsFunctionForAsync(bodyFunctionOrErrorCode,
- async_error_codes.SUCCESS)
- (null);
+ if (controller.isPaused) {
+ // We only suspend the thread inside the microtask in order to allow
+ // listeners on the output stream to pause in response to the just
+ // output value, and have the stream immediately stop producing.
+ controller.isSuspended = true;
+ return;
+ }
+ Function wrapped = _wrapJsFunctionForAsync(bodyFunctionOrErrorCode,
+ async_error_codes.SUCCESS);
+ wrapped(null);
});
return;
} else if (object.state == IterationMarker.YIELD_STAR) {
Stream stream = object.value;
- controller.isAdding = true;
// Errors of [stream] are passed though to the main stream. (see
- // [AsyncStreamController.addStream].
+ // [AsyncStreamController.addStream]).
// TODO(sigurdm): The spec is not very clear here. Clarify with Gilad.
controller.addStream(stream).then((_) {
- controller.isAdding = false;
- _wrapJsFunctionForAsync(bodyFunctionOrErrorCode,
- async_error_codes.SUCCESS)(null);
+ // No check for isPaused here because the spec 17.16.2 only
+ // demands checks *before* each element in [stream] not after the last
+ // one. On the other hand we check for isCanceled, as that check happens
+ // after insertion of each element.
+ int errorCode = controller.isCanceled
+ ? async_error_codes.STREAM_WAS_CANCELED
+ : async_error_codes.SUCCESS;
+ Function wrapped = _wrapJsFunctionForAsync(bodyFunctionOrErrorCode,
+ errorCode);
+ wrapped(null);
});
return;
}
@@ -3772,11 +3783,11 @@ void asyncStarHelper(dynamic object,
future.then(_wrapJsFunctionForAsync(bodyFunctionOrErrorCode,
async_error_codes.SUCCESS),
onError: (error, StackTrace stackTrace) {
- ExceptionAndStackTrace wrapped =
+ ExceptionAndStackTrace wrappedException =
new ExceptionAndStackTrace(error, stackTrace);
- return _wrapJsFunctionForAsync(bodyFunctionOrErrorCode,
- async_error_codes.ERROR)
- (wrapped);
+ Function wrapped = _wrapJsFunctionForAsync(
+ bodyFunctionOrErrorCode, async_error_codes.ERROR);
+ return wrapped(wrappedException);
});
}
@@ -3784,46 +3795,79 @@ Stream streamOfController(AsyncStarStreamController controller) {
return controller.stream;
}
-/// A wrapper around a [StreamController] that remembers if that controller
-/// got a cancel.
+/// A wrapper around a [StreamController] that keeps track of the state of
+/// the execution of an async* function.
+/// It can be in 1 of 3 states:
///
-/// Also has a subSubscription that when not null will provide events for the
-/// stream, and will be paused and resumed along with this controller.
+/// - running/scheduled
+/// - suspended
+/// - canceled
+///
+/// If yielding while the subscription is paused it will become suspended. And
+/// only resume after the subscription is resumed or canceled.
class AsyncStarStreamController {
StreamController controller;
Stream get stream => controller.stream;
+
+ /// True when the async* function has yielded while being paused.
+ /// When true execution will only resume after a `onResume` or `onCancel`
+ /// event.
+ bool isSuspended = false;
+
+ bool get isPaused => controller.isPaused;
+
Completer cancelationCompleter = null;
+
+ /// True after the StreamSubscription has been cancelled.
+ /// When this is true, errors thrown from the async* body should go to the
+ /// [cancelationCompleter] instead of adding them to [controller], and
+ /// returning from the async function should complete [cancelationCompleter].
bool get isCanceled => cancelationCompleter != null;
- bool isAdding = false;
- bool isPaused = false;
+
add(event) => controller.add(event);
+
addStream(Stream stream) {
return controller.addStream(stream, cancelOnError: false);
}
+
addError(error, stackTrace) => controller.addError(error, stackTrace);
+
close() => controller.close();
AsyncStarStreamController(body) {
+
+ _resumeBody() {
+ scheduleMicrotask(() {
+ Function wrapped =
+ _wrapJsFunctionForAsync(body, async_error_codes.SUCCESS);
+ wrapped(null);
+ });
+ }
+
controller = new StreamController(
onListen: () {
- scheduleMicrotask(() {
- Function wrapped = _wrapJsFunctionForAsync(body,
- async_error_codes.SUCCESS);
- wrapped(null);
- });
- },
- onPause: () {
- isPaused = true;
+ _resumeBody();
}, onResume: () {
- isPaused = false;
- if (!isAdding) {
- asyncStarHelper(null, body, this);
+ // Only schedule again if the async* function actually is suspended.
+ // Resume directly instead of scheduling, so that the sequence
+ // `pause-resume-pause` will result in one extra event produced.
+ if (isSuspended) {
+ isSuspended = false;
+ _resumeBody();
}
}, onCancel: () {
+ // If the async* is finished we ignore cancel events.
if (!controller.isClosed) {
cancelationCompleter = new Completer();
- if (isPaused) asyncStarHelper(null, body, this);
-
+ if (isSuspended) {
+ // Resume the suspended async* function to run finalizers.
+ isSuspended = false;
+ scheduleMicrotask(() {
+ Function wrapped =_wrapJsFunctionForAsync(body,
+ async_error_codes.STREAM_WAS_CANCELED);
+ wrapped(null);
+ });
+ }
return cancelationCompleter.future;
}
});
« no previous file with comments | « no previous file | tests/language/async_star_regression_23116_test.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698