| Index: tool/input_sdk/lib/async/stream_impl.dart
|
| diff --git a/tool/input_sdk/lib/async/stream_impl.dart b/tool/input_sdk/lib/async/stream_impl.dart
|
| index b613cfdc8c23093046fbb915903a453e6ea0fa21..dffaa1205910b222ea7ca6bbdbf197069f33cccd 100644
|
| --- a/tool/input_sdk/lib/async/stream_impl.dart
|
| +++ b/tool/input_sdk/lib/async/stream_impl.dart
|
| @@ -106,7 +106,7 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
|
| *
|
| * Is created when necessary, or set in constructor for preconfigured events.
|
| */
|
| - _PendingEvents _pending;
|
| + _PendingEvents<T> _pending;
|
|
|
| _BufferingStreamSubscription(void onData(T data),
|
| Function onError,
|
| @@ -124,7 +124,7 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
|
| * This can only be done once. The pending events object is used for the
|
| * rest of the subscription's life cycle.
|
| */
|
| - void _setPendingEvents(_PendingEvents pendingEvents) {
|
| + void _setPendingEvents(_PendingEvents<T> pendingEvents) {
|
| assert(_pending == null);
|
| if (pendingEvents == null) return;
|
| _pending = pendingEvents;
|
| @@ -134,29 +134,18 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
|
| }
|
| }
|
|
|
| - /**
|
| - * Extracts the pending events from a canceled stream.
|
| - *
|
| - * This can only be done during the [_onCancel] method call. After that,
|
| - * any remaining pending events will be cleared.
|
| - */
|
| - _PendingEvents _extractPending() {
|
| - assert(_isCanceled);
|
| - _PendingEvents events = _pending;
|
| - _pending = null;
|
| - return events;
|
| - }
|
| -
|
| // StreamSubscription interface.
|
|
|
| void onData(void handleData(T event)) {
|
| if (handleData == null) handleData = _nullDataHandler;
|
| - _onData = _zone.registerUnaryCallback(handleData);
|
| + // TODO(floitsch): the return type should be 'void', and the type
|
| + // should be inferred.
|
| + _onData = _zone.registerUnaryCallback/*<dynamic, T>*/(handleData);
|
| }
|
|
|
| void onError(Function handleError) {
|
| if (handleError == null) handleError = _nullErrorHandler;
|
| - _onError = _registerErrorHandler(handleError, _zone);
|
| + _onError = _registerErrorHandler/*<T>*/(handleError, _zone);
|
| }
|
|
|
| void onDone(void handleDone()) {
|
| @@ -202,8 +191,8 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
|
| return _cancelFuture;
|
| }
|
|
|
| - Future asFuture([var futureValue]) {
|
| - _Future<T> result = new _Future<T>();
|
| + Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) {
|
| + _Future/*<E>*/ result = new _Future/*<E>*/();
|
|
|
| // Overwrite the onDone and onError handlers.
|
| _onDone = () { result._complete(futureValue); };
|
| @@ -269,7 +258,7 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
|
| if (_canFire) {
|
| _sendData(data);
|
| } else {
|
| - _addPending(new _DelayedData(data));
|
| + _addPending(new _DelayedData<dynamic /*=T*/>(data));
|
| }
|
| }
|
|
|
| @@ -319,8 +308,10 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
|
| * of pending events later (if necessary).
|
| */
|
| void _addPending(_DelayedEvent event) {
|
| - _StreamImplEvents pending = _pending;
|
| - if (_pending == null) pending = _pending = new _StreamImplEvents();
|
| + _StreamImplEvents<T> pending = _pending;
|
| + if (_pending == null) {
|
| + pending = _pending = new _StreamImplEvents<dynamic /*=T*/>();
|
| + }
|
| pending.add(event);
|
| if (!_hasPending) {
|
| _state |= _STATE_HAS_PENDING;
|
| @@ -343,7 +334,7 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
|
| _checkState(wasInputPaused);
|
| }
|
|
|
| - void _sendError(Object error, StackTrace stackTrace) {
|
| + void _sendError(var error, StackTrace stackTrace) {
|
| assert(!_isCanceled);
|
| assert(!_isPaused);
|
| assert(!_inCallback);
|
| @@ -354,10 +345,13 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
|
| // future to finish we must not report the error.
|
| if (_isCanceled && !_waitsForCancel) return;
|
| _state |= _STATE_IN_CALLBACK;
|
| - if (_onError is ZoneBinaryCallback) {
|
| - _zone.runBinaryGuarded(_onError, error, stackTrace);
|
| + if (_onError is ZoneBinaryCallback<dynamic, Object, StackTrace>) {
|
| + ZoneBinaryCallback<dynamic, Object, StackTrace> errorCallback = _onError
|
| + as Object /*=ZoneBinaryCallback<dynamic, Object, StackTrace>*/;
|
| + _zone.runBinaryGuarded(errorCallback, error, stackTrace);
|
| } else {
|
| - _zone.runUnaryGuarded(_onError, error);
|
| + _zone.runUnaryGuarded/*<dynamic, dynamic>*/(
|
| + _onError as Object /*=ZoneUnaryCallback<dynamic, dynamic>*/, error);
|
| }
|
| _state &= ~_STATE_IN_CALLBACK;
|
| }
|
| @@ -470,7 +464,7 @@ abstract class _StreamImpl<T> extends Stream<T> {
|
| void onDone(),
|
| bool cancelOnError }) {
|
| cancelOnError = identical(true, cancelOnError);
|
| - StreamSubscription subscription =
|
| + StreamSubscription<T> subscription =
|
| _createSubscription(onData, onError, onDone, cancelOnError);
|
| _onListen(subscription);
|
| return subscription;
|
| @@ -491,11 +485,11 @@ abstract class _StreamImpl<T> extends Stream<T> {
|
| void _onListen(StreamSubscription subscription) {}
|
| }
|
|
|
| -typedef _PendingEvents _EventGenerator();
|
| +typedef _PendingEvents<T> _EventGenerator<T>();
|
|
|
| /** Stream that generates its own events. */
|
| class _GeneratedStreamImpl<T> extends _StreamImpl<T> {
|
| - final _EventGenerator _pending;
|
| + final _EventGenerator<T> _pending;
|
| bool _isUsed = false;
|
| /**
|
| * Initializes the stream to have only the events provided by a
|
| @@ -512,14 +506,14 @@ class _GeneratedStreamImpl<T> extends _StreamImpl<T> {
|
| bool cancelOnError) {
|
| if (_isUsed) throw new StateError("Stream has already been listened to.");
|
| _isUsed = true;
|
| - return new _BufferingStreamSubscription(
|
| + return new _BufferingStreamSubscription<T>(
|
| onData, onError, onDone, cancelOnError).._setPendingEvents(_pending());
|
| }
|
| }
|
|
|
|
|
| /** Pending events object that gets its events from an [Iterable]. */
|
| -class _IterablePendingEvents<T> extends _PendingEvents {
|
| +class _IterablePendingEvents<T> extends _PendingEvents<T> {
|
| // The iterator providing data for data events.
|
| // Set to null when iteration has completed.
|
| Iterator<T> _iterator;
|
| @@ -528,7 +522,7 @@ class _IterablePendingEvents<T> extends _PendingEvents {
|
|
|
| bool get isEmpty => _iterator == null;
|
|
|
| - void handleNext(_EventDispatch dispatch) {
|
| + void handleNext(_EventDispatch<T> dispatch) {
|
| if (_iterator == null) {
|
| throw new StateError("No events pending.");
|
| }
|
| @@ -622,7 +616,7 @@ class _DelayedDone implements _DelayedEvent {
|
| }
|
|
|
| /** Superclass for provider of pending events. */
|
| -abstract class _PendingEvents {
|
| +abstract class _PendingEvents<T> {
|
| // No async event has been scheduled.
|
| static const int _STATE_UNSCHEDULED = 0;
|
| // An async event has been scheduled to run a function.
|
| @@ -656,7 +650,7 @@ abstract class _PendingEvents {
|
| * If called more than once, it should be called with the same dispatch as
|
| * argument each time. It may reuse an earlier argument in some cases.
|
| */
|
| - void schedule(_EventDispatch dispatch) {
|
| + void schedule(_EventDispatch<T> dispatch) {
|
| if (isScheduled) return;
|
| assert(!isEmpty);
|
| if (_eventScheduled) {
|
| @@ -677,7 +671,7 @@ abstract class _PendingEvents {
|
| if (isScheduled) _state = _STATE_CANCELED;
|
| }
|
|
|
| - void handleNext(_EventDispatch dispatch);
|
| + void handleNext(_EventDispatch<T> dispatch);
|
|
|
| /** Throw away any pending events and cancel scheduled events. */
|
| void clear();
|
| @@ -685,7 +679,7 @@ abstract class _PendingEvents {
|
|
|
|
|
| /** Class holding pending events for a [_StreamImpl]. */
|
| -class _StreamImplEvents extends _PendingEvents {
|
| +class _StreamImplEvents<T> extends _PendingEvents<T> {
|
| /// Single linked list of [_DelayedEvent] objects.
|
| _DelayedEvent firstPendingEvent = null;
|
| /// Last element in the list of pending events. New events are added after it.
|
| @@ -701,7 +695,7 @@ class _StreamImplEvents extends _PendingEvents {
|
| }
|
| }
|
|
|
| - void handleNext(_EventDispatch dispatch) {
|
| + void handleNext(_EventDispatch<T> dispatch) {
|
| assert(!isScheduled);
|
| _DelayedEvent event = firstPendingEvent;
|
| firstPendingEvent = event.next;
|
| @@ -736,7 +730,7 @@ class _BroadcastLinkedList {
|
| }
|
| }
|
|
|
| -typedef void _broadcastCallback(StreamSubscription subscription);
|
| +typedef void _BroadcastCallback<T>(StreamSubscription<T> subscription);
|
|
|
| /**
|
| * Done subscription that will send one done event as soon as possible.
|
| @@ -784,8 +778,8 @@ class _DoneStreamSubscription<T> implements StreamSubscription<T> {
|
|
|
| Future cancel() => null;
|
|
|
| - Future asFuture([futureValue]) {
|
| - _Future result = new _Future();
|
| + Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) {
|
| + _Future/*<E>*/ result = new _Future/*<E>*/();
|
| _onDone = () { result._completeWithValue(null); };
|
| return result;
|
| }
|
| @@ -800,18 +794,22 @@ class _DoneStreamSubscription<T> implements StreamSubscription<T> {
|
|
|
| class _AsBroadcastStream<T> extends Stream<T> {
|
| final Stream<T> _source;
|
| - final _broadcastCallback _onListenHandler;
|
| - final _broadcastCallback _onCancelHandler;
|
| + final _BroadcastCallback<T> _onListenHandler;
|
| + final _BroadcastCallback<T> _onCancelHandler;
|
| final Zone _zone;
|
|
|
| _AsBroadcastStreamController<T> _controller;
|
| StreamSubscription<T> _subscription;
|
|
|
| _AsBroadcastStream(this._source,
|
| - void onListenHandler(StreamSubscription subscription),
|
| - void onCancelHandler(StreamSubscription subscription))
|
| - : _onListenHandler = Zone.current.registerUnaryCallback(onListenHandler),
|
| - _onCancelHandler = Zone.current.registerUnaryCallback(onCancelHandler),
|
| + void onListenHandler(StreamSubscription<T> subscription),
|
| + void onCancelHandler(StreamSubscription<T> subscription))
|
| + // TODO(floitsch): the return type should be void and should be
|
| + // inferred.
|
| + : _onListenHandler = Zone.current.registerUnaryCallback
|
| + /*<dynamic, StreamSubscription<T>>*/(onListenHandler),
|
| + _onCancelHandler = Zone.current.registerUnaryCallback
|
| + /*<dynamic, StreamSubscription<T>>*/(onCancelHandler),
|
| _zone = Zone.current {
|
| _controller = new _AsBroadcastStreamController<T>(_onListen, _onCancel);
|
| }
|
| @@ -839,7 +837,8 @@ class _AsBroadcastStream<T> extends Stream<T> {
|
| void _onCancel() {
|
| bool shutdown = (_controller == null) || _controller.isClosed;
|
| if (_onCancelHandler != null) {
|
| - _zone.runUnary(_onCancelHandler, new _BroadcastSubscriptionWrapper(this));
|
| + _zone.runUnary(
|
| + _onCancelHandler, new _BroadcastSubscriptionWrapper<T>(this));
|
| }
|
| if (shutdown) {
|
| if (_subscription != null) {
|
| @@ -851,7 +850,8 @@ class _AsBroadcastStream<T> extends Stream<T> {
|
|
|
| void _onListen() {
|
| if (_onListenHandler != null) {
|
| - _zone.runUnary(_onListenHandler, new _BroadcastSubscriptionWrapper(this));
|
| + _zone.runUnary(
|
| + _onListenHandler, new _BroadcastSubscriptionWrapper<T>(this));
|
| }
|
| }
|
|
|
| @@ -921,7 +921,7 @@ class _BroadcastSubscriptionWrapper<T> implements StreamSubscription<T> {
|
| return _stream._isSubscriptionPaused;
|
| }
|
|
|
| - Future asFuture([var futureValue]) {
|
| + Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) {
|
| throw new UnsupportedError(
|
| "Cannot change handlers of asBroadcastStream source subscription.");
|
| }
|
| @@ -972,7 +972,7 @@ class _StreamIteratorImpl<T> implements StreamIterator<T> {
|
| /// Also used to store the next value/error in case the stream provides an
|
| /// event before [moveNext] is called again. In that case, the stream will
|
| /// be paused to prevent further events.
|
| - var _futureOrPrefetch = null;
|
| + var/*Future<bool> or T*/ _futureOrPrefetch = null;
|
|
|
| /// The current state.
|
| int _state = _STATE_FOUND;
|
| @@ -996,14 +996,15 @@ class _StreamIteratorImpl<T> implements StreamIterator<T> {
|
| if (_state == _STATE_FOUND) {
|
| _state = _STATE_MOVING;
|
| _current = null;
|
| - _futureOrPrefetch = new _Future<bool>();
|
| - return _futureOrPrefetch;
|
| + var result = new _Future<bool>();
|
| + _futureOrPrefetch = result;
|
| + return result;
|
| } else {
|
| assert(_state >= _STATE_EXTRA_DATA);
|
| switch (_state) {
|
| case _STATE_EXTRA_DATA:
|
| _state = _STATE_FOUND;
|
| - _current = _futureOrPrefetch;
|
| + _current = _futureOrPrefetch as Object /*=T*/;
|
| _futureOrPrefetch = null;
|
| _subscription.resume();
|
| return new _Future<bool>.immediate(true);
|
| @@ -1029,10 +1030,9 @@ class _StreamIteratorImpl<T> implements StreamIterator<T> {
|
|
|
| Future cancel() {
|
| StreamSubscription subscription = _subscription;
|
| - // Cherry pick of: https://codereview.chromium.org//896793002
|
| if (subscription == null) return null;
|
| if (_state == _STATE_MOVING) {
|
| - _Future<bool> hasNext = _futureOrPrefetch;
|
| + _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/;
|
| _clear();
|
| hasNext._complete(false);
|
| } else {
|
| @@ -1044,7 +1044,7 @@ class _StreamIteratorImpl<T> implements StreamIterator<T> {
|
| void _onData(T data) {
|
| if (_state == _STATE_MOVING) {
|
| _current = data;
|
| - _Future<bool> hasNext = _futureOrPrefetch;
|
| + _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/;
|
| _futureOrPrefetch = null;
|
| _state = _STATE_FOUND;
|
| hasNext._complete(true);
|
| @@ -1058,7 +1058,7 @@ class _StreamIteratorImpl<T> implements StreamIterator<T> {
|
|
|
| void _onError(Object error, [StackTrace stackTrace]) {
|
| if (_state == _STATE_MOVING) {
|
| - _Future<bool> hasNext = _futureOrPrefetch;
|
| + _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/;
|
| // We have cancelOnError: true, so the subscription is canceled.
|
| _clear();
|
| hasNext._completeError(error, stackTrace);
|
| @@ -1072,7 +1072,7 @@ class _StreamIteratorImpl<T> implements StreamIterator<T> {
|
|
|
| void _onDone() {
|
| if (_state == _STATE_MOVING) {
|
| - _Future<bool> hasNext = _futureOrPrefetch;
|
| + _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/;
|
| _clear();
|
| hasNext._complete(false);
|
| return;
|
| @@ -1082,3 +1082,15 @@ class _StreamIteratorImpl<T> implements StreamIterator<T> {
|
| _state = _STATE_EXTRA_DONE;
|
| }
|
| }
|
| +
|
| +/** An empty broadcast stream, sending a done event as soon as possible. */
|
| +class _EmptyStream<T> extends Stream<T> {
|
| + const _EmptyStream() : super._internal();
|
| + bool get isBroadcast => true;
|
| + StreamSubscription<T> listen(void onData(T data),
|
| + {Function onError,
|
| + void onDone(),
|
| + bool cancelOnError}) {
|
| + return new _DoneStreamSubscription<T>(onDone);
|
| + }
|
| +}
|
|
|