| Index: packages/async/lib/src/stream_splitter.dart
|
| diff --git a/packages/async/lib/src/stream_splitter.dart b/packages/async/lib/src/stream_splitter.dart
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..6ec440f5dd8e5c2c83754e73020f8f34a3e1852d
|
| --- /dev/null
|
| +++ b/packages/async/lib/src/stream_splitter.dart
|
| @@ -0,0 +1,213 @@
|
| +// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
|
| +// for details. All rights reserved. Use of this source code is governed by a
|
| +// BSD-style license that can be found in the LICENSE file.
|
| +
|
| +library async.stream_splitter;
|
| +
|
| +import 'dart:async';
|
| +import 'dart:collection';
|
| +
|
| +import '../result.dart';
|
| +import 'future_group.dart';
|
| +
|
| +/// A class that splits a single source stream into an arbitrary number of
|
| +/// (single-subscription) streams (called "branch") that emit the same events.
|
| +///
|
| +/// Each branch will emit all the same values and errors as the source stream,
|
| +/// regardless of which values have been emitted on other branches. This means
|
| +/// that the splitter stores every event that has been emitted so far, which may
|
| +/// consume a lot of memory. The user can call [close] to indicate that no more
|
| +/// branches will be created, and this memory will be released.
|
| +///
|
| +/// The source stream is only listened to once a branch is created *and listened
|
| +/// to*. It's paused when all branches are paused *or when all branches are
|
| +/// canceled*, and resumed once there's at least one branch that's listening and
|
| +/// unpaused. It's not canceled unless no branches are listening and [close] has
|
| +/// been called.
|
| +class StreamSplitter<T> {
|
| + /// The wrapped stream.
|
| + final Stream<T> _stream;
|
| +
|
| + /// The subscription to [_stream].
|
| + ///
|
| + /// This will be `null` until a branch has a listener.
|
| + StreamSubscription<T> _subscription;
|
| +
|
| + /// The buffer of events or errors that have already been emitted by
|
| + /// [_stream].
|
| + final _buffer = new List<Result<T>>();
|
| +
|
| + /// The controllers for branches that are listening for future events from
|
| + /// [_stream].
|
| + ///
|
| + /// Once a branch is canceled, it's removed from this list. When [_stream] is
|
| + /// done, all branches are removed.
|
| + final _controllers = new Set<StreamController<T>>();
|
| +
|
| + /// A group of futures returned by [close].
|
| + ///
|
| + /// This is used to ensure that [close] doesn't complete until all
|
| + /// [StreamController.close] and [StreamSubscription.cancel] calls complete.
|
| + final _closeGroup = new FutureGroup();
|
| +
|
| + /// Whether [_stream] is done emitting events.
|
| + var _isDone = false;
|
| +
|
| + /// Whether [close] has been called.
|
| + var _isClosed = false;
|
| +
|
| + /// Splits [stream] into [count] identical streams.
|
| + ///
|
| + /// [count] defaults to 2. This is the same as creating [count] branches and
|
| + /// then closing the [StreamSplitter].
|
| + static List<Stream> splitFrom(Stream stream, [int count]) {
|
| + if (count == null) count = 2;
|
| + var splitter = new StreamSplitter(stream);
|
| + var streams = new List.generate(count, (_) => splitter.split());
|
| + splitter.close();
|
| + return streams;
|
| + }
|
| +
|
| + StreamSplitter(this._stream);
|
| +
|
| + /// Returns a single-subscription stream that's a copy of the input stream.
|
| + ///
|
| + /// This will throw a [StateError] if [close] has been called.
|
| + Stream<T> split() {
|
| + if (_isClosed) {
|
| + throw new StateError("Can't call split() on a closed StreamSplitter.");
|
| + }
|
| +
|
| + var controller;
|
| + controller = new StreamController<T>(
|
| + onListen: _onListen,
|
| + onPause: _onPause,
|
| + onResume: _onResume,
|
| + onCancel: () => _onCancel(controller));
|
| +
|
| + for (var result in _buffer) {
|
| + result.addTo(controller);
|
| + }
|
| +
|
| + if (_isDone) {
|
| + _closeGroup.add(controller.close());
|
| + } else {
|
| + _controllers.add(controller);
|
| + }
|
| +
|
| + return controller.stream;
|
| + }
|
| +
|
| + /// Indicates that no more branches will be requested via [split].
|
| + ///
|
| + /// This clears the internal buffer of events. If there are no branches or all
|
| + /// branches have been canceled, this cancels the subscription to the input
|
| + /// stream.
|
| + ///
|
| + /// Returns a [Future] that completes once all events have been processed by
|
| + /// all branches and (if applicable) the subscription to the input stream has
|
| + /// been canceled.
|
| + Future close() {
|
| + if (_isClosed) return _closeGroup.future;
|
| + _isClosed = true;
|
| +
|
| + _buffer.clear();
|
| + if (_controllers.isEmpty) _cancelSubscription();
|
| +
|
| + return _closeGroup.future;
|
| + }
|
| +
|
| + /// Cancel [_subscription] and close [_closeGroup].
|
| + ///
|
| + /// This should be called after all the branches' subscriptions have been
|
| + /// canceled and the splitter has been closed. In that case, we won't use the
|
| + /// events from [_subscription] any more, since there's nothing to pipe them
|
| + /// to and no more branches will be created. If [_subscription] is done,
|
| + /// canceling it will be a no-op.
|
| + ///
|
| + /// This may also be called before any branches have been created, in which
|
| + /// case [_subscription] will be `null`.
|
| + void _cancelSubscription() {
|
| + assert(_controllers.isEmpty);
|
| + assert(_isClosed);
|
| +
|
| + var future = null;
|
| + if (_subscription != null) future = _subscription.cancel();
|
| + if (future != null) _closeGroup.add(future);
|
| + _closeGroup.close();
|
| + }
|
| +
|
| + // StreamController events
|
| +
|
| + /// Subscribe to [_stream] if we haven't yet done so, and resume the
|
| + /// subscription if we have.
|
| + void _onListen() {
|
| + if (_isDone) return;
|
| +
|
| + if (_subscription != null) {
|
| + // Resume the subscription in case it was paused, either because all the
|
| + // controllers were paused or because the last one was canceled. If it
|
| + // wasn't paused, this will be a no-op.
|
| + _subscription.resume();
|
| + } else {
|
| + _subscription = _stream.listen(
|
| + _onData, onError: _onError, onDone: _onDone);
|
| + }
|
| + }
|
| +
|
| + /// Pauses [_subscription] if every controller is paused.
|
| + void _onPause() {
|
| + if (!_controllers.every((controller) => controller.isPaused)) return;
|
| + _subscription.pause();
|
| + }
|
| +
|
| + /// Resumes [_subscription].
|
| + ///
|
| + /// If [_subscription] wasn't paused, this is a no-op.
|
| + void _onResume() {
|
| + _subscription.resume();
|
| + }
|
| +
|
| + /// Removes [controller] from [_controllers] and cancels or pauses
|
| + /// [_subscription] as appropriate.
|
| + ///
|
| + /// Since the controller emitting a done event will cause it to register as
|
| + /// canceled, this is the only way that a controller is ever removed from
|
| + /// [_controllers].
|
| + void _onCancel(StreamController controller) {
|
| + _controllers.remove(controller);
|
| + if (_controllers.isNotEmpty) return;
|
| +
|
| + if (_isClosed) {
|
| + _cancelSubscription();
|
| + } else {
|
| + _subscription.pause();
|
| + }
|
| + }
|
| +
|
| + // Stream events
|
| +
|
| + /// Buffers [data] and passes it to [_controllers].
|
| + void _onData(T data) {
|
| + if (!_isClosed) _buffer.add(new Result.value(data));
|
| + for (var controller in _controllers) {
|
| + controller.add(data);
|
| + }
|
| + }
|
| +
|
| + /// Buffers [error] and passes it to [_controllers].
|
| + void _onError(Object error, StackTrace stackTrace) {
|
| + if (!_isClosed) _buffer.add(new Result.error(error, stackTrace));
|
| + for (var controller in _controllers) {
|
| + controller.addError(error, stackTrace);
|
| + }
|
| + }
|
| +
|
| + /// Marks [_controllers] as done.
|
| + void _onDone() {
|
| + _isDone = true;
|
| + for (var controller in _controllers) {
|
| + _closeGroup.add(controller.close());
|
| + }
|
| + }
|
| +}
|
|
|