Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
| 8 // Core Stream types | 8 // Core Stream types |
| 9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
| 10 | 10 |
| (...skipping 368 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 379 * | 379 * |
| 380 * The returned stream is a broadcast stream if this stream is. | 380 * The returned stream is a broadcast stream if this stream is. |
| 381 * The [convert] function is called once per data event per listener. | 381 * The [convert] function is called once per data event per listener. |
| 382 * If a broadcast stream is listened to more than once, each subscription | 382 * If a broadcast stream is listened to more than once, each subscription |
| 383 * will individually call [convert] on each data event. | 383 * will individually call [convert] on each data event. |
| 384 */ | 384 */ |
| 385 Stream<S> map<S>(S convert(T event)) { | 385 Stream<S> map<S>(S convert(T event)) { |
| 386 return new _MapStream<T, S>(this, convert); | 386 return new _MapStream<T, S>(this, convert); |
| 387 } | 387 } |
| 388 | 388 |
| 389 /// Groups events by a computed key. | |
| 390 /// | |
| 391 /// A key is extracted from incoming events, and a stream is created for each | |
| 392 /// unique key (based on the `operator==` of the keys). | |
| 393 /// | |
| 394 /// The returned stream emits a [StreamGroup] object for each distinct key | |
| 395 /// seen by the transformation, and the events associated with the key are | |
| 396 /// output [StreamGroup.values] stream. | |
|
floitsch
2017/05/02 12:23:53
That sentence sounds wrong.
Lasse Reichstein Nielsen
2017/05/04 11:05:20
Rewritten.
| |
| 397 /// | |
| 398 /// An error on the source stream, or when calling the `key` functions, | |
| 399 /// will emit the error on the returned stream. | |
| 400 /// | |
| 401 /// Canceling the subscription on the returned stream will stop processing | |
| 402 /// and close the streams for all groups. | |
| 403 /// | |
| 404 /// Pausing the subscription on the returned stream will pause processing | |
| 405 /// and no further events are added to streams for the individual groups. | |
| 406 /// | |
| 407 /// Pausing or canceling an individual group stream has no effect other than | |
| 408 /// on that stream. Events will be queued while the group stream | |
| 409 /// is paused and until it is first listened to. | |
|
floitsch
2017/05/02 12:23:53
Mention that this could lead to memory issues.
Al
Lasse Reichstein Nielsen
2017/05/04 11:05:20
Added something about memory issues.
| |
| 410 Stream<StreamGroup<K, T>> groupBy<K>(K key(T event)) => | |
| 411 groupValuesBy<K, T>(key, (T x) => x); | |
| 412 | |
| 413 /// Groups values selected from events by a computed key. | |
| 414 /// | |
| 415 /// A key is extracted from incoming events, and a stream is created for each | |
| 416 /// unique key (based on the `operator==` of the keys). | |
| 417 /// A value is computed from the event as well, and emitted on the | |
| 418 /// corresponding stream. | |
| 419 /// | |
| 420 /// The returned stream emits a [StreamGroup] object for each distinct key | |
| 421 /// seen by the transformation, and the values associated with the key are | |
| 422 /// output [StreamGroup.values] stream. | |
| 423 /// | |
| 424 /// An error on the source stream, or when calling the `key` functions, | |
| 425 /// will emit the error on the returned stream. | |
| 426 /// | |
| 427 /// An error when calling the `value` function, but with a successful call | |
| 428 /// of the `key` function, is reported on the stream for the corresponding key . | |
|
floitsch
2017/05/02 12:23:53
long line.
Lasse Reichstein Nielsen
2017/05/04 11:05:20
Done.
| |
| 429 /// | |
| 430 /// Canceling the subscription on the returned stream will stop processing | |
| 431 /// and close the streams for all groups. | |
| 432 /// | |
| 433 /// Pausing the subscription on the returned stream will pause processing | |
| 434 /// and no further events are added to streams for the individual groups. | |
| 435 /// | |
| 436 /// Pausing or canceling an individual group stream has no effect other than | |
| 437 /// on that stream. Events will be queued while the group stream | |
| 438 /// is paused and until it is first listened to. | |
| 439 Stream<StreamGroup<K, V>> groupValuesBy<K, V>( | |
|
floitsch
2017/05/02 12:23:53
I think it would be cleaner if `StreamGroup` had a
Lasse Reichstein Nielsen
2017/05/04 11:05:20
That is probably more interesting. But then the ne
| |
| 440 K key(T event), V value(T event)) { | |
| 441 var controller; | |
| 442 controller = new StreamController<Group<K, V>>( | |
| 443 sync: true, | |
| 444 onListen: () { | |
| 445 var groupControllers = new HashMap(); | |
| 446 void closeAll() { | |
|
floitsch
2017/05/02 12:23:53
New line before nested function.
Lasse Reichstein Nielsen
2017/05/04 11:05:20
Done.
| |
| 447 for (var groupController in groupControllers.values) { | |
| 448 groupController.close(); | |
| 449 } | |
| 450 } | |
| 451 | |
| 452 var subscription = this.listen( | |
| 453 (data) { | |
|
floitsch
2017/05/02 12:23:53
This looks like it wasn't dartfmt.
Lasse Reichstein Nielsen
2017/05/04 11:05:20
Looks deceive :)
| |
| 454 K theKey; | |
| 455 try { | |
| 456 theKey = key(data); | |
| 457 } catch (error, stackTrace) { | |
| 458 controller.addError(error, stackTrace); | |
| 459 return; | |
| 460 } | |
| 461 var groupController = groupControllers[theKey]; | |
| 462 if (groupController == null) { | |
| 463 groupController = new StreamController<T>(sync: true); | |
| 464 groupControllers[theKey] = groupController; | |
| 465 controller.add( | |
| 466 new StreamGroup<K, T>(theKey, groupController.stream)); | |
| 467 } | |
| 468 V theValue; | |
| 469 try { | |
| 470 theValue = value(data); | |
| 471 } catch (error, stackTrace) { | |
| 472 groupController.addError(error, stackTrace); | |
|
floitsch
2017/05/02 12:23:53
Since the group controller is sync, is there enoug
Lasse Reichstein Nielsen
2017/05/04 11:05:20
That's expected behavior - adding errors to a paus
| |
| 473 return; | |
| 474 } | |
| 475 groupController.add(theValue); | |
| 476 }, | |
| 477 onError: controller.addError, | |
| 478 onDone: () { | |
| 479 controller.close(); | |
| 480 closeAll(); | |
| 481 }); | |
| 482 controller.onPause = subscription.pause; | |
| 483 controller.onResume = subscription.resume; | |
| 484 controller.onCancel = () { | |
| 485 subscription.cancel(); | |
| 486 closeAll(); | |
| 487 }; | |
| 488 }); | |
| 489 return controller.stream; | |
| 490 } | |
| 491 | |
| 389 /** | 492 /** |
| 390 * Creates a new stream with each data event of this stream asynchronously | 493 * Creates a new stream with each data event of this stream asynchronously |
| 391 * mapped to a new event. | 494 * mapped to a new event. |
| 392 * | 495 * |
| 393 * This acts like [map], except that [convert] may return a [Future], | 496 * This acts like [map], except that [convert] may return a [Future], |
| 394 * and in that case, the stream waits for that future to complete before | 497 * and in that case, the stream waits for that future to complete before |
| 395 * continuing with its result. | 498 * continuing with its result. |
| 396 * | 499 * |
| 397 * The returned stream is a broadcast stream if this stream is. | 500 * The returned stream is a broadcast stream if this stream is. |
| 398 */ | 501 */ |
| (...skipping 1390 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 1789 } | 1892 } |
| 1790 | 1893 |
| 1791 void addError(error, [StackTrace stackTrace]) { | 1894 void addError(error, [StackTrace stackTrace]) { |
| 1792 _sink.addError(error, stackTrace); | 1895 _sink.addError(error, stackTrace); |
| 1793 } | 1896 } |
| 1794 | 1897 |
| 1795 void close() { | 1898 void close() { |
| 1796 _sink.close(); | 1899 _sink.close(); |
| 1797 } | 1900 } |
| 1798 } | 1901 } |
| 1902 | |
| 1903 /// A group created by [Stream.groupBy] or [Stream.groupValuesBy]. | |
| 1904 /// | |
| 1905 /// The stream created by `groupBy` emits a `StreamGroup` for each distinct key | |
| 1906 /// it encounters. | |
| 1907 /// This group contains the [key] itself, along with a stream of the [values] | |
| 1908 /// associated with that key. | |
| 1909 class StreamGroup<K, V> { | |
|
floitsch
2017/05/02 12:23:53
This class could have:
StreamGroup<K, V2> mapValu
Lasse Reichstein Nielsen
2017/05/04 11:05:20
As said above, let's not do that (yet).
| |
| 1910 /// The key that identifiers the values emitted by [values]. | |
| 1911 final K key; | |
| 1912 | |
| 1913 /// The [values] that [GroupBy] have grouped by the common [key]. | |
| 1914 final Stream<V> values; | |
| 1915 | |
| 1916 StreamGroup(this.key, this.values); | |
| 1917 } | |
| OLD | NEW |