Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(209)

Side by Side Diff: sdk/lib/async/stream_pipe.dart

Issue 14251013: Rename unsubscribeOnError to cancelOnError. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 7 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 /** Utility function to create an [AsyncError] if [error] isn't one already. */ 7 /** Utility function to create an [AsyncError] if [error] isn't one already. */
8 AsyncError _asyncError(Object error, Object stackTrace, [AsyncError cause]) { 8 AsyncError _asyncError(Object error, Object stackTrace, [AsyncError cause]) {
9 if (error is AsyncError) return error; 9 if (error is AsyncError) return error;
10 if (cause == null) return new AsyncError(error, stackTrace); 10 if (cause == null) return new AsyncError(error, stackTrace);
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after
51 abstract class _ForwardingStream<S, T> extends Stream<T> { 51 abstract class _ForwardingStream<S, T> extends Stream<T> {
52 final Stream<S> _source; 52 final Stream<S> _source;
53 53
54 _ForwardingStream(this._source); 54 _ForwardingStream(this._source);
55 55
56 bool get isBroadcast => _source.isBroadcast; 56 bool get isBroadcast => _source.isBroadcast;
57 57
58 StreamSubscription<T> listen(void onData(T value), 58 StreamSubscription<T> listen(void onData(T value),
59 { void onError(AsyncError error), 59 { void onError(AsyncError error),
60 void onDone(), 60 void onDone(),
61 bool unsubscribeOnError }) { 61 bool cancelOnError }) {
62 if (onData == null) onData = _nullDataHandler; 62 if (onData == null) onData = _nullDataHandler;
63 if (onError == null) onError = _nullErrorHandler; 63 if (onError == null) onError = _nullErrorHandler;
64 if (onDone == null) onDone = _nullDoneHandler; 64 if (onDone == null) onDone = _nullDoneHandler;
65 unsubscribeOnError = identical(true, unsubscribeOnError); 65 cancelOnError = identical(true, cancelOnError);
66 return _createSubscription(onData, onError, onDone, unsubscribeOnError); 66 return _createSubscription(onData, onError, onDone, cancelOnError);
67 } 67 }
68 68
69 StreamSubscription<T> _createSubscription(void onData(T value), 69 StreamSubscription<T> _createSubscription(void onData(T value),
70 void onError(AsyncError error), 70 void onError(AsyncError error),
71 void onDone(), 71 void onDone(),
72 bool unsubscribeOnError) { 72 bool cancelOnError) {
73 return new _ForwardingStreamSubscription<S, T>( 73 return new _ForwardingStreamSubscription<S, T>(
74 this, onData, onError, onDone, unsubscribeOnError); 74 this, onData, onError, onDone, cancelOnError);
75 } 75 }
76 76
77 // Override the following methods in subclasses to change the behavior. 77 // Override the following methods in subclasses to change the behavior.
78 78
79 void _handleData(S data, _EventOutputSink<T> sink) { 79 void _handleData(S data, _EventOutputSink<T> sink) {
80 var outputData = data; 80 var outputData = data;
81 sink._sendData(outputData); 81 sink._sendData(outputData);
82 } 82 }
83 83
84 void _handleError(AsyncError error, _EventOutputSink<T> sink) { 84 void _handleError(AsyncError error, _EventOutputSink<T> sink) {
(...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after
146 } 146 }
147 } 147 }
148 148
149 149
150 /** 150 /**
151 * Abstract superclass for subscriptions that forward to other subscriptions. 151 * Abstract superclass for subscriptions that forward to other subscriptions.
152 */ 152 */
153 class _ForwardingStreamSubscription<S, T> 153 class _ForwardingStreamSubscription<S, T>
154 extends _BaseStreamSubscription<T> implements _EventOutputSink<T> { 154 extends _BaseStreamSubscription<T> implements _EventOutputSink<T> {
155 final _ForwardingStream<S, T> _stream; 155 final _ForwardingStream<S, T> _stream;
156 final bool _unsubscribeOnError; 156 final bool _cancelOnError;
157 157
158 StreamSubscription<S> _subscription; 158 StreamSubscription<S> _subscription;
159 159
160 _ForwardingStreamSubscription(this._stream, 160 _ForwardingStreamSubscription(this._stream,
161 void onData(T data), 161 void onData(T data),
162 void onError(AsyncError error), 162 void onError(AsyncError error),
163 void onDone(), 163 void onDone(),
164 this._unsubscribeOnError) 164 this._cancelOnError)
165 : super(onData, onError, onDone) { 165 : super(onData, onError, onDone) {
166 // Don't unsubscribe on incoming error, only if we send an error forwards. 166 // Don't unsubscribe on incoming error, only if we send an error forwards.
167 _subscription = 167 _subscription =
168 _stream._source.listen(_handleData, 168 _stream._source.listen(_handleData,
169 onError: _handleError, 169 onError: _handleError,
170 onDone: _handleDone); 170 onDone: _handleDone);
171 } 171 }
172 172
173 // StreamSubscription interface. 173 // StreamSubscription interface.
174 174
(...skipping 15 matching lines...) Expand all
190 } 190 }
191 191
192 // _EventOutputSink interface. Sends data to this subscription. 192 // _EventOutputSink interface. Sends data to this subscription.
193 193
194 void _sendData(T data) { 194 void _sendData(T data) {
195 _onData(data); 195 _onData(data);
196 } 196 }
197 197
198 void _sendError(AsyncError error) { 198 void _sendError(AsyncError error) {
199 _onError(error); 199 _onError(error);
200 if (_unsubscribeOnError) { 200 if (_cancelOnError) {
201 _subscription.cancel(); 201 _subscription.cancel();
202 _subscription = null; 202 _subscription = null;
203 } 203 }
204 } 204 }
205 205
206 void _sendDone() { 206 void _sendDone() {
207 // If the transformation sends a done signal, we stop the subscription. 207 // If the transformation sends a done signal, we stop the subscription.
208 if (_subscription != null) { 208 if (_subscription != null) {
209 _subscription.cancel(); 209 _subscription.cancel();
210 _subscription = null; 210 _subscription = null;
(...skipping 317 matching lines...) Expand 10 before | Expand all | Expand 10 after
528 528
529 void handleError(AsyncError error, EventSink<T> sink) { 529 void handleError(AsyncError error, EventSink<T> sink) {
530 _handleError(error, sink); 530 _handleError(error, sink);
531 } 531 }
532 532
533 void handleDone(EventSink<T> sink) { 533 void handleDone(EventSink<T> sink) {
534 _handleDone(sink); 534 _handleDone(sink);
535 } 535 }
536 } 536 }
537 537
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698