| OLD | NEW |
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 /** Runs user code and takes actions depending on success or failure. */ | 7 /** Runs user code and takes actions depending on success or failure. */ |
| 8 _runUserCode(userCode(), | 8 _runUserCode( |
| 9 onSuccess(value), | 9 userCode(), onSuccess(value), onError(error, StackTrace stackTrace)) { |
| 10 onError(error, StackTrace stackTrace)) { | |
| 11 try { | 10 try { |
| 12 onSuccess(userCode()); | 11 onSuccess(userCode()); |
| 13 } catch (e, s) { | 12 } catch (e, s) { |
| 14 AsyncError replacement = Zone.current.errorCallback(e, s); | 13 AsyncError replacement = Zone.current.errorCallback(e, s); |
| 15 if (replacement == null) { | 14 if (replacement == null) { |
| 16 onError(e, s); | 15 onError(e, s); |
| 17 } else { | 16 } else { |
| 18 var error = _nonNullError(replacement.error); | 17 var error = _nonNullError(replacement.error); |
| 19 var stackTrace = replacement.stackTrace; | 18 var stackTrace = replacement.stackTrace; |
| 20 onError(error, stackTrace); | 19 onError(error, stackTrace); |
| 21 } | 20 } |
| 22 } | 21 } |
| 23 } | 22 } |
| 24 | 23 |
| 25 /** Helper function to cancel a subscription and wait for the potential future, | 24 /** Helper function to cancel a subscription and wait for the potential future, |
| 26 before completing with an error. */ | 25 before completing with an error. */ |
| 27 void _cancelAndError(StreamSubscription subscription, | 26 void _cancelAndError(StreamSubscription subscription, _Future future, error, |
| 28 _Future future, | 27 StackTrace stackTrace) { |
| 29 error, | |
| 30 StackTrace stackTrace) { | |
| 31 var cancelFuture = subscription.cancel(); | 28 var cancelFuture = subscription.cancel(); |
| 32 if (cancelFuture is Future && !identical(cancelFuture, Future._nullFuture)) { | 29 if (cancelFuture is Future && !identical(cancelFuture, Future._nullFuture)) { |
| 33 cancelFuture.whenComplete(() => future._completeError(error, stackTrace)); | 30 cancelFuture.whenComplete(() => future._completeError(error, stackTrace)); |
| 34 } else { | 31 } else { |
| 35 future._completeError(error, stackTrace); | 32 future._completeError(error, stackTrace); |
| 36 } | 33 } |
| 37 } | 34 } |
| 38 | 35 |
| 39 void _cancelAndErrorWithReplacement(StreamSubscription subscription, | 36 void _cancelAndErrorWithReplacement(StreamSubscription subscription, |
| 40 _Future future, | 37 _Future future, error, StackTrace stackTrace) { |
| 41 error, StackTrace stackTrace) { | |
| 42 AsyncError replacement = Zone.current.errorCallback(error, stackTrace); | 38 AsyncError replacement = Zone.current.errorCallback(error, stackTrace); |
| 43 if (replacement != null) { | 39 if (replacement != null) { |
| 44 error = _nonNullError(replacement.error); | 40 error = _nonNullError(replacement.error); |
| 45 stackTrace = replacement.stackTrace; | 41 stackTrace = replacement.stackTrace; |
| 46 } | 42 } |
| 47 _cancelAndError(subscription, future, error, stackTrace); | 43 _cancelAndError(subscription, future, error, stackTrace); |
| 48 } | 44 } |
| 49 | 45 |
| 50 typedef void _ErrorCallback(error, StackTrace stackTrace); | 46 typedef void _ErrorCallback(error, StackTrace stackTrace); |
| 51 | 47 |
| 52 /** Helper function to make an onError argument to [_runUserCode]. */ | 48 /** Helper function to make an onError argument to [_runUserCode]. */ |
| 53 _ErrorCallback _cancelAndErrorClosure( | 49 _ErrorCallback _cancelAndErrorClosure( |
| 54 StreamSubscription subscription, _Future future) { | 50 StreamSubscription subscription, _Future future) { |
| 55 return (error, StackTrace stackTrace) { | 51 return (error, StackTrace stackTrace) { |
| 56 _cancelAndError(subscription, future, error, stackTrace); | 52 _cancelAndError(subscription, future, error, stackTrace); |
| 57 }; | 53 }; |
| 58 } | 54 } |
| 59 | 55 |
| 60 /** Helper function to cancel a subscription and wait for the potential future, | 56 /** Helper function to cancel a subscription and wait for the potential future, |
| 61 before completing with a value. */ | 57 before completing with a value. */ |
| 62 void _cancelAndValue(StreamSubscription subscription, _Future future, value) { | 58 void _cancelAndValue(StreamSubscription subscription, _Future future, value) { |
| 63 var cancelFuture = subscription.cancel(); | 59 var cancelFuture = subscription.cancel(); |
| 64 if (cancelFuture is Future && !identical(cancelFuture, Future._nullFuture)) { | 60 if (cancelFuture is Future && !identical(cancelFuture, Future._nullFuture)) { |
| 65 cancelFuture.whenComplete(() => future._complete(value)); | 61 cancelFuture.whenComplete(() => future._complete(value)); |
| 66 } else { | 62 } else { |
| 67 future._complete(value); | 63 future._complete(value); |
| 68 } | 64 } |
| 69 } | 65 } |
| 70 | 66 |
| 71 | |
| 72 /** | 67 /** |
| 73 * A [Stream] that forwards subscriptions to another stream. | 68 * A [Stream] that forwards subscriptions to another stream. |
| 74 * | 69 * |
| 75 * This stream implements [Stream], but forwards all subscriptions | 70 * This stream implements [Stream], but forwards all subscriptions |
| 76 * to an underlying stream, and wraps the returned subscription to | 71 * to an underlying stream, and wraps the returned subscription to |
| 77 * modify the events on the way. | 72 * modify the events on the way. |
| 78 * | 73 * |
| 79 * This class is intended for internal use only. | 74 * This class is intended for internal use only. |
| 80 */ | 75 */ |
| 81 abstract class _ForwardingStream<S, T> extends Stream<T> { | 76 abstract class _ForwardingStream<S, T> extends Stream<T> { |
| 82 final Stream<S> _source; | 77 final Stream<S> _source; |
| 83 | 78 |
| 84 _ForwardingStream(this._source); | 79 _ForwardingStream(this._source); |
| 85 | 80 |
| 86 bool get isBroadcast => _source.isBroadcast; | 81 bool get isBroadcast => _source.isBroadcast; |
| 87 | 82 |
| 88 StreamSubscription<T> listen(void onData(T value), | 83 StreamSubscription<T> listen(void onData(T value), |
| 89 { Function onError, | 84 {Function onError, void onDone(), bool cancelOnError}) { |
| 90 void onDone(), | |
| 91 bool cancelOnError }) { | |
| 92 cancelOnError = identical(true, cancelOnError); | 85 cancelOnError = identical(true, cancelOnError); |
| 93 return _createSubscription(onData, onError, onDone, cancelOnError); | 86 return _createSubscription(onData, onError, onDone, cancelOnError); |
| 94 } | 87 } |
| 95 | 88 |
| 96 StreamSubscription<T> _createSubscription( | 89 StreamSubscription<T> _createSubscription(void onData(T data), |
| 97 void onData(T data), | 90 Function onError, void onDone(), bool cancelOnError) { |
| 98 Function onError, | |
| 99 void onDone(), | |
| 100 bool cancelOnError) { | |
| 101 return new _ForwardingStreamSubscription<S, T>( | 91 return new _ForwardingStreamSubscription<S, T>( |
| 102 this, onData, onError, onDone, cancelOnError); | 92 this, onData, onError, onDone, cancelOnError); |
| 103 } | 93 } |
| 104 | 94 |
| 105 // Override the following methods in subclasses to change the behavior. | 95 // Override the following methods in subclasses to change the behavior. |
| 106 | 96 |
| 107 void _handleData(S data, _EventSink<T> sink) { | 97 void _handleData(S data, _EventSink<T> sink) { |
| 108 sink._add(data as Object /*=T*/); | 98 sink._add(data as Object/*=T*/); |
| 109 } | 99 } |
| 110 | 100 |
| 111 void _handleError(error, StackTrace stackTrace, _EventSink<T> sink) { | 101 void _handleError(error, StackTrace stackTrace, _EventSink<T> sink) { |
| 112 sink._addError(error, stackTrace); | 102 sink._addError(error, stackTrace); |
| 113 } | 103 } |
| 114 | 104 |
| 115 void _handleDone(_EventSink<T> sink) { | 105 void _handleDone(_EventSink<T> sink) { |
| 116 sink._close(); | 106 sink._close(); |
| 117 } | 107 } |
| 118 } | 108 } |
| 119 | 109 |
| 120 /** | 110 /** |
| 121 * Abstract superclass for subscriptions that forward to other subscriptions. | 111 * Abstract superclass for subscriptions that forward to other subscriptions. |
| 122 */ | 112 */ |
| 123 class _ForwardingStreamSubscription<S, T> | 113 class _ForwardingStreamSubscription<S, T> |
| 124 extends _BufferingStreamSubscription<T> { | 114 extends _BufferingStreamSubscription<T> { |
| 125 final _ForwardingStream<S, T> _stream; | 115 final _ForwardingStream<S, T> _stream; |
| 126 | 116 |
| 127 StreamSubscription<S> _subscription; | 117 StreamSubscription<S> _subscription; |
| 128 | 118 |
| 129 _ForwardingStreamSubscription(this._stream, void onData(T data), | 119 _ForwardingStreamSubscription(this._stream, void onData(T data), |
| 130 Function onError, void onDone(), | 120 Function onError, void onDone(), bool cancelOnError) |
| 131 bool cancelOnError) | |
| 132 : super(onData, onError, onDone, cancelOnError) { | 121 : super(onData, onError, onDone, cancelOnError) { |
| 133 _subscription = _stream._source.listen(_handleData, | 122 _subscription = _stream._source |
| 134 onError: _handleError, | 123 .listen(_handleData, onError: _handleError, onDone: _handleDone); |
| 135 onDone: _handleDone); | |
| 136 } | 124 } |
| 137 | 125 |
| 138 // _StreamSink interface. | 126 // _StreamSink interface. |
| 139 // Transformers sending more than one event have no way to know if the stream | 127 // Transformers sending more than one event have no way to know if the stream |
| 140 // is canceled or closed after the first, so we just ignore remaining events. | 128 // is canceled or closed after the first, so we just ignore remaining events. |
| 141 | 129 |
| 142 void _add(T data) { | 130 void _add(T data) { |
| 143 if (_isClosed) return; | 131 if (_isClosed) return; |
| 144 super._add(data); | 132 super._add(data); |
| 145 } | 133 } |
| (...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 193 | 181 |
| 194 void _addErrorWithReplacement(_EventSink sink, error, stackTrace) { | 182 void _addErrorWithReplacement(_EventSink sink, error, stackTrace) { |
| 195 AsyncError replacement = Zone.current.errorCallback(error, stackTrace); | 183 AsyncError replacement = Zone.current.errorCallback(error, stackTrace); |
| 196 if (replacement != null) { | 184 if (replacement != null) { |
| 197 error = _nonNullError(replacement.error); | 185 error = _nonNullError(replacement.error); |
| 198 stackTrace = replacement.stackTrace; | 186 stackTrace = replacement.stackTrace; |
| 199 } | 187 } |
| 200 sink._addError(error, stackTrace); | 188 sink._addError(error, stackTrace); |
| 201 } | 189 } |
| 202 | 190 |
| 203 | |
| 204 class _WhereStream<T> extends _ForwardingStream<T, T> { | 191 class _WhereStream<T> extends _ForwardingStream<T, T> { |
| 205 final _Predicate<T> _test; | 192 final _Predicate<T> _test; |
| 206 | 193 |
| 207 _WhereStream(Stream<T> source, bool test(T value)) | 194 _WhereStream(Stream<T> source, bool test(T value)) |
| 208 : _test = test, super(source); | 195 : _test = test, |
| 196 super(source); |
| 209 | 197 |
| 210 void _handleData(T inputEvent, _EventSink<T> sink) { | 198 void _handleData(T inputEvent, _EventSink<T> sink) { |
| 211 bool satisfies; | 199 bool satisfies; |
| 212 try { | 200 try { |
| 213 satisfies = _test(inputEvent); | 201 satisfies = _test(inputEvent); |
| 214 } catch (e, s) { | 202 } catch (e, s) { |
| 215 _addErrorWithReplacement(sink, e, s); | 203 _addErrorWithReplacement(sink, e, s); |
| 216 return; | 204 return; |
| 217 } | 205 } |
| 218 if (satisfies) { | 206 if (satisfies) { |
| 219 sink._add(inputEvent); | 207 sink._add(inputEvent); |
| 220 } | 208 } |
| 221 } | 209 } |
| 222 } | 210 } |
| 223 | 211 |
| 224 | |
| 225 typedef T _Transformation<S, T>(S value); | 212 typedef T _Transformation<S, T>(S value); |
| 226 | 213 |
| 227 /** | 214 /** |
| 228 * A stream pipe that converts data events before passing them on. | 215 * A stream pipe that converts data events before passing them on. |
| 229 */ | 216 */ |
| 230 class _MapStream<S, T> extends _ForwardingStream<S, T> { | 217 class _MapStream<S, T> extends _ForwardingStream<S, T> { |
| 231 final _Transformation<S, T> _transform; | 218 final _Transformation<S, T> _transform; |
| 232 | 219 |
| 233 _MapStream(Stream<S> source, T transform(S event)) | 220 _MapStream(Stream<S> source, T transform(S event)) |
| 234 : this._transform = transform, super(source); | 221 : this._transform = transform, |
| 222 super(source); |
| 235 | 223 |
| 236 void _handleData(S inputEvent, _EventSink<T> sink) { | 224 void _handleData(S inputEvent, _EventSink<T> sink) { |
| 237 T outputEvent; | 225 T outputEvent; |
| 238 try { | 226 try { |
| 239 outputEvent = _transform(inputEvent); | 227 outputEvent = _transform(inputEvent); |
| 240 } catch (e, s) { | 228 } catch (e, s) { |
| 241 _addErrorWithReplacement(sink, e, s); | 229 _addErrorWithReplacement(sink, e, s); |
| 242 return; | 230 return; |
| 243 } | 231 } |
| 244 sink._add(outputEvent); | 232 sink._add(outputEvent); |
| 245 } | 233 } |
| 246 } | 234 } |
| 247 | 235 |
| 248 /** | 236 /** |
| 249 * A stream pipe that converts data events before passing them on. | 237 * A stream pipe that converts data events before passing them on. |
| 250 */ | 238 */ |
| 251 class _ExpandStream<S, T> extends _ForwardingStream<S, T> { | 239 class _ExpandStream<S, T> extends _ForwardingStream<S, T> { |
| 252 final _Transformation<S, Iterable<T>> _expand; | 240 final _Transformation<S, Iterable<T>> _expand; |
| 253 | 241 |
| 254 _ExpandStream(Stream<S> source, Iterable<T> expand(S event)) | 242 _ExpandStream(Stream<S> source, Iterable<T> expand(S event)) |
| 255 : this._expand = expand, super(source); | 243 : this._expand = expand, |
| 244 super(source); |
| 256 | 245 |
| 257 void _handleData(S inputEvent, _EventSink<T> sink) { | 246 void _handleData(S inputEvent, _EventSink<T> sink) { |
| 258 try { | 247 try { |
| 259 for (T value in _expand(inputEvent)) { | 248 for (T value in _expand(inputEvent)) { |
| 260 sink._add(value); | 249 sink._add(value); |
| 261 } | 250 } |
| 262 } catch (e, s) { | 251 } catch (e, s) { |
| 263 // If either _expand or iterating the generated iterator throws, | 252 // If either _expand or iterating the generated iterator throws, |
| 264 // we abort the iteration. | 253 // we abort the iteration. |
| 265 _addErrorWithReplacement(sink, e, s); | 254 _addErrorWithReplacement(sink, e, s); |
| 266 } | 255 } |
| 267 } | 256 } |
| 268 } | 257 } |
| 269 | 258 |
| 270 | |
| 271 typedef bool _ErrorTest(error); | 259 typedef bool _ErrorTest(error); |
| 272 | 260 |
| 273 /** | 261 /** |
| 274 * A stream pipe that converts or disposes error events | 262 * A stream pipe that converts or disposes error events |
| 275 * before passing them on. | 263 * before passing them on. |
| 276 */ | 264 */ |
| 277 class _HandleErrorStream<T> extends _ForwardingStream<T, T> { | 265 class _HandleErrorStream<T> extends _ForwardingStream<T, T> { |
| 278 final Function _transform; | 266 final Function _transform; |
| 279 final _ErrorTest _test; | 267 final _ErrorTest _test; |
| 280 | 268 |
| 281 _HandleErrorStream(Stream<T> source, | 269 _HandleErrorStream(Stream<T> source, Function onError, bool test(error)) |
| 282 Function onError, | 270 : this._transform = onError, |
| 283 bool test(error)) | 271 this._test = test, |
| 284 : this._transform = onError, this._test = test, super(source); | 272 super(source); |
| 285 | 273 |
| 286 void _handleError(Object error, StackTrace stackTrace, _EventSink<T> sink) { | 274 void _handleError(Object error, StackTrace stackTrace, _EventSink<T> sink) { |
| 287 bool matches = true; | 275 bool matches = true; |
| 288 if (_test != null) { | 276 if (_test != null) { |
| 289 try { | 277 try { |
| 290 matches = _test(error); | 278 matches = _test(error); |
| 291 } catch (e, s) { | 279 } catch (e, s) { |
| 292 _addErrorWithReplacement(sink, e, s); | 280 _addErrorWithReplacement(sink, e, s); |
| 293 return; | 281 return; |
| 294 } | 282 } |
| 295 } | 283 } |
| 296 if (matches) { | 284 if (matches) { |
| 297 try { | 285 try { |
| 298 _invokeErrorHandler(_transform, error, stackTrace); | 286 _invokeErrorHandler(_transform, error, stackTrace); |
| 299 } catch (e, s) { | 287 } catch (e, s) { |
| 300 if (identical(e, error)) { | 288 if (identical(e, error)) { |
| 301 sink._addError(error, stackTrace); | 289 sink._addError(error, stackTrace); |
| 302 } else { | 290 } else { |
| 303 _addErrorWithReplacement(sink, e, s); | 291 _addErrorWithReplacement(sink, e, s); |
| 304 } | 292 } |
| 305 return; | 293 return; |
| 306 } | 294 } |
| 307 } else { | 295 } else { |
| 308 sink._addError(error, stackTrace); | 296 sink._addError(error, stackTrace); |
| 309 } | 297 } |
| 310 } | 298 } |
| 311 } | 299 } |
| 312 | 300 |
| 313 | |
| 314 class _TakeStream<T> extends _ForwardingStream<T, T> { | 301 class _TakeStream<T> extends _ForwardingStream<T, T> { |
| 315 final int _count; | 302 final int _count; |
| 316 | 303 |
| 317 _TakeStream(Stream<T> source, int count) | 304 _TakeStream(Stream<T> source, int count) |
| 318 : this._count = count, super(source) { | 305 : this._count = count, |
| 306 super(source) { |
| 319 // This test is done early to avoid handling an async error | 307 // This test is done early to avoid handling an async error |
| 320 // in the _handleData method. | 308 // in the _handleData method. |
| 321 if (count is! int) throw new ArgumentError(count); | 309 if (count is! int) throw new ArgumentError(count); |
| 322 } | 310 } |
| 323 | 311 |
| 324 StreamSubscription<T> _createSubscription( | 312 StreamSubscription<T> _createSubscription(void onData(T data), |
| 325 void onData(T data), | 313 Function onError, void onDone(), bool cancelOnError) { |
| 326 Function onError, | |
| 327 void onDone(), | |
| 328 bool cancelOnError) { | |
| 329 if (_count == 0) { | 314 if (_count == 0) { |
| 330 _source.listen(null).cancel(); | 315 _source.listen(null).cancel(); |
| 331 return new _DoneStreamSubscription<T>(onDone); | 316 return new _DoneStreamSubscription<T>(onDone); |
| 332 } | 317 } |
| 333 return new _StateStreamSubscription<T>( | 318 return new _StateStreamSubscription<T>( |
| 334 this, onData, onError, onDone, cancelOnError, _count); | 319 this, onData, onError, onDone, cancelOnError, _count); |
| 335 } | 320 } |
| 336 | 321 |
| 337 void _handleData(T inputEvent, _EventSink<T> sink) { | 322 void _handleData(T inputEvent, _EventSink<T> sink) { |
| 338 _StateStreamSubscription<T> subscription = sink; | 323 _StateStreamSubscription<T> subscription = sink; |
| (...skipping 14 matching lines...) Expand all Loading... |
| 353 /** | 338 /** |
| 354 * A [_ForwardingStreamSubscription] with one extra state field. | 339 * A [_ForwardingStreamSubscription] with one extra state field. |
| 355 * | 340 * |
| 356 * Use by several different classes, some storing an integer, others a bool. | 341 * Use by several different classes, some storing an integer, others a bool. |
| 357 */ | 342 */ |
| 358 class _StateStreamSubscription<T> extends _ForwardingStreamSubscription<T, T> { | 343 class _StateStreamSubscription<T> extends _ForwardingStreamSubscription<T, T> { |
| 359 // Raw state field. Typed access provided by getters and setters below. | 344 // Raw state field. Typed access provided by getters and setters below. |
| 360 var _sharedState; | 345 var _sharedState; |
| 361 | 346 |
| 362 _StateStreamSubscription(_ForwardingStream<T, T> stream, void onData(T data), | 347 _StateStreamSubscription(_ForwardingStream<T, T> stream, void onData(T data), |
| 363 Function onError, void onDone(), | 348 Function onError, void onDone(), bool cancelOnError, this._sharedState) |
| 364 bool cancelOnError, this._sharedState) | |
| 365 : super(stream, onData, onError, onDone, cancelOnError); | 349 : super(stream, onData, onError, onDone, cancelOnError); |
| 366 | 350 |
| 367 bool get _flag => _sharedState; | 351 bool get _flag => _sharedState; |
| 368 void set _flag(bool flag) { _sharedState = flag; } | 352 void set _flag(bool flag) { |
| 353 _sharedState = flag; |
| 354 } |
| 355 |
| 369 int get _count => _sharedState; | 356 int get _count => _sharedState; |
| 370 void set _count(int count) { _sharedState = count; } | 357 void set _count(int count) { |
| 358 _sharedState = count; |
| 359 } |
| 371 } | 360 } |
| 372 | 361 |
| 373 | |
| 374 class _TakeWhileStream<T> extends _ForwardingStream<T, T> { | 362 class _TakeWhileStream<T> extends _ForwardingStream<T, T> { |
| 375 final _Predicate<T> _test; | 363 final _Predicate<T> _test; |
| 376 | 364 |
| 377 _TakeWhileStream(Stream<T> source, bool test(T value)) | 365 _TakeWhileStream(Stream<T> source, bool test(T value)) |
| 378 : this._test = test, super(source); | 366 : this._test = test, |
| 367 super(source); |
| 379 | 368 |
| 380 void _handleData(T inputEvent, _EventSink<T> sink) { | 369 void _handleData(T inputEvent, _EventSink<T> sink) { |
| 381 bool satisfies; | 370 bool satisfies; |
| 382 try { | 371 try { |
| 383 satisfies = _test(inputEvent); | 372 satisfies = _test(inputEvent); |
| 384 } catch (e, s) { | 373 } catch (e, s) { |
| 385 _addErrorWithReplacement(sink, e, s); | 374 _addErrorWithReplacement(sink, e, s); |
| 386 // The test didn't say true. Didn't say false either, but we stop anyway. | 375 // The test didn't say true. Didn't say false either, but we stop anyway. |
| 387 sink._close(); | 376 sink._close(); |
| 388 return; | 377 return; |
| 389 } | 378 } |
| 390 if (satisfies) { | 379 if (satisfies) { |
| 391 sink._add(inputEvent); | 380 sink._add(inputEvent); |
| 392 } else { | 381 } else { |
| 393 sink._close(); | 382 sink._close(); |
| 394 } | 383 } |
| 395 } | 384 } |
| 396 } | 385 } |
| 397 | 386 |
| 398 class _SkipStream<T> extends _ForwardingStream<T, T> { | 387 class _SkipStream<T> extends _ForwardingStream<T, T> { |
| 399 final int _count; | 388 final int _count; |
| 400 | 389 |
| 401 _SkipStream(Stream<T> source, int count) | 390 _SkipStream(Stream<T> source, int count) |
| 402 : this._count = count, super(source) { | 391 : this._count = count, |
| 392 super(source) { |
| 403 // This test is done early to avoid handling an async error | 393 // This test is done early to avoid handling an async error |
| 404 // in the _handleData method. | 394 // in the _handleData method. |
| 405 if (count is! int || count < 0) throw new ArgumentError(count); | 395 if (count is! int || count < 0) throw new ArgumentError(count); |
| 406 } | 396 } |
| 407 | 397 |
| 408 StreamSubscription<T> _createSubscription( | 398 StreamSubscription<T> _createSubscription(void onData(T data), |
| 409 void onData(T data), | 399 Function onError, void onDone(), bool cancelOnError) { |
| 410 Function onError, | |
| 411 void onDone(), | |
| 412 bool cancelOnError) { | |
| 413 return new _StateStreamSubscription<T>( | 400 return new _StateStreamSubscription<T>( |
| 414 this, onData, onError, onDone, cancelOnError, _count); | 401 this, onData, onError, onDone, cancelOnError, _count); |
| 415 } | 402 } |
| 416 | 403 |
| 417 void _handleData(T inputEvent, _EventSink<T> sink) { | 404 void _handleData(T inputEvent, _EventSink<T> sink) { |
| 418 _StateStreamSubscription<T> subscription = sink; | 405 _StateStreamSubscription<T> subscription = sink; |
| 419 int count = subscription._count; | 406 int count = subscription._count; |
| 420 if (count > 0) { | 407 if (count > 0) { |
| 421 subscription._count = count - 1; | 408 subscription._count = count - 1; |
| 422 return; | 409 return; |
| 423 } | 410 } |
| 424 sink._add(inputEvent); | 411 sink._add(inputEvent); |
| 425 } | 412 } |
| 426 } | 413 } |
| 427 | 414 |
| 428 class _SkipWhileStream<T> extends _ForwardingStream<T, T> { | 415 class _SkipWhileStream<T> extends _ForwardingStream<T, T> { |
| 429 final _Predicate<T> _test; | 416 final _Predicate<T> _test; |
| 430 | 417 |
| 431 _SkipWhileStream(Stream<T> source, bool test(T value)) | 418 _SkipWhileStream(Stream<T> source, bool test(T value)) |
| 432 : this._test = test, super(source); | 419 : this._test = test, |
| 420 super(source); |
| 433 | 421 |
| 434 StreamSubscription<T> _createSubscription( | 422 StreamSubscription<T> _createSubscription(void onData(T data), |
| 435 void onData(T data), | 423 Function onError, void onDone(), bool cancelOnError) { |
| 436 Function onError, | |
| 437 void onDone(), | |
| 438 bool cancelOnError) { | |
| 439 return new _StateStreamSubscription<T>( | 424 return new _StateStreamSubscription<T>( |
| 440 this, onData, onError, onDone, cancelOnError, false); | 425 this, onData, onError, onDone, cancelOnError, false); |
| 441 } | 426 } |
| 442 | 427 |
| 443 void _handleData(T inputEvent, _EventSink<T> sink) { | 428 void _handleData(T inputEvent, _EventSink<T> sink) { |
| 444 _StateStreamSubscription<T> subscription = sink; | 429 _StateStreamSubscription<T> subscription = sink; |
| 445 bool hasFailed = subscription._flag; | 430 bool hasFailed = subscription._flag; |
| 446 if (hasFailed) { | 431 if (hasFailed) { |
| 447 sink._add(inputEvent); | 432 sink._add(inputEvent); |
| 448 return; | 433 return; |
| (...skipping 16 matching lines...) Expand all Loading... |
| 465 | 450 |
| 466 typedef bool _Equality<T>(T a, T b); | 451 typedef bool _Equality<T>(T a, T b); |
| 467 | 452 |
| 468 class _DistinctStream<T> extends _ForwardingStream<T, T> { | 453 class _DistinctStream<T> extends _ForwardingStream<T, T> { |
| 469 static var _SENTINEL = new Object(); | 454 static var _SENTINEL = new Object(); |
| 470 | 455 |
| 471 _Equality<T> _equals; | 456 _Equality<T> _equals; |
| 472 var _previous = _SENTINEL; | 457 var _previous = _SENTINEL; |
| 473 | 458 |
| 474 _DistinctStream(Stream<T> source, bool equals(T a, T b)) | 459 _DistinctStream(Stream<T> source, bool equals(T a, T b)) |
| 475 : _equals = equals, super(source); | 460 : _equals = equals, |
| 461 super(source); |
| 476 | 462 |
| 477 void _handleData(T inputEvent, _EventSink<T> sink) { | 463 void _handleData(T inputEvent, _EventSink<T> sink) { |
| 478 if (identical(_previous, _SENTINEL)) { | 464 if (identical(_previous, _SENTINEL)) { |
| 479 _previous = inputEvent; | 465 _previous = inputEvent; |
| 480 return sink._add(inputEvent); | 466 return sink._add(inputEvent); |
| 481 } else { | 467 } else { |
| 482 bool isEqual; | 468 bool isEqual; |
| 483 try { | 469 try { |
| 484 if (_equals == null) { | 470 if (_equals == null) { |
| 485 isEqual = (_previous == inputEvent); | 471 isEqual = (_previous == inputEvent); |
| 486 } else { | 472 } else { |
| 487 isEqual = _equals(_previous as Object /*=T*/, inputEvent); | 473 isEqual = _equals(_previous as Object/*=T*/, inputEvent); |
| 488 } | 474 } |
| 489 } catch (e, s) { | 475 } catch (e, s) { |
| 490 _addErrorWithReplacement(sink, e, s); | 476 _addErrorWithReplacement(sink, e, s); |
| 491 return null; | 477 return null; |
| 492 } | 478 } |
| 493 if (!isEqual) { | 479 if (!isEqual) { |
| 494 sink._add(inputEvent); | 480 sink._add(inputEvent); |
| 495 _previous = inputEvent; | 481 _previous = inputEvent; |
| 496 } | 482 } |
| 497 } | 483 } |
| 498 } | 484 } |
| 499 } | 485 } |
| OLD | NEW |