| OLD | NEW |
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 /** Abstract and private interface for a place to put events. */ | 7 /** Abstract and private interface for a place to put events. */ |
| 8 abstract class _EventSink<T> { | 8 abstract class _EventSink<T> { |
| 9 void _add(T data); | 9 void _add(T data); |
| 10 void _addError(Object error, StackTrace stackTrace); | 10 void _addError(Object error, StackTrace stackTrace); |
| (...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 66 * after a call to [resume] if there are pending events. | 66 * after a call to [resume] if there are pending events. |
| 67 */ | 67 */ |
| 68 static const int _STATE_INPUT_PAUSED = 4; | 68 static const int _STATE_INPUT_PAUSED = 4; |
| 69 /** | 69 /** |
| 70 * Whether the subscription has been canceled. | 70 * Whether the subscription has been canceled. |
| 71 * | 71 * |
| 72 * Set by calling [cancel], or by handling a "done" event, or an "error" event | 72 * Set by calling [cancel], or by handling a "done" event, or an "error" event |
| 73 * when `cancelOnError` is true. | 73 * when `cancelOnError` is true. |
| 74 */ | 74 */ |
| 75 static const int _STATE_CANCELED = 8; | 75 static const int _STATE_CANCELED = 8; |
| 76 static const int _STATE_IN_CALLBACK = 16; | 76 /** |
| 77 static const int _STATE_HAS_PENDING = 32; | 77 * Set when either: |
| 78 static const int _STATE_PAUSE_COUNT = 64; | 78 * |
| 79 static const int _STATE_PAUSE_COUNT_SHIFT = 6; | 79 * * an error is sent, and [cancelOnError] is true, or |
| 80 * * a done event is sent. |
| 81 * |
| 82 * If the subscription is canceled while _STATE_WAIT_FOR_CANCEL is set, the |
| 83 * state is unset, and no furher events must be delivered. |
| 84 */ |
| 85 static const int _STATE_WAIT_FOR_CANCEL = 16; |
| 86 static const int _STATE_IN_CALLBACK = 32; |
| 87 static const int _STATE_HAS_PENDING = 64; |
| 88 static const int _STATE_PAUSE_COUNT = 128; |
| 89 static const int _STATE_PAUSE_COUNT_SHIFT = 7; |
| 80 | 90 |
| 81 /* Event handlers provided in constructor. */ | 91 /* Event handlers provided in constructor. */ |
| 82 _DataHandler<T> _onData; | 92 _DataHandler<T> _onData; |
| 83 Function _onError; | 93 Function _onError; |
| 84 _DoneHandler _onDone; | 94 _DoneHandler _onDone; |
| 85 final Zone _zone = Zone.current; | 95 final Zone _zone = Zone.current; |
| 86 | 96 |
| 87 /** Bit vector based on state-constants above. */ | 97 /** Bit vector based on state-constants above. */ |
| 88 int _state; | 98 int _state; |
| 89 | 99 |
| 100 // TODO(floitsch): reuse another field |
| 101 /** The future [_onCancel] may return. */ |
| 102 Future _cancelFuture; |
| 103 |
| 90 /** | 104 /** |
| 91 * Queue of pending events. | 105 * Queue of pending events. |
| 92 * | 106 * |
| 93 * Is created when necessary, or set in constructor for preconfigured events. | 107 * Is created when necessary, or set in constructor for preconfigured events. |
| 94 */ | 108 */ |
| 95 _PendingEvents _pending; | 109 _PendingEvents _pending; |
| 96 | 110 |
| 97 _BufferingStreamSubscription(bool cancelOnError) | 111 _BufferingStreamSubscription(bool cancelOnError) |
| 98 : _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0); | 112 : _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0); |
| 99 | 113 |
| (...skipping 64 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 164 _pending.schedule(this); | 178 _pending.schedule(this); |
| 165 } else { | 179 } else { |
| 166 assert(_mayResumeInput); | 180 assert(_mayResumeInput); |
| 167 _state &= ~_STATE_INPUT_PAUSED; | 181 _state &= ~_STATE_INPUT_PAUSED; |
| 168 if (!_inCallback) _guardCallback(_onResume); | 182 if (!_inCallback) _guardCallback(_onResume); |
| 169 } | 183 } |
| 170 } | 184 } |
| 171 } | 185 } |
| 172 } | 186 } |
| 173 | 187 |
| 174 void cancel() { | 188 Future cancel() { |
| 175 if (_isCanceled) return; | 189 // The user doesn't want to receive any further events. If there is an |
| 190 // error or done event pending (waiting for the cancel to be done) discard |
| 191 // that event. |
| 192 _state &= ~_STATE_WAIT_FOR_CANCEL; |
| 193 if (_isCanceled) return _cancelFuture; |
| 176 _cancel(); | 194 _cancel(); |
| 177 if (!_inCallback) { | 195 return _cancelFuture; |
| 178 // otherwise checkState will be called after firing or callback completes. | |
| 179 _state |= _STATE_IN_CALLBACK; | |
| 180 _onCancel(); | |
| 181 _pending = null; | |
| 182 _state &= ~_STATE_IN_CALLBACK; | |
| 183 } | |
| 184 } | 196 } |
| 185 | 197 |
| 186 Future asFuture([var futureValue]) { | 198 Future asFuture([var futureValue]) { |
| 187 _Future<T> result = new _Future<T>(); | 199 _Future<T> result = new _Future<T>(); |
| 188 | 200 |
| 189 // Overwrite the onDone and onError handlers. | 201 // Overwrite the onDone and onError handlers. |
| 190 _onDone = () { result._complete(futureValue); }; | 202 _onDone = () { result._complete(futureValue); }; |
| 191 _onError = (error, stackTrace) { | 203 _onError = (error, stackTrace) { |
| 192 cancel(); | 204 cancel(); |
| 193 result._completeError(error, stackTrace); | 205 result._completeError(error, stackTrace); |
| 194 }; | 206 }; |
| 195 | 207 |
| 196 return result; | 208 return result; |
| 197 } | 209 } |
| 198 | 210 |
| 199 // State management. | 211 // State management. |
| 200 | 212 |
| 201 bool get _isInputPaused => (_state & _STATE_INPUT_PAUSED) != 0; | 213 bool get _isInputPaused => (_state & _STATE_INPUT_PAUSED) != 0; |
| 202 bool get _isClosed => (_state & _STATE_CLOSED) != 0; | 214 bool get _isClosed => (_state & _STATE_CLOSED) != 0; |
| 203 bool get _isCanceled => (_state & _STATE_CANCELED) != 0; | 215 bool get _isCanceled => (_state & _STATE_CANCELED) != 0; |
| 216 bool get _waitsForCancel => (_state & _STATE_WAIT_FOR_CANCEL) != 0; |
| 204 bool get _inCallback => (_state & _STATE_IN_CALLBACK) != 0; | 217 bool get _inCallback => (_state & _STATE_IN_CALLBACK) != 0; |
| 205 bool get _hasPending => (_state & _STATE_HAS_PENDING) != 0; | 218 bool get _hasPending => (_state & _STATE_HAS_PENDING) != 0; |
| 206 bool get _isPaused => _state >= _STATE_PAUSE_COUNT; | 219 bool get _isPaused => _state >= _STATE_PAUSE_COUNT; |
| 207 bool get _canFire => _state < _STATE_IN_CALLBACK; | 220 bool get _canFire => _state < _STATE_IN_CALLBACK; |
| 208 bool get _mayResumeInput => | 221 bool get _mayResumeInput => |
| 209 !_isPaused && (_pending == null || _pending.isEmpty); | 222 !_isPaused && (_pending == null || _pending.isEmpty); |
| 210 bool get _cancelOnError => (_state & _STATE_CANCEL_ON_ERROR) != 0; | 223 bool get _cancelOnError => (_state & _STATE_CANCEL_ON_ERROR) != 0; |
| 211 | 224 |
| 212 bool get isPaused => _isPaused; | 225 bool get isPaused => _isPaused; |
| 213 | 226 |
| 214 void _cancel() { | 227 void _cancel() { |
| 215 _state |= _STATE_CANCELED; | 228 _state |= _STATE_CANCELED; |
| 216 if (_hasPending) { | 229 if (_hasPending) { |
| 217 _pending.cancelSchedule(); | 230 _pending.cancelSchedule(); |
| 218 } | 231 } |
| 232 if (!_inCallback) _pending = null; |
| 233 _cancelFuture = _onCancel(); |
| 219 } | 234 } |
| 220 | 235 |
| 221 /** | 236 /** |
| 222 * Increment the pause count. | 237 * Increment the pause count. |
| 223 * | 238 * |
| 224 * Also marks input as paused. | 239 * Also marks input as paused. |
| 225 */ | 240 */ |
| 226 void _incrementPauseCount() { | 241 void _incrementPauseCount() { |
| 227 _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED; | 242 _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED; |
| 228 } | 243 } |
| (...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 276 // try/catch wrapping and send any errors to | 291 // try/catch wrapping and send any errors to |
| 277 // [_Zone.current.handleUncaughtError]. | 292 // [_Zone.current.handleUncaughtError]. |
| 278 void _onPause() { | 293 void _onPause() { |
| 279 assert(_isInputPaused); | 294 assert(_isInputPaused); |
| 280 } | 295 } |
| 281 | 296 |
| 282 void _onResume() { | 297 void _onResume() { |
| 283 assert(!_isInputPaused); | 298 assert(!_isInputPaused); |
| 284 } | 299 } |
| 285 | 300 |
| 286 void _onCancel() { | 301 Future _onCancel() { |
| 287 assert(_isCanceled); | 302 assert(_isCanceled); |
| 288 } | 303 } |
| 289 | 304 |
| 290 // Handle pending events. | 305 // Handle pending events. |
| 291 | 306 |
| 292 /** | 307 /** |
| 293 * Add a pending event. | 308 * Add a pending event. |
| 294 * | 309 * |
| 295 * If the subscription is not paused, this also schedules a firing | 310 * If the subscription is not paused, this also schedules a firing |
| 296 * of pending events later (if necessary). | 311 * of pending events later (if necessary). |
| (...skipping 21 matching lines...) Expand all Loading... |
| 318 _zone.runUnaryGuarded(_onData, data); | 333 _zone.runUnaryGuarded(_onData, data); |
| 319 _state &= ~_STATE_IN_CALLBACK; | 334 _state &= ~_STATE_IN_CALLBACK; |
| 320 _checkState(wasInputPaused); | 335 _checkState(wasInputPaused); |
| 321 } | 336 } |
| 322 | 337 |
| 323 void _sendError(var error, StackTrace stackTrace) { | 338 void _sendError(var error, StackTrace stackTrace) { |
| 324 assert(!_isCanceled); | 339 assert(!_isCanceled); |
| 325 assert(!_isPaused); | 340 assert(!_isPaused); |
| 326 assert(!_inCallback); | 341 assert(!_inCallback); |
| 327 bool wasInputPaused = _isInputPaused; | 342 bool wasInputPaused = _isInputPaused; |
| 328 _state |= _STATE_IN_CALLBACK; | 343 |
| 329 if (!_zone.inSameErrorZone(Zone.current)) { | 344 void sendError() { |
| 330 // Errors are not allowed to traverse zone boundaries. | 345 // If the subscription has been canceled while waiting for the cancel |
| 331 Zone.current.handleUncaughtError(error, stackTrace); | 346 // future to finish we must not report the error. |
| 332 } else if (_onError is ZoneBinaryCallback) { | 347 if (_isCanceled && !_waitsForCancel) return; |
| 333 _zone.runBinaryGuarded(_onError, error, stackTrace); | 348 _state |= _STATE_IN_CALLBACK; |
| 349 if (!_zone.inSameErrorZone(Zone.current)) { |
| 350 // Errors are not allowed to traverse zone boundaries. |
| 351 Zone.current.handleUncaughtError(error, stackTrace); |
| 352 } else if (_onError is ZoneBinaryCallback) { |
| 353 _zone.runBinaryGuarded(_onError, error, stackTrace); |
| 354 } else { |
| 355 _zone.runUnaryGuarded(_onError, error); |
| 356 } |
| 357 _state &= ~_STATE_IN_CALLBACK; |
| 358 } |
| 359 |
| 360 if (_cancelOnError) { |
| 361 _state |= _STATE_WAIT_FOR_CANCEL; |
| 362 _cancel(); |
| 363 if (_cancelFuture is Future) { |
| 364 _cancelFuture.whenComplete(sendError); |
| 365 } else { |
| 366 sendError(); |
| 367 } |
| 334 } else { | 368 } else { |
| 335 _zone.runUnaryGuarded(_onError, error); | 369 sendError(); |
| 370 // Only check state if not cancelOnError. |
| 371 _checkState(wasInputPaused); |
| 336 } | 372 } |
| 337 _state &= ~_STATE_IN_CALLBACK; | |
| 338 if (_cancelOnError) { | |
| 339 _cancel(); | |
| 340 } | |
| 341 _checkState(wasInputPaused); | |
| 342 } | 373 } |
| 343 | 374 |
| 344 void _sendDone() { | 375 void _sendDone() { |
| 345 assert(!_isCanceled); | 376 assert(!_isCanceled); |
| 346 assert(!_isPaused); | 377 assert(!_isPaused); |
| 347 assert(!_inCallback); | 378 assert(!_inCallback); |
| 348 _state |= (_STATE_CANCELED | _STATE_CLOSED | _STATE_IN_CALLBACK); | 379 |
| 349 _zone.runGuarded(_onDone); | 380 void sendDone() { |
| 350 _onCancel(); // No checkState after cancel, it is always the last event. | 381 // If the subscription has been canceled while waiting for the cancel |
| 351 _state &= ~_STATE_IN_CALLBACK; | 382 // future to finish we must not report the done event. |
| 383 if (!_waitsForCancel) return; |
| 384 _state |= (_STATE_CANCELED | _STATE_CLOSED | _STATE_IN_CALLBACK); |
| 385 _zone.runGuarded(_onDone); |
| 386 _state &= ~_STATE_IN_CALLBACK; |
| 387 } |
| 388 |
| 389 _cancel(); |
| 390 _state |= _STATE_WAIT_FOR_CANCEL; |
| 391 if (_cancelFuture is Future) { |
| 392 _cancelFuture.whenComplete(sendDone); |
| 393 } else { |
| 394 sendDone(); |
| 395 } |
| 352 } | 396 } |
| 353 | 397 |
| 354 /** | 398 /** |
| 355 * Call a hook function. | 399 * Call a hook function. |
| 356 * | 400 * |
| 357 * The call is properly wrapped in code to avoid other callbacks | 401 * The call is properly wrapped in code to avoid other callbacks |
| 358 * during the call, and it checks for state changes after the call | 402 * during the call, and it checks for state changes after the call |
| 359 * that should cause further callbacks. | 403 * that should cause further callbacks. |
| 360 */ | 404 */ |
| 361 void _guardCallback(callback) { | 405 void _guardCallback(callback) { |
| (...skipping 20 matching lines...) Expand all Loading... |
| 382 if (_hasPending && _pending.isEmpty) { | 426 if (_hasPending && _pending.isEmpty) { |
| 383 _state &= ~_STATE_HAS_PENDING; | 427 _state &= ~_STATE_HAS_PENDING; |
| 384 if (_isInputPaused && _mayResumeInput) { | 428 if (_isInputPaused && _mayResumeInput) { |
| 385 _state &= ~_STATE_INPUT_PAUSED; | 429 _state &= ~_STATE_INPUT_PAUSED; |
| 386 } | 430 } |
| 387 } | 431 } |
| 388 // If the state changes during a callback, we immediately | 432 // If the state changes during a callback, we immediately |
| 389 // make a new state-change callback. Loop until the state didn't change. | 433 // make a new state-change callback. Loop until the state didn't change. |
| 390 while (true) { | 434 while (true) { |
| 391 if (_isCanceled) { | 435 if (_isCanceled) { |
| 392 _onCancel(); | |
| 393 _pending = null; | 436 _pending = null; |
| 394 return; | 437 return; |
| 395 } | 438 } |
| 396 bool isInputPaused = _isInputPaused; | 439 bool isInputPaused = _isInputPaused; |
| 397 if (wasInputPaused == isInputPaused) break; | 440 if (wasInputPaused == isInputPaused) break; |
| 398 _state ^= _STATE_IN_CALLBACK; | 441 _state ^= _STATE_IN_CALLBACK; |
| 399 if (isInputPaused) { | 442 if (isInputPaused) { |
| 400 _onPause(); | 443 _onPause(); |
| 401 } else { | 444 } else { |
| 402 _onResume(); | 445 _onResume(); |
| (...skipping 289 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 692 void onError(Function handleError) {} | 735 void onError(Function handleError) {} |
| 693 void onDone(void handleDone()) {} | 736 void onDone(void handleDone()) {} |
| 694 | 737 |
| 695 void pause([Future resumeSignal]) { | 738 void pause([Future resumeSignal]) { |
| 696 _pauseCounter++; | 739 _pauseCounter++; |
| 697 if (resumeSignal != null) resumeSignal.then((_) { resume(); }); | 740 if (resumeSignal != null) resumeSignal.then((_) { resume(); }); |
| 698 } | 741 } |
| 699 void resume() { | 742 void resume() { |
| 700 if (_pauseCounter > 0) _pauseCounter--; | 743 if (_pauseCounter > 0) _pauseCounter--; |
| 701 } | 744 } |
| 702 void cancel() {} | 745 Future cancel() => null; |
| 703 bool get isPaused => _pauseCounter > 0; | 746 bool get isPaused => _pauseCounter > 0; |
| 704 | 747 |
| 705 Future asFuture([futureValue]) => new _Future(); | 748 Future asFuture([futureValue]) => new _Future(); |
| 706 } | 749 } |
| 707 | 750 |
| 708 class _AsBroadcastStream<T> extends Stream<T> { | 751 class _AsBroadcastStream<T> extends Stream<T> { |
| 709 final Stream<T> _source; | 752 final Stream<T> _source; |
| 710 final _broadcastCallback _onListenHandler; | 753 final _broadcastCallback _onListenHandler; |
| 711 final _broadcastCallback _onCancelHandler; | 754 final _broadcastCallback _onCancelHandler; |
| 712 final Zone _zone; | 755 final Zone _zone; |
| (...skipping 103 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 816 } | 859 } |
| 817 | 860 |
| 818 void pause([Future resumeSignal]) { | 861 void pause([Future resumeSignal]) { |
| 819 _stream._pauseSubscription(resumeSignal); | 862 _stream._pauseSubscription(resumeSignal); |
| 820 } | 863 } |
| 821 | 864 |
| 822 void resume() { | 865 void resume() { |
| 823 _stream._resumeSubscription(); | 866 _stream._resumeSubscription(); |
| 824 } | 867 } |
| 825 | 868 |
| 826 void cancel() { | 869 Future cancel() { |
| 827 _stream._cancelSubscription(); | 870 _stream._cancelSubscription(); |
| 871 return null; |
| 828 } | 872 } |
| 829 | 873 |
| 830 bool get isPaused { | 874 bool get isPaused { |
| 831 return _stream._isSubscriptionPaused; | 875 return _stream._isSubscriptionPaused; |
| 832 } | 876 } |
| 833 | 877 |
| 834 Future asFuture([var futureValue]) { | 878 Future asFuture([var futureValue]) { |
| 835 throw new UnsupportedError( | 879 throw new UnsupportedError( |
| 836 "Cannot change handlers of asBroadcastStream source subscription."); | 880 "Cannot change handlers of asBroadcastStream source subscription."); |
| 837 } | 881 } |
| (...skipping 90 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 928 } | 972 } |
| 929 | 973 |
| 930 /** Clears up the internal state when the iterator ends. */ | 974 /** Clears up the internal state when the iterator ends. */ |
| 931 void _clear() { | 975 void _clear() { |
| 932 _subscription = null; | 976 _subscription = null; |
| 933 _futureOrPrefetch = null; | 977 _futureOrPrefetch = null; |
| 934 _current = null; | 978 _current = null; |
| 935 _state = _STATE_DONE; | 979 _state = _STATE_DONE; |
| 936 } | 980 } |
| 937 | 981 |
| 938 void cancel() { | 982 Future cancel() { |
| 939 StreamSubscription subscription = _subscription; | 983 StreamSubscription subscription = _subscription; |
| 940 if (_state == _STATE_MOVING) { | 984 if (_state == _STATE_MOVING) { |
| 941 _Future<bool> hasNext = _futureOrPrefetch; | 985 _Future<bool> hasNext = _futureOrPrefetch; |
| 942 _clear(); | 986 _clear(); |
| 943 hasNext._complete(false); | 987 hasNext._complete(false); |
| 944 } else { | 988 } else { |
| 945 _clear(); | 989 _clear(); |
| 946 } | 990 } |
| 947 subscription.cancel(); | 991 return subscription.cancel(); |
| 948 } | 992 } |
| 949 | 993 |
| 950 void _onData(T data) { | 994 void _onData(T data) { |
| 951 if (_state == _STATE_MOVING) { | 995 if (_state == _STATE_MOVING) { |
| 952 _current = data; | 996 _current = data; |
| 953 _Future<bool> hasNext = _futureOrPrefetch; | 997 _Future<bool> hasNext = _futureOrPrefetch; |
| 954 _futureOrPrefetch = null; | 998 _futureOrPrefetch = null; |
| 955 _state = _STATE_FOUND; | 999 _state = _STATE_FOUND; |
| 956 hasNext._complete(true); | 1000 hasNext._complete(true); |
| 957 return; | 1001 return; |
| (...skipping 23 matching lines...) Expand all Loading... |
| 981 _Future<bool> hasNext = _futureOrPrefetch; | 1025 _Future<bool> hasNext = _futureOrPrefetch; |
| 982 _clear(); | 1026 _clear(); |
| 983 hasNext._complete(false); | 1027 hasNext._complete(false); |
| 984 return; | 1028 return; |
| 985 } | 1029 } |
| 986 _subscription.pause(); | 1030 _subscription.pause(); |
| 987 _futureOrPrefetch = null; | 1031 _futureOrPrefetch = null; |
| 988 _state = _STATE_EXTRA_DONE; | 1032 _state = _STATE_EXTRA_DONE; |
| 989 } | 1033 } |
| 990 } | 1034 } |
| OLD | NEW |