OLD | NEW |
(Empty) | |
| 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. |
| 4 |
| 5 library async.stream_group; |
| 6 |
| 7 import 'dart:async'; |
| 8 |
| 9 /// A collection of streams whose events are unified and sent through a central |
| 10 /// stream. |
| 11 /// |
| 12 /// Both errors and data events are forwarded through [stream]. The streams in |
| 13 /// the group won't be listened to until [stream] has a listener. **Note that |
| 14 /// this means that events emitted by broadcast streams will be dropped until |
| 15 /// [stream] has a listener.** |
| 16 /// |
| 17 /// If the `StreamGroup` is constructed using [new StreamGroup], [stream] will b
e |
| 18 /// single-subscription. In this case, if [stream] is paused or canceled, all |
| 19 /// streams in the group will likewise be paused or canceled, respectively. |
| 20 /// |
| 21 /// If the `StreamGroup` is constructed using [new StreamGroup.broadcast], |
| 22 /// [stream] will be a broadcast stream. In this case, the streams in the group |
| 23 /// will never be paused and single-subscription streams in the group will never |
| 24 /// be canceled. **Note that single-subscription streams in a broadcast group |
| 25 /// may drop events if a listener is added and later removed.** Broadcast |
| 26 /// streams in the group will be canceled once [stream] has no listeners, and |
| 27 /// will be listened to again once [stream] has listeners. |
| 28 /// |
| 29 /// [stream] won't close until [close] is called on the group *and* every stream |
| 30 /// in the group closes. |
| 31 class StreamGroup<T> implements Sink<Stream<T>> { |
| 32 /// The stream through which all events from streams in the group are emitted. |
| 33 Stream<T> get stream => _controller.stream; |
| 34 StreamController<T> _controller; |
| 35 |
| 36 /// Whether the group is closed, meaning that no more streams may be added. |
| 37 var _closed = false; |
| 38 |
| 39 /// The current state of the group. |
| 40 /// |
| 41 /// See [_StreamGroupState] for detailed descriptions of each state. |
| 42 var _state = _StreamGroupState.dormant; |
| 43 |
| 44 /// Streams that have been added to the group, and their subscriptions if they |
| 45 /// have been subscribed to. |
| 46 /// |
| 47 /// The subscriptions will be null until the group has a listener registered. |
| 48 /// If it's a broadcast group and it goes dormant again, broadcast stream |
| 49 /// subscriptions will be canceled and set to null again. Single-subscriber |
| 50 /// stream subscriptions will be left intact, since they can't be |
| 51 /// re-subscribed. |
| 52 final _subscriptions = new Map<Stream<T>, StreamSubscription<T>>(); |
| 53 |
| 54 /// Merges the events from [streams] into a single (single-subscriber) stream. |
| 55 /// |
| 56 /// This is equivalent to adding [streams] to a group, closing that group, and |
| 57 /// returning its stream. |
| 58 static Stream merge(Iterable<Stream> streams) { |
| 59 var group = new StreamGroup(); |
| 60 streams.forEach(group.add); |
| 61 group.close(); |
| 62 return group.stream; |
| 63 } |
| 64 |
| 65 /// Creates a new stream group where [stream] is single-subscriber. |
| 66 StreamGroup() { |
| 67 _controller = new StreamController<T>( |
| 68 onListen: _onListen, |
| 69 onPause: _onPause, |
| 70 onResume: _onResume, |
| 71 onCancel: _onCancel, |
| 72 sync: true); |
| 73 } |
| 74 |
| 75 /// Creates a new stream group where [stream] is a broadcast stream. |
| 76 StreamGroup.broadcast() { |
| 77 _controller = new StreamController<T>.broadcast( |
| 78 onListen: _onListen, |
| 79 onCancel: _onCancelBroadcast, |
| 80 sync: true); |
| 81 } |
| 82 |
| 83 /// Adds [stream] as a member of this group. |
| 84 /// |
| 85 /// Any events from [stream] will be emitted through [this.stream]. If this |
| 86 /// group has a listener, [stream] will be listened to immediately; otherwise |
| 87 /// it will only be listened to once this group gets a listener. |
| 88 /// |
| 89 /// If this is a single-subscription group and its subscription has been |
| 90 /// canceled, [stream] will be canceled as soon as its added. If this returns |
| 91 /// a [Future], it will be returned from [add]. Otherwise, [add] returns |
| 92 /// `null`. |
| 93 /// |
| 94 /// Throws a [StateError] if this group is closed. |
| 95 Future add(Stream<T> stream) { |
| 96 if (_closed) { |
| 97 throw new StateError("Can't add a Stream to a closed StreamGroup."); |
| 98 } |
| 99 |
| 100 if (_state == _StreamGroupState.dormant) { |
| 101 _subscriptions.putIfAbsent(stream, () => null); |
| 102 } else if (_state == _StreamGroupState.canceled) { |
| 103 // Listen to the stream and cancel it immediately so that no one else can |
| 104 // listen, for consistency. If the stream has an onCancel listener this |
| 105 // will also fire that, which may help it clean up resources. |
| 106 return stream.listen(null).cancel(); |
| 107 } else { |
| 108 _subscriptions.putIfAbsent(stream, () => _listenToStream(stream)); |
| 109 } |
| 110 |
| 111 return null; |
| 112 } |
| 113 |
| 114 /// Removes [stream] as a member of this group. |
| 115 /// |
| 116 /// No further events from [stream] will be emitted through this group. If |
| 117 /// [stream] has been listened to, its subscription will be canceled. |
| 118 /// |
| 119 /// If [stream] has been listened to, this *synchronously* cancels its |
| 120 /// subscription. This means that any events from [stream] that haven't yet |
| 121 /// been emitted through this group will not be. |
| 122 /// |
| 123 /// If [stream]'s subscription is canceled, this returns |
| 124 /// [StreamSubscription.cancel]'s return value. Otherwise, it returns `null`. |
| 125 Future remove(Stream<T> stream) { |
| 126 var subscription = _subscriptions.remove(stream); |
| 127 var future = subscription == null ? null : subscription.cancel(); |
| 128 if (_closed && _subscriptions.isEmpty) _controller.close(); |
| 129 return future; |
| 130 } |
| 131 |
| 132 /// A callback called when [stream] is listened to. |
| 133 /// |
| 134 /// This is called for both single-subscription and broadcast groups. |
| 135 void _onListen() { |
| 136 _state = _StreamGroupState.listening; |
| 137 _subscriptions.forEach((stream, subscription) { |
| 138 // If this is a broadcast group and this isn't the first time it's been |
| 139 // listened to, there may still be some subscriptions to |
| 140 // single-subscription streams. |
| 141 if (subscription != null) return; |
| 142 _subscriptions[stream] = _listenToStream(stream); |
| 143 }); |
| 144 } |
| 145 |
| 146 /// A callback called when [stream] is paused. |
| 147 void _onPause() { |
| 148 _state = _StreamGroupState.paused; |
| 149 for (var subscription in _subscriptions.values) { |
| 150 subscription.pause(); |
| 151 } |
| 152 } |
| 153 |
| 154 /// A callback called when [stream] is resumed. |
| 155 void _onResume() { |
| 156 _state = _StreamGroupState.listening; |
| 157 for (var subscription in _subscriptions.values) { |
| 158 subscription.resume(); |
| 159 } |
| 160 } |
| 161 |
| 162 /// A callback called when [stream] is canceled. |
| 163 /// |
| 164 /// This is only called for single-subscription groups. |
| 165 Future _onCancel() { |
| 166 _state = _StreamGroupState.canceled; |
| 167 |
| 168 var futures = _subscriptions.values |
| 169 .map((subscription) => subscription.cancel()) |
| 170 .where((future) => future != null) |
| 171 .toList(); |
| 172 |
| 173 _subscriptions.clear(); |
| 174 return futures.isEmpty ? null : Future.wait(futures); |
| 175 } |
| 176 |
| 177 /// A callback called when [stream]'s last listener is canceled. |
| 178 /// |
| 179 /// This is only called for broadcast groups. |
| 180 void _onCancelBroadcast() { |
| 181 _state = _StreamGroupState.dormant; |
| 182 |
| 183 _subscriptions.forEach((stream, subscription) { |
| 184 // Cancel the broadcast streams, since we can re-listen to those later, |
| 185 // but allow the single-subscription streams to keep firing. Their events |
| 186 // will still be added to [_controller], but then they'll be dropped since |
| 187 // it has no listeners. |
| 188 if (!stream.isBroadcast) return; |
| 189 subscription.cancel(); |
| 190 _subscriptions[stream] = null; |
| 191 }); |
| 192 } |
| 193 |
| 194 /// Starts actively forwarding events from [stream] to [_controller]. |
| 195 /// |
| 196 /// This will pause the resulting subscription if [this] is paused. |
| 197 StreamSubscription _listenToStream(Stream stream) { |
| 198 var subscription = stream.listen( |
| 199 _controller.add, |
| 200 onError: _controller.addError, |
| 201 onDone: () => remove(stream)); |
| 202 if (_state == _StreamGroupState.paused) subscription.pause(); |
| 203 return subscription; |
| 204 } |
| 205 |
| 206 /// Closes the group, indicating that no more streams will be added. |
| 207 /// |
| 208 /// If there are no streams in the group, [stream] is closed immediately. |
| 209 /// Otherwise, [stream] will close once all streams in the group close. |
| 210 /// |
| 211 /// Returns a [Future] that completes once [stream] has actually been closed. |
| 212 Future close() { |
| 213 if (_closed) return _controller.done; |
| 214 |
| 215 _closed = true; |
| 216 if (_subscriptions.isEmpty) _controller.close(); |
| 217 |
| 218 return _controller.done; |
| 219 } |
| 220 } |
| 221 |
| 222 /// An enum of possible states of a [StreamGroup]. |
| 223 class _StreamGroupState { |
| 224 /// The group has no listeners. |
| 225 /// |
| 226 /// New streams added to the group will be listened once the group has a |
| 227 /// listener. |
| 228 static const dormant = const _StreamGroupState("dormant"); |
| 229 |
| 230 /// The group has one or more listeners and is actively firing events. |
| 231 /// |
| 232 /// New streams added to the group will be immediately listeners. |
| 233 static const listening = const _StreamGroupState("listening"); |
| 234 |
| 235 /// The group is paused and no more events will be fired until it resumes. |
| 236 /// |
| 237 /// New streams added to the group will be listened to, but then paused. They |
| 238 /// will be resumed once the group itself is resumed. |
| 239 /// |
| 240 /// This state is only used by single-subscriber groups. |
| 241 static const paused = const _StreamGroupState("paused"); |
| 242 |
| 243 /// The group is canceled and no more events will be fired ever. |
| 244 /// |
| 245 /// New streams added to the group will be listened to, canceled, and |
| 246 /// discarded. |
| 247 /// |
| 248 /// This state is only used by single-subscriber groups. |
| 249 static const canceled = const _StreamGroupState("canceled"); |
| 250 |
| 251 /// The name of the state. |
| 252 /// |
| 253 /// Used for debugging. |
| 254 final String name; |
| 255 |
| 256 const _StreamGroupState(this.name); |
| 257 |
| 258 String toString() => name; |
| 259 } |
OLD | NEW |