Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(618)

Unified Diff: lib/src/subscription_fork.dart

Issue 1238503004: Alternative implementation of StreamQueue.fork which has no impact if it Base URL: https://github.com/dart-lang/async@master
Patch Set: Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « lib/src/stream_queue.dart ('k') | test/stream_queue_test.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: lib/src/subscription_fork.dart
diff --git a/lib/src/subscription_fork.dart b/lib/src/subscription_fork.dart
new file mode 100644
index 0000000000000000000000000000000000000000..6eef1271e3de23b8f6e0ad53fa52400ec83f5a99
--- /dev/null
+++ b/lib/src/subscription_fork.dart
@@ -0,0 +1,132 @@
+library async.subcsription_fork;
+
+import "dart:async";
+import "package:async/async.dart" show DelegatingStreamSubscription;
+
+/// Allows creating forks of a single [StreamSubscription].
+///
+/// The forks are stream subscriptions that will provide the same events
+/// as the original subscription from the time where the fork was created.
+///
+/// The original subscription is canceled when all forks are canceled, after the
+/// first fork has been created, and it's paused when all forks are paused.
+class SubscriptionForker {
+ final StreamSubscription _source;
+ final Set<_ForkedStreamSubscription> _subscriptions = new Set();
+ bool _isClosed;
+
+ SubscriptionForker(StreamSubscription subscription) : _source = subscription {
+ _source.pause();
+ _source.onData(_onData);
+ _source.onError(_onError);
+ _source.onDone(_onDone);
+ }
+
+ void _onData(T data) {
+ for (var subscription in _subscriptions.toList(growable: false)) {
+ subscription._controller.add(data);
+ }
+ }
+
+ void _onError(error, StackTrace stackTrace) {
+ for (var subscription in _subscriptions.toList(growable: false)) {
+ subscription._controller.addError(error, stackTrace);
+ }
+ }
+
+ void _onDone() {
+ _isClosed = true;
+ for (var subscription in _subscriptions.toList(growable: false)) {
+ subscription._controller.close();
+ }
+ }
+
+ Future _cancel(_ForkedStreamSubscription subscription) {
+ _subscriptions.remove(subscription);
+ if (_subscriptions.isEmpty) {
+ _isClosed = true;
+ return _source.cancel();
+ } else if (!_source.isPaused) {
+ _checkPause();
+ }
+ }
+
+ void _pause(_ForkedStreamSubscription subscription) {
+ _checkPause();
+ }
+
+ void _checkPause() {
+ if (_subscriptions.every(_isSubscriptionPaused)) {
+ _source.pause();
+ }
+ }
+
+ void _resume(_ForkedStreamSubscription subscription) {
+ _source.resume();
+ }
+
+ static bool _isSubscriptionPaused(_ForkedStreamSubscription forked) {
+ return forked._controller.isPaused;
+ }
+
+ /// Create a new fork of the source subscription.
+ StreamSubscription<T> fork() {
+ if (_isClosed) return new StreamSubscription<T>.empty();
+ var subscription = new _ForkedStreamSubscription<T>(this);
+ _subscriptions.add(subscription);
+ if (_source.isPaused) _source.resume();
+ return subscription;
+ }
+ }
+
+/// A forkable stream subscription.
+///
+/// A stream subscription with [fork] method.
+///
+class ForkableSubscription<T> extends DelegatingStreamSubscription<T> {
+ final SubscriptionForker<T> _forker;
+
+ /// Create a forkable subscription from [subscription].
+ ///
+ /// This creates a [SubscriptionForker] and returns a subscription backed
+ /// by one fork of this subscription forker, along with a [fork] method
+ /// which allows creating more.
+ ForkableSubscription(StreamSubscription<T> subscription)
+ : this._(subscription is _ForkedStreamSubscription
+ ? subscription._forker
+ : new SubscriptionForker<T>(subscription));
+
+ ForkableSubscription._(SubscriptionForker<T> forker)
+ : _forker = forker, super(forker.fork());
+
+ StreamSubscription<T> fork() => _forker.fork();
+}
+
+class _ForkedStreamSubscription<T> extends DelegatingStreamSubscription<T> {
+ final SubscriptionForker _forker;
+ final StreamController _controller;
+ factory _ForkedStreamSubscription(SubscriptionForker<T> forker) {
+ var instance;
+ var onChange = () => instance._onChange();
+ var controller = new StreamController(sync: true,
+ onPause: onChange,
+ onResume: onChange,
+ onCancel: onChange);
+ instance = new _ForkedStreamSubscription._(forker, controller);
+ return instance;
+ }
+
+ _ForkedStreamSubscription._(this._forker, controller)
+ : _controller = controller, super(controller.stream.listen(null));
+
+ _onChange() {
+ if (!_controller.hasListener) {
+ return _forker._cancel(this);
+ } else if (_controller.isPaused) {
+ _forker._pause(this);
+ } else {
+ _forker._resume(this);
+ }
+ return null;
+ }
+}
« no previous file with comments | « lib/src/stream_queue.dart ('k') | test/stream_queue_test.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698