OLD | NEW |
---|---|
(Empty) | |
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | |
2 // for details. All rights reserved. Use of this source code is governed by a | |
3 // BSD-style license that can be found in the LICENSE file. | |
4 | |
5 library async.stream_group; | |
6 | |
7 import 'dart:async'; | |
8 | |
9 /// A collection of streams whose events are unified and sent through a central | |
10 /// stream. | |
11 /// | |
12 /// Both errors and data events are forwarded through [stream]. The streams in | |
13 /// the group won't be listened to until [stream] has a listener. **Note that | |
14 /// this means that events emitted by broadcast streams will be dropped until | |
15 /// [stream] has a listener.** | |
16 /// | |
17 /// If the `StreamGroup` is construced using [new StreamGroup], [stream] will be | |
18 /// single-subscription. In this case, if [stream] is paused or canceled, all | |
19 /// streams in the group will likewise be paused or canceled, respectively. | |
20 /// | |
21 /// If the `StreamGroup` is construced using [new StreamGroup.broadcast], | |
22 /// [stream] will be a broadcast stream. In this case, the streams in the group | |
23 /// will never be paused and single-subscription streams in the group will never | |
24 /// be canceled. **Note that single-subscription streams in a broadcast group | |
25 /// may drop events if a listener is added and later removed.** Broadcast | |
Lasse Reichstein Nielsen
2015/06/19 12:45:13
-> may drop events that arrive while [stream] has
nweiz
2015/06/19 20:02:03
I think that's less accurate, since events *won't*
| |
26 /// streams in the group will be canceled once [stream] has no listeners, and | |
27 /// will be listened to again once [stream] has listeners. | |
28 /// | |
29 /// [stream] won't close until [close] is called on the group *and* every stream | |
30 /// in the group closes. | |
31 class StreamGroup<T> implements Sink<Stream<T>> { | |
32 /// The stream through which all events from streams in the group are emitted. | |
33 Stream<T> get stream => _controller.stream; | |
34 StreamController<T> _controller; | |
35 | |
36 /// Whether the group is closed, meaning that no more streams may be added. | |
37 var _closed = false; | |
38 | |
39 /// The current state of the group. | |
40 /// | |
41 /// See [_StreamGroupState] for detailed descriptions of each state. | |
42 var _state = _StreamGroupState.dormant; | |
43 | |
44 /// Streams that have been added to the group, and their subscriptions if they | |
45 /// have been subscribed to. | |
46 /// | |
47 /// The subscriptions will be null until the group has a listener registered. | |
48 /// If it's a broadcast group and it goes dormant again, broadcast stream | |
49 /// subscriptions will be canceled and set to null again. Single-subscriber | |
50 /// stream subscriptions will be left intact, since they can't be | |
51 /// re-subscribed. | |
52 final _subscriptions = new Map<Stream<T>, StreamSubscription<T>>(); | |
53 | |
54 /// Merges the events from [streams] into a single (single-subscriber) stream. | |
55 /// | |
56 /// This is equivalent to adding [streams] to a group, closing that group, and | |
57 /// returning its stream. | |
58 static Stream merge(Iterable<Stream> streams) { | |
59 var group = new StreamGroup(); | |
60 streams.forEach(group.add); | |
61 group.close(); | |
62 return group.stream; | |
63 } | |
64 | |
65 /// Creates a new stream group where [stream] is single-subscriber. | |
66 StreamGroup() { | |
67 _controller = new StreamController<T>( | |
68 onListen: _onListen, | |
69 onPause: _onPause, | |
70 onResume: _onResume, | |
71 onCancel: _onCancel, | |
72 sync: true); | |
73 } | |
74 | |
75 /// Creates a new stream group where [stream] is a broadcast stream. | |
76 StreamGroup.broadcast() { | |
77 _controller = new StreamController<T>.broadcast( | |
78 onListen: _onListen, | |
79 onCancel: _onCancelBroadcast, | |
80 sync: true); | |
81 } | |
82 | |
83 /// Adds [stream] as a member of this group. | |
84 /// | |
85 /// Any events from [stream] will be emitted through [this.stream]. If this | |
86 /// group has a listener, [stream] will be listened to immediately; otherwise | |
87 /// it will only be listened to once this group gets a listener. | |
88 /// | |
89 /// If this is a single-subscription group and its subscription has been | |
90 /// canceled, [stream] will be canceled as soon as its added. If this returns | |
91 /// a [Future], it will be returned from [add]. Otherwise, [add] returns | |
92 /// `null`. | |
93 /// | |
94 /// Throws a [StateError] if this group is closed. | |
95 Future add(Stream<T> stream) { | |
96 if (_closed) { | |
97 throw new StateError("Can't add a Stream to a closed StreamGroup."); | |
98 } | |
99 | |
100 if (_state == _StreamGroupState.dormant) { | |
Lasse Reichstein Nielsen
2015/06/19 12:45:13
So, if this is a broadcast streamgroup with no lis
| |
101 _subscriptions.putIfAbsent(stream, () => null); | |
102 } else if (_state == _StreamGroupState.canceled) { | |
103 // Listen to the stream and cancel it immediately so that no one else can | |
104 // listen, for consistency. If the stream has an onCancel listener this | |
105 // will also fire that, which may help it clean up resources. | |
106 return stream.listen(null).cancel(); | |
107 } else { | |
108 _subscriptions.putIfAbsent(stream, () => _listenToStream(stream)); | |
109 } | |
110 | |
111 return null; | |
112 } | |
113 | |
114 /// Removes [stream] as a member of this group. | |
115 /// | |
116 /// No further events from [stream] will be emitted through this group. If | |
117 /// [stream] has been listened to, its subscription will be canceled. | |
118 /// | |
119 /// If [stream] has been listened to, this *synchronously* cancels its | |
120 /// subscription. This means that any events from [stream] that haven't yet | |
121 /// been emitted through this group will not be. | |
122 /// | |
123 /// If [stream]'s subscription is canceled, this returns | |
124 /// [StreamSubscription.cancel]'s return value. Otherwise, it returns `null`. | |
125 Future remove(Stream<T> stream) { | |
126 var subscription = _subscriptions.remove(stream); | |
127 var future = subscription == null ? null : subscription.cancel(); | |
128 if (_closed && _subscriptions.isEmpty) _controller.close(); | |
129 return future; | |
130 } | |
131 | |
132 /// A callback called when [stream] is listened to. | |
133 /// | |
134 /// This is called for both single-subscription and broadcast groups. | |
135 void _onListen() { | |
136 _state = _StreamGroupState.listening; | |
137 for (var stream in _subscriptions.keys.toList()) { | |
Lasse Reichstein Nielsen
2015/06/19 12:45:13
You don't need to do toList (or avoid forEach).
Up
nweiz
2015/06/19 20:02:03
Done.
| |
138 // If this is a broadcast group and this isn't the first time it's been | |
139 // listened to, there may still be some subscriptions to | |
140 // single-subscription streams. | |
141 if (_subscriptions[stream] != null) continue; | |
142 _subscriptions[stream] = _listenToStream(stream); | |
143 } | |
144 } | |
145 | |
146 /// A callback called when [stream] is paused. | |
147 void _onPause() { | |
148 _state = _StreamGroupState.paused; | |
149 for (var subscription in _subscriptions.values) { | |
150 subscription.pause(); | |
151 } | |
152 } | |
153 | |
154 /// A callback called when [stream] is resumed. | |
155 void _onResume() { | |
156 _state = _StreamGroupState.listening; | |
157 for (var subscription in _subscriptions.values) { | |
158 subscription.resume(); | |
159 } | |
160 } | |
161 | |
162 /// A callback called when [stream] is canceled. | |
163 /// | |
164 /// This is only called for single-subscription groups. | |
165 Future _onCancel() { | |
166 _state = _StreamGroupState.canceled; | |
167 | |
168 var futures = _subscriptions.values | |
169 .map((subscription) => subscription.cancel()) | |
170 .where((future) => future != null) | |
171 .toList(); | |
172 | |
173 _subscriptions.clear(); | |
174 return futures.isEmpty ? null : Future.wait(futures); | |
175 } | |
176 | |
177 /// A callback called when [stream]'s last listener is canceled. | |
178 /// | |
179 /// This is only called for broadcast groups. | |
180 void _onCancelBroadcast() { | |
181 _state = _StreamGroupState.dormant; | |
182 | |
183 for (var stream in _subscriptions.keys.toList()) { | |
Lasse Reichstein Nielsen
2015/06/19 12:45:13
Ditto here, no need for toList.
nweiz
2015/06/19 20:02:03
Done.
| |
184 // Cancel the broadcast streams, since we can re-listen to those later, | |
185 // but allow the single-subscription streams to keep firing. Their events | |
186 // will still be added to [_controller], but then they'll be dropped since | |
187 // it has no listeners. | |
188 if (!stream.isBroadcast) continue; | |
189 _subscriptions[stream].cancel(); | |
190 _subscriptions[stream] = null; | |
191 } | |
192 } | |
193 | |
194 /// Starts actively forwarding events from [stream] to [_controller]. | |
195 /// | |
196 /// This will pause the resulting subscription if [this] is paused. | |
197 StreamSubscription _listenToStream(Stream stream) { | |
198 var subscription = stream.listen( | |
199 _controller.add, | |
200 onError: _controller.addError, | |
201 onDone: () => remove(stream)); | |
202 if (_state == _StreamGroupState.paused) subscription.pause(); | |
203 return subscription; | |
204 } | |
205 | |
206 /// Closes the group, indicating that no more streams will be added. | |
207 /// | |
208 /// If there are no streams in the group, [stream] is closed immediately. | |
209 /// Otherwise, [stream] will close once all streams in the group close. | |
210 /// | |
211 /// Returns a [Future] that completes once [stream] has actually been closed. | |
212 Future close() { | |
213 if (_closed) return _controller.done; | |
214 | |
215 _closed = true; | |
216 if (_subscriptions.isEmpty) _controller.close(); | |
217 | |
218 return _controller.done; | |
219 } | |
220 } | |
221 | |
222 /// An enum of possible states of a [StreamGroup]. | |
223 class _StreamGroupState { | |
224 /// The group has no listeners. | |
225 /// | |
226 /// New streams added to the group will be listened once the group has a | |
227 /// listener. | |
228 static const dormant = const _StreamGroupState("dormant"); | |
229 | |
230 /// The group has one or more listeners and is actively firing events. | |
231 /// | |
232 /// New streams added to the group will be immediately listeners. | |
233 static const listening = const _StreamGroupState("listening"); | |
234 | |
235 /// The group is paused and no more events will be fired until it resumes. | |
236 /// | |
237 /// New streams added to the group will be listened to, but then paused. They | |
238 /// will be resumed once the group itself is resumed. | |
239 /// | |
240 /// This state is only used by single-subscriber groups. | |
241 static const paused = const _StreamGroupState("paused"); | |
242 | |
243 /// The group is canceled and no more events will be fired ever. | |
244 /// | |
245 /// New streams added to the group will be listened to, canceled, and | |
246 /// discarded. | |
247 /// | |
248 /// This state is only used by single-subscriber groups. | |
249 static const canceled = const _StreamGroupState("canceled"); | |
250 | |
251 /// The name of the state. | |
252 /// | |
253 /// Used for debugging. | |
254 final String name; | |
255 | |
256 const _StreamGroupState(this.name); | |
257 | |
258 String toString() => name; | |
259 } | |
OLD | NEW |