OLD | NEW |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
8 // Core Stream types | 8 // Core Stream types |
9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
10 | 10 |
(...skipping 599 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
610 * If [this] is empty throws a [StateError]. Otherwise this method is | 610 * If [this] is empty throws a [StateError]. Otherwise this method is |
611 * equivalent to [:this.elementAt(0):] | 611 * equivalent to [:this.elementAt(0):] |
612 */ | 612 */ |
613 Future<T> get first { | 613 Future<T> get first { |
614 _FutureImpl<T> future = new _FutureImpl<T>(); | 614 _FutureImpl<T> future = new _FutureImpl<T>(); |
615 StreamSubscription subscription; | 615 StreamSubscription subscription; |
616 subscription = this.listen( | 616 subscription = this.listen( |
617 // TODO(ahe): Restore type when feature is implemented in dart2js | 617 // TODO(ahe): Restore type when feature is implemented in dart2js |
618 // checked mode. http://dartbug.com/7733 | 618 // checked mode. http://dartbug.com/7733 |
619 (/*T*/ value) { | 619 (/*T*/ value) { |
| 620 subscription.cancel(); |
620 future._setValue(value); | 621 future._setValue(value); |
621 subscription.cancel(); | |
622 return; | 622 return; |
623 }, | 623 }, |
624 onError: future._setError, | 624 onError: future._setError, |
625 onDone: () { | 625 onDone: () { |
626 future._setError(new AsyncError(new StateError("No elements"))); | 626 future._setError(new AsyncError(new StateError("No elements"))); |
627 }, | 627 }, |
628 unsubscribeOnError: true); | 628 unsubscribeOnError: true); |
629 return future; | 629 return future; |
630 } | 630 } |
631 | 631 |
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
666 Future<T> get single { | 666 Future<T> get single { |
667 _FutureImpl<T> future = new _FutureImpl<T>(); | 667 _FutureImpl<T> future = new _FutureImpl<T>(); |
668 T result = null; | 668 T result = null; |
669 bool foundResult = false; | 669 bool foundResult = false; |
670 StreamSubscription subscription; | 670 StreamSubscription subscription; |
671 subscription = this.listen( | 671 subscription = this.listen( |
672 // TODO(ahe): Restore type when feature is implemented in dart2js | 672 // TODO(ahe): Restore type when feature is implemented in dart2js |
673 // checked mode. http://dartbug.com/7733 | 673 // checked mode. http://dartbug.com/7733 |
674 (/*T*/ value) { | 674 (/*T*/ value) { |
675 if (foundResult) { | 675 if (foundResult) { |
| 676 subscription.cancel(); |
676 // This is the second element we get. | 677 // This is the second element we get. |
677 Error error = new StateError("More than one element"); | 678 Error error = new StateError("More than one element"); |
678 future._setError(new AsyncError(error)); | 679 future._setError(new AsyncError(error)); |
679 subscription.cancel(); | |
680 return; | 680 return; |
681 } | 681 } |
682 foundResult = true; | 682 foundResult = true; |
683 result = value; | 683 result = value; |
684 }, | 684 }, |
685 onError: future._setError, | 685 onError: future._setError, |
686 onDone: () { | 686 onDone: () { |
687 if (foundResult) { | 687 if (foundResult) { |
688 future._setValue(result); | 688 future._setValue(result); |
689 return; | 689 return; |
(...skipping 148 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
838 */ | 838 */ |
839 Future<T> elementAt(int index) { | 839 Future<T> elementAt(int index) { |
840 if (index is! int || index < 0) throw new ArgumentError(index); | 840 if (index is! int || index < 0) throw new ArgumentError(index); |
841 _FutureImpl<T> future = new _FutureImpl<T>(); | 841 _FutureImpl<T> future = new _FutureImpl<T>(); |
842 StreamSubscription subscription; | 842 StreamSubscription subscription; |
843 subscription = this.listen( | 843 subscription = this.listen( |
844 // TODO(ahe): Restore type when feature is implemented in dart2js | 844 // TODO(ahe): Restore type when feature is implemented in dart2js |
845 // checked mode. http://dartbug.com/7733 | 845 // checked mode. http://dartbug.com/7733 |
846 (/*T*/ value) { | 846 (/*T*/ value) { |
847 if (index == 0) { | 847 if (index == 0) { |
| 848 subscription.cancel(); |
848 future._setValue(value); | 849 future._setValue(value); |
849 subscription.cancel(); | |
850 return; | 850 return; |
851 } | 851 } |
852 index -= 1; | 852 index -= 1; |
853 }, | 853 }, |
854 onError: future._setError, | 854 onError: future._setError, |
855 onDone: () { | 855 onDone: () { |
856 future._setError(new AsyncError( | 856 future._setError(new AsyncError( |
857 new StateError("Not enough elements for elementAt"))); | 857 new StateError("Not enough elements for elementAt"))); |
858 }, | 858 }, |
859 unsubscribeOnError: true); | 859 unsubscribeOnError: true); |
(...skipping 264 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1124 void onError(AsyncError error), | 1124 void onError(AsyncError error), |
1125 void onDone(), | 1125 void onDone(), |
1126 this._unsubscribeOnError) | 1126 this._unsubscribeOnError) |
1127 : super(onData, onError, onDone) { | 1127 : super(onData, onError, onDone) { |
1128 _sink = new _EventOutputSinkWrapper<T>(this); | 1128 _sink = new _EventOutputSinkWrapper<T>(this); |
1129 _subscription = source.listen(_handleData, | 1129 _subscription = source.listen(_handleData, |
1130 onError: _handleError, | 1130 onError: _handleError, |
1131 onDone: _handleDone); | 1131 onDone: _handleDone); |
1132 } | 1132 } |
1133 | 1133 |
| 1134 /** Whether this subscription is still subscribed to its source. */ |
| 1135 bool get _isSubscribed => _subscription != null; |
| 1136 |
1134 void pause([Future pauseSignal]) { | 1137 void pause([Future pauseSignal]) { |
1135 if (_subscription != null) _subscription.pause(pauseSignal); | 1138 if (_isSubscribed) _subscription.pause(pauseSignal); |
1136 } | 1139 } |
1137 | 1140 |
1138 void resume() { | 1141 void resume() { |
1139 if (_subscription != null) _subscription.resume(); | 1142 if (_isSubscribed) _subscription.resume(); |
1140 } | 1143 } |
1141 | 1144 |
1142 void cancel() { | 1145 void cancel() { |
1143 if (_subscription != null) { | 1146 if (_isSubscribed) { |
1144 _subscription.cancel(); | 1147 _subscription.cancel(); |
1145 _subscription = null; | 1148 _subscription = null; |
1146 } | 1149 } |
1147 } | 1150 } |
1148 | 1151 |
1149 void _handleData(S data) { | 1152 void _handleData(S data) { |
1150 try { | 1153 try { |
1151 _transformer.handleData(data, _sink); | 1154 _transformer.handleData(data, _sink); |
1152 } catch (e, s) { | 1155 } catch (e, s) { |
1153 _sendError(_asyncError(e, s)); | 1156 _sendError(_asyncError(e, s)); |
1154 } | 1157 } |
1155 } | 1158 } |
1156 | 1159 |
1157 void _handleError(AsyncError error) { | 1160 void _handleError(AsyncError error) { |
1158 try { | 1161 try { |
1159 _transformer.handleError(error, _sink); | 1162 _transformer.handleError(error, _sink); |
1160 } catch (e, s) { | 1163 } catch (e, s) { |
1161 _sendError(_asyncError(e, s, error)); | 1164 _sendError(_asyncError(e, s, error)); |
1162 } | 1165 } |
1163 } | 1166 } |
1164 | 1167 |
1165 void _handleDone() { | 1168 void _handleDone() { |
1166 _subscription = null; | |
1167 try { | 1169 try { |
1168 _transformer.handleDone(_sink); | 1170 _transformer.handleDone(_sink); |
1169 } catch (e, s) { | 1171 } catch (e, s) { |
1170 _sendError(_asyncError(e, s)); | 1172 _sendError(_asyncError(e, s)); |
1171 } | 1173 } |
1172 } | 1174 } |
1173 | 1175 |
1174 // EventOutputSink interface. | 1176 // EventOutputSink interface. |
1175 void _sendData(T data) { | 1177 void _sendData(T data) { |
| 1178 if (!_isSubscribed) return; |
1176 _onData(data); | 1179 _onData(data); |
1177 } | 1180 } |
1178 | 1181 |
1179 void _sendError(AsyncError error) { | 1182 void _sendError(AsyncError error) { |
| 1183 if (!_isSubscribed) return; |
1180 _onError(error); | 1184 _onError(error); |
1181 if (_unsubscribeOnError) { | 1185 if (_unsubscribeOnError) { |
1182 cancel(); | 1186 cancel(); |
1183 } | 1187 } |
1184 } | 1188 } |
1185 | 1189 |
1186 void _sendDone() { | 1190 void _sendDone() { |
1187 // It's ok to cancel even if we have been unsubscribed already. | 1191 if (!_isSubscribed) return; |
1188 cancel(); | 1192 _subscription.cancel(); |
| 1193 _subscription = null; |
1189 _onDone(); | 1194 _onDone(); |
1190 } | 1195 } |
1191 } | 1196 } |
1192 | 1197 |
1193 /* TODO(8997): Implement EventSink instead, */ | 1198 /* TODO(8997): Implement EventSink instead, */ |
1194 class _EventOutputSinkWrapper<T> extends StreamSink<T> { | 1199 class _EventOutputSinkWrapper<T> extends StreamSink<T> { |
1195 _EventOutputSink _sink; | 1200 _EventOutputSink _sink; |
1196 _EventOutputSinkWrapper(this._sink); | 1201 _EventOutputSinkWrapper(this._sink); |
1197 | 1202 |
1198 void add(T data) { _sink._sendData(data); } | 1203 void add(T data) { _sink._sendData(data); } |
1199 void addError(AsyncError error) { _sink._sendError(error); } | 1204 void addError(AsyncError error) { _sink._sendError(error); } |
1200 void close() { _sink._sendDone(); } | 1205 void close() { _sink._sendDone(); } |
1201 } | 1206 } |
OLD | NEW |