OLD | NEW |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
8 // Controller for creating and adding events to a stream. | 8 // Controller for creating and adding events to a stream. |
9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
10 | 10 |
(...skipping 48 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
59 * the event has returned. | 59 * the event has returned. |
60 * | 60 * |
61 * The controller will buffer all incoming events until the subscriber is | 61 * The controller will buffer all incoming events until the subscriber is |
62 * registered. | 62 * registered. |
63 * | 63 * |
64 * The [onPause] function is called when the stream becomes | 64 * The [onPause] function is called when the stream becomes |
65 * paused. [onResume] is called when the stream resumed. | 65 * paused. [onResume] is called when the stream resumed. |
66 * | 66 * |
67 * The [onListen] callback is called when the stream | 67 * The [onListen] callback is called when the stream |
68 * receives its listener and [onCancel] when the listener ends | 68 * receives its listener and [onCancel] when the listener ends |
69 * its subscription. | 69 * its subscription. If [onCancel] needs to perform an asynchronous operation, |
| 70 * [onCancel] should return a future that completes when the cancel operation |
| 71 * is done. |
70 * | 72 * |
71 * If the stream is canceled before the controller needs new data the | 73 * If the stream is canceled before the controller needs new data the |
72 * [onResume] call might not be executed. | 74 * [onResume] call might not be executed. |
73 */ | 75 */ |
74 factory StreamController({void onListen(), | 76 factory StreamController({void onListen(), |
75 void onPause(), | 77 void onPause(), |
76 void onResume(), | 78 void onResume(), |
77 void onCancel(), | 79 onCancel(), |
78 bool sync: false}) { | 80 bool sync: false}) { |
79 if (onListen == null && onPause == null && | 81 if (onListen == null && onPause == null && |
80 onResume == null && onCancel == null) { | 82 onResume == null && onCancel == null) { |
81 return sync | 83 return sync |
82 ? new _NoCallbackSyncStreamController/*<T>*/() | 84 ? new _NoCallbackSyncStreamController/*<T>*/() |
83 : new _NoCallbackAsyncStreamController/*<T>*/(); | 85 : new _NoCallbackAsyncStreamController/*<T>*/(); |
84 } | 86 } |
85 return sync | 87 return sync |
86 ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel) | 88 ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel) |
87 : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel); | 89 : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel); |
(...skipping 77 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
165 * allows. | 167 * allows. |
166 */ | 168 */ |
167 void addError(Object error, [Object stackTrace]); | 169 void addError(Object error, [Object stackTrace]); |
168 } | 170 } |
169 | 171 |
170 | 172 |
171 abstract class _StreamControllerLifecycle<T> { | 173 abstract class _StreamControllerLifecycle<T> { |
172 StreamSubscription<T> _subscribe(bool cancelOnError); | 174 StreamSubscription<T> _subscribe(bool cancelOnError); |
173 void _recordPause(StreamSubscription<T> subscription) {} | 175 void _recordPause(StreamSubscription<T> subscription) {} |
174 void _recordResume(StreamSubscription<T> subscription) {} | 176 void _recordResume(StreamSubscription<T> subscription) {} |
175 void _recordCancel(StreamSubscription<T> subscription) {} | 177 Future _recordCancel(StreamSubscription<T> subscription) => null; |
176 } | 178 } |
177 | 179 |
178 /** | 180 /** |
179 * Default implementation of [StreamController]. | 181 * Default implementation of [StreamController]. |
180 * | 182 * |
181 * Controls a stream that only supports a single controller. | 183 * Controls a stream that only supports a single controller. |
182 */ | 184 */ |
183 abstract class _StreamController<T> implements StreamController<T>, | 185 abstract class _StreamController<T> implements StreamController<T>, |
184 _StreamControllerLifecycle<T>, | 186 _StreamControllerLifecycle<T>, |
185 _EventSink<T>, | 187 _EventSink<T>, |
(...skipping 270 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
456 _varData = subscription; | 458 _varData = subscription; |
457 } | 459 } |
458 subscription._setPendingEvents(pendingEvents); | 460 subscription._setPendingEvents(pendingEvents); |
459 subscription._guardCallback(() { | 461 subscription._guardCallback(() { |
460 _runGuarded(_onListen); | 462 _runGuarded(_onListen); |
461 }); | 463 }); |
462 | 464 |
463 return subscription; | 465 return subscription; |
464 } | 466 } |
465 | 467 |
466 void _recordCancel(StreamSubscription<T> subscription) { | 468 Future _recordCancel(StreamSubscription<T> subscription) { |
467 if (_isAddingStream) { | 469 if (_isAddingStream) { |
468 _StreamControllerAddStreamState addState = _varData; | 470 _StreamControllerAddStreamState addState = _varData; |
469 addState.cancel(); | 471 addState.cancel(); |
470 } | 472 } |
471 _varData = null; | 473 _varData = null; |
472 _state = | 474 _state = |
473 (_state & ~(_STATE_SUBSCRIBED | _STATE_ADDSTREAM)) | _STATE_CANCELED; | 475 (_state & ~(_STATE_SUBSCRIBED | _STATE_ADDSTREAM)) | _STATE_CANCELED; |
474 _runGuarded(_onCancel); | 476 void complete() { |
475 if (_doneFuture != null && _doneFuture._mayComplete) { | 477 if (_doneFuture != null && _doneFuture._mayComplete) { |
476 _doneFuture._asyncComplete(null); | 478 _doneFuture._asyncComplete(null); |
| 479 } |
477 } | 480 } |
| 481 Future future = _runGuarded(_onCancel); |
| 482 if (future != null) { |
| 483 future = future.whenComplete(complete); |
| 484 } else { |
| 485 complete(); |
| 486 } |
| 487 return future; |
478 } | 488 } |
479 | 489 |
480 void _recordPause(StreamSubscription<T> subscription) { | 490 void _recordPause(StreamSubscription<T> subscription) { |
481 if (_isAddingStream) { | 491 if (_isAddingStream) { |
482 _StreamControllerAddStreamState addState = _varData; | 492 _StreamControllerAddStreamState addState = _varData; |
483 addState.pause(); | 493 addState.pause(); |
484 } | 494 } |
485 _runGuarded(_onPause); | 495 _runGuarded(_onPause); |
486 } | 496 } |
487 | 497 |
(...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
530 class _AsyncStreamController<T> extends _StreamController<T> | 540 class _AsyncStreamController<T> extends _StreamController<T> |
531 with _AsyncStreamControllerDispatch<T> { | 541 with _AsyncStreamControllerDispatch<T> { |
532 final _NotificationHandler _onListen; | 542 final _NotificationHandler _onListen; |
533 final _NotificationHandler _onPause; | 543 final _NotificationHandler _onPause; |
534 final _NotificationHandler _onResume; | 544 final _NotificationHandler _onResume; |
535 final _NotificationHandler _onCancel; | 545 final _NotificationHandler _onCancel; |
536 | 546 |
537 _AsyncStreamController(void this._onListen(), | 547 _AsyncStreamController(void this._onListen(), |
538 void this._onPause(), | 548 void this._onPause(), |
539 void this._onResume(), | 549 void this._onResume(), |
540 void this._onCancel()); | 550 this._onCancel()); |
541 } | 551 } |
542 | 552 |
543 class _SyncStreamController<T> extends _StreamController<T> | 553 class _SyncStreamController<T> extends _StreamController<T> |
544 with _SyncStreamControllerDispatch<T> { | 554 with _SyncStreamControllerDispatch<T> { |
545 final _NotificationHandler _onListen; | 555 final _NotificationHandler _onListen; |
546 final _NotificationHandler _onPause; | 556 final _NotificationHandler _onPause; |
547 final _NotificationHandler _onResume; | 557 final _NotificationHandler _onResume; |
548 final _NotificationHandler _onCancel; | 558 final _NotificationHandler _onCancel; |
549 | 559 |
550 _SyncStreamController(void this._onListen(), | 560 _SyncStreamController(void this._onListen(), |
551 void this._onPause(), | 561 void this._onPause(), |
552 void this._onResume(), | 562 void this._onResume(), |
553 void this._onCancel()); | 563 this._onCancel()); |
554 } | 564 } |
555 | 565 |
556 abstract class _NoCallbacks { | 566 abstract class _NoCallbacks { |
557 _NotificationHandler get _onListen => null; | 567 _NotificationHandler get _onListen => null; |
558 _NotificationHandler get _onPause => null; | 568 _NotificationHandler get _onPause => null; |
559 _NotificationHandler get _onResume => null; | 569 _NotificationHandler get _onResume => null; |
560 _NotificationHandler get _onCancel => null; | 570 _NotificationHandler get _onCancel => null; |
561 } | 571 } |
562 | 572 |
563 class _NoCallbackAsyncStreamController/*<T>*/ = _StreamController/*<T>*/ | 573 class _NoCallbackAsyncStreamController/*<T>*/ = _StreamController/*<T>*/ |
564 with _AsyncStreamControllerDispatch/*<T>*/, _NoCallbacks; | 574 with _AsyncStreamControllerDispatch/*<T>*/, _NoCallbacks; |
565 | 575 |
566 class _NoCallbackSyncStreamController/*<T>*/ = _StreamController/*<T>*/ | 576 class _NoCallbackSyncStreamController/*<T>*/ = _StreamController/*<T>*/ |
567 with _SyncStreamControllerDispatch/*<T>*/, _NoCallbacks; | 577 with _SyncStreamControllerDispatch/*<T>*/, _NoCallbacks; |
568 | 578 |
569 typedef void _NotificationHandler(); | 579 typedef _NotificationHandler(); |
570 | 580 |
571 void _runGuarded(_NotificationHandler notificationHandler) { | 581 Future _runGuarded(_NotificationHandler notificationHandler) { |
572 if (notificationHandler == null) return; | 582 if (notificationHandler == null) return null; |
573 try { | 583 try { |
574 notificationHandler(); | 584 var result = notificationHandler(); |
| 585 if (result is Future) return result; |
| 586 return null; |
575 } catch (e, s) { | 587 } catch (e, s) { |
576 Zone.current.handleUncaughtError(_asyncError(e, s), s); | 588 Zone.current.handleUncaughtError(_asyncError(e, s), s); |
577 } | 589 } |
578 } | 590 } |
579 | 591 |
580 class _ControllerStream<T> extends _StreamImpl<T> { | 592 class _ControllerStream<T> extends _StreamImpl<T> { |
581 _StreamControllerLifecycle<T> _controller; | 593 _StreamControllerLifecycle<T> _controller; |
582 | 594 |
583 _ControllerStream(this._controller); | 595 _ControllerStream(this._controller); |
584 | 596 |
(...skipping 13 matching lines...) Expand all Loading... |
598 return identical(otherStream._controller, this._controller); | 610 return identical(otherStream._controller, this._controller); |
599 } | 611 } |
600 } | 612 } |
601 | 613 |
602 class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> { | 614 class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> { |
603 final _StreamControllerLifecycle<T> _controller; | 615 final _StreamControllerLifecycle<T> _controller; |
604 | 616 |
605 _ControllerSubscription(this._controller, bool cancelOnError) | 617 _ControllerSubscription(this._controller, bool cancelOnError) |
606 : super(cancelOnError); | 618 : super(cancelOnError); |
607 | 619 |
608 void _onCancel() { | 620 Future _onCancel() { |
609 _controller._recordCancel(this); | 621 return _controller._recordCancel(this); |
610 } | 622 } |
611 | 623 |
612 void _onPause() { | 624 void _onPause() { |
613 _controller._recordPause(this); | 625 _controller._recordPause(this); |
614 } | 626 } |
615 | 627 |
616 void _onResume() { | 628 void _onResume() { |
617 _controller._recordResume(this); | 629 _controller._recordResume(this); |
618 } | 630 } |
619 } | 631 } |
620 | 632 |
621 | 633 |
622 /** A class that exposes only the [StreamSink] interface of an object. */ | 634 /** A class that exposes only the [StreamSink] interface of an object. */ |
623 class _StreamSinkWrapper<T> implements StreamSink<T> { | 635 class _StreamSinkWrapper<T> implements StreamSink<T> { |
624 final StreamSink _target; | 636 final StreamSink _target; |
625 _StreamSinkWrapper(this._target); | 637 _StreamSinkWrapper(this._target); |
626 void add(T data) { _target.add(data); } | 638 void add(T data) { _target.add(data); } |
627 void addError(Object error, [StackTrace stackTrace]) { | 639 void addError(Object error, [StackTrace stackTrace]) { |
628 _target.addError(error); | 640 _target.addError(error); |
629 } | 641 } |
630 Future close() => _target.close(); | 642 Future close() => _target.close(); |
631 Future addStream(Stream<T> source) => _target.addStream(source); | 643 Future addStream(Stream<T> source) => _target.addStream(source); |
632 Future get done => _target.done; | 644 Future get done => _target.done; |
633 } | 645 } |
634 | 646 |
635 /** | 647 /** |
636 * Object containing the state used to handle [StreamController.addStream]. | 648 * Object containing the state used to handle [StreamController.addStream]. |
637 */ | 649 */ |
638 class _AddStreamState<T> { | 650 class _AddStreamState<T> { |
639 // [_FutureImpl] returned by call to addStream. | 651 // [_Future] returned by call to addStream. |
640 _Future addStreamFuture; | 652 _Future addStreamFuture; |
641 | 653 |
642 // Subscription on stream argument to addStream. | 654 // Subscription on stream argument to addStream. |
643 StreamSubscription addSubscription; | 655 StreamSubscription addSubscription; |
644 | 656 |
645 _AddStreamState(_EventSink<T> controller, Stream source) | 657 _AddStreamState(_EventSink<T> controller, Stream source) |
646 : addStreamFuture = new _Future(), | 658 : addStreamFuture = new _Future(), |
647 addSubscription = source.listen(controller._add, | 659 addSubscription = source.listen(controller._add, |
648 onError: controller._addError, | 660 onError: controller._addError, |
649 onDone: controller._close, | 661 onDone: controller._close, |
(...skipping 24 matching lines...) Expand all Loading... |
674 var varData; | 686 var varData; |
675 | 687 |
676 _StreamControllerAddStreamState(_StreamController controller, | 688 _StreamControllerAddStreamState(_StreamController controller, |
677 this.varData, | 689 this.varData, |
678 Stream source) : super(controller, source) { | 690 Stream source) : super(controller, source) { |
679 if (controller.isPaused) { | 691 if (controller.isPaused) { |
680 addSubscription.pause(); | 692 addSubscription.pause(); |
681 } | 693 } |
682 } | 694 } |
683 } | 695 } |
OLD | NEW |