| Index: sdk/lib/async/stream_impl.dart
|
| diff --git a/sdk/lib/async/stream_impl.dart b/sdk/lib/async/stream_impl.dart
|
| index 29cfbd1c3ce5cc1055f0f5240734352e7be9b7d3..164d3b242802874e9d1d4e2cbffaeaf6438a3d68 100644
|
| --- a/sdk/lib/async/stream_impl.dart
|
| +++ b/sdk/lib/async/stream_impl.dart
|
| @@ -49,9 +49,8 @@ abstract class _EventDispatch<T> {
|
| * but if it happens anyway, the subscription will enqueue the events just as
|
| * when new events arrive while still firing an old event.
|
| */
|
| -class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
|
| - _EventSink<T>,
|
| - _EventDispatch<T> {
|
| +class _BufferingStreamSubscription<T>
|
| + implements StreamSubscription<T>, _EventSink<T>, _EventDispatch<T> {
|
| /** The `cancelOnError` flag from the `listen` call. */
|
| static const int _STATE_CANCEL_ON_ERROR = 1;
|
| /**
|
| @@ -107,10 +106,8 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
|
| */
|
| _PendingEvents<T> _pending;
|
|
|
| - _BufferingStreamSubscription(void onData(T data),
|
| - Function onError,
|
| - void onDone(),
|
| - bool cancelOnError)
|
| + _BufferingStreamSubscription(
|
| + void onData(T data), Function onError, void onDone(), bool cancelOnError)
|
| : _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0) {
|
| this.onData(onData);
|
| this.onError(onError);
|
| @@ -197,7 +194,9 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
|
| _Future<E> result = new _Future<E>();
|
|
|
| // Overwrite the onDone and onError handlers.
|
| - _onDone = () { result._complete(futureValue); };
|
| + _onDone = () {
|
| + result._complete(futureValue);
|
| + };
|
| _onError = (error, stackTrace) {
|
| Future cancelFuture = cancel();
|
| if (!identical(cancelFuture, Future._nullFuture)) {
|
| @@ -264,7 +263,7 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
|
| void _addError(Object error, StackTrace stackTrace) {
|
| if (_isCanceled) return;
|
| if (_canFire) {
|
| - _sendError(error, stackTrace); // Reports cancel after sending.
|
| + _sendError(error, stackTrace); // Reports cancel after sending.
|
| } else {
|
| _addPending(new _DelayedError(error, stackTrace));
|
| }
|
| @@ -346,11 +345,11 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
|
| _state |= _STATE_IN_CALLBACK;
|
| if (_onError is ZoneBinaryCallback<dynamic, Object, StackTrace>) {
|
| ZoneBinaryCallback<dynamic, Object, StackTrace> errorCallback = _onError
|
| - as Object /*=ZoneBinaryCallback<dynamic, Object, StackTrace>*/;
|
| + as Object/*=ZoneBinaryCallback<dynamic, Object, StackTrace>*/;
|
| _zone.runBinaryGuarded(errorCallback, error, stackTrace);
|
| } else {
|
| _zone.runUnaryGuarded<dynamic, dynamic>(
|
| - _onError as Object /*=ZoneUnaryCallback<dynamic, dynamic>*/, error);
|
| + _onError as Object/*=ZoneUnaryCallback<dynamic, dynamic>*/, error);
|
| }
|
| _state &= ~_STATE_IN_CALLBACK;
|
| }
|
| @@ -461,9 +460,7 @@ abstract class _StreamImpl<T> extends Stream<T> {
|
| // Stream interface.
|
|
|
| StreamSubscription<T> listen(void onData(T data),
|
| - { Function onError,
|
| - void onDone(),
|
| - bool cancelOnError }) {
|
| + {Function onError, void onDone(), bool cancelOnError}) {
|
| cancelOnError = identical(true, cancelOnError);
|
| StreamSubscription<T> subscription =
|
| _createSubscription(onData, onError, onDone, cancelOnError);
|
| @@ -473,13 +470,10 @@ abstract class _StreamImpl<T> extends Stream<T> {
|
|
|
| // -------------------------------------------------------------------
|
| /** Create a subscription object. Called by [subcribe]. */
|
| - StreamSubscription<T> _createSubscription(
|
| - void onData(T data),
|
| - Function onError,
|
| - void onDone(),
|
| - bool cancelOnError) {
|
| - return new _BufferingStreamSubscription<T>(onData, onError, onDone,
|
| - cancelOnError);
|
| + StreamSubscription<T> _createSubscription(void onData(T data),
|
| + Function onError, void onDone(), bool cancelOnError) {
|
| + return new _BufferingStreamSubscription<T>(
|
| + onData, onError, onDone, cancelOnError);
|
| }
|
|
|
| /** Hook called when the subscription has been created. */
|
| @@ -500,11 +494,8 @@ class _GeneratedStreamImpl<T> extends _StreamImpl<T> {
|
| */
|
| _GeneratedStreamImpl(this._pending);
|
|
|
| - StreamSubscription<T> _createSubscription(
|
| - void onData(T data),
|
| - Function onError,
|
| - void onDone(),
|
| - bool cancelOnError) {
|
| + StreamSubscription<T> _createSubscription(void onData(T data),
|
| + Function onError, void onDone(), bool cancelOnError) {
|
| if (_isUsed) throw new StateError("Stream has already been listened to.");
|
| _isUsed = true;
|
| return new _BufferingStreamSubscription<T>(
|
| @@ -512,7 +503,6 @@ class _GeneratedStreamImpl<T> extends _StreamImpl<T> {
|
| }
|
| }
|
|
|
| -
|
| /** Pending events object that gets its events from an [Iterable]. */
|
| class _IterablePendingEvents<T> extends _PendingEvents<T> {
|
| // The iterator providing data for data events.
|
| @@ -554,14 +544,12 @@ class _IterablePendingEvents<T> extends _PendingEvents<T> {
|
| }
|
| }
|
|
|
| -
|
| // Internal helpers.
|
|
|
| // Types of the different handlers on a stream. Types used to type fields.
|
| typedef void _DataHandler<T>(T value);
|
| typedef void _DoneHandler();
|
|
|
| -
|
| /** Default data handler, does nothing. */
|
| void _nullDataHandler(var value) {}
|
|
|
| @@ -573,7 +561,6 @@ void _nullErrorHandler(error, [StackTrace stackTrace]) {
|
| /** Default done handler, does nothing. */
|
| void _nullDoneHandler() {}
|
|
|
| -
|
| /** A delayed event on a buffering stream subscription. */
|
| abstract class _DelayedEvent<T> {
|
| /** Added as a linked list on the [StreamController]. */
|
| @@ -678,11 +665,11 @@ abstract class _PendingEvents<T> {
|
| void clear();
|
| }
|
|
|
| -
|
| /** Class holding pending events for a [_StreamImpl]. */
|
| class _StreamImplEvents<T> extends _PendingEvents<T> {
|
| /// Single linked list of [_DelayedEvent] objects.
|
| _DelayedEvent firstPendingEvent = null;
|
| +
|
| /// Last element in the list of pending events. New events are added after it.
|
| _DelayedEvent lastPendingEvent = null;
|
|
|
| @@ -742,7 +729,9 @@ class _DoneStreamSubscription<T> implements StreamSubscription<T> {
|
|
|
| void onData(void handleData(T data)) {}
|
| void onError(Function handleError) {}
|
| - void onDone(void handleDone()) { _onDone = handleDone; }
|
| + void onDone(void handleDone()) {
|
| + _onDone = handleDone;
|
| + }
|
|
|
| void pause([Future resumeSignal]) {
|
| _state += _PAUSED;
|
| @@ -762,7 +751,9 @@ class _DoneStreamSubscription<T> implements StreamSubscription<T> {
|
|
|
| Future<E> asFuture<E>([E futureValue]) {
|
| _Future<E> result = new _Future<E>();
|
| - _onDone = () { result._completeWithValue(null); };
|
| + _onDone = () {
|
| + result._completeWithValue(null);
|
| + };
|
| return result;
|
| }
|
|
|
| @@ -783,15 +774,18 @@ class _AsBroadcastStream<T> extends Stream<T> {
|
| _AsBroadcastStreamController<T> _controller;
|
| StreamSubscription<T> _subscription;
|
|
|
| - _AsBroadcastStream(this._source,
|
| - void onListenHandler(StreamSubscription<T> subscription),
|
| - void onCancelHandler(StreamSubscription<T> subscription))
|
| + _AsBroadcastStream(
|
| + this._source,
|
| + void onListenHandler(StreamSubscription<T> subscription),
|
| + void onCancelHandler(StreamSubscription<T> subscription))
|
| // TODO(floitsch): the return type should be void and should be
|
| // inferred.
|
| - : _onListenHandler = Zone.current.registerUnaryCallback
|
| - <dynamic, StreamSubscription<T>>(onListenHandler),
|
| - _onCancelHandler = Zone.current.registerUnaryCallback
|
| - <dynamic, StreamSubscription<T>>(onCancelHandler),
|
| + : _onListenHandler = Zone.current
|
| + .registerUnaryCallback<dynamic, StreamSubscription<T>>(
|
| + onListenHandler),
|
| + _onCancelHandler = Zone.current
|
| + .registerUnaryCallback<dynamic, StreamSubscription<T>>(
|
| + onCancelHandler),
|
| _zone = Zone.current {
|
| _controller = new _AsBroadcastStreamController<T>(_onListen, _onCancel);
|
| }
|
| @@ -799,9 +793,7 @@ class _AsBroadcastStream<T> extends Stream<T> {
|
| bool get isBroadcast => true;
|
|
|
| StreamSubscription<T> listen(void onData(T data),
|
| - { Function onError,
|
| - void onDone(),
|
| - bool cancelOnError}) {
|
| + {Function onError, void onDone(), bool cancelOnError}) {
|
| if (_controller == null || _controller.isClosed) {
|
| // Return a dummy subscription backed by nothing, since
|
| // it will only ever send one done event.
|
| @@ -809,8 +801,7 @@ class _AsBroadcastStream<T> extends Stream<T> {
|
| }
|
| if (_subscription == null) {
|
| _subscription = _source.listen(_controller.add,
|
| - onError: _controller.addError,
|
| - onDone: _controller.close);
|
| + onError: _controller.addError, onDone: _controller.close);
|
| }
|
| cancelOnError = identical(true, cancelOnError);
|
| return _controller._subscribe(onData, onError, onDone, cancelOnError);
|
| @@ -843,7 +834,7 @@ class _AsBroadcastStream<T> extends Stream<T> {
|
| // Called by [_controller] when it has no subscribers left.
|
| StreamSubscription subscription = _subscription;
|
| _subscription = null;
|
| - _controller = null; // Marks the stream as no longer listenable.
|
| + _controller = null; // Marks the stream as no longer listenable.
|
| subscription.cancel();
|
| }
|
|
|
| @@ -909,7 +900,6 @@ class _BroadcastSubscriptionWrapper<T> implements StreamSubscription<T> {
|
| }
|
| }
|
|
|
| -
|
| /**
|
| * Simple implementation of [StreamIterator].
|
| *
|
| @@ -969,7 +959,7 @@ class _StreamIterator<T> implements StreamIterator<T> {
|
|
|
| T get current {
|
| if (_subscription != null && _isPaused) {
|
| - return _stateData as Object /*=T*/;
|
| + return _stateData as Object/*=T*/;
|
| }
|
| return null;
|
| }
|
| @@ -997,9 +987,9 @@ class _StreamIterator<T> implements StreamIterator<T> {
|
| assert(_subscription == null);
|
| var stateData = _stateData;
|
| if (stateData != null) {
|
| - Stream<T> stream = stateData as Object /*=Stream<T>*/;
|
| - _subscription = stream.listen(
|
| - _onData, onError: _onError, onDone: _onDone, cancelOnError: true);
|
| + Stream<T> stream = stateData as Object/*=Stream<T>*/;
|
| + _subscription = stream.listen(_onData,
|
| + onError: _onError, onDone: _onDone, cancelOnError: true);
|
| var future = new _Future<bool>();
|
| _stateData = future;
|
| return future;
|
| @@ -1014,7 +1004,7 @@ class _StreamIterator<T> implements StreamIterator<T> {
|
| if (subscription != null) {
|
| _subscription = null;
|
| if (!_isPaused) {
|
| - _Future<bool> future = stateData as Object /*=_Future<bool>*/;
|
| + _Future<bool> future = stateData as Object/*=_Future<bool>*/;
|
| future._asyncComplete(false);
|
| }
|
| return subscription.cancel();
|
| @@ -1024,7 +1014,7 @@ class _StreamIterator<T> implements StreamIterator<T> {
|
|
|
| void _onData(T data) {
|
| assert(_subscription != null && !_isPaused);
|
| - _Future<bool> moveNextFuture = _stateData as Object /*=_Future<bool>*/;
|
| + _Future<bool> moveNextFuture = _stateData as Object/*=_Future<bool>*/;
|
| _stateData = data;
|
| _isPaused = true;
|
| moveNextFuture._complete(true);
|
| @@ -1033,7 +1023,7 @@ class _StreamIterator<T> implements StreamIterator<T> {
|
|
|
| void _onError(Object error, [StackTrace stackTrace]) {
|
| assert(_subscription != null && !_isPaused);
|
| - _Future<bool> moveNextFuture = _stateData as Object /*=_Future<bool>*/;
|
| + _Future<bool> moveNextFuture = _stateData as Object/*=_Future<bool>*/;
|
| _subscription = null;
|
| _stateData = null;
|
| moveNextFuture._completeError(error, stackTrace);
|
| @@ -1041,7 +1031,7 @@ class _StreamIterator<T> implements StreamIterator<T> {
|
|
|
| void _onDone() {
|
| assert(_subscription != null && !_isPaused);
|
| - _Future<bool> moveNextFuture = _stateData as Object /*=_Future<bool>*/;
|
| + _Future<bool> moveNextFuture = _stateData as Object/*=_Future<bool>*/;
|
| _subscription = null;
|
| _stateData = null;
|
| moveNextFuture._complete(false);
|
| @@ -1053,9 +1043,7 @@ class _EmptyStream<T> extends Stream<T> {
|
| const _EmptyStream() : super._internal();
|
| bool get isBroadcast => true;
|
| StreamSubscription<T> listen(void onData(T data),
|
| - {Function onError,
|
| - void onDone(),
|
| - bool cancelOnError}) {
|
| + {Function onError, void onDone(), bool cancelOnError}) {
|
| return new _DoneStreamSubscription<T>(onDone);
|
| }
|
| }
|
|
|