Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(606)

Side by Side Diff: sdk/lib/async/stream.dart

Issue 2850393003: Add groupBy to Stream. (Closed)
Patch Set: Add test, should add more. Created 3 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Core Stream types 8 // Core Stream types
9 // ------------------------------------------------------------------- 9 // -------------------------------------------------------------------
10 10
(...skipping 368 matching lines...) Expand 10 before | Expand all | Expand 10 after
379 * 379 *
380 * The returned stream is a broadcast stream if this stream is. 380 * The returned stream is a broadcast stream if this stream is.
381 * The [convert] function is called once per data event per listener. 381 * The [convert] function is called once per data event per listener.
382 * If a broadcast stream is listened to more than once, each subscription 382 * If a broadcast stream is listened to more than once, each subscription
383 * will individually call [convert] on each data event. 383 * will individually call [convert] on each data event.
384 */ 384 */
385 Stream<S> map<S>(S convert(T event)) { 385 Stream<S> map<S>(S convert(T event)) {
386 return new _MapStream<T, S>(this, convert); 386 return new _MapStream<T, S>(this, convert);
387 } 387 }
388 388
389 /// Groups events by a computed key.
390 ///
391 /// A key is extracted from incoming events, and a stream is created for each
392 /// unique key (based on the `operator==` of the keys).
393 ///
394 /// The returned stream emits a [StreamGroup] object for each distinct key
395 /// seen by the transformation, and the events associated with the key are
396 /// output [StreamGroup.values] stream.
floitsch 2017/05/02 12:23:53 That sentence sounds wrong.
Lasse Reichstein Nielsen 2017/05/04 11:05:20 Rewritten.
397 ///
398 /// An error on the source stream, or when calling the `key` functions,
399 /// will emit the error on the returned stream.
400 ///
401 /// Canceling the subscription on the returned stream will stop processing
402 /// and close the streams for all groups.
403 ///
404 /// Pausing the subscription on the returned stream will pause processing
405 /// and no further events are added to streams for the individual groups.
406 ///
407 /// Pausing or canceling an individual group stream has no effect other than
408 /// on that stream. Events will be queued while the group stream
409 /// is paused and until it is first listened to.
floitsch 2017/05/02 12:23:53 Mention that this could lead to memory issues. Al
Lasse Reichstein Nielsen 2017/05/04 11:05:20 Added something about memory issues.
410 Stream<StreamGroup<K, T>> groupBy<K>(K key(T event)) =>
411 groupValuesBy<K, T>(key, (T x) => x);
412
413 /// Groups values selected from events by a computed key.
414 ///
415 /// A key is extracted from incoming events, and a stream is created for each
416 /// unique key (based on the `operator==` of the keys).
417 /// A value is computed from the event as well, and emitted on the
418 /// corresponding stream.
419 ///
420 /// The returned stream emits a [StreamGroup] object for each distinct key
421 /// seen by the transformation, and the values associated with the key are
422 /// output [StreamGroup.values] stream.
423 ///
424 /// An error on the source stream, or when calling the `key` functions,
425 /// will emit the error on the returned stream.
426 ///
427 /// An error when calling the `value` function, but with a successful call
428 /// of the `key` function, is reported on the stream for the corresponding key .
floitsch 2017/05/02 12:23:53 long line.
Lasse Reichstein Nielsen 2017/05/04 11:05:20 Done.
429 ///
430 /// Canceling the subscription on the returned stream will stop processing
431 /// and close the streams for all groups.
432 ///
433 /// Pausing the subscription on the returned stream will pause processing
434 /// and no further events are added to streams for the individual groups.
435 ///
436 /// Pausing or canceling an individual group stream has no effect other than
437 /// on that stream. Events will be queued while the group stream
438 /// is paused and until it is first listened to.
439 Stream<StreamGroup<K, V>> groupValuesBy<K, V>(
floitsch 2017/05/02 12:23:53 I think it would be cleaner if `StreamGroup` had a
Lasse Reichstein Nielsen 2017/05/04 11:05:20 That is probably more interesting. But then the ne
440 K key(T event), V value(T event)) {
441 var controller;
442 controller = new StreamController<Group<K, V>>(
443 sync: true,
444 onListen: () {
445 var groupControllers = new HashMap();
446 void closeAll() {
floitsch 2017/05/02 12:23:53 New line before nested function.
Lasse Reichstein Nielsen 2017/05/04 11:05:20 Done.
447 for (var groupController in groupControllers.values) {
448 groupController.close();
449 }
450 }
451
452 var subscription = this.listen(
453 (data) {
floitsch 2017/05/02 12:23:53 This looks like it wasn't dartfmt.
Lasse Reichstein Nielsen 2017/05/04 11:05:20 Looks deceive :)
454 K theKey;
455 try {
456 theKey = key(data);
457 } catch (error, stackTrace) {
458 controller.addError(error, stackTrace);
459 return;
460 }
461 var groupController = groupControllers[theKey];
462 if (groupController == null) {
463 groupController = new StreamController<T>(sync: true);
464 groupControllers[theKey] = groupController;
465 controller.add(
466 new StreamGroup<K, T>(theKey, groupController.stream));
467 }
468 V theValue;
469 try {
470 theValue = value(data);
471 } catch (error, stackTrace) {
472 groupController.addError(error, stackTrace);
floitsch 2017/05/02 12:23:53 Since the group controller is sync, is there enoug
Lasse Reichstein Nielsen 2017/05/04 11:05:20 That's expected behavior - adding errors to a paus
473 return;
474 }
475 groupController.add(theValue);
476 },
477 onError: controller.addError,
478 onDone: () {
479 controller.close();
480 closeAll();
481 });
482 controller.onPause = subscription.pause;
483 controller.onResume = subscription.resume;
484 controller.onCancel = () {
485 subscription.cancel();
486 closeAll();
487 };
488 });
489 return controller.stream;
490 }
491
389 /** 492 /**
390 * Creates a new stream with each data event of this stream asynchronously 493 * Creates a new stream with each data event of this stream asynchronously
391 * mapped to a new event. 494 * mapped to a new event.
392 * 495 *
393 * This acts like [map], except that [convert] may return a [Future], 496 * This acts like [map], except that [convert] may return a [Future],
394 * and in that case, the stream waits for that future to complete before 497 * and in that case, the stream waits for that future to complete before
395 * continuing with its result. 498 * continuing with its result.
396 * 499 *
397 * The returned stream is a broadcast stream if this stream is. 500 * The returned stream is a broadcast stream if this stream is.
398 */ 501 */
(...skipping 1390 matching lines...) Expand 10 before | Expand all | Expand 10 after
1789 } 1892 }
1790 1893
1791 void addError(error, [StackTrace stackTrace]) { 1894 void addError(error, [StackTrace stackTrace]) {
1792 _sink.addError(error, stackTrace); 1895 _sink.addError(error, stackTrace);
1793 } 1896 }
1794 1897
1795 void close() { 1898 void close() {
1796 _sink.close(); 1899 _sink.close();
1797 } 1900 }
1798 } 1901 }
1902
1903 /// A group created by [Stream.groupBy] or [Stream.groupValuesBy].
1904 ///
1905 /// The stream created by `groupBy` emits a `StreamGroup` for each distinct key
1906 /// it encounters.
1907 /// This group contains the [key] itself, along with a stream of the [values]
1908 /// associated with that key.
1909 class StreamGroup<K, V> {
floitsch 2017/05/02 12:23:53 This class could have: StreamGroup<K, V2> mapValu
Lasse Reichstein Nielsen 2017/05/04 11:05:20 As said above, let's not do that (yet).
1910 /// The key that identifiers the values emitted by [values].
1911 final K key;
1912
1913 /// The [values] that [GroupBy] have grouped by the common [key].
1914 final Stream<V> values;
1915
1916 StreamGroup(this.key, this.values);
1917 }
OLDNEW
« no previous file with comments | « no previous file | tests/lib/async/stream_group_by_test.dart » ('j') | tests/lib/async/stream_group_by_test.dart » ('J')

Powered by Google App Engine
This is Rietveld 408576698