Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file | |
| 2 // for details. All rights reserved. Use of this source code is governed by a | |
| 3 // BSD-style license that can be found in the LICENSE file. | |
| 4 | |
| 5 import "dart:async"; | |
| 6 import "dart:collection" show HashMap; | |
| 7 | |
| 8 /// A group created by [GroupBy]. | |
| 9 /// | |
| 10 /// The stream created by [GroupBy] emits a [Group] for each distinct key | |
| 11 /// it sees. | |
| 12 /// This group contains the key itself, along with a stream of the values | |
| 13 /// associated with that key. | |
| 14 class Group<K, V> { | |
|
nweiz
2016/03/01 02:10:04
What do you think of calling this "GroupByGroup"?
Lasse Reichstein Nielsen
2016/03/01 16:51:17
It's a long name for something fairly inconsequent
nweiz
2016/03/08 23:57:34
I like "ValuesByKey".
| |
| 15 /// The key that identifiers the values emitted by [values]. | |
| 16 final K key; | |
| 17 | |
| 18 /// The [values] that [GroupBy] have grouped by the common [key]. | |
| 19 final Stream<V> values; | |
| 20 | |
| 21 Group(this.key, this.values); | |
| 22 } | |
| 23 | |
| 24 /// Groups events by a computed key. | |
| 25 /// | |
| 26 /// A key is extracted from incoming events, and a stream is created for each | |
| 27 /// unique key. A value is computed from the event as well, and emitted on the | |
| 28 /// corresponding stream. | |
| 29 /// | |
| 30 /// The returned stream emits a [Group] object for each distinct key seen | |
| 31 /// by the transformation, and the values associated with the key are output | |
| 32 /// on the [Group.values] stream. | |
| 33 /// | |
| 34 /// An error on the source stream, or when calling the `key` functions, | |
| 35 /// will emit the error on the returned stream. | |
| 36 /// | |
| 37 /// An error when calling the `value` function, but with a successful call | |
| 38 /// of the `key` function, is reported on the stream for the corresponding key. | |
| 39 /// | |
| 40 /// Canceling the subscription on the returned stream will stop processing | |
| 41 /// and close the streams for all groups. | |
|
nweiz
2016/03/01 02:10:04
The cancel and pause behavior feels wrong. Semanti
Lasse Reichstein Nielsen
2016/03/01 16:51:17
Agree.
Maybe a better behavior is to keep truckin
nweiz
2016/03/08 23:57:34
When I say "all derived streams", I'm including th
| |
| 42 /// | |
| 43 /// Pausing the subscription on the returned stream will pause processing | |
| 44 /// and no further events are added to streams for the individual groups. | |
| 45 /// | |
| 46 /// Pausing or canceling the a group stream has no effect other than | |
|
nweiz
2016/03/01 02:10:04
"the a" -> "a"
Lasse Reichstein Nielsen
2016/03/01 16:51:17
Acknowledged.
| |
| 47 /// on the individual stream. Events will be queued while the group stream | |
| 48 /// is paused and until it is first listened to. | |
| 49 class GroupBy<S, K, V> implements StreamTransformer<S, Group<K, V>> { | |
| 50 final Function _key; | |
| 51 final Function _value; | |
| 52 | |
| 53 /// Groups values returned by [value] by the key returned by [key]. | |
| 54 /// | |
| 55 /// If [key] or [value] (or both) are omitted, they default to the identity | |
| 56 /// function. In that case, [K] and/or [V] should be types that [S] is | |
| 57 /// assignable to. | |
| 58 GroupBy({K key(S source), V value(S source)}) | |
|
nweiz
2016/03/01 02:10:04
It seems very unlikely that anyone would want to o
Lasse Reichstein Nielsen
2016/03/01 16:51:17
If you omit both functions, you will get streams o
nweiz
2016/03/08 23:57:34
That's true, but it's very rare in Dart to have ob
| |
| 59 : _key = key ?? _identity, | |
| 60 _value = value ?? _identity; | |
| 61 | |
| 62 // Helper function. | |
| 63 static _identity(x) => x; | |
| 64 | |
| 65 Stream<Group<K, V>> bind(Stream<S> stream) { | |
| 66 var controller; | |
| 67 controller = new StreamController<Group<K, V>>(sync: true, onListen: () { | |
| 68 var groupControllers = new HashMap(); | |
| 69 void closeAll() { | |
| 70 for (var groupController in groupControllers.values) { | |
| 71 groupController.close(); | |
| 72 } | |
| 73 } | |
| 74 var subscription = stream.listen((data) { | |
| 75 K key; | |
| 76 try { | |
| 77 key = _key(data); | |
| 78 } catch (e, s) { | |
| 79 controller.addError(e, s); | |
| 80 return; | |
| 81 } | |
| 82 var groupController = groupControllers[key]; | |
|
nweiz
2016/03/01 02:10:04
putIfAbsent?
Lasse Reichstein Nielsen
2016/03/01 16:51:17
I also need to do controller.add for a new value,
| |
| 83 if (groupController == null) { | |
| 84 groupController = new StreamController<V>(sync: true); | |
| 85 groupControllers[key] = groupController; | |
| 86 controller.add(new Group<K, V>(key, groupController.stream)); | |
| 87 } | |
| 88 V value; | |
| 89 try { | |
| 90 value = _value(data); | |
| 91 } catch (e, s) { | |
| 92 groupController.addError(e, s); | |
| 93 return; | |
| 94 } | |
| 95 groupController.add(value); | |
| 96 }, onError: controller.addError, onDone: () { | |
| 97 controller.close(); | |
| 98 closeAll(); | |
| 99 }); | |
| 100 controller.onPause = subscription.pause; | |
| 101 controller.onResume = subscription.resume; | |
| 102 controller.onCancel = () { | |
| 103 subscription.cancel(); | |
| 104 closeAll(); | |
| 105 }; | |
| 106 }); | |
| 107 return controller.stream; | |
| 108 } | |
|
nweiz
2016/03/01 02:10:04
Nit: putting some newlines in here would help me r
Lasse Reichstein Nielsen
2016/03/01 16:51:17
Acknowledged.
| |
| 109 } | |
| OLD | NEW |