Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(85)

Side by Side Diff: sdk/lib/async/stream_pipe.dart

Issue 14251006: Remove AsyncError with Expando. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Address comments. Created 7 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « sdk/lib/async/stream_impl.dart ('k') | sdk/lib/html/dart2js/html_dart2js.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 /** Utility function to create an [AsyncError] if [error] isn't one already. */ 7 /**
8 AsyncError _asyncError(Object error, Object stackTrace, [AsyncError cause]) { 8 * Utility function to attach a stack trace to an [error] if it doesn't have
9 if (error is AsyncError) return error; 9 * one already.
10 if (cause == null) return new AsyncError(error, stackTrace); 10 */
11 return new AsyncError.withCause(error, stackTrace, cause); 11 _asyncError(Object error, Object stackTrace) {
12 if (stackTrace == null) return error;
13 if (getAttachedStackTrace(error) != null) return error;
14 _attachStackTrace(error, stackTrace);
15 return error;
12 } 16 }
13 17
14 /** Runs user code and takes actions depending on success or failure. */ 18 /** Runs user code and takes actions depending on success or failure. */
15 _runUserCode(userCode(), onSuccess(value), onError(AsyncError error), 19 _runUserCode(userCode(), onSuccess(value), onError(error)) {
16 { AsyncError cause }) {
17 var result;
18 try { 20 try {
19 result = userCode(); 21 onSuccess(userCode());
20 } on AsyncError catch (e) {
21 return onError(e);
22 } catch (e, s) { 22 } catch (e, s) {
23 if (cause == null) { 23 onError(_asyncError(e, s));
24 onError(new AsyncError(e, s));
25 } else {
26 onError(new AsyncError.withCause(e, s, cause));
27 }
28 // onError is allowed to return. Don't execute the onSuccess below.
29 return;
30 } 24 }
31 onSuccess(result);
32 } 25 }
33 26
34 /** Helper function to make an onError argument to [_runUserCode]. */ 27 /** Helper function to make an onError argument to [_runUserCode]. */
35 _cancelAndError(StreamSubscription subscription, _FutureImpl future) => 28 _cancelAndError(StreamSubscription subscription, _FutureImpl future) =>
36 (AsyncError error) { 29 (error) {
37 subscription.cancel(); 30 subscription.cancel();
38 future._setError(error); 31 future._setError(error);
39 }; 32 };
40 33
41 34
42 /** 35 /**
43 * A [Stream] that forwards subscriptions to another stream. 36 * A [Stream] that forwards subscriptions to another stream.
44 * 37 *
45 * This stream implements [Stream], but forwards all subscriptions 38 * This stream implements [Stream], but forwards all subscriptions
46 * to an underlying stream, and wraps the returned subscription to 39 * to an underlying stream, and wraps the returned subscription to
47 * modify the events on the way. 40 * modify the events on the way.
48 * 41 *
49 * This class is intended for internal use only. 42 * This class is intended for internal use only.
50 */ 43 */
51 abstract class _ForwardingStream<S, T> extends Stream<T> { 44 abstract class _ForwardingStream<S, T> extends Stream<T> {
52 final Stream<S> _source; 45 final Stream<S> _source;
53 46
54 _ForwardingStream(this._source); 47 _ForwardingStream(this._source);
55 48
56 bool get isBroadcast => _source.isBroadcast; 49 bool get isBroadcast => _source.isBroadcast;
57 50
58 StreamSubscription<T> listen(void onData(T value), 51 StreamSubscription<T> listen(void onData(T value),
59 { void onError(AsyncError error), 52 { void onError(error),
60 void onDone(), 53 void onDone(),
61 bool cancelOnError }) { 54 bool cancelOnError }) {
62 if (onData == null) onData = _nullDataHandler; 55 if (onData == null) onData = _nullDataHandler;
63 if (onError == null) onError = _nullErrorHandler; 56 if (onError == null) onError = _nullErrorHandler;
64 if (onDone == null) onDone = _nullDoneHandler; 57 if (onDone == null) onDone = _nullDoneHandler;
65 cancelOnError = identical(true, cancelOnError); 58 cancelOnError = identical(true, cancelOnError);
66 return _createSubscription(onData, onError, onDone, cancelOnError); 59 return _createSubscription(onData, onError, onDone, cancelOnError);
67 } 60 }
68 61
69 StreamSubscription<T> _createSubscription(void onData(T value), 62 StreamSubscription<T> _createSubscription(void onData(T value),
70 void onError(AsyncError error), 63 void onError(error),
71 void onDone(), 64 void onDone(),
72 bool cancelOnError) { 65 bool cancelOnError) {
73 return new _ForwardingStreamSubscription<S, T>( 66 return new _ForwardingStreamSubscription<S, T>(
74 this, onData, onError, onDone, cancelOnError); 67 this, onData, onError, onDone, cancelOnError);
75 } 68 }
76 69
77 // Override the following methods in subclasses to change the behavior. 70 // Override the following methods in subclasses to change the behavior.
78 71
79 void _handleData(S data, _EventOutputSink<T> sink) { 72 void _handleData(S data, _EventOutputSink<T> sink) {
80 var outputData = data; 73 var outputData = data;
81 sink._sendData(outputData); 74 sink._sendData(outputData);
82 } 75 }
83 76
84 void _handleError(AsyncError error, _EventOutputSink<T> sink) { 77 void _handleError(error, _EventOutputSink<T> sink) {
85 sink._sendError(error); 78 sink._sendError(error);
86 } 79 }
87 80
88 void _handleDone(_EventOutputSink<T> sink) { 81 void _handleDone(_EventOutputSink<T> sink) {
89 sink._sendDone(); 82 sink._sendDone();
90 } 83 }
91 } 84 }
92 85
93 /** 86 /**
94 * Common behavior of [StreamSubscription] classes. 87 * Common behavior of [StreamSubscription] classes.
(...skipping 14 matching lines...) Expand all
109 if (_onError == null) _onError = _nullErrorHandler; 102 if (_onError == null) _onError = _nullErrorHandler;
110 if (_onDone == null) _onDone = _nullDoneHandler; 103 if (_onDone == null) _onDone = _nullDoneHandler;
111 } 104 }
112 105
113 // StreamSubscription interface. 106 // StreamSubscription interface.
114 void onData(void handleData(T event)) { 107 void onData(void handleData(T event)) {
115 if (handleData == null) handleData = _nullDataHandler; 108 if (handleData == null) handleData = _nullDataHandler;
116 _onData = handleData; 109 _onData = handleData;
117 } 110 }
118 111
119 void onError(void handleError(AsyncError error)) { 112 void onError(void handleError(error)) {
120 if (handleError == null) handleError = _nullErrorHandler; 113 if (handleError == null) handleError = _nullErrorHandler;
121 _onError = handleError; 114 _onError = handleError;
122 } 115 }
123 116
124 void onDone(void handleDone()) { 117 void onDone(void handleDone()) {
125 if (handleDone == null) handleDone = _nullDoneHandler; 118 if (handleDone == null) handleDone = _nullDoneHandler;
126 _onDone = handleDone; 119 _onDone = handleDone;
127 } 120 }
128 121
129 void pause([Future resumeSignal]); 122 void pause([Future resumeSignal]);
130 123
131 void resume(); 124 void resume();
132 125
133 void cancel(); 126 void cancel();
134 127
135 Future asFuture([var futureValue]) { 128 Future asFuture([var futureValue]) {
136 _FutureImpl<T> result = new _FutureImpl<T>(); 129 _FutureImpl<T> result = new _FutureImpl<T>();
137 130
138 // Overwrite the onDone and onError handlers. 131 // Overwrite the onDone and onError handlers.
139 onDone(() { result._setValue(futureValue); }); 132 onDone(() { result._setValue(futureValue); });
140 onError((AsyncError error) { 133 onError((error) {
141 cancel(); 134 cancel();
142 result._setError(error); 135 result._setError(error);
143 }); 136 });
144 137
145 return result; 138 return result;
146 } 139 }
147 } 140 }
148 141
149 142
150 /** 143 /**
151 * Abstract superclass for subscriptions that forward to other subscriptions. 144 * Abstract superclass for subscriptions that forward to other subscriptions.
152 */ 145 */
153 class _ForwardingStreamSubscription<S, T> 146 class _ForwardingStreamSubscription<S, T>
154 extends _BaseStreamSubscription<T> implements _EventOutputSink<T> { 147 extends _BaseStreamSubscription<T> implements _EventOutputSink<T> {
155 final _ForwardingStream<S, T> _stream; 148 final _ForwardingStream<S, T> _stream;
156 final bool _cancelOnError; 149 final bool _cancelOnError;
157 150
158 StreamSubscription<S> _subscription; 151 StreamSubscription<S> _subscription;
159 152
160 _ForwardingStreamSubscription(this._stream, 153 _ForwardingStreamSubscription(this._stream,
161 void onData(T data), 154 void onData(T data),
162 void onError(AsyncError error), 155 void onError(error),
163 void onDone(), 156 void onDone(),
164 this._cancelOnError) 157 this._cancelOnError)
165 : super(onData, onError, onDone) { 158 : super(onData, onError, onDone) {
166 // Don't unsubscribe on incoming error, only if we send an error forwards. 159 // Don't unsubscribe on incoming error, only if we send an error forwards.
167 _subscription = 160 _subscription =
168 _stream._source.listen(_handleData, 161 _stream._source.listen(_handleData,
169 onError: _handleError, 162 onError: _handleError,
170 onDone: _handleDone); 163 onDone: _handleDone);
171 } 164 }
172 165
(...skipping 15 matching lines...) Expand all
188 _subscription = null; 181 _subscription = null;
189 } 182 }
190 } 183 }
191 184
192 // _EventOutputSink interface. Sends data to this subscription. 185 // _EventOutputSink interface. Sends data to this subscription.
193 186
194 void _sendData(T data) { 187 void _sendData(T data) {
195 _onData(data); 188 _onData(data);
196 } 189 }
197 190
198 void _sendError(AsyncError error) { 191 void _sendError(error) {
199 _onError(error); 192 _onError(error);
200 if (_cancelOnError) { 193 if (_cancelOnError) {
201 _subscription.cancel(); 194 _subscription.cancel();
202 _subscription = null; 195 _subscription = null;
203 } 196 }
204 } 197 }
205 198
206 void _sendDone() { 199 void _sendDone() {
207 // If the transformation sends a done signal, we stop the subscription. 200 // If the transformation sends a done signal, we stop the subscription.
208 if (_subscription != null) { 201 if (_subscription != null) {
209 _subscription.cancel(); 202 _subscription.cancel();
210 _subscription = null; 203 _subscription = null;
211 } 204 }
212 _onDone(); 205 _onDone();
213 } 206 }
214 207
215 // Methods used as listener on source subscription. 208 // Methods used as listener on source subscription.
216 209
217 // TODO(ahe): Restore type when feature is implemented in dart2js 210 // TODO(ahe): Restore type when feature is implemented in dart2js
218 // checked mode. http://dartbug.com/7733 211 // checked mode. http://dartbug.com/7733
219 void _handleData(/*S*/ data) { 212 void _handleData(/*S*/ data) {
220 _stream._handleData(data, this); 213 _stream._handleData(data, this);
221 } 214 }
222 215
223 void _handleError(AsyncError error) { 216 void _handleError(error) {
224 _stream._handleError(error, this); 217 _stream._handleError(error, this);
225 } 218 }
226 219
227 void _handleDone() { 220 void _handleDone() {
228 // On a done-event, we have already been unsubscribed. 221 // On a done-event, we have already been unsubscribed.
229 _subscription = null; 222 _subscription = null;
230 _stream._handleDone(this); 223 _stream._handleDone(this);
231 } 224 }
232 } 225 }
233 226
(...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after
297 } 290 }
298 } catch (e, s) { 291 } catch (e, s) {
299 // If either _expand or iterating the generated iterator throws, 292 // If either _expand or iterating the generated iterator throws,
300 // we abort the iteration. 293 // we abort the iteration.
301 sink._sendError(_asyncError(e, s)); 294 sink._sendError(_asyncError(e, s));
302 } 295 }
303 } 296 }
304 } 297 }
305 298
306 299
307 typedef void _ErrorTransformation(AsyncError error); 300 typedef void _ErrorTransformation(error);
308 typedef bool _ErrorTest(error); 301 typedef bool _ErrorTest(error);
309 302
310 /** 303 /**
311 * A stream pipe that converts or disposes error events 304 * A stream pipe that converts or disposes error events
312 * before passing them on. 305 * before passing them on.
313 */ 306 */
314 class _HandleErrorStream<T> extends _ForwardingStream<T, T> { 307 class _HandleErrorStream<T> extends _ForwardingStream<T, T> {
315 final _ErrorTransformation _transform; 308 final _ErrorTransformation _transform;
316 final _ErrorTest _test; 309 final _ErrorTest _test;
317 310
318 _HandleErrorStream(Stream<T> source, 311 _HandleErrorStream(Stream<T> source,
319 void transform(AsyncError event), 312 void transform(event),
320 bool test(error)) 313 bool test(error))
321 : this._transform = transform, this._test = test, super(source); 314 : this._transform = transform, this._test = test, super(source);
322 315
323 void _handleError(AsyncError error, _EventOutputSink<T> sink) { 316 void _handleError(Object error, _EventOutputSink<T> sink) {
324 bool matches = true; 317 bool matches = true;
325 if (_test != null) { 318 if (_test != null) {
326 try { 319 try {
327 matches = _test(error.error); 320 matches = _test(error);
328 } catch (e, s) { 321 } catch (e, s) {
329 sink._sendError(_asyncError(e, s, error)); 322 sink._sendError(_asyncError(e, s));
330 return; 323 return;
331 } 324 }
332 } 325 }
333 if (matches) { 326 if (matches) {
334 try { 327 try {
335 _transform(error); 328 _transform(error);
336 } catch (e, s) { 329 } catch (e, s) {
337 sink._sendError(_asyncError(e, s, error)); 330 sink._sendError(_asyncError(e, s));
338 return; 331 return;
339 } 332 }
340 } else { 333 } else {
341 sink._sendError(error); 334 sink._sendError(error);
342 } 335 }
343 } 336 }
344 } 337 }
345 338
346 339
347 class _TakeStream<T> extends _ForwardingStream<T, T> { 340 class _TakeStream<T> extends _ForwardingStream<T, T> {
(...skipping 121 matching lines...) Expand 10 before | Expand all | Expand 10 after
469 sink._sendData(inputEvent); 462 sink._sendData(inputEvent);
470 _previous = inputEvent; 463 _previous = inputEvent;
471 } 464 }
472 } 465 }
473 } 466 }
474 } 467 }
475 468
476 // Stream transformations and event transformations. 469 // Stream transformations and event transformations.
477 470
478 typedef void _TransformDataHandler<S, T>(S data, EventSink<T> sink); 471 typedef void _TransformDataHandler<S, T>(S data, EventSink<T> sink);
479 typedef void _TransformErrorHandler<T>(AsyncError data, EventSink<T> sink); 472 typedef void _TransformErrorHandler<T>(data, EventSink<T> sink);
480 typedef void _TransformDoneHandler<T>(EventSink<T> sink); 473 typedef void _TransformDoneHandler<T>(EventSink<T> sink);
481 474
482 /** Default data handler forwards all data. */ 475 /** Default data handler forwards all data. */
483 void _defaultHandleData(var data, EventSink sink) { 476 void _defaultHandleData(var data, EventSink sink) {
484 sink.add(data); 477 sink.add(data);
485 } 478 }
486 479
487 /** Default error handler forwards all errors. */ 480 /** Default error handler forwards all errors. */
488 void _defaultHandleError(AsyncError error, EventSink sink) { 481 void _defaultHandleError(error, EventSink sink) {
489 sink.addError(error); 482 sink.addError(error);
490 } 483 }
491 484
492 /** Default done handler forwards done. */ 485 /** Default done handler forwards done. */
493 void _defaultHandleDone(EventSink sink) { 486 void _defaultHandleDone(EventSink sink) {
494 sink.close(); 487 sink.close();
495 } 488 }
496 489
497 490
498 /** 491 /**
499 * A [StreamTransformer] that modifies stream events. 492 * A [StreamTransformer] that modifies stream events.
500 * 493 *
501 * This class is used by [StreamTransformer]'s factory constructor. 494 * This class is used by [StreamTransformer]'s factory constructor.
502 * It is actually an [StreamEventTransformer] where the functions used to 495 * It is actually an [StreamEventTransformer] where the functions used to
503 * modify the events are passed as constructor arguments. 496 * modify the events are passed as constructor arguments.
504 * 497 *
505 * If an argument is omitted, it acts as the default method from 498 * If an argument is omitted, it acts as the default method from
506 * [StreamEventTransformer]. 499 * [StreamEventTransformer].
507 */ 500 */
508 class _StreamTransformerImpl<S, T> extends StreamEventTransformer<S, T> { 501 class _StreamTransformerImpl<S, T> extends StreamEventTransformer<S, T> {
509 // TODO(ahe): Restore type when feature is implemented in dart2js 502 // TODO(ahe): Restore type when feature is implemented in dart2js
510 // checked mode. http://dartbug.com/7733 503 // checked mode. http://dartbug.com/7733
511 final Function /*_TransformDataHandler<S, T>*/ _handleData; 504 final Function /*_TransformDataHandler<S, T>*/ _handleData;
512 final _TransformErrorHandler<T> _handleError; 505 final _TransformErrorHandler<T> _handleError;
513 final _TransformDoneHandler<T> _handleDone; 506 final _TransformDoneHandler<T> _handleDone;
514 507
515 _StreamTransformerImpl(void handleData(S data, EventSink<T> sink), 508 _StreamTransformerImpl(void handleData(S data, EventSink<T> sink),
516 void handleError(AsyncError data, EventSink<T> sink), 509 void handleError(data, EventSink<T> sink),
517 void handleDone(EventSink<T> sink)) 510 void handleDone(EventSink<T> sink))
518 : this._handleData = (handleData == null ? _defaultHandleData 511 : this._handleData = (handleData == null ? _defaultHandleData
519 : handleData), 512 : handleData),
520 this._handleError = (handleError == null ? _defaultHandleError 513 this._handleError = (handleError == null ? _defaultHandleError
521 : handleError), 514 : handleError),
522 this._handleDone = (handleDone == null ? _defaultHandleDone 515 this._handleDone = (handleDone == null ? _defaultHandleDone
523 : handleDone); 516 : handleDone);
524 517
525 void handleData(S data, EventSink<T> sink) { 518 void handleData(S data, EventSink<T> sink) {
526 _handleData(data, sink); 519 _handleData(data, sink);
527 } 520 }
528 521
529 void handleError(AsyncError error, EventSink<T> sink) { 522 void handleError(error, EventSink<T> sink) {
530 _handleError(error, sink); 523 _handleError(error, sink);
531 } 524 }
532 525
533 void handleDone(EventSink<T> sink) { 526 void handleDone(EventSink<T> sink) {
534 _handleDone(sink); 527 _handleDone(sink);
535 } 528 }
536 } 529 }
537 530
OLDNEW
« no previous file with comments | « sdk/lib/async/stream_impl.dart ('k') | sdk/lib/html/dart2js/html_dart2js.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698