| Index: sdk/lib/async/stream_controller.dart
|
| diff --git a/sdk/lib/async/stream_controller.dart b/sdk/lib/async/stream_controller.dart
|
| index d29fae354464dc064baf62cd8626f49040f1ecd9..4b88281fd0fe2f605f4553511af567649d53a4ea 100644
|
| --- a/sdk/lib/async/stream_controller.dart
|
| +++ b/sdk/lib/async/stream_controller.dart
|
| @@ -91,14 +91,15 @@ abstract class StreamController<T> implements StreamSink<T> {
|
| * If the stream is canceled before the controller needs new data the
|
| * [onResume] call might not be executed.
|
| */
|
| - factory StreamController({void onListen(),
|
| - void onPause(),
|
| - void onResume(),
|
| - onCancel(),
|
| - bool sync: false}) {
|
| + factory StreamController(
|
| + {void onListen(),
|
| + void onPause(),
|
| + void onResume(),
|
| + onCancel(),
|
| + bool sync: false}) {
|
| return sync
|
| - ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel)
|
| - : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel);
|
| + ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel)
|
| + : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel);
|
| }
|
|
|
| /**
|
| @@ -152,9 +153,8 @@ abstract class StreamController<T> implements StreamSink<T> {
|
| * If a listener is added again later, after the [onCancel] was called,
|
| * the [onListen] will be called again.
|
| */
|
| - factory StreamController.broadcast({void onListen(),
|
| - void onCancel(),
|
| - bool sync: false}) {
|
| + factory StreamController.broadcast(
|
| + {void onListen(), void onCancel(), bool sync: false}) {
|
| return sync
|
| ? new _SyncBroadcastStreamController<T>(onListen, onCancel)
|
| : new _AsyncBroadcastStreamController<T>(onListen, onCancel);
|
| @@ -262,7 +262,6 @@ abstract class StreamController<T> implements StreamSink<T> {
|
| Future addStream(Stream<T> source, {bool cancelOnError: true});
|
| }
|
|
|
| -
|
| /**
|
| * A stream controller that delivers its events synchronously.
|
| *
|
| @@ -362,10 +361,7 @@ abstract class SynchronousStreamController<T> implements StreamController<T> {
|
|
|
| abstract class _StreamControllerLifecycle<T> {
|
| StreamSubscription<T> _subscribe(
|
| - void onData(T data),
|
| - Function onError,
|
| - void onDone(),
|
| - bool cancelOnError);
|
| + void onData(T data), Function onError, void onDone(), bool cancelOnError);
|
| void _recordPause(StreamSubscription<T> subscription) {}
|
| void _recordResume(StreamSubscription<T> subscription) {}
|
| Future _recordCancel(StreamSubscription<T> subscription) => null;
|
| @@ -376,10 +372,12 @@ abstract class _StreamControllerLifecycle<T> {
|
| *
|
| * Controls a stream that only supports a single controller.
|
| */
|
| -abstract class _StreamController<T> implements StreamController<T>,
|
| - _StreamControllerLifecycle<T>,
|
| - _EventSink<T>,
|
| - _EventDispatch<T> {
|
| +abstract class _StreamController<T>
|
| + implements
|
| + StreamController<T>,
|
| + _StreamControllerLifecycle<T>,
|
| + _EventSink<T>,
|
| + _EventDispatch<T> {
|
| // The states are bit-flags. More than one can be set at a time.
|
| //
|
| // The "subscription state" goes through the states:
|
| @@ -450,10 +448,7 @@ abstract class _StreamController<T> implements StreamController<T>,
|
| ControllerCallback onResume;
|
| ControllerCancelCallback onCancel;
|
|
|
| - _StreamController(this.onListen,
|
| - this.onPause,
|
| - this.onResume,
|
| - this.onCancel);
|
| + _StreamController(this.onListen, this.onPause, this.onResume, this.onCancel);
|
|
|
| // Return a new stream every time. The streams are equal, but not identical.
|
| Stream<T> get stream => new _ControllerStream<T>(this);
|
| @@ -479,8 +474,8 @@ abstract class _StreamController<T> implements StreamController<T>,
|
|
|
| bool get isClosed => (_state & _STATE_CLOSED) != 0;
|
|
|
| - bool get isPaused => hasListener ? _subscription._isInputPaused
|
| - : !_isCanceled;
|
| + bool get isPaused =>
|
| + hasListener ? _subscription._isInputPaused : !_isCanceled;
|
|
|
| bool get _isAddingStream => (_state & _STATE_ADDSTREAM) != 0;
|
|
|
| @@ -497,11 +492,11 @@ abstract class _StreamController<T> implements StreamController<T>,
|
| _PendingEvents<T> get _pendingEvents {
|
| assert(_isInitialState);
|
| if (!_isAddingStream) {
|
| - return _varData as Object /*=_PendingEvents<T>*/;
|
| + return _varData as Object/*=_PendingEvents<T>*/;
|
| }
|
| _StreamControllerAddStreamState<T> state =
|
| - _varData as Object /*=_StreamControllerAddStreamState<T>*/;
|
| - return state.varData as Object /*=_PendingEvents<T>*/;
|
| + _varData as Object/*=_StreamControllerAddStreamState<T>*/;
|
| + return state.varData as Object/*=_PendingEvents<T>*/;
|
| }
|
|
|
| // Returns the pending events, and creates the object if necessary.
|
| @@ -509,12 +504,12 @@ abstract class _StreamController<T> implements StreamController<T>,
|
| assert(_isInitialState);
|
| if (!_isAddingStream) {
|
| if (_varData == null) _varData = new _StreamImplEvents<T>();
|
| - return _varData as Object /*=_StreamImplEvents<T>*/;
|
| + return _varData as Object/*=_StreamImplEvents<T>*/;
|
| }
|
| _StreamControllerAddStreamState<T> state =
|
| - _varData as Object /*=_StreamControllerAddStreamState<T>*/;
|
| + _varData as Object/*=_StreamControllerAddStreamState<T>*/;
|
| if (state.varData == null) state.varData = new _StreamImplEvents<T>();
|
| - return state.varData as Object /*=_StreamImplEvents<T>*/;
|
| + return state.varData as Object/*=_StreamImplEvents<T>*/;
|
| }
|
|
|
| // Get the current subscription.
|
| @@ -524,10 +519,10 @@ abstract class _StreamController<T> implements StreamController<T>,
|
| assert(hasListener);
|
| if (_isAddingStream) {
|
| _StreamControllerAddStreamState<T> addState =
|
| - _varData as Object /*=_StreamControllerAddStreamState<T>*/;
|
| - return addState.varData as Object /*=_ControllerSubscription<T>*/;
|
| + _varData as Object/*=_StreamControllerAddStreamState<T>*/;
|
| + return addState.varData as Object/*=_ControllerSubscription<T>*/;
|
| }
|
| - return _varData as Object /*=_ControllerSubscription<T>*/;
|
| + return _varData as Object/*=_ControllerSubscription<T>*/;
|
| }
|
|
|
| /**
|
| @@ -548,10 +543,8 @@ abstract class _StreamController<T> implements StreamController<T>,
|
| if (!_mayAddEvent) throw _badEventState();
|
| if (_isCanceled) return new _Future.immediate(null);
|
| _StreamControllerAddStreamState<T> addState =
|
| - new _StreamControllerAddStreamState<T>(this,
|
| - _varData,
|
| - source,
|
| - cancelOnError);
|
| + new _StreamControllerAddStreamState<T>(
|
| + this, _varData, source, cancelOnError);
|
| _varData = addState;
|
| _state |= _STATE_ADDSTREAM;
|
| return addState.addStreamFuture;
|
| @@ -650,7 +643,7 @@ abstract class _StreamController<T> implements StreamController<T>,
|
| // End of addStream stream.
|
| assert(_isAddingStream);
|
| _StreamControllerAddStreamState<T> addState =
|
| - _varData as Object /*=_StreamControllerAddStreamState<T>*/;
|
| + _varData as Object/*=_StreamControllerAddStreamState<T>*/;
|
| _varData = addState.varData;
|
| _state &= ~_STATE_ADDSTREAM;
|
| addState.complete();
|
| @@ -658,23 +651,19 @@ abstract class _StreamController<T> implements StreamController<T>,
|
|
|
| // _StreamControllerLifeCycle interface
|
|
|
| - StreamSubscription<T> _subscribe(
|
| - void onData(T data),
|
| - Function onError,
|
| - void onDone(),
|
| - bool cancelOnError) {
|
| + StreamSubscription<T> _subscribe(void onData(T data), Function onError,
|
| + void onDone(), bool cancelOnError) {
|
| if (!_isInitialState) {
|
| throw new StateError("Stream has already been listened to.");
|
| }
|
| - _ControllerSubscription<T> subscription =
|
| - new _ControllerSubscription<T>(this, onData, onError, onDone,
|
| - cancelOnError);
|
| + _ControllerSubscription<T> subscription = new _ControllerSubscription<T>(
|
| + this, onData, onError, onDone, cancelOnError);
|
|
|
| _PendingEvents<T> pendingEvents = _pendingEvents;
|
| _state |= _STATE_SUBSCRIBED;
|
| if (_isAddingStream) {
|
| _StreamControllerAddStreamState<T> addState =
|
| - _varData as Object /*=_StreamControllerAddStreamState<T>*/;
|
| + _varData as Object/*=_StreamControllerAddStreamState<T>*/;
|
| addState.varData = subscription;
|
| addState.resume();
|
| } else {
|
| @@ -700,7 +689,7 @@ abstract class _StreamController<T> implements StreamController<T>,
|
| Future result;
|
| if (_isAddingStream) {
|
| _StreamControllerAddStreamState<T> addState =
|
| - _varData as Object /*=_StreamControllerAddStreamState<T>*/;
|
| + _varData as Object/*=_StreamControllerAddStreamState<T>*/;
|
| result = addState.cancel();
|
| }
|
| _varData = null;
|
| @@ -743,7 +732,7 @@ abstract class _StreamController<T> implements StreamController<T>,
|
| void _recordPause(StreamSubscription<T> subscription) {
|
| if (_isAddingStream) {
|
| _StreamControllerAddStreamState<T> addState =
|
| - _varData as Object /*=_StreamControllerAddStreamState<T>*/;
|
| + _varData as Object/*=_StreamControllerAddStreamState<T>*/;
|
| addState.pause();
|
| }
|
| _runGuarded(onPause);
|
| @@ -752,7 +741,7 @@ abstract class _StreamController<T> implements StreamController<T>,
|
| void _recordResume(StreamSubscription<T> subscription) {
|
| if (_isAddingStream) {
|
| _StreamControllerAddStreamState<T> addState =
|
| - _varData as Object /*=_StreamControllerAddStreamState<T>*/;
|
| + _varData as Object/*=_StreamControllerAddStreamState<T>*/;
|
| addState.resume();
|
| }
|
| _runGuarded(onResume);
|
| @@ -796,10 +785,10 @@ abstract class _AsyncStreamControllerDispatch<T>
|
| // constructors in mixin superclasses.
|
|
|
| class _AsyncStreamController<T> = _StreamController<T>
|
| - with _AsyncStreamControllerDispatch<T>;
|
| + with _AsyncStreamControllerDispatch<T>;
|
|
|
| class _SyncStreamController<T> = _StreamController<T>
|
| - with _SyncStreamControllerDispatch<T>;
|
| + with _SyncStreamControllerDispatch<T>;
|
|
|
| typedef _NotificationHandler();
|
|
|
| @@ -819,12 +808,9 @@ class _ControllerStream<T> extends _StreamImpl<T> {
|
|
|
| _ControllerStream(this._controller);
|
|
|
| - StreamSubscription<T> _createSubscription(
|
| - void onData(T data),
|
| - Function onError,
|
| - void onDone(),
|
| - bool cancelOnError) =>
|
| - _controller._subscribe(onData, onError, onDone, cancelOnError);
|
| + StreamSubscription<T> _createSubscription(void onData(T data),
|
| + Function onError, void onDone(), bool cancelOnError) =>
|
| + _controller._subscribe(onData, onError, onDone, cancelOnError);
|
|
|
| // Override == and hashCode so that new streams returned by the same
|
| // controller are considered equal. The controller returns a new stream
|
| @@ -832,7 +818,7 @@ class _ControllerStream<T> extends _StreamImpl<T> {
|
|
|
| int get hashCode => _controller.hashCode ^ 0x35323532;
|
|
|
| - bool operator==(Object other) {
|
| + bool operator ==(Object other) {
|
| if (identical(this, other)) return true;
|
| if (other is! _ControllerStream) return false;
|
| _ControllerStream otherStream = other;
|
| @@ -844,7 +830,7 @@ class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> {
|
| final _StreamControllerLifecycle<T> _controller;
|
|
|
| _ControllerSubscription(this._controller, void onData(T data),
|
| - Function onError, void onDone(), bool cancelOnError)
|
| + Function onError, void onDone(), bool cancelOnError)
|
| : super(onData, onError, onDone, cancelOnError);
|
|
|
| Future _onCancel() {
|
| @@ -860,15 +846,18 @@ class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> {
|
| }
|
| }
|
|
|
| -
|
| /** A class that exposes only the [StreamSink] interface of an object. */
|
| class _StreamSinkWrapper<T> implements StreamSink<T> {
|
| final StreamController _target;
|
| _StreamSinkWrapper(this._target);
|
| - void add(T data) { _target.add(data); }
|
| + void add(T data) {
|
| + _target.add(data);
|
| + }
|
| +
|
| void addError(Object error, [StackTrace stackTrace]) {
|
| _target.addError(error, stackTrace);
|
| }
|
| +
|
| Future close() => _target.close();
|
| Future addStream(Stream<T> source, {bool cancelOnError: true}) =>
|
| _target.addStream(source, cancelOnError: cancelOnError);
|
| @@ -888,14 +877,13 @@ class _AddStreamState<T> {
|
| _AddStreamState(_EventSink<T> controller, Stream source, bool cancelOnError)
|
| : addStreamFuture = new _Future(),
|
| addSubscription = source.listen(controller._add,
|
| - onError: cancelOnError
|
| - ? makeErrorHandler(controller)
|
| - : controller._addError,
|
| - onDone: controller._close,
|
| - cancelOnError: cancelOnError);
|
| -
|
| - static makeErrorHandler(_EventSink controller) =>
|
| - (e, StackTrace s) {
|
| + onError: cancelOnError
|
| + ? makeErrorHandler(controller)
|
| + : controller._addError,
|
| + onDone: controller._close,
|
| + cancelOnError: cancelOnError);
|
| +
|
| + static makeErrorHandler(_EventSink controller) => (e, StackTrace s) {
|
| controller._addError(e, s);
|
| controller._close();
|
| };
|
| @@ -922,7 +910,9 @@ class _AddStreamState<T> {
|
| addStreamFuture._asyncComplete(null);
|
| return null;
|
| }
|
| - return cancel.whenComplete(() { addStreamFuture._asyncComplete(null); });
|
| + return cancel.whenComplete(() {
|
| + addStreamFuture._asyncComplete(null);
|
| + });
|
| }
|
|
|
| void complete() {
|
| @@ -936,10 +926,8 @@ class _StreamControllerAddStreamState<T> extends _AddStreamState<T> {
|
| // to store this state object.
|
| var varData;
|
|
|
| - _StreamControllerAddStreamState(_StreamController<T> controller,
|
| - this.varData,
|
| - Stream source,
|
| - bool cancelOnError)
|
| + _StreamControllerAddStreamState(_StreamController<T> controller, this.varData,
|
| + Stream source, bool cancelOnError)
|
| : super(controller, source, cancelOnError) {
|
| if (controller.isPaused) {
|
| addSubscription.pause();
|
|
|