OLD | NEW |
(Empty) | |
| 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. |
| 4 |
| 5 library async.stream_splitter; |
| 6 |
| 7 import 'dart:async'; |
| 8 import 'dart:collection'; |
| 9 |
| 10 import '../result.dart'; |
| 11 import 'future_group.dart'; |
| 12 |
| 13 /// A class that splits a single source stream into an arbitrary number of |
| 14 /// (single-subscription) streams (called "branch") that emit the same events. |
| 15 /// |
| 16 /// Each branch will emit all the same values and errors as the source stream, |
| 17 /// regardless of which values have been emitted on other branches. This means |
| 18 /// that the splitter stores every event that has been emitted so far, which may |
| 19 /// consume a lot of memory. The user can call [close] to indicate that no more |
| 20 /// branches will be created, and this memory will be released. |
| 21 /// |
| 22 /// The source stream is only listened to once a branch is created *and listened |
| 23 /// to*. It's paused when all branches are paused *or when all branches are |
| 24 /// canceled*, and resumed once there's at least one branch that's listening and |
| 25 /// unpaused. It's not canceled unless no branches are listening and [close] has |
| 26 /// been called. |
| 27 class StreamSplitter<T> { |
| 28 /// The wrapped stream. |
| 29 final Stream<T> _stream; |
| 30 |
| 31 /// The subscription to [_stream]. |
| 32 /// |
| 33 /// This will be `null` until a branch has a listener. |
| 34 StreamSubscription<T> _subscription; |
| 35 |
| 36 /// The buffer of events or errors that have already been emitted by |
| 37 /// [_stream]. |
| 38 final _buffer = new List<Result<T>>(); |
| 39 |
| 40 /// The controllers for branches that are listening for future events from |
| 41 /// [_stream]. |
| 42 /// |
| 43 /// Once a branch is canceled, it's removed from this list. When [_stream] is |
| 44 /// done, all branches are removed. |
| 45 final _controllers = new Set<StreamController<T>>(); |
| 46 |
| 47 /// A group of futures returned by [close]. |
| 48 /// |
| 49 /// This is used to ensure that [close] doesn't complete until all |
| 50 /// [StreamController.close] and [StreamSubscription.cancel] calls complete. |
| 51 final _closeGroup = new FutureGroup(); |
| 52 |
| 53 /// Whether [_stream] is done emitting events. |
| 54 var _isDone = false; |
| 55 |
| 56 /// Whether [close] has been called. |
| 57 var _isClosed = false; |
| 58 |
| 59 /// Splits [stream] into [count] identical streams. |
| 60 /// |
| 61 /// [count] defaults to 2. This is the same as creating [count] branches and |
| 62 /// then closing the [StreamSplitter]. |
| 63 static List<Stream> splitFrom(Stream stream, [int count]) { |
| 64 if (count == null) count = 2; |
| 65 var splitter = new StreamSplitter(stream); |
| 66 var streams = new List.generate(count, (_) => splitter.split()); |
| 67 splitter.close(); |
| 68 return streams; |
| 69 } |
| 70 |
| 71 StreamSplitter(this._stream); |
| 72 |
| 73 /// Returns a single-subscription stream that's a copy of the input stream. |
| 74 /// |
| 75 /// This will throw a [StateError] if [close] has been called. |
| 76 Stream<T> split() { |
| 77 if (_isClosed) { |
| 78 throw new StateError("Can't call split() on a closed StreamSplitter."); |
| 79 } |
| 80 |
| 81 var controller; |
| 82 controller = new StreamController<T>( |
| 83 onListen: _onListen, |
| 84 onPause: _onPause, |
| 85 onResume: _onResume, |
| 86 onCancel: () => _onCancel(controller)); |
| 87 |
| 88 for (var result in _buffer) { |
| 89 result.addTo(controller); |
| 90 } |
| 91 |
| 92 if (_isDone) { |
| 93 _closeGroup.add(controller.close()); |
| 94 } else { |
| 95 _controllers.add(controller); |
| 96 } |
| 97 |
| 98 return controller.stream; |
| 99 } |
| 100 |
| 101 /// Indicates that no more branches will be requested via [split]. |
| 102 /// |
| 103 /// This clears the internal buffer of events. If there are no branches or all |
| 104 /// branches have been canceled, this cancels the subscription to the input |
| 105 /// stream. |
| 106 /// |
| 107 /// Returns a [Future] that completes once all events have been processed by |
| 108 /// all branches and (if applicable) the subscription to the input stream has |
| 109 /// been canceled. |
| 110 Future close() { |
| 111 if (_isClosed) return _closeGroup.future; |
| 112 _isClosed = true; |
| 113 |
| 114 _buffer.clear(); |
| 115 if (_controllers.isEmpty) _cancelSubscription(); |
| 116 |
| 117 return _closeGroup.future; |
| 118 } |
| 119 |
| 120 /// Cancel [_subscription] and close [_closeGroup]. |
| 121 /// |
| 122 /// This should be called after all the branches' subscriptions have been |
| 123 /// canceled and the splitter has been closed. In that case, we won't use the |
| 124 /// events from [_subscription] any more, since there's nothing to pipe them |
| 125 /// to and no more branches will be created. If [_subscription] is done, |
| 126 /// canceling it will be a no-op. |
| 127 /// |
| 128 /// This may also be called before any branches have been created, in which |
| 129 /// case [_subscription] will be `null`. |
| 130 void _cancelSubscription() { |
| 131 assert(_controllers.isEmpty); |
| 132 assert(_isClosed); |
| 133 |
| 134 var future = null; |
| 135 if (_subscription != null) future = _subscription.cancel(); |
| 136 if (future != null) _closeGroup.add(future); |
| 137 _closeGroup.close(); |
| 138 } |
| 139 |
| 140 // StreamController events |
| 141 |
| 142 /// Subscribe to [_stream] if we haven't yet done so, and resume the |
| 143 /// subscription if we have. |
| 144 void _onListen() { |
| 145 if (_isDone) return; |
| 146 |
| 147 if (_subscription != null) { |
| 148 // Resume the subscription in case it was paused, either because all the |
| 149 // controllers were paused or because the last one was canceled. If it |
| 150 // wasn't paused, this will be a no-op. |
| 151 _subscription.resume(); |
| 152 } else { |
| 153 _subscription = _stream.listen( |
| 154 _onData, onError: _onError, onDone: _onDone); |
| 155 } |
| 156 } |
| 157 |
| 158 /// Pauses [_subscription] if every controller is paused. |
| 159 void _onPause() { |
| 160 if (!_controllers.every((controller) => controller.isPaused)) return; |
| 161 _subscription.pause(); |
| 162 } |
| 163 |
| 164 /// Resumes [_subscription]. |
| 165 /// |
| 166 /// If [_subscription] wasn't paused, this is a no-op. |
| 167 void _onResume() { |
| 168 _subscription.resume(); |
| 169 } |
| 170 |
| 171 /// Removes [controller] from [_controllers] and cancels or pauses |
| 172 /// [_subscription] as appropriate. |
| 173 /// |
| 174 /// Since the controller emitting a done event will cause it to register as |
| 175 /// canceled, this is the only way that a controller is ever removed from |
| 176 /// [_controllers]. |
| 177 void _onCancel(StreamController controller) { |
| 178 _controllers.remove(controller); |
| 179 if (_controllers.isNotEmpty) return; |
| 180 |
| 181 if (_isClosed) { |
| 182 _cancelSubscription(); |
| 183 } else { |
| 184 _subscription.pause(); |
| 185 } |
| 186 } |
| 187 |
| 188 // Stream events |
| 189 |
| 190 /// Buffers [data] and passes it to [_controllers]. |
| 191 void _onData(T data) { |
| 192 if (!_isClosed) _buffer.add(new Result.value(data)); |
| 193 for (var controller in _controllers) { |
| 194 controller.add(data); |
| 195 } |
| 196 } |
| 197 |
| 198 /// Buffers [error] and passes it to [_controllers]. |
| 199 void _onError(Object error, StackTrace stackTrace) { |
| 200 if (!_isClosed) _buffer.add(new Result.error(error, stackTrace)); |
| 201 for (var controller in _controllers) { |
| 202 controller.addError(error, stackTrace); |
| 203 } |
| 204 } |
| 205 |
| 206 /// Marks [_controllers] as done. |
| 207 void _onDone() { |
| 208 _isDone = true; |
| 209 for (var controller in _controllers) { |
| 210 _closeGroup.add(controller.close()); |
| 211 } |
| 212 } |
| 213 } |
OLD | NEW |