Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 /** Abstract and private interface for a place to put events. */ | 7 /** Abstract and private interface for a place to put events. */ |
| 8 abstract class _EventSink<T> { | 8 abstract class _EventSink<T> { |
| 9 void _add(T data); | 9 void _add(T data); |
| 10 void _addError(Object error); | 10 void _addError(Object error); |
| (...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 64 * after a call to [resume] if there are pending events. | 64 * after a call to [resume] if there are pending events. |
| 65 */ | 65 */ |
| 66 static const int _STATE_INPUT_PAUSED = 4; | 66 static const int _STATE_INPUT_PAUSED = 4; |
| 67 /** | 67 /** |
| 68 * Whether the subscription has been canceled. | 68 * Whether the subscription has been canceled. |
| 69 * | 69 * |
| 70 * Set by calling [cancel], or by handling a "done" event, or an "error" event | 70 * Set by calling [cancel], or by handling a "done" event, or an "error" event |
| 71 * when `cancelOnError` is true. | 71 * when `cancelOnError` is true. |
| 72 */ | 72 */ |
| 73 static const int _STATE_CANCELED = 8; | 73 static const int _STATE_CANCELED = 8; |
| 74 static const int _STATE_IN_CALLBACK = 16; | 74 static const int _STATE_IN_ERROR_CANCEL = 16; |
|
floitsch
2013/10/12 18:53:57
Add comment that _STATE_IN_ERROR_CANCEL implies th
Lasse Reichstein Nielsen
2013/10/14 11:32:33
And document what it means. In painful detail, bec
Anders Johnsen
2013/10/16 11:52:21
Done.
| |
| 75 static const int _STATE_HAS_PENDING = 32; | 75 static const int _STATE_IN_CALLBACK = 32; |
| 76 static const int _STATE_PAUSE_COUNT = 64; | 76 static const int _STATE_HAS_PENDING = 64; |
| 77 static const int _STATE_PAUSE_COUNT_SHIFT = 6; | 77 static const int _STATE_PAUSE_COUNT = 128; |
| 78 static const int _STATE_PAUSE_COUNT_SHIFT = 7; | |
| 78 | 79 |
| 79 /* Event handlers provided in constructor. */ | 80 /* Event handlers provided in constructor. */ |
| 80 _DataHandler<T> _onData; | 81 _DataHandler<T> _onData; |
| 81 _ErrorHandler _onError; | 82 _ErrorHandler _onError; |
| 82 _DoneHandler _onDone; | 83 _DoneHandler _onDone; |
| 83 final Zone _zone = Zone.current; | 84 final Zone _zone = Zone.current; |
| 84 | 85 |
| 85 /** Bit vector based on state-constants above. */ | 86 /** Bit vector based on state-constants above. */ |
| 86 int _state; | 87 int _state; |
| 87 | 88 |
| 89 _Future _cancelFuture; | |
|
floitsch
2013/10/12 18:53:57
Add "TODO(floitsch): reuse another field"
Anders Johnsen
2013/10/16 11:52:21
Done.
| |
| 90 | |
| 88 /** | 91 /** |
| 89 * Queue of pending events. | 92 * Queue of pending events. |
| 90 * | 93 * |
| 91 * Is created when necessary, or set in constructor for preconfigured events. | 94 * Is created when necessary, or set in constructor for preconfigured events. |
| 92 */ | 95 */ |
| 93 _PendingEvents _pending; | 96 _PendingEvents _pending; |
| 94 | 97 |
| 95 _BufferingStreamSubscription(void onData(T data), | 98 _BufferingStreamSubscription(void onData(T data), |
| 96 void onError(error), | 99 void onError(error), |
| 97 void onDone(), | 100 void onDone(), |
| (...skipping 74 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 172 _pending.schedule(this); | 175 _pending.schedule(this); |
| 173 } else { | 176 } else { |
| 174 assert(_mayResumeInput); | 177 assert(_mayResumeInput); |
| 175 _state &= ~_STATE_INPUT_PAUSED; | 178 _state &= ~_STATE_INPUT_PAUSED; |
| 176 if (!_inCallback) _guardCallback(_onResume); | 179 if (!_inCallback) _guardCallback(_onResume); |
| 177 } | 180 } |
| 178 } | 181 } |
| 179 } | 182 } |
| 180 } | 183 } |
| 181 | 184 |
| 182 void cancel() { | 185 Future cancel() { |
| 183 if (_isCanceled) return; | 186 _state &= ~_STATE_IN_ERROR_CANCEL; |
|
floitsch
2013/10/12 18:53:57
Add comment.
// The user doesn't want to receive a
Lasse Reichstein Nielsen
2013/10/14 11:32:33
any events anymore -> any further events.
This co
Anders Johnsen
2013/10/16 11:52:21
Done.
Anders Johnsen
2013/10/16 11:52:21
Done.
| |
| 187 if (_isCanceled) return _cancelFuture; | |
| 184 _cancel(); | 188 _cancel(); |
| 185 if (!_inCallback) { | 189 return _cancelFuture; |
| 186 // otherwise checkState will be called after firing or callback completes. | |
| 187 _state |= _STATE_IN_CALLBACK; | |
| 188 _onCancel(); | |
| 189 _pending = null; | |
| 190 _state &= ~_STATE_IN_CALLBACK; | |
| 191 } | |
| 192 } | 190 } |
| 193 | 191 |
| 194 Future asFuture([var futureValue]) { | 192 Future asFuture([var futureValue]) { |
| 195 _Future<T> result = new _Future<T>(); | 193 _Future<T> result = new _Future<T>(); |
| 196 | 194 |
| 197 // Overwrite the onDone and onError handlers. | 195 // Overwrite the onDone and onError handlers. |
| 198 _onDone = () { result._complete(futureValue); }; | 196 _onDone = () { result._complete(futureValue); }; |
| 199 _onError = (error) { | 197 _onError = (error) { |
| 200 cancel(); | 198 cancel(); |
| 201 result._completeError(error); | 199 result._completeError(error); |
| 202 }; | 200 }; |
| 203 | 201 |
| 204 return result; | 202 return result; |
| 205 } | 203 } |
| 206 | 204 |
| 207 // State management. | 205 // State management. |
| 208 | 206 |
| 209 bool get _isInputPaused => (_state & _STATE_INPUT_PAUSED) != 0; | 207 bool get _isInputPaused => (_state & _STATE_INPUT_PAUSED) != 0; |
| 210 bool get _isClosed => (_state & _STATE_CLOSED) != 0; | 208 bool get _isClosed => (_state & _STATE_CLOSED) != 0; |
| 211 bool get _isCanceled => (_state & _STATE_CANCELED) != 0; | 209 bool get _isCanceled => (_state & _STATE_CANCELED) != 0; |
| 210 bool get _inErrorCancel => (_state & _STATE_IN_ERROR_CANCEL) != 0; | |
| 212 bool get _inCallback => (_state & _STATE_IN_CALLBACK) != 0; | 211 bool get _inCallback => (_state & _STATE_IN_CALLBACK) != 0; |
| 213 bool get _hasPending => (_state & _STATE_HAS_PENDING) != 0; | 212 bool get _hasPending => (_state & _STATE_HAS_PENDING) != 0; |
| 214 bool get _isPaused => _state >= _STATE_PAUSE_COUNT; | 213 bool get _isPaused => _state >= _STATE_PAUSE_COUNT; |
| 215 bool get _canFire => _state < _STATE_IN_CALLBACK; | 214 bool get _canFire => _state < _STATE_IN_CALLBACK; |
| 216 bool get _mayResumeInput => | 215 bool get _mayResumeInput => |
| 217 !_isPaused && (_pending == null || _pending.isEmpty); | 216 !_isPaused && (_pending == null || _pending.isEmpty); |
| 218 bool get _cancelOnError => (_state & _STATE_CANCEL_ON_ERROR) != 0; | 217 bool get _cancelOnError => (_state & _STATE_CANCEL_ON_ERROR) != 0; |
| 219 | 218 |
| 220 bool get isPaused => _isPaused; | 219 bool get isPaused => _isPaused; |
| 221 | 220 |
| 222 void _cancel() { | 221 void _cancel() { |
| 223 _state |= _STATE_CANCELED; | 222 _state |= _STATE_CANCELED; |
| 224 if (_hasPending) { | 223 if (_hasPending) { |
| 225 _pending.cancelSchedule(); | 224 _pending.cancelSchedule(); |
| 226 } | 225 } |
| 226 if (!_inCallback) _pending = null; | |
| 227 _cancelFuture = _onCancel(); | |
|
floitsch
2013/10/12 18:53:57
I'm not sure we are allowed to call "_onCancel" wh
Lasse Reichstein Nielsen
2013/10/14 11:32:33
I accepted that we had to call _onCancel during an
Anders Johnsen
2013/10/16 11:52:21
Done.
Anders Johnsen
2013/10/16 11:52:21
Done.
| |
| 227 } | 228 } |
| 228 | 229 |
| 229 /** | 230 /** |
| 230 * Increment the pause count. | 231 * Increment the pause count. |
| 231 * | 232 * |
| 232 * Also marks input as paused. | 233 * Also marks input as paused. |
| 233 */ | 234 */ |
| 234 void _incrementPauseCount() { | 235 void _incrementPauseCount() { |
| 235 _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED; | 236 _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED; |
| 236 } | 237 } |
| (...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 284 // try/catch wrapping and send any errors to | 285 // try/catch wrapping and send any errors to |
| 285 // [_Zone.current.handleUncaughtError]. | 286 // [_Zone.current.handleUncaughtError]. |
| 286 void _onPause() { | 287 void _onPause() { |
| 287 assert(_isInputPaused); | 288 assert(_isInputPaused); |
| 288 } | 289 } |
| 289 | 290 |
| 290 void _onResume() { | 291 void _onResume() { |
| 291 assert(!_isInputPaused); | 292 assert(!_isInputPaused); |
| 292 } | 293 } |
| 293 | 294 |
| 294 void _onCancel() { | 295 Future _onCancel() { |
| 295 assert(_isCanceled); | 296 assert(_isCanceled); |
| 296 } | 297 } |
| 297 | 298 |
| 298 // Handle pending events. | 299 // Handle pending events. |
| 299 | 300 |
| 300 /** | 301 /** |
| 301 * Add a pending event. | 302 * Add a pending event. |
| 302 * | 303 * |
| 303 * If the subscription is not paused, this also schedules a firing | 304 * If the subscription is not paused, this also schedules a firing |
| 304 * of pending events later (if necessary). | 305 * of pending events later (if necessary). |
| (...skipping 21 matching lines...) Expand all Loading... | |
| 326 _zone.runUnaryGuarded(_onData, data); | 327 _zone.runUnaryGuarded(_onData, data); |
| 327 _state &= ~_STATE_IN_CALLBACK; | 328 _state &= ~_STATE_IN_CALLBACK; |
| 328 _checkState(wasInputPaused); | 329 _checkState(wasInputPaused); |
| 329 } | 330 } |
| 330 | 331 |
| 331 void _sendError(var error) { | 332 void _sendError(var error) { |
| 332 assert(!_isCanceled); | 333 assert(!_isCanceled); |
| 333 assert(!_isPaused); | 334 assert(!_isPaused); |
| 334 assert(!_inCallback); | 335 assert(!_inCallback); |
| 335 bool wasInputPaused = _isInputPaused; | 336 bool wasInputPaused = _isInputPaused; |
| 336 _state |= _STATE_IN_CALLBACK; | 337 void sendError() { |
|
floitsch
2013/10/12 18:53:57
one line before and one line after the function.
Anders Johnsen
2013/10/16 11:52:21
Done.
| |
| 337 if (!_zone.inSameErrorZone(Zone.current)) { | 338 if (!_isCanceled || _inErrorCancel) { |
|
floitsch
2013/10/12 18:53:57
Comment: If the subscription has been canceled whi
Anders Johnsen
2013/10/16 11:52:21
Done.
| |
| 338 // Errors are not allowed to traverse zone boundaries. | 339 _state |= _STATE_IN_CALLBACK; |
| 339 Zone.current.handleUncaughtError(error); | 340 if (!_zone.inSameErrorZone(Zone.current)) { |
| 341 // Errors are not allowed to traverse zone boundaries. | |
| 342 Zone.current.handleUncaughtError(error); | |
| 343 } else { | |
| 344 _zone.runUnaryGuarded(_onError, error); | |
| 345 } | |
| 346 _state &= ~_STATE_IN_CALLBACK; | |
| 347 } | |
| 348 } | |
| 349 if (_cancelOnError) { | |
| 350 _state |= _STATE_IN_ERROR_CANCEL; | |
| 351 _cancel(); | |
| 352 if (_cancelFuture != null) { | |
| 353 _cancelFuture.whenComplete(sendError); | |
| 354 } else { | |
| 355 sendError(); | |
| 356 } | |
| 340 } else { | 357 } else { |
| 341 _zone.runUnaryGuarded(_onError, error); | 358 sendError(); |
| 359 // Only check state if not cancelOnError. | |
| 360 _checkState(wasInputPaused); | |
| 342 } | 361 } |
| 343 _state &= ~_STATE_IN_CALLBACK; | |
| 344 if (_cancelOnError) { | |
| 345 _cancel(); | |
| 346 } | |
| 347 _checkState(wasInputPaused); | |
| 348 } | 362 } |
| 349 | 363 |
| 350 void _sendDone() { | 364 void _sendDone() { |
| 351 assert(!_isCanceled); | 365 assert(!_isCanceled); |
| 352 assert(!_isPaused); | 366 assert(!_isPaused); |
| 353 assert(!_inCallback); | 367 assert(!_inCallback); |
| 354 _state |= (_STATE_CANCELED | _STATE_CLOSED | _STATE_IN_CALLBACK); | 368 _state |= (_STATE_CANCELED | _STATE_CLOSED | _STATE_IN_CALLBACK); |
| 355 _zone.runGuarded(_onDone); | 369 _zone.runGuarded(_onDone); |
| 356 _onCancel(); // No checkState after cancel, it is always the last event. | 370 // TODO(ajohnsen): Run it before _onDone and wait for future? |
|
Anders Johnsen
2013/10/11 12:32:40
Please comment on this one - I'm not sure what the
floitsch
2013/10/12 18:53:57
Neither.
The done event should *not* call cancel.
Anders Johnsen
2013/10/16 11:52:21
Done.
Anders Johnsen
2013/10/16 11:52:21
Done.
| |
| 371 _cancel(); // No checkState after cancel, it is always the last event. | |
| 357 _state &= ~_STATE_IN_CALLBACK; | 372 _state &= ~_STATE_IN_CALLBACK; |
| 358 } | 373 } |
| 359 | 374 |
| 360 /** | 375 /** |
| 361 * Call a hook function. | 376 * Call a hook function. |
| 362 * | 377 * |
| 363 * The call is properly wrapped in code to avoid other callbacks | 378 * The call is properly wrapped in code to avoid other callbacks |
| 364 * during the call, and it checks for state changes after the call | 379 * during the call, and it checks for state changes after the call |
| 365 * that should cause further callbacks. | 380 * that should cause further callbacks. |
| 366 */ | 381 */ |
| (...skipping 21 matching lines...) Expand all Loading... | |
| 388 if (_hasPending && _pending.isEmpty) { | 403 if (_hasPending && _pending.isEmpty) { |
| 389 _state &= ~_STATE_HAS_PENDING; | 404 _state &= ~_STATE_HAS_PENDING; |
| 390 if (_isInputPaused && _mayResumeInput) { | 405 if (_isInputPaused && _mayResumeInput) { |
| 391 _state &= ~_STATE_INPUT_PAUSED; | 406 _state &= ~_STATE_INPUT_PAUSED; |
| 392 } | 407 } |
| 393 } | 408 } |
| 394 // If the state changes during a callback, we immediately | 409 // If the state changes during a callback, we immediately |
| 395 // make a new state-change callback. Loop until the state didn't change. | 410 // make a new state-change callback. Loop until the state didn't change. |
| 396 while (true) { | 411 while (true) { |
| 397 if (_isCanceled) { | 412 if (_isCanceled) { |
| 398 _onCancel(); | |
| 399 _pending = null; | 413 _pending = null; |
| 400 return; | 414 return; |
| 401 } | 415 } |
| 402 bool isInputPaused = _isInputPaused; | 416 bool isInputPaused = _isInputPaused; |
| 403 if (wasInputPaused == isInputPaused) break; | 417 if (wasInputPaused == isInputPaused) break; |
| 404 _state ^= _STATE_IN_CALLBACK; | 418 _state ^= _STATE_IN_CALLBACK; |
| 405 if (isInputPaused) { | 419 if (isInputPaused) { |
| 406 _onPause(); | 420 _onPause(); |
| 407 } else { | 421 } else { |
| 408 _onResume(); | 422 _onResume(); |
| (...skipping 298 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 707 void onError(void handleError(Object data)) {} | 721 void onError(void handleError(Object data)) {} |
| 708 void onDone(void handleDone()) {} | 722 void onDone(void handleDone()) {} |
| 709 | 723 |
| 710 void pause([Future resumeSignal]) { | 724 void pause([Future resumeSignal]) { |
| 711 _pauseCounter++; | 725 _pauseCounter++; |
| 712 if (resumeSignal != null) resumeSignal.then((_) { resume(); }); | 726 if (resumeSignal != null) resumeSignal.then((_) { resume(); }); |
| 713 } | 727 } |
| 714 void resume() { | 728 void resume() { |
| 715 if (_pauseCounter > 0) _pauseCounter--; | 729 if (_pauseCounter > 0) _pauseCounter--; |
| 716 } | 730 } |
| 717 void cancel() {} | 731 Future cancel() => null; |
| 718 bool get isPaused => _pauseCounter > 0; | 732 bool get isPaused => _pauseCounter > 0; |
| 719 | 733 |
| 720 Future asFuture([futureValue]) => new _Future(); | 734 Future asFuture([futureValue]) => new _Future(); |
| 721 } | 735 } |
| 722 | 736 |
| 723 class _AsBroadcastStream<T> extends Stream<T> { | 737 class _AsBroadcastStream<T> extends Stream<T> { |
| 724 final Stream<T> _source; | 738 final Stream<T> _source; |
| 725 final _broadcastCallback _onListenHandler; | 739 final _broadcastCallback _onListenHandler; |
| 726 final _broadcastCallback _onCancelHandler; | 740 final _broadcastCallback _onCancelHandler; |
| 727 final Zone _zone; | 741 final Zone _zone; |
| (...skipping 102 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 830 } | 844 } |
| 831 | 845 |
| 832 void pause([Future resumeSignal]) { | 846 void pause([Future resumeSignal]) { |
| 833 _stream._pauseSubscription(resumeSignal); | 847 _stream._pauseSubscription(resumeSignal); |
| 834 } | 848 } |
| 835 | 849 |
| 836 void resume() { | 850 void resume() { |
| 837 _stream._resumeSubscription(); | 851 _stream._resumeSubscription(); |
| 838 } | 852 } |
| 839 | 853 |
| 840 void cancel() { | 854 Future cancel() { |
| 841 _stream._cancelSubscription(); | 855 _stream._cancelSubscription(); |
| 856 return null; | |
| 842 } | 857 } |
| 843 | 858 |
| 844 bool get isPaused { | 859 bool get isPaused { |
| 845 return _stream._isSubscriptionPaused; | 860 return _stream._isSubscriptionPaused; |
| 846 } | 861 } |
| 847 | 862 |
| 848 Future asFuture([var futureValue]) { | 863 Future asFuture([var futureValue]) { |
| 849 throw new UnsupportedError( | 864 throw new UnsupportedError( |
| 850 "Cannot change handlers of asBroadcastStream source subscription."); | 865 "Cannot change handlers of asBroadcastStream source subscription."); |
| 851 } | 866 } |
| (...skipping 90 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 942 } | 957 } |
| 943 | 958 |
| 944 /** Clears up the internal state when the iterator ends. */ | 959 /** Clears up the internal state when the iterator ends. */ |
| 945 void _clear() { | 960 void _clear() { |
| 946 _subscription = null; | 961 _subscription = null; |
| 947 _futureOrPrefetch = null; | 962 _futureOrPrefetch = null; |
| 948 _current = null; | 963 _current = null; |
| 949 _state = _STATE_DONE; | 964 _state = _STATE_DONE; |
| 950 } | 965 } |
| 951 | 966 |
| 952 void cancel() { | 967 Future cancel() { |
| 953 StreamSubscription subscription = _subscription; | 968 StreamSubscription subscription = _subscription; |
| 954 if (_state == _STATE_MOVING) { | 969 if (_state == _STATE_MOVING) { |
| 955 _Future<bool> hasNext = _futureOrPrefetch; | 970 _Future<bool> hasNext = _futureOrPrefetch; |
| 956 _clear(); | 971 _clear(); |
| 957 hasNext._complete(false); | 972 hasNext._complete(false); |
| 958 } else { | 973 } else { |
| 959 _clear(); | 974 _clear(); |
| 960 } | 975 } |
| 961 subscription.cancel(); | 976 return subscription.cancel(); |
| 962 } | 977 } |
| 963 | 978 |
| 964 void _onData(T data) { | 979 void _onData(T data) { |
| 965 if (_state == _STATE_MOVING) { | 980 if (_state == _STATE_MOVING) { |
| 966 _current = data; | 981 _current = data; |
| 967 _Future<bool> hasNext = _futureOrPrefetch; | 982 _Future<bool> hasNext = _futureOrPrefetch; |
| 968 _futureOrPrefetch = null; | 983 _futureOrPrefetch = null; |
| 969 _state = _STATE_FOUND; | 984 _state = _STATE_FOUND; |
| 970 hasNext._complete(true); | 985 hasNext._complete(true); |
| 971 return; | 986 return; |
| (...skipping 23 matching lines...) Expand all Loading... | |
| 995 _Future<bool> hasNext = _futureOrPrefetch; | 1010 _Future<bool> hasNext = _futureOrPrefetch; |
| 996 _clear(); | 1011 _clear(); |
| 997 hasNext._complete(false); | 1012 hasNext._complete(false); |
| 998 return; | 1013 return; |
| 999 } | 1014 } |
| 1000 _subscription.pause(); | 1015 _subscription.pause(); |
| 1001 _futureOrPrefetch = null; | 1016 _futureOrPrefetch = null; |
| 1002 _state = _STATE_EXTRA_DONE; | 1017 _state = _STATE_EXTRA_DONE; |
| 1003 } | 1018 } |
| 1004 } | 1019 } |
| OLD | NEW |