Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(526)

Unified Diff: lib/src/transformers/group_by.dart

Issue 1648963002: Add reactive-inspired stream transformers: Base URL: https://github.com/dart-lang/async@master
Patch Set: Restructure failes and add more tests. Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: lib/src/transformers/group_by.dart
diff --git a/lib/src/transformers/group_by.dart b/lib/src/transformers/group_by.dart
new file mode 100644
index 0000000000000000000000000000000000000000..3eefb07b5c7eccd5faca8b65056582846a508173
--- /dev/null
+++ b/lib/src/transformers/group_by.dart
@@ -0,0 +1,109 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import "dart:async";
+import "dart:collection" show HashMap;
+
+/// A group created by [GroupBy].
+///
+/// The stream created by [GroupBy] emits a [Group] for each distinct key
+/// it sees.
+/// This group contains the key itself, along with a stream of the values
+/// associated with that key.
+class Group<K, V> {
nweiz 2016/03/01 02:10:04 What do you think of calling this "GroupByGroup"?
Lasse Reichstein Nielsen 2016/03/01 16:51:17 It's a long name for something fairly inconsequent
nweiz 2016/03/08 23:57:34 I like "ValuesByKey".
+ /// The key that identifiers the values emitted by [values].
+ final K key;
+
+ /// The [values] that [GroupBy] have grouped by the common [key].
+ final Stream<V> values;
+
+ Group(this.key, this.values);
+}
+
+/// Groups events by a computed key.
+///
+/// A key is extracted from incoming events, and a stream is created for each
+/// unique key. A value is computed from the event as well, and emitted on the
+/// corresponding stream.
+///
+/// The returned stream emits a [Group] object for each distinct key seen
+/// by the transformation, and the values associated with the key are output
+/// on the [Group.values] stream.
+///
+/// An error on the source stream, or when calling the `key` functions,
+/// will emit the error on the returned stream.
+///
+/// An error when calling the `value` function, but with a successful call
+/// of the `key` function, is reported on the stream for the corresponding key.
+///
+/// Canceling the subscription on the returned stream will stop processing
+/// and close the streams for all groups.
nweiz 2016/03/01 02:10:04 The cancel and pause behavior feels wrong. Semanti
Lasse Reichstein Nielsen 2016/03/01 16:51:17 Agree. Maybe a better behavior is to keep truckin
nweiz 2016/03/08 23:57:34 When I say "all derived streams", I'm including th
+///
+/// Pausing the subscription on the returned stream will pause processing
+/// and no further events are added to streams for the individual groups.
+///
+/// Pausing or canceling the a group stream has no effect other than
nweiz 2016/03/01 02:10:04 "the a" -> "a"
Lasse Reichstein Nielsen 2016/03/01 16:51:17 Acknowledged.
+/// on the individual stream. Events will be queued while the group stream
+/// is paused and until it is first listened to.
+class GroupBy<S, K, V> implements StreamTransformer<S, Group<K, V>> {
+ final Function _key;
+ final Function _value;
+
+ /// Groups values returned by [value] by the key returned by [key].
+ ///
+ /// If [key] or [value] (or both) are omitted, they default to the identity
+ /// function. In that case, [K] and/or [V] should be types that [S] is
+ /// assignable to.
+ GroupBy({K key(S source), V value(S source)})
nweiz 2016/03/01 02:10:04 It seems very unlikely that anyone would want to o
Lasse Reichstein Nielsen 2016/03/01 16:51:17 If you omit both functions, you will get streams o
nweiz 2016/03/08 23:57:34 That's true, but it's very rare in Dart to have ob
+ : _key = key ?? _identity,
+ _value = value ?? _identity;
+
+ // Helper function.
+ static _identity(x) => x;
+
+ Stream<Group<K, V>> bind(Stream<S> stream) {
+ var controller;
+ controller = new StreamController<Group<K, V>>(sync: true, onListen: () {
+ var groupControllers = new HashMap();
+ void closeAll() {
+ for (var groupController in groupControllers.values) {
+ groupController.close();
+ }
+ }
+ var subscription = stream.listen((data) {
+ K key;
+ try {
+ key = _key(data);
+ } catch (e, s) {
+ controller.addError(e, s);
+ return;
+ }
+ var groupController = groupControllers[key];
nweiz 2016/03/01 02:10:04 putIfAbsent?
Lasse Reichstein Nielsen 2016/03/01 16:51:17 I also need to do controller.add for a new value,
+ if (groupController == null) {
+ groupController = new StreamController<V>(sync: true);
+ groupControllers[key] = groupController;
+ controller.add(new Group<K, V>(key, groupController.stream));
+ }
+ V value;
+ try {
+ value = _value(data);
+ } catch (e, s) {
+ groupController.addError(e, s);
+ return;
+ }
+ groupController.add(value);
+ }, onError: controller.addError, onDone: () {
+ controller.close();
+ closeAll();
+ });
+ controller.onPause = subscription.pause;
+ controller.onResume = subscription.resume;
+ controller.onCancel = () {
+ subscription.cancel();
+ closeAll();
+ };
+ });
+ return controller.stream;
+ }
nweiz 2016/03/01 02:10:04 Nit: putting some newlines in here would help me r
Lasse Reichstein Nielsen 2016/03/01 16:51:17 Acknowledged.
+}

Powered by Google App Engine
This is Rietveld 408576698