| Index: sdk/lib/async/stream.dart
|
| diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..ffa10233ca043a6f3d0658816d3b15c251e44d4c
|
| --- /dev/null
|
| +++ b/sdk/lib/async/stream.dart
|
| @@ -0,0 +1,838 @@
|
| +// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
|
| +// for details. All rights reserved. Use of this source code is governed by a
|
| +// BSD-style license that can be found in the LICENSE file.
|
| +
|
| +// part of dart.async;
|
| +
|
| +// -------------------------------------------------------------------
|
| +// Core Stream types
|
| +// -------------------------------------------------------------------
|
| +
|
| +abstract class Stream<T> {
|
| + Stream();
|
| +
|
| + factory Stream.fromFuture(Future<T> future) {
|
| + var controller = new StreamController<T>();
|
| + future.then((value) {
|
| + controller.add(value);
|
| + controller.close();
|
| + },
|
| + onError: (error) {
|
| + controller.signalError(error);
|
| + controller.close();
|
| + });
|
| + return controller.stream;
|
| + }
|
| +
|
| + /**
|
| + * Stream that outputs events from the [sources] in cyclic order.
|
| + *
|
| + * The merged streams are paused and resumed in order to ensure the proper
|
| + * order of output events.
|
| + */
|
| + factory Stream.cyclic(Iterable<Stream> sources) = CyclicScheduleStream<T>;
|
| +
|
| + /**
|
| + * Create a stream that forwards data from the highest priority active source.
|
| + *
|
| + * Sources are provided in order of increasing priority, and only data from
|
| + * the highest priority source stream that has provided data are output
|
| + * on the created stream.
|
| + *
|
| + * Errors from the most recent active stream, and any higher priority stream,
|
| + * are forwarded to the created stream.
|
| + *
|
| + * If a higher priority source stream completes without providing data,
|
| + * it will have no effect on lower priority streams.
|
| + */
|
| + factory Stream.superceding(Iterable<Stream<T>> sources) = SupercedeStream<T>;
|
| +
|
| + /**
|
| + * Add a subscription to this stream.
|
| + *
|
| + * On each data event from this stream, the subscribers [onData] handler
|
| + * is called. If [onData] is null, nothing happens.
|
| + *
|
| + * On errors from this stream, the [onError] handler is given a
|
| + * [AsyncError] object describing the error.
|
| + *
|
| + * If this stream closes, the [onDone] handler is called.
|
| + *
|
| + * If [unsubscribeOnError] is true, the subscription is ended when
|
| + * the first error is reported. The default is false.
|
| + */
|
| + StreamSubscription<T> listen(void onData(T event),
|
| + { void onError(AsyncError error),
|
| + void onDone(),
|
| + bool unsubscribeOnError});
|
| +
|
| + /**
|
| + * Creates a new stream from this stream that discards some data events.
|
| + *
|
| + * The new stream sends the same error and done events as this stream,
|
| + * but it only sends the data events that satisfy the [test].
|
| + */
|
| + Stream<T> where(bool test(T event)) {
|
| + return this.transform(new WhereStream<T>(test));
|
| + }
|
| +
|
| + /**
|
| + * Create a new stream that converts each element of this stream
|
| + * to a new value using the [convert] function.
|
| + */
|
| + Stream mappedBy(convert(T event)) {
|
| + return this.transform(new MapStream<T, dynamic>(convert));
|
| + }
|
| +
|
| + /**
|
| + * Create a wrapper Stream that intercepts some errors from this stream.
|
| + *
|
| + * If the handler returns null, the error is considered handled.
|
| + * Otherwise the returned [AsyncError] is passed to the subscribers
|
| + * of the stream.
|
| + */
|
| + Stream handleError(AsyncError handle(AsyncError error)) {
|
| + return this.transform(new HandleErrorStream<T>(handle));
|
| + }
|
| +
|
| + /**
|
| + * Create a new stream from this stream that converts each element
|
| + * into zero or more events.
|
| + *
|
| + * Each incoming event is converted to an [Iterable] of new events,
|
| + * and each of these new events are then sent by the returned stream
|
| + * in order.
|
| + */
|
| + Stream expand(Iterable convert(T value)) {
|
| + return this.transform(new ExpandStream<T, dynamic>(convert));
|
| + }
|
| +
|
| + /**
|
| + * Bind this stream as the input of the provided [StreamConsumer].
|
| + */
|
| + Future pipe(StreamConsumer<dynamic, T> streamConsumer) {
|
| + return streamConsumer.consume(this);
|
| + }
|
| +
|
| + /**
|
| + * Chain this stream as the input of the provided [StreamTransformer].
|
| + *
|
| + * Returns the result of [:streamTransformer.bind:] itself.
|
| + */
|
| + Stream transform(StreamTransformer<T, dynamic> streamTransformer) {
|
| + return streamTransformer.bind(this);
|
| + }
|
| +
|
| +
|
| + /** Reduces a sequence of values by repeatedly applying [combine]. */
|
| + Future reduce(var initialValue, combine(var previous, T element)) {
|
| + Completer completer = new Completer();
|
| + var value = initialValue;
|
| + StreamSubscription subscription;
|
| + subscription = this.listen(
|
| + (T element) {
|
| + try {
|
| + value = combine(value, element);
|
| + } catch (e, s) {
|
| + subscription.cancel();
|
| + completer.completeError(e, s);
|
| + }
|
| + },
|
| + onError: (AsyncError e) {
|
| + completer.completeError(e.error, e.stackTrace);
|
| + },
|
| + onDone: () {
|
| + completer.complete(value);
|
| + },
|
| + unsubscribeOnError: true);
|
| + return completer.future;
|
| + }
|
| +
|
| + // Deprecated method, previously called 'pipe', retained for compatibility.
|
| + Signal pipeInto(Sink<T> sink,
|
| + {void onError(AsyncError error),
|
| + bool unsubscribeOnError}) {
|
| + SignalCompleter completer = new SignalCompleter();
|
| + this.listen(
|
| + sink.add,
|
| + onError: onError,
|
| + onDone: () {
|
| + sink.close();
|
| + completer.complete();
|
| + },
|
| + unsubscribeOnError: unsubscribeOnError);
|
| + return completer.signal;
|
| + }
|
| +
|
| +
|
| + /**
|
| + * Check whether [match] occurs in the elements provided by this stream.
|
| + *
|
| + * Completes the [Future] when the answer is known.
|
| + * If this stream reports an error, the [Future] will report that error.
|
| + */
|
| + Future<bool> contains(T match) {
|
| + _FutureImpl<bool> future = new _FutureImpl<bool>();
|
| + StreamSubscription subscription;
|
| + subscription = this.listen(
|
| + (T element) {
|
| + if (element == match) {
|
| + subscription.cancel();
|
| + future._setValue(true);
|
| + }
|
| + },
|
| + onError: future._setError,
|
| + onDone: () {
|
| + future._setValue(false);
|
| + },
|
| + unsubscribeOnError: true);
|
| + return future;
|
| + }
|
| +
|
| + /**
|
| + * Check whether [test] accepts all elements provided by this stream.
|
| + *
|
| + * Completes the [Future] when the answer is known.
|
| + * If this stream reports an error, the [Future] will report that error.
|
| + */
|
| + Future<bool> every(bool test(T element)) {
|
| + _FutureImpl<bool> future = new _FutureImpl<bool>();
|
| + StreamSubscription subscription;
|
| + subscription = this.listen(
|
| + (T element) {
|
| + if (!test(element)) {
|
| + subscription.cancel();
|
| + future._setValue(false);
|
| + }
|
| + },
|
| + onError: future._setError,
|
| + onDone: () {
|
| + future._setValue(true);
|
| + },
|
| + unsubscribeOnError: true);
|
| + return future;
|
| + }
|
| +
|
| + /**
|
| + * Check whether [test] accepts any element provided by this stream.
|
| + *
|
| + * Completes the [Future] when the answer is known.
|
| + * If this stream reports an error, the [Future] will report that error.
|
| + */
|
| + Future<bool> any(bool test(T element)) {
|
| + _FutureImpl<bool> future = new _FutureImpl<bool>();
|
| + StreamSubscription subscription;
|
| + subscription = this.listen(
|
| + (T element) {
|
| + if (test(element)) {
|
| + subscription.cancel();
|
| + future._setValue(true);
|
| + }
|
| + },
|
| + onError: future._setError,
|
| + onDone: () {
|
| + future._setValue(false);
|
| + },
|
| + unsubscribeOnError: true);
|
| + return future;
|
| + }
|
| +
|
| +
|
| + /** Counts the elements in the stream. */
|
| + Future<int> get length {
|
| + _FutureImpl<int> future = new _FutureImpl<int>();
|
| + int count = 0;
|
| + this.listen(
|
| + (_) { count++; },
|
| + onError: future._setError,
|
| + onDone: () {
|
| + future._setValue(count);
|
| + },
|
| + unsubscribeOnError: true);
|
| + return future;
|
| + }
|
| +
|
| + /**
|
| + * Finds the least element in the stream.
|
| + *
|
| + * If the stream is empty, the result is [:null:].
|
| + * Otherwise the result is a value from the stream that is not greater
|
| + * than any other value from the stream (according to [compare], which must
|
| + * be a [Comparator]).
|
| + *
|
| + * If [compare] is omitted, it defaults to [Comparable.compare].
|
| + */
|
| + Future<T> min([int compare(T a, T b)]) {
|
| + if (compare == null) compare = Comparable.compare;
|
| + _FutureImpl<T> future = new _FutureImpl<T>();
|
| + StreamSubscription subscription;
|
| + T min = null;
|
| + subscription = this.listen(
|
| + (T value) {
|
| + min = value;
|
| + subscription.onData((T value) {
|
| + if (compare(min, value) > 0) min = value;
|
| + });
|
| + },
|
| + onError: future._setError,
|
| + onDone: () {
|
| + future._setValue(min);
|
| + },
|
| + unsubscribeOnError: true
|
| + );
|
| + return future;
|
| + }
|
| +
|
| + /**
|
| + * Finds the least element in the stream.
|
| + *
|
| + * If the stream is empty, the result is [:null:].
|
| + * Otherwise the result is an value from the stream that is not greater
|
| + * than any other value from the stream (according to [compare], which must
|
| + * be a [Comparator]).
|
| + *
|
| + * If [compare] is omitted, it defaults to [Comparable.compare].
|
| + */
|
| + Future<T> max([int compare(T a, T b)]) {
|
| + if (compare == null) compare = Comparable.compare;
|
| + _FutureImpl<T> future = new _FutureImpl<T>();
|
| + StreamSubscription subscription;
|
| + T max = null;
|
| + subscription = this.listen(
|
| + (T value) {
|
| + max = value;
|
| + subscription.onData((T value) {
|
| + if (compare(max, value) < 0) max = value;
|
| + });
|
| + },
|
| + onError: future._setError,
|
| + onDone: () {
|
| + future._setValue(max);
|
| + },
|
| + unsubscribeOnError: true
|
| + );
|
| + return future;
|
| + }
|
| +
|
| + /** Reports whether this stream contains any elements. */
|
| + Future<bool> get isEmpty {
|
| + _FutureImpl<bool> future = new _FutureImpl<bool>();
|
| + StreamSubscription subscription;
|
| + subscription = this.listen(
|
| + (_) {
|
| + subscription.cancel();
|
| + future._setValue(false);
|
| + },
|
| + onError: future._setError,
|
| + onDone: () {
|
| + future._setValue(true);
|
| + },
|
| + unsubscribeOnError: true);
|
| + return future;
|
| + }
|
| +
|
| + /** Collect the data of this stream in a [List]. */
|
| + Future<List<T>> toList() {
|
| + List<T> result = <T>[];
|
| + _FutureImpl<List<T>> future = new _FutureImpl<List<T>>();
|
| + this.listen(
|
| + (T data) {
|
| + result.add(data);
|
| + },
|
| + onError: future._setError,
|
| + onDone: () {
|
| + future._setValue(result);
|
| + },
|
| + unsubscribeOnError: true);
|
| + return future;
|
| + }
|
| +
|
| + /** Collect the data of this stream in a [Set]. */
|
| + Future<Set<T>> toSet() {
|
| + Set<T> result = new Set<T>();
|
| + _FutureImpl<Set<T>> future = new _FutureImpl<Set<T>>();
|
| + this.listen(
|
| + (T data) {
|
| + result.add(data);
|
| + },
|
| + onError: future._setError,
|
| + onDone: () {
|
| + future._setValue(result);
|
| + },
|
| + unsubscribeOnError: true);
|
| + return future;
|
| + }
|
| +
|
| + /**
|
| + * Provide at most the first [n] values of this stream.
|
| + *
|
| + * Forwards the first [n] data events of this stream, and all error
|
| + * events, to the returned stream, and ends with a done event.
|
| + *
|
| + * If this stream produces fewer than [count] values before it's done,
|
| + * so will the returned stream.
|
| + */
|
| + Stream<T> take(int count) {
|
| + return this.transform(new TakeStream(count));
|
| + }
|
| +
|
| + /**
|
| + * Forwards data events while [test] is successful.
|
| + *
|
| + * The returned stream provides the same events as this stream as long
|
| + * as [test] returns [:true:] for the event data. The stream is done
|
| + * when either this stream is done, or when this stream first provides
|
| + * a value that [test] doesn't accept.
|
| + */
|
| + Stream<T> takeWhile(bool test(T value)) {
|
| + return this.transform(new TakeWhileStream(test));
|
| + }
|
| +
|
| + /**
|
| + * Skips the first [count] data events from this stream.
|
| + */
|
| + Stream<T> skip(int count) {
|
| + return this.transform(new SkipStream(count));
|
| + }
|
| +
|
| + /**
|
| + * Skip data events from this stream while they are matched by [test].
|
| + *
|
| + * Error and done events are provided by the returned stream unmodified.
|
| + *
|
| + * Starting with the first data event where [test] returns true for the
|
| + * event data, the returned stream will have the same events as this stream.
|
| + */
|
| + Stream<T> skipWhile(bool test(T value)) {
|
| + return this.transform(new SkipWhileStream(test));
|
| + }
|
| +
|
| + /**
|
| + * Skip data events if they are equal to the previous data event.
|
| + *
|
| + * The returned stream provides the same events as this stream, except
|
| + * that it never provides two consequtive data events that are equal.
|
| + *
|
| + * Equality is determined by the provided [equals] method. If that is
|
| + * omitted, the '==' operator on the last provided data element is used.
|
| + */
|
| + Stream<T> distinct([bool equals(T previous, T next)]) {
|
| + return this.transform(new DistinctStream(equals));
|
| + }
|
| +
|
| + /**
|
| + * Returns the first element.
|
| + *
|
| + * If [this] is empty throws a [StateError]. Otherwise this method is
|
| + * equivalent to [:this.elementAt(0):]
|
| + */
|
| + Future<T> get first {
|
| + _FutureImpl<T> future = new _FutureImpl();
|
| + StreamSubscription subscription;
|
| + subscription = this.listen(
|
| + (T value) {
|
| + future._setValue(value);
|
| + subscription.cancel();
|
| + return;
|
| + },
|
| + onError: future._setError,
|
| + onDone: () {
|
| + future._setError(new AsyncError(new StateError("No elements")));
|
| + },
|
| + unsubscribeOnError: true);
|
| + return future;
|
| + }
|
| +
|
| + /**
|
| + * Returns the last element.
|
| + *
|
| + * If [this] is empty throws a [StateError].
|
| + */
|
| + Future<T> get last {
|
| + _FutureImpl<T> future = new _FutureImpl<T>();
|
| + T result = null;
|
| + bool foundResult = false;
|
| + StreamSubscription subscription;
|
| + subscription = this.listen(
|
| + (T value) {
|
| + foundResult = true;
|
| + result = value;
|
| + },
|
| + onError: future._setError,
|
| + onDone: () {
|
| + if (foundResult) {
|
| + future._setValue(result);
|
| + return;
|
| + }
|
| + future._setError(new AsyncError(new StateError("No elements")));
|
| + },
|
| + unsubscribeOnError: true);
|
| + return future;
|
| + }
|
| +
|
| + /**
|
| + * Returns the single element.
|
| + *
|
| + * If [this] is empty or has more than one element throws a [StateError].
|
| + */
|
| + Future<T> get single {
|
| + _FutureImpl<T> future = new _FutureImpl<T>();
|
| + T result = null;
|
| + bool foundResult = false;
|
| + StreamSubscription subscription;
|
| + subscription = this.listen(
|
| + (T value) {
|
| + if (foundResult) {
|
| + // This is the second element we get.
|
| + Error error = new StateError("More than one element");
|
| + future._setError(new AsyncError(error));
|
| + subscription.cancel();
|
| + return;
|
| + }
|
| + foundResult = true;
|
| + result = value;
|
| + },
|
| + onError: future._setError,
|
| + onDone: () {
|
| + if (foundResult) {
|
| + future._setValue(result);
|
| + return;
|
| + }
|
| + future._setError(new AsyncError(new StateError("No elements")));
|
| + },
|
| + unsubscribeOnError: true);
|
| + return future;
|
| + }
|
| +
|
| + /**
|
| + * Find the first element of this stream matching [test].
|
| + *
|
| + * Returns a future that is filled with the first element of this stream
|
| + * that [test] returns true for.
|
| + *
|
| + * If no such element is found before this stream is done, and a
|
| + * [defaultValue] function is provided, the result of calling [defaultValue]
|
| + * becomes the value of the future.
|
| + *
|
| + * If an error occurs, or if this stream ends without finding a match and
|
| + * with no [defaultValue] function provided, the future will receive an
|
| + * error.
|
| + */
|
| + Future<T> firstMatching(bool test(T value), {T defaultValue()}) {
|
| + _FutureImpl<T> future = new _FutureImpl<T>();
|
| + StreamSubscription subscription;
|
| + subscription = this.listen(
|
| + (T value) {
|
| + bool matches;
|
| + try {
|
| + matches = (true == test(value));
|
| + } catch (e, s) {
|
| + future._setError(new AsyncError(e, s));
|
| + subscription.cancel();
|
| + return;
|
| + }
|
| + if (matches) {
|
| + future._setValue(value);
|
| + subscription.cancel();
|
| + }
|
| + },
|
| + onError: future._setError,
|
| + onDone: () {
|
| + if (defaultValue != null) {
|
| + T value;
|
| + try {
|
| + value = defaultValue();
|
| + } catch (e, s) {
|
| + future._setError(new AsyncError(e, s));
|
| + return;
|
| + }
|
| + future._setValue(value);
|
| + return;
|
| + }
|
| + future._setError(
|
| + new AsyncError(new StateError("firstMatch ended without match")));
|
| + },
|
| + unsubscribeOnError: true);
|
| + return future;
|
| + }
|
| +
|
| + /**
|
| + * Finds the last element in this stream matching [test].
|
| + *
|
| + * As [firstMatching], except that the last matching element is found.
|
| + * That means that the result cannot be provided before this stream
|
| + * is done.
|
| + */
|
| + Future<T> lastMatching(bool test(T value), {T defaultValue()}) {
|
| + _FutureImpl<T> future = new _FutureImpl<T>();
|
| + T result = null;
|
| + bool foundResult = false;
|
| + StreamSubscription subscription;
|
| + subscription = this.listen(
|
| + (T value) {
|
| + bool matches;
|
| + try {
|
| + matches = (true == test(value));
|
| + } catch (e, s) {
|
| + future._setError(new AsyncError(e, s));
|
| + subscription.cancel();
|
| + return;
|
| + }
|
| + if (matches) {
|
| + foundResult = true;
|
| + result = value;
|
| + }
|
| + },
|
| + onError: future._setError,
|
| + onDone: () {
|
| + if (foundResult) {
|
| + future._setValue(result);
|
| + return;
|
| + }
|
| + if (defaultValue != null) {
|
| + T value;
|
| + try {
|
| + value = defaultValue();
|
| + } catch (e, s) {
|
| + future._setError(new AsyncError(e, s));
|
| + return;
|
| + }
|
| + future._setValue(value);
|
| + return;
|
| + }
|
| + future._setError(
|
| + new AsyncError(new StateError("lastMatch ended without match")));
|
| + },
|
| + unsubscribeOnError: true);
|
| + return future;
|
| + }
|
| +
|
| + /**
|
| + * Finds the single element in this stream matching [test].
|
| + *
|
| + * Like [lastMatch], except that it is an error if more than one
|
| + * matching element occurs in the stream.
|
| + */
|
| + Future<T> singleMatching(bool test(T value)) {
|
| + _FutureImpl<T> future = new _FutureImpl<T>();
|
| + T result = null;
|
| + bool foundResult = false;
|
| + StreamSubscription subscription;
|
| + subscription = this.listen(
|
| + (T value) {
|
| + bool matches;
|
| + try {
|
| + matches = (true == test(value));
|
| + } catch (e, s) {
|
| + future._setError(new AsyncError(e, s));
|
| + subscription.cancel();
|
| + return;
|
| + }
|
| + if (matches) {
|
| + if (foundResult) {
|
| + future._setError(new AsyncError(
|
| + new StateError('Multiple matches for "single"')));
|
| + subscription.cancel();
|
| + return;
|
| + }
|
| + foundResult = true;
|
| + result = value;
|
| + }
|
| + },
|
| + onError: future._setError,
|
| + onDone: () {
|
| + if (foundResult) {
|
| + future._setValue(result);
|
| + return;
|
| + }
|
| + future._setError(
|
| + new AsyncError(new StateError("single ended without match")));
|
| + },
|
| + unsubscribeOnError: true);
|
| + return future;
|
| + }
|
| +
|
| + /**
|
| + * Returns the value of the [index]th data event of this stream.
|
| + *
|
| + * If an error event occurs, the future will end with this error.
|
| + *
|
| + * If this stream provides fewer than [index] elements before closing,
|
| + * an error is reported.
|
| + */
|
| + Future<T> elementAt(int index) {
|
| + _FutureImpl<T> future = new _FutureImpl();
|
| + StreamSubscription subscription;
|
| + subscription = this.listen(
|
| + (T value) {
|
| + if (index == 0) {
|
| + future._setValue(value);
|
| + subscription.cancel();
|
| + return;
|
| + }
|
| + index -= 1;
|
| + },
|
| + onError: future._setError,
|
| + onDone: () {
|
| + future._setError(new AsyncError(
|
| + new StateError("Not enough elements for elementAt")));
|
| + },
|
| + unsubscribeOnError: true);
|
| + return future;
|
| + }
|
| +}
|
| +
|
| +/**
|
| + * A control object for the subscription on a [Stream].
|
| + *
|
| + * When you subscribe on a [Stream] using [Stream.subscribe],
|
| + * a [StreamSubscription] object is returned. This object
|
| + * is used to later unsubscribe again, or to temporarily pause
|
| + * the stream's events.
|
| + */
|
| +abstract class StreamSubscription<T> {
|
| + /**
|
| + * Cancels this subscription. It will no longer receive events.
|
| + *
|
| + * If an event is currently firing, this unsubscription will only
|
| + * take effect after all subscribers have received the current event.
|
| + */
|
| + void cancel();
|
| +
|
| + /** Set or override the data event handler of this subscription. */
|
| + void onData(void handleData(T data));
|
| +
|
| + /** Set or override the error event handler of this subscription. */
|
| + void onError(void handleError(AsyncError error));
|
| +
|
| + /** Set or override the done event handler of this subscription. */
|
| + void onDone(void handleDone());
|
| +
|
| + /**
|
| + * Request that the stream pauses events until further notice.
|
| + *
|
| + * If [resumeSignal] is provided, the stream will undo the pause
|
| + * when the signal completes.
|
| + * A call to [resume] will also undo a pause.
|
| + *
|
| + * If the subscription is paused more than once, an equal number
|
| + * of resumes must be performed to resume the stream.
|
| + */
|
| + void pause([Signal resumeSignal]);
|
| +
|
| + /**
|
| + * Resume after a pause.
|
| + */
|
| + void resume();
|
| +}
|
| +
|
| +
|
| +/**
|
| + * An interface that abstracts sending events into a [Stream].
|
| + */
|
| +abstract class StreamSink<T> implements Sink<T> {
|
| + void add(T event);
|
| + /** Signal an async error to the receivers of this sink's values. */
|
| + void signalError(AsyncError errorEvent);
|
| + void close();
|
| +}
|
| +
|
| +/** [Stream] wrapper that only exposes the [Stream] interface. */
|
| +class StreamView<T> extends Stream<T> {
|
| + Stream<T> _stream;
|
| +
|
| + StreamView(this._stream);
|
| +
|
| + StreamSubscription<T> listen(void onData(T value),
|
| + { void onError(AsyncError error),
|
| + void onDone(),
|
| + bool unsubscribeOnError }) {
|
| + return _stream.listen(onData, onError: onError, onDone: onDone,
|
| + unsubscribeOnError: unsubscribeOnError);
|
| + }
|
| +}
|
| +
|
| +/**
|
| + * [StreamSink] wrapper that only exposes the [StreamSink] interface.
|
| + */
|
| +class StreamSinkView<T> implements StreamSink<T> {
|
| + final StreamSink<T> _sink;
|
| +
|
| + StreamSinkView(this._sink);
|
| +
|
| + void add(T value) { _sink.add(value); }
|
| + void signalError(AsyncError error) { _sink.signalError(error); }
|
| + void close() { _sink.close(); }
|
| +}
|
| +
|
| +
|
| +/**
|
| + * The target of a [Stream.pipe] call.
|
| + *
|
| + * The [Stream.pipe] call will pass itself to this object, and then return
|
| + * the resulting [Future]. The pipe should complete the future when it's
|
| + * done.
|
| + */
|
| +abstract class StreamConsumer<S, T> {
|
| + Future<T> consume(Stream<S> stream);
|
| +}
|
| +
|
| +/**
|
| + * The target of a [Stream.transform] call.
|
| + *
|
| + * The [Stream.transform] call will pass itself to this object and then return
|
| + * the resulting stream.
|
| + */
|
| +abstract class StreamTransformer<S, T> {
|
| + /**
|
| + * Create a [StreamTransformer] that delegates events to the given functions.
|
| + *
|
| + * If a parameter is omitted, a default handler is used that forwards the
|
| + * event directly to the sink.
|
| + *
|
| + * Pauses on the are forwarded to the input stream as well.
|
| + */
|
| + factory StreamTransformer.from({
|
| + void onData(S data, StreamSink<T> sink),
|
| + void onError(AsyncError error, StreamSink<T> sink),
|
| + void onDone(StreamSink<T> sink)}) = _StreamTransformerFunctionWrapper;
|
| +
|
| + Stream<T> bind(Stream<S> stream);
|
| +}
|
| +
|
| +
|
| +// TODO(lrn): Remove this class.
|
| +/**
|
| + * A base class for configuration objects for [TransformStream].
|
| + *
|
| + * A default implementation forwards all incoming events to the output sink.
|
| + */
|
| +abstract class _StreamTransformer<S, T> implements StreamTransformer<S, T> {
|
| + const _StreamTransformer();
|
| +
|
| + Stream<T> bind(Stream<S> input) {
|
| + return input.transform(new TransformStream<S, T>(this));
|
| + }
|
| +
|
| + /**
|
| + * Handle an incoming data event.
|
| + */
|
| + void handleData(S data, StreamSink<T> sink) {
|
| + var outData = data;
|
| + return sink.add(outData);
|
| + }
|
| +
|
| + /**
|
| + * Handle an incoming error event.
|
| + */
|
| + void handleError(AsyncError error, StreamSink<T> sink) {
|
| + sink.signalError(error);
|
| + }
|
| +
|
| + /**
|
| + * Handle an incoming done event.
|
| + */
|
| + void handleDone(StreamSink<T> sink) {
|
| + sink.close();
|
| + }
|
| +}
|
|
|