| OLD | NEW |
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 /** Utility function to create an [AsyncError] if [error] isn't one already. */ | 7 /** Utility function to create an [AsyncError] if [error] isn't one already. */ |
| 8 AsyncError _asyncError(Object error, Object stackTrace, [AsyncError cause]) { | 8 AsyncError _asyncError(Object error, Object stackTrace, [AsyncError cause]) { |
| 9 if (error is AsyncError) return error; | 9 if (error is AsyncError) return error; |
| 10 if (cause == null) return new AsyncError(error, stackTrace); | 10 if (cause == null) return new AsyncError(error, stackTrace); |
| (...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 51 abstract class _ForwardingStream<S, T> extends Stream<T> { | 51 abstract class _ForwardingStream<S, T> extends Stream<T> { |
| 52 final Stream<S> _source; | 52 final Stream<S> _source; |
| 53 | 53 |
| 54 _ForwardingStream(this._source); | 54 _ForwardingStream(this._source); |
| 55 | 55 |
| 56 bool get isBroadcast => _source.isBroadcast; | 56 bool get isBroadcast => _source.isBroadcast; |
| 57 | 57 |
| 58 StreamSubscription<T> listen(void onData(T value), | 58 StreamSubscription<T> listen(void onData(T value), |
| 59 { void onError(AsyncError error), | 59 { void onError(AsyncError error), |
| 60 void onDone(), | 60 void onDone(), |
| 61 bool unsubscribeOnError }) { | 61 bool cancelOnError }) { |
| 62 if (onData == null) onData = _nullDataHandler; | 62 if (onData == null) onData = _nullDataHandler; |
| 63 if (onError == null) onError = _nullErrorHandler; | 63 if (onError == null) onError = _nullErrorHandler; |
| 64 if (onDone == null) onDone = _nullDoneHandler; | 64 if (onDone == null) onDone = _nullDoneHandler; |
| 65 unsubscribeOnError = identical(true, unsubscribeOnError); | 65 cancelOnError = identical(true, cancelOnError); |
| 66 return _createSubscription(onData, onError, onDone, unsubscribeOnError); | 66 return _createSubscription(onData, onError, onDone, cancelOnError); |
| 67 } | 67 } |
| 68 | 68 |
| 69 StreamSubscription<T> _createSubscription(void onData(T value), | 69 StreamSubscription<T> _createSubscription(void onData(T value), |
| 70 void onError(AsyncError error), | 70 void onError(AsyncError error), |
| 71 void onDone(), | 71 void onDone(), |
| 72 bool unsubscribeOnError) { | 72 bool cancelOnError) { |
| 73 return new _ForwardingStreamSubscription<S, T>( | 73 return new _ForwardingStreamSubscription<S, T>( |
| 74 this, onData, onError, onDone, unsubscribeOnError); | 74 this, onData, onError, onDone, cancelOnError); |
| 75 } | 75 } |
| 76 | 76 |
| 77 // Override the following methods in subclasses to change the behavior. | 77 // Override the following methods in subclasses to change the behavior. |
| 78 | 78 |
| 79 void _handleData(S data, _EventOutputSink<T> sink) { | 79 void _handleData(S data, _EventOutputSink<T> sink) { |
| 80 var outputData = data; | 80 var outputData = data; |
| 81 sink._sendData(outputData); | 81 sink._sendData(outputData); |
| 82 } | 82 } |
| 83 | 83 |
| 84 void _handleError(AsyncError error, _EventOutputSink<T> sink) { | 84 void _handleError(AsyncError error, _EventOutputSink<T> sink) { |
| (...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 146 } | 146 } |
| 147 } | 147 } |
| 148 | 148 |
| 149 | 149 |
| 150 /** | 150 /** |
| 151 * Abstract superclass for subscriptions that forward to other subscriptions. | 151 * Abstract superclass for subscriptions that forward to other subscriptions. |
| 152 */ | 152 */ |
| 153 class _ForwardingStreamSubscription<S, T> | 153 class _ForwardingStreamSubscription<S, T> |
| 154 extends _BaseStreamSubscription<T> implements _EventOutputSink<T> { | 154 extends _BaseStreamSubscription<T> implements _EventOutputSink<T> { |
| 155 final _ForwardingStream<S, T> _stream; | 155 final _ForwardingStream<S, T> _stream; |
| 156 final bool _unsubscribeOnError; | 156 final bool _cancelOnError; |
| 157 | 157 |
| 158 StreamSubscription<S> _subscription; | 158 StreamSubscription<S> _subscription; |
| 159 | 159 |
| 160 _ForwardingStreamSubscription(this._stream, | 160 _ForwardingStreamSubscription(this._stream, |
| 161 void onData(T data), | 161 void onData(T data), |
| 162 void onError(AsyncError error), | 162 void onError(AsyncError error), |
| 163 void onDone(), | 163 void onDone(), |
| 164 this._unsubscribeOnError) | 164 this._cancelOnError) |
| 165 : super(onData, onError, onDone) { | 165 : super(onData, onError, onDone) { |
| 166 // Don't unsubscribe on incoming error, only if we send an error forwards. | 166 // Don't unsubscribe on incoming error, only if we send an error forwards. |
| 167 _subscription = | 167 _subscription = |
| 168 _stream._source.listen(_handleData, | 168 _stream._source.listen(_handleData, |
| 169 onError: _handleError, | 169 onError: _handleError, |
| 170 onDone: _handleDone); | 170 onDone: _handleDone); |
| 171 } | 171 } |
| 172 | 172 |
| 173 // StreamSubscription interface. | 173 // StreamSubscription interface. |
| 174 | 174 |
| (...skipping 15 matching lines...) Expand all Loading... |
| 190 } | 190 } |
| 191 | 191 |
| 192 // _EventOutputSink interface. Sends data to this subscription. | 192 // _EventOutputSink interface. Sends data to this subscription. |
| 193 | 193 |
| 194 void _sendData(T data) { | 194 void _sendData(T data) { |
| 195 _onData(data); | 195 _onData(data); |
| 196 } | 196 } |
| 197 | 197 |
| 198 void _sendError(AsyncError error) { | 198 void _sendError(AsyncError error) { |
| 199 _onError(error); | 199 _onError(error); |
| 200 if (_unsubscribeOnError) { | 200 if (_cancelOnError) { |
| 201 _subscription.cancel(); | 201 _subscription.cancel(); |
| 202 _subscription = null; | 202 _subscription = null; |
| 203 } | 203 } |
| 204 } | 204 } |
| 205 | 205 |
| 206 void _sendDone() { | 206 void _sendDone() { |
| 207 // If the transformation sends a done signal, we stop the subscription. | 207 // If the transformation sends a done signal, we stop the subscription. |
| 208 if (_subscription != null) { | 208 if (_subscription != null) { |
| 209 _subscription.cancel(); | 209 _subscription.cancel(); |
| 210 _subscription = null; | 210 _subscription = null; |
| (...skipping 317 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 528 | 528 |
| 529 void handleError(AsyncError error, EventSink<T> sink) { | 529 void handleError(AsyncError error, EventSink<T> sink) { |
| 530 _handleError(error, sink); | 530 _handleError(error, sink); |
| 531 } | 531 } |
| 532 | 532 |
| 533 void handleDone(EventSink<T> sink) { | 533 void handleDone(EventSink<T> sink) { |
| 534 _handleDone(sink); | 534 _handleDone(sink); |
| 535 } | 535 } |
| 536 } | 536 } |
| 537 | 537 |
| OLD | NEW |