Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(109)

Unified Diff: tool/input_sdk/lib/async/stream_impl.dart

Issue 1953153002: Update dart:async to match the Dart repo. (Closed) Base URL: https://github.com/dart-lang/dev_compiler.git@master
Patch Set: Remove unneeded calls. Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « tool/input_sdk/lib/async/stream_controller.dart ('k') | tool/input_sdk/lib/async/stream_pipe.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: tool/input_sdk/lib/async/stream_impl.dart
diff --git a/tool/input_sdk/lib/async/stream_impl.dart b/tool/input_sdk/lib/async/stream_impl.dart
index b613cfdc8c23093046fbb915903a453e6ea0fa21..dffaa1205910b222ea7ca6bbdbf197069f33cccd 100644
--- a/tool/input_sdk/lib/async/stream_impl.dart
+++ b/tool/input_sdk/lib/async/stream_impl.dart
@@ -106,7 +106,7 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
*
* Is created when necessary, or set in constructor for preconfigured events.
*/
- _PendingEvents _pending;
+ _PendingEvents<T> _pending;
_BufferingStreamSubscription(void onData(T data),
Function onError,
@@ -124,7 +124,7 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
* This can only be done once. The pending events object is used for the
* rest of the subscription's life cycle.
*/
- void _setPendingEvents(_PendingEvents pendingEvents) {
+ void _setPendingEvents(_PendingEvents<T> pendingEvents) {
assert(_pending == null);
if (pendingEvents == null) return;
_pending = pendingEvents;
@@ -134,29 +134,18 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
}
}
- /**
- * Extracts the pending events from a canceled stream.
- *
- * This can only be done during the [_onCancel] method call. After that,
- * any remaining pending events will be cleared.
- */
- _PendingEvents _extractPending() {
- assert(_isCanceled);
- _PendingEvents events = _pending;
- _pending = null;
- return events;
- }
-
// StreamSubscription interface.
void onData(void handleData(T event)) {
if (handleData == null) handleData = _nullDataHandler;
- _onData = _zone.registerUnaryCallback(handleData);
+ // TODO(floitsch): the return type should be 'void', and the type
+ // should be inferred.
+ _onData = _zone.registerUnaryCallback/*<dynamic, T>*/(handleData);
}
void onError(Function handleError) {
if (handleError == null) handleError = _nullErrorHandler;
- _onError = _registerErrorHandler(handleError, _zone);
+ _onError = _registerErrorHandler/*<T>*/(handleError, _zone);
}
void onDone(void handleDone()) {
@@ -202,8 +191,8 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
return _cancelFuture;
}
- Future asFuture([var futureValue]) {
- _Future<T> result = new _Future<T>();
+ Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) {
+ _Future/*<E>*/ result = new _Future/*<E>*/();
// Overwrite the onDone and onError handlers.
_onDone = () { result._complete(futureValue); };
@@ -269,7 +258,7 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
if (_canFire) {
_sendData(data);
} else {
- _addPending(new _DelayedData(data));
+ _addPending(new _DelayedData<dynamic /*=T*/>(data));
}
}
@@ -319,8 +308,10 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
* of pending events later (if necessary).
*/
void _addPending(_DelayedEvent event) {
- _StreamImplEvents pending = _pending;
- if (_pending == null) pending = _pending = new _StreamImplEvents();
+ _StreamImplEvents<T> pending = _pending;
+ if (_pending == null) {
+ pending = _pending = new _StreamImplEvents<dynamic /*=T*/>();
+ }
pending.add(event);
if (!_hasPending) {
_state |= _STATE_HAS_PENDING;
@@ -343,7 +334,7 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
_checkState(wasInputPaused);
}
- void _sendError(Object error, StackTrace stackTrace) {
+ void _sendError(var error, StackTrace stackTrace) {
assert(!_isCanceled);
assert(!_isPaused);
assert(!_inCallback);
@@ -354,10 +345,13 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
// future to finish we must not report the error.
if (_isCanceled && !_waitsForCancel) return;
_state |= _STATE_IN_CALLBACK;
- if (_onError is ZoneBinaryCallback) {
- _zone.runBinaryGuarded(_onError, error, stackTrace);
+ if (_onError is ZoneBinaryCallback<dynamic, Object, StackTrace>) {
+ ZoneBinaryCallback<dynamic, Object, StackTrace> errorCallback = _onError
+ as Object /*=ZoneBinaryCallback<dynamic, Object, StackTrace>*/;
+ _zone.runBinaryGuarded(errorCallback, error, stackTrace);
} else {
- _zone.runUnaryGuarded(_onError, error);
+ _zone.runUnaryGuarded/*<dynamic, dynamic>*/(
+ _onError as Object /*=ZoneUnaryCallback<dynamic, dynamic>*/, error);
}
_state &= ~_STATE_IN_CALLBACK;
}
@@ -470,7 +464,7 @@ abstract class _StreamImpl<T> extends Stream<T> {
void onDone(),
bool cancelOnError }) {
cancelOnError = identical(true, cancelOnError);
- StreamSubscription subscription =
+ StreamSubscription<T> subscription =
_createSubscription(onData, onError, onDone, cancelOnError);
_onListen(subscription);
return subscription;
@@ -491,11 +485,11 @@ abstract class _StreamImpl<T> extends Stream<T> {
void _onListen(StreamSubscription subscription) {}
}
-typedef _PendingEvents _EventGenerator();
+typedef _PendingEvents<T> _EventGenerator<T>();
/** Stream that generates its own events. */
class _GeneratedStreamImpl<T> extends _StreamImpl<T> {
- final _EventGenerator _pending;
+ final _EventGenerator<T> _pending;
bool _isUsed = false;
/**
* Initializes the stream to have only the events provided by a
@@ -512,14 +506,14 @@ class _GeneratedStreamImpl<T> extends _StreamImpl<T> {
bool cancelOnError) {
if (_isUsed) throw new StateError("Stream has already been listened to.");
_isUsed = true;
- return new _BufferingStreamSubscription(
+ return new _BufferingStreamSubscription<T>(
onData, onError, onDone, cancelOnError).._setPendingEvents(_pending());
}
}
/** Pending events object that gets its events from an [Iterable]. */
-class _IterablePendingEvents<T> extends _PendingEvents {
+class _IterablePendingEvents<T> extends _PendingEvents<T> {
// The iterator providing data for data events.
// Set to null when iteration has completed.
Iterator<T> _iterator;
@@ -528,7 +522,7 @@ class _IterablePendingEvents<T> extends _PendingEvents {
bool get isEmpty => _iterator == null;
- void handleNext(_EventDispatch dispatch) {
+ void handleNext(_EventDispatch<T> dispatch) {
if (_iterator == null) {
throw new StateError("No events pending.");
}
@@ -622,7 +616,7 @@ class _DelayedDone implements _DelayedEvent {
}
/** Superclass for provider of pending events. */
-abstract class _PendingEvents {
+abstract class _PendingEvents<T> {
// No async event has been scheduled.
static const int _STATE_UNSCHEDULED = 0;
// An async event has been scheduled to run a function.
@@ -656,7 +650,7 @@ abstract class _PendingEvents {
* If called more than once, it should be called with the same dispatch as
* argument each time. It may reuse an earlier argument in some cases.
*/
- void schedule(_EventDispatch dispatch) {
+ void schedule(_EventDispatch<T> dispatch) {
if (isScheduled) return;
assert(!isEmpty);
if (_eventScheduled) {
@@ -677,7 +671,7 @@ abstract class _PendingEvents {
if (isScheduled) _state = _STATE_CANCELED;
}
- void handleNext(_EventDispatch dispatch);
+ void handleNext(_EventDispatch<T> dispatch);
/** Throw away any pending events and cancel scheduled events. */
void clear();
@@ -685,7 +679,7 @@ abstract class _PendingEvents {
/** Class holding pending events for a [_StreamImpl]. */
-class _StreamImplEvents extends _PendingEvents {
+class _StreamImplEvents<T> extends _PendingEvents<T> {
/// Single linked list of [_DelayedEvent] objects.
_DelayedEvent firstPendingEvent = null;
/// Last element in the list of pending events. New events are added after it.
@@ -701,7 +695,7 @@ class _StreamImplEvents extends _PendingEvents {
}
}
- void handleNext(_EventDispatch dispatch) {
+ void handleNext(_EventDispatch<T> dispatch) {
assert(!isScheduled);
_DelayedEvent event = firstPendingEvent;
firstPendingEvent = event.next;
@@ -736,7 +730,7 @@ class _BroadcastLinkedList {
}
}
-typedef void _broadcastCallback(StreamSubscription subscription);
+typedef void _BroadcastCallback<T>(StreamSubscription<T> subscription);
/**
* Done subscription that will send one done event as soon as possible.
@@ -784,8 +778,8 @@ class _DoneStreamSubscription<T> implements StreamSubscription<T> {
Future cancel() => null;
- Future asFuture([futureValue]) {
- _Future result = new _Future();
+ Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) {
+ _Future/*<E>*/ result = new _Future/*<E>*/();
_onDone = () { result._completeWithValue(null); };
return result;
}
@@ -800,18 +794,22 @@ class _DoneStreamSubscription<T> implements StreamSubscription<T> {
class _AsBroadcastStream<T> extends Stream<T> {
final Stream<T> _source;
- final _broadcastCallback _onListenHandler;
- final _broadcastCallback _onCancelHandler;
+ final _BroadcastCallback<T> _onListenHandler;
+ final _BroadcastCallback<T> _onCancelHandler;
final Zone _zone;
_AsBroadcastStreamController<T> _controller;
StreamSubscription<T> _subscription;
_AsBroadcastStream(this._source,
- void onListenHandler(StreamSubscription subscription),
- void onCancelHandler(StreamSubscription subscription))
- : _onListenHandler = Zone.current.registerUnaryCallback(onListenHandler),
- _onCancelHandler = Zone.current.registerUnaryCallback(onCancelHandler),
+ void onListenHandler(StreamSubscription<T> subscription),
+ void onCancelHandler(StreamSubscription<T> subscription))
+ // TODO(floitsch): the return type should be void and should be
+ // inferred.
+ : _onListenHandler = Zone.current.registerUnaryCallback
+ /*<dynamic, StreamSubscription<T>>*/(onListenHandler),
+ _onCancelHandler = Zone.current.registerUnaryCallback
+ /*<dynamic, StreamSubscription<T>>*/(onCancelHandler),
_zone = Zone.current {
_controller = new _AsBroadcastStreamController<T>(_onListen, _onCancel);
}
@@ -839,7 +837,8 @@ class _AsBroadcastStream<T> extends Stream<T> {
void _onCancel() {
bool shutdown = (_controller == null) || _controller.isClosed;
if (_onCancelHandler != null) {
- _zone.runUnary(_onCancelHandler, new _BroadcastSubscriptionWrapper(this));
+ _zone.runUnary(
+ _onCancelHandler, new _BroadcastSubscriptionWrapper<T>(this));
}
if (shutdown) {
if (_subscription != null) {
@@ -851,7 +850,8 @@ class _AsBroadcastStream<T> extends Stream<T> {
void _onListen() {
if (_onListenHandler != null) {
- _zone.runUnary(_onListenHandler, new _BroadcastSubscriptionWrapper(this));
+ _zone.runUnary(
+ _onListenHandler, new _BroadcastSubscriptionWrapper<T>(this));
}
}
@@ -921,7 +921,7 @@ class _BroadcastSubscriptionWrapper<T> implements StreamSubscription<T> {
return _stream._isSubscriptionPaused;
}
- Future asFuture([var futureValue]) {
+ Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) {
throw new UnsupportedError(
"Cannot change handlers of asBroadcastStream source subscription.");
}
@@ -972,7 +972,7 @@ class _StreamIteratorImpl<T> implements StreamIterator<T> {
/// Also used to store the next value/error in case the stream provides an
/// event before [moveNext] is called again. In that case, the stream will
/// be paused to prevent further events.
- var _futureOrPrefetch = null;
+ var/*Future<bool> or T*/ _futureOrPrefetch = null;
/// The current state.
int _state = _STATE_FOUND;
@@ -996,14 +996,15 @@ class _StreamIteratorImpl<T> implements StreamIterator<T> {
if (_state == _STATE_FOUND) {
_state = _STATE_MOVING;
_current = null;
- _futureOrPrefetch = new _Future<bool>();
- return _futureOrPrefetch;
+ var result = new _Future<bool>();
+ _futureOrPrefetch = result;
+ return result;
} else {
assert(_state >= _STATE_EXTRA_DATA);
switch (_state) {
case _STATE_EXTRA_DATA:
_state = _STATE_FOUND;
- _current = _futureOrPrefetch;
+ _current = _futureOrPrefetch as Object /*=T*/;
_futureOrPrefetch = null;
_subscription.resume();
return new _Future<bool>.immediate(true);
@@ -1029,10 +1030,9 @@ class _StreamIteratorImpl<T> implements StreamIterator<T> {
Future cancel() {
StreamSubscription subscription = _subscription;
- // Cherry pick of: https://codereview.chromium.org//896793002
if (subscription == null) return null;
if (_state == _STATE_MOVING) {
- _Future<bool> hasNext = _futureOrPrefetch;
+ _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/;
_clear();
hasNext._complete(false);
} else {
@@ -1044,7 +1044,7 @@ class _StreamIteratorImpl<T> implements StreamIterator<T> {
void _onData(T data) {
if (_state == _STATE_MOVING) {
_current = data;
- _Future<bool> hasNext = _futureOrPrefetch;
+ _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/;
_futureOrPrefetch = null;
_state = _STATE_FOUND;
hasNext._complete(true);
@@ -1058,7 +1058,7 @@ class _StreamIteratorImpl<T> implements StreamIterator<T> {
void _onError(Object error, [StackTrace stackTrace]) {
if (_state == _STATE_MOVING) {
- _Future<bool> hasNext = _futureOrPrefetch;
+ _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/;
// We have cancelOnError: true, so the subscription is canceled.
_clear();
hasNext._completeError(error, stackTrace);
@@ -1072,7 +1072,7 @@ class _StreamIteratorImpl<T> implements StreamIterator<T> {
void _onDone() {
if (_state == _STATE_MOVING) {
- _Future<bool> hasNext = _futureOrPrefetch;
+ _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/;
_clear();
hasNext._complete(false);
return;
@@ -1082,3 +1082,15 @@ class _StreamIteratorImpl<T> implements StreamIterator<T> {
_state = _STATE_EXTRA_DONE;
}
}
+
+/** An empty broadcast stream, sending a done event as soon as possible. */
+class _EmptyStream<T> extends Stream<T> {
+ const _EmptyStream() : super._internal();
+ bool get isBroadcast => true;
+ StreamSubscription<T> listen(void onData(T data),
+ {Function onError,
+ void onDone(),
+ bool cancelOnError}) {
+ return new _DoneStreamSubscription<T>(onDone);
+ }
+}
« no previous file with comments | « tool/input_sdk/lib/async/stream_controller.dart ('k') | tool/input_sdk/lib/async/stream_pipe.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698