Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(334)

Side by Side Diff: packages/async/lib/src/stream_group.dart

Issue 1400473008: Roll Observatory packages and add a roll script (Closed) Base URL: git@github.com:dart-lang/observatory_pub_packages.git@master
Patch Set: Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « packages/async/lib/src/stream_completer.dart ('k') | packages/async/lib/src/stream_queue.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 library async.stream_group;
6
7 import 'dart:async';
8
9 /// A collection of streams whose events are unified and sent through a central
10 /// stream.
11 ///
12 /// Both errors and data events are forwarded through [stream]. The streams in
13 /// the group won't be listened to until [stream] has a listener. **Note that
14 /// this means that events emitted by broadcast streams will be dropped until
15 /// [stream] has a listener.**
16 ///
17 /// If the `StreamGroup` is constructed using [new StreamGroup], [stream] will b e
18 /// single-subscription. In this case, if [stream] is paused or canceled, all
19 /// streams in the group will likewise be paused or canceled, respectively.
20 ///
21 /// If the `StreamGroup` is constructed using [new StreamGroup.broadcast],
22 /// [stream] will be a broadcast stream. In this case, the streams in the group
23 /// will never be paused and single-subscription streams in the group will never
24 /// be canceled. **Note that single-subscription streams in a broadcast group
25 /// may drop events if a listener is added and later removed.** Broadcast
26 /// streams in the group will be canceled once [stream] has no listeners, and
27 /// will be listened to again once [stream] has listeners.
28 ///
29 /// [stream] won't close until [close] is called on the group *and* every stream
30 /// in the group closes.
31 class StreamGroup<T> implements Sink<Stream<T>> {
32 /// The stream through which all events from streams in the group are emitted.
33 Stream<T> get stream => _controller.stream;
34 StreamController<T> _controller;
35
36 /// Whether the group is closed, meaning that no more streams may be added.
37 var _closed = false;
38
39 /// The current state of the group.
40 ///
41 /// See [_StreamGroupState] for detailed descriptions of each state.
42 var _state = _StreamGroupState.dormant;
43
44 /// Streams that have been added to the group, and their subscriptions if they
45 /// have been subscribed to.
46 ///
47 /// The subscriptions will be null until the group has a listener registered.
48 /// If it's a broadcast group and it goes dormant again, broadcast stream
49 /// subscriptions will be canceled and set to null again. Single-subscriber
50 /// stream subscriptions will be left intact, since they can't be
51 /// re-subscribed.
52 final _subscriptions = new Map<Stream<T>, StreamSubscription<T>>();
53
54 /// Merges the events from [streams] into a single (single-subscriber) stream.
55 ///
56 /// This is equivalent to adding [streams] to a group, closing that group, and
57 /// returning its stream.
58 static Stream merge(Iterable<Stream> streams) {
59 var group = new StreamGroup();
60 streams.forEach(group.add);
61 group.close();
62 return group.stream;
63 }
64
65 /// Creates a new stream group where [stream] is single-subscriber.
66 StreamGroup() {
67 _controller = new StreamController<T>(
68 onListen: _onListen,
69 onPause: _onPause,
70 onResume: _onResume,
71 onCancel: _onCancel,
72 sync: true);
73 }
74
75 /// Creates a new stream group where [stream] is a broadcast stream.
76 StreamGroup.broadcast() {
77 _controller = new StreamController<T>.broadcast(
78 onListen: _onListen,
79 onCancel: _onCancelBroadcast,
80 sync: true);
81 }
82
83 /// Adds [stream] as a member of this group.
84 ///
85 /// Any events from [stream] will be emitted through [this.stream]. If this
86 /// group has a listener, [stream] will be listened to immediately; otherwise
87 /// it will only be listened to once this group gets a listener.
88 ///
89 /// If this is a single-subscription group and its subscription has been
90 /// canceled, [stream] will be canceled as soon as its added. If this returns
91 /// a [Future], it will be returned from [add]. Otherwise, [add] returns
92 /// `null`.
93 ///
94 /// Throws a [StateError] if this group is closed.
95 Future add(Stream<T> stream) {
96 if (_closed) {
97 throw new StateError("Can't add a Stream to a closed StreamGroup.");
98 }
99
100 if (_state == _StreamGroupState.dormant) {
101 _subscriptions.putIfAbsent(stream, () => null);
102 } else if (_state == _StreamGroupState.canceled) {
103 // Listen to the stream and cancel it immediately so that no one else can
104 // listen, for consistency. If the stream has an onCancel listener this
105 // will also fire that, which may help it clean up resources.
106 return stream.listen(null).cancel();
107 } else {
108 _subscriptions.putIfAbsent(stream, () => _listenToStream(stream));
109 }
110
111 return null;
112 }
113
114 /// Removes [stream] as a member of this group.
115 ///
116 /// No further events from [stream] will be emitted through this group. If
117 /// [stream] has been listened to, its subscription will be canceled.
118 ///
119 /// If [stream] has been listened to, this *synchronously* cancels its
120 /// subscription. This means that any events from [stream] that haven't yet
121 /// been emitted through this group will not be.
122 ///
123 /// If [stream]'s subscription is canceled, this returns
124 /// [StreamSubscription.cancel]'s return value. Otherwise, it returns `null`.
125 Future remove(Stream<T> stream) {
126 var subscription = _subscriptions.remove(stream);
127 var future = subscription == null ? null : subscription.cancel();
128 if (_closed && _subscriptions.isEmpty) _controller.close();
129 return future;
130 }
131
132 /// A callback called when [stream] is listened to.
133 ///
134 /// This is called for both single-subscription and broadcast groups.
135 void _onListen() {
136 _state = _StreamGroupState.listening;
137 _subscriptions.forEach((stream, subscription) {
138 // If this is a broadcast group and this isn't the first time it's been
139 // listened to, there may still be some subscriptions to
140 // single-subscription streams.
141 if (subscription != null) return;
142 _subscriptions[stream] = _listenToStream(stream);
143 });
144 }
145
146 /// A callback called when [stream] is paused.
147 void _onPause() {
148 _state = _StreamGroupState.paused;
149 for (var subscription in _subscriptions.values) {
150 subscription.pause();
151 }
152 }
153
154 /// A callback called when [stream] is resumed.
155 void _onResume() {
156 _state = _StreamGroupState.listening;
157 for (var subscription in _subscriptions.values) {
158 subscription.resume();
159 }
160 }
161
162 /// A callback called when [stream] is canceled.
163 ///
164 /// This is only called for single-subscription groups.
165 Future _onCancel() {
166 _state = _StreamGroupState.canceled;
167
168 var futures = _subscriptions.values
169 .map((subscription) => subscription.cancel())
170 .where((future) => future != null)
171 .toList();
172
173 _subscriptions.clear();
174 return futures.isEmpty ? null : Future.wait(futures);
175 }
176
177 /// A callback called when [stream]'s last listener is canceled.
178 ///
179 /// This is only called for broadcast groups.
180 void _onCancelBroadcast() {
181 _state = _StreamGroupState.dormant;
182
183 _subscriptions.forEach((stream, subscription) {
184 // Cancel the broadcast streams, since we can re-listen to those later,
185 // but allow the single-subscription streams to keep firing. Their events
186 // will still be added to [_controller], but then they'll be dropped since
187 // it has no listeners.
188 if (!stream.isBroadcast) return;
189 subscription.cancel();
190 _subscriptions[stream] = null;
191 });
192 }
193
194 /// Starts actively forwarding events from [stream] to [_controller].
195 ///
196 /// This will pause the resulting subscription if [this] is paused.
197 StreamSubscription _listenToStream(Stream stream) {
198 var subscription = stream.listen(
199 _controller.add,
200 onError: _controller.addError,
201 onDone: () => remove(stream));
202 if (_state == _StreamGroupState.paused) subscription.pause();
203 return subscription;
204 }
205
206 /// Closes the group, indicating that no more streams will be added.
207 ///
208 /// If there are no streams in the group, [stream] is closed immediately.
209 /// Otherwise, [stream] will close once all streams in the group close.
210 ///
211 /// Returns a [Future] that completes once [stream] has actually been closed.
212 Future close() {
213 if (_closed) return _controller.done;
214
215 _closed = true;
216 if (_subscriptions.isEmpty) _controller.close();
217
218 return _controller.done;
219 }
220 }
221
222 /// An enum of possible states of a [StreamGroup].
223 class _StreamGroupState {
224 /// The group has no listeners.
225 ///
226 /// New streams added to the group will be listened once the group has a
227 /// listener.
228 static const dormant = const _StreamGroupState("dormant");
229
230 /// The group has one or more listeners and is actively firing events.
231 ///
232 /// New streams added to the group will be immediately listeners.
233 static const listening = const _StreamGroupState("listening");
234
235 /// The group is paused and no more events will be fired until it resumes.
236 ///
237 /// New streams added to the group will be listened to, but then paused. They
238 /// will be resumed once the group itself is resumed.
239 ///
240 /// This state is only used by single-subscriber groups.
241 static const paused = const _StreamGroupState("paused");
242
243 /// The group is canceled and no more events will be fired ever.
244 ///
245 /// New streams added to the group will be listened to, canceled, and
246 /// discarded.
247 ///
248 /// This state is only used by single-subscriber groups.
249 static const canceled = const _StreamGroupState("canceled");
250
251 /// The name of the state.
252 ///
253 /// Used for debugging.
254 final String name;
255
256 const _StreamGroupState(this.name);
257
258 String toString() => name;
259 }
OLDNEW
« no previous file with comments | « packages/async/lib/src/stream_completer.dart ('k') | packages/async/lib/src/stream_queue.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698