Chromium Code Reviews
Created: 4 years, 10 months ago by Lasse Reichstein Nielsen
Modified: 4 years, 9 months ago
CC: reviews_dartlang.org
Base URL: https://github.com/dart-lang/async@master
Target Ref: refs/heads/master
Visibility: Public.
Description:
Add reactive-inspired stream transformers:
- groupBy
- scan
- debounce
- throttle
and a function to merge separate streams which can be used to remerge streams
after splitting them with groupBy.
Patch Set 1 #
Total comments: 84
Patch Set 2 : Restructure files and add more tests. #
Total comments: 45
Messages
Total messages: 12 (1 generated)
lrn@google.com changed reviewers: + floitsch@google.com, nweiz@google.com
groupBy is a common request.
LGTM.

https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers...
File lib/src/stream_transformers.dart (right):

https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers...
lib/src/stream_transformers.dart:71: /// Scans a stream's events and combine a result from the available events.
combines. But the sentence could be better. I'm not sure what exactly it does (without having looked at the code).
Seems to be: "Chains events of a stream by combining each element with the result of the previous one".
In any case you really want to have examples (with or without code) in the doc.

https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers...
lib/src/stream_transformers.dart:78: class Scan<S, A> implements StreamTransformer<S, A> {
"Scan looks like a bad name to me".

https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers...
lib/src/stream_transformers.dart:131: /// This differs from [Debounce] which also drops events,
from [Debounce] which restarts the timer after dropped events too.

https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers...
lib/src/stream_transformers.dart:192: new GroupBy(key, value);
the resulting stream should have a "Future<Map> toMap()".

https://codereview.chromium.org/1648963002/diff/1/test/stream_transformers_te...
File test/stream_transformers_test.dart (right):

https://codereview.chromium.org/1648963002/diff/1/test/stream_transformers_te...
test/stream_transformers_test.dart:79: test("empty", () async {
group + test should give a sentence.

https://codereview.chromium.org/1648963002/diff/1/test/stream_transformers_te...
test/stream_transformers_test.dart:91: repeatStream(5, "x").transform(new GroupBy<int, String>((x) => x.length));
long line.
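The request for a documentation example of the scan transformer can be illustrated with a running total. What follows is only a sketch in current null-safe Dart, not the code under review: the free function `scan`, its parameter names, and the two helper functions are hypothetical stand-ins for the patch's `Scan` class. `Stream.fold` is shown next to it because it is the closest core-library operation.

```dart
/// Hypothetical stand-in for the reviewed `Scan` transformer: combines each
/// element with the previous result and emits every intermediate result.
Stream<A> scan<S, A>(Stream<S> source, A initial,
    A Function(A previous, S element) combine) async* {
  var accumulator = initial;
  await for (final element in source) {
    accumulator = combine(accumulator, element);
    yield accumulator;
  }
}

/// Running totals of 1..4: one output event per input event.
Future<List<int>> runningTotals() =>
    scan<int, int>(Stream.fromIterable([1, 2, 3, 4]), 0, (sum, n) => sum + n)
        .toList();

/// Stream.fold over the same input completes with a single final value.
Future<int> foldedTotal() =>
    Stream.fromIterable([1, 2, 3, 4]).fold<int>(0, (sum, n) => sum + n);
```

Under these assumptions the scan variant produces a stream of partial sums, while fold produces one future that completes only when the source is done.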
https://codereview.chromium.org/1648963002/diff/1/lib/result.dart
File lib/result.dart (right):

https://codereview.chromium.org/1648963002/diff/1/lib/result.dart#newcode140
lib/result.dart:140: /// Notice that error objecs and stack traces rarely override equality.
"objecs" -> "objects"

https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers...
File lib/src/stream_transformers.dart (right):

https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers...
lib/src/stream_transformers.dart:1: // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
These transformers should all have explicit documentation of their behavior upon encountering errors.

https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers...
lib/src/stream_transformers.dart:15: final K key;
It would be good to have some documentation for these fields, even if it's just cursory. It makes the dartdoc look a lot better.

https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers...
lib/src/stream_transformers.dart:34: */
Nit: These should be "///" comments.

https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers...
lib/src/stream_transformers.dart:35: class GroupBy<S, K, V> implements StreamTransformer<S, Group<K, V>> {
Personally, I'd put each of these into its own file (probably under lib/src/stream_transformers/). I'd rather have many small, focused files than one with a bunch of different stuff in it.

https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers...
lib/src/stream_transformers.dart:39: /// Groups values returned by [element] by the key returned by [key].
"element" -> "value"

https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers...
lib/src/stream_transformers.dart:43: GroupBy(K key(S source), [V value(S source)])
Can we make [value] a named parameter? It makes the API more flexible for the future, it's similar to [Map.fromIterable], and it's easier to understand multiple anonymous functions if one of them has a parameter name.

https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers...
lib/src/stream_transformers.dart:44: : _key = key, _value = value ?? _identity;
Nit: put each of these on its own line (https://www.dartlang.org/effective-dart/style/#do-format-constructor-initiali...).

https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers...
lib/src/stream_transformers.dart:49: Stream<Group<K, V>> bind(Stream<S> stream) async* {
Last I heard, "async*" was really slow on the VM. Normally I'd be fine with it because it's so much cleaner, but it worries me a little in a low-level utility library.

https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers...
lib/src/stream_transformers.dart:50: var controllers = new HashMap();
"{}" would be a lot more idiomatic, and would make the dispatching of onDone events have a deterministic order.

https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers...
lib/src/stream_transformers.dart:52: await for (S source in stream) {
I'd use "var source" here and "var value" below.

https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers...
lib/src/stream_transformers.dart:78: class Scan<S, A> implements StreamTransformer<S, A> {
How is this different than Stream.fold()?

https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers...
lib/src/stream_transformers.dart:108: Debounce(Duration interval) : _interval = interval;
I think this would be cleaner as "this._interval".

https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers...
lib/src/stream_transformers.dart:111: var interval = Duration.ZERO; // Avoid dropping the first event.
This seems a little too clever to me. What about:

    // Don't start yet to avoid dropping the first event.
    var stopwatch = new Stopwatch();
    await for (var source in stream) {
      if (stopwatch.elapsed >= _interval) yield source;
      stopwatch.reset();
      stopwatch.start();
    }
    stopwatch.stop();

Same thing below.

https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers...
lib/src/stream_transformers.dart:161: class SkipLast<S> implements StreamTransformer<S, S> {
What's the use case for this?

https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers...
lib/src/stream_transformers.dart:163: final int count;
Why is this public? It doesn't match the parameters passed to other transformers.

https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers...
lib/src/stream_transformers.dart:169: Queue queue = new ListQueue(count + 1);
Calling "new ListQueue()" explicitly reads very strange to me. I wouldn't construct a specific implementation unless it would substantially improve performance.

https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers...
lib/src/stream_transformers.dart:180: /**
///

https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers...
lib/src/stream_transformers.dart:191: StreamTransformer<dynamic, Group> streamGroupBy(key(source), value(source)) =>
It seems weird that both these functions *and* the StreamTransformer classes are public. I'd just choose one (probably the functions).

Also, I'd call this "groupStreamBy" so that it reads a little more like prose.

https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers...
lib/src/stream_transformers.dart:234: Stream concatenateStreams(Iterable<Stream> streams) async* {
This and mergeStreams aren't actually transformers, so they should probably go in a different file. I think it's appropriate to put them in the top-level lib/async.dart, but lib/src/stream_functions.dart would work as well.

https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers...
lib/src/stream_transformers.dart:244: Stream mergeStreams(Iterable<Stream> streams) {
Isn't this the same as StreamGroup.merge?

https://codereview.chromium.org/1648963002/diff/1/test/stream_transformers_te...
File test/stream_transformers_test.dart (right):

https://codereview.chromium.org/1648963002/diff/1/test/stream_transformers_te...
test/stream_transformers_test.dart:11: var strings = const [
I find it a lot easier to read tests when they include all their data inline or in setUp/tearDown, rather than using externally-defined data that I need to look up. It does involve more duplication, but it makes it easier to look at the test and figure out what's going on, or to change its data later without affecting other tests.

https://codereview.chromium.org/1648963002/diff/1/test/stream_transformers_te...
test/stream_transformers_test.dart:38: var stack = StackTrace.current;
It would be nice to avoid the tests depending on 1.14, at least until it gets a stable release.

https://codereview.chromium.org/1648963002/diff/1/test/stream_transformers_te...
test/stream_transformers_test.dart:64: group("groupBy", () {
Test the value() function as well. Test what happens when key() or value() throws an error.

https://codereview.chromium.org/1648963002/diff/1/test/stream_transformers_te...
test/stream_transformers_test.dart:83: await for (Group<int,String> group in grouped) {
It would be cleaner to just write expect(grouped.toList(), completion(isEmpty)).

https://codereview.chromium.org/1648963002/diff/1/test/stream_transformers_te...
test/stream_transformers_test.dart:112: }
It seems strange, for this and other transformers, that any error stops the stream dead. That's not consistent with how the dart:async transformers work, and it's less flexible than the alternative: you could write a "close after error" transformer, but not a "continue after error" one.

https://codereview.chromium.org/1648963002/diff/1/test/stream_transformers_te...
test/stream_transformers_test.dart:154:
Extra newline.

https://codereview.chromium.org/1648963002/diff/1/test/stream_transformers_te...
test/stream_transformers_test.dart:156: const ms = const Duration(milliseconds: 1);
Making this a method-level const declaration reads very strange to me.

Personally, I'd just manually write out "new Duration()" below, but if not I think it would be clearer to move this to the top level.

https://codereview.chromium.org/1648963002/diff/1/test/stream_transformers_te...
test/stream_transformers_test.dart:158: group("unbounce", () {
This and throttle need more tests. They should test error behavior, that the right events got dropped, and that events are *not* dropped if there's enough time between them.

You can use the clock and fake_async packages to mock out the Stopwatch.
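The groupBy comments above (a named `value` parameter, "{}" instead of `new HashMap()`, and the `groupStreamBy` name) can be pictured with a small sketch. It is written in current null-safe Dart and is an assumption about the general shape, not the patch itself: the `Group` class, the signature of `groupStreamBy`, and the `collectGroups` helper are all hypothetical.

```dart
import 'dart:async';

/// Hypothetical counterpart of the patch's `Group` class.
class Group<K, V> {
  final K key;
  final Stream<V> values;
  Group(this.key, this.values);
}

/// Sketch of a groupBy-style function with named `key` and `value` callbacks.
Stream<Group<K, V>> groupStreamBy<S, K, V>(Stream<S> source,
    {required K Function(S) key, required V Function(S) value}) async* {
  // A map literal creates a LinkedHashMap, so the groups are closed in the
  // order in which their keys were first seen.
  final controllers = <K, StreamController<V>>{};
  try {
    await for (final event in source) {
      final k = key(event);
      var controller = controllers[k];
      if (controller == null) {
        controller = controllers[k] = StreamController<V>();
        yield Group<K, V>(k, controller.stream);
      }
      controller.add(value(event));
    }
  } finally {
    // Close every group when the source ends, fails, or is cancelled.
    for (final controller in controllers.values) {
      controller.close();
    }
  }
}

/// Hypothetical "collector": gathers each group's values into a map.
Future<Map<K, List<V>>> collectGroups<K, V>(Stream<Group<K, V>> groups) async {
  final result = <K, List<V>>{};
  final pending = <Future<void>>[];
  await for (final group in groups) {
    // Listen to each group right away so its buffered values are drained.
    pending.add(group.values.toList().then((values) {
      result[group.key] = values;
    }));
  }
  await Future.wait(pending);
  return result;
}
```

In this sketch a group whose `values` stream is never listened to keeps buffering its events, which is one reason a real implementation needs a documented policy for unlistened groups.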
Thanks for the thorough review!
https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers...
File lib/src/stream_transformers.dart (right):

https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers...
lib/src/stream_transformers.dart:1: // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
True.

https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers...
lib/src/stream_transformers.dart:43: GroupBy(K key(S source), [V value(S source)])
On 2016/01/29 22:19:44, nweiz wrote:
> Can we make [value] a named parameter? It makes the API more flexible for the future,

We could probably make both named parameters.

(Making things "flexible for the future" by making all optional parameters named is ... not something I favor. It's a fault in the language that you can't have both kinds of optional parameters on the same function, and making something that should be positional be named is not picking the best possible API for the actual job).

> it's similar to [Map.fromIterable], and it's easier to understand
> multiple anonymous functions if one of them has a parameter name.

Those are good points, though.
I think making both named is the best approach. It even makes a perverse kind of sense to use the identity function for both parameters - grouping equal objects in groups represented by their first occurrence.

https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers...
lib/src/stream_transformers.dart:49: Stream<Group<K, V>> bind(Stream<S> stream) async* {
They should fix that then.
We have this syntax because it makes it *much* easier to write correct stream code that handles pause and cancel properly.
We should ensure it's also fast. If it isn't, the bug *is* in the compiler.

https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers...
lib/src/stream_transformers.dart:50: var controllers = new HashMap();
But I don't want to have a deterministic order (or the overhead of a linked hash map). The HashMap is the exact implementation that does what I need and want, without adding any unused features.

A deterministic order just makes people depend on it. I could use a linked hash map, but then I'd shuffle the values before using them :)

https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers...
lib/src/stream_transformers.dart:108: Debounce(Duration interval) : _interval = interval;
I will not make a public parameter of a public method (or constructor) have a private name.

https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers...
lib/src/stream_transformers.dart:111: var interval = Duration.ZERO; // Avoid dropping the first event.
Wouldn't work - the comparison is in the wrong direction.
This will always discard the first event.

I prefer the current approach to a "bool isFirst = true;" flag.
(I really, really hate those - it's a sign that your programming language isn't expressive enough to handle a fencepost problem).

https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers...
lib/src/stream_transformers.dart:161: class SkipLast<S> implements StreamTransformer<S, S> {
Honestly don't know - found it in the reactive framework and thought it sounded simple to do, so let's do it.

https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers...
lib/src/stream_transformers.dart:163: final int count;
Good point. I think I decided to make it public because it means you can inspect the transformer and see if it can be reused. That's probably not going to happen, and if it does, it makes sense for the durations above too.
So, let's make it private.

https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers...
lib/src/stream_transformers.dart:169: Queue queue = new ListQueue(count + 1);
The Queue constructor doesn't have an "initial capacity" parameter, so I need to use the ListQueue constructor directly. (Or not bother, which is obviously an option).

https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers...
lib/src/stream_transformers.dart:191: StreamTransformer<dynamic, Group> streamGroupBy(key(source), value(source)) =>
On 2016/01/29 22:19:45, nweiz wrote:
> It seems weird that both these functions *and* the StreamTransformer classes are
> public. I'd just choose one (probably the functions).

Until generic functions, that would prevent access to the type parameters - but I guess that's worth waiting for.

> Also, I'd call this "groupStreamBy" so that it reads a little more like prose.

Good point.

https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers...
lib/src/stream_transformers.dart:192: new GroupBy(key, value);
Streams don't have that, and I *really* prefer not extending the stream interface.
I'd rather create a "collector" transformer to be the dual of the groupBy.

https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers...
lib/src/stream_transformers.dart:244: Stream mergeStreams(Iterable<Stream> streams) {
Most likely.

So, even if it's worth having as a top-level function (if you don't have a StreamGroup, but just a bunch of streams, why would you look at StreamGroup for the functionality), then it should at least reuse the implementation.

https://codereview.chromium.org/1648963002/diff/1/test/stream_transformers_te...
File test/stream_transformers_test.dart (right):

https://codereview.chromium.org/1648963002/diff/1/test/stream_transformers_te...
test/stream_transformers_test.dart:112: }
It is a problem that our async*/await for primitives can't handle errors in the source. However, that does mean that "multiple errors on the same stream" is really not a thing any more - new code (using await for) won't be able to handle it anyway.

So, when writing new stream functionality, I think it's acceptable to die on the first error (as long as you still do that properly).

https://codereview.chromium.org/1648963002/diff/1/test/stream_transformers_te...
test/stream_transformers_test.dart:156: const ms = const Duration(milliseconds: 1);
Is it because it's const, because it could just as well be "var/new" instead?

https://codereview.chromium.org/1648963002/diff/1/test/stream_transformers_te...
test/stream_transformers_test.dart:158: group("unbounce", () {
The clock and fake_async packages require you to use other methods than "new Timer"/"new Stopwatch", so that you can mock them for testing, right?

It does make sense, even if I am loath to change code for testing - it seems somehow dishonest to change the code that you are testing to match the test, it should be the other way around.
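The disagreement about the Stopwatch rewrite of the debounce loop comes down to a fencepost problem, which the following sketch tries to make concrete. It is written in current null-safe Dart and both functions are hypothetical: `debounceSuggested` mirrors the snippet proposed in the review, and `debounceZeroFirst` is only a guess at how a "zero first interval" approach might look, not the patch's actual code.

```dart
/// Mirrors the snippet suggested in the review. The stopwatch has not been
/// started when the first event arrives, so its elapsed time is zero and the
/// first event fails the `>=` test for any positive interval.
Stream<T> debounceSuggested<T>(Stream<T> source, Duration interval) async* {
  final stopwatch = Stopwatch();
  await for (final event in source) {
    if (stopwatch.elapsed >= interval) yield event;
    stopwatch
      ..reset()
      ..start();
  }
}

/// Assumed shape of the "zero first interval" idea: the required gap starts
/// at zero so the first event always passes, without an `isFirst` flag.
Stream<T> debounceZeroFirst<T>(Stream<T> source, Duration interval) async* {
  var required = Duration.zero;
  final stopwatch = Stopwatch()..start();
  await for (final event in source) {
    if (stopwatch.elapsed >= required) yield event;
    required = interval;
    // The timer restarts after every event, including dropped ones.
    stopwatch.reset();
  }
}
```

With a very long interval and events that arrive back to back, the first variant emits nothing and the second emits only the first event, which makes the difference easy to check without mocking the clock.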
https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers... File lib/src/stream_transformers.dart (right): https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers... lib/src/stream_transformers.dart:49: Stream<Group<K, V>> bind(Stream<S> stream) async* { On 2016/02/01 12:43:20, Lasse Reichstein Nielsen wrote: > They should fix that then. > We have this syntax because it makes it *much* easier to write correct stream > code that handles pause and cancel properly. > We should ensure it's also fast. If it isn't, the bug *is* in the compiler. I agree for the most part (although in some situations they make it harder to work with errors). But as far as I know, there's currently no momentum towards fixing it, and it's not a major request from external users so it's not likely to get on the VM team's radar in the near future. To some extent we have to make do with the implementation we have, not the implementation we should have. https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers... lib/src/stream_transformers.dart:50: var controllers = new HashMap(); On 2016/02/01 12:43:20, Lasse Reichstein Nielsen wrote: > But I don't want to have a deterministic order (or the overhead of a linked hash > map). The HashMap is the exact implementation that does what I need and want, > without adding any unused features. > > A deterministic order just makes people depend on it. I could use a linked hash > map, but then I'd shuffle the values before using them :) I wrote a lot of Ruby code in the days when hashes were unordered by default, and in my experience it was more common that people (including me!) didn't realize the order wasn't deterministic and wrote tests towards whatever order was coincidentally consistent in that version of the code. Then when some unrelated piece of code changed later, it caused mysterious test failures elsewhere. 
For async stuff in particular, there's already so much nondeterminism that gives rise to so many difficult-to-track-down, difficult-to-reproduce bugs, it seems unkind to intentionally inject more. https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers... lib/src/stream_transformers.dart:111: var interval = Duration.ZERO; // Avoid dropping the first event. On 2016/02/01 12:43:20, Lasse Reichstein Nielsen wrote: > Wouldn't work - the comparison is in the wrong direction. > This will always discard the first event. Hmm, true. > I prefer the current approach to a "bool isFirst = true;" flag. > (I really, really hate those - it's a sign that your programming language isn't > expressive enough to handle a fencepost problem). I'm not sure I've ever seen a language that was expressive enough to handle it in full generality with anything I'd call grace. https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers... lib/src/stream_transformers.dart:244: Stream mergeStreams(Iterable<Stream> streams) { On 2016/02/01 12:43:20, Lasse Reichstein Nielsen wrote: > Most likely. > > So, even if it's worth having as a top-level function (if you don't have a > StreamGroup, but just a bunch of streams, why would you look at StreamGroup for > the functionality), then it should at least reuse the implementation. I'd prefer not to add a top-level forwarder; I'm not a fan of providing multiple ways to do the same thing that users or style guides have to choose between. If you do, though, we should find a way to do the same thing for other static functions like StreamSplitter.split and StreamCompleter.fromFuture, for completeness. https://codereview.chromium.org/1648963002/diff/1/test/stream_transformers_te... File test/stream_transformers_test.dart (right): https://codereview.chromium.org/1648963002/diff/1/test/stream_transformers_te... 
test/stream_transformers_test.dart:112: } On 2016/02/01 12:43:21, Lasse Reichstein Nielsen wrote: > It is a problem that our async*/await for primitives can't handle errors in the > source. However, that does mean that "multiple errors on the same stream" is > really not a thing any more - new code (using await for) won't be able to handle > it anyway. > > So, when writing new stream functionality, I think it's acceptable to die on the > first error (as long as you still do that properly). I'm pretty skeptical of this. There are still plenty of streams that meaningfully emit multiple errors. Often those exposed by classes representing long-running processes do so, for example. Making a bunch of transformers with inconsistent behavior around those errors will make it much harder to work with those streams, because some (potentially unknown) subset of transformers will be unusable, but won't obviously fail to work at first glance. If we were comfortable altering the core library to forbid any stream to emit multiple events, I'd be okay with this. But given that that's off the table, I think we have to make new transformers—at least ones as general-purpose as these—support the existing model. https://codereview.chromium.org/1648963002/diff/1/test/stream_transformers_te... test/stream_transformers_test.dart:156: const ms = const Duration(milliseconds: 1); On 2016/02/01 12:43:20, Lasse Reichstein Nielsen wrote: > Is it because it's const, because it could just as well be "var/new" instead? Const makes it weirder, but it would still look weird as var/new too. https://codereview.chromium.org/1648963002/diff/1/test/stream_transformers_te... test/stream_transformers_test.dart:158: group("unbounce", () { On 2016/02/01 12:43:20, Lasse Reichstein Nielsen wrote: > The clock and fake_async packages require you to use other methods than "new > Timer"/"new Stopwatch", so that you can mock them for testing, right? 
> > It does make sense, even if I am loath to change code for testing - it seems > somehow dishonest to change the code that you are testing to match the test, it > should be the other way around. I suppose ideally the Zone API would mock them out itself. I'm generally not a big fan of going to great lengths to mock stuff out, but timing is such a source of flakiness when it's unmocked, and so easy to deal with when it's mocked, that I make an exception for it.
PTAL https://codereview.chromium.org/1648963002/diff/1/lib/result.dart File lib/result.dart (right): https://codereview.chromium.org/1648963002/diff/1/lib/result.dart#newcode140 lib/result.dart:140: /// Notice that error objecs and stack traces rarely override equality. On 2016/01/29 22:19:44, nweiz wrote: > "objecs" -> "objects" Done. https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers... File lib/src/stream_transformers.dart (right): https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers... lib/src/stream_transformers.dart:15: final K key; Done. https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers... lib/src/stream_transformers.dart:34: */ On 2016/01/29 22:19:45, nweiz wrote: > Nit: These should be "///" comments. Done. https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers... lib/src/stream_transformers.dart:35: class GroupBy<S, K, V> implements StreamTransformer<S, Group<K, V>> { On 2016/01/29 22:19:45, nweiz wrote: > Personally, I'd put each of these into its own file (probably under > lib/src/stream_transformers/). I'd rather have many small, focused file than one > with a bunch of different stuff in it. Done. https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers... lib/src/stream_transformers.dart:39: /// Groups values returned by [element] by the key returned by [key]. On 2016/01/29 22:19:45, nweiz wrote: > "element" -> "value" Done. https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers... lib/src/stream_transformers.dart:43: GroupBy(K key(S source), [V value(S source)]) On 2016/02/01 12:43:20, Lasse Reichstein Nielsen wrote: > On 2016/01/29 22:19:44, nweiz wrote: > > Can we make [value] a named parameter? It makes the API more flexible for the > future, > > We could probably make both named parameters. > > (Making things "flexible for the future" by making all optional parameters named > is ... 
not something I favor. It's a fault in the language that you can't have > both kinds of optional parameters on the same function, and making something > that should be positional be named is not picking the best possible API for the > actual job). > > > it's similar to [Map.fromIterable], and it's easier to understand > > multiple anonymous functions if one of them has a parameter name. > > Those are good points, though. > I think making both named is the best approach. It even makes a perverse kind of > sense to use the identity function for both parameters - grouping equal objects > in groups represented by their first occurrence. Done. https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers... lib/src/stream_transformers.dart:44: : _key = key, _value = value ?? _identity; Done. Grudgingly. https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers... lib/src/stream_transformers.dart:49: Stream<Group<K, V>> bind(Stream<S> stream) async* { I'd prefer to land this as written, then do a comparison and see if it's much slower than the old-style version. If it is, we have a good case for bug-reporting the VM performance. Changing it proactively means we won't have that. https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers... lib/src/stream_transformers.dart:50: var controllers = new HashMap(); In this case, we do leak the order by closing the constructors in iteration order, so that argument has merit. I still think relying on ordering of independent async events is going to bite you either way, so biting you early is arguably better. This is exactly a case that HashMap exists for - a mapping where ordering isn't important. I think I'll retain it. https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers... lib/src/stream_transformers.dart:52: await for (S source in stream) { For value it's actually deliberate (but for source it's not necessary). 
We don't enforce that _key or _value return the correct type (they could be returning dynamic, and will be if the parameter was omitted), so this is a fail-early way to catch incorrectly typed results in checked mode. That's also why I introduce a variable for `value` - a type error on that line very clearly describes the problem. Now that `key` can be omitted too, I'll add `K` to the `key` variable as well. https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers... lib/src/stream_transformers.dart:71: /// Scans a stream's events and combine a result from the available events. Reworded and example added. https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers... lib/src/stream_transformers.dart:78: class Scan<S, A> implements StreamTransformer<S, A> { Stream.fold returns a single Future; this returns a Stream of the accumulator values computed along the way. The relation is that Stream.fold(v, combine) is the same as Stream.transform(new Scan(v, combine)).last. https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers... lib/src/stream_transformers.dart:78: class Scan<S, A> implements StreamTransformer<S, A> { Name was taken from the reactive framework. Alternatives appreciated. https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers... lib/src/stream_transformers.dart:131: /// This differs from [Debounce] which also drops events, On 2016/01/29 19:05:03, floitsch wrote: > from [Debounce] which restarts the timer after dropped events too. Done. https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers... lib/src/stream_transformers.dart:161: class SkipLast<S> implements StreamTransformer<S, S> { But since that's my best argument, I'll drop it for now and see if anybody actually requests it. https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers... lib/src/stream_transformers.dart:180: /** On 2016/01/29 22:19:45, nweiz wrote: > /// Done. 
https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers... lib/src/stream_transformers.dart:234: Stream concatenateStreams(Iterable<Stream> streams) async* { I've removed the rest, so it's just concatenateStreams now. I'll just make a single file for that. https://codereview.chromium.org/1648963002/diff/1/lib/src/stream_transformers... lib/src/stream_transformers.dart:244: Stream mergeStreams(Iterable<Stream> streams) { Ok, I'll remove this one. I'll remove the functions above too, except for concatenateStreams. Let's keep just the classes. (To be honest, I think the StreamTransformer interface was a mistake - it only has a single function (there's a style guide rule against that), and transformers should be reusable so they won't have any dynamic state anyway - but we have it now, so let's embrace it and work towards making "new" optional to reduce the clutter that way :) https://codereview.chromium.org/1648963002/diff/1/test/stream_transformers_te... File test/stream_transformers_test.dart (right): https://codereview.chromium.org/1648963002/diff/1/test/stream_transformers_te... test/stream_transformers_test.dart:11: var strings = const [ On 2016/01/29 22:19:45, nweiz wrote: > I find it a lot easier to read tests when they include all their data inline or > in setUp/tearDown, rather than using externally-defined data that I need to look > up. It does involve more duplication, but it makes it easier to look at the test > and figure out what's going on, or to change its data later without affecting > other tests. Done. https://codereview.chromium.org/1648963002/diff/1/test/stream_transformers_te... test/stream_transformers_test.dart:38: var stack = StackTrace.current; I believe 1.14 has a stable release now. https://codereview.chromium.org/1648963002/diff/1/test/stream_transformers_te... test/stream_transformers_test.dart:64: group("groupBy", () { On 2016/01/29 22:19:45, nweiz wrote: > Test the value() function as well. 
Test what happens when key() or value() > throws an error. Done. https://codereview.chromium.org/1648963002/diff/1/test/stream_transformers_te... test/stream_transformers_test.dart:79: test("empty", () async { No groups any longer. https://codereview.chromium.org/1648963002/diff/1/test/stream_transformers_te... test/stream_transformers_test.dart:83: await for (Group<int,String> group in grouped) { On 2016/01/29 22:19:45, nweiz wrote: > It would be cleaner to just write expect(grouped.toList(), completion(isEmpty)). Done. https://codereview.chromium.org/1648963002/diff/1/test/stream_transformers_te... test/stream_transformers_test.dart:91: repeatStream(5, "x").transform(new GroupBy<int, String>((x) => x.length)); On 2016/01/29 19:05:03, floitsch wrote: > long line. Done. https://codereview.chromium.org/1648963002/diff/1/test/stream_transformers_te... test/stream_transformers_test.dart:112: } Reasonable. I'll try rewriting it to accept errors and forward all source and key errors to the main stream, and errors from calling the value function to the key-group stream. That also means not using async*. (And actually having to handle, and think about, pause and cancel explicitly). https://codereview.chromium.org/1648963002/diff/1/test/stream_transformers_te... test/stream_transformers_test.dart:154: Rewritten. https://codereview.chromium.org/1648963002/diff/1/test/stream_transformers_te... test/stream_transformers_test.dart:158: group("unbounce", () { Ideally you could replace the core library with something that changed some of the classes. That's a better level of abstraction than making every static function or class interceptable using the already heavily overloaded zones. First class libraries are a hard sell, though.
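For readers following the Scan discussion, here is a minimal sketch of the stated relation between Stream.fold and a scan-style transformation. It is not the code under review: `scan` is a hypothetical stand-alone function standing in for the Scan transformer, it is written in current null-safe Dart rather than the 2016 syntax used in this CL, and it assumes the initial value itself is not emitted. Under that assumption the relation only holds for non-empty sources, since `.last` throws on an empty stream.

```dart
/// Hypothetical stand-in for the Scan transformer: emits each intermediate
/// accumulator value instead of only the final one.
Stream<A> scan<S, A>(
    Stream<S> source, A initial, A Function(A, S) combine) async* {
  var accumulator = initial;
  await for (final event in source) {
    accumulator = combine(accumulator, event);
    yield accumulator;
  }
}

Future<void> main() async {
  int add(int a, int b) => a + b;

  // fold produces a single Future with the final accumulator value.
  final folded = await Stream.fromIterable([1, 2, 3, 4]).fold<int>(0, add);

  // scan produces every accumulator value computed along the way.
  final scanned =
      await scan(Stream.fromIterable([1, 2, 3, 4]), 0, add).toList();

  print(scanned); // [1, 3, 6, 10]
  print(folded == scanned.last); // true
}
```

Because the sketch uses async*/await for, it stops at the first source error, which is the behavior debated elsewhere in this thread.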
Still LGTM, but wait for Natalie. https://codereview.chromium.org/1648963002/diff/20001/lib/src/transformers/sc... File lib/src/transformers/scan.dart (right): https://codereview.chromium.org/1648963002/diff/20001/lib/src/transformers/sc... lib/src/transformers/scan.dart:24: class Scan<S, A> implements StreamTransformer<S, A> { I'm still not convinced that "Scan" is a good name. How about "MapFold"?
https://codereview.chromium.org/1648963002/diff/20001/lib/src/concatenate_str... File lib/src/concatenate_streams.dart (right): https://codereview.chromium.org/1648963002/diff/20001/lib/src/concatenate_str... lib/src/concatenate_streams.dart:14: /// not listened to at all. It may be worth mentioning that an iterator may be active on [streams] for the duration of the stream's life. The caller may need to know to avoid modifying the underlying source of the iterable during that time. https://codereview.chromium.org/1648963002/diff/20001/lib/src/concatenate_str... lib/src/concatenate_streams.dart:15: Stream concatenateStreams(Iterable<Stream> streams) async* { I can buy having fatal errors in scan, because there's no way to accumulate an error (and it matches fold), but it *really* feels like concatenated streams should concatenate errors as well. I'm pretty sure throttling and debouncing should also forward errors as-is, since they're purely about choosing whether to forward events. https://codereview.chromium.org/1648963002/diff/20001/lib/src/transformers/de... File lib/src/transformers/debounce.dart (right): https://codereview.chromium.org/1648963002/diff/20001/lib/src/transformers/de... lib/src/transformers/debounce.dart:6: import "throttle.dart"; Nit: newline between these two imports. https://codereview.chromium.org/1648963002/diff/20001/lib/src/transformers/de... lib/src/transformers/debounce.dart:12: /// The dropped event will still cause following events to be dropped Nit: "will still cause" -> "still causes" https://codereview.chromium.org/1648963002/diff/20001/lib/src/transformers/de... lib/src/transformers/debounce.dart:27: /// [Stopwatch] implementation, e.g., for testing Nit: "testing" -> "testing." https://codereview.chromium.org/1648963002/diff/20001/lib/src/transformers/de... lib/src/transformers/debounce.dart:28: /// It defaults to using [StopWatch]. 
Nit: "StopWatch" -> "Stopwatch" https://codereview.chromium.org/1648963002/diff/20001/lib/src/transformers/de... lib/src/transformers/debounce.dart:29: Debounce(Duration interval, {Stopwatch createStopwatch()}) Up to you, but I'd just use the clock package here. It's pretty standard, and it's a good show of support for community packages to have Dart team packages use them. https://codereview.chromium.org/1648963002/diff/20001/lib/src/transformers/gr... File lib/src/transformers/group_by.dart (right): https://codereview.chromium.org/1648963002/diff/20001/lib/src/transformers/gr... lib/src/transformers/group_by.dart:14: class Group<K, V> { What do you think of calling this "GroupByGroup"? "Group" on its own is a pretty broad term, and this is a pretty widely-used library, so it's likely to cause conflicts. I think it may actually cause some internal libraries in test to break, in fact. https://codereview.chromium.org/1648963002/diff/20001/lib/src/transformers/gr... lib/src/transformers/group_by.dart:41: /// and close the streams for all groups. The cancel and pause behavior feels wrong. Semantically, the stream of groups is distinct from any group's stream, and as a user I wouldn't expect any modification of it to affect the other streams—it would feel like action-at-a-distance. Here's a more concrete example of where this could be an issue: var groups = await stream.transform(new GroupBy(key: (value) => value.isEven)) .take(2).toList(); var evens = groups[groups[0].key == true ? 0 : 1]; var odds = groups[groups[0].key == false ? 0 : 1]; On the surface, this looks like a reasonable way to split a stream into even and odd components, but in practice it will stop emitting events as soon as at least one even and one odd value are encountered in the original stream. I'd make this behave like StreamSplitter, and only pause the underlying stream if *all* derived streams are paused, and same for canceling. 
I think that best represents what the user is likely to mean when they cancel individual streams. https://codereview.chromium.org/1648963002/diff/20001/lib/src/transformers/gr... lib/src/transformers/group_by.dart:46: /// Pausing or canceling the a group stream has no effect other than "the a" -> "a" https://codereview.chromium.org/1648963002/diff/20001/lib/src/transformers/gr... lib/src/transformers/group_by.dart:58: GroupBy({K key(S source), V value(S source)}) It seems very unlikely that anyone would want to omit the key and get a bunch of streams of identical events. https://codereview.chromium.org/1648963002/diff/20001/lib/src/transformers/gr... lib/src/transformers/group_by.dart:82: var groupController = groupControllers[key]; putIfAbsent? https://codereview.chromium.org/1648963002/diff/20001/lib/src/transformers/gr... lib/src/transformers/group_by.dart:108: } Nit: putting some newlines in here would help me read it. https://codereview.chromium.org/1648963002/diff/20001/lib/src/transformers/sc... File lib/src/transformers/scan.dart (right): https://codereview.chromium.org/1648963002/diff/20001/lib/src/transformers/sc... lib/src/transformers/scan.dart:24: class Scan<S, A> implements StreamTransformer<S, A> { On 2016/02/26 13:50:44, floitsch wrote: > I'm still not convinced that "Scan" is a good name. > MapFold ? I think matching Reactive here is worth the odd name. There was a mailing list post about this just today: people are coming from other languages looking specifically for reactive stuff using reactive terminology. https://codereview.chromium.org/1648963002/diff/20001/test/debounce_test.dart File test/debounce_test.dart (right): https://codereview.chromium.org/1648963002/diff/20001/test/debounce_test.dart... test/debounce_test.dart:5: // Test stream transformers. Nit: I don't think this is necessary. https://codereview.chromium.org/1648963002/diff/20001/test/debounce_test.dart... 
test/debounce_test.dart:68: Result r(v) => new Result.value(v); Defining an anonymous function for this only to use it once seems like overkill. https://codereview.chromium.org/1648963002/diff/20001/test/group_by_test.dart File test/group_by_test.dart (right): https://codereview.chromium.org/1648963002/diff/20001/test/group_by_test.dart... test/group_by_test.dart:143: } Test pausing and canceling behavior as well. https://codereview.chromium.org/1648963002/diff/20001/test/scan_test.dart File test/scan_test.dart (right): https://codereview.chromium.org/1648963002/diff/20001/test/scan_test.dart#new... test/scan_test.dart:18: test("subtraction", () { Given this, I think testing addition is probably redundant. https://codereview.chromium.org/1648963002/diff/20001/test/scan_test.dart#new... test/scan_test.dart:48: test("types 3", () { Explain what these tests are actually asserting. https://codereview.chromium.org/1648963002/diff/20001/test/scan_test.dart#new... test/scan_test.dart:61: yield* () async* { throw "BAD"; }(); // Stream with error. I think it's cleaner to yield Future.error. https://codereview.chromium.org/1648963002/diff/20001/test/stream_transformer... File test/stream_transformers_test.dart (right): https://codereview.chromium.org/1648963002/diff/20001/test/stream_transformer... test/stream_transformers_test.dart:1: // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file Shouldn't these tests be deleted, since the individual classes each have their own file now?
https://codereview.chromium.org/1648963002/diff/20001/lib/src/concatenate_str... File lib/src/concatenate_streams.dart (right): https://codereview.chromium.org/1648963002/diff/20001/lib/src/concatenate_str... lib/src/concatenate_streams.dart:14: /// not listened to at all. Good point. https://codereview.chromium.org/1648963002/diff/20001/lib/src/concatenate_str... lib/src/concatenate_streams.dart:15: Stream concatenateStreams(Iterable<Stream> streams) async* { This will forward both value and error events to the concatenated stream. A `yield*` forwards all events. That's why you can use `yield* new Future.error(error, stack).asStream();` to emit an error in an async* function without breaking the stream. Throttling and debouncing - I decided not to handle errors because I couldn't decide whether they should count towards the throttling or not. Forwarding all errors is another choice, and I think you are right that it is better - the user can always cancel on error if they want that. https://codereview.chromium.org/1648963002/diff/20001/lib/src/transformers/de... File lib/src/transformers/debounce.dart (right): https://codereview.chromium.org/1648963002/diff/20001/lib/src/transformers/de... lib/src/transformers/debounce.dart:6: import "throttle.dart"; On 2016/03/01 02:10:04, nweiz wrote: > Nit: newline between these two imports. Acknowledged. https://codereview.chromium.org/1648963002/diff/20001/lib/src/transformers/de... lib/src/transformers/debounce.dart:12: /// The dropped event will still cause following events to be dropped On 2016/03/01 02:10:04, nweiz wrote: > Nit: "will still cause" -> "still causes" Acknowledged. https://codereview.chromium.org/1648963002/diff/20001/lib/src/transformers/de... lib/src/transformers/debounce.dart:27: /// [Stopwatch] implementation, e.g., for testing On 2016/03/01 02:10:03, nweiz wrote: > Nit: "testing" -> "testing." Acknowledged. https://codereview.chromium.org/1648963002/diff/20001/lib/src/transformers/de... 
lib/src/transformers/debounce.dart:28: /// It defaults to using [StopWatch]. On 2016/03/01 02:10:04, nweiz wrote: > Nit: "StopWatch" -> "Stopwatch" Acknowledged. https://codereview.chromium.org/1648963002/diff/20001/lib/src/transformers/de... lib/src/transformers/debounce.dart:29: Debounce(Duration interval, {Stopwatch createStopwatch()}) I don't really like introducing an abstraction layer that is only used for testing. It's a kind of implicit dependency injection. This is just being explicit about it. I also try to not let packages corresponding to dart: libraries depend on any other packages - except for testing. If you just use the package, you don't have any other dependencies. https://codereview.chromium.org/1648963002/diff/20001/lib/src/transformers/gr... File lib/src/transformers/group_by.dart (right): https://codereview.chromium.org/1648963002/diff/20001/lib/src/transformers/gr... lib/src/transformers/group_by.dart:14: class Group<K, V> { It's a long name for something fairly inconsequential. This is basically a Pair<K,Stream<V>>. We generally don't use unnamed pairs because it's better (for some value of "better") to have properly named accessors instead of "first" and "second", but it has the disadvantage that we have to name a pair-class that itself doesn't really correspond to any concept; it is just a pair. So, no obvious good name. Using "GroupBy" as a prefix does show the connection, but it's also redundant when you use it as part of the GroupBy signature. How about GroupValues or KeyGroup or ValuesByKey? https://codereview.chromium.org/1648963002/diff/20001/lib/src/transformers/gr... lib/src/transformers/group_by.dart:41: /// and close the streams for all groups. On 2016/03/01 02:10:04, nweiz wrote: > The cancel and pause behavior feels wrong. 
Semantically, the stream of groups is > distinct from any group's stream, and as a user I wouldn't expect any > modification of it to affect the other streams - it would feel like > action-at-a-distance. Agree. Maybe a better behavior is to keep trucking on the source stream, but not create any new groups - instead, values that don't correspond to an existing group are just discarded (which is effectively the same as creating a new group and sending it on the cancelled stream, just without storing the events that you don't need). > Here's a more concrete example of where this could be an issue: > > var groups = await stream.transform(new GroupBy(key: (value) => value.isEven)) > .take(2).toList(); > var evens = groups[groups[0].key == true ? 0 : 1]; > var odds = groups[groups[0].key == false ? 0 : 1]; > > On the surface, this looks like a reasonable way to split a stream into even and > odd components, but in practice it will stop emitting events as soon as at least > one even and one odd value are encountered in the original stream. True. And it's code you *do* want to write because the toList will not work without the take(2) - the transformed stream will only output two events, but it won't close until it has seen the last event. > I'd make this behave like StreamSplitter, and only pause the underlying stream > if *all* derived streams are paused, and same for canceling. I think that best > represents what the user is likely to mean when they cancel individual streams. I considered this for pause, but that doesn't work. You may have paused the six groups you have seen so far, but the next event may create a new group, so pausing the source is overdoing it. If the group stream has been cancelled, then you can start pausing when all value streams are paused - because you actually *know* all the streams then. https://codereview.chromium.org/1648963002/diff/20001/lib/src/transformers/gr... 
lib/src/transformers/group_by.dart:46: /// Pausing or canceling the a group stream has no effect other than On 2016/03/01 02:10:04, nweiz wrote: > "the a" -> "a" Acknowledged. https://codereview.chromium.org/1648963002/diff/20001/lib/src/transformers/gr... lib/src/transformers/group_by.dart:58: GroupBy({K key(S source), V value(S source)}) If you omit both functions, you will get streams of *equal* values, not identical. So, you can group equal events, keyed by the first one from each equivalence class. https://codereview.chromium.org/1648963002/diff/20001/lib/src/transformers/gr... lib/src/transformers/group_by.dart:82: var groupController = groupControllers[key]; I also need to do controller.add for a new value, but not for an existing one. That would mean putting the controller.add inside the "ifAbsent" function, which I think is too complicated - I prefer to have "ifAbsent" *only* create the new object, and not also do more with it. https://codereview.chromium.org/1648963002/diff/20001/lib/src/transformers/gr... lib/src/transformers/group_by.dart:108: } On 2016/03/01 02:10:04, nweiz wrote: > Nit: putting some newlines in here would help me read it. Acknowledged. https://codereview.chromium.org/1648963002/diff/20001/test/debounce_test.dart File test/debounce_test.dart (right): https://codereview.chromium.org/1648963002/diff/20001/test/debounce_test.dart... test/debounce_test.dart:5: // Test stream transformers. True, that's a left-over from the original file. https://codereview.chromium.org/1648963002/diff/20001/test/group_by_test.dart File test/group_by_test.dart (right): https://codereview.chromium.org/1648963002/diff/20001/test/group_by_test.dart... test/group_by_test.dart:143: } On 2016/03/01 02:10:04, nweiz wrote: > Test pausing and canceling behavior as well. Acknowledged. 
https://codereview.chromium.org/1648963002/diff/20001/test/scan_test.dart File test/scan_test.dart (right): https://codereview.chromium.org/1648963002/diff/20001/test/scan_test.dart#new... test/scan_test.dart:18: test("subtraction", () { On 2016/03/01 02:10:04, nweiz wrote: > Given this, I think testing addition is probably redundant. Acknowledged. https://codereview.chromium.org/1648963002/diff/20001/test/scan_test.dart#new... test/scan_test.dart:48: test("types 3", () { On 2016/03/01 02:10:04, nweiz wrote: > Explain what these tests are actually asserting. Acknowledged. https://codereview.chromium.org/1648963002/diff/20001/test/scan_test.dart#new... test/scan_test.dart:61: yield* () async* { throw "BAD"; }(); // Stream with error. Probably true. I got tired of writing "new Future.error(...).asStream()", but it is more readable. I plan to add a "new Stream.error(e, s)" constructor at some point. https://codereview.chromium.org/1648963002/diff/20001/test/stream_transformer... File test/stream_transformers_test.dart (right): https://codereview.chromium.org/1648963002/diff/20001/test/stream_transformer... test/stream_transformers_test.dart:1: // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file Absolutely true.
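The `yield*` point above can be illustrated with a small sketch. It is not part of this CL: `withErrors` and `collectEvents` are hypothetical names, and the code uses current null-safe Dart syntax. It shows an async* function emitting two errors through `yield*` of a `Future.error(...).asStream()` while still delivering the events that follow them.

```dart
import 'dart:async';

/// Emits values interleaved with errors. Each error is forwarded by
/// `yield*` and does not terminate the generated stream.
Stream<int> withErrors() async* {
  yield 1;
  yield* Future<int>.error('first').asStream();
  yield 2;
  yield* Future<int>.error('second').asStream();
  yield 3;
}

/// Hypothetical helper: records every value and error event in order.
Future<List<String>> collectEvents(Stream<int> stream) async {
  final log = <String>[];
  final done = Completer<void>();
  stream.listen(
    (value) => log.add('value $value'),
    onError: (Object error) => log.add('error $error'),
    onDone: () => done.complete(),
  );
  await done.future;
  return log;
}

Future<void> main() async {
  print(await collectEvents(withErrors()));
  // [value 1, error first, value 2, error second, value 3]
}
```

By contrast, a plain `throw` inside the async* body would emit the error and then close the stream, so the later values would never be delivered.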
https://codereview.chromium.org/1648963002/diff/20001/lib/src/concatenate_str... File lib/src/concatenate_streams.dart (right): https://codereview.chromium.org/1648963002/diff/20001/lib/src/concatenate_str... lib/src/concatenate_streams.dart:15: Stream concatenateStreams(Iterable<Stream> streams) async* { On 2016/03/01 16:51:16, Lasse Reichstein Nielsen wrote: > This will forward both value and error events to the concatenated stream. A > `yield*` forwards all events. That's why you can use > yield* new Future.error(error, stack).asStream(); > to emit an error in an async* function without breaking the stream. Oh, interesting. I was under the impression that async* _guaranteed_ that only one error would be emitted. > Throttling and debouncing - I decided not to handle errors because I couldn't > decide whether they should count towards the throttling or not . Forwarding all > errors is another choice, and I think you are right that it is better - the user > can always cancel on error if they want that. https://codereview.chromium.org/1648963002/diff/20001/lib/src/transformers/gr... File lib/src/transformers/group_by.dart (right): https://codereview.chromium.org/1648963002/diff/20001/lib/src/transformers/gr... lib/src/transformers/group_by.dart:14: class Group<K, V> { On 2016/03/01 16:51:17, Lasse Reichstein Nielsen wrote: > It's a long name for something fairly inconsequential. > What this basically is is a Pair<K,Stream<V>>. > > We generally don't use unnamed pairs because it's better (for some value of > "better") to have properly named accessors instead of "first" and "second", but > it has the disadvantage that we have to name a pair-class that itself doesn't > really correspond to any concept, it is just a pair. So, no obvious good name. > > Using "GroupBy" as prefix does show the connection, but it's also redundant when > you use it as part of the GroupBy signature. > > How about GroupValues or KeyGroup or ValuesByKey? > I like "ValuesByKey". 
https://codereview.chromium.org/1648963002/diff/20001/lib/src/transformers/gr... lib/src/transformers/group_by.dart:41: /// and close the streams for all groups. On 2016/03/01 16:51:17, Lasse Reichstein Nielsen wrote: > On 2016/03/01 02:10:04, nweiz wrote: > > The cancel and pause behavior feels wrong. Semantically, the stream of groups > is > > distinct from any group's stream, and as a user I wouldn't expect any > > modification of it to affect the other streams—it would feel like > > action-at-a-distance. > > Agree. > Maybe a better behavior is to keep trucking on the source stream, but not > creating any new groups - instead values that doesn't correspond to an existing > group are just discarded (which is effectively the same as creating a new group > and sending it on the cancelled stream, just without storing the events that you > don't need). > > > Here's a more concrete example of where this could be an issue: > > > > var groups = await stream.transform(new GroupBy(key: (value) => value.isEven)) > > .take(2).toList(); > > var evens = groups[groups[0].key == true ? 0 : 1]; > > var odds = groups[groups[0].key == false ? 0 : 1]; > > > > On the surface, this looks like a reasonable way to split a stream into even > and > > odd components, but in practice it will stop emitting events as soon as at > least > > one even and one odd value are encountered in the original stream. > > True. And it's code you *do* want to write because the toList will not work > without the take(2) - the transformed stream will only output two events, but it > won't close until it has seen the last event. > > > I'd make this behave like StreamSplitter, and only pause the underlying stream > > if *all* derived streams are paused, and same for canceling. I think that best > > represents what the user is likely to mean when they cancel individual > streams. > > I considered this for pause, but that doesn't work. 
You may have paused the six > groups you have seen so far, but the next event may create a new group, so > pausing the source is overdoing it. If the group stream has been cancelled, then > you can start pausing when all value streams are paused - because you actually > *know* all the streams then. When I say "all derived streams", I'm including the Stream<Group> as well as all the Stream<V>s. If *all* of them are paused or canceled, it's safe to pause the source because no new events or groups will be processed. https://codereview.chromium.org/1648963002/diff/20001/lib/src/transformers/gr... lib/src/transformers/group_by.dart:58: GroupBy({K key(S source), V value(S source)}) On 2016/03/01 16:51:17, Lasse Reichstein Nielsen wrote: > If you omit both functions, you will get streams of *equal* values, not > identical. So, you can group equal events, keyed by the first one from each > equivalence class. That's true, but it's very rare in Dart to have objects that are == without being functionally identical. I think it's fine to require users to explicitly write "(key) => key" in that case to avoid making them write "key:" in all the others. |
