Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1119)

Side by Side Diff: sdk/lib/async/stream_impl.dart

Issue 18915008: Let StreamSubscription.cancel return a Future. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Mark failing tests. Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « sdk/lib/async/stream_controller.dart ('k') | sdk/lib/async/stream_pipe.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 /** Abstract and private interface for a place to put events. */ 7 /** Abstract and private interface for a place to put events. */
8 abstract class _EventSink<T> { 8 abstract class _EventSink<T> {
9 void _add(T data); 9 void _add(T data);
10 void _addError(Object error, StackTrace stackTrace); 10 void _addError(Object error, StackTrace stackTrace);
(...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after
66 * after a call to [resume] if there are pending events. 66 * after a call to [resume] if there are pending events.
67 */ 67 */
68 static const int _STATE_INPUT_PAUSED = 4; 68 static const int _STATE_INPUT_PAUSED = 4;
69 /** 69 /**
70 * Whether the subscription has been canceled. 70 * Whether the subscription has been canceled.
71 * 71 *
72 * Set by calling [cancel], or by handling a "done" event, or an "error" event 72 * Set by calling [cancel], or by handling a "done" event, or an "error" event
73 * when `cancelOnError` is true. 73 * when `cancelOnError` is true.
74 */ 74 */
75 static const int _STATE_CANCELED = 8; 75 static const int _STATE_CANCELED = 8;
76 static const int _STATE_IN_CALLBACK = 16; 76 /**
77 static const int _STATE_HAS_PENDING = 32; 77 * Set when either:
78 static const int _STATE_PAUSE_COUNT = 64; 78 *
79 static const int _STATE_PAUSE_COUNT_SHIFT = 6; 79 * * an error is sent, and [cancelOnError] is true, or
80 * * a done event is sent.
81 *
82 * If the subscription is canceled while _STATE_WAIT_FOR_CANCEL is set, the
83 * state is unset, and no furher events must be delivered.
84 */
85 static const int _STATE_WAIT_FOR_CANCEL = 16;
86 static const int _STATE_IN_CALLBACK = 32;
87 static const int _STATE_HAS_PENDING = 64;
88 static const int _STATE_PAUSE_COUNT = 128;
89 static const int _STATE_PAUSE_COUNT_SHIFT = 7;
80 90
81 /* Event handlers provided in constructor. */ 91 /* Event handlers provided in constructor. */
82 _DataHandler<T> _onData; 92 _DataHandler<T> _onData;
83 Function _onError; 93 Function _onError;
84 _DoneHandler _onDone; 94 _DoneHandler _onDone;
85 final Zone _zone = Zone.current; 95 final Zone _zone = Zone.current;
86 96
87 /** Bit vector based on state-constants above. */ 97 /** Bit vector based on state-constants above. */
88 int _state; 98 int _state;
89 99
100 // TODO(floitsch): reuse another field
101 /** The future [_onCancel] may return. */
102 Future _cancelFuture;
103
90 /** 104 /**
91 * Queue of pending events. 105 * Queue of pending events.
92 * 106 *
93 * Is created when necessary, or set in constructor for preconfigured events. 107 * Is created when necessary, or set in constructor for preconfigured events.
94 */ 108 */
95 _PendingEvents _pending; 109 _PendingEvents _pending;
96 110
97 _BufferingStreamSubscription(bool cancelOnError) 111 _BufferingStreamSubscription(bool cancelOnError)
98 : _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0); 112 : _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0);
99 113
(...skipping 64 matching lines...) Expand 10 before | Expand all | Expand 10 after
164 _pending.schedule(this); 178 _pending.schedule(this);
165 } else { 179 } else {
166 assert(_mayResumeInput); 180 assert(_mayResumeInput);
167 _state &= ~_STATE_INPUT_PAUSED; 181 _state &= ~_STATE_INPUT_PAUSED;
168 if (!_inCallback) _guardCallback(_onResume); 182 if (!_inCallback) _guardCallback(_onResume);
169 } 183 }
170 } 184 }
171 } 185 }
172 } 186 }
173 187
174 void cancel() { 188 Future cancel() {
175 if (_isCanceled) return; 189 // The user doesn't want to receive any further events. If there is an
190 // error or done event pending (waiting for the cancel to be done) discard
191 // that event.
192 _state &= ~_STATE_WAIT_FOR_CANCEL;
193 if (_isCanceled) return _cancelFuture;
176 _cancel(); 194 _cancel();
177 if (!_inCallback) { 195 return _cancelFuture;
178 // otherwise checkState will be called after firing or callback completes.
179 _state |= _STATE_IN_CALLBACK;
180 _onCancel();
181 _pending = null;
182 _state &= ~_STATE_IN_CALLBACK;
183 }
184 } 196 }
185 197
186 Future asFuture([var futureValue]) { 198 Future asFuture([var futureValue]) {
187 _Future<T> result = new _Future<T>(); 199 _Future<T> result = new _Future<T>();
188 200
189 // Overwrite the onDone and onError handlers. 201 // Overwrite the onDone and onError handlers.
190 _onDone = () { result._complete(futureValue); }; 202 _onDone = () { result._complete(futureValue); };
191 _onError = (error, stackTrace) { 203 _onError = (error, stackTrace) {
192 cancel(); 204 cancel();
193 result._completeError(error, stackTrace); 205 result._completeError(error, stackTrace);
194 }; 206 };
195 207
196 return result; 208 return result;
197 } 209 }
198 210
199 // State management. 211 // State management.
200 212
201 bool get _isInputPaused => (_state & _STATE_INPUT_PAUSED) != 0; 213 bool get _isInputPaused => (_state & _STATE_INPUT_PAUSED) != 0;
202 bool get _isClosed => (_state & _STATE_CLOSED) != 0; 214 bool get _isClosed => (_state & _STATE_CLOSED) != 0;
203 bool get _isCanceled => (_state & _STATE_CANCELED) != 0; 215 bool get _isCanceled => (_state & _STATE_CANCELED) != 0;
216 bool get _waitsForCancel => (_state & _STATE_WAIT_FOR_CANCEL) != 0;
204 bool get _inCallback => (_state & _STATE_IN_CALLBACK) != 0; 217 bool get _inCallback => (_state & _STATE_IN_CALLBACK) != 0;
205 bool get _hasPending => (_state & _STATE_HAS_PENDING) != 0; 218 bool get _hasPending => (_state & _STATE_HAS_PENDING) != 0;
206 bool get _isPaused => _state >= _STATE_PAUSE_COUNT; 219 bool get _isPaused => _state >= _STATE_PAUSE_COUNT;
207 bool get _canFire => _state < _STATE_IN_CALLBACK; 220 bool get _canFire => _state < _STATE_IN_CALLBACK;
208 bool get _mayResumeInput => 221 bool get _mayResumeInput =>
209 !_isPaused && (_pending == null || _pending.isEmpty); 222 !_isPaused && (_pending == null || _pending.isEmpty);
210 bool get _cancelOnError => (_state & _STATE_CANCEL_ON_ERROR) != 0; 223 bool get _cancelOnError => (_state & _STATE_CANCEL_ON_ERROR) != 0;
211 224
212 bool get isPaused => _isPaused; 225 bool get isPaused => _isPaused;
213 226
214 void _cancel() { 227 void _cancel() {
215 _state |= _STATE_CANCELED; 228 _state |= _STATE_CANCELED;
216 if (_hasPending) { 229 if (_hasPending) {
217 _pending.cancelSchedule(); 230 _pending.cancelSchedule();
218 } 231 }
232 if (!_inCallback) _pending = null;
233 _cancelFuture = _onCancel();
219 } 234 }
220 235
221 /** 236 /**
222 * Increment the pause count. 237 * Increment the pause count.
223 * 238 *
224 * Also marks input as paused. 239 * Also marks input as paused.
225 */ 240 */
226 void _incrementPauseCount() { 241 void _incrementPauseCount() {
227 _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED; 242 _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED;
228 } 243 }
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after
276 // try/catch wrapping and send any errors to 291 // try/catch wrapping and send any errors to
277 // [_Zone.current.handleUncaughtError]. 292 // [_Zone.current.handleUncaughtError].
278 void _onPause() { 293 void _onPause() {
279 assert(_isInputPaused); 294 assert(_isInputPaused);
280 } 295 }
281 296
282 void _onResume() { 297 void _onResume() {
283 assert(!_isInputPaused); 298 assert(!_isInputPaused);
284 } 299 }
285 300
286 void _onCancel() { 301 Future _onCancel() {
287 assert(_isCanceled); 302 assert(_isCanceled);
288 } 303 }
289 304
290 // Handle pending events. 305 // Handle pending events.
291 306
292 /** 307 /**
293 * Add a pending event. 308 * Add a pending event.
294 * 309 *
295 * If the subscription is not paused, this also schedules a firing 310 * If the subscription is not paused, this also schedules a firing
296 * of pending events later (if necessary). 311 * of pending events later (if necessary).
(...skipping 21 matching lines...) Expand all
318 _zone.runUnaryGuarded(_onData, data); 333 _zone.runUnaryGuarded(_onData, data);
319 _state &= ~_STATE_IN_CALLBACK; 334 _state &= ~_STATE_IN_CALLBACK;
320 _checkState(wasInputPaused); 335 _checkState(wasInputPaused);
321 } 336 }
322 337
323 void _sendError(var error, StackTrace stackTrace) { 338 void _sendError(var error, StackTrace stackTrace) {
324 assert(!_isCanceled); 339 assert(!_isCanceled);
325 assert(!_isPaused); 340 assert(!_isPaused);
326 assert(!_inCallback); 341 assert(!_inCallback);
327 bool wasInputPaused = _isInputPaused; 342 bool wasInputPaused = _isInputPaused;
328 _state |= _STATE_IN_CALLBACK; 343
329 if (!_zone.inSameErrorZone(Zone.current)) { 344 void sendError() {
330 // Errors are not allowed to traverse zone boundaries. 345 // If the subscription has been canceled while waiting for the cancel
331 Zone.current.handleUncaughtError(error, stackTrace); 346 // future to finish we must not report the error.
332 } else if (_onError is ZoneBinaryCallback) { 347 if (_isCanceled && !_waitsForCancel) return;
333 _zone.runBinaryGuarded(_onError, error, stackTrace); 348 _state |= _STATE_IN_CALLBACK;
349 if (!_zone.inSameErrorZone(Zone.current)) {
350 // Errors are not allowed to traverse zone boundaries.
351 Zone.current.handleUncaughtError(error, stackTrace);
352 } else if (_onError is ZoneBinaryCallback) {
353 _zone.runBinaryGuarded(_onError, error, stackTrace);
354 } else {
355 _zone.runUnaryGuarded(_onError, error);
356 }
357 _state &= ~_STATE_IN_CALLBACK;
358 }
359
360 if (_cancelOnError) {
361 _state |= _STATE_WAIT_FOR_CANCEL;
362 _cancel();
363 if (_cancelFuture is Future) {
364 _cancelFuture.whenComplete(sendError);
365 } else {
366 sendError();
367 }
334 } else { 368 } else {
335 _zone.runUnaryGuarded(_onError, error); 369 sendError();
370 // Only check state if not cancelOnError.
371 _checkState(wasInputPaused);
336 } 372 }
337 _state &= ~_STATE_IN_CALLBACK;
338 if (_cancelOnError) {
339 _cancel();
340 }
341 _checkState(wasInputPaused);
342 } 373 }
343 374
344 void _sendDone() { 375 void _sendDone() {
345 assert(!_isCanceled); 376 assert(!_isCanceled);
346 assert(!_isPaused); 377 assert(!_isPaused);
347 assert(!_inCallback); 378 assert(!_inCallback);
348 _state |= (_STATE_CANCELED | _STATE_CLOSED | _STATE_IN_CALLBACK); 379
349 _zone.runGuarded(_onDone); 380 void sendDone() {
350 _onCancel(); // No checkState after cancel, it is always the last event. 381 // If the subscription has been canceled while waiting for the cancel
351 _state &= ~_STATE_IN_CALLBACK; 382 // future to finish we must not report the done event.
383 if (!_waitsForCancel) return;
384 _state |= (_STATE_CANCELED | _STATE_CLOSED | _STATE_IN_CALLBACK);
385 _zone.runGuarded(_onDone);
386 _state &= ~_STATE_IN_CALLBACK;
387 }
388
389 _cancel();
390 _state |= _STATE_WAIT_FOR_CANCEL;
391 if (_cancelFuture is Future) {
392 _cancelFuture.whenComplete(sendDone);
393 } else {
394 sendDone();
395 }
352 } 396 }
353 397
354 /** 398 /**
355 * Call a hook function. 399 * Call a hook function.
356 * 400 *
357 * The call is properly wrapped in code to avoid other callbacks 401 * The call is properly wrapped in code to avoid other callbacks
358 * during the call, and it checks for state changes after the call 402 * during the call, and it checks for state changes after the call
359 * that should cause further callbacks. 403 * that should cause further callbacks.
360 */ 404 */
361 void _guardCallback(callback) { 405 void _guardCallback(callback) {
(...skipping 20 matching lines...) Expand all
382 if (_hasPending && _pending.isEmpty) { 426 if (_hasPending && _pending.isEmpty) {
383 _state &= ~_STATE_HAS_PENDING; 427 _state &= ~_STATE_HAS_PENDING;
384 if (_isInputPaused && _mayResumeInput) { 428 if (_isInputPaused && _mayResumeInput) {
385 _state &= ~_STATE_INPUT_PAUSED; 429 _state &= ~_STATE_INPUT_PAUSED;
386 } 430 }
387 } 431 }
388 // If the state changes during a callback, we immediately 432 // If the state changes during a callback, we immediately
389 // make a new state-change callback. Loop until the state didn't change. 433 // make a new state-change callback. Loop until the state didn't change.
390 while (true) { 434 while (true) {
391 if (_isCanceled) { 435 if (_isCanceled) {
392 _onCancel();
393 _pending = null; 436 _pending = null;
394 return; 437 return;
395 } 438 }
396 bool isInputPaused = _isInputPaused; 439 bool isInputPaused = _isInputPaused;
397 if (wasInputPaused == isInputPaused) break; 440 if (wasInputPaused == isInputPaused) break;
398 _state ^= _STATE_IN_CALLBACK; 441 _state ^= _STATE_IN_CALLBACK;
399 if (isInputPaused) { 442 if (isInputPaused) {
400 _onPause(); 443 _onPause();
401 } else { 444 } else {
402 _onResume(); 445 _onResume();
(...skipping 289 matching lines...) Expand 10 before | Expand all | Expand 10 after
692 void onError(Function handleError) {} 735 void onError(Function handleError) {}
693 void onDone(void handleDone()) {} 736 void onDone(void handleDone()) {}
694 737
695 void pause([Future resumeSignal]) { 738 void pause([Future resumeSignal]) {
696 _pauseCounter++; 739 _pauseCounter++;
697 if (resumeSignal != null) resumeSignal.then((_) { resume(); }); 740 if (resumeSignal != null) resumeSignal.then((_) { resume(); });
698 } 741 }
699 void resume() { 742 void resume() {
700 if (_pauseCounter > 0) _pauseCounter--; 743 if (_pauseCounter > 0) _pauseCounter--;
701 } 744 }
702 void cancel() {} 745 Future cancel() => null;
703 bool get isPaused => _pauseCounter > 0; 746 bool get isPaused => _pauseCounter > 0;
704 747
705 Future asFuture([futureValue]) => new _Future(); 748 Future asFuture([futureValue]) => new _Future();
706 } 749 }
707 750
708 class _AsBroadcastStream<T> extends Stream<T> { 751 class _AsBroadcastStream<T> extends Stream<T> {
709 final Stream<T> _source; 752 final Stream<T> _source;
710 final _broadcastCallback _onListenHandler; 753 final _broadcastCallback _onListenHandler;
711 final _broadcastCallback _onCancelHandler; 754 final _broadcastCallback _onCancelHandler;
712 final Zone _zone; 755 final Zone _zone;
(...skipping 103 matching lines...) Expand 10 before | Expand all | Expand 10 after
816 } 859 }
817 860
818 void pause([Future resumeSignal]) { 861 void pause([Future resumeSignal]) {
819 _stream._pauseSubscription(resumeSignal); 862 _stream._pauseSubscription(resumeSignal);
820 } 863 }
821 864
822 void resume() { 865 void resume() {
823 _stream._resumeSubscription(); 866 _stream._resumeSubscription();
824 } 867 }
825 868
826 void cancel() { 869 Future cancel() {
827 _stream._cancelSubscription(); 870 _stream._cancelSubscription();
871 return null;
828 } 872 }
829 873
830 bool get isPaused { 874 bool get isPaused {
831 return _stream._isSubscriptionPaused; 875 return _stream._isSubscriptionPaused;
832 } 876 }
833 877
834 Future asFuture([var futureValue]) { 878 Future asFuture([var futureValue]) {
835 throw new UnsupportedError( 879 throw new UnsupportedError(
836 "Cannot change handlers of asBroadcastStream source subscription."); 880 "Cannot change handlers of asBroadcastStream source subscription.");
837 } 881 }
(...skipping 90 matching lines...) Expand 10 before | Expand all | Expand 10 after
928 } 972 }
929 973
930 /** Clears up the internal state when the iterator ends. */ 974 /** Clears up the internal state when the iterator ends. */
931 void _clear() { 975 void _clear() {
932 _subscription = null; 976 _subscription = null;
933 _futureOrPrefetch = null; 977 _futureOrPrefetch = null;
934 _current = null; 978 _current = null;
935 _state = _STATE_DONE; 979 _state = _STATE_DONE;
936 } 980 }
937 981
938 void cancel() { 982 Future cancel() {
939 StreamSubscription subscription = _subscription; 983 StreamSubscription subscription = _subscription;
940 if (_state == _STATE_MOVING) { 984 if (_state == _STATE_MOVING) {
941 _Future<bool> hasNext = _futureOrPrefetch; 985 _Future<bool> hasNext = _futureOrPrefetch;
942 _clear(); 986 _clear();
943 hasNext._complete(false); 987 hasNext._complete(false);
944 } else { 988 } else {
945 _clear(); 989 _clear();
946 } 990 }
947 subscription.cancel(); 991 return subscription.cancel();
948 } 992 }
949 993
950 void _onData(T data) { 994 void _onData(T data) {
951 if (_state == _STATE_MOVING) { 995 if (_state == _STATE_MOVING) {
952 _current = data; 996 _current = data;
953 _Future<bool> hasNext = _futureOrPrefetch; 997 _Future<bool> hasNext = _futureOrPrefetch;
954 _futureOrPrefetch = null; 998 _futureOrPrefetch = null;
955 _state = _STATE_FOUND; 999 _state = _STATE_FOUND;
956 hasNext._complete(true); 1000 hasNext._complete(true);
957 return; 1001 return;
(...skipping 23 matching lines...) Expand all
981 _Future<bool> hasNext = _futureOrPrefetch; 1025 _Future<bool> hasNext = _futureOrPrefetch;
982 _clear(); 1026 _clear();
983 hasNext._complete(false); 1027 hasNext._complete(false);
984 return; 1028 return;
985 } 1029 }
986 _subscription.pause(); 1030 _subscription.pause();
987 _futureOrPrefetch = null; 1031 _futureOrPrefetch = null;
988 _state = _STATE_EXTRA_DONE; 1032 _state = _STATE_EXTRA_DONE;
989 } 1033 }
990 } 1034 }
OLDNEW
« no previous file with comments | « sdk/lib/async/stream_controller.dart ('k') | sdk/lib/async/stream_pipe.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698