Chromium Code Reviews| Index: lib/src/stream_transformers.dart |
| diff --git a/lib/src/stream_transformers.dart b/lib/src/stream_transformers.dart |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..f3eb4d9c97542f9b624fcbee0d728010ee141e11 |
| --- /dev/null |
| +++ b/lib/src/stream_transformers.dart |
| @@ -0,0 +1,282 @@ |
| +// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file |
|
nweiz
2016/01/29 22:19:44
These transformers should all have explicit docume
Lasse Reichstein Nielsen
2016/02/01 12:43:20
True.
|
| +// for details. All rights reserved. Use of this source code is governed by a |
| +// BSD-style license that can be found in the LICENSE file. |
| + |
| +import "dart:async"; |
| +import "dart:collection"; |
| + |
/// A group created by [GroupBy].
///
/// The stream created by [GroupBy] emits a [Group] for each distinct key
/// it sees.
/// This group contains the key itself, along with a stream of the values
/// associated with that key.
class Group<K, V> {
  /// The key shared by all the events emitted on [values].
  final K key;

  /// The values associated with [key], in source-stream order.
  ///
  /// This stream closes when the source stream closes or emits an error.
  final Stream<V> values;

  Group(this.key, this.values);
}
| + |
/// Groups events by a computed key.
///
/// A key is extracted from incoming events, and a stream is created for each
/// unique key. A value is computed from the event as well, and emitted on the
/// corresponding stream.
///
/// An error on the source stream, or when calling the `key` or `value`
/// functions, will emit the error on the returned stream and stop
/// processing, closing all the individual group streams.
///
/// The returned stream emits a [Group] object for each distinct key seen
/// by the transformation, and the values associated with the key are output
/// on the [Group.values] stream.
class GroupBy<S, K, V> implements StreamTransformer<S, Group<K, V>> {
  /// Computes the grouping key for a source event.
  final Function _key;

  /// Computes the value emitted on the group's stream for a source event.
  final Function _value;

  /// Groups values returned by [value] by the key returned by [key].
  ///
  /// If [value] is omitted, it defaults to an identity function, which
  /// will only work if all the source events are instances of [V].
  GroupBy(K key(S source), [V value(S source)])
      : _key = key,
        _value = value ?? _identity;

  // Default for [_value]: passes the source event through unchanged.
  static _identity(x) => x;

  Stream<Group<K, V>> bind(Stream<S> stream) async* {
    // A HashMap is used deliberately to avoid promising any particular
    // iteration order for the per-key controllers.
    var controllers = new HashMap<K, StreamController<V>>();
    try {
      await for (S source in stream) {
        var key = _key(source);
        var controller = controllers[key];
        if (controller == null) {
          // First event with this key: create the group and emit it
          // before delivering the first value.
          controller = new StreamController<V>(sync: true);
          controllers[key] = controller;
          yield new Group<K, V>(key, controller.stream);
        }
        // Compute the value before adding it, so an error thrown by
        // [_value] propagates on the outer stream, not the group stream.
        V value = _value(source);
        controller.add(value);
      }
    } finally {
      // Close every group stream whether the source ended normally or
      // with an error.
      for (var controller in controllers.values) {
        controller.close();
      }
    }
  }
}
| + |
/// Scans a stream's events and combines a result from the available events.
///
/// Each incoming event is folded into an accumulator using the combine
/// function, and every intermediate accumulator value is emitted.
///
/// Errors in the source stream or when calling the combine function will
/// be reported on the result stream, and will stop the process.
class Scan<S, A> implements StreamTransformer<S, A> {
  final A _initial;
  final Function _combine;

  /// Accumulates stream events using [combine] starting with [initial].
  Scan(A initial, A combine(A accumulator, S source))
      : _initial = initial,
        _combine = combine;

  Stream<A> bind(Stream<S> stream) async* {
    A acc = _initial;
    await for (S event in stream) {
      // Fold the event into the accumulator and emit the running result.
      acc = _combine(acc, event);
      yield acc;
    }
  }
}
| + |
/// Drops any event that happens shortly after another event.
///
/// If an event happens within a certain [Duration] of the previous event,
/// the new event is dropped, but the dropped event will still cause
/// following events to be dropped until the entire duration has passed
/// without any events.
///
/// This differs from [Throttle] which also drops events, but which only
/// restarts its timer at events that aren't dropped.
class Debounce<S> implements StreamTransformer<S, S> {
  final Duration _interval;

  /// Drops events sooner than [interval] after any other event.
  Debounce(Duration interval) : _interval = interval;

  Stream<S> bind(Stream<S> stream) async* {
    // Start with a zero interval so the very first event is never dropped.
    // (Duration.ZERO was renamed Duration.zero in Dart 2.)
    var interval = Duration.zero;
    var stopwatch = new Stopwatch()..start();
    await for (var source in stream) {
      if (stopwatch.elapsed >= interval) {
        yield source;
        // After the first emitted event, apply the real debounce interval.
        interval = _interval;
      }
      // Restart timing on every event, dropped or not.
      stopwatch.reset();
    }
    stopwatch.stop();
  }
}
| + |
/// Drops any event that happens shortly after another undropped event.
///
/// If an event happens within a certain [Duration] of the most recently
/// output event, the new event is dropped.
/// When the duration has passed after an event that wasn't dropped,
/// the next event will be accepted.
///
/// This differs from [Debounce] which also drops events,
/// but which restarts the timer after dropped events too.
class Throttle<S> implements StreamTransformer<S, S> {
  final Duration _interval;

