OLD | NEW |
---|---|
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
8 // Controller for creating and adding events to a stream. | 8 // Controller for creating and adding events to a stream. |
9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
10 | 10 |
(...skipping 48 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
59 * the event has returned. | 59 * the event has returned. |
60 * | 60 * |
61 * The controller will buffer all incoming events until the subscriber is | 61 * The controller will buffer all incoming events until the subscriber is |
62 * registered. | 62 * registered. |
63 * | 63 * |
64 * The [onPause] function is called when the stream becomes | 64 * The [onPause] function is called when the stream becomes |
65 * paused. [onResume] is called when the stream resumed. | 65 * paused. [onResume] is called when the stream resumed. |
66 * | 66 * |
67 * The [onListen] callback is called when the stream | 67 * The [onListen] callback is called when the stream |
68 * receives its listener and [onCancel] when the listener ends | 68 * receives its listener and [onCancel] when the listener ends |
69 * its subscription. | 69 * its subscription. |
Lasse Reichstein Nielsen
2013/07/17 07:28:46
The [onCancel] callback may return a value or [Fut
| |
70 * | 70 * |
71 * If the stream is canceled before the controller needs new data the | 71 * If the stream is canceled before the controller needs new data the |
72 * [onResume] call might not be executed. | 72 * [onResume] call might not be executed. |
73 */ | 73 */ |
74 factory StreamController({void onListen(), | 74 factory StreamController({void onListen(), |
75 void onPause(), | 75 void onPause(), |
76 void onResume(), | 76 void onResume(), |
77 void onCancel(), | 77 onCancel(), |
78 bool sync: false}) { | 78 bool sync: false}) { |
79 if (onListen == null && onPause == null && | 79 if (onListen == null && onPause == null && |
80 onResume == null && onCancel == null) { | 80 onResume == null && onCancel == null) { |
81 return sync | 81 return sync |
82 ? new _NoCallbackSyncStreamController/*<T>*/() | 82 ? new _NoCallbackSyncStreamController/*<T>*/() |
83 : new _NoCallbackAsyncStreamController/*<T>*/(); | 83 : new _NoCallbackAsyncStreamController/*<T>*/(); |
84 } | 84 } |
85 return sync | 85 return sync |
86 ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel) | 86 ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel) |
87 : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel); | 87 : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel); |
(...skipping 80 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
168 } | 168 } |
169 | 169 |
170 | 170 |
171 abstract class _StreamControllerLifecycle<T> { | 171 abstract class _StreamControllerLifecycle<T> { |
172 StreamSubscription<T> _subscribe(void onData(T data), | 172 StreamSubscription<T> _subscribe(void onData(T data), |
173 void onError(Object error), | 173 void onError(Object error), |
174 void onDone(), | 174 void onDone(), |
175 bool cancelOnError); | 175 bool cancelOnError); |
176 void _recordPause(StreamSubscription<T> subscription) {} | 176 void _recordPause(StreamSubscription<T> subscription) {} |
177 void _recordResume(StreamSubscription<T> subscription) {} | 177 void _recordResume(StreamSubscription<T> subscription) {} |
178 void _recordCancel(StreamSubscription<T> subscription) {} | 178 _FutureImpl _recordCancel(StreamSubscription<T> subscription) => null; |
179 } | 179 } |
180 | 180 |
181 /** | 181 /** |
182 * Default implementation of [StreamController]. | 182 * Default implementation of [StreamController]. |
183 * | 183 * |
184 * Controls a stream that only supports a single controller. | 184 * Controls a stream that only supports a single controller. |
185 */ | 185 */ |
186 abstract class _StreamController<T> implements StreamController<T>, | 186 abstract class _StreamController<T> implements StreamController<T>, |
187 _StreamControllerLifecycle<T>, | 187 _StreamControllerLifecycle<T>, |
188 _EventSink<T>, | 188 _EventSink<T>, |
(...skipping 273 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
462 _varData = subscription; | 462 _varData = subscription; |
463 } | 463 } |
464 subscription._setPendingEvents(pendingEvents); | 464 subscription._setPendingEvents(pendingEvents); |
465 subscription._guardCallback(() { | 465 subscription._guardCallback(() { |
466 _runGuarded(_onListen); | 466 _runGuarded(_onListen); |
467 }); | 467 }); |
468 | 468 |
469 return subscription; | 469 return subscription; |
470 } | 470 } |
471 | 471 |
472 void _recordCancel(StreamSubscription<T> subscription) { | 472 _FutureImpl _recordCancel(StreamSubscription<T> subscription) { |
Lasse Reichstein Nielsen
2013/07/17 07:28:46
Why _FutureImpl if the user can provide a future t
| |
473 if (_isAddingStream) { | 473 if (_isAddingStream) { |
474 _StreamControllerAddStreamState addState = _varData; | 474 _StreamControllerAddStreamState addState = _varData; |
475 addState.cancel(); | 475 addState.cancel(); |
476 } | 476 } |
477 _varData = null; | 477 _varData = null; |
478 _state = | 478 _state = |
479 (_state & ~(_STATE_SUBSCRIBED | _STATE_ADDSTREAM)) | _STATE_CANCELED; | 479 (_state & ~(_STATE_SUBSCRIBED | _STATE_ADDSTREAM)) | _STATE_CANCELED; |
480 _runGuarded(_onCancel); | 480 var future = _runGuardedFutureResult(_onCancel); |
floitsch
2013/07/12 16:42:34
_runGuardedFutureResult may return `null`. Documen
Lasse Reichstein Nielsen
2013/07/17 07:28:46
I think we should not guarantee that there is no e
| |
481 if (_doneFuture != null && _doneFuture._mayComplete) { | 481 if (_doneFuture != null && _doneFuture._mayComplete) { |
482 _doneFuture._asyncSetValue(null); | 482 _doneFuture._asyncSetValue(null); |
483 } | 483 } |
484 return future; | |
484 } | 485 } |
485 | 486 |
486 void _recordPause(StreamSubscription<T> subscription) { | 487 void _recordPause(StreamSubscription<T> subscription) { |
487 if (_isAddingStream) { | 488 if (_isAddingStream) { |
488 _StreamControllerAddStreamState addState = _varData; | 489 _StreamControllerAddStreamState addState = _varData; |
489 addState.pause(); | 490 addState.pause(); |
490 } | 491 } |
491 _runGuarded(_onPause); | 492 _runGuarded(_onPause); |
492 } | 493 } |
493 | 494 |
(...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
536 class _AsyncStreamController<T> extends _StreamController<T> | 537 class _AsyncStreamController<T> extends _StreamController<T> |
537 with _AsyncStreamControllerDispatch<T> { | 538 with _AsyncStreamControllerDispatch<T> { |
538 final _NotificationHandler _onListen; | 539 final _NotificationHandler _onListen; |
539 final _NotificationHandler _onPause; | 540 final _NotificationHandler _onPause; |
540 final _NotificationHandler _onResume; | 541 final _NotificationHandler _onResume; |
541 final _NotificationHandler _onCancel; | 542 final _NotificationHandler _onCancel; |
542 | 543 |
543 _AsyncStreamController(void this._onListen(), | 544 _AsyncStreamController(void this._onListen(), |
544 void this._onPause(), | 545 void this._onPause(), |
545 void this._onResume(), | 546 void this._onResume(), |
546 void this._onCancel()); | 547 this._onCancel()); |
547 } | 548 } |
548 | 549 |
549 class _SyncStreamController<T> extends _StreamController<T> | 550 class _SyncStreamController<T> extends _StreamController<T> |
550 with _SyncStreamControllerDispatch<T> { | 551 with _SyncStreamControllerDispatch<T> { |
551 final _NotificationHandler _onListen; | 552 final _NotificationHandler _onListen; |
552 final _NotificationHandler _onPause; | 553 final _NotificationHandler _onPause; |
553 final _NotificationHandler _onResume; | 554 final _NotificationHandler _onResume; |
554 final _NotificationHandler _onCancel; | 555 final _NotificationHandler _onCancel; |
555 | 556 |
556 _SyncStreamController(void this._onListen(), | 557 _SyncStreamController(void this._onListen(), |
557 void this._onPause(), | 558 void this._onPause(), |
558 void this._onResume(), | 559 void this._onResume(), |
559 void this._onCancel()); | 560 this._onCancel()); |
560 } | 561 } |
561 | 562 |
562 abstract class _NoCallbacks { | 563 abstract class _NoCallbacks { |
563 _NotificationHandler get _onListen => null; | 564 _NotificationHandler get _onListen => null; |
564 _NotificationHandler get _onPause => null; | 565 _NotificationHandler get _onPause => null; |
565 _NotificationHandler get _onResume => null; | 566 _NotificationHandler get _onResume => null; |
566 _NotificationHandler get _onCancel => null; | 567 _NotificationHandler get _onCancel => null; |
567 } | 568 } |
568 | 569 |
569 typedef _NoCallbackAsyncStreamController/*<T>*/ = _StreamController/*<T>*/ | 570 typedef _NoCallbackAsyncStreamController/*<T>*/ = _StreamController/*<T>*/ |
570 with _AsyncStreamControllerDispatch/*<T>*/, _NoCallbacks; | 571 with _AsyncStreamControllerDispatch/*<T>*/, _NoCallbacks; |
571 | 572 |
572 typedef _NoCallbackSyncStreamController/*<T>*/ = _StreamController/*<T>*/ | 573 typedef _NoCallbackSyncStreamController/*<T>*/ = _StreamController/*<T>*/ |
573 with _SyncStreamControllerDispatch/*<T>*/, _NoCallbacks; | 574 with _SyncStreamControllerDispatch/*<T>*/, _NoCallbacks; |
574 | 575 |
575 typedef void _NotificationHandler(); | 576 typedef void _NotificationHandler(); |
576 | 577 |
577 void _runGuarded(_NotificationHandler notificationHandler) { | 578 void _runGuarded(_NotificationHandler notificationHandler) { |
578 if (notificationHandler == null) return; | 579 if (notificationHandler == null) return; |
579 try { | 580 try { |
580 notificationHandler(); | 581 notificationHandler(); |
581 } catch (e, s) { | 582 } catch (e, s) { |
582 _Zone.current.handleUncaughtError(_asyncError(e, s)); | 583 _Zone.current.handleUncaughtError(_asyncError(e, s)); |
583 } | 584 } |
584 } | 585 } |
585 | 586 |
587 _FutureImpl _runGuardedFutureResult(_NotificationHandler notificationHandler) { | |
588 if (notificationHandler == null) return null; | |
589 try { | |
590 var value = notificationHandler(); | |
591 if (value is _FutureImpl) { | |
592 return value; | |
593 } else { | |
Lasse Reichstein Nielsen
2013/07/17 07:28:46
else if (value is Future) { // You can't assume t
| |
594 return new Future(() => value); | |
Lasse Reichstein Nielsen
2013/07/17 07:28:46
return new _FutureImpl.immediate(value);
| |
595 } | |
596 } catch (e, s) { | |
597 _Zone.current.handleUncaughtError(_asyncError(e, s)); | |
598 } | |
599 } | |
600 | |
586 class _ControllerStream<T> extends _StreamImpl<T> { | 601 class _ControllerStream<T> extends _StreamImpl<T> { |
587 _StreamControllerLifecycle<T> _controller; | 602 _StreamControllerLifecycle<T> _controller; |
588 | 603 |
589 _ControllerStream(this._controller); | 604 _ControllerStream(this._controller); |
590 | 605 |
591 StreamSubscription<T> _createSubscription( | 606 StreamSubscription<T> _createSubscription( |
592 void onData(T data), | 607 void onData(T data), |
593 void onError(Object error), | 608 void onError(Object error), |
594 void onDone(), | 609 void onDone(), |
595 bool cancelOnError) => | 610 bool cancelOnError) => |
(...skipping 15 matching lines...) Expand all Loading... | |
611 class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> { | 626 class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> { |
612 final _StreamControllerLifecycle<T> _controller; | 627 final _StreamControllerLifecycle<T> _controller; |
613 | 628 |
614 _ControllerSubscription(this._controller, | 629 _ControllerSubscription(this._controller, |
615 void onData(T data), | 630 void onData(T data), |
616 void onError(Object error), | 631 void onError(Object error), |
617 void onDone(), | 632 void onDone(), |
618 bool cancelOnError) | 633 bool cancelOnError) |
619 : super(onData, onError, onDone, cancelOnError); | 634 : super(onData, onError, onDone, cancelOnError); |
620 | 635 |
621 void _onCancel() { | 636 _FutureImpl _onCancel() { |
622 _controller._recordCancel(this); | 637 return _controller._recordCancel(this); |
623 } | 638 } |
624 | 639 |
625 void _onPause() { | 640 void _onPause() { |
626 _controller._recordPause(this); | 641 _controller._recordPause(this); |
627 } | 642 } |
628 | 643 |
629 void _onResume() { | 644 void _onResume() { |
630 _controller._recordResume(this); | 645 _controller._recordResume(this); |
631 } | 646 } |
632 } | 647 } |
(...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
685 var varData; | 700 var varData; |
686 | 701 |
687 _StreamControllerAddStreamState(_StreamController controller, | 702 _StreamControllerAddStreamState(_StreamController controller, |
688 this.varData, | 703 this.varData, |
689 Stream source) : super(controller, source) { | 704 Stream source) : super(controller, source) { |
690 if (controller.isPaused) { | 705 if (controller.isPaused) { |
691 addSubscription.pause(); | 706 addSubscription.pause(); |
692 } | 707 } |
693 } | 708 } |
694 } | 709 } |
OLD | NEW |