Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(233)

Side by Side Diff: sdk/lib/async/stream_impl.dart

Issue 2202533003: Return futures on Stream.cancel when possible. (Closed) Base URL: git@github.com:dart-lang/sdk.git@master
Patch Set: Remove debug-print. Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 /** Abstract and private interface for a place to put events. */ 7 /** Abstract and private interface for a place to put events. */
8 abstract class _EventSink<T> { 8 abstract class _EventSink<T> {
9 void _add(T data); 9 void _add(T data);
10 void _addError(Object error, StackTrace stackTrace); 10 void _addError(Object error, StackTrace stackTrace);
(...skipping 760 matching lines...) Expand 10 before | Expand all | Expand 10 after
771 771
772 void resume() { 772 void resume() {
773 if (isPaused) { 773 if (isPaused) {
774 _state -= _PAUSED; 774 _state -= _PAUSED;
775 if (!isPaused && !_isSent) { 775 if (!isPaused && !_isSent) {
776 _schedule(); 776 _schedule();
777 } 777 }
778 } 778 }
779 } 779 }
780 780
781 Future cancel() => null; 781 Future cancel() => new Future.value(null);
Lasse Reichstein Nielsen 2016/08/01 15:23:26 Future._nullFuture
floitsch 2016/08/01 21:00:37 Done.
782 782
783 Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) { 783 Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) {
784 _Future/*<E>*/ result = new _Future/*<E>*/(); 784 _Future/*<E>*/ result = new _Future/*<E>*/();
785 _onDone = () { result._completeWithValue(null); }; 785 _onDone = () { result._completeWithValue(null); };
786 return result; 786 return result;
787 } 787 }
788 788
789 void _sendDone() { 789 void _sendDone() {
790 _state &= ~_SCHEDULED; 790 _state &= ~_SCHEDULED;
791 if (isPaused) return; 791 if (isPaused) return;
(...skipping 117 matching lines...) Expand 10 before | Expand all | Expand 10 after
909 void pause([Future resumeSignal]) { 909 void pause([Future resumeSignal]) {
910 _stream._pauseSubscription(resumeSignal); 910 _stream._pauseSubscription(resumeSignal);
911 } 911 }
912 912
913 void resume() { 913 void resume() {
914 _stream._resumeSubscription(); 914 _stream._resumeSubscription();
915 } 915 }
916 916
917 Future cancel() { 917 Future cancel() {
918 _stream._cancelSubscription(); 918 _stream._cancelSubscription();
919 return null; 919 return new Future.value(null);
Lasse Reichstein Nielsen 2016/08/01 15:23:26 Future._nullFuture (I will stop saying it now, but
floitsch 2016/08/01 21:00:37 Done.
920 } 920 }
921 921
922 bool get isPaused { 922 bool get isPaused {
923 return _stream._isSubscriptionPaused; 923 return _stream._isSubscriptionPaused;
924 } 924 }
925 925
926 Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) { 926 Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) {
927 throw new UnsupportedError( 927 throw new UnsupportedError(
928 "Cannot change handlers of asBroadcastStream source subscription."); 928 "Cannot change handlers of asBroadcastStream source subscription.");
929 } 929 }
(...skipping 95 matching lines...) Expand 10 before | Expand all | Expand 10 after
1025 /** Clears up the internal state when the iterator ends. */ 1025 /** Clears up the internal state when the iterator ends. */
1026 void _clear() { 1026 void _clear() {
1027 _subscription = null; 1027 _subscription = null;
1028 _futureOrPrefetch = null; 1028 _futureOrPrefetch = null;
1029 _current = null; 1029 _current = null;
1030 _state = _STATE_DONE; 1030 _state = _STATE_DONE;
1031 } 1031 }
1032 1032
1033 Future cancel() { 1033 Future cancel() {
1034 StreamSubscription subscription = _subscription; 1034 StreamSubscription subscription = _subscription;
1035 if (subscription == null) return null; 1035 if (subscription == null) return new Future.value(null);
1036 if (_state == _STATE_MOVING) { 1036 if (_state == _STATE_MOVING) {
1037 _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/; 1037 _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/;
1038 _clear(); 1038 _clear();
1039 hasNext._complete(false); 1039 hasNext._complete(false);
1040 } else { 1040 } else {
1041 _clear(); 1041 _clear();
1042 } 1042 }
1043 return subscription.cancel(); 1043 return subscription.cancel();
1044 } 1044 }
1045 1045
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after
1089 class _EmptyStream<T> extends Stream<T> { 1089 class _EmptyStream<T> extends Stream<T> {
1090 const _EmptyStream() : super._internal(); 1090 const _EmptyStream() : super._internal();
1091 bool get isBroadcast => true; 1091 bool get isBroadcast => true;
1092 StreamSubscription<T> listen(void onData(T data), 1092 StreamSubscription<T> listen(void onData(T data),
1093 {Function onError, 1093 {Function onError,
1094 void onDone(), 1094 void onDone(),
1095 bool cancelOnError}) { 1095 bool cancelOnError}) {
1096 return new _DoneStreamSubscription<T>(onDone); 1096 return new _DoneStreamSubscription<T>(onDone);
1097 } 1097 }
1098 } 1098 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698