OLD | NEW |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
8 // Controller for creating and adding events to a stream. | 8 // Controller for creating and adding events to a stream. |
9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
10 | 10 |
(...skipping 29 matching lines...) Expand all Loading... |
40 * a stream action, for example firing an event. If the state changes multiple | 40 * a stream action, for example firing an event. If the state changes multiple |
41 * times during the action, and then ends up in the same state as before, no | 41 * times during the action, and then ends up in the same state as before, no |
42 * callback is performed. | 42 * callback is performed. |
43 * | 43 * |
44 * If listeners are added after the stream has completed (sent a "done" event), | 44 * If listeners are added after the stream has completed (sent a "done" event), |
45 * the listeners will be sent a "done" event eventually, but they won't affect | 45 * the listeners will be sent a "done" event eventually, but they won't affect |
46 * the stream at all, and won't trigger callbacks. From the controller's point | 46 * the stream at all, and won't trigger callbacks. From the controller's point |
47 * of view, the stream is completely inert when has completed. | 47 * of view, the stream is completely inert when has completed. |
48 */ | 48 */ |
49 class StreamController<T> extends EventSink<T> { | 49 class StreamController<T> extends EventSink<T> { |
50 final _StreamImpl<T> stream; | 50 static const int _STATE_OPEN = 0; |
| 51 static const int _STATE_CANCELLED = 1; |
| 52 static const int _STATE_CLOSED = 2; |
| 53 |
| 54 final _NotificationHandler _onListen; |
| 55 final _NotificationHandler _onPause; |
| 56 final _NotificationHandler _onResume; |
| 57 final _NotificationHandler _onCancel; |
| 58 _StreamImpl<T> _stream; |
| 59 |
| 60 // An active subscription on the stream, or null if no subscripton is active. |
| 61 _ControllerSubscription<T> _subscription; |
| 62 |
| 63 // Whether we have sent a "done" event. |
| 64 int _state = _STATE_OPEN; |
| 65 |
| 66 // Events added to the stream before it has an active subscription. |
| 67 _PendingEvents _pendingEvents = null; |
51 | 68 |
52 /** | 69 /** |
53 * A controller with a [stream] that supports only one single subscriber. | 70 * A controller with a [stream] that supports only one single subscriber. |
54 * | 71 * |
55 * The controller will buffer all incoming events until the subscriber is | 72 * The controller will buffer all incoming events until the subscriber is |
56 * registered. | 73 * registered. |
57 * | 74 * |
58 * The [onPause] function is called when the stream becomes | 75 * The [onPause] function is called when the stream becomes |
59 * paused. [onResume] is called when the stream resumed. | 76 * paused. [onResume] is called when the stream resumed. |
60 * | 77 * |
61 * The [onListen] callback is called when the stream | 78 * The [onListen] callback is called when the stream |
62 * receives its listener. [onCancel] when the listener cancels | 79 * receives its listener and [onCancel] when the listener ends |
63 * its subscription. | 80 * its subscription. |
64 * | 81 * |
65 * If the stream is canceled before the controller needs new data the | 82 * If the stream is canceled before the controller needs new data the |
66 * [onResume] call might not be executed. | 83 * [onResume] call might not be executed. |
67 */ | 84 */ |
68 StreamController({void onListen(), | 85 StreamController({void onListen(), |
69 void onPause(), | 86 void onPause(), |
70 void onResume(), | 87 void onResume(), |
71 void onCancel()}) | 88 void onCancel()}) |
72 : stream = new _SingleControllerStream<T>( | 89 : _onListen = onListen, |
73 onListen, onPause, onResume, onCancel); | 90 _onPause = onPause, |
| 91 _onResume = onResume, |
| 92 _onCancel = onCancel { |
| 93 _stream = new _ControllerStream<T>(this); |
| 94 } |
| 95 |
| 96 Stream<T> get stream => _stream; |
74 | 97 |
75 /** | 98 /** |
76 * Returns a view of this object that only exposes the [EventSink] interface. | 99 * Returns a view of this object that only exposes the [EventSink] interface. |
77 */ | 100 */ |
78 EventSink<T> get sink => new _EventSinkView<T>(this); | 101 EventSink<T> get sink => new _EventSinkView<T>(this); |
79 | 102 |
80 /** | 103 /** |
| 104 * Whether a listener has existed and been cancelled. |
| 105 * |
| 106 * After this, adding more events will be ignored. |
| 107 */ |
| 108 bool get _isCancelled => (_state & _STATE_CANCELLED) != 0; |
| 109 |
| 110 /** |
81 * Whether the stream is closed for adding more events. | 111 * Whether the stream is closed for adding more events. |
82 * | 112 * |
83 * If true, the "done" event might not have fired yet, but it has been | 113 * If true, the "done" event might not have fired yet, but it has been |
84 * scheduled, and it is too late to add more events. | 114 * scheduled, and it is too late to add more events. |
85 */ | 115 */ |
86 bool get isClosed => stream._isClosed; | 116 bool get isClosed => (_state & _STATE_CLOSED) != 0; |
87 | 117 |
88 /** Whether one or more active subscribers have requested a pause. */ | 118 /** Whether the subscription is active and paused. */ |
89 bool get isPaused => stream._isInputPaused; | 119 bool get isPaused => _subscription != null && _subscription._isInputPaused; |
90 | 120 |
91 /** Whether there are currently any subscribers on this [Stream]. */ | 121 /** Whether there are currently a subscriber on the [Stream]. */ |
92 bool get hasListener => stream._hasListener; | 122 bool get hasListener => _subscription != null; |
93 | 123 |
94 /** | 124 /** |
95 * Send or queue a data event. | 125 * Send or queue a data event. |
96 */ | 126 */ |
97 void add(T value) => stream._add(value); | 127 void add(T value) { |
| 128 if (isClosed) throw new StateError("Adding event after close"); |
| 129 if (_subscription != null) { |
| 130 _subscription._add(value); |
| 131 } else if (!_isCancelled) { |
| 132 _addPendingEvent(new _DelayedData<T>(value)); |
| 133 } |
| 134 } |
98 | 135 |
99 /** | 136 /** |
100 * Send or enqueue an error event. | 137 * Send or enqueue an error event. |
101 * | |
102 * If a subscription has requested to be unsubscribed on errors, | |
103 * it will be unsubscribed after receiving this event. | |
104 */ | 138 */ |
105 void addError(Object error, [Object stackTrace]) { | 139 void addError(Object error, [Object stackTrace]) { |
| 140 if (isClosed) throw new StateError("Adding event after close"); |
106 if (stackTrace != null) { | 141 if (stackTrace != null) { |
107 // Force stack trace overwrite. Even if the error already contained | 142 // Force stack trace overwrite. Even if the error already contained |
108 // a stack trace. | 143 // a stack trace. |
109 _attachStackTrace(error, stackTrace); | 144 _attachStackTrace(error, stackTrace); |
110 } | 145 } |
111 stream._addError(error); | 146 if (_subscription != null) { |
| 147 _subscription._addError(error); |
| 148 } else if (!_isCancelled) { |
| 149 _addPendingEvent(new _DelayedError(error)); |
| 150 } |
112 } | 151 } |
113 | 152 |
114 /** | 153 /** |
115 * Send or enqueue a "done" message. | 154 * Closes this controller. |
116 * | 155 * |
117 * The "done" message should be sent at most once by a stream, and it | 156 * After closing, no further events may be added using [add] or [addError]. |
118 * should be the last message sent. | 157 * |
| 158 * You are allowed to close the controller more than once, but only the first |
| 159 * call has any effect. |
| 160 * |
| 161 * The first time a controller is closed, a "done" event is sent to its |
| 162 * stream. |
119 */ | 163 */ |
120 void close() { stream._close(); } | 164 void close() { |
| 165 if (isClosed) return; |
| 166 _state |= _STATE_CLOSED; |
| 167 if (_subscription != null) { |
| 168 _subscription._close(); |
| 169 } else if (!_isCancelled) { |
| 170 _addPendingEvent(const _DelayedDone()); |
| 171 } |
| 172 } |
| 173 |
| 174 void _addPendingEvent(_DelayedEvent event) { |
| 175 if (_isCancelled) return; |
| 176 _StreamImplEvents events = _pendingEvents; |
| 177 if (events == null) { |
| 178 events = _pendingEvents = new _StreamImplEvents(); |
| 179 } |
| 180 events.add(event); |
| 181 } |
| 182 |
| 183 void _recordListen(_BufferingStreamSubscription subscription) { |
| 184 assert(_subscription == null); |
| 185 _subscription = subscription; |
| 186 subscription._setPendingEvents(_pendingEvents); |
| 187 _pendingEvents = null; |
| 188 subscription._guardCallback(() { |
| 189 _runGuarded(_onListen); |
| 190 }); |
| 191 } |
| 192 |
| 193 void _recordCancel() { |
| 194 _subscription = null; |
| 195 _state |= _STATE_CANCELLED; |
| 196 _runGuarded(_onCancel); |
| 197 } |
| 198 |
| 199 void _recordPause() { |
| 200 _runGuarded(_onPause); |
| 201 } |
| 202 |
| 203 void _recordResume() { |
| 204 _runGuarded(_onResume); |
| 205 } |
121 } | 206 } |
122 | 207 |
123 typedef void _NotificationHandler(); | 208 typedef void _NotificationHandler(); |
124 | 209 |
125 class _SingleControllerStream<T> extends _SingleStreamImpl<T> { | 210 void _runGuarded(_NotificationHandler notificationHandler) { |
126 _NotificationHandler _onListen; | 211 if (notificationHandler == null) return; |
127 _NotificationHandler _onPause; | 212 try { |
128 _NotificationHandler _onResume; | 213 notificationHandler(); |
129 _NotificationHandler _onCancel; | 214 } catch (e, s) { |
| 215 _throwDelayed(e, s); |
| 216 } |
| 217 } |
130 | 218 |
131 // TODO(floitsch): share this code with _MultiControllerStream. | 219 class _ControllerStream<T> extends _StreamImpl<T> { |
132 _runGuarded(_NotificationHandler notificationHandler) { | 220 StreamController _controller; |
133 if (notificationHandler == null) return; | 221 bool _hasListener = false; |
134 try { | 222 |
135 notificationHandler(); | 223 _ControllerStream(this._controller); |
136 } catch (e, s) { | 224 |
137 _throwDelayed(e, s); | 225 StreamSubscription<T> _createSubscription( |
| 226 void onData(T data), |
| 227 void onError(Object error), |
| 228 void onDone(), |
| 229 bool cancelOnError) { |
| 230 if (_hasListener) { |
| 231 throw new StateError("The stream has already been listened to."); |
138 } | 232 } |
| 233 _hasListener = true; |
| 234 return new _ControllerSubscription<T>( |
| 235 _controller, onData, onError, onDone, cancelOnError); |
139 } | 236 } |
140 | 237 |
141 _SingleControllerStream(this._onListen, | 238 void _onListen(_BufferingStreamSubscription subscription) { |
142 this._onPause, | 239 _controller._recordListen(subscription); |
143 this._onResume, | 240 } |
144 this._onCancel); | 241 } |
145 | 242 |
146 void _onSubscriptionStateChange() { | 243 class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> { |
147 _runGuarded(_hasListener ? _onListen : _onCancel); | 244 final StreamController _controller; |
| 245 |
| 246 _ControllerSubscription(StreamController controller, |
| 247 void onData(T data), |
| 248 void onError(Object error), |
| 249 void onDone(), |
| 250 bool cancelOnError) |
| 251 : _controller = controller, |
| 252 super(onData, onError, onDone, cancelOnError); |
| 253 |
| 254 void _onCancel() { |
| 255 _controller._recordCancel(); |
148 } | 256 } |
149 | 257 |
150 void _onPauseStateChange() { | 258 void _onPause() { |
151 _runGuarded(_isPaused ? _onPause : _onResume); | 259 _controller._recordPause(); |
| 260 } |
| 261 |
| 262 void _onResume() { |
| 263 _controller._recordResume(); |
152 } | 264 } |
153 } | 265 } |
OLD | NEW |