Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(162)

Side by Side Diff: lib/src/subscription_fork.dart

Issue 1238503004: Alternative implementation of StreamQueue.fork which has no impact if it Base URL: https://github.com/dart-lang/async@master
Patch Set: Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « lib/src/stream_queue.dart ('k') | test/stream_queue_test.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 library async.subcsription_fork;
2
3 import "dart:async";
4 import "package:async/async.dart" show DelegatingStreamSubscription;
5
6 /// Allows creating forks of a single [StreamSubscription].
7 ///
8 /// The forks are stream subscriptions that will provide the same events
9 /// as the original subscription from the time where the fork was created.
10 ///
11 /// The original subscription is canceled when all forks are canceled, after the
12 /// first fork has been created, and it's paused when all forks are paused.
13 class SubscriptionForker {
14 final StreamSubscription _source;
15 final Set<_ForkedStreamSubscription> _subscriptions = new Set();
16 bool _isClosed;
17
18 SubscriptionForker(StreamSubscription subscription) : _source = subscription {
19 _source.pause();
20 _source.onData(_onData);
21 _source.onError(_onError);
22 _source.onDone(_onDone);
23 }
24
25 void _onData(T data) {
26 for (var subscription in _subscriptions.toList(growable: false)) {
27 subscription._controller.add(data);
28 }
29 }
30
31 void _onError(error, StackTrace stackTrace) {
32 for (var subscription in _subscriptions.toList(growable: false)) {
33 subscription._controller.addError(error, stackTrace);
34 }
35 }
36
37 void _onDone() {
38 _isClosed = true;
39 for (var subscription in _subscriptions.toList(growable: false)) {
40 subscription._controller.close();
41 }
42 }
43
44 Future _cancel(_ForkedStreamSubscription subscription) {
45 _subscriptions.remove(subscription);
46 if (_subscriptions.isEmpty) {
47 _isClosed = true;
48 return _source.cancel();
49 } else if (!_source.isPaused) {
50 _checkPause();
51 }
52 }
53
54 void _pause(_ForkedStreamSubscription subscription) {
55 _checkPause();
56 }
57
58 void _checkPause() {
59 if (_subscriptions.every(_isSubscriptionPaused)) {
60 _source.pause();
61 }
62 }
63
64 void _resume(_ForkedStreamSubscription subscription) {
65 _source.resume();
66 }
67
68 static bool _isSubscriptionPaused(_ForkedStreamSubscription forked) {
69 return forked._controller.isPaused;
70 }
71
72 /// Create a new fork of the source subscription.
73 StreamSubscription<T> fork() {
74 if (_isClosed) return new StreamSubscription<T>.empty();
75 var subscription = new _ForkedStreamSubscription<T>(this);
76 _subscriptions.add(subscription);
77 if (_source.isPaused) _source.resume();
78 return subscription;
79 }
80 }
81
82 /// A forkable stream subscription.
83 ///
84 /// A stream subscription with [fork] method.
85 ///
86 class ForkableSubscription<T> extends DelegatingStreamSubscription<T> {
87 final SubscriptionForker<T> _forker;
88
89 /// Create a forkable subscription from [subscription].
90 ///
91 /// This creates a [SubscriptionForker] and returns a subscription backed
92 /// by one fork of this subscription forker, along with a [fork] method
93 /// which allows creating more.
94 ForkableSubscription(StreamSubscription<T> subscription)
95 : this._(subscription is _ForkedStreamSubscription
96 ? subscription._forker
97 : new SubscriptionForker<T>(subscription));
98
99 ForkableSubscription._(SubscriptionForker<T> forker)
100 : _forker = forker, super(forker.fork());
101
102 StreamSubscription<T> fork() => _forker.fork();
103 }
104
105 class _ForkedStreamSubscription<T> extends DelegatingStreamSubscription<T> {
106 final SubscriptionForker _forker;
107 final StreamController _controller;
108 factory _ForkedStreamSubscription(SubscriptionForker<T> forker) {
109 var instance;
110 var onChange = () => instance._onChange();
111 var controller = new StreamController(sync: true,
112 onPause: onChange,
113 onResume: onChange,
114 onCancel: onChange);
115 instance = new _ForkedStreamSubscription._(forker, controller);
116 return instance;
117 }
118
119 _ForkedStreamSubscription._(this._forker, controller)
120 : _controller = controller, super(controller.stream.listen(null));
121
122 _onChange() {
123 if (!_controller.hasListener) {
124 return _forker._cancel(this);
125 } else if (_controller.isPaused) {
126 _forker._pause(this);
127 } else {
128 _forker._resume(this);
129 }
130 return null;
131 }
132 }
OLDNEW
« no previous file with comments | « lib/src/stream_queue.dart ('k') | test/stream_queue_test.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698