OLD | NEW |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 /** Runs user code and takes actions depending on success or failure. */ | 7 /** Runs user code and takes actions depending on success or failure. */ |
8 _runUserCode(userCode(), | 8 _runUserCode( |
9 onSuccess(value), | 9 userCode(), onSuccess(value), onError(error, StackTrace stackTrace)) { |
10 onError(error, StackTrace stackTrace)) { | |
11 try { | 10 try { |
12 onSuccess(userCode()); | 11 onSuccess(userCode()); |
13 } catch (e, s) { | 12 } catch (e, s) { |
14 AsyncError replacement = Zone.current.errorCallback(e, s); | 13 AsyncError replacement = Zone.current.errorCallback(e, s); |
15 if (replacement == null) { | 14 if (replacement == null) { |
16 onError(e, s); | 15 onError(e, s); |
17 } else { | 16 } else { |
18 var error = _nonNullError(replacement.error); | 17 var error = _nonNullError(replacement.error); |
19 var stackTrace = replacement.stackTrace; | 18 var stackTrace = replacement.stackTrace; |
20 onError(error, stackTrace); | 19 onError(error, stackTrace); |
21 } | 20 } |
22 } | 21 } |
23 } | 22 } |
24 | 23 |
25 /** Helper function to cancel a subscription and wait for the potential future, | 24 /** Helper function to cancel a subscription and wait for the potential future, |
26 before completing with an error. */ | 25 before completing with an error. */ |
27 void _cancelAndError(StreamSubscription subscription, | 26 void _cancelAndError(StreamSubscription subscription, _Future future, error, |
28 _Future future, | 27 StackTrace stackTrace) { |
29 error, | |
30 StackTrace stackTrace) { | |
31 var cancelFuture = subscription.cancel(); | 28 var cancelFuture = subscription.cancel(); |
32 if (cancelFuture is Future && !identical(cancelFuture, Future._nullFuture)) { | 29 if (cancelFuture is Future && !identical(cancelFuture, Future._nullFuture)) { |
33 cancelFuture.whenComplete(() => future._completeError(error, stackTrace)); | 30 cancelFuture.whenComplete(() => future._completeError(error, stackTrace)); |
34 } else { | 31 } else { |
35 future._completeError(error, stackTrace); | 32 future._completeError(error, stackTrace); |
36 } | 33 } |
37 } | 34 } |
38 | 35 |
39 void _cancelAndErrorWithReplacement(StreamSubscription subscription, | 36 void _cancelAndErrorWithReplacement(StreamSubscription subscription, |
40 _Future future, | 37 _Future future, error, StackTrace stackTrace) { |
41 error, StackTrace stackTrace) { | |
42 AsyncError replacement = Zone.current.errorCallback(error, stackTrace); | 38 AsyncError replacement = Zone.current.errorCallback(error, stackTrace); |
43 if (replacement != null) { | 39 if (replacement != null) { |
44 error = _nonNullError(replacement.error); | 40 error = _nonNullError(replacement.error); |
45 stackTrace = replacement.stackTrace; | 41 stackTrace = replacement.stackTrace; |
46 } | 42 } |
47 _cancelAndError(subscription, future, error, stackTrace); | 43 _cancelAndError(subscription, future, error, stackTrace); |
48 } | 44 } |
49 | 45 |
50 typedef void _ErrorCallback(error, StackTrace stackTrace); | 46 typedef void _ErrorCallback(error, StackTrace stackTrace); |
51 | 47 |
52 /** Helper function to make an onError argument to [_runUserCode]. */ | 48 /** Helper function to make an onError argument to [_runUserCode]. */ |
53 _ErrorCallback _cancelAndErrorClosure( | 49 _ErrorCallback _cancelAndErrorClosure( |
54 StreamSubscription subscription, _Future future) { | 50 StreamSubscription subscription, _Future future) { |
55 return (error, StackTrace stackTrace) { | 51 return (error, StackTrace stackTrace) { |
56 _cancelAndError(subscription, future, error, stackTrace); | 52 _cancelAndError(subscription, future, error, stackTrace); |
57 }; | 53 }; |
58 } | 54 } |
59 | 55 |
60 /** Helper function to cancel a subscription and wait for the potential future, | 56 /** Helper function to cancel a subscription and wait for the potential future, |
61 before completing with a value. */ | 57 before completing with a value. */ |
62 void _cancelAndValue(StreamSubscription subscription, _Future future, value) { | 58 void _cancelAndValue(StreamSubscription subscription, _Future future, value) { |
63 var cancelFuture = subscription.cancel(); | 59 var cancelFuture = subscription.cancel(); |
64 if (cancelFuture is Future && !identical(cancelFuture, Future._nullFuture)) { | 60 if (cancelFuture is Future && !identical(cancelFuture, Future._nullFuture)) { |
65 cancelFuture.whenComplete(() => future._complete(value)); | 61 cancelFuture.whenComplete(() => future._complete(value)); |
66 } else { | 62 } else { |
67 future._complete(value); | 63 future._complete(value); |
68 } | 64 } |
69 } | 65 } |
70 | 66 |
71 | |
72 /** | 67 /** |
73 * A [Stream] that forwards subscriptions to another stream. | 68 * A [Stream] that forwards subscriptions to another stream. |
74 * | 69 * |
75 * This stream implements [Stream], but forwards all subscriptions | 70 * This stream implements [Stream], but forwards all subscriptions |
76 * to an underlying stream, and wraps the returned subscription to | 71 * to an underlying stream, and wraps the returned subscription to |
77 * modify the events on the way. | 72 * modify the events on the way. |
78 * | 73 * |
79 * This class is intended for internal use only. | 74 * This class is intended for internal use only. |
80 */ | 75 */ |
81 abstract class _ForwardingStream<S, T> extends Stream<T> { | 76 abstract class _ForwardingStream<S, T> extends Stream<T> { |
82 final Stream<S> _source; | 77 final Stream<S> _source; |
83 | 78 |
84 _ForwardingStream(this._source); | 79 _ForwardingStream(this._source); |
85 | 80 |
86 bool get isBroadcast => _source.isBroadcast; | 81 bool get isBroadcast => _source.isBroadcast; |
87 | 82 |
88 StreamSubscription<T> listen(void onData(T value), | 83 StreamSubscription<T> listen(void onData(T value), |
89 { Function onError, | 84 {Function onError, void onDone(), bool cancelOnError}) { |
90 void onDone(), | |
91 bool cancelOnError }) { | |
92 cancelOnError = identical(true, cancelOnError); | 85 cancelOnError = identical(true, cancelOnError); |
93 return _createSubscription(onData, onError, onDone, cancelOnError); | 86 return _createSubscription(onData, onError, onDone, cancelOnError); |
94 } | 87 } |
95 | 88 |
96 StreamSubscription<T> _createSubscription( | 89 StreamSubscription<T> _createSubscription(void onData(T data), |
97 void onData(T data), | 90 Function onError, void onDone(), bool cancelOnError) { |
98 Function onError, | |
99 void onDone(), | |
100 bool cancelOnError) { | |
101 return new _ForwardingStreamSubscription<S, T>( | 91 return new _ForwardingStreamSubscription<S, T>( |
102 this, onData, onError, onDone, cancelOnError); | 92 this, onData, onError, onDone, cancelOnError); |
103 } | 93 } |
104 | 94 |
105 // Override the following methods in subclasses to change the behavior. | 95 // Override the following methods in subclasses to change the behavior. |
106 | 96 |
107 void _handleData(S data, _EventSink<T> sink) { | 97 void _handleData(S data, _EventSink<T> sink) { |
108 sink._add(data as Object /*=T*/); | 98 sink._add(data as Object/*=T*/); |
109 } | 99 } |
110 | 100 |
111 void _handleError(error, StackTrace stackTrace, _EventSink<T> sink) { | 101 void _handleError(error, StackTrace stackTrace, _EventSink<T> sink) { |
112 sink._addError(error, stackTrace); | 102 sink._addError(error, stackTrace); |
113 } | 103 } |
114 | 104 |
115 void _handleDone(_EventSink<T> sink) { | 105 void _handleDone(_EventSink<T> sink) { |
116 sink._close(); | 106 sink._close(); |
117 } | 107 } |
118 } | 108 } |
119 | 109 |
120 /** | 110 /** |
121 * Abstract superclass for subscriptions that forward to other subscriptions. | 111 * Abstract superclass for subscriptions that forward to other subscriptions. |
122 */ | 112 */ |
123 class _ForwardingStreamSubscription<S, T> | 113 class _ForwardingStreamSubscription<S, T> |
124 extends _BufferingStreamSubscription<T> { | 114 extends _BufferingStreamSubscription<T> { |
125 final _ForwardingStream<S, T> _stream; | 115 final _ForwardingStream<S, T> _stream; |
126 | 116 |
127 StreamSubscription<S> _subscription; | 117 StreamSubscription<S> _subscription; |
128 | 118 |
129 _ForwardingStreamSubscription(this._stream, void onData(T data), | 119 _ForwardingStreamSubscription(this._stream, void onData(T data), |
130 Function onError, void onDone(), | 120 Function onError, void onDone(), bool cancelOnError) |
131 bool cancelOnError) | |
132 : super(onData, onError, onDone, cancelOnError) { | 121 : super(onData, onError, onDone, cancelOnError) { |
133 _subscription = _stream._source.listen(_handleData, | 122 _subscription = _stream._source |
134 onError: _handleError, | 123 .listen(_handleData, onError: _handleError, onDone: _handleDone); |
135 onDone: _handleDone); | |
136 } | 124 } |
137 | 125 |
138 // _StreamSink interface. | 126 // _StreamSink interface. |
139 // Transformers sending more than one event have no way to know if the stream | 127 // Transformers sending more than one event have no way to know if the stream |
140 // is canceled or closed after the first, so we just ignore remaining events. | 128 // is canceled or closed after the first, so we just ignore remaining events. |
141 | 129 |
142 void _add(T data) { | 130 void _add(T data) { |
143 if (_isClosed) return; | 131 if (_isClosed) return; |
144 super._add(data); | 132 super._add(data); |
145 } | 133 } |
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
193 | 181 |
194 void _addErrorWithReplacement(_EventSink sink, error, stackTrace) { | 182 void _addErrorWithReplacement(_EventSink sink, error, stackTrace) { |
195 AsyncError replacement = Zone.current.errorCallback(error, stackTrace); | 183 AsyncError replacement = Zone.current.errorCallback(error, stackTrace); |
196 if (replacement != null) { | 184 if (replacement != null) { |
197 error = _nonNullError(replacement.error); | 185 error = _nonNullError(replacement.error); |
198 stackTrace = replacement.stackTrace; | 186 stackTrace = replacement.stackTrace; |
199 } | 187 } |
200 sink._addError(error, stackTrace); | 188 sink._addError(error, stackTrace); |
201 } | 189 } |
202 | 190 |
203 | |
204 class _WhereStream<T> extends _ForwardingStream<T, T> { | 191 class _WhereStream<T> extends _ForwardingStream<T, T> { |
205 final _Predicate<T> _test; | 192 final _Predicate<T> _test; |
206 | 193 |
207 _WhereStream(Stream<T> source, bool test(T value)) | 194 _WhereStream(Stream<T> source, bool test(T value)) |
208 : _test = test, super(source); | 195 : _test = test, |
| 196 super(source); |
209 | 197 |
210 void _handleData(T inputEvent, _EventSink<T> sink) { | 198 void _handleData(T inputEvent, _EventSink<T> sink) { |
211 bool satisfies; | 199 bool satisfies; |
212 try { | 200 try { |
213 satisfies = _test(inputEvent); | 201 satisfies = _test(inputEvent); |
214 } catch (e, s) { | 202 } catch (e, s) { |
215 _addErrorWithReplacement(sink, e, s); | 203 _addErrorWithReplacement(sink, e, s); |
216 return; | 204 return; |
217 } | 205 } |
218 if (satisfies) { | 206 if (satisfies) { |
219 sink._add(inputEvent); | 207 sink._add(inputEvent); |
220 } | 208 } |
221 } | 209 } |
222 } | 210 } |
223 | 211 |
224 | |
225 typedef T _Transformation<S, T>(S value); | 212 typedef T _Transformation<S, T>(S value); |
226 | 213 |
227 /** | 214 /** |
228 * A stream pipe that converts data events before passing them on. | 215 * A stream pipe that converts data events before passing them on. |
229 */ | 216 */ |
230 class _MapStream<S, T> extends _ForwardingStream<S, T> { | 217 class _MapStream<S, T> extends _ForwardingStream<S, T> { |
231 final _Transformation<S, T> _transform; | 218 final _Transformation<S, T> _transform; |
232 | 219 |
233 _MapStream(Stream<S> source, T transform(S event)) | 220 _MapStream(Stream<S> source, T transform(S event)) |
234 : this._transform = transform, super(source); | 221 : this._transform = transform, |
| 222 super(source); |
235 | 223 |
236 void _handleData(S inputEvent, _EventSink<T> sink) { | 224 void _handleData(S inputEvent, _EventSink<T> sink) { |
237 T outputEvent; | 225 T outputEvent; |
238 try { | 226 try { |
239 outputEvent = _transform(inputEvent); | 227 outputEvent = _transform(inputEvent); |
240 } catch (e, s) { | 228 } catch (e, s) { |
241 _addErrorWithReplacement(sink, e, s); | 229 _addErrorWithReplacement(sink, e, s); |
242 return; | 230 return; |
243 } | 231 } |
244 sink._add(outputEvent); | 232 sink._add(outputEvent); |
245 } | 233 } |
246 } | 234 } |
247 | 235 |
248 /** | 236 /** |
249 * A stream pipe that converts data events before passing them on. | 237 * A stream pipe that converts data events before passing them on. |
250 */ | 238 */ |
251 class _ExpandStream<S, T> extends _ForwardingStream<S, T> { | 239 class _ExpandStream<S, T> extends _ForwardingStream<S, T> { |
252 final _Transformation<S, Iterable<T>> _expand; | 240 final _Transformation<S, Iterable<T>> _expand; |
253 | 241 |
254 _ExpandStream(Stream<S> source, Iterable<T> expand(S event)) | 242 _ExpandStream(Stream<S> source, Iterable<T> expand(S event)) |
255 : this._expand = expand, super(source); | 243 : this._expand = expand, |
| 244 super(source); |
256 | 245 |
257 void _handleData(S inputEvent, _EventSink<T> sink) { | 246 void _handleData(S inputEvent, _EventSink<T> sink) { |
258 try { | 247 try { |
259 for (T value in _expand(inputEvent)) { | 248 for (T value in _expand(inputEvent)) { |
260 sink._add(value); | 249 sink._add(value); |
261 } | 250 } |
262 } catch (e, s) { | 251 } catch (e, s) { |
263 // If either _expand or iterating the generated iterator throws, | 252 // If either _expand or iterating the generated iterator throws, |
264 // we abort the iteration. | 253 // we abort the iteration. |
265 _addErrorWithReplacement(sink, e, s); | 254 _addErrorWithReplacement(sink, e, s); |
266 } | 255 } |
267 } | 256 } |
268 } | 257 } |
269 | 258 |
270 | |
271 typedef bool _ErrorTest(error); | 259 typedef bool _ErrorTest(error); |
272 | 260 |
273 /** | 261 /** |
274 * A stream pipe that converts or disposes error events | 262 * A stream pipe that converts or disposes error events |
275 * before passing them on. | 263 * before passing them on. |
276 */ | 264 */ |
277 class _HandleErrorStream<T> extends _ForwardingStream<T, T> { | 265 class _HandleErrorStream<T> extends _ForwardingStream<T, T> { |
278 final Function _transform; | 266 final Function _transform; |
279 final _ErrorTest _test; | 267 final _ErrorTest _test; |
280 | 268 |
281 _HandleErrorStream(Stream<T> source, | 269 _HandleErrorStream(Stream<T> source, Function onError, bool test(error)) |
282 Function onError, | 270 : this._transform = onError, |
283 bool test(error)) | 271 this._test = test, |
284 : this._transform = onError, this._test = test, super(source); | 272 super(source); |
285 | 273 |
286 void _handleError(Object error, StackTrace stackTrace, _EventSink<T> sink) { | 274 void _handleError(Object error, StackTrace stackTrace, _EventSink<T> sink) { |
287 bool matches = true; | 275 bool matches = true; |
288 if (_test != null) { | 276 if (_test != null) { |
289 try { | 277 try { |
290 matches = _test(error); | 278 matches = _test(error); |
291 } catch (e, s) { | 279 } catch (e, s) { |
292 _addErrorWithReplacement(sink, e, s); | 280 _addErrorWithReplacement(sink, e, s); |
293 return; | 281 return; |
294 } | 282 } |
295 } | 283 } |
296 if (matches) { | 284 if (matches) { |
297 try { | 285 try { |
298 _invokeErrorHandler(_transform, error, stackTrace); | 286 _invokeErrorHandler(_transform, error, stackTrace); |
299 } catch (e, s) { | 287 } catch (e, s) { |
300 if (identical(e, error)) { | 288 if (identical(e, error)) { |
301 sink._addError(error, stackTrace); | 289 sink._addError(error, stackTrace); |
302 } else { | 290 } else { |
303 _addErrorWithReplacement(sink, e, s); | 291 _addErrorWithReplacement(sink, e, s); |
304 } | 292 } |
305 return; | 293 return; |
306 } | 294 } |
307 } else { | 295 } else { |
308 sink._addError(error, stackTrace); | 296 sink._addError(error, stackTrace); |
309 } | 297 } |
310 } | 298 } |
311 } | 299 } |
312 | 300 |
313 | |
314 class _TakeStream<T> extends _ForwardingStream<T, T> { | 301 class _TakeStream<T> extends _ForwardingStream<T, T> { |
315 final int _count; | 302 final int _count; |
316 | 303 |
317 _TakeStream(Stream<T> source, int count) | 304 _TakeStream(Stream<T> source, int count) |
318 : this._count = count, super(source) { | 305 : this._count = count, |
| 306 super(source) { |
319 // This test is done early to avoid handling an async error | 307 // This test is done early to avoid handling an async error |
320 // in the _handleData method. | 308 // in the _handleData method. |
321 if (count is! int) throw new ArgumentError(count); | 309 if (count is! int) throw new ArgumentError(count); |
322 } | 310 } |
323 | 311 |
324 StreamSubscription<T> _createSubscription( | 312 StreamSubscription<T> _createSubscription(void onData(T data), |
325 void onData(T data), | 313 Function onError, void onDone(), bool cancelOnError) { |
326 Function onError, | |
327 void onDone(), | |
328 bool cancelOnError) { | |
329 if (_count == 0) { | 314 if (_count == 0) { |
330 _source.listen(null).cancel(); | 315 _source.listen(null).cancel(); |
331 return new _DoneStreamSubscription<T>(onDone); | 316 return new _DoneStreamSubscription<T>(onDone); |
332 } | 317 } |
333 return new _StateStreamSubscription<T>( | 318 return new _StateStreamSubscription<T>( |
334 this, onData, onError, onDone, cancelOnError, _count); | 319 this, onData, onError, onDone, cancelOnError, _count); |
335 } | 320 } |
336 | 321 |
337 void _handleData(T inputEvent, _EventSink<T> sink) { | 322 void _handleData(T inputEvent, _EventSink<T> sink) { |
338 _StateStreamSubscription<T> subscription = sink; | 323 _StateStreamSubscription<T> subscription = sink; |
(...skipping 14 matching lines...) Expand all Loading... |
353 /** | 338 /** |
354 * A [_ForwardingStreamSubscription] with one extra state field. | 339 * A [_ForwardingStreamSubscription] with one extra state field. |
355 * | 340 * |
356 * Use by several different classes, some storing an integer, others a bool. | 341 * Use by several different classes, some storing an integer, others a bool. |
357 */ | 342 */ |
358 class _StateStreamSubscription<T> extends _ForwardingStreamSubscription<T, T> { | 343 class _StateStreamSubscription<T> extends _ForwardingStreamSubscription<T, T> { |
359 // Raw state field. Typed access provided by getters and setters below. | 344 // Raw state field. Typed access provided by getters and setters below. |
360 var _sharedState; | 345 var _sharedState; |
361 | 346 |
362 _StateStreamSubscription(_ForwardingStream<T, T> stream, void onData(T data), | 347 _StateStreamSubscription(_ForwardingStream<T, T> stream, void onData(T data), |
363 Function onError, void onDone(), | 348 Function onError, void onDone(), bool cancelOnError, this._sharedState) |
364 bool cancelOnError, this._sharedState) | |
365 : super(stream, onData, onError, onDone, cancelOnError); | 349 : super(stream, onData, onError, onDone, cancelOnError); |
366 | 350 |
367 bool get _flag => _sharedState; | 351 bool get _flag => _sharedState; |
368 void set _flag(bool flag) { _sharedState = flag; } | 352 void set _flag(bool flag) { |
| 353 _sharedState = flag; |
| 354 } |
| 355 |
369 int get _count => _sharedState; | 356 int get _count => _sharedState; |
370 void set _count(int count) { _sharedState = count; } | 357 void set _count(int count) { |
| 358 _sharedState = count; |
| 359 } |
371 } | 360 } |
372 | 361 |
373 | |
374 class _TakeWhileStream<T> extends _ForwardingStream<T, T> { | 362 class _TakeWhileStream<T> extends _ForwardingStream<T, T> { |
375 final _Predicate<T> _test; | 363 final _Predicate<T> _test; |
376 | 364 |
377 _TakeWhileStream(Stream<T> source, bool test(T value)) | 365 _TakeWhileStream(Stream<T> source, bool test(T value)) |
378 : this._test = test, super(source); | 366 : this._test = test, |
| 367 super(source); |
379 | 368 |
380 void _handleData(T inputEvent, _EventSink<T> sink) { | 369 void _handleData(T inputEvent, _EventSink<T> sink) { |
381 bool satisfies; | 370 bool satisfies; |
382 try { | 371 try { |
383 satisfies = _test(inputEvent); | 372 satisfies = _test(inputEvent); |
384 } catch (e, s) { | 373 } catch (e, s) { |
385 _addErrorWithReplacement(sink, e, s); | 374 _addErrorWithReplacement(sink, e, s); |
386 // The test didn't say true. Didn't say false either, but we stop anyway. | 375 // The test didn't say true. Didn't say false either, but we stop anyway. |
387 sink._close(); | 376 sink._close(); |
388 return; | 377 return; |
389 } | 378 } |
390 if (satisfies) { | 379 if (satisfies) { |
391 sink._add(inputEvent); | 380 sink._add(inputEvent); |
392 } else { | 381 } else { |
393 sink._close(); | 382 sink._close(); |
394 } | 383 } |
395 } | 384 } |
396 } | 385 } |
397 | 386 |
398 class _SkipStream<T> extends _ForwardingStream<T, T> { | 387 class _SkipStream<T> extends _ForwardingStream<T, T> { |
399 final int _count; | 388 final int _count; |
400 | 389 |
401 _SkipStream(Stream<T> source, int count) | 390 _SkipStream(Stream<T> source, int count) |
402 : this._count = count, super(source) { | 391 : this._count = count, |
| 392 super(source) { |
403 // This test is done early to avoid handling an async error | 393 // This test is done early to avoid handling an async error |
404 // in the _handleData method. | 394 // in the _handleData method. |
405 if (count is! int || count < 0) throw new ArgumentError(count); | 395 if (count is! int || count < 0) throw new ArgumentError(count); |
406 } | 396 } |
407 | 397 |
408 StreamSubscription<T> _createSubscription( | 398 StreamSubscription<T> _createSubscription(void onData(T data), |
409 void onData(T data), | 399 Function onError, void onDone(), bool cancelOnError) { |
410 Function onError, | |
411 void onDone(), | |
412 bool cancelOnError) { | |
413 return new _StateStreamSubscription<T>( | 400 return new _StateStreamSubscription<T>( |
414 this, onData, onError, onDone, cancelOnError, _count); | 401 this, onData, onError, onDone, cancelOnError, _count); |
415 } | 402 } |
416 | 403 |
417 void _handleData(T inputEvent, _EventSink<T> sink) { | 404 void _handleData(T inputEvent, _EventSink<T> sink) { |
418 _StateStreamSubscription<T> subscription = sink; | 405 _StateStreamSubscription<T> subscription = sink; |
419 int count = subscription._count; | 406 int count = subscription._count; |
420 if (count > 0) { | 407 if (count > 0) { |
421 subscription._count = count - 1; | 408 subscription._count = count - 1; |
422 return; | 409 return; |
423 } | 410 } |
424 sink._add(inputEvent); | 411 sink._add(inputEvent); |
425 } | 412 } |
426 } | 413 } |
427 | 414 |
428 class _SkipWhileStream<T> extends _ForwardingStream<T, T> { | 415 class _SkipWhileStream<T> extends _ForwardingStream<T, T> { |
429 final _Predicate<T> _test; | 416 final _Predicate<T> _test; |
430 | 417 |
431 _SkipWhileStream(Stream<T> source, bool test(T value)) | 418 _SkipWhileStream(Stream<T> source, bool test(T value)) |
432 : this._test = test, super(source); | 419 : this._test = test, |
| 420 super(source); |
433 | 421 |
434 StreamSubscription<T> _createSubscription( | 422 StreamSubscription<T> _createSubscription(void onData(T data), |
435 void onData(T data), | 423 Function onError, void onDone(), bool cancelOnError) { |
436 Function onError, | |
437 void onDone(), | |
438 bool cancelOnError) { | |
439 return new _StateStreamSubscription<T>( | 424 return new _StateStreamSubscription<T>( |
440 this, onData, onError, onDone, cancelOnError, false); | 425 this, onData, onError, onDone, cancelOnError, false); |
441 } | 426 } |
442 | 427 |
443 void _handleData(T inputEvent, _EventSink<T> sink) { | 428 void _handleData(T inputEvent, _EventSink<T> sink) { |
444 _StateStreamSubscription<T> subscription = sink; | 429 _StateStreamSubscription<T> subscription = sink; |
445 bool hasFailed = subscription._flag; | 430 bool hasFailed = subscription._flag; |
446 if (hasFailed) { | 431 if (hasFailed) { |
447 sink._add(inputEvent); | 432 sink._add(inputEvent); |
448 return; | 433 return; |
(...skipping 16 matching lines...) Expand all Loading... |
465 | 450 |
466 typedef bool _Equality<T>(T a, T b); | 451 typedef bool _Equality<T>(T a, T b); |
467 | 452 |
468 class _DistinctStream<T> extends _ForwardingStream<T, T> { | 453 class _DistinctStream<T> extends _ForwardingStream<T, T> { |
469 static var _SENTINEL = new Object(); | 454 static var _SENTINEL = new Object(); |
470 | 455 |
471 _Equality<T> _equals; | 456 _Equality<T> _equals; |
472 var _previous = _SENTINEL; | 457 var _previous = _SENTINEL; |
473 | 458 |
474 _DistinctStream(Stream<T> source, bool equals(T a, T b)) | 459 _DistinctStream(Stream<T> source, bool equals(T a, T b)) |
475 : _equals = equals, super(source); | 460 : _equals = equals, |
| 461 super(source); |
476 | 462 |
477 void _handleData(T inputEvent, _EventSink<T> sink) { | 463 void _handleData(T inputEvent, _EventSink<T> sink) { |
478 if (identical(_previous, _SENTINEL)) { | 464 if (identical(_previous, _SENTINEL)) { |
479 _previous = inputEvent; | 465 _previous = inputEvent; |
480 return sink._add(inputEvent); | 466 return sink._add(inputEvent); |
481 } else { | 467 } else { |
482 bool isEqual; | 468 bool isEqual; |
483 try { | 469 try { |
484 if (_equals == null) { | 470 if (_equals == null) { |
485 isEqual = (_previous == inputEvent); | 471 isEqual = (_previous == inputEvent); |
486 } else { | 472 } else { |
487 isEqual = _equals(_previous as Object /*=T*/, inputEvent); | 473 isEqual = _equals(_previous as Object/*=T*/, inputEvent); |
488 } | 474 } |
489 } catch (e, s) { | 475 } catch (e, s) { |
490 _addErrorWithReplacement(sink, e, s); | 476 _addErrorWithReplacement(sink, e, s); |
491 return null; | 477 return null; |
492 } | 478 } |
493 if (!isEqual) { | 479 if (!isEqual) { |
494 sink._add(inputEvent); | 480 sink._add(inputEvent); |
495 _previous = inputEvent; | 481 _previous = inputEvent; |
496 } | 482 } |
497 } | 483 } |
498 } | 484 } |
499 } | 485 } |
OLD | NEW |