Index: tool/input_sdk/lib/async/stream_impl.dart |
diff --git a/tool/input_sdk/lib/async/stream_impl.dart b/tool/input_sdk/lib/async/stream_impl.dart |
index b613cfdc8c23093046fbb915903a453e6ea0fa21..dffaa1205910b222ea7ca6bbdbf197069f33cccd 100644 |
--- a/tool/input_sdk/lib/async/stream_impl.dart |
+++ b/tool/input_sdk/lib/async/stream_impl.dart |
@@ -106,7 +106,7 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
* |
* Is created when necessary, or set in constructor for preconfigured events. |
*/ |
- _PendingEvents _pending; |
+ _PendingEvents<T> _pending; |
_BufferingStreamSubscription(void onData(T data), |
Function onError, |
@@ -124,7 +124,7 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
* This can only be done once. The pending events object is used for the |
* rest of the subscription's life cycle. |
*/ |
- void _setPendingEvents(_PendingEvents pendingEvents) { |
+ void _setPendingEvents(_PendingEvents<T> pendingEvents) { |
assert(_pending == null); |
if (pendingEvents == null) return; |
_pending = pendingEvents; |
@@ -134,29 +134,18 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
} |
} |
- /** |
- * Extracts the pending events from a canceled stream. |
- * |
- * This can only be done during the [_onCancel] method call. After that, |
- * any remaining pending events will be cleared. |
- */ |
- _PendingEvents _extractPending() { |
- assert(_isCanceled); |
- _PendingEvents events = _pending; |
- _pending = null; |
- return events; |
- } |
- |
// StreamSubscription interface. |
void onData(void handleData(T event)) { |
if (handleData == null) handleData = _nullDataHandler; |
- _onData = _zone.registerUnaryCallback(handleData); |
+ // TODO(floitsch): the return type should be 'void', and the type |
+ // should be inferred. |
+ _onData = _zone.registerUnaryCallback/*<dynamic, T>*/(handleData); |
} |
void onError(Function handleError) { |
if (handleError == null) handleError = _nullErrorHandler; |
- _onError = _registerErrorHandler(handleError, _zone); |
+ _onError = _registerErrorHandler/*<T>*/(handleError, _zone); |
} |
void onDone(void handleDone()) { |
@@ -202,8 +191,8 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
return _cancelFuture; |
} |
- Future asFuture([var futureValue]) { |
- _Future<T> result = new _Future<T>(); |
+ Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) { |
+ _Future/*<E>*/ result = new _Future/*<E>*/(); |
// Overwrite the onDone and onError handlers. |
_onDone = () { result._complete(futureValue); }; |
@@ -269,7 +258,7 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
if (_canFire) { |
_sendData(data); |
} else { |
- _addPending(new _DelayedData(data)); |
+ _addPending(new _DelayedData<dynamic /*=T*/>(data)); |
} |
} |
@@ -319,8 +308,10 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
* of pending events later (if necessary). |
*/ |
void _addPending(_DelayedEvent event) { |
- _StreamImplEvents pending = _pending; |
- if (_pending == null) pending = _pending = new _StreamImplEvents(); |
+ _StreamImplEvents<T> pending = _pending; |
+ if (_pending == null) { |
+ pending = _pending = new _StreamImplEvents<dynamic /*=T*/>(); |
+ } |
pending.add(event); |
if (!_hasPending) { |
_state |= _STATE_HAS_PENDING; |
@@ -343,7 +334,7 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
_checkState(wasInputPaused); |
} |
- void _sendError(Object error, StackTrace stackTrace) { |
+ void _sendError(var error, StackTrace stackTrace) { |
assert(!_isCanceled); |
assert(!_isPaused); |
assert(!_inCallback); |
@@ -354,10 +345,13 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
// future to finish we must not report the error. |
if (_isCanceled && !_waitsForCancel) return; |
_state |= _STATE_IN_CALLBACK; |
- if (_onError is ZoneBinaryCallback) { |
- _zone.runBinaryGuarded(_onError, error, stackTrace); |
+ if (_onError is ZoneBinaryCallback<dynamic, Object, StackTrace>) { |
+ ZoneBinaryCallback<dynamic, Object, StackTrace> errorCallback = _onError |
+ as Object /*=ZoneBinaryCallback<dynamic, Object, StackTrace>*/; |
+ _zone.runBinaryGuarded(errorCallback, error, stackTrace); |
} else { |
- _zone.runUnaryGuarded(_onError, error); |
+ _zone.runUnaryGuarded/*<dynamic, dynamic>*/( |
+ _onError as Object /*=ZoneUnaryCallback<dynamic, dynamic>*/, error); |
} |
_state &= ~_STATE_IN_CALLBACK; |
} |
@@ -470,7 +464,7 @@ abstract class _StreamImpl<T> extends Stream<T> { |
void onDone(), |
bool cancelOnError }) { |
cancelOnError = identical(true, cancelOnError); |
- StreamSubscription subscription = |
+ StreamSubscription<T> subscription = |
_createSubscription(onData, onError, onDone, cancelOnError); |
_onListen(subscription); |
return subscription; |
@@ -491,11 +485,11 @@ abstract class _StreamImpl<T> extends Stream<T> { |
void _onListen(StreamSubscription subscription) {} |
} |
-typedef _PendingEvents _EventGenerator(); |
+typedef _PendingEvents<T> _EventGenerator<T>(); |
/** Stream that generates its own events. */ |
class _GeneratedStreamImpl<T> extends _StreamImpl<T> { |
- final _EventGenerator _pending; |
+ final _EventGenerator<T> _pending; |
bool _isUsed = false; |
/** |
* Initializes the stream to have only the events provided by a |
@@ -512,14 +506,14 @@ class _GeneratedStreamImpl<T> extends _StreamImpl<T> { |
bool cancelOnError) { |
if (_isUsed) throw new StateError("Stream has already been listened to."); |
_isUsed = true; |
- return new _BufferingStreamSubscription( |
+ return new _BufferingStreamSubscription<T>( |
onData, onError, onDone, cancelOnError).._setPendingEvents(_pending()); |
} |
} |
/** Pending events object that gets its events from an [Iterable]. */ |
-class _IterablePendingEvents<T> extends _PendingEvents { |
+class _IterablePendingEvents<T> extends _PendingEvents<T> { |
// The iterator providing data for data events. |
// Set to null when iteration has completed. |
Iterator<T> _iterator; |
@@ -528,7 +522,7 @@ class _IterablePendingEvents<T> extends _PendingEvents { |
bool get isEmpty => _iterator == null; |
- void handleNext(_EventDispatch dispatch) { |
+ void handleNext(_EventDispatch<T> dispatch) { |
if (_iterator == null) { |
throw new StateError("No events pending."); |
} |
@@ -622,7 +616,7 @@ class _DelayedDone implements _DelayedEvent { |
} |
/** Superclass for provider of pending events. */ |
-abstract class _PendingEvents { |
+abstract class _PendingEvents<T> { |
// No async event has been scheduled. |
static const int _STATE_UNSCHEDULED = 0; |
// An async event has been scheduled to run a function. |
@@ -656,7 +650,7 @@ abstract class _PendingEvents { |
* If called more than once, it should be called with the same dispatch as |
* argument each time. It may reuse an earlier argument in some cases. |
*/ |
- void schedule(_EventDispatch dispatch) { |
+ void schedule(_EventDispatch<T> dispatch) { |
if (isScheduled) return; |
assert(!isEmpty); |
if (_eventScheduled) { |
@@ -677,7 +671,7 @@ abstract class _PendingEvents { |
if (isScheduled) _state = _STATE_CANCELED; |
} |
- void handleNext(_EventDispatch dispatch); |
+ void handleNext(_EventDispatch<T> dispatch); |
/** Throw away any pending events and cancel scheduled events. */ |
void clear(); |
@@ -685,7 +679,7 @@ abstract class _PendingEvents { |
/** Class holding pending events for a [_StreamImpl]. */ |
-class _StreamImplEvents extends _PendingEvents { |
+class _StreamImplEvents<T> extends _PendingEvents<T> { |
/// Single linked list of [_DelayedEvent] objects. |
_DelayedEvent firstPendingEvent = null; |
/// Last element in the list of pending events. New events are added after it. |
@@ -701,7 +695,7 @@ class _StreamImplEvents extends _PendingEvents { |
} |
} |
- void handleNext(_EventDispatch dispatch) { |
+ void handleNext(_EventDispatch<T> dispatch) { |
assert(!isScheduled); |
_DelayedEvent event = firstPendingEvent; |
firstPendingEvent = event.next; |
@@ -736,7 +730,7 @@ class _BroadcastLinkedList { |
} |
} |
-typedef void _broadcastCallback(StreamSubscription subscription); |
+typedef void _BroadcastCallback<T>(StreamSubscription<T> subscription); |
/** |
* Done subscription that will send one done event as soon as possible. |
@@ -784,8 +778,8 @@ class _DoneStreamSubscription<T> implements StreamSubscription<T> { |
Future cancel() => null; |
- Future asFuture([futureValue]) { |
- _Future result = new _Future(); |
+ Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) { |
+ _Future/*<E>*/ result = new _Future/*<E>*/(); |
_onDone = () { result._completeWithValue(null); }; |
return result; |
} |
@@ -800,18 +794,22 @@ class _DoneStreamSubscription<T> implements StreamSubscription<T> { |
class _AsBroadcastStream<T> extends Stream<T> { |
final Stream<T> _source; |
- final _broadcastCallback _onListenHandler; |
- final _broadcastCallback _onCancelHandler; |
+ final _BroadcastCallback<T> _onListenHandler; |
+ final _BroadcastCallback<T> _onCancelHandler; |
final Zone _zone; |
_AsBroadcastStreamController<T> _controller; |
StreamSubscription<T> _subscription; |
_AsBroadcastStream(this._source, |
- void onListenHandler(StreamSubscription subscription), |
- void onCancelHandler(StreamSubscription subscription)) |
- : _onListenHandler = Zone.current.registerUnaryCallback(onListenHandler), |
- _onCancelHandler = Zone.current.registerUnaryCallback(onCancelHandler), |
+ void onListenHandler(StreamSubscription<T> subscription), |
+ void onCancelHandler(StreamSubscription<T> subscription)) |
+ // TODO(floitsch): the return type should be void and should be |
+ // inferred. |
+ : _onListenHandler = Zone.current.registerUnaryCallback |
+ /*<dynamic, StreamSubscription<T>>*/(onListenHandler), |
+ _onCancelHandler = Zone.current.registerUnaryCallback |
+ /*<dynamic, StreamSubscription<T>>*/(onCancelHandler), |
_zone = Zone.current { |
_controller = new _AsBroadcastStreamController<T>(_onListen, _onCancel); |
} |
@@ -839,7 +837,8 @@ class _AsBroadcastStream<T> extends Stream<T> { |
void _onCancel() { |
bool shutdown = (_controller == null) || _controller.isClosed; |
if (_onCancelHandler != null) { |
- _zone.runUnary(_onCancelHandler, new _BroadcastSubscriptionWrapper(this)); |
+ _zone.runUnary( |
+ _onCancelHandler, new _BroadcastSubscriptionWrapper<T>(this)); |
} |
if (shutdown) { |
if (_subscription != null) { |
@@ -851,7 +850,8 @@ class _AsBroadcastStream<T> extends Stream<T> { |
void _onListen() { |
if (_onListenHandler != null) { |
- _zone.runUnary(_onListenHandler, new _BroadcastSubscriptionWrapper(this)); |
+ _zone.runUnary( |
+ _onListenHandler, new _BroadcastSubscriptionWrapper<T>(this)); |
} |
} |
@@ -921,7 +921,7 @@ class _BroadcastSubscriptionWrapper<T> implements StreamSubscription<T> { |
return _stream._isSubscriptionPaused; |
} |
- Future asFuture([var futureValue]) { |
+ Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) { |
throw new UnsupportedError( |
"Cannot change handlers of asBroadcastStream source subscription."); |
} |
@@ -972,7 +972,7 @@ class _StreamIteratorImpl<T> implements StreamIterator<T> { |
/// Also used to store the next value/error in case the stream provides an |
/// event before [moveNext] is called again. In that case, the stream will |
/// be paused to prevent further events. |
- var _futureOrPrefetch = null; |
+ var/*Future<bool> or T*/ _futureOrPrefetch = null; |
/// The current state. |
int _state = _STATE_FOUND; |
@@ -996,14 +996,15 @@ class _StreamIteratorImpl<T> implements StreamIterator<T> { |
if (_state == _STATE_FOUND) { |
_state = _STATE_MOVING; |
_current = null; |
- _futureOrPrefetch = new _Future<bool>(); |
- return _futureOrPrefetch; |
+ var result = new _Future<bool>(); |
+ _futureOrPrefetch = result; |
+ return result; |
} else { |
assert(_state >= _STATE_EXTRA_DATA); |
switch (_state) { |
case _STATE_EXTRA_DATA: |
_state = _STATE_FOUND; |
- _current = _futureOrPrefetch; |
+ _current = _futureOrPrefetch as Object /*=T*/; |
_futureOrPrefetch = null; |
_subscription.resume(); |
return new _Future<bool>.immediate(true); |
@@ -1029,10 +1030,9 @@ class _StreamIteratorImpl<T> implements StreamIterator<T> { |
Future cancel() { |
StreamSubscription subscription = _subscription; |
- // Cherry pick of: https://codereview.chromium.org//896793002 |
if (subscription == null) return null; |
if (_state == _STATE_MOVING) { |
- _Future<bool> hasNext = _futureOrPrefetch; |
+ _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/; |
_clear(); |
hasNext._complete(false); |
} else { |
@@ -1044,7 +1044,7 @@ class _StreamIteratorImpl<T> implements StreamIterator<T> { |
void _onData(T data) { |
if (_state == _STATE_MOVING) { |
_current = data; |
- _Future<bool> hasNext = _futureOrPrefetch; |
+ _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/; |
_futureOrPrefetch = null; |
_state = _STATE_FOUND; |
hasNext._complete(true); |
@@ -1058,7 +1058,7 @@ class _StreamIteratorImpl<T> implements StreamIterator<T> { |
void _onError(Object error, [StackTrace stackTrace]) { |
if (_state == _STATE_MOVING) { |
- _Future<bool> hasNext = _futureOrPrefetch; |
+ _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/; |
// We have cancelOnError: true, so the subscription is canceled. |
_clear(); |
hasNext._completeError(error, stackTrace); |
@@ -1072,7 +1072,7 @@ class _StreamIteratorImpl<T> implements StreamIterator<T> { |
void _onDone() { |
if (_state == _STATE_MOVING) { |
- _Future<bool> hasNext = _futureOrPrefetch; |
+ _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/; |
_clear(); |
hasNext._complete(false); |
return; |
@@ -1082,3 +1082,15 @@ class _StreamIteratorImpl<T> implements StreamIterator<T> { |
_state = _STATE_EXTRA_DONE; |
} |
} |
+ |
+/** An empty broadcast stream, sending a done event as soon as possible. */ |
+class _EmptyStream<T> extends Stream<T> { |
+ const _EmptyStream() : super._internal(); |
+ bool get isBroadcast => true; |
+ StreamSubscription<T> listen(void onData(T data), |
+ {Function onError, |
+ void onDone(), |
+ bool cancelOnError}) { |
+ return new _DoneStreamSubscription<T>(onDone); |
+ } |
+} |