 Chromium Code Reviews
 Chromium Code Reviews Issue 1648963002:
  Add reactive-inspired stream transformers: 
  Base URL: https://github.com/dart-lang/async@master
    
  
    Issue 1648963002:
  Add reactive-inspired stream transformers: 
  Base URL: https://github.com/dart-lang/async@master| OLD | NEW | 
|---|---|
| (Empty) | |
| 1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file | |
| 
nweiz
2016/01/29 22:19:44
These transformers should all have explicit docume
 
Lasse Reichstein Nielsen
2016/02/01 12:43:20
True.
 | |
| 2 // for details. All rights reserved. Use of this source code is governed by a | |
| 3 // BSD-style license that can be found in the LICENSE file. | |
| 4 | |
| 5 import "dart:async"; | |
| 6 import "dart:collection"; | |
| 7 | |
| 8 /// A group created by [GroupBy]. | |
| 9 /// | |
| 10 /// The stream created by [GroupBy] emits a [Group] for each distinct key | |
| 11 /// it sees. | |
| 12 /// This group contains the key itself, along with a stream of the values | |
| 13 /// associated with that key. | |
| 14 class Group<K, V> { | |
| 15 final K key; | |
| 
nweiz
2016/01/29 22:19:45
It would be good to have some documentation for th
 
Lasse Reichstein Nielsen
2016/02/25 16:18:09
Done.
 | |
| 16 final Stream<V> values; | |
| 17 Group(this.key, this.values); | |
| 18 } | |
| 19 | |
| 20 /** | |
| 21 * Groups events by a computed key. | |
| 22 * | |
| 23 * A key is extracted from incoming events, and a stream is created for each | |
| 24 * unique key. A value is computed from the event as well, and emitted on the | |
| 25 * corresponding stream. | |
| 26 * | |
| 27 * An error on the source stream, or when calling the `key` or `value` | |
| 28 * functions, will emit the error on the returned stream and stop | |
| 29 * processing, closing all the individual group streams. | |
| 30 * | |
| 31 * The returned stream emits a [Group] object for each distinct key seen | |
| 32 * by the transformation, and the values associated with the key are output | |
| 33 * on the [Group.values] stream. | |
| 34 */ | |
| 
nweiz
2016/01/29 22:19:45
Nit: These should be "///" comments.
 
Lasse Reichstein Nielsen
2016/02/25 16:18:10
Done.
 | |
