OLD | NEW |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 /** | |
8 * Utility function to attach a stack trace to an [error] if it doesn't have | |
9 * one already. | |
10 */ | |
11 _asyncError(Object error, StackTrace stackTrace) { | |
12 if (stackTrace == null) return error; | |
13 if (getAttachedStackTrace(error) != null) return error; | |
14 _attachStackTrace(error, stackTrace); | |
15 return error; | |
16 } | |
17 | |
18 /** Runs user code and takes actions depending on success or failure. */ | 7 /** Runs user code and takes actions depending on success or failure. */ |
19 _runUserCode(userCode(), | 8 _runUserCode(userCode(), |
20 onSuccess(value), | 9 onSuccess(value), |
21 onError(error, StackTrace stackTrace)) { | 10 onError(error, StackTrace stackTrace)) { |
22 try { | 11 try { |
23 onSuccess(userCode()); | 12 onSuccess(userCode()); |
24 } catch (e, s) { | 13 } catch (e, s) { |
25 onError(_asyncError(e, s), s); | 14 onError(e, s); |
26 } | 15 } |
27 } | 16 } |
28 | 17 |
29 /** Helper function to cancel a subscription and wait for the potential future, | 18 /** Helper function to cancel a subscription and wait for the potential future, |
30 before completing with an error. */ | 19 before completing with an error. */ |
31 void _cancelAndError(StreamSubscription subscription, | 20 void _cancelAndError(StreamSubscription subscription, |
32 _Future future, | 21 _Future future, |
33 error, | 22 error, |
34 StackTrace stackTrace) { | 23 StackTrace stackTrace) { |
35 var cancelFuture = subscription.cancel(); | 24 var cancelFuture = subscription.cancel(); |
(...skipping 144 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
180 final _Predicate<T> _test; | 169 final _Predicate<T> _test; |
181 | 170 |
182 _WhereStream(Stream<T> source, bool test(T value)) | 171 _WhereStream(Stream<T> source, bool test(T value)) |
183 : _test = test, super(source); | 172 : _test = test, super(source); |
184 | 173 |
185 void _handleData(T inputEvent, _EventSink<T> sink) { | 174 void _handleData(T inputEvent, _EventSink<T> sink) { |
186 bool satisfies; | 175 bool satisfies; |
187 try { | 176 try { |
188 satisfies = _test(inputEvent); | 177 satisfies = _test(inputEvent); |
189 } catch (e, s) { | 178 } catch (e, s) { |
190 sink._addError(_asyncError(e, s), s); | 179 sink._addError(e, s); |
191 return; | 180 return; |
192 } | 181 } |
193 if (satisfies) { | 182 if (satisfies) { |
194 sink._add(inputEvent); | 183 sink._add(inputEvent); |
195 } | 184 } |
196 } | 185 } |
197 } | 186 } |
198 | 187 |
199 | 188 |
200 typedef T _Transformation<S, T>(S value); | 189 typedef T _Transformation<S, T>(S value); |
201 | 190 |
202 /** | 191 /** |
203 * A stream pipe that converts data events before passing them on. | 192 * A stream pipe that converts data events before passing them on. |
204 */ | 193 */ |
205 class _MapStream<S, T> extends _ForwardingStream<S, T> { | 194 class _MapStream<S, T> extends _ForwardingStream<S, T> { |
206 final _Transformation _transform; | 195 final _Transformation _transform; |
207 | 196 |
208 _MapStream(Stream<S> source, T transform(S event)) | 197 _MapStream(Stream<S> source, T transform(S event)) |
209 : this._transform = transform, super(source); | 198 : this._transform = transform, super(source); |
210 | 199 |
211 void _handleData(S inputEvent, _EventSink<T> sink) { | 200 void _handleData(S inputEvent, _EventSink<T> sink) { |
212 T outputEvent; | 201 T outputEvent; |
213 try { | 202 try { |
214 outputEvent = _transform(inputEvent); | 203 outputEvent = _transform(inputEvent); |
215 } catch (e, s) { | 204 } catch (e, s) { |
216 sink._addError(_asyncError(e, s), s); | 205 sink._addError(e, s); |
217 return; | 206 return; |
218 } | 207 } |
219 sink._add(outputEvent); | 208 sink._add(outputEvent); |
220 } | 209 } |
221 } | 210 } |
222 | 211 |
223 /** | 212 /** |
224 * A stream pipe that converts data events before passing them on. | 213 * A stream pipe that converts data events before passing them on. |
225 */ | 214 */ |
226 class _ExpandStream<S, T> extends _ForwardingStream<S, T> { | 215 class _ExpandStream<S, T> extends _ForwardingStream<S, T> { |
227 final _Transformation<S, Iterable<T>> _expand; | 216 final _Transformation<S, Iterable<T>> _expand; |
228 | 217 |
229 _ExpandStream(Stream<S> source, Iterable<T> expand(S event)) | 218 _ExpandStream(Stream<S> source, Iterable<T> expand(S event)) |
230 : this._expand = expand, super(source); | 219 : this._expand = expand, super(source); |
231 | 220 |
232 void _handleData(S inputEvent, _EventSink<T> sink) { | 221 void _handleData(S inputEvent, _EventSink<T> sink) { |
233 try { | 222 try { |
234 for (T value in _expand(inputEvent)) { | 223 for (T value in _expand(inputEvent)) { |
235 sink._add(value); | 224 sink._add(value); |
236 } | 225 } |
237 } catch (e, s) { | 226 } catch (e, s) { |
238 // If either _expand or iterating the generated iterator throws, | 227 // If either _expand or iterating the generated iterator throws, |
239 // we abort the iteration. | 228 // we abort the iteration. |
240 sink._addError(_asyncError(e, s), s); | 229 sink._addError(e, s); |
241 } | 230 } |
242 } | 231 } |
243 } | 232 } |
244 | 233 |
245 | 234 |
246 typedef bool _ErrorTest(error); | 235 typedef bool _ErrorTest(error); |
247 | 236 |
248 /** | 237 /** |
249 * A stream pipe that converts or disposes error events | 238 * A stream pipe that converts or disposes error events |
250 * before passing them on. | 239 * before passing them on. |
251 */ | 240 */ |
252 class _HandleErrorStream<T> extends _ForwardingStream<T, T> { | 241 class _HandleErrorStream<T> extends _ForwardingStream<T, T> { |
253 final Function _transform; | 242 final Function _transform; |
254 final _ErrorTest _test; | 243 final _ErrorTest _test; |
255 | 244 |
256 _HandleErrorStream(Stream<T> source, | 245 _HandleErrorStream(Stream<T> source, |
257 Function onError, | 246 Function onError, |
258 bool test(error)) | 247 bool test(error)) |
259 : this._transform = onError, this._test = test, super(source); | 248 : this._transform = onError, this._test = test, super(source); |
260 | 249 |
261 void _handleError(Object error, StackTrace stackTrace, _EventSink<T> sink) { | 250 void _handleError(Object error, StackTrace stackTrace, _EventSink<T> sink) { |
262 bool matches = true; | 251 bool matches = true; |
263 if (_test != null) { | 252 if (_test != null) { |
264 try { | 253 try { |
265 matches = _test(error); | 254 matches = _test(error); |
266 } catch (e, s) { | 255 } catch (e, s) { |
267 sink._addError(_asyncError(e, s), s); | 256 sink._addError(e, s); |
268 return; | 257 return; |
269 } | 258 } |
270 } | 259 } |
271 if (matches) { | 260 if (matches) { |
272 try { | 261 try { |
273 _invokeErrorHandler(_transform, error, stackTrace); | 262 _invokeErrorHandler(_transform, error, stackTrace); |
274 } catch (e, s) { | 263 } catch (e, s) { |
275 if (identical(e, error)) { | 264 if (identical(e, error)) { |
276 sink._addError(error, stackTrace); | 265 sink._addError(error, stackTrace); |
277 } else { | 266 } else { |
278 sink._addError(_asyncError(e, s), s); | 267 sink._addError(e, s); |
279 } | 268 } |
280 return; | 269 return; |
281 } | 270 } |
282 } else { | 271 } else { |
283 sink._addError(error, stackTrace); | 272 sink._addError(error, stackTrace); |
284 } | 273 } |
285 } | 274 } |
286 } | 275 } |
287 | 276 |
288 | 277 |
(...skipping 25 matching lines...) Expand all Loading... |
314 final _Predicate<T> _test; | 303 final _Predicate<T> _test; |
315 | 304 |
316 _TakeWhileStream(Stream<T> source, bool test(T value)) | 305 _TakeWhileStream(Stream<T> source, bool test(T value)) |
317 : this._test = test, super(source); | 306 : this._test = test, super(source); |
318 | 307 |
319 void _handleData(T inputEvent, _EventSink<T> sink) { | 308 void _handleData(T inputEvent, _EventSink<T> sink) { |
320 bool satisfies; | 309 bool satisfies; |
321 try { | 310 try { |
322 satisfies = _test(inputEvent); | 311 satisfies = _test(inputEvent); |
323 } catch (e, s) { | 312 } catch (e, s) { |
324 sink._addError(_asyncError(e, s), s); | 313 sink._addError(e, s); |
325 // The test didn't say true. Didn't say false either, but we stop anyway. | 314 // The test didn't say true. Didn't say false either, but we stop anyway. |
326 sink._close(); | 315 sink._close(); |
327 return; | 316 return; |
328 } | 317 } |
329 if (satisfies) { | 318 if (satisfies) { |
330 sink._add(inputEvent); | 319 sink._add(inputEvent); |
331 } else { | 320 } else { |
332 sink._close(); | 321 sink._close(); |
333 } | 322 } |
334 } | 323 } |
(...skipping 27 matching lines...) Expand all Loading... |
362 | 351 |
363 void _handleData(T inputEvent, _EventSink<T> sink) { | 352 void _handleData(T inputEvent, _EventSink<T> sink) { |
364 if (_hasFailed) { | 353 if (_hasFailed) { |
365 sink._add(inputEvent); | 354 sink._add(inputEvent); |
366 return; | 355 return; |
367 } | 356 } |
368 bool satisfies; | 357 bool satisfies; |
369 try { | 358 try { |
370 satisfies = _test(inputEvent); | 359 satisfies = _test(inputEvent); |
371 } catch (e, s) { | 360 } catch (e, s) { |
372 sink._addError(_asyncError(e, s), s); | 361 sink._addError(e, s); |
373 // A failure to return a boolean is considered "not matching". | 362 // A failure to return a boolean is considered "not matching". |
374 _hasFailed = true; | 363 _hasFailed = true; |
375 return; | 364 return; |
376 } | 365 } |
377 if (!satisfies) { | 366 if (!satisfies) { |
378 _hasFailed = true; | 367 _hasFailed = true; |
379 sink._add(inputEvent); | 368 sink._add(inputEvent); |
380 } | 369 } |
381 } | 370 } |
382 } | 371 } |
(...skipping 15 matching lines...) Expand all Loading... |
398 return sink._add(inputEvent); | 387 return sink._add(inputEvent); |
399 } else { | 388 } else { |
400 bool isEqual; | 389 bool isEqual; |
401 try { | 390 try { |
402 if (_equals == null) { | 391 if (_equals == null) { |
403 isEqual = (_previous == inputEvent); | 392 isEqual = (_previous == inputEvent); |
404 } else { | 393 } else { |
405 isEqual = _equals(_previous, inputEvent); | 394 isEqual = _equals(_previous, inputEvent); |
406 } | 395 } |
407 } catch (e, s) { | 396 } catch (e, s) { |
408 sink._addError(_asyncError(e, s), s); | 397 sink._addError(e, s); |
409 return null; | 398 return null; |
410 } | 399 } |
411 if (!isEqual) { | 400 if (!isEqual) { |
412 sink._add(inputEvent); | 401 sink._add(inputEvent); |
413 _previous = inputEvent; | 402 _previous = inputEvent; |
414 } | 403 } |
415 } | 404 } |
416 } | 405 } |
417 } | 406 } |
OLD | NEW |