| Index: sdk/lib/async/broadcast_stream_controller.dart
|
| diff --git a/sdk/lib/async/broadcast_stream_controller.dart b/sdk/lib/async/broadcast_stream_controller.dart
|
| index 22ed3e9bb4aec90f896757e91692326cfa8b2543..075386102069ffcbbd4ffb21c3e920233de4f82b 100644
|
| --- a/sdk/lib/async/broadcast_stream_controller.dart
|
| +++ b/sdk/lib/async/broadcast_stream_controller.dart
|
| @@ -18,22 +18,18 @@ class _BroadcastSubscription<T> extends _ControllerSubscription<T> {
|
| // TODO(lrn): Use the _state field on _ControllerSubscription to
|
| // also store this state. Requires that the subscription implementation
|
| // does not assume that it's use of the state integer is the only use.
|
| - int _eventState = 0; // Initialized to help dart2js type inference.
|
| + int _eventState = 0; // Initialized to help dart2js type inference.
|
|
|
| _BroadcastSubscription<T> _next;
|
| _BroadcastSubscription<T> _previous;
|
|
|
| _BroadcastSubscription(_StreamControllerLifecycle<T> controller,
|
| - void onData(T data),
|
| - Function onError,
|
| - void onDone(),
|
| - bool cancelOnError)
|
| + void onData(T data), Function onError, void onDone(), bool cancelOnError)
|
| : super(controller, onData, onError, onDone, cancelOnError) {
|
| _next = _previous = this;
|
| }
|
|
|
| - bool _expectsEvent(int eventId) =>
|
| - (_eventState & _STATE_EVENT_ID) == eventId;
|
| + bool _expectsEvent(int eventId) => (_eventState & _STATE_EVENT_ID) == eventId;
|
|
|
| void _toggleEventId() {
|
| _eventState ^= _STATE_EVENT_ID;
|
| @@ -51,20 +47,21 @@ class _BroadcastSubscription<T> extends _ControllerSubscription<T> {
|
|
|
| // The controller._recordPause doesn't do anything for a broadcast controller,
|
| // so we don't bother calling it.
|
| - void _onPause() { }
|
| + void _onPause() {}
|
|
|
| // The controller._recordResume doesn't do anything for a broadcast
|
| // controller, so we don't bother calling it.
|
| - void _onResume() { }
|
| + void _onResume() {}
|
|
|
| // _onCancel is inherited.
|
| }
|
|
|
| abstract class _BroadcastStreamController<T>
|
| - implements StreamController<T>,
|
| - _StreamControllerLifecycle<T>,
|
| - _EventSink<T>,
|
| - _EventDispatch<T> {
|
| + implements
|
| + StreamController<T>,
|
| + _StreamControllerLifecycle<T>,
|
| + _EventSink<T>,
|
| + _EventDispatch<T> {
|
| static const int _STATE_INITIAL = 0;
|
| static const int _STATE_EVENT_ID = 1;
|
| static const int _STATE_FIRING = 2;
|
| @@ -117,7 +114,7 @@ abstract class _BroadcastStreamController<T>
|
| "Broadcast stream controllers do not support pause callbacks");
|
| }
|
|
|
| - void set onResume(void onResumeHandler()) {
|
| + void set onResume(void onResumeHandler()) {
|
| throw new UnsupportedError(
|
| "Broadcast stream controllers do not support pause callbacks");
|
| }
|
| @@ -206,18 +203,14 @@ abstract class _BroadcastStreamController<T>
|
|
|
| // _StreamControllerLifecycle interface.
|
|
|
| - StreamSubscription<T> _subscribe(
|
| - void onData(T data),
|
| - Function onError,
|
| - void onDone(),
|
| - bool cancelOnError) {
|
| + StreamSubscription<T> _subscribe(void onData(T data), Function onError,
|
| + void onDone(), bool cancelOnError) {
|
| if (isClosed) {
|
| if (onDone == null) onDone = _nullDoneHandler;
|
| return new _DoneStreamSubscription<T>(onDone);
|
| }
|
| - StreamSubscription<T> subscription =
|
| - new _BroadcastSubscription<T>(this, onData, onError, onDone,
|
| - cancelOnError);
|
| + StreamSubscription<T> subscription = new _BroadcastSubscription<T>(
|
| + this, onData, onError, onDone, cancelOnError);
|
| _addListener(subscription);
|
| if (identical(_firstSubscription, _lastSubscription)) {
|
| // Only one listener, so it must be the first listener.
|
| @@ -362,7 +355,7 @@ abstract class _BroadcastStreamController<T>
|
| }
|
|
|
| class _SyncBroadcastStreamController<T> extends _BroadcastStreamController<T>
|
| - implements SynchronousStreamController<T> {
|
| + implements SynchronousStreamController<T> {
|
| _SyncBroadcastStreamController(void onListen(), void onCancel())
|
| : super(onListen, onCancel);
|
|
|
| @@ -423,16 +416,16 @@ class _AsyncBroadcastStreamController<T> extends _BroadcastStreamController<T> {
|
|
|
| void _sendData(T data) {
|
| for (_BroadcastSubscription<T> subscription = _firstSubscription;
|
| - subscription != null;
|
| - subscription = subscription._next) {
|
| + subscription != null;
|
| + subscription = subscription._next) {
|
| subscription._addPending(new _DelayedData<T>(data));
|
| }
|
| }
|
|
|
| void _sendError(Object error, StackTrace stackTrace) {
|
| for (_BroadcastSubscription<T> subscription = _firstSubscription;
|
| - subscription != null;
|
| - subscription = subscription._next) {
|
| + subscription != null;
|
| + subscription = subscription._next) {
|
| subscription._addPending(new _DelayedError(error, stackTrace));
|
| }
|
| }
|
| @@ -440,8 +433,8 @@ class _AsyncBroadcastStreamController<T> extends _BroadcastStreamController<T> {
|
| void _sendDone() {
|
| if (!_isEmpty) {
|
| for (_BroadcastSubscription<T> subscription = _firstSubscription;
|
| - subscription != null;
|
| - subscription = subscription._next) {
|
| + subscription != null;
|
| + subscription = subscription._next) {
|
| subscription._addPending(const _DelayedDone());
|
| }
|
| } else {
|
| @@ -463,15 +456,14 @@ class _AsyncBroadcastStreamController<T> extends _BroadcastStreamController<T> {
|
| * an "asBroadcastStream" stream are always initiated by events
|
| * on another stream, and it is fine to forward them synchronously.
|
| */
|
| -class _AsBroadcastStreamController<T>
|
| - extends _SyncBroadcastStreamController<T>
|
| +class _AsBroadcastStreamController<T> extends _SyncBroadcastStreamController<T>
|
| implements _EventDispatch<T> {
|
| _StreamImplEvents<T> _pending;
|
|
|
| _AsBroadcastStreamController(void onListen(), void onCancel())
|
| : super(onListen, onCancel);
|
|
|
| - bool get _hasPending => _pending != null && ! _pending.isEmpty;
|
| + bool get _hasPending => _pending != null && !_pending.isEmpty;
|
|
|
| void _addPendingEvent(_DelayedEvent event) {
|
| if (_pending == null) {
|
| @@ -534,11 +526,19 @@ class _DoneSubscription<T> implements StreamSubscription<T> {
|
| if (resumeSignal != null) resumeSignal.then(_resume);
|
| _pauseCount++;
|
| }
|
| - void resume() { _resume(null); }
|
| +
|
| + void resume() {
|
| + _resume(null);
|
| + }
|
| +
|
| void _resume(_) {
|
| if (_pauseCount > 0) _pauseCount--;
|
| }
|
| - Future cancel() { return new _Future.immediate(null); }
|
| +
|
| + Future cancel() {
|
| + return new _Future.immediate(null);
|
| + }
|
| +
|
| bool get isPaused => _pauseCount > 0;
|
| Future<E> asFuture<E>([E value]) => new _Future<E>();
|
| }
|
|
|