Index: sdk/lib/async/stream_controller.dart |
diff --git a/sdk/lib/async/stream_controller.dart b/sdk/lib/async/stream_controller.dart |
index d29fae354464dc064baf62cd8626f49040f1ecd9..4b88281fd0fe2f605f4553511af567649d53a4ea 100644 |
--- a/sdk/lib/async/stream_controller.dart |
+++ b/sdk/lib/async/stream_controller.dart |
@@ -91,14 +91,15 @@ abstract class StreamController<T> implements StreamSink<T> { |
* If the stream is canceled before the controller needs new data the |
* [onResume] call might not be executed. |
*/ |
- factory StreamController({void onListen(), |
- void onPause(), |
- void onResume(), |
- onCancel(), |
- bool sync: false}) { |
+ factory StreamController( |
+ {void onListen(), |
+ void onPause(), |
+ void onResume(), |
+ onCancel(), |
+ bool sync: false}) { |
return sync |
- ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel) |
- : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel); |
+ ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel) |
+ : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel); |
} |
/** |
@@ -152,9 +153,8 @@ abstract class StreamController<T> implements StreamSink<T> { |
* If a listener is added again later, after the [onCancel] was called, |
* the [onListen] will be called again. |
*/ |
- factory StreamController.broadcast({void onListen(), |
- void onCancel(), |
- bool sync: false}) { |
+ factory StreamController.broadcast( |
+ {void onListen(), void onCancel(), bool sync: false}) { |
return sync |
? new _SyncBroadcastStreamController<T>(onListen, onCancel) |
: new _AsyncBroadcastStreamController<T>(onListen, onCancel); |
@@ -262,7 +262,6 @@ abstract class StreamController<T> implements StreamSink<T> { |
Future addStream(Stream<T> source, {bool cancelOnError: true}); |
} |
- |
/** |
* A stream controller that delivers its events synchronously. |
* |
@@ -362,10 +361,7 @@ abstract class SynchronousStreamController<T> implements StreamController<T> { |
abstract class _StreamControllerLifecycle<T> { |
StreamSubscription<T> _subscribe( |
- void onData(T data), |
- Function onError, |
- void onDone(), |
- bool cancelOnError); |
+ void onData(T data), Function onError, void onDone(), bool cancelOnError); |
void _recordPause(StreamSubscription<T> subscription) {} |
void _recordResume(StreamSubscription<T> subscription) {} |
Future _recordCancel(StreamSubscription<T> subscription) => null; |
@@ -376,10 +372,12 @@ abstract class _StreamControllerLifecycle<T> { |
* |
* Controls a stream that only supports a single controller. |
*/ |
-abstract class _StreamController<T> implements StreamController<T>, |
- _StreamControllerLifecycle<T>, |
- _EventSink<T>, |
- _EventDispatch<T> { |
+abstract class _StreamController<T> |
+ implements |
+ StreamController<T>, |
+ _StreamControllerLifecycle<T>, |
+ _EventSink<T>, |
+ _EventDispatch<T> { |
// The states are bit-flags. More than one can be set at a time. |
// |
// The "subscription state" goes through the states: |
@@ -450,10 +448,7 @@ abstract class _StreamController<T> implements StreamController<T>, |
ControllerCallback onResume; |
ControllerCancelCallback onCancel; |
- _StreamController(this.onListen, |
- this.onPause, |
- this.onResume, |
- this.onCancel); |
+ _StreamController(this.onListen, this.onPause, this.onResume, this.onCancel); |
// Return a new stream every time. The streams are equal, but not identical. |
Stream<T> get stream => new _ControllerStream<T>(this); |
@@ -479,8 +474,8 @@ abstract class _StreamController<T> implements StreamController<T>, |
bool get isClosed => (_state & _STATE_CLOSED) != 0; |
- bool get isPaused => hasListener ? _subscription._isInputPaused |
- : !_isCanceled; |
+ bool get isPaused => |
+ hasListener ? _subscription._isInputPaused : !_isCanceled; |
bool get _isAddingStream => (_state & _STATE_ADDSTREAM) != 0; |
@@ -497,11 +492,11 @@ abstract class _StreamController<T> implements StreamController<T>, |
_PendingEvents<T> get _pendingEvents { |
assert(_isInitialState); |
if (!_isAddingStream) { |
- return _varData as Object /*=_PendingEvents<T>*/; |
+ return _varData as Object/*=_PendingEvents<T>*/; |
} |
_StreamControllerAddStreamState<T> state = |
- _varData as Object /*=_StreamControllerAddStreamState<T>*/; |
- return state.varData as Object /*=_PendingEvents<T>*/; |
+ _varData as Object/*=_StreamControllerAddStreamState<T>*/; |
+ return state.varData as Object/*=_PendingEvents<T>*/; |
} |
// Returns the pending events, and creates the object if necessary. |
@@ -509,12 +504,12 @@ abstract class _StreamController<T> implements StreamController<T>, |
assert(_isInitialState); |
if (!_isAddingStream) { |
if (_varData == null) _varData = new _StreamImplEvents<T>(); |
- return _varData as Object /*=_StreamImplEvents<T>*/; |
+ return _varData as Object/*=_StreamImplEvents<T>*/; |
} |
_StreamControllerAddStreamState<T> state = |
- _varData as Object /*=_StreamControllerAddStreamState<T>*/; |
+ _varData as Object/*=_StreamControllerAddStreamState<T>*/; |
if (state.varData == null) state.varData = new _StreamImplEvents<T>(); |
- return state.varData as Object /*=_StreamImplEvents<T>*/; |
+ return state.varData as Object/*=_StreamImplEvents<T>*/; |
} |
// Get the current subscription. |
@@ -524,10 +519,10 @@ abstract class _StreamController<T> implements StreamController<T>, |
assert(hasListener); |
if (_isAddingStream) { |
_StreamControllerAddStreamState<T> addState = |
- _varData as Object /*=_StreamControllerAddStreamState<T>*/; |
- return addState.varData as Object /*=_ControllerSubscription<T>*/; |
+ _varData as Object/*=_StreamControllerAddStreamState<T>*/; |
+ return addState.varData as Object/*=_ControllerSubscription<T>*/; |
} |
- return _varData as Object /*=_ControllerSubscription<T>*/; |
+ return _varData as Object/*=_ControllerSubscription<T>*/; |
} |
/** |
@@ -548,10 +543,8 @@ abstract class _StreamController<T> implements StreamController<T>, |
if (!_mayAddEvent) throw _badEventState(); |
if (_isCanceled) return new _Future.immediate(null); |
_StreamControllerAddStreamState<T> addState = |
- new _StreamControllerAddStreamState<T>(this, |
- _varData, |
- source, |
- cancelOnError); |
+ new _StreamControllerAddStreamState<T>( |
+ this, _varData, source, cancelOnError); |
_varData = addState; |
_state |= _STATE_ADDSTREAM; |
return addState.addStreamFuture; |
@@ -650,7 +643,7 @@ abstract class _StreamController<T> implements StreamController<T>, |
// End of addStream stream. |
assert(_isAddingStream); |
_StreamControllerAddStreamState<T> addState = |
- _varData as Object /*=_StreamControllerAddStreamState<T>*/; |
+ _varData as Object/*=_StreamControllerAddStreamState<T>*/; |
_varData = addState.varData; |
_state &= ~_STATE_ADDSTREAM; |
addState.complete(); |
@@ -658,23 +651,19 @@ abstract class _StreamController<T> implements StreamController<T>, |
// _StreamControllerLifeCycle interface |
- StreamSubscription<T> _subscribe( |
- void onData(T data), |
- Function onError, |
- void onDone(), |
- bool cancelOnError) { |
+ StreamSubscription<T> _subscribe(void onData(T data), Function onError, |
+ void onDone(), bool cancelOnError) { |
if (!_isInitialState) { |
throw new StateError("Stream has already been listened to."); |
} |
- _ControllerSubscription<T> subscription = |
- new _ControllerSubscription<T>(this, onData, onError, onDone, |
- cancelOnError); |
+ _ControllerSubscription<T> subscription = new _ControllerSubscription<T>( |
+ this, onData, onError, onDone, cancelOnError); |
_PendingEvents<T> pendingEvents = _pendingEvents; |
_state |= _STATE_SUBSCRIBED; |
if (_isAddingStream) { |
_StreamControllerAddStreamState<T> addState = |
- _varData as Object /*=_StreamControllerAddStreamState<T>*/; |
+ _varData as Object/*=_StreamControllerAddStreamState<T>*/; |
addState.varData = subscription; |
addState.resume(); |
} else { |
@@ -700,7 +689,7 @@ abstract class _StreamController<T> implements StreamController<T>, |
Future result; |
if (_isAddingStream) { |
_StreamControllerAddStreamState<T> addState = |
- _varData as Object /*=_StreamControllerAddStreamState<T>*/; |
+ _varData as Object/*=_StreamControllerAddStreamState<T>*/; |
result = addState.cancel(); |
} |
_varData = null; |
@@ -743,7 +732,7 @@ abstract class _StreamController<T> implements StreamController<T>, |
void _recordPause(StreamSubscription<T> subscription) { |
if (_isAddingStream) { |
_StreamControllerAddStreamState<T> addState = |
- _varData as Object /*=_StreamControllerAddStreamState<T>*/; |
+ _varData as Object/*=_StreamControllerAddStreamState<T>*/; |
addState.pause(); |
} |
_runGuarded(onPause); |
@@ -752,7 +741,7 @@ abstract class _StreamController<T> implements StreamController<T>, |
void _recordResume(StreamSubscription<T> subscription) { |
if (_isAddingStream) { |
_StreamControllerAddStreamState<T> addState = |
- _varData as Object /*=_StreamControllerAddStreamState<T>*/; |
+ _varData as Object/*=_StreamControllerAddStreamState<T>*/; |
addState.resume(); |
} |
_runGuarded(onResume); |
@@ -796,10 +785,10 @@ abstract class _AsyncStreamControllerDispatch<T> |
// constructors in mixin superclasses. |
class _AsyncStreamController<T> = _StreamController<T> |
- with _AsyncStreamControllerDispatch<T>; |
+ with _AsyncStreamControllerDispatch<T>; |
class _SyncStreamController<T> = _StreamController<T> |
- with _SyncStreamControllerDispatch<T>; |
+ with _SyncStreamControllerDispatch<T>; |
typedef _NotificationHandler(); |
@@ -819,12 +808,9 @@ class _ControllerStream<T> extends _StreamImpl<T> { |
_ControllerStream(this._controller); |
- StreamSubscription<T> _createSubscription( |
- void onData(T data), |
- Function onError, |
- void onDone(), |
- bool cancelOnError) => |
- _controller._subscribe(onData, onError, onDone, cancelOnError); |
+ StreamSubscription<T> _createSubscription(void onData(T data), |
+ Function onError, void onDone(), bool cancelOnError) => |
+ _controller._subscribe(onData, onError, onDone, cancelOnError); |
// Override == and hashCode so that new streams returned by the same |
// controller are considered equal. The controller returns a new stream |
@@ -832,7 +818,7 @@ class _ControllerStream<T> extends _StreamImpl<T> { |
int get hashCode => _controller.hashCode ^ 0x35323532; |
- bool operator==(Object other) { |
+ bool operator ==(Object other) { |
if (identical(this, other)) return true; |
if (other is! _ControllerStream) return false; |
_ControllerStream otherStream = other; |
@@ -844,7 +830,7 @@ class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> { |
final _StreamControllerLifecycle<T> _controller; |
_ControllerSubscription(this._controller, void onData(T data), |
- Function onError, void onDone(), bool cancelOnError) |
+ Function onError, void onDone(), bool cancelOnError) |
: super(onData, onError, onDone, cancelOnError); |
Future _onCancel() { |
@@ -860,15 +846,18 @@ class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> { |
} |
} |
- |
/** A class that exposes only the [StreamSink] interface of an object. */ |
class _StreamSinkWrapper<T> implements StreamSink<T> { |
final StreamController _target; |
_StreamSinkWrapper(this._target); |
- void add(T data) { _target.add(data); } |
+ void add(T data) { |
+ _target.add(data); |
+ } |
+ |
void addError(Object error, [StackTrace stackTrace]) { |
_target.addError(error, stackTrace); |
} |
+ |
Future close() => _target.close(); |
Future addStream(Stream<T> source, {bool cancelOnError: true}) => |
_target.addStream(source, cancelOnError: cancelOnError); |
@@ -888,14 +877,13 @@ class _AddStreamState<T> { |
_AddStreamState(_EventSink<T> controller, Stream source, bool cancelOnError) |
: addStreamFuture = new _Future(), |
addSubscription = source.listen(controller._add, |
- onError: cancelOnError |
- ? makeErrorHandler(controller) |
- : controller._addError, |
- onDone: controller._close, |
- cancelOnError: cancelOnError); |
- |
- static makeErrorHandler(_EventSink controller) => |
- (e, StackTrace s) { |
+ onError: cancelOnError |
+ ? makeErrorHandler(controller) |
+ : controller._addError, |
+ onDone: controller._close, |
+ cancelOnError: cancelOnError); |
+ |
+ static makeErrorHandler(_EventSink controller) => (e, StackTrace s) { |
controller._addError(e, s); |
controller._close(); |
}; |
@@ -922,7 +910,9 @@ class _AddStreamState<T> { |
addStreamFuture._asyncComplete(null); |
return null; |
} |
- return cancel.whenComplete(() { addStreamFuture._asyncComplete(null); }); |
+ return cancel.whenComplete(() { |
+ addStreamFuture._asyncComplete(null); |
+ }); |
} |
void complete() { |
@@ -936,10 +926,8 @@ class _StreamControllerAddStreamState<T> extends _AddStreamState<T> { |
// to store this state object. |
var varData; |
- _StreamControllerAddStreamState(_StreamController<T> controller, |
- this.varData, |
- Stream source, |
- bool cancelOnError) |
+ _StreamControllerAddStreamState(_StreamController<T> controller, this.varData, |
+ Stream source, bool cancelOnError) |
: super(controller, source, cancelOnError) { |
if (controller.isPaused) { |
addSubscription.pause(); |