| Index: tool/input_sdk/lib/async/stream.dart
|
| diff --git a/tool/input_sdk/lib/async/stream.dart b/tool/input_sdk/lib/async/stream.dart
|
| index 54f712b99e1d9869629029bcbe26abdf671eb021..69313738c0dbaba9cf74a5cc0bd7754506f7f9fd 100644
|
| --- a/tool/input_sdk/lib/async/stream.dart
|
| +++ b/tool/input_sdk/lib/async/stream.dart
|
| @@ -8,6 +8,8 @@ part of dart.async;
|
| // Core Stream types
|
| // -------------------------------------------------------------------
|
|
|
| +typedef void _TimerCallback();
|
| +
|
| /**
|
| * A source of asynchronous data events.
|
| *
|
| @@ -48,7 +50,7 @@ part of dart.async;
|
| * use [asBroadcastStream] to create a broadcast stream on top of the
|
| * non-broadcast stream.
|
| *
|
| - * On either kind of stream, stream transformationss, such as [where] and
|
| + * On either kind of stream, stream transformations, such as [where] and
|
| * [skip], return the same type of stream as the one the method was called on,
|
| * unless otherwise noted.
|
| *
|
| @@ -75,6 +77,22 @@ abstract class Stream<T> {
|
| Stream();
|
|
|
| /**
|
| + * Internal use only. We do not want to promise that Stream stays const.
|
| + *
|
| + * If mixins become compatible with const constructors, we may use a
|
| + * stream mixin instead of extending Stream from a const class.
|
| + */
|
| + const Stream._internal();
|
| +
|
| + /**
|
| + * Creates an empty broadcast stream.
|
| + *
|
| + * This is a stream which does nothing except sending a done event
|
| + * when it's listened to.
|
| + */
|
| + const factory Stream.empty() = _EmptyStream<T>;
|
| +
|
| + /**
|
| * Creates a new single-subscription stream from the future.
|
| *
|
| * When the future completes, the stream will fire one event, either
|
| @@ -84,8 +102,7 @@ abstract class Stream<T> {
|
| // Use the controller's buffering to fill in the value even before
|
| // the stream has a listener. For a single value, it's not worth it
|
| // to wait for a listener before doing the `then` on the future.
|
| - _StreamController<T> controller =
|
| - new StreamController<T>(sync: true) as _StreamController<T>;
|
| + _StreamController<T> controller = new StreamController<T>(sync: true);
|
| future.then((value) {
|
| controller._add(value);
|
| controller._closeUnchecked();
|
| @@ -98,6 +115,47 @@ abstract class Stream<T> {
|
| }
|
|
|
| /**
|
| + * Create a stream from a group of futures.
|
| + *
|
| + * The stream reports the results of the futures on the stream in the order
|
| + * in which the futures complete.
|
| + *
|
| + * If some futures have completed before calling `Stream.fromFutures`,
|
| + * their result will be output on the created stream in some unspecified
|
| + * order.
|
| + *
|
| + * When all futures have completed, the stream is closed.
|
| + *
|
| + * If no future is passed, the stream closes as soon as possible.
|
| + */
|
| + factory Stream.fromFutures(Iterable<Future<T>> futures) {
|
| + _StreamController<T> controller = new StreamController<T>(sync: true);
|
| + int count = 0;
|
| + var onValue = (T value) {
|
| + if (!controller.isClosed) {
|
| + controller._add(value);
|
| + if (--count == 0) controller._closeUnchecked();
|
| + }
|
| + };
|
| + var onError = (error, stack) {
|
| + if (!controller.isClosed) {
|
| + controller._addError(error, stack);
|
| + if (--count == 0) controller._closeUnchecked();
|
| + }
|
| + };
|
| + // The futures are already running, so start listening to them immediately
|
| + // (instead of waiting for the stream to be listened on).
|
| + // If we wait, we might not catch errors in the futures in time.
|
| + for (var future in futures) {
|
| + count++;
|
| + future.then(onValue, onError: onError);
|
| + }
|
| + // Use schedule microtask since controller is sync.
|
| + if (count == 0) scheduleMicrotask(controller.close);
|
| + return controller.stream;
|
| + }
|
| +
|
| + /**
|
| * Creates a single-subscription stream that gets its data from [data].
|
| *
|
| * The iterable is iterated when the stream receives a listener, and stops
|
| @@ -124,8 +182,6 @@ abstract class Stream<T> {
|
| */
|
| factory Stream.periodic(Duration period,
|
| [T computation(int computationCount)]) {
|
| - if (computation == null) computation = ((i) => null);
|
| -
|
| Timer timer;
|
| int computationCount = 0;
|
| StreamController<T> controller;
|
| @@ -134,7 +190,15 @@ abstract class Stream<T> {
|
|
|
| void sendEvent() {
|
| watch.reset();
|
| - T data = computation(computationCount++);
|
| + T data;
|
| + if (computation != null) {
|
| + try {
|
| + data = computation(computationCount++);
|
| + } catch (e, s) {
|
| + controller.addError(e, s);
|
| + return;
|
| + }
|
| + }
|
| controller.add(data);
|
| }
|
|
|
| @@ -197,13 +261,13 @@ abstract class Stream<T> {
|
| * _outputSink.add(data);
|
| * }
|
| *
|
| - * void addError(e, [st]) => _outputSink(e, st);
|
| - * void close() => _outputSink.close();
|
| + * void addError(e, [st]) { _outputSink.addError(e, st); }
|
| + * void close() { _outputSink.close(); }
|
| * }
|
| *
|
| * class DuplicationTransformer implements StreamTransformer<String, String> {
|
| * // Some generic types ommitted for brevety.
|
| - * Stream bind(Stream stream) => new Stream<String>.eventTransform(
|
| + * Stream bind(Stream stream) => new Stream<String>.eventTransformed(
|
| * stream,
|
| * (EventSink sink) => new DuplicationSink(sink));
|
| * }
|
| @@ -263,6 +327,10 @@ abstract class Stream<T> {
|
| * two arguments it is called with the stack trace (which could be `null` if
|
| * the stream itself received an error without stack trace).
|
| * Otherwise it is called with just the error object.
|
| + * If [onError] is omitted, any errors on the stream are considered unhandled,
|
| + * and will be passed to the current [Zone]'s error handler.
|
| + * By default unhandled async errors are treated
|
| + * as if they were uncaught top-level errors.
|
| *
|
| * If this stream closes, the [onDone] handler is called.
|
| *
|
| @@ -292,9 +360,17 @@ abstract class Stream<T> {
|
| * Creates a new stream that converts each element of this stream
|
| * to a new value using the [convert] function.
|
| *
|
| + * For each data event, `o`, in this stream, the returned stream
|
| + * provides a data event with the value `convert(o)`.
|
| + * If [convert] throws, the returned stream reports the exception as an error
|
| + * event instead.
|
| + *
|
| + * Error and done events are passed through unchanged to the returned stream.
|
| + *
|
| * The returned stream is a broadcast stream if this stream is.
|
| + * The [convert] function is called once per data event per listener.
|
| * If a broadcast stream is listened to more than once, each subscription
|
| - * will individually execute `map` for each event.
|
| + * will individually call [convert] on each data event.
|
| */
|
| Stream/*<S>*/ map/*<S>*/(/*=S*/ convert(T event)) {
|
| return new _MapStream<T, dynamic/*=S*/>(this, convert);
|
| @@ -310,18 +386,20 @@ abstract class Stream<T> {
|
| *
|
| * The returned stream is a broadcast stream if this stream is.
|
| */
|
| - Stream asyncMap(convert(T event)) {
|
| - StreamController controller;
|
| - StreamSubscription subscription;
|
| - void onListen () {
|
| + Stream/*<E>*/ asyncMap/*<E>*/(convert(T event)) {
|
| + StreamController/*<E>*/ controller;
|
| + StreamSubscription/*<T>*/ subscription;
|
| +
|
| + void onListen() {
|
| final add = controller.add;
|
| assert(controller is _StreamController ||
|
| controller is _BroadcastStreamController);
|
| - final eventSink = controller as _EventSink<T>;
|
| + final _EventSink/*<E>*/ eventSink =
|
| + controller as Object /*=_EventSink<E>*/;
|
| final addError = eventSink._addError;
|
| subscription = this.listen(
|
| (T event) {
|
| - var newValue;
|
| + dynamic newValue;
|
| try {
|
| newValue = convert(event);
|
| } catch (e, s) {
|
| @@ -333,21 +411,22 @@ abstract class Stream<T> {
|
| newValue.then(add, onError: addError)
|
| .whenComplete(subscription.resume);
|
| } else {
|
| - controller.add(newValue);
|
| + controller.add(newValue as Object/*=E*/);
|
| }
|
| },
|
| onError: addError,
|
| onDone: controller.close
|
| );
|
| }
|
| +
|
| if (this.isBroadcast) {
|
| - controller = new StreamController.broadcast(
|
| + controller = new StreamController/*<E>*/.broadcast(
|
| onListen: onListen,
|
| onCancel: () { subscription.cancel(); },
|
| sync: true
|
| );
|
| } else {
|
| - controller = new StreamController(
|
| + controller = new StreamController/*<E>*/(
|
| onListen: onListen,
|
| onPause: () { subscription.pause(); },
|
| onResume: () { subscription.resume(); },
|
| @@ -371,16 +450,17 @@ abstract class Stream<T> {
|
| *
|
| * The returned stream is a broadcast stream if this stream is.
|
| */
|
| - Stream asyncExpand(Stream convert(T event)) {
|
| - StreamController controller;
|
| - StreamSubscription subscription;
|
| + Stream/*<E>*/ asyncExpand/*<E>*/(Stream/*<E>*/ convert(T event)) {
|
| + StreamController/*<E>*/ controller;
|
| + StreamSubscription<T> subscription;
|
| void onListen() {
|
| assert(controller is _StreamController ||
|
| controller is _BroadcastStreamController);
|
| - final eventSink = controller as _EventSink<T>;
|
| + final _EventSink/*<E>*/ eventSink =
|
| + controller as Object /*=_EventSink<E>*/;
|
| subscription = this.listen(
|
| (T event) {
|
| - Stream newStream;
|
| + Stream/*<E>*/ newStream;
|
| try {
|
| newStream = convert(event);
|
| } catch (e, s) {
|
| @@ -398,13 +478,13 @@ abstract class Stream<T> {
|
| );
|
| }
|
| if (this.isBroadcast) {
|
| - controller = new StreamController.broadcast(
|
| + controller = new StreamController/*<E>*/.broadcast(
|
| onListen: onListen,
|
| onCancel: () { subscription.cancel(); },
|
| sync: true
|
| );
|
| } else {
|
| - controller = new StreamController(
|
| + controller = new StreamController/*<E>*/(
|
| onListen: onListen,
|
| onPause: () { subscription.pause(); },
|
| onResume: () { subscription.resume(); },
|
| @@ -463,12 +543,22 @@ abstract class Stream<T> {
|
| }
|
|
|
| /**
|
| - * Binds this stream as the input of the provided [StreamConsumer].
|
| + * Pipe the events of this stream into [streamConsumer].
|
| *
|
| - * The `streamConsumer` is closed when the stream has been added to it.
|
| + * The events of this stream are added to `streamConsumer` using
|
| + * [StreamConsumer.addStream].
|
| + * The `streamConsumer` is closed when this stream has been successfully added
|
| + * to it - when the future returned by `addStream` completes without an error.
|
| *
|
| * Returns a future which completes when the stream has been consumed
|
| * and the consumer has been closed.
|
| + *
|
| + * The returned future completes with the same result as the future returned
|
| + * by [StreamConsumer.close].
|
| + * If the adding of the stream itself fails in some way,
|
| + * then the consumer is expected to be closed, and won't be closed again.
|
| + * In that case the returned future completes with the error from calling
|
| + * `addStream`.
|
| */
|
| Future pipe(StreamConsumer<T> streamConsumer) {
|
| return streamConsumer.addStream(this).then((_) => streamConsumer.close());
|
| @@ -526,14 +616,15 @@ abstract class Stream<T> {
|
| /** Reduces a sequence of values by repeatedly applying [combine]. */
|
| Future/*<S>*/ fold/*<S>*/(var/*=S*/ initialValue,
|
| /*=S*/ combine(var/*=S*/ previous, T element)) {
|
| - _Future/*<S>*/ result = new _Future();
|
| - var value = initialValue;
|
| +
|
| + _Future/*<S>*/ result = new _Future/*<S>*/();
|
| + var/*=S*/ value = initialValue;
|
| StreamSubscription subscription;
|
| subscription = this.listen(
|
| (T element) {
|
| _runUserCode(
|
| () => combine(value, element),
|
| - (newValue) { value = newValue; },
|
| + (/*=S*/ newValue) { value = newValue; },
|
| _cancelAndErrorClosure(subscription, result)
|
| );
|
| },
|
| @@ -795,8 +886,8 @@ abstract class Stream<T> {
|
| * In case of a `done` event the future completes with the given
|
| * [futureValue].
|
| */
|
| - Future drain([var futureValue]) => listen(null, cancelOnError: true)
|
| - .asFuture(futureValue);
|
| + Future/*<E>*/ drain/*<E>*/([/*=E*/ futureValue])
|
| + => listen(null, cancelOnError: true).asFuture/*<E>*/(futureValue);
|
|
|
| /**
|
| * Provides at most the first [n] values of this stream.
|
| @@ -819,7 +910,7 @@ abstract class Stream<T> {
|
| * the returned stream is listened to.
|
| */
|
| Stream<T> take(int count) {
|
| - return new _TakeStream(this, count);
|
| + return new _TakeStream<T>(this, count);
|
| }
|
|
|
| /**
|
| @@ -841,7 +932,7 @@ abstract class Stream<T> {
|
| * the returned stream is listened to.
|
| */
|
| Stream<T> takeWhile(bool test(T element)) {
|
| - return new _TakeWhileStream(this, test);
|
| + return new _TakeWhileStream<T>(this, test);
|
| }
|
|
|
| /**
|
| @@ -852,7 +943,7 @@ abstract class Stream<T> {
|
| * the returned stream is listened to.
|
| */
|
| Stream<T> skip(int count) {
|
| - return new _SkipStream(this, count);
|
| + return new _SkipStream<T>(this, count);
|
| }
|
|
|
| /**
|
| @@ -868,14 +959,14 @@ abstract class Stream<T> {
|
| * the returned stream is listened to.
|
| */
|
| Stream<T> skipWhile(bool test(T element)) {
|
| - return new _SkipWhileStream(this, test);
|
| + return new _SkipWhileStream<T>(this, test);
|
| }
|
|
|
| /**
|
| * Skips data events if they are equal to the previous data event.
|
| *
|
| * The returned stream provides the same events as this stream, except
|
| - * that it never provides two consequtive data events that are equal.
|
| + * that it never provides two consecutive data events that are equal.
|
| *
|
| * Equality is determined by the provided [equals] method. If that is
|
| * omitted, the '==' operator on the last provided data element is used.
|
| @@ -885,7 +976,7 @@ abstract class Stream<T> {
|
| * will individually perform the `equals` test.
|
| */
|
| Stream<T> distinct([bool equals(T previous, T next)]) {
|
| - return new _DistinctStream(this, equals);
|
| + return new _DistinctStream<T>(this, equals);
|
| }
|
|
|
| /**
|
| @@ -938,8 +1029,7 @@ abstract class Stream<T> {
|
| _Future<T> future = new _Future<T>();
|
| T result = null;
|
| bool foundResult = false;
|
| - StreamSubscription subscription;
|
| - subscription = this.listen(
|
| + listen(
|
| (T value) {
|
| foundResult = true;
|
| result = value;
|
| @@ -1208,26 +1298,26 @@ abstract class Stream<T> {
|
| * will have its individually timer that starts counting on listen,
|
| * and the subscriptions' timers can be paused individually.
|
| */
|
| - Stream timeout(Duration timeLimit, {void onTimeout(EventSink sink)}) {
|
| - StreamController controller;
|
| + Stream<T> timeout(Duration timeLimit, {void onTimeout(EventSink<T> sink)}) {
|
| + StreamController<T> controller;
|
| // The following variables are set on listen.
|
| StreamSubscription<T> subscription;
|
| Timer timer;
|
| Zone zone;
|
| - Function timeout2;
|
| + _TimerCallback timeout;
|
|
|
| void onData(T event) {
|
| timer.cancel();
|
| controller.add(event);
|
| - timer = zone.createTimer(timeLimit, timeout2);
|
| + timer = zone.createTimer(timeLimit, timeout);
|
| }
|
| void onError(error, StackTrace stackTrace) {
|
| timer.cancel();
|
| assert(controller is _StreamController ||
|
| controller is _BroadcastStreamController);
|
| - var eventSink = controller as _EventSink<T>;
|
| + dynamic eventSink = controller;
|
| eventSink._addError(error, stackTrace); // Avoid Zone error replacement.
|
| - timer = zone.createTimer(timeLimit, timeout2);
|
| + timer = zone.createTimer(timeLimit, timeout);
|
| }
|
| void onDone() {
|
| timer.cancel();
|
| @@ -1240,23 +1330,26 @@ abstract class Stream<T> {
|
| // callback.
|
| zone = Zone.current;
|
| if (onTimeout == null) {
|
| - timeout2 = () {
|
| + timeout = () {
|
| controller.addError(new TimeoutException("No stream event",
|
| timeLimit), null);
|
| };
|
| } else {
|
| - onTimeout = zone.registerUnaryCallback(onTimeout);
|
| + // TODO(floitsch): the return type should be 'void', and the type
|
| + // should be inferred.
|
| + var registeredOnTimeout =
|
| + zone.registerUnaryCallback/*<dynamic, EventSink<T>>*/(onTimeout);
|
| _ControllerEventSinkWrapper wrapper =
|
| new _ControllerEventSinkWrapper(null);
|
| - timeout2 = () {
|
| + timeout = () {
|
| wrapper._sink = controller; // Only valid during call.
|
| - zone.runUnaryGuarded(onTimeout, wrapper);
|
| + zone.runUnaryGuarded(registeredOnTimeout, wrapper);
|
| wrapper._sink = null;
|
| };
|
| }
|
|
|
| subscription = this.listen(onData, onError: onError, onDone: onDone);
|
| - timer = zone.createTimer(timeLimit, timeout2);
|
| + timer = zone.createTimer(timeLimit, timeout);
|
| }
|
| Future onCancel() {
|
| timer.cancel();
|
| @@ -1265,8 +1358,8 @@ abstract class Stream<T> {
|
| return result;
|
| }
|
| controller = isBroadcast
|
| - ? new _SyncBroadcastStreamController(onListen, onCancel)
|
| - : new _SyncStreamController(
|
| + ? new _SyncBroadcastStreamController<T>(onListen, onCancel)
|
| + : new _SyncStreamController<T>(
|
| onListen,
|
| () {
|
| // Don't null the timer, onCancel may call cancel again.
|
| @@ -1275,7 +1368,7 @@ abstract class Stream<T> {
|
| },
|
| () {
|
| subscription.resume();
|
| - timer = zone.createTimer(timeLimit, timeout2);
|
| + timer = zone.createTimer(timeLimit, timeout);
|
| },
|
| onCancel);
|
| return controller.stream;
|
| @@ -1283,7 +1376,7 @@ abstract class Stream<T> {
|
| }
|
|
|
| /**
|
| - * A subscritption on events from a [Stream].
|
| + * A subscription on events from a [Stream].
|
| *
|
| * When you listen on a [Stream] using [Stream.listen],
|
| * a [StreamSubscription] object is returned.
|
| @@ -1295,20 +1388,24 @@ abstract class Stream<T> {
|
| */
|
| abstract class StreamSubscription<T> {
|
| /**
|
| - * Cancels this subscription. It will no longer receive events.
|
| + * Cancels this subscription.
|
| *
|
| - * May return a future which completes when the stream is done cleaning up.
|
| - * This can be used if the stream needs to release some resources
|
| - * that are needed for a following operation,
|
| - * for example a file being read, that should be deleted afterwards.
|
| - * In that case, the file may not be able to be deleted successfully
|
| - * until the returned future has completed.
|
| + * After this call, the subscription no longer receives events.
|
| *
|
| - * The future will be completed with a `null` value.
|
| - * If the cleanup throws, which it really shouldn't, the returned future
|
| - * will be completed with that error.
|
| + * The stream may need to shut down the source of events and clean up after
|
| + * the subscription is canceled.
|
| *
|
| - * Returns `null` if there is no need to wait.
|
| + * Returns a future that is completed once the stream has finished
|
| + * its cleanup. May also return `null` if no cleanup was necessary.
|
| + *
|
| + * Typically, futures are returned when the stream needs to release resources.
|
| + * For example, a stream might need to close an open file (as an asynchronous
|
| + * operation). If the listener wants to delete the file after having
|
| + * canceled the subscription, it must wait for the cleanup future to complete.
|
| + *
|
| + * A returned future completes with a `null` value.
|
| + * If the cleanup throws, which it really shouldn't, the returned future
|
| + * completes with that error.
|
| */
|
| Future cancel();
|
|
|
| @@ -1385,7 +1482,7 @@ abstract class StreamSubscription<T> {
|
| * In case of a `done` event the future completes with the given
|
| * [futureValue].
|
| */
|
| - Future asFuture([var futureValue]);
|
| + Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]);
|
| }
|
|
|
|
|
| @@ -1406,14 +1503,15 @@ abstract class EventSink<T> implements Sink<T> {
|
|
|
| /** [Stream] wrapper that only exposes the [Stream] interface. */
|
| class StreamView<T> extends Stream<T> {
|
| - Stream<T> _stream;
|
| + final Stream<T> _stream;
|
|
|
| - StreamView(this._stream);
|
| + const StreamView(Stream<T> stream) : _stream = stream, super._internal();
|
|
|
| bool get isBroadcast => _stream.isBroadcast;
|
|
|
| - Stream<T> asBroadcastStream({void onListen(StreamSubscription<T> subscription),
|
| - void onCancel(StreamSubscription<T> subscription)})
|
| + Stream<T> asBroadcastStream(
|
| + {void onListen(StreamSubscription<T> subscription),
|
| + void onCancel(StreamSubscription<T> subscription)})
|
| => _stream.asBroadcastStream(onListen: onListen, onCancel: onCancel);
|
|
|
| StreamSubscription<T> listen(void onData(T value),
|
| @@ -1427,11 +1525,19 @@ class StreamView<T> extends Stream<T> {
|
|
|
|
|
| /**
|
| - * The target of a [Stream.pipe] call.
|
| + * Abstract interface for a "sink" accepting multiple entire streams.
|
| *
|
| - * The [Stream.pipe] call will pass itself to this object, and then return
|
| - * the resulting [Future]. The pipe should complete the future when it's
|
| - * done.
|
| + * A consumer can accept a number of consecutive streams using [addStream],
|
| + * and when no further data need to be added, the [close] method tells the
|
| + * consumer to complete its work and shut down.
|
| + *
|
| + * This class is not just a [Sink<Stream>] because it is also combined with
|
| + * other [Sink] classes, like it's combined with [EventSink] in the
|
| + * [StreamSink] class.
|
| + *
|
| + * The [Stream.pipe] accepts a `StreamConsumer` and will pass the stream
|
| + * to the consumer's [addStream] method. When that completes, it will
|
| + * call [close] and then complete its own returned future.
|
| */
|
| abstract class StreamConsumer<S> {
|
| /**
|
| @@ -1439,22 +1545,38 @@ abstract class StreamConsumer<S> {
|
| *
|
| * Listens on [stream] and does something for each event.
|
| *
|
| - * The consumer may stop listening after an error, or it may consume
|
| - * all the errors and only stop at a done event.
|
| + * Returns a future which is completed when the stream is done being added,
|
| + * and the consumer is ready to accept a new stream.
|
| + * No further calls to [addStream] or [close] should happen before the
|
| + * returned future has completed.
|
| + *
|
| + * The consumer may stop listening to the stream after an error,
|
| + * it may consume all the errors and only stop at a done event,
|
| + * or it may be canceled early if the receiver don't want any further events.
|
| + *
|
| + * If the consumer stops listening because of some error preventing it
|
| + * from continuing, it may report this error in the returned future,
|
| + * otherwise it will just complete the future with `null`.
|
| */
|
| Future addStream(Stream<S> stream);
|
|
|
| /**
|
| - * Tell the consumer that no futher streams will be added.
|
| + * Tells the consumer that no further streams will be added.
|
| *
|
| - * Returns a future that is completed when the consumer is done handling
|
| - * events.
|
| + * This allows the consumer to complete any remaining work and release
|
| + * resources that are no longer needed
|
| + *
|
| + * Returns a future which is completed when the consumer has shut down.
|
| + * If cleaning up can fail, the error may be reported in the returned future,
|
| + * otherwise it completes with `null`.
|
| */
|
| Future close();
|
| }
|
|
|
|
|
| /**
|
| + * A object that accepts stream events both synchronously and asynchronously.
|
| + *
|
| * A [StreamSink] unifies the asynchronous methods from [StreamConsumer] and
|
| * the synchronous methods from [EventSink].
|
| *
|
| @@ -1471,11 +1593,26 @@ abstract class StreamConsumer<S> {
|
| *
|
| * When [close] is called, it will return the [done] [Future].
|
| */
|
| -abstract class StreamSink<S> implements StreamConsumer<S>, EventSink<S> {
|
| +abstract class StreamSink<S> implements EventSink<S>, StreamConsumer<S> {
|
| /**
|
| - * As [EventSink.close], but returns a future.
|
| + * Tells the stream sink that no further streams will be added.
|
| + *
|
| + * This allows the stream sink to complete any remaining work and release
|
| + * resources that are no longer needed
|
| + *
|
| + * Returns a future which is completed when the stream sink has shut down.
|
| + * If cleaning up can fail, the error may be reported in the returned future,
|
| + * otherwise it completes with `null`.
|
| *
|
| * Returns the same future as [done].
|
| + *
|
| + * The stream sink may close before the [close] method is called, either due
|
| + * to an error or because it is itself provding events to someone who has
|
| + * stopped listening. In that case, the [done] future is completed first,
|
| + * and the `close` method will return the `done` future when called.
|
| + *
|
| + * Unifies [StreamConsumer.close] and [EventSink.close] which both mark their
|
| + * object as not expecting any further events.
|
| */
|
| Future close();
|
|
|
| @@ -1561,16 +1698,16 @@ abstract class StreamTransformer<S, T> {
|
| * onDone: controller.close,
|
| * cancelOnError: cancelOnError);
|
| * },
|
| - * onPause: subscription.pause,
|
| - * onResume: subscription.resume,
|
| - * onCancel: subscription.cancel,
|
| + * onPause: () { subscription.pause(); },
|
| + * onResume: () { subscription.resume(); },
|
| + * onCancel: () { subscription.cancel(); },
|
| * sync: true);
|
| * return controller.stream.listen(null);
|
| * });
|
| */
|
| const factory StreamTransformer(
|
| StreamSubscription<T> transformer(Stream<S> stream, bool cancelOnError))
|
| - = _StreamSubscriptionTransformer<S, T>;
|
| + = _StreamSubscriptionTransformer<S, T>;
|
|
|
| /**
|
| * Creates a [StreamTransformer] that delegates events to the given functions.
|
| @@ -1587,7 +1724,7 @@ abstract class StreamTransformer<S, T> {
|
| void handleData(S data, EventSink<T> sink),
|
| void handleError(Object error, StackTrace stackTrace, EventSink<T> sink),
|
| void handleDone(EventSink<T> sink)})
|
| - = _StreamHandlerTransformer<S, T>;
|
| + = _StreamHandlerTransformer<S, T>;
|
|
|
| /**
|
| * Transform the incoming [stream]'s events.
|
| @@ -1603,7 +1740,7 @@ abstract class StreamTransformer<S, T> {
|
| }
|
|
|
| /**
|
| - * An [Iterable] like interface for the values of a [Stream].
|
| + * An [Iterator] like interface for the values of a [Stream].
|
| *
|
| * This wraps a [Stream] and a subscription on the stream. It listens
|
| * on the stream, and completes the future returned by [moveNext] when the
|
| @@ -1620,19 +1757,30 @@ abstract class StreamIterator<T> {
|
| /**
|
| * Wait for the next stream value to be available.
|
| *
|
| - * It is not allowed to call this function again until the future has
|
| - * completed. If the returned future completes with anything except `true`,
|
| - * the iterator is done, and no new value will ever be available.
|
| + * Returns a future which will complete with either `true` or `false`.
|
| + * Completing with `true` means that another event has been received and
|
| + * can be read as [current].
|
| + * Completing with `false` means that the stream itearation is done and
|
| + * no further events will ever be available.
|
| + * The future may complete with an error, if the stream produces an error,
|
| + * which also ends iteration.
|
| *
|
| - * The future may complete with an error, if the stream produces an error.
|
| + * The function must not be called again until the future returned by a
|
| + * previous call is completed.
|
| */
|
| Future<bool> moveNext();
|
|
|
| /**
|
| * The current value of the stream.
|
| *
|
| - * Only valid when the future returned by [moveNext] completes with `true`
|
| - * as value, and only until the next call to [moveNext].
|
| + * Is `null` before the first call to [moveNext] and after a call to
|
| + * `moveNext` completes with a `false` result or an error.
|
| + *
|
| + * When a `moveNext` call completes with `true`, the `current` field holds
|
| + * the most recent event of the stream, and it stays like that until the next
|
| + * call to `moveNext`.
|
| + * Between a call to `moveNext` and when its returned future completes,
|
| + * the value is unspecified.
|
| */
|
| T get current;
|
|
|
| @@ -1642,13 +1790,14 @@ abstract class StreamIterator<T> {
|
| * The stream iterator is automatically canceled if the [moveNext] future
|
| * completes with either `false` or an error.
|
| *
|
| - * If a [moveNext] call has been made, it will complete with `false` as value,
|
| - * as will all further calls to [moveNext].
|
| - *
|
| * If you need to stop listening for values before the stream iterator is
|
| * automatically closed, you must call [cancel] to ensure that the stream
|
| * is properly closed.
|
| *
|
| + * If [moveNext] has been called when the iterator is cancelled,
|
| + * its returned future will complete with `false` as value,
|
| + * as will all further calls to [moveNext].
|
| + *
|
| * Returns a future if the cancel-operation is not completed synchronously.
|
| * Otherwise returns `null`.
|
| */
|
|
|