| OLD | NEW |
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 /** Abstract and private interface for a place to put events. */ | 7 /** Abstract and private interface for a place to put events. */ |
| 8 abstract class _EventSink<T> { | 8 abstract class _EventSink<T> { |
| 9 void _add(T data); | 9 void _add(T data); |
| 10 void _addError(Object error, StackTrace stackTrace); | 10 void _addError(Object error, StackTrace stackTrace); |
| (...skipping 18 matching lines...) Expand all Loading... |
| 29 * | 29 * |
| 30 * The only public methods are those of [StreamSubscription], so instances of | 30 * The only public methods are those of [StreamSubscription], so instances of |
| 31 * [_BufferingStreamSubscription] can be returned directly as a | 31 * [_BufferingStreamSubscription] can be returned directly as a |
| 32 * [StreamSubscription] without exposing internal functionality. | 32 * [StreamSubscription] without exposing internal functionality. |
| 33 * | 33 * |
| 34 * The [StreamController] is a public facing version of [Stream] and this class, | 34 * The [StreamController] is a public facing version of [Stream] and this class, |
| 35 * with some methods made public. | 35 * with some methods made public. |
| 36 * | 36 * |
| 37 * The user interface of [_BufferingStreamSubscription] are the following | 37 * The user interface of [_BufferingStreamSubscription] are the following |
| 38 * methods: | 38 * methods: |
| 39 * |
| 39 * * [_add]: Add a data event to the stream. | 40 * * [_add]: Add a data event to the stream. |
| 40 * * [_addError]: Add an error event to the stream. | 41 * * [_addError]: Add an error event to the stream. |
| 41 * * [_close]: Request to close the stream. | 42 * * [_close]: Request to close the stream. |
| 42 * * [_onCancel]: Called when the subscription will provide no more events, | 43 * * [_onCancel]: Called when the subscription will provide no more events, |
| 43 * either due to being actively canceled, or after sending a done event. | 44 * either due to being actively canceled, or after sending a done event. |
| 44 * * [_onPause]: Called when the subscription wants the event source to pause. | 45 * * [_onPause]: Called when the subscription wants the event source to pause. |
| 45 * * [_onResume]: Called when allowing new events after a pause. | 46 * * [_onResume]: Called when allowing new events after a pause. |
| 47 * |
| 46 * The user should not add new events when the subscription requests a paused, | 48 * The user should not add new events when the subscription requests a paused, |
| 47 * but if it happens anyway, the subscription will enqueue the events just as | 49 * but if it happens anyway, the subscription will enqueue the events just as |
| 48 * when new events arrive while still firing an old event. | 50 * when new events arrive while still firing an old event. |
| 49 */ | 51 */ |
| 50 class _BufferingStreamSubscription<T> implements StreamSubscription<T>, | 52 class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
| 51 _EventSink<T>, | 53 _EventSink<T>, |
| 52 _EventDispatch<T> { | 54 _EventDispatch<T> { |
| 53 /** The `cancelOnError` flag from the `listen` call. */ | 55 /** The `cancelOnError` flag from the `listen` call. */ |
| 54 static const int _STATE_CANCEL_ON_ERROR = 1; | 56 static const int _STATE_CANCEL_ON_ERROR = 1; |
| 55 /** | 57 /** |
| (...skipping 29 matching lines...) Expand all Loading... |
| 85 /** Bit vector based on state-constants above. */ | 87 /** Bit vector based on state-constants above. */ |
| 86 int _state; | 88 int _state; |
| 87 | 89 |
| 88 /** | 90 /** |
| 89 * Queue of pending events. | 91 * Queue of pending events. |
| 90 * | 92 * |
| 91 * Is created when necessary, or set in constructor for preconfigured events. | 93 * Is created when necessary, or set in constructor for preconfigured events. |
| 92 */ | 94 */ |
| 93 _PendingEvents _pending; | 95 _PendingEvents _pending; |
| 94 | 96 |
| 95 _BufferingStreamSubscription(void onData(T data), | 97 _BufferingStreamSubscription(void onDataHandler(T data), |
| 96 Function onError, | 98 Function onErrorHandler, |
| 97 void onDone(), | 99 void onDoneHandler(), |
| 98 bool cancelOnError) | 100 bool cancelOnError) |
| 99 : _onData = Zone.current.registerUnaryCallback(onData), | 101 : _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0) { |
| 100 _onError = _registerErrorCallback(onError), | 102 onData(onDataHandler); |
| 101 _onDone = Zone.current.registerCallback(onDone), | 103 onError(onErrorHandler); |
| 102 _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0) { | 104 onDone(onDoneHandler); |
| 103 assert(_onData != null); | |
| 104 assert(_onError != null); | |
| 105 assert(_onDone != null); | |
| 106 } | |
| 107 | |
| 108 static _registerErrorCallback(Function errorCallback) { | |
| 109 if (errorCallback is ZoneBinaryCallback) { | |
| 110 return Zone.current.registerBinaryCallback(errorCallback); | |
| 111 } else { | |
| 112 return Zone.current.registerUnaryCallback(errorCallback); | |
| 113 } | |
| 114 } | 105 } |
| 115 | 106 |
| 116 /** | 107 /** |
| 117 * Sets the subscription's pending events object. | 108 * Sets the subscription's pending events object. |
| 118 * | 109 * |
| 119 * This can only be done once. The pending events object is used for the | 110 * This can only be done once. The pending events object is used for the |
| 120 * rest of the subscription's life cycle. | 111 * rest of the subscription's life cycle. |
| 121 */ | 112 */ |
| 122 void _setPendingEvents(_PendingEvents pendingEvents) { | 113 void _setPendingEvents(_PendingEvents pendingEvents) { |
| 123 assert(_pending == null); | 114 assert(_pending == null); |
| (...skipping 18 matching lines...) Expand all Loading... |
| 142 return events; | 133 return events; |
| 143 } | 134 } |
| 144 | 135 |
| 145 // StreamSubscription interface. | 136 // StreamSubscription interface. |
| 146 | 137 |
| 147 void onData(void handleData(T event)) { | 138 void onData(void handleData(T event)) { |
| 148 if (handleData == null) handleData = _nullDataHandler; | 139 if (handleData == null) handleData = _nullDataHandler; |
| 149 _onData = Zone.current.registerUnaryCallback(handleData); | 140 _onData = Zone.current.registerUnaryCallback(handleData); |
| 150 } | 141 } |
| 151 | 142 |
| 143 static _registerErrorCallback(Function errorCallback) { |
| 144 if (errorCallback is ZoneBinaryCallback) { |
| 145 return Zone.current.registerBinaryCallback(errorCallback); |
| 146 } else { |
| 147 return Zone.current.registerUnaryCallback(errorCallback); |
| 148 } |
| 149 } |
| 150 |
| 152 void onError(Function handleError) { | 151 void onError(Function handleError) { |
| 153 if (handleError == null) handleError = _nullErrorHandler; | 152 if (handleError == null) handleError = _nullErrorHandler; |
| 154 _onError = _registerErrorCallback(handleError); | 153 _onError = _registerErrorCallback(handleError); |
| 155 } | 154 } |
| 156 | 155 |
| 157 void onDone(void handleDone()) { | 156 void onDone(void handleDone()) { |
| 158 if (handleDone == null) handleDone = _nullDoneHandler; | 157 if (handleDone == null) handleDone = _nullDoneHandler; |
| 159 _onDone = Zone.current.registerCallback(handleDone); | 158 _onDone = Zone.current.registerCallback(handleDone); |
| 160 } | 159 } |
| 161 | 160 |
| (...skipping 844 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1006 _Future<bool> hasNext = _futureOrPrefetch; | 1005 _Future<bool> hasNext = _futureOrPrefetch; |
| 1007 _clear(); | 1006 _clear(); |
| 1008 hasNext._complete(false); | 1007 hasNext._complete(false); |
| 1009 return; | 1008 return; |
| 1010 } | 1009 } |
| 1011 _subscription.pause(); | 1010 _subscription.pause(); |
| 1012 _futureOrPrefetch = null; | 1011 _futureOrPrefetch = null; |
| 1013 _state = _STATE_EXTRA_DONE; | 1012 _state = _STATE_EXTRA_DONE; |
| 1014 } | 1013 } |
| 1015 } | 1014 } |
| OLD | NEW |