OLD | NEW |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 /** Abstract and private interface for a place to put events. */ | 7 /** Abstract and private interface for a place to put events. */ |
8 abstract class _EventSink<T> { | 8 abstract class _EventSink<T> { |
9 void _add(T data); | 9 void _add(T data); |
10 void _addError(Object error, StackTrace stackTrace); | 10 void _addError(Object error, StackTrace stackTrace); |
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
42 * * [_close]: Request to close the stream. | 42 * * [_close]: Request to close the stream. |
43 * * [_onCancel]: Called when the subscription will provide no more events, | 43 * * [_onCancel]: Called when the subscription will provide no more events, |
44 * either due to being actively canceled, or after sending a done event. | 44 * either due to being actively canceled, or after sending a done event. |
45 * * [_onPause]: Called when the subscription wants the event source to pause. | 45 * * [_onPause]: Called when the subscription wants the event source to pause. |
46 * * [_onResume]: Called when allowing new events after a pause. | 46 * * [_onResume]: Called when allowing new events after a pause. |
47 * | 47 * |
48 * The user should not add new events when the subscription requests a paused, | 48 * The user should not add new events when the subscription requests a paused, |
49 * but if it happens anyway, the subscription will enqueue the events just as | 49 * but if it happens anyway, the subscription will enqueue the events just as |
50 * when new events arrive while still firing an old event. | 50 * when new events arrive while still firing an old event. |
51 */ | 51 */ |
52 class _BufferingStreamSubscription<T> implements StreamSubscription<T>, | 52 class _BufferingStreamSubscription<T> |
53 _EventSink<T>, | 53 implements StreamSubscription<T>, _EventSink<T>, _EventDispatch<T> { |
54 _EventDispatch<T> { | |
55 /** The `cancelOnError` flag from the `listen` call. */ | 54 /** The `cancelOnError` flag from the `listen` call. */ |
56 static const int _STATE_CANCEL_ON_ERROR = 1; | 55 static const int _STATE_CANCEL_ON_ERROR = 1; |
57 /** | 56 /** |
58 * Whether the "done" event has been received. | 57 * Whether the "done" event has been received. |
59 * No further events are accepted after this. | 58 * No further events are accepted after this. |
60 */ | 59 */ |
61 static const int _STATE_CLOSED = 2; | 60 static const int _STATE_CLOSED = 2; |
62 /** | 61 /** |
63 * Set if the input has been asked not to send events. | 62 * Set if the input has been asked not to send events. |
64 * | 63 * |
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
100 /** The future [_onCancel] may return. */ | 99 /** The future [_onCancel] may return. */ |
101 Future _cancelFuture; | 100 Future _cancelFuture; |
102 | 101 |
103 /** | 102 /** |
104 * Queue of pending events. | 103 * Queue of pending events. |
105 * | 104 * |
106 * Is created when necessary, or set in constructor for preconfigured events. | 105 * Is created when necessary, or set in constructor for preconfigured events. |
107 */ | 106 */ |
108 _PendingEvents<T> _pending; | 107 _PendingEvents<T> _pending; |
109 | 108 |
110 _BufferingStreamSubscription(void onData(T data), | 109 _BufferingStreamSubscription( |
111 Function onError, | 110 void onData(T data), Function onError, void onDone(), bool cancelOnError) |
112 void onDone(), | |
113 bool cancelOnError) | |
114 : _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0) { | 111 : _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0) { |
115 this.onData(onData); | 112 this.onData(onData); |
116 this.onError(onError); | 113 this.onError(onError); |
117 this.onDone(onDone); | 114 this.onDone(onDone); |
118 } | 115 } |
119 | 116 |
120 /** | 117 /** |
121 * Sets the subscription's pending events object. | 118 * Sets the subscription's pending events object. |
122 * | 119 * |
123 * This can only be done once. The pending events object is used for the | 120 * This can only be done once. The pending events object is used for the |
(...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
190 if (!_isCanceled) { | 187 if (!_isCanceled) { |
191 _cancel(); | 188 _cancel(); |
192 } | 189 } |
193 return _cancelFuture ?? Future._nullFuture; | 190 return _cancelFuture ?? Future._nullFuture; |
194 } | 191 } |
195 | 192 |
196 Future<E> asFuture<E>([E futureValue]) { | 193 Future<E> asFuture<E>([E futureValue]) { |
197 _Future<E> result = new _Future<E>(); | 194 _Future<E> result = new _Future<E>(); |
198 | 195 |
199 // Overwrite the onDone and onError handlers. | 196 // Overwrite the onDone and onError handlers. |
200 _onDone = () { result._complete(futureValue); }; | 197 _onDone = () { |
| 198 result._complete(futureValue); |
| 199 }; |
201 _onError = (error, stackTrace) { | 200 _onError = (error, stackTrace) { |
202 Future cancelFuture = cancel(); | 201 Future cancelFuture = cancel(); |
203 if (!identical(cancelFuture, Future._nullFuture)) { | 202 if (!identical(cancelFuture, Future._nullFuture)) { |
204 cancelFuture.whenComplete(() { | 203 cancelFuture.whenComplete(() { |
205 result._completeError(error, stackTrace); | 204 result._completeError(error, stackTrace); |
206 }); | 205 }); |
207 } else { | 206 } else { |
208 result._completeError(error, stackTrace); | 207 result._completeError(error, stackTrace); |
209 } | 208 } |
210 }; | 209 }; |
(...skipping 46 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
257 if (_canFire) { | 256 if (_canFire) { |
258 _sendData(data); | 257 _sendData(data); |
259 } else { | 258 } else { |
260 _addPending(new _DelayedData<T>(data)); | 259 _addPending(new _DelayedData<T>(data)); |
261 } | 260 } |
262 } | 261 } |
263 | 262 |
264 void _addError(Object error, StackTrace stackTrace) { | 263 void _addError(Object error, StackTrace stackTrace) { |
265 if (_isCanceled) return; | 264 if (_isCanceled) return; |
266 if (_canFire) { | 265 if (_canFire) { |
267 _sendError(error, stackTrace); // Reports cancel after sending. | 266 _sendError(error, stackTrace); // Reports cancel after sending. |
268 } else { | 267 } else { |
269 _addPending(new _DelayedError(error, stackTrace)); | 268 _addPending(new _DelayedError(error, stackTrace)); |
270 } | 269 } |
271 } | 270 } |
272 | 271 |
273 void _close() { | 272 void _close() { |
274 assert(!_isClosed); | 273 assert(!_isClosed); |
275 if (_isCanceled) return; | 274 if (_isCanceled) return; |
276 _state |= _STATE_CLOSED; | 275 _state |= _STATE_CLOSED; |
277 if (_canFire) { | 276 if (_canFire) { |
(...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
339 assert(!_inCallback); | 338 assert(!_inCallback); |
340 bool wasInputPaused = _isInputPaused; | 339 bool wasInputPaused = _isInputPaused; |
341 | 340 |
342 void sendError() { | 341 void sendError() { |
343 // If the subscription has been canceled while waiting for the cancel | 342 // If the subscription has been canceled while waiting for the cancel |
344 // future to finish we must not report the error. | 343 // future to finish we must not report the error. |
345 if (_isCanceled && !_waitsForCancel) return; | 344 if (_isCanceled && !_waitsForCancel) return; |
346 _state |= _STATE_IN_CALLBACK; | 345 _state |= _STATE_IN_CALLBACK; |
347 if (_onError is ZoneBinaryCallback<dynamic, Object, StackTrace>) { | 346 if (_onError is ZoneBinaryCallback<dynamic, Object, StackTrace>) { |
348 ZoneBinaryCallback<dynamic, Object, StackTrace> errorCallback = _onError | 347 ZoneBinaryCallback<dynamic, Object, StackTrace> errorCallback = _onError |
349 as Object /*=ZoneBinaryCallback<dynamic, Object, StackTrace>*/; | 348 as Object/*=ZoneBinaryCallback<dynamic, Object, StackTrace>*/; |
350 _zone.runBinaryGuarded(errorCallback, error, stackTrace); | 349 _zone.runBinaryGuarded(errorCallback, error, stackTrace); |
351 } else { | 350 } else { |
352 _zone.runUnaryGuarded<dynamic, dynamic>( | 351 _zone.runUnaryGuarded<dynamic, dynamic>( |
353 _onError as Object /*=ZoneUnaryCallback<dynamic, dynamic>*/, error); | 352 _onError as Object/*=ZoneUnaryCallback<dynamic, dynamic>*/, error); |
354 } | 353 } |
355 _state &= ~_STATE_IN_CALLBACK; | 354 _state &= ~_STATE_IN_CALLBACK; |
356 } | 355 } |
357 | 356 |
358 if (_cancelOnError) { | 357 if (_cancelOnError) { |
359 _state |= _STATE_WAIT_FOR_CANCEL; | 358 _state |= _STATE_WAIT_FOR_CANCEL; |
360 _cancel(); | 359 _cancel(); |
361 if (_cancelFuture is Future && | 360 if (_cancelFuture is Future && |
362 !identical(_cancelFuture, Future._nullFuture)) { | 361 !identical(_cancelFuture, Future._nullFuture)) { |
363 _cancelFuture.whenComplete(sendError); | 362 _cancelFuture.whenComplete(sendError); |
(...skipping 90 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
454 } | 453 } |
455 | 454 |
456 // ------------------------------------------------------------------- | 455 // ------------------------------------------------------------------- |
457 // Common base class for single and multi-subscription streams. | 456 // Common base class for single and multi-subscription streams. |
458 // ------------------------------------------------------------------- | 457 // ------------------------------------------------------------------- |
459 abstract class _StreamImpl<T> extends Stream<T> { | 458 abstract class _StreamImpl<T> extends Stream<T> { |
460 // ------------------------------------------------------------------ | 459 // ------------------------------------------------------------------ |
461 // Stream interface. | 460 // Stream interface. |
462 | 461 |
463 StreamSubscription<T> listen(void onData(T data), | 462 StreamSubscription<T> listen(void onData(T data), |
464 { Function onError, | 463 {Function onError, void onDone(), bool cancelOnError}) { |
465 void onDone(), | |
466 bool cancelOnError }) { | |
467 cancelOnError = identical(true, cancelOnError); | 464 cancelOnError = identical(true, cancelOnError); |
468 StreamSubscription<T> subscription = | 465 StreamSubscription<T> subscription = |
469 _createSubscription(onData, onError, onDone, cancelOnError); | 466 _createSubscription(onData, onError, onDone, cancelOnError); |
470 _onListen(subscription); | 467 _onListen(subscription); |
471 return subscription; | 468 return subscription; |
472 } | 469 } |
473 | 470 |
474 // ------------------------------------------------------------------- | 471 // ------------------------------------------------------------------- |
475 /** Create a subscription object. Called by [subcribe]. */ | 472 /** Create a subscription object. Called by [subcribe]. */ |
476 StreamSubscription<T> _createSubscription( | 473 StreamSubscription<T> _createSubscription(void onData(T data), |
477 void onData(T data), | 474 Function onError, void onDone(), bool cancelOnError) { |
478 Function onError, | 475 return new _BufferingStreamSubscription<T>( |
479 void onDone(), | 476 onData, onError, onDone, cancelOnError); |
480 bool cancelOnError) { | |
481 return new _BufferingStreamSubscription<T>(onData, onError, onDone, | |
482 cancelOnError); | |
483 } | 477 } |
484 | 478 |
485 /** Hook called when the subscription has been created. */ | 479 /** Hook called when the subscription has been created. */ |
486 void _onListen(StreamSubscription subscription) {} | 480 void _onListen(StreamSubscription subscription) {} |
487 } | 481 } |
488 | 482 |
489 typedef _PendingEvents<T> _EventGenerator<T>(); | 483 typedef _PendingEvents<T> _EventGenerator<T>(); |
490 | 484 |
491 /** Stream that generates its own events. */ | 485 /** Stream that generates its own events. */ |
492 class _GeneratedStreamImpl<T> extends _StreamImpl<T> { | 486 class _GeneratedStreamImpl<T> extends _StreamImpl<T> { |
493 final _EventGenerator<T> _pending; | 487 final _EventGenerator<T> _pending; |
494 bool _isUsed = false; | 488 bool _isUsed = false; |
495 /** | 489 /** |
496 * Initializes the stream to have only the events provided by a | 490 * Initializes the stream to have only the events provided by a |
497 * [_PendingEvents]. | 491 * [_PendingEvents]. |
498 * | 492 * |
499 * A new [_PendingEvents] must be generated for each listen. | 493 * A new [_PendingEvents] must be generated for each listen. |
500 */ | 494 */ |
501 _GeneratedStreamImpl(this._pending); | 495 _GeneratedStreamImpl(this._pending); |
502 | 496 |
503 StreamSubscription<T> _createSubscription( | 497 StreamSubscription<T> _createSubscription(void onData(T data), |
504 void onData(T data), | 498 Function onError, void onDone(), bool cancelOnError) { |
505 Function onError, | |
506 void onDone(), | |
507 bool cancelOnError) { | |
508 if (_isUsed) throw new StateError("Stream has already been listened to."); | 499 if (_isUsed) throw new StateError("Stream has already been listened to."); |
509 _isUsed = true; | 500 _isUsed = true; |
510 return new _BufferingStreamSubscription<T>( | 501 return new _BufferingStreamSubscription<T>( |
511 onData, onError, onDone, cancelOnError).._setPendingEvents(_pending()); | 502 onData, onError, onDone, cancelOnError).._setPendingEvents(_pending()); |
512 } | 503 } |
513 } | 504 } |
514 | 505 |
515 | |
516 /** Pending events object that gets its events from an [Iterable]. */ | 506 /** Pending events object that gets its events from an [Iterable]. */ |
517 class _IterablePendingEvents<T> extends _PendingEvents<T> { | 507 class _IterablePendingEvents<T> extends _PendingEvents<T> { |
518 // The iterator providing data for data events. | 508 // The iterator providing data for data events. |
519 // Set to null when iteration has completed. | 509 // Set to null when iteration has completed. |
520 Iterator<T> _iterator; | 510 Iterator<T> _iterator; |
521 | 511 |
522 _IterablePendingEvents(Iterable<T> data) : _iterator = data.iterator; | 512 _IterablePendingEvents(Iterable<T> data) : _iterator = data.iterator; |
523 | 513 |
524 bool get isEmpty => _iterator == null; | 514 bool get isEmpty => _iterator == null; |
525 | 515 |
(...skipping 21 matching lines...) Expand all Loading... |
547 dispatch._sendDone(); | 537 dispatch._sendDone(); |
548 } | 538 } |
549 } | 539 } |
550 | 540 |
551 void clear() { | 541 void clear() { |
552 if (isScheduled) cancelSchedule(); | 542 if (isScheduled) cancelSchedule(); |
553 _iterator = null; | 543 _iterator = null; |
554 } | 544 } |
555 } | 545 } |
556 | 546 |
557 | |
558 // Internal helpers. | 547 // Internal helpers. |
559 | 548 |
560 // Types of the different handlers on a stream. Types used to type fields. | 549 // Types of the different handlers on a stream. Types used to type fields. |
561 typedef void _DataHandler<T>(T value); | 550 typedef void _DataHandler<T>(T value); |
562 typedef void _DoneHandler(); | 551 typedef void _DoneHandler(); |
563 | 552 |
564 | |
565 /** Default data handler, does nothing. */ | 553 /** Default data handler, does nothing. */ |
566 void _nullDataHandler(var value) {} | 554 void _nullDataHandler(var value) {} |
567 | 555 |
568 /** Default error handler, reports the error to the current zone's handler. */ | 556 /** Default error handler, reports the error to the current zone's handler. */ |
569 void _nullErrorHandler(error, [StackTrace stackTrace]) { | 557 void _nullErrorHandler(error, [StackTrace stackTrace]) { |
570 Zone.current.handleUncaughtError(error, stackTrace); | 558 Zone.current.handleUncaughtError(error, stackTrace); |
571 } | 559 } |
572 | 560 |
573 /** Default done handler, does nothing. */ | 561 /** Default done handler, does nothing. */ |
574 void _nullDoneHandler() {} | 562 void _nullDoneHandler() {} |
575 | 563 |
576 | |
577 /** A delayed event on a buffering stream subscription. */ | 564 /** A delayed event on a buffering stream subscription. */ |
578 abstract class _DelayedEvent<T> { | 565 abstract class _DelayedEvent<T> { |
579 /** Added as a linked list on the [StreamController]. */ | 566 /** Added as a linked list on the [StreamController]. */ |
580 _DelayedEvent next; | 567 _DelayedEvent next; |
581 /** Execute the delayed event on the [StreamController]. */ | 568 /** Execute the delayed event on the [StreamController]. */ |
582 void perform(_EventDispatch<T> dispatch); | 569 void perform(_EventDispatch<T> dispatch); |
583 } | 570 } |
584 | 571 |
585 /** A delayed data event. */ | 572 /** A delayed data event. */ |
586 class _DelayedData<T> extends _DelayedEvent<T> { | 573 class _DelayedData<T> extends _DelayedEvent<T> { |
(...skipping 84 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
671 void cancelSchedule() { | 658 void cancelSchedule() { |
672 if (isScheduled) _state = _STATE_CANCELED; | 659 if (isScheduled) _state = _STATE_CANCELED; |
673 } | 660 } |
674 | 661 |
675 void handleNext(_EventDispatch<T> dispatch); | 662 void handleNext(_EventDispatch<T> dispatch); |
676 | 663 |
677 /** Throw away any pending events and cancel scheduled events. */ | 664 /** Throw away any pending events and cancel scheduled events. */ |
678 void clear(); | 665 void clear(); |
679 } | 666 } |
680 | 667 |
681 | |
682 /** Class holding pending events for a [_StreamImpl]. */ | 668 /** Class holding pending events for a [_StreamImpl]. */ |
683 class _StreamImplEvents<T> extends _PendingEvents<T> { | 669 class _StreamImplEvents<T> extends _PendingEvents<T> { |
684 /// Single linked list of [_DelayedEvent] objects. | 670 /// Single linked list of [_DelayedEvent] objects. |
685 _DelayedEvent firstPendingEvent = null; | 671 _DelayedEvent firstPendingEvent = null; |
| 672 |
686 /// Last element in the list of pending events. New events are added after it. | 673 /// Last element in the list of pending events. New events are added after it. |
687 _DelayedEvent lastPendingEvent = null; | 674 _DelayedEvent lastPendingEvent = null; |
688 | 675 |
689 bool get isEmpty => lastPendingEvent == null; | 676 bool get isEmpty => lastPendingEvent == null; |
690 | 677 |
691 void add(_DelayedEvent event) { | 678 void add(_DelayedEvent event) { |
692 if (lastPendingEvent == null) { | 679 if (lastPendingEvent == null) { |
693 firstPendingEvent = lastPendingEvent = event; | 680 firstPendingEvent = lastPendingEvent = event; |
694 } else { | 681 } else { |
695 lastPendingEvent = lastPendingEvent.next = event; | 682 lastPendingEvent = lastPendingEvent.next = event; |
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
735 bool get isPaused => _state >= _PAUSED; | 722 bool get isPaused => _state >= _PAUSED; |
736 | 723 |
737 void _schedule() { | 724 void _schedule() { |
738 if (_isScheduled) return; | 725 if (_isScheduled) return; |
739 _zone.scheduleMicrotask(_sendDone); | 726 _zone.scheduleMicrotask(_sendDone); |
740 _state |= _SCHEDULED; | 727 _state |= _SCHEDULED; |
741 } | 728 } |
742 | 729 |
743 void onData(void handleData(T data)) {} | 730 void onData(void handleData(T data)) {} |
744 void onError(Function handleError) {} | 731 void onError(Function handleError) {} |
745 void onDone(void handleDone()) { _onDone = handleDone; } | 732 void onDone(void handleDone()) { |
| 733 _onDone = handleDone; |
| 734 } |
746 | 735 |
747 void pause([Future resumeSignal]) { | 736 void pause([Future resumeSignal]) { |
748 _state += _PAUSED; | 737 _state += _PAUSED; |
749 if (resumeSignal != null) resumeSignal.whenComplete(resume); | 738 if (resumeSignal != null) resumeSignal.whenComplete(resume); |
750 } | 739 } |
751 | 740 |
752 void resume() { | 741 void resume() { |
753 if (isPaused) { | 742 if (isPaused) { |
754 _state -= _PAUSED; | 743 _state -= _PAUSED; |
755 if (!isPaused && !_isSent) { | 744 if (!isPaused && !_isSent) { |
756 _schedule(); | 745 _schedule(); |
757 } | 746 } |
758 } | 747 } |
759 } | 748 } |
760 | 749 |
761 Future cancel() => Future._nullFuture; | 750 Future cancel() => Future._nullFuture; |
762 | 751 |
763 Future<E> asFuture<E>([E futureValue]) { | 752 Future<E> asFuture<E>([E futureValue]) { |
764 _Future<E> result = new _Future<E>(); | 753 _Future<E> result = new _Future<E>(); |
765 _onDone = () { result._completeWithValue(null); }; | 754 _onDone = () { |
| 755 result._completeWithValue(null); |
| 756 }; |
766 return result; | 757 return result; |
767 } | 758 } |
768 | 759 |
769 void _sendDone() { | 760 void _sendDone() { |
770 _state &= ~_SCHEDULED; | 761 _state &= ~_SCHEDULED; |
771 if (isPaused) return; | 762 if (isPaused) return; |
772 _state |= _DONE_SENT; | 763 _state |= _DONE_SENT; |
773 if (_onDone != null) _zone.runGuarded(_onDone); | 764 if (_onDone != null) _zone.runGuarded(_onDone); |
774 } | 765 } |
775 } | 766 } |
776 | 767 |
777 class _AsBroadcastStream<T> extends Stream<T> { | 768 class _AsBroadcastStream<T> extends Stream<T> { |
778 final Stream<T> _source; | 769 final Stream<T> _source; |
779 final _BroadcastCallback<T> _onListenHandler; | 770 final _BroadcastCallback<T> _onListenHandler; |
780 final _BroadcastCallback<T> _onCancelHandler; | 771 final _BroadcastCallback<T> _onCancelHandler; |
781 final Zone _zone; | 772 final Zone _zone; |
782 | 773 |
783 _AsBroadcastStreamController<T> _controller; | 774 _AsBroadcastStreamController<T> _controller; |
784 StreamSubscription<T> _subscription; | 775 StreamSubscription<T> _subscription; |
785 | 776 |
786 _AsBroadcastStream(this._source, | 777 _AsBroadcastStream( |
787 void onListenHandler(StreamSubscription<T> subscription), | 778 this._source, |
788 void onCancelHandler(StreamSubscription<T> subscription)) | 779 void onListenHandler(StreamSubscription<T> subscription), |
| 780 void onCancelHandler(StreamSubscription<T> subscription)) |
789 // TODO(floitsch): the return type should be void and should be | 781 // TODO(floitsch): the return type should be void and should be |
790 // inferred. | 782 // inferred. |
791 : _onListenHandler = Zone.current.registerUnaryCallback | 783 : _onListenHandler = Zone.current |
792 <dynamic, StreamSubscription<T>>(onListenHandler), | 784 .registerUnaryCallback<dynamic, StreamSubscription<T>>( |
793 _onCancelHandler = Zone.current.registerUnaryCallback | 785 onListenHandler), |
794 <dynamic, StreamSubscription<T>>(onCancelHandler), | 786 _onCancelHandler = Zone.current |
| 787 .registerUnaryCallback<dynamic, StreamSubscription<T>>( |
| 788 onCancelHandler), |
795 _zone = Zone.current { | 789 _zone = Zone.current { |
796 _controller = new _AsBroadcastStreamController<T>(_onListen, _onCancel); | 790 _controller = new _AsBroadcastStreamController<T>(_onListen, _onCancel); |
797 } | 791 } |
798 | 792 |
799 bool get isBroadcast => true; | 793 bool get isBroadcast => true; |
800 | 794 |
801 StreamSubscription<T> listen(void onData(T data), | 795 StreamSubscription<T> listen(void onData(T data), |
802 { Function onError, | 796 {Function onError, void onDone(), bool cancelOnError}) { |
803 void onDone(), | |
804 bool cancelOnError}) { | |
805 if (_controller == null || _controller.isClosed) { | 797 if (_controller == null || _controller.isClosed) { |
806 // Return a dummy subscription backed by nothing, since | 798 // Return a dummy subscription backed by nothing, since |
807 // it will only ever send one done event. | 799 // it will only ever send one done event. |
808 return new _DoneStreamSubscription<T>(onDone); | 800 return new _DoneStreamSubscription<T>(onDone); |
809 } | 801 } |
810 if (_subscription == null) { | 802 if (_subscription == null) { |
811 _subscription = _source.listen(_controller.add, | 803 _subscription = _source.listen(_controller.add, |
812 onError: _controller.addError, | 804 onError: _controller.addError, onDone: _controller.close); |
813 onDone: _controller.close); | |
814 } | 805 } |
815 cancelOnError = identical(true, cancelOnError); | 806 cancelOnError = identical(true, cancelOnError); |
816 return _controller._subscribe(onData, onError, onDone, cancelOnError); | 807 return _controller._subscribe(onData, onError, onDone, cancelOnError); |
817 } | 808 } |
818 | 809 |
819 void _onCancel() { | 810 void _onCancel() { |
820 bool shutdown = (_controller == null) || _controller.isClosed; | 811 bool shutdown = (_controller == null) || _controller.isClosed; |
821 if (_onCancelHandler != null) { | 812 if (_onCancelHandler != null) { |
822 _zone.runUnary( | 813 _zone.runUnary( |
823 _onCancelHandler, new _BroadcastSubscriptionWrapper<T>(this)); | 814 _onCancelHandler, new _BroadcastSubscriptionWrapper<T>(this)); |
(...skipping 12 matching lines...) Expand all Loading... |
836 _onListenHandler, new _BroadcastSubscriptionWrapper<T>(this)); | 827 _onListenHandler, new _BroadcastSubscriptionWrapper<T>(this)); |
837 } | 828 } |
838 } | 829 } |
839 | 830 |
840 // Methods called from _BroadcastSubscriptionWrapper. | 831 // Methods called from _BroadcastSubscriptionWrapper. |
841 void _cancelSubscription() { | 832 void _cancelSubscription() { |
842 if (_subscription == null) return; | 833 if (_subscription == null) return; |
843 // Called by [_controller] when it has no subscribers left. | 834 // Called by [_controller] when it has no subscribers left. |
844 StreamSubscription subscription = _subscription; | 835 StreamSubscription subscription = _subscription; |
845 _subscription = null; | 836 _subscription = null; |
846 _controller = null; // Marks the stream as no longer listenable. | 837 _controller = null; // Marks the stream as no longer listenable. |
847 subscription.cancel(); | 838 subscription.cancel(); |
848 } | 839 } |
849 | 840 |
850 void _pauseSubscription(Future resumeSignal) { | 841 void _pauseSubscription(Future resumeSignal) { |
851 if (_subscription == null) return; | 842 if (_subscription == null) return; |
852 _subscription.pause(resumeSignal); | 843 _subscription.pause(resumeSignal); |
853 } | 844 } |
854 | 845 |
855 void _resumeSubscription() { | 846 void _resumeSubscription() { |
856 if (_subscription == null) return; | 847 if (_subscription == null) return; |
(...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
902 bool get isPaused { | 893 bool get isPaused { |
903 return _stream._isSubscriptionPaused; | 894 return _stream._isSubscriptionPaused; |
904 } | 895 } |
905 | 896 |
906 Future<E> asFuture<E>([E futureValue]) { | 897 Future<E> asFuture<E>([E futureValue]) { |
907 throw new UnsupportedError( | 898 throw new UnsupportedError( |
908 "Cannot change handlers of asBroadcastStream source subscription."); | 899 "Cannot change handlers of asBroadcastStream source subscription."); |
909 } | 900 } |
910 } | 901 } |
911 | 902 |
912 | |
913 /** | 903 /** |
914 * Simple implementation of [StreamIterator]. | 904 * Simple implementation of [StreamIterator]. |
915 * | 905 * |
916 * Pauses the stream between calls to [moveNext]. | 906 * Pauses the stream between calls to [moveNext]. |
917 */ | 907 */ |
918 class _StreamIterator<T> implements StreamIterator<T> { | 908 class _StreamIterator<T> implements StreamIterator<T> { |
919 // The stream iterator is always in one of four states. | 909 // The stream iterator is always in one of four states. |
920 // The value of the [_stateData] field depends on the state. | 910 // The value of the [_stateData] field depends on the state. |
921 // | 911 // |
922 // When `_subscription == null` and `_stateData != null`: | 912 // When `_subscription == null` and `_stateData != null`: |
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
962 /// Whether the iterator is between calls to `moveNext`. | 952 /// Whether the iterator is between calls to `moveNext`. |
963 /// This will usually cause the [_subscription] to be paused, but as an | 953 /// This will usually cause the [_subscription] to be paused, but as an |
964 /// optimization, we only pause after the [moveNext] future has been | 954 /// optimization, we only pause after the [moveNext] future has been |
965 /// completed. | 955 /// completed. |
966 bool _isPaused = false; | 956 bool _isPaused = false; |
967 | 957 |
968 _StreamIterator(final Stream<T> stream) : _stateData = stream; | 958 _StreamIterator(final Stream<T> stream) : _stateData = stream; |
969 | 959 |
970 T get current { | 960 T get current { |
971 if (_subscription != null && _isPaused) { | 961 if (_subscription != null && _isPaused) { |
972 return _stateData as Object /*=T*/; | 962 return _stateData as Object/*=T*/; |
973 } | 963 } |
974 return null; | 964 return null; |
975 } | 965 } |
976 | 966 |
977 Future<bool> moveNext() { | 967 Future<bool> moveNext() { |
978 if (_subscription != null) { | 968 if (_subscription != null) { |
979 if (_isPaused) { | 969 if (_isPaused) { |
980 var future = new _Future<bool>(); | 970 var future = new _Future<bool>(); |
981 _stateData = future; | 971 _stateData = future; |
982 _isPaused = false; | 972 _isPaused = false; |
983 _subscription.resume(); | 973 _subscription.resume(); |
984 return future; | 974 return future; |
985 } | 975 } |
986 throw new StateError("Already waiting for next."); | 976 throw new StateError("Already waiting for next."); |
987 } | 977 } |
988 return _initializeOrDone(); | 978 return _initializeOrDone(); |
989 } | 979 } |
990 | 980 |
991 /// Called if there is no active subscription when [moveNext] is called. | 981 /// Called if there is no active subscription when [moveNext] is called. |
992 /// | 982 /// |
993 /// Either starts listening on the stream if this is the first call to | 983 /// Either starts listening on the stream if this is the first call to |
994 /// [moveNext], or returns a `false` future because the stream has already | 984 /// [moveNext], or returns a `false` future because the stream has already |
995 /// ended. | 985 /// ended. |
996 Future<bool> _initializeOrDone() { | 986 Future<bool> _initializeOrDone() { |
997 assert(_subscription == null); | 987 assert(_subscription == null); |
998 var stateData = _stateData; | 988 var stateData = _stateData; |
999 if (stateData != null) { | 989 if (stateData != null) { |
1000 Stream<T> stream = stateData as Object /*=Stream<T>*/; | 990 Stream<T> stream = stateData as Object/*=Stream<T>*/; |
1001 _subscription = stream.listen( | 991 _subscription = stream.listen(_onData, |
1002 _onData, onError: _onError, onDone: _onDone, cancelOnError: true); | 992 onError: _onError, onDone: _onDone, cancelOnError: true); |
1003 var future = new _Future<bool>(); | 993 var future = new _Future<bool>(); |
1004 _stateData = future; | 994 _stateData = future; |
1005 return future; | 995 return future; |
1006 } | 996 } |
1007 return new _Future<bool>.immediate(false); | 997 return new _Future<bool>.immediate(false); |
1008 } | 998 } |
1009 | 999 |
1010 Future cancel() { | 1000 Future cancel() { |
1011 StreamSubscription<T> subscription = _subscription; | 1001 StreamSubscription<T> subscription = _subscription; |
1012 Object stateData = _stateData; | 1002 Object stateData = _stateData; |
1013 _stateData = null; | 1003 _stateData = null; |
1014 if (subscription != null) { | 1004 if (subscription != null) { |
1015 _subscription = null; | 1005 _subscription = null; |
1016 if (!_isPaused) { | 1006 if (!_isPaused) { |
1017 _Future<bool> future = stateData as Object /*=_Future<bool>*/; | 1007 _Future<bool> future = stateData as Object/*=_Future<bool>*/; |
1018 future._asyncComplete(false); | 1008 future._asyncComplete(false); |
1019 } | 1009 } |
1020 return subscription.cancel(); | 1010 return subscription.cancel(); |
1021 } | 1011 } |
1022 return Future._nullFuture; | 1012 return Future._nullFuture; |
1023 } | 1013 } |
1024 | 1014 |
1025 void _onData(T data) { | 1015 void _onData(T data) { |
1026 assert(_subscription != null && !_isPaused); | 1016 assert(_subscription != null && !_isPaused); |
1027 _Future<bool> moveNextFuture = _stateData as Object /*=_Future<bool>*/; | 1017 _Future<bool> moveNextFuture = _stateData as Object/*=_Future<bool>*/; |
1028 _stateData = data; | 1018 _stateData = data; |
1029 _isPaused = true; | 1019 _isPaused = true; |
1030 moveNextFuture._complete(true); | 1020 moveNextFuture._complete(true); |
1031 if (_subscription != null && _isPaused) _subscription.pause(); | 1021 if (_subscription != null && _isPaused) _subscription.pause(); |
1032 } | 1022 } |
1033 | 1023 |
1034 void _onError(Object error, [StackTrace stackTrace]) { | 1024 void _onError(Object error, [StackTrace stackTrace]) { |
1035 assert(_subscription != null && !_isPaused); | 1025 assert(_subscription != null && !_isPaused); |
1036 _Future<bool> moveNextFuture = _stateData as Object /*=_Future<bool>*/; | 1026 _Future<bool> moveNextFuture = _stateData as Object/*=_Future<bool>*/; |
1037 _subscription = null; | 1027 _subscription = null; |
1038 _stateData = null; | 1028 _stateData = null; |
1039 moveNextFuture._completeError(error, stackTrace); | 1029 moveNextFuture._completeError(error, stackTrace); |
1040 } | 1030 } |
1041 | 1031 |
1042 void _onDone() { | 1032 void _onDone() { |
1043 assert(_subscription != null && !_isPaused); | 1033 assert(_subscription != null && !_isPaused); |
1044 _Future<bool> moveNextFuture = _stateData as Object /*=_Future<bool>*/; | 1034 _Future<bool> moveNextFuture = _stateData as Object/*=_Future<bool>*/; |
1045 _subscription = null; | 1035 _subscription = null; |
1046 _stateData = null; | 1036 _stateData = null; |
1047 moveNextFuture._complete(false); | 1037 moveNextFuture._complete(false); |
1048 } | 1038 } |
1049 } | 1039 } |
1050 | 1040 |
1051 /** An empty broadcast stream, sending a done event as soon as possible. */ | 1041 /** An empty broadcast stream, sending a done event as soon as possible. */ |
1052 class _EmptyStream<T> extends Stream<T> { | 1042 class _EmptyStream<T> extends Stream<T> { |
1053 const _EmptyStream() : super._internal(); | 1043 const _EmptyStream() : super._internal(); |
1054 bool get isBroadcast => true; | 1044 bool get isBroadcast => true; |
1055 StreamSubscription<T> listen(void onData(T data), | 1045 StreamSubscription<T> listen(void onData(T data), |
1056 {Function onError, | 1046 {Function onError, void onDone(), bool cancelOnError}) { |
1057 void onDone(), | |
1058 bool cancelOnError}) { | |
1059 return new _DoneStreamSubscription<T>(onDone); | 1047 return new _DoneStreamSubscription<T>(onDone); |
1060 } | 1048 } |
1061 } | 1049 } |
OLD | NEW |