Index: tool/input_sdk/lib/async/stream_controller.dart |
diff --git a/tool/input_sdk/lib/async/stream_controller.dart b/tool/input_sdk/lib/async/stream_controller.dart |
index 0d6ce2c32c68e0025c5768315e70b422f9cd2dba..87b64e893d1e52dc89f7b55f5b3ca19175a0bba1 100644 |
--- a/tool/input_sdk/lib/async/stream_controller.dart |
+++ b/tool/input_sdk/lib/async/stream_controller.dart |
@@ -9,6 +9,18 @@ part of dart.async; |
// ------------------------------------------------------------------- |
/** |
+ * Type of a stream controller's `onListen`, `onPause` and `onResume` callbacks. |
+ */ |
+typedef void ControllerCallback(); |
+ |
+/** |
+ * Type of stream controller `onCancel` callbacks. |
+ * |
+ * The callback may return either `void` or a future. |
+ */ |
+typedef ControllerCancelCallback(); |
+ |
+/** |
* A controller with the stream it controls. |
* |
* This controller allows sending data, error and done events on |
@@ -53,13 +65,19 @@ abstract class StreamController<T> implements StreamSink<T> { |
/** |
* A controller with a [stream] that supports only one single subscriber. |
* |
- * If [sync] is true, events may be passed directly to the stream's listener |
- * during an [add], [addError] or [close] call. If [sync] is false, the event |
- * will be passed to the listener at a later time, after the code creating |
- * the event has returned. |
+ * If [sync] is true, the returned stream controller is a |
+ * [SynchronousStreamController], and must be used with the care |
+ * and attention necessary to not break the [Stream] contract. |
+ * See [Completer.sync] for some explanations on when a synchronous |
+ * dispatching can be used. |
+ * If in doubt, keep the controller non-sync. |
+ * |
+ * A Stream should be inert until a subscriber starts listening on it (using |
+ * the [onListen] callback to start producing events). Streams should not |
+ * leak resources (like websockets) when no user ever listens on the stream. |
* |
- * The controller will buffer all incoming events until the subscriber is |
- * registered. |
+ * The controller buffers all incoming events until a subscriber is |
+ * registered, but this feature should only be used in rare circumstances. |
* |
* The [onPause] function is called when the stream becomes |
* paused. [onResume] is called when the stream resumed. |
@@ -78,12 +96,6 @@ abstract class StreamController<T> implements StreamSink<T> { |
void onResume(), |
onCancel(), |
bool sync: false}) { |
- if (onListen == null && onPause == null && |
- onResume == null && onCancel == null) { |
- return sync |
- ? new _NoCallbackSyncStreamController<T>() |
- : new _NoCallbackAsyncStreamController<T>(); |
- } |
return sync |
? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel) |
: new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel); |
@@ -95,6 +107,12 @@ abstract class StreamController<T> implements StreamSink<T> { |
* The [Stream] returned by [stream] is a broadcast stream. |
* It can be listened to more than once. |
* |
+ * A Stream should be inert until a subscriber starts listening on it (using |
+ * the [onListen] callback to start producing events). Streams should not |
+ * leak resources (like websockets) when no user ever listens on the stream. |
+ * |
+ * Broadcast streams do not buffer events when there is no listener. |
+ * |
* The controller distributes any events to all currently subscribed |
* listeners at the time when [add], [addError] or [close] is called. |
* It is not allowed to call `add`, `addError`, or `close` before a previous |
@@ -108,10 +126,16 @@ abstract class StreamController<T> implements StreamSink<T> { |
* |
* If [sync] is true, events may be fired directly by the stream's |
* subscriptions during an [add], [addError] or [close] call. |
- * If [sync] is false, the event will be fired at a later time, |
+ * The returned stream controller is a [SynchronousStreamController], |
+ * and must be used with the care and attention necessary to not break |
+ * the [Stream] contract. |
+ * See [Completer.sync] for some explanations on when a synchronous |
+ * dispatching can be used. |
+ * If in doubt, keep the controller non-sync. |
+ * |
+ * If [sync] is false, the event will always be fired at a later time, |
* after the code adding the event has completed. |
- * |
- * When [sync] is false, no guarantees are given with regard to when |
+ * In that case, no guarantees are given with regard to when |
* multiple listeners get the events, except that each listener will get |
* all events in the correct order. Each subscription handles the events |
* individually. |
@@ -137,15 +161,60 @@ abstract class StreamController<T> implements StreamSink<T> { |
} |
/** |
+ * The callback which is called when the stream is listened to. |
+ * |
+ * May be set to `null`, in which case no callback will happen. |
+ */ |
+ ControllerCallback get onListen; |
+ |
+ void set onListen(void onListenHandler()); |
+ |
+ /** |
+ * The callback which is called when the stream is paused. |
+ * |
+ * May be set to `null`, in which case no callback will happen. |
+ * |
+ * Pause related callbacks are not supported on broadcast stream controllers. |
+ */ |
+ ControllerCallback get onPause; |
+ |
+ void set onPause(void onPauseHandler()); |
+ |
+ /** |
+ * The callback which is called when the stream is resumed. |
+ * |
+ * May be set to `null`, in which case no callback will happen. |
+ * |
+ * Pause related callbacks are not supported on broadcast stream controllers. |
+ */ |
+ ControllerCallback get onResume; |
+ |
+ void set onResume(void onResumeHandler()); |
+ |
+ /** |
+ * The callback which is called when the stream is canceled. |
+ * |
+ * May be set to `null`, in which case no callback will happen. |
+ */ |
+ ControllerCancelCallback get onCancel; |
+ |
+ void set onCancel(onCancelHandler()); |
+ |
+ /** |
* Returns a view of this object that only exposes the [StreamSink] interface. |
*/ |
StreamSink<T> get sink; |
/** |
- * Whether the stream is closed for adding more events. |
+ * Whether the stream controller is closed for adding more events. |
+ * |
+ * The controller becomes closed by calling the [close] method. |
+ * New events cannot be added, by calling [add] or [addError], |
+ * to a closed controller. |
* |
- * If true, the "done" event might not have fired yet, but it has been |
- * scheduled, and it is too late to add more events. |
+ * If the controller is closed, |
+ * the "done" event might not have been delivered yet, |
+ * but it has been scheduled, and it is too late to add more events. |
*/ |
bool get isClosed; |
@@ -157,8 +226,8 @@ abstract class StreamController<T> implements StreamSink<T> { |
* controller is considered paused as well. |
* |
* A broadcast stream controller is never considered paused. It always |
- * forwards its events to all uncanceled listeners, if any, and let them |
- * handle their own pausing. |
+ * forwards its events to all uncanceled subscriptions, if any, |
+ * and let the subscriptions handle their own pausing and buffering. |
*/ |
bool get isPaused; |
@@ -169,9 +238,6 @@ abstract class StreamController<T> implements StreamSink<T> { |
* Send or enqueue an error event. |
* |
* If [error] is `null`, it is replaced by a [NullThrownError]. |
- * |
- * Also allows an objection stack trace object, on top of what [EventSink] |
- * allows. |
*/ |
void addError(Object error, [StackTrace stackTrace]); |
@@ -197,6 +263,103 @@ abstract class StreamController<T> implements StreamSink<T> { |
} |
+/** |
+ * A stream controller that delivers its events synchronously. |
+ * |
+ * A synchronous stream controller is intended for cases where |
+ * an already asynchronous event triggers an event on a stream. |
+ * |
+ * Instead of adding the event to the stream in a later microtask, |
+ * causing extra latency, the event is instead fired immediately by the |
+ * synchronous stream controller, as if the stream event was |
+ * the current event or microtask. |
+ * |
+ * The synchronous stream controller can be used to break the contract |
+ * on [Stream], and it must be used carefully to avoid doing so. |
+ * |
+ * The only advantage to using a [SynchronousStreamController] over a |
+ * normal [StreamController] is the improved latency. |
+ * Only use the synchronous version if the improvement is significant, |
+ * and if its use is safe. Otherwise just use a normal stream controller, |
+ * which will always have the correct behavior for a [Stream], and won't |
+ * accidentally break other code. |
+ * |
+ * Adding events to a synchronous controller should only happen as the |
+ * very last part of a the handling of the original event. |
+ * At that point, adding an event to the stream is equivalent to |
+ * returning to the event loop and adding the event in the next microtask. |
+ * |
+ * Each listener callback will be run as if it was a top-level event |
+ * or microtask. This means that if it throws, the error will be reported as |
+ * uncaught as soon as possible. |
+ * This is one reason to add the event as the last thing in the original event |
+ * handler - any action done after adding the event will delay the report of |
+ * errors in the event listener callbacks. |
+ * |
+ * If an event is added in a setting that isn't known to be another event, |
+ * it may cause the stream's listener to get that event before the listener |
+ * is ready to handle it. We promise that after calling [Stream.listen], |
+ * you won't get any events until the code doing the listen has completed. |
+ * Calling [add] in response to a function call of unknown origin may break |
+ * that promise. |
+ * |
+ * An [onListen] callback from the controller is *not* an asynchronous event, |
+ * and adding events to the controller in the `onListen` callback is always |
+ * wrong. The events will be delivered before the listener has even received |
+ * the subscription yet. |
+ * |
+ * The synchronous broadcast stream controller also has a restrictions that a |
+ * normal stream controller does not: |
+ * The [add], [addError], [close] and [addStream] methods *must not* be |
+ * called while an event is being delivered. |
+ * That is, if a callback on a subscription on the controller's stream causes |
+ * a call to any of the functions above, the call will fail. |
+ * A broadcast stream may have more than one listener, and if an |
+ * event is added synchronously while another is being also in the process |
+ * of being added, the latter event might reach some listeners before |
+ * the former. To prevent that, an event cannot be added while a previous |
+ * event is being fired. |
+ * This guarantees that an event is fully delivered when the |
+ * first [add], [addError] or [close] returns, |
+ * and further events will be delivered in the correct order. |
+ * |
+ * This still only guarantees that the event is delivered to the subscription. |
+ * If the subscription is paused, the actual callback may still happen later, |
+ * and the event will instead be buffered by the subscription. |
+ * Barring pausing, and the following buffered events that haven't been |
+ * delivered yet, callbacks will be called synchronously when an event is added. |
+ * |
+ * Adding an event to a synchronous non-broadcast stream controller while |
+ * another event is in progress may cause the second event to be delayed |
+ * and not be delivered synchronously, and until that event is delivered, |
+ * the controller will not act synchronously. |
+ */ |
+abstract class SynchronousStreamController<T> implements StreamController<T> { |
+ /** |
+ * Adds event to the controller's stream. |
+ * |
+ * As [StreamController.add], but must not be called while an event is |
+ * being added by [add], [addError] or [close]. |
+ */ |
+ void add(T data); |
+ |
+ /** |
+ * Adds error to the controller's stream. |
+ * |
+ * As [StreamController.addError], but must not be called while an event is |
+ * being added by [add], [addError] or [close]. |
+ */ |
+ void addError(Object error, [StackTrace stackTrace]); |
+ |
+ /** |
+ * Closes the controller's stream. |
+ * |
+ * As [StreamController.close], but must not be called while an event is |
+ * being added by [add], [addError] or [close]. |
+ */ |
+ Future close(); |
+} |
+ |
abstract class _StreamControllerLifecycle<T> { |
StreamSubscription<T> _subscribe( |
void onData(T data), |
@@ -282,15 +445,18 @@ abstract class _StreamController<T> implements StreamController<T>, |
// accessed earlier, or if close is called before subscribing. |
_Future _doneFuture; |
- _StreamController(); |
+ ControllerCallback onListen; |
+ ControllerCallback onPause; |
+ ControllerCallback onResume; |
+ ControllerCancelCallback onCancel; |
- _NotificationHandler get _onListen; |
- _NotificationHandler get _onPause; |
- _NotificationHandler get _onResume; |
- _NotificationHandler get _onCancel; |
+ _StreamController(this.onListen, |
+ this.onPause, |
+ this.onResume, |
+ this.onCancel); |
// Return a new stream every time. The streams are equal, but not identical. |
- Stream<T> get stream => new _ControllerStream(this); |
+ Stream<T> get stream => new _ControllerStream<T>(this); |
/** |
* Returns a view of this object that only exposes the [StreamSink] interface. |
@@ -328,37 +494,40 @@ abstract class _StreamController<T> implements StreamController<T>, |
// stream is listened to. |
// While adding a stream, pending events are moved into the |
// state object to allow the state object to use the _varData field. |
- _PendingEvents get _pendingEvents { |
+ _PendingEvents<T> get _pendingEvents { |
assert(_isInitialState); |
if (!_isAddingStream) { |
- return _varData; |
+ return _varData as Object /*=_PendingEvents<T>*/; |
} |
- _StreamControllerAddStreamState state = _varData; |
- return state.varData; |
+ _StreamControllerAddStreamState<T> state = |
+ _varData as Object /*=_StreamControllerAddStreamState<T>*/; |
+ return state.varData as Object /*=_PendingEvents<T>*/; |
} |
// Returns the pending events, and creates the object if necessary. |
- _StreamImplEvents _ensurePendingEvents() { |
+ _StreamImplEvents<T> _ensurePendingEvents() { |
assert(_isInitialState); |
if (!_isAddingStream) { |
- if (_varData == null) _varData = new _StreamImplEvents(); |
- return _varData; |
+ if (_varData == null) _varData = new _StreamImplEvents<T>(); |
+ return _varData as Object /*=_StreamImplEvents<T>*/; |
} |
- _StreamControllerAddStreamState state = _varData; |
- if (state.varData == null) state.varData = new _StreamImplEvents(); |
- return state.varData; |
+ _StreamControllerAddStreamState<T> state = |
+ _varData as Object /*=_StreamControllerAddStreamState<T>*/; |
+ if (state.varData == null) state.varData = new _StreamImplEvents<T>(); |
+ return state.varData as Object /*=_StreamImplEvents<T>*/; |
} |
// Get the current subscription. |
// If we are adding a stream, the subscription is moved into the state |
// object to allow the state object to use the _varData field. |
- _ControllerSubscription get _subscription { |
+ _ControllerSubscription<T> get _subscription { |
assert(hasListener); |
if (_isAddingStream) { |
- _StreamControllerAddStreamState addState = _varData; |
- return addState.varData; |
+ _StreamControllerAddStreamState<T> addState = |
+ _varData as Object /*=_StreamControllerAddStreamState<T>*/; |
+ return addState.varData as Object /*=_ControllerSubscription<T>*/; |
} |
- return _varData; |
+ return _varData as Object /*=_ControllerSubscription<T>*/; |
} |
/** |
@@ -378,11 +547,11 @@ abstract class _StreamController<T> implements StreamController<T>, |
Future addStream(Stream<T> source, {bool cancelOnError: true}) { |
if (!_mayAddEvent) throw _badEventState(); |
if (_isCanceled) return new _Future.immediate(null); |
- _StreamControllerAddStreamState addState = |
- new _StreamControllerAddStreamState(this, |
- _varData, |
- source, |
- cancelOnError); |
+ _StreamControllerAddStreamState<T> addState = |
+ new _StreamControllerAddStreamState<T>(this, |
+ _varData, |
+ source, |
+ cancelOnError); |
_varData = addState; |
_state |= _STATE_ADDSTREAM; |
return addState.addStreamFuture; |
@@ -416,8 +585,8 @@ abstract class _StreamController<T> implements StreamController<T>, |
* Send or enqueue an error event. |
*/ |
void addError(Object error, [StackTrace stackTrace]) { |
- error = _nonNullError(error); |
if (!_mayAddEvent) throw _badEventState(); |
+ error = _nonNullError(error); |
AsyncError replacement = Zone.current.errorCallback(error, stackTrace); |
if (replacement != null) { |
error = _nonNullError(replacement.error); |
@@ -435,7 +604,8 @@ abstract class _StreamController<T> implements StreamController<T>, |
* You are allowed to close the controller more than once, but only the first |
* call has any effect. |
* |
- * After closing, no further events may be added using [add] or [addError]. |
+ * After closing, no further events may be added using [add], [addError] |
+ * or [addStream]. |
* |
* The returned future is completed when the done event has been delivered. |
*/ |
@@ -479,7 +649,8 @@ abstract class _StreamController<T> implements StreamController<T>, |
void _close() { |
// End of addStream stream. |
assert(_isAddingStream); |
- _StreamControllerAddStreamState addState = _varData; |
+ _StreamControllerAddStreamState<T> addState = |
+ _varData as Object /*=_StreamControllerAddStreamState<T>*/; |
_varData = addState.varData; |
_state &= ~_STATE_ADDSTREAM; |
addState.complete(); |
@@ -495,14 +666,15 @@ abstract class _StreamController<T> implements StreamController<T>, |
if (!_isInitialState) { |
throw new StateError("Stream has already been listened to."); |
} |
- _ControllerSubscription subscription = |
- new _ControllerSubscription(this, onData, onError, onDone, |
- cancelOnError); |
+ _ControllerSubscription<T> subscription = |
+ new _ControllerSubscription<T>(this, onData, onError, onDone, |
+ cancelOnError); |
- _PendingEvents pendingEvents = _pendingEvents; |
+ _PendingEvents<T> pendingEvents = _pendingEvents; |
_state |= _STATE_SUBSCRIBED; |
if (_isAddingStream) { |
- _StreamControllerAddStreamState addState = _varData; |
+ _StreamControllerAddStreamState<T> addState = |
+ _varData as Object /*=_StreamControllerAddStreamState<T>*/; |
addState.varData = subscription; |
addState.resume(); |
} else { |
@@ -510,7 +682,7 @@ abstract class _StreamController<T> implements StreamController<T>, |
} |
subscription._setPendingEvents(pendingEvents); |
subscription._guardCallback(() { |
- _runGuarded(_onListen); |
+ _runGuarded(onListen); |
}); |
return subscription; |
@@ -518,8 +690,8 @@ abstract class _StreamController<T> implements StreamController<T>, |
Future _recordCancel(StreamSubscription<T> subscription) { |
// When we cancel, we first cancel any stream being added, |
- // Then we call _onCancel, and finally the _doneFuture is completed. |
- // If either of addStream's cancel or _onCancel returns a future, |
+ // Then we call `onCancel`, and finally the _doneFuture is completed. |
+ // If either of addStream's cancel or `onCancel` returns a future, |
// we wait for it before continuing. |
// Any error during this process ends up in the returned future. |
// If more errors happen, we act as if it happens inside nested try/finallys |
@@ -527,19 +699,20 @@ abstract class _StreamController<T> implements StreamController<T>, |
// returned future. |
Future result; |
if (_isAddingStream) { |
- _StreamControllerAddStreamState addState = _varData; |
+ _StreamControllerAddStreamState<T> addState = |
+ _varData as Object /*=_StreamControllerAddStreamState<T>*/; |
result = addState.cancel(); |
} |
_varData = null; |
_state = |
(_state & ~(_STATE_SUBSCRIBED | _STATE_ADDSTREAM)) | _STATE_CANCELED; |
- if (_onCancel != null) { |
+ if (onCancel != null) { |
if (result == null) { |
// Only introduce a future if one is needed. |
// If _onCancel returns null, no future is needed. |
try { |
- result = _onCancel(); |
+ result = onCancel(); |
} catch (e, s) { |
// Return the error in the returned future. |
// Complete it asynchronously, so there is time for a listener |
@@ -548,7 +721,7 @@ abstract class _StreamController<T> implements StreamController<T>, |
} |
} else { |
// Simpler case when we already know that we will return a future. |
- result = result.whenComplete(_onCancel); |
+ result = result.whenComplete(onCancel); |
} |
} |
@@ -569,23 +742,28 @@ abstract class _StreamController<T> implements StreamController<T>, |
void _recordPause(StreamSubscription<T> subscription) { |
if (_isAddingStream) { |
- _StreamControllerAddStreamState addState = _varData; |
+ _StreamControllerAddStreamState<T> addState = |
+ _varData as Object /*=_StreamControllerAddStreamState<T>*/; |
addState.pause(); |
} |
- _runGuarded(_onPause); |
+ _runGuarded(onPause); |
} |
void _recordResume(StreamSubscription<T> subscription) { |
if (_isAddingStream) { |
- _StreamControllerAddStreamState addState = _varData; |
+ _StreamControllerAddStreamState<T> addState = |
+ _varData as Object /*=_StreamControllerAddStreamState<T>*/; |
addState.resume(); |
} |
- _runGuarded(_onResume); |
+ _runGuarded(onResume); |
} |
} |
abstract class _SyncStreamControllerDispatch<T> |
- implements _StreamController<T> { |
+ implements _StreamController<T>, SynchronousStreamController<T> { |
+ int get _state; |
+ void set _state(int state); |
+ |
void _sendData(T data) { |
_subscription._add(data); |
} |
@@ -602,7 +780,7 @@ abstract class _SyncStreamControllerDispatch<T> |
abstract class _AsyncStreamControllerDispatch<T> |
implements _StreamController<T> { |
void _sendData(T data) { |
- _subscription._addPending(new _DelayedData(data)); |
+ _subscription._addPending(new _DelayedData<dynamic /*=T*/>(data)); |
} |
void _sendError(Object error, StackTrace stackTrace) { |
@@ -617,44 +795,11 @@ abstract class _AsyncStreamControllerDispatch<T> |
// TODO(lrn): Use common superclass for callback-controllers when VM supports |
// constructors in mixin superclasses. |
-class _AsyncStreamController<T> extends _StreamController<T> |
- with _AsyncStreamControllerDispatch<T> { |
- final _NotificationHandler _onListen; |
- final _NotificationHandler _onPause; |
- final _NotificationHandler _onResume; |
- final _NotificationHandler _onCancel; |
- |
- _AsyncStreamController(void this._onListen(), |
- void this._onPause(), |
- void this._onResume(), |
- this._onCancel()); |
-} |
- |
-class _SyncStreamController<T> extends _StreamController<T> |
- with _SyncStreamControllerDispatch<T> { |
- final _NotificationHandler _onListen; |
- final _NotificationHandler _onPause; |
- final _NotificationHandler _onResume; |
- final _NotificationHandler _onCancel; |
- |
- _SyncStreamController(void this._onListen(), |
- void this._onPause(), |
- void this._onResume(), |
- this._onCancel()); |
-} |
- |
-abstract class _NoCallbacks { |
- _NotificationHandler get _onListen => null; |
- _NotificationHandler get _onPause => null; |
- _NotificationHandler get _onResume => null; |
- _NotificationHandler get _onCancel => null; |
-} |
- |
-class _NoCallbackAsyncStreamController<T> = _StreamController<T> |
- with _AsyncStreamControllerDispatch<T>, _NoCallbacks; |
+class _AsyncStreamController<T> = _StreamController<T> |
+ with _AsyncStreamControllerDispatch<T>; |
-class _NoCallbackSyncStreamController<T> = _StreamController<T> |
- with _SyncStreamControllerDispatch<T>, _NoCallbacks; |
+class _SyncStreamController<T> = _StreamController<T> |
+ with _SyncStreamControllerDispatch<T>; |
typedef _NotificationHandler(); |
@@ -772,12 +917,12 @@ class _AddStreamState<T> { |
* Return a future if the cancel takes time, otherwise return `null`. |
*/ |
Future cancel() { |
- var cancel2 = addSubscription.cancel(); |
- if (cancel2 == null) { |
+ var cancel = addSubscription.cancel(); |
+ if (cancel == null) { |
addStreamFuture._asyncComplete(null); |
return null; |
} |
- return cancel2.whenComplete(() { addStreamFuture._asyncComplete(null); }); |
+ return cancel.whenComplete(() { addStreamFuture._asyncComplete(null); }); |
} |
void complete() { |
@@ -791,7 +936,7 @@ class _StreamControllerAddStreamState<T> extends _AddStreamState<T> { |
// to store this state object. |
var varData; |
- _StreamControllerAddStreamState(_StreamController controller, |
+ _StreamControllerAddStreamState(_StreamController<T> controller, |
this.varData, |
Stream source, |
bool cancelOnError) |