Index: packages/async/lib/src/stream_splitter.dart |
diff --git a/packages/async/lib/src/stream_splitter.dart b/packages/async/lib/src/stream_splitter.dart |
new file mode 100644 |
index 0000000000000000000000000000000000000000..6ec440f5dd8e5c2c83754e73020f8f34a3e1852d |
--- /dev/null |
+++ b/packages/async/lib/src/stream_splitter.dart |
@@ -0,0 +1,213 @@ |
+// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
+// for details. All rights reserved. Use of this source code is governed by a |
+// BSD-style license that can be found in the LICENSE file. |
+ |
+library async.stream_splitter; |
+ |
+import 'dart:async'; |
+import 'dart:collection'; |
+ |
+import '../result.dart'; |
+import 'future_group.dart'; |
+ |
+/// A class that splits a single source stream into an arbitrary number of |
+/// (single-subscription) streams (called "branch") that emit the same events. |
+/// |
+/// Each branch will emit all the same values and errors as the source stream, |
+/// regardless of which values have been emitted on other branches. This means |
+/// that the splitter stores every event that has been emitted so far, which may |
+/// consume a lot of memory. The user can call [close] to indicate that no more |
+/// branches will be created, and this memory will be released. |
+/// |
+/// The source stream is only listened to once a branch is created *and listened |
+/// to*. It's paused when all branches are paused *or when all branches are |
+/// canceled*, and resumed once there's at least one branch that's listening and |
+/// unpaused. It's not canceled unless no branches are listening and [close] has |
+/// been called. |
+class StreamSplitter<T> { |
+ /// The wrapped stream. |
+ final Stream<T> _stream; |
+ |
+ /// The subscription to [_stream]. |
+ /// |
+ /// This will be `null` until a branch has a listener. |
+ StreamSubscription<T> _subscription; |
+ |
+ /// The buffer of events or errors that have already been emitted by |
+ /// [_stream]. |
+ final _buffer = new List<Result<T>>(); |
+ |
+ /// The controllers for branches that are listening for future events from |
+ /// [_stream]. |
+ /// |
+ /// Once a branch is canceled, it's removed from this list. When [_stream] is |
+ /// done, all branches are removed. |
+ final _controllers = new Set<StreamController<T>>(); |
+ |
+ /// A group of futures returned by [close]. |
+ /// |
+ /// This is used to ensure that [close] doesn't complete until all |
+ /// [StreamController.close] and [StreamSubscription.cancel] calls complete. |
+ final _closeGroup = new FutureGroup(); |
+ |
+ /// Whether [_stream] is done emitting events. |
+ var _isDone = false; |
+ |
+ /// Whether [close] has been called. |
+ var _isClosed = false; |
+ |
+ /// Splits [stream] into [count] identical streams. |
+ /// |
+ /// [count] defaults to 2. This is the same as creating [count] branches and |
+ /// then closing the [StreamSplitter]. |
+ static List<Stream> splitFrom(Stream stream, [int count]) { |
+ if (count == null) count = 2; |
+ var splitter = new StreamSplitter(stream); |
+ var streams = new List.generate(count, (_) => splitter.split()); |
+ splitter.close(); |
+ return streams; |
+ } |
+ |
+ StreamSplitter(this._stream); |
+ |
+ /// Returns a single-subscription stream that's a copy of the input stream. |
+ /// |
+ /// This will throw a [StateError] if [close] has been called. |
+ Stream<T> split() { |
+ if (_isClosed) { |
+ throw new StateError("Can't call split() on a closed StreamSplitter."); |
+ } |
+ |
+ var controller; |
+ controller = new StreamController<T>( |
+ onListen: _onListen, |
+ onPause: _onPause, |
+ onResume: _onResume, |
+ onCancel: () => _onCancel(controller)); |
+ |
+ for (var result in _buffer) { |
+ result.addTo(controller); |
+ } |
+ |
+ if (_isDone) { |
+ _closeGroup.add(controller.close()); |
+ } else { |
+ _controllers.add(controller); |
+ } |
+ |
+ return controller.stream; |
+ } |
+ |
+ /// Indicates that no more branches will be requested via [split]. |
+ /// |
+ /// This clears the internal buffer of events. If there are no branches or all |
+ /// branches have been canceled, this cancels the subscription to the input |
+ /// stream. |
+ /// |
+ /// Returns a [Future] that completes once all events have been processed by |
+ /// all branches and (if applicable) the subscription to the input stream has |
+ /// been canceled. |
+ Future close() { |
+ if (_isClosed) return _closeGroup.future; |
+ _isClosed = true; |
+ |
+ _buffer.clear(); |
+ if (_controllers.isEmpty) _cancelSubscription(); |
+ |
+ return _closeGroup.future; |
+ } |
+ |
+ /// Cancel [_subscription] and close [_closeGroup]. |
+ /// |
+ /// This should be called after all the branches' subscriptions have been |
+ /// canceled and the splitter has been closed. In that case, we won't use the |
+ /// events from [_subscription] any more, since there's nothing to pipe them |
+ /// to and no more branches will be created. If [_subscription] is done, |
+ /// canceling it will be a no-op. |
+ /// |
+ /// This may also be called before any branches have been created, in which |
+ /// case [_subscription] will be `null`. |
+ void _cancelSubscription() { |
+ assert(_controllers.isEmpty); |
+ assert(_isClosed); |
+ |
+ var future = null; |
+ if (_subscription != null) future = _subscription.cancel(); |
+ if (future != null) _closeGroup.add(future); |
+ _closeGroup.close(); |
+ } |
+ |
+ // StreamController events |
+ |
+ /// Subscribe to [_stream] if we haven't yet done so, and resume the |
+ /// subscription if we have. |
+ void _onListen() { |
+ if (_isDone) return; |
+ |
+ if (_subscription != null) { |
+ // Resume the subscription in case it was paused, either because all the |
+ // controllers were paused or because the last one was canceled. If it |
+ // wasn't paused, this will be a no-op. |
+ _subscription.resume(); |
+ } else { |
+ _subscription = _stream.listen( |
+ _onData, onError: _onError, onDone: _onDone); |
+ } |
+ } |
+ |
+ /// Pauses [_subscription] if every controller is paused. |
+ void _onPause() { |
+ if (!_controllers.every((controller) => controller.isPaused)) return; |
+ _subscription.pause(); |
+ } |
+ |
+ /// Resumes [_subscription]. |
+ /// |
+ /// If [_subscription] wasn't paused, this is a no-op. |
+ void _onResume() { |
+ _subscription.resume(); |
+ } |
+ |
+ /// Removes [controller] from [_controllers] and cancels or pauses |
+ /// [_subscription] as appropriate. |
+ /// |
+ /// Since the controller emitting a done event will cause it to register as |
+ /// canceled, this is the only way that a controller is ever removed from |
+ /// [_controllers]. |
+ void _onCancel(StreamController controller) { |
+ _controllers.remove(controller); |
+ if (_controllers.isNotEmpty) return; |
+ |
+ if (_isClosed) { |
+ _cancelSubscription(); |
+ } else { |
+ _subscription.pause(); |
+ } |
+ } |
+ |
+ // Stream events |
+ |
+ /// Buffers [data] and passes it to [_controllers]. |
+ void _onData(T data) { |
+ if (!_isClosed) _buffer.add(new Result.value(data)); |
+ for (var controller in _controllers) { |
+ controller.add(data); |
+ } |
+ } |
+ |
+ /// Buffers [error] and passes it to [_controllers]. |
+ void _onError(Object error, StackTrace stackTrace) { |
+ if (!_isClosed) _buffer.add(new Result.error(error, stackTrace)); |
+ for (var controller in _controllers) { |
+ controller.addError(error, stackTrace); |
+ } |
+ } |
+ |
+ /// Marks [_controllers] as done. |
+ void _onDone() { |
+ _isDone = true; |
+ for (var controller in _controllers) { |
+ _closeGroup.add(controller.close()); |
+ } |
+ } |
+} |