Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(154)

Side by Side Diff: lib/src/transformers/group_by.dart

Issue 1648963002: Add reactive-inspired stream transformers: Base URL: https://github.com/dart-lang/async@master
Patch Set: Restructure failes and add more tests. Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 import "dart:async";
6 import "dart:collection" show HashMap;
7
8 /// A group created by [GroupBy].
9 ///
10 /// The stream created by [GroupBy] emits a [Group] for each distinct key
11 /// it sees.
12 /// This group contains the key itself, along with a stream of the values
13 /// associated with that key.
14 class Group<K, V> {
nweiz 2016/03/01 02:10:04 What do you think of calling this "GroupByGroup"?
Lasse Reichstein Nielsen 2016/03/01 16:51:17 It's a long name for something fairly inconsequent
nweiz 2016/03/08 23:57:34 I like "ValuesByKey".
15 /// The key that identifiers the values emitted by [values].
16 final K key;
17
18 /// The [values] that [GroupBy] have grouped by the common [key].
19 final Stream<V> values;
20
21 Group(this.key, this.values);
22 }
23
24 /// Groups events by a computed key.
25 ///
26 /// A key is extracted from incoming events, and a stream is created for each
27 /// unique key. A value is computed from the event as well, and emitted on the
28 /// corresponding stream.
29 ///
30 /// The returned stream emits a [Group] object for each distinct key seen
31 /// by the transformation, and the values associated with the key are output
32 /// on the [Group.values] stream.
33 ///
34 /// An error on the source stream, or when calling the `key` functions,
35 /// will emit the error on the returned stream.
36 ///
37 /// An error when calling the `value` function, but with a successful call
38 /// of the `key` function, is reported on the stream for the corresponding key.
39 ///
40 /// Canceling the subscription on the returned stream will stop processing
41 /// and close the streams for all groups.
nweiz 2016/03/01 02:10:04 The cancel and pause behavior feels wrong. Semanti
Lasse Reichstein Nielsen 2016/03/01 16:51:17 Agree. Maybe a better behavior is to keep truckin
nweiz 2016/03/08 23:57:34 When I say "all derived streams", I'm including th
42 ///
43 /// Pausing the subscription on the returned stream will pause processing
44 /// and no further events are added to streams for the individual groups.
45 ///
46 /// Pausing or canceling the a group stream has no effect other than
nweiz 2016/03/01 02:10:04 "the a" -> "a"
Lasse Reichstein Nielsen 2016/03/01 16:51:17 Acknowledged.
47 /// on the individual stream. Events will be queued while the group stream
48 /// is paused and until it is first listened to.
49 class GroupBy<S, K, V> implements StreamTransformer<S, Group<K, V>> {
50 final Function _key;
51 final Function _value;
52
53 /// Groups values returned by [value] by the key returned by [key].
54 ///
55 /// If [key] or [value] (or both) are omitted, they default to the identity
56 /// function. In that case, [K] and/or [V] should be types that [S] is
57 /// assignable to.
58 GroupBy({K key(S source), V value(S source)})
nweiz 2016/03/01 02:10:04 It seems very unlikely that anyone would want to o
Lasse Reichstein Nielsen 2016/03/01 16:51:17 If you omit both functions, you will get streams o
nweiz 2016/03/08 23:57:34 That's true, but it's very rare in Dart to have ob
59 : _key = key ?? _identity,
60 _value = value ?? _identity;
61
62 // Helper function.
63 static _identity(x) => x;
64
65 Stream<Group<K, V>> bind(Stream<S> stream) {
66 var controller;
67 controller = new StreamController<Group<K, V>>(sync: true, onListen: () {
68 var groupControllers = new HashMap();
69 void closeAll() {
70 for (var groupController in groupControllers.values) {
71 groupController.close();
72 }
73 }
74 var subscription = stream.listen((data) {
75 K key;
76 try {
77 key = _key(data);
78 } catch (e, s) {
79 controller.addError(e, s);
80 return;
81 }
82 var groupController = groupControllers[key];
nweiz 2016/03/01 02:10:04 putIfAbsent?
Lasse Reichstein Nielsen 2016/03/01 16:51:17 I also need to do controller.add for a new value,
83 if (groupController == null) {
84 groupController = new StreamController<V>(sync: true);
85 groupControllers[key] = groupController;
86 controller.add(new Group<K, V>(key, groupController.stream));
87 }
88 V value;
89 try {
90 value = _value(data);
91 } catch (e, s) {
92 groupController.addError(e, s);
93 return;
94 }
95 groupController.add(value);
96 }, onError: controller.addError, onDone: () {
97 controller.close();
98 closeAll();
99 });
100 controller.onPause = subscription.pause;
101 controller.onResume = subscription.resume;
102 controller.onCancel = () {
103 subscription.cancel();
104 closeAll();
105 };
106 });
107 return controller.stream;
108 }
nweiz 2016/03/01 02:10:04 Nit: putting some newlines in here would help me r
Lasse Reichstein Nielsen 2016/03/01 16:51:17 Acknowledged.
109 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698