| Index: sdk/lib/async/stream.dart
|
| diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart
|
| index 27c0ff78371de2cbc433f1dccbd837fac9a9f155..9abe75c54d4e52778b8dd89293a762ed4b86fd0a 100644
|
| --- a/sdk/lib/async/stream.dart
|
| +++ b/sdk/lib/async/stream.dart
|
| @@ -386,6 +386,76 @@ abstract class Stream<T> {
|
| return new _MapStream<T, S>(this, convert);
|
| }
|
|
|
| + /// Groups events by a computed key.
|
| + ///
|
| + /// A key is extracted from incoming events.
|
| + /// The first time a key is seen, a stream is created for it, and emitted
|
| + /// on the returned stream, along with the key, as a [StreamGroup] object.
|
| + /// Then the event is emitted on the stream ([StreamGroup.values])
|
| + /// corresponding to the key.
|
| + ///
|
| + /// An error on the source stream, or when calling the `key` functions,
|
| + /// will emit the error on the returned stream.
|
| + ///
|
| + /// Canceling the subscription on the returned stream will stop processing
|
| + /// and close the streams for all groups.
|
| + ///
|
| + /// Pausing the subscription on the returned stream will pause processing
|
| + /// and no further events are added to streams for the individual groups.
|
| + ///
|
| + /// Pausing or canceling an individual group stream has no effect other than
|
| + /// on that stream. Events will be queued while the group stream
|
| + /// is paused and until it is first listened to.
|
| + /// If the [StreamGroup.values] stream is never listened to,
|
| + /// it will enqueue all the events unnecessarily.
|
| + Stream<StreamGroup<K, T>> groupBy<K>(K key(T event)) {
|
| + var controller;
|
| + controller = new StreamController<StreamGroup<K, T>>(
|
| + sync: true,
|
| + onListen: () {
|
| + var groupControllers = new HashMap<K, StreamController<T>>();
|
| +
|
| + void closeAll() {
|
| + for (var groupController in groupControllers.values) {
|
| + groupController.close();
|
| + }
|
| + }
|
| +
|
| + var subscription = this.listen(
|
| + (data) {
|
| + K theKey;
|
| + try {
|
| + theKey = key(data);
|
| + } catch (error, stackTrace) {
|
| + controller.addError(error, stackTrace);
|
| + return;
|
| + }
|
| + var groupController = groupControllers[theKey];
|
| + if (groupController == null) {
|
| + groupController =
|
| + new StreamController<T>.broadcast(sync: true);
|
| + groupControllers[theKey] = groupController;
|
| + controller.add(
|
| + new StreamGroup<K, T>(theKey, groupController.stream));
|
| + }
|
| + groupController.add(data);
|
| + },
|
| + onError: controller.addError,
|
| + onDone: () {
|
| + controller.close();
|
| + closeAll();
|
| + });
|
| + controller.onPause = subscription.pause;
|
| + controller.onResume = subscription.resume;
|
| + controller.onCancel = () {
|
| + subscription.cancel();
|
| + // Don't fire sync events in response to a callback.
|
| + scheduleMicrotask(closeAll);
|
| + };
|
| + });
|
| + return controller.stream;
|
| + }
|
| +
|
| /**
|
| * Creates a new stream with each data event of this stream asynchronously
|
| * mapped to a new event.
|
| @@ -1796,3 +1866,36 @@ class _ControllerEventSinkWrapper<T> implements EventSink<T> {
|
| _sink.close();
|
| }
|
| }
|
| +
|
| +/// A group created by [Stream.groupBy] or [Stream.groupByMapped].
|
| +///
|
| +/// The stream created by `groupBy` emits a `StreamGroup` for each distinct key
|
| +/// it encounters.
|
| +/// This group contains the [key] itself, along with a stream of the [values]
|
| +/// associated with that key.
|
| +class StreamGroup<K, V> {
|
| + /// The key that identifiers the values emitted by [values].
|
| + final K key;
|
| +
|
| + /// The [values] that [GroupBy] have grouped by the common [key].
|
| + final Stream<V> values;
|
| +
|
| + factory StreamGroup(K key, Stream<V> values) = StreamGroup<K, V>._;
|
| +
|
| + // Don't expose a generative constructor.
|
| + // This class is not intended for subclassing, so we don't want to promise
|
| + // it. We can change that in the future.
|
| + StreamGroup._(this.key, this.values);
|
| +
|
| + /// Tells [values] to discard values instead of retaining them.
|
| + ///
|
| + /// Must only be used instead of listening to the [values] stream.
|
| + /// If the stream has been listened to, this call fails.
|
| + /// After calling this method, listening on the [values] stream fails.
|
| + Future cancel() {
|
| + // If values has been listened to,
|
| + // this throws a StateError saying that stream has already been listened to,
|
| + // which is a correct error message for this call too.
|
| + return values.listen(null).cancel();
|
| + }
|
| +}
|
|
|