Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2)

Unified Diff: tool/input_sdk/lib/async/stream_controller.dart

Issue 1953153002: Update dart:async to match the Dart repo. (Closed) Base URL: https://github.com/dart-lang/dev_compiler.git@master
Patch Set: Remove unneeded calls. Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « tool/input_sdk/lib/async/stream.dart ('k') | tool/input_sdk/lib/async/stream_impl.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: tool/input_sdk/lib/async/stream_controller.dart
diff --git a/tool/input_sdk/lib/async/stream_controller.dart b/tool/input_sdk/lib/async/stream_controller.dart
index 0d6ce2c32c68e0025c5768315e70b422f9cd2dba..87b64e893d1e52dc89f7b55f5b3ca19175a0bba1 100644
--- a/tool/input_sdk/lib/async/stream_controller.dart
+++ b/tool/input_sdk/lib/async/stream_controller.dart
@@ -9,6 +9,18 @@ part of dart.async;
// -------------------------------------------------------------------
/**
+ * Type of a stream controller's `onListen`, `onPause` and `onResume` callbacks.
+ */
+typedef void ControllerCallback();
+
+/**
+ * Type of stream controller `onCancel` callbacks.
+ *
+ * The callback may return either `void` or a future.
+ */
+typedef ControllerCancelCallback();
+
+/**
* A controller with the stream it controls.
*
* This controller allows sending data, error and done events on
@@ -53,13 +65,19 @@ abstract class StreamController<T> implements StreamSink<T> {
/**
* A controller with a [stream] that supports only one single subscriber.
*
- * If [sync] is true, events may be passed directly to the stream's listener
- * during an [add], [addError] or [close] call. If [sync] is false, the event
- * will be passed to the listener at a later time, after the code creating
- * the event has returned.
+ * If [sync] is true, the returned stream controller is a
+ * [SynchronousStreamController], and must be used with the care
+ * and attention necessary to not break the [Stream] contract.
+ * See [Completer.sync] for some explanations on when a synchronous
+ * dispatching can be used.
+ * If in doubt, keep the controller non-sync.
+ *
+ * A Stream should be inert until a subscriber starts listening on it (using
+ * the [onListen] callback to start producing events). Streams should not
+ * leak resources (like websockets) when no user ever listens on the stream.
*
- * The controller will buffer all incoming events until the subscriber is
- * registered.
+ * The controller buffers all incoming events until a subscriber is
+ * registered, but this feature should only be used in rare circumstances.
*
* The [onPause] function is called when the stream becomes
* paused. [onResume] is called when the stream resumed.
@@ -78,12 +96,6 @@ abstract class StreamController<T> implements StreamSink<T> {
void onResume(),
onCancel(),
bool sync: false}) {
- if (onListen == null && onPause == null &&
- onResume == null && onCancel == null) {
- return sync
- ? new _NoCallbackSyncStreamController<T>()
- : new _NoCallbackAsyncStreamController<T>();
- }
return sync
? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel)
: new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel);
@@ -95,6 +107,12 @@ abstract class StreamController<T> implements StreamSink<T> {
* The [Stream] returned by [stream] is a broadcast stream.
* It can be listened to more than once.
*
+ * A Stream should be inert until a subscriber starts listening on it (using
+ * the [onListen] callback to start producing events). Streams should not
+ * leak resources (like websockets) when no user ever listens on the stream.
+ *
+ * Broadcast streams do not buffer events when there is no listener.
+ *
* The controller distributes any events to all currently subscribed
* listeners at the time when [add], [addError] or [close] is called.
* It is not allowed to call `add`, `addError`, or `close` before a previous
@@ -108,10 +126,16 @@ abstract class StreamController<T> implements StreamSink<T> {
*
* If [sync] is true, events may be fired directly by the stream's
* subscriptions during an [add], [addError] or [close] call.
- * If [sync] is false, the event will be fired at a later time,
+ * The returned stream controller is a [SynchronousStreamController],
+ * and must be used with the care and attention necessary to not break
+ * the [Stream] contract.
+ * See [Completer.sync] for some explanations on when a synchronous
+ * dispatching can be used.
+ * If in doubt, keep the controller non-sync.
+ *
+ * If [sync] is false, the event will always be fired at a later time,
* after the code adding the event has completed.
- *
- * When [sync] is false, no guarantees are given with regard to when
+ * In that case, no guarantees are given with regard to when
* multiple listeners get the events, except that each listener will get
* all events in the correct order. Each subscription handles the events
* individually.
@@ -137,15 +161,60 @@ abstract class StreamController<T> implements StreamSink<T> {
}
/**
+ * The callback which is called when the stream is listened to.
+ *
+ * May be set to `null`, in which case no callback will happen.
+ */
+ ControllerCallback get onListen;
+
+ void set onListen(void onListenHandler());
+
+ /**
+ * The callback which is called when the stream is paused.
+ *
+ * May be set to `null`, in which case no callback will happen.
+ *
+ * Pause related callbacks are not supported on broadcast stream controllers.
+ */
+ ControllerCallback get onPause;
+
+ void set onPause(void onPauseHandler());
+
+ /**
+ * The callback which is called when the stream is resumed.
+ *
+ * May be set to `null`, in which case no callback will happen.
+ *
+ * Pause related callbacks are not supported on broadcast stream controllers.
+ */
+ ControllerCallback get onResume;
+
+ void set onResume(void onResumeHandler());
+
+ /**
+ * The callback which is called when the stream is canceled.
+ *
+ * May be set to `null`, in which case no callback will happen.
+ */
+ ControllerCancelCallback get onCancel;
+
+ void set onCancel(onCancelHandler());
+
+ /**
* Returns a view of this object that only exposes the [StreamSink] interface.
*/
StreamSink<T> get sink;
/**
- * Whether the stream is closed for adding more events.
+ * Whether the stream controller is closed for adding more events.
+ *
+ * The controller becomes closed by calling the [close] method.
+ * New events cannot be added, by calling [add] or [addError],
+ * to a closed controller.
*
- * If true, the "done" event might not have fired yet, but it has been
- * scheduled, and it is too late to add more events.
+ * If the controller is closed,
+ * the "done" event might not have been delivered yet,
+ * but it has been scheduled, and it is too late to add more events.
*/
bool get isClosed;
@@ -157,8 +226,8 @@ abstract class StreamController<T> implements StreamSink<T> {
* controller is considered paused as well.
*
* A broadcast stream controller is never considered paused. It always
- * forwards its events to all uncanceled listeners, if any, and let them
- * handle their own pausing.
+ * forwards its events to all uncanceled subscriptions, if any,
+ * and let the subscriptions handle their own pausing and buffering.
*/
bool get isPaused;
@@ -169,9 +238,6 @@ abstract class StreamController<T> implements StreamSink<T> {
* Send or enqueue an error event.
*
* If [error] is `null`, it is replaced by a [NullThrownError].
- *
- * Also allows an objection stack trace object, on top of what [EventSink]
- * allows.
*/
void addError(Object error, [StackTrace stackTrace]);
@@ -197,6 +263,103 @@ abstract class StreamController<T> implements StreamSink<T> {
}
+/**
+ * A stream controller that delivers its events synchronously.
+ *
+ * A synchronous stream controller is intended for cases where
+ * an already asynchronous event triggers an event on a stream.
+ *
+ * Instead of adding the event to the stream in a later microtask,
+ * causing extra latency, the event is instead fired immediately by the
+ * synchronous stream controller, as if the stream event was
+ * the current event or microtask.
+ *
+ * The synchronous stream controller can be used to break the contract
+ * on [Stream], and it must be used carefully to avoid doing so.
+ *
+ * The only advantage to using a [SynchronousStreamController] over a
+ * normal [StreamController] is the improved latency.
+ * Only use the synchronous version if the improvement is significant,
+ * and if its use is safe. Otherwise just use a normal stream controller,
+ * which will always have the correct behavior for a [Stream], and won't
+ * accidentally break other code.
+ *
+ * Adding events to a synchronous controller should only happen as the
+ * very last part of a the handling of the original event.
+ * At that point, adding an event to the stream is equivalent to
+ * returning to the event loop and adding the event in the next microtask.
+ *
+ * Each listener callback will be run as if it was a top-level event
+ * or microtask. This means that if it throws, the error will be reported as
+ * uncaught as soon as possible.
+ * This is one reason to add the event as the last thing in the original event
+ * handler - any action done after adding the event will delay the report of
+ * errors in the event listener callbacks.
+ *
+ * If an event is added in a setting that isn't known to be another event,
+ * it may cause the stream's listener to get that event before the listener
+ * is ready to handle it. We promise that after calling [Stream.listen],
+ * you won't get any events until the code doing the listen has completed.
+ * Calling [add] in response to a function call of unknown origin may break
+ * that promise.
+ *
+ * An [onListen] callback from the controller is *not* an asynchronous event,
+ * and adding events to the controller in the `onListen` callback is always
+ * wrong. The events will be delivered before the listener has even received
+ * the subscription yet.
+ *
+ * The synchronous broadcast stream controller also has a restrictions that a
+ * normal stream controller does not:
+ * The [add], [addError], [close] and [addStream] methods *must not* be
+ * called while an event is being delivered.
+ * That is, if a callback on a subscription on the controller's stream causes
+ * a call to any of the functions above, the call will fail.
+ * A broadcast stream may have more than one listener, and if an
+ * event is added synchronously while another is being also in the process
+ * of being added, the latter event might reach some listeners before
+ * the former. To prevent that, an event cannot be added while a previous
+ * event is being fired.
+ * This guarantees that an event is fully delivered when the
+ * first [add], [addError] or [close] returns,
+ * and further events will be delivered in the correct order.
+ *
+ * This still only guarantees that the event is delivered to the subscription.
+ * If the subscription is paused, the actual callback may still happen later,
+ * and the event will instead be buffered by the subscription.
+ * Barring pausing, and the following buffered events that haven't been
+ * delivered yet, callbacks will be called synchronously when an event is added.
+ *
+ * Adding an event to a synchronous non-broadcast stream controller while
+ * another event is in progress may cause the second event to be delayed
+ * and not be delivered synchronously, and until that event is delivered,
+ * the controller will not act synchronously.
+ */
+abstract class SynchronousStreamController<T> implements StreamController<T> {
+ /**
+ * Adds event to the controller's stream.
+ *
+ * As [StreamController.add], but must not be called while an event is
+ * being added by [add], [addError] or [close].
+ */
+ void add(T data);
+
+ /**
+ * Adds error to the controller's stream.
+ *
+ * As [StreamController.addError], but must not be called while an event is
+ * being added by [add], [addError] or [close].
+ */
+ void addError(Object error, [StackTrace stackTrace]);
+
+ /**
+ * Closes the controller's stream.
+ *
+ * As [StreamController.close], but must not be called while an event is
+ * being added by [add], [addError] or [close].
+ */
+ Future close();
+}
+
abstract class _StreamControllerLifecycle<T> {
StreamSubscription<T> _subscribe(
void onData(T data),
@@ -282,15 +445,18 @@ abstract class _StreamController<T> implements StreamController<T>,
// accessed earlier, or if close is called before subscribing.
_Future _doneFuture;
- _StreamController();
+ ControllerCallback onListen;
+ ControllerCallback onPause;
+ ControllerCallback onResume;
+ ControllerCancelCallback onCancel;
- _NotificationHandler get _onListen;
- _NotificationHandler get _onPause;
- _NotificationHandler get _onResume;
- _NotificationHandler get _onCancel;
+ _StreamController(this.onListen,
+ this.onPause,
+ this.onResume,
+ this.onCancel);
// Return a new stream every time. The streams are equal, but not identical.
- Stream<T> get stream => new _ControllerStream(this);
+ Stream<T> get stream => new _ControllerStream<T>(this);
/**
* Returns a view of this object that only exposes the [StreamSink] interface.
@@ -328,37 +494,40 @@ abstract class _StreamController<T> implements StreamController<T>,
// stream is listened to.
// While adding a stream, pending events are moved into the
// state object to allow the state object to use the _varData field.
- _PendingEvents get _pendingEvents {
+ _PendingEvents<T> get _pendingEvents {
assert(_isInitialState);
if (!_isAddingStream) {
- return _varData;
+ return _varData as Object /*=_PendingEvents<T>*/;
}
- _StreamControllerAddStreamState state = _varData;
- return state.varData;
+ _StreamControllerAddStreamState<T> state =
+ _varData as Object /*=_StreamControllerAddStreamState<T>*/;
+ return state.varData as Object /*=_PendingEvents<T>*/;
}
// Returns the pending events, and creates the object if necessary.
- _StreamImplEvents _ensurePendingEvents() {
+ _StreamImplEvents<T> _ensurePendingEvents() {
assert(_isInitialState);
if (!_isAddingStream) {
- if (_varData == null) _varData = new _StreamImplEvents();
- return _varData;
+ if (_varData == null) _varData = new _StreamImplEvents<T>();
+ return _varData as Object /*=_StreamImplEvents<T>*/;
}
- _StreamControllerAddStreamState state = _varData;
- if (state.varData == null) state.varData = new _StreamImplEvents();
- return state.varData;
+ _StreamControllerAddStreamState<T> state =
+ _varData as Object /*=_StreamControllerAddStreamState<T>*/;
+ if (state.varData == null) state.varData = new _StreamImplEvents<T>();
+ return state.varData as Object /*=_StreamImplEvents<T>*/;
}
// Get the current subscription.
// If we are adding a stream, the subscription is moved into the state
// object to allow the state object to use the _varData field.
- _ControllerSubscription get _subscription {
+ _ControllerSubscription<T> get _subscription {
assert(hasListener);
if (_isAddingStream) {
- _StreamControllerAddStreamState addState = _varData;
- return addState.varData;
+ _StreamControllerAddStreamState<T> addState =
+ _varData as Object /*=_StreamControllerAddStreamState<T>*/;
+ return addState.varData as Object /*=_ControllerSubscription<T>*/;
}
- return _varData;
+ return _varData as Object /*=_ControllerSubscription<T>*/;
}
/**
@@ -378,11 +547,11 @@ abstract class _StreamController<T> implements StreamController<T>,
Future addStream(Stream<T> source, {bool cancelOnError: true}) {
if (!_mayAddEvent) throw _badEventState();
if (_isCanceled) return new _Future.immediate(null);
- _StreamControllerAddStreamState addState =
- new _StreamControllerAddStreamState(this,
- _varData,
- source,
- cancelOnError);
+ _StreamControllerAddStreamState<T> addState =
+ new _StreamControllerAddStreamState<T>(this,
+ _varData,
+ source,
+ cancelOnError);
_varData = addState;
_state |= _STATE_ADDSTREAM;
return addState.addStreamFuture;
@@ -416,8 +585,8 @@ abstract class _StreamController<T> implements StreamController<T>,
* Send or enqueue an error event.
*/
void addError(Object error, [StackTrace stackTrace]) {
- error = _nonNullError(error);
if (!_mayAddEvent) throw _badEventState();
+ error = _nonNullError(error);
AsyncError replacement = Zone.current.errorCallback(error, stackTrace);
if (replacement != null) {
error = _nonNullError(replacement.error);
@@ -435,7 +604,8 @@ abstract class _StreamController<T> implements StreamController<T>,
* You are allowed to close the controller more than once, but only the first
* call has any effect.
*
- * After closing, no further events may be added using [add] or [addError].
+ * After closing, no further events may be added using [add], [addError]
+ * or [addStream].
*
* The returned future is completed when the done event has been delivered.
*/
@@ -479,7 +649,8 @@ abstract class _StreamController<T> implements StreamController<T>,
void _close() {
// End of addStream stream.
assert(_isAddingStream);
- _StreamControllerAddStreamState addState = _varData;
+ _StreamControllerAddStreamState<T> addState =
+ _varData as Object /*=_StreamControllerAddStreamState<T>*/;
_varData = addState.varData;
_state &= ~_STATE_ADDSTREAM;
addState.complete();
@@ -495,14 +666,15 @@ abstract class _StreamController<T> implements StreamController<T>,
if (!_isInitialState) {
throw new StateError("Stream has already been listened to.");
}
- _ControllerSubscription subscription =
- new _ControllerSubscription(this, onData, onError, onDone,
- cancelOnError);
+ _ControllerSubscription<T> subscription =
+ new _ControllerSubscription<T>(this, onData, onError, onDone,
+ cancelOnError);
- _PendingEvents pendingEvents = _pendingEvents;
+ _PendingEvents<T> pendingEvents = _pendingEvents;
_state |= _STATE_SUBSCRIBED;
if (_isAddingStream) {
- _StreamControllerAddStreamState addState = _varData;
+ _StreamControllerAddStreamState<T> addState =
+ _varData as Object /*=_StreamControllerAddStreamState<T>*/;
addState.varData = subscription;
addState.resume();
} else {
@@ -510,7 +682,7 @@ abstract class _StreamController<T> implements StreamController<T>,
}
subscription._setPendingEvents(pendingEvents);
subscription._guardCallback(() {
- _runGuarded(_onListen);
+ _runGuarded(onListen);
});
return subscription;
@@ -518,8 +690,8 @@ abstract class _StreamController<T> implements StreamController<T>,
Future _recordCancel(StreamSubscription<T> subscription) {
// When we cancel, we first cancel any stream being added,
- // Then we call _onCancel, and finally the _doneFuture is completed.
- // If either of addStream's cancel or _onCancel returns a future,
+ // Then we call `onCancel`, and finally the _doneFuture is completed.
+ // If either of addStream's cancel or `onCancel` returns a future,
// we wait for it before continuing.
// Any error during this process ends up in the returned future.
// If more errors happen, we act as if it happens inside nested try/finallys
@@ -527,19 +699,20 @@ abstract class _StreamController<T> implements StreamController<T>,
// returned future.
Future result;
if (_isAddingStream) {
- _StreamControllerAddStreamState addState = _varData;
+ _StreamControllerAddStreamState<T> addState =
+ _varData as Object /*=_StreamControllerAddStreamState<T>*/;
result = addState.cancel();
}
_varData = null;
_state =
(_state & ~(_STATE_SUBSCRIBED | _STATE_ADDSTREAM)) | _STATE_CANCELED;
- if (_onCancel != null) {
+ if (onCancel != null) {
if (result == null) {
// Only introduce a future if one is needed.
// If _onCancel returns null, no future is needed.
try {
- result = _onCancel();
+ result = onCancel();
} catch (e, s) {
// Return the error in the returned future.
// Complete it asynchronously, so there is time for a listener
@@ -548,7 +721,7 @@ abstract class _StreamController<T> implements StreamController<T>,
}
} else {
// Simpler case when we already know that we will return a future.
- result = result.whenComplete(_onCancel);
+ result = result.whenComplete(onCancel);
}
}
@@ -569,23 +742,28 @@ abstract class _StreamController<T> implements StreamController<T>,
void _recordPause(StreamSubscription<T> subscription) {
if (_isAddingStream) {
- _StreamControllerAddStreamState addState = _varData;
+ _StreamControllerAddStreamState<T> addState =
+ _varData as Object /*=_StreamControllerAddStreamState<T>*/;
addState.pause();
}
- _runGuarded(_onPause);
+ _runGuarded(onPause);
}
void _recordResume(StreamSubscription<T> subscription) {
if (_isAddingStream) {
- _StreamControllerAddStreamState addState = _varData;
+ _StreamControllerAddStreamState<T> addState =
+ _varData as Object /*=_StreamControllerAddStreamState<T>*/;
addState.resume();
}
- _runGuarded(_onResume);
+ _runGuarded(onResume);
}
}
abstract class _SyncStreamControllerDispatch<T>
- implements _StreamController<T> {
+ implements _StreamController<T>, SynchronousStreamController<T> {
+ int get _state;
+ void set _state(int state);
+
void _sendData(T data) {
_subscription._add(data);
}
@@ -602,7 +780,7 @@ abstract class _SyncStreamControllerDispatch<T>
abstract class _AsyncStreamControllerDispatch<T>
implements _StreamController<T> {
void _sendData(T data) {
- _subscription._addPending(new _DelayedData(data));
+ _subscription._addPending(new _DelayedData<dynamic /*=T*/>(data));
}
void _sendError(Object error, StackTrace stackTrace) {
@@ -617,44 +795,11 @@ abstract class _AsyncStreamControllerDispatch<T>
// TODO(lrn): Use common superclass for callback-controllers when VM supports
// constructors in mixin superclasses.
-class _AsyncStreamController<T> extends _StreamController<T>
- with _AsyncStreamControllerDispatch<T> {
- final _NotificationHandler _onListen;
- final _NotificationHandler _onPause;
- final _NotificationHandler _onResume;
- final _NotificationHandler _onCancel;
-
- _AsyncStreamController(void this._onListen(),
- void this._onPause(),
- void this._onResume(),
- this._onCancel());
-}
-
-class _SyncStreamController<T> extends _StreamController<T>
- with _SyncStreamControllerDispatch<T> {
- final _NotificationHandler _onListen;
- final _NotificationHandler _onPause;
- final _NotificationHandler _onResume;
- final _NotificationHandler _onCancel;
-
- _SyncStreamController(void this._onListen(),
- void this._onPause(),
- void this._onResume(),
- this._onCancel());
-}
-
-abstract class _NoCallbacks {
- _NotificationHandler get _onListen => null;
- _NotificationHandler get _onPause => null;
- _NotificationHandler get _onResume => null;
- _NotificationHandler get _onCancel => null;
-}
-
-class _NoCallbackAsyncStreamController<T> = _StreamController<T>
- with _AsyncStreamControllerDispatch<T>, _NoCallbacks;
+class _AsyncStreamController<T> = _StreamController<T>
+ with _AsyncStreamControllerDispatch<T>;
-class _NoCallbackSyncStreamController<T> = _StreamController<T>
- with _SyncStreamControllerDispatch<T>, _NoCallbacks;
+class _SyncStreamController<T> = _StreamController<T>
+ with _SyncStreamControllerDispatch<T>;
typedef _NotificationHandler();
@@ -772,12 +917,12 @@ class _AddStreamState<T> {
* Return a future if the cancel takes time, otherwise return `null`.
*/
Future cancel() {
- var cancel2 = addSubscription.cancel();
- if (cancel2 == null) {
+ var cancel = addSubscription.cancel();
+ if (cancel == null) {
addStreamFuture._asyncComplete(null);
return null;
}
- return cancel2.whenComplete(() { addStreamFuture._asyncComplete(null); });
+ return cancel.whenComplete(() { addStreamFuture._asyncComplete(null); });
}
void complete() {
@@ -791,7 +936,7 @@ class _StreamControllerAddStreamState<T> extends _AddStreamState<T> {
// to store this state object.
var varData;
- _StreamControllerAddStreamState(_StreamController controller,
+ _StreamControllerAddStreamState(_StreamController<T> controller,
this.varData,
Stream source,
bool cancelOnError)
« no previous file with comments | « tool/input_sdk/lib/async/stream.dart ('k') | tool/input_sdk/lib/async/stream_impl.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698