Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1198)

Unified Diff: sdk/lib/async/stream.dart

Issue 11783009: Big merge from experimental to bleeding edge. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 7 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « sdk/lib/async/signal.dart ('k') | sdk/lib/async/stream_controller.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: sdk/lib/async/stream.dart
diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart
new file mode 100644
index 0000000000000000000000000000000000000000..ffa10233ca043a6f3d0658816d3b15c251e44d4c
--- /dev/null
+++ b/sdk/lib/async/stream.dart
@@ -0,0 +1,838 @@
+// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+// part of dart.async;
+
+// -------------------------------------------------------------------
+// Core Stream types
+// -------------------------------------------------------------------
+
+abstract class Stream<T> {
+ Stream();
+
+ factory Stream.fromFuture(Future<T> future) {
+ var controller = new StreamController<T>();
+ future.then((value) {
+ controller.add(value);
+ controller.close();
+ },
+ onError: (error) {
+ controller.signalError(error);
+ controller.close();
+ });
+ return controller.stream;
+ }
+
+ /**
+ * Stream that outputs events from the [sources] in cyclic order.
+ *
+ * The merged streams are paused and resumed in order to ensure the proper
+ * order of output events.
+ */
+ factory Stream.cyclic(Iterable<Stream> sources) = CyclicScheduleStream<T>;
+
+ /**
+ * Create a stream that forwards data from the highest priority active source.
+ *
+ * Sources are provided in order of increasing priority, and only data from
+ * the highest priority source stream that has provided data are output
+ * on the created stream.
+ *
+ * Errors from the most recent active stream, and any higher priority stream,
+ * are forwarded to the created stream.
+ *
+ * If a higher priority source stream completes without providing data,
+ * it will have no effect on lower priority streams.
+ */
+ factory Stream.superceding(Iterable<Stream<T>> sources) = SupercedeStream<T>;
+
+ /**
+ * Add a subscription to this stream.
+ *
+ * On each data event from this stream, the subscribers [onData] handler
+ * is called. If [onData] is null, nothing happens.
+ *
+ * On errors from this stream, the [onError] handler is given a
+ * [AsyncError] object describing the error.
+ *
+ * If this stream closes, the [onDone] handler is called.
+ *
+ * If [unsubscribeOnError] is true, the subscription is ended when
+ * the first error is reported. The default is false.
+ */
+ StreamSubscription<T> listen(void onData(T event),
+ { void onError(AsyncError error),
+ void onDone(),
+ bool unsubscribeOnError});
+
+ /**
+ * Creates a new stream from this stream that discards some data events.
+ *
+ * The new stream sends the same error and done events as this stream,
+ * but it only sends the data events that satisfy the [test].
+ */
+ Stream<T> where(bool test(T event)) {
+ return this.transform(new WhereStream<T>(test));
+ }
+
+ /**
+ * Create a new stream that converts each element of this stream
+ * to a new value using the [convert] function.
+ */
+ Stream mappedBy(convert(T event)) {
+ return this.transform(new MapStream<T, dynamic>(convert));
+ }
+
+ /**
+ * Create a wrapper Stream that intercepts some errors from this stream.
+ *
+ * If the handler returns null, the error is considered handled.
+ * Otherwise the returned [AsyncError] is passed to the subscribers
+ * of the stream.
+ */
+ Stream handleError(AsyncError handle(AsyncError error)) {
+ return this.transform(new HandleErrorStream<T>(handle));
+ }
+
+ /**
+ * Create a new stream from this stream that converts each element
+ * into zero or more events.
+ *
+ * Each incoming event is converted to an [Iterable] of new events,
+ * and each of these new events are then sent by the returned stream
+ * in order.
+ */
+ Stream expand(Iterable convert(T value)) {
+ return this.transform(new ExpandStream<T, dynamic>(convert));
+ }
+
+ /**
+ * Bind this stream as the input of the provided [StreamConsumer].
+ */
+ Future pipe(StreamConsumer<dynamic, T> streamConsumer) {
+ return streamConsumer.consume(this);
+ }
+
+ /**
+ * Chain this stream as the input of the provided [StreamTransformer].
+ *
+ * Returns the result of [:streamTransformer.bind:] itself.
+ */
+ Stream transform(StreamTransformer<T, dynamic> streamTransformer) {
+ return streamTransformer.bind(this);
+ }
+
+
+ /** Reduces a sequence of values by repeatedly applying [combine]. */
+ Future reduce(var initialValue, combine(var previous, T element)) {
+ Completer completer = new Completer();
+ var value = initialValue;
+ StreamSubscription subscription;
+ subscription = this.listen(
+ (T element) {
+ try {
+ value = combine(value, element);
+ } catch (e, s) {
+ subscription.cancel();
+ completer.completeError(e, s);
+ }
+ },
+ onError: (AsyncError e) {
+ completer.completeError(e.error, e.stackTrace);
+ },
+ onDone: () {
+ completer.complete(value);
+ },
+ unsubscribeOnError: true);
+ return completer.future;
+ }
+
+ // Deprecated method, previously called 'pipe', retained for compatibility.
+ Signal pipeInto(Sink<T> sink,
+ {void onError(AsyncError error),
+ bool unsubscribeOnError}) {
+ SignalCompleter completer = new SignalCompleter();
+ this.listen(
+ sink.add,
+ onError: onError,
+ onDone: () {
+ sink.close();
+ completer.complete();
+ },
+ unsubscribeOnError: unsubscribeOnError);
+ return completer.signal;
+ }
+
+
+ /**
+ * Check whether [match] occurs in the elements provided by this stream.
+ *
+ * Completes the [Future] when the answer is known.
+ * If this stream reports an error, the [Future] will report that error.
+ */
+ Future<bool> contains(T match) {
+ _FutureImpl<bool> future = new _FutureImpl<bool>();
+ StreamSubscription subscription;
+ subscription = this.listen(
+ (T element) {
+ if (element == match) {
+ subscription.cancel();
+ future._setValue(true);
+ }
+ },
+ onError: future._setError,
+ onDone: () {
+ future._setValue(false);
+ },
+ unsubscribeOnError: true);
+ return future;
+ }
+
+ /**
+ * Check whether [test] accepts all elements provided by this stream.
+ *
+ * Completes the [Future] when the answer is known.
+ * If this stream reports an error, the [Future] will report that error.
+ */
+ Future<bool> every(bool test(T element)) {
+ _FutureImpl<bool> future = new _FutureImpl<bool>();
+ StreamSubscription subscription;
+ subscription = this.listen(
+ (T element) {
+ if (!test(element)) {
+ subscription.cancel();
+ future._setValue(false);
+ }
+ },
+ onError: future._setError,
+ onDone: () {
+ future._setValue(true);
+ },
+ unsubscribeOnError: true);
+ return future;
+ }
+
+ /**
+ * Check whether [test] accepts any element provided by this stream.
+ *
+ * Completes the [Future] when the answer is known.
+ * If this stream reports an error, the [Future] will report that error.
+ */
+ Future<bool> any(bool test(T element)) {
+ _FutureImpl<bool> future = new _FutureImpl<bool>();
+ StreamSubscription subscription;
+ subscription = this.listen(
+ (T element) {
+ if (test(element)) {
+ subscription.cancel();
+ future._setValue(true);
+ }
+ },
+ onError: future._setError,
+ onDone: () {
+ future._setValue(false);
+ },
+ unsubscribeOnError: true);
+ return future;
+ }
+
+
+ /** Counts the elements in the stream. */
+ Future<int> get length {
+ _FutureImpl<int> future = new _FutureImpl<int>();
+ int count = 0;
+ this.listen(
+ (_) { count++; },
+ onError: future._setError,
+ onDone: () {
+ future._setValue(count);
+ },
+ unsubscribeOnError: true);
+ return future;
+ }
+
+ /**
+ * Finds the least element in the stream.
+ *
+ * If the stream is empty, the result is [:null:].
+ * Otherwise the result is a value from the stream that is not greater
+ * than any other value from the stream (according to [compare], which must
+ * be a [Comparator]).
+ *
+ * If [compare] is omitted, it defaults to [Comparable.compare].
+ */
+ Future<T> min([int compare(T a, T b)]) {
+ if (compare == null) compare = Comparable.compare;
+ _FutureImpl<T> future = new _FutureImpl<T>();
+ StreamSubscription subscription;
+ T min = null;
+ subscription = this.listen(
+ (T value) {
+ min = value;
+ subscription.onData((T value) {
+ if (compare(min, value) > 0) min = value;
+ });
+ },
+ onError: future._setError,
+ onDone: () {
+ future._setValue(min);
+ },
+ unsubscribeOnError: true
+ );
+ return future;
+ }
+
+ /**
+ * Finds the least element in the stream.
+ *
+ * If the stream is empty, the result is [:null:].
+ * Otherwise the result is an value from the stream that is not greater
+ * than any other value from the stream (according to [compare], which must
+ * be a [Comparator]).
+ *
+ * If [compare] is omitted, it defaults to [Comparable.compare].
+ */
+ Future<T> max([int compare(T a, T b)]) {
+ if (compare == null) compare = Comparable.compare;
+ _FutureImpl<T> future = new _FutureImpl<T>();
+ StreamSubscription subscription;
+ T max = null;
+ subscription = this.listen(
+ (T value) {
+ max = value;
+ subscription.onData((T value) {
+ if (compare(max, value) < 0) max = value;
+ });
+ },
+ onError: future._setError,
+ onDone: () {
+ future._setValue(max);
+ },
+ unsubscribeOnError: true
+ );
+ return future;
+ }
+
+ /** Reports whether this stream contains any elements. */
+ Future<bool> get isEmpty {
+ _FutureImpl<bool> future = new _FutureImpl<bool>();
+ StreamSubscription subscription;
+ subscription = this.listen(
+ (_) {
+ subscription.cancel();
+ future._setValue(false);
+ },
+ onError: future._setError,
+ onDone: () {
+ future._setValue(true);
+ },
+ unsubscribeOnError: true);
+ return future;
+ }
+
+ /** Collect the data of this stream in a [List]. */
+ Future<List<T>> toList() {
+ List<T> result = <T>[];
+ _FutureImpl<List<T>> future = new _FutureImpl<List<T>>();
+ this.listen(
+ (T data) {
+ result.add(data);
+ },
+ onError: future._setError,
+ onDone: () {
+ future._setValue(result);
+ },
+ unsubscribeOnError: true);
+ return future;
+ }
+
+ /** Collect the data of this stream in a [Set]. */
+ Future<Set<T>> toSet() {
+ Set<T> result = new Set<T>();
+ _FutureImpl<Set<T>> future = new _FutureImpl<Set<T>>();
+ this.listen(
+ (T data) {
+ result.add(data);
+ },
+ onError: future._setError,
+ onDone: () {
+ future._setValue(result);
+ },
+ unsubscribeOnError: true);
+ return future;
+ }
+
+ /**
+ * Provide at most the first [n] values of this stream.
+ *
+ * Forwards the first [n] data events of this stream, and all error
+ * events, to the returned stream, and ends with a done event.
+ *
+ * If this stream produces fewer than [count] values before it's done,
+ * so will the returned stream.
+ */
+ Stream<T> take(int count) {
+ return this.transform(new TakeStream(count));
+ }
+
+ /**
+ * Forwards data events while [test] is successful.
+ *
+ * The returned stream provides the same events as this stream as long
+ * as [test] returns [:true:] for the event data. The stream is done
+ * when either this stream is done, or when this stream first provides
+ * a value that [test] doesn't accept.
+ */
+ Stream<T> takeWhile(bool test(T value)) {
+ return this.transform(new TakeWhileStream(test));
+ }
+
+ /**
+ * Skips the first [count] data events from this stream.
+ */
+ Stream<T> skip(int count) {
+ return this.transform(new SkipStream(count));
+ }
+
+ /**
+ * Skip data events from this stream while they are matched by [test].
+ *
+ * Error and done events are provided by the returned stream unmodified.
+ *
+ * Starting with the first data event where [test] returns true for the
+ * event data, the returned stream will have the same events as this stream.
+ */
+ Stream<T> skipWhile(bool test(T value)) {
+ return this.transform(new SkipWhileStream(test));
+ }
+
+ /**
+ * Skip data events if they are equal to the previous data event.
+ *
+ * The returned stream provides the same events as this stream, except
+ * that it never provides two consequtive data events that are equal.
+ *
+ * Equality is determined by the provided [equals] method. If that is
+ * omitted, the '==' operator on the last provided data element is used.
+ */
+ Stream<T> distinct([bool equals(T previous, T next)]) {
+ return this.transform(new DistinctStream(equals));
+ }
+
+ /**
+ * Returns the first element.
+ *
+ * If [this] is empty throws a [StateError]. Otherwise this method is
+ * equivalent to [:this.elementAt(0):]
+ */
+ Future<T> get first {
+ _FutureImpl<T> future = new _FutureImpl();
+ StreamSubscription subscription;
+ subscription = this.listen(
+ (T value) {
+ future._setValue(value);
+ subscription.cancel();
+ return;
+ },
+ onError: future._setError,
+ onDone: () {
+ future._setError(new AsyncError(new StateError("No elements")));
+ },
+ unsubscribeOnError: true);
+ return future;
+ }
+
+ /**
+ * Returns the last element.
+ *
+ * If [this] is empty throws a [StateError].
+ */
+ Future<T> get last {
+ _FutureImpl<T> future = new _FutureImpl<T>();
+ T result = null;
+ bool foundResult = false;
+ StreamSubscription subscription;
+ subscription = this.listen(
+ (T value) {
+ foundResult = true;
+ result = value;
+ },
+ onError: future._setError,
+ onDone: () {
+ if (foundResult) {
+ future._setValue(result);
+ return;
+ }
+ future._setError(new AsyncError(new StateError("No elements")));
+ },
+ unsubscribeOnError: true);
+ return future;
+ }
+
+ /**
+ * Returns the single element.
+ *
+ * If [this] is empty or has more than one element throws a [StateError].
+ */
+ Future<T> get single {
+ _FutureImpl<T> future = new _FutureImpl<T>();
+ T result = null;
+ bool foundResult = false;
+ StreamSubscription subscription;
+ subscription = this.listen(
+ (T value) {
+ if (foundResult) {
+ // This is the second element we get.
+ Error error = new StateError("More than one element");
+ future._setError(new AsyncError(error));
+ subscription.cancel();
+ return;
+ }
+ foundResult = true;
+ result = value;
+ },
+ onError: future._setError,
+ onDone: () {
+ if (foundResult) {
+ future._setValue(result);
+ return;
+ }
+ future._setError(new AsyncError(new StateError("No elements")));
+ },
+ unsubscribeOnError: true);
+ return future;
+ }
+
+ /**
+ * Find the first element of this stream matching [test].
+ *
+ * Returns a future that is filled with the first element of this stream
+ * that [test] returns true for.
+ *
+ * If no such element is found before this stream is done, and a
+ * [defaultValue] function is provided, the result of calling [defaultValue]
+ * becomes the value of the future.
+ *
+ * If an error occurs, or if this stream ends without finding a match and
+ * with no [defaultValue] function provided, the future will receive an
+ * error.
+ */
+ Future<T> firstMatching(bool test(T value), {T defaultValue()}) {
+ _FutureImpl<T> future = new _FutureImpl<T>();
+ StreamSubscription subscription;
+ subscription = this.listen(
+ (T value) {
+ bool matches;
+ try {
+ matches = (true == test(value));
+ } catch (e, s) {
+ future._setError(new AsyncError(e, s));
+ subscription.cancel();
+ return;
+ }
+ if (matches) {
+ future._setValue(value);
+ subscription.cancel();
+ }
+ },
+ onError: future._setError,
+ onDone: () {
+ if (defaultValue != null) {
+ T value;
+ try {
+ value = defaultValue();
+ } catch (e, s) {
+ future._setError(new AsyncError(e, s));
+ return;
+ }
+ future._setValue(value);
+ return;
+ }
+ future._setError(
+ new AsyncError(new StateError("firstMatch ended without match")));
+ },
+ unsubscribeOnError: true);
+ return future;
+ }
+
+ /**
+ * Finds the last element in this stream matching [test].
+ *
+ * As [firstMatching], except that the last matching element is found.
+ * That means that the result cannot be provided before this stream
+ * is done.
+ */
+ Future<T> lastMatching(bool test(T value), {T defaultValue()}) {
+ _FutureImpl<T> future = new _FutureImpl<T>();
+ T result = null;
+ bool foundResult = false;
+ StreamSubscription subscription;
+ subscription = this.listen(
+ (T value) {
+ bool matches;
+ try {
+ matches = (true == test(value));
+ } catch (e, s) {
+ future._setError(new AsyncError(e, s));
+ subscription.cancel();
+ return;
+ }
+ if (matches) {
+ foundResult = true;
+ result = value;
+ }
+ },
+ onError: future._setError,
+ onDone: () {
+ if (foundResult) {
+ future._setValue(result);
+ return;
+ }
+ if (defaultValue != null) {
+ T value;
+ try {
+ value = defaultValue();
+ } catch (e, s) {
+ future._setError(new AsyncError(e, s));
+ return;
+ }
+ future._setValue(value);
+ return;
+ }
+ future._setError(
+ new AsyncError(new StateError("lastMatch ended without match")));
+ },
+ unsubscribeOnError: true);
+ return future;
+ }
+
+ /**
+ * Finds the single element in this stream matching [test].
+ *
+ * Like [lastMatch], except that it is an error if more than one
+ * matching element occurs in the stream.
+ */
+ Future<T> singleMatching(bool test(T value)) {
+ _FutureImpl<T> future = new _FutureImpl<T>();
+ T result = null;
+ bool foundResult = false;
+ StreamSubscription subscription;
+ subscription = this.listen(
+ (T value) {
+ bool matches;
+ try {
+ matches = (true == test(value));
+ } catch (e, s) {
+ future._setError(new AsyncError(e, s));
+ subscription.cancel();
+ return;
+ }
+ if (matches) {
+ if (foundResult) {
+ future._setError(new AsyncError(
+ new StateError('Multiple matches for "single"')));
+ subscription.cancel();
+ return;
+ }
+ foundResult = true;
+ result = value;
+ }
+ },
+ onError: future._setError,
+ onDone: () {
+ if (foundResult) {
+ future._setValue(result);
+ return;
+ }
+ future._setError(
+ new AsyncError(new StateError("single ended without match")));
+ },
+ unsubscribeOnError: true);
+ return future;
+ }
+
+ /**
+ * Returns the value of the [index]th data event of this stream.
+ *
+ * If an error event occurs, the future will end with this error.
+ *
+ * If this stream provides fewer than [index] elements before closing,
+ * an error is reported.
+ */
+ Future<T> elementAt(int index) {
+ _FutureImpl<T> future = new _FutureImpl();
+ StreamSubscription subscription;
+ subscription = this.listen(
+ (T value) {
+ if (index == 0) {
+ future._setValue(value);
+ subscription.cancel();
+ return;
+ }
+ index -= 1;
+ },
+ onError: future._setError,
+ onDone: () {
+ future._setError(new AsyncError(
+ new StateError("Not enough elements for elementAt")));
+ },
+ unsubscribeOnError: true);
+ return future;
+ }
+}
+
+/**
+ * A control object for the subscription on a [Stream].
+ *
+ * When you subscribe on a [Stream] using [Stream.subscribe],
+ * a [StreamSubscription] object is returned. This object
+ * is used to later unsubscribe again, or to temporarily pause
+ * the stream's events.
+ */
+abstract class StreamSubscription<T> {
+ /**
+ * Cancels this subscription. It will no longer receive events.
+ *
+ * If an event is currently firing, this unsubscription will only
+ * take effect after all subscribers have received the current event.
+ */
+ void cancel();
+
+ /** Set or override the data event handler of this subscription. */
+ void onData(void handleData(T data));
+
+ /** Set or override the error event handler of this subscription. */
+ void onError(void handleError(AsyncError error));
+
+ /** Set or override the done event handler of this subscription. */
+ void onDone(void handleDone());
+
+ /**
+ * Request that the stream pauses events until further notice.
+ *
+ * If [resumeSignal] is provided, the stream will undo the pause
+ * when the signal completes.
+ * A call to [resume] will also undo a pause.
+ *
+ * If the subscription is paused more than once, an equal number
+ * of resumes must be performed to resume the stream.
+ */
+ void pause([Signal resumeSignal]);
+
+ /**
+ * Resume after a pause.
+ */
+ void resume();
+}
+
+
+/**
+ * An interface that abstracts sending events into a [Stream].
+ */
+abstract class StreamSink<T> implements Sink<T> {
+ void add(T event);
+ /** Signal an async error to the receivers of this sink's values. */
+ void signalError(AsyncError errorEvent);
+ void close();
+}
+
+/** [Stream] wrapper that only exposes the [Stream] interface. */
+class StreamView<T> extends Stream<T> {
+ Stream<T> _stream;
+
+ StreamView(this._stream);
+
+ StreamSubscription<T> listen(void onData(T value),
+ { void onError(AsyncError error),
+ void onDone(),
+ bool unsubscribeOnError }) {
+ return _stream.listen(onData, onError: onError, onDone: onDone,
+ unsubscribeOnError: unsubscribeOnError);
+ }
+}
+
+/**
+ * [StreamSink] wrapper that only exposes the [StreamSink] interface.
+ */
+class StreamSinkView<T> implements StreamSink<T> {
+ final StreamSink<T> _sink;
+
+ StreamSinkView(this._sink);
+
+ void add(T value) { _sink.add(value); }
+ void signalError(AsyncError error) { _sink.signalError(error); }
+ void close() { _sink.close(); }
+}
+
+
+/**
+ * The target of a [Stream.pipe] call.
+ *
+ * The [Stream.pipe] call will pass itself to this object, and then return
+ * the resulting [Future]. The pipe should complete the future when it's
+ * done.
+ */
+abstract class StreamConsumer<S, T> {
+ Future<T> consume(Stream<S> stream);
+}
+
+/**
+ * The target of a [Stream.transform] call.
+ *
+ * The [Stream.transform] call will pass itself to this object and then return
+ * the resulting stream.
+ */
+abstract class StreamTransformer<S, T> {
+ /**
+ * Create a [StreamTransformer] that delegates events to the given functions.
+ *
+ * If a parameter is omitted, a default handler is used that forwards the
+ * event directly to the sink.
+ *
+ * Pauses on the are forwarded to the input stream as well.
+ */
+ factory StreamTransformer.from({
+ void onData(S data, StreamSink<T> sink),
+ void onError(AsyncError error, StreamSink<T> sink),
+ void onDone(StreamSink<T> sink)}) = _StreamTransformerFunctionWrapper;
+
+ Stream<T> bind(Stream<S> stream);
+}
+
+
+// TODO(lrn): Remove this class.
+/**
+ * A base class for configuration objects for [TransformStream].
+ *
+ * A default implementation forwards all incoming events to the output sink.
+ */
+abstract class _StreamTransformer<S, T> implements StreamTransformer<S, T> {
+ const _StreamTransformer();
+
+ Stream<T> bind(Stream<S> input) {
+ return input.transform(new TransformStream<S, T>(this));
+ }
+
+ /**
+ * Handle an incoming data event.
+ */
+ void handleData(S data, StreamSink<T> sink) {
+ var outData = data;
+ return sink.add(outData);
+ }
+
+ /**
+ * Handle an incoming error event.
+ */
+ void handleError(AsyncError error, StreamSink<T> sink) {
+ sink.signalError(error);
+ }
+
+ /**
+ * Handle an incoming done event.
+ */
+ void handleDone(StreamSink<T> sink) {
+ sink.close();
+ }
+}
« no previous file with comments | « sdk/lib/async/signal.dart ('k') | sdk/lib/async/stream_controller.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698