Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(950)

Side by Side Diff: sdk/lib/async/stream_impl.dart

Issue 18915008: Let StreamSubscription.cancel return a Future. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Remove dir stuff. Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 /** Abstract and private interface for a place to put events. */ 7 /** Abstract and private interface for a place to put events. */
8 abstract class _EventSink<T> { 8 abstract class _EventSink<T> {
9 void _add(T data); 9 void _add(T data);
10 void _addError(Object error); 10 void _addError(Object error);
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after
64 * after a call to [resume] if there are pending events. 64 * after a call to [resume] if there are pending events.
65 */ 65 */
66 static const int _STATE_INPUT_PAUSED = 4; 66 static const int _STATE_INPUT_PAUSED = 4;
67 /** 67 /**
68 * Whether the subscription has been canceled. 68 * Whether the subscription has been canceled.
69 * 69 *
70 * Set by calling [cancel], or by handling a "done" event, or an "error" event 70 * Set by calling [cancel], or by handling a "done" event, or an "error" event
71 * when `cancelOnError` is true. 71 * when `cancelOnError` is true.
72 */ 72 */
73 static const int _STATE_CANCELED = 8; 73 static const int _STATE_CANCELED = 8;
74 static const int _STATE_IN_CALLBACK = 16; 74 static const int _STATE_IN_ERROR_CANCEL = 16;
floitsch 2013/10/12 18:53:57 Add comment that _STATE_IN_ERROR_CANCEL implies th
Lasse Reichstein Nielsen 2013/10/14 11:32:33 And document what it means. In painful detail, bec
Anders Johnsen 2013/10/16 11:52:21 Done.
75 static const int _STATE_HAS_PENDING = 32; 75 static const int _STATE_IN_CALLBACK = 32;
76 static const int _STATE_PAUSE_COUNT = 64; 76 static const int _STATE_HAS_PENDING = 64;
77 static const int _STATE_PAUSE_COUNT_SHIFT = 6; 77 static const int _STATE_PAUSE_COUNT = 128;
78 static const int _STATE_PAUSE_COUNT_SHIFT = 7;
78 79
79 /* Event handlers provided in constructor. */ 80 /* Event handlers provided in constructor. */
80 _DataHandler<T> _onData; 81 _DataHandler<T> _onData;
81 _ErrorHandler _onError; 82 _ErrorHandler _onError;
82 _DoneHandler _onDone; 83 _DoneHandler _onDone;
83 final Zone _zone = Zone.current; 84 final Zone _zone = Zone.current;
84 85
85 /** Bit vector based on state-constants above. */ 86 /** Bit vector based on state-constants above. */
86 int _state; 87 int _state;
87 88
89 _Future _cancelFuture;
floitsch 2013/10/12 18:53:57 Add "TODO(floitsch): reuse another field"
Anders Johnsen 2013/10/16 11:52:21 Done.
90
88 /** 91 /**
89 * Queue of pending events. 92 * Queue of pending events.
90 * 93 *
91 * Is created when necessary, or set in constructor for preconfigured events. 94 * Is created when necessary, or set in constructor for preconfigured events.
92 */ 95 */
93 _PendingEvents _pending; 96 _PendingEvents _pending;
94 97
95 _BufferingStreamSubscription(void onData(T data), 98 _BufferingStreamSubscription(void onData(T data),
96 void onError(error), 99 void onError(error),
97 void onDone(), 100 void onDone(),
(...skipping 74 matching lines...) Expand 10 before | Expand all | Expand 10 after
172 _pending.schedule(this); 175 _pending.schedule(this);
173 } else { 176 } else {
174 assert(_mayResumeInput); 177 assert(_mayResumeInput);
175 _state &= ~_STATE_INPUT_PAUSED; 178 _state &= ~_STATE_INPUT_PAUSED;
176 if (!_inCallback) _guardCallback(_onResume); 179 if (!_inCallback) _guardCallback(_onResume);
177 } 180 }
178 } 181 }
179 } 182 }
180 } 183 }
181 184
182 void cancel() { 185 Future cancel() {
183 if (_isCanceled) return; 186 _state &= ~_STATE_IN_ERROR_CANCEL;
floitsch 2013/10/12 18:53:57 Add comment. // The user doesn't want to receive a
Lasse Reichstein Nielsen 2013/10/14 11:32:33 any events anymore -> any further events. This co
Anders Johnsen 2013/10/16 11:52:21 Done.
Anders Johnsen 2013/10/16 11:52:21 Done.
187 if (_isCanceled) return _cancelFuture;
184 _cancel(); 188 _cancel();
185 if (!_inCallback) { 189 return _cancelFuture;
186 // otherwise checkState will be called after firing or callback completes.
187 _state |= _STATE_IN_CALLBACK;
188 _onCancel();
189 _pending = null;
190 _state &= ~_STATE_IN_CALLBACK;
191 }
192 } 190 }
193 191
194 Future asFuture([var futureValue]) { 192 Future asFuture([var futureValue]) {
195 _Future<T> result = new _Future<T>(); 193 _Future<T> result = new _Future<T>();
196 194
197 // Overwrite the onDone and onError handlers. 195 // Overwrite the onDone and onError handlers.
198 _onDone = () { result._complete(futureValue); }; 196 _onDone = () { result._complete(futureValue); };
199 _onError = (error) { 197 _onError = (error) {
200 cancel(); 198 cancel();
201 result._completeError(error); 199 result._completeError(error);
202 }; 200 };
203 201
204 return result; 202 return result;
205 } 203 }
206 204
207 // State management. 205 // State management.
208 206
209 bool get _isInputPaused => (_state & _STATE_INPUT_PAUSED) != 0; 207 bool get _isInputPaused => (_state & _STATE_INPUT_PAUSED) != 0;
210 bool get _isClosed => (_state & _STATE_CLOSED) != 0; 208 bool get _isClosed => (_state & _STATE_CLOSED) != 0;
211 bool get _isCanceled => (_state & _STATE_CANCELED) != 0; 209 bool get _isCanceled => (_state & _STATE_CANCELED) != 0;
210 bool get _inErrorCancel => (_state & _STATE_IN_ERROR_CANCEL) != 0;
212 bool get _inCallback => (_state & _STATE_IN_CALLBACK) != 0; 211 bool get _inCallback => (_state & _STATE_IN_CALLBACK) != 0;
213 bool get _hasPending => (_state & _STATE_HAS_PENDING) != 0; 212 bool get _hasPending => (_state & _STATE_HAS_PENDING) != 0;
214 bool get _isPaused => _state >= _STATE_PAUSE_COUNT; 213 bool get _isPaused => _state >= _STATE_PAUSE_COUNT;
215 bool get _canFire => _state < _STATE_IN_CALLBACK; 214 bool get _canFire => _state < _STATE_IN_CALLBACK;
216 bool get _mayResumeInput => 215 bool get _mayResumeInput =>
217 !_isPaused && (_pending == null || _pending.isEmpty); 216 !_isPaused && (_pending == null || _pending.isEmpty);
218 bool get _cancelOnError => (_state & _STATE_CANCEL_ON_ERROR) != 0; 217 bool get _cancelOnError => (_state & _STATE_CANCEL_ON_ERROR) != 0;
219 218
220 bool get isPaused => _isPaused; 219 bool get isPaused => _isPaused;
221 220
222 void _cancel() { 221 void _cancel() {
223 _state |= _STATE_CANCELED; 222 _state |= _STATE_CANCELED;
224 if (_hasPending) { 223 if (_hasPending) {
225 _pending.cancelSchedule(); 224 _pending.cancelSchedule();
226 } 225 }
226 if (!_inCallback) _pending = null;
227 _cancelFuture = _onCancel();
floitsch 2013/10/12 18:53:57 I'm not sure we are allowed to call "_onCancel" wh
Lasse Reichstein Nielsen 2013/10/14 11:32:33 I accepted that we had to call _onCancel during an
Anders Johnsen 2013/10/16 11:52:21 Done.
Anders Johnsen 2013/10/16 11:52:21 Done.
227 } 228 }
228 229
229 /** 230 /**
230 * Increment the pause count. 231 * Increment the pause count.
231 * 232 *
232 * Also marks input as paused. 233 * Also marks input as paused.
233 */ 234 */
234 void _incrementPauseCount() { 235 void _incrementPauseCount() {
235 _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED; 236 _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED;
236 } 237 }
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after
284 // try/catch wrapping and send any errors to 285 // try/catch wrapping and send any errors to
285 // [_Zone.current.handleUncaughtError]. 286 // [_Zone.current.handleUncaughtError].
286 void _onPause() { 287 void _onPause() {
287 assert(_isInputPaused); 288 assert(_isInputPaused);
288 } 289 }
289 290
290 void _onResume() { 291 void _onResume() {
291 assert(!_isInputPaused); 292 assert(!_isInputPaused);
292 } 293 }
293 294
294 void _onCancel() { 295 Future _onCancel() {
295 assert(_isCanceled); 296 assert(_isCanceled);
296 } 297 }
297 298
298 // Handle pending events. 299 // Handle pending events.
299 300
300 /** 301 /**
301 * Add a pending event. 302 * Add a pending event.
302 * 303 *
303 * If the subscription is not paused, this also schedules a firing 304 * If the subscription is not paused, this also schedules a firing
304 * of pending events later (if necessary). 305 * of pending events later (if necessary).
(...skipping 21 matching lines...) Expand all
326 _zone.runUnaryGuarded(_onData, data); 327 _zone.runUnaryGuarded(_onData, data);
327 _state &= ~_STATE_IN_CALLBACK; 328 _state &= ~_STATE_IN_CALLBACK;
328 _checkState(wasInputPaused); 329 _checkState(wasInputPaused);
329 } 330 }
330 331
331 void _sendError(var error) { 332 void _sendError(var error) {
332 assert(!_isCanceled); 333 assert(!_isCanceled);
333 assert(!_isPaused); 334 assert(!_isPaused);
334 assert(!_inCallback); 335 assert(!_inCallback);
335 bool wasInputPaused = _isInputPaused; 336 bool wasInputPaused = _isInputPaused;
336 _state |= _STATE_IN_CALLBACK; 337 void sendError() {
floitsch 2013/10/12 18:53:57 one line before and one line after the function.
Anders Johnsen 2013/10/16 11:52:21 Done.
337 if (!_zone.inSameErrorZone(Zone.current)) { 338 if (!_isCanceled || _inErrorCancel) {
floitsch 2013/10/12 18:53:57 Comment: If the subscription has been canceled whi
Anders Johnsen 2013/10/16 11:52:21 Done.
338 // Errors are not allowed to traverse zone boundaries. 339 _state |= _STATE_IN_CALLBACK;
339 Zone.current.handleUncaughtError(error); 340 if (!_zone.inSameErrorZone(Zone.current)) {
341 // Errors are not allowed to traverse zone boundaries.
342 Zone.current.handleUncaughtError(error);
343 } else {
344 _zone.runUnaryGuarded(_onError, error);
345 }
346 _state &= ~_STATE_IN_CALLBACK;
347 }
348 }
349 if (_cancelOnError) {
350 _state |= _STATE_IN_ERROR_CANCEL;
351 _cancel();
352 if (_cancelFuture != null) {
353 _cancelFuture.whenComplete(sendError);
354 } else {
355 sendError();
356 }
340 } else { 357 } else {
341 _zone.runUnaryGuarded(_onError, error); 358 sendError();
359 // Only check state if not cancelOnError.
360 _checkState(wasInputPaused);
342 } 361 }
343 _state &= ~_STATE_IN_CALLBACK;
344 if (_cancelOnError) {
345 _cancel();
346 }
347 _checkState(wasInputPaused);
348 } 362 }
349 363
350 void _sendDone() { 364 void _sendDone() {
351 assert(!_isCanceled); 365 assert(!_isCanceled);
352 assert(!_isPaused); 366 assert(!_isPaused);
353 assert(!_inCallback); 367 assert(!_inCallback);
354 _state |= (_STATE_CANCELED | _STATE_CLOSED | _STATE_IN_CALLBACK); 368 _state |= (_STATE_CANCELED | _STATE_CLOSED | _STATE_IN_CALLBACK);
355 _zone.runGuarded(_onDone); 369 _zone.runGuarded(_onDone);
356 _onCancel(); // No checkState after cancel, it is always the last event. 370 // TODO(ajohnsen): Run it before _onDone and wait for future?
Anders Johnsen 2013/10/11 12:32:40 Please comment on this one - I'm not sure what the
floitsch 2013/10/12 18:53:57 Neither. The done event should *not* call cancel.
Anders Johnsen 2013/10/16 11:52:21 Done.
Anders Johnsen 2013/10/16 11:52:21 Done.
371 _cancel(); // No checkState after cancel, it is always the last event.
357 _state &= ~_STATE_IN_CALLBACK; 372 _state &= ~_STATE_IN_CALLBACK;
358 } 373 }
359 374
360 /** 375 /**
361 * Call a hook function. 376 * Call a hook function.
362 * 377 *
363 * The call is properly wrapped in code to avoid other callbacks 378 * The call is properly wrapped in code to avoid other callbacks
364 * during the call, and it checks for state changes after the call 379 * during the call, and it checks for state changes after the call
365 * that should cause further callbacks. 380 * that should cause further callbacks.
366 */ 381 */
(...skipping 21 matching lines...) Expand all
388 if (_hasPending && _pending.isEmpty) { 403 if (_hasPending && _pending.isEmpty) {
389 _state &= ~_STATE_HAS_PENDING; 404 _state &= ~_STATE_HAS_PENDING;
390 if (_isInputPaused && _mayResumeInput) { 405 if (_isInputPaused && _mayResumeInput) {
391 _state &= ~_STATE_INPUT_PAUSED; 406 _state &= ~_STATE_INPUT_PAUSED;
392 } 407 }
393 } 408 }
394 // If the state changes during a callback, we immediately 409 // If the state changes during a callback, we immediately
395 // make a new state-change callback. Loop until the state didn't change. 410 // make a new state-change callback. Loop until the state didn't change.
396 while (true) { 411 while (true) {
397 if (_isCanceled) { 412 if (_isCanceled) {
398 _onCancel();
399 _pending = null; 413 _pending = null;
400 return; 414 return;
401 } 415 }
402 bool isInputPaused = _isInputPaused; 416 bool isInputPaused = _isInputPaused;
403 if (wasInputPaused == isInputPaused) break; 417 if (wasInputPaused == isInputPaused) break;
404 _state ^= _STATE_IN_CALLBACK; 418 _state ^= _STATE_IN_CALLBACK;
405 if (isInputPaused) { 419 if (isInputPaused) {
406 _onPause(); 420 _onPause();
407 } else { 421 } else {
408 _onResume(); 422 _onResume();
(...skipping 298 matching lines...) Expand 10 before | Expand all | Expand 10 after
707 void onError(void handleError(Object data)) {} 721 void onError(void handleError(Object data)) {}
708 void onDone(void handleDone()) {} 722 void onDone(void handleDone()) {}
709 723
710 void pause([Future resumeSignal]) { 724 void pause([Future resumeSignal]) {
711 _pauseCounter++; 725 _pauseCounter++;
712 if (resumeSignal != null) resumeSignal.then((_) { resume(); }); 726 if (resumeSignal != null) resumeSignal.then((_) { resume(); });
713 } 727 }
714 void resume() { 728 void resume() {
715 if (_pauseCounter > 0) _pauseCounter--; 729 if (_pauseCounter > 0) _pauseCounter--;
716 } 730 }
717 void cancel() {} 731 Future cancel() => null;
718 bool get isPaused => _pauseCounter > 0; 732 bool get isPaused => _pauseCounter > 0;
719 733
720 Future asFuture([futureValue]) => new _Future(); 734 Future asFuture([futureValue]) => new _Future();
721 } 735 }
722 736
723 class _AsBroadcastStream<T> extends Stream<T> { 737 class _AsBroadcastStream<T> extends Stream<T> {
724 final Stream<T> _source; 738 final Stream<T> _source;
725 final _broadcastCallback _onListenHandler; 739 final _broadcastCallback _onListenHandler;
726 final _broadcastCallback _onCancelHandler; 740 final _broadcastCallback _onCancelHandler;
727 final Zone _zone; 741 final Zone _zone;
(...skipping 102 matching lines...) Expand 10 before | Expand all | Expand 10 after
830 } 844 }
831 845
832 void pause([Future resumeSignal]) { 846 void pause([Future resumeSignal]) {
833 _stream._pauseSubscription(resumeSignal); 847 _stream._pauseSubscription(resumeSignal);
834 } 848 }
835 849
836 void resume() { 850 void resume() {
837 _stream._resumeSubscription(); 851 _stream._resumeSubscription();
838 } 852 }
839 853
840 void cancel() { 854 Future cancel() {
841 _stream._cancelSubscription(); 855 _stream._cancelSubscription();
856 return null;
842 } 857 }
843 858
844 bool get isPaused { 859 bool get isPaused {
845 return _stream._isSubscriptionPaused; 860 return _stream._isSubscriptionPaused;
846 } 861 }
847 862
848 Future asFuture([var futureValue]) { 863 Future asFuture([var futureValue]) {
849 throw new UnsupportedError( 864 throw new UnsupportedError(
850 "Cannot change handlers of asBroadcastStream source subscription."); 865 "Cannot change handlers of asBroadcastStream source subscription.");
851 } 866 }
(...skipping 90 matching lines...) Expand 10 before | Expand all | Expand 10 after
942 } 957 }
943 958
944 /** Clears up the internal state when the iterator ends. */ 959 /** Clears up the internal state when the iterator ends. */
945 void _clear() { 960 void _clear() {
946 _subscription = null; 961 _subscription = null;
947 _futureOrPrefetch = null; 962 _futureOrPrefetch = null;
948 _current = null; 963 _current = null;
949 _state = _STATE_DONE; 964 _state = _STATE_DONE;
950 } 965 }
951 966
952 void cancel() { 967 Future cancel() {
953 StreamSubscription subscription = _subscription; 968 StreamSubscription subscription = _subscription;
954 if (_state == _STATE_MOVING) { 969 if (_state == _STATE_MOVING) {
955 _Future<bool> hasNext = _futureOrPrefetch; 970 _Future<bool> hasNext = _futureOrPrefetch;
956 _clear(); 971 _clear();
957 hasNext._complete(false); 972 hasNext._complete(false);
958 } else { 973 } else {
959 _clear(); 974 _clear();
960 } 975 }
961 subscription.cancel(); 976 return subscription.cancel();
962 } 977 }
963 978
964 void _onData(T data) { 979 void _onData(T data) {
965 if (_state == _STATE_MOVING) { 980 if (_state == _STATE_MOVING) {
966 _current = data; 981 _current = data;
967 _Future<bool> hasNext = _futureOrPrefetch; 982 _Future<bool> hasNext = _futureOrPrefetch;
968 _futureOrPrefetch = null; 983 _futureOrPrefetch = null;
969 _state = _STATE_FOUND; 984 _state = _STATE_FOUND;
970 hasNext._complete(true); 985 hasNext._complete(true);
971 return; 986 return;
(...skipping 23 matching lines...) Expand all
995 _Future<bool> hasNext = _futureOrPrefetch; 1010 _Future<bool> hasNext = _futureOrPrefetch;
996 _clear(); 1011 _clear();
997 hasNext._complete(false); 1012 hasNext._complete(false);
998 return; 1013 return;
999 } 1014 }
1000 _subscription.pause(); 1015 _subscription.pause();
1001 _futureOrPrefetch = null; 1016 _futureOrPrefetch = null;
1002 _state = _STATE_EXTRA_DONE; 1017 _state = _STATE_EXTRA_DONE;
1003 } 1018 }
1004 } 1019 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698