Chromium Code Reviews

Issue 1648963002: Add reactive-inspired stream transformers:

Created:
4 years, 10 months ago by Lasse Reichstein Nielsen
Modified:
4 years, 9 months ago
Reviewers:
nweiz, floitsch
CC:
reviews_dartlang.org
Base URL:
https://github.com/dart-lang/async@master
Target Ref:
refs/heads/master
Visibility:
Public.

Description

Add reactive-inspired stream transformers:
- groupBy
- scan
- debounce
- throttle

and a function to merge separate streams, which can be used to remerge streams after splitting them with groupBy.

Patch Set 1 #

Total comments: 84

Patch Set 2 : Restructure files and add more tests. #

Total comments: 45
Stats: +893 lines, -2 lines
M CHANGELOG.md: 1 chunk, +7 lines, -0 lines, 0 comments
M lib/async.dart: 2 chunks, +5 lines, -0 lines, 0 comments
M lib/result.dart: 3 chunks, +38 lines, -0 lines, 0 comments
A lib/src/concatenate_streams.dart: 1 chunk, +19 lines, -0 lines, 5 comments
A lib/src/transformers/debounce.dart: 1 chunk, +47 lines, -0 lines, 10 comments
A lib/src/transformers/group_by.dart: 1 chunk, +109 lines, -0 lines, 15 comments
A lib/src/transformers/scan.dart: 1 chunk, +39 lines, -0 lines, 2 comments
A lib/src/transformers/throttle.dart: 1 chunk, +44 lines, -0 lines, 0 comments
M pubspec.yaml: 1 chunk, +3 lines, -2 lines, 0 comments
A test/debounce_test.dart: 1 chunk, +71 lines, -0 lines, 3 comments
A test/group_by_test.dart: 1 chunk, +143 lines, -0 lines, 2 comments
A test/scan_test.dart: 1 chunk, +76 lines, -0 lines, 6 comments
A test/stream_transformers_test.dart: 1 chunk, +220 lines, -0 lines, 2 comments
A test/throttle_test.dart: 1 chunk, +72 lines, -0 lines, 0 comments

Messages

Total messages: 12 (1 generated)
Lasse Reichstein Nielsen
groupBy is a common request.
4 years, 10 months ago (2016-01-29 15:46:27 UTC) #2
floitsch
LGTM. https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers.dart File lib/src/stream_transformers.dart (right): https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers.dart#newcode71 lib/src/stream_transformers.dart:71: /// Scans a stream's events and combine a ...
4 years, 10 months ago (2016-01-29 19:05:03 UTC) #3
nweiz
https://codereview.chromium.org/1648963002/diff/1/lib/result.dart File lib/result.dart (right): https://codereview.chromium.org/1648963002/diff/1/lib/result.dart#newcode140 lib/result.dart:140: /// Notice that error objecs and stack traces rarely ...
4 years, 10 months ago (2016-01-29 22:19:46 UTC) #4
floitsch
On 2016/01/29 22:19:46, nweiz wrote: > https://codereview.chromium.org/1648963002/diff/1/lib/result.dart > File lib/result.dart (right): > > https://codereview.chromium.org/1648963002/diff/1/lib/result.dart#newcode140 > ...
4 years, 10 months ago (2016-02-01 12:13:27 UTC) #5
Lasse Reichstein Nielsen
https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers.dart File lib/src/stream_transformers.dart (right): https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers.dart#newcode1 lib/src/stream_transformers.dart:1: // Copyright (c) 2016, the Dart project authors. Please ...
4 years, 10 months ago (2016-02-01 12:43:21 UTC) #6
nweiz
https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers.dart File lib/src/stream_transformers.dart (right): https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers.dart#newcode49 lib/src/stream_transformers.dart:49: Stream<Group<K, V>> bind(Stream<S> stream) async* { On 2016/02/01 12:43:20, ...
4 years, 10 months ago (2016-02-02 02:29:13 UTC) #7
Lasse Reichstein Nielsen
PTAL https://codereview.chromium.org/1648963002/diff/1/lib/result.dart File lib/result.dart (right): https://codereview.chromium.org/1648963002/diff/1/lib/result.dart#newcode140 lib/result.dart:140: /// Notice that error objecs and stack traces ...
4 years, 10 months ago (2016-02-25 16:18:11 UTC) #8
floitsch
Still LGTM, but wait for Natalie. https://codereview.chromium.org/1648963002/diff/20001/lib/src/transformers/scan.dart File lib/src/transformers/scan.dart (right): https://codereview.chromium.org/1648963002/diff/20001/lib/src/transformers/scan.dart#newcode24 lib/src/transformers/scan.dart:24: class Scan<S, A> ...
4 years, 10 months ago (2016-02-26 13:50:44 UTC) #9
nweiz
https://codereview.chromium.org/1648963002/diff/20001/lib/src/concatenate_streams.dart File lib/src/concatenate_streams.dart (right): https://codereview.chromium.org/1648963002/diff/20001/lib/src/concatenate_streams.dart#newcode14 lib/src/concatenate_streams.dart:14: /// not listened to at all. It may be ...
4 years, 9 months ago (2016-03-01 02:10:05 UTC) #10
Lasse Reichstein Nielsen
https://codereview.chromium.org/1648963002/diff/20001/lib/src/concatenate_streams.dart File lib/src/concatenate_streams.dart (right): https://codereview.chromium.org/1648963002/diff/20001/lib/src/concatenate_streams.dart#newcode14 lib/src/concatenate_streams.dart:14: /// not listened to at all. Good point. https://codereview.chromium.org/1648963002/diff/20001/lib/src/concatenate_streams.dart#newcode15 ...
4 years, 9 months ago (2016-03-01 16:51:17 UTC) #11
nweiz
4 years, 9 months ago (2016-03-08 23:57:34 UTC) #12
https://codereview.chromium.org/1648963002/diff/20001/lib/src/concatenate_str...
File lib/src/concatenate_streams.dart (right):

https://codereview.chromium.org/1648963002/diff/20001/lib/src/concatenate_str...
lib/src/concatenate_streams.dart:15: Stream concatenateStreams(Iterable<Stream> streams) async* {
On 2016/03/01 16:51:16, Lasse Reichstein Nielsen wrote:
> This will forward both value and error events to the concatenated stream. A
> `yield*` forwards all events. That's why you can use
>   yield* new Future.error(error, stack).asStream();
> to emit an error in an async* function without breaking the stream.

Oh, interesting. I was under the impression that async* _guaranteed_ that only
one error would be emitted.
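
The `yield*` behavior discussed above can be checked with a small sketch. It is not the code under review: `concatenate`, `withInlineError`, and `record` are hypothetical names introduced for illustration, and the sketch assumes a current Dart SDK rather than the 2016 one. It shows an `async*` function continuing after a forwarded error, and a concatenation built on `yield*` passing that error through.

```dart
import 'dart:async';

/// Hypothetical stand-in for the function under review: emits every event
/// of each stream in turn.
Stream<T> concatenate<T>(Iterable<Stream<T>> streams) async* {
  for (final stream in streams) {
    // `yield*` forwards data *and* error events; the generator keeps running.
    yield* stream;
  }
}

/// Emits an error in the middle of an async* function without ending it.
Stream<int> withInlineError() async* {
  yield 1;
  yield* Future<int>.error(StateError('boom')).asStream();
  yield 2;
}

/// Records every event of [stream] as a short string, completing on done.
Future<List<String>> record(Stream<int> stream) {
  final log = <String>[];
  final done = Completer<List<String>>();
  stream.listen(
    (value) => log.add('data $value'),
    onError: (Object error) => log.add('error'),
    onDone: () => done.complete(log),
  );
  return done.future;
}

Future<void> main() async {
  final merged = concatenate<int>(
      [withInlineError(), Stream<int>.fromIterable([3, 4])]);
  print(await record(merged)); // [data 1, error, data 2, data 3, data 4]
}
```

A listener that wants the stream to stop at the first error can still pass `cancelOnError: true` to `listen`.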

> Throttling and debouncing - I decided not to handle errors because I couldn't
> decide whether they should count towards the throttling or not. Forwarding all
> errors is another choice, and I think you are right that it is better - the user
> can always cancel on error if they want that.

https://codereview.chromium.org/1648963002/diff/20001/lib/src/transformers/gr...
File lib/src/transformers/group_by.dart (right):

https://codereview.chromium.org/1648963002/diff/20001/lib/src/transformers/gr...
lib/src/transformers/group_by.dart:14: class Group<K, V> {
On 2016/03/01 16:51:17, Lasse Reichstein Nielsen wrote:
> It's a long name for something fairly inconsequential.
> What this basically is is a Pair<K,Stream<V>>.
> 
> We generally don't use unnamed pairs because it's better (for some value of
> "better") to have properly named accessors instead of "first" and "second", but
> it has the disadvantage that we have to name a pair-class that itself doesn't
> really correspond to any concept, it is just a pair. So, no obvious good name.
> 
> Using "GroupBy" as prefix does show the connection, but it's also redundant when
> you use it as part of the GroupBy signature.
> 
> How about GroupValues or KeyGroup or ValuesByKey?
> 

I like "ValuesByKey".

https://codereview.chromium.org/1648963002/diff/20001/lib/src/transformers/gr...
lib/src/transformers/group_by.dart:41: /// and close the streams for all groups.
On 2016/03/01 16:51:17, Lasse Reichstein Nielsen wrote:
> On 2016/03/01 02:10:04, nweiz wrote:
> > The cancel and pause behavior feels wrong. Semantically, the stream of groups
> > is distinct from any group's stream, and as a user I wouldn't expect any
> > modification of it to affect the other streams; it would feel like
> > action-at-a-distance.
> 
> Agree. 
> Maybe a better behavior is to keep trucking on the source stream, but not
> create any new groups. Instead, values that don't correspond to an existing
> group are just discarded (which is effectively the same as creating a new group
> and sending it on the cancelled stream, just without storing the events that
> you don't need).
> 
> > Here's a more concrete example of where this could be an issue:
> > 
> > var groups = await stream.transform(new GroupBy(key: (value) => value.isEven))
> >     .take(2).toList();
> > var evens = groups[groups[0].key == true ? 0 : 1];
> > var odds = groups[groups[0].key == false ? 0 : 1];
> > 
> > On the surface, this looks like a reasonable way to split a stream into even
> > and odd components, but in practice it will stop emitting events as soon as at
> > least one even and one odd value are encountered in the original stream.
> 
> True. And it's code you *do* want to write because the toList will not work
> without the take(2) - the transformed stream will only output two events, but
> it won't close until it has seen the last event.
> 
> > I'd make this behave like StreamSplitter, and only pause the underlying stream
> > if *all* derived streams are paused, and same for canceling. I think that best
> > represents what the user is likely to mean when they cancel individual streams.
> 
> I considered this for pause, but that doesn't work. You may have paused the six
> groups you have seen so far, but the next event may create a new group, so
> pausing the source is overdoing it. If the group stream has been cancelled, then
> you can start pausing when all value streams are paused - because you actually
> *know* all the streams then.

When I say "all derived streams", I'm including the Stream<Group> as well as all
the Stream<V>s. If *all* of them are paused or canceled, it's safe to pause the
source because no new events or groups will be processed.
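
The rule proposed here can be sketched as plain bookkeeping. This is only an illustration under assumptions, not the GroupBy implementation: `DerivedState` and `DerivedStreamStates` are hypothetical names, and cancelling the source once every derived stream is cancelled is an assumed extension of "same for canceling". The group stream is tracked as just another derived stream alongside the per-key value streams.

```dart
/// Hypothetical states for one derived stream (the group stream or a
/// per-key value stream).
enum DerivedState { active, paused, cancelled }

/// Hypothetical bookkeeping: decides when the source may be paused or
/// cancelled, based on the state of *all* derived streams.
class DerivedStreamStates {
  final Map<Object, DerivedState> _states = {};

  void update(Object id, DerivedState state) => _states[id] = state;

  /// Safe to pause the source: nobody is currently consuming events.
  bool get shouldPauseSource =>
      _states.isNotEmpty &&
      _states.values.every((s) => s != DerivedState.active);

  /// Assumed rule: cancel the source only when every derived stream,
  /// including the group stream, has been cancelled.
  bool get shouldCancelSource =>
      _states.isNotEmpty &&
      _states.values.every((s) => s == DerivedState.cancelled);
}

void main() {
  final states = DerivedStreamStates()
    ..update('groups', DerivedState.active)
    ..update('evens', DerivedState.paused)
    ..update('odds', DerivedState.paused);

  // The group stream is still active, so a new group could still appear.
  print(states.shouldPauseSource); // false

  // Once the group stream is cancelled, every remaining stream is known.
  states.update('groups', DerivedState.cancelled);
  print(states.shouldPauseSource); // true
  print(states.shouldCancelSource); // false
}
```

Because the group stream counts as a derived stream, pausing all six known value streams is not enough to pause the source while new groups can still be delivered, which addresses the objection quoted above.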

https://codereview.chromium.org/1648963002/diff/20001/lib/src/transformers/gr...
lib/src/transformers/group_by.dart:58: GroupBy({K key(S source), V value(S source)})
On 2016/03/01 16:51:17, Lasse Reichstein Nielsen wrote:
> If you omit both functions, you will get streams of *equal* values, not
> identical. So, you can group equal events, keyed by the first one from each
> equivalence class.

That's true, but it's very rare in Dart to have objects that are == without
being functionally identical. I think it's fine to require users to explicitly
write "(key) => key" in that case to avoid making them write "key:" in all the
others.
