OLD | NEW |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 /** Abstract and private interface for a place to put events. */ | 7 /** Abstract and private interface for a place to put events. */ |
8 abstract class _EventSink<T> { | 8 abstract class _EventSink<T> { |
9 void _add(T data); | 9 void _add(T data); |
10 void _addError(Object error, StackTrace stackTrace); | 10 void _addError(Object error, StackTrace stackTrace); |
(...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
66 * after a call to [resume] if there are pending events. | 66 * after a call to [resume] if there are pending events. |
67 */ | 67 */ |
68 static const int _STATE_INPUT_PAUSED = 4; | 68 static const int _STATE_INPUT_PAUSED = 4; |
69 /** | 69 /** |
70 * Whether the subscription has been canceled. | 70 * Whether the subscription has been canceled. |
71 * | 71 * |
72 * Set by calling [cancel], or by handling a "done" event, or an "error" event | 72 * Set by calling [cancel], or by handling a "done" event, or an "error" event |
73 * when `cancelOnError` is true. | 73 * when `cancelOnError` is true. |
74 */ | 74 */ |
75 static const int _STATE_CANCELED = 8; | 75 static const int _STATE_CANCELED = 8; |
76 static const int _STATE_IN_CALLBACK = 16; | 76 /** |
77 static const int _STATE_HAS_PENDING = 32; | 77 * Set when either: |
78 static const int _STATE_PAUSE_COUNT = 64; | 78 * |
79 static const int _STATE_PAUSE_COUNT_SHIFT = 6; | 79 * * an error is sent, and [cancelOnError] is true, or |
| 80 * * a done event is sent. |
| 81 * |
| 82 * If the subscription is canceled while _STATE_WAIT_FOR_CANCEL is set, the |
| 83 * state is unset, and no furher events must be delivered. |
| 84 */ |
| 85 static const int _STATE_WAIT_FOR_CANCEL = 16; |
| 86 static const int _STATE_IN_CALLBACK = 32; |
| 87 static const int _STATE_HAS_PENDING = 64; |
| 88 static const int _STATE_PAUSE_COUNT = 128; |
| 89 static const int _STATE_PAUSE_COUNT_SHIFT = 7; |
80 | 90 |
81 /* Event handlers provided in constructor. */ | 91 /* Event handlers provided in constructor. */ |
82 _DataHandler<T> _onData; | 92 _DataHandler<T> _onData; |
83 Function _onError; | 93 Function _onError; |
84 _DoneHandler _onDone; | 94 _DoneHandler _onDone; |
85 final Zone _zone = Zone.current; | 95 final Zone _zone = Zone.current; |
86 | 96 |
87 /** Bit vector based on state-constants above. */ | 97 /** Bit vector based on state-constants above. */ |
88 int _state; | 98 int _state; |
89 | 99 |
| 100 // TODO(floitsch): reuse another field |
| 101 /** The future [_onCancel] may return. */ |
| 102 Future _cancelFuture; |
| 103 |
90 /** | 104 /** |
91 * Queue of pending events. | 105 * Queue of pending events. |
92 * | 106 * |
93 * Is created when necessary, or set in constructor for preconfigured events. | 107 * Is created when necessary, or set in constructor for preconfigured events. |
94 */ | 108 */ |
95 _PendingEvents _pending; | 109 _PendingEvents _pending; |
96 | 110 |
97 _BufferingStreamSubscription(bool cancelOnError) | 111 _BufferingStreamSubscription(bool cancelOnError) |
98 : _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0); | 112 : _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0); |
99 | 113 |
(...skipping 64 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
164 _pending.schedule(this); | 178 _pending.schedule(this); |
165 } else { | 179 } else { |
166 assert(_mayResumeInput); | 180 assert(_mayResumeInput); |
167 _state &= ~_STATE_INPUT_PAUSED; | 181 _state &= ~_STATE_INPUT_PAUSED; |
168 if (!_inCallback) _guardCallback(_onResume); | 182 if (!_inCallback) _guardCallback(_onResume); |
169 } | 183 } |
170 } | 184 } |
171 } | 185 } |
172 } | 186 } |
173 | 187 |
174 void cancel() { | 188 Future cancel() { |
175 if (_isCanceled) return; | 189 // The user doesn't want to receive any further events. If there is an |
| 190 // error or done event pending (waiting for the cancel to be done) discard |
| 191 // that event. |
| 192 _state &= ~_STATE_WAIT_FOR_CANCEL; |
| 193 if (_isCanceled) return _cancelFuture; |
176 _cancel(); | 194 _cancel(); |
177 if (!_inCallback) { | 195 return _cancelFuture; |
178 // otherwise checkState will be called after firing or callback completes. | |
179 _state |= _STATE_IN_CALLBACK; | |
180 _onCancel(); | |
181 _pending = null; | |
182 _state &= ~_STATE_IN_CALLBACK; | |
183 } | |
184 } | 196 } |
185 | 197 |
186 Future asFuture([var futureValue]) { | 198 Future asFuture([var futureValue]) { |
187 _Future<T> result = new _Future<T>(); | 199 _Future<T> result = new _Future<T>(); |
188 | 200 |
189 // Overwrite the onDone and onError handlers. | 201 // Overwrite the onDone and onError handlers. |
190 _onDone = () { result._complete(futureValue); }; | 202 _onDone = () { result._complete(futureValue); }; |
191 _onError = (error, stackTrace) { | 203 _onError = (error, stackTrace) { |
192 cancel(); | 204 cancel(); |
193 result._completeError(error, stackTrace); | 205 result._completeError(error, stackTrace); |
194 }; | 206 }; |
195 | 207 |
196 return result; | 208 return result; |
197 } | 209 } |
198 | 210 |
199 // State management. | 211 // State management. |
200 | 212 |
201 bool get _isInputPaused => (_state & _STATE_INPUT_PAUSED) != 0; | 213 bool get _isInputPaused => (_state & _STATE_INPUT_PAUSED) != 0; |
202 bool get _isClosed => (_state & _STATE_CLOSED) != 0; | 214 bool get _isClosed => (_state & _STATE_CLOSED) != 0; |
203 bool get _isCanceled => (_state & _STATE_CANCELED) != 0; | 215 bool get _isCanceled => (_state & _STATE_CANCELED) != 0; |
| 216 bool get _waitsForCancel => (_state & _STATE_WAIT_FOR_CANCEL) != 0; |
204 bool get _inCallback => (_state & _STATE_IN_CALLBACK) != 0; | 217 bool get _inCallback => (_state & _STATE_IN_CALLBACK) != 0; |
205 bool get _hasPending => (_state & _STATE_HAS_PENDING) != 0; | 218 bool get _hasPending => (_state & _STATE_HAS_PENDING) != 0; |
206 bool get _isPaused => _state >= _STATE_PAUSE_COUNT; | 219 bool get _isPaused => _state >= _STATE_PAUSE_COUNT; |
207 bool get _canFire => _state < _STATE_IN_CALLBACK; | 220 bool get _canFire => _state < _STATE_IN_CALLBACK; |
208 bool get _mayResumeInput => | 221 bool get _mayResumeInput => |
209 !_isPaused && (_pending == null || _pending.isEmpty); | 222 !_isPaused && (_pending == null || _pending.isEmpty); |
210 bool get _cancelOnError => (_state & _STATE_CANCEL_ON_ERROR) != 0; | 223 bool get _cancelOnError => (_state & _STATE_CANCEL_ON_ERROR) != 0; |
211 | 224 |
212 bool get isPaused => _isPaused; | 225 bool get isPaused => _isPaused; |
213 | 226 |
214 void _cancel() { | 227 void _cancel() { |
215 _state |= _STATE_CANCELED; | 228 _state |= _STATE_CANCELED; |
216 if (_hasPending) { | 229 if (_hasPending) { |
217 _pending.cancelSchedule(); | 230 _pending.cancelSchedule(); |
218 } | 231 } |
| 232 if (!_inCallback) _pending = null; |
| 233 _cancelFuture = _onCancel(); |
219 } | 234 } |
220 | 235 |
221 /** | 236 /** |
222 * Increment the pause count. | 237 * Increment the pause count. |
223 * | 238 * |
224 * Also marks input as paused. | 239 * Also marks input as paused. |
225 */ | 240 */ |
226 void _incrementPauseCount() { | 241 void _incrementPauseCount() { |
227 _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED; | 242 _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED; |
228 } | 243 } |
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
276 // try/catch wrapping and send any errors to | 291 // try/catch wrapping and send any errors to |
277 // [_Zone.current.handleUncaughtError]. | 292 // [_Zone.current.handleUncaughtError]. |
278 void _onPause() { | 293 void _onPause() { |
279 assert(_isInputPaused); | 294 assert(_isInputPaused); |
280 } | 295 } |
281 | 296 |
282 void _onResume() { | 297 void _onResume() { |
283 assert(!_isInputPaused); | 298 assert(!_isInputPaused); |
284 } | 299 } |
285 | 300 |
286 void _onCancel() { | 301 Future _onCancel() { |
287 assert(_isCanceled); | 302 assert(_isCanceled); |
288 } | 303 } |
289 | 304 |
290 // Handle pending events. | 305 // Handle pending events. |
291 | 306 |
292 /** | 307 /** |
293 * Add a pending event. | 308 * Add a pending event. |
294 * | 309 * |
295 * If the subscription is not paused, this also schedules a firing | 310 * If the subscription is not paused, this also schedules a firing |
296 * of pending events later (if necessary). | 311 * of pending events later (if necessary). |
(...skipping 21 matching lines...) Expand all Loading... |
318 _zone.runUnaryGuarded(_onData, data); | 333 _zone.runUnaryGuarded(_onData, data); |
319 _state &= ~_STATE_IN_CALLBACK; | 334 _state &= ~_STATE_IN_CALLBACK; |
320 _checkState(wasInputPaused); | 335 _checkState(wasInputPaused); |
321 } | 336 } |
322 | 337 |
323 void _sendError(var error, StackTrace stackTrace) { | 338 void _sendError(var error, StackTrace stackTrace) { |
324 assert(!_isCanceled); | 339 assert(!_isCanceled); |
325 assert(!_isPaused); | 340 assert(!_isPaused); |
326 assert(!_inCallback); | 341 assert(!_inCallback); |
327 bool wasInputPaused = _isInputPaused; | 342 bool wasInputPaused = _isInputPaused; |
328 _state |= _STATE_IN_CALLBACK; | 343 |
329 if (!_zone.inSameErrorZone(Zone.current)) { | 344 void sendError() { |
330 // Errors are not allowed to traverse zone boundaries. | 345 // If the subscription has been canceled while waiting for the cancel |
331 Zone.current.handleUncaughtError(error, stackTrace); | 346 // future to finish we must not report the error. |
332 } else if (_onError is ZoneBinaryCallback) { | 347 if (_isCanceled && !_waitsForCancel) return; |
333 _zone.runBinaryGuarded(_onError, error, stackTrace); | 348 _state |= _STATE_IN_CALLBACK; |
| 349 if (!_zone.inSameErrorZone(Zone.current)) { |
| 350 // Errors are not allowed to traverse zone boundaries. |
| 351 Zone.current.handleUncaughtError(error, stackTrace); |
| 352 } else if (_onError is ZoneBinaryCallback) { |
| 353 _zone.runBinaryGuarded(_onError, error, stackTrace); |
| 354 } else { |
| 355 _zone.runUnaryGuarded(_onError, error); |
| 356 } |
| 357 _state &= ~_STATE_IN_CALLBACK; |
| 358 } |
| 359 |
| 360 if (_cancelOnError) { |
| 361 _state |= _STATE_WAIT_FOR_CANCEL; |
| 362 _cancel(); |
| 363 if (_cancelFuture is Future) { |
| 364 _cancelFuture.whenComplete(sendError); |
| 365 } else { |
| 366 sendError(); |
| 367 } |
334 } else { | 368 } else { |
335 _zone.runUnaryGuarded(_onError, error); | 369 sendError(); |
| 370 // Only check state if not cancelOnError. |
| 371 _checkState(wasInputPaused); |
336 } | 372 } |
337 _state &= ~_STATE_IN_CALLBACK; | |
338 if (_cancelOnError) { | |
339 _cancel(); | |
340 } | |
341 _checkState(wasInputPaused); | |
342 } | 373 } |
343 | 374 |
344 void _sendDone() { | 375 void _sendDone() { |
345 assert(!_isCanceled); | 376 assert(!_isCanceled); |
346 assert(!_isPaused); | 377 assert(!_isPaused); |
347 assert(!_inCallback); | 378 assert(!_inCallback); |
348 _state |= (_STATE_CANCELED | _STATE_CLOSED | _STATE_IN_CALLBACK); | 379 |
349 _zone.runGuarded(_onDone); | 380 void sendDone() { |
350 _onCancel(); // No checkState after cancel, it is always the last event. | 381 // If the subscription has been canceled while waiting for the cancel |
351 _state &= ~_STATE_IN_CALLBACK; | 382 // future to finish we must not report the done event. |
| 383 if (!_waitsForCancel) return; |
| 384 _state |= (_STATE_CANCELED | _STATE_CLOSED | _STATE_IN_CALLBACK); |
| 385 _zone.runGuarded(_onDone); |
| 386 _state &= ~_STATE_IN_CALLBACK; |
| 387 } |
| 388 |
| 389 _cancel(); |
| 390 _state |= _STATE_WAIT_FOR_CANCEL; |
| 391 if (_cancelFuture is Future) { |
| 392 _cancelFuture.whenComplete(sendDone); |
| 393 } else { |
| 394 sendDone(); |
| 395 } |
352 } | 396 } |
353 | 397 |
354 /** | 398 /** |
355 * Call a hook function. | 399 * Call a hook function. |
356 * | 400 * |
357 * The call is properly wrapped in code to avoid other callbacks | 401 * The call is properly wrapped in code to avoid other callbacks |
358 * during the call, and it checks for state changes after the call | 402 * during the call, and it checks for state changes after the call |
359 * that should cause further callbacks. | 403 * that should cause further callbacks. |
360 */ | 404 */ |
361 void _guardCallback(callback) { | 405 void _guardCallback(callback) { |
(...skipping 20 matching lines...) Expand all Loading... |
382 if (_hasPending && _pending.isEmpty) { | 426 if (_hasPending && _pending.isEmpty) { |
383 _state &= ~_STATE_HAS_PENDING; | 427 _state &= ~_STATE_HAS_PENDING; |
384 if (_isInputPaused && _mayResumeInput) { | 428 if (_isInputPaused && _mayResumeInput) { |
385 _state &= ~_STATE_INPUT_PAUSED; | 429 _state &= ~_STATE_INPUT_PAUSED; |
386 } | 430 } |
387 } | 431 } |
388 // If the state changes during a callback, we immediately | 432 // If the state changes during a callback, we immediately |
389 // make a new state-change callback. Loop until the state didn't change. | 433 // make a new state-change callback. Loop until the state didn't change. |
390 while (true) { | 434 while (true) { |
391 if (_isCanceled) { | 435 if (_isCanceled) { |
392 _onCancel(); | |
393 _pending = null; | 436 _pending = null; |
394 return; | 437 return; |
395 } | 438 } |
396 bool isInputPaused = _isInputPaused; | 439 bool isInputPaused = _isInputPaused; |
397 if (wasInputPaused == isInputPaused) break; | 440 if (wasInputPaused == isInputPaused) break; |
398 _state ^= _STATE_IN_CALLBACK; | 441 _state ^= _STATE_IN_CALLBACK; |
399 if (isInputPaused) { | 442 if (isInputPaused) { |
400 _onPause(); | 443 _onPause(); |
401 } else { | 444 } else { |
402 _onResume(); | 445 _onResume(); |
(...skipping 289 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
692 void onError(Function handleError) {} | 735 void onError(Function handleError) {} |
693 void onDone(void handleDone()) {} | 736 void onDone(void handleDone()) {} |
694 | 737 |
695 void pause([Future resumeSignal]) { | 738 void pause([Future resumeSignal]) { |
696 _pauseCounter++; | 739 _pauseCounter++; |
697 if (resumeSignal != null) resumeSignal.then((_) { resume(); }); | 740 if (resumeSignal != null) resumeSignal.then((_) { resume(); }); |
698 } | 741 } |
699 void resume() { | 742 void resume() { |
700 if (_pauseCounter > 0) _pauseCounter--; | 743 if (_pauseCounter > 0) _pauseCounter--; |
701 } | 744 } |
702 void cancel() {} | 745 Future cancel() => null; |
703 bool get isPaused => _pauseCounter > 0; | 746 bool get isPaused => _pauseCounter > 0; |
704 | 747 |
705 Future asFuture([futureValue]) => new _Future(); | 748 Future asFuture([futureValue]) => new _Future(); |
706 } | 749 } |
707 | 750 |
708 class _AsBroadcastStream<T> extends Stream<T> { | 751 class _AsBroadcastStream<T> extends Stream<T> { |
709 final Stream<T> _source; | 752 final Stream<T> _source; |
710 final _broadcastCallback _onListenHandler; | 753 final _broadcastCallback _onListenHandler; |
711 final _broadcastCallback _onCancelHandler; | 754 final _broadcastCallback _onCancelHandler; |
712 final Zone _zone; | 755 final Zone _zone; |
(...skipping 103 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
816 } | 859 } |
817 | 860 |
818 void pause([Future resumeSignal]) { | 861 void pause([Future resumeSignal]) { |
819 _stream._pauseSubscription(resumeSignal); | 862 _stream._pauseSubscription(resumeSignal); |
820 } | 863 } |
821 | 864 |
822 void resume() { | 865 void resume() { |
823 _stream._resumeSubscription(); | 866 _stream._resumeSubscription(); |
824 } | 867 } |
825 | 868 |
826 void cancel() { | 869 Future cancel() { |
827 _stream._cancelSubscription(); | 870 _stream._cancelSubscription(); |
| 871 return null; |
828 } | 872 } |
829 | 873 |
830 bool get isPaused { | 874 bool get isPaused { |
831 return _stream._isSubscriptionPaused; | 875 return _stream._isSubscriptionPaused; |
832 } | 876 } |
833 | 877 |
834 Future asFuture([var futureValue]) { | 878 Future asFuture([var futureValue]) { |
835 throw new UnsupportedError( | 879 throw new UnsupportedError( |
836 "Cannot change handlers of asBroadcastStream source subscription."); | 880 "Cannot change handlers of asBroadcastStream source subscription."); |
837 } | 881 } |
(...skipping 90 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
928 } | 972 } |
929 | 973 |
930 /** Clears up the internal state when the iterator ends. */ | 974 /** Clears up the internal state when the iterator ends. */ |
931 void _clear() { | 975 void _clear() { |
932 _subscription = null; | 976 _subscription = null; |
933 _futureOrPrefetch = null; | 977 _futureOrPrefetch = null; |
934 _current = null; | 978 _current = null; |
935 _state = _STATE_DONE; | 979 _state = _STATE_DONE; |
936 } | 980 } |
937 | 981 |
938 void cancel() { | 982 Future cancel() { |
939 StreamSubscription subscription = _subscription; | 983 StreamSubscription subscription = _subscription; |
940 if (_state == _STATE_MOVING) { | 984 if (_state == _STATE_MOVING) { |
941 _Future<bool> hasNext = _futureOrPrefetch; | 985 _Future<bool> hasNext = _futureOrPrefetch; |
942 _clear(); | 986 _clear(); |
943 hasNext._complete(false); | 987 hasNext._complete(false); |
944 } else { | 988 } else { |
945 _clear(); | 989 _clear(); |
946 } | 990 } |
947 subscription.cancel(); | 991 return subscription.cancel(); |
948 } | 992 } |
949 | 993 |
950 void _onData(T data) { | 994 void _onData(T data) { |
951 if (_state == _STATE_MOVING) { | 995 if (_state == _STATE_MOVING) { |
952 _current = data; | 996 _current = data; |
953 _Future<bool> hasNext = _futureOrPrefetch; | 997 _Future<bool> hasNext = _futureOrPrefetch; |
954 _futureOrPrefetch = null; | 998 _futureOrPrefetch = null; |
955 _state = _STATE_FOUND; | 999 _state = _STATE_FOUND; |
956 hasNext._complete(true); | 1000 hasNext._complete(true); |
957 return; | 1001 return; |
(...skipping 23 matching lines...) Expand all Loading... |
981 _Future<bool> hasNext = _futureOrPrefetch; | 1025 _Future<bool> hasNext = _futureOrPrefetch; |
982 _clear(); | 1026 _clear(); |
983 hasNext._complete(false); | 1027 hasNext._complete(false); |
984 return; | 1028 return; |
985 } | 1029 } |
986 _subscription.pause(); | 1030 _subscription.pause(); |
987 _futureOrPrefetch = null; | 1031 _futureOrPrefetch = null; |
988 _state = _STATE_EXTRA_DONE; | 1032 _state = _STATE_EXTRA_DONE; |
989 } | 1033 } |
990 } | 1034 } |
OLD | NEW |