Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(107)

Unified Diff: sdk/lib/async/stream.dart

Issue 2921663002: Revert "Add groupBy to Stream." (Closed)
Patch Set: Created 3 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « CHANGELOG.md ('k') | tests/lib/async/stream_group_by_test.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: sdk/lib/async/stream.dart
diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart
index bf7840912d8846ff2cf2b2a7ad3d12d7b3bef68f..5de26b579e0abb372c8f763f59744ee4f783daab 100644
--- a/sdk/lib/async/stream.dart
+++ b/sdk/lib/async/stream.dart
@@ -395,76 +395,6 @@ abstract class Stream<T> {
return new _MapStream<T, S>(this, convert);
}
- /// Groups events by a computed key.
- ///
- /// A key is extracted from incoming events.
- /// The first time a key is seen, a stream is created for it, and emitted
- /// on the returned stream, along with the key, as a [GroupedEvents] object.
- /// Then the event is emitted on the stream ([GroupedEvents.values])
- /// corresponding to the key.
- ///
- /// An error on the source stream, or when calling the `key` functions,
- /// will emit the error on the returned stream.
- ///
- /// Canceling the subscription on the returned stream will stop processing
- /// and close the streams for all groups.
- ///
- /// Pausing the subscription on the returned stream will pause processing
- /// and no further events are added to streams for the individual groups.
- ///
- /// Pausing or canceling an individual group stream has no effect other than
- /// on that stream. Events will be queued while the group stream
- /// is paused and until it is first listened to.
- /// If the [GroupedEvents.values] stream is never listened to,
- /// it will enqueue all the events unnecessarily.
- Stream<GroupedEvents<K, T>> groupBy<K>(K key(T event)) {
- var controller;
- controller = new StreamController<GroupedEvents<K, T>>(
- sync: true,
- onListen: () {
- var groupControllers = new HashMap<K, StreamController<T>>();
-
- void closeAll() {
- for (var groupController in groupControllers.values) {
- groupController.close();
- }
- }
-
- var subscription = this.listen(
- (data) {
- K theKey;
- try {
- theKey = key(data);
- } catch (error, stackTrace) {
- controller.addError(error, stackTrace);
- return;
- }
- var groupController = groupControllers[theKey];
- if (groupController == null) {
- groupController =
- new StreamController<T>.broadcast(sync: true);
- groupControllers[theKey] = groupController;
- controller.add(
- new GroupedEvents<K, T>(theKey, groupController.stream));
- }
- groupController.add(data);
- },
- onError: controller.addError,
- onDone: () {
- controller.close();
- closeAll();
- });
- controller.onPause = subscription.pause;
- controller.onResume = subscription.resume;
- controller.onCancel = () {
- subscription.cancel();
- // Don't fire sync events in response to a callback.
- scheduleMicrotask(closeAll);
- };
- });
- return controller.stream;
- }
-
/**
* Creates a new stream with each data event of this stream asynchronously
* mapped to a new event.
@@ -2067,36 +1997,3 @@ class _ControllerEventSinkWrapper<T> implements EventSink<T> {
_sink.close();
}
}
-
-/// A group created by [Stream.groupBy].
-///
-/// The stream created by `groupBy` emits a `GroupedEvents`
-/// for each distinct key it encounters.
-/// This group contains the [key] itself, along with a stream of the [values]
-/// associated with that key.
-class GroupedEvents<K, V> {
- /// The key that identifiers the values emitted by [values].
- final K key;
-
- /// The [values] that [GroupBy] have grouped by the common [key].
- final Stream<V> values;
-
- factory GroupedEvents(K key, Stream<V> values) = GroupedEvents<K, V>._;
-
- // Don't expose a generative constructor.
- // This class is not intended for subclassing, so we don't want to promise
- // it. We can change that in the future.
- GroupedEvents._(this.key, this.values);
-
- /// Tells [values] to discard values instead of retaining them.
- ///
- /// Must only be used instead of listening to the [values] stream.
- /// If the stream has been listened to, this call fails.
- /// After calling this method, listening on the [values] stream fails.
- Future cancel() {
- // If values has been listened to,
- // this throws a StateError saying that stream has already been listened to,
- // which is a correct error message for this call too.
- return values.listen(null).cancel();
- }
-}
« no previous file with comments | « CHANGELOG.md ('k') | tests/lib/async/stream_group_by_test.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698