OLD | NEW |
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 library async.stream_group; | |
6 | |
7 import 'dart:async'; | 5 import 'dart:async'; |
8 | 6 |
9 /// A collection of streams whose events are unified and sent through a central | 7 /// A collection of streams whose events are unified and sent through a central |
10 /// stream. | 8 /// stream. |
11 /// | 9 /// |
12 /// Both errors and data events are forwarded through [stream]. The streams in | 10 /// Both errors and data events are forwarded through [stream]. The streams in |
13 /// the group won't be listened to until [stream] has a listener. **Note that | 11 /// the group won't be listened to until [stream] has a listener. **Note that |
14 /// this means that events emitted by broadcast streams will be dropped until | 12 /// this means that events emitted by broadcast streams will be dropped until |
15 /// [stream] has a listener.** | 13 /// [stream] has a listener.** |
16 /// | 14 /// |
17 /// If the `StreamGroup` is constructed using [new StreamGroup], [stream] will b
e | 15 /// If the `StreamGroup` is constructed using [new StreamGroup], [stream] will |
18 /// single-subscription. In this case, if [stream] is paused or canceled, all | 16 /// be single-subscription. In this case, if [stream] is paused or canceled, all |
19 /// streams in the group will likewise be paused or canceled, respectively. | 17 /// streams in the group will likewise be paused or canceled, respectively. |
20 /// | 18 /// |
21 /// If the `StreamGroup` is constructed using [new StreamGroup.broadcast], | 19 /// If the `StreamGroup` is constructed using [new StreamGroup.broadcast], |
22 /// [stream] will be a broadcast stream. In this case, the streams in the group | 20 /// [stream] will be a broadcast stream. In this case, the streams in the group |
23 /// will never be paused and single-subscription streams in the group will never | 21 /// will never be paused and single-subscription streams in the group will never |
24 /// be canceled. **Note that single-subscription streams in a broadcast group | 22 /// be canceled. **Note that single-subscription streams in a broadcast group |
25 /// may drop events if a listener is added and later removed.** Broadcast | 23 /// may drop events if a listener is added and later removed.** Broadcast |
26 /// streams in the group will be canceled once [stream] has no listeners, and | 24 /// streams in the group will be canceled once [stream] has no listeners, and |
27 /// will be listened to again once [stream] has listeners. | 25 /// will be listened to again once [stream] has listeners. |
28 /// | 26 /// |
(...skipping 19 matching lines...) Expand all Loading... |
48 /// If it's a broadcast group and it goes dormant again, broadcast stream | 46 /// If it's a broadcast group and it goes dormant again, broadcast stream |
49 /// subscriptions will be canceled and set to null again. Single-subscriber | 47 /// subscriptions will be canceled and set to null again. Single-subscriber |
50 /// stream subscriptions will be left intact, since they can't be | 48 /// stream subscriptions will be left intact, since they can't be |
51 /// re-subscribed. | 49 /// re-subscribed. |
52 final _subscriptions = new Map<Stream<T>, StreamSubscription<T>>(); | 50 final _subscriptions = new Map<Stream<T>, StreamSubscription<T>>(); |
53 | 51 |
54 /// Merges the events from [streams] into a single (single-subscriber) stream. | 52 /// Merges the events from [streams] into a single (single-subscriber) stream. |
55 /// | 53 /// |
56 /// This is equivalent to adding [streams] to a group, closing that group, and | 54 /// This is equivalent to adding [streams] to a group, closing that group, and |
57 /// returning its stream. | 55 /// returning its stream. |
58 static Stream merge(Iterable<Stream> streams) { | 56 static Stream<T> merge<T>(Iterable<Stream<T>> streams) { |
59 var group = new StreamGroup(); | 57 var group = new StreamGroup<T>(); |
60 streams.forEach(group.add); | 58 streams.forEach(group.add); |
61 group.close(); | 59 group.close(); |
62 return group.stream; | 60 return group.stream; |
63 } | 61 } |
64 | 62 |
65 /// Creates a new stream group where [stream] is single-subscriber. | 63 /// Creates a new stream group where [stream] is single-subscriber. |
66 StreamGroup() { | 64 StreamGroup() { |
67 _controller = new StreamController<T>( | 65 _controller = new StreamController<T>( |
68 onListen: _onListen, | 66 onListen: _onListen, |
69 onPause: _onPause, | 67 onPause: _onPause, |
70 onResume: _onResume, | 68 onResume: _onResume, |
71 onCancel: _onCancel, | 69 onCancel: _onCancel, |
72 sync: true); | 70 sync: true); |
73 } | 71 } |
74 | 72 |
75 /// Creates a new stream group where [stream] is a broadcast stream. | 73 /// Creates a new stream group where [stream] is a broadcast stream. |
76 StreamGroup.broadcast() { | 74 StreamGroup.broadcast() { |
77 _controller = new StreamController<T>.broadcast( | 75 _controller = new StreamController<T>.broadcast( |
78 onListen: _onListen, | 76 onListen: _onListen, onCancel: _onCancelBroadcast, sync: true); |
79 onCancel: _onCancelBroadcast, | |
80 sync: true); | |
81 } | 77 } |
82 | 78 |
83 /// Adds [stream] as a member of this group. | 79 /// Adds [stream] as a member of this group. |
84 /// | 80 /// |
85 /// Any events from [stream] will be emitted through [this.stream]. If this | 81 /// Any events from [stream] will be emitted through [this.stream]. If this |
86 /// group has a listener, [stream] will be listened to immediately; otherwise | 82 /// group has a listener, [stream] will be listened to immediately; otherwise |
87 /// it will only be listened to once this group gets a listener. | 83 /// it will only be listened to once this group gets a listener. |
88 /// | 84 /// |
89 /// If this is a single-subscription group and its subscription has been | 85 /// If this is a single-subscription group and its subscription has been |
90 /// canceled, [stream] will be canceled as soon as its added. If this returns | 86 /// canceled, [stream] will be canceled as soon as its added. If this returns |
(...skipping 96 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
187 // it has no listeners. | 183 // it has no listeners. |
188 if (!stream.isBroadcast) return; | 184 if (!stream.isBroadcast) return; |
189 subscription.cancel(); | 185 subscription.cancel(); |
190 _subscriptions[stream] = null; | 186 _subscriptions[stream] = null; |
191 }); | 187 }); |
192 } | 188 } |
193 | 189 |
194 /// Starts actively forwarding events from [stream] to [_controller]. | 190 /// Starts actively forwarding events from [stream] to [_controller]. |
195 /// | 191 /// |
196 /// This will pause the resulting subscription if [this] is paused. | 192 /// This will pause the resulting subscription if [this] is paused. |
197 StreamSubscription _listenToStream(Stream stream) { | 193 StreamSubscription<T> _listenToStream(Stream<T> stream) { |
198 var subscription = stream.listen( | 194 var subscription = stream.listen(_controller.add, |
199 _controller.add, | 195 onError: _controller.addError, onDone: () => remove(stream)); |
200 onError: _controller.addError, | |
201 onDone: () => remove(stream)); | |
202 if (_state == _StreamGroupState.paused) subscription.pause(); | 196 if (_state == _StreamGroupState.paused) subscription.pause(); |
203 return subscription; | 197 return subscription; |
204 } | 198 } |
205 | 199 |
206 /// Closes the group, indicating that no more streams will be added. | 200 /// Closes the group, indicating that no more streams will be added. |
207 /// | 201 /// |
208 /// If there are no streams in the group, [stream] is closed immediately. | 202 /// If there are no streams in the group, [stream] is closed immediately. |
209 /// Otherwise, [stream] will close once all streams in the group close. | 203 /// Otherwise, [stream] will close once all streams in the group close. |
210 /// | 204 /// |
211 /// Returns a [Future] that completes once [stream] has actually been closed. | 205 /// Returns a [Future] that completes once [stream] has actually been closed. |
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
250 | 244 |
251 /// The name of the state. | 245 /// The name of the state. |
252 /// | 246 /// |
253 /// Used for debugging. | 247 /// Used for debugging. |
254 final String name; | 248 final String name; |
255 | 249 |
256 const _StreamGroupState(this.name); | 250 const _StreamGroupState(this.name); |
257 | 251 |
258 String toString() => name; | 252 String toString() => name; |
259 } | 253 } |
OLD | NEW |