Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
| 8 // Controller for creating and adding events to a stream. | 8 // Controller for creating and adding events to a stream. |
| 9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
| 10 | 10 |
| (...skipping 48 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 59 * the event has returned. | 59 * the event has returned. |
| 60 * | 60 * |
| 61 * The controller will buffer all incoming events until the subscriber is | 61 * The controller will buffer all incoming events until the subscriber is |
| 62 * registered. | 62 * registered. |
| 63 * | 63 * |
| 64 * The [onPause] function is called when the stream becomes | 64 * The [onPause] function is called when the stream becomes |
| 65 * paused. [onResume] is called when the stream resumed. | 65 * paused. [onResume] is called when the stream resumed. |
| 66 * | 66 * |
| 67 * The [onListen] callback is called when the stream | 67 * The [onListen] callback is called when the stream |
| 68 * receives its listener and [onCancel] when the listener ends | 68 * receives its listener and [onCancel] when the listener ends |
| 69 * its subscription. | 69 * its subscription. If [onCancel] needs to perform an asynchronous operation, |
| 70 * [onCancel] can return a [Future]. The [StreamSubscription] will then get | |
|
floitsch
2013/10/16 14:43:44
should return a future that completes when the can
Anders Johnsen
2013/10/21 08:01:46
Done.
| |
| 71 * that future, and can wait for it before continuing. | |
| 70 * | 72 * |
| 71 * If the stream is canceled before the controller needs new data the | 73 * If the stream is canceled before the controller needs new data the |
| 72 * [onResume] call might not be executed. | 74 * [onResume] call might not be executed. |
| 73 */ | 75 */ |
| 74 factory StreamController({void onListen(), | 76 factory StreamController({void onListen(), |
| 75 void onPause(), | 77 void onPause(), |
| 76 void onResume(), | 78 void onResume(), |
| 77 void onCancel(), | 79 onCancel(), |
| 78 bool sync: false}) { | 80 bool sync: false}) { |
| 79 if (onListen == null && onPause == null && | 81 if (onListen == null && onPause == null && |
| 80 onResume == null && onCancel == null) { | 82 onResume == null && onCancel == null) { |
| 81 return sync | 83 return sync |
| 82 ? new _NoCallbackSyncStreamController/*<T>*/() | 84 ? new _NoCallbackSyncStreamController/*<T>*/() |
| 83 : new _NoCallbackAsyncStreamController/*<T>*/(); | 85 : new _NoCallbackAsyncStreamController/*<T>*/(); |
| 84 } | 86 } |
| 85 return sync | 87 return sync |
| 86 ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel) | 88 ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel) |
| 87 : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel); | 89 : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel); |
| (...skipping 77 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 165 * allows. | 167 * allows. |
| 166 */ | 168 */ |
| 167 void addError(Object error, [Object stackTrace]); | 169 void addError(Object error, [Object stackTrace]); |
| 168 } | 170 } |
| 169 | 171 |
| 170 | 172 |
| 171 abstract class _StreamControllerLifecycle<T> { | 173 abstract class _StreamControllerLifecycle<T> { |
| 172 StreamSubscription<T> _subscribe(bool cancelOnError); | 174 StreamSubscription<T> _subscribe(bool cancelOnError); |
| 173 void _recordPause(StreamSubscription<T> subscription) {} | 175 void _recordPause(StreamSubscription<T> subscription) {} |
| 174 void _recordResume(StreamSubscription<T> subscription) {} | 176 void _recordResume(StreamSubscription<T> subscription) {} |
| 175 void _recordCancel(StreamSubscription<T> subscription) {} | 177 Future _recordCancel(StreamSubscription<T> subscription) => null; |
| 176 } | 178 } |
| 177 | 179 |
| 178 /** | 180 /** |
| 179 * Default implementation of [StreamController]. | 181 * Default implementation of [StreamController]. |
| 180 * | 182 * |
| 181 * Controls a stream that only supports a single controller. | 183 * Controls a stream that only supports a single controller. |
| 182 */ | 184 */ |
| 183 abstract class _StreamController<T> implements StreamController<T>, | 185 abstract class _StreamController<T> implements StreamController<T>, |
| 184 _StreamControllerLifecycle<T>, | 186 _StreamControllerLifecycle<T>, |
| 185 _EventSink<T>, | 187 _EventSink<T>, |
| (...skipping 270 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 456 _varData = subscription; | 458 _varData = subscription; |
| 457 } | 459 } |
| 458 subscription._setPendingEvents(pendingEvents); | 460 subscription._setPendingEvents(pendingEvents); |
| 459 subscription._guardCallback(() { | 461 subscription._guardCallback(() { |
| 460 _runGuarded(_onListen); | 462 _runGuarded(_onListen); |
| 461 }); | 463 }); |
| 462 | 464 |
| 463 return subscription; | 465 return subscription; |
| 464 } | 466 } |
| 465 | 467 |
| 466 void _recordCancel(StreamSubscription<T> subscription) { | 468 Future _recordCancel(StreamSubscription<T> subscription) { |
| 467 if (_isAddingStream) { | 469 if (_isAddingStream) { |
| 468 _StreamControllerAddStreamState addState = _varData; | 470 _StreamControllerAddStreamState addState = _varData; |
| 469 addState.cancel(); | 471 addState.cancel(); |
| 470 } | 472 } |
| 471 _varData = null; | 473 _varData = null; |
| 472 _state = | 474 _state = |
| 473 (_state & ~(_STATE_SUBSCRIBED | _STATE_ADDSTREAM)) | _STATE_CANCELED; | 475 (_state & ~(_STATE_SUBSCRIBED | _STATE_ADDSTREAM)) | _STATE_CANCELED; |
| 474 _runGuarded(_onCancel); | 476 Future future = _runGuarded(_onCancel); |
|
floitsch
2013/10/16 14:43:44
If the doneFuture and the cancel are not linked (s
Anders Johnsen
2013/10/21 08:01:46
Done.
| |
| 475 if (_doneFuture != null && _doneFuture._mayComplete) { | 477 if (_doneFuture != null && _doneFuture._mayComplete) { |
| 476 _doneFuture._asyncComplete(null); | 478 _doneFuture._asyncComplete(null); |
| 477 } | 479 } |
| 480 return future; | |
| 478 } | 481 } |
| 479 | 482 |
| 480 void _recordPause(StreamSubscription<T> subscription) { | 483 void _recordPause(StreamSubscription<T> subscription) { |
| 481 if (_isAddingStream) { | 484 if (_isAddingStream) { |
| 482 _StreamControllerAddStreamState addState = _varData; | 485 _StreamControllerAddStreamState addState = _varData; |
| 483 addState.pause(); | 486 addState.pause(); |
| 484 } | 487 } |
| 485 _runGuarded(_onPause); | 488 _runGuarded(_onPause); |
| 486 } | 489 } |
| 487 | 490 |
| (...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 530 class _AsyncStreamController<T> extends _StreamController<T> | 533 class _AsyncStreamController<T> extends _StreamController<T> |
| 531 with _AsyncStreamControllerDispatch<T> { | 534 with _AsyncStreamControllerDispatch<T> { |
| 532 final _NotificationHandler _onListen; | 535 final _NotificationHandler _onListen; |
| 533 final _NotificationHandler _onPause; | 536 final _NotificationHandler _onPause; |
| 534 final _NotificationHandler _onResume; | 537 final _NotificationHandler _onResume; |
| 535 final _NotificationHandler _onCancel; | 538 final _NotificationHandler _onCancel; |
| 536 | 539 |
| 537 _AsyncStreamController(void this._onListen(), | 540 _AsyncStreamController(void this._onListen(), |
| 538 void this._onPause(), | 541 void this._onPause(), |
| 539 void this._onResume(), | 542 void this._onResume(), |
| 540 void this._onCancel()); | 543 this._onCancel()); |
| 541 } | 544 } |
| 542 | 545 |
| 543 class _SyncStreamController<T> extends _StreamController<T> | 546 class _SyncStreamController<T> extends _StreamController<T> |
| 544 with _SyncStreamControllerDispatch<T> { | 547 with _SyncStreamControllerDispatch<T> { |
| 545 final _NotificationHandler _onListen; | 548 final _NotificationHandler _onListen; |
| 546 final _NotificationHandler _onPause; | 549 final _NotificationHandler _onPause; |
| 547 final _NotificationHandler _onResume; | 550 final _NotificationHandler _onResume; |
| 548 final _NotificationHandler _onCancel; | 551 final _NotificationHandler _onCancel; |
| 549 | 552 |
| 550 _SyncStreamController(void this._onListen(), | 553 _SyncStreamController(void this._onListen(), |
| 551 void this._onPause(), | 554 void this._onPause(), |
| 552 void this._onResume(), | 555 void this._onResume(), |
| 553 void this._onCancel()); | 556 this._onCancel()); |
| 554 } | 557 } |
| 555 | 558 |
| 556 abstract class _NoCallbacks { | 559 abstract class _NoCallbacks { |
| 557 _NotificationHandler get _onListen => null; | 560 _NotificationHandler get _onListen => null; |
| 558 _NotificationHandler get _onPause => null; | 561 _NotificationHandler get _onPause => null; |
| 559 _NotificationHandler get _onResume => null; | 562 _NotificationHandler get _onResume => null; |
| 560 _NotificationHandler get _onCancel => null; | 563 _NotificationHandler get _onCancel => null; |
| 561 } | 564 } |
| 562 | 565 |
| 563 class _NoCallbackAsyncStreamController/*<T>*/ = _StreamController/*<T>*/ | 566 class _NoCallbackAsyncStreamController/*<T>*/ = _StreamController/*<T>*/ |
| 564 with _AsyncStreamControllerDispatch/*<T>*/, _NoCallbacks; | 567 with _AsyncStreamControllerDispatch/*<T>*/, _NoCallbacks; |
| 565 | 568 |
| 566 class _NoCallbackSyncStreamController/*<T>*/ = _StreamController/*<T>*/ | 569 class _NoCallbackSyncStreamController/*<T>*/ = _StreamController/*<T>*/ |
| 567 with _SyncStreamControllerDispatch/*<T>*/, _NoCallbacks; | 570 with _SyncStreamControllerDispatch/*<T>*/, _NoCallbacks; |
| 568 | 571 |
| 569 typedef void _NotificationHandler(); | 572 typedef _NotificationHandler(); |
| 570 | 573 |
| 571 void _runGuarded(_NotificationHandler notificationHandler) { | 574 Future _runGuarded(_NotificationHandler notificationHandler) { |
| 572 if (notificationHandler == null) return; | 575 if (notificationHandler == null) return null; |
| 573 try { | 576 try { |
| 574 notificationHandler(); | 577 var result = notificationHandler(); |
| 578 if (result is Future) return result; | |
| 579 return null; | |
| 575 } catch (e, s) { | 580 } catch (e, s) { |
| 576 Zone.current.handleUncaughtError(_asyncError(e, s), s); | 581 Zone.current.handleUncaughtError(_asyncError(e, s), s); |
| 577 } | 582 } |
| 578 } | 583 } |
| 579 | 584 |
| 580 class _ControllerStream<T> extends _StreamImpl<T> { | 585 class _ControllerStream<T> extends _StreamImpl<T> { |
| 581 _StreamControllerLifecycle<T> _controller; | 586 _StreamControllerLifecycle<T> _controller; |
| 582 | 587 |
| 583 _ControllerStream(this._controller); | 588 _ControllerStream(this._controller); |
| 584 | 589 |
| (...skipping 13 matching lines...) Expand all Loading... | |
| 598 return identical(otherStream._controller, this._controller); | 603 return identical(otherStream._controller, this._controller); |
| 599 } | 604 } |
| 600 } | 605 } |
| 601 | 606 |
| 602 class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> { | 607 class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> { |
| 603 final _StreamControllerLifecycle<T> _controller; | 608 final _StreamControllerLifecycle<T> _controller; |
| 604 | 609 |
| 605 _ControllerSubscription(this._controller, bool cancelOnError) | 610 _ControllerSubscription(this._controller, bool cancelOnError) |
| 606 : super(cancelOnError); | 611 : super(cancelOnError); |
| 607 | 612 |
| 608 void _onCancel() { | 613 Future _onCancel() { |
| 609 _controller._recordCancel(this); | 614 return _controller._recordCancel(this); |
| 610 } | 615 } |
| 611 | 616 |
| 612 void _onPause() { | 617 void _onPause() { |
| 613 _controller._recordPause(this); | 618 _controller._recordPause(this); |
| 614 } | 619 } |
| 615 | 620 |
| 616 void _onResume() { | 621 void _onResume() { |
| 617 _controller._recordResume(this); | 622 _controller._recordResume(this); |
| 618 } | 623 } |
| 619 } | 624 } |
| 620 | 625 |
| 621 | 626 |
| 622 /** A class that exposes only the [StreamSink] interface of an object. */ | 627 /** A class that exposes only the [StreamSink] interface of an object. */ |
| 623 class _StreamSinkWrapper<T> implements StreamSink<T> { | 628 class _StreamSinkWrapper<T> implements StreamSink<T> { |
| 624 final StreamSink _target; | 629 final StreamSink _target; |
| 625 _StreamSinkWrapper(this._target); | 630 _StreamSinkWrapper(this._target); |
| 626 void add(T data) { _target.add(data); } | 631 void add(T data) { _target.add(data); } |
| 627 void addError(Object error, [StackTrace stackTrace]) { | 632 void addError(Object error, [StackTrace stackTrace]) { |
| 628 _target.addError(error); | 633 _target.addError(error); |
| 629 } | 634 } |
| 630 Future close() => _target.close(); | 635 Future close() => _target.close(); |
| 631 Future addStream(Stream<T> source) => _target.addStream(source); | 636 Future addStream(Stream<T> source) => _target.addStream(source); |
| 632 Future get done => _target.done; | 637 Future get done => _target.done; |
| 633 } | 638 } |
| 634 | 639 |
| 635 /** | 640 /** |
| 636 * Object containing the state used to handle [StreamController.addStream]. | 641 * Object containing the state used to handle [StreamController.addStream]. |
| 637 */ | 642 */ |
| 638 class _AddStreamState<T> { | 643 class _AddStreamState<T> { |
| 639 // [_FutureImpl] returned by call to addStream. | 644 // [_Future] returned by call to addStream. |
| 640 _Future addStreamFuture; | 645 _Future addStreamFuture; |
| 641 | 646 |
| 642 // Subscription on stream argument to addStream. | 647 // Subscription on stream argument to addStream. |
| 643 StreamSubscription addSubscription; | 648 StreamSubscription addSubscription; |
| 644 | 649 |
| 645 _AddStreamState(_EventSink<T> controller, Stream source) | 650 _AddStreamState(_EventSink<T> controller, Stream source) |
| 646 : addStreamFuture = new _Future(), | 651 : addStreamFuture = new _Future(), |
| 647 addSubscription = source.listen(controller._add, | 652 addSubscription = source.listen(controller._add, |
| 648 onError: controller._addError, | 653 onError: controller._addError, |
| 649 onDone: controller._close, | 654 onDone: controller._close, |
| (...skipping 24 matching lines...) Expand all Loading... | |
| 674 var varData; | 679 var varData; |
| 675 | 680 |
| 676 _StreamControllerAddStreamState(_StreamController controller, | 681 _StreamControllerAddStreamState(_StreamController controller, |
| 677 this.varData, | 682 this.varData, |
| 678 Stream source) : super(controller, source) { | 683 Stream source) : super(controller, source) { |
| 679 if (controller.isPaused) { | 684 if (controller.isPaused) { |
| 680 addSubscription.pause(); | 685 addSubscription.pause(); |
| 681 } | 686 } |
| 682 } | 687 } |
| 683 } | 688 } |
| OLD | NEW |