Index: sdk/lib/async/stream.dart |
diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart |
index bf7840912d8846ff2cf2b2a7ad3d12d7b3bef68f..5de26b579e0abb372c8f763f59744ee4f783daab 100644 |
--- a/sdk/lib/async/stream.dart |
+++ b/sdk/lib/async/stream.dart |
@@ -395,76 +395,6 @@ abstract class Stream<T> { |
return new _MapStream<T, S>(this, convert); |
} |
- /// Groups events by a computed key. |
- /// |
- /// A key is extracted from incoming events. |
- /// The first time a key is seen, a stream is created for it, and emitted |
- /// on the returned stream, along with the key, as a [GroupedEvents] object. |
- /// Then the event is emitted on the stream ([GroupedEvents.values]) |
- /// corresponding to the key. |
- /// |
- /// An error on the source stream, or when calling the `key` functions, |
- /// will emit the error on the returned stream. |
- /// |
- /// Canceling the subscription on the returned stream will stop processing |
- /// and close the streams for all groups. |
- /// |
- /// Pausing the subscription on the returned stream will pause processing |
- /// and no further events are added to streams for the individual groups. |
- /// |
- /// Pausing or canceling an individual group stream has no effect other than |
- /// on that stream. Events will be queued while the group stream |
- /// is paused and until it is first listened to. |
- /// If the [GroupedEvents.values] stream is never listened to, |
- /// it will enqueue all the events unnecessarily. |
- Stream<GroupedEvents<K, T>> groupBy<K>(K key(T event)) { |
- var controller; |
- controller = new StreamController<GroupedEvents<K, T>>( |
- sync: true, |
- onListen: () { |
- var groupControllers = new HashMap<K, StreamController<T>>(); |
- |
- void closeAll() { |
- for (var groupController in groupControllers.values) { |
- groupController.close(); |
- } |
- } |
- |
- var subscription = this.listen( |
- (data) { |
- K theKey; |
- try { |
- theKey = key(data); |
- } catch (error, stackTrace) { |
- controller.addError(error, stackTrace); |
- return; |
- } |
- var groupController = groupControllers[theKey]; |
- if (groupController == null) { |
- groupController = |
- new StreamController<T>.broadcast(sync: true); |
- groupControllers[theKey] = groupController; |
- controller.add( |
- new GroupedEvents<K, T>(theKey, groupController.stream)); |
- } |
- groupController.add(data); |
- }, |
- onError: controller.addError, |
- onDone: () { |
- controller.close(); |
- closeAll(); |
- }); |
- controller.onPause = subscription.pause; |
- controller.onResume = subscription.resume; |
- controller.onCancel = () { |
- subscription.cancel(); |
- // Don't fire sync events in response to a callback. |
- scheduleMicrotask(closeAll); |
- }; |
- }); |
- return controller.stream; |
- } |
- |
/** |
* Creates a new stream with each data event of this stream asynchronously |
* mapped to a new event. |
@@ -2067,36 +1997,3 @@ class _ControllerEventSinkWrapper<T> implements EventSink<T> { |
_sink.close(); |
} |
} |
- |
-/// A group created by [Stream.groupBy]. |
-/// |
-/// The stream created by `groupBy` emits a `GroupedEvents` |
-/// for each distinct key it encounters. |
-/// This group contains the [key] itself, along with a stream of the [values] |
-/// associated with that key. |
-class GroupedEvents<K, V> { |
- /// The key that identifiers the values emitted by [values]. |
- final K key; |
- |
- /// The [values] that [GroupBy] have grouped by the common [key]. |
- final Stream<V> values; |
- |
- factory GroupedEvents(K key, Stream<V> values) = GroupedEvents<K, V>._; |
- |
- // Don't expose a generative constructor. |
- // This class is not intended for subclassing, so we don't want to promise |
- // it. We can change that in the future. |
- GroupedEvents._(this.key, this.values); |
- |
- /// Tells [values] to discard values instead of retaining them. |
- /// |
- /// Must only be used instead of listening to the [values] stream. |
- /// If the stream has been listened to, this call fails. |
- /// After calling this method, listening on the [values] stream fails. |
- Future cancel() { |
- // If values has been listened to, |
- // this throws a StateError saying that stream has already been listened to, |
- // which is a correct error message for this call too. |
- return values.listen(null).cancel(); |
- } |
-} |