Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1315)

Unified Diff: lib/src/stream_transformers.dart

Issue 1648963002: Add reactive-inspired stream transformers: Base URL: https://github.com/dart-lang/async@master
Patch Set: Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: lib/src/stream_transformers.dart
diff --git a/lib/src/stream_transformers.dart b/lib/src/stream_transformers.dart
new file mode 100644
index 0000000000000000000000000000000000000000..f3eb4d9c97542f9b624fcbee0d728010ee141e11
--- /dev/null
+++ b/lib/src/stream_transformers.dart
@@ -0,0 +1,282 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
nweiz 2016/01/29 22:19:44 These transformers should all have explicit docume
Lasse Reichstein Nielsen 2016/02/01 12:43:20 True.
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import "dart:async";
+import "dart:collection";
+
+/// A group created by [GroupBy].
+///
+/// The stream created by [GroupBy] emits a [Group] for each distinct key
+/// it sees.
+/// This group contains the key itself, along with a stream of the values
+/// associated with that key.
+class Group<K, V> {
+ final K key;
nweiz 2016/01/29 22:19:45 It would be good to have some documentation for th
Lasse Reichstein Nielsen 2016/02/25 16:18:09 Done.
+ final Stream<V> values;
+ Group(this.key, this.values);
+}
+
+/**
+ * Groups events by a computed key.
+ *
+ * A key is extracted from incoming events, and a stream is created for each
+ * unique key. A value is computed from the event as well, and emitted on the
+ * corresponding stream.
+ *
+ * An error on the source stream, or when calling the `key` or `value`
+ * functions, will emit the error on the returned stream and stop
+ * processing, closing all the individual group streams.
+ *
+ * The returned stream emits a [Group] object for each distinct key seen
+ * by the transformation, and the values associated with the key are output
+ * on the [Group.values] stream.
+ */
nweiz 2016/01/29 22:19:45 Nit: These should be "///" comments.
Lasse Reichstein Nielsen 2016/02/25 16:18:10 Done.
+class GroupBy<S, K, V> implements StreamTransformer<S, Group<K, V>> {
nweiz 2016/01/29 22:19:45 Personally, I'd put each of these into its own fil
Lasse Reichstein Nielsen 2016/02/25 16:18:10 Done.
+ final Function _key;
+ final Function _value;
+
+ /// Groups values returned by [element] by the key returned by [key].
nweiz 2016/01/29 22:19:45 "element" -> "value"
Lasse Reichstein Nielsen 2016/02/25 16:18:10 Done.
+ ///
+ /// If [value] is omitted, it defaults to an identity function, which
+ /// will only work if all the source events are instances of [V].
+ GroupBy(K key(S source), [V value(S source)])
nweiz 2016/01/29 22:19:44 Can we make [value] a named parameter? It makes th
Lasse Reichstein Nielsen 2016/02/01 12:43:20 We could probably make both named parameters. (Ma
Lasse Reichstein Nielsen 2016/02/25 16:18:10 Done.
+ : _key = key, _value = value ?? _identity;
nweiz 2016/01/29 22:19:45 Nit: put each of these on its own line (https://ww
Lasse Reichstein Nielsen 2016/02/25 16:18:10 Done. Grudgingly.
+
+ // Helper function.
+ static _identity(x) => x;
+
+ Stream<Group<K, V>> bind(Stream<S> stream) async* {
nweiz 2016/01/29 22:19:44 Last I heard, "async*" was really slow on the VM.
Lasse Reichstein Nielsen 2016/02/01 12:43:20 They should fix that then. We have this syntax bec
nweiz 2016/02/02 02:29:13 I agree for the most part (although in some situat
Lasse Reichstein Nielsen 2016/02/25 16:18:10 I'd prefer to land this as written, then do a comp
+ var controllers = new HashMap();
nweiz 2016/01/29 22:19:45 "{}" would be a lot more idiomatic, and would make
Lasse Reichstein Nielsen 2016/02/01 12:43:20 But I don't want to have a deterministic order (or
nweiz 2016/02/02 02:29:13 I wrote a lot of Ruby code in the days when hashes
Lasse Reichstein Nielsen 2016/02/25 16:18:10 In this case, we do leak the order by closing the
+ try {
+ await for (S source in stream) {
nweiz 2016/01/29 22:19:45 I'd use "var source" here and "var value" below.
Lasse Reichstein Nielsen 2016/02/25 16:18:10 For value it's actually deliberate (but for source
+ var key = _key(source);
+ var controller = controllers[key];
+ if (controller == null) {
+ controller = new StreamController<V>(sync: true);
+ controllers[key] = controller;
+ yield new Group<K, V>(key, controller.stream);
+ }
+ V value = _value(source);
+ controller.add(value);
+ }
+ } finally {
+ for (var controller in controllers.values) {
+ controller.close();
+ }
+ }
+ }
+}
+
+/// Scans a stream's events and combine a result from the available events.
floitsch 2016/01/29 19:05:03 combines. But the sentence could be better. I'm n
Lasse Reichstein Nielsen 2016/02/25 16:18:10 Reworded and example added.
+///
+/// Combines each event with the previous accumulator value
+/// and emits the result.
+///
+/// Errors in the source stream or when calling the combine function will
+/// be reported on the result stream, and will stop the process.
+class Scan<S, A> implements StreamTransformer<S, A> {
floitsch 2016/01/29 19:05:02 "Scan looks like a bad name to me".
nweiz 2016/01/29 22:19:45 How is this different than Stream.fold()?
Lasse Reichstein Nielsen 2016/02/25 16:18:10 Stream.fold returns a single Future, this returns
Lasse Reichstein Nielsen 2016/02/25 16:18:10 Name was taking from the reactive framework. Alter
+ final A _initial;
+ final Function _combine;
+
+ /// Accumulates stream events using [combine] starting with [initial].
+ Scan(A initial, A combine(A accumulator, S source))
+ : _initial = initial, _combine = combine;
+
+ Stream<A> bind(Stream<S> stream) async* {
+ A accumulator = _initial;
+ await for (S source in stream) {
+ accumulator = _combine(accumulator, source);
+ yield accumulator;
+ }
+ }
+}
+
+/// Drops any event that happens shortly after another event.
+///
+/// If an event happens within a certain [Duration] of the previous event,
+/// the new event is dropped, but the dropped event will still cause
+/// following events to be dropped until the entire duration has passed without
+/// any events.
+///
+/// This differs from [Throttle] which also drops events, but which only
+/// restarts its timer at events that aren't dropped.
+class Debounce<S> implements StreamTransformer<S, S> {
+ final Duration _interval;
+
+ /// Drops events sooner than [interval] after any other event.
+ Debounce(Duration interval) : _interval = interval;
nweiz 2016/01/29 22:19:45 I think this would be cleaner as "this._interval".
Lasse Reichstein Nielsen 2016/02/01 12:43:20 I will not make a public parameter of a public met
+
+ Stream<S> bind(Stream<S> stream) async* {
+ var interval = Duration.ZERO; // Avoid dropping the first event.
nweiz 2016/01/29 22:19:45 This seems a little too clever to me. What about:
Lasse Reichstein Nielsen 2016/02/01 12:43:20 Wouldn't work - the comparison is in the wrong dir
nweiz 2016/02/02 02:29:13 Hmm, true.
+ var stopwatch = new Stopwatch()..start();
+ await for (var source in stream) {
+ if (stopwatch.elapsed >= interval) {
+ yield source;
+ interval = _interval;
+ }
+ stopwatch.reset();
+ }
+ stopwatch.stop();
+ }
+}
+
+/// Drops any event that happens shortly after another undropped event.
+///
+/// If an event happens within a certain [Duration] of the most recently
+/// output event, the new event is dropped.
+/// When the duration has passed after an event that wasn't dropped,
+/// the next event will be accepted.
+///
+/// This differs from [Debounce] which also drops events,
floitsch 2016/01/29 19:05:03 from [Debounce] which restarts the timer after dro
Lasse Reichstein Nielsen 2016/02/25 16:18:10 Done.
+/// but which restarts the timer after dropped events too.
+class Throttle<S> implements StreamTransformer<S, S> {
+ final Duration _interval;
+
+ /// Drops events sooner than [interval] after emitted events.
+ Throttle(Duration interval) : _interval = interval;
+
+ Stream<S> bind(Stream<S> stream) async* {
+ var interval = Duration.ZERO; // Avoid dropping the first event.
+ var stopwatch = new Stopwatch()..start();
+ await for (var source in stream) {
+ if (stopwatch.elapsed >= interval) {
+ yield source;
+ stopwatch.reset();
+ interval = _interval;
+ }
+ }
+ stopwatch.stop();
+ }
+}
+
+/// Drops the last [count] events of a stream.
+///
+/// The resulting stream is [count] events behind the source stream.
+/// The first [count] events are buffered, and when further events arrive,
+/// they are buffered too, and the earlies events in the buffer are emitted.
+///
+/// The resulting stream ends [count] events before the source stream,
+/// effecively skipping the last [count] events.
+class SkipLast<S> implements StreamTransformer<S, S> {
nweiz 2016/01/29 22:19:45 What's the use case for this?
Lasse Reichstein Nielsen 2016/02/01 12:43:20 Honestly don't know - found it in the reactive fra
Lasse Reichstein Nielsen 2016/02/25 16:18:10 But since that's my best argument, I'll drop it fo
+ /// Number of events dropped.
+ final int count;
nweiz 2016/01/29 22:19:45 Why is this public? It doesn't match the parameter
Lasse Reichstein Nielsen 2016/02/01 12:43:20 Good point. I think I decided to make it public b
+
+ /// Skip the [count] last events.
+ SkipLast(this.count);
+
+ Stream<S> bind(Stream<S> stream) async* {
+ Queue queue = new ListQueue(count + 1);
nweiz 2016/01/29 22:19:45 Calling "new ListQueue()" explicitly reads very st
Lasse Reichstein Nielsen 2016/02/01 12:43:20 The Queue constructor doesn't have an "initial cap
+ await for (S source in stream) {
+ queue.add(source);
+ if (queue.length > count) {
+ yield queue.removeFirst();
+ }
+ }
+ }
+}
+
+
+/**
nweiz 2016/01/29 22:19:45 ///
Lasse Reichstein Nielsen 2016/02/25 16:18:10 Done.
+ * Groups events by a computed key.
+ *
+ * A key is extracted from incoming events, and a stream is created for each
+ * unique key. A value is computed from the event as well, and emitted on the
+ * corresponding stream.
+ *
+ * The returned stream emits a [Group] object for each distinct key seen
+ * by the transformation, and the values associated with the key are output
+ * on the [Group.values] stream.
+ */
+StreamTransformer<dynamic, Group> streamGroupBy(key(source), value(source)) =>
nweiz 2016/01/29 22:19:45 It seems weird that both these functions *and* the
Lasse Reichstein Nielsen 2016/02/01 12:43:20 Until generic functions, that would prevent access
+ new GroupBy(key, value);
floitsch 2016/01/29 19:05:03 the resulting stream should have a "Future<Map> to
Lasse Reichstein Nielsen 2016/02/01 12:43:20 Streams don't have that, and I *really* prefer not
+
+/// Scans a stream's events and combine a result from the available events.
+///
+/// Combines each event with the previous accumulator value
+/// and emits the result.
+///
+/// Errors in the source stream or when calling the combine function will
+/// be reported on the result stream, and will stop the process.
+StreamTransformer scanStream(initial, combine(value, source)) =>
+ new Scan(initial, combine);
+
+/// Drops any event that happens shortly after another undropped event.
+///
+/// If an event happens within a certain [Duration] of the most recently
+/// output event, the new event is dropped.
+/// When the duration has passed after an event that wasn't dropped,
+/// the next event will be accepted.
+///
+/// This differs from [Debounce] which also drops events,
+/// but which restarts the timer after dropped events too.
+StreamTransformer throttleStream(Duration duration) => new Throttle(duration);
+
+/// Drops any event that happens shortly after another event.
+///
+/// If an event happens within a certain [Duration] of the previous event,
+/// the new event is dropped, but the dropped event will still cause
+/// following events to be dropped until the entire duration has passed without
+/// any events.
+///
+/// This differs from [Throttle] which also drops events, but which only
+/// restarts its timer at events that aren't dropped.
+StreamTransformer debounceStream(Duration duration) => new Debounce(duration);
+
+/// Drops the last [count] events of a stream.
+StreamTransformer streamSkipLast(int count) => new SkipLast(count);
+
+/// Concatenates streams.
+///
+/// The events of each stream in [streams] are emitted, one stream at a time
+/// in iteration order. When the final stream in [streams] has been processed,
+/// the returned stream closes.
+Stream concatenateStreams(Iterable<Stream> streams) async* {
nweiz 2016/01/29 22:19:44 This and mergeStreams aren't actually transformers
Lasse Reichstein Nielsen 2016/02/25 16:18:10 I've remove the rest, so it's just concatenateStre
+ for (var stream in streams) {
+ yield* stream;
+ }
+}
+
+/// Returns a stream that emits the events of all the [streams].
+///
+/// The events from all the source streams are merged into one stream, with
+/// events from the same source stream occuring in their original order.
+Stream mergeStreams(Iterable<Stream> streams) {
nweiz 2016/01/29 22:19:45 Isn't this the same as StreamGroup.merge?
Lasse Reichstein Nielsen 2016/02/01 12:43:20 Most likely. So, even if it's worth having as a t
nweiz 2016/02/02 02:29:13 I'd prefer not to add a top-level forwarder; I'm n
Lasse Reichstein Nielsen 2016/02/25 16:18:10 Ok, I'll remove this one. I'll remove the functio
+ var controller;
+ int activeSubscriptions = 0;
+ var subscriptions = [];
+ controller = new StreamController(sync: true,
+ onListen: () {
+ // Cache extracted functions instead of extacting for each stream.
+ var add = controller.add;
+ var addError = controller.addError;
+ for (var stream in streams) {
+ subscriptions.add(stream.listen(add,
+ onError: addError,
+ onDone: () {
+ activeSubscriptions--;
+ if (activeSubscriptions == 0) {
+ controller.close();
+ }
+ }));
+ }
+ activeSubscriptions = subscriptions.length;
+ if (activeSubscriptions == 0) {
+ controller.close();
+ }
+ },
+ onPause: () {
+ for (var subscription in subscriptions) subscription.pause();
+ },
+ onResume: () {
+ for (var subscription in subscriptions) subscription.resume();
+ },
+ onCancel: () {
+ return Future.wait(subscriptions.map((subscription) {
+ var result = subscription.cancel();
+ if (result is Future) return result;
+ return new Future.value();
+ }));
+ });
+ return controller.stream;
+}

Powered by Google App Engine
This is Rietveld 408576698