Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(237)

Side by Side Diff: sdk/lib/async/stream.dart

Issue 2850393003: Add groupBy to Stream. (Closed)
Patch Set: Fix typos and warnings Created 3 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « CHANGELOG.md ('k') | tests/lib/async/stream_group_by_test.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Core Stream types 8 // Core Stream types
9 // ------------------------------------------------------------------- 9 // -------------------------------------------------------------------
10 10
(...skipping 368 matching lines...) Expand 10 before | Expand all | Expand 10 after
379 * 379 *
380 * The returned stream is a broadcast stream if this stream is. 380 * The returned stream is a broadcast stream if this stream is.
381 * The [convert] function is called once per data event per listener. 381 * The [convert] function is called once per data event per listener.
382 * If a broadcast stream is listened to more than once, each subscription 382 * If a broadcast stream is listened to more than once, each subscription
383 * will individually call [convert] on each data event. 383 * will individually call [convert] on each data event.
384 */ 384 */
385 Stream<S> map<S>(S convert(T event)) { 385 Stream<S> map<S>(S convert(T event)) {
386 return new _MapStream<T, S>(this, convert); 386 return new _MapStream<T, S>(this, convert);
387 } 387 }
388 388
389 /// Groups events by a computed key.
390 ///
391 /// A key is extracted from incoming events.
392 /// The first time a key is seen, a stream is created for it, and emitted
393 /// on the returned stream, along with the key, as a [StreamGroup] object.
394 /// Then the event is emitted on the stream ([StreamGroup.values])
395 /// corresponding to the key.
396 ///
397 /// An error on the source stream, or when calling the `key` functions,
398 /// will emit the error on the returned stream.
399 ///
400 /// Canceling the subscription on the returned stream will stop processing
401 /// and close the streams for all groups.
402 ///
403 /// Pausing the subscription on the returned stream will pause processing
404 /// and no further events are added to streams for the individual groups.
405 ///
406 /// Pausing or canceling an individual group stream has no effect other than
407 /// on that stream. Events will be queued while the group stream
408 /// is paused and until it is first listened to.
409 /// If the [StreamGroup.values] stream is never listened to,
410 /// it will enqueue all the events unnecessarily.
411 Stream<StreamGroup<K, T>> groupBy<K>(K key(T event)) {
412 var controller;
413 controller = new StreamController<StreamGroup<K, T>>(
414 sync: true,
415 onListen: () {
416 var groupControllers = new HashMap<K, StreamController<T>>();
417
418 void closeAll() {
419 for (var groupController in groupControllers.values) {
420 groupController.close();
421 }
422 }
423
424 var subscription = this.listen(
425 (data) {
426 K theKey;
427 try {
428 theKey = key(data);
429 } catch (error, stackTrace) {
430 controller.addError(error, stackTrace);
431 return;
432 }
433 var groupController = groupControllers[theKey];
434 if (groupController == null) {
435 groupController =
436 new StreamController<T>.broadcast(sync: true);
437 groupControllers[theKey] = groupController;
438 controller.add(
439 new StreamGroup<K, T>(theKey, groupController.stream));
440 }
441 groupController.add(data);
442 },
443 onError: controller.addError,
444 onDone: () {
445 controller.close();
446 closeAll();
447 });
448 controller.onPause = subscription.pause;
449 controller.onResume = subscription.resume;
450 controller.onCancel = () {
451 subscription.cancel();
452 // Don't fire sync events in response to a callback.
453 scheduleMicrotask(closeAll);
454 };
455 });
456 return controller.stream;
457 }
458
389 /** 459 /**
390 * Creates a new stream with each data event of this stream asynchronously 460 * Creates a new stream with each data event of this stream asynchronously
391 * mapped to a new event. 461 * mapped to a new event.
392 * 462 *
393 * This acts like [map], except that [convert] may return a [Future], 463 * This acts like [map], except that [convert] may return a [Future],
394 * and in that case, the stream waits for that future to complete before 464 * and in that case, the stream waits for that future to complete before
395 * continuing with its result. 465 * continuing with its result.
396 * 466 *
397 * The returned stream is a broadcast stream if this stream is. 467 * The returned stream is a broadcast stream if this stream is.
398 */ 468 */
(...skipping 1390 matching lines...) Expand 10 before | Expand all | Expand 10 after
1789 } 1859 }
1790 1860
1791 void addError(error, [StackTrace stackTrace]) { 1861 void addError(error, [StackTrace stackTrace]) {
1792 _sink.addError(error, stackTrace); 1862 _sink.addError(error, stackTrace);
1793 } 1863 }
1794 1864
1795 void close() { 1865 void close() {
1796 _sink.close(); 1866 _sink.close();
1797 } 1867 }
1798 } 1868 }
1869
1870 /// A group created by [Stream.groupBy] or [Stream.groupByMapped].
1871 ///
1872 /// The stream created by `groupBy` emits a `StreamGroup` for each distinct key
1873 /// it encounters.
1874 /// This group contains the [key] itself, along with a stream of the [values]
1875 /// associated with that key.
1876 class StreamGroup<K, V> {
1877 /// The key that identifiers the values emitted by [values].
1878 final K key;
1879
1880 /// The [values] that [GroupBy] have grouped by the common [key].
1881 final Stream<V> values;
1882
1883 factory StreamGroup(K key, Stream<V> values) = StreamGroup<K, V>._;
1884
1885 // Don't expose a generative constructor.
1886 // This class is not intended for subclassing, so we don't want to promise
1887 // it. We can change that in the future.
1888 StreamGroup._(this.key, this.values);
1889
1890 /// Tells [values] to discard values instead of retaining them.
1891 ///
1892 /// Must only be used instead of listening to the [values] stream.
1893 /// If the stream has been listened to, this call fails.
1894 /// After calling this method, listening on the [values] stream fails.
1895 Future cancel() {
1896 // If values has been listened to,
1897 // this throws a StateError saying that stream has already been listened to,
1898 // which is a correct error message for this call too.
1899 return values.listen(null).cancel();
1900 }
1901 }
OLDNEW
« no previous file with comments | « CHANGELOG.md ('k') | tests/lib/async/stream_group_by_test.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698