OLD | NEW |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
8 // Core Stream types | 8 // Core Stream types |
9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
10 | 10 |
(...skipping 372 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
383 * will individually call [convert] on each data event. | 383 * will individually call [convert] on each data event. |
384 */ | 384 */ |
385 Stream<S> map<S>(S convert(T event)) { | 385 Stream<S> map<S>(S convert(T event)) { |
386 return new _MapStream<T, S>(this, convert); | 386 return new _MapStream<T, S>(this, convert); |
387 } | 387 } |
388 | 388 |
389 /// Groups events by a computed key. | 389 /// Groups events by a computed key. |
390 /// | 390 /// |
391 /// A key is extracted from incoming events. | 391 /// A key is extracted from incoming events. |
392 /// The first time a key is seen, a stream is created for it, and emitted | 392 /// The first time a key is seen, a stream is created for it, and emitted |
393 /// on the returned stream, along with the key, as a [StreamGroup] object. | 393 /// on the returned stream, along with the key, as a [GroupedEvents] object. |
394 /// Then the event is emitted on the stream ([StreamGroup.values]) | 394 /// Then the event is emitted on the stream ([GroupedEvents.values]) |
395 /// corresponding to the key. | 395 /// corresponding to the key. |
396 /// | 396 /// |
397 /// An error on the source stream, or when calling the `key` functions, | 397 /// An error on the source stream, or when calling the `key` functions, |
398 /// will emit the error on the returned stream. | 398 /// will emit the error on the returned stream. |
399 /// | 399 /// |
400 /// Canceling the subscription on the returned stream will stop processing | 400 /// Canceling the subscription on the returned stream will stop processing |
401 /// and close the streams for all groups. | 401 /// and close the streams for all groups. |
402 /// | 402 /// |
403 /// Pausing the subscription on the returned stream will pause processing | 403 /// Pausing the subscription on the returned stream will pause processing |
404 /// and no further events are added to streams for the individual groups. | 404 /// and no further events are added to streams for the individual groups. |
405 /// | 405 /// |
406 /// Pausing or canceling an individual group stream has no effect other than | 406 /// Pausing or canceling an individual group stream has no effect other than |
407 /// on that stream. Events will be queued while the group stream | 407 /// on that stream. Events will be queued while the group stream |
408 /// is paused and until it is first listened to. | 408 /// is paused and until it is first listened to. |
409 /// If the [StreamGroup.values] stream is never listened to, | 409 /// If the [GroupedEvents.values] stream is never listened to, |
410 /// it will enqueue all the events unnecessarily. | 410 /// it will enqueue all the events unnecessarily. |
411 Stream<StreamGroup<K, T>> groupBy<K>(K key(T event)) { | 411 Stream<GroupedEvents<K, T>> groupBy<K>(K key(T event)) { |
412 var controller; | 412 var controller; |
413 controller = new StreamController<StreamGroup<K, T>>( | 413 controller = new StreamController<GroupedEvents<K, T>>( |
414 sync: true, | 414 sync: true, |
415 onListen: () { | 415 onListen: () { |
416 var groupControllers = new HashMap<K, StreamController<T>>(); | 416 var groupControllers = new HashMap<K, StreamController<T>>(); |
417 | 417 |
418 void closeAll() { | 418 void closeAll() { |
419 for (var groupController in groupControllers.values) { | 419 for (var groupController in groupControllers.values) { |
420 groupController.close(); | 420 groupController.close(); |
421 } | 421 } |
422 } | 422 } |
423 | 423 |
424 var subscription = this.listen( | 424 var subscription = this.listen( |
425 (data) { | 425 (data) { |
426 K theKey; | 426 K theKey; |
427 try { | 427 try { |
428 theKey = key(data); | 428 theKey = key(data); |
429 } catch (error, stackTrace) { | 429 } catch (error, stackTrace) { |
430 controller.addError(error, stackTrace); | 430 controller.addError(error, stackTrace); |
431 return; | 431 return; |
432 } | 432 } |
433 var groupController = groupControllers[theKey]; | 433 var groupController = groupControllers[theKey]; |
434 if (groupController == null) { | 434 if (groupController == null) { |
435 groupController = | 435 groupController = |
436 new StreamController<T>.broadcast(sync: true); | 436 new StreamController<T>.broadcast(sync: true); |
437 groupControllers[theKey] = groupController; | 437 groupControllers[theKey] = groupController; |
438 controller.add( | 438 controller.add( |
439 new StreamGroup<K, T>(theKey, groupController.stream)); | 439 new GroupedEvents<K, T>(theKey, groupController.stream)); |
440 } | 440 } |
441 groupController.add(data); | 441 groupController.add(data); |
442 }, | 442 }, |
443 onError: controller.addError, | 443 onError: controller.addError, |
444 onDone: () { | 444 onDone: () { |
445 controller.close(); | 445 controller.close(); |
446 closeAll(); | 446 closeAll(); |
447 }); | 447 }); |
448 controller.onPause = subscription.pause; | 448 controller.onPause = subscription.pause; |
449 controller.onResume = subscription.resume; | 449 controller.onResume = subscription.resume; |
(...skipping 1452 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1902 _sink.addError(error, stackTrace); | 1902 _sink.addError(error, stackTrace); |
1903 } | 1903 } |
1904 | 1904 |
1905 void close() { | 1905 void close() { |
1906 _sink.close(); | 1906 _sink.close(); |
1907 } | 1907 } |
1908 } | 1908 } |
1909 | 1909 |
1910 /// A group created by [Stream.groupBy] or [Stream.groupByMapped]. | 1910 /// A group created by [Stream.groupBy] or [Stream.groupByMapped]. |
1911 /// | 1911 /// |
1912 /// The stream created by `groupBy` emits a `StreamGroup` for each distinct key | 1912 /// The stream created by `groupBy` emits a `GroupedEvents` for each distinct ke
y |
1913 /// it encounters. | 1913 /// it encounters. |
1914 /// This group contains the [key] itself, along with a stream of the [values] | 1914 /// This group contains the [key] itself, along with a stream of the [values] |
1915 /// associated with that key. | 1915 /// associated with that key. |
1916 class StreamGroup<K, V> { | 1916 class GroupedEvents<K, V> { |
1917 /// The key that identifiers the values emitted by [values]. | 1917 /// The key that identifiers the values emitted by [values]. |
1918 final K key; | 1918 final K key; |
1919 | 1919 |
1920 /// The [values] that [GroupBy] have grouped by the common [key]. | 1920 /// The [values] that [GroupBy] have grouped by the common [key]. |
1921 final Stream<V> values; | 1921 final Stream<V> values; |
1922 | 1922 |
1923 factory StreamGroup(K key, Stream<V> values) = StreamGroup<K, V>._; | 1923 factory GroupedEvents(K key, Stream<V> values) = GroupedEvents<K, V>._; |
1924 | 1924 |
1925 // Don't expose a generative constructor. | 1925 // Don't expose a generative constructor. |
1926 // This class is not intended for subclassing, so we don't want to promise | 1926 // This class is not intended for subclassing, so we don't want to promise |
1927 // it. We can change that in the future. | 1927 // it. We can change that in the future. |
1928 StreamGroup._(this.key, this.values); | 1928 GroupedEvents._(this.key, this.values); |
1929 | 1929 |
1930 /// Tells [values] to discard values instead of retaining them. | 1930 /// Tells [values] to discard values instead of retaining them. |
1931 /// | 1931 /// |
1932 /// Must only be used instead of listening to the [values] stream. | 1932 /// Must only be used instead of listening to the [values] stream. |
1933 /// If the stream has been listened to, this call fails. | 1933 /// If the stream has been listened to, this call fails. |
1934 /// After calling this method, listening on the [values] stream fails. | 1934 /// After calling this method, listening on the [values] stream fails. |
1935 Future cancel() { | 1935 Future cancel() { |
1936 // If values has been listened to, | 1936 // If values has been listened to, |
1937 // this throws a StateError saying that stream has already been listened to, | 1937 // this throws a StateError saying that stream has already been listened to, |
1938 // which is a correct error message for this call too. | 1938 // which is a correct error message for this call too. |
1939 return values.listen(null).cancel(); | 1939 return values.listen(null).cancel(); |
1940 } | 1940 } |
1941 } | 1941 } |
OLD | NEW |