Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(202)

Side by Side Diff: sdk/lib/async/stream.dart

Issue 2921663002: Revert "Add groupBy to Stream." (Closed)
Patch Set: Created 3 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « CHANGELOG.md ('k') | tests/lib/async/stream_group_by_test.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Core Stream types 8 // Core Stream types
9 // ------------------------------------------------------------------- 9 // -------------------------------------------------------------------
10 10
(...skipping 377 matching lines...) Expand 10 before | Expand all | Expand 10 after
388 * 388 *
389 * The returned stream is a broadcast stream if this stream is. 389 * The returned stream is a broadcast stream if this stream is.
390 * The [convert] function is called once per data event per listener. 390 * The [convert] function is called once per data event per listener.
391 * If a broadcast stream is listened to more than once, each subscription 391 * If a broadcast stream is listened to more than once, each subscription
392 * will individually call [convert] on each data event. 392 * will individually call [convert] on each data event.
393 */ 393 */
394 Stream<S> map<S>(S convert(T event)) { 394 Stream<S> map<S>(S convert(T event)) {
395 return new _MapStream<T, S>(this, convert); 395 return new _MapStream<T, S>(this, convert);
396 } 396 }
397 397
398 /// Groups events by a computed key.
399 ///
400 /// A key is extracted from incoming events.
401 /// The first time a key is seen, a stream is created for it, and emitted
402 /// on the returned stream, along with the key, as a [GroupedEvents] object.
403 /// Then the event is emitted on the stream ([GroupedEvents.values])
404 /// corresponding to the key.
405 ///
406 /// An error on the source stream, or when calling the `key` functions,
407 /// will emit the error on the returned stream.
408 ///
409 /// Canceling the subscription on the returned stream will stop processing
410 /// and close the streams for all groups.
411 ///
412 /// Pausing the subscription on the returned stream will pause processing
413 /// and no further events are added to streams for the individual groups.
414 ///
415 /// Pausing or canceling an individual group stream has no effect other than
416 /// on that stream. Events will be queued while the group stream
417 /// is paused and until it is first listened to.
418 /// If the [GroupedEvents.values] stream is never listened to,
419 /// it will enqueue all the events unnecessarily.
420 Stream<GroupedEvents<K, T>> groupBy<K>(K key(T event)) {
421 var controller;
422 controller = new StreamController<GroupedEvents<K, T>>(
423 sync: true,
424 onListen: () {
425 var groupControllers = new HashMap<K, StreamController<T>>();
426
427 void closeAll() {
428 for (var groupController in groupControllers.values) {
429 groupController.close();
430 }
431 }
432
433 var subscription = this.listen(
434 (data) {
435 K theKey;
436 try {
437 theKey = key(data);
438 } catch (error, stackTrace) {
439 controller.addError(error, stackTrace);
440 return;
441 }
442 var groupController = groupControllers[theKey];
443 if (groupController == null) {
444 groupController =
445 new StreamController<T>.broadcast(sync: true);
446 groupControllers[theKey] = groupController;
447 controller.add(
448 new GroupedEvents<K, T>(theKey, groupController.stream));
449 }
450 groupController.add(data);
451 },
452 onError: controller.addError,
453 onDone: () {
454 controller.close();
455 closeAll();
456 });
457 controller.onPause = subscription.pause;
458 controller.onResume = subscription.resume;
459 controller.onCancel = () {
460 subscription.cancel();
461 // Don't fire sync events in response to a callback.
462 scheduleMicrotask(closeAll);
463 };
464 });
465 return controller.stream;
466 }
467
468 /** 398 /**
469 * Creates a new stream with each data event of this stream asynchronously 399 * Creates a new stream with each data event of this stream asynchronously
470 * mapped to a new event. 400 * mapped to a new event.
471 * 401 *
472 * This acts like [map], except that [convert] may return a [Future], 402 * This acts like [map], except that [convert] may return a [Future],
473 * and in that case, the stream waits for that future to complete before 403 * and in that case, the stream waits for that future to complete before
474 * continuing with its result. 404 * continuing with its result.
475 * 405 *
476 * The returned stream is a broadcast stream if this stream is. 406 * The returned stream is a broadcast stream if this stream is.
477 */ 407 */
(...skipping 1582 matching lines...) Expand 10 before | Expand all | Expand 10 after
2060 } 1990 }
2061 1991
2062 void addError(error, [StackTrace stackTrace]) { 1992 void addError(error, [StackTrace stackTrace]) {
2063 _sink.addError(error, stackTrace); 1993 _sink.addError(error, stackTrace);
2064 } 1994 }
2065 1995
2066 void close() { 1996 void close() {
2067 _sink.close(); 1997 _sink.close();
2068 } 1998 }
2069 } 1999 }
2070
2071 /// A group created by [Stream.groupBy].
2072 ///
2073 /// The stream created by `groupBy` emits a `GroupedEvents`
2074 /// for each distinct key it encounters.
2075 /// This group contains the [key] itself, along with a stream of the [values]
2076 /// associated with that key.
2077 class GroupedEvents<K, V> {
2078 /// The key that identifiers the values emitted by [values].
2079 final K key;
2080
2081 /// The [values] that [GroupBy] have grouped by the common [key].
2082 final Stream<V> values;
2083
2084 factory GroupedEvents(K key, Stream<V> values) = GroupedEvents<K, V>._;
2085
2086 // Don't expose a generative constructor.
2087 // This class is not intended for subclassing, so we don't want to promise
2088 // it. We can change that in the future.
2089 GroupedEvents._(this.key, this.values);
2090
2091 /// Tells [values] to discard values instead of retaining them.
2092 ///
2093 /// Must only be used instead of listening to the [values] stream.
2094 /// If the stream has been listened to, this call fails.
2095 /// After calling this method, listening on the [values] stream fails.
2096 Future cancel() {
2097 // If values has been listened to,
2098 // this throws a StateError saying that stream has already been listened to,
2099 // which is a correct error message for this call too.
2100 return values.listen(null).cancel();
2101 }
2102 }
OLDNEW
« no previous file with comments | « CHANGELOG.md ('k') | tests/lib/async/stream_group_by_test.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698