OLD | NEW |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 /** | 7 /** |
8 * Utility function to attach a stack trace to an [error] if it doesn't have | 8 * Utility function to attach a stack trace to an [error] if it doesn't have |
9 * one already. | 9 * one already. |
10 */ | 10 */ |
11 _asyncError(Object error, Object stackTrace) { | 11 _asyncError(Object error, Object stackTrace) { |
12 if (stackTrace == null) return error; | 12 if (stackTrace == null) return error; |
13 if (getAttachedStackTrace(error) != null) return error; | 13 if (getAttachedStackTrace(error) != null) return error; |
14 _attachStackTrace(error, stackTrace); | 14 _attachStackTrace(error, stackTrace); |
15 return error; | 15 return error; |
16 } | 16 } |
17 | 17 |
18 /** Runs user code and takes actions depending on success or failure. */ | 18 /** Runs user code and takes actions depending on success or failure. */ |
19 _runUserCode(userCode(), onSuccess(value), onError(error)) { | 19 _runUserCode(userCode(), |
| 20 onSuccess(value), |
| 21 onError(error, StackTrace stackTrace)) { |
20 try { | 22 try { |
21 onSuccess(userCode()); | 23 onSuccess(userCode()); |
22 } catch (e, s) { | 24 } catch (e, s) { |
23 onError(_asyncError(e, s)); | 25 onError(_asyncError(e, s), s); |
24 } | 26 } |
25 } | 27 } |
26 | 28 |
27 /** Helper function to make an onError argument to [_runUserCode]. */ | 29 /** Helper function to make an onError argument to [_runUserCode]. */ |
28 _cancelAndError(StreamSubscription subscription, _Future future) => | 30 _cancelAndError(StreamSubscription subscription, _Future future) => |
29 (error) { | 31 (error, StackTrace stackTrace) { |
30 subscription.cancel(); | 32 subscription.cancel(); |
31 future._completeError(error); | 33 future._completeError(error, stackTrace); |
32 }; | 34 }; |
33 | 35 |
34 | 36 |
35 /** | 37 /** |
36 * A [Stream] that forwards subscriptions to another stream. | 38 * A [Stream] that forwards subscriptions to another stream. |
37 * | 39 * |
38 * This stream implements [Stream], but forwards all subscriptions | 40 * This stream implements [Stream], but forwards all subscriptions |
39 * to an underlying stream, and wraps the returned subscription to | 41 * to an underlying stream, and wraps the returned subscription to |
40 * modify the events on the way. | 42 * modify the events on the way. |
41 * | 43 * |
42 * This class is intended for internal use only. | 44 * This class is intended for internal use only. |
43 */ | 45 */ |
44 abstract class _ForwardingStream<S, T> extends Stream<T> { | 46 abstract class _ForwardingStream<S, T> extends Stream<T> { |
45 final Stream<S> _source; | 47 final Stream<S> _source; |
46 | 48 |
47 _ForwardingStream(this._source); | 49 _ForwardingStream(this._source); |
48 | 50 |
49 bool get isBroadcast => _source.isBroadcast; | 51 bool get isBroadcast => _source.isBroadcast; |
50 | 52 |
51 StreamSubscription<T> listen(void onData(T value), | 53 StreamSubscription<T> listen(void onData(T value), |
52 { void onError(error), | 54 { Function onError, |
53 void onDone(), | 55 void onDone(), |
54 bool cancelOnError }) { | 56 bool cancelOnError }) { |
55 if (onData == null) onData = _nullDataHandler; | 57 if (onData == null) onData = _nullDataHandler; |
56 if (onError == null) onError = _nullErrorHandler; | 58 if (onError == null) onError = _nullErrorHandler; |
57 if (onDone == null) onDone = _nullDoneHandler; | 59 if (onDone == null) onDone = _nullDoneHandler; |
58 cancelOnError = identical(true, cancelOnError); | 60 cancelOnError = identical(true, cancelOnError); |
59 return _createSubscription(onData, onError, onDone, cancelOnError); | 61 return _createSubscription(onData, onError, onDone, cancelOnError); |
60 } | 62 } |
61 | 63 |
62 StreamSubscription<T> _createSubscription(void onData(T value), | 64 StreamSubscription<T> _createSubscription(void onData(T value), |
63 void onError(error), | 65 Function onError, |
64 void onDone(), | 66 void onDone(), |
65 bool cancelOnError) { | 67 bool cancelOnError) { |
66 return new _ForwardingStreamSubscription<S, T>( | 68 return new _ForwardingStreamSubscription<S, T>( |
67 this, onData, onError, onDone, cancelOnError); | 69 this, onData, onError, onDone, cancelOnError); |
68 } | 70 } |
69 | 71 |
70 // Override the following methods in subclasses to change the behavior. | 72 // Override the following methods in subclasses to change the behavior. |
71 | 73 |
72 void _handleData(S data, _EventSink<T> sink) { | 74 void _handleData(S data, _EventSink<T> sink) { |
73 var outputData = data; | 75 var outputData = data; |
74 sink._add(outputData); | 76 sink._add(outputData); |
75 } | 77 } |
76 | 78 |
77 void _handleError(error, _EventSink<T> sink) { | 79 void _handleError(error, StackTrace stackTrace, _EventSink<T> sink) { |
78 sink._addError(error); | 80 sink._addError(error, stackTrace); |
79 } | 81 } |
80 | 82 |
81 void _handleDone(_EventSink<T> sink) { | 83 void _handleDone(_EventSink<T> sink) { |
82 sink._close(); | 84 sink._close(); |
83 } | 85 } |
84 } | 86 } |
85 | 87 |
86 /** | 88 /** |
87 * Abstract superclass for subscriptions that forward to other subscriptions. | 89 * Abstract superclass for subscriptions that forward to other subscriptions. |
88 */ | 90 */ |
89 class _ForwardingStreamSubscription<S, T> | 91 class _ForwardingStreamSubscription<S, T> |
90 extends _BufferingStreamSubscription<T> { | 92 extends _BufferingStreamSubscription<T> { |
91 final _ForwardingStream<S, T> _stream; | 93 final _ForwardingStream<S, T> _stream; |
92 | 94 |
93 StreamSubscription<S> _subscription; | 95 StreamSubscription<S> _subscription; |
94 | 96 |
95 _ForwardingStreamSubscription(this._stream, | 97 _ForwardingStreamSubscription(this._stream, |
96 void onData(T data), | 98 void onData(T data), |
97 void onError(error), | 99 Function onError, |
98 void onDone(), | 100 void onDone(), |
99 bool cancelOnError) | 101 bool cancelOnError) |
100 : super(onData, onError, onDone, cancelOnError) { | 102 : super(onData, onError, onDone, cancelOnError) { |
101 _subscription = | 103 _subscription = |
102 _stream._source.listen(_handleData, | 104 _stream._source.listen(_handleData, |
103 onError: _handleError, | 105 onError: _handleError, |
104 onDone: _handleDone); | 106 onDone: _handleDone); |
105 } | 107 } |
106 | 108 |
107 // _StreamSink interface. | 109 // _StreamSink interface. |
108 // Transformers sending more than one event have no way to know if the stream | 110 // Transformers sending more than one event have no way to know if the stream |
109 // is canceled or closed after the first, so we just ignore remaining events. | 111 // is canceled or closed after the first, so we just ignore remaining events. |
110 | 112 |
111 void _add(T data) { | 113 void _add(T data) { |
112 if (_isClosed) return; | 114 if (_isClosed) return; |
113 super._add(data); | 115 super._add(data); |
114 } | 116 } |
115 | 117 |
116 void _addError(Object error) { | 118 void _addError(Object error, StackTrace stackTrace) { |
117 if (_isClosed) return; | 119 if (_isClosed) return; |
118 super._addError(error); | 120 super._addError(error, stackTrace); |
119 } | 121 } |
120 | 122 |
121 // StreamSubscription callbacks. | 123 // StreamSubscription callbacks. |
122 | 124 |
123 void _onPause() { | 125 void _onPause() { |
124 if (_subscription == null) return; | 126 if (_subscription == null) return; |
125 _subscription.pause(); | 127 _subscription.pause(); |
126 } | 128 } |
127 | 129 |
128 void _onResume() { | 130 void _onResume() { |
129 if (_subscription == null) return; | 131 if (_subscription == null) return; |
130 _subscription.resume(); | 132 _subscription.resume(); |
131 } | 133 } |
132 | 134 |
133 void _onCancel() { | 135 void _onCancel() { |
134 if (_subscription != null) { | 136 if (_subscription != null) { |
135 StreamSubscription subscription = _subscription; | 137 StreamSubscription subscription = _subscription; |
136 _subscription = null; | 138 _subscription = null; |
137 subscription.cancel(); | 139 subscription.cancel(); |
138 } | 140 } |
139 } | 141 } |
140 | 142 |
141 // Methods used as listener on source subscription. | 143 // Methods used as listener on source subscription. |
142 | 144 |
143 void _handleData(S data) { | 145 void _handleData(S data) { |
144 _stream._handleData(data, this); | 146 _stream._handleData(data, this); |
145 } | 147 } |
146 | 148 |
147 void _handleError(error) { | 149 void _handleError(error, StackTrace stackTrace) { |
148 _stream._handleError(error, this); | 150 _stream._handleError(error, stackTrace, this); |
149 } | 151 } |
150 | 152 |
151 void _handleDone() { | 153 void _handleDone() { |
152 _stream._handleDone(this); | 154 _stream._handleDone(this); |
153 } | 155 } |
154 } | 156 } |
155 | 157 |
156 // ------------------------------------------------------------------- | 158 // ------------------------------------------------------------------- |
157 // Stream transformers used by the default Stream implementation. | 159 // Stream transformers used by the default Stream implementation. |
158 // ------------------------------------------------------------------- | 160 // ------------------------------------------------------------------- |
159 | 161 |
160 typedef bool _Predicate<T>(T value); | 162 typedef bool _Predicate<T>(T value); |
161 | 163 |
162 class _WhereStream<T> extends _ForwardingStream<T, T> { | 164 class _WhereStream<T> extends _ForwardingStream<T, T> { |
163 final _Predicate<T> _test; | 165 final _Predicate<T> _test; |
164 | 166 |
165 _WhereStream(Stream<T> source, bool test(T value)) | 167 _WhereStream(Stream<T> source, bool test(T value)) |
166 : _test = test, super(source); | 168 : _test = test, super(source); |
167 | 169 |
168 void _handleData(T inputEvent, _EventSink<T> sink) { | 170 void _handleData(T inputEvent, _EventSink<T> sink) { |
169 bool satisfies; | 171 bool satisfies; |
170 try { | 172 try { |
171 satisfies = _test(inputEvent); | 173 satisfies = _test(inputEvent); |
172 } catch (e, s) { | 174 } catch (e, s) { |
173 sink._addError(_asyncError(e, s)); | 175 sink._addError(_asyncError(e, s), s); |
174 return; | 176 return; |
175 } | 177 } |
176 if (satisfies) { | 178 if (satisfies) { |
177 sink._add(inputEvent); | 179 sink._add(inputEvent); |
178 } | 180 } |
179 } | 181 } |
180 } | 182 } |
181 | 183 |
182 | 184 |
183 typedef T _Transformation<S, T>(S value); | 185 typedef T _Transformation<S, T>(S value); |
184 | 186 |
185 /** | 187 /** |
186 * A stream pipe that converts data events before passing them on. | 188 * A stream pipe that converts data events before passing them on. |
187 */ | 189 */ |
188 class _MapStream<S, T> extends _ForwardingStream<S, T> { | 190 class _MapStream<S, T> extends _ForwardingStream<S, T> { |
189 final _Transformation _transform; | 191 final _Transformation _transform; |
190 | 192 |
191 _MapStream(Stream<S> source, T transform(S event)) | 193 _MapStream(Stream<S> source, T transform(S event)) |
192 : this._transform = transform, super(source); | 194 : this._transform = transform, super(source); |
193 | 195 |
194 void _handleData(S inputEvent, _EventSink<T> sink) { | 196 void _handleData(S inputEvent, _EventSink<T> sink) { |
195 T outputEvent; | 197 T outputEvent; |
196 try { | 198 try { |
197 outputEvent = _transform(inputEvent); | 199 outputEvent = _transform(inputEvent); |
198 } catch (e, s) { | 200 } catch (e, s) { |
199 sink._addError(_asyncError(e, s)); | 201 sink._addError(_asyncError(e, s), s); |
200 return; | 202 return; |
201 } | 203 } |
202 sink._add(outputEvent); | 204 sink._add(outputEvent); |
203 } | 205 } |
204 } | 206 } |
205 | 207 |
206 /** | 208 /** |
207 * A stream pipe that converts data events before passing them on. | 209 * A stream pipe that converts data events before passing them on. |
208 */ | 210 */ |
209 class _ExpandStream<S, T> extends _ForwardingStream<S, T> { | 211 class _ExpandStream<S, T> extends _ForwardingStream<S, T> { |
210 final _Transformation<S, Iterable<T>> _expand; | 212 final _Transformation<S, Iterable<T>> _expand; |
211 | 213 |
212 _ExpandStream(Stream<S> source, Iterable<T> expand(S event)) | 214 _ExpandStream(Stream<S> source, Iterable<T> expand(S event)) |
213 : this._expand = expand, super(source); | 215 : this._expand = expand, super(source); |
214 | 216 |
215 void _handleData(S inputEvent, _EventSink<T> sink) { | 217 void _handleData(S inputEvent, _EventSink<T> sink) { |
216 try { | 218 try { |
217 for (T value in _expand(inputEvent)) { | 219 for (T value in _expand(inputEvent)) { |
218 sink._add(value); | 220 sink._add(value); |
219 } | 221 } |
220 } catch (e, s) { | 222 } catch (e, s) { |
221 // If either _expand or iterating the generated iterator throws, | 223 // If either _expand or iterating the generated iterator throws, |
222 // we abort the iteration. | 224 // we abort the iteration. |
223 sink._addError(_asyncError(e, s)); | 225 sink._addError(_asyncError(e, s), s); |
224 } | 226 } |
225 } | 227 } |
226 } | 228 } |
227 | 229 |
228 | 230 |
229 typedef void _ErrorTransformation(error); | |
230 typedef bool _ErrorTest(error); | 231 typedef bool _ErrorTest(error); |
231 | 232 |
232 /** | 233 /** |
233 * A stream pipe that converts or disposes error events | 234 * A stream pipe that converts or disposes error events |
234 * before passing them on. | 235 * before passing them on. |
235 */ | 236 */ |
236 class _HandleErrorStream<T> extends _ForwardingStream<T, T> { | 237 class _HandleErrorStream<T> extends _ForwardingStream<T, T> { |
237 final _ErrorTransformation _transform; | 238 final Function _transform; |
238 final _ErrorTest _test; | 239 final _ErrorTest _test; |
239 | 240 |
240 _HandleErrorStream(Stream<T> source, | 241 _HandleErrorStream(Stream<T> source, |
241 void transform(event), | 242 Function onError, |
242 bool test(error)) | 243 bool test(error)) |
243 : this._transform = transform, this._test = test, super(source); | 244 : this._transform = onError, this._test = test, super(source); |
244 | 245 |
245 void _handleError(Object error, _EventSink<T> sink) { | 246 void _handleError(Object error, StackTrace stackTrace, _EventSink<T> sink) { |
246 bool matches = true; | 247 bool matches = true; |
247 if (_test != null) { | 248 if (_test != null) { |
248 try { | 249 try { |
249 matches = _test(error); | 250 matches = _test(error); |
250 } catch (e, s) { | 251 } catch (e, s) { |
251 sink._addError(_asyncError(e, s)); | 252 sink._addError(_asyncError(e, s), s); |
252 return; | 253 return; |
253 } | 254 } |
254 } | 255 } |
255 if (matches) { | 256 if (matches) { |
256 try { | 257 try { |
257 _transform(error); | 258 if (_transform is ZoneBinaryCallback) { |
| 259 _transform(error, stackTrace); |
| 260 } else { |
| 261 _transform(error); |
| 262 } |
258 } catch (e, s) { | 263 } catch (e, s) { |
259 sink._addError(_asyncError(e, s)); | 264 if (identical(e, error)) { |
| 265 sink._addError(error, stackTrace); |
| 266 } else { |
| 267 sink._addError(_asyncError(e, s), s); |
| 268 } |
260 return; | 269 return; |
261 } | 270 } |
262 } else { | 271 } else { |
263 sink._addError(error); | 272 sink._addError(error, stackTrace); |
264 } | 273 } |
265 } | 274 } |
266 } | 275 } |
267 | 276 |
268 | 277 |
269 class _TakeStream<T> extends _ForwardingStream<T, T> { | 278 class _TakeStream<T> extends _ForwardingStream<T, T> { |
270 int _remaining; | 279 int _remaining; |
271 | 280 |
272 _TakeStream(Stream<T> source, int count) | 281 _TakeStream(Stream<T> source, int count) |
273 : this._remaining = count, super(source) { | 282 : this._remaining = count, super(source) { |
(...skipping 20 matching lines...) Expand all Loading... |
294 final _Predicate<T> _test; | 303 final _Predicate<T> _test; |
295 | 304 |
296 _TakeWhileStream(Stream<T> source, bool test(T value)) | 305 _TakeWhileStream(Stream<T> source, bool test(T value)) |
297 : this._test = test, super(source); | 306 : this._test = test, super(source); |
298 | 307 |
299 void _handleData(T inputEvent, _EventSink<T> sink) { | 308 void _handleData(T inputEvent, _EventSink<T> sink) { |
300 bool satisfies; | 309 bool satisfies; |
301 try { | 310 try { |
302 satisfies = _test(inputEvent); | 311 satisfies = _test(inputEvent); |
303 } catch (e, s) { | 312 } catch (e, s) { |
304 sink._addError(_asyncError(e, s)); | 313 sink._addError(_asyncError(e, s), s); |
305 // The test didn't say true. Didn't say false either, but we stop anyway. | 314 // The test didn't say true. Didn't say false either, but we stop anyway. |
306 sink._close(); | 315 sink._close(); |
307 return; | 316 return; |
308 } | 317 } |
309 if (satisfies) { | 318 if (satisfies) { |
310 sink._add(inputEvent); | 319 sink._add(inputEvent); |
311 } else { | 320 } else { |
312 sink._close(); | 321 sink._close(); |
313 } | 322 } |
314 } | 323 } |
(...skipping 27 matching lines...) Expand all Loading... |
342 | 351 |
343 void _handleData(T inputEvent, _EventSink<T> sink) { | 352 void _handleData(T inputEvent, _EventSink<T> sink) { |
344 if (_hasFailed) { | 353 if (_hasFailed) { |
345 sink._add(inputEvent); | 354 sink._add(inputEvent); |
346 return; | 355 return; |
347 } | 356 } |
348 bool satisfies; | 357 bool satisfies; |
349 try { | 358 try { |
350 satisfies = _test(inputEvent); | 359 satisfies = _test(inputEvent); |
351 } catch (e, s) { | 360 } catch (e, s) { |
352 sink._addError(_asyncError(e, s)); | 361 sink._addError(_asyncError(e, s), s); |
353 // A failure to return a boolean is considered "not matching". | 362 // A failure to return a boolean is considered "not matching". |
354 _hasFailed = true; | 363 _hasFailed = true; |
355 return; | 364 return; |
356 } | 365 } |
357 if (!satisfies) { | 366 if (!satisfies) { |
358 _hasFailed = true; | 367 _hasFailed = true; |
359 sink._add(inputEvent); | 368 sink._add(inputEvent); |
360 } | 369 } |
361 } | 370 } |
362 } | 371 } |
(...skipping 15 matching lines...) Expand all Loading... |
378 return sink._add(inputEvent); | 387 return sink._add(inputEvent); |
379 } else { | 388 } else { |
380 bool isEqual; | 389 bool isEqual; |
381 try { | 390 try { |
382 if (_equals == null) { | 391 if (_equals == null) { |
383 isEqual = (_previous == inputEvent); | 392 isEqual = (_previous == inputEvent); |
384 } else { | 393 } else { |
385 isEqual = _equals(_previous, inputEvent); | 394 isEqual = _equals(_previous, inputEvent); |
386 } | 395 } |
387 } catch (e, s) { | 396 } catch (e, s) { |
388 sink._addError(_asyncError(e, s)); | 397 sink._addError(_asyncError(e, s), s); |
389 return null; | 398 return null; |
390 } | 399 } |
391 if (!isEqual) { | 400 if (!isEqual) { |
392 sink._add(inputEvent); | 401 sink._add(inputEvent); |
393 _previous = inputEvent; | 402 _previous = inputEvent; |
394 } | 403 } |
395 } | 404 } |
396 } | 405 } |
397 } | 406 } |
398 | 407 |
399 // Stream transformations and event transformations. | 408 // Stream transformations and event transformations. |
400 | 409 |
401 typedef void _TransformDataHandler<S, T>(S data, EventSink<T> sink); | 410 typedef void _TransformDataHandler<S, T>(S data, EventSink<T> sink); |
402 typedef void _TransformErrorHandler<T>(data, EventSink<T> sink); | 411 typedef void _TransformErrorHandler<T>(Object error, EventSink<T> sink); |
403 typedef void _TransformDoneHandler<T>(EventSink<T> sink); | 412 typedef void _TransformDoneHandler<T>(EventSink<T> sink); |
404 | 413 |
405 /** Default data handler forwards all data. */ | 414 /** Default data handler forwards all data. */ |
406 void _defaultHandleData(var data, EventSink sink) { | 415 void _defaultHandleData(var data, EventSink sink) { |
407 sink.add(data); | 416 sink.add(data); |
408 } | 417 } |
409 | 418 |
410 /** Default error handler forwards all errors. */ | 419 /** Default error handler forwards all errors. */ |
411 void _defaultHandleError(error, EventSink sink) { | 420 void _defaultHandleError(error, EventSink sink) { |
412 sink.addError(error); | 421 sink.addError(error); |
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
449 | 458 |
450 void handleError(error, EventSink<T> sink) { | 459 void handleError(error, EventSink<T> sink) { |
451 _handleError(error, sink); | 460 _handleError(error, sink); |
452 } | 461 } |
453 | 462 |
454 void handleDone(EventSink<T> sink) { | 463 void handleDone(EventSink<T> sink) { |
455 _handleDone(sink); | 464 _handleDone(sink); |
456 } | 465 } |
457 } | 466 } |
458 | 467 |
OLD | NEW |