| OLD | NEW |
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 /** Abstract and private interface for a place to put events. */ | 7 /** Abstract and private interface for a place to put events. */ |
| 8 abstract class _EventSink<T> { | 8 abstract class _EventSink<T> { |
| 9 void _add(T data); | 9 void _add(T data); |
| 10 void _addError(Object error, StackTrace stackTrace); | 10 void _addError(Object error, StackTrace stackTrace); |
| (...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 42 * * [_close]: Request to close the stream. | 42 * * [_close]: Request to close the stream. |
| 43 * * [_onCancel]: Called when the subscription will provide no more events, | 43 * * [_onCancel]: Called when the subscription will provide no more events, |
| 44 * either due to being actively canceled, or after sending a done event. | 44 * either due to being actively canceled, or after sending a done event. |
| 45 * * [_onPause]: Called when the subscription wants the event source to pause. | 45 * * [_onPause]: Called when the subscription wants the event source to pause. |
| 46 * * [_onResume]: Called when allowing new events after a pause. | 46 * * [_onResume]: Called when allowing new events after a pause. |
| 47 * | 47 * |
| 48 * The user should not add new events when the subscription requests a paused, | 48 * The user should not add new events when the subscription requests a paused, |
| 49 * but if it happens anyway, the subscription will enqueue the events just as | 49 * but if it happens anyway, the subscription will enqueue the events just as |
| 50 * when new events arrive while still firing an old event. | 50 * when new events arrive while still firing an old event. |
| 51 */ | 51 */ |
| 52 class _BufferingStreamSubscription<T> implements StreamSubscription<T>, | 52 class _BufferingStreamSubscription<T> |
| 53 _EventSink<T>, | 53 implements StreamSubscription<T>, _EventSink<T>, _EventDispatch<T> { |
| 54 _EventDispatch<T> { | |
| 55 /** The `cancelOnError` flag from the `listen` call. */ | 54 /** The `cancelOnError` flag from the `listen` call. */ |
| 56 static const int _STATE_CANCEL_ON_ERROR = 1; | 55 static const int _STATE_CANCEL_ON_ERROR = 1; |
| 57 /** | 56 /** |
| 58 * Whether the "done" event has been received. | 57 * Whether the "done" event has been received. |
| 59 * No further events are accepted after this. | 58 * No further events are accepted after this. |
| 60 */ | 59 */ |
| 61 static const int _STATE_CLOSED = 2; | 60 static const int _STATE_CLOSED = 2; |
| 62 /** | 61 /** |
| 63 * Set if the input has been asked not to send events. | 62 * Set if the input has been asked not to send events. |
| 64 * | 63 * |
| (...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 100 /** The future [_onCancel] may return. */ | 99 /** The future [_onCancel] may return. */ |
| 101 Future _cancelFuture; | 100 Future _cancelFuture; |
| 102 | 101 |
| 103 /** | 102 /** |
| 104 * Queue of pending events. | 103 * Queue of pending events. |
| 105 * | 104 * |
| 106 * Is created when necessary, or set in constructor for preconfigured events. | 105 * Is created when necessary, or set in constructor for preconfigured events. |
| 107 */ | 106 */ |
| 108 _PendingEvents<T> _pending; | 107 _PendingEvents<T> _pending; |
| 109 | 108 |
| 110 _BufferingStreamSubscription(void onData(T data), | 109 _BufferingStreamSubscription( |
| 111 Function onError, | 110 void onData(T data), Function onError, void onDone(), bool cancelOnError) |
| 112 void onDone(), | |
| 113 bool cancelOnError) | |
| 114 : _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0) { | 111 : _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0) { |
| 115 this.onData(onData); | 112 this.onData(onData); |
| 116 this.onError(onError); | 113 this.onError(onError); |
| 117 this.onDone(onDone); | 114 this.onDone(onDone); |
| 118 } | 115 } |
| 119 | 116 |
| 120 /** | 117 /** |
| 121 * Sets the subscription's pending events object. | 118 * Sets the subscription's pending events object. |
| 122 * | 119 * |
| 123 * This can only be done once. The pending events object is used for the | 120 * This can only be done once. The pending events object is used for the |
| (...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 190 if (!_isCanceled) { | 187 if (!_isCanceled) { |
| 191 _cancel(); | 188 _cancel(); |
| 192 } | 189 } |
| 193 return _cancelFuture ?? Future._nullFuture; | 190 return _cancelFuture ?? Future._nullFuture; |
| 194 } | 191 } |
| 195 | 192 |
| 196 Future<E> asFuture<E>([E futureValue]) { | 193 Future<E> asFuture<E>([E futureValue]) { |
| 197 _Future<E> result = new _Future<E>(); | 194 _Future<E> result = new _Future<E>(); |
| 198 | 195 |
| 199 // Overwrite the onDone and onError handlers. | 196 // Overwrite the onDone and onError handlers. |
| 200 _onDone = () { result._complete(futureValue); }; | 197 _onDone = () { |
| 198 result._complete(futureValue); |
| 199 }; |
| 201 _onError = (error, stackTrace) { | 200 _onError = (error, stackTrace) { |
| 202 Future cancelFuture = cancel(); | 201 Future cancelFuture = cancel(); |
| 203 if (!identical(cancelFuture, Future._nullFuture)) { | 202 if (!identical(cancelFuture, Future._nullFuture)) { |
| 204 cancelFuture.whenComplete(() { | 203 cancelFuture.whenComplete(() { |
| 205 result._completeError(error, stackTrace); | 204 result._completeError(error, stackTrace); |
| 206 }); | 205 }); |
| 207 } else { | 206 } else { |
| 208 result._completeError(error, stackTrace); | 207 result._completeError(error, stackTrace); |
| 209 } | 208 } |
| 210 }; | 209 }; |
| (...skipping 46 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 257 if (_canFire) { | 256 if (_canFire) { |
| 258 _sendData(data); | 257 _sendData(data); |
| 259 } else { | 258 } else { |
| 260 _addPending(new _DelayedData<T>(data)); | 259 _addPending(new _DelayedData<T>(data)); |
| 261 } | 260 } |
| 262 } | 261 } |
| 263 | 262 |
| 264 void _addError(Object error, StackTrace stackTrace) { | 263 void _addError(Object error, StackTrace stackTrace) { |
| 265 if (_isCanceled) return; | 264 if (_isCanceled) return; |
| 266 if (_canFire) { | 265 if (_canFire) { |
| 267 _sendError(error, stackTrace); // Reports cancel after sending. | 266 _sendError(error, stackTrace); // Reports cancel after sending. |
| 268 } else { | 267 } else { |
| 269 _addPending(new _DelayedError(error, stackTrace)); | 268 _addPending(new _DelayedError(error, stackTrace)); |
| 270 } | 269 } |
| 271 } | 270 } |
| 272 | 271 |
| 273 void _close() { | 272 void _close() { |
| 274 assert(!_isClosed); | 273 assert(!_isClosed); |
| 275 if (_isCanceled) return; | 274 if (_isCanceled) return; |
| 276 _state |= _STATE_CLOSED; | 275 _state |= _STATE_CLOSED; |
| 277 if (_canFire) { | 276 if (_canFire) { |
| (...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 339 assert(!_inCallback); | 338 assert(!_inCallback); |
| 340 bool wasInputPaused = _isInputPaused; | 339 bool wasInputPaused = _isInputPaused; |
| 341 | 340 |
| 342 void sendError() { | 341 void sendError() { |
| 343 // If the subscription has been canceled while waiting for the cancel | 342 // If the subscription has been canceled while waiting for the cancel |
| 344 // future to finish we must not report the error. | 343 // future to finish we must not report the error. |
| 345 if (_isCanceled && !_waitsForCancel) return; | 344 if (_isCanceled && !_waitsForCancel) return; |
| 346 _state |= _STATE_IN_CALLBACK; | 345 _state |= _STATE_IN_CALLBACK; |
| 347 if (_onError is ZoneBinaryCallback<dynamic, Object, StackTrace>) { | 346 if (_onError is ZoneBinaryCallback<dynamic, Object, StackTrace>) { |
| 348 ZoneBinaryCallback<dynamic, Object, StackTrace> errorCallback = _onError | 347 ZoneBinaryCallback<dynamic, Object, StackTrace> errorCallback = _onError |
| 349 as Object /*=ZoneBinaryCallback<dynamic, Object, StackTrace>*/; | 348 as Object/*=ZoneBinaryCallback<dynamic, Object, StackTrace>*/; |
| 350 _zone.runBinaryGuarded(errorCallback, error, stackTrace); | 349 _zone.runBinaryGuarded(errorCallback, error, stackTrace); |
| 351 } else { | 350 } else { |
| 352 _zone.runUnaryGuarded<dynamic, dynamic>( | 351 _zone.runUnaryGuarded<dynamic, dynamic>( |
| 353 _onError as Object /*=ZoneUnaryCallback<dynamic, dynamic>*/, error); | 352 _onError as Object/*=ZoneUnaryCallback<dynamic, dynamic>*/, error); |
| 354 } | 353 } |
| 355 _state &= ~_STATE_IN_CALLBACK; | 354 _state &= ~_STATE_IN_CALLBACK; |
| 356 } | 355 } |
| 357 | 356 |
| 358 if (_cancelOnError) { | 357 if (_cancelOnError) { |
| 359 _state |= _STATE_WAIT_FOR_CANCEL; | 358 _state |= _STATE_WAIT_FOR_CANCEL; |
| 360 _cancel(); | 359 _cancel(); |
| 361 if (_cancelFuture is Future && | 360 if (_cancelFuture is Future && |
| 362 !identical(_cancelFuture, Future._nullFuture)) { | 361 !identical(_cancelFuture, Future._nullFuture)) { |
| 363 _cancelFuture.whenComplete(sendError); | 362 _cancelFuture.whenComplete(sendError); |
| (...skipping 90 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 454 } | 453 } |
| 455 | 454 |
| 456 // ------------------------------------------------------------------- | 455 // ------------------------------------------------------------------- |
| 457 // Common base class for single and multi-subscription streams. | 456 // Common base class for single and multi-subscription streams. |
| 458 // ------------------------------------------------------------------- | 457 // ------------------------------------------------------------------- |
| 459 abstract class _StreamImpl<T> extends Stream<T> { | 458 abstract class _StreamImpl<T> extends Stream<T> { |
| 460 // ------------------------------------------------------------------ | 459 // ------------------------------------------------------------------ |
| 461 // Stream interface. | 460 // Stream interface. |
| 462 | 461 |
| 463 StreamSubscription<T> listen(void onData(T data), | 462 StreamSubscription<T> listen(void onData(T data), |
| 464 { Function onError, | 463 {Function onError, void onDone(), bool cancelOnError}) { |
| 465 void onDone(), | |
| 466 bool cancelOnError }) { | |
| 467 cancelOnError = identical(true, cancelOnError); | 464 cancelOnError = identical(true, cancelOnError); |
| 468 StreamSubscription<T> subscription = | 465 StreamSubscription<T> subscription = |
| 469 _createSubscription(onData, onError, onDone, cancelOnError); | 466 _createSubscription(onData, onError, onDone, cancelOnError); |
| 470 _onListen(subscription); | 467 _onListen(subscription); |
| 471 return subscription; | 468 return subscription; |
| 472 } | 469 } |
| 473 | 470 |
| 474 // ------------------------------------------------------------------- | 471 // ------------------------------------------------------------------- |
| 475 /** Create a subscription object. Called by [subcribe]. */ | 472 /** Create a subscription object. Called by [subcribe]. */ |
| 476 StreamSubscription<T> _createSubscription( | 473 StreamSubscription<T> _createSubscription(void onData(T data), |
| 477 void onData(T data), | 474 Function onError, void onDone(), bool cancelOnError) { |
| 478 Function onError, | 475 return new _BufferingStreamSubscription<T>( |
| 479 void onDone(), | 476 onData, onError, onDone, cancelOnError); |
| 480 bool cancelOnError) { | |
| 481 return new _BufferingStreamSubscription<T>(onData, onError, onDone, | |
| 482 cancelOnError); | |
| 483 } | 477 } |
| 484 | 478 |
| 485 /** Hook called when the subscription has been created. */ | 479 /** Hook called when the subscription has been created. */ |
| 486 void _onListen(StreamSubscription subscription) {} | 480 void _onListen(StreamSubscription subscription) {} |
| 487 } | 481 } |
| 488 | 482 |
| 489 typedef _PendingEvents<T> _EventGenerator<T>(); | 483 typedef _PendingEvents<T> _EventGenerator<T>(); |
| 490 | 484 |
| 491 /** Stream that generates its own events. */ | 485 /** Stream that generates its own events. */ |
| 492 class _GeneratedStreamImpl<T> extends _StreamImpl<T> { | 486 class _GeneratedStreamImpl<T> extends _StreamImpl<T> { |
| 493 final _EventGenerator<T> _pending; | 487 final _EventGenerator<T> _pending; |
| 494 bool _isUsed = false; | 488 bool _isUsed = false; |
| 495 /** | 489 /** |
| 496 * Initializes the stream to have only the events provided by a | 490 * Initializes the stream to have only the events provided by a |
| 497 * [_PendingEvents]. | 491 * [_PendingEvents]. |
| 498 * | 492 * |
| 499 * A new [_PendingEvents] must be generated for each listen. | 493 * A new [_PendingEvents] must be generated for each listen. |
| 500 */ | 494 */ |
| 501 _GeneratedStreamImpl(this._pending); | 495 _GeneratedStreamImpl(this._pending); |
| 502 | 496 |
| 503 StreamSubscription<T> _createSubscription( | 497 StreamSubscription<T> _createSubscription(void onData(T data), |
| 504 void onData(T data), | 498 Function onError, void onDone(), bool cancelOnError) { |
| 505 Function onError, | |
| 506 void onDone(), | |
| 507 bool cancelOnError) { | |
| 508 if (_isUsed) throw new StateError("Stream has already been listened to."); | 499 if (_isUsed) throw new StateError("Stream has already been listened to."); |
| 509 _isUsed = true; | 500 _isUsed = true; |
| 510 return new _BufferingStreamSubscription<T>( | 501 return new _BufferingStreamSubscription<T>( |
| 511 onData, onError, onDone, cancelOnError).._setPendingEvents(_pending()); | 502 onData, onError, onDone, cancelOnError).._setPendingEvents(_pending()); |
| 512 } | 503 } |
| 513 } | 504 } |
| 514 | 505 |
| 515 | |
| 516 /** Pending events object that gets its events from an [Iterable]. */ | 506 /** Pending events object that gets its events from an [Iterable]. */ |
| 517 class _IterablePendingEvents<T> extends _PendingEvents<T> { | 507 class _IterablePendingEvents<T> extends _PendingEvents<T> { |
| 518 // The iterator providing data for data events. | 508 // The iterator providing data for data events. |
| 519 // Set to null when iteration has completed. | 509 // Set to null when iteration has completed. |
| 520 Iterator<T> _iterator; | 510 Iterator<T> _iterator; |
| 521 | 511 |
| 522 _IterablePendingEvents(Iterable<T> data) : _iterator = data.iterator; | 512 _IterablePendingEvents(Iterable<T> data) : _iterator = data.iterator; |
| 523 | 513 |
| 524 bool get isEmpty => _iterator == null; | 514 bool get isEmpty => _iterator == null; |
| 525 | 515 |
| (...skipping 21 matching lines...) Expand all Loading... |
| 547 dispatch._sendDone(); | 537 dispatch._sendDone(); |
| 548 } | 538 } |
| 549 } | 539 } |
| 550 | 540 |
| 551 void clear() { | 541 void clear() { |
| 552 if (isScheduled) cancelSchedule(); | 542 if (isScheduled) cancelSchedule(); |
| 553 _iterator = null; | 543 _iterator = null; |
| 554 } | 544 } |
| 555 } | 545 } |
| 556 | 546 |
| 557 | |
| 558 // Internal helpers. | 547 // Internal helpers. |
| 559 | 548 |
| 560 // Types of the different handlers on a stream. Types used to type fields. | 549 // Types of the different handlers on a stream. Types used to type fields. |
| 561 typedef void _DataHandler<T>(T value); | 550 typedef void _DataHandler<T>(T value); |
| 562 typedef void _DoneHandler(); | 551 typedef void _DoneHandler(); |
| 563 | 552 |
| 564 | |
| 565 /** Default data handler, does nothing. */ | 553 /** Default data handler, does nothing. */ |
| 566 void _nullDataHandler(var value) {} | 554 void _nullDataHandler(var value) {} |
| 567 | 555 |
| 568 /** Default error handler, reports the error to the current zone's handler. */ | 556 /** Default error handler, reports the error to the current zone's handler. */ |
| 569 void _nullErrorHandler(error, [StackTrace stackTrace]) { | 557 void _nullErrorHandler(error, [StackTrace stackTrace]) { |
| 570 Zone.current.handleUncaughtError(error, stackTrace); | 558 Zone.current.handleUncaughtError(error, stackTrace); |
| 571 } | 559 } |
| 572 | 560 |
| 573 /** Default done handler, does nothing. */ | 561 /** Default done handler, does nothing. */ |
| 574 void _nullDoneHandler() {} | 562 void _nullDoneHandler() {} |
| 575 | 563 |
| 576 | |
| 577 /** A delayed event on a buffering stream subscription. */ | 564 /** A delayed event on a buffering stream subscription. */ |
| 578 abstract class _DelayedEvent<T> { | 565 abstract class _DelayedEvent<T> { |
| 579 /** Added as a linked list on the [StreamController]. */ | 566 /** Added as a linked list on the [StreamController]. */ |
| 580 _DelayedEvent next; | 567 _DelayedEvent next; |
| 581 /** Execute the delayed event on the [StreamController]. */ | 568 /** Execute the delayed event on the [StreamController]. */ |
| 582 void perform(_EventDispatch<T> dispatch); | 569 void perform(_EventDispatch<T> dispatch); |
| 583 } | 570 } |
| 584 | 571 |
| 585 /** A delayed data event. */ | 572 /** A delayed data event. */ |
| 586 class _DelayedData<T> extends _DelayedEvent<T> { | 573 class _DelayedData<T> extends _DelayedEvent<T> { |
| (...skipping 84 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 671 void cancelSchedule() { | 658 void cancelSchedule() { |
| 672 if (isScheduled) _state = _STATE_CANCELED; | 659 if (isScheduled) _state = _STATE_CANCELED; |
| 673 } | 660 } |
| 674 | 661 |
| 675 void handleNext(_EventDispatch<T> dispatch); | 662 void handleNext(_EventDispatch<T> dispatch); |
| 676 | 663 |
| 677 /** Throw away any pending events and cancel scheduled events. */ | 664 /** Throw away any pending events and cancel scheduled events. */ |
| 678 void clear(); | 665 void clear(); |
| 679 } | 666 } |
| 680 | 667 |
| 681 | |
| 682 /** Class holding pending events for a [_StreamImpl]. */ | 668 /** Class holding pending events for a [_StreamImpl]. */ |
| 683 class _StreamImplEvents<T> extends _PendingEvents<T> { | 669 class _StreamImplEvents<T> extends _PendingEvents<T> { |
| 684 /// Single linked list of [_DelayedEvent] objects. | 670 /// Single linked list of [_DelayedEvent] objects. |
| 685 _DelayedEvent firstPendingEvent = null; | 671 _DelayedEvent firstPendingEvent = null; |
| 672 |
| 686 /// Last element in the list of pending events. New events are added after it. | 673 /// Last element in the list of pending events. New events are added after it. |
| 687 _DelayedEvent lastPendingEvent = null; | 674 _DelayedEvent lastPendingEvent = null; |
| 688 | 675 |
| 689 bool get isEmpty => lastPendingEvent == null; | 676 bool get isEmpty => lastPendingEvent == null; |
| 690 | 677 |
| 691 void add(_DelayedEvent event) { | 678 void add(_DelayedEvent event) { |
| 692 if (lastPendingEvent == null) { | 679 if (lastPendingEvent == null) { |
| 693 firstPendingEvent = lastPendingEvent = event; | 680 firstPendingEvent = lastPendingEvent = event; |
| 694 } else { | 681 } else { |
| 695 lastPendingEvent = lastPendingEvent.next = event; | 682 lastPendingEvent = lastPendingEvent.next = event; |
| (...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 735 bool get isPaused => _state >= _PAUSED; | 722 bool get isPaused => _state >= _PAUSED; |
| 736 | 723 |
| 737 void _schedule() { | 724 void _schedule() { |
| 738 if (_isScheduled) return; | 725 if (_isScheduled) return; |
| 739 _zone.scheduleMicrotask(_sendDone); | 726 _zone.scheduleMicrotask(_sendDone); |
| 740 _state |= _SCHEDULED; | 727 _state |= _SCHEDULED; |
| 741 } | 728 } |
| 742 | 729 |
| 743 void onData(void handleData(T data)) {} | 730 void onData(void handleData(T data)) {} |
| 744 void onError(Function handleError) {} | 731 void onError(Function handleError) {} |
| 745 void onDone(void handleDone()) { _onDone = handleDone; } | 732 void onDone(void handleDone()) { |
| 733 _onDone = handleDone; |
| 734 } |
| 746 | 735 |
| 747 void pause([Future resumeSignal]) { | 736 void pause([Future resumeSignal]) { |
| 748 _state += _PAUSED; | 737 _state += _PAUSED; |
| 749 if (resumeSignal != null) resumeSignal.whenComplete(resume); | 738 if (resumeSignal != null) resumeSignal.whenComplete(resume); |
| 750 } | 739 } |
| 751 | 740 |
| 752 void resume() { | 741 void resume() { |
| 753 if (isPaused) { | 742 if (isPaused) { |
| 754 _state -= _PAUSED; | 743 _state -= _PAUSED; |
| 755 if (!isPaused && !_isSent) { | 744 if (!isPaused && !_isSent) { |
| 756 _schedule(); | 745 _schedule(); |
| 757 } | 746 } |
| 758 } | 747 } |
| 759 } | 748 } |
| 760 | 749 |
| 761 Future cancel() => Future._nullFuture; | 750 Future cancel() => Future._nullFuture; |
| 762 | 751 |
| 763 Future<E> asFuture<E>([E futureValue]) { | 752 Future<E> asFuture<E>([E futureValue]) { |
| 764 _Future<E> result = new _Future<E>(); | 753 _Future<E> result = new _Future<E>(); |
| 765 _onDone = () { result._completeWithValue(null); }; | 754 _onDone = () { |
| 755 result._completeWithValue(null); |
| 756 }; |
| 766 return result; | 757 return result; |
| 767 } | 758 } |
| 768 | 759 |
| 769 void _sendDone() { | 760 void _sendDone() { |
| 770 _state &= ~_SCHEDULED; | 761 _state &= ~_SCHEDULED; |
| 771 if (isPaused) return; | 762 if (isPaused) return; |
| 772 _state |= _DONE_SENT; | 763 _state |= _DONE_SENT; |
| 773 if (_onDone != null) _zone.runGuarded(_onDone); | 764 if (_onDone != null) _zone.runGuarded(_onDone); |
| 774 } | 765 } |
| 775 } | 766 } |
| 776 | 767 |
| 777 class _AsBroadcastStream<T> extends Stream<T> { | 768 class _AsBroadcastStream<T> extends Stream<T> { |
| 778 final Stream<T> _source; | 769 final Stream<T> _source; |
| 779 final _BroadcastCallback<T> _onListenHandler; | 770 final _BroadcastCallback<T> _onListenHandler; |
| 780 final _BroadcastCallback<T> _onCancelHandler; | 771 final _BroadcastCallback<T> _onCancelHandler; |
| 781 final Zone _zone; | 772 final Zone _zone; |
| 782 | 773 |
| 783 _AsBroadcastStreamController<T> _controller; | 774 _AsBroadcastStreamController<T> _controller; |
| 784 StreamSubscription<T> _subscription; | 775 StreamSubscription<T> _subscription; |
| 785 | 776 |
| 786 _AsBroadcastStream(this._source, | 777 _AsBroadcastStream( |
| 787 void onListenHandler(StreamSubscription<T> subscription), | 778 this._source, |
| 788 void onCancelHandler(StreamSubscription<T> subscription)) | 779 void onListenHandler(StreamSubscription<T> subscription), |
| 780 void onCancelHandler(StreamSubscription<T> subscription)) |
| 789 // TODO(floitsch): the return type should be void and should be | 781 // TODO(floitsch): the return type should be void and should be |
| 790 // inferred. | 782 // inferred. |
| 791 : _onListenHandler = Zone.current.registerUnaryCallback | 783 : _onListenHandler = Zone.current |
| 792 <dynamic, StreamSubscription<T>>(onListenHandler), | 784 .registerUnaryCallback<dynamic, StreamSubscription<T>>( |
| 793 _onCancelHandler = Zone.current.registerUnaryCallback | 785 onListenHandler), |
| 794 <dynamic, StreamSubscription<T>>(onCancelHandler), | 786 _onCancelHandler = Zone.current |
| 787 .registerUnaryCallback<dynamic, StreamSubscription<T>>( |
| 788 onCancelHandler), |
| 795 _zone = Zone.current { | 789 _zone = Zone.current { |
| 796 _controller = new _AsBroadcastStreamController<T>(_onListen, _onCancel); | 790 _controller = new _AsBroadcastStreamController<T>(_onListen, _onCancel); |
| 797 } | 791 } |
| 798 | 792 |
| 799 bool get isBroadcast => true; | 793 bool get isBroadcast => true; |
| 800 | 794 |
| 801 StreamSubscription<T> listen(void onData(T data), | 795 StreamSubscription<T> listen(void onData(T data), |
| 802 { Function onError, | 796 {Function onError, void onDone(), bool cancelOnError}) { |
| 803 void onDone(), | |
| 804 bool cancelOnError}) { | |
| 805 if (_controller == null || _controller.isClosed) { | 797 if (_controller == null || _controller.isClosed) { |
| 806 // Return a dummy subscription backed by nothing, since | 798 // Return a dummy subscription backed by nothing, since |
| 807 // it will only ever send one done event. | 799 // it will only ever send one done event. |
| 808 return new _DoneStreamSubscription<T>(onDone); | 800 return new _DoneStreamSubscription<T>(onDone); |
| 809 } | 801 } |
| 810 if (_subscription == null) { | 802 if (_subscription == null) { |
| 811 _subscription = _source.listen(_controller.add, | 803 _subscription = _source.listen(_controller.add, |
| 812 onError: _controller.addError, | 804 onError: _controller.addError, onDone: _controller.close); |
| 813 onDone: _controller.close); | |
| 814 } | 805 } |
| 815 cancelOnError = identical(true, cancelOnError); | 806 cancelOnError = identical(true, cancelOnError); |
| 816 return _controller._subscribe(onData, onError, onDone, cancelOnError); | 807 return _controller._subscribe(onData, onError, onDone, cancelOnError); |
| 817 } | 808 } |
| 818 | 809 |
| 819 void _onCancel() { | 810 void _onCancel() { |
| 820 bool shutdown = (_controller == null) || _controller.isClosed; | 811 bool shutdown = (_controller == null) || _controller.isClosed; |
| 821 if (_onCancelHandler != null) { | 812 if (_onCancelHandler != null) { |
| 822 _zone.runUnary( | 813 _zone.runUnary( |
| 823 _onCancelHandler, new _BroadcastSubscriptionWrapper<T>(this)); | 814 _onCancelHandler, new _BroadcastSubscriptionWrapper<T>(this)); |
| (...skipping 12 matching lines...) Expand all Loading... |
| 836 _onListenHandler, new _BroadcastSubscriptionWrapper<T>(this)); | 827 _onListenHandler, new _BroadcastSubscriptionWrapper<T>(this)); |
| 837 } | 828 } |
| 838 } | 829 } |
| 839 | 830 |
| 840 // Methods called from _BroadcastSubscriptionWrapper. | 831 // Methods called from _BroadcastSubscriptionWrapper. |
| 841 void _cancelSubscription() { | 832 void _cancelSubscription() { |
| 842 if (_subscription == null) return; | 833 if (_subscription == null) return; |
| 843 // Called by [_controller] when it has no subscribers left. | 834 // Called by [_controller] when it has no subscribers left. |
| 844 StreamSubscription subscription = _subscription; | 835 StreamSubscription subscription = _subscription; |
| 845 _subscription = null; | 836 _subscription = null; |
| 846 _controller = null; // Marks the stream as no longer listenable. | 837 _controller = null; // Marks the stream as no longer listenable. |
| 847 subscription.cancel(); | 838 subscription.cancel(); |
| 848 } | 839 } |
| 849 | 840 |
| 850 void _pauseSubscription(Future resumeSignal) { | 841 void _pauseSubscription(Future resumeSignal) { |
| 851 if (_subscription == null) return; | 842 if (_subscription == null) return; |
| 852 _subscription.pause(resumeSignal); | 843 _subscription.pause(resumeSignal); |
| 853 } | 844 } |
| 854 | 845 |
| 855 void _resumeSubscription() { | 846 void _resumeSubscription() { |
| 856 if (_subscription == null) return; | 847 if (_subscription == null) return; |
| (...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 902 bool get isPaused { | 893 bool get isPaused { |
| 903 return _stream._isSubscriptionPaused; | 894 return _stream._isSubscriptionPaused; |
| 904 } | 895 } |
| 905 | 896 |
| 906 Future<E> asFuture<E>([E futureValue]) { | 897 Future<E> asFuture<E>([E futureValue]) { |
| 907 throw new UnsupportedError( | 898 throw new UnsupportedError( |
| 908 "Cannot change handlers of asBroadcastStream source subscription."); | 899 "Cannot change handlers of asBroadcastStream source subscription."); |
| 909 } | 900 } |
| 910 } | 901 } |
| 911 | 902 |
| 912 | |
| 913 /** | 903 /** |
| 914 * Simple implementation of [StreamIterator]. | 904 * Simple implementation of [StreamIterator]. |
| 915 * | 905 * |
| 916 * Pauses the stream between calls to [moveNext]. | 906 * Pauses the stream between calls to [moveNext]. |
| 917 */ | 907 */ |
| 918 class _StreamIterator<T> implements StreamIterator<T> { | 908 class _StreamIterator<T> implements StreamIterator<T> { |
| 919 // The stream iterator is always in one of four states. | 909 // The stream iterator is always in one of four states. |
| 920 // The value of the [_stateData] field depends on the state. | 910 // The value of the [_stateData] field depends on the state. |
| 921 // | 911 // |
| 922 // When `_subscription == null` and `_stateData != null`: | 912 // When `_subscription == null` and `_stateData != null`: |
| (...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 962 /// Whether the iterator is between calls to `moveNext`. | 952 /// Whether the iterator is between calls to `moveNext`. |
| 963 /// This will usually cause the [_subscription] to be paused, but as an | 953 /// This will usually cause the [_subscription] to be paused, but as an |
| 964 /// optimization, we only pause after the [moveNext] future has been | 954 /// optimization, we only pause after the [moveNext] future has been |
| 965 /// completed. | 955 /// completed. |
| 966 bool _isPaused = false; | 956 bool _isPaused = false; |
| 967 | 957 |
| 968 _StreamIterator(final Stream<T> stream) : _stateData = stream; | 958 _StreamIterator(final Stream<T> stream) : _stateData = stream; |
| 969 | 959 |
| 970 T get current { | 960 T get current { |
| 971 if (_subscription != null && _isPaused) { | 961 if (_subscription != null && _isPaused) { |
| 972 return _stateData as Object /*=T*/; | 962 return _stateData as Object/*=T*/; |
| 973 } | 963 } |
| 974 return null; | 964 return null; |
| 975 } | 965 } |
| 976 | 966 |
| 977 Future<bool> moveNext() { | 967 Future<bool> moveNext() { |
| 978 if (_subscription != null) { | 968 if (_subscription != null) { |
| 979 if (_isPaused) { | 969 if (_isPaused) { |
| 980 var future = new _Future<bool>(); | 970 var future = new _Future<bool>(); |
| 981 _stateData = future; | 971 _stateData = future; |
| 982 _isPaused = false; | 972 _isPaused = false; |
| 983 _subscription.resume(); | 973 _subscription.resume(); |
| 984 return future; | 974 return future; |
| 985 } | 975 } |
| 986 throw new StateError("Already waiting for next."); | 976 throw new StateError("Already waiting for next."); |
| 987 } | 977 } |
| 988 return _initializeOrDone(); | 978 return _initializeOrDone(); |
| 989 } | 979 } |
| 990 | 980 |
| 991 /// Called if there is no active subscription when [moveNext] is called. | 981 /// Called if there is no active subscription when [moveNext] is called. |
| 992 /// | 982 /// |
| 993 /// Either starts listening on the stream if this is the first call to | 983 /// Either starts listening on the stream if this is the first call to |
| 994 /// [moveNext], or returns a `false` future because the stream has already | 984 /// [moveNext], or returns a `false` future because the stream has already |
| 995 /// ended. | 985 /// ended. |
| 996 Future<bool> _initializeOrDone() { | 986 Future<bool> _initializeOrDone() { |
| 997 assert(_subscription == null); | 987 assert(_subscription == null); |
| 998 var stateData = _stateData; | 988 var stateData = _stateData; |
| 999 if (stateData != null) { | 989 if (stateData != null) { |
| 1000 Stream<T> stream = stateData as Object /*=Stream<T>*/; | 990 Stream<T> stream = stateData as Object/*=Stream<T>*/; |
| 1001 _subscription = stream.listen( | 991 _subscription = stream.listen(_onData, |
| 1002 _onData, onError: _onError, onDone: _onDone, cancelOnError: true); | 992 onError: _onError, onDone: _onDone, cancelOnError: true); |
| 1003 var future = new _Future<bool>(); | 993 var future = new _Future<bool>(); |
| 1004 _stateData = future; | 994 _stateData = future; |
| 1005 return future; | 995 return future; |
| 1006 } | 996 } |
| 1007 return new _Future<bool>.immediate(false); | 997 return new _Future<bool>.immediate(false); |
| 1008 } | 998 } |
| 1009 | 999 |
| 1010 Future cancel() { | 1000 Future cancel() { |
| 1011 StreamSubscription<T> subscription = _subscription; | 1001 StreamSubscription<T> subscription = _subscription; |
| 1012 Object stateData = _stateData; | 1002 Object stateData = _stateData; |
| 1013 _stateData = null; | 1003 _stateData = null; |
| 1014 if (subscription != null) { | 1004 if (subscription != null) { |
| 1015 _subscription = null; | 1005 _subscription = null; |
| 1016 if (!_isPaused) { | 1006 if (!_isPaused) { |
| 1017 _Future<bool> future = stateData as Object /*=_Future<bool>*/; | 1007 _Future<bool> future = stateData as Object/*=_Future<bool>*/; |
| 1018 future._asyncComplete(false); | 1008 future._asyncComplete(false); |
| 1019 } | 1009 } |
| 1020 return subscription.cancel(); | 1010 return subscription.cancel(); |
| 1021 } | 1011 } |
| 1022 return Future._nullFuture; | 1012 return Future._nullFuture; |
| 1023 } | 1013 } |
| 1024 | 1014 |
| 1025 void _onData(T data) { | 1015 void _onData(T data) { |
| 1026 assert(_subscription != null && !_isPaused); | 1016 assert(_subscription != null && !_isPaused); |
| 1027 _Future<bool> moveNextFuture = _stateData as Object /*=_Future<bool>*/; | 1017 _Future<bool> moveNextFuture = _stateData as Object/*=_Future<bool>*/; |
| 1028 _stateData = data; | 1018 _stateData = data; |
| 1029 _isPaused = true; | 1019 _isPaused = true; |
| 1030 moveNextFuture._complete(true); | 1020 moveNextFuture._complete(true); |
| 1031 if (_subscription != null && _isPaused) _subscription.pause(); | 1021 if (_subscription != null && _isPaused) _subscription.pause(); |
| 1032 } | 1022 } |
| 1033 | 1023 |
| 1034 void _onError(Object error, [StackTrace stackTrace]) { | 1024 void _onError(Object error, [StackTrace stackTrace]) { |
| 1035 assert(_subscription != null && !_isPaused); | 1025 assert(_subscription != null && !_isPaused); |
| 1036 _Future<bool> moveNextFuture = _stateData as Object /*=_Future<bool>*/; | 1026 _Future<bool> moveNextFuture = _stateData as Object/*=_Future<bool>*/; |
| 1037 _subscription = null; | 1027 _subscription = null; |
| 1038 _stateData = null; | 1028 _stateData = null; |
| 1039 moveNextFuture._completeError(error, stackTrace); | 1029 moveNextFuture._completeError(error, stackTrace); |
| 1040 } | 1030 } |
| 1041 | 1031 |
| 1042 void _onDone() { | 1032 void _onDone() { |
| 1043 assert(_subscription != null && !_isPaused); | 1033 assert(_subscription != null && !_isPaused); |
| 1044 _Future<bool> moveNextFuture = _stateData as Object /*=_Future<bool>*/; | 1034 _Future<bool> moveNextFuture = _stateData as Object/*=_Future<bool>*/; |
| 1045 _subscription = null; | 1035 _subscription = null; |
| 1046 _stateData = null; | 1036 _stateData = null; |
| 1047 moveNextFuture._complete(false); | 1037 moveNextFuture._complete(false); |
| 1048 } | 1038 } |
| 1049 } | 1039 } |
| 1050 | 1040 |
| 1051 /** An empty broadcast stream, sending a done event as soon as possible. */ | 1041 /** An empty broadcast stream, sending a done event as soon as possible. */ |
| 1052 class _EmptyStream<T> extends Stream<T> { | 1042 class _EmptyStream<T> extends Stream<T> { |
| 1053 const _EmptyStream() : super._internal(); | 1043 const _EmptyStream() : super._internal(); |
| 1054 bool get isBroadcast => true; | 1044 bool get isBroadcast => true; |
| 1055 StreamSubscription<T> listen(void onData(T data), | 1045 StreamSubscription<T> listen(void onData(T data), |
| 1056 {Function onError, | 1046 {Function onError, void onDone(), bool cancelOnError}) { |
| 1057 void onDone(), | |
| 1058 bool cancelOnError}) { | |
| 1059 return new _DoneStreamSubscription<T>(onDone); | 1047 return new _DoneStreamSubscription<T>(onDone); |
| 1060 } | 1048 } |
| 1061 } | 1049 } |
| OLD | NEW |