Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(109)

Side by Side Diff: lib/src/stream_group.dart

Issue 1178793006: Add a StreamGroup class for merging streams. (Closed) Base URL: git@github.com:dart-lang/async.git@master
Patch Set: Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 library async.stream_group;
6
7 import 'dart:async';
8
9 /// A collection of streams whose events are unified and sent through a central
10 /// stream.
11 ///
12 /// Both errors and data events are forwarded through [stream]. The streams in
13 /// the group won't be listened to until [stream] has a listener. **Note that
14 /// this means that events emitted by broadcast streams will be dropped until
15 /// [stream] has a listener.**
16 ///
17 /// [stream] will be a single-subscription stream for groups constructed with
Lasse Reichstein Nielsen 2015/06/18 10:29:31 I find "groups" confusing - using "a group" or "a
nweiz 2015/06/19 00:44:16 Reworded.
18 /// [new StreamGroup]. In this case, if [stream] is paused or canceled, all
19 /// streams in the group will likewise be paused or canceled, respectively.
20 ///
21 /// [stream] will be a broadcast stream for groups constructed with [new
22 /// StreamGroup.broadcast]. In this case, the streams in the group will never be
23 /// paused and single-subscription streams in the group will never be canceled.
24 /// **Note that single-subscription streams in a broadcast group may drop events
25 /// if a listener is added and later removed.** Broadcast streams in the group
26 /// will be canceled once [stream] has no listeners, and will be listened to
27 /// again once [stream] has listeners.
28 ///
29 /// [stream] won't close until [close] is called on the group *and* every stream
30 /// in the group closes.
31 class StreamGroup<T> implements Sink<Stream<T>> {
32 /// The stream through which all events from streams in the group are emitted.
33 Stream<T> get stream => _controller.stream;
34 StreamController<T> _controller;
35
36 /// Whether the group is closed, meaning that no more streams may be added.
37 var _closed = false;
38
39 /// The current state of the group.
40 ///
41 /// See [_StreamGroupState] for detailed descriptions of each state.
42 var _state = _StreamGroupState.dormant;
43
44 /// The streams that have been added to the group, but not yet listened to.
45 ///
46 /// This will be empty unless the group is dormant (that is, [listen] has not
47 /// yet been called). Once the group is listening, streams and their
48 /// subscriptions will be directly added to [_subscriptions] instead.
49 final _streams = new List<Stream<T>>();
50
51 /// Subscriptions to the streams that make up the group.
52 ///
53 /// The keys are the streams that are listened on, and the values are the
54 /// subscriptions to those streams. This will be empty as long as the group is
55 /// dormant.
Lasse Reichstein Nielsen 2015/06/18 10:29:31 It seems that it will still contain single-sub gro
Lasse Reichstein Nielsen 2015/06/18 10:29:31 Perhaps state that the map is used to look up the
nweiz 2015/06/19 00:44:17 That's true, I'll state that explicitly.
56 final _subscriptions = new Map<Stream<T>, StreamSubscription<T>>();
57
58 /// Merges the events from [streams] into a single (single-subscriber) stream.
59 ///
60 /// This is equivalent to adding [streams] to a group, closing that group, and
61 /// returning its stream.
62 static Stream merge(Iterable<Stream> streams) {
63 var group = new StreamGroup();
64 streams.forEach(group.add);
65 group.close();
66 return group.stream;
67 }
68
69 /// Creates a new stream group where [stream] is single-subscriber.
70 StreamGroup() {
71 _controller = new StreamController<T>(
72 onListen: _onListen,
73 onPause: _onPause,
74 onResume: _onResume,
75 onCancel: _onCancel,
76 sync: true);
77 }
78
79 /// Creates a new stream group where [stream] is a broadcast stream.
80 StreamGroup.broadcast() {
81 _controller = new StreamController<T>.broadcast(
82 onListen: _onListen,
83 onCancel: _onCancelBroadcast,
84 sync: true);
85 }
86
87 /// Adds [stream] as a member of this group.
88 ///
89 /// Any events from [stream] will be emitted through [this.stream]. If this
90 /// group has a listener, [stream] will be listened to immediately; otherwise
91 /// it will only be listened to once this group gets a listener.
92 ///
93 /// If this is a single-subscription group and its subscription has been
94 /// canceled, [stream] will be canceled as soon as its added. If this returns
95 /// a Future, it will be returned from [add]. Otherwise, [add] returns `null`.
Lasse Reichstein Nielsen 2015/06/18 10:29:32 [Future]
nweiz 2015/06/19 00:44:16 Done.
96 ///
97 /// Throws a [StateError] if this group is closed.
98 Future add(Stream<T> stream) {
99 if (_closed) {
100 throw new StateError("Can't add a Stream to a closed StreamGroup.");
101 }
102
103 if (_state == _StreamGroupState.dormant) {
104 _streams.add(stream);
Lasse Reichstein Nielsen 2015/06/18 10:29:31 Here you can add the same stream more than once wh
nweiz 2015/06/19 00:44:17 Good idea! This makes a lot of stuff cleaner.
105 } else if (_state == _StreamGroupState.canceled) {
106 // Listen to the stream and cancel it immediately so that no one else can
107 // listen, for consistency. If the stream has an onCancel listener this
108 // will also fire that, which may help it clean up resources.
109 return stream.listen(null).cancel();
Lasse Reichstein Nielsen 2015/06/18 10:29:31 The listen call can fail (for a single-sub stream
nweiz 2015/06/19 00:44:16 It would fail if the group weren't canceled, too,
110 } else {
111 _listenToStream(stream);
112 }
113
114 return null;
115 }
116
117 /// Removes [stream] as a member of this group.
118 ///
119 /// No further events from [stream] will be emitted through this group. If
120 /// [stream] has been listened to, its subscription will be canceled.
121 ///
122 /// If [stream] has been listened to, this *synchronously* cancels its
123 /// subscription. This means that any events from [stream] that haven't yet
124 /// been emitted through this group will not be, even if they were added to
Lasse Reichstein Nielsen 2015/06/18 10:29:31 Maybe drop the ", even if ..." part, I think it on
nweiz 2015/06/19 00:44:17 Done.
125 /// [stream] before it was removed.
126 ///
127 /// If [stream]'s subscription was canceled, this returns
Lasse Reichstein Nielsen 2015/06/18 10:29:31 was => is
nweiz 2015/06/19 00:44:17 Done.
128 /// [StreamSubscription.cancel]'s return value. Otherwise, it returns `null`.
129 Future remove(Stream<T> stream) {
130 if (_state == _StreamGroupState.dormant) {
131 _streams.remove(stream);
Lasse Reichstein Nielsen 2015/06/18 10:29:31 If this is a broadcast group and a single-sub stre
nweiz 2015/06/19 00:44:17 Merging _subscriptions and _streams made this prob
132 if (_closed && _streams.isEmpty) _controller.close();
133 return null;
134 }
135
136 var subscription = _subscriptions.remove(stream);
137 var future = subscription == null ? null : subscription.cancel();
138 if (_closed && _subscriptions.isEmpty) _controller.close();
139 return future;
140 }
141
142 /// A callback called when [stream] is listened to.
143 ///
144 /// This is called for both single-subscription and broadcast groups.
145 void _onListen() {
146 _state = _StreamGroupState.listening;
147
148 // Now that the group is marked as listening, [add] will actually subscribe
149 // rather than adding a stream to [_streams].
150 _streams.forEach(_listenToStream);
151 _streams.clear();
152 }
153
154 /// A callback called when [stream] is paused.
155 void _onPause() {
156 _state = _StreamGroupState.paused;
157 for (var subscription in _subscriptions.values) {
158 subscription.pause();
159 }
160 }
161
162 /// A callback called when [stream] is resumed.
163 void _onResume() {
164 _state = _StreamGroupState.listening;
165 for (var subscription in _subscriptions.values) {
166 subscription.resume();
167 }
168 }
169
170 /// A callback called when [stream] is canceled.
171 ///
172 /// This is only called for single-subscription groups.
173 Future _onCancel() {
174 _state = _StreamGroupState.canceled;
175
176 var futures = _subscriptions.values
177 .map((subscription) => subscription.cancel())
178 .where((future) => future != null)
179 .toList();
180
181 _subscriptions.clear();
182 return futures.isEmpty ? null : Future.wait(futures);
183 }
184
185 /// A callback called when [stream] is canceled.
Lasse Reichstein Nielsen 2015/06/18 10:29:31 is canceled -> 's last listener is canceled ? Bro
nweiz 2015/06/19 00:44:17 Done.
186 ///
187 /// This is only called for broadcast groups.
188 void _onCancelBroadcast() {
189 _state = _StreamGroupState.dormant;
190
191 for (var stream in _subscriptions.keys.toList()) {
192 if (stream.isBroadcast) _subscriptions.remove(stream).cancel();
193 _streams.add(stream);
Lasse Reichstein Nielsen 2015/06/18 10:29:31 Neat, cancel the broadcast streams, but let the si
nweiz 2015/06/19 00:44:17 Done.
194 }
195 }
196
197 /// Starts actively forwarding events from [stream] to [_controller].
198 ///
199 /// This will add the resulting subscription to [_subscriptions] and pause it
200 /// if [this] is paused.
201 void _listenToStream(Stream stream) {
202 _subscriptions.putIfAbsent(stream, () {
203 var subscription = stream.listen(
204 _controller.add,
205 onError: _controller.addError,
206 onDone: () => remove(stream));
207 if (_state == _StreamGroupState.paused) subscription.pause();
208 return subscription;
209 });
210 }
211
212 /// Closes the group, indicating that no more streams will be added.
213 ///
214 /// If there are no streams in the group, [stream] is closed immediately.
215 /// Otherwise, [stream] will close once all streams in the group close.
216 ///
217 /// Returns a [Future] that completes once [stream] has actually been closed.
218 Future close() {
219 if (_closed) return _controller.done;
220
221 _closed = true;
222 if (_state == _StreamGroupState.dormant) {
223 if (_streams.isEmpty) _controller.close();
Lasse Reichstein Nielsen 2015/06/18 10:29:31 If you are dormant in a broadcast stream with a si
nweiz 2015/06/19 00:44:16 Fixed by merging _subscriptions and _streams.
224 } else if (_subscriptions.isEmpty) {
225 _controller.close();
226 }
227
228 return _controller.done;
229 }
230 }
231
232 /// An enum of possible states of a [StreamGroup].
Lasse Reichstein Nielsen 2015/06/18 10:29:31 Would a real enum work? (Not that I particularly l
nweiz 2015/06/19 00:44:16 Yes, technically, but I tend to avoid those on pri
233 class _StreamGroupState {
234 /// The group has no listeners.
235 ///
236 /// New streams added to the group will be listened once the group has a
237 /// listener.
238 static const dormant = const _StreamGroupState("dormant");
239
240 /// The group has one or more listeners and is actively firing events.
241 ///
242 /// New streams added to the group will be immediately listeners.
243 static const listening = const _StreamGroupState("listening");
244
245 /// The group is paused and no more events will be fired until it resumes.
246 ///
247 /// New streams added to the group will be listened to, but then paused. They
248 /// will be resumed once the group itself is resumed.
249 ///
250 /// This state is only used by single-subscriber groups.
251 static const paused = const _StreamGroupState("paused");
252
253 /// The group is canceled and no more events will be fired ever.
254 ///
255 /// New streams added to the group will be listened to, canceled, and
256 /// discarded.
257 ///
258 /// This state is only used by single-subscriber groups.
259 static const canceled = const _StreamGroupState("canceled");
260
261 /// The name of the state.
262 ///
263 /// Used for debugging.
264 final String name;
265
266 const _StreamGroupState(this.name);
267
268 String toString() => name;
269 }
OLDNEW
« no previous file with comments | « lib/async.dart ('k') | test/stream_group_test.dart » ('j') | test/stream_group_test.dart » ('J')

Powered by Google App Engine
This is Rietveld 408576698