OLD | NEW |
---|---|
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 /** Abstract and private interface for a place to put events. */ | 7 /** Abstract and private interface for a place to put events. */ |
8 abstract class _EventSink<T> { | 8 abstract class _EventSink<T> { |
9 void _add(T data); | 9 void _add(T data); |
10 void _addError(Object error, StackTrace stackTrace); | 10 void _addError(Object error, StackTrace stackTrace); |
(...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
66 * after a call to [resume] if there are pending events. | 66 * after a call to [resume] if there are pending events. |
67 */ | 67 */ |
68 static const int _STATE_INPUT_PAUSED = 4; | 68 static const int _STATE_INPUT_PAUSED = 4; |
69 /** | 69 /** |
70 * Whether the subscription has been canceled. | 70 * Whether the subscription has been canceled. |
71 * | 71 * |
72 * Set by calling [cancel], or by handling a "done" event, or an "error" event | 72 * Set by calling [cancel], or by handling a "done" event, or an "error" event |
73 * when `cancelOnError` is true. | 73 * when `cancelOnError` is true. |
74 */ | 74 */ |
75 static const int _STATE_CANCELED = 8; | 75 static const int _STATE_CANCELED = 8; |
76 static const int _STATE_IN_CALLBACK = 16; | 76 /** |
77 static const int _STATE_HAS_PENDING = 32; | 77 * Set when either: |
78 static const int _STATE_PAUSE_COUNT = 64; | 78 * * An error is sent, and [cancelOnError] is true. |
floitsch
2013/10/16 14:43:44
Nit (you also need the new line as otherwise dartd
Anders Johnsen
2013/10/21 08:01:46
Done.
| |
79 static const int _STATE_PAUSE_COUNT_SHIFT = 6; | 79 * * A done is sent. |
80 * | |
81 * If the subscription is canceled while _STATE_WAIT_FOR_CANCEL is set, the | |
82 * state is unset, and no furher events should be delivered. | |
floitsch
2013/10/16 14:43:44
"should" is not helpful.
I guess "is" is more accu
Anders Johnsen
2013/10/21 08:01:46
Using 'must', as the comment is dictating how the
| |
83 */ | |
84 static const int _STATE_WAIT_FOR_CANCEL = 16; | |
85 static const int _STATE_IN_CALLBACK = 32; | |
86 static const int _STATE_HAS_PENDING = 64; | |
87 static const int _STATE_PAUSE_COUNT = 128; | |
88 static const int _STATE_PAUSE_COUNT_SHIFT = 7; | |
80 | 89 |
81 /* Event handlers provided in constructor. */ | 90 /* Event handlers provided in constructor. */ |
82 _DataHandler<T> _onData; | 91 _DataHandler<T> _onData; |
83 Function _onError; | 92 Function _onError; |
84 _DoneHandler _onDone; | 93 _DoneHandler _onDone; |
85 final Zone _zone = Zone.current; | 94 final Zone _zone = Zone.current; |
86 | 95 |
87 /** Bit vector based on state-constants above. */ | 96 /** Bit vector based on state-constants above. */ |
88 int _state; | 97 int _state; |
89 | 98 |
99 // TODO(floitsch): reuse another field | |
100 /** The future [_onCancel] may return. */ | |
101 Future _cancelFuture; | |
102 | |
90 /** | 103 /** |
91 * Queue of pending events. | 104 * Queue of pending events. |
92 * | 105 * |
93 * Is created when necessary, or set in constructor for preconfigured events. | 106 * Is created when necessary, or set in constructor for preconfigured events. |
94 */ | 107 */ |
95 _PendingEvents _pending; | 108 _PendingEvents _pending; |
96 | 109 |
97 _BufferingStreamSubscription(bool cancelOnError) | 110 _BufferingStreamSubscription(bool cancelOnError) |
98 : _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0); | 111 : _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0); |
99 | 112 |
(...skipping 64 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
164 _pending.schedule(this); | 177 _pending.schedule(this); |
165 } else { | 178 } else { |
166 assert(_mayResumeInput); | 179 assert(_mayResumeInput); |
167 _state &= ~_STATE_INPUT_PAUSED; | 180 _state &= ~_STATE_INPUT_PAUSED; |
168 if (!_inCallback) _guardCallback(_onResume); | 181 if (!_inCallback) _guardCallback(_onResume); |
169 } | 182 } |
170 } | 183 } |
171 } | 184 } |
172 } | 185 } |
173 | 186 |
174 void cancel() { | 187 Future cancel() { |
175 if (_isCanceled) return; | 188 // The user doesn't want to receive any further events. If there is an |
189 // error or done event pending (waiting for the cancel to be done) discard | |
190 // that event. | |
191 _state &= ~_STATE_WAIT_FOR_CANCEL; | |
192 if (_isCanceled) return _cancelFuture; | |
176 _cancel(); | 193 _cancel(); |
177 if (!_inCallback) { | 194 return _cancelFuture; |
178 // otherwise checkState will be called after firing or callback completes. | |
179 _state |= _STATE_IN_CALLBACK; | |
180 _onCancel(); | |
181 _pending = null; | |
182 _state &= ~_STATE_IN_CALLBACK; | |
183 } | |
184 } | 195 } |
185 | 196 |
186 Future asFuture([var futureValue]) { | 197 Future asFuture([var futureValue]) { |
187 _Future<T> result = new _Future<T>(); | 198 _Future<T> result = new _Future<T>(); |
188 | 199 |
189 // Overwrite the onDone and onError handlers. | 200 // Overwrite the onDone and onError handlers. |
190 _onDone = () { result._complete(futureValue); }; | 201 _onDone = () { result._complete(futureValue); }; |
191 _onError = (error, stackTrace) { | 202 _onError = (error, stackTrace) { |
192 cancel(); | 203 cancel(); |
193 result._completeError(error, stackTrace); | 204 result._completeError(error, stackTrace); |
194 }; | 205 }; |
195 | 206 |
196 return result; | 207 return result; |
197 } | 208 } |
198 | 209 |
199 // State management. | 210 // State management. |
200 | 211 |
201 bool get _isInputPaused => (_state & _STATE_INPUT_PAUSED) != 0; | 212 bool get _isInputPaused => (_state & _STATE_INPUT_PAUSED) != 0; |
202 bool get _isClosed => (_state & _STATE_CLOSED) != 0; | 213 bool get _isClosed => (_state & _STATE_CLOSED) != 0; |
203 bool get _isCanceled => (_state & _STATE_CANCELED) != 0; | 214 bool get _isCanceled => (_state & _STATE_CANCELED) != 0; |
215 bool get _waitForCancel => (_state & _STATE_WAIT_FOR_CANCEL) != 0; | |
floitsch
2013/10/16 14:43:44
_waitsForCancel
Anders Johnsen
2013/10/21 08:01:46
Done.
| |
204 bool get _inCallback => (_state & _STATE_IN_CALLBACK) != 0; | 216 bool get _inCallback => (_state & _STATE_IN_CALLBACK) != 0; |
205 bool get _hasPending => (_state & _STATE_HAS_PENDING) != 0; | 217 bool get _hasPending => (_state & _STATE_HAS_PENDING) != 0; |
206 bool get _isPaused => _state >= _STATE_PAUSE_COUNT; | 218 bool get _isPaused => _state >= _STATE_PAUSE_COUNT; |
207 bool get _canFire => _state < _STATE_IN_CALLBACK; | 219 bool get _canFire => _state < _STATE_IN_CALLBACK; |
208 bool get _mayResumeInput => | 220 bool get _mayResumeInput => |
209 !_isPaused && (_pending == null || _pending.isEmpty); | 221 !_isPaused && (_pending == null || _pending.isEmpty); |
210 bool get _cancelOnError => (_state & _STATE_CANCEL_ON_ERROR) != 0; | 222 bool get _cancelOnError => (_state & _STATE_CANCEL_ON_ERROR) != 0; |
211 | 223 |
212 bool get isPaused => _isPaused; | 224 bool get isPaused => _isPaused; |
213 | 225 |
214 void _cancel() { | 226 void _cancel() { |
215 _state |= _STATE_CANCELED; | 227 _state |= _STATE_CANCELED; |
216 if (_hasPending) { | 228 if (_hasPending) { |
217 _pending.cancelSchedule(); | 229 _pending.cancelSchedule(); |
218 } | 230 } |
231 if (!_inCallback) _pending = null; | |
232 _cancelFuture = _onCancel(); | |
219 } | 233 } |
220 | 234 |
221 /** | 235 /** |
222 * Increment the pause count. | 236 * Increment the pause count. |
223 * | 237 * |
224 * Also marks input as paused. | 238 * Also marks input as paused. |
225 */ | 239 */ |
226 void _incrementPauseCount() { | 240 void _incrementPauseCount() { |
227 _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED; | 241 _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED; |
228 } | 242 } |
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
276 // try/catch wrapping and send any errors to | 290 // try/catch wrapping and send any errors to |
277 // [_Zone.current.handleUncaughtError]. | 291 // [_Zone.current.handleUncaughtError]. |
278 void _onPause() { | 292 void _onPause() { |
279 assert(_isInputPaused); | 293 assert(_isInputPaused); |
280 } | 294 } |
281 | 295 |
282 void _onResume() { | 296 void _onResume() { |
283 assert(!_isInputPaused); | 297 assert(!_isInputPaused); |
284 } | 298 } |
285 | 299 |
286 void _onCancel() { | 300 Future _onCancel() { |
287 assert(_isCanceled); | 301 assert(_isCanceled); |
288 } | 302 } |
289 | 303 |
290 // Handle pending events. | 304 // Handle pending events. |
291 | 305 |
292 /** | 306 /** |
293 * Add a pending event. | 307 * Add a pending event. |
294 * | 308 * |
295 * If the subscription is not paused, this also schedules a firing | 309 * If the subscription is not paused, this also schedules a firing |
296 * of pending events later (if necessary). | 310 * of pending events later (if necessary). |
(...skipping 21 matching lines...) Expand all Loading... | |
318 _zone.runUnaryGuarded(_onData, data); | 332 _zone.runUnaryGuarded(_onData, data); |
319 _state &= ~_STATE_IN_CALLBACK; | 333 _state &= ~_STATE_IN_CALLBACK; |
320 _checkState(wasInputPaused); | 334 _checkState(wasInputPaused); |
321 } | 335 } |
322 | 336 |
323 void _sendError(var error, StackTrace stackTrace) { | 337 void _sendError(var error, StackTrace stackTrace) { |
324 assert(!_isCanceled); | 338 assert(!_isCanceled); |
325 assert(!_isPaused); | 339 assert(!_isPaused); |
326 assert(!_inCallback); | 340 assert(!_inCallback); |
327 bool wasInputPaused = _isInputPaused; | 341 bool wasInputPaused = _isInputPaused; |
328 _state |= _STATE_IN_CALLBACK; | 342 |
329 if (!_zone.inSameErrorZone(Zone.current)) { | 343 void sendError() { |
330 // Errors are not allowed to traverse zone boundaries. | 344 // If the subscription has been canceled while waiting for the cancel |
331 Zone.current.handleUncaughtError(error, stackTrace); | 345 // future to finish we must not report the error. |
332 } else if (_onError is ZoneBinaryCallback) { | 346 if (_isCanceled && !_waitForCancel) return; |
333 _zone.runBinaryGuarded(_onError, error, stackTrace); | 347 _state |= _STATE_IN_CALLBACK; |
348 if (!_zone.inSameErrorZone(Zone.current)) { | |
349 // Errors are not allowed to traverse zone boundaries. | |
350 Zone.current.handleUncaughtError(error, stackTrace); | |
351 } else if (_onError is ZoneBinaryCallback) { | |
352 _zone.runBinaryGuarded(_onError, error, stackTrace); | |
353 } else { | |
354 _zone.runUnaryGuarded(_onError, error); | |
355 } | |
356 _state &= ~_STATE_IN_CALLBACK; | |
357 } | |
358 | |
359 if (_cancelOnError) { | |
360 _state |= _STATE_WAIT_FOR_CANCEL; | |
361 _cancel(); | |
362 if (_cancelFuture is Future) { | |
363 _cancelFuture.whenComplete(sendError); | |
364 } else { | |
365 sendError(); | |
366 } | |
334 } else { | 367 } else { |
335 _zone.runUnaryGuarded(_onError, error); | 368 sendError(); |
369 // Only check state if not cancelOnError. | |
370 _checkState(wasInputPaused); | |
336 } | 371 } |
337 _state &= ~_STATE_IN_CALLBACK; | |
338 if (_cancelOnError) { | |
339 _cancel(); | |
340 } | |
341 _checkState(wasInputPaused); | |
342 } | 372 } |
343 | 373 |
344 void _sendDone() { | 374 void _sendDone() { |
345 assert(!_isCanceled); | 375 assert(!_isCanceled); |
346 assert(!_isPaused); | 376 assert(!_isPaused); |
347 assert(!_inCallback); | 377 assert(!_inCallback); |
348 _state |= (_STATE_CANCELED | _STATE_CLOSED | _STATE_IN_CALLBACK); | 378 |
349 _zone.runGuarded(_onDone); | 379 void sendDone() { |
350 _onCancel(); // No checkState after cancel, it is always the last event. | 380 // If the subscription has been canceled while waiting for the cancel |
351 _state &= ~_STATE_IN_CALLBACK; | 381 // future to finish we must not report the done event. |
382 if (!_waitForCancel) return; | |
383 _state |= (_STATE_CANCELED | _STATE_CLOSED | _STATE_IN_CALLBACK); | |
384 _zone.runGuarded(_onDone); | |
385 _state &= ~_STATE_IN_CALLBACK; | |
386 } | |
387 | |
388 _cancel(); | |
389 _state |= _STATE_WAIT_FOR_CANCEL; | |
390 if (_cancelFuture is Future) { | |
391 _cancelFuture.whenComplete(sendDone); | |
392 } else { | |
393 sendDone(); | |
394 } | |
352 } | 395 } |
353 | 396 |
354 /** | 397 /** |
355 * Call a hook function. | 398 * Call a hook function. |
356 * | 399 * |
357 * The call is properly wrapped in code to avoid other callbacks | 400 * The call is properly wrapped in code to avoid other callbacks |
358 * during the call, and it checks for state changes after the call | 401 * during the call, and it checks for state changes after the call |
359 * that should cause further callbacks. | 402 * that should cause further callbacks. |
360 */ | 403 */ |
361 void _guardCallback(callback) { | 404 void _guardCallback(callback) { |
(...skipping 20 matching lines...) Expand all Loading... | |
382 if (_hasPending && _pending.isEmpty) { | 425 if (_hasPending && _pending.isEmpty) { |
383 _state &= ~_STATE_HAS_PENDING; | 426 _state &= ~_STATE_HAS_PENDING; |
384 if (_isInputPaused && _mayResumeInput) { | 427 if (_isInputPaused && _mayResumeInput) { |
385 _state &= ~_STATE_INPUT_PAUSED; | 428 _state &= ~_STATE_INPUT_PAUSED; |
386 } | 429 } |
387 } | 430 } |
388 // If the state changes during a callback, we immediately | 431 // If the state changes during a callback, we immediately |
389 // make a new state-change callback. Loop until the state didn't change. | 432 // make a new state-change callback. Loop until the state didn't change. |
390 while (true) { | 433 while (true) { |
391 if (_isCanceled) { | 434 if (_isCanceled) { |
392 _onCancel(); | |
393 _pending = null; | 435 _pending = null; |
394 return; | 436 return; |
395 } | 437 } |
396 bool isInputPaused = _isInputPaused; | 438 bool isInputPaused = _isInputPaused; |
397 if (wasInputPaused == isInputPaused) break; | 439 if (wasInputPaused == isInputPaused) break; |
398 _state ^= _STATE_IN_CALLBACK; | 440 _state ^= _STATE_IN_CALLBACK; |
399 if (isInputPaused) { | 441 if (isInputPaused) { |
400 _onPause(); | 442 _onPause(); |
401 } else { | 443 } else { |
402 _onResume(); | 444 _onResume(); |
(...skipping 289 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
692 void onError(Function handleError) {} | 734 void onError(Function handleError) {} |
693 void onDone(void handleDone()) {} | 735 void onDone(void handleDone()) {} |
694 | 736 |
695 void pause([Future resumeSignal]) { | 737 void pause([Future resumeSignal]) { |
696 _pauseCounter++; | 738 _pauseCounter++; |
697 if (resumeSignal != null) resumeSignal.then((_) { resume(); }); | 739 if (resumeSignal != null) resumeSignal.then((_) { resume(); }); |
698 } | 740 } |
699 void resume() { | 741 void resume() { |
700 if (_pauseCounter > 0) _pauseCounter--; | 742 if (_pauseCounter > 0) _pauseCounter--; |
701 } | 743 } |
702 void cancel() {} | 744 Future cancel() => null; |
703 bool get isPaused => _pauseCounter > 0; | 745 bool get isPaused => _pauseCounter > 0; |
704 | 746 |
705 Future asFuture([futureValue]) => new _Future(); | 747 Future asFuture([futureValue]) => new _Future(); |
706 } | 748 } |
707 | 749 |
708 class _AsBroadcastStream<T> extends Stream<T> { | 750 class _AsBroadcastStream<T> extends Stream<T> { |
709 final Stream<T> _source; | 751 final Stream<T> _source; |
710 final _broadcastCallback _onListenHandler; | 752 final _broadcastCallback _onListenHandler; |
711 final _broadcastCallback _onCancelHandler; | 753 final _broadcastCallback _onCancelHandler; |
712 final Zone _zone; | 754 final Zone _zone; |
(...skipping 103 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
816 } | 858 } |
817 | 859 |
818 void pause([Future resumeSignal]) { | 860 void pause([Future resumeSignal]) { |
819 _stream._pauseSubscription(resumeSignal); | 861 _stream._pauseSubscription(resumeSignal); |
820 } | 862 } |
821 | 863 |
822 void resume() { | 864 void resume() { |
823 _stream._resumeSubscription(); | 865 _stream._resumeSubscription(); |
824 } | 866 } |
825 | 867 |
826 void cancel() { | 868 Future cancel() { |
827 _stream._cancelSubscription(); | 869 _stream._cancelSubscription(); |
870 return null; | |
828 } | 871 } |
829 | 872 |
830 bool get isPaused { | 873 bool get isPaused { |
831 return _stream._isSubscriptionPaused; | 874 return _stream._isSubscriptionPaused; |
832 } | 875 } |
833 | 876 |
834 Future asFuture([var futureValue]) { | 877 Future asFuture([var futureValue]) { |
835 throw new UnsupportedError( | 878 throw new UnsupportedError( |
836 "Cannot change handlers of asBroadcastStream source subscription."); | 879 "Cannot change handlers of asBroadcastStream source subscription."); |
837 } | 880 } |
(...skipping 90 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
928 } | 971 } |
929 | 972 |
930 /** Clears up the internal state when the iterator ends. */ | 973 /** Clears up the internal state when the iterator ends. */ |
931 void _clear() { | 974 void _clear() { |
932 _subscription = null; | 975 _subscription = null; |
933 _futureOrPrefetch = null; | 976 _futureOrPrefetch = null; |
934 _current = null; | 977 _current = null; |
935 _state = _STATE_DONE; | 978 _state = _STATE_DONE; |
936 } | 979 } |
937 | 980 |
938 void cancel() { | 981 Future cancel() { |
939 StreamSubscription subscription = _subscription; | 982 StreamSubscription subscription = _subscription; |
940 if (_state == _STATE_MOVING) { | 983 if (_state == _STATE_MOVING) { |
941 _Future<bool> hasNext = _futureOrPrefetch; | 984 _Future<bool> hasNext = _futureOrPrefetch; |
942 _clear(); | 985 _clear(); |
943 hasNext._complete(false); | 986 hasNext._complete(false); |
944 } else { | 987 } else { |
945 _clear(); | 988 _clear(); |
946 } | 989 } |
947 subscription.cancel(); | 990 return subscription.cancel(); |
948 } | 991 } |
949 | 992 |
950 void _onData(T data) { | 993 void _onData(T data) { |
951 if (_state == _STATE_MOVING) { | 994 if (_state == _STATE_MOVING) { |
952 _current = data; | 995 _current = data; |
953 _Future<bool> hasNext = _futureOrPrefetch; | 996 _Future<bool> hasNext = _futureOrPrefetch; |
954 _futureOrPrefetch = null; | 997 _futureOrPrefetch = null; |
955 _state = _STATE_FOUND; | 998 _state = _STATE_FOUND; |
956 hasNext._complete(true); | 999 hasNext._complete(true); |
957 return; | 1000 return; |
(...skipping 23 matching lines...) Expand all Loading... | |
981 _Future<bool> hasNext = _futureOrPrefetch; | 1024 _Future<bool> hasNext = _futureOrPrefetch; |
982 _clear(); | 1025 _clear(); |
983 hasNext._complete(false); | 1026 hasNext._complete(false); |
984 return; | 1027 return; |
985 } | 1028 } |
986 _subscription.pause(); | 1029 _subscription.pause(); |
987 _futureOrPrefetch = null; | 1030 _futureOrPrefetch = null; |
988 _state = _STATE_EXTRA_DONE; | 1031 _state = _STATE_EXTRA_DONE; |
989 } | 1032 } |
990 } | 1033 } |
OLD | NEW |