| OLD | NEW |
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 /** | 7 /** |
| 8 * Utility function to attach a stack trace to an [error] if it doesn't have | 8 * Utility function to attach a stack trace to an [error] if it doesn't have |
| 9 * one already. | 9 * one already. |
| 10 */ | 10 */ |
| 11 _asyncError(Object error, Object stackTrace) { | 11 _asyncError(Object error, Object stackTrace) { |
| 12 if (stackTrace == null) return error; | 12 if (stackTrace == null) return error; |
| 13 if (getAttachedStackTrace(error) != null) return error; | 13 if (getAttachedStackTrace(error) != null) return error; |
| 14 _attachStackTrace(error, stackTrace); | 14 _attachStackTrace(error, stackTrace); |
| 15 return error; | 15 return error; |
| 16 } | 16 } |
| 17 | 17 |
| 18 /** Runs user code and takes actions depending on success or failure. */ | 18 /** Runs user code and takes actions depending on success or failure. */ |
| 19 _runUserCode(userCode(), onSuccess(value), onError(error)) { | 19 _runUserCode(userCode(), |
| 20 onSuccess(value), |
| 21 onError(error, StackTrace stackTrace)) { |
| 20 try { | 22 try { |
| 21 onSuccess(userCode()); | 23 onSuccess(userCode()); |
| 22 } catch (e, s) { | 24 } catch (e, s) { |
| 23 onError(_asyncError(e, s)); | 25 onError(_asyncError(e, s), s); |
| 24 } | 26 } |
| 25 } | 27 } |
| 26 | 28 |
| 27 /** Helper function to make an onError argument to [_runUserCode]. */ | 29 /** Helper function to make an onError argument to [_runUserCode]. */ |
| 28 _cancelAndError(StreamSubscription subscription, _Future future) => | 30 _cancelAndError(StreamSubscription subscription, _Future future) => |
| 29 (error) { | 31 (error, StackTrace stackTrace) { |
| 30 subscription.cancel(); | 32 subscription.cancel(); |
| 31 future._completeError(error); | 33 future._completeError(error, stackTrace); |
| 32 }; | 34 }; |
| 33 | 35 |
| 34 | 36 |
| 35 /** | 37 /** |
| 36 * A [Stream] that forwards subscriptions to another stream. | 38 * A [Stream] that forwards subscriptions to another stream. |
| 37 * | 39 * |
| 38 * This stream implements [Stream], but forwards all subscriptions | 40 * This stream implements [Stream], but forwards all subscriptions |
| 39 * to an underlying stream, and wraps the returned subscription to | 41 * to an underlying stream, and wraps the returned subscription to |
| 40 * modify the events on the way. | 42 * modify the events on the way. |
| 41 * | 43 * |
| 42 * This class is intended for internal use only. | 44 * This class is intended for internal use only. |
| 43 */ | 45 */ |
| 44 abstract class _ForwardingStream<S, T> extends Stream<T> { | 46 abstract class _ForwardingStream<S, T> extends Stream<T> { |
| 45 final Stream<S> _source; | 47 final Stream<S> _source; |
| 46 | 48 |
| 47 _ForwardingStream(this._source); | 49 _ForwardingStream(this._source); |
| 48 | 50 |
| 49 bool get isBroadcast => _source.isBroadcast; | 51 bool get isBroadcast => _source.isBroadcast; |
| 50 | 52 |
| 51 StreamSubscription<T> listen(void onData(T value), | 53 StreamSubscription<T> listen(void onData(T value), |
| 52 { void onError(error), | 54 { Function onError, |
| 53 void onDone(), | 55 void onDone(), |
| 54 bool cancelOnError }) { | 56 bool cancelOnError }) { |
| 55 if (onData == null) onData = _nullDataHandler; | 57 if (onData == null) onData = _nullDataHandler; |
| 56 if (onError == null) onError = _nullErrorHandler; | 58 if (onError == null) onError = _nullErrorHandler; |
| 57 if (onDone == null) onDone = _nullDoneHandler; | 59 if (onDone == null) onDone = _nullDoneHandler; |
| 58 cancelOnError = identical(true, cancelOnError); | 60 cancelOnError = identical(true, cancelOnError); |
| 59 return _createSubscription(onData, onError, onDone, cancelOnError); | 61 return _createSubscription(onData, onError, onDone, cancelOnError); |
| 60 } | 62 } |
| 61 | 63 |
| 62 StreamSubscription<T> _createSubscription(void onData(T value), | 64 StreamSubscription<T> _createSubscription(void onData(T value), |
| 63 void onError(error), | 65 Function onError, |
| 64 void onDone(), | 66 void onDone(), |
| 65 bool cancelOnError) { | 67 bool cancelOnError) { |
| 66 return new _ForwardingStreamSubscription<S, T>( | 68 return new _ForwardingStreamSubscription<S, T>( |
| 67 this, onData, onError, onDone, cancelOnError); | 69 this, onData, onError, onDone, cancelOnError); |
| 68 } | 70 } |
| 69 | 71 |
| 70 // Override the following methods in subclasses to change the behavior. | 72 // Override the following methods in subclasses to change the behavior. |
| 71 | 73 |
| 72 void _handleData(S data, _EventSink<T> sink) { | 74 void _handleData(S data, _EventSink<T> sink) { |
| 73 var outputData = data; | 75 var outputData = data; |
| 74 sink._add(outputData); | 76 sink._add(outputData); |
| 75 } | 77 } |
| 76 | 78 |
| 77 void _handleError(error, _EventSink<T> sink) { | 79 void _handleError(error, StackTrace stackTrace, _EventSink<T> sink) { |
| 78 sink._addError(error); | 80 sink._addError(error, stackTrace); |
| 79 } | 81 } |
| 80 | 82 |
| 81 void _handleDone(_EventSink<T> sink) { | 83 void _handleDone(_EventSink<T> sink) { |
| 82 sink._close(); | 84 sink._close(); |
| 83 } | 85 } |
| 84 } | 86 } |
| 85 | 87 |
| 86 /** | 88 /** |
| 87 * Abstract superclass for subscriptions that forward to other subscriptions. | 89 * Abstract superclass for subscriptions that forward to other subscriptions. |
| 88 */ | 90 */ |
| 89 class _ForwardingStreamSubscription<S, T> | 91 class _ForwardingStreamSubscription<S, T> |
| 90 extends _BufferingStreamSubscription<T> { | 92 extends _BufferingStreamSubscription<T> { |
| 91 final _ForwardingStream<S, T> _stream; | 93 final _ForwardingStream<S, T> _stream; |
| 92 | 94 |
| 93 StreamSubscription<S> _subscription; | 95 StreamSubscription<S> _subscription; |
| 94 | 96 |
| 95 _ForwardingStreamSubscription(this._stream, | 97 _ForwardingStreamSubscription(this._stream, |
| 96 void onData(T data), | 98 void onData(T data), |
| 97 void onError(error), | 99 Function onError, |
| 98 void onDone(), | 100 void onDone(), |
| 99 bool cancelOnError) | 101 bool cancelOnError) |
| 100 : super(onData, onError, onDone, cancelOnError) { | 102 : super(onData, onError, onDone, cancelOnError) { |
| 101 _subscription = | 103 _subscription = |
| 102 _stream._source.listen(_handleData, | 104 _stream._source.listen(_handleData, |
| 103 onError: _handleError, | 105 onError: _handleError, |
| 104 onDone: _handleDone); | 106 onDone: _handleDone); |
| 105 } | 107 } |
| 106 | 108 |
| 107 // _StreamSink interface. | 109 // _StreamSink interface. |
| 108 // Transformers sending more than one event have no way to know if the stream | 110 // Transformers sending more than one event have no way to know if the stream |
| 109 // is canceled or closed after the first, so we just ignore remaining events. | 111 // is canceled or closed after the first, so we just ignore remaining events. |
| 110 | 112 |
| 111 void _add(T data) { | 113 void _add(T data) { |
| 112 if (_isClosed) return; | 114 if (_isClosed) return; |
| 113 super._add(data); | 115 super._add(data); |
| 114 } | 116 } |
| 115 | 117 |
| 116 void _addError(Object error) { | 118 void _addError(Object error, StackTrace stackTrace) { |
| 117 if (_isClosed) return; | 119 if (_isClosed) return; |
| 118 super._addError(error); | 120 super._addError(error, stackTrace); |
| 119 } | 121 } |
| 120 | 122 |
| 121 // StreamSubscription callbacks. | 123 // StreamSubscription callbacks. |
| 122 | 124 |
| 123 void _onPause() { | 125 void _onPause() { |
| 124 if (_subscription == null) return; | 126 if (_subscription == null) return; |
| 125 _subscription.pause(); | 127 _subscription.pause(); |
| 126 } | 128 } |
| 127 | 129 |
| 128 void _onResume() { | 130 void _onResume() { |
| 129 if (_subscription == null) return; | 131 if (_subscription == null) return; |
| 130 _subscription.resume(); | 132 _subscription.resume(); |
| 131 } | 133 } |
| 132 | 134 |
| 133 void _onCancel() { | 135 void _onCancel() { |
| 134 if (_subscription != null) { | 136 if (_subscription != null) { |
| 135 StreamSubscription subscription = _subscription; | 137 StreamSubscription subscription = _subscription; |
| 136 _subscription = null; | 138 _subscription = null; |
| 137 subscription.cancel(); | 139 subscription.cancel(); |
| 138 } | 140 } |
| 139 } | 141 } |
| 140 | 142 |
| 141 // Methods used as listener on source subscription. | 143 // Methods used as listener on source subscription. |
| 142 | 144 |
| 143 void _handleData(S data) { | 145 void _handleData(S data) { |
| 144 _stream._handleData(data, this); | 146 _stream._handleData(data, this); |
| 145 } | 147 } |
| 146 | 148 |
| 147 void _handleError(error) { | 149 void _handleError(error, StackTrace stackTrace) { |
| 148 _stream._handleError(error, this); | 150 _stream._handleError(error, stackTrace, this); |
| 149 } | 151 } |
| 150 | 152 |
| 151 void _handleDone() { | 153 void _handleDone() { |
| 152 _stream._handleDone(this); | 154 _stream._handleDone(this); |
| 153 } | 155 } |
| 154 } | 156 } |
| 155 | 157 |
| 156 // ------------------------------------------------------------------- | 158 // ------------------------------------------------------------------- |
| 157 // Stream transformers used by the default Stream implementation. | 159 // Stream transformers used by the default Stream implementation. |
| 158 // ------------------------------------------------------------------- | 160 // ------------------------------------------------------------------- |
| 159 | 161 |
| 160 typedef bool _Predicate<T>(T value); | 162 typedef bool _Predicate<T>(T value); |
| 161 | 163 |
| 162 class _WhereStream<T> extends _ForwardingStream<T, T> { | 164 class _WhereStream<T> extends _ForwardingStream<T, T> { |
| 163 final _Predicate<T> _test; | 165 final _Predicate<T> _test; |
| 164 | 166 |
| 165 _WhereStream(Stream<T> source, bool test(T value)) | 167 _WhereStream(Stream<T> source, bool test(T value)) |
| 166 : _test = test, super(source); | 168 : _test = test, super(source); |
| 167 | 169 |
| 168 void _handleData(T inputEvent, _EventSink<T> sink) { | 170 void _handleData(T inputEvent, _EventSink<T> sink) { |
| 169 bool satisfies; | 171 bool satisfies; |
| 170 try { | 172 try { |
| 171 satisfies = _test(inputEvent); | 173 satisfies = _test(inputEvent); |
| 172 } catch (e, s) { | 174 } catch (e, s) { |
| 173 sink._addError(_asyncError(e, s)); | 175 sink._addError(_asyncError(e, s), s); |
| 174 return; | 176 return; |
| 175 } | 177 } |
| 176 if (satisfies) { | 178 if (satisfies) { |
| 177 sink._add(inputEvent); | 179 sink._add(inputEvent); |
| 178 } | 180 } |
| 179 } | 181 } |
| 180 } | 182 } |
| 181 | 183 |
| 182 | 184 |
| 183 typedef T _Transformation<S, T>(S value); | 185 typedef T _Transformation<S, T>(S value); |
| 184 | 186 |
| 185 /** | 187 /** |
| 186 * A stream pipe that converts data events before passing them on. | 188 * A stream pipe that converts data events before passing them on. |
| 187 */ | 189 */ |
| 188 class _MapStream<S, T> extends _ForwardingStream<S, T> { | 190 class _MapStream<S, T> extends _ForwardingStream<S, T> { |
| 189 final _Transformation _transform; | 191 final _Transformation _transform; |
| 190 | 192 |
| 191 _MapStream(Stream<S> source, T transform(S event)) | 193 _MapStream(Stream<S> source, T transform(S event)) |
| 192 : this._transform = transform, super(source); | 194 : this._transform = transform, super(source); |
| 193 | 195 |
| 194 void _handleData(S inputEvent, _EventSink<T> sink) { | 196 void _handleData(S inputEvent, _EventSink<T> sink) { |
| 195 T outputEvent; | 197 T outputEvent; |
| 196 try { | 198 try { |
| 197 outputEvent = _transform(inputEvent); | 199 outputEvent = _transform(inputEvent); |
| 198 } catch (e, s) { | 200 } catch (e, s) { |
| 199 sink._addError(_asyncError(e, s)); | 201 sink._addError(_asyncError(e, s), s); |
| 200 return; | 202 return; |
| 201 } | 203 } |
| 202 sink._add(outputEvent); | 204 sink._add(outputEvent); |
| 203 } | 205 } |
| 204 } | 206 } |
| 205 | 207 |
| 206 /** | 208 /** |
| 207 * A stream pipe that converts data events before passing them on. | 209 * A stream pipe that converts data events before passing them on. |
| 208 */ | 210 */ |
| 209 class _ExpandStream<S, T> extends _ForwardingStream<S, T> { | 211 class _ExpandStream<S, T> extends _ForwardingStream<S, T> { |
| 210 final _Transformation<S, Iterable<T>> _expand; | 212 final _Transformation<S, Iterable<T>> _expand; |
| 211 | 213 |
| 212 _ExpandStream(Stream<S> source, Iterable<T> expand(S event)) | 214 _ExpandStream(Stream<S> source, Iterable<T> expand(S event)) |
| 213 : this._expand = expand, super(source); | 215 : this._expand = expand, super(source); |
| 214 | 216 |
| 215 void _handleData(S inputEvent, _EventSink<T> sink) { | 217 void _handleData(S inputEvent, _EventSink<T> sink) { |
| 216 try { | 218 try { |
| 217 for (T value in _expand(inputEvent)) { | 219 for (T value in _expand(inputEvent)) { |
| 218 sink._add(value); | 220 sink._add(value); |
| 219 } | 221 } |
| 220 } catch (e, s) { | 222 } catch (e, s) { |
| 221 // If either _expand or iterating the generated iterator throws, | 223 // If either _expand or iterating the generated iterator throws, |
| 222 // we abort the iteration. | 224 // we abort the iteration. |
| 223 sink._addError(_asyncError(e, s)); | 225 sink._addError(_asyncError(e, s), s); |
| 224 } | 226 } |
| 225 } | 227 } |
| 226 } | 228 } |
| 227 | 229 |
| 228 | 230 |
| 229 typedef void _ErrorTransformation(error); | |
| 230 typedef bool _ErrorTest(error); | 231 typedef bool _ErrorTest(error); |
| 231 | 232 |
| 232 /** | 233 /** |
| 233 * A stream pipe that converts or disposes error events | 234 * A stream pipe that converts or disposes error events |
| 234 * before passing them on. | 235 * before passing them on. |
| 235 */ | 236 */ |
| 236 class _HandleErrorStream<T> extends _ForwardingStream<T, T> { | 237 class _HandleErrorStream<T> extends _ForwardingStream<T, T> { |
| 237 final _ErrorTransformation _transform; | 238 final Function _transform; |
| 238 final _ErrorTest _test; | 239 final _ErrorTest _test; |
| 239 | 240 |
| 240 _HandleErrorStream(Stream<T> source, | 241 _HandleErrorStream(Stream<T> source, |
| 241 void transform(event), | 242 Function onError, |
| 242 bool test(error)) | 243 bool test(error)) |
| 243 : this._transform = transform, this._test = test, super(source); | 244 : this._transform = onError, this._test = test, super(source); |
| 244 | 245 |
| 245 void _handleError(Object error, _EventSink<T> sink) { | 246 void _handleError(Object error, StackTrace stackTrace, _EventSink<T> sink) { |
| 246 bool matches = true; | 247 bool matches = true; |
| 247 if (_test != null) { | 248 if (_test != null) { |
| 248 try { | 249 try { |
| 249 matches = _test(error); | 250 matches = _test(error); |
| 250 } catch (e, s) { | 251 } catch (e, s) { |
| 251 sink._addError(_asyncError(e, s)); | 252 sink._addError(_asyncError(e, s), s); |
| 252 return; | 253 return; |
| 253 } | 254 } |
| 254 } | 255 } |
| 255 if (matches) { | 256 if (matches) { |
| 256 try { | 257 try { |
| 257 _transform(error); | 258 if (_transform is ZoneBinaryCallback) { |
| 259 _transform(error, stackTrace); |
| 260 } else { |
| 261 _transform(error); |
| 262 } |
| 258 } catch (e, s) { | 263 } catch (e, s) { |
| 259 sink._addError(_asyncError(e, s)); | 264 if (identical(e, error)) { |
| 265 sink._addError(error, stackTrace); |
| 266 } else { |
| 267 sink._addError(_asyncError(e, s), s); |
| 268 } |
| 260 return; | 269 return; |
| 261 } | 270 } |
| 262 } else { | 271 } else { |
| 263 sink._addError(error); | 272 sink._addError(error, stackTrace); |
| 264 } | 273 } |
| 265 } | 274 } |
| 266 } | 275 } |
| 267 | 276 |
| 268 | 277 |
| 269 class _TakeStream<T> extends _ForwardingStream<T, T> { | 278 class _TakeStream<T> extends _ForwardingStream<T, T> { |
| 270 int _remaining; | 279 int _remaining; |
| 271 | 280 |
| 272 _TakeStream(Stream<T> source, int count) | 281 _TakeStream(Stream<T> source, int count) |
| 273 : this._remaining = count, super(source) { | 282 : this._remaining = count, super(source) { |
| (...skipping 20 matching lines...) Expand all Loading... |
| 294 final _Predicate<T> _test; | 303 final _Predicate<T> _test; |
| 295 | 304 |
| 296 _TakeWhileStream(Stream<T> source, bool test(T value)) | 305 _TakeWhileStream(Stream<T> source, bool test(T value)) |
| 297 : this._test = test, super(source); | 306 : this._test = test, super(source); |
| 298 | 307 |
| 299 void _handleData(T inputEvent, _EventSink<T> sink) { | 308 void _handleData(T inputEvent, _EventSink<T> sink) { |
| 300 bool satisfies; | 309 bool satisfies; |
| 301 try { | 310 try { |
| 302 satisfies = _test(inputEvent); | 311 satisfies = _test(inputEvent); |
| 303 } catch (e, s) { | 312 } catch (e, s) { |
| 304 sink._addError(_asyncError(e, s)); | 313 sink._addError(_asyncError(e, s), s); |
| 305 // The test didn't say true. Didn't say false either, but we stop anyway. | 314 // The test didn't say true. Didn't say false either, but we stop anyway. |
| 306 sink._close(); | 315 sink._close(); |
| 307 return; | 316 return; |
| 308 } | 317 } |
| 309 if (satisfies) { | 318 if (satisfies) { |
| 310 sink._add(inputEvent); | 319 sink._add(inputEvent); |
| 311 } else { | 320 } else { |
| 312 sink._close(); | 321 sink._close(); |
| 313 } | 322 } |
| 314 } | 323 } |
| (...skipping 27 matching lines...) Expand all Loading... |
| 342 | 351 |
| 343 void _handleData(T inputEvent, _EventSink<T> sink) { | 352 void _handleData(T inputEvent, _EventSink<T> sink) { |
| 344 if (_hasFailed) { | 353 if (_hasFailed) { |
| 345 sink._add(inputEvent); | 354 sink._add(inputEvent); |
| 346 return; | 355 return; |
| 347 } | 356 } |
| 348 bool satisfies; | 357 bool satisfies; |
| 349 try { | 358 try { |
| 350 satisfies = _test(inputEvent); | 359 satisfies = _test(inputEvent); |
| 351 } catch (e, s) { | 360 } catch (e, s) { |
| 352 sink._addError(_asyncError(e, s)); | 361 sink._addError(_asyncError(e, s), s); |
| 353 // A failure to return a boolean is considered "not matching". | 362 // A failure to return a boolean is considered "not matching". |
| 354 _hasFailed = true; | 363 _hasFailed = true; |
| 355 return; | 364 return; |
| 356 } | 365 } |
| 357 if (!satisfies) { | 366 if (!satisfies) { |
| 358 _hasFailed = true; | 367 _hasFailed = true; |
| 359 sink._add(inputEvent); | 368 sink._add(inputEvent); |
| 360 } | 369 } |
| 361 } | 370 } |
| 362 } | 371 } |
| (...skipping 15 matching lines...) Expand all Loading... |
| 378 return sink._add(inputEvent); | 387 return sink._add(inputEvent); |
| 379 } else { | 388 } else { |
| 380 bool isEqual; | 389 bool isEqual; |
| 381 try { | 390 try { |
| 382 if (_equals == null) { | 391 if (_equals == null) { |
| 383 isEqual = (_previous == inputEvent); | 392 isEqual = (_previous == inputEvent); |
| 384 } else { | 393 } else { |
| 385 isEqual = _equals(_previous, inputEvent); | 394 isEqual = _equals(_previous, inputEvent); |
| 386 } | 395 } |
| 387 } catch (e, s) { | 396 } catch (e, s) { |
| 388 sink._addError(_asyncError(e, s)); | 397 sink._addError(_asyncError(e, s), s); |
| 389 return null; | 398 return null; |
| 390 } | 399 } |
| 391 if (!isEqual) { | 400 if (!isEqual) { |
| 392 sink._add(inputEvent); | 401 sink._add(inputEvent); |
| 393 _previous = inputEvent; | 402 _previous = inputEvent; |
| 394 } | 403 } |
| 395 } | 404 } |
| 396 } | 405 } |
| 397 } | 406 } |
| 398 | 407 |
| 399 // Stream transformations and event transformations. | 408 // Stream transformations and event transformations. |
| 400 | 409 |
| 401 typedef void _TransformDataHandler<S, T>(S data, EventSink<T> sink); | 410 typedef void _TransformDataHandler<S, T>(S data, EventSink<T> sink); |
| 402 typedef void _TransformErrorHandler<T>(data, EventSink<T> sink); | 411 typedef void _TransformErrorHandler<T>(Object error, EventSink<T> sink); |
| 403 typedef void _TransformDoneHandler<T>(EventSink<T> sink); | 412 typedef void _TransformDoneHandler<T>(EventSink<T> sink); |
| 404 | 413 |
| 405 /** Default data handler forwards all data. */ | 414 /** Default data handler forwards all data. */ |
| 406 void _defaultHandleData(var data, EventSink sink) { | 415 void _defaultHandleData(var data, EventSink sink) { |
| 407 sink.add(data); | 416 sink.add(data); |
| 408 } | 417 } |
| 409 | 418 |
| 410 /** Default error handler forwards all errors. */ | 419 /** Default error handler forwards all errors. */ |
| 411 void _defaultHandleError(error, EventSink sink) { | 420 void _defaultHandleError(error, EventSink sink) { |
| 412 sink.addError(error); | 421 sink.addError(error); |
| (...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 449 | 458 |
| 450 void handleError(error, EventSink<T> sink) { | 459 void handleError(error, EventSink<T> sink) { |
| 451 _handleError(error, sink); | 460 _handleError(error, sink); |
| 452 } | 461 } |
| 453 | 462 |
| 454 void handleDone(EventSink<T> sink) { | 463 void handleDone(EventSink<T> sink) { |
| 455 _handleDone(sink); | 464 _handleDone(sink); |
| 456 } | 465 } |
| 457 } | 466 } |
| 458 | 467 |
| OLD | NEW |