| OLD | NEW |
| 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
| 8 // Core Stream types | 8 // Core Stream types |
| 9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
| 10 | 10 |
| (...skipping 599 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 610 * If [this] is empty throws a [StateError]. Otherwise this method is | 610 * If [this] is empty throws a [StateError]. Otherwise this method is |
| 611 * equivalent to [:this.elementAt(0):] | 611 * equivalent to [:this.elementAt(0):] |
| 612 */ | 612 */ |
| 613 Future<T> get first { | 613 Future<T> get first { |
| 614 _FutureImpl<T> future = new _FutureImpl<T>(); | 614 _FutureImpl<T> future = new _FutureImpl<T>(); |
| 615 StreamSubscription subscription; | 615 StreamSubscription subscription; |
| 616 subscription = this.listen( | 616 subscription = this.listen( |
| 617 // TODO(ahe): Restore type when feature is implemented in dart2js | 617 // TODO(ahe): Restore type when feature is implemented in dart2js |
| 618 // checked mode. http://dartbug.com/7733 | 618 // checked mode. http://dartbug.com/7733 |
| 619 (/*T*/ value) { | 619 (/*T*/ value) { |
| 620 subscription.cancel(); |
| 620 future._setValue(value); | 621 future._setValue(value); |
| 621 subscription.cancel(); | |
| 622 return; | 622 return; |
| 623 }, | 623 }, |
| 624 onError: future._setError, | 624 onError: future._setError, |
| 625 onDone: () { | 625 onDone: () { |
| 626 future._setError(new AsyncError(new StateError("No elements"))); | 626 future._setError(new AsyncError(new StateError("No elements"))); |
| 627 }, | 627 }, |
| 628 unsubscribeOnError: true); | 628 unsubscribeOnError: true); |
| 629 return future; | 629 return future; |
| 630 } | 630 } |
| 631 | 631 |
| (...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 666 Future<T> get single { | 666 Future<T> get single { |
| 667 _FutureImpl<T> future = new _FutureImpl<T>(); | 667 _FutureImpl<T> future = new _FutureImpl<T>(); |
| 668 T result = null; | 668 T result = null; |
| 669 bool foundResult = false; | 669 bool foundResult = false; |
| 670 StreamSubscription subscription; | 670 StreamSubscription subscription; |
| 671 subscription = this.listen( | 671 subscription = this.listen( |
| 672 // TODO(ahe): Restore type when feature is implemented in dart2js | 672 // TODO(ahe): Restore type when feature is implemented in dart2js |
| 673 // checked mode. http://dartbug.com/7733 | 673 // checked mode. http://dartbug.com/7733 |
| 674 (/*T*/ value) { | 674 (/*T*/ value) { |
| 675 if (foundResult) { | 675 if (foundResult) { |
| 676 subscription.cancel(); |
| 676 // This is the second element we get. | 677 // This is the second element we get. |
| 677 Error error = new StateError("More than one element"); | 678 Error error = new StateError("More than one element"); |
| 678 future._setError(new AsyncError(error)); | 679 future._setError(new AsyncError(error)); |
| 679 subscription.cancel(); | |
| 680 return; | 680 return; |
| 681 } | 681 } |
| 682 foundResult = true; | 682 foundResult = true; |
| 683 result = value; | 683 result = value; |
| 684 }, | 684 }, |
| 685 onError: future._setError, | 685 onError: future._setError, |
| 686 onDone: () { | 686 onDone: () { |
| 687 if (foundResult) { | 687 if (foundResult) { |
| 688 future._setValue(result); | 688 future._setValue(result); |
| 689 return; | 689 return; |
| (...skipping 148 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 838 */ | 838 */ |
| 839 Future<T> elementAt(int index) { | 839 Future<T> elementAt(int index) { |
| 840 if (index is! int || index < 0) throw new ArgumentError(index); | 840 if (index is! int || index < 0) throw new ArgumentError(index); |
| 841 _FutureImpl<T> future = new _FutureImpl<T>(); | 841 _FutureImpl<T> future = new _FutureImpl<T>(); |
| 842 StreamSubscription subscription; | 842 StreamSubscription subscription; |
| 843 subscription = this.listen( | 843 subscription = this.listen( |
| 844 // TODO(ahe): Restore type when feature is implemented in dart2js | 844 // TODO(ahe): Restore type when feature is implemented in dart2js |
| 845 // checked mode. http://dartbug.com/7733 | 845 // checked mode. http://dartbug.com/7733 |
| 846 (/*T*/ value) { | 846 (/*T*/ value) { |
| 847 if (index == 0) { | 847 if (index == 0) { |
| 848 subscription.cancel(); |
| 848 future._setValue(value); | 849 future._setValue(value); |
| 849 subscription.cancel(); | |
| 850 return; | 850 return; |
| 851 } | 851 } |
| 852 index -= 1; | 852 index -= 1; |
| 853 }, | 853 }, |
| 854 onError: future._setError, | 854 onError: future._setError, |
| 855 onDone: () { | 855 onDone: () { |
| 856 future._setError(new AsyncError( | 856 future._setError(new AsyncError( |
| 857 new StateError("Not enough elements for elementAt"))); | 857 new StateError("Not enough elements for elementAt"))); |
| 858 }, | 858 }, |
| 859 unsubscribeOnError: true); | 859 unsubscribeOnError: true); |
| (...skipping 264 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1124 void onError(AsyncError error), | 1124 void onError(AsyncError error), |
| 1125 void onDone(), | 1125 void onDone(), |
| 1126 this._unsubscribeOnError) | 1126 this._unsubscribeOnError) |
| 1127 : super(onData, onError, onDone) { | 1127 : super(onData, onError, onDone) { |
| 1128 _sink = new _EventOutputSinkWrapper<T>(this); | 1128 _sink = new _EventOutputSinkWrapper<T>(this); |
| 1129 _subscription = source.listen(_handleData, | 1129 _subscription = source.listen(_handleData, |
| 1130 onError: _handleError, | 1130 onError: _handleError, |
| 1131 onDone: _handleDone); | 1131 onDone: _handleDone); |
| 1132 } | 1132 } |
| 1133 | 1133 |
| 1134 /** Whether this subscription is still subscribed to its source. */ |
| 1135 bool get _isSubscribed => _subscription != null; |
| 1136 |
| 1134 void pause([Future pauseSignal]) { | 1137 void pause([Future pauseSignal]) { |
| 1135 if (_subscription != null) _subscription.pause(pauseSignal); | 1138 if (_isSubscribed) _subscription.pause(pauseSignal); |
| 1136 } | 1139 } |
| 1137 | 1140 |
| 1138 void resume() { | 1141 void resume() { |
| 1139 if (_subscription != null) _subscription.resume(); | 1142 if (_isSubscribed) _subscription.resume(); |
| 1140 } | 1143 } |
| 1141 | 1144 |
| 1142 void cancel() { | 1145 void cancel() { |
| 1143 if (_subscription != null) { | 1146 if (_isSubscribed) { |
| 1144 _subscription.cancel(); | 1147 _subscription.cancel(); |
| 1145 _subscription = null; | 1148 _subscription = null; |
| 1146 } | 1149 } |
| 1147 } | 1150 } |
| 1148 | 1151 |
| 1149 void _handleData(S data) { | 1152 void _handleData(S data) { |
| 1150 try { | 1153 try { |
| 1151 _transformer.handleData(data, _sink); | 1154 _transformer.handleData(data, _sink); |
| 1152 } catch (e, s) { | 1155 } catch (e, s) { |
| 1153 _sendError(_asyncError(e, s)); | 1156 _sendError(_asyncError(e, s)); |
| 1154 } | 1157 } |
| 1155 } | 1158 } |
| 1156 | 1159 |
| 1157 void _handleError(AsyncError error) { | 1160 void _handleError(AsyncError error) { |
| 1158 try { | 1161 try { |
| 1159 _transformer.handleError(error, _sink); | 1162 _transformer.handleError(error, _sink); |
| 1160 } catch (e, s) { | 1163 } catch (e, s) { |
| 1161 _sendError(_asyncError(e, s, error)); | 1164 _sendError(_asyncError(e, s, error)); |
| 1162 } | 1165 } |
| 1163 } | 1166 } |
| 1164 | 1167 |
| 1165 void _handleDone() { | 1168 void _handleDone() { |
| 1166 _subscription = null; | |
| 1167 try { | 1169 try { |
| 1168 _transformer.handleDone(_sink); | 1170 _transformer.handleDone(_sink); |
| 1169 } catch (e, s) { | 1171 } catch (e, s) { |
| 1170 _sendError(_asyncError(e, s)); | 1172 _sendError(_asyncError(e, s)); |
| 1171 } | 1173 } |
| 1172 } | 1174 } |
| 1173 | 1175 |
| 1174 // EventOutputSink interface. | 1176 // EventOutputSink interface. |
| 1175 void _sendData(T data) { | 1177 void _sendData(T data) { |
| 1178 if (!_isSubscribed) return; |
| 1176 _onData(data); | 1179 _onData(data); |
| 1177 } | 1180 } |
| 1178 | 1181 |
| 1179 void _sendError(AsyncError error) { | 1182 void _sendError(AsyncError error) { |
| 1183 if (!_isSubscribed) return; |
| 1180 _onError(error); | 1184 _onError(error); |
| 1181 if (_unsubscribeOnError) { | 1185 if (_unsubscribeOnError) { |
| 1182 cancel(); | 1186 cancel(); |
| 1183 } | 1187 } |
| 1184 } | 1188 } |
| 1185 | 1189 |
| 1186 void _sendDone() { | 1190 void _sendDone() { |
| 1187 // It's ok to cancel even if we have been unsubscribed already. | 1191 if (!_isSubscribed) return; |
| 1188 cancel(); | 1192 _subscription.cancel(); |
| 1193 _subscription = null; |
| 1189 _onDone(); | 1194 _onDone(); |
| 1190 } | 1195 } |
| 1191 } | 1196 } |
| 1192 | 1197 |
| 1193 /* TODO(8997): Implement EventSink instead, */ | 1198 /* TODO(8997): Implement EventSink instead, */ |
| 1194 class _EventOutputSinkWrapper<T> extends StreamSink<T> { | 1199 class _EventOutputSinkWrapper<T> extends StreamSink<T> { |
| 1195 _EventOutputSink _sink; | 1200 _EventOutputSink _sink; |
| 1196 _EventOutputSinkWrapper(this._sink); | 1201 _EventOutputSinkWrapper(this._sink); |
| 1197 | 1202 |
| 1198 void add(T data) { _sink._sendData(data); } | 1203 void add(T data) { _sink._sendData(data); } |
| 1199 void addError(AsyncError error) { _sink._sendError(error); } | 1204 void addError(AsyncError error) { _sink._sendError(error); } |
| 1200 void close() { _sink._sendDone(); } | 1205 void close() { _sink._sendDone(); } |
| 1201 } | 1206 } |
| OLD | NEW |