Chromium Code Reviews| Index: sdk/lib/async/stream.dart |
| diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart |
| index 27c0ff78371de2cbc433f1dccbd837fac9a9f155..70fd7a035f097e0f10174858d793ef2a2f4fe3e7 100644 |
| --- a/sdk/lib/async/stream.dart |
| +++ b/sdk/lib/async/stream.dart |
| @@ -386,6 +386,109 @@ abstract class Stream<T> { |
| return new _MapStream<T, S>(this, convert); |
| } |
| + /// Groups events by a computed key. |
| + /// |
| + /// A key is extracted from incoming events, and a stream is created for each |
| + /// unique key (based on the `operator==` of the keys). |
| + /// |
| + /// The returned stream emits a [StreamGroup] object for each distinct key |
| + /// seen by the transformation, and the events associated with the key are |
| + /// output [StreamGroup.values] stream. |
|
floitsch
2017/05/02 12:23:53
That sentence sounds wrong.
Lasse Reichstein Nielsen
2017/05/04 11:05:20
Rewritten.
|
| + /// |
| + /// An error on the source stream, or when calling the `key` functions, |
| + /// will emit the error on the returned stream. |
| + /// |
| + /// Canceling the subscription on the returned stream will stop processing |
| + /// and close the streams for all groups. |
| + /// |
| + /// Pausing the subscription on the returned stream will pause processing |
| + /// and no further events are added to streams for the individual groups. |
| + /// |
| + /// Pausing or canceling an individual group stream has no effect other than |
| + /// on that stream. Events will be queued while the group stream |
| + /// is paused and until it is first listened to. |
|
floitsch
2017/05/02 12:23:53
Mention that this could lead to memory issues.
Al
Lasse Reichstein Nielsen
2017/05/04 11:05:20
Added something about memory issues.
|
| + Stream<StreamGroup<K, T>> groupBy<K>(K key(T event)) => |
| + groupValuesBy<K, T>(key, (T x) => x); |
| + |
| + /// Groups values selected from events by a computed key. |
| + /// |
| + /// A key is extracted from incoming events, and a stream is created for each |
| + /// unique key (based on the `operator==` of the keys). |
| + /// A value is computed from the event as well, and emitted on the |
| + /// corresponding stream. |
| + /// |
| + /// The returned stream emits a [StreamGroup] object for each distinct key |
| + /// seen by the transformation, and the values associated with the key are |
| + /// output [StreamGroup.values] stream. |
| + /// |
| + /// An error on the source stream, or when calling the `key` functions, |
| + /// will emit the error on the returned stream. |
| + /// |
| + /// An error when calling the `value` function, but with a successful call |
| + /// of the `key` function, is reported on the stream for the corresponding key. |
|
floitsch
2017/05/02 12:23:53
long line.
Lasse Reichstein Nielsen
2017/05/04 11:05:20
Done.
|
| + /// |
| + /// Canceling the subscription on the returned stream will stop processing |
| + /// and close the streams for all groups. |
| + /// |
| + /// Pausing the subscription on the returned stream will pause processing |
| + /// and no further events are added to streams for the individual groups. |
| + /// |
| + /// Pausing or canceling an individual group stream has no effect other than |
| + /// on that stream. Events will be queued while the group stream |
| + /// is paused and until it is first listened to. |
| + Stream<StreamGroup<K, V>> groupValuesBy<K, V>( |
|
floitsch
2017/05/02 12:23:53
I think it would be cleaner if `StreamGroup` had a
Lasse Reichstein Nielsen
2017/05/04 11:05:20
That is probably more interesting. But then the ne
|
| + K key(T event), V value(T event)) { |
| + var controller; |
| + controller = new StreamController<Group<K, V>>( |
| + sync: true, |
| + onListen: () { |
| + var groupControllers = new HashMap(); |
| + void closeAll() { |
|
floitsch
2017/05/02 12:23:53
New line before nested function.
Lasse Reichstein Nielsen
2017/05/04 11:05:20
Done.
|
| + for (var groupController in groupControllers.values) { |
| + groupController.close(); |
| + } |
| + } |
| + |
| + var subscription = this.listen( |
| + (data) { |
|
floitsch
2017/05/02 12:23:53
This looks like it wasn't dartfmt.
Lasse Reichstein Nielsen
2017/05/04 11:05:20
Looks deceive :)
|
| + K theKey; |
| + try { |
| + theKey = key(data); |
| + } catch (error, stackTrace) { |
| + controller.addError(error, stackTrace); |
| + return; |
| + } |
| + var groupController = groupControllers[theKey]; |
| + if (groupController == null) { |
| + groupController = new StreamController<T>(sync: true); |
| + groupControllers[theKey] = groupController; |
| + controller.add( |
| + new StreamGroup<K, T>(theKey, groupController.stream)); |
| + } |
| + V theValue; |
| + try { |
| + theValue = value(data); |
| + } catch (error, stackTrace) { |
| + groupController.addError(error, stackTrace); |
|
floitsch
2017/05/02 12:23:53
Since the group controller is sync, is there enoug
Lasse Reichstein Nielsen
2017/05/04 11:05:20
That's expected behavior - adding errors to a paus
|
| + return; |
| + } |
| + groupController.add(theValue); |
| + }, |
| + onError: controller.addError, |
| + onDone: () { |
| + controller.close(); |
| + closeAll(); |
| + }); |
| + controller.onPause = subscription.pause; |
| + controller.onResume = subscription.resume; |
| + controller.onCancel = () { |
| + subscription.cancel(); |
| + closeAll(); |
| + }; |
| + }); |
| + return controller.stream; |
| + } |
| + |
| /** |
| * Creates a new stream with each data event of this stream asynchronously |
| * mapped to a new event. |
| @@ -1796,3 +1899,19 @@ class _ControllerEventSinkWrapper<T> implements EventSink<T> { |
| _sink.close(); |
| } |
| } |
| + |
| +/// A group created by [Stream.groupBy] or [Stream.groupValuesBy]. |
| +/// |
| +/// The stream created by `groupBy` emits a `StreamGroup` for each distinct key |
| +/// it encounters. |
| +/// This group contains the [key] itself, along with a stream of the [values] |
| +/// associated with that key. |
| +class StreamGroup<K, V> { |
|
floitsch
2017/05/02 12:23:53
This class could have:
StreamGroup<K, V2> mapValu
Lasse Reichstein Nielsen
2017/05/04 11:05:20
As said above, let's not do that (yet).
|
| + /// The key that identifiers the values emitted by [values]. |
| + final K key; |
| + |
| + /// The [values] that [GroupBy] have grouped by the common [key]. |
| + final Stream<V> values; |
| + |
| + StreamGroup(this.key, this.values); |
| +} |