Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
| 8 // Controller for creating and adding events to a stream. | 8 // Controller for creating and adding events to a stream. |
| 9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
| 10 | 10 |
| (...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 67 * The [onListen] callback is called when the stream | 67 * The [onListen] callback is called when the stream |
| 68 * receives its listener and [onCancel] when the listener ends | 68 * receives its listener and [onCancel] when the listener ends |
| 69 * its subscription. | 69 * its subscription. |
| 70 * | 70 * |
| 71 * If the stream is canceled before the controller needs new data the | 71 * If the stream is canceled before the controller needs new data the |
| 72 * [onResume] call might not be executed. | 72 * [onResume] call might not be executed. |
| 73 */ | 73 */ |
| 74 factory StreamController({void onListen(), | 74 factory StreamController({void onListen(), |
| 75 void onPause(), | 75 void onPause(), |
| 76 void onResume(), | 76 void onResume(), |
| 77 void onCancel(), | 77 onCancel(), |
|
floitsch
2013/10/12 18:53:57
Needs comment.
Lasse Reichstein Nielsen
2013/10/14 11:32:33
Can the return type be Future?
It's always ok to
Anders Johnsen
2013/10/16 11:52:21
Done.
Anders Johnsen
2013/10/16 11:52:21
Done.
| |
| 78 bool sync: false}) { | 78 bool sync: false}) { |
| 79 if (onListen == null && onPause == null && | 79 if (onListen == null && onPause == null && |
| 80 onResume == null && onCancel == null) { | 80 onResume == null && onCancel == null) { |
| 81 return sync | 81 return sync |
| 82 ? new _NoCallbackSyncStreamController/*<T>*/() | 82 ? new _NoCallbackSyncStreamController/*<T>*/() |
| 83 : new _NoCallbackAsyncStreamController/*<T>*/(); | 83 : new _NoCallbackAsyncStreamController/*<T>*/(); |
| 84 } | 84 } |
| 85 return sync | 85 return sync |
| 86 ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel) | 86 ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel) |
| 87 : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel); | 87 : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel); |
| (...skipping 80 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 168 } | 168 } |
| 169 | 169 |
| 170 | 170 |
| 171 abstract class _StreamControllerLifecycle<T> { | 171 abstract class _StreamControllerLifecycle<T> { |
| 172 StreamSubscription<T> _subscribe(void onData(T data), | 172 StreamSubscription<T> _subscribe(void onData(T data), |
| 173 void onError(Object error), | 173 void onError(Object error), |
| 174 void onDone(), | 174 void onDone(), |
| 175 bool cancelOnError); | 175 bool cancelOnError); |
| 176 void _recordPause(StreamSubscription<T> subscription) {} | 176 void _recordPause(StreamSubscription<T> subscription) {} |
| 177 void _recordResume(StreamSubscription<T> subscription) {} | 177 void _recordResume(StreamSubscription<T> subscription) {} |
| 178 void _recordCancel(StreamSubscription<T> subscription) {} | 178 Future _recordCancel(StreamSubscription<T> subscription) => null; |
| 179 } | 179 } |
| 180 | 180 |
| 181 /** | 181 /** |
| 182 * Default implementation of [StreamController]. | 182 * Default implementation of [StreamController]. |
| 183 * | 183 * |
| 184 * Controls a stream that only supports a single controller. | 184 * Controls a stream that only supports a single controller. |
| 185 */ | 185 */ |
| 186 abstract class _StreamController<T> implements StreamController<T>, | 186 abstract class _StreamController<T> implements StreamController<T>, |
| 187 _StreamControllerLifecycle<T>, | 187 _StreamControllerLifecycle<T>, |
| 188 _EventSink<T>, | 188 _EventSink<T>, |
| (...skipping 273 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 462 _varData = subscription; | 462 _varData = subscription; |
| 463 } | 463 } |
| 464 subscription._setPendingEvents(pendingEvents); | 464 subscription._setPendingEvents(pendingEvents); |
| 465 subscription._guardCallback(() { | 465 subscription._guardCallback(() { |
| 466 _runGuarded(_onListen); | 466 _runGuarded(_onListen); |
| 467 }); | 467 }); |
| 468 | 468 |
| 469 return subscription; | 469 return subscription; |
| 470 } | 470 } |
| 471 | 471 |
| 472 void _recordCancel(StreamSubscription<T> subscription) { | 472 Future _recordCancel(StreamSubscription<T> subscription) { |
| 473 if (_isAddingStream) { | 473 if (_isAddingStream) { |
| 474 _StreamControllerAddStreamState addState = _varData; | 474 _StreamControllerAddStreamState addState = _varData; |
| 475 addState.cancel(); | 475 addState.cancel(); |
| 476 } | 476 } |
| 477 _varData = null; | 477 _varData = null; |
| 478 _state = | 478 _state = |
| 479 (_state & ~(_STATE_SUBSCRIBED | _STATE_ADDSTREAM)) | _STATE_CANCELED; | 479 (_state & ~(_STATE_SUBSCRIBED | _STATE_ADDSTREAM)) | _STATE_CANCELED; |
| 480 _runGuarded(_onCancel); | 480 var future = _runGuarded(_onCancel); |
|
floitsch
2013/10/12 18:53:57
Use type.
Anders Johnsen
2013/10/16 11:52:21
Done.
| |
| 481 if (_doneFuture != null && _doneFuture._mayComplete) { | 481 if (_doneFuture != null && _doneFuture._mayComplete) { |
| 482 _doneFuture._asyncComplete(null); | 482 _doneFuture._asyncComplete(null); |
| 483 } | 483 } |
| 484 return future; | |
| 484 } | 485 } |
| 485 | 486 |
| 486 void _recordPause(StreamSubscription<T> subscription) { | 487 void _recordPause(StreamSubscription<T> subscription) { |
| 487 if (_isAddingStream) { | 488 if (_isAddingStream) { |
| 488 _StreamControllerAddStreamState addState = _varData; | 489 _StreamControllerAddStreamState addState = _varData; |
| 489 addState.pause(); | 490 addState.pause(); |
| 490 } | 491 } |
| 491 _runGuarded(_onPause); | 492 _runGuarded(_onPause); |
| 492 } | 493 } |
| 493 | 494 |
| (...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 536 class _AsyncStreamController<T> extends _StreamController<T> | 537 class _AsyncStreamController<T> extends _StreamController<T> |
| 537 with _AsyncStreamControllerDispatch<T> { | 538 with _AsyncStreamControllerDispatch<T> { |
| 538 final _NotificationHandler _onListen; | 539 final _NotificationHandler _onListen; |
| 539 final _NotificationHandler _onPause; | 540 final _NotificationHandler _onPause; |
| 540 final _NotificationHandler _onResume; | 541 final _NotificationHandler _onResume; |
| 541 final _NotificationHandler _onCancel; | 542 final _NotificationHandler _onCancel; |
| 542 | 543 |
| 543 _AsyncStreamController(void this._onListen(), | 544 _AsyncStreamController(void this._onListen(), |
| 544 void this._onPause(), | 545 void this._onPause(), |
| 545 void this._onResume(), | 546 void this._onResume(), |
| 546 void this._onCancel()); | 547 this._onCancel()); |
| 547 } | 548 } |
| 548 | 549 |
| 549 class _SyncStreamController<T> extends _StreamController<T> | 550 class _SyncStreamController<T> extends _StreamController<T> |
| 550 with _SyncStreamControllerDispatch<T> { | 551 with _SyncStreamControllerDispatch<T> { |
| 551 final _NotificationHandler _onListen; | 552 final _NotificationHandler _onListen; |
| 552 final _NotificationHandler _onPause; | 553 final _NotificationHandler _onPause; |
| 553 final _NotificationHandler _onResume; | 554 final _NotificationHandler _onResume; |
| 554 final _NotificationHandler _onCancel; | 555 final _NotificationHandler _onCancel; |
| 555 | 556 |
| 556 _SyncStreamController(void this._onListen(), | 557 _SyncStreamController(void this._onListen(), |
| 557 void this._onPause(), | 558 void this._onPause(), |
| 558 void this._onResume(), | 559 void this._onResume(), |
| 559 void this._onCancel()); | 560 this._onCancel()); |
| 560 } | 561 } |
| 561 | 562 |
| 562 abstract class _NoCallbacks { | 563 abstract class _NoCallbacks { |
| 563 _NotificationHandler get _onListen => null; | 564 _NotificationHandler get _onListen => null; |
| 564 _NotificationHandler get _onPause => null; | 565 _NotificationHandler get _onPause => null; |
| 565 _NotificationHandler get _onResume => null; | 566 _NotificationHandler get _onResume => null; |
| 566 _NotificationHandler get _onCancel => null; | 567 _NotificationHandler get _onCancel => null; |
| 567 } | 568 } |
| 568 | 569 |
| 569 typedef _NoCallbackAsyncStreamController/*<T>*/ = _StreamController/*<T>*/ | 570 typedef _NoCallbackAsyncStreamController/*<T>*/ = _StreamController/*<T>*/ |
| 570 with _AsyncStreamControllerDispatch/*<T>*/, _NoCallbacks; | 571 with _AsyncStreamControllerDispatch/*<T>*/, _NoCallbacks; |
| 571 | 572 |
| 572 typedef _NoCallbackSyncStreamController/*<T>*/ = _StreamController/*<T>*/ | 573 typedef _NoCallbackSyncStreamController/*<T>*/ = _StreamController/*<T>*/ |
| 573 with _SyncStreamControllerDispatch/*<T>*/, _NoCallbacks; | 574 with _SyncStreamControllerDispatch/*<T>*/, _NoCallbacks; |
| 574 | 575 |
| 575 typedef void _NotificationHandler(); | 576 typedef void _NotificationHandler(); |
| 576 | 577 |
| 577 void _runGuarded(_NotificationHandler notificationHandler) { | 578 _runGuarded(_NotificationHandler notificationHandler) { |
|
floitsch
2013/10/12 18:53:57
"Future" as return type ?
Anders Johnsen
2013/10/16 11:52:21
Done.
| |
| 578 if (notificationHandler == null) return; | 579 if (notificationHandler == null) return null; |
| 579 try { | 580 try { |
|
Lasse Reichstein Nielsen
2013/10/14 11:32:33
Is this not just
Zone.current.runGuarded(notifica
Anders Johnsen
2013/10/16 11:52:21
Not sure, Florian?
floitsch
2013/10/16 14:43:44
Zone.current.runGuarded(notificationHandler) sound
| |
| 580 notificationHandler(); | 581 return notificationHandler(); |
| 581 } catch (e, s) { | 582 } catch (e, s) { |
| 582 Zone.current.handleUncaughtError(_asyncError(e, s)); | 583 Zone.current.handleUncaughtError(_asyncError(e, s)); |
| 583 } | 584 } |
| 584 } | 585 } |
| 585 | 586 |
| 586 class _ControllerStream<T> extends _StreamImpl<T> { | 587 class _ControllerStream<T> extends _StreamImpl<T> { |
| 587 _StreamControllerLifecycle<T> _controller; | 588 _StreamControllerLifecycle<T> _controller; |
| 588 | 589 |
| 589 _ControllerStream(this._controller); | 590 _ControllerStream(this._controller); |
| 590 | 591 |
| (...skipping 21 matching lines...) Expand all Loading... | |
| 612 class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> { | 613 class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> { |
| 613 final _StreamControllerLifecycle<T> _controller; | 614 final _StreamControllerLifecycle<T> _controller; |
| 614 | 615 |
| 615 _ControllerSubscription(this._controller, | 616 _ControllerSubscription(this._controller, |
| 616 void onData(T data), | 617 void onData(T data), |
| 617 void onError(Object error), | 618 void onError(Object error), |
| 618 void onDone(), | 619 void onDone(), |
| 619 bool cancelOnError) | 620 bool cancelOnError) |
| 620 : super(onData, onError, onDone, cancelOnError); | 621 : super(onData, onError, onDone, cancelOnError); |
| 621 | 622 |
| 622 void _onCancel() { | 623 Future _onCancel() { |
| 623 _controller._recordCancel(this); | 624 return _controller._recordCancel(this); |
| 624 } | 625 } |
| 625 | 626 |
| 626 void _onPause() { | 627 void _onPause() { |
| 627 _controller._recordPause(this); | 628 _controller._recordPause(this); |
| 628 } | 629 } |
| 629 | 630 |
| 630 void _onResume() { | 631 void _onResume() { |
| 631 _controller._recordResume(this); | 632 _controller._recordResume(this); |
| 632 } | 633 } |
| 633 } | 634 } |
| 634 | 635 |
| 635 | 636 |
| 636 /** A class that exposes only the [StreamSink] interface of an object. */ | 637 /** A class that exposes only the [StreamSink] interface of an object. */ |
| 637 class _StreamSinkWrapper<T> implements StreamSink<T> { | 638 class _StreamSinkWrapper<T> implements StreamSink<T> { |
| 638 final StreamSink _target; | 639 final StreamSink _target; |
| 639 _StreamSinkWrapper(this._target); | 640 _StreamSinkWrapper(this._target); |
| 640 void add(T data) { _target.add(data); } | 641 void add(T data) { _target.add(data); } |
| 641 void addError(Object error) { _target.addError(error); } | 642 void addError(Object error) { _target.addError(error); } |
| 642 Future close() => _target.close(); | 643 Future close() => _target.close(); |
| 643 Future addStream(Stream<T> source) => _target.addStream(source); | 644 Future addStream(Stream<T> source) => _target.addStream(source); |
| 644 Future get done => _target.done; | 645 Future get done => _target.done; |
| 645 } | 646 } |
| 646 | 647 |
| 647 /** | 648 /** |
| 648 * Object containing the state used to handle [StreamController.addStream]. | 649 * Object containing the state used to handle [StreamController.addStream]. |
| 649 */ | 650 */ |
| 650 class _AddStreamState<T> { | 651 class _AddStreamState<T> { |
| 651 // [_FutureImpl] returned by call to addStream. | 652 // [_Future] returned by call to addStream. |
| 652 _Future addStreamFuture; | 653 _Future addStreamFuture; |
| 653 | 654 |
| 654 // Subscription on stream argument to addStream. | 655 // Subscription on stream argument to addStream. |
| 655 StreamSubscription addSubscription; | 656 StreamSubscription addSubscription; |
| 656 | 657 |
| 657 _AddStreamState(_EventSink<T> controller, Stream source) | 658 _AddStreamState(_EventSink<T> controller, Stream source) |
| 658 : addStreamFuture = new _Future(), | 659 : addStreamFuture = new _Future(), |
| 659 addSubscription = source.listen(controller._add, | 660 addSubscription = source.listen(controller._add, |
| 660 onError: controller._addError, | 661 onError: controller._addError, |
| 661 onDone: controller._close, | 662 onDone: controller._close, |
| (...skipping 24 matching lines...) Expand all Loading... | |
| 686 var varData; | 687 var varData; |
| 687 | 688 |
| 688 _StreamControllerAddStreamState(_StreamController controller, | 689 _StreamControllerAddStreamState(_StreamController controller, |
| 689 this.varData, | 690 this.varData, |
| 690 Stream source) : super(controller, source) { | 691 Stream source) : super(controller, source) { |
| 691 if (controller.isPaused) { | 692 if (controller.isPaused) { |
| 692 addSubscription.pause(); | 693 addSubscription.pause(); |
| 693 } | 694 } |
| 694 } | 695 } |
| 695 } | 696 } |
| OLD | NEW |