| OLD | NEW |
| 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 library async.stream_splitter; | 5 import 'dart:async'; |
| 6 | 6 |
| 7 import 'dart:async'; | |
| 8 import 'dart:collection'; | |
| 9 | |
| 10 import '../result.dart'; | |
| 11 import 'future_group.dart'; | 7 import 'future_group.dart'; |
| 8 import 'result.dart'; |
| 12 | 9 |
| 13 /// A class that splits a single source stream into an arbitrary number of | 10 /// A class that splits a single source stream into an arbitrary number of |
| 14 /// (single-subscription) streams (called "branch") that emit the same events. | 11 /// (single-subscription) streams (called "branch") that emit the same events. |
| 15 /// | 12 /// |
| 16 /// Each branch will emit all the same values and errors as the source stream, | 13 /// Each branch will emit all the same values and errors as the source stream, |
| 17 /// regardless of which values have been emitted on other branches. This means | 14 /// regardless of which values have been emitted on other branches. This means |
| 18 /// that the splitter stores every event that has been emitted so far, which may | 15 /// that the splitter stores every event that has been emitted so far, which may |
| 19 /// consume a lot of memory. The user can call [close] to indicate that no more | 16 /// consume a lot of memory. The user can call [close] to indicate that no more |
| 20 /// branches will be created, and this memory will be released. | 17 /// branches will be created, and this memory will be released. |
| 21 /// | 18 /// |
| (...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 53 /// Whether [_stream] is done emitting events. | 50 /// Whether [_stream] is done emitting events. |
| 54 var _isDone = false; | 51 var _isDone = false; |
| 55 | 52 |
| 56 /// Whether [close] has been called. | 53 /// Whether [close] has been called. |
| 57 var _isClosed = false; | 54 var _isClosed = false; |
| 58 | 55 |
| 59 /// Splits [stream] into [count] identical streams. | 56 /// Splits [stream] into [count] identical streams. |
| 60 /// | 57 /// |
| 61 /// [count] defaults to 2. This is the same as creating [count] branches and | 58 /// [count] defaults to 2. This is the same as creating [count] branches and |
| 62 /// then closing the [StreamSplitter]. | 59 /// then closing the [StreamSplitter]. |
| 63 static List<Stream> splitFrom(Stream stream, [int count]) { | 60 static List<Stream<T>> splitFrom<T>(Stream<T> stream, [int count]) { |
| 64 if (count == null) count = 2; | 61 if (count == null) count = 2; |
| 65 var splitter = new StreamSplitter(stream); | 62 var splitter = new StreamSplitter<T>(stream); |
| 66 var streams = new List.generate(count, (_) => splitter.split()); | 63 var streams = new List<Stream>.generate(count, (_) => splitter.split()); |
| 67 splitter.close(); | 64 splitter.close(); |
| 68 return streams; | 65 return streams; |
| 69 } | 66 } |
| 70 | 67 |
| 71 StreamSplitter(this._stream); | 68 StreamSplitter(this._stream); |
| 72 | 69 |
| 73 /// Returns a single-subscription stream that's a copy of the input stream. | 70 /// Returns a single-subscription stream that's a copy of the input stream. |
| 74 /// | 71 /// |
| 75 /// This will throw a [StateError] if [close] has been called. | 72 /// This will throw a [StateError] if [close] has been called. |
| 76 Stream<T> split() { | 73 Stream<T> split() { |
| 77 if (_isClosed) { | 74 if (_isClosed) { |
| 78 throw new StateError("Can't call split() on a closed StreamSplitter."); | 75 throw new StateError("Can't call split() on a closed StreamSplitter."); |
| 79 } | 76 } |
| 80 | 77 |
| 81 var controller; | 78 var controller = new StreamController<T>( |
| 82 controller = new StreamController<T>( | 79 onListen: _onListen, onPause: _onPause, onResume: _onResume); |
| 83 onListen: _onListen, | 80 controller.onCancel = () => _onCancel(controller); |
| 84 onPause: _onPause, | |
| 85 onResume: _onResume, | |
| 86 onCancel: () => _onCancel(controller)); | |
| 87 | 81 |
| 88 for (var result in _buffer) { | 82 for (var result in _buffer) { |
| 89 result.addTo(controller); | 83 result.addTo(controller); |
| 90 } | 84 } |
| 91 | 85 |
| 92 if (_isDone) { | 86 if (_isDone) { |
| 93 _closeGroup.add(controller.close()); | 87 _closeGroup.add(controller.close()); |
| 94 } else { | 88 } else { |
| 95 _controllers.add(controller); | 89 _controllers.add(controller); |
| 96 } | 90 } |
| (...skipping 46 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 143 /// subscription if we have. | 137 /// subscription if we have. |
| 144 void _onListen() { | 138 void _onListen() { |
| 145 if (_isDone) return; | 139 if (_isDone) return; |
| 146 | 140 |
| 147 if (_subscription != null) { | 141 if (_subscription != null) { |
| 148 // Resume the subscription in case it was paused, either because all the | 142 // Resume the subscription in case it was paused, either because all the |
| 149 // controllers were paused or because the last one was canceled. If it | 143 // controllers were paused or because the last one was canceled. If it |
| 150 // wasn't paused, this will be a no-op. | 144 // wasn't paused, this will be a no-op. |
| 151 _subscription.resume(); | 145 _subscription.resume(); |
| 152 } else { | 146 } else { |
| 153 _subscription = _stream.listen( | 147 _subscription = |
| 154 _onData, onError: _onError, onDone: _onDone); | 148 _stream.listen(_onData, onError: _onError, onDone: _onDone); |
| 155 } | 149 } |
| 156 } | 150 } |
| 157 | 151 |
| 158 /// Pauses [_subscription] if every controller is paused. | 152 /// Pauses [_subscription] if every controller is paused. |
| 159 void _onPause() { | 153 void _onPause() { |
| 160 if (!_controllers.every((controller) => controller.isPaused)) return; | 154 if (!_controllers.every((controller) => controller.isPaused)) return; |
| 161 _subscription.pause(); | 155 _subscription.pause(); |
| 162 } | 156 } |
| 163 | 157 |
| 164 /// Resumes [_subscription]. | 158 /// Resumes [_subscription]. |
| (...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 204 } | 198 } |
| 205 | 199 |
| 206 /// Marks [_controllers] as done. | 200 /// Marks [_controllers] as done. |
| 207 void _onDone() { | 201 void _onDone() { |
| 208 _isDone = true; | 202 _isDone = true; |
| 209 for (var controller in _controllers) { | 203 for (var controller in _controllers) { |
| 210 _closeGroup.add(controller.close()); | 204 _closeGroup.add(controller.close()); |
| 211 } | 205 } |
| 212 } | 206 } |
| 213 } | 207 } |
| OLD | NEW |