| OLD | NEW | 
|---|
| 1 // Copyright (c) 2012, the Dart project authors.  Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors.  Please see the AUTHORS file | 
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a | 
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. | 
| 4 | 4 | 
| 5 part of dart.async; | 5 part of dart.async; | 
| 6 | 6 | 
| 7 /** Utility function to create an [AsyncError] if [error] isn't one already. */ | 7 /** Utility function to create an [AsyncError] if [error] isn't one already. */ | 
| 8 AsyncError _asyncError(Object error, Object stackTrace, [AsyncError cause]) { | 8 AsyncError _asyncError(Object error, Object stackTrace, [AsyncError cause]) { | 
| 9   if (error is AsyncError) return error; | 9   if (error is AsyncError) return error; | 
| 10   if (cause == null) return new AsyncError(error, stackTrace); | 10   if (cause == null) return new AsyncError(error, stackTrace); | 
| (...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after  Loading... | 
| 51 abstract class _ForwardingStream<S, T> extends Stream<T> { | 51 abstract class _ForwardingStream<S, T> extends Stream<T> { | 
| 52   final Stream<S> _source; | 52   final Stream<S> _source; | 
| 53 | 53 | 
| 54   _ForwardingStream(this._source); | 54   _ForwardingStream(this._source); | 
| 55 | 55 | 
| 56   bool get isBroadcast => _source.isBroadcast; | 56   bool get isBroadcast => _source.isBroadcast; | 
| 57 | 57 | 
| 58   StreamSubscription<T> listen(void onData(T value), | 58   StreamSubscription<T> listen(void onData(T value), | 
| 59                               { void onError(AsyncError error), | 59                               { void onError(AsyncError error), | 
| 60                                 void onDone(), | 60                                 void onDone(), | 
| 61                                 bool unsubscribeOnError }) { | 61                                 bool cancelOnError }) { | 
| 62     if (onData == null) onData = _nullDataHandler; | 62     if (onData == null) onData = _nullDataHandler; | 
| 63     if (onError == null) onError = _nullErrorHandler; | 63     if (onError == null) onError = _nullErrorHandler; | 
| 64     if (onDone == null) onDone = _nullDoneHandler; | 64     if (onDone == null) onDone = _nullDoneHandler; | 
| 65     unsubscribeOnError = identical(true, unsubscribeOnError); | 65     cancelOnError = identical(true, cancelOnError); | 
| 66     return _createSubscription(onData, onError, onDone, unsubscribeOnError); | 66     return _createSubscription(onData, onError, onDone, cancelOnError); | 
| 67   } | 67   } | 
| 68 | 68 | 
| 69   StreamSubscription<T> _createSubscription(void onData(T value), | 69   StreamSubscription<T> _createSubscription(void onData(T value), | 
| 70                                             void onError(AsyncError error), | 70                                             void onError(AsyncError error), | 
| 71                                             void onDone(), | 71                                             void onDone(), | 
| 72                                             bool unsubscribeOnError) { | 72                                             bool cancelOnError) { | 
| 73     return new _ForwardingStreamSubscription<S, T>( | 73     return new _ForwardingStreamSubscription<S, T>( | 
| 74         this, onData, onError, onDone, unsubscribeOnError); | 74         this, onData, onError, onDone, cancelOnError); | 
| 75   } | 75   } | 
| 76 | 76 | 
| 77   // Override the following methods in subclasses to change the behavior. | 77   // Override the following methods in subclasses to change the behavior. | 
| 78 | 78 | 
| 79   void _handleData(S data, _EventOutputSink<T> sink) { | 79   void _handleData(S data, _EventOutputSink<T> sink) { | 
| 80     var outputData = data; | 80     var outputData = data; | 
| 81     sink._sendData(outputData); | 81     sink._sendData(outputData); | 
| 82   } | 82   } | 
| 83 | 83 | 
| 84   void _handleError(AsyncError error, _EventOutputSink<T> sink) { | 84   void _handleError(AsyncError error, _EventOutputSink<T> sink) { | 
| (...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after  Loading... | 
| 146   } | 146   } | 
| 147 } | 147 } | 
| 148 | 148 | 
| 149 | 149 | 
| 150 /** | 150 /** | 
| 151  * Abstract superclass for subscriptions that forward to other subscriptions. | 151  * Abstract superclass for subscriptions that forward to other subscriptions. | 
| 152  */ | 152  */ | 
| 153 class _ForwardingStreamSubscription<S, T> | 153 class _ForwardingStreamSubscription<S, T> | 
| 154     extends _BaseStreamSubscription<T> implements _EventOutputSink<T> { | 154     extends _BaseStreamSubscription<T> implements _EventOutputSink<T> { | 
| 155   final _ForwardingStream<S, T> _stream; | 155   final _ForwardingStream<S, T> _stream; | 
| 156   final bool _unsubscribeOnError; | 156   final bool _cancelOnError; | 
| 157 | 157 | 
| 158   StreamSubscription<S> _subscription; | 158   StreamSubscription<S> _subscription; | 
| 159 | 159 | 
| 160   _ForwardingStreamSubscription(this._stream, | 160   _ForwardingStreamSubscription(this._stream, | 
| 161                                 void onData(T data), | 161                                 void onData(T data), | 
| 162                                 void onError(AsyncError error), | 162                                 void onError(AsyncError error), | 
| 163                                 void onDone(), | 163                                 void onDone(), | 
| 164                                 this._unsubscribeOnError) | 164                                 this._cancelOnError) | 
| 165       : super(onData, onError, onDone) { | 165       : super(onData, onError, onDone) { | 
| 166     // Don't unsubscribe on incoming error, only if we send an error forwards. | 166     // Don't unsubscribe on incoming error, only if we send an error forwards. | 
| 167     _subscription = | 167     _subscription = | 
| 168         _stream._source.listen(_handleData, | 168         _stream._source.listen(_handleData, | 
| 169                                onError: _handleError, | 169                                onError: _handleError, | 
| 170                                onDone: _handleDone); | 170                                onDone: _handleDone); | 
| 171   } | 171   } | 
| 172 | 172 | 
| 173   // StreamSubscription interface. | 173   // StreamSubscription interface. | 
| 174 | 174 | 
| (...skipping 15 matching lines...) Expand all  Loading... | 
| 190   } | 190   } | 
| 191 | 191 | 
| 192   // _EventOutputSink interface. Sends data to this subscription. | 192   // _EventOutputSink interface. Sends data to this subscription. | 
| 193 | 193 | 
| 194   void _sendData(T data) { | 194   void _sendData(T data) { | 
| 195     _onData(data); | 195     _onData(data); | 
| 196   } | 196   } | 
| 197 | 197 | 
| 198   void _sendError(AsyncError error) { | 198   void _sendError(AsyncError error) { | 
| 199     _onError(error); | 199     _onError(error); | 
| 200     if (_unsubscribeOnError) { | 200     if (_cancelOnError) { | 
| 201       _subscription.cancel(); | 201       _subscription.cancel(); | 
| 202       _subscription = null; | 202       _subscription = null; | 
| 203     } | 203     } | 
| 204   } | 204   } | 
| 205 | 205 | 
| 206   void _sendDone() { | 206   void _sendDone() { | 
| 207     // If the transformation sends a done signal, we stop the subscription. | 207     // If the transformation sends a done signal, we stop the subscription. | 
| 208     if (_subscription != null) { | 208     if (_subscription != null) { | 
| 209       _subscription.cancel(); | 209       _subscription.cancel(); | 
| 210       _subscription = null; | 210       _subscription = null; | 
| (...skipping 317 matching lines...) Expand 10 before | Expand all | Expand 10 after  Loading... | 
| 528 | 528 | 
| 529   void handleError(AsyncError error, EventSink<T> sink) { | 529   void handleError(AsyncError error, EventSink<T> sink) { | 
| 530     _handleError(error, sink); | 530     _handleError(error, sink); | 
| 531   } | 531   } | 
| 532 | 532 | 
| 533   void handleDone(EventSink<T> sink) { | 533   void handleDone(EventSink<T> sink) { | 
| 534     _handleDone(sink); | 534     _handleDone(sink); | 
| 535   } | 535   } | 
| 536 } | 536 } | 
| 537 | 537 | 
| OLD | NEW | 
|---|