Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(5)

Side by Side Diff: sdk/lib/async/stream_impl.dart

Issue 2223133002: Revert "Return futures on Stream.cancel when possible." (Closed) Base URL: git@github.com:dart-lang/sdk.git@master
Patch Set: Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « sdk/lib/async/stream.dart ('k') | sdk/lib/async/stream_transformers.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 /** Abstract and private interface for a place to put events. */ 7 /** Abstract and private interface for a place to put events. */
8 abstract class _EventSink<T> { 8 abstract class _EventSink<T> {
9 void _add(T data); 9 void _add(T data);
10 void _addError(Object error, StackTrace stackTrace); 10 void _addError(Object error, StackTrace stackTrace);
(...skipping 170 matching lines...) Expand 10 before | Expand all | Expand 10 after
181 } 181 }
182 } 182 }
183 } 183 }
184 } 184 }
185 185
186 Future cancel() { 186 Future cancel() {
187 // The user doesn't want to receive any further events. If there is an 187 // The user doesn't want to receive any further events. If there is an
188 // error or done event pending (waiting for the cancel to be done) discard 188 // error or done event pending (waiting for the cancel to be done) discard
189 // that event. 189 // that event.
190 _state &= ~_STATE_WAIT_FOR_CANCEL; 190 _state &= ~_STATE_WAIT_FOR_CANCEL;
191 if (!_isCanceled) { 191 if (_isCanceled) return _cancelFuture;
192 _cancel(); 192 _cancel();
193 } 193 return _cancelFuture;
194 return _cancelFuture ?? Future._nullFuture;
195 } 194 }
196 195
197 Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) { 196 Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) {
198 _Future/*<E>*/ result = new _Future/*<E>*/(); 197 _Future/*<E>*/ result = new _Future/*<E>*/();
199 198
200 // Overwrite the onDone and onError handlers. 199 // Overwrite the onDone and onError handlers.
201 _onDone = () { result._complete(futureValue); }; 200 _onDone = () { result._complete(futureValue); };
202 _onError = (error, stackTrace) { 201 _onError = (error, stackTrace) {
203 Future cancelFuture = cancel(); 202 cancel();
204 if (!identical(cancelFuture, Future._nullFuture)) { 203 result._completeError(error, stackTrace);
205 cancelFuture.whenComplete(() {
206 result._completeError(error, stackTrace);
207 });
208 } else {
209 result._completeError(error, stackTrace);
210 }
211 }; 204 };
212 205
213 return result; 206 return result;
214 } 207 }
215 208
216 // State management. 209 // State management.
217 210
218 bool get _isInputPaused => (_state & _STATE_INPUT_PAUSED) != 0; 211 bool get _isInputPaused => (_state & _STATE_INPUT_PAUSED) != 0;
219 bool get _isClosed => (_state & _STATE_CLOSED) != 0; 212 bool get _isClosed => (_state & _STATE_CLOSED) != 0;
220 bool get _isCanceled => (_state & _STATE_CANCELED) != 0; 213 bool get _isCanceled => (_state & _STATE_CANCELED) != 0;
(...skipping 140 matching lines...) Expand 10 before | Expand all | Expand 10 after
361 } else { 354 } else {
362 _zone.runUnaryGuarded/*<dynamic, dynamic>*/( 355 _zone.runUnaryGuarded/*<dynamic, dynamic>*/(
363 _onError as Object /*=ZoneUnaryCallback<dynamic, dynamic>*/, error); 356 _onError as Object /*=ZoneUnaryCallback<dynamic, dynamic>*/, error);
364 } 357 }
365 _state &= ~_STATE_IN_CALLBACK; 358 _state &= ~_STATE_IN_CALLBACK;
366 } 359 }
367 360
368 if (_cancelOnError) { 361 if (_cancelOnError) {
369 _state |= _STATE_WAIT_FOR_CANCEL; 362 _state |= _STATE_WAIT_FOR_CANCEL;
370 _cancel(); 363 _cancel();
371 if (_cancelFuture is Future && 364 if (_cancelFuture is Future) {
372 !identical(_cancelFuture, Future._nullFuture)) {
373 _cancelFuture.whenComplete(sendError); 365 _cancelFuture.whenComplete(sendError);
374 } else { 366 } else {
375 sendError(); 367 sendError();
376 } 368 }
377 } else { 369 } else {
378 sendError(); 370 sendError();
379 // Only check state if not cancelOnError. 371 // Only check state if not cancelOnError.
380 _checkState(wasInputPaused); 372 _checkState(wasInputPaused);
381 } 373 }
382 } 374 }
383 375
384 void _sendDone() { 376 void _sendDone() {
385 assert(!_isCanceled); 377 assert(!_isCanceled);
386 assert(!_isPaused); 378 assert(!_isPaused);
387 assert(!_inCallback); 379 assert(!_inCallback);
388 380
389 void sendDone() { 381 void sendDone() {
390 // If the subscription has been canceled while waiting for the cancel 382 // If the subscription has been canceled while waiting for the cancel
391 // future to finish we must not report the done event. 383 // future to finish we must not report the done event.
392 if (!_waitsForCancel) return; 384 if (!_waitsForCancel) return;
393 _state |= (_STATE_CANCELED | _STATE_CLOSED | _STATE_IN_CALLBACK); 385 _state |= (_STATE_CANCELED | _STATE_CLOSED | _STATE_IN_CALLBACK);
394 _zone.runGuarded(_onDone); 386 _zone.runGuarded(_onDone);
395 _state &= ~_STATE_IN_CALLBACK; 387 _state &= ~_STATE_IN_CALLBACK;
396 } 388 }
397 389
398 _cancel(); 390 _cancel();
399 _state |= _STATE_WAIT_FOR_CANCEL; 391 _state |= _STATE_WAIT_FOR_CANCEL;
400 if (_cancelFuture is Future && 392 if (_cancelFuture is Future) {
401 !identical(_cancelFuture, Future._nullFuture)) {
402 _cancelFuture.whenComplete(sendDone); 393 _cancelFuture.whenComplete(sendDone);
403 } else { 394 } else {
404 sendDone(); 395 sendDone();
405 } 396 }
406 } 397 }
407 398
408 /** 399 /**
409 * Call a hook function. 400 * Call a hook function.
410 * 401 *
411 * The call is properly wrapped in code to avoid other callbacks 402 * The call is properly wrapped in code to avoid other callbacks
(...skipping 368 matching lines...) Expand 10 before | Expand all | Expand 10 after
780 771
781 void resume() { 772 void resume() {
782 if (isPaused) { 773 if (isPaused) {
783 _state -= _PAUSED; 774 _state -= _PAUSED;
784 if (!isPaused && !_isSent) { 775 if (!isPaused && !_isSent) {
785 _schedule(); 776 _schedule();
786 } 777 }
787 } 778 }
788 } 779 }
789 780
790 Future cancel() => Future._nullFuture; 781 Future cancel() => null;
791 782
792 Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) { 783 Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) {
793 _Future/*<E>*/ result = new _Future/*<E>*/(); 784 _Future/*<E>*/ result = new _Future/*<E>*/();
794 _onDone = () { result._completeWithValue(null); }; 785 _onDone = () { result._completeWithValue(null); };
795 return result; 786 return result;
796 } 787 }
797 788
798 void _sendDone() { 789 void _sendDone() {
799 _state &= ~_SCHEDULED; 790 _state &= ~_SCHEDULED;
800 if (isPaused) return; 791 if (isPaused) return;
(...skipping 117 matching lines...) Expand 10 before | Expand all | Expand 10 after
918 void pause([Future resumeSignal]) { 909 void pause([Future resumeSignal]) {
919 _stream._pauseSubscription(resumeSignal); 910 _stream._pauseSubscription(resumeSignal);
920 } 911 }
921 912
922 void resume() { 913 void resume() {
923 _stream._resumeSubscription(); 914 _stream._resumeSubscription();
924 } 915 }
925 916
926 Future cancel() { 917 Future cancel() {
927 _stream._cancelSubscription(); 918 _stream._cancelSubscription();
928 return Future._nullFuture; 919 return null;
929 } 920 }
930 921
931 bool get isPaused { 922 bool get isPaused {
932 return _stream._isSubscriptionPaused; 923 return _stream._isSubscriptionPaused;
933 } 924 }
934 925
935 Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) { 926 Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) {
936 throw new UnsupportedError( 927 throw new UnsupportedError(
937 "Cannot change handlers of asBroadcastStream source subscription."); 928 "Cannot change handlers of asBroadcastStream source subscription.");
938 } 929 }
(...skipping 95 matching lines...) Expand 10 before | Expand all | Expand 10 after
1034 /** Clears up the internal state when the iterator ends. */ 1025 /** Clears up the internal state when the iterator ends. */
1035 void _clear() { 1026 void _clear() {
1036 _subscription = null; 1027 _subscription = null;
1037 _futureOrPrefetch = null; 1028 _futureOrPrefetch = null;
1038 _current = null; 1029 _current = null;
1039 _state = _STATE_DONE; 1030 _state = _STATE_DONE;
1040 } 1031 }
1041 1032
1042 Future cancel() { 1033 Future cancel() {
1043 StreamSubscription subscription = _subscription; 1034 StreamSubscription subscription = _subscription;
1044 if (subscription == null) return Future._nullFuture; 1035 if (subscription == null) return null;
1045 if (_state == _STATE_MOVING) { 1036 if (_state == _STATE_MOVING) {
1046 _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/; 1037 _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/;
1047 _clear(); 1038 _clear();
1048 hasNext._complete(false); 1039 hasNext._complete(false);
1049 } else { 1040 } else {
1050 _clear(); 1041 _clear();
1051 } 1042 }
1052 return subscription.cancel(); 1043 return subscription.cancel();
1053 } 1044 }
1054 1045
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after
1098 class _EmptyStream<T> extends Stream<T> { 1089 class _EmptyStream<T> extends Stream<T> {
1099 const _EmptyStream() : super._internal(); 1090 const _EmptyStream() : super._internal();
1100 bool get isBroadcast => true; 1091 bool get isBroadcast => true;
1101 StreamSubscription<T> listen(void onData(T data), 1092 StreamSubscription<T> listen(void onData(T data),
1102 {Function onError, 1093 {Function onError,
1103 void onDone(), 1094 void onDone(),
1104 bool cancelOnError}) { 1095 bool cancelOnError}) {
1105 return new _DoneStreamSubscription<T>(onDone); 1096 return new _DoneStreamSubscription<T>(onDone);
1106 } 1097 }
1107 } 1098 }
OLDNEW
« no previous file with comments | « sdk/lib/async/stream.dart ('k') | sdk/lib/async/stream_transformers.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698