OLD | NEW |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 /** Abstract and private interface for a place to put events. */ | 7 /** Abstract and private interface for a place to put events. */ |
8 abstract class _EventSink<T> { | 8 abstract class _EventSink<T> { |
9 void _add(T data); | 9 void _add(T data); |
10 void _addError(Object error, StackTrace stackTrace); | 10 void _addError(Object error, StackTrace stackTrace); |
(...skipping 18 matching lines...) Expand all Loading... |
29 * | 29 * |
30 * The only public methods are those of [StreamSubscription], so instances of | 30 * The only public methods are those of [StreamSubscription], so instances of |
31 * [_BufferingStreamSubscription] can be returned directly as a | 31 * [_BufferingStreamSubscription] can be returned directly as a |
32 * [StreamSubscription] without exposing internal functionality. | 32 * [StreamSubscription] without exposing internal functionality. |
33 * | 33 * |
34 * The [StreamController] is a public facing version of [Stream] and this class, | 34 * The [StreamController] is a public facing version of [Stream] and this class, |
35 * with some methods made public. | 35 * with some methods made public. |
36 * | 36 * |
37 * The user interface of [_BufferingStreamSubscription] are the following | 37 * The user interface of [_BufferingStreamSubscription] are the following |
38 * methods: | 38 * methods: |
| 39 * |
39 * * [_add]: Add a data event to the stream. | 40 * * [_add]: Add a data event to the stream. |
40 * * [_addError]: Add an error event to the stream. | 41 * * [_addError]: Add an error event to the stream. |
41 * * [_close]: Request to close the stream. | 42 * * [_close]: Request to close the stream. |
42 * * [_onCancel]: Called when the subscription will provide no more events, | 43 * * [_onCancel]: Called when the subscription will provide no more events, |
43 * either due to being actively canceled, or after sending a done event. | 44 * either due to being actively canceled, or after sending a done event. |
44 * * [_onPause]: Called when the subscription wants the event source to pause. | 45 * * [_onPause]: Called when the subscription wants the event source to pause. |
45 * * [_onResume]: Called when allowing new events after a pause. | 46 * * [_onResume]: Called when allowing new events after a pause. |
| 47 * |
46 * The user should not add new events when the subscription requests a paused, | 48 * The user should not add new events when the subscription requests a paused, |
47 * but if it happens anyway, the subscription will enqueue the events just as | 49 * but if it happens anyway, the subscription will enqueue the events just as |
48 * when new events arrive while still firing an old event. | 50 * when new events arrive while still firing an old event. |
49 */ | 51 */ |
50 class _BufferingStreamSubscription<T> implements StreamSubscription<T>, | 52 class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
51 _EventSink<T>, | 53 _EventSink<T>, |
52 _EventDispatch<T> { | 54 _EventDispatch<T> { |
53 /** The `cancelOnError` flag from the `listen` call. */ | 55 /** The `cancelOnError` flag from the `listen` call. */ |
54 static const int _STATE_CANCEL_ON_ERROR = 1; | 56 static const int _STATE_CANCEL_ON_ERROR = 1; |
55 /** | 57 /** |
(...skipping 29 matching lines...) Expand all Loading... |
85 /** Bit vector based on state-constants above. */ | 87 /** Bit vector based on state-constants above. */ |
86 int _state; | 88 int _state; |
87 | 89 |
88 /** | 90 /** |
89 * Queue of pending events. | 91 * Queue of pending events. |
90 * | 92 * |
91 * Is created when necessary, or set in constructor for preconfigured events. | 93 * Is created when necessary, or set in constructor for preconfigured events. |
92 */ | 94 */ |
93 _PendingEvents _pending; | 95 _PendingEvents _pending; |
94 | 96 |
95 _BufferingStreamSubscription(void onData(T data), | 97 _BufferingStreamSubscription(void onDataHandler(T data), |
96 Function onError, | 98 Function onErrorHandler, |
97 void onDone(), | 99 void onDoneHandler(), |
98 bool cancelOnError) | 100 bool cancelOnError) |
99 : _onData = Zone.current.registerUnaryCallback(onData), | 101 : _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0) { |
100 _onError = _registerErrorCallback(onError), | 102 onData(onDataHandler); |
101 _onDone = Zone.current.registerCallback(onDone), | 103 onError(onErrorHandler); |
102 _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0) { | 104 onDone(onDoneHandler); |
103 assert(_onData != null); | |
104 assert(_onError != null); | |
105 assert(_onDone != null); | |
106 } | |
107 | |
108 static _registerErrorCallback(Function errorCallback) { | |
109 if (errorCallback is ZoneBinaryCallback) { | |
110 return Zone.current.registerBinaryCallback(errorCallback); | |
111 } else { | |
112 return Zone.current.registerUnaryCallback(errorCallback); | |
113 } | |
114 } | 105 } |
115 | 106 |
116 /** | 107 /** |
117 * Sets the subscription's pending events object. | 108 * Sets the subscription's pending events object. |
118 * | 109 * |
119 * This can only be done once. The pending events object is used for the | 110 * This can only be done once. The pending events object is used for the |
120 * rest of the subscription's life cycle. | 111 * rest of the subscription's life cycle. |
121 */ | 112 */ |
122 void _setPendingEvents(_PendingEvents pendingEvents) { | 113 void _setPendingEvents(_PendingEvents pendingEvents) { |
123 assert(_pending == null); | 114 assert(_pending == null); |
(...skipping 18 matching lines...) Expand all Loading... |
142 return events; | 133 return events; |
143 } | 134 } |
144 | 135 |
145 // StreamSubscription interface. | 136 // StreamSubscription interface. |
146 | 137 |
147 void onData(void handleData(T event)) { | 138 void onData(void handleData(T event)) { |
148 if (handleData == null) handleData = _nullDataHandler; | 139 if (handleData == null) handleData = _nullDataHandler; |
149 _onData = Zone.current.registerUnaryCallback(handleData); | 140 _onData = Zone.current.registerUnaryCallback(handleData); |
150 } | 141 } |
151 | 142 |
| 143 static _registerErrorCallback(Function errorCallback) { |
| 144 if (errorCallback is ZoneBinaryCallback) { |
| 145 return Zone.current.registerBinaryCallback(errorCallback); |
| 146 } else { |
| 147 return Zone.current.registerUnaryCallback(errorCallback); |
| 148 } |
| 149 } |
| 150 |
152 void onError(Function handleError) { | 151 void onError(Function handleError) { |
153 if (handleError == null) handleError = _nullErrorHandler; | 152 if (handleError == null) handleError = _nullErrorHandler; |
154 _onError = _registerErrorCallback(handleError); | 153 _onError = _registerErrorCallback(handleError); |
155 } | 154 } |
156 | 155 |
157 void onDone(void handleDone()) { | 156 void onDone(void handleDone()) { |
158 if (handleDone == null) handleDone = _nullDoneHandler; | 157 if (handleDone == null) handleDone = _nullDoneHandler; |
159 _onDone = Zone.current.registerCallback(handleDone); | 158 _onDone = Zone.current.registerCallback(handleDone); |
160 } | 159 } |
161 | 160 |
(...skipping 844 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1006 _Future<bool> hasNext = _futureOrPrefetch; | 1005 _Future<bool> hasNext = _futureOrPrefetch; |
1007 _clear(); | 1006 _clear(); |
1008 hasNext._complete(false); | 1007 hasNext._complete(false); |
1009 return; | 1008 return; |
1010 } | 1009 } |
1011 _subscription.pause(); | 1010 _subscription.pause(); |
1012 _futureOrPrefetch = null; | 1011 _futureOrPrefetch = null; |
1013 _state = _STATE_EXTRA_DONE; | 1012 _state = _STATE_EXTRA_DONE; |
1014 } | 1013 } |
1015 } | 1014 } |
OLD | NEW |