Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(578)

Side by Side Diff: lib/src/stream_transformers.dart

Issue 1648963002: Add reactive-inspired stream transformers: Base URL: https://github.com/dart-lang/async@master
Patch Set: Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
nweiz 2016/01/29 22:19:44 These transformers should all have explicit docume
Lasse Reichstein Nielsen 2016/02/01 12:43:20 True.
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 import "dart:async";
6 import "dart:collection";
7
8 /// A group created by [GroupBy].
9 ///
10 /// The stream created by [GroupBy] emits a [Group] for each distinct key
11 /// it sees.
12 /// This group contains the key itself, along with a stream of the values
13 /// associated with that key.
14 class Group<K, V> {
15 final K key;
nweiz 2016/01/29 22:19:45 It would be good to have some documentation for th
Lasse Reichstein Nielsen 2016/02/25 16:18:09 Done.
16 final Stream<V> values;
17 Group(this.key, this.values);
18 }
19
20 /**
21 * Groups events by a computed key.
22 *
23 * A key is extracted from incoming events, and a stream is created for each
24 * unique key. A value is computed from the event as well, and emitted on the
25 * corresponding stream.
26 *
27 * An error on the source stream, or when calling the `key` or `value`
28 * functions, will emit the error on the returned stream and stop
29 * processing, closing all the individual group streams.
30 *
31 * The returned stream emits a [Group] object for each distinct key seen
32 * by the transformation, and the values associated with the key are output
33 * on the [Group.values] stream.
34 */
nweiz 2016/01/29 22:19:45 Nit: These should be "///" comments.
Lasse Reichstein Nielsen 2016/02/25 16:18:10 Done.
35 class GroupBy<S, K, V> implements StreamTransformer<S, Group<K, V>> {
nweiz 2016/01/29 22:19:45 Personally, I'd put each of these into its own fil
Lasse Reichstein Nielsen 2016/02/25 16:18:10 Done.
36 final Function _key;
37 final Function _value;
38
39 /// Groups values returned by [element] by the key returned by [key].
nweiz 2016/01/29 22:19:45 "element" -> "value"
Lasse Reichstein Nielsen 2016/02/25 16:18:10 Done.
40 ///
41 /// If [value] is omitted, it defaults to an identity function, which
42 /// will only work if all the source events are instances of [V].
43 GroupBy(K key(S source), [V value(S source)])
nweiz 2016/01/29 22:19:44 Can we make [value] a named parameter? It makes th
Lasse Reichstein Nielsen 2016/02/01 12:43:20 We could probably make both named parameters. (Ma
Lasse Reichstein Nielsen 2016/02/25 16:18:10 Done.
44 : _key = key, _value = value ?? _identity;
nweiz 2016/01/29 22:19:45 Nit: put each of these on its own line (https://ww
Lasse Reichstein Nielsen 2016/02/25 16:18:10 Done. Grudgingly.
45
46 // Helper function.
47 static _identity(x) => x;
48
49 Stream<Group<K, V>> bind(Stream<S> stream) async* {
nweiz 2016/01/29 22:19:44 Last I heard, "async*" was really slow on the VM.
Lasse Reichstein Nielsen 2016/02/01 12:43:20 They should fix that then. We have this syntax bec
nweiz 2016/02/02 02:29:13 I agree for the most part (although in some situat
Lasse Reichstein Nielsen 2016/02/25 16:18:10 I'd prefer to land this as written, then do a comp
50 var controllers = new HashMap();
nweiz 2016/01/29 22:19:45 "{}" would be a lot more idiomatic, and would make
Lasse Reichstein Nielsen 2016/02/01 12:43:20 But I don't want to have a deterministic order (or
nweiz 2016/02/02 02:29:13 I wrote a lot of Ruby code in the days when hashes
Lasse Reichstein Nielsen 2016/02/25 16:18:10 In this case, we do leak the order by closing the
51 try {
52 await for (S source in stream) {
nweiz 2016/01/29 22:19:45 I'd use "var source" here and "var value" below.
Lasse Reichstein Nielsen 2016/02/25 16:18:10 For value it's actually deliberate (but for source
53 var key = _key(source);
54 var controller = controllers[key];
55 if (controller == null) {
56 controller = new StreamController<V>(sync: true);
57 controllers[key] = controller;
58 yield new Group<K, V>(key, controller.stream);
59 }
60 V value = _value(source);
61 controller.add(value);
62 }
63 } finally {
64 for (var controller in controllers.values) {
65 controller.close();
66 }
67 }
68 }
69 }
70
71 /// Scans a stream's events and combine a result from the available events.
floitsch 2016/01/29 19:05:03 combines. But the sentence could be better. I'm n
Lasse Reichstein Nielsen 2016/02/25 16:18:10 Reworded and example added.
72 ///
73 /// Combines each event with the previous accumulator value
74 /// and emits the result.
75 ///
76 /// Errors in the source stream or when calling the combine function will
77 /// be reported on the result stream, and will stop the process.
78 class Scan<S, A> implements StreamTransformer<S, A> {
floitsch 2016/01/29 19:05:02 "Scan looks like a bad name to me".
nweiz 2016/01/29 22:19:45 How is this different than Stream.fold()?
Lasse Reichstein Nielsen 2016/02/25 16:18:10 Stream.fold returns a single Future, this returns
Lasse Reichstein Nielsen 2016/02/25 16:18:10 Name was taking from the reactive framework. Alter
79 final A _initial;
80 final Function _combine;
81
82 /// Accumulates stream events using [combine] starting with [initial].
83 Scan(A initial, A combine(A accumulator, S source))
84 : _initial = initial, _combine = combine;
85
86 Stream<A> bind(Stream<S> stream) async* {
87 A accumulator = _initial;
88 await for (S source in stream) {
89 accumulator = _combine(accumulator, source);
90 yield accumulator;
91 }
92 }
93 }
94
95 /// Drops any event that happens shortly after another event.
96 ///
97 /// If an event happens within a certain [Duration] of the previous event,
98 /// the new event is dropped, but the dropped event will still cause
99 /// following events to be dropped until the entire duration has passed without
100 /// any events.
101 ///
102 /// This differs from [Throttle] which also drops events, but which only
103 /// restarts its timer at events that aren't dropped.
104 class Debounce<S> implements StreamTransformer<S, S> {
105 final Duration _interval;
106
107 /// Drops events sooner than [interval] after any other event.
108 Debounce(Duration interval) : _interval = interval;
nweiz 2016/01/29 22:19:45 I think this would be cleaner as "this._interval".
Lasse Reichstein Nielsen 2016/02/01 12:43:20 I will not make a public parameter of a public met
109
110 Stream<S> bind(Stream<S> stream) async* {
111 var interval = Duration.ZERO; // Avoid dropping the first event.
nweiz 2016/01/29 22:19:45 This seems a little too clever to me. What about:
Lasse Reichstein Nielsen 2016/02/01 12:43:20 Wouldn't work - the comparison is in the wrong dir
nweiz 2016/02/02 02:29:13 Hmm, true.
112 var stopwatch = new Stopwatch()..start();
113 await for (var source in stream) {
114 if (stopwatch.elapsed >= interval) {
115 yield source;
116 interval = _interval;
117 }
118 stopwatch.reset();
119 }
120 stopwatch.stop();
121 }
122 }
123
124 /// Drops any event that happens shortly after another undropped event.
125 ///
126 /// If an event happens within a certain [Duration] of the most recently
127 /// output event, the new event is dropped.
128 /// When the duration has passed after an event that wasn't dropped,
129 /// the next event will be accepted.
130 ///
131 /// This differs from [Debounce] which also drops events,
floitsch 2016/01/29 19:05:03 from [Debounce] which restarts the timer after dro
Lasse Reichstein Nielsen 2016/02/25 16:18:10 Done.
132 /// but which restarts the timer after dropped events too.
133 class Throttle<S> implements StreamTransformer<S, S> {
134 final Duration _interval;
135
136 /// Drops events sooner than [interval] after emitted events.
137 Throttle(Duration interval) : _interval = interval;
138
139 Stream<S> bind(Stream<S> stream) async* {
140 var interval = Duration.ZERO; // Avoid dropping the first event.
141 var stopwatch = new Stopwatch()..start();
142 await for (var source in stream) {
143 if (stopwatch.elapsed >= interval) {
144 yield source;
145 stopwatch.reset();
146 interval = _interval;
147 }
148 }
149 stopwatch.stop();
150 }
151 }
152
153 /// Drops the last [count] events of a stream.
154 ///
155 /// The resulting stream is [count] events behind the source stream.
156 /// The first [count] events are buffered, and when further events arrive,
157 /// they are buffered too, and the earlies events in the buffer are emitted.
158 ///
159 /// The resulting stream ends [count] events before the source stream,
160 /// effecively skipping the last [count] events.
161 class SkipLast<S> implements StreamTransformer<S, S> {
nweiz 2016/01/29 22:19:45 What's the use case for this?
Lasse Reichstein Nielsen 2016/02/01 12:43:20 Honestly don't know - found it in the reactive fra
Lasse Reichstein Nielsen 2016/02/25 16:18:10 But since that's my best argument, I'll drop it fo
162 /// Number of events dropped.
163 final int count;
nweiz 2016/01/29 22:19:45 Why is this public? It doesn't match the parameter
Lasse Reichstein Nielsen 2016/02/01 12:43:20 Good point. I think I decided to make it public b
164
165 /// Skip the [count] last events.
166 SkipLast(this.count);
167
168 Stream<S> bind(Stream<S> stream) async* {
169 Queue queue = new ListQueue(count + 1);
nweiz 2016/01/29 22:19:45 Calling "new ListQueue()" explicitly reads very st
Lasse Reichstein Nielsen 2016/02/01 12:43:20 The Queue constructor doesn't have an "initial cap
170 await for (S source in stream) {
171 queue.add(source);
172 if (queue.length > count) {
173 yield queue.removeFirst();
174 }
175 }
176 }
177 }
178
179
180 /**
nweiz 2016/01/29 22:19:45 ///
Lasse Reichstein Nielsen 2016/02/25 16:18:10 Done.
181 * Groups events by a computed key.
182 *
183 * A key is extracted from incoming events, and a stream is created for each
184 * unique key. A value is computed from the event as well, and emitted on the
185 * corresponding stream.
186 *
187 * The returned stream emits a [Group] object for each distinct key seen
188 * by the transformation, and the values associated with the key are output
189 * on the [Group.values] stream.
190 */
191 StreamTransformer<dynamic, Group> streamGroupBy(key(source), value(source)) =>
nweiz 2016/01/29 22:19:45 It seems weird that both these functions *and* the
Lasse Reichstein Nielsen 2016/02/01 12:43:20 Until generic functions, that would prevent access
192 new GroupBy(key, value);
floitsch 2016/01/29 19:05:03 the resulting stream should have a "Future<Map> to
Lasse Reichstein Nielsen 2016/02/01 12:43:20 Streams don't have that, and I *really* prefer not
193
194 /// Scans a stream's events and combine a result from the available events.
195 ///
196 /// Combines each event with the previous accumulator value
197 /// and emits the result.
198 ///
199 /// Errors in the source stream or when calling the combine function will
200 /// be reported on the result stream, and will stop the process.
201 StreamTransformer scanStream(initial, combine(value, source)) =>
202 new Scan(initial, combine);
203
204 /// Drops any event that happens shortly after another undropped event.
205 ///
206 /// If an event happens within a certain [Duration] of the most recently
207 /// output event, the new event is dropped.
208 /// When the duration has passed after an event that wasn't dropped,
209 /// the next event will be accepted.
210 ///
211 /// This differs from [Debounce] which also drops events,
212 /// but which restarts the timer after dropped events too.
213 StreamTransformer throttleStream(Duration duration) => new Throttle(duration);
214
215 /// Drops any event that happens shortly after another event.
216 ///
217 /// If an event happens within a certain [Duration] of the previous event,
218 /// the new event is dropped, but the dropped event will still cause
219 /// following events to be dropped until the entire duration has passed without
220 /// any events.
221 ///
222 /// This differs from [Throttle] which also drops events, but which only
223 /// restarts its timer at events that aren't dropped.
224 StreamTransformer debounceStream(Duration duration) => new Debounce(duration);
225
226 /// Drops the last [count] events of a stream.
227 StreamTransformer streamSkipLast(int count) => new SkipLast(count);
228
229 /// Concatenates streams.
230 ///
231 /// The events of each stream in [streams] are emitted, one stream at a time
232 /// in iteration order. When the final stream in [streams] has been processed,
233 /// the returned stream closes.
234 Stream concatenateStreams(Iterable<Stream> streams) async* {
nweiz 2016/01/29 22:19:44 This and mergeStreams aren't actually transformers
Lasse Reichstein Nielsen 2016/02/25 16:18:10 I've remove the rest, so it's just concatenateStre
235 for (var stream in streams) {
236 yield* stream;
237 }
238 }
239
240 /// Returns a stream that emits the events of all the [streams].
241 ///
242 /// The events from all the source streams are merged into one stream, with
243 /// events from the same source stream occuring in their original order.
244 Stream mergeStreams(Iterable<Stream> streams) {
nweiz 2016/01/29 22:19:45 Isn't this the same as StreamGroup.merge?
Lasse Reichstein Nielsen 2016/02/01 12:43:20 Most likely. So, even if it's worth having as a t
nweiz 2016/02/02 02:29:13 I'd prefer not to add a top-level forwarder; I'm n
Lasse Reichstein Nielsen 2016/02/25 16:18:10 Ok, I'll remove this one. I'll remove the functio
245 var controller;
246 int activeSubscriptions = 0;
247 var subscriptions = [];
248 controller = new StreamController(sync: true,
249 onListen: () {
250 // Cache extracted functions instead of extacting for each stream.
251 var add = controller.add;
252 var addError = controller.addError;
253 for (var stream in streams) {
254 subscriptions.add(stream.listen(add,
255 onError: addError,
256 onDone: () {
257 activeSubscriptions--;
258 if (activeSubscriptions == 0) {
259 controller.close();
260 }
261 }));
262 }
263 activeSubscriptions = subscriptions.length;
264 if (activeSubscriptions == 0) {
265 controller.close();
266 }
267 },
268 onPause: () {
269 for (var subscription in subscriptions) subscription.pause();
270 },
271 onResume: () {
272 for (var subscription in subscriptions) subscription.resume();
273 },
274 onCancel: () {
275 return Future.wait(subscriptions.map((subscription) {
276 var result = subscription.cancel();
277 if (result is Future) return result;
278 return new Future.value();
279 }));
280 });
281 return controller.stream;
282 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698