Index: sdk/lib/async/stream.dart |
diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart |
index 27c0ff78371de2cbc433f1dccbd837fac9a9f155..9abe75c54d4e52778b8dd89293a762ed4b86fd0a 100644 |
--- a/sdk/lib/async/stream.dart |
+++ b/sdk/lib/async/stream.dart |
@@ -386,6 +386,76 @@ abstract class Stream<T> { |
return new _MapStream<T, S>(this, convert); |
} |
+ /// Groups events by a computed key. |
+ /// |
+ /// A key is extracted from incoming events. |
+ /// The first time a key is seen, a stream is created for it, and emitted |
+ /// on the returned stream, along with the key, as a [StreamGroup] object. |
+ /// Then the event is emitted on the stream ([StreamGroup.values]) |
+ /// corresponding to the key. |
+ /// |
+ /// An error on the source stream, or when calling the `key` functions, |
+ /// will emit the error on the returned stream. |
+ /// |
+ /// Canceling the subscription on the returned stream will stop processing |
+ /// and close the streams for all groups. |
+ /// |
+ /// Pausing the subscription on the returned stream will pause processing |
+ /// and no further events are added to streams for the individual groups. |
+ /// |
+ /// Pausing or canceling an individual group stream has no effect other than |
+ /// on that stream. Events will be queued while the group stream |
+ /// is paused and until it is first listened to. |
+ /// If the [StreamGroup.values] stream is never listened to, |
+ /// it will enqueue all the events unnecessarily. |
+ Stream<StreamGroup<K, T>> groupBy<K>(K key(T event)) { |
+ var controller; |
+ controller = new StreamController<StreamGroup<K, T>>( |
+ sync: true, |
+ onListen: () { |
+ var groupControllers = new HashMap<K, StreamController<T>>(); |
+ |
+ void closeAll() { |
+ for (var groupController in groupControllers.values) { |
+ groupController.close(); |
+ } |
+ } |
+ |
+ var subscription = this.listen( |
+ (data) { |
+ K theKey; |
+ try { |
+ theKey = key(data); |
+ } catch (error, stackTrace) { |
+ controller.addError(error, stackTrace); |
+ return; |
+ } |
+ var groupController = groupControllers[theKey]; |
+ if (groupController == null) { |
+ groupController = |
+ new StreamController<T>.broadcast(sync: true); |
+ groupControllers[theKey] = groupController; |
+ controller.add( |
+ new StreamGroup<K, T>(theKey, groupController.stream)); |
+ } |
+ groupController.add(data); |
+ }, |
+ onError: controller.addError, |
+ onDone: () { |
+ controller.close(); |
+ closeAll(); |
+ }); |
+ controller.onPause = subscription.pause; |
+ controller.onResume = subscription.resume; |
+ controller.onCancel = () { |
+ subscription.cancel(); |
+ // Don't fire sync events in response to a callback. |
+ scheduleMicrotask(closeAll); |
+ }; |
+ }); |
+ return controller.stream; |
+ } |
+ |
/** |
* Creates a new stream with each data event of this stream asynchronously |
* mapped to a new event. |
@@ -1796,3 +1866,36 @@ class _ControllerEventSinkWrapper<T> implements EventSink<T> { |
_sink.close(); |
} |
} |
+ |
+/// A group created by [Stream.groupBy] or [Stream.groupByMapped]. |
+/// |
+/// The stream created by `groupBy` emits a `StreamGroup` for each distinct key |
+/// it encounters. |
+/// This group contains the [key] itself, along with a stream of the [values] |
+/// associated with that key. |
+class StreamGroup<K, V> { |
+ /// The key that identifiers the values emitted by [values]. |
+ final K key; |
+ |
+ /// The [values] that [GroupBy] have grouped by the common [key]. |
+ final Stream<V> values; |
+ |
+ factory StreamGroup(K key, Stream<V> values) = StreamGroup<K, V>._; |
+ |
+ // Don't expose a generative constructor. |
+ // This class is not intended for subclassing, so we don't want to promise |
+ // it. We can change that in the future. |
+ StreamGroup._(this.key, this.values); |
+ |
+ /// Tells [values] to discard values instead of retaining them. |
+ /// |
+ /// Must only be used instead of listening to the [values] stream. |
+ /// If the stream has been listened to, this call fails. |
+ /// After calling this method, listening on the [values] stream fails. |
+ Future cancel() { |
+ // If values has been listened to, |
+ // this throws a StateError saying that stream has already been listened to, |
+ // which is a correct error message for this call too. |
+ return values.listen(null).cancel(); |
+ } |
+} |