Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(342)

Unified Diff: sdk/lib/async/stream_controller.dart

Issue 18915008: Let StreamSubscription.cancel return a Future. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Mark failing tests. Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « sdk/lib/async/stream.dart ('k') | sdk/lib/async/stream_impl.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: sdk/lib/async/stream_controller.dart
diff --git a/sdk/lib/async/stream_controller.dart b/sdk/lib/async/stream_controller.dart
index f5b487ebfa5b2f4906ae6ec6729dac7eb4bb017a..0d82c720a20ee997ce433a17b140ac1fb1535356 100644
--- a/sdk/lib/async/stream_controller.dart
+++ b/sdk/lib/async/stream_controller.dart
@@ -66,7 +66,9 @@ abstract class StreamController<T> implements StreamSink<T> {
*
* The [onListen] callback is called when the stream
* receives its listener and [onCancel] when the listener ends
- * its subscription.
+ * its subscription. If [onCancel] needs to perform an asynchronous operation,
+ * [onCancel] should return a future that completes when the cancel operation
+ * is done.
*
* If the stream is canceled before the controller needs new data the
* [onResume] call might not be executed.
@@ -74,7 +76,7 @@ abstract class StreamController<T> implements StreamSink<T> {
factory StreamController({void onListen(),
void onPause(),
void onResume(),
- void onCancel(),
+ onCancel(),
bool sync: false}) {
if (onListen == null && onPause == null &&
onResume == null && onCancel == null) {
@@ -172,7 +174,7 @@ abstract class _StreamControllerLifecycle<T> {
StreamSubscription<T> _subscribe(bool cancelOnError);
void _recordPause(StreamSubscription<T> subscription) {}
void _recordResume(StreamSubscription<T> subscription) {}
- void _recordCancel(StreamSubscription<T> subscription) {}
+ Future _recordCancel(StreamSubscription<T> subscription) => null;
}
/**
@@ -463,7 +465,7 @@ abstract class _StreamController<T> implements StreamController<T>,
return subscription;
}
- void _recordCancel(StreamSubscription<T> subscription) {
+ Future _recordCancel(StreamSubscription<T> subscription) {
if (_isAddingStream) {
_StreamControllerAddStreamState addState = _varData;
addState.cancel();
@@ -471,10 +473,18 @@ abstract class _StreamController<T> implements StreamController<T>,
_varData = null;
_state =
(_state & ~(_STATE_SUBSCRIBED | _STATE_ADDSTREAM)) | _STATE_CANCELED;
- _runGuarded(_onCancel);
- if (_doneFuture != null && _doneFuture._mayComplete) {
- _doneFuture._asyncComplete(null);
+ void complete() {
+ if (_doneFuture != null && _doneFuture._mayComplete) {
+ _doneFuture._asyncComplete(null);
+ }
}
+ Future future = _runGuarded(_onCancel);
+ if (future != null) {
+ future = future.whenComplete(complete);
+ } else {
+ complete();
+ }
+ return future;
}
void _recordPause(StreamSubscription<T> subscription) {
@@ -537,7 +547,7 @@ class _AsyncStreamController<T> extends _StreamController<T>
_AsyncStreamController(void this._onListen(),
void this._onPause(),
void this._onResume(),
- void this._onCancel());
+ this._onCancel());
}
class _SyncStreamController<T> extends _StreamController<T>
@@ -550,7 +560,7 @@ class _SyncStreamController<T> extends _StreamController<T>
_SyncStreamController(void this._onListen(),
void this._onPause(),
void this._onResume(),
- void this._onCancel());
+ this._onCancel());
}
abstract class _NoCallbacks {
@@ -566,12 +576,14 @@ class _NoCallbackAsyncStreamController/*<T>*/ = _StreamController/*<T>*/
class _NoCallbackSyncStreamController/*<T>*/ = _StreamController/*<T>*/
with _SyncStreamControllerDispatch/*<T>*/, _NoCallbacks;
-typedef void _NotificationHandler();
+typedef _NotificationHandler();
-void _runGuarded(_NotificationHandler notificationHandler) {
- if (notificationHandler == null) return;
+Future _runGuarded(_NotificationHandler notificationHandler) {
+ if (notificationHandler == null) return null;
try {
- notificationHandler();
+ var result = notificationHandler();
+ if (result is Future) return result;
+ return null;
} catch (e, s) {
Zone.current.handleUncaughtError(_asyncError(e, s), s);
}
@@ -605,8 +617,8 @@ class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> {
_ControllerSubscription(this._controller, bool cancelOnError)
: super(cancelOnError);
- void _onCancel() {
- _controller._recordCancel(this);
+ Future _onCancel() {
+ return _controller._recordCancel(this);
}
void _onPause() {
@@ -636,7 +648,7 @@ class _StreamSinkWrapper<T> implements StreamSink<T> {
* Object containing the state used to handle [StreamController.addStream].
*/
class _AddStreamState<T> {
- // [_FutureImpl] returned by call to addStream.
+ // [_Future] returned by call to addStream.
_Future addStreamFuture;
// Subscription on stream argument to addStream.
« no previous file with comments | « sdk/lib/async/stream.dart ('k') | sdk/lib/async/stream_impl.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698