| Index: lib/src/util/forkable_stream.dart
|
| diff --git a/lib/src/util/forkable_stream.dart b/lib/src/util/forkable_stream.dart
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..033d49821e860782293a15588c63506780b8991d
|
| --- /dev/null
|
| +++ b/lib/src/util/forkable_stream.dart
|
| @@ -0,0 +1,168 @@
|
| +// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
|
| +// for details. All rights reserved. Use of this source code is governed by a
|
| +// BSD-style license that can be found in the LICENSE file.
|
| +
|
| +// TODO(nweiz): Get rid of this when https://codereview.chromium.org/1241723003/
|
| +// lands.
|
| +library test.util.forkable_stream;
|
| +
|
| +import 'dart:async';
|
| +
|
| +import 'package:async/async.dart' hide ForkableStream;
|
| +
|
| +/// A single-subscription stream from which other streams may be forked off at
|
| +/// the current position.
|
| +///
|
| +/// This adds an operation, [fork], which produces a new stream that
|
| +/// independently emits the same events as this stream. Unlike the branches
|
| +/// produced by [StreamSplitter], a fork only emits events that arrive *after*
|
| +/// the call to [fork].
|
| +///
|
| +/// Each fork can be paused or canceled independently of one another and of this
|
| +/// stream. The underlying stream will be listened to once any branch is
|
| +/// listened to. It will be paused when all branches are paused or not yet
|
| +/// listened to. It will be canceled when all branches have been listened to and
|
| +/// then canceled.
|
| +class ForkableStream<T> extends StreamView<T> {
|
| + /// The underlying stream.
|
| + final Stream _sourceStream;
|
| +
|
| + /// The subscription to [_sourceStream].
|
| + ///
|
| + /// This will be `null` until this stream or any of its forks are listened to.
|
| + StreamSubscription _subscription;
|
| +
|
| + /// Whether this has been cancelled and no more forks may be created.
|
| + bool _isCanceled = false;
|
| +
|
| + /// The controllers for any branches that have not yet been canceled.
|
| + ///
|
| + /// This includes a controller for this stream, until that has been cancelled.
|
| + final _controllers = new Set<StreamController<T>>();
|
| +
|
| + /// Creates a new forkable stream wrapping [sourceStream].
|
| + ForkableStream(Stream sourceStream)
|
| + // Use a completer here so that we can provide its stream to the
|
| + // superclass constructor while also adding the stream controller to
|
| + // [_controllers].
|
| + : this._(sourceStream, new StreamCompleter());
|
| +
|
| + ForkableStream._(this._sourceStream, StreamCompleter completer)
|
| + : super(completer.stream) {
|
| + completer.setSourceStream(_fork(primary: true));
|
| + }
|
| +
|
| + /// Creates a new fork of this stream.
|
| + ///
|
| + /// From this point forward, the fork will emit the same events as this
|
| + /// stream. It will *not* emit any events that have already been emitted by
|
| + /// this stream. The fork is independent of this stream, which means each one
|
| + /// may be paused or canceled without affecting the other.
|
| + ///
|
| + /// If this stream is done or its subscription has been canceled, this returns
|
| + /// an empty stream.
|
| + Stream<T> fork() => _fork(primary: false);
|
| +
|
| + /// Creates a stream forwarding [_sourceStream].
|
| + ///
|
| + /// If [primary] is true, this is the stream underlying this object;
|
| + /// otherwise, it's a fork. The only difference is that when the primary
|
| + /// stream is canceled, [fork] starts throwing [StateError]s.
|
| + Stream<T> _fork({bool primary: false}) {
|
| + if (_isCanceled) {
|
| + var controller = new StreamController<T>()..close();
|
| + return controller.stream;
|
| + }
|
| +
|
| + var controller;
|
| + controller = new StreamController<T>(
|
| + onListen: () => _onListenOrResume(controller),
|
| + onCancel: () => _onCancel(controller, primary: primary),
|
| + onPause: () => _onPause(controller),
|
| + onResume: () => _onListenOrResume(controller),
|
| + sync: true);
|
| +
|
| + _controllers.add(controller);
|
| +
|
| + return controller.stream;
|
| + }
|
| +
|
| + /// The callback called when `onListen` or `onResume` is called for the branch
|
| + /// managed by [controller].
|
| + ///
|
| + /// This ensures that we're subscribed to [_sourceStream] and that the
|
| + /// subscription isn't paused.
|
| + void _onListenOrResume(StreamController<T> controller) {
|
| + if (controller.isClosed) return;
|
| + if (_subscription == null) {
|
| + _subscription =
|
| + _sourceStream.listen(_onData, onError: _onError, onDone: _onDone);
|
| + } else {
|
| + _subscription.resume();
|
| + }
|
| + }
|
| +
|
| + /// The callback called when `onCancel` is called for the branch managed by
|
| + /// [controller].
|
| + ///
|
| + /// This cancels or pauses the underlying subscription as necessary. If
|
| + /// [primary] is true, it also ensures that future calls to [fork] throw
|
| + /// [StateError]s.
|
| + Future _onCancel(StreamController<T> controller, {bool primary: false}) {
|
| + if (primary) _isCanceled = true;
|
| +
|
| + if (controller.isClosed) return null;
|
| + _controllers.remove(controller);
|
| +
|
| + if (_controllers.isEmpty) return _subscription.cancel();
|
| +
|
| + _onPause(controller);
|
| + return null;
|
| + }
|
| +
|
| + /// The callback called when `onPause` is called for the branch managed by
|
| + /// [controller].
|
| + ///
|
| + /// This pauses the underlying subscription if necessary.
|
| + void _onPause(StreamController<T> controller) {
|
| + if (controller.isClosed) return;
|
| + if (_subscription.isPaused) return;
|
| + if (_controllers.any((controller) =>
|
| + controller.hasListener && !controller.isPaused)) {
|
| + return;
|
| + }
|
| +
|
| + _subscription.pause();
|
| + }
|
| +
|
| + /// Forwards data events to all branches.
|
| + void _onData(value) {
|
| + // Don't iterate directly over the set because [controller.add] might cause
|
| + // it to be modified synchronously.
|
| + for (var controller in _controllers.toList()) {
|
| + controller.add(value);
|
| + }
|
| + }
|
| +
|
| + /// Forwards error events to all branches.
|
| + void _onError(error, StackTrace stackTrace) {
|
| + // Don't iterate directly over the set because [controller.addError] might
|
| + // cause it to be modified synchronously.
|
| + for (var controller in _controllers.toList()) {
|
| + controller.addError(error, stackTrace);
|
| + }
|
| + }
|
| +
|
| + /// Forwards close events to all branches.
|
| + void _onDone() {
|
| + _isCanceled = true;
|
| +
|
| + // Don't iterate directly over the set because [controller.close] might
|
| + // cause it to be modified synchronously.
|
| + for (var controller in _controllers.toList()) {
|
| + controller.close();
|
| + }
|
| + _controllers.clear();
|
| + }
|
| +}
|
| +
|
|
|