OLD | NEW |
---|---|
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
8 // Core Stream types | 8 // Core Stream types |
9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
10 | 10 |
(...skipping 368 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
379 * | 379 * |
380 * The returned stream is a broadcast stream if this stream is. | 380 * The returned stream is a broadcast stream if this stream is. |
381 * The [convert] function is called once per data event per listener. | 381 * The [convert] function is called once per data event per listener. |
382 * If a broadcast stream is listened to more than once, each subscription | 382 * If a broadcast stream is listened to more than once, each subscription |
383 * will individually call [convert] on each data event. | 383 * will individually call [convert] on each data event. |
384 */ | 384 */ |
385 Stream<S> map<S>(S convert(T event)) { | 385 Stream<S> map<S>(S convert(T event)) { |
386 return new _MapStream<T, S>(this, convert); | 386 return new _MapStream<T, S>(this, convert); |
387 } | 387 } |
388 | 388 |
389 /// Groups events by a computed key. | |
390 /// | |
391 /// A key is extracted from incoming events, and a stream is created for each | |
392 /// unique key (based on the `operator==` of the keys). | |
393 /// | |
394 /// The returned stream emits a [StreamGroup] object for each distinct key | |
395 /// seen by the transformation, and the events associated with the key are | |
396 /// output [StreamGroup.values] stream. | |
floitsch
2017/05/02 12:23:53
That sentence sounds wrong.
Lasse Reichstein Nielsen
2017/05/04 11:05:20
Rewritten.
| |
397 /// | |
398 /// An error on the source stream, or when calling the `key` functions, | |
399 /// will emit the error on the returned stream. | |
400 /// | |
401 /// Canceling the subscription on the returned stream will stop processing | |
402 /// and close the streams for all groups. | |
403 /// | |
404 /// Pausing the subscription on the returned stream will pause processing | |
405 /// and no further events are added to streams for the individual groups. | |
406 /// | |
407 /// Pausing or canceling an individual group stream has no effect other than | |
408 /// on that stream. Events will be queued while the group stream | |
409 /// is paused and until it is first listened to. | |
floitsch
2017/05/02 12:23:53
Mention that this could lead to memory issues.
Al
Lasse Reichstein Nielsen
2017/05/04 11:05:20
Added something about memory issues.
| |
410 Stream<StreamGroup<K, T>> groupBy<K>(K key(T event)) => | |
411 groupValuesBy<K, T>(key, (T x) => x); | |
412 | |
413 /// Groups values selected from events by a computed key. | |
414 /// | |
415 /// A key is extracted from incoming events, and a stream is created for each | |
416 /// unique key (based on the `operator==` of the keys). | |
417 /// A value is computed from the event as well, and emitted on the | |
418 /// corresponding stream. | |
419 /// | |
420 /// The returned stream emits a [StreamGroup] object for each distinct key | |
421 /// seen by the transformation, and the values associated with the key are | |
422 /// output [StreamGroup.values] stream. | |
423 /// | |
424 /// An error on the source stream, or when calling the `key` functions, | |
425 /// will emit the error on the returned stream. | |
426 /// | |
427 /// An error when calling the `value` function, but with a successful call | |
428 /// of the `key` function, is reported on the stream for the corresponding key . | |
floitsch
2017/05/02 12:23:53
long line.
Lasse Reichstein Nielsen
2017/05/04 11:05:20
Done.
| |
429 /// | |
430 /// Canceling the subscription on the returned stream will stop processing | |
431 /// and close the streams for all groups. | |
432 /// | |
433 /// Pausing the subscription on the returned stream will pause processing | |
434 /// and no further events are added to streams for the individual groups. | |
435 /// | |
436 /// Pausing or canceling an individual group stream has no effect other than | |
437 /// on that stream. Events will be queued while the group stream | |
438 /// is paused and until it is first listened to. | |
439 Stream<StreamGroup<K, V>> groupValuesBy<K, V>( | |
floitsch
2017/05/02 12:23:53
I think it would be cleaner if `StreamGroup` had a
Lasse Reichstein Nielsen
2017/05/04 11:05:20
That is probably more interesting. But then the ne
| |
440 K key(T event), V value(T event)) { | |
441 var controller; | |
442 controller = new StreamController<Group<K, V>>( | |
443 sync: true, | |
444 onListen: () { | |
445 var groupControllers = new HashMap(); | |
446 void closeAll() { | |
floitsch
2017/05/02 12:23:53
New line before nested function.
Lasse Reichstein Nielsen
2017/05/04 11:05:20
Done.
| |
447 for (var groupController in groupControllers.values) { | |
448 groupController.close(); | |
449 } | |
450 } | |
451 | |
452 var subscription = this.listen( | |
453 (data) { | |
floitsch
2017/05/02 12:23:53
This looks like it wasn't dartfmt.
Lasse Reichstein Nielsen
2017/05/04 11:05:20
Looks deceive :)
| |
454 K theKey; | |
455 try { | |
456 theKey = key(data); | |
457 } catch (error, stackTrace) { | |
458 controller.addError(error, stackTrace); | |
459 return; | |
460 } | |
461 var groupController = groupControllers[theKey]; | |
462 if (groupController == null) { | |
463 groupController = new StreamController<T>(sync: true); | |
464 groupControllers[theKey] = groupController; | |
465 controller.add( | |
466 new StreamGroup<K, T>(theKey, groupController.stream)); | |
467 } | |
468 V theValue; | |
469 try { | |
470 theValue = value(data); | |
471 } catch (error, stackTrace) { | |
472 groupController.addError(error, stackTrace); | |
floitsch
2017/05/02 12:23:53
Since the group controller is sync, is there enoug
Lasse Reichstein Nielsen
2017/05/04 11:05:20
That's expected behavior - adding errors to a paus
| |
473 return; | |
474 } | |
475 groupController.add(theValue); | |
476 }, | |
477 onError: controller.addError, | |
478 onDone: () { | |
479 controller.close(); | |
480 closeAll(); | |
481 }); | |
482 controller.onPause = subscription.pause; | |
483 controller.onResume = subscription.resume; | |
484 controller.onCancel = () { | |
485 subscription.cancel(); | |
486 closeAll(); | |
487 }; | |
488 }); | |
489 return controller.stream; | |
490 } | |
491 | |
389 /** | 492 /** |
390 * Creates a new stream with each data event of this stream asynchronously | 493 * Creates a new stream with each data event of this stream asynchronously |
391 * mapped to a new event. | 494 * mapped to a new event. |
392 * | 495 * |
393 * This acts like [map], except that [convert] may return a [Future], | 496 * This acts like [map], except that [convert] may return a [Future], |
394 * and in that case, the stream waits for that future to complete before | 497 * and in that case, the stream waits for that future to complete before |
395 * continuing with its result. | 498 * continuing with its result. |
396 * | 499 * |
397 * The returned stream is a broadcast stream if this stream is. | 500 * The returned stream is a broadcast stream if this stream is. |
398 */ | 501 */ |
(...skipping 1390 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1789 } | 1892 } |
1790 | 1893 |
1791 void addError(error, [StackTrace stackTrace]) { | 1894 void addError(error, [StackTrace stackTrace]) { |
1792 _sink.addError(error, stackTrace); | 1895 _sink.addError(error, stackTrace); |
1793 } | 1896 } |
1794 | 1897 |
1795 void close() { | 1898 void close() { |
1796 _sink.close(); | 1899 _sink.close(); |
1797 } | 1900 } |
1798 } | 1901 } |
1902 | |
1903 /// A group created by [Stream.groupBy] or [Stream.groupValuesBy]. | |
1904 /// | |
1905 /// The stream created by `groupBy` emits a `StreamGroup` for each distinct key | |
1906 /// it encounters. | |
1907 /// This group contains the [key] itself, along with a stream of the [values] | |
1908 /// associated with that key. | |
1909 class StreamGroup<K, V> { | |
floitsch
2017/05/02 12:23:53
This class could have:
StreamGroup<K, V2> mapValu
Lasse Reichstein Nielsen
2017/05/04 11:05:20
As said above, let's not do that (yet).
| |
1910 /// The key that identifiers the values emitted by [values]. | |
1911 final K key; | |
1912 | |
1913 /// The [values] that [GroupBy] have grouped by the common [key]. | |
1914 final Stream<V> values; | |
1915 | |
1916 StreamGroup(this.key, this.values); | |
1917 } | |
OLD | NEW |