Chromium Code Reviews| Index: lib/src/transformers/group_by.dart |
| diff --git a/lib/src/transformers/group_by.dart b/lib/src/transformers/group_by.dart |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..3eefb07b5c7eccd5faca8b65056582846a508173 |
| --- /dev/null |
| +++ b/lib/src/transformers/group_by.dart |
| @@ -0,0 +1,109 @@ |
| +// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file |
| +// for details. All rights reserved. Use of this source code is governed by a |
| +// BSD-style license that can be found in the LICENSE file. |
| + |
| +import "dart:async"; |
| +import "dart:collection" show HashMap; |
| + |
| +/// A group created by [GroupBy]. |
| +/// |
| +/// The stream created by [GroupBy] emits a [Group] for each distinct key |
| +/// it sees. |
| +/// This group contains the key itself, along with a stream of the values |
| +/// associated with that key. |
| +class Group<K, V> { |
|
nweiz
2016/03/01 02:10:04
What do you think of calling this "GroupByGroup"?
Lasse Reichstein Nielsen
2016/03/01 16:51:17
It's a long name for something fairly inconsequent
nweiz
2016/03/08 23:57:34
I like "ValuesByKey".
|
| + /// The key that identifiers the values emitted by [values]. |
| + final K key; |
| + |
| + /// The [values] that [GroupBy] have grouped by the common [key]. |
| + final Stream<V> values; |
| + |
| + Group(this.key, this.values); |
| +} |
| + |
| +/// Groups events by a computed key. |
| +/// |
| +/// A key is extracted from incoming events, and a stream is created for each |
| +/// unique key. A value is computed from the event as well, and emitted on the |
| +/// corresponding stream. |
| +/// |
| +/// The returned stream emits a [Group] object for each distinct key seen |
| +/// by the transformation, and the values associated with the key are output |
| +/// on the [Group.values] stream. |
| +/// |
| +/// An error on the source stream, or when calling the `key` functions, |
| +/// will emit the error on the returned stream. |
| +/// |
| +/// An error when calling the `value` function, but with a successful call |
| +/// of the `key` function, is reported on the stream for the corresponding key. |
| +/// |
| +/// Canceling the subscription on the returned stream will stop processing |
| +/// and close the streams for all groups. |
|
nweiz
2016/03/01 02:10:04
The cancel and pause behavior feels wrong. Semanti
Lasse Reichstein Nielsen
2016/03/01 16:51:17
Agree.
Maybe a better behavior is to keep truckin
nweiz
2016/03/08 23:57:34
When I say "all derived streams", I'm including th
|
| +/// |
| +/// Pausing the subscription on the returned stream will pause processing |
| +/// and no further events are added to streams for the individual groups. |
| +/// |
| +/// Pausing or canceling the a group stream has no effect other than |
|
nweiz
2016/03/01 02:10:04
"the a" -> "a"
Lasse Reichstein Nielsen
2016/03/01 16:51:17
Acknowledged.
|
| +/// on the individual stream. Events will be queued while the group stream |
| +/// is paused and until it is first listened to. |
| +class GroupBy<S, K, V> implements StreamTransformer<S, Group<K, V>> { |
| + final Function _key; |
| + final Function _value; |
| + |
| + /// Groups values returned by [value] by the key returned by [key]. |
| + /// |
| + /// If [key] or [value] (or both) are omitted, they default to the identity |
| + /// function. In that case, [K] and/or [V] should be types that [S] is |
| + /// assignable to. |
| + GroupBy({K key(S source), V value(S source)}) |
|
nweiz
2016/03/01 02:10:04
It seems very unlikely that anyone would want to o
Lasse Reichstein Nielsen
2016/03/01 16:51:17
If you omit both functions, you will get streams o
nweiz
2016/03/08 23:57:34
That's true, but it's very rare in Dart to have ob
|
| + : _key = key ?? _identity, |
| + _value = value ?? _identity; |
| + |
| + // Helper function. |
| + static _identity(x) => x; |
| + |
| + Stream<Group<K, V>> bind(Stream<S> stream) { |
| + var controller; |
| + controller = new StreamController<Group<K, V>>(sync: true, onListen: () { |
| + var groupControllers = new HashMap(); |
| + void closeAll() { |
| + for (var groupController in groupControllers.values) { |
| + groupController.close(); |
| + } |
| + } |
| + var subscription = stream.listen((data) { |
| + K key; |
| + try { |
| + key = _key(data); |
| + } catch (e, s) { |
| + controller.addError(e, s); |
| + return; |
| + } |
| + var groupController = groupControllers[key]; |
|
nweiz
2016/03/01 02:10:04
putIfAbsent?
Lasse Reichstein Nielsen
2016/03/01 16:51:17
I also need to do controller.add for a new value,
|
| + if (groupController == null) { |
| + groupController = new StreamController<V>(sync: true); |
| + groupControllers[key] = groupController; |
| + controller.add(new Group<K, V>(key, groupController.stream)); |
| + } |
| + V value; |
| + try { |
| + value = _value(data); |
| + } catch (e, s) { |
| + groupController.addError(e, s); |
| + return; |
| + } |
| + groupController.add(value); |
| + }, onError: controller.addError, onDone: () { |
| + controller.close(); |
| + closeAll(); |
| + }); |
| + controller.onPause = subscription.pause; |
| + controller.onResume = subscription.resume; |
| + controller.onCancel = () { |
| + subscription.cancel(); |
| + closeAll(); |
| + }; |
| + }); |
| + return controller.stream; |
| + } |
|
nweiz
2016/03/01 02:10:04
Nit: putting some newlines in here would help me r
Lasse Reichstein Nielsen
2016/03/01 16:51:17
Acknowledged.
|
| +} |