Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(505)

Side by Side Diff: packages/async/lib/src/stream_splitter.dart

Issue 1400473008: Roll Observatory packages and add a roll script (Closed) Base URL: git@github.com:dart-lang/observatory_pub_packages.git@master
Patch Set: Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 library async.stream_splitter;
6
7 import 'dart:async';
8 import 'dart:collection';
9
10 import '../result.dart';
11 import 'future_group.dart';
12
13 /// A class that splits a single source stream into an arbitrary number of
14 /// (single-subscription) streams (called "branch") that emit the same events.
15 ///
16 /// Each branch will emit all the same values and errors as the source stream,
17 /// regardless of which values have been emitted on other branches. This means
18 /// that the splitter stores every event that has been emitted so far, which may
19 /// consume a lot of memory. The user can call [close] to indicate that no more
20 /// branches will be created, and this memory will be released.
21 ///
22 /// The source stream is only listened to once a branch is created *and listened
23 /// to*. It's paused when all branches are paused *or when all branches are
24 /// canceled*, and resumed once there's at least one branch that's listening and
25 /// unpaused. It's not canceled unless no branches are listening and [close] has
26 /// been called.
27 class StreamSplitter<T> {
28 /// The wrapped stream.
29 final Stream<T> _stream;
30
31 /// The subscription to [_stream].
32 ///
33 /// This will be `null` until a branch has a listener.
34 StreamSubscription<T> _subscription;
35
36 /// The buffer of events or errors that have already been emitted by
37 /// [_stream].
38 final _buffer = new List<Result<T>>();
39
40 /// The controllers for branches that are listening for future events from
41 /// [_stream].
42 ///
43 /// Once a branch is canceled, it's removed from this list. When [_stream] is
44 /// done, all branches are removed.
45 final _controllers = new Set<StreamController<T>>();
46
47 /// A group of futures returned by [close].
48 ///
49 /// This is used to ensure that [close] doesn't complete until all
50 /// [StreamController.close] and [StreamSubscription.cancel] calls complete.
51 final _closeGroup = new FutureGroup();
52
53 /// Whether [_stream] is done emitting events.
54 var _isDone = false;
55
56 /// Whether [close] has been called.
57 var _isClosed = false;
58
59 /// Splits [stream] into [count] identical streams.
60 ///
61 /// [count] defaults to 2. This is the same as creating [count] branches and
62 /// then closing the [StreamSplitter].
63 static List<Stream> splitFrom(Stream stream, [int count]) {
64 if (count == null) count = 2;
65 var splitter = new StreamSplitter(stream);
66 var streams = new List.generate(count, (_) => splitter.split());
67 splitter.close();
68 return streams;
69 }
70
71 StreamSplitter(this._stream);
72
73 /// Returns a single-subscription stream that's a copy of the input stream.
74 ///
75 /// This will throw a [StateError] if [close] has been called.
76 Stream<T> split() {
77 if (_isClosed) {
78 throw new StateError("Can't call split() on a closed StreamSplitter.");
79 }
80
81 var controller;
82 controller = new StreamController<T>(
83 onListen: _onListen,
84 onPause: _onPause,
85 onResume: _onResume,
86 onCancel: () => _onCancel(controller));
87
88 for (var result in _buffer) {
89 result.addTo(controller);
90 }
91
92 if (_isDone) {
93 _closeGroup.add(controller.close());
94 } else {
95 _controllers.add(controller);
96 }
97
98 return controller.stream;
99 }
100
101 /// Indicates that no more branches will be requested via [split].
102 ///
103 /// This clears the internal buffer of events. If there are no branches or all
104 /// branches have been canceled, this cancels the subscription to the input
105 /// stream.
106 ///
107 /// Returns a [Future] that completes once all events have been processed by
108 /// all branches and (if applicable) the subscription to the input stream has
109 /// been canceled.
110 Future close() {
111 if (_isClosed) return _closeGroup.future;
112 _isClosed = true;
113
114 _buffer.clear();
115 if (_controllers.isEmpty) _cancelSubscription();
116
117 return _closeGroup.future;
118 }
119
120 /// Cancel [_subscription] and close [_closeGroup].
121 ///
122 /// This should be called after all the branches' subscriptions have been
123 /// canceled and the splitter has been closed. In that case, we won't use the
124 /// events from [_subscription] any more, since there's nothing to pipe them
125 /// to and no more branches will be created. If [_subscription] is done,
126 /// canceling it will be a no-op.
127 ///
128 /// This may also be called before any branches have been created, in which
129 /// case [_subscription] will be `null`.
130 void _cancelSubscription() {
131 assert(_controllers.isEmpty);
132 assert(_isClosed);
133
134 var future = null;
135 if (_subscription != null) future = _subscription.cancel();
136 if (future != null) _closeGroup.add(future);
137 _closeGroup.close();
138 }
139
140 // StreamController events
141
142 /// Subscribe to [_stream] if we haven't yet done so, and resume the
143 /// subscription if we have.
144 void _onListen() {
145 if (_isDone) return;
146
147 if (_subscription != null) {
148 // Resume the subscription in case it was paused, either because all the
149 // controllers were paused or because the last one was canceled. If it
150 // wasn't paused, this will be a no-op.
151 _subscription.resume();
152 } else {
153 _subscription = _stream.listen(
154 _onData, onError: _onError, onDone: _onDone);
155 }
156 }
157
158 /// Pauses [_subscription] if every controller is paused.
159 void _onPause() {
160 if (!_controllers.every((controller) => controller.isPaused)) return;
161 _subscription.pause();
162 }
163
164 /// Resumes [_subscription].
165 ///
166 /// If [_subscription] wasn't paused, this is a no-op.
167 void _onResume() {
168 _subscription.resume();
169 }
170
171 /// Removes [controller] from [_controllers] and cancels or pauses
172 /// [_subscription] as appropriate.
173 ///
174 /// Since the controller emitting a done event will cause it to register as
175 /// canceled, this is the only way that a controller is ever removed from
176 /// [_controllers].
177 void _onCancel(StreamController controller) {
178 _controllers.remove(controller);
179 if (_controllers.isNotEmpty) return;
180
181 if (_isClosed) {
182 _cancelSubscription();
183 } else {
184 _subscription.pause();
185 }
186 }
187
188 // Stream events
189
190 /// Buffers [data] and passes it to [_controllers].
191 void _onData(T data) {
192 if (!_isClosed) _buffer.add(new Result.value(data));
193 for (var controller in _controllers) {
194 controller.add(data);
195 }
196 }
197
198 /// Buffers [error] and passes it to [_controllers].
199 void _onError(Object error, StackTrace stackTrace) {
200 if (!_isClosed) _buffer.add(new Result.error(error, stackTrace));
201 for (var controller in _controllers) {
202 controller.addError(error, stackTrace);
203 }
204 }
205
206 /// Marks [_controllers] as done.
207 void _onDone() {
208 _isDone = true;
209 for (var controller in _controllers) {
210 _closeGroup.add(controller.close());
211 }
212 }
213 }
OLDNEW
« no previous file with comments | « packages/async/lib/src/stream_queue.dart ('k') | packages/async/lib/src/subscription_stream.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698