  /// Drops events sooner than [interval] after emitted events.
  Throttle(Duration interval) : _interval = interval;

  Stream<S> bind(Stream<S> stream) async* {
    // Start with a zero interval so the very first event is never dropped.
    // (Duration.ZERO was renamed Duration.zero in Dart 2.)
    var interval = Duration.zero;
    var stopwatch = new Stopwatch()..start();
    await for (var source in stream) {
      if (stopwatch.elapsed >= interval) {
        yield source;
        // Only emitted events restart the timer; dropped events don't.
        stopwatch.reset();
        interval = _interval;
      }
    }
    stopwatch.stop();
  }
}
| + |
/// Drops the last [count] events of a stream.
///
/// The resulting stream is [count] events behind the source stream.
/// The first [count] events are buffered, and when further events arrive,
/// they are buffered too, and the earliest events in the buffer are emitted.
///
/// The resulting stream ends [count] events before the source stream,
/// effectively skipping the last [count] events.
class SkipLast<S> implements StreamTransformer<S, S> {
  /// Number of events dropped.
  final int count;

  /// Skip the [count] last events.
  SkipLast(this.count);

  Stream<S> bind(Stream<S> stream) async* {
    // ListQueue is used directly for its initial-capacity constructor;
    // the queue never holds more than count + 1 elements. The queue is
    // typed <S> so the yielded values are statically S, not dynamic.
    var queue = new ListQueue<S>(count + 1);
    await for (S source in stream) {
      queue.add(source);
      if (queue.length > count) {
        yield queue.removeFirst();
      }
    }
  }
}
| + |
| + |
/// Groups events by a computed key.
///
/// A key is extracted from incoming events, and a stream is created for each
/// unique key. A value is computed from the event as well, and emitted on the
/// corresponding stream.
///
/// If [value] is omitted, the source events themselves are emitted on the
/// group streams.
///
/// The returned stream emits a [Group] object for each distinct key seen
/// by the transformation, and the values associated with the key are output
/// on the [Group.values] stream.
StreamTransformer<dynamic, Group> streamGroupBy(key(source),
        [value(source)]) =>
    new GroupBy(key, value);
|
floitsch
2016/01/29 19:05:03
the resulting stream should have a "Future<Map> to
Lasse Reichstein Nielsen
2016/02/01 12:43:20
Streams don't have that, and I *really* prefer not
|
| + |
/// Scans a stream's events and combines a result from the available events.
///
/// Each event is combined with the previous accumulator value, and the
/// resulting accumulator is emitted for every event.
///
/// Errors in the source stream or when calling the combine function will
/// be reported on the result stream, and will stop the process.
StreamTransformer scanStream(initial, combine(value, source)) {
  return new Scan(initial, combine);
}
| + |
/// Drops any event that happens shortly after another undropped event.
///
/// An event arriving within [duration] of the most recently output event
/// is dropped. Once [duration] has elapsed since an emitted event, the
/// next event is accepted again.
///
/// This differs from [Debounce] which also drops events,
/// but which restarts the timer after dropped events too.
StreamTransformer throttleStream(Duration duration) {
  return new Throttle(duration);
}
| + |
/// Drops any event that happens shortly after another event.
///
/// An event arriving within [duration] of the previous event is dropped,
/// but the dropped event still causes following events to be dropped until
/// the entire duration passes without any events.
///
/// This differs from [Throttle] which also drops events, but which only
/// restarts its timer at events that aren't dropped.
StreamTransformer debounceStream(Duration duration) {
  return new Debounce(duration);
}
| + |
/// Drops the last [count] events of a stream.
StreamTransformer streamSkipLast(int count) {
  return new SkipLast(count);
}
| + |
/// Concatenates streams.
///
/// The events of each stream in [streams] are emitted, one stream at a time
/// in iteration order. When the final stream in [streams] has been processed,
/// the returned stream closes.
Stream concatenateStreams(Iterable<Stream> streams) async* {
  for (Stream current in streams) {
    // Forward every event of the current stream before starting the next.
    yield* current;
  }
}
| + |
/// Returns a stream that emits the events of all the [streams].
///
/// The events from all the source streams are merged into one stream, with
/// events from the same source stream occurring in their original order.
Stream mergeStreams(Iterable<Stream> streams) {
  // Declared before assignment so the onListen/onCancel closures below can
  // capture the controller variable.
  var controller;
  // Number of source subscriptions that have not yet reported done.
  int activeSubscriptions = 0;
  var subscriptions = [];
  controller = new StreamController(sync: true,
      onListen: () {
        // Cache extracted functions instead of extracting for each stream.
        var add = controller.add;
        var addError = controller.addError;
        for (var stream in streams) {
          subscriptions.add(stream.listen(add,
              onError: addError,
              onDone: () {
                // Close the merged stream once every source is done.
                activeSubscriptions--;
                if (activeSubscriptions == 0) {
                  controller.close();
                }
              }));
        }
        activeSubscriptions = subscriptions.length;
        // No source streams at all: close immediately.
        if (activeSubscriptions == 0) {
          controller.close();
        }
      },
      onPause: () {
        // Propagate back-pressure to every source.
        for (var subscription in subscriptions) subscription.pause();
      },
      onResume: () {
        for (var subscription in subscriptions) subscription.resume();
      },
      onCancel: () {
        // Cancel all sources and wait for any asynchronous cancel work.
        return Future.wait(subscriptions.map((subscription) {
          var result = subscription.cancel();
          if (result is Future) return result;
          return new Future.value();
        }));
      });
  return controller.stream;
}