| Index: lib/src/subscription_fork.dart
|
| diff --git a/lib/src/subscription_fork.dart b/lib/src/subscription_fork.dart
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..6eef1271e3de23b8f6e0ad53fa52400ec83f5a99
|
| --- /dev/null
|
| +++ b/lib/src/subscription_fork.dart
|
| @@ -0,0 +1,132 @@
|
| +library async.subcsription_fork;
|
| +
|
| +import "dart:async";
|
| +import "package:async/async.dart" show DelegatingStreamSubscription;
|
| +
|
| +/// Allows creating forks of a single [StreamSubscription].
|
| +///
|
| +/// The forks are stream subscriptions that will provide the same events
|
| +/// as the original subscription from the time where the fork was created.
|
| +///
|
| +/// The original subscription is canceled when all forks are canceled, after the
|
| +/// first fork has been created, and it's paused when all forks are paused.
|
| +class SubscriptionForker {
|
| + final StreamSubscription _source;
|
| + final Set<_ForkedStreamSubscription> _subscriptions = new Set();
|
| + bool _isClosed;
|
| +
|
| + SubscriptionForker(StreamSubscription subscription) : _source = subscription {
|
| + _source.pause();
|
| + _source.onData(_onData);
|
| + _source.onError(_onError);
|
| + _source.onDone(_onDone);
|
| + }
|
| +
|
| + void _onData(T data) {
|
| + for (var subscription in _subscriptions.toList(growable: false)) {
|
| + subscription._controller.add(data);
|
| + }
|
| + }
|
| +
|
| + void _onError(error, StackTrace stackTrace) {
|
| + for (var subscription in _subscriptions.toList(growable: false)) {
|
| + subscription._controller.addError(error, stackTrace);
|
| + }
|
| + }
|
| +
|
| + void _onDone() {
|
| + _isClosed = true;
|
| + for (var subscription in _subscriptions.toList(growable: false)) {
|
| + subscription._controller.close();
|
| + }
|
| + }
|
| +
|
| + Future _cancel(_ForkedStreamSubscription subscription) {
|
| + _subscriptions.remove(subscription);
|
| + if (_subscriptions.isEmpty) {
|
| + _isClosed = true;
|
| + return _source.cancel();
|
| + } else if (!_source.isPaused) {
|
| + _checkPause();
|
| + }
|
| + }
|
| +
|
| + void _pause(_ForkedStreamSubscription subscription) {
|
| + _checkPause();
|
| + }
|
| +
|
| + void _checkPause() {
|
| + if (_subscriptions.every(_isSubscriptionPaused)) {
|
| + _source.pause();
|
| + }
|
| + }
|
| +
|
| + void _resume(_ForkedStreamSubscription subscription) {
|
| + _source.resume();
|
| + }
|
| +
|
| + static bool _isSubscriptionPaused(_ForkedStreamSubscription forked) {
|
| + return forked._controller.isPaused;
|
| + }
|
| +
|
| + /// Create a new fork of the source subscription.
|
| + StreamSubscription<T> fork() {
|
| + if (_isClosed) return new StreamSubscription<T>.empty();
|
| + var subscription = new _ForkedStreamSubscription<T>(this);
|
| + _subscriptions.add(subscription);
|
| + if (_source.isPaused) _source.resume();
|
| + return subscription;
|
| + }
|
| + }
|
| +
|
| +/// A forkable stream subscription.
|
| +///
|
| +/// A stream subscription with [fork] method.
|
| +///
|
| +class ForkableSubscription<T> extends DelegatingStreamSubscription<T> {
|
| + final SubscriptionForker<T> _forker;
|
| +
|
| + /// Create a forkable subscription from [subscription].
|
| + ///
|
| + /// This creates a [SubscriptionForker] and returns a subscription backed
|
| + /// by one fork of this subscription forker, along with a [fork] method
|
| + /// which allows creating more.
|
| + ForkableSubscription(StreamSubscription<T> subscription)
|
| + : this._(subscription is _ForkedStreamSubscription
|
| + ? subscription._forker
|
| + : new SubscriptionForker<T>(subscription));
|
| +
|
| + ForkableSubscription._(SubscriptionForker<T> forker)
|
| + : _forker = forker, super(forker.fork());
|
| +
|
| + StreamSubscription<T> fork() => _forker.fork();
|
| +}
|
| +
|
| +class _ForkedStreamSubscription<T> extends DelegatingStreamSubscription<T> {
|
| + final SubscriptionForker _forker;
|
| + final StreamController _controller;
|
| + factory _ForkedStreamSubscription(SubscriptionForker<T> forker) {
|
| + var instance;
|
| + var onChange = () => instance._onChange();
|
| + var controller = new StreamController(sync: true,
|
| + onPause: onChange,
|
| + onResume: onChange,
|
| + onCancel: onChange);
|
| + instance = new _ForkedStreamSubscription._(forker, controller);
|
| + return instance;
|
| + }
|
| +
|
| + _ForkedStreamSubscription._(this._forker, controller)
|
| + : _controller = controller, super(controller.stream.listen(null));
|
| +
|
| + _onChange() {
|
| + if (!_controller.hasListener) {
|
| + return _forker._cancel(this);
|
| + } else if (_controller.isPaused) {
|
| + _forker._pause(this);
|
| + } else {
|
| + _forker._resume(this);
|
| + }
|
| + return null;
|
| + }
|
| +}
|
|
|