| OLD | NEW |
| 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
| 8 // Core Stream types | 8 // Core Stream types |
| 9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
| 10 | 10 |
| (...skipping 368 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 379 * | 379 * |
| 380 * The returned stream is a broadcast stream if this stream is. | 380 * The returned stream is a broadcast stream if this stream is. |
| 381 * The [convert] function is called once per data event per listener. | 381 * The [convert] function is called once per data event per listener. |
| 382 * If a broadcast stream is listened to more than once, each subscription | 382 * If a broadcast stream is listened to more than once, each subscription |
| 383 * will individually call [convert] on each data event. | 383 * will individually call [convert] on each data event. |
| 384 */ | 384 */ |
| 385 Stream<S> map<S>(S convert(T event)) { | 385 Stream<S> map<S>(S convert(T event)) { |
| 386 return new _MapStream<T, S>(this, convert); | 386 return new _MapStream<T, S>(this, convert); |
| 387 } | 387 } |
| 388 | 388 |
| 389 /// Groups events by a computed key. |
| 390 /// |
| 391 /// A key is extracted from incoming events. |
| 392 /// The first time a key is seen, a stream is created for it, and emitted |
| 393 /// on the returned stream, along with the key, as a [StreamGroup] object. |
| 394 /// Then the event is emitted on the stream ([StreamGroup.values]) |
| 395 /// corresponding to the key. |
| 396 /// |
| 397 /// An error on the source stream, or when calling the `key` functions, |
| 398 /// will emit the error on the returned stream. |
| 399 /// |
| 400 /// Canceling the subscription on the returned stream will stop processing |
| 401 /// and close the streams for all groups. |
| 402 /// |
| 403 /// Pausing the subscription on the returned stream will pause processing |
| 404 /// and no further events are added to streams for the individual groups. |
| 405 /// |
| 406 /// Pausing or canceling an individual group stream has no effect other than |
| 407 /// on that stream. Events will be queued while the group stream |
| 408 /// is paused and until it is first listened to. |
| 409 /// If the [StreamGroup.values] stream is never listened to, |
| 410 /// it will enqueue all the events unnecessarily. |
| 411 Stream<StreamGroup<K, T>> groupBy<K>(K key(T event)) { |
| 412 var controller; |
| 413 controller = new StreamController<StreamGroup<K, T>>( |
| 414 sync: true, |
| 415 onListen: () { |
| 416 var groupControllers = new HashMap<K, StreamController<T>>(); |
| 417 |
| 418 void closeAll() { |
| 419 for (var groupController in groupControllers.values) { |
| 420 groupController.close(); |
| 421 } |
| 422 } |
| 423 |
| 424 var subscription = this.listen( |
| 425 (data) { |
| 426 K theKey; |
| 427 try { |
| 428 theKey = key(data); |
| 429 } catch (error, stackTrace) { |
| 430 controller.addError(error, stackTrace); |
| 431 return; |
| 432 } |
| 433 var groupController = groupControllers[theKey]; |
| 434 if (groupController == null) { |
| 435 groupController = |
| 436 new StreamController<T>.broadcast(sync: true); |
| 437 groupControllers[theKey] = groupController; |
| 438 controller.add( |
| 439 new StreamGroup<K, T>(theKey, groupController.stream)); |
| 440 } |
| 441 groupController.add(data); |
| 442 }, |
| 443 onError: controller.addError, |
| 444 onDone: () { |
| 445 controller.close(); |
| 446 closeAll(); |
| 447 }); |
| 448 controller.onPause = subscription.pause; |
| 449 controller.onResume = subscription.resume; |
| 450 controller.onCancel = () { |
| 451 subscription.cancel(); |
| 452 // Don't fire sync events in response to a callback. |
| 453 scheduleMicrotask(closeAll); |
| 454 }; |
| 455 }); |
| 456 return controller.stream; |
| 457 } |
| 458 |
| 389 /** | 459 /** |
| 390 * Creates a new stream with each data event of this stream asynchronously | 460 * Creates a new stream with each data event of this stream asynchronously |
| 391 * mapped to a new event. | 461 * mapped to a new event. |
| 392 * | 462 * |
| 393 * This acts like [map], except that [convert] may return a [Future], | 463 * This acts like [map], except that [convert] may return a [Future], |
| 394 * and in that case, the stream waits for that future to complete before | 464 * and in that case, the stream waits for that future to complete before |
| 395 * continuing with its result. | 465 * continuing with its result. |
| 396 * | 466 * |
| 397 * The returned stream is a broadcast stream if this stream is. | 467 * The returned stream is a broadcast stream if this stream is. |
| 398 */ | 468 */ |
| (...skipping 1390 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1789 } | 1859 } |
| 1790 | 1860 |
| 1791 void addError(error, [StackTrace stackTrace]) { | 1861 void addError(error, [StackTrace stackTrace]) { |
| 1792 _sink.addError(error, stackTrace); | 1862 _sink.addError(error, stackTrace); |
| 1793 } | 1863 } |
| 1794 | 1864 |
| 1795 void close() { | 1865 void close() { |
| 1796 _sink.close(); | 1866 _sink.close(); |
| 1797 } | 1867 } |
| 1798 } | 1868 } |
| 1869 |
| 1870 /// A group created by [Stream.groupBy] or [Stream.groupByMapped]. |
| 1871 /// |
| 1872 /// The stream created by `groupBy` emits a `StreamGroup` for each distinct key |
| 1873 /// it encounters. |
| 1874 /// This group contains the [key] itself, along with a stream of the [values] |
| 1875 /// associated with that key. |
| 1876 class StreamGroup<K, V> { |
| 1877 /// The key that identifiers the values emitted by [values]. |
| 1878 final K key; |
| 1879 |
| 1880 /// The [values] that [GroupBy] have grouped by the common [key]. |
| 1881 final Stream<V> values; |
| 1882 |
| 1883 factory StreamGroup(K key, Stream<V> values) = StreamGroup<K, V>._; |
| 1884 |
| 1885 // Don't expose a generative constructor. |
| 1886 // This class is not intended for subclassing, so we don't want to promise |
| 1887 // it. We can change that in the future. |
| 1888 StreamGroup._(this.key, this.values); |
| 1889 |
| 1890 /// Tells [values] to discard values instead of retaining them. |
| 1891 /// |
| 1892 /// Must only be used instead of listening to the [values] stream. |
| 1893 /// If the stream has been listened to, this call fails. |
| 1894 /// After calling this method, listening on the [values] stream fails. |
| 1895 Future cancel() { |
| 1896 // If values has been listened to, |
| 1897 // this throws a StateError saying that stream has already been listened to, |
| 1898 // which is a correct error message for this call too. |
| 1899 return values.listen(null).cancel(); |
| 1900 } |
| 1901 } |
| OLD | NEW |