Index: lib/src/subscription_fork.dart |
diff --git a/lib/src/subscription_fork.dart b/lib/src/subscription_fork.dart |
new file mode 100644 |
index 0000000000000000000000000000000000000000..6eef1271e3de23b8f6e0ad53fa52400ec83f5a99 |
--- /dev/null |
+++ b/lib/src/subscription_fork.dart |
@@ -0,0 +1,132 @@ |
+library async.subcsription_fork; |
+ |
+import "dart:async"; |
+import "package:async/async.dart" show DelegatingStreamSubscription; |
+ |
+/// Allows creating forks of a single [StreamSubscription]. |
+/// |
+/// The forks are stream subscriptions that will provide the same events |
+/// as the original subscription from the time where the fork was created. |
+/// |
+/// The original subscription is canceled when all forks are canceled, after the |
+/// first fork has been created, and it's paused when all forks are paused. |
+class SubscriptionForker { |
+ final StreamSubscription _source; |
+ final Set<_ForkedStreamSubscription> _subscriptions = new Set(); |
+ bool _isClosed; |
+ |
+ SubscriptionForker(StreamSubscription subscription) : _source = subscription { |
+ _source.pause(); |
+ _source.onData(_onData); |
+ _source.onError(_onError); |
+ _source.onDone(_onDone); |
+ } |
+ |
+ void _onData(T data) { |
+ for (var subscription in _subscriptions.toList(growable: false)) { |
+ subscription._controller.add(data); |
+ } |
+ } |
+ |
+ void _onError(error, StackTrace stackTrace) { |
+ for (var subscription in _subscriptions.toList(growable: false)) { |
+ subscription._controller.addError(error, stackTrace); |
+ } |
+ } |
+ |
+ void _onDone() { |
+ _isClosed = true; |
+ for (var subscription in _subscriptions.toList(growable: false)) { |
+ subscription._controller.close(); |
+ } |
+ } |
+ |
+ Future _cancel(_ForkedStreamSubscription subscription) { |
+ _subscriptions.remove(subscription); |
+ if (_subscriptions.isEmpty) { |
+ _isClosed = true; |
+ return _source.cancel(); |
+ } else if (!_source.isPaused) { |
+ _checkPause(); |
+ } |
+ } |
+ |
+ void _pause(_ForkedStreamSubscription subscription) { |
+ _checkPause(); |
+ } |
+ |
+ void _checkPause() { |
+ if (_subscriptions.every(_isSubscriptionPaused)) { |
+ _source.pause(); |
+ } |
+ } |
+ |
+ void _resume(_ForkedStreamSubscription subscription) { |
+ _source.resume(); |
+ } |
+ |
+ static bool _isSubscriptionPaused(_ForkedStreamSubscription forked) { |
+ return forked._controller.isPaused; |
+ } |
+ |
+ /// Create a new fork of the source subscription. |
+ StreamSubscription<T> fork() { |
+ if (_isClosed) return new StreamSubscription<T>.empty(); |
+ var subscription = new _ForkedStreamSubscription<T>(this); |
+ _subscriptions.add(subscription); |
+ if (_source.isPaused) _source.resume(); |
+ return subscription; |
+ } |
+ } |
+ |
+/// A forkable stream subscription. |
+/// |
+/// A stream subscription with [fork] method. |
+/// |
+class ForkableSubscription<T> extends DelegatingStreamSubscription<T> { |
+ final SubscriptionForker<T> _forker; |
+ |
+ /// Create a forkable subscription from [subscription]. |
+ /// |
+ /// This creates a [SubscriptionForker] and returns a subscription backed |
+ /// by one fork of this subscription forker, along with a [fork] method |
+ /// which allows creating more. |
+ ForkableSubscription(StreamSubscription<T> subscription) |
+ : this._(subscription is _ForkedStreamSubscription |
+ ? subscription._forker |
+ : new SubscriptionForker<T>(subscription)); |
+ |
+ ForkableSubscription._(SubscriptionForker<T> forker) |
+ : _forker = forker, super(forker.fork()); |
+ |
+ StreamSubscription<T> fork() => _forker.fork(); |
+} |
+ |
+class _ForkedStreamSubscription<T> extends DelegatingStreamSubscription<T> { |
+ final SubscriptionForker _forker; |
+ final StreamController _controller; |
+ factory _ForkedStreamSubscription(SubscriptionForker<T> forker) { |
+ var instance; |
+ var onChange = () => instance._onChange(); |
+ var controller = new StreamController(sync: true, |
+ onPause: onChange, |
+ onResume: onChange, |
+ onCancel: onChange); |
+ instance = new _ForkedStreamSubscription._(forker, controller); |
+ return instance; |
+ } |
+ |
+ _ForkedStreamSubscription._(this._forker, controller) |
+ : _controller = controller, super(controller.stream.listen(null)); |
+ |
+ _onChange() { |
+ if (!_controller.hasListener) { |
+ return _forker._cancel(this); |
+ } else if (_controller.isPaused) { |
+ _forker._pause(this); |
+ } else { |
+ _forker._resume(this); |
+ } |
+ return null; |
+ } |
+} |