Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(275)

Unified Diff: sdk/lib/async/stream.dart

Issue 2850393003: Add groupBy to Stream. (Closed)
Patch Set: Add test, should add more. Created 3 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « no previous file | tests/lib/async/stream_group_by_test.dart » ('j') | tests/lib/async/stream_group_by_test.dart » ('J')
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: sdk/lib/async/stream.dart
diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart
index 27c0ff78371de2cbc433f1dccbd837fac9a9f155..70fd7a035f097e0f10174858d793ef2a2f4fe3e7 100644
--- a/sdk/lib/async/stream.dart
+++ b/sdk/lib/async/stream.dart
@@ -386,6 +386,109 @@ abstract class Stream<T> {
return new _MapStream<T, S>(this, convert);
}
+ /// Groups events by a computed key.
+ ///
+ /// A key is extracted from incoming events, and a stream is created for each
+ /// unique key (based on the `operator==` of the keys).
+ ///
+ /// The returned stream emits a [StreamGroup] object for each distinct key
+ /// seen by the transformation, and the events associated with the key are
+ /// output [StreamGroup.values] stream.
floitsch 2017/05/02 12:23:53 That sentence sounds wrong.
Lasse Reichstein Nielsen 2017/05/04 11:05:20 Rewritten.
+ ///
+ /// An error on the source stream, or when calling the `key` functions,
+ /// will emit the error on the returned stream.
+ ///
+ /// Canceling the subscription on the returned stream will stop processing
+ /// and close the streams for all groups.
+ ///
+ /// Pausing the subscription on the returned stream will pause processing
+ /// and no further events are added to streams for the individual groups.
+ ///
+ /// Pausing or canceling an individual group stream has no effect other than
+ /// on that stream. Events will be queued while the group stream
+ /// is paused and until it is first listened to.
floitsch 2017/05/02 12:23:53 Mention that this could lead to memory issues. Al
Lasse Reichstein Nielsen 2017/05/04 11:05:20 Added something about memory issues.
+ Stream<StreamGroup<K, T>> groupBy<K>(K key(T event)) =>
+ groupValuesBy<K, T>(key, (T x) => x);
+
+ /// Groups values selected from events by a computed key.
+ ///
+ /// A key is extracted from incoming events, and a stream is created for each
+ /// unique key (based on the `operator==` of the keys).
+ /// A value is computed from the event as well, and emitted on the
+ /// corresponding stream.
+ ///
+ /// The returned stream emits a [StreamGroup] object for each distinct key
+ /// seen by the transformation, and the values associated with the key are
+ /// output [StreamGroup.values] stream.
+ ///
+ /// An error on the source stream, or when calling the `key` functions,
+ /// will emit the error on the returned stream.
+ ///
+ /// An error when calling the `value` function, but with a successful call
+ /// of the `key` function, is reported on the stream for the corresponding key.
floitsch 2017/05/02 12:23:53 long line.
Lasse Reichstein Nielsen 2017/05/04 11:05:20 Done.
+ ///
+ /// Canceling the subscription on the returned stream will stop processing
+ /// and close the streams for all groups.
+ ///
+ /// Pausing the subscription on the returned stream will pause processing
+ /// and no further events are added to streams for the individual groups.
+ ///
+ /// Pausing or canceling an individual group stream has no effect other than
+ /// on that stream. Events will be queued while the group stream
+ /// is paused and until it is first listened to.
+ Stream<StreamGroup<K, V>> groupValuesBy<K, V>(
floitsch 2017/05/02 12:23:53 I think it would be cleaner if `StreamGroup` had a
Lasse Reichstein Nielsen 2017/05/04 11:05:20 That is probably more interesting. But then the ne
+ K key(T event), V value(T event)) {
+ var controller;
+ controller = new StreamController<Group<K, V>>(
+ sync: true,
+ onListen: () {
+ var groupControllers = new HashMap();
+ void closeAll() {
floitsch 2017/05/02 12:23:53 New line before nested function.
Lasse Reichstein Nielsen 2017/05/04 11:05:20 Done.
+ for (var groupController in groupControllers.values) {
+ groupController.close();
+ }
+ }
+
+ var subscription = this.listen(
+ (data) {
floitsch 2017/05/02 12:23:53 This looks like it wasn't dartfmt.
Lasse Reichstein Nielsen 2017/05/04 11:05:20 Looks deceive :)
+ K theKey;
+ try {
+ theKey = key(data);
+ } catch (error, stackTrace) {
+ controller.addError(error, stackTrace);
+ return;
+ }
+ var groupController = groupControllers[theKey];
+ if (groupController == null) {
+ groupController = new StreamController<T>(sync: true);
+ groupControllers[theKey] = groupController;
+ controller.add(
+ new StreamGroup<K, T>(theKey, groupController.stream));
+ }
+ V theValue;
+ try {
+ theValue = value(data);
+ } catch (error, stackTrace) {
+ groupController.addError(error, stackTrace);
floitsch 2017/05/02 12:23:53 Since the group controller is sync, is there enoug
Lasse Reichstein Nielsen 2017/05/04 11:05:20 That's expected behavior - adding errors to a paus
+ return;
+ }
+ groupController.add(theValue);
+ },
+ onError: controller.addError,
+ onDone: () {
+ controller.close();
+ closeAll();
+ });
+ controller.onPause = subscription.pause;
+ controller.onResume = subscription.resume;
+ controller.onCancel = () {
+ subscription.cancel();
+ closeAll();
+ };
+ });
+ return controller.stream;
+ }
+
/**
* Creates a new stream with each data event of this stream asynchronously
* mapped to a new event.
@@ -1796,3 +1899,19 @@ class _ControllerEventSinkWrapper<T> implements EventSink<T> {
_sink.close();
}
}
+
+/// A group created by [Stream.groupBy] or [Stream.groupValuesBy].
+///
+/// The stream created by `groupBy` emits a `StreamGroup` for each distinct key
+/// it encounters.
+/// This group contains the [key] itself, along with a stream of the [values]
+/// associated with that key.
+class StreamGroup<K, V> {
floitsch 2017/05/02 12:23:53 This class could have: StreamGroup<K, V2> mapValu
Lasse Reichstein Nielsen 2017/05/04 11:05:20 As said above, let's not do that (yet).
+ /// The key that identifiers the values emitted by [values].
+ final K key;
+
+ /// The [values] that [GroupBy] have grouped by the common [key].
+ final Stream<V> values;
+
+ StreamGroup(this.key, this.values);
+}
« no previous file with comments | « no previous file | tests/lib/async/stream_group_by_test.dart » ('j') | tests/lib/async/stream_group_by_test.dart » ('J')

Powered by Google App Engine
This is Rietveld 408576698