| OLD | NEW |
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 /** Utility function to create an [AsyncError] if [error] isn't one already. */ | 7 /** |
| 8 AsyncError _asyncError(Object error, Object stackTrace, [AsyncError cause]) { | 8 * Utility function to attach a stack trace to an [error] if it doesn't have |
| 9 if (error is AsyncError) return error; | 9 * one already. |
| 10 if (cause == null) return new AsyncError(error, stackTrace); | 10 */ |
| 11 return new AsyncError.withCause(error, stackTrace, cause); | 11 _asyncError(Object error, Object stackTrace) { |
| 12 if (stackTrace == null) return error; |
| 13 if (getAttachedStackTrace(error) != null) return error; |
| 14 _attachStackTrace(error, stackTrace); |
| 15 return error; |
| 12 } | 16 } |
| 13 | 17 |
| 14 /** Runs user code and takes actions depending on success or failure. */ | 18 /** Runs user code and takes actions depending on success or failure. */ |
| 15 _runUserCode(userCode(), onSuccess(value), onError(AsyncError error), | 19 _runUserCode(userCode(), onSuccess(value), onError(error)) { |
| 16 { AsyncError cause }) { | |
| 17 var result; | |
| 18 try { | 20 try { |
| 19 result = userCode(); | 21 onSuccess(userCode()); |
| 20 } on AsyncError catch (e) { | |
| 21 return onError(e); | |
| 22 } catch (e, s) { | 22 } catch (e, s) { |
| 23 if (cause == null) { | 23 onError(_asyncError(e, s)); |
| 24 onError(new AsyncError(e, s)); | |
| 25 } else { | |
| 26 onError(new AsyncError.withCause(e, s, cause)); | |
| 27 } | |
| 28 // onError is allowed to return. Don't execute the onSuccess below. | |
| 29 return; | |
| 30 } | 24 } |
| 31 onSuccess(result); | |
| 32 } | 25 } |
| 33 | 26 |
| 34 /** Helper function to make an onError argument to [_runUserCode]. */ | 27 /** Helper function to make an onError argument to [_runUserCode]. */ |
| 35 _cancelAndError(StreamSubscription subscription, _FutureImpl future) => | 28 _cancelAndError(StreamSubscription subscription, _FutureImpl future) => |
| 36 (AsyncError error) { | 29 (error) { |
| 37 subscription.cancel(); | 30 subscription.cancel(); |
| 38 future._setError(error); | 31 future._setError(error); |
| 39 }; | 32 }; |
| 40 | 33 |
| 41 | 34 |
| 42 /** | 35 /** |
| 43 * A [Stream] that forwards subscriptions to another stream. | 36 * A [Stream] that forwards subscriptions to another stream. |
| 44 * | 37 * |
| 45 * This stream implements [Stream], but forwards all subscriptions | 38 * This stream implements [Stream], but forwards all subscriptions |
| 46 * to an underlying stream, and wraps the returned subscription to | 39 * to an underlying stream, and wraps the returned subscription to |
| 47 * modify the events on the way. | 40 * modify the events on the way. |
| 48 * | 41 * |
| 49 * This class is intended for internal use only. | 42 * This class is intended for internal use only. |
| 50 */ | 43 */ |
| 51 abstract class _ForwardingStream<S, T> extends Stream<T> { | 44 abstract class _ForwardingStream<S, T> extends Stream<T> { |
| 52 final Stream<S> _source; | 45 final Stream<S> _source; |
| 53 | 46 |
| 54 _ForwardingStream(this._source); | 47 _ForwardingStream(this._source); |
| 55 | 48 |
| 56 bool get isBroadcast => _source.isBroadcast; | 49 bool get isBroadcast => _source.isBroadcast; |
| 57 | 50 |
| 58 StreamSubscription<T> listen(void onData(T value), | 51 StreamSubscription<T> listen(void onData(T value), |
| 59 { void onError(AsyncError error), | 52 { void onError(error), |
| 60 void onDone(), | 53 void onDone(), |
| 61 bool cancelOnError }) { | 54 bool cancelOnError }) { |
| 62 if (onData == null) onData = _nullDataHandler; | 55 if (onData == null) onData = _nullDataHandler; |
| 63 if (onError == null) onError = _nullErrorHandler; | 56 if (onError == null) onError = _nullErrorHandler; |
| 64 if (onDone == null) onDone = _nullDoneHandler; | 57 if (onDone == null) onDone = _nullDoneHandler; |
| 65 cancelOnError = identical(true, cancelOnError); | 58 cancelOnError = identical(true, cancelOnError); |
| 66 return _createSubscription(onData, onError, onDone, cancelOnError); | 59 return _createSubscription(onData, onError, onDone, cancelOnError); |
| 67 } | 60 } |
| 68 | 61 |
| 69 StreamSubscription<T> _createSubscription(void onData(T value), | 62 StreamSubscription<T> _createSubscription(void onData(T value), |
| 70 void onError(AsyncError error), | 63 void onError(error), |
| 71 void onDone(), | 64 void onDone(), |
| 72 bool cancelOnError) { | 65 bool cancelOnError) { |
| 73 return new _ForwardingStreamSubscription<S, T>( | 66 return new _ForwardingStreamSubscription<S, T>( |
| 74 this, onData, onError, onDone, cancelOnError); | 67 this, onData, onError, onDone, cancelOnError); |
| 75 } | 68 } |
| 76 | 69 |
| 77 // Override the following methods in subclasses to change the behavior. | 70 // Override the following methods in subclasses to change the behavior. |
| 78 | 71 |
| 79 void _handleData(S data, _EventOutputSink<T> sink) { | 72 void _handleData(S data, _EventOutputSink<T> sink) { |
| 80 var outputData = data; | 73 var outputData = data; |
| 81 sink._sendData(outputData); | 74 sink._sendData(outputData); |
| 82 } | 75 } |
| 83 | 76 |
| 84 void _handleError(AsyncError error, _EventOutputSink<T> sink) { | 77 void _handleError(error, _EventOutputSink<T> sink) { |
| 85 sink._sendError(error); | 78 sink._sendError(error); |
| 86 } | 79 } |
| 87 | 80 |
| 88 void _handleDone(_EventOutputSink<T> sink) { | 81 void _handleDone(_EventOutputSink<T> sink) { |
| 89 sink._sendDone(); | 82 sink._sendDone(); |
| 90 } | 83 } |
| 91 } | 84 } |
| 92 | 85 |
| 93 /** | 86 /** |
| 94 * Common behavior of [StreamSubscription] classes. | 87 * Common behavior of [StreamSubscription] classes. |
| (...skipping 14 matching lines...) Expand all Loading... |
| 109 if (_onError == null) _onError = _nullErrorHandler; | 102 if (_onError == null) _onError = _nullErrorHandler; |
| 110 if (_onDone == null) _onDone = _nullDoneHandler; | 103 if (_onDone == null) _onDone = _nullDoneHandler; |
| 111 } | 104 } |
| 112 | 105 |
| 113 // StreamSubscription interface. | 106 // StreamSubscription interface. |
| 114 void onData(void handleData(T event)) { | 107 void onData(void handleData(T event)) { |
| 115 if (handleData == null) handleData = _nullDataHandler; | 108 if (handleData == null) handleData = _nullDataHandler; |
| 116 _onData = handleData; | 109 _onData = handleData; |
| 117 } | 110 } |
| 118 | 111 |
| 119 void onError(void handleError(AsyncError error)) { | 112 void onError(void handleError(error)) { |
| 120 if (handleError == null) handleError = _nullErrorHandler; | 113 if (handleError == null) handleError = _nullErrorHandler; |
| 121 _onError = handleError; | 114 _onError = handleError; |
| 122 } | 115 } |
| 123 | 116 |
| 124 void onDone(void handleDone()) { | 117 void onDone(void handleDone()) { |
| 125 if (handleDone == null) handleDone = _nullDoneHandler; | 118 if (handleDone == null) handleDone = _nullDoneHandler; |
| 126 _onDone = handleDone; | 119 _onDone = handleDone; |
| 127 } | 120 } |
| 128 | 121 |
| 129 void pause([Future resumeSignal]); | 122 void pause([Future resumeSignal]); |
| 130 | 123 |
| 131 void resume(); | 124 void resume(); |
| 132 | 125 |
| 133 void cancel(); | 126 void cancel(); |
| 134 | 127 |
| 135 Future asFuture([var futureValue]) { | 128 Future asFuture([var futureValue]) { |
| 136 _FutureImpl<T> result = new _FutureImpl<T>(); | 129 _FutureImpl<T> result = new _FutureImpl<T>(); |
| 137 | 130 |
| 138 // Overwrite the onDone and onError handlers. | 131 // Overwrite the onDone and onError handlers. |
| 139 onDone(() { result._setValue(futureValue); }); | 132 onDone(() { result._setValue(futureValue); }); |
| 140 onError((AsyncError error) { | 133 onError((error) { |
| 141 cancel(); | 134 cancel(); |
| 142 result._setError(error); | 135 result._setError(error); |
| 143 }); | 136 }); |
| 144 | 137 |
| 145 return result; | 138 return result; |
| 146 } | 139 } |
| 147 } | 140 } |
| 148 | 141 |
| 149 | 142 |
| 150 /** | 143 /** |
| 151 * Abstract superclass for subscriptions that forward to other subscriptions. | 144 * Abstract superclass for subscriptions that forward to other subscriptions. |
| 152 */ | 145 */ |
| 153 class _ForwardingStreamSubscription<S, T> | 146 class _ForwardingStreamSubscription<S, T> |
| 154 extends _BaseStreamSubscription<T> implements _EventOutputSink<T> { | 147 extends _BaseStreamSubscription<T> implements _EventOutputSink<T> { |
| 155 final _ForwardingStream<S, T> _stream; | 148 final _ForwardingStream<S, T> _stream; |
| 156 final bool _cancelOnError; | 149 final bool _cancelOnError; |
| 157 | 150 |
| 158 StreamSubscription<S> _subscription; | 151 StreamSubscription<S> _subscription; |
| 159 | 152 |
| 160 _ForwardingStreamSubscription(this._stream, | 153 _ForwardingStreamSubscription(this._stream, |
| 161 void onData(T data), | 154 void onData(T data), |
| 162 void onError(AsyncError error), | 155 void onError(error), |
| 163 void onDone(), | 156 void onDone(), |
| 164 this._cancelOnError) | 157 this._cancelOnError) |
| 165 : super(onData, onError, onDone) { | 158 : super(onData, onError, onDone) { |
| 166 // Don't unsubscribe on incoming error, only if we send an error forwards. | 159 // Don't unsubscribe on incoming error, only if we send an error forwards. |
| 167 _subscription = | 160 _subscription = |
| 168 _stream._source.listen(_handleData, | 161 _stream._source.listen(_handleData, |
| 169 onError: _handleError, | 162 onError: _handleError, |
| 170 onDone: _handleDone); | 163 onDone: _handleDone); |
| 171 } | 164 } |
| 172 | 165 |
| (...skipping 15 matching lines...) Expand all Loading... |
| 188 _subscription = null; | 181 _subscription = null; |
| 189 } | 182 } |
| 190 } | 183 } |
| 191 | 184 |
| 192 // _EventOutputSink interface. Sends data to this subscription. | 185 // _EventOutputSink interface. Sends data to this subscription. |
| 193 | 186 |
| 194 void _sendData(T data) { | 187 void _sendData(T data) { |
| 195 _onData(data); | 188 _onData(data); |
| 196 } | 189 } |
| 197 | 190 |
| 198 void _sendError(AsyncError error) { | 191 void _sendError(error) { |
| 199 _onError(error); | 192 _onError(error); |
| 200 if (_cancelOnError) { | 193 if (_cancelOnError) { |
| 201 _subscription.cancel(); | 194 _subscription.cancel(); |
| 202 _subscription = null; | 195 _subscription = null; |
| 203 } | 196 } |
| 204 } | 197 } |
| 205 | 198 |
| 206 void _sendDone() { | 199 void _sendDone() { |
| 207 // If the transformation sends a done signal, we stop the subscription. | 200 // If the transformation sends a done signal, we stop the subscription. |
| 208 if (_subscription != null) { | 201 if (_subscription != null) { |
| 209 _subscription.cancel(); | 202 _subscription.cancel(); |
| 210 _subscription = null; | 203 _subscription = null; |
| 211 } | 204 } |
| 212 _onDone(); | 205 _onDone(); |
| 213 } | 206 } |
| 214 | 207 |
| 215 // Methods used as listener on source subscription. | 208 // Methods used as listener on source subscription. |
| 216 | 209 |
| 217 // TODO(ahe): Restore type when feature is implemented in dart2js | 210 // TODO(ahe): Restore type when feature is implemented in dart2js |
| 218 // checked mode. http://dartbug.com/7733 | 211 // checked mode. http://dartbug.com/7733 |
| 219 void _handleData(/*S*/ data) { | 212 void _handleData(/*S*/ data) { |
| 220 _stream._handleData(data, this); | 213 _stream._handleData(data, this); |
| 221 } | 214 } |
| 222 | 215 |
| 223 void _handleError(AsyncError error) { | 216 void _handleError(error) { |
| 224 _stream._handleError(error, this); | 217 _stream._handleError(error, this); |
| 225 } | 218 } |
| 226 | 219 |
| 227 void _handleDone() { | 220 void _handleDone() { |
| 228 // On a done-event, we have already been unsubscribed. | 221 // On a done-event, we have already been unsubscribed. |
| 229 _subscription = null; | 222 _subscription = null; |
| 230 _stream._handleDone(this); | 223 _stream._handleDone(this); |
| 231 } | 224 } |
| 232 } | 225 } |
| 233 | 226 |
| (...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 297 } | 290 } |
| 298 } catch (e, s) { | 291 } catch (e, s) { |
| 299 // If either _expand or iterating the generated iterator throws, | 292 // If either _expand or iterating the generated iterator throws, |
| 300 // we abort the iteration. | 293 // we abort the iteration. |
| 301 sink._sendError(_asyncError(e, s)); | 294 sink._sendError(_asyncError(e, s)); |
| 302 } | 295 } |
| 303 } | 296 } |
| 304 } | 297 } |
| 305 | 298 |
| 306 | 299 |
| 307 typedef void _ErrorTransformation(AsyncError error); | 300 typedef void _ErrorTransformation(error); |
| 308 typedef bool _ErrorTest(error); | 301 typedef bool _ErrorTest(error); |
| 309 | 302 |
| 310 /** | 303 /** |
| 311 * A stream pipe that converts or disposes error events | 304 * A stream pipe that converts or disposes error events |
| 312 * before passing them on. | 305 * before passing them on. |
| 313 */ | 306 */ |
| 314 class _HandleErrorStream<T> extends _ForwardingStream<T, T> { | 307 class _HandleErrorStream<T> extends _ForwardingStream<T, T> { |
| 315 final _ErrorTransformation _transform; | 308 final _ErrorTransformation _transform; |
| 316 final _ErrorTest _test; | 309 final _ErrorTest _test; |
| 317 | 310 |
| 318 _HandleErrorStream(Stream<T> source, | 311 _HandleErrorStream(Stream<T> source, |
| 319 void transform(AsyncError event), | 312 void transform(event), |
| 320 bool test(error)) | 313 bool test(error)) |
| 321 : this._transform = transform, this._test = test, super(source); | 314 : this._transform = transform, this._test = test, super(source); |
| 322 | 315 |
| 323 void _handleError(AsyncError error, _EventOutputSink<T> sink) { | 316 void _handleError(Object error, _EventOutputSink<T> sink) { |
| 324 bool matches = true; | 317 bool matches = true; |
| 325 if (_test != null) { | 318 if (_test != null) { |
| 326 try { | 319 try { |
| 327 matches = _test(error.error); | 320 matches = _test(error); |
| 328 } catch (e, s) { | 321 } catch (e, s) { |
| 329 sink._sendError(_asyncError(e, s, error)); | 322 sink._sendError(_asyncError(e, s)); |
| 330 return; | 323 return; |
| 331 } | 324 } |
| 332 } | 325 } |
| 333 if (matches) { | 326 if (matches) { |
| 334 try { | 327 try { |
| 335 _transform(error); | 328 _transform(error); |
| 336 } catch (e, s) { | 329 } catch (e, s) { |
| 337 sink._sendError(_asyncError(e, s, error)); | 330 sink._sendError(_asyncError(e, s)); |
| 338 return; | 331 return; |
| 339 } | 332 } |
| 340 } else { | 333 } else { |
| 341 sink._sendError(error); | 334 sink._sendError(error); |
| 342 } | 335 } |
| 343 } | 336 } |
| 344 } | 337 } |
| 345 | 338 |
| 346 | 339 |
| 347 class _TakeStream<T> extends _ForwardingStream<T, T> { | 340 class _TakeStream<T> extends _ForwardingStream<T, T> { |
| (...skipping 121 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 469 sink._sendData(inputEvent); | 462 sink._sendData(inputEvent); |
| 470 _previous = inputEvent; | 463 _previous = inputEvent; |
| 471 } | 464 } |
| 472 } | 465 } |
| 473 } | 466 } |
| 474 } | 467 } |
| 475 | 468 |
| 476 // Stream transformations and event transformations. | 469 // Stream transformations and event transformations. |
| 477 | 470 |
| 478 typedef void _TransformDataHandler<S, T>(S data, EventSink<T> sink); | 471 typedef void _TransformDataHandler<S, T>(S data, EventSink<T> sink); |
| 479 typedef void _TransformErrorHandler<T>(AsyncError data, EventSink<T> sink); | 472 typedef void _TransformErrorHandler<T>(data, EventSink<T> sink); |
| 480 typedef void _TransformDoneHandler<T>(EventSink<T> sink); | 473 typedef void _TransformDoneHandler<T>(EventSink<T> sink); |
| 481 | 474 |
| 482 /** Default data handler forwards all data. */ | 475 /** Default data handler forwards all data. */ |
| 483 void _defaultHandleData(var data, EventSink sink) { | 476 void _defaultHandleData(var data, EventSink sink) { |
| 484 sink.add(data); | 477 sink.add(data); |
| 485 } | 478 } |
| 486 | 479 |
| 487 /** Default error handler forwards all errors. */ | 480 /** Default error handler forwards all errors. */ |
| 488 void _defaultHandleError(AsyncError error, EventSink sink) { | 481 void _defaultHandleError(error, EventSink sink) { |
| 489 sink.addError(error); | 482 sink.addError(error); |
| 490 } | 483 } |
| 491 | 484 |
| 492 /** Default done handler forwards done. */ | 485 /** Default done handler forwards done. */ |
| 493 void _defaultHandleDone(EventSink sink) { | 486 void _defaultHandleDone(EventSink sink) { |
| 494 sink.close(); | 487 sink.close(); |
| 495 } | 488 } |
| 496 | 489 |
| 497 | 490 |
| 498 /** | 491 /** |
| 499 * A [StreamTransformer] that modifies stream events. | 492 * A [StreamTransformer] that modifies stream events. |
| 500 * | 493 * |
| 501 * This class is used by [StreamTransformer]'s factory constructor. | 494 * This class is used by [StreamTransformer]'s factory constructor. |
| 502 * It is actually an [StreamEventTransformer] where the functions used to | 495 * It is actually an [StreamEventTransformer] where the functions used to |
| 503 * modify the events are passed as constructor arguments. | 496 * modify the events are passed as constructor arguments. |
| 504 * | 497 * |
| 505 * If an argument is omitted, it acts as the default method from | 498 * If an argument is omitted, it acts as the default method from |
| 506 * [StreamEventTransformer]. | 499 * [StreamEventTransformer]. |
| 507 */ | 500 */ |
| 508 class _StreamTransformerImpl<S, T> extends StreamEventTransformer<S, T> { | 501 class _StreamTransformerImpl<S, T> extends StreamEventTransformer<S, T> { |
| 509 // TODO(ahe): Restore type when feature is implemented in dart2js | 502 // TODO(ahe): Restore type when feature is implemented in dart2js |
| 510 // checked mode. http://dartbug.com/7733 | 503 // checked mode. http://dartbug.com/7733 |
| 511 final Function /*_TransformDataHandler<S, T>*/ _handleData; | 504 final Function /*_TransformDataHandler<S, T>*/ _handleData; |
| 512 final _TransformErrorHandler<T> _handleError; | 505 final _TransformErrorHandler<T> _handleError; |
| 513 final _TransformDoneHandler<T> _handleDone; | 506 final _TransformDoneHandler<T> _handleDone; |
| 514 | 507 |
| 515 _StreamTransformerImpl(void handleData(S data, EventSink<T> sink), | 508 _StreamTransformerImpl(void handleData(S data, EventSink<T> sink), |
| 516 void handleError(AsyncError data, EventSink<T> sink), | 509 void handleError(data, EventSink<T> sink), |
| 517 void handleDone(EventSink<T> sink)) | 510 void handleDone(EventSink<T> sink)) |
| 518 : this._handleData = (handleData == null ? _defaultHandleData | 511 : this._handleData = (handleData == null ? _defaultHandleData |
| 519 : handleData), | 512 : handleData), |
| 520 this._handleError = (handleError == null ? _defaultHandleError | 513 this._handleError = (handleError == null ? _defaultHandleError |
| 521 : handleError), | 514 : handleError), |
| 522 this._handleDone = (handleDone == null ? _defaultHandleDone | 515 this._handleDone = (handleDone == null ? _defaultHandleDone |
| 523 : handleDone); | 516 : handleDone); |
| 524 | 517 |
| 525 void handleData(S data, EventSink<T> sink) { | 518 void handleData(S data, EventSink<T> sink) { |
| 526 _handleData(data, sink); | 519 _handleData(data, sink); |
| 527 } | 520 } |
| 528 | 521 |
| 529 void handleError(AsyncError error, EventSink<T> sink) { | 522 void handleError(error, EventSink<T> sink) { |
| 530 _handleError(error, sink); | 523 _handleError(error, sink); |
| 531 } | 524 } |
| 532 | 525 |
| 533 void handleDone(EventSink<T> sink) { | 526 void handleDone(EventSink<T> sink) { |
| 534 _handleDone(sink); | 527 _handleDone(sink); |
| 535 } | 528 } |
| 536 } | 529 } |
| 537 | 530 |
| OLD | NEW |