OLD | NEW |
---|---|
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
8 // Core Stream types | 8 // Core Stream types |
9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
10 | 10 |
(...skipping 368 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
379 * | 379 * |
380 * The returned stream is a broadcast stream if this stream is. | 380 * The returned stream is a broadcast stream if this stream is. |
381 * The [convert] function is called once per data event per listener. | 381 * The [convert] function is called once per data event per listener. |
382 * If a broadcast stream is listened to more than once, each subscription | 382 * If a broadcast stream is listened to more than once, each subscription |
383 * will individually call [convert] on each data event. | 383 * will individually call [convert] on each data event. |
384 */ | 384 */ |
385 Stream<S> map<S>(S convert(T event)) { | 385 Stream<S> map<S>(S convert(T event)) { |
386 return new _MapStream<T, S>(this, convert); | 386 return new _MapStream<T, S>(this, convert); |
387 } | 387 } |
388 | 388 |
389 /// Groups events by a computed key. | |
390 /// | |
391 /// A key is extracted from incoming events. | |
392 /// The first time a key is seen, a stream is created for it, and emitted | |
393 /// on the returned stream, along with the key, as a [StreamGroup] object. | |
394 /// Then the event is emitted on the stream ([StreamGroup.values]) | |
395 /// corresponding to the key. | |
396 /// | |
397 /// An error on the source stream, or when calling the `key` functions, | |
398 /// will emit the error on the returned stream. | |
399 /// | |
400 /// Canceling the subscription on the returned stream will stop processing | |
401 /// and close the streams for all groups. | |
402 /// | |
403 /// Pausing the subscription on the returned stream will pause processing | |
404 /// and no further events are added to streams for the individual groups. | |
405 /// | |
406 /// Pausing or canceling an individual group stream has no effect other than | |
407 /// on that stream. Events will be queued while the group stream | |
408 /// is paused and until it is first listened to. | |
409 /// If the [StreamGroup.values] stream is never listened to, | |
410 /// it will enqueue all the events unnecessarily. | |
floitsch
2017/05/04 13:32:09
One should therefore not ignore streams of keys th
| |
411 Stream<StreamGroup<K, T>> groupBy<K>(K key(T event)) { | |
412 var controller; | |
413 controller = new StreamController<Group<K, V>>( | |
414 sync: true, | |
415 onListen: () { | |
416 var groupControllers = new HashMap<K, StreamController<T>>(); | |
417 | |
418 void closeAll() { | |
419 for (var groupController in groupControllers.values) { | |
420 groupController.close(); | |
421 } | |
422 } | |
423 | |
424 var subscription = this.listen( | |
425 (data) { | |
426 K theKey; | |
427 try { | |
428 theKey = key(data); | |
429 } catch (error, stackTrace) { | |
430 controller.addError(error, stackTrace); | |
431 return; | |
432 } | |
433 var groupController = groupControllers[theKey]; | |
434 if (groupController == null) { | |
435 groupController = | |
436 new StreamController<T>.broadcast(sync: true); | |
437 groupControllers[theKey] = groupController; | |
438 controller.add( | |
439 new StreamGroup<K, T>(theKey, groupController.stream)); | |
440 } | |
441 groupController.add(data); | |
442 }, | |
443 onError: controller.addError, | |
444 onDone: () { | |
445 controller.close(); | |
446 closeAll(); | |
447 }); | |
448 controller.onPause = subscription.pause; | |
449 controller.onResume = subscription.resume; | |
450 controller.onCancel = () { | |
451 subscription.cancel(); | |
452 // Don't fire sync events in response to a callback. | |
453 scheduleMicrotask(closeAll); | |
454 }; | |
455 }); | |
456 return controller.stream; | |
457 } | |
458 | |
389 /** | 459 /** |
390 * Creates a new stream with each data event of this stream asynchronously | 460 * Creates a new stream with each data event of this stream asynchronously |
391 * mapped to a new event. | 461 * mapped to a new event. |
392 * | 462 * |
393 * This acts like [map], except that [convert] may return a [Future], | 463 * This acts like [map], except that [convert] may return a [Future], |
394 * and in that case, the stream waits for that future to complete before | 464 * and in that case, the stream waits for that future to complete before |
395 * continuing with its result. | 465 * continuing with its result. |
396 * | 466 * |
397 * The returned stream is a broadcast stream if this stream is. | 467 * The returned stream is a broadcast stream if this stream is. |
398 */ | 468 */ |
(...skipping 1390 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1789 } | 1859 } |
1790 | 1860 |
1791 void addError(error, [StackTrace stackTrace]) { | 1861 void addError(error, [StackTrace stackTrace]) { |
1792 _sink.addError(error, stackTrace); | 1862 _sink.addError(error, stackTrace); |
1793 } | 1863 } |
1794 | 1864 |
1795 void close() { | 1865 void close() { |
1796 _sink.close(); | 1866 _sink.close(); |
1797 } | 1867 } |
1798 } | 1868 } |
1869 | |
1870 /// A group created by [Stream.groupBy] or [Stream.groupByMapped]. | |
1871 /// | |
1872 /// The stream created by `groupBy` emits a `StreamGroup` for each distinct key | |
1873 /// it encounters. | |
1874 /// This group contains the [key] itself, along with a stream of the [values] | |
1875 /// associated with that key. | |
1876 class StreamGroup<K, V> { | |
1877 /// The key that identifiers the values emitted by [values]. | |
1878 final K key; | |
1879 | |
1880 /// The [values] that [GroupBy] have grouped by the common [key]. | |
1881 final Stream<V> values; | |
1882 | |
1883 StreamGroup(this.key, this.values); | |
1884 | |
1885 /// Tells [values] to discard values instead of retaining them. | |
1886 /// | |
1887 /// Must only be used instead of listening to the [values] stream. | |
1888 /// If the stream has been listened to, this call fails. | |
1889 /// After calling this method, listening on the [values] stream fails. | |
1890 Future cancel() { | |
1891 // If values has been listened to, | |
1892 // this throws a StateError saying that stream has already been listened to, | |
1893 // which is a correct error message for this call too. | |
1894 return values.listen(null).cancel(); | |
1895 } | |
1896 } | |
OLD | NEW |