OLD | NEW |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 /** Utility function to create an [AsyncError] if [error] isn't one already. */ | 7 /** |
8 AsyncError _asyncError(Object error, Object stackTrace, [AsyncError cause]) { | 8 * Utility function to attach a stack trace to an [error] if it doesn't have |
9 if (error is AsyncError) return error; | 9 * one already. |
10 if (cause == null) return new AsyncError(error, stackTrace); | 10 */ |
11 return new AsyncError.withCause(error, stackTrace, cause); | 11 _asyncError(Object error, Object stackTrace) { |
| 12 if (stackTrace == null) return error; |
| 13 if (getAttachedStackTrace(error) != null) return error; |
| 14 _attachStackTrace(error, stackTrace); |
| 15 return error; |
12 } | 16 } |
13 | 17 |
14 /** Runs user code and takes actions depending on success or failure. */ | 18 /** Runs user code and takes actions depending on success or failure. */ |
15 _runUserCode(userCode(), onSuccess(value), onError(AsyncError error), | 19 _runUserCode(userCode(), onSuccess(value), onError(error)) { |
16 { AsyncError cause }) { | |
17 var result; | |
18 try { | 20 try { |
19 result = userCode(); | 21 onSuccess(userCode()); |
20 } on AsyncError catch (e) { | |
21 return onError(e); | |
22 } catch (e, s) { | 22 } catch (e, s) { |
23 if (cause == null) { | 23 onError(_asyncError(e, s)); |
24 onError(new AsyncError(e, s)); | |
25 } else { | |
26 onError(new AsyncError.withCause(e, s, cause)); | |
27 } | |
28 // onError is allowed to return. Don't execute the onSuccess below. | |
29 return; | |
30 } | 24 } |
31 onSuccess(result); | |
32 } | 25 } |
33 | 26 |
34 /** Helper function to make an onError argument to [_runUserCode]. */ | 27 /** Helper function to make an onError argument to [_runUserCode]. */ |
35 _cancelAndError(StreamSubscription subscription, _FutureImpl future) => | 28 _cancelAndError(StreamSubscription subscription, _FutureImpl future) => |
36 (AsyncError error) { | 29 (error) { |
37 subscription.cancel(); | 30 subscription.cancel(); |
38 future._setError(error); | 31 future._setError(error); |
39 }; | 32 }; |
40 | 33 |
41 | 34 |
42 /** | 35 /** |
43 * A [Stream] that forwards subscriptions to another stream. | 36 * A [Stream] that forwards subscriptions to another stream. |
44 * | 37 * |
45 * This stream implements [Stream], but forwards all subscriptions | 38 * This stream implements [Stream], but forwards all subscriptions |
46 * to an underlying stream, and wraps the returned subscription to | 39 * to an underlying stream, and wraps the returned subscription to |
47 * modify the events on the way. | 40 * modify the events on the way. |
48 * | 41 * |
49 * This class is intended for internal use only. | 42 * This class is intended for internal use only. |
50 */ | 43 */ |
51 abstract class _ForwardingStream<S, T> extends Stream<T> { | 44 abstract class _ForwardingStream<S, T> extends Stream<T> { |
52 final Stream<S> _source; | 45 final Stream<S> _source; |
53 | 46 |
54 _ForwardingStream(this._source); | 47 _ForwardingStream(this._source); |
55 | 48 |
56 bool get isBroadcast => _source.isBroadcast; | 49 bool get isBroadcast => _source.isBroadcast; |
57 | 50 |
58 StreamSubscription<T> listen(void onData(T value), | 51 StreamSubscription<T> listen(void onData(T value), |
59 { void onError(AsyncError error), | 52 { void onError(error), |
60 void onDone(), | 53 void onDone(), |
61 bool cancelOnError }) { | 54 bool cancelOnError }) { |
62 if (onData == null) onData = _nullDataHandler; | 55 if (onData == null) onData = _nullDataHandler; |
63 if (onError == null) onError = _nullErrorHandler; | 56 if (onError == null) onError = _nullErrorHandler; |
64 if (onDone == null) onDone = _nullDoneHandler; | 57 if (onDone == null) onDone = _nullDoneHandler; |
65 cancelOnError = identical(true, cancelOnError); | 58 cancelOnError = identical(true, cancelOnError); |
66 return _createSubscription(onData, onError, onDone, cancelOnError); | 59 return _createSubscription(onData, onError, onDone, cancelOnError); |
67 } | 60 } |
68 | 61 |
69 StreamSubscription<T> _createSubscription(void onData(T value), | 62 StreamSubscription<T> _createSubscription(void onData(T value), |
70 void onError(AsyncError error), | 63 void onError(error), |
71 void onDone(), | 64 void onDone(), |
72 bool cancelOnError) { | 65 bool cancelOnError) { |
73 return new _ForwardingStreamSubscription<S, T>( | 66 return new _ForwardingStreamSubscription<S, T>( |
74 this, onData, onError, onDone, cancelOnError); | 67 this, onData, onError, onDone, cancelOnError); |
75 } | 68 } |
76 | 69 |
77 // Override the following methods in subclasses to change the behavior. | 70 // Override the following methods in subclasses to change the behavior. |
78 | 71 |
79 void _handleData(S data, _EventOutputSink<T> sink) { | 72 void _handleData(S data, _EventOutputSink<T> sink) { |
80 var outputData = data; | 73 var outputData = data; |
81 sink._sendData(outputData); | 74 sink._sendData(outputData); |
82 } | 75 } |
83 | 76 |
84 void _handleError(AsyncError error, _EventOutputSink<T> sink) { | 77 void _handleError(error, _EventOutputSink<T> sink) { |
85 sink._sendError(error); | 78 sink._sendError(error); |
86 } | 79 } |
87 | 80 |
88 void _handleDone(_EventOutputSink<T> sink) { | 81 void _handleDone(_EventOutputSink<T> sink) { |
89 sink._sendDone(); | 82 sink._sendDone(); |
90 } | 83 } |
91 } | 84 } |
92 | 85 |
93 /** | 86 /** |
94 * Common behavior of [StreamSubscription] classes. | 87 * Common behavior of [StreamSubscription] classes. |
(...skipping 14 matching lines...) Expand all Loading... |
109 if (_onError == null) _onError = _nullErrorHandler; | 102 if (_onError == null) _onError = _nullErrorHandler; |
110 if (_onDone == null) _onDone = _nullDoneHandler; | 103 if (_onDone == null) _onDone = _nullDoneHandler; |
111 } | 104 } |
112 | 105 |
113 // StreamSubscription interface. | 106 // StreamSubscription interface. |
114 void onData(void handleData(T event)) { | 107 void onData(void handleData(T event)) { |
115 if (handleData == null) handleData = _nullDataHandler; | 108 if (handleData == null) handleData = _nullDataHandler; |
116 _onData = handleData; | 109 _onData = handleData; |
117 } | 110 } |
118 | 111 |
119 void onError(void handleError(AsyncError error)) { | 112 void onError(void handleError(error)) { |
120 if (handleError == null) handleError = _nullErrorHandler; | 113 if (handleError == null) handleError = _nullErrorHandler; |
121 _onError = handleError; | 114 _onError = handleError; |
122 } | 115 } |
123 | 116 |
124 void onDone(void handleDone()) { | 117 void onDone(void handleDone()) { |
125 if (handleDone == null) handleDone = _nullDoneHandler; | 118 if (handleDone == null) handleDone = _nullDoneHandler; |
126 _onDone = handleDone; | 119 _onDone = handleDone; |
127 } | 120 } |
128 | 121 |
129 void pause([Future resumeSignal]); | 122 void pause([Future resumeSignal]); |
130 | 123 |
131 void resume(); | 124 void resume(); |
132 | 125 |
133 void cancel(); | 126 void cancel(); |
134 | 127 |
135 Future asFuture([var futureValue]) { | 128 Future asFuture([var futureValue]) { |
136 _FutureImpl<T> result = new _FutureImpl<T>(); | 129 _FutureImpl<T> result = new _FutureImpl<T>(); |
137 | 130 |
138 // Overwrite the onDone and onError handlers. | 131 // Overwrite the onDone and onError handlers. |
139 onDone(() { result._setValue(futureValue); }); | 132 onDone(() { result._setValue(futureValue); }); |
140 onError((AsyncError error) { | 133 onError((error) { |
141 cancel(); | 134 cancel(); |
142 result._setError(error); | 135 result._setError(error); |
143 }); | 136 }); |
144 | 137 |
145 return result; | 138 return result; |
146 } | 139 } |
147 } | 140 } |
148 | 141 |
149 | 142 |
150 /** | 143 /** |
151 * Abstract superclass for subscriptions that forward to other subscriptions. | 144 * Abstract superclass for subscriptions that forward to other subscriptions. |
152 */ | 145 */ |
153 class _ForwardingStreamSubscription<S, T> | 146 class _ForwardingStreamSubscription<S, T> |
154 extends _BaseStreamSubscription<T> implements _EventOutputSink<T> { | 147 extends _BaseStreamSubscription<T> implements _EventOutputSink<T> { |
155 final _ForwardingStream<S, T> _stream; | 148 final _ForwardingStream<S, T> _stream; |
156 final bool _cancelOnError; | 149 final bool _cancelOnError; |
157 | 150 |
158 StreamSubscription<S> _subscription; | 151 StreamSubscription<S> _subscription; |
159 | 152 |
160 _ForwardingStreamSubscription(this._stream, | 153 _ForwardingStreamSubscription(this._stream, |
161 void onData(T data), | 154 void onData(T data), |
162 void onError(AsyncError error), | 155 void onError(error), |
163 void onDone(), | 156 void onDone(), |
164 this._cancelOnError) | 157 this._cancelOnError) |
165 : super(onData, onError, onDone) { | 158 : super(onData, onError, onDone) { |
166 // Don't unsubscribe on incoming error, only if we send an error forwards. | 159 // Don't unsubscribe on incoming error, only if we send an error forwards. |
167 _subscription = | 160 _subscription = |
168 _stream._source.listen(_handleData, | 161 _stream._source.listen(_handleData, |
169 onError: _handleError, | 162 onError: _handleError, |
170 onDone: _handleDone); | 163 onDone: _handleDone); |
171 } | 164 } |
172 | 165 |
(...skipping 15 matching lines...) Expand all Loading... |
188 _subscription = null; | 181 _subscription = null; |
189 } | 182 } |
190 } | 183 } |
191 | 184 |
192 // _EventOutputSink interface. Sends data to this subscription. | 185 // _EventOutputSink interface. Sends data to this subscription. |
193 | 186 |
194 void _sendData(T data) { | 187 void _sendData(T data) { |
195 _onData(data); | 188 _onData(data); |
196 } | 189 } |
197 | 190 |
198 void _sendError(AsyncError error) { | 191 void _sendError(error) { |
199 _onError(error); | 192 _onError(error); |
200 if (_cancelOnError) { | 193 if (_cancelOnError) { |
201 _subscription.cancel(); | 194 _subscription.cancel(); |
202 _subscription = null; | 195 _subscription = null; |
203 } | 196 } |
204 } | 197 } |
205 | 198 |
206 void _sendDone() { | 199 void _sendDone() { |
207 // If the transformation sends a done signal, we stop the subscription. | 200 // If the transformation sends a done signal, we stop the subscription. |
208 if (_subscription != null) { | 201 if (_subscription != null) { |
209 _subscription.cancel(); | 202 _subscription.cancel(); |
210 _subscription = null; | 203 _subscription = null; |
211 } | 204 } |
212 _onDone(); | 205 _onDone(); |
213 } | 206 } |
214 | 207 |
215 // Methods used as listener on source subscription. | 208 // Methods used as listener on source subscription. |
216 | 209 |
217 // TODO(ahe): Restore type when feature is implemented in dart2js | 210 // TODO(ahe): Restore type when feature is implemented in dart2js |
218 // checked mode. http://dartbug.com/7733 | 211 // checked mode. http://dartbug.com/7733 |
219 void _handleData(/*S*/ data) { | 212 void _handleData(/*S*/ data) { |
220 _stream._handleData(data, this); | 213 _stream._handleData(data, this); |
221 } | 214 } |
222 | 215 |
223 void _handleError(AsyncError error) { | 216 void _handleError(error) { |
224 _stream._handleError(error, this); | 217 _stream._handleError(error, this); |
225 } | 218 } |
226 | 219 |
227 void _handleDone() { | 220 void _handleDone() { |
228 // On a done-event, we have already been unsubscribed. | 221 // On a done-event, we have already been unsubscribed. |
229 _subscription = null; | 222 _subscription = null; |
230 _stream._handleDone(this); | 223 _stream._handleDone(this); |
231 } | 224 } |
232 } | 225 } |
233 | 226 |
(...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
297 } | 290 } |
298 } catch (e, s) { | 291 } catch (e, s) { |
299 // If either _expand or iterating the generated iterator throws, | 292 // If either _expand or iterating the generated iterator throws, |
300 // we abort the iteration. | 293 // we abort the iteration. |
301 sink._sendError(_asyncError(e, s)); | 294 sink._sendError(_asyncError(e, s)); |
302 } | 295 } |
303 } | 296 } |
304 } | 297 } |
305 | 298 |
306 | 299 |
307 typedef void _ErrorTransformation(AsyncError error); | 300 typedef void _ErrorTransformation(error); |
308 typedef bool _ErrorTest(error); | 301 typedef bool _ErrorTest(error); |
309 | 302 |
310 /** | 303 /** |
311 * A stream pipe that converts or disposes error events | 304 * A stream pipe that converts or disposes error events |
312 * before passing them on. | 305 * before passing them on. |
313 */ | 306 */ |
314 class _HandleErrorStream<T> extends _ForwardingStream<T, T> { | 307 class _HandleErrorStream<T> extends _ForwardingStream<T, T> { |
315 final _ErrorTransformation _transform; | 308 final _ErrorTransformation _transform; |
316 final _ErrorTest _test; | 309 final _ErrorTest _test; |
317 | 310 |
318 _HandleErrorStream(Stream<T> source, | 311 _HandleErrorStream(Stream<T> source, |
319 void transform(AsyncError event), | 312 void transform(event), |
320 bool test(error)) | 313 bool test(error)) |
321 : this._transform = transform, this._test = test, super(source); | 314 : this._transform = transform, this._test = test, super(source); |
322 | 315 |
323 void _handleError(AsyncError error, _EventOutputSink<T> sink) { | 316 void _handleError(Object error, _EventOutputSink<T> sink) { |
324 bool matches = true; | 317 bool matches = true; |
325 if (_test != null) { | 318 if (_test != null) { |
326 try { | 319 try { |
327 matches = _test(error.error); | 320 matches = _test(error); |
328 } catch (e, s) { | 321 } catch (e, s) { |
329 sink._sendError(_asyncError(e, s, error)); | 322 sink._sendError(_asyncError(e, s)); |
330 return; | 323 return; |
331 } | 324 } |
332 } | 325 } |
333 if (matches) { | 326 if (matches) { |
334 try { | 327 try { |
335 _transform(error); | 328 _transform(error); |
336 } catch (e, s) { | 329 } catch (e, s) { |
337 sink._sendError(_asyncError(e, s, error)); | 330 sink._sendError(_asyncError(e, s)); |
338 return; | 331 return; |
339 } | 332 } |
340 } else { | 333 } else { |
341 sink._sendError(error); | 334 sink._sendError(error); |
342 } | 335 } |
343 } | 336 } |
344 } | 337 } |
345 | 338 |
346 | 339 |
347 class _TakeStream<T> extends _ForwardingStream<T, T> { | 340 class _TakeStream<T> extends _ForwardingStream<T, T> { |
(...skipping 121 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
469 sink._sendData(inputEvent); | 462 sink._sendData(inputEvent); |
470 _previous = inputEvent; | 463 _previous = inputEvent; |
471 } | 464 } |
472 } | 465 } |
473 } | 466 } |
474 } | 467 } |
475 | 468 |
476 // Stream transformations and event transformations. | 469 // Stream transformations and event transformations. |
477 | 470 |
478 typedef void _TransformDataHandler<S, T>(S data, EventSink<T> sink); | 471 typedef void _TransformDataHandler<S, T>(S data, EventSink<T> sink); |
479 typedef void _TransformErrorHandler<T>(AsyncError data, EventSink<T> sink); | 472 typedef void _TransformErrorHandler<T>(data, EventSink<T> sink); |
480 typedef void _TransformDoneHandler<T>(EventSink<T> sink); | 473 typedef void _TransformDoneHandler<T>(EventSink<T> sink); |
481 | 474 |
482 /** Default data handler forwards all data. */ | 475 /** Default data handler forwards all data. */ |
483 void _defaultHandleData(var data, EventSink sink) { | 476 void _defaultHandleData(var data, EventSink sink) { |
484 sink.add(data); | 477 sink.add(data); |
485 } | 478 } |
486 | 479 |
487 /** Default error handler forwards all errors. */ | 480 /** Default error handler forwards all errors. */ |
488 void _defaultHandleError(AsyncError error, EventSink sink) { | 481 void _defaultHandleError(error, EventSink sink) { |
489 sink.addError(error); | 482 sink.addError(error); |
490 } | 483 } |
491 | 484 |
492 /** Default done handler forwards done. */ | 485 /** Default done handler forwards done. */ |
493 void _defaultHandleDone(EventSink sink) { | 486 void _defaultHandleDone(EventSink sink) { |
494 sink.close(); | 487 sink.close(); |
495 } | 488 } |
496 | 489 |
497 | 490 |
498 /** | 491 /** |
499 * A [StreamTransformer] that modifies stream events. | 492 * A [StreamTransformer] that modifies stream events. |
500 * | 493 * |
501 * This class is used by [StreamTransformer]'s factory constructor. | 494 * This class is used by [StreamTransformer]'s factory constructor. |
502 * It is actually an [StreamEventTransformer] where the functions used to | 495 * It is actually an [StreamEventTransformer] where the functions used to |
503 * modify the events are passed as constructor arguments. | 496 * modify the events are passed as constructor arguments. |
504 * | 497 * |
505 * If an argument is omitted, it acts as the default method from | 498 * If an argument is omitted, it acts as the default method from |
506 * [StreamEventTransformer]. | 499 * [StreamEventTransformer]. |
507 */ | 500 */ |
508 class _StreamTransformerImpl<S, T> extends StreamEventTransformer<S, T> { | 501 class _StreamTransformerImpl<S, T> extends StreamEventTransformer<S, T> { |
509 // TODO(ahe): Restore type when feature is implemented in dart2js | 502 // TODO(ahe): Restore type when feature is implemented in dart2js |
510 // checked mode. http://dartbug.com/7733 | 503 // checked mode. http://dartbug.com/7733 |
511 final Function /*_TransformDataHandler<S, T>*/ _handleData; | 504 final Function /*_TransformDataHandler<S, T>*/ _handleData; |
512 final _TransformErrorHandler<T> _handleError; | 505 final _TransformErrorHandler<T> _handleError; |
513 final _TransformDoneHandler<T> _handleDone; | 506 final _TransformDoneHandler<T> _handleDone; |
514 | 507 |
515 _StreamTransformerImpl(void handleData(S data, EventSink<T> sink), | 508 _StreamTransformerImpl(void handleData(S data, EventSink<T> sink), |
516 void handleError(AsyncError data, EventSink<T> sink), | 509 void handleError(data, EventSink<T> sink), |
517 void handleDone(EventSink<T> sink)) | 510 void handleDone(EventSink<T> sink)) |
518 : this._handleData = (handleData == null ? _defaultHandleData | 511 : this._handleData = (handleData == null ? _defaultHandleData |
519 : handleData), | 512 : handleData), |
520 this._handleError = (handleError == null ? _defaultHandleError | 513 this._handleError = (handleError == null ? _defaultHandleError |
521 : handleError), | 514 : handleError), |
522 this._handleDone = (handleDone == null ? _defaultHandleDone | 515 this._handleDone = (handleDone == null ? _defaultHandleDone |
523 : handleDone); | 516 : handleDone); |
524 | 517 |
525 void handleData(S data, EventSink<T> sink) { | 518 void handleData(S data, EventSink<T> sink) { |
526 _handleData(data, sink); | 519 _handleData(data, sink); |
527 } | 520 } |
528 | 521 |
529 void handleError(AsyncError error, EventSink<T> sink) { | 522 void handleError(error, EventSink<T> sink) { |
530 _handleError(error, sink); | 523 _handleError(error, sink); |
531 } | 524 } |
532 | 525 |
533 void handleDone(EventSink<T> sink) { | 526 void handleDone(EventSink<T> sink) { |
534 _handleDone(sink); | 527 _handleDone(sink); |
535 } | 528 } |
536 } | 529 } |
537 | 530 |
OLD | NEW |