Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(396)

Unified Diff: sdk/lib/async/stream.dart

Issue 2850393003: Add groupBy to Stream. (Closed)
Patch Set: Fix typos and warnings Created 3 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « CHANGELOG.md ('k') | tests/lib/async/stream_group_by_test.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: sdk/lib/async/stream.dart
diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart
index 27c0ff78371de2cbc433f1dccbd837fac9a9f155..9abe75c54d4e52778b8dd89293a762ed4b86fd0a 100644
--- a/sdk/lib/async/stream.dart
+++ b/sdk/lib/async/stream.dart
@@ -386,6 +386,76 @@ abstract class Stream<T> {
return new _MapStream<T, S>(this, convert);
}
+ /// Groups events by a computed key.
+ ///
+ /// A key is extracted from incoming events.
+ /// The first time a key is seen, a stream is created for it, and emitted
+ /// on the returned stream, along with the key, as a [StreamGroup] object.
+ /// Then the event is emitted on the stream ([StreamGroup.values])
+ /// corresponding to the key.
+ ///
+ /// An error on the source stream, or when calling the `key` functions,
+ /// will emit the error on the returned stream.
+ ///
+ /// Canceling the subscription on the returned stream will stop processing
+ /// and close the streams for all groups.
+ ///
+ /// Pausing the subscription on the returned stream will pause processing
+ /// and no further events are added to streams for the individual groups.
+ ///
+ /// Pausing or canceling an individual group stream has no effect other than
+ /// on that stream. Events will be queued while the group stream
+ /// is paused and until it is first listened to.
+ /// If the [StreamGroup.values] stream is never listened to,
+ /// it will enqueue all the events unnecessarily.
+ Stream<StreamGroup<K, T>> groupBy<K>(K key(T event)) {
+ var controller;
+ controller = new StreamController<StreamGroup<K, T>>(
+ sync: true,
+ onListen: () {
+ var groupControllers = new HashMap<K, StreamController<T>>();
+
+ void closeAll() {
+ for (var groupController in groupControllers.values) {
+ groupController.close();
+ }
+ }
+
+ var subscription = this.listen(
+ (data) {
+ K theKey;
+ try {
+ theKey = key(data);
+ } catch (error, stackTrace) {
+ controller.addError(error, stackTrace);
+ return;
+ }
+ var groupController = groupControllers[theKey];
+ if (groupController == null) {
+ groupController =
+ new StreamController<T>.broadcast(sync: true);
+ groupControllers[theKey] = groupController;
+ controller.add(
+ new StreamGroup<K, T>(theKey, groupController.stream));
+ }
+ groupController.add(data);
+ },
+ onError: controller.addError,
+ onDone: () {
+ controller.close();
+ closeAll();
+ });
+ controller.onPause = subscription.pause;
+ controller.onResume = subscription.resume;
+ controller.onCancel = () {
+ subscription.cancel();
+ // Don't fire sync events in response to a callback.
+ scheduleMicrotask(closeAll);
+ };
+ });
+ return controller.stream;
+ }
+
/**
* Creates a new stream with each data event of this stream asynchronously
* mapped to a new event.
@@ -1796,3 +1866,36 @@ class _ControllerEventSinkWrapper<T> implements EventSink<T> {
_sink.close();
}
}
+
+/// A group created by [Stream.groupBy] or [Stream.groupByMapped].
+///
+/// The stream created by `groupBy` emits a `StreamGroup` for each distinct key
+/// it encounters.
+/// This group contains the [key] itself, along with a stream of the [values]
+/// associated with that key.
+class StreamGroup<K, V> {
+ /// The key that identifiers the values emitted by [values].
+ final K key;
+
+ /// The [values] that [GroupBy] have grouped by the common [key].
+ final Stream<V> values;
+
+ factory StreamGroup(K key, Stream<V> values) = StreamGroup<K, V>._;
+
+ // Don't expose a generative constructor.
+ // This class is not intended for subclassing, so we don't want to promise
+ // it. We can change that in the future.
+ StreamGroup._(this.key, this.values);
+
+ /// Tells [values] to discard values instead of retaining them.
+ ///
+ /// Must only be used instead of listening to the [values] stream.
+ /// If the stream has been listened to, this call fails.
+ /// After calling this method, listening on the [values] stream fails.
+ Future cancel() {
+ // If values has been listened to,
+ // this throws a StateError saying that stream has already been listened to,
+ // which is a correct error message for this call too.
+ return values.listen(null).cancel();
+ }
+}
« no previous file with comments | « CHANGELOG.md ('k') | tests/lib/async/stream_group_by_test.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698