| OLD | NEW |
| 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 import 'dart:async'; | 5 import 'dart:async'; |
| 6 | 6 |
| 7 /// A collection of streams whose events are unified and sent through a central | 7 /// A collection of streams whose events are unified and sent through a central |
| 8 /// stream. | 8 /// stream. |
| 9 /// | 9 /// |
| 10 /// Both errors and data events are forwarded through [stream]. The streams in | 10 /// Both errors and data events are forwarded through [stream]. The streams in |
| 11 /// the group won't be listened to until [stream] has a listener. **Note that | 11 /// the group won't be listened to until [stream] has a listener. **Note that |
| 12 /// this means that events emitted by broadcast streams will be dropped until | 12 /// this means that events emitted by broadcast streams will be dropped until |
| 13 /// [stream] has a listener.** | 13 /// [stream] has a listener.** |
| 14 /// | 14 /// |
| 15 /// If the `StreamGroup` is constructed using [new StreamGroup], [stream] will b
e | 15 /// If the `StreamGroup` is constructed using [new StreamGroup], [stream] will |
| 16 /// single-subscription. In this case, if [stream] is paused or canceled, all | 16 /// be single-subscription. In this case, if [stream] is paused or canceled, all |
| 17 /// streams in the group will likewise be paused or canceled, respectively. | 17 /// streams in the group will likewise be paused or canceled, respectively. |
| 18 /// | 18 /// |
| 19 /// If the `StreamGroup` is constructed using [new StreamGroup.broadcast], | 19 /// If the `StreamGroup` is constructed using [new StreamGroup.broadcast], |
| 20 /// [stream] will be a broadcast stream. In this case, the streams in the group | 20 /// [stream] will be a broadcast stream. In this case, the streams in the group |
| 21 /// will never be paused and single-subscription streams in the group will never | 21 /// will never be paused and single-subscription streams in the group will never |
| 22 /// be canceled. **Note that single-subscription streams in a broadcast group | 22 /// be canceled. **Note that single-subscription streams in a broadcast group |
| 23 /// may drop events if a listener is added and later removed.** Broadcast | 23 /// may drop events if a listener is added and later removed.** Broadcast |
| 24 /// streams in the group will be canceled once [stream] has no listeners, and | 24 /// streams in the group will be canceled once [stream] has no listeners, and |
| 25 /// will be listened to again once [stream] has listeners. | 25 /// will be listened to again once [stream] has listeners. |
| 26 /// | 26 /// |
| (...skipping 19 matching lines...) Expand all Loading... |
| 46 /// If it's a broadcast group and it goes dormant again, broadcast stream | 46 /// If it's a broadcast group and it goes dormant again, broadcast stream |
| 47 /// subscriptions will be canceled and set to null again. Single-subscriber | 47 /// subscriptions will be canceled and set to null again. Single-subscriber |
| 48 /// stream subscriptions will be left intact, since they can't be | 48 /// stream subscriptions will be left intact, since they can't be |
| 49 /// re-subscribed. | 49 /// re-subscribed. |
| 50 final _subscriptions = new Map<Stream<T>, StreamSubscription<T>>(); | 50 final _subscriptions = new Map<Stream<T>, StreamSubscription<T>>(); |
| 51 | 51 |
| 52 /// Merges the events from [streams] into a single (single-subscriber) stream. | 52 /// Merges the events from [streams] into a single (single-subscriber) stream. |
| 53 /// | 53 /// |
| 54 /// This is equivalent to adding [streams] to a group, closing that group, and | 54 /// This is equivalent to adding [streams] to a group, closing that group, and |
| 55 /// returning its stream. | 55 /// returning its stream. |
| 56 static Stream merge(Iterable<Stream> streams) { | 56 static Stream/*<T>*/ merge/*<T>*/(Iterable<Stream/*<T>*/> streams) { |
| 57 var group = new StreamGroup(); | 57 var group = new StreamGroup/*<T>*/(); |
| 58 streams.forEach(group.add); | 58 streams.forEach(group.add); |
| 59 group.close(); | 59 group.close(); |
| 60 return group.stream; | 60 return group.stream; |
| 61 } | 61 } |
| 62 | 62 |
| 63 /// Creates a new stream group where [stream] is single-subscriber. | 63 /// Creates a new stream group where [stream] is single-subscriber. |
| 64 StreamGroup() { | 64 StreamGroup() { |
| 65 _controller = new StreamController<T>( | 65 _controller = new StreamController<T>( |
| 66 onListen: _onListen, | 66 onListen: _onListen, |
| 67 onPause: _onPause, | 67 onPause: _onPause, |
| (...skipping 117 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 185 // it has no listeners. | 185 // it has no listeners. |
| 186 if (!stream.isBroadcast) return; | 186 if (!stream.isBroadcast) return; |
| 187 subscription.cancel(); | 187 subscription.cancel(); |
| 188 _subscriptions[stream] = null; | 188 _subscriptions[stream] = null; |
| 189 }); | 189 }); |
| 190 } | 190 } |
| 191 | 191 |
| 192 /// Starts actively forwarding events from [stream] to [_controller]. | 192 /// Starts actively forwarding events from [stream] to [_controller]. |
| 193 /// | 193 /// |
| 194 /// This will pause the resulting subscription if [this] is paused. | 194 /// This will pause the resulting subscription if [this] is paused. |
| 195 StreamSubscription _listenToStream(Stream stream) { | 195 StreamSubscription<T> _listenToStream(Stream<T> stream) { |
| 196 var subscription = stream.listen( | 196 var subscription = stream.listen( |
| 197 _controller.add, | 197 _controller.add, |
| 198 onError: _controller.addError, | 198 onError: _controller.addError, |
| 199 onDone: () => remove(stream)); | 199 onDone: () => remove(stream)); |
| 200 if (_state == _StreamGroupState.paused) subscription.pause(); | 200 if (_state == _StreamGroupState.paused) subscription.pause(); |
| 201 return subscription; | 201 return subscription; |
| 202 } | 202 } |
| 203 | 203 |
| 204 /// Closes the group, indicating that no more streams will be added. | 204 /// Closes the group, indicating that no more streams will be added. |
| 205 /// | 205 /// |
| (...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 248 | 248 |
| 249 /// The name of the state. | 249 /// The name of the state. |
| 250 /// | 250 /// |
| 251 /// Used for debugging. | 251 /// Used for debugging. |
| 252 final String name; | 252 final String name; |
| 253 | 253 |
| 254 const _StreamGroupState(this.name); | 254 const _StreamGroupState(this.name); |
| 255 | 255 |
| 256 String toString() => name; | 256 String toString() => name; |
| 257 } | 257 } |
| OLD | NEW |