Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | |
| 2 // for details. All rights reserved. Use of this source code is governed by a | |
| 3 // BSD-style license that can be found in the LICENSE file. | |
| 4 | |
| 5 library async.stream_group; | |
| 6 | |
| 7 import 'dart:async'; | |
| 8 | |
| 9 /// A collection of streams whose events are unified and sent through a central | |
| 10 /// stream. | |
| 11 /// | |
| 12 /// Both errors and data events are forwarded through [stream]. The streams in | |
| 13 /// the group won't be listened to until [stream] has a listener. **Note that | |
| 14 /// this means that events emitted by broadcast streams will be dropped until | |
| 15 /// [stream] has a listener.** | |
| 16 /// | |
| 17 /// [stream] will be a single-subscription stream for groups constructed with | |
|
Lasse Reichstein Nielsen
2015/06/18 10:29:31
I find "groups" confusing - using "a group" or "a
nweiz
2015/06/19 00:44:16
Reworded.
| |
| 18 /// [new StreamGroup]. In this case, if [stream] is paused or canceled, all | |
| 19 /// streams in the group will likewise be paused or canceled, respectively. | |
| 20 /// | |
| 21 /// [stream] will be a broadcast stream for groups constructed with [new | |
| 22 /// StreamGroup.broadcast]. In this case, the streams in the group will never be | |
| 23 /// paused and single-subscription streams in the group will never be canceled. | |
| 24 /// **Note that single-subscription streams in a broadcast group may drop events | |
| 25 /// if a listener is added and later removed.** Broadcast streams in the group | |
| 26 /// will be canceled once [stream] has no listeners, and will be listened to | |
| 27 /// again once [stream] has listeners. | |
| 28 /// | |
| 29 /// [stream] won't close until [close] is called on the group *and* every stream | |
| 30 /// in the group closes. | |
| 31 class StreamGroup<T> implements Sink<Stream<T>> { | |
| 32 /// The stream through which all events from streams in the group are emitted. | |
| 33 Stream<T> get stream => _controller.stream; | |
| 34 StreamController<T> _controller; | |
| 35 | |
| 36 /// Whether the group is closed, meaning that no more streams may be added. | |
| 37 var _closed = false; | |
| 38 | |
| 39 /// The current state of the group. | |
| 40 /// | |
| 41 /// See [_StreamGroupState] for detailed descriptions of each state. | |
| 42 var _state = _StreamGroupState.dormant; | |
| 43 | |
| 44 /// The streams that have been added to the group, but not yet listened to. | |
| 45 /// | |
| 46 /// This will be empty unless the group is dormant (that is, [listen] has not | |
| 47 /// yet been called). Once the group is listening, streams and their | |
| 48 /// subscriptions will be directly added to [_subscriptions] instead. | |
| 49 final _streams = new List<Stream<T>>(); | |
| 50 | |
| 51 /// Subscriptions to the streams that make up the group. | |
| 52 /// | |
| 53 /// The keys are the streams that are listened on, and the values are the | |
| 54 /// subscriptions to those streams. This will be empty as long as the group is | |
| 55 /// dormant. | |
|
Lasse Reichstein Nielsen
2015/06/18 10:29:31
It seems that it will still contain single-sub gro
Lasse Reichstein Nielsen
2015/06/18 10:29:31
Perhaps state that the map is used to look up the
nweiz
2015/06/19 00:44:17
That's true, I'll state that explicitly.
| |
| 56 final _subscriptions = new Map<Stream<T>, StreamSubscription<T>>(); | |
| 57 | |
| 58 /// Merges the events from [streams] into a single (single-subscriber) stream. | |
| 59 /// | |
| 60 /// This is equivalent to adding [streams] to a group, closing that group, and | |
| 61 /// returning its stream. | |
| 62 static Stream merge(Iterable<Stream> streams) { | |
| 63 var group = new StreamGroup(); | |
| 64 streams.forEach(group.add); | |
| 65 group.close(); | |
| 66 return group.stream; | |
| 67 } | |
| 68 | |
| 69 /// Creates a new stream group where [stream] is single-subscriber. | |
| 70 StreamGroup() { | |
| 71 _controller = new StreamController<T>( | |
| 72 onListen: _onListen, | |
| 73 onPause: _onPause, | |
| 74 onResume: _onResume, | |
| 75 onCancel: _onCancel, | |
| 76 sync: true); | |
| 77 } | |
| 78 | |
| 79 /// Creates a new stream group where [stream] is a broadcast stream. | |
| 80 StreamGroup.broadcast() { | |
| 81 _controller = new StreamController<T>.broadcast( | |
| 82 onListen: _onListen, | |
| 83 onCancel: _onCancelBroadcast, | |
| 84 sync: true); | |
| 85 } | |
| 86 | |
| 87 /// Adds [stream] as a member of this group. | |
| 88 /// | |
| 89 /// Any events from [stream] will be emitted through [this.stream]. If this | |
| 90 /// group has a listener, [stream] will be listened to immediately; otherwise | |
| 91 /// it will only be listened to once this group gets a listener. | |
| 92 /// | |
| 93 /// If this is a single-subscription group and its subscription has been | |
| 94 /// canceled, [stream] will be canceled as soon as its added. If this returns | |
| 95 /// a Future, it will be returned from [add]. Otherwise, [add] returns `null`. | |
|
Lasse Reichstein Nielsen
2015/06/18 10:29:32
[Future]
nweiz
2015/06/19 00:44:16
Done.
| |
| 96 /// | |
| 97 /// Throws a [StateError] if this group is closed. | |
| 98 Future add(Stream<T> stream) { | |
| 99 if (_closed) { | |
| 100 throw new StateError("Can't add a Stream to a closed StreamGroup."); | |
| 101 } | |
| 102 | |
| 103 if (_state == _StreamGroupState.dormant) { | |
| 104 _streams.add(stream); | |
|
Lasse Reichstein Nielsen
2015/06/18 10:29:31
Here you can add the same stream more than once wh
nweiz
2015/06/19 00:44:17
Good idea! This makes a lot of stuff cleaner.
| |
| 105 } else if (_state == _StreamGroupState.canceled) { | |
| 106 // Listen to the stream and cancel it immediately so that no one else can | |
| 107 // listen, for consistency. If the stream has an onCancel listener this | |
| 108 // will also fire that, which may help it clean up resources. | |
| 109 return stream.listen(null).cancel(); | |
|
Lasse Reichstein Nielsen
2015/06/18 10:29:31
The listen call can fail (for a single-sub stream
nweiz
2015/06/19 00:44:16
It would fail if the group weren't canceled, too,
| |
| 110 } else { | |
| 111 _listenToStream(stream); | |
| 112 } | |
| 113 | |
| 114 return null; | |
| 115 } | |
| 116 | |
| 117 /// Removes [stream] as a member of this group. | |
| 118 /// | |
| 119 /// No further events from [stream] will be emitted through this group. If | |
| 120 /// [stream] has been listened to, its subscription will be canceled. | |
| 121 /// | |
| 122 /// If [stream] has been listened to, this *synchronously* cancels its | |
| 123 /// subscription. This means that any events from [stream] that haven't yet | |
| 124 /// been emitted through this group will not be, even if they were added to | |
|
Lasse Reichstein Nielsen
2015/06/18 10:29:31
Maybe drop the ", even if ..." part, I think it on
nweiz
2015/06/19 00:44:17
Done.
| |
| 125 /// [stream] before it was removed. | |
| 126 /// | |
| 127 /// If [stream]'s subscription was canceled, this returns | |
|
Lasse Reichstein Nielsen
2015/06/18 10:29:31
was => is
nweiz
2015/06/19 00:44:17
Done.
| |
| 128 /// [StreamSubscription.cancel]'s return value. Otherwise, it returns `null`. | |
| 129 Future remove(Stream<T> stream) { | |
| 130 if (_state == _StreamGroupState.dormant) { | |
| 131 _streams.remove(stream); | |
|
Lasse Reichstein Nielsen
2015/06/18 10:29:31
If this is a broadcast group and a single-sub stre
nweiz
2015/06/19 00:44:17
Merging _subscriptions and _streams made this prob
| |
| 132 if (_closed && _streams.isEmpty) _controller.close(); | |
| 133 return null; | |
| 134 } | |
| 135 | |
| 136 var subscription = _subscriptions.remove(stream); | |
| 137 var future = subscription == null ? null : subscription.cancel(); | |
| 138 if (_closed && _subscriptions.isEmpty) _controller.close(); | |
| 139 return future; | |
| 140 } | |
| 141 | |
| 142 /// A callback called when [stream] is listened to. | |
| 143 /// | |
| 144 /// This is called for both single-subscription and broadcast groups. | |
| 145 void _onListen() { | |
| 146 _state = _StreamGroupState.listening; | |
| 147 | |
| 148 // Now that the group is marked as listening, [add] will actually subscribe | |
| 149 // rather than adding a stream to [_streams]. | |
| 150 _streams.forEach(_listenToStream); | |
| 151 _streams.clear(); | |
| 152 } | |
| 153 | |
| 154 /// A callback called when [stream] is paused. | |
| 155 void _onPause() { | |
| 156 _state = _StreamGroupState.paused; | |
| 157 for (var subscription in _subscriptions.values) { | |
| 158 subscription.pause(); | |
| 159 } | |
| 160 } | |
| 161 | |
| 162 /// A callback called when [stream] is resumed. | |
| 163 void _onResume() { | |
| 164 _state = _StreamGroupState.listening; | |
| 165 for (var subscription in _subscriptions.values) { | |
| 166 subscription.resume(); | |
| 167 } | |
| 168 } | |
| 169 | |
| 170 /// A callback called when [stream] is canceled. | |
| 171 /// | |
| 172 /// This is only called for single-subscription groups. | |
| 173 Future _onCancel() { | |
| 174 _state = _StreamGroupState.canceled; | |
| 175 | |
| 176 var futures = _subscriptions.values | |
| 177 .map((subscription) => subscription.cancel()) | |
| 178 .where((future) => future != null) | |
| 179 .toList(); | |
| 180 | |
| 181 _subscriptions.clear(); | |
| 182 return futures.isEmpty ? null : Future.wait(futures); | |
| 183 } | |
| 184 | |
| 185 /// A callback called when [stream] is canceled. | |
|
Lasse Reichstein Nielsen
2015/06/18 10:29:31
is canceled -> 's last listener is canceled
?
Bro
nweiz
2015/06/19 00:44:17
Done.
| |
| 186 /// | |
| 187 /// This is only called for broadcast groups. | |
| 188 void _onCancelBroadcast() { | |
| 189 _state = _StreamGroupState.dormant; | |
| 190 | |
| 191 for (var stream in _subscriptions.keys.toList()) { | |
| 192 if (stream.isBroadcast) _subscriptions.remove(stream).cancel(); | |
| 193 _streams.add(stream); | |
|
Lasse Reichstein Nielsen
2015/06/18 10:29:31
Neat, cancel the broadcast streams, but let the si
nweiz
2015/06/19 00:44:17
Done.
| |
| 194 } | |
| 195 } | |
| 196 | |
| 197 /// Starts actively forwarding events from [stream] to [_controller]. | |
| 198 /// | |
| 199 /// This will add the resulting subscription to [_subscriptions] and pause it | |
| 200 /// if [this] is paused. | |
| 201 void _listenToStream(Stream stream) { | |
| 202 _subscriptions.putIfAbsent(stream, () { | |
| 203 var subscription = stream.listen( | |
| 204 _controller.add, | |
| 205 onError: _controller.addError, | |
| 206 onDone: () => remove(stream)); | |
| 207 if (_state == _StreamGroupState.paused) subscription.pause(); | |
| 208 return subscription; | |
| 209 }); | |
| 210 } | |
| 211 | |
| 212 /// Closes the group, indicating that no more streams will be added. | |
| 213 /// | |
| 214 /// If there are no streams in the group, [stream] is closed immediately. | |
| 215 /// Otherwise, [stream] will close once all streams in the group close. | |
| 216 /// | |
| 217 /// Returns a [Future] that completes once [stream] has actually been closed. | |
| 218 Future close() { | |
| 219 if (_closed) return _controller.done; | |
| 220 | |
| 221 _closed = true; | |
| 222 if (_state == _StreamGroupState.dormant) { | |
| 223 if (_streams.isEmpty) _controller.close(); | |
|
Lasse Reichstein Nielsen
2015/06/18 10:29:31
If you are dormant in a broadcast stream with a si
nweiz
2015/06/19 00:44:16
Fixed by merging _subscriptions and _streams.
| |
| 224 } else if (_subscriptions.isEmpty) { | |
| 225 _controller.close(); | |
| 226 } | |
| 227 | |
| 228 return _controller.done; | |
| 229 } | |
| 230 } | |
| 231 | |
| 232 /// An enum of possible states of a [StreamGroup]. | |
|
Lasse Reichstein Nielsen
2015/06/18 10:29:31
Would a real enum work?
(Not that I particularly l
nweiz
2015/06/19 00:44:16
Yes, technically, but I tend to avoid those on pri
| |
| 233 class _StreamGroupState { | |
| 234 /// The group has no listeners. | |
| 235 /// | |
| 236 /// New streams added to the group will be listened once the group has a | |
| 237 /// listener. | |
| 238 static const dormant = const _StreamGroupState("dormant"); | |
| 239 | |
| 240 /// The group has one or more listeners and is actively firing events. | |
| 241 /// | |
| 242 /// New streams added to the group will be immediately listeners. | |
| 243 static const listening = const _StreamGroupState("listening"); | |
| 244 | |
| 245 /// The group is paused and no more events will be fired until it resumes. | |
| 246 /// | |
| 247 /// New streams added to the group will be listened to, but then paused. They | |
| 248 /// will be resumed once the group itself is resumed. | |
| 249 /// | |
| 250 /// This state is only used by single-subscriber groups. | |
| 251 static const paused = const _StreamGroupState("paused"); | |
| 252 | |
| 253 /// The group is canceled and no more events will be fired ever. | |
| 254 /// | |
| 255 /// New streams added to the group will be listened to, canceled, and | |
| 256 /// discarded. | |
| 257 /// | |
| 258 /// This state is only used by single-subscriber groups. | |
| 259 static const canceled = const _StreamGroupState("canceled"); | |
| 260 | |
| 261 /// The name of the state. | |
| 262 /// | |
| 263 /// Used for debugging. | |
| 264 final String name; | |
| 265 | |
| 266 const _StreamGroupState(this.name); | |
| 267 | |
| 268 String toString() => name; | |
| 269 } | |
| OLD | NEW |