Index: tool/input_sdk/lib/async/stream.dart |
diff --git a/tool/input_sdk/lib/async/stream.dart b/tool/input_sdk/lib/async/stream.dart |
index 54f712b99e1d9869629029bcbe26abdf671eb021..69313738c0dbaba9cf74a5cc0bd7754506f7f9fd 100644 |
--- a/tool/input_sdk/lib/async/stream.dart |
+++ b/tool/input_sdk/lib/async/stream.dart |
@@ -8,6 +8,8 @@ part of dart.async; |
// Core Stream types |
// ------------------------------------------------------------------- |
+typedef void _TimerCallback(); |
+ |
/** |
* A source of asynchronous data events. |
* |
@@ -48,7 +50,7 @@ part of dart.async; |
* use [asBroadcastStream] to create a broadcast stream on top of the |
* non-broadcast stream. |
* |
- * On either kind of stream, stream transformationss, such as [where] and |
+ * On either kind of stream, stream transformations, such as [where] and |
* [skip], return the same type of stream as the one the method was called on, |
* unless otherwise noted. |
* |
@@ -75,6 +77,22 @@ abstract class Stream<T> { |
Stream(); |
/** |
+ * Internal use only. We do not want to promise that Stream stays const. |
+ * |
+ * If mixins become compatible with const constructors, we may use a |
+ * stream mixin instead of extending Stream from a const class. |
+ */ |
+ const Stream._internal(); |
+ |
+ /** |
+ * Creates an empty broadcast stream. |
+ * |
+ * This is a stream which does nothing except sending a done event |
+ * when it's listened to. |
+ */ |
+ const factory Stream.empty() = _EmptyStream<T>; |
+ |
+ /** |
* Creates a new single-subscription stream from the future. |
* |
* When the future completes, the stream will fire one event, either |
@@ -84,8 +102,7 @@ abstract class Stream<T> { |
// Use the controller's buffering to fill in the value even before |
// the stream has a listener. For a single value, it's not worth it |
// to wait for a listener before doing the `then` on the future. |
- _StreamController<T> controller = |
- new StreamController<T>(sync: true) as _StreamController<T>; |
+ _StreamController<T> controller = new StreamController<T>(sync: true); |
future.then((value) { |
controller._add(value); |
controller._closeUnchecked(); |
@@ -98,6 +115,47 @@ abstract class Stream<T> { |
} |
/** |
+ * Create a stream from a group of futures. |
+ * |
+ * The stream reports the results of the futures on the stream in the order |
+ * in which the futures complete. |
+ * |
+ * If some futures have completed before calling `Stream.fromFutures`, |
+ * their result will be output on the created stream in some unspecified |
+ * order. |
+ * |
+ * When all futures have completed, the stream is closed. |
+ * |
+ * If no future is passed, the stream closes as soon as possible. |
+ */ |
+ factory Stream.fromFutures(Iterable<Future<T>> futures) { |
+ _StreamController<T> controller = new StreamController<T>(sync: true); |
+ int count = 0; |
+ var onValue = (T value) { |
+ if (!controller.isClosed) { |
+ controller._add(value); |
+ if (--count == 0) controller._closeUnchecked(); |
+ } |
+ }; |
+ var onError = (error, stack) { |
+ if (!controller.isClosed) { |
+ controller._addError(error, stack); |
+ if (--count == 0) controller._closeUnchecked(); |
+ } |
+ }; |
+ // The futures are already running, so start listening to them immediately |
+ // (instead of waiting for the stream to be listened on). |
+ // If we wait, we might not catch errors in the futures in time. |
+ for (var future in futures) { |
+ count++; |
+ future.then(onValue, onError: onError); |
+ } |
+ // Use schedule microtask since controller is sync. |
+ if (count == 0) scheduleMicrotask(controller.close); |
+ return controller.stream; |
+ } |
+ |
+ /** |
* Creates a single-subscription stream that gets its data from [data]. |
* |
* The iterable is iterated when the stream receives a listener, and stops |
@@ -124,8 +182,6 @@ abstract class Stream<T> { |
*/ |
factory Stream.periodic(Duration period, |
[T computation(int computationCount)]) { |
- if (computation == null) computation = ((i) => null); |
- |
Timer timer; |
int computationCount = 0; |
StreamController<T> controller; |
@@ -134,7 +190,15 @@ abstract class Stream<T> { |
void sendEvent() { |
watch.reset(); |
- T data = computation(computationCount++); |
+ T data; |
+ if (computation != null) { |
+ try { |
+ data = computation(computationCount++); |
+ } catch (e, s) { |
+ controller.addError(e, s); |
+ return; |
+ } |
+ } |
controller.add(data); |
} |
@@ -197,13 +261,13 @@ abstract class Stream<T> { |
* _outputSink.add(data); |
* } |
* |
- * void addError(e, [st]) => _outputSink(e, st); |
- * void close() => _outputSink.close(); |
+ * void addError(e, [st]) { _outputSink.addError(e, st); } |
+ * void close() { _outputSink.close(); } |
* } |
* |
* class DuplicationTransformer implements StreamTransformer<String, String> { |
* // Some generic types ommitted for brevety. |
- * Stream bind(Stream stream) => new Stream<String>.eventTransform( |
+ * Stream bind(Stream stream) => new Stream<String>.eventTransformed( |
* stream, |
* (EventSink sink) => new DuplicationSink(sink)); |
* } |
@@ -263,6 +327,10 @@ abstract class Stream<T> { |
* two arguments it is called with the stack trace (which could be `null` if |
* the stream itself received an error without stack trace). |
* Otherwise it is called with just the error object. |
+ * If [onError] is omitted, any errors on the stream are considered unhandled, |
+ * and will be passed to the current [Zone]'s error handler. |
+ * By default unhandled async errors are treated |
+ * as if they were uncaught top-level errors. |
* |
* If this stream closes, the [onDone] handler is called. |
* |
@@ -292,9 +360,17 @@ abstract class Stream<T> { |
* Creates a new stream that converts each element of this stream |
* to a new value using the [convert] function. |
* |
+ * For each data event, `o`, in this stream, the returned stream |
+ * provides a data event with the value `convert(o)`. |
+ * If [convert] throws, the returned stream reports the exception as an error |
+ * event instead. |
+ * |
+ * Error and done events are passed through unchanged to the returned stream. |
+ * |
* The returned stream is a broadcast stream if this stream is. |
+ * The [convert] function is called once per data event per listener. |
* If a broadcast stream is listened to more than once, each subscription |
- * will individually execute `map` for each event. |
+ * will individually call [convert] on each data event. |
*/ |
Stream/*<S>*/ map/*<S>*/(/*=S*/ convert(T event)) { |
return new _MapStream<T, dynamic/*=S*/>(this, convert); |
@@ -310,18 +386,20 @@ abstract class Stream<T> { |
* |
* The returned stream is a broadcast stream if this stream is. |
*/ |
- Stream asyncMap(convert(T event)) { |
- StreamController controller; |
- StreamSubscription subscription; |
- void onListen () { |
+ Stream/*<E>*/ asyncMap/*<E>*/(convert(T event)) { |
+ StreamController/*<E>*/ controller; |
+ StreamSubscription/*<T>*/ subscription; |
+ |
+ void onListen() { |
final add = controller.add; |
assert(controller is _StreamController || |
controller is _BroadcastStreamController); |
- final eventSink = controller as _EventSink<T>; |
+ final _EventSink/*<E>*/ eventSink = |
+ controller as Object /*=_EventSink<E>*/; |
final addError = eventSink._addError; |
subscription = this.listen( |
(T event) { |
- var newValue; |
+ dynamic newValue; |
try { |
newValue = convert(event); |
} catch (e, s) { |
@@ -333,21 +411,22 @@ abstract class Stream<T> { |
newValue.then(add, onError: addError) |
.whenComplete(subscription.resume); |
} else { |
- controller.add(newValue); |
+ controller.add(newValue as Object/*=E*/); |
} |
}, |
onError: addError, |
onDone: controller.close |
); |
} |
+ |
if (this.isBroadcast) { |
- controller = new StreamController.broadcast( |
+ controller = new StreamController/*<E>*/.broadcast( |
onListen: onListen, |
onCancel: () { subscription.cancel(); }, |
sync: true |
); |
} else { |
- controller = new StreamController( |
+ controller = new StreamController/*<E>*/( |
onListen: onListen, |
onPause: () { subscription.pause(); }, |
onResume: () { subscription.resume(); }, |
@@ -371,16 +450,17 @@ abstract class Stream<T> { |
* |
* The returned stream is a broadcast stream if this stream is. |
*/ |
- Stream asyncExpand(Stream convert(T event)) { |
- StreamController controller; |
- StreamSubscription subscription; |
+ Stream/*<E>*/ asyncExpand/*<E>*/(Stream/*<E>*/ convert(T event)) { |
+ StreamController/*<E>*/ controller; |
+ StreamSubscription<T> subscription; |
void onListen() { |
assert(controller is _StreamController || |
controller is _BroadcastStreamController); |
- final eventSink = controller as _EventSink<T>; |
+ final _EventSink/*<E>*/ eventSink = |
+ controller as Object /*=_EventSink<E>*/; |
subscription = this.listen( |
(T event) { |
- Stream newStream; |
+ Stream/*<E>*/ newStream; |
try { |
newStream = convert(event); |
} catch (e, s) { |
@@ -398,13 +478,13 @@ abstract class Stream<T> { |
); |
} |
if (this.isBroadcast) { |
- controller = new StreamController.broadcast( |
+ controller = new StreamController/*<E>*/.broadcast( |
onListen: onListen, |
onCancel: () { subscription.cancel(); }, |
sync: true |
); |
} else { |
- controller = new StreamController( |
+ controller = new StreamController/*<E>*/( |
onListen: onListen, |
onPause: () { subscription.pause(); }, |
onResume: () { subscription.resume(); }, |
@@ -463,12 +543,22 @@ abstract class Stream<T> { |
} |
/** |
- * Binds this stream as the input of the provided [StreamConsumer]. |
+ * Pipe the events of this stream into [streamConsumer]. |
* |
- * The `streamConsumer` is closed when the stream has been added to it. |
+ * The events of this stream are added to `streamConsumer` using |
+ * [StreamConsumer.addStream]. |
+ * The `streamConsumer` is closed when this stream has been successfully added |
+ * to it - when the future returned by `addStream` completes without an error. |
* |
* Returns a future which completes when the stream has been consumed |
* and the consumer has been closed. |
+ * |
+ * The returned future completes with the same result as the future returned |
+ * by [StreamConsumer.close]. |
+ * If the adding of the stream itself fails in some way, |
+ * then the consumer is expected to be closed, and won't be closed again. |
+ * In that case the returned future completes with the error from calling |
+ * `addStream`. |
*/ |
Future pipe(StreamConsumer<T> streamConsumer) { |
return streamConsumer.addStream(this).then((_) => streamConsumer.close()); |
@@ -526,14 +616,15 @@ abstract class Stream<T> { |
/** Reduces a sequence of values by repeatedly applying [combine]. */ |
Future/*<S>*/ fold/*<S>*/(var/*=S*/ initialValue, |
/*=S*/ combine(var/*=S*/ previous, T element)) { |
- _Future/*<S>*/ result = new _Future(); |
- var value = initialValue; |
+ |
+ _Future/*<S>*/ result = new _Future/*<S>*/(); |
+ var/*=S*/ value = initialValue; |
StreamSubscription subscription; |
subscription = this.listen( |
(T element) { |
_runUserCode( |
() => combine(value, element), |
- (newValue) { value = newValue; }, |
+ (/*=S*/ newValue) { value = newValue; }, |
_cancelAndErrorClosure(subscription, result) |
); |
}, |
@@ -795,8 +886,8 @@ abstract class Stream<T> { |
* In case of a `done` event the future completes with the given |
* [futureValue]. |
*/ |
- Future drain([var futureValue]) => listen(null, cancelOnError: true) |
- .asFuture(futureValue); |
+ Future/*<E>*/ drain/*<E>*/([/*=E*/ futureValue]) |
+ => listen(null, cancelOnError: true).asFuture/*<E>*/(futureValue); |
/** |
* Provides at most the first [n] values of this stream. |
@@ -819,7 +910,7 @@ abstract class Stream<T> { |
* the returned stream is listened to. |
*/ |
Stream<T> take(int count) { |
- return new _TakeStream(this, count); |
+ return new _TakeStream<T>(this, count); |
} |
/** |
@@ -841,7 +932,7 @@ abstract class Stream<T> { |
* the returned stream is listened to. |
*/ |
Stream<T> takeWhile(bool test(T element)) { |
- return new _TakeWhileStream(this, test); |
+ return new _TakeWhileStream<T>(this, test); |
} |
/** |
@@ -852,7 +943,7 @@ abstract class Stream<T> { |
* the returned stream is listened to. |
*/ |
Stream<T> skip(int count) { |
- return new _SkipStream(this, count); |
+ return new _SkipStream<T>(this, count); |
} |
/** |
@@ -868,14 +959,14 @@ abstract class Stream<T> { |
* the returned stream is listened to. |
*/ |
Stream<T> skipWhile(bool test(T element)) { |
- return new _SkipWhileStream(this, test); |
+ return new _SkipWhileStream<T>(this, test); |
} |
/** |
* Skips data events if they are equal to the previous data event. |
* |
* The returned stream provides the same events as this stream, except |
- * that it never provides two consequtive data events that are equal. |
+ * that it never provides two consecutive data events that are equal. |
* |
* Equality is determined by the provided [equals] method. If that is |
* omitted, the '==' operator on the last provided data element is used. |
@@ -885,7 +976,7 @@ abstract class Stream<T> { |
* will individually perform the `equals` test. |
*/ |
Stream<T> distinct([bool equals(T previous, T next)]) { |
- return new _DistinctStream(this, equals); |
+ return new _DistinctStream<T>(this, equals); |
} |
/** |
@@ -938,8 +1029,7 @@ abstract class Stream<T> { |
_Future<T> future = new _Future<T>(); |
T result = null; |
bool foundResult = false; |
- StreamSubscription subscription; |
- subscription = this.listen( |
+ listen( |
(T value) { |
foundResult = true; |
result = value; |
@@ -1208,26 +1298,26 @@ abstract class Stream<T> { |
* will have its individually timer that starts counting on listen, |
* and the subscriptions' timers can be paused individually. |
*/ |
- Stream timeout(Duration timeLimit, {void onTimeout(EventSink sink)}) { |
- StreamController controller; |
+ Stream<T> timeout(Duration timeLimit, {void onTimeout(EventSink<T> sink)}) { |
+ StreamController<T> controller; |
// The following variables are set on listen. |
StreamSubscription<T> subscription; |
Timer timer; |
Zone zone; |
- Function timeout2; |
+ _TimerCallback timeout; |
void onData(T event) { |
timer.cancel(); |
controller.add(event); |
- timer = zone.createTimer(timeLimit, timeout2); |
+ timer = zone.createTimer(timeLimit, timeout); |
} |
void onError(error, StackTrace stackTrace) { |
timer.cancel(); |
assert(controller is _StreamController || |
controller is _BroadcastStreamController); |
- var eventSink = controller as _EventSink<T>; |
+ dynamic eventSink = controller; |
eventSink._addError(error, stackTrace); // Avoid Zone error replacement. |
- timer = zone.createTimer(timeLimit, timeout2); |
+ timer = zone.createTimer(timeLimit, timeout); |
} |
void onDone() { |
timer.cancel(); |
@@ -1240,23 +1330,26 @@ abstract class Stream<T> { |
// callback. |
zone = Zone.current; |
if (onTimeout == null) { |
- timeout2 = () { |
+ timeout = () { |
controller.addError(new TimeoutException("No stream event", |
timeLimit), null); |
}; |
} else { |
- onTimeout = zone.registerUnaryCallback(onTimeout); |
+ // TODO(floitsch): the return type should be 'void', and the type |
+ // should be inferred. |
+ var registeredOnTimeout = |
+ zone.registerUnaryCallback/*<dynamic, EventSink<T>>*/(onTimeout); |
_ControllerEventSinkWrapper wrapper = |
new _ControllerEventSinkWrapper(null); |
- timeout2 = () { |
+ timeout = () { |
wrapper._sink = controller; // Only valid during call. |
- zone.runUnaryGuarded(onTimeout, wrapper); |
+ zone.runUnaryGuarded(registeredOnTimeout, wrapper); |
wrapper._sink = null; |
}; |
} |
subscription = this.listen(onData, onError: onError, onDone: onDone); |
- timer = zone.createTimer(timeLimit, timeout2); |
+ timer = zone.createTimer(timeLimit, timeout); |
} |
Future onCancel() { |
timer.cancel(); |
@@ -1265,8 +1358,8 @@ abstract class Stream<T> { |
return result; |
} |
controller = isBroadcast |
- ? new _SyncBroadcastStreamController(onListen, onCancel) |
- : new _SyncStreamController( |
+ ? new _SyncBroadcastStreamController<T>(onListen, onCancel) |
+ : new _SyncStreamController<T>( |
onListen, |
() { |
// Don't null the timer, onCancel may call cancel again. |
@@ -1275,7 +1368,7 @@ abstract class Stream<T> { |
}, |
() { |
subscription.resume(); |
- timer = zone.createTimer(timeLimit, timeout2); |
+ timer = zone.createTimer(timeLimit, timeout); |
}, |
onCancel); |
return controller.stream; |
@@ -1283,7 +1376,7 @@ abstract class Stream<T> { |
} |
/** |
- * A subscritption on events from a [Stream]. |
+ * A subscription on events from a [Stream]. |
* |
* When you listen on a [Stream] using [Stream.listen], |
* a [StreamSubscription] object is returned. |
@@ -1295,20 +1388,24 @@ abstract class Stream<T> { |
*/ |
abstract class StreamSubscription<T> { |
/** |
- * Cancels this subscription. It will no longer receive events. |
+ * Cancels this subscription. |
* |
- * May return a future which completes when the stream is done cleaning up. |
- * This can be used if the stream needs to release some resources |
- * that are needed for a following operation, |
- * for example a file being read, that should be deleted afterwards. |
- * In that case, the file may not be able to be deleted successfully |
- * until the returned future has completed. |
+ * After this call, the subscription no longer receives events. |
* |
- * The future will be completed with a `null` value. |
- * If the cleanup throws, which it really shouldn't, the returned future |
- * will be completed with that error. |
+ * The stream may need to shut down the source of events and clean up after |
+ * the subscription is canceled. |
* |
- * Returns `null` if there is no need to wait. |
+ * Returns a future that is completed once the stream has finished |
+ * its cleanup. May also return `null` if no cleanup was necessary. |
+ * |
+ * Typically, futures are returned when the stream needs to release resources. |
+ * For example, a stream might need to close an open file (as an asynchronous |
+ * operation). If the listener wants to delete the file after having |
+ * canceled the subscription, it must wait for the cleanup future to complete. |
+ * |
+ * A returned future completes with a `null` value. |
+ * If the cleanup throws, which it really shouldn't, the returned future |
+ * completes with that error. |
*/ |
Future cancel(); |
@@ -1385,7 +1482,7 @@ abstract class StreamSubscription<T> { |
* In case of a `done` event the future completes with the given |
* [futureValue]. |
*/ |
- Future asFuture([var futureValue]); |
+ Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]); |
} |
@@ -1406,14 +1503,15 @@ abstract class EventSink<T> implements Sink<T> { |
/** [Stream] wrapper that only exposes the [Stream] interface. */ |
class StreamView<T> extends Stream<T> { |
- Stream<T> _stream; |
+ final Stream<T> _stream; |
- StreamView(this._stream); |
+ const StreamView(Stream<T> stream) : _stream = stream, super._internal(); |
bool get isBroadcast => _stream.isBroadcast; |
- Stream<T> asBroadcastStream({void onListen(StreamSubscription<T> subscription), |
- void onCancel(StreamSubscription<T> subscription)}) |
+ Stream<T> asBroadcastStream( |
+ {void onListen(StreamSubscription<T> subscription), |
+ void onCancel(StreamSubscription<T> subscription)}) |
=> _stream.asBroadcastStream(onListen: onListen, onCancel: onCancel); |
StreamSubscription<T> listen(void onData(T value), |
@@ -1427,11 +1525,19 @@ class StreamView<T> extends Stream<T> { |
/** |
- * The target of a [Stream.pipe] call. |
+ * Abstract interface for a "sink" accepting multiple entire streams. |
* |
- * The [Stream.pipe] call will pass itself to this object, and then return |
- * the resulting [Future]. The pipe should complete the future when it's |
- * done. |
+ * A consumer can accept a number of consecutive streams using [addStream], |
+ * and when no further data need to be added, the [close] method tells the |
+ * consumer to complete its work and shut down. |
+ * |
+ * This class is not just a [Sink<Stream>] because it is also combined with |
+ * other [Sink] classes, like it's combined with [EventSink] in the |
+ * [StreamSink] class. |
+ * |
+ * The [Stream.pipe] accepts a `StreamConsumer` and will pass the stream |
+ * to the consumer's [addStream] method. When that completes, it will |
+ * call [close] and then complete its own returned future. |
*/ |
abstract class StreamConsumer<S> { |
/** |
@@ -1439,22 +1545,38 @@ abstract class StreamConsumer<S> { |
* |
* Listens on [stream] and does something for each event. |
* |
- * The consumer may stop listening after an error, or it may consume |
- * all the errors and only stop at a done event. |
+ * Returns a future which is completed when the stream is done being added, |
+ * and the consumer is ready to accept a new stream. |
+ * No further calls to [addStream] or [close] should happen before the |
+ * returned future has completed. |
+ * |
+ * The consumer may stop listening to the stream after an error, |
+ * it may consume all the errors and only stop at a done event, |
+ * or it may be canceled early if the receiver don't want any further events. |
+ * |
+ * If the consumer stops listening because of some error preventing it |
+ * from continuing, it may report this error in the returned future, |
+ * otherwise it will just complete the future with `null`. |
*/ |
Future addStream(Stream<S> stream); |
/** |
- * Tell the consumer that no futher streams will be added. |
+ * Tells the consumer that no further streams will be added. |
* |
- * Returns a future that is completed when the consumer is done handling |
- * events. |
+ * This allows the consumer to complete any remaining work and release |
+ * resources that are no longer needed |
+ * |
+ * Returns a future which is completed when the consumer has shut down. |
+ * If cleaning up can fail, the error may be reported in the returned future, |
+ * otherwise it completes with `null`. |
*/ |
Future close(); |
} |
/** |
+ * A object that accepts stream events both synchronously and asynchronously. |
+ * |
* A [StreamSink] unifies the asynchronous methods from [StreamConsumer] and |
* the synchronous methods from [EventSink]. |
* |
@@ -1471,11 +1593,26 @@ abstract class StreamConsumer<S> { |
* |
* When [close] is called, it will return the [done] [Future]. |
*/ |
-abstract class StreamSink<S> implements StreamConsumer<S>, EventSink<S> { |
+abstract class StreamSink<S> implements EventSink<S>, StreamConsumer<S> { |
/** |
- * As [EventSink.close], but returns a future. |
+ * Tells the stream sink that no further streams will be added. |
+ * |
+ * This allows the stream sink to complete any remaining work and release |
+ * resources that are no longer needed |
+ * |
+ * Returns a future which is completed when the stream sink has shut down. |
+ * If cleaning up can fail, the error may be reported in the returned future, |
+ * otherwise it completes with `null`. |
* |
* Returns the same future as [done]. |
+ * |
+ * The stream sink may close before the [close] method is called, either due |
+ * to an error or because it is itself provding events to someone who has |
+ * stopped listening. In that case, the [done] future is completed first, |
+ * and the `close` method will return the `done` future when called. |
+ * |
+ * Unifies [StreamConsumer.close] and [EventSink.close] which both mark their |
+ * object as not expecting any further events. |
*/ |
Future close(); |
@@ -1561,16 +1698,16 @@ abstract class StreamTransformer<S, T> { |
* onDone: controller.close, |
* cancelOnError: cancelOnError); |
* }, |
- * onPause: subscription.pause, |
- * onResume: subscription.resume, |
- * onCancel: subscription.cancel, |
+ * onPause: () { subscription.pause(); }, |
+ * onResume: () { subscription.resume(); }, |
+ * onCancel: () { subscription.cancel(); }, |
* sync: true); |
* return controller.stream.listen(null); |
* }); |
*/ |
const factory StreamTransformer( |
StreamSubscription<T> transformer(Stream<S> stream, bool cancelOnError)) |
- = _StreamSubscriptionTransformer<S, T>; |
+ = _StreamSubscriptionTransformer<S, T>; |
/** |
* Creates a [StreamTransformer] that delegates events to the given functions. |
@@ -1587,7 +1724,7 @@ abstract class StreamTransformer<S, T> { |
void handleData(S data, EventSink<T> sink), |
void handleError(Object error, StackTrace stackTrace, EventSink<T> sink), |
void handleDone(EventSink<T> sink)}) |
- = _StreamHandlerTransformer<S, T>; |
+ = _StreamHandlerTransformer<S, T>; |
/** |
* Transform the incoming [stream]'s events. |
@@ -1603,7 +1740,7 @@ abstract class StreamTransformer<S, T> { |
} |
/** |
- * An [Iterable] like interface for the values of a [Stream]. |
+ * An [Iterator] like interface for the values of a [Stream]. |
* |
* This wraps a [Stream] and a subscription on the stream. It listens |
* on the stream, and completes the future returned by [moveNext] when the |
@@ -1620,19 +1757,30 @@ abstract class StreamIterator<T> { |
/** |
* Wait for the next stream value to be available. |
* |
- * It is not allowed to call this function again until the future has |
- * completed. If the returned future completes with anything except `true`, |
- * the iterator is done, and no new value will ever be available. |
+ * Returns a future which will complete with either `true` or `false`. |
+ * Completing with `true` means that another event has been received and |
+ * can be read as [current]. |
+ * Completing with `false` means that the stream itearation is done and |
+ * no further events will ever be available. |
+ * The future may complete with an error, if the stream produces an error, |
+ * which also ends iteration. |
* |
- * The future may complete with an error, if the stream produces an error. |
+ * The function must not be called again until the future returned by a |
+ * previous call is completed. |
*/ |
Future<bool> moveNext(); |
/** |
* The current value of the stream. |
* |
- * Only valid when the future returned by [moveNext] completes with `true` |
- * as value, and only until the next call to [moveNext]. |
+ * Is `null` before the first call to [moveNext] and after a call to |
+ * `moveNext` completes with a `false` result or an error. |
+ * |
+ * When a `moveNext` call completes with `true`, the `current` field holds |
+ * the most recent event of the stream, and it stays like that until the next |
+ * call to `moveNext`. |
+ * Between a call to `moveNext` and when its returned future completes, |
+ * the value is unspecified. |
*/ |
T get current; |
@@ -1642,13 +1790,14 @@ abstract class StreamIterator<T> { |
* The stream iterator is automatically canceled if the [moveNext] future |
* completes with either `false` or an error. |
* |
- * If a [moveNext] call has been made, it will complete with `false` as value, |
- * as will all further calls to [moveNext]. |
- * |
* If you need to stop listening for values before the stream iterator is |
* automatically closed, you must call [cancel] to ensure that the stream |
* is properly closed. |
* |
+ * If [moveNext] has been called when the iterator is cancelled, |
+ * its returned future will complete with `false` as value, |
+ * as will all further calls to [moveNext]. |
+ * |
* Returns a future if the cancel-operation is not completed synchronously. |
* Otherwise returns `null`. |
*/ |