Index: sdk/lib/async/stream.dart |
diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart |
index 27c0ff78371de2cbc433f1dccbd837fac9a9f155..70fd7a035f097e0f10174858d793ef2a2f4fe3e7 100644 |
--- a/sdk/lib/async/stream.dart |
+++ b/sdk/lib/async/stream.dart |
@@ -386,6 +386,109 @@ abstract class Stream<T> { |
return new _MapStream<T, S>(this, convert); |
} |
+ /// Groups events by a computed key. |
+ /// |
+ /// A key is extracted from incoming events, and a stream is created for each |
+ /// unique key (based on the `operator==` of the keys). |
+ /// |
+ /// The returned stream emits a [StreamGroup] object for each distinct key |
+ /// seen by the transformation, and the events associated with the key are |
+ /// output [StreamGroup.values] stream. |
floitsch
2017/05/02 12:23:53
That sentence sounds wrong.
Lasse Reichstein Nielsen
2017/05/04 11:05:20
Rewritten.
|
+ /// |
+ /// An error on the source stream, or when calling the `key` functions, |
+ /// will emit the error on the returned stream. |
+ /// |
+ /// Canceling the subscription on the returned stream will stop processing |
+ /// and close the streams for all groups. |
+ /// |
+ /// Pausing the subscription on the returned stream will pause processing |
+ /// and no further events are added to streams for the individual groups. |
+ /// |
+ /// Pausing or canceling an individual group stream has no effect other than |
+ /// on that stream. Events will be queued while the group stream |
+ /// is paused and until it is first listened to. |
floitsch
2017/05/02 12:23:53
Mention that this could lead to memory issues.
Al
Lasse Reichstein Nielsen
2017/05/04 11:05:20
Added something about memory issues.
|
+ Stream<StreamGroup<K, T>> groupBy<K>(K key(T event)) => |
+ groupValuesBy<K, T>(key, (T x) => x); |
+ |
+ /// Groups values selected from events by a computed key. |
+ /// |
+ /// A key is extracted from incoming events, and a stream is created for each |
+ /// unique key (based on the `operator==` of the keys). |
+ /// A value is computed from the event as well, and emitted on the |
+ /// corresponding stream. |
+ /// |
+ /// The returned stream emits a [StreamGroup] object for each distinct key |
+ /// seen by the transformation, and the values associated with the key are |
+ /// output [StreamGroup.values] stream. |
+ /// |
+ /// An error on the source stream, or when calling the `key` functions, |
+ /// will emit the error on the returned stream. |
+ /// |
+ /// An error when calling the `value` function, but with a successful call |
+ /// of the `key` function, is reported on the stream for the corresponding key. |
floitsch
2017/05/02 12:23:53
long line.
Lasse Reichstein Nielsen
2017/05/04 11:05:20
Done.
|
+ /// |
+ /// Canceling the subscription on the returned stream will stop processing |
+ /// and close the streams for all groups. |
+ /// |
+ /// Pausing the subscription on the returned stream will pause processing |
+ /// and no further events are added to streams for the individual groups. |
+ /// |
+ /// Pausing or canceling an individual group stream has no effect other than |
+ /// on that stream. Events will be queued while the group stream |
+ /// is paused and until it is first listened to. |
+ Stream<StreamGroup<K, V>> groupValuesBy<K, V>( |
floitsch
2017/05/02 12:23:53
I think it would be cleaner if `StreamGroup` had a
Lasse Reichstein Nielsen
2017/05/04 11:05:20
That is probably more interesting. But then the ne
|
+ K key(T event), V value(T event)) { |
+ var controller; |
+ controller = new StreamController<Group<K, V>>( |
+ sync: true, |
+ onListen: () { |
+ var groupControllers = new HashMap(); |
+ void closeAll() { |
floitsch
2017/05/02 12:23:53
New line before nested function.
Lasse Reichstein Nielsen
2017/05/04 11:05:20
Done.
|
+ for (var groupController in groupControllers.values) { |
+ groupController.close(); |
+ } |
+ } |
+ |
+ var subscription = this.listen( |
+ (data) { |
floitsch
2017/05/02 12:23:53
This looks like it wasn't dartfmt.
Lasse Reichstein Nielsen
2017/05/04 11:05:20
Looks deceive :)
|
+ K theKey; |
+ try { |
+ theKey = key(data); |
+ } catch (error, stackTrace) { |
+ controller.addError(error, stackTrace); |
+ return; |
+ } |
+ var groupController = groupControllers[theKey]; |
+ if (groupController == null) { |
+ groupController = new StreamController<T>(sync: true); |
+ groupControllers[theKey] = groupController; |
+ controller.add( |
+ new StreamGroup<K, T>(theKey, groupController.stream)); |
+ } |
+ V theValue; |
+ try { |
+ theValue = value(data); |
+ } catch (error, stackTrace) { |
+ groupController.addError(error, stackTrace); |
floitsch
2017/05/02 12:23:53
Since the group controller is sync, is there enoug
Lasse Reichstein Nielsen
2017/05/04 11:05:20
That's expected behavior - adding errors to a paus
|
+ return; |
+ } |
+ groupController.add(theValue); |
+ }, |
+ onError: controller.addError, |
+ onDone: () { |
+ controller.close(); |
+ closeAll(); |
+ }); |
+ controller.onPause = subscription.pause; |
+ controller.onResume = subscription.resume; |
+ controller.onCancel = () { |
+ subscription.cancel(); |
+ closeAll(); |
+ }; |
+ }); |
+ return controller.stream; |
+ } |
+ |
/** |
* Creates a new stream with each data event of this stream asynchronously |
* mapped to a new event. |
@@ -1796,3 +1899,19 @@ class _ControllerEventSinkWrapper<T> implements EventSink<T> { |
_sink.close(); |
} |
} |
+ |
+/// A group created by [Stream.groupBy] or [Stream.groupValuesBy]. |
+/// |
+/// The stream created by `groupBy` emits a `StreamGroup` for each distinct key |
+/// it encounters. |
+/// This group contains the [key] itself, along with a stream of the [values] |
+/// associated with that key. |
+class StreamGroup<K, V> { |
floitsch
2017/05/02 12:23:53
This class could have:
StreamGroup<K, V2> mapValu
Lasse Reichstein Nielsen
2017/05/04 11:05:20
As said above, let's not do that (yet).
|
+ /// The key that identifiers the values emitted by [values]. |
+ final K key; |
+ |
+ /// The [values] that [GroupBy] have grouped by the common [key]. |
+ final Stream<V> values; |
+ |
+ StreamGroup(this.key, this.values); |
+} |