Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(206)

Side by Side Diff: sdk/lib/async/stream_impl.dart

Issue 2202533003: Return futures on Stream.cancel when possible. (Closed) Base URL: git@github.com:dart-lang/sdk.git@master
Patch Set: Don't make Pipe.cancel wait for the null future. Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « sdk/lib/async/stream.dart ('k') | sdk/lib/async/stream_pipe.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 /** Abstract and private interface for a place to put events. */ 7 /** Abstract and private interface for a place to put events. */
8 abstract class _EventSink<T> { 8 abstract class _EventSink<T> {
9 void _add(T data); 9 void _add(T data);
10 void _addError(Object error, StackTrace stackTrace); 10 void _addError(Object error, StackTrace stackTrace);
(...skipping 170 matching lines...) Expand 10 before | Expand all | Expand 10 after
181 } 181 }
182 } 182 }
183 } 183 }
184 } 184 }
185 185
186 Future cancel() { 186 Future cancel() {
187 // The user doesn't want to receive any further events. If there is an 187 // The user doesn't want to receive any further events. If there is an
188 // error or done event pending (waiting for the cancel to be done) discard 188 // error or done event pending (waiting for the cancel to be done) discard
189 // that event. 189 // that event.
190 _state &= ~_STATE_WAIT_FOR_CANCEL; 190 _state &= ~_STATE_WAIT_FOR_CANCEL;
191 if (_isCanceled) return _cancelFuture; 191 if (!_isCanceled) {
192 _cancel(); 192 _cancel();
193 return _cancelFuture; 193 }
194 return _cancelFuture ?? Future._nullFuture;
194 } 195 }
195 196
196 Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) { 197 Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) {
197 _Future/*<E>*/ result = new _Future/*<E>*/(); 198 _Future/*<E>*/ result = new _Future/*<E>*/();
198 199
199 // Overwrite the onDone and onError handlers. 200 // Overwrite the onDone and onError handlers.
200 _onDone = () { result._complete(futureValue); }; 201 _onDone = () { result._complete(futureValue); };
201 _onError = (error, stackTrace) { 202 _onError = (error, stackTrace) {
202 cancel(); 203 Future cancelFuture = cancel();
203 result._completeError(error, stackTrace); 204 if (!identical(cancelFuture, Future._nullFuture)) {
205 cancelFuture.whenComplete(() {
206 result._completeError(error, stackTrace);
207 });
208 } else {
209 result._completeError(error, stackTrace);
210 }
204 }; 211 };
205 212
206 return result; 213 return result;
207 } 214 }
208 215
209 // State management. 216 // State management.
210 217
211 bool get _isInputPaused => (_state & _STATE_INPUT_PAUSED) != 0; 218 bool get _isInputPaused => (_state & _STATE_INPUT_PAUSED) != 0;
212 bool get _isClosed => (_state & _STATE_CLOSED) != 0; 219 bool get _isClosed => (_state & _STATE_CLOSED) != 0;
213 bool get _isCanceled => (_state & _STATE_CANCELED) != 0; 220 bool get _isCanceled => (_state & _STATE_CANCELED) != 0;
(...skipping 140 matching lines...) Expand 10 before | Expand all | Expand 10 after
354 } else { 361 } else {
355 _zone.runUnaryGuarded/*<dynamic, dynamic>*/( 362 _zone.runUnaryGuarded/*<dynamic, dynamic>*/(
356 _onError as Object /*=ZoneUnaryCallback<dynamic, dynamic>*/, error); 363 _onError as Object /*=ZoneUnaryCallback<dynamic, dynamic>*/, error);
357 } 364 }
358 _state &= ~_STATE_IN_CALLBACK; 365 _state &= ~_STATE_IN_CALLBACK;
359 } 366 }
360 367
361 if (_cancelOnError) { 368 if (_cancelOnError) {
362 _state |= _STATE_WAIT_FOR_CANCEL; 369 _state |= _STATE_WAIT_FOR_CANCEL;
363 _cancel(); 370 _cancel();
364 if (_cancelFuture is Future) { 371 if (_cancelFuture is Future &&
372 !identical(_cancelFuture, Future._nullFuture)) {
365 _cancelFuture.whenComplete(sendError); 373 _cancelFuture.whenComplete(sendError);
366 } else { 374 } else {
367 sendError(); 375 sendError();
368 } 376 }
369 } else { 377 } else {
370 sendError(); 378 sendError();
371 // Only check state if not cancelOnError. 379 // Only check state if not cancelOnError.
372 _checkState(wasInputPaused); 380 _checkState(wasInputPaused);
373 } 381 }
374 } 382 }
375 383
376 void _sendDone() { 384 void _sendDone() {
377 assert(!_isCanceled); 385 assert(!_isCanceled);
378 assert(!_isPaused); 386 assert(!_isPaused);
379 assert(!_inCallback); 387 assert(!_inCallback);
380 388
381 void sendDone() { 389 void sendDone() {
382 // If the subscription has been canceled while waiting for the cancel 390 // If the subscription has been canceled while waiting for the cancel
383 // future to finish we must not report the done event. 391 // future to finish we must not report the done event.
384 if (!_waitsForCancel) return; 392 if (!_waitsForCancel) return;
385 _state |= (_STATE_CANCELED | _STATE_CLOSED | _STATE_IN_CALLBACK); 393 _state |= (_STATE_CANCELED | _STATE_CLOSED | _STATE_IN_CALLBACK);
386 _zone.runGuarded(_onDone); 394 _zone.runGuarded(_onDone);
387 _state &= ~_STATE_IN_CALLBACK; 395 _state &= ~_STATE_IN_CALLBACK;
388 } 396 }
389 397
390 _cancel(); 398 _cancel();
391 _state |= _STATE_WAIT_FOR_CANCEL; 399 _state |= _STATE_WAIT_FOR_CANCEL;
392 if (_cancelFuture is Future) { 400 if (_cancelFuture is Future &&
401 !identical(_cancelFuture, Future._nullFuture)) {
393 _cancelFuture.whenComplete(sendDone); 402 _cancelFuture.whenComplete(sendDone);
394 } else { 403 } else {
395 sendDone(); 404 sendDone();
396 } 405 }
397 } 406 }
398 407
399 /** 408 /**
400 * Call a hook function. 409 * Call a hook function.
401 * 410 *
402 * The call is properly wrapped in code to avoid other callbacks 411 * The call is properly wrapped in code to avoid other callbacks
(...skipping 368 matching lines...) Expand 10 before | Expand all | Expand 10 after
771 780
772 void resume() { 781 void resume() {
773 if (isPaused) { 782 if (isPaused) {
774 _state -= _PAUSED; 783 _state -= _PAUSED;
775 if (!isPaused && !_isSent) { 784 if (!isPaused && !_isSent) {
776 _schedule(); 785 _schedule();
777 } 786 }
778 } 787 }
779 } 788 }
780 789
781 Future cancel() => null; 790 Future cancel() => Future._nullFuture;
782 791
783 Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) { 792 Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) {
784 _Future/*<E>*/ result = new _Future/*<E>*/(); 793 _Future/*<E>*/ result = new _Future/*<E>*/();
785 _onDone = () { result._completeWithValue(null); }; 794 _onDone = () { result._completeWithValue(null); };
786 return result; 795 return result;
787 } 796 }
788 797
789 void _sendDone() { 798 void _sendDone() {
790 _state &= ~_SCHEDULED; 799 _state &= ~_SCHEDULED;
791 if (isPaused) return; 800 if (isPaused) return;
(...skipping 117 matching lines...) Expand 10 before | Expand all | Expand 10 after
909 void pause([Future resumeSignal]) { 918 void pause([Future resumeSignal]) {
910 _stream._pauseSubscription(resumeSignal); 919 _stream._pauseSubscription(resumeSignal);
911 } 920 }
912 921
913 void resume() { 922 void resume() {
914 _stream._resumeSubscription(); 923 _stream._resumeSubscription();
915 } 924 }
916 925
917 Future cancel() { 926 Future cancel() {
918 _stream._cancelSubscription(); 927 _stream._cancelSubscription();
919 return null; 928 return Future._nullFuture;
920 } 929 }
921 930
922 bool get isPaused { 931 bool get isPaused {
923 return _stream._isSubscriptionPaused; 932 return _stream._isSubscriptionPaused;
924 } 933 }
925 934
926 Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) { 935 Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) {
927 throw new UnsupportedError( 936 throw new UnsupportedError(
928 "Cannot change handlers of asBroadcastStream source subscription."); 937 "Cannot change handlers of asBroadcastStream source subscription.");
929 } 938 }
(...skipping 95 matching lines...) Expand 10 before | Expand all | Expand 10 after
1025 /** Clears up the internal state when the iterator ends. */ 1034 /** Clears up the internal state when the iterator ends. */
1026 void _clear() { 1035 void _clear() {
1027 _subscription = null; 1036 _subscription = null;
1028 _futureOrPrefetch = null; 1037 _futureOrPrefetch = null;
1029 _current = null; 1038 _current = null;
1030 _state = _STATE_DONE; 1039 _state = _STATE_DONE;
1031 } 1040 }
1032 1041
1033 Future cancel() { 1042 Future cancel() {
1034 StreamSubscription subscription = _subscription; 1043 StreamSubscription subscription = _subscription;
1035 if (subscription == null) return null; 1044 if (subscription == null) return Future._nullFuture;
1036 if (_state == _STATE_MOVING) { 1045 if (_state == _STATE_MOVING) {
1037 _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/; 1046 _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/;
1038 _clear(); 1047 _clear();
1039 hasNext._complete(false); 1048 hasNext._complete(false);
1040 } else { 1049 } else {
1041 _clear(); 1050 _clear();
1042 } 1051 }
1043 return subscription.cancel(); 1052 return subscription.cancel();
1044 } 1053 }
1045 1054
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after
1089 class _EmptyStream<T> extends Stream<T> { 1098 class _EmptyStream<T> extends Stream<T> {
1090 const _EmptyStream() : super._internal(); 1099 const _EmptyStream() : super._internal();
1091 bool get isBroadcast => true; 1100 bool get isBroadcast => true;
1092 StreamSubscription<T> listen(void onData(T data), 1101 StreamSubscription<T> listen(void onData(T data),
1093 {Function onError, 1102 {Function onError,
1094 void onDone(), 1103 void onDone(),
1095 bool cancelOnError}) { 1104 bool cancelOnError}) {
1096 return new _DoneStreamSubscription<T>(onDone); 1105 return new _DoneStreamSubscription<T>(onDone);
1097 } 1106 }
1098 } 1107 }
OLDNEW
« no previous file with comments | « sdk/lib/async/stream.dart ('k') | sdk/lib/async/stream_pipe.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698