Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(95)

Side by Side Diff: sdk/lib/async/stream.dart

Issue 13625003: Make EventTransformerStream's subscription not send events after cancel. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 7 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « no previous file | tests/lib/async/first_regression_test.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Core Stream types 8 // Core Stream types
9 // ------------------------------------------------------------------- 9 // -------------------------------------------------------------------
10 10
(...skipping 599 matching lines...) Expand 10 before | Expand all | Expand 10 after
610 * If [this] is empty throws a [StateError]. Otherwise this method is 610 * If [this] is empty throws a [StateError]. Otherwise this method is
611 * equivalent to [:this.elementAt(0):] 611 * equivalent to [:this.elementAt(0):]
612 */ 612 */
613 Future<T> get first { 613 Future<T> get first {
614 _FutureImpl<T> future = new _FutureImpl<T>(); 614 _FutureImpl<T> future = new _FutureImpl<T>();
615 StreamSubscription subscription; 615 StreamSubscription subscription;
616 subscription = this.listen( 616 subscription = this.listen(
617 // TODO(ahe): Restore type when feature is implemented in dart2js 617 // TODO(ahe): Restore type when feature is implemented in dart2js
618 // checked mode. http://dartbug.com/7733 618 // checked mode. http://dartbug.com/7733
619 (/*T*/ value) { 619 (/*T*/ value) {
620 subscription.cancel();
620 future._setValue(value); 621 future._setValue(value);
621 subscription.cancel();
622 return; 622 return;
623 }, 623 },
624 onError: future._setError, 624 onError: future._setError,
625 onDone: () { 625 onDone: () {
626 future._setError(new AsyncError(new StateError("No elements"))); 626 future._setError(new AsyncError(new StateError("No elements")));
627 }, 627 },
628 unsubscribeOnError: true); 628 unsubscribeOnError: true);
629 return future; 629 return future;
630 } 630 }
631 631
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after
666 Future<T> get single { 666 Future<T> get single {
667 _FutureImpl<T> future = new _FutureImpl<T>(); 667 _FutureImpl<T> future = new _FutureImpl<T>();
668 T result = null; 668 T result = null;
669 bool foundResult = false; 669 bool foundResult = false;
670 StreamSubscription subscription; 670 StreamSubscription subscription;
671 subscription = this.listen( 671 subscription = this.listen(
672 // TODO(ahe): Restore type when feature is implemented in dart2js 672 // TODO(ahe): Restore type when feature is implemented in dart2js
673 // checked mode. http://dartbug.com/7733 673 // checked mode. http://dartbug.com/7733
674 (/*T*/ value) { 674 (/*T*/ value) {
675 if (foundResult) { 675 if (foundResult) {
676 subscription.cancel();
676 // This is the second element we get. 677 // This is the second element we get.
677 Error error = new StateError("More than one element"); 678 Error error = new StateError("More than one element");
678 future._setError(new AsyncError(error)); 679 future._setError(new AsyncError(error));
679 subscription.cancel();
680 return; 680 return;
681 } 681 }
682 foundResult = true; 682 foundResult = true;
683 result = value; 683 result = value;
684 }, 684 },
685 onError: future._setError, 685 onError: future._setError,
686 onDone: () { 686 onDone: () {
687 if (foundResult) { 687 if (foundResult) {
688 future._setValue(result); 688 future._setValue(result);
689 return; 689 return;
(...skipping 148 matching lines...) Expand 10 before | Expand all | Expand 10 after
838 */ 838 */
839 Future<T> elementAt(int index) { 839 Future<T> elementAt(int index) {
840 if (index is! int || index < 0) throw new ArgumentError(index); 840 if (index is! int || index < 0) throw new ArgumentError(index);
841 _FutureImpl<T> future = new _FutureImpl<T>(); 841 _FutureImpl<T> future = new _FutureImpl<T>();
842 StreamSubscription subscription; 842 StreamSubscription subscription;
843 subscription = this.listen( 843 subscription = this.listen(
844 // TODO(ahe): Restore type when feature is implemented in dart2js 844 // TODO(ahe): Restore type when feature is implemented in dart2js
845 // checked mode. http://dartbug.com/7733 845 // checked mode. http://dartbug.com/7733
846 (/*T*/ value) { 846 (/*T*/ value) {
847 if (index == 0) { 847 if (index == 0) {
848 subscription.cancel();
848 future._setValue(value); 849 future._setValue(value);
849 subscription.cancel();
850 return; 850 return;
851 } 851 }
852 index -= 1; 852 index -= 1;
853 }, 853 },
854 onError: future._setError, 854 onError: future._setError,
855 onDone: () { 855 onDone: () {
856 future._setError(new AsyncError( 856 future._setError(new AsyncError(
857 new StateError("Not enough elements for elementAt"))); 857 new StateError("Not enough elements for elementAt")));
858 }, 858 },
859 unsubscribeOnError: true); 859 unsubscribeOnError: true);
(...skipping 264 matching lines...) Expand 10 before | Expand all | Expand 10 after
1124 void onError(AsyncError error), 1124 void onError(AsyncError error),
1125 void onDone(), 1125 void onDone(),
1126 this._unsubscribeOnError) 1126 this._unsubscribeOnError)
1127 : super(onData, onError, onDone) { 1127 : super(onData, onError, onDone) {
1128 _sink = new _EventOutputSinkWrapper<T>(this); 1128 _sink = new _EventOutputSinkWrapper<T>(this);
1129 _subscription = source.listen(_handleData, 1129 _subscription = source.listen(_handleData,
1130 onError: _handleError, 1130 onError: _handleError,
1131 onDone: _handleDone); 1131 onDone: _handleDone);
1132 } 1132 }
1133 1133
1134 /** Whether this subscription is still subscribed to its source. */
1135 bool get _isSubscribed => _subscription != null;
1136
1134 void pause([Future pauseSignal]) { 1137 void pause([Future pauseSignal]) {
1135 if (_subscription != null) _subscription.pause(pauseSignal); 1138 if (_isSubscribed) _subscription.pause(pauseSignal);
1136 } 1139 }
1137 1140
1138 void resume() { 1141 void resume() {
1139 if (_subscription != null) _subscription.resume(); 1142 if (_isSubscribed) _subscription.resume();
1140 } 1143 }
1141 1144
1142 void cancel() { 1145 void cancel() {
1143 if (_subscription != null) { 1146 if (_isSubscribed) {
1144 _subscription.cancel(); 1147 _subscription.cancel();
1145 _subscription = null; 1148 _subscription = null;
1146 } 1149 }
1147 } 1150 }
1148 1151
1149 void _handleData(S data) { 1152 void _handleData(S data) {
1150 try { 1153 try {
1151 _transformer.handleData(data, _sink); 1154 _transformer.handleData(data, _sink);
1152 } catch (e, s) { 1155 } catch (e, s) {
1153 _sendError(_asyncError(e, s)); 1156 _sendError(_asyncError(e, s));
1154 } 1157 }
1155 } 1158 }
1156 1159
1157 void _handleError(AsyncError error) { 1160 void _handleError(AsyncError error) {
1158 try { 1161 try {
1159 _transformer.handleError(error, _sink); 1162 _transformer.handleError(error, _sink);
1160 } catch (e, s) { 1163 } catch (e, s) {
1161 _sendError(_asyncError(e, s, error)); 1164 _sendError(_asyncError(e, s, error));
1162 } 1165 }
1163 } 1166 }
1164 1167
1165 void _handleDone() { 1168 void _handleDone() {
1166 _subscription = null;
1167 try { 1169 try {
1168 _transformer.handleDone(_sink); 1170 _transformer.handleDone(_sink);
1169 } catch (e, s) { 1171 } catch (e, s) {
1170 _sendError(_asyncError(e, s)); 1172 _sendError(_asyncError(e, s));
1171 } 1173 }
1172 } 1174 }
1173 1175
1174 // EventOutputSink interface. 1176 // EventOutputSink interface.
1175 void _sendData(T data) { 1177 void _sendData(T data) {
1178 if (!_isSubscribed) return;
1176 _onData(data); 1179 _onData(data);
1177 } 1180 }
1178 1181
1179 void _sendError(AsyncError error) { 1182 void _sendError(AsyncError error) {
1183 if (!_isSubscribed) return;
1180 _onError(error); 1184 _onError(error);
1181 if (_unsubscribeOnError) { 1185 if (_unsubscribeOnError) {
1182 cancel(); 1186 cancel();
1183 } 1187 }
1184 } 1188 }
1185 1189
1186 void _sendDone() { 1190 void _sendDone() {
1187 // It's ok to cancel even if we have been unsubscribed already. 1191 if (!_isSubscribed) return;
1188 cancel(); 1192 _subscription.cancel();
1193 _subscription = null;
1189 _onDone(); 1194 _onDone();
1190 } 1195 }
1191 } 1196 }
1192 1197
1193 /* TODO(8997): Implement EventSink instead, */ 1198 /* TODO(8997): Implement EventSink instead, */
1194 class _EventOutputSinkWrapper<T> extends StreamSink<T> { 1199 class _EventOutputSinkWrapper<T> extends StreamSink<T> {
1195 _EventOutputSink _sink; 1200 _EventOutputSink _sink;
1196 _EventOutputSinkWrapper(this._sink); 1201 _EventOutputSinkWrapper(this._sink);
1197 1202
1198 void add(T data) { _sink._sendData(data); } 1203 void add(T data) { _sink._sendData(data); }
1199 void addError(AsyncError error) { _sink._sendError(error); } 1204 void addError(AsyncError error) { _sink._sendError(error); }
1200 void close() { _sink._sendDone(); } 1205 void close() { _sink._sendDone(); }
1201 } 1206 }
OLDNEW
« no previous file with comments | « no previous file | tests/lib/async/first_regression_test.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698