Index: sdk/lib/async/broadcast_stream_controller.dart |
diff --git a/sdk/lib/async/broadcast_stream_controller.dart b/sdk/lib/async/broadcast_stream_controller.dart |
index 22ed3e9bb4aec90f896757e91692326cfa8b2543..075386102069ffcbbd4ffb21c3e920233de4f82b 100644 |
--- a/sdk/lib/async/broadcast_stream_controller.dart |
+++ b/sdk/lib/async/broadcast_stream_controller.dart |
@@ -18,22 +18,18 @@ class _BroadcastSubscription<T> extends _ControllerSubscription<T> { |
// TODO(lrn): Use the _state field on _ControllerSubscription to |
// also store this state. Requires that the subscription implementation |
// does not assume that it's use of the state integer is the only use. |
- int _eventState = 0; // Initialized to help dart2js type inference. |
+ int _eventState = 0; // Initialized to help dart2js type inference. |
_BroadcastSubscription<T> _next; |
_BroadcastSubscription<T> _previous; |
_BroadcastSubscription(_StreamControllerLifecycle<T> controller, |
- void onData(T data), |
- Function onError, |
- void onDone(), |
- bool cancelOnError) |
+ void onData(T data), Function onError, void onDone(), bool cancelOnError) |
: super(controller, onData, onError, onDone, cancelOnError) { |
_next = _previous = this; |
} |
- bool _expectsEvent(int eventId) => |
- (_eventState & _STATE_EVENT_ID) == eventId; |
+ bool _expectsEvent(int eventId) => (_eventState & _STATE_EVENT_ID) == eventId; |
void _toggleEventId() { |
_eventState ^= _STATE_EVENT_ID; |
@@ -51,20 +47,21 @@ class _BroadcastSubscription<T> extends _ControllerSubscription<T> { |
// The controller._recordPause doesn't do anything for a broadcast controller, |
// so we don't bother calling it. |
- void _onPause() { } |
+ void _onPause() {} |
// The controller._recordResume doesn't do anything for a broadcast |
// controller, so we don't bother calling it. |
- void _onResume() { } |
+ void _onResume() {} |
// _onCancel is inherited. |
} |
abstract class _BroadcastStreamController<T> |
- implements StreamController<T>, |
- _StreamControllerLifecycle<T>, |
- _EventSink<T>, |
- _EventDispatch<T> { |
+ implements |
+ StreamController<T>, |
+ _StreamControllerLifecycle<T>, |
+ _EventSink<T>, |
+ _EventDispatch<T> { |
static const int _STATE_INITIAL = 0; |
static const int _STATE_EVENT_ID = 1; |
static const int _STATE_FIRING = 2; |
@@ -117,7 +114,7 @@ abstract class _BroadcastStreamController<T> |
"Broadcast stream controllers do not support pause callbacks"); |
} |
- void set onResume(void onResumeHandler()) { |
+ void set onResume(void onResumeHandler()) { |
throw new UnsupportedError( |
"Broadcast stream controllers do not support pause callbacks"); |
} |
@@ -206,18 +203,14 @@ abstract class _BroadcastStreamController<T> |
// _StreamControllerLifecycle interface. |
- StreamSubscription<T> _subscribe( |
- void onData(T data), |
- Function onError, |
- void onDone(), |
- bool cancelOnError) { |
+ StreamSubscription<T> _subscribe(void onData(T data), Function onError, |
+ void onDone(), bool cancelOnError) { |
if (isClosed) { |
if (onDone == null) onDone = _nullDoneHandler; |
return new _DoneStreamSubscription<T>(onDone); |
} |
- StreamSubscription<T> subscription = |
- new _BroadcastSubscription<T>(this, onData, onError, onDone, |
- cancelOnError); |
+ StreamSubscription<T> subscription = new _BroadcastSubscription<T>( |
+ this, onData, onError, onDone, cancelOnError); |
_addListener(subscription); |
if (identical(_firstSubscription, _lastSubscription)) { |
// Only one listener, so it must be the first listener. |
@@ -362,7 +355,7 @@ abstract class _BroadcastStreamController<T> |
} |
class _SyncBroadcastStreamController<T> extends _BroadcastStreamController<T> |
- implements SynchronousStreamController<T> { |
+ implements SynchronousStreamController<T> { |
_SyncBroadcastStreamController(void onListen(), void onCancel()) |
: super(onListen, onCancel); |
@@ -423,16 +416,16 @@ class _AsyncBroadcastStreamController<T> extends _BroadcastStreamController<T> { |
void _sendData(T data) { |
for (_BroadcastSubscription<T> subscription = _firstSubscription; |
- subscription != null; |
- subscription = subscription._next) { |
+ subscription != null; |
+ subscription = subscription._next) { |
subscription._addPending(new _DelayedData<T>(data)); |
} |
} |
void _sendError(Object error, StackTrace stackTrace) { |
for (_BroadcastSubscription<T> subscription = _firstSubscription; |
- subscription != null; |
- subscription = subscription._next) { |
+ subscription != null; |
+ subscription = subscription._next) { |
subscription._addPending(new _DelayedError(error, stackTrace)); |
} |
} |
@@ -440,8 +433,8 @@ class _AsyncBroadcastStreamController<T> extends _BroadcastStreamController<T> { |
void _sendDone() { |
if (!_isEmpty) { |
for (_BroadcastSubscription<T> subscription = _firstSubscription; |
- subscription != null; |
- subscription = subscription._next) { |
+ subscription != null; |
+ subscription = subscription._next) { |
subscription._addPending(const _DelayedDone()); |
} |
} else { |
@@ -463,15 +456,14 @@ class _AsyncBroadcastStreamController<T> extends _BroadcastStreamController<T> { |
* an "asBroadcastStream" stream are always initiated by events |
* on another stream, and it is fine to forward them synchronously. |
*/ |
-class _AsBroadcastStreamController<T> |
- extends _SyncBroadcastStreamController<T> |
+class _AsBroadcastStreamController<T> extends _SyncBroadcastStreamController<T> |
implements _EventDispatch<T> { |
_StreamImplEvents<T> _pending; |
_AsBroadcastStreamController(void onListen(), void onCancel()) |
: super(onListen, onCancel); |
- bool get _hasPending => _pending != null && ! _pending.isEmpty; |
+ bool get _hasPending => _pending != null && !_pending.isEmpty; |
void _addPendingEvent(_DelayedEvent event) { |
if (_pending == null) { |
@@ -534,11 +526,19 @@ class _DoneSubscription<T> implements StreamSubscription<T> { |
if (resumeSignal != null) resumeSignal.then(_resume); |
_pauseCount++; |
} |
- void resume() { _resume(null); } |
+ |
+ void resume() { |
+ _resume(null); |
+ } |
+ |
void _resume(_) { |
if (_pauseCount > 0) _pauseCount--; |
} |
- Future cancel() { return new _Future.immediate(null); } |
+ |
+ Future cancel() { |
+ return new _Future.immediate(null); |
+ } |
+ |
bool get isPaused => _pauseCount > 0; |
Future<E> asFuture<E>([E value]) => new _Future<E>(); |
} |