OLD | NEW |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 /** Utility function to create an [AsyncError] if [error] isn't one already. */ | 7 /** Utility function to create an [AsyncError] if [error] isn't one already. */ |
8 AsyncError _asyncError(Object error, Object stackTrace, [AsyncError cause]) { | 8 AsyncError _asyncError(Object error, Object stackTrace, [AsyncError cause]) { |
9 if (error is AsyncError) return error; | 9 if (error is AsyncError) return error; |
10 if (cause == null) return new AsyncError(error, stackTrace); | 10 if (cause == null) return new AsyncError(error, stackTrace); |
(...skipping 20 matching lines...) Expand all Loading... |
31 | 31 |
32 /** Helper function to make an onError argument to [_runUserCode]. */ | 32 /** Helper function to make an onError argument to [_runUserCode]. */ |
33 _cancelAndError(StreamSubscription subscription, _FutureImpl future) => | 33 _cancelAndError(StreamSubscription subscription, _FutureImpl future) => |
34 (AsyncError error) { | 34 (AsyncError error) { |
35 subscription.cancel(); | 35 subscription.cancel(); |
36 future._setError(error); | 36 future._setError(error); |
37 }; | 37 }; |
38 | 38 |
39 | 39 |
40 /** | 40 /** |
41 * A wrapper around a stream that allows independent subscribers. | 41 * A [StreamTransformer] that forwards events and subscriptions. |
42 * | 42 * |
43 * By default [this] subscribes to [_source] and forwards all events to its own | 43 * By default this transformer subscribes to [_source] and forwards all events |
44 * subscribers. It does not subscribe until there is a subscriber, and | 44 * to [_stream]. It does not subscribe to [_source] until there is a subscriber, |
45 * unsubscribes again when there are no subscribers left. | 45 * on [_stream] and unsubscribes again when there are no subscribers left. |
46 * | 46 * |
47 * The events are passed through the [_handleData], [_handleError] and | 47 * The events are passed through the [_handleData], [_handleError] and |
48 * [_handleDone] methods. Subclasses are supposed to add handling of some of | 48 * [_handleDone] methods. Subclasses are supposed to add handling of some of |
49 * the events by overriding these methods. | 49 * the events by overriding these methods. |
50 * | 50 * |
51 * This class is intended for internal use only. | 51 * This class is intended for internal use only. |
52 */ | 52 */ |
53 class _ForwardingMultiStream<S, T> extends _MultiStreamImpl<T> { | 53 /** |
54 Stream<S> _source = null; | 54 * |
55 StreamSubscription _subscription = null; | 55 * Handles backwards propagation of subscription and pause. |
| 56 */ |
| 57 class _ForwardingStreamTransformer<S, T> implements StreamTransformer<S, T> { |
| 58 Stream<T> _stream; |
| 59 Stream<S> _source; |
| 60 StreamSubscription<S> _subscription; |
56 | 61 |
57 void _subscribeToSource() { | 62 Stream<T> _createOutputStream() { |
58 _subscription = _source.listen(this._handleData, | 63 if (_source.isSingleSubscription) { |
59 onError: this._handleError, | 64 return new _ForwardingSingleStream<T>(this); |
60 onDone: this._handleDone); | 65 } |
61 if (_isPaused) { | 66 return new _ForwardingMultiStream<T>(this); |
62 _subscription.pause(); | 67 } |
| 68 |
| 69 Stream<T> bind(Stream<S> source) { |
| 70 if (_source != null) { |
| 71 throw new StateError("Transformer source already bound"); |
| 72 } |
| 73 _source = source; |
| 74 _stream = _createOutputStream(); |
| 75 return _stream; |
| 76 } |
| 77 |
| 78 void _onPauseStateChange(bool isPaused) { |
| 79 if (isPaused) { |
| 80 if (_subscription != null) { |
| 81 _subscription.pause(); |
| 82 } |
| 83 } else { |
| 84 if (_subscription != null) { |
| 85 _subscription.resume(); |
| 86 } |
63 } | 87 } |
64 } | 88 } |
65 | 89 |
66 /** | 90 /** |
67 * Subscribe or unsubscribe on [source] depending on whether | 91 * Subscribe or unsubscribe on [_source] depending on whether |
68 * [stream] has subscribers. | 92 * [_stream] has subscribers. |
69 */ | 93 */ |
70 void _onSubscriptionStateChange() { | 94 void _onSubscriptionStateChange(bool hasSubscribers) { |
71 if (_hasSubscribers) { | 95 if (hasSubscribers) { |
72 assert(_subscription == null); | 96 assert(_subscription == null); |
73 if (_source != null) { | 97 _subscription = _source.listen(this._handleData, |
74 _subscribeToSource(); | 98 onError: this._handleError, |
75 } | 99 onDone: this._handleDone); |
76 } else { | 100 } else { |
77 if (_subscription != null) { | 101 // TODO(lrn): Check why this can happen. |
78 _subscription.cancel(); | 102 if (_subscription == null) return; |
79 _subscription = null; | 103 _subscription.cancel(); |
80 } | 104 _subscription = null; |
81 } | |
82 } | |
83 | |
84 void _onPauseStateChange() { | |
85 if (_subscription == null) return; | |
86 if (isPaused) { | |
87 _subscription.pause(); | |
88 } else { | |
89 _subscription.resume(); | |
90 } | 105 } |
91 } | 106 } |
92 | 107 |
93 void _handleData(S inputEvent) { | 108 void _handleData(S inputEvent) { |
94 var outputEvent = inputEvent; | 109 var outputEvent = inputEvent; |
95 _add(outputEvent); | 110 _stream._add(outputEvent); |
96 } | 111 } |
97 | 112 |
98 void _handleError(AsyncError error) { | 113 void _handleError(AsyncError error) { |
99 _signalError(error); | 114 _stream._signalError(error); |
100 } | 115 } |
101 | 116 |
102 void _handleDone() { | 117 void _handleDone() { |
103 _close(); | 118 _stream._close(); |
104 } | 119 } |
105 } | 120 } |
106 | 121 |
| 122 class _ForwardingMultiStream<T> extends _MultiStreamImpl<T> { |
| 123 _ForwardingStreamTransformer _transformer; |
| 124 _ForwardingMultiStream(this._transformer); |
107 | 125 |
108 abstract class _ForwardingTransformer<S, T> extends _ForwardingMultiStream<S, T> | 126 _onSubscriptionStateChange() { |
109 implements StreamTransformer<S, T> { | 127 _transformer._onSubscriptionStateChange(_hasSubscribers); |
110 Stream<T> bind(Stream<S> source) { | 128 } |
111 if (_source != null) throw new StateError("Already bound to source."); | 129 |
112 _source = source; | 130 _onPauseStateChange() { |
113 if (_hasSubscribers) { | 131 _transformer._onPauseStateChange(_isPaused); |
114 _subscribeToSource(); | |
115 } | |
116 return this; | |
117 } | 132 } |
118 } | 133 } |
119 | 134 |
| 135 class _ForwardingSingleStream<T> extends _SingleStreamImpl<T> { |
| 136 _ForwardingStreamTransformer _transformer; |
| 137 _ForwardingSingleStream(this._transformer); |
| 138 |
| 139 _onSubscriptionStateChange() { |
| 140 _transformer._onSubscriptionStateChange(_hasSubscribers); |
| 141 } |
| 142 |
| 143 _onPauseStateChange() { |
| 144 _transformer._onPauseStateChange(_isPaused); |
| 145 } |
| 146 } |
| 147 |
| 148 |
120 // ------------------------------------------------------------------- | 149 // ------------------------------------------------------------------- |
121 // Stream transformers used by the default Stream implementation. | 150 // Stream transformers used by the default Stream implementation. |
122 // ------------------------------------------------------------------- | 151 // ------------------------------------------------------------------- |
123 | 152 |
124 typedef bool _Predicate<T>(T value); | 153 typedef bool _Predicate<T>(T value); |
125 | 154 |
126 class WhereStream<T> extends _ForwardingTransformer<T, T> { | 155 class WhereTransformer<T> extends _ForwardingStreamTransformer<T, T> { |
127 final _Predicate<T> _test; | 156 final _Predicate<T> _test; |
128 | 157 |
129 WhereStream(bool test(T value)) | 158 WhereTransformer(bool test(T value)) |
130 : this._test = test; | 159 : this._test = test; |
131 | 160 |
132 void _handleData(T inputEvent) { | 161 void _handleData(T inputEvent) { |
133 bool satisfies; | 162 bool satisfies; |
134 try { | 163 try { |
135 satisfies = _test(inputEvent); | 164 satisfies = _test(inputEvent); |
136 } catch (e, s) { | 165 } catch (e, s) { |
137 _signalError(_asyncError(e, s)); | 166 _stream._signalError(_asyncError(e, s)); |
138 return; | 167 return; |
139 } | 168 } |
140 if (satisfies) { | 169 if (satisfies) { |
141 _add(inputEvent); | 170 _stream._add(inputEvent); |
142 } | 171 } |
143 } | 172 } |
144 } | 173 } |
145 | 174 |
146 | 175 |
147 typedef T _Transformation<S, T>(S value); | 176 typedef T _Transformation<S, T>(S value); |
148 | 177 |
149 /** | 178 /** |
150 * A stream pipe that converts data events before passing them on. | 179 * A stream pipe that converts data events before passing them on. |
151 */ | 180 */ |
152 class MapStream<S, T> extends _ForwardingTransformer<S, T> { | 181 class MapTransformer<S, T> extends _ForwardingStreamTransformer<S, T> { |
153 final _Transformation _transform; | 182 final _Transformation _transform; |
154 | 183 |
155 MapStream(T transform(S event)) | 184 MapTransformer(T transform(S event)) |
156 : this._transform = transform; | 185 : this._transform = transform; |
157 | 186 |
158 void _handleData(S inputEvent) { | 187 void _handleData(S inputEvent) { |
159 T outputEvent; | 188 T outputEvent; |
160 try { | 189 try { |
161 outputEvent = _transform(inputEvent); | 190 outputEvent = _transform(inputEvent); |
162 } catch (e, s) { | 191 } catch (e, s) { |
163 _signalError(_asyncError(e, s)); | 192 _stream._signalError(_asyncError(e, s)); |
164 return; | 193 return; |
165 } | 194 } |
166 _add(outputEvent); | 195 _stream._add(outputEvent); |
167 } | 196 } |
168 } | 197 } |
169 | 198 |
170 /** | 199 /** |
171 * A stream pipe that converts data events before passing them on. | 200 * A stream pipe that converts data events before passing them on. |
172 */ | 201 */ |
173 class ExpandStream<S, T> extends _ForwardingTransformer<S, T> { | 202 class ExpandTransformer<S, T> extends _ForwardingStreamTransformer<S, T> { |
174 final _Transformation<S, Iterable<T>> _expand; | 203 final _Transformation<S, Iterable<T>> _expand; |
175 | 204 |
176 ExpandStream(Iterable<T> expand(S event)) | 205 ExpandTransformer(Iterable<T> expand(S event)) |
177 : this._expand = expand; | 206 : this._expand = expand; |
178 | 207 |
179 void _handleData(S inputEvent) { | 208 void _handleData(S inputEvent) { |
180 try { | 209 try { |
181 for (T value in _expand(inputEvent)) { | 210 for (T value in _expand(inputEvent)) { |
182 _add(value); | 211 _stream._add(value); |
183 } | 212 } |
184 } catch (e, s) { | 213 } catch (e, s) { |
185 // If either _expand or iterating the generated iterator throws, | 214 // If either _expand or iterating the generated iterator throws, |
186 // we abort the iteration. | 215 // we abort the iteration. |
187 _signalError(_asyncError(e, s)); | 216 _stream._signalError(_asyncError(e, s)); |
188 } | 217 } |
189 } | 218 } |
190 } | 219 } |
191 | 220 |
192 | 221 |
193 typedef void _ErrorTransformation(AsyncError error); | 222 typedef void _ErrorTransformation(AsyncError error); |
194 typedef bool _ErrorTest(error); | 223 typedef bool _ErrorTest(error); |
195 | 224 |
196 /** | 225 /** |
197 * A stream pipe that converts or disposes error events | 226 * A stream pipe that converts or disposes error events |
198 * before passing them on. | 227 * before passing them on. |
199 */ | 228 */ |
200 class HandleErrorStream<T> extends _ForwardingTransformer<T, T> { | 229 class HandleErrorTransformer<T> extends _ForwardingStreamTransformer<T, T> { |
201 final _ErrorTransformation _transform; | 230 final _ErrorTransformation _transform; |
202 final _ErrorTest _test; | 231 final _ErrorTest _test; |
203 | 232 |
204 HandleErrorStream(void transform(AsyncError event), bool test(error)) | 233 HandleErrorTransformer(void transform(AsyncError event), bool test(error)) |
205 : this._transform = transform, this._test = test; | 234 : this._transform = transform, this._test = test; |
206 | 235 |
207 void _handleError(AsyncError error) { | 236 void _handleError(AsyncError error) { |
208 bool matches = true; | 237 bool matches = true; |
209 if (_test != null) { | 238 if (_test != null) { |
210 try { | 239 try { |
211 matches = _test(error.error); | 240 matches = _test(error.error); |
212 } catch (e, s) { | 241 } catch (e, s) { |
213 _signalError(_asyncError(e, s, error)); | 242 _stream._signalError(_asyncError(e, s, error)); |
214 return; | 243 return; |
215 } | 244 } |
216 } | 245 } |
217 if (matches) { | 246 if (matches) { |
218 try { | 247 try { |
219 _transform(error); | 248 _transform(error); |
220 } catch (e, s) { | 249 } catch (e, s) { |
221 _signalError(_asyncError(e, s, error)); | 250 _stream._signalError(_asyncError(e, s, error)); |
222 return; | 251 return; |
223 } | 252 } |
224 } else { | 253 } else { |
225 _signalError(error); | 254 _stream._signalError(error); |
226 } | 255 } |
227 } | 256 } |
228 } | 257 } |
229 | 258 |
230 | 259 |
231 typedef void _TransformDataHandler<S, T>(S data, StreamSink<T> sink); | 260 typedef void _TransformDataHandler<S, T>(S data, StreamSink<T> sink); |
232 typedef void _TransformErrorHandler<T>(AsyncError data, StreamSink<T> sink); | 261 typedef void _TransformErrorHandler<T>(AsyncError data, StreamSink<T> sink); |
233 typedef void _TransformDoneHandler<T>(StreamSink<T> sink); | 262 typedef void _TransformDoneHandler<T>(StreamSink<T> sink); |
234 | 263 |
235 /** | 264 /** |
236 * A stream pipe that intercepts all events and can generate any event as | 265 * A stream transfomer that intercepts all events and can generate any event as |
237 * output. | 266 * output. |
238 * | 267 * |
239 * Each incoming event on this [StreamSink] is passed to the corresponding | 268 * Each incoming event on the source stream is passed to the corresponding |
240 * provided event handler, along with a [StreamSink] linked to the [output] of | 269 * provided event handler, along with a [StreamSink] linked to the output |
241 * this pipe. | 270 * Stream. |
242 * The handler can then decide which events to send to the output | 271 * The handler can then decide exactly which events to send to the output. |
243 */ | 272 */ |
244 class PipeStream<S, T> extends _ForwardingTransformer<S, T> { | 273 class _StreamTransformerImpl<S, T> extends _ForwardingStreamTransformer<S, T> { |
245 final _TransformDataHandler<S, T> _onData; | 274 final _TransformDataHandler<S, T> _onData; |
246 final _TransformErrorHandler<T> _onError; | 275 final _TransformErrorHandler<T> _onError; |
247 final _TransformDoneHandler<T> _onDone; | 276 final _TransformDoneHandler<T> _onDone; |
248 StreamSink<T> _sink; | 277 StreamSink<T> _sink; |
249 | 278 |
250 PipeStream({void onData(S data, StreamSink<T> sink), | 279 _StreamTransformerImpl(void onData(S data, StreamSink<T> sink), |
251 void onError(AsyncError data, StreamSink<T> sink), | 280 void onError(AsyncError data, StreamSink<T> sink), |
252 void onDone(StreamSink<T> sink)}) | 281 void onDone(StreamSink<T> sink)) |
253 : this._onData = (onData == null ? _defaultHandleData : onData), | 282 : this._onData = (onData == null ? _defaultHandleData : onData), |
254 this._onError = (onError == null ? _defaultHandleError : onError), | 283 this._onError = (onError == null ? _defaultHandleError : onError), |
255 this._onDone = (onDone == null ? _defaultHandleDone : onDone) { | 284 this._onDone = (onDone == null ? _defaultHandleDone : onDone); |
256 // Cache the sink wrapper to avoid creating a new one for each event. | 285 |
257 this._sink = new _StreamImplSink(this); | 286 Stream<T> bind(Stream<S> source) { |
| 287 Stream<T> stream = super.bind(source); |
| 288 // Cache a Sink object to avoid creating a new one for each event. |
| 289 _sink = new _StreamImplSink(stream); |
| 290 return stream; |
258 } | 291 } |
259 | 292 |
260 void _handleData(S data) { | 293 void _handleData(S data) { |
261 try { | 294 try { |
262 return _onData(data, _sink); | 295 _onData(data, _sink); |
263 } catch (e, s) { | 296 } catch (e, s) { |
264 _signalError(_asyncError(e, s)); | 297 _stream._signalError(_asyncError(e, s)); |
265 } | 298 } |
266 } | 299 } |
267 | 300 |
268 void _handleError(AsyncError error) { | 301 void _handleError(AsyncError error) { |
269 try { | 302 try { |
270 _onError(error, _sink); | 303 _onError(error, _sink); |
271 } catch (e, s) { | 304 } catch (e, s) { |
272 _signalError(_asyncError(e, s, error)); | 305 _stream._signalError(_asyncError(e, s, error)); |
273 } | 306 } |
274 } | 307 } |
275 | 308 |
276 void _handleDone() { | 309 void _handleDone() { |
277 try { | 310 try { |
278 _onDone(_sink); | 311 _onDone(_sink); |
279 } catch (e, s) { | 312 } catch (e, s) { |
280 _signalError(_asyncError(e, s)); | 313 _stream._signalError(_asyncError(e, s)); |
281 } | 314 } |
282 } | 315 } |
283 | 316 |
284 /** Default data handler forwards all data. */ | 317 /** Default data handler forwards all data. */ |
285 static void _defaultHandleData(dynamic data, StreamSink sink) { | 318 static void _defaultHandleData(var data, StreamSink sink) { |
286 sink.add(data); | 319 sink.add(data); |
287 } | 320 } |
288 /** Default error handler forwards all errors. */ | 321 /** Default error handler forwards all errors. */ |
289 static void _defaultHandleError(AsyncError error, StreamSink sink) { | 322 static void _defaultHandleError(AsyncError error, StreamSink sink) { |
290 sink.signalError(error); | 323 sink.signalError(error); |
291 } | 324 } |
292 /** Default done handler forwards done. */ | 325 /** Default done handler forwards done. */ |
293 static void _defaultHandleDone(StreamSink sink) { | 326 static void _defaultHandleDone(StreamSink sink) { |
294 sink.close(); | 327 sink.close(); |
295 } | 328 } |
296 } | 329 } |
297 | 330 |
298 /** Creates a [StreamSink] from a [_StreamImpl]'s input methods. */ | 331 /** Creates a [StreamSink] from a [_StreamImpl]'s input methods. */ |
299 class _StreamImplSink<T> implements StreamSink<T> { | 332 class _StreamImplSink<T> implements StreamSink<T> { |
300 _StreamImpl<T> _target; | 333 _StreamImpl<T> _target; |
301 _StreamImplSink(this._target); | 334 _StreamImplSink(this._target); |
302 void add(T data) { _target._add(data); } | 335 void add(T data) { _target._add(data); } |
303 void signalError(AsyncError error) { _target._signalError(error); } | 336 void signalError(AsyncError error) { _target._signalError(error); } |
304 void close() { _target._close(); } | 337 void close() { _target._close(); } |
305 } | 338 } |
306 | 339 |
307 /** | |
308 * A stream pipe that intercepts all events and can generate any event as | |
309 * output. | |
310 * | |
311 * Each incoming event on this [StreamSink] is passed to the corresponding | |
312 * method on [transform], along with a [StreamSink] linked to the [output] of | |
313 * this pipe. | |
314 * The handler can then decide which events to send to the output | |
315 */ | |
316 class TransformStream<S, T> extends _ForwardingTransformer<S, T> { | |
317 final StreamTransformer<S, T> _transform; | |
318 StreamSink<T> _sink; | |
319 | 340 |
320 TransformStream(StreamTransformer<S, T> transform) | 341 class TakeTransformer<T> extends _ForwardingStreamTransformer<T, T> { |
321 : this._transform = transform { | |
322 // Cache the sink wrapper to avoid creating a new one for each event. | |
323 this._sink = new _StreamImplSink(this); | |
324 } | |
325 | |
326 void _handleData(S data) { | |
327 try { | |
328 return _transform.handleData(data, _sink); | |
329 } catch (e, s) { | |
330 _controller.signalError(_asyncError(e, s)); | |
331 } | |
332 } | |
333 | |
334 void _handleError(AsyncError error) { | |
335 try { | |
336 _transform.handleError(error, _sink); | |
337 } catch (e, s) { | |
338 _controller.signalError(_asyncError(e, s, error)); | |
339 } | |
340 } | |
341 | |
342 void _handleDone() { | |
343 try { | |
344 _transform.handleDone(_sink); | |
345 } catch (e, s) { | |
346 _controller.signalError(_asyncError(e, s)); | |
347 } | |
348 } | |
349 } | |
350 | |
351 | |
352 /** Helper class for transforming three functions into a StreamTransformer. */ | |
353 class _StreamTransformerFunctionWrapper<S, T> | |
354 extends _StreamTransformer<S, T> { | |
355 final _TransformDataHandler<S, T> _handleData; | |
356 final _TransformErrorHandler<T> _handleError; | |
357 final _TransformDoneHandler<T> _handleDone; | |
358 | |
359 _StreamTransformerFunctionWrapper({ | |
360 void onData(S data, StreamSink<T> sink), | |
361 void onError(AsyncError data, StreamSink<T> sink), | |
362 void onDone(StreamSink<T> sink)}) | |
363 : _handleData = onData != null ? onData : PipeStream._defaultHandleData, | |
364 _handleError = onError != null ? onError | |
365 : PipeStream._defaultHandleError, | |
366 _handleDone = onDone != null ? onDone : PipeStream._defaultHandleDone; | |
367 | |
368 void handleData(S data, StreamSink<T> sink) { | |
369 return _handleData(data, sink); | |
370 } | |
371 | |
372 void handleError(AsyncError error, StreamSink<T> sink) { | |
373 _handleError(error, sink); | |
374 } | |
375 | |
376 void handleDone(StreamSink<T> sink) { | |
377 _handleDone(sink); | |
378 } | |
379 } | |
380 | |
381 | |
382 class TakeStream<T> extends _ForwardingTransformer<T, T> { | |
383 int _remaining; | 342 int _remaining; |
384 | 343 |
385 TakeStream(int count) | 344 TakeTransformer(int count) |
386 : this._remaining = count { | 345 : this._remaining = count { |
387 // This test is done early to avoid handling an async error | 346 // This test is done early to avoid handling an async error |
388 // in the _handleData method. | 347 // in the _handleData method. |
389 if (count is! int) throw new ArgumentError(count); | 348 if (count is! int) throw new ArgumentError(count); |
390 } | 349 } |
391 | 350 |
392 void _handleData(T inputEvent) { | 351 void _handleData(T inputEvent) { |
393 if (_remaining > 0) { | 352 if (_remaining > 0) { |
394 _add(inputEvent); | 353 _stream._add(inputEvent); |
395 _remaining -= 1; | 354 _remaining -= 1; |
396 if (_remaining == 0) { | 355 if (_remaining == 0) { |
397 // Closing also unsubscribes all subscribers, which unsubscribes | 356 // Closing also unsubscribes all subscribers, which unsubscribes |
398 // this from source. | 357 // this from source. |
399 _close(); | 358 _stream._close(); |
400 } | 359 } |
401 } | 360 } |
402 } | 361 } |
403 } | 362 } |
404 | 363 |
405 | 364 |
406 class TakeWhileStream<T> extends _ForwardingTransformer<T, T> { | 365 class TakeWhileTransformer<T> extends _ForwardingStreamTransformer<T, T> { |
407 final _Predicate<T> _test; | 366 final _Predicate<T> _test; |
408 | 367 |
409 TakeWhileStream(bool test(T value)) | 368 TakeWhileTransformer(bool test(T value)) |
410 : this._test = test; | 369 : this._test = test; |
411 | 370 |
412 void _handleData(T inputEvent) { | 371 void _handleData(T inputEvent) { |
413 bool satisfies; | 372 bool satisfies; |
414 try { | 373 try { |
415 satisfies = _test(inputEvent); | 374 satisfies = _test(inputEvent); |
416 } catch (e, s) { | 375 } catch (e, s) { |
417 _signalError(_asyncError(e, s)); | 376 _stream._signalError(_asyncError(e, s)); |
418 // The test didn't say true. Didn't say false either, but we stop anyway. | 377 // The test didn't say true. Didn't say false either, but we stop anyway. |
419 _close(); | 378 _stream._close(); |
420 return; | 379 return; |
421 } | 380 } |
422 if (satisfies) { | 381 if (satisfies) { |
423 _add(inputEvent); | 382 _stream._add(inputEvent); |
424 } else { | 383 } else { |
425 _close(); | 384 _stream._close(); |
426 } | 385 } |
427 } | 386 } |
428 } | 387 } |
429 | 388 |
430 class SkipStream<T> extends _ForwardingTransformer<T, T> { | 389 class SkipTransformer<T> extends _ForwardingStreamTransformer<T, T> { |
431 int _remaining; | 390 int _remaining; |
432 | 391 |
433 SkipStream(int count) | 392 SkipTransformer(int count) |
434 : this._remaining = count{ | 393 : this._remaining = count{ |
435 // This test is done early to avoid handling an async error | 394 // This test is done early to avoid handling an async error |
436 // in the _handleData method. | 395 // in the _handleData method. |
437 if (count is! int) throw new ArgumentError(count); | 396 if (count is! int || count < 0) throw new ArgumentError(count); |
438 } | 397 } |
439 | 398 |
440 void _handleData(T inputEvent) { | 399 void _handleData(T inputEvent) { |
441 if (_remaining > 0) { | 400 if (_remaining > 0) { |
442 _remaining--; | 401 _remaining--; |
443 return; | 402 return; |
444 } | 403 } |
445 return _add(inputEvent); | 404 return _stream._add(inputEvent); |
446 } | 405 } |
447 } | 406 } |
448 | 407 |
449 class SkipWhileStream<T> extends _ForwardingTransformer<T, T> { | 408 class SkipWhileTransformer<T> extends _ForwardingStreamTransformer<T, T> { |
450 final _Predicate<T> _test; | 409 final _Predicate<T> _test; |
451 bool _hasFailed = false; | 410 bool _hasFailed = false; |
452 | 411 |
453 SkipWhileStream(bool test(T value)) | 412 SkipWhileTransformer(bool test(T value)) |
454 : this._test = test; | 413 : this._test = test; |
455 | 414 |
456 void _handleData(T inputEvent) { | 415 void _handleData(T inputEvent) { |
457 if (_hasFailed) { | 416 if (_hasFailed) { |
458 _add(inputEvent); | 417 _stream._add(inputEvent); |
459 } | 418 } |
460 bool satisfies; | 419 bool satisfies; |
461 try { | 420 try { |
462 satisfies = _test(inputEvent); | 421 satisfies = _test(inputEvent); |
463 } catch (e, s) { | 422 } catch (e, s) { |
464 _signalError(_asyncError(e, s)); | 423 _stream._signalError(_asyncError(e, s)); |
465 // A failure to return a boolean is considered "not matching". | 424 // A failure to return a boolean is considered "not matching". |
466 _hasFailed = true; | 425 _hasFailed = true; |
467 return; | 426 return; |
468 } | 427 } |
469 if (!satisfies) { | 428 if (!satisfies) { |
470 _hasFailed = true; | 429 _hasFailed = true; |
471 _add(inputEvent); | 430 _stream._add(inputEvent); |
472 } | 431 } |
473 } | 432 } |
474 } | 433 } |
475 | 434 |
476 typedef bool _Equality<T>(T a, T b); | 435 typedef bool _Equality<T>(T a, T b); |
477 | 436 |
478 class DistinctStream<T> extends _ForwardingTransformer<T, T> { | 437 class DistinctTransformer<T> extends _ForwardingStreamTransformer<T, T> { |
479 static var _SENTINEL = new Object(); | 438 static var _SENTINEL = new Object(); |
480 | 439 |
481 _Equality<T> _equals; | 440 _Equality<T> _equals; |
482 var _previous = _SENTINEL; | 441 var _previous = _SENTINEL; |
483 | 442 |
484 DistinctStream(bool equals(T a, T b)) | 443 DistinctTransformer(bool equals(T a, T b)) |
485 : _equals = equals; | 444 : _equals = equals; |
486 | 445 |
487 void _handleData(T inputEvent) { | 446 void _handleData(T inputEvent) { |
488 if (identical(_previous, _SENTINEL)) { | 447 if (identical(_previous, _SENTINEL)) { |
489 _previous = inputEvent; | 448 _previous = inputEvent; |
490 return _add(inputEvent); | 449 return _stream._add(inputEvent); |
491 } else { | 450 } else { |
492 bool isEqual; | 451 bool isEqual; |
493 try { | 452 try { |
494 if (_equals == null) { | 453 if (_equals == null) { |
495 isEqual = (_previous == inputEvent); | 454 isEqual = (_previous == inputEvent); |
496 } else { | 455 } else { |
497 isEqual = _equals(_previous, inputEvent); | 456 isEqual = _equals(_previous, inputEvent); |
498 } | 457 } |
499 } catch (e, s) { | 458 } catch (e, s) { |
500 _signalError(_asyncError(e, s)); | 459 _stream._signalError(_asyncError(e, s)); |
501 return null; | 460 return null; |
502 } | 461 } |
503 if (!isEqual) { | 462 if (!isEqual) { |
504 _add(inputEvent); | 463 _stream._add(inputEvent); |
505 _previous = inputEvent; | 464 _previous = inputEvent; |
506 } | 465 } |
507 } | 466 } |
508 } | 467 } |
509 } | 468 } |
OLD | NEW |