OLD | NEW |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 /** Abstract and private interface for a place to put events. */ | 7 /** Abstract and private interface for a place to put events. */ |
8 abstract class _EventSink<T> { | 8 abstract class _EventSink<T> { |
9 void _add(T data); | 9 void _add(T data); |
10 void _addError(Object error, StackTrace stackTrace); | 10 void _addError(Object error, StackTrace stackTrace); |
(...skipping 821 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
832 onError: _controller.addError, | 832 onError: _controller.addError, |
833 onDone: _controller.close); | 833 onDone: _controller.close); |
834 } | 834 } |
835 cancelOnError = identical(true, cancelOnError); | 835 cancelOnError = identical(true, cancelOnError); |
836 return _controller._subscribe(onData, onError, onDone, cancelOnError); | 836 return _controller._subscribe(onData, onError, onDone, cancelOnError); |
837 } | 837 } |
838 | 838 |
839 void _onCancel() { | 839 void _onCancel() { |
840 bool shutdown = (_controller == null) || _controller.isClosed; | 840 bool shutdown = (_controller == null) || _controller.isClosed; |
841 if (_onCancelHandler != null) { | 841 if (_onCancelHandler != null) { |
842 _zone.runUnary(_onCancelHandler, new _BroadcastSubscriptionWrapper(this)); | 842 _zone.runUnary( |
| 843 _onCancelHandler, new _BroadcastSubscriptionWrapper<T>(this)); |
843 } | 844 } |
844 if (shutdown) { | 845 if (shutdown) { |
845 if (_subscription != null) { | 846 if (_subscription != null) { |
846 _subscription.cancel(); | 847 _subscription.cancel(); |
847 _subscription = null; | 848 _subscription = null; |
848 } | 849 } |
849 } | 850 } |
850 } | 851 } |
851 | 852 |
852 void _onListen() { | 853 void _onListen() { |
853 if (_onListenHandler != null) { | 854 if (_onListenHandler != null) { |
854 _zone.runUnary(_onListenHandler, new _BroadcastSubscriptionWrapper(this)); | 855 _zone.runUnary( |
| 856 _onListenHandler, new _BroadcastSubscriptionWrapper<T>(this)); |
855 } | 857 } |
856 } | 858 } |
857 | 859 |
858 // Methods called from _BroadcastSubscriptionWrapper. | 860 // Methods called from _BroadcastSubscriptionWrapper. |
859 void _cancelSubscription() { | 861 void _cancelSubscription() { |
860 if (_subscription == null) return; | 862 if (_subscription == null) return; |
861 // Called by [_controller] when it has no subscribers left. | 863 // Called by [_controller] when it has no subscribers left. |
862 StreamSubscription subscription = _subscription; | 864 StreamSubscription subscription = _subscription; |
863 _subscription = null; | 865 _subscription = null; |
864 _controller = null; // Marks the stream as no longer listenable. | 866 _controller = null; // Marks the stream as no longer listenable. |
(...skipping 221 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1086 class _EmptyStream<T> extends Stream<T> { | 1088 class _EmptyStream<T> extends Stream<T> { |
1087 const _EmptyStream() : super._internal(); | 1089 const _EmptyStream() : super._internal(); |
1088 bool get isBroadcast => true; | 1090 bool get isBroadcast => true; |
1089 StreamSubscription<T> listen(void onData(T data), | 1091 StreamSubscription<T> listen(void onData(T data), |
1090 {Function onError, | 1092 {Function onError, |
1091 void onDone(), | 1093 void onDone(), |
1092 bool cancelOnError}) { | 1094 bool cancelOnError}) { |
1093 return new _DoneStreamSubscription<T>(onDone); | 1095 return new _DoneStreamSubscription<T>(onDone); |
1094 } | 1096 } |
1095 } | 1097 } |
OLD | NEW |