Chromium Code Reviews| Index: sdk/lib/async/stream.dart |
| diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart |
| index 27c0ff78371de2cbc433f1dccbd837fac9a9f155..a43b9c38aaeaa80b9f62a2aa8a917e6817c3b87d 100644 |
| --- a/sdk/lib/async/stream.dart |
| +++ b/sdk/lib/async/stream.dart |
| @@ -386,6 +386,76 @@ abstract class Stream<T> { |
| return new _MapStream<T, S>(this, convert); |
| } |
| + /// Groups events by a computed key. |
| + /// |
| + /// A key is extracted from incoming events. |
| + /// The first time a key is seen, a stream is created for it, and emitted |
| + /// on the returned stream, along with the key, as a [StreamGroup] object. |
| + /// Then the event is emitted on the stream ([StreamGroup.values]) |
| + /// corresponding to the key. |
| + /// |
| + /// An error on the source stream, or when calling the `key` functions, |
| + /// will emit the error on the returned stream. |
| + /// |
| + /// Canceling the subscription on the returned stream will stop processing |
| + /// and close the streams for all groups. |
| + /// |
| + /// Pausing the subscription on the returned stream will pause processing |
| + /// and no further events are added to streams for the individual groups. |
| + /// |
| + /// Pausing or canceling an individual group stream has no effect other than |
| + /// on that stream. Events will be queued while the group stream |
| + /// is paused and until it is first listened to. |
| + /// If the [StreamGroup.values] stream is never listened to, |
| + /// it will enqueue all the events unnecessarily. |
|
floitsch
2017/05/04 13:32:09
One should therefore not ignore streams of keys th
|
| + Stream<StreamGroup<K, T>> groupBy<K>(K key(T event)) { |
| + var controller; |
| + controller = new StreamController<Group<K, V>>( |
| + sync: true, |
| + onListen: () { |
| + var groupControllers = new HashMap<K, StreamController<T>>(); |
| + |
| + void closeAll() { |
| + for (var groupController in groupControllers.values) { |
| + groupController.close(); |
| + } |
| + } |
| + |
| + var subscription = this.listen( |
| + (data) { |
| + K theKey; |
| + try { |
| + theKey = key(data); |
| + } catch (error, stackTrace) { |
| + controller.addError(error, stackTrace); |
| + return; |
| + } |
| + var groupController = groupControllers[theKey]; |
| + if (groupController == null) { |
| + groupController = |
| + new StreamController<T>.broadcast(sync: true); |
| + groupControllers[theKey] = groupController; |
| + controller.add( |
| + new StreamGroup<K, T>(theKey, groupController.stream)); |
| + } |
| + groupController.add(data); |
| + }, |
| + onError: controller.addError, |
| + onDone: () { |
| + controller.close(); |
| + closeAll(); |
| + }); |
| + controller.onPause = subscription.pause; |
| + controller.onResume = subscription.resume; |
| + controller.onCancel = () { |
| + subscription.cancel(); |
| + // Don't fire sync events in response to a callback. |
| + scheduleMicrotask(closeAll); |
| + }; |
| + }); |
| + return controller.stream; |
| + } |
| + |
| /** |
| * Creates a new stream with each data event of this stream asynchronously |
| * mapped to a new event. |
| @@ -1796,3 +1866,31 @@ class _ControllerEventSinkWrapper<T> implements EventSink<T> { |
| _sink.close(); |
| } |
| } |
| + |
| +/// A group created by [Stream.groupBy] or [Stream.groupByMapped]. |
| +/// |
| +/// The stream created by `groupBy` emits a `StreamGroup` for each distinct key |
| +/// it encounters. |
| +/// This group contains the [key] itself, along with a stream of the [values] |
| +/// associated with that key. |
| +class StreamGroup<K, V> { |
| + /// The key that identifiers the values emitted by [values]. |
| + final K key; |
| + |
| + /// The [values] that [GroupBy] have grouped by the common [key]. |
| + final Stream<V> values; |
| + |
| + StreamGroup(this.key, this.values); |
| + |
| + /// Tells [values] to discard values instead of retaining them. |
| + /// |
| + /// Must only be used instead of listening to the [values] stream. |
| + /// If the stream has been listened to, this call fails. |
| + /// After calling this method, listening on the [values] stream fails. |
| + Future cancel() { |
| + // If values has been listened to, |
| + // this throws a StateError saying that stream has already been listened to, |
| + // which is a correct error message for this call too. |
| + return values.listen(null).cancel(); |
| + } |
| +} |