OLD | NEW |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 /** Runs user code and takes actions depending on success or failure. */ | 7 /** Runs user code and takes actions depending on success or failure. */ |
8 _runUserCode(userCode(), | 8 _runUserCode(userCode(), |
9 onSuccess(value), | 9 onSuccess(value), |
10 onError(error, StackTrace stackTrace)) { | 10 onError(error, StackTrace stackTrace)) { |
(...skipping 29 matching lines...) Expand all Loading... |
40 _Future future, | 40 _Future future, |
41 error, StackTrace stackTrace) { | 41 error, StackTrace stackTrace) { |
42 AsyncError replacement = Zone.current.errorCallback(error, stackTrace); | 42 AsyncError replacement = Zone.current.errorCallback(error, stackTrace); |
43 if (replacement != null) { | 43 if (replacement != null) { |
44 error = _nonNullError(replacement.error); | 44 error = _nonNullError(replacement.error); |
45 stackTrace = replacement.stackTrace; | 45 stackTrace = replacement.stackTrace; |
46 } | 46 } |
47 _cancelAndError(subscription, future, error, stackTrace); | 47 _cancelAndError(subscription, future, error, stackTrace); |
48 } | 48 } |
49 | 49 |
| 50 typedef void _ErrorCallback(error, StackTrace stackTrace); |
| 51 |
50 /** Helper function to make an onError argument to [_runUserCode]. */ | 52 /** Helper function to make an onError argument to [_runUserCode]. */ |
51 _cancelAndErrorClosure(StreamSubscription subscription, _Future future) => | 53 _ErrorCallback _cancelAndErrorClosure( |
52 ((error, StackTrace stackTrace) => _cancelAndError( | 54 StreamSubscription subscription, _Future future) { |
53 subscription, future, error, stackTrace)); | 55 return (error, StackTrace stackTrace) { |
| 56 _cancelAndError(subscription, future, error, stackTrace); |
| 57 }; |
| 58 } |
54 | 59 |
55 /** Helper function to cancel a subscription and wait for the potential future, | 60 /** Helper function to cancel a subscription and wait for the potential future, |
56 before completing with a value. */ | 61 before completing with a value. */ |
57 void _cancelAndValue(StreamSubscription subscription, _Future future, value) { | 62 void _cancelAndValue(StreamSubscription subscription, _Future future, value) { |
58 var cancelFuture = subscription.cancel(); | 63 var cancelFuture = subscription.cancel(); |
59 if (cancelFuture is Future) { | 64 if (cancelFuture is Future) { |
60 cancelFuture.whenComplete(() => future._complete(value)); | 65 cancelFuture.whenComplete(() => future._complete(value)); |
61 } else { | 66 } else { |
62 future._complete(value); | 67 future._complete(value); |
63 } | 68 } |
(...skipping 29 matching lines...) Expand all Loading... |
93 Function onError, | 98 Function onError, |
94 void onDone(), | 99 void onDone(), |
95 bool cancelOnError) { | 100 bool cancelOnError) { |
96 return new _ForwardingStreamSubscription<S, T>( | 101 return new _ForwardingStreamSubscription<S, T>( |
97 this, onData, onError, onDone, cancelOnError); | 102 this, onData, onError, onDone, cancelOnError); |
98 } | 103 } |
99 | 104 |
100 // Override the following methods in subclasses to change the behavior. | 105 // Override the following methods in subclasses to change the behavior. |
101 | 106 |
102 void _handleData(S data, _EventSink<T> sink) { | 107 void _handleData(S data, _EventSink<T> sink) { |
103 dynamic outputData = data; | 108 sink._add(data as Object /*=T*/); |
104 sink._add(outputData); | |
105 } | 109 } |
106 | 110 |
107 void _handleError(error, StackTrace stackTrace, _EventSink<T> sink) { | 111 void _handleError(error, StackTrace stackTrace, _EventSink<T> sink) { |
108 sink._addError(error, stackTrace); | 112 sink._addError(error, stackTrace); |
109 } | 113 } |
110 | 114 |
111 void _handleDone(_EventSink<T> sink) { | 115 void _handleDone(_EventSink<T> sink) { |
112 sink._close(); | 116 sink._close(); |
113 } | 117 } |
114 } | 118 } |
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
154 | 158 |
155 void _onResume() { | 159 void _onResume() { |
156 if (_subscription == null) return; | 160 if (_subscription == null) return; |
157 _subscription.resume(); | 161 _subscription.resume(); |
158 } | 162 } |
159 | 163 |
160 Future _onCancel() { | 164 Future _onCancel() { |
161 if (_subscription != null) { | 165 if (_subscription != null) { |
162 StreamSubscription subscription = _subscription; | 166 StreamSubscription subscription = _subscription; |
163 _subscription = null; | 167 _subscription = null; |
164 subscription.cancel(); | 168 return subscription.cancel(); |
165 } | 169 } |
166 return null; | 170 return null; |
167 } | 171 } |
168 | 172 |
169 // Methods used as listener on source subscription. | 173 // Methods used as listener on source subscription. |
170 | 174 |
171 void _handleData(S data) { | 175 void _handleData(S data) { |
172 _stream._handleData(data, this); | 176 _stream._handleData(data, this); |
173 } | 177 } |
174 | 178 |
(...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
217 } | 221 } |
218 } | 222 } |
219 | 223 |
220 | 224 |
221 typedef T _Transformation<S, T>(S value); | 225 typedef T _Transformation<S, T>(S value); |
222 | 226 |
223 /** | 227 /** |
224 * A stream pipe that converts data events before passing them on. | 228 * A stream pipe that converts data events before passing them on. |
225 */ | 229 */ |
226 class _MapStream<S, T> extends _ForwardingStream<S, T> { | 230 class _MapStream<S, T> extends _ForwardingStream<S, T> { |
227 final _Transformation _transform; | 231 final _Transformation<S, T> _transform; |
228 | 232 |
229 _MapStream(Stream<S> source, T transform(S event)) | 233 _MapStream(Stream<S> source, T transform(S event)) |
230 : this._transform = transform, super(source); | 234 : this._transform = transform, super(source); |
231 | 235 |
232 void _handleData(S inputEvent, _EventSink<T> sink) { | 236 void _handleData(S inputEvent, _EventSink<T> sink) { |
233 T outputEvent; | 237 T outputEvent; |
234 try { | 238 try { |
235 outputEvent = _transform(inputEvent); | 239 outputEvent = _transform(inputEvent); |
236 } catch (e, s) { | 240 } catch (e, s) { |
237 _addErrorWithReplacement(sink, e, s); | 241 _addErrorWithReplacement(sink, e, s); |
(...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
301 return; | 305 return; |
302 } | 306 } |
303 } else { | 307 } else { |
304 sink._addError(error, stackTrace); | 308 sink._addError(error, stackTrace); |
305 } | 309 } |
306 } | 310 } |
307 } | 311 } |
308 | 312 |
309 | 313 |
310 class _TakeStream<T> extends _ForwardingStream<T, T> { | 314 class _TakeStream<T> extends _ForwardingStream<T, T> { |
311 int _remaining; | 315 final int _count; |
312 | 316 |
313 _TakeStream(Stream<T> source, int count) | 317 _TakeStream(Stream<T> source, int count) |
314 : this._remaining = count, super(source) { | 318 : this._count = count, super(source) { |
315 // This test is done early to avoid handling an async error | 319 // This test is done early to avoid handling an async error |
316 // in the _handleData method. | 320 // in the _handleData method. |
317 if (count is! int) throw new ArgumentError(count); | 321 if (count is! int) throw new ArgumentError(count); |
318 } | 322 } |
319 | 323 |
| 324 StreamSubscription<T> _createSubscription( |
| 325 void onData(T data), |
| 326 Function onError, |
| 327 void onDone(), |
| 328 bool cancelOnError) { |
| 329 return new _StateStreamSubscription<T>( |
| 330 this, onData, onError, onDone, cancelOnError, _count); |
| 331 } |
| 332 |
320 void _handleData(T inputEvent, _EventSink<T> sink) { | 333 void _handleData(T inputEvent, _EventSink<T> sink) { |
321 if (_remaining > 0) { | 334 _StateStreamSubscription<T> subscription = sink; |
| 335 int count = subscription._count; |
| 336 if (count > 0) { |
322 sink._add(inputEvent); | 337 sink._add(inputEvent); |
323 _remaining -= 1; | 338 count -= 1; |
324 if (_remaining == 0) { | 339 subscription._count = count; |
| 340 if (count == 0) { |
325 // Closing also unsubscribes all subscribers, which unsubscribes | 341 // Closing also unsubscribes all subscribers, which unsubscribes |
326 // this from source. | 342 // this from source. |
327 sink._close(); | 343 sink._close(); |
328 } | 344 } |
329 } | 345 } |
330 } | 346 } |
331 } | 347 } |
332 | 348 |
| 349 /** |
| 350 * A [_ForwardingStreamSubscription] with one extra state field. |
| 351 * |
| 352 * Use by several different classes, some storing an integer, others a bool. |
| 353 */ |
| 354 class _StateStreamSubscription<T> extends _ForwardingStreamSubscription<T, T> { |
| 355 // Raw state field. Typed access provided by getters and setters below. |
| 356 var _sharedState; |
| 357 |
| 358 _StateStreamSubscription(_ForwardingStream<T, T> stream, void onData(T data), |
| 359 Function onError, void onDone(), |
| 360 bool cancelOnError, this._sharedState) |
| 361 : super(stream, onData, onError, onDone, cancelOnError); |
| 362 |
| 363 bool get _flag => _sharedState; |
| 364 void set _flag(bool flag) { _sharedState = flag; } |
| 365 int get _count => _sharedState; |
| 366 void set _count(int count) { _sharedState = count; } |
| 367 } |
| 368 |
333 | 369 |
334 class _TakeWhileStream<T> extends _ForwardingStream<T, T> { | 370 class _TakeWhileStream<T> extends _ForwardingStream<T, T> { |
335 final _Predicate<T> _test; | 371 final _Predicate<T> _test; |
336 | 372 |
337 _TakeWhileStream(Stream<T> source, bool test(T value)) | 373 _TakeWhileStream(Stream<T> source, bool test(T value)) |
338 : this._test = test, super(source); | 374 : this._test = test, super(source); |
339 | 375 |
340 void _handleData(T inputEvent, _EventSink<T> sink) { | 376 void _handleData(T inputEvent, _EventSink<T> sink) { |
341 bool satisfies; | 377 bool satisfies; |
342 try { | 378 try { |
343 satisfies = _test(inputEvent); | 379 satisfies = _test(inputEvent); |
344 } catch (e, s) { | 380 } catch (e, s) { |
345 _addErrorWithReplacement(sink, e, s); | 381 _addErrorWithReplacement(sink, e, s); |
346 // The test didn't say true. Didn't say false either, but we stop anyway. | 382 // The test didn't say true. Didn't say false either, but we stop anyway. |
347 sink._close(); | 383 sink._close(); |
348 return; | 384 return; |
349 } | 385 } |
350 if (satisfies) { | 386 if (satisfies) { |
351 sink._add(inputEvent); | 387 sink._add(inputEvent); |
352 } else { | 388 } else { |
353 sink._close(); | 389 sink._close(); |
354 } | 390 } |
355 } | 391 } |
356 } | 392 } |
357 | 393 |
358 class _SkipStream<T> extends _ForwardingStream<T, T> { | 394 class _SkipStream<T> extends _ForwardingStream<T, T> { |
359 int _remaining; | 395 final int _count; |
360 | 396 |
361 _SkipStream(Stream<T> source, int count) | 397 _SkipStream(Stream<T> source, int count) |
362 : this._remaining = count, super(source) { | 398 : this._count = count, super(source) { |
363 // This test is done early to avoid handling an async error | 399 // This test is done early to avoid handling an async error |
364 // in the _handleData method. | 400 // in the _handleData method. |
365 if (count is! int || count < 0) throw new ArgumentError(count); | 401 if (count is! int || count < 0) throw new ArgumentError(count); |
366 } | 402 } |
367 | 403 |
| 404 StreamSubscription<T> _createSubscription( |
| 405 void onData(T data), |
| 406 Function onError, |
| 407 void onDone(), |
| 408 bool cancelOnError) { |
| 409 return new _StateStreamSubscription<T>( |
| 410 this, onData, onError, onDone, cancelOnError, _count); |
| 411 } |
| 412 |
368 void _handleData(T inputEvent, _EventSink<T> sink) { | 413 void _handleData(T inputEvent, _EventSink<T> sink) { |
369 if (_remaining > 0) { | 414 _StateStreamSubscription<T> subscription = sink; |
370 _remaining--; | 415 int count = subscription._count; |
| 416 if (count > 0) { |
| 417 subscription._count = count - 1; |
371 return; | 418 return; |
372 } | 419 } |
373 sink._add(inputEvent); | 420 sink._add(inputEvent); |
374 } | 421 } |
375 } | 422 } |
376 | 423 |
377 class _SkipWhileStream<T> extends _ForwardingStream<T, T> { | 424 class _SkipWhileStream<T> extends _ForwardingStream<T, T> { |
378 final _Predicate<T> _test; | 425 final _Predicate<T> _test; |
379 bool _hasFailed = false; | |
380 | 426 |
381 _SkipWhileStream(Stream<T> source, bool test(T value)) | 427 _SkipWhileStream(Stream<T> source, bool test(T value)) |
382 : this._test = test, super(source); | 428 : this._test = test, super(source); |
383 | 429 |
| 430 StreamSubscription<T> _createSubscription( |
| 431 void onData(T data), |
| 432 Function onError, |
| 433 void onDone(), |
| 434 bool cancelOnError) { |
| 435 return new _StateStreamSubscription<T>( |
| 436 this, onData, onError, onDone, cancelOnError, false); |
| 437 } |
| 438 |
384 void _handleData(T inputEvent, _EventSink<T> sink) { | 439 void _handleData(T inputEvent, _EventSink<T> sink) { |
385 if (_hasFailed) { | 440 _StateStreamSubscription<T> subscription = sink; |
| 441 bool hasFailed = subscription._flag; |
| 442 if (hasFailed) { |
386 sink._add(inputEvent); | 443 sink._add(inputEvent); |
387 return; | 444 return; |
388 } | 445 } |
389 bool satisfies; | 446 bool satisfies; |
390 try { | 447 try { |
391 satisfies = _test(inputEvent); | 448 satisfies = _test(inputEvent); |
392 } catch (e, s) { | 449 } catch (e, s) { |
393 _addErrorWithReplacement(sink, e, s); | 450 _addErrorWithReplacement(sink, e, s); |
394 // A failure to return a boolean is considered "not matching". | 451 // A failure to return a boolean is considered "not matching". |
395 _hasFailed = true; | 452 subscription._flag = true; |
396 return; | 453 return; |
397 } | 454 } |
398 if (!satisfies) { | 455 if (!satisfies) { |
399 _hasFailed = true; | 456 subscription._flag = true; |
400 sink._add(inputEvent); | 457 sink._add(inputEvent); |
401 } | 458 } |
402 } | 459 } |
403 } | 460 } |
404 | 461 |
405 typedef bool _Equality<T>(T a, T b); | 462 typedef bool _Equality<T>(T a, T b); |
406 | 463 |
407 class _DistinctStream<T> extends _ForwardingStream<T, T> { | 464 class _DistinctStream<T> extends _ForwardingStream<T, T> { |
408 static var _SENTINEL = new Object(); | 465 static var _SENTINEL = new Object(); |
409 | 466 |
410 _Equality<T> _equals; | 467 _Equality<T> _equals; |
411 var _previous = _SENTINEL; | 468 var _previous = _SENTINEL; |
412 | 469 |
413 _DistinctStream(Stream<T> source, bool equals(T a, T b)) | 470 _DistinctStream(Stream<T> source, bool equals(T a, T b)) |
414 : _equals = equals, super(source); | 471 : _equals = equals, super(source); |
415 | 472 |
416 void _handleData(T inputEvent, _EventSink<T> sink) { | 473 void _handleData(T inputEvent, _EventSink<T> sink) { |
417 if (identical(_previous, _SENTINEL)) { | 474 if (identical(_previous, _SENTINEL)) { |
418 _previous = inputEvent; | 475 _previous = inputEvent; |
419 return sink._add(inputEvent); | 476 return sink._add(inputEvent); |
420 } else { | 477 } else { |
421 bool isEqual; | 478 bool isEqual; |
422 try { | 479 try { |
423 if (_equals == null) { | 480 if (_equals == null) { |
424 isEqual = (_previous == inputEvent); | 481 isEqual = (_previous == inputEvent); |
425 } else { | 482 } else { |
426 isEqual = _equals(_previous, inputEvent); | 483 isEqual = _equals(_previous as Object /*=T*/, inputEvent); |
427 } | 484 } |
428 } catch (e, s) { | 485 } catch (e, s) { |
429 _addErrorWithReplacement(sink, e, s); | 486 _addErrorWithReplacement(sink, e, s); |
430 return null; | 487 return null; |
431 } | 488 } |
432 if (!isEqual) { | 489 if (!isEqual) { |
433 sink._add(inputEvent); | 490 sink._add(inputEvent); |
434 _previous = inputEvent; | 491 _previous = inputEvent; |
435 } | 492 } |
436 } | 493 } |
437 } | 494 } |
438 } | 495 } |
OLD | NEW |