Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(121)

Side by Side Diff: sdk/lib/async/stream_impl.dart

Issue 18915008: Let StreamSubscription.cancel return a Future. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Fix type. Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 /** Abstract and private interface for a place to put events. */ 7 /** Abstract and private interface for a place to put events. */
8 abstract class _EventSink<T> { 8 abstract class _EventSink<T> {
9 void _add(T data); 9 void _add(T data);
10 void _addError(Object error, StackTrace stackTrace); 10 void _addError(Object error, StackTrace stackTrace);
(...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after
66 * after a call to [resume] if there are pending events. 66 * after a call to [resume] if there are pending events.
67 */ 67 */
68 static const int _STATE_INPUT_PAUSED = 4; 68 static const int _STATE_INPUT_PAUSED = 4;
69 /** 69 /**
70 * Whether the subscription has been canceled. 70 * Whether the subscription has been canceled.
71 * 71 *
72 * Set by calling [cancel], or by handling a "done" event, or an "error" event 72 * Set by calling [cancel], or by handling a "done" event, or an "error" event
73 * when `cancelOnError` is true. 73 * when `cancelOnError` is true.
74 */ 74 */
75 static const int _STATE_CANCELED = 8; 75 static const int _STATE_CANCELED = 8;
76 static const int _STATE_IN_CALLBACK = 16; 76 /**
77 static const int _STATE_HAS_PENDING = 32; 77 * Set when either:
78 static const int _STATE_PAUSE_COUNT = 64; 78 * * An error is sent, and [cancelOnError] is true.
floitsch 2013/10/16 14:43:44 Nit (you also need the new line as otherwise dartd
Anders Johnsen 2013/10/21 08:01:46 Done.
79 static const int _STATE_PAUSE_COUNT_SHIFT = 6; 79 * * A done is sent.
80 *
81 * If the subscription is canceled while _STATE_WAIT_FOR_CANCEL is set, the
82 * state is unset, and no furher events should be delivered.
floitsch 2013/10/16 14:43:44 "should" is not helpful. I guess "is" is more accu
Anders Johnsen 2013/10/21 08:01:46 Using 'must', as the comment is dictating how the
83 */
84 static const int _STATE_WAIT_FOR_CANCEL = 16;
85 static const int _STATE_IN_CALLBACK = 32;
86 static const int _STATE_HAS_PENDING = 64;
87 static const int _STATE_PAUSE_COUNT = 128;
88 static const int _STATE_PAUSE_COUNT_SHIFT = 7;
80 89
81 /* Event handlers provided in constructor. */ 90 /* Event handlers provided in constructor. */
82 _DataHandler<T> _onData; 91 _DataHandler<T> _onData;
83 Function _onError; 92 Function _onError;
84 _DoneHandler _onDone; 93 _DoneHandler _onDone;
85 final Zone _zone = Zone.current; 94 final Zone _zone = Zone.current;
86 95
87 /** Bit vector based on state-constants above. */ 96 /** Bit vector based on state-constants above. */
88 int _state; 97 int _state;
89 98
99 // TODO(floitsch): reuse another field
100 /** The future [_onCancel] may return. */
101 Future _cancelFuture;
102
90 /** 103 /**
91 * Queue of pending events. 104 * Queue of pending events.
92 * 105 *
93 * Is created when necessary, or set in constructor for preconfigured events. 106 * Is created when necessary, or set in constructor for preconfigured events.
94 */ 107 */
95 _PendingEvents _pending; 108 _PendingEvents _pending;
96 109
97 _BufferingStreamSubscription(bool cancelOnError) 110 _BufferingStreamSubscription(bool cancelOnError)
98 : _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0); 111 : _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0);
99 112
(...skipping 64 matching lines...) Expand 10 before | Expand all | Expand 10 after
164 _pending.schedule(this); 177 _pending.schedule(this);
165 } else { 178 } else {
166 assert(_mayResumeInput); 179 assert(_mayResumeInput);
167 _state &= ~_STATE_INPUT_PAUSED; 180 _state &= ~_STATE_INPUT_PAUSED;
168 if (!_inCallback) _guardCallback(_onResume); 181 if (!_inCallback) _guardCallback(_onResume);
169 } 182 }
170 } 183 }
171 } 184 }
172 } 185 }
173 186
174 void cancel() { 187 Future cancel() {
175 if (_isCanceled) return; 188 // The user doesn't want to receive any further events. If there is an
189 // error or done event pending (waiting for the cancel to be done) discard
190 // that event.
191 _state &= ~_STATE_WAIT_FOR_CANCEL;
192 if (_isCanceled) return _cancelFuture;
176 _cancel(); 193 _cancel();
177 if (!_inCallback) { 194 return _cancelFuture;
178 // otherwise checkState will be called after firing or callback completes.
179 _state |= _STATE_IN_CALLBACK;
180 _onCancel();
181 _pending = null;
182 _state &= ~_STATE_IN_CALLBACK;
183 }
184 } 195 }
185 196
186 Future asFuture([var futureValue]) { 197 Future asFuture([var futureValue]) {
187 _Future<T> result = new _Future<T>(); 198 _Future<T> result = new _Future<T>();
188 199
189 // Overwrite the onDone and onError handlers. 200 // Overwrite the onDone and onError handlers.
190 _onDone = () { result._complete(futureValue); }; 201 _onDone = () { result._complete(futureValue); };
191 _onError = (error, stackTrace) { 202 _onError = (error, stackTrace) {
192 cancel(); 203 cancel();
193 result._completeError(error, stackTrace); 204 result._completeError(error, stackTrace);
194 }; 205 };
195 206
196 return result; 207 return result;
197 } 208 }
198 209
199 // State management. 210 // State management.
200 211
201 bool get _isInputPaused => (_state & _STATE_INPUT_PAUSED) != 0; 212 bool get _isInputPaused => (_state & _STATE_INPUT_PAUSED) != 0;
202 bool get _isClosed => (_state & _STATE_CLOSED) != 0; 213 bool get _isClosed => (_state & _STATE_CLOSED) != 0;
203 bool get _isCanceled => (_state & _STATE_CANCELED) != 0; 214 bool get _isCanceled => (_state & _STATE_CANCELED) != 0;
215 bool get _waitForCancel => (_state & _STATE_WAIT_FOR_CANCEL) != 0;
floitsch 2013/10/16 14:43:44 _waitsForCancel
Anders Johnsen 2013/10/21 08:01:46 Done.
204 bool get _inCallback => (_state & _STATE_IN_CALLBACK) != 0; 216 bool get _inCallback => (_state & _STATE_IN_CALLBACK) != 0;
205 bool get _hasPending => (_state & _STATE_HAS_PENDING) != 0; 217 bool get _hasPending => (_state & _STATE_HAS_PENDING) != 0;
206 bool get _isPaused => _state >= _STATE_PAUSE_COUNT; 218 bool get _isPaused => _state >= _STATE_PAUSE_COUNT;
207 bool get _canFire => _state < _STATE_IN_CALLBACK; 219 bool get _canFire => _state < _STATE_IN_CALLBACK;
208 bool get _mayResumeInput => 220 bool get _mayResumeInput =>
209 !_isPaused && (_pending == null || _pending.isEmpty); 221 !_isPaused && (_pending == null || _pending.isEmpty);
210 bool get _cancelOnError => (_state & _STATE_CANCEL_ON_ERROR) != 0; 222 bool get _cancelOnError => (_state & _STATE_CANCEL_ON_ERROR) != 0;
211 223
212 bool get isPaused => _isPaused; 224 bool get isPaused => _isPaused;
213 225
214 void _cancel() { 226 void _cancel() {
215 _state |= _STATE_CANCELED; 227 _state |= _STATE_CANCELED;
216 if (_hasPending) { 228 if (_hasPending) {
217 _pending.cancelSchedule(); 229 _pending.cancelSchedule();
218 } 230 }
231 if (!_inCallback) _pending = null;
232 _cancelFuture = _onCancel();
219 } 233 }
220 234
221 /** 235 /**
222 * Increment the pause count. 236 * Increment the pause count.
223 * 237 *
224 * Also marks input as paused. 238 * Also marks input as paused.
225 */ 239 */
226 void _incrementPauseCount() { 240 void _incrementPauseCount() {
227 _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED; 241 _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED;
228 } 242 }
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after
276 // try/catch wrapping and send any errors to 290 // try/catch wrapping and send any errors to
277 // [_Zone.current.handleUncaughtError]. 291 // [_Zone.current.handleUncaughtError].
278 void _onPause() { 292 void _onPause() {
279 assert(_isInputPaused); 293 assert(_isInputPaused);
280 } 294 }
281 295
282 void _onResume() { 296 void _onResume() {
283 assert(!_isInputPaused); 297 assert(!_isInputPaused);
284 } 298 }
285 299
286 void _onCancel() { 300 Future _onCancel() {
287 assert(_isCanceled); 301 assert(_isCanceled);
288 } 302 }
289 303
290 // Handle pending events. 304 // Handle pending events.
291 305
292 /** 306 /**
293 * Add a pending event. 307 * Add a pending event.
294 * 308 *
295 * If the subscription is not paused, this also schedules a firing 309 * If the subscription is not paused, this also schedules a firing
296 * of pending events later (if necessary). 310 * of pending events later (if necessary).
(...skipping 21 matching lines...) Expand all
318 _zone.runUnaryGuarded(_onData, data); 332 _zone.runUnaryGuarded(_onData, data);
319 _state &= ~_STATE_IN_CALLBACK; 333 _state &= ~_STATE_IN_CALLBACK;
320 _checkState(wasInputPaused); 334 _checkState(wasInputPaused);
321 } 335 }
322 336
323 void _sendError(var error, StackTrace stackTrace) { 337 void _sendError(var error, StackTrace stackTrace) {
324 assert(!_isCanceled); 338 assert(!_isCanceled);
325 assert(!_isPaused); 339 assert(!_isPaused);
326 assert(!_inCallback); 340 assert(!_inCallback);
327 bool wasInputPaused = _isInputPaused; 341 bool wasInputPaused = _isInputPaused;
328 _state |= _STATE_IN_CALLBACK; 342
329 if (!_zone.inSameErrorZone(Zone.current)) { 343 void sendError() {
330 // Errors are not allowed to traverse zone boundaries. 344 // If the subscription has been canceled while waiting for the cancel
331 Zone.current.handleUncaughtError(error, stackTrace); 345 // future to finish we must not report the error.
332 } else if (_onError is ZoneBinaryCallback) { 346 if (_isCanceled && !_waitForCancel) return;
333 _zone.runBinaryGuarded(_onError, error, stackTrace); 347 _state |= _STATE_IN_CALLBACK;
348 if (!_zone.inSameErrorZone(Zone.current)) {
349 // Errors are not allowed to traverse zone boundaries.
350 Zone.current.handleUncaughtError(error, stackTrace);
351 } else if (_onError is ZoneBinaryCallback) {
352 _zone.runBinaryGuarded(_onError, error, stackTrace);
353 } else {
354 _zone.runUnaryGuarded(_onError, error);
355 }
356 _state &= ~_STATE_IN_CALLBACK;
357 }
358
359 if (_cancelOnError) {
360 _state |= _STATE_WAIT_FOR_CANCEL;
361 _cancel();
362 if (_cancelFuture is Future) {
363 _cancelFuture.whenComplete(sendError);
364 } else {
365 sendError();
366 }
334 } else { 367 } else {
335 _zone.runUnaryGuarded(_onError, error); 368 sendError();
369 // Only check state if not cancelOnError.
370 _checkState(wasInputPaused);
336 } 371 }
337 _state &= ~_STATE_IN_CALLBACK;
338 if (_cancelOnError) {
339 _cancel();
340 }
341 _checkState(wasInputPaused);
342 } 372 }
343 373
344 void _sendDone() { 374 void _sendDone() {
345 assert(!_isCanceled); 375 assert(!_isCanceled);
346 assert(!_isPaused); 376 assert(!_isPaused);
347 assert(!_inCallback); 377 assert(!_inCallback);
348 _state |= (_STATE_CANCELED | _STATE_CLOSED | _STATE_IN_CALLBACK); 378
349 _zone.runGuarded(_onDone); 379 void sendDone() {
350 _onCancel(); // No checkState after cancel, it is always the last event. 380 // If the subscription has been canceled while waiting for the cancel
351 _state &= ~_STATE_IN_CALLBACK; 381 // future to finish we must not report the done event.
382 if (!_waitForCancel) return;
383 _state |= (_STATE_CANCELED | _STATE_CLOSED | _STATE_IN_CALLBACK);
384 _zone.runGuarded(_onDone);
385 _state &= ~_STATE_IN_CALLBACK;
386 }
387
388 _cancel();
389 _state |= _STATE_WAIT_FOR_CANCEL;
390 if (_cancelFuture is Future) {
391 _cancelFuture.whenComplete(sendDone);
392 } else {
393 sendDone();
394 }
352 } 395 }
353 396
354 /** 397 /**
355 * Call a hook function. 398 * Call a hook function.
356 * 399 *
357 * The call is properly wrapped in code to avoid other callbacks 400 * The call is properly wrapped in code to avoid other callbacks
358 * during the call, and it checks for state changes after the call 401 * during the call, and it checks for state changes after the call
359 * that should cause further callbacks. 402 * that should cause further callbacks.
360 */ 403 */
361 void _guardCallback(callback) { 404 void _guardCallback(callback) {
(...skipping 20 matching lines...) Expand all
382 if (_hasPending && _pending.isEmpty) { 425 if (_hasPending && _pending.isEmpty) {
383 _state &= ~_STATE_HAS_PENDING; 426 _state &= ~_STATE_HAS_PENDING;
384 if (_isInputPaused && _mayResumeInput) { 427 if (_isInputPaused && _mayResumeInput) {
385 _state &= ~_STATE_INPUT_PAUSED; 428 _state &= ~_STATE_INPUT_PAUSED;
386 } 429 }
387 } 430 }
388 // If the state changes during a callback, we immediately 431 // If the state changes during a callback, we immediately
389 // make a new state-change callback. Loop until the state didn't change. 432 // make a new state-change callback. Loop until the state didn't change.
390 while (true) { 433 while (true) {
391 if (_isCanceled) { 434 if (_isCanceled) {
392 _onCancel();
393 _pending = null; 435 _pending = null;
394 return; 436 return;
395 } 437 }
396 bool isInputPaused = _isInputPaused; 438 bool isInputPaused = _isInputPaused;
397 if (wasInputPaused == isInputPaused) break; 439 if (wasInputPaused == isInputPaused) break;
398 _state ^= _STATE_IN_CALLBACK; 440 _state ^= _STATE_IN_CALLBACK;
399 if (isInputPaused) { 441 if (isInputPaused) {
400 _onPause(); 442 _onPause();
401 } else { 443 } else {
402 _onResume(); 444 _onResume();
(...skipping 289 matching lines...) Expand 10 before | Expand all | Expand 10 after
692 void onError(Function handleError) {} 734 void onError(Function handleError) {}
693 void onDone(void handleDone()) {} 735 void onDone(void handleDone()) {}
694 736
695 void pause([Future resumeSignal]) { 737 void pause([Future resumeSignal]) {
696 _pauseCounter++; 738 _pauseCounter++;
697 if (resumeSignal != null) resumeSignal.then((_) { resume(); }); 739 if (resumeSignal != null) resumeSignal.then((_) { resume(); });
698 } 740 }
699 void resume() { 741 void resume() {
700 if (_pauseCounter > 0) _pauseCounter--; 742 if (_pauseCounter > 0) _pauseCounter--;
701 } 743 }
702 void cancel() {} 744 Future cancel() => null;
703 bool get isPaused => _pauseCounter > 0; 745 bool get isPaused => _pauseCounter > 0;
704 746
705 Future asFuture([futureValue]) => new _Future(); 747 Future asFuture([futureValue]) => new _Future();
706 } 748 }
707 749
708 class _AsBroadcastStream<T> extends Stream<T> { 750 class _AsBroadcastStream<T> extends Stream<T> {
709 final Stream<T> _source; 751 final Stream<T> _source;
710 final _broadcastCallback _onListenHandler; 752 final _broadcastCallback _onListenHandler;
711 final _broadcastCallback _onCancelHandler; 753 final _broadcastCallback _onCancelHandler;
712 final Zone _zone; 754 final Zone _zone;
(...skipping 103 matching lines...) Expand 10 before | Expand all | Expand 10 after
816 } 858 }
817 859
818 void pause([Future resumeSignal]) { 860 void pause([Future resumeSignal]) {
819 _stream._pauseSubscription(resumeSignal); 861 _stream._pauseSubscription(resumeSignal);
820 } 862 }
821 863
822 void resume() { 864 void resume() {
823 _stream._resumeSubscription(); 865 _stream._resumeSubscription();
824 } 866 }
825 867
826 void cancel() { 868 Future cancel() {
827 _stream._cancelSubscription(); 869 _stream._cancelSubscription();
870 return null;
828 } 871 }
829 872
830 bool get isPaused { 873 bool get isPaused {
831 return _stream._isSubscriptionPaused; 874 return _stream._isSubscriptionPaused;
832 } 875 }
833 876
834 Future asFuture([var futureValue]) { 877 Future asFuture([var futureValue]) {
835 throw new UnsupportedError( 878 throw new UnsupportedError(
836 "Cannot change handlers of asBroadcastStream source subscription."); 879 "Cannot change handlers of asBroadcastStream source subscription.");
837 } 880 }
(...skipping 90 matching lines...) Expand 10 before | Expand all | Expand 10 after
928 } 971 }
929 972
930 /** Clears up the internal state when the iterator ends. */ 973 /** Clears up the internal state when the iterator ends. */
931 void _clear() { 974 void _clear() {
932 _subscription = null; 975 _subscription = null;
933 _futureOrPrefetch = null; 976 _futureOrPrefetch = null;
934 _current = null; 977 _current = null;
935 _state = _STATE_DONE; 978 _state = _STATE_DONE;
936 } 979 }
937 980
938 void cancel() { 981 Future cancel() {
939 StreamSubscription subscription = _subscription; 982 StreamSubscription subscription = _subscription;
940 if (_state == _STATE_MOVING) { 983 if (_state == _STATE_MOVING) {
941 _Future<bool> hasNext = _futureOrPrefetch; 984 _Future<bool> hasNext = _futureOrPrefetch;
942 _clear(); 985 _clear();
943 hasNext._complete(false); 986 hasNext._complete(false);
944 } else { 987 } else {
945 _clear(); 988 _clear();
946 } 989 }
947 subscription.cancel(); 990 return subscription.cancel();
948 } 991 }
949 992
950 void _onData(T data) { 993 void _onData(T data) {
951 if (_state == _STATE_MOVING) { 994 if (_state == _STATE_MOVING) {
952 _current = data; 995 _current = data;
953 _Future<bool> hasNext = _futureOrPrefetch; 996 _Future<bool> hasNext = _futureOrPrefetch;
954 _futureOrPrefetch = null; 997 _futureOrPrefetch = null;
955 _state = _STATE_FOUND; 998 _state = _STATE_FOUND;
956 hasNext._complete(true); 999 hasNext._complete(true);
957 return; 1000 return;
(...skipping 23 matching lines...) Expand all
981 _Future<bool> hasNext = _futureOrPrefetch; 1024 _Future<bool> hasNext = _futureOrPrefetch;
982 _clear(); 1025 _clear();
983 hasNext._complete(false); 1026 hasNext._complete(false);
984 return; 1027 return;
985 } 1028 }
986 _subscription.pause(); 1029 _subscription.pause();
987 _futureOrPrefetch = null; 1030 _futureOrPrefetch = null;
988 _state = _STATE_EXTRA_DONE; 1031 _state = _STATE_EXTRA_DONE;
989 } 1032 }
990 } 1033 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698