| 35 class GroupBy<S, K, V> implements StreamTransformer<S, Group<K, V>> { | |
| 
nweiz
2016/01/29 22:19:45
Personally, I'd put each of these into its own fil
 
Lasse Reichstein Nielsen
2016/02/25 16:18:10
Done.
 | |
| 36 final Function _key; | |
| 37 final Function _value; | |
| 38 | |
| 39 /// Groups values returned by [element] by the key returned by [key]. | |
| 
nweiz
2016/01/29 22:19:45
"element" -> "value"
 
Lasse Reichstein Nielsen
2016/02/25 16:18:10
Done.
 | |
| 40 /// | |
| 41 /// If [value] is omitted, it defaults to an identity function, which | |
| 42 /// will only work if all the source events are instances of [V]. | |
| 43 GroupBy(K key(S source), [V value(S source)]) | |
| 
nweiz
2016/01/29 22:19:44
Can we make [value] a named parameter? It makes th
 
Lasse Reichstein Nielsen
2016/02/01 12:43:20
We could probably make both named parameters.
(Ma
 
Lasse Reichstein Nielsen
2016/02/25 16:18:10
Done.
 | |
| 44 : _key = key, _value = value ?? _identity; | |
| 
nweiz
2016/01/29 22:19:45
Nit: put each of these on its own line (https://ww
 
Lasse Reichstein Nielsen
2016/02/25 16:18:10
Done. Grudgingly.
 | |
| 45 | |
| 46 // Helper function. | |
| 47 static _identity(x) => x; | |
| 48 | |
| 49 Stream<Group<K, V>> bind(Stream<S> stream) async* { | |
| 
nweiz
2016/01/29 22:19:44
Last I heard, "async*" was really slow on the VM.
 
Lasse Reichstein Nielsen
2016/02/01 12:43:20
They should fix that then.
We have this syntax bec
 
nweiz
2016/02/02 02:29:13
I agree for the most part (although in some situat
 
Lasse Reichstein Nielsen
2016/02/25 16:18:10
I'd prefer to land this as written, then do a comp
 | |
| 50 var controllers = new HashMap(); | |
| 
nweiz
2016/01/29 22:19:45
"{}" would be a lot more idiomatic, and would make
 
Lasse Reichstein Nielsen
2016/02/01 12:43:20
But I don't want to have a deterministic order (or
 
nweiz
2016/02/02 02:29:13
I wrote a lot of Ruby code in the days when hashes
 
Lasse Reichstein Nielsen
2016/02/25 16:18:10
In this case, we do leak the order by closing the
 | |
| 51 try { | |
| 52 await for (S source in stream) { | |
| 
nweiz
2016/01/29 22:19:45
I'd use "var source" here and "var value" below.
 
Lasse Reichstein Nielsen
2016/02/25 16:18:10
For value it's actually deliberate (but for source
 | |
| 53 var key = _key(source); | |
| 54 var controller = controllers[key]; | |
| 55 if (controller == null) { | |
| 56 controller = new StreamController<V>(sync: true); | |
| 57 controllers[key] = controller; | |
| 58 yield new Group<K, V>(key, controller.stream); | |
| 59 } | |
| 60 V value = _value(source); | |
| 61 controller.add(value); | |
| 62 } | |
| 63 } finally { | |
| 64 for (var controller in controllers.values) { | |
| 65 controller.close(); | |
| 66 } | |
| 67 } | |
| 68 } | |
| 69 } | |
| 70 | |
| 71 /// Scans a stream's events and combine a result from the available events. | |
| 
floitsch
2016/01/29 19:05:03
combines.
But the sentence could be better. I'm n
 
Lasse Reichstein Nielsen
2016/02/25 16:18:10
Reworded and example added.
 | |
| 72 /// | |
| 73 /// Combines each event with the previous accumulator value | |
| 74 /// and emits the result. | |
| 75 /// | |
| 76 /// Errors in the source stream or when calling the combine function will | |
| 77 /// be reported on the result stream, and will stop the process. | |
| 78 class Scan<S, A> implements StreamTransformer<S, A> { | |
| 
floitsch
2016/01/29 19:05:02
"Scan looks like a bad name to me".
 
nweiz
2016/01/29 22:19:45
How is this different than Stream.fold()?
 
Lasse Reichstein Nielsen
2016/02/25 16:18:10
Stream.fold returns a single Future, this returns
 
Lasse Reichstein Nielsen
2016/02/25 16:18:10
Name was taking from the reactive framework.
Alter
 | |
| 79 final A _initial; | |
| 80 final Function _combine; | |
| 81 | |
| 82 /// Accumulates stream events using [combine] starting with [initial]. | |
| 83 Scan(A initial, A combine(A accumulator, S source)) | |
| 84 : _initial = initial, _combine = combine; | |
| 85 | |
| 86 Stream<A> bind(Stream<S> stream) async* { | |
| 87 A accumulator = _initial; | |
| 88 await for (S source in stream) { | |
| 89 accumulator = _combine(accumulator, source); | |
| 90 yield accumulator; | |
| 91 } | |
| 92 } | |
| 93 } | |
| 94 | |
| 95 /// Drops any event that happens shortly after another event. | |
| 96 /// | |
| 97 /// If an event happens within a certain [Duration] of the previous event, | |
| 98 /// the new event is dropped, but the dropped event will still cause | |
| 99 /// following events to be dropped until the entire duration has passed without | |
| 100 /// any events. | |
| 101 /// | |
| 102 /// This differs from [Throttle] which also drops events, but which only | |
| 103 /// restarts its timer at events that aren't dropped. | |
| 104 class Debounce<S> implements StreamTransformer<S, S> { | |
| 105 final Duration _interval; | |
| 106 | |
| 107 /// Drops events sooner than [interval] after any other event. | |
| 108 Debounce(Duration interval) : _interval = interval; | |
| 
nweiz
2016/01/29 22:19:45
I think this would be cleaner as "this._interval".
 
Lasse Reichstein Nielsen
2016/02/01 12:43:20
I will not make a public parameter of a public met
 | |
| 109 | |
| 110 Stream<S> bind(Stream<S> stream) async* { | |
| 111 var interval = Duration.ZERO; // Avoid dropping the first event. | |
| 
nweiz
2016/01/29 22:19:45
This seems a little too clever to me. What about:
 
Lasse Reichstein Nielsen
2016/02/01 12:43:20
Wouldn't work - the comparison is in the wrong dir
 
nweiz
2016/02/02 02:29:13
Hmm, true.
 | |
| 112 var stopwatch = new Stopwatch()..start(); | |
| 113 await for (var source in stream) { | |
| 114 if (stopwatch.elapsed >= interval) { | |
| 115 yield source; | |
| 116 interval = _interval; | |
| 117 } | |
| 118 stopwatch.reset(); | |
| 119 } | |
| 120 stopwatch.stop(); | |
| 121 } | |
| 122 } | |
| 123 | |
| 124 /// Drops any event that happens shortly after another undropped event. | |
| 125 /// | |
| 126 /// If an event happens within a certain [Duration] of the most recently | |
| 127 /// output event, the new event is dropped. | |
| 128 /// When the duration has passed after an event that wasn't dropped, | |
| 129 /// the next event will be accepted. | |
| 130 /// | |
| 131 /// This differs from [Debounce] which also drops events, | |
| 
floitsch
2016/01/29 19:05:03
from [Debounce] which restarts the timer after dro
 
Lasse Reichstein Nielsen
2016/02/25 16:18:10
Done.
 | |
| 132 /// but which restarts the timer after dropped events too. | |
| 133 class Throttle<S> implements StreamTransformer<S, S> { | |
| 134 final Duration _interval; | |
| 135 | |
| 136 /// Drops events sooner than [interval] after emitted events. | |
| 137 Throttle(Duration interval) : _interval = interval; | |
| 138 | |
| 139 Stream<S> bind(Stream<S> stream) async* { | |
| 140 var interval = Duration.ZERO; // Avoid dropping the first event. | |
| 141 var stopwatch = new Stopwatch()..start(); | |
| 142 await for (var source in stream) { | |
| 143 if (stopwatch.elapsed >= interval) { | |
| 144 yield source; | |
| 145 stopwatch.reset(); | |
| 146 interval = _interval; | |
| 147 } | |
| 148 } | |
| 149 stopwatch.stop(); | |
| 150 } | |
| 151 } | |
| 152 | |
| 153 /// Drops the last [count] events of a stream. | |
| 154 /// | |
| 155 /// The resulting stream is [count] events behind the source stream. | |
| 156 /// The first [count] events are buffered, and when further events arrive, | |
| 157 /// they are buffered too, and the earlies events in the buffer are emitted. | |
| 158 /// | |
| 159 /// The resulting stream ends [count] events before the source stream, | |
| 160 /// effecively skipping the last [count] events. | |
| 161 class SkipLast<S> implements StreamTransformer<S, S> { | |
| 
nweiz
2016/01/29 22:19:45
What's the use case for this?
 
Lasse Reichstein Nielsen
2016/02/01 12:43:20
Honestly don't know - found it in the reactive fra
 
Lasse Reichstein Nielsen
2016/02/25 16:18:10
But since that's my best argument, I'll drop it fo
 | |
| 162 /// Number of events dropped. | |
| 163 final int count; | |
| 
nweiz
2016/01/29 22:19:45
Why is this public? It doesn't match the parameter
 
Lasse Reichstein Nielsen
2016/02/01 12:43:20
Good point.
I think I decided to make it public b
 | |
| 164 | |
| 165 /// Skip the [count] last events. | |
| 166 SkipLast(this.count); | |
| 167 | |
| 168 Stream<S> bind(Stream<S> stream) async* { | |
| 169 Queue queue = new ListQueue(count + 1); | |
| 
nweiz
2016/01/29 22:19:45
Calling "new ListQueue()" explicitly reads very st
 
Lasse Reichstein Nielsen
2016/02/01 12:43:20
The Queue constructor doesn't have an "initial cap
 | |
| 170 await for (S source in stream) { | |
| 171 queue.add(source); | |
| 172 if (queue.length > count) { | |
| 173 yield queue.removeFirst(); | |
| 174 } | |
| 175 } | |
| 176 } | |
| 177 } | |
| 178 | |
| 179 | |
| 180 /** | |
| 
nweiz
2016/01/29 22:19:45
///
 
Lasse Reichstein Nielsen
2016/02/25 16:18:10
Done.
 | |
| 181 * Groups events by a computed key. | |
| 182 * | |
| 183 * A key is extracted from incoming events, and a stream is created for each | |
| 184 * unique key. A value is computed from the event as well, and emitted on the | |
| 185 * corresponding stream. | |
| 186 * | |
| 187 * The returned stream emits a [Group] object for each distinct key seen | |
| 188 * by the transformation, and the values associated with the key are output | |
| 189 * on the [Group.values] stream. | |
| 190 */ | |
| 191 StreamTransformer<dynamic, Group> streamGroupBy(key(source), value(source)) => | |
| 
nweiz
2016/01/29 22:19:45
It seems weird that both these functions *and* the
 
Lasse Reichstein Nielsen
2016/02/01 12:43:20
Until generic functions, that would prevent access
 | |
| 192 new GroupBy(key, value); | |
| 
floitsch
2016/01/29 19:05:03
the resulting stream should have a "Future<Map> to
 
Lasse Reichstein Nielsen
2016/02/01 12:43:20
Streams don't have that, and I *really* prefer not
 | |
| 193 | |
| 194 /// Scans a stream's events and combine a result from the available events. | |
| 195 /// | |
| 196 /// Combines each event with the previous accumulator value | |
| 197 /// and emits the result. | |
| 198 /// | |
| 199 /// Errors in the source stream or when calling the combine function will | |
| 200 /// be reported on the result stream, and will stop the process. | |
| 201 StreamTransformer scanStream(initial, combine(value, source)) => | |
| 202 new Scan(initial, combine); | |
| 203 | |
| 204 /// Drops any event that happens shortly after another undropped event. | |
| 205 /// | |
| 206 /// If an event happens within a certain [Duration] of the most recently | |
| 207 /// output event, the new event is dropped. | |
| 208 /// When the duration has passed after an event that wasn't dropped, | |
| 209 /// the next event will be accepted. | |
| 210 /// | |
| 211 /// This differs from [Debounce] which also drops events, | |
| 212 /// but which restarts the timer after dropped events too. | |
| 213 StreamTransformer throttleStream(Duration duration) => new Throttle(duration); | |
| 214 | |
| 215 /// Drops any event that happens shortly after another event. | |
| 216 /// | |
| 217 /// If an event happens within a certain [Duration] of the previous event, | |
| 218 /// the new event is dropped, but the dropped event will still cause | |
| 219 /// following events to be dropped until the entire duration has passed without | |
| 220 /// any events. | |
| 221 /// | |
| 222 /// This differs from [Throttle] which also drops events, but which only | |
| 223 /// restarts its timer at events that aren't dropped. | |
| 224 StreamTransformer debounceStream(Duration duration) => new Debounce(duration); | |
| 225 | |
| 226 /// Drops the last [count] events of a stream. | |
| 227 StreamTransformer streamSkipLast(int count) => new SkipLast(count); | |
| 228 | |
| 229 /// Concatenates streams. | |
| 230 /// | |
| 231 /// The events of each stream in [streams] are emitted, one stream at a time | |
| 232 /// in iteration order. When the final stream in [streams] has been processed, | |
| 233 /// the returned stream closes. | |
| 234 Stream concatenateStreams(Iterable<Stream> streams) async* { | |
| 
nweiz
2016/01/29 22:19:44
This and mergeStreams aren't actually transformers
 
Lasse Reichstein Nielsen
2016/02/25 16:18:10
I've remove the rest, so it's just concatenateStre
 | |
| 235 for (var stream in streams) { | |
| 236 yield* stream; | |
| 237 } | |
| 238 } | |
| 239 | |
| 240 /// Returns a stream that emits the events of all the [streams]. | |
| 241 /// | |
| 242 /// The events from all the source streams are merged into one stream, with | |
| 243 /// events from the same source stream occuring in their original order. | |
| 244 Stream mergeStreams(Iterable<Stream> streams) { | |
| 
nweiz
2016/01/29 22:19:45
Isn't this the same as StreamGroup.merge?
 
Lasse Reichstein Nielsen
2016/02/01 12:43:20
Most likely.
So, even if it's worth having as a t
 
nweiz
2016/02/02 02:29:13
I'd prefer not to add a top-level forwarder; I'm n
 
Lasse Reichstein Nielsen
2016/02/25 16:18:10
Ok, I'll remove this one.
I'll remove the functio
 | |
| 245 var controller; | |
| 246 int activeSubscriptions = 0; | |
| 247 var subscriptions = []; | |
| 248 controller = new StreamController(sync: true, | |
| 249 onListen: () { | |
| 250 // Cache extracted functions instead of extacting for each stream. | |
| 251 var add = controller.add; | |
| 252 var addError = controller.addError; | |
| 253 for (var stream in streams) { | |
| 254 subscriptions.add(stream.listen(add, | |
| 255 onError: addError, | |
| 256 onDone: () { | |
| 257 activeSubscriptions--; | |
| 258 if (activeSubscriptions == 0) { | |
| 259 controller.close(); | |
| 260 } | |
| 261 })); | |
| 262 } | |
| 263 activeSubscriptions = subscriptions.length; | |
| 264 if (activeSubscriptions == 0) { | |
| 265 controller.close(); | |
| 266 } | |
| 267 }, | |
| 268 onPause: () { | |
| 269 for (var subscription in subscriptions) subscription.pause(); | |
| 270 }, | |
| 271 onResume: () { | |
| 272 for (var subscription in subscriptions) subscription.resume(); | |
| 273 }, | |
| 274 onCancel: () { | |
| 275 return Future.wait(subscriptions.map((subscription) { | |
| 276 var result = subscription.cancel(); | |
| 277 if (result is Future) return result; | |
| 278 return new Future.value(); | |
| 279 })); | |
| 280 }); | |
| 281 return controller.stream; | |
| 282 } | |
| OLD | NEW |