OLD | NEW |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
| 7 // States shared by single/multi stream implementations. |
| 8 |
| 9 // Completion state of the stream. |
| 10 /// Initial and default state where the stream can receive and send events. |
| 11 const int _STREAM_OPEN = 0; |
| 12 /// The stream has received a request to complete, but hasn't done so yet. |
| 13 /// No further events can be added to the stream. |
| 14 const int _STREAM_CLOSED = 1; |
| 15 /// The stream has completed and will no longer receive or send events. |
| 16 /// Also counts as closed. The stream must not be paused when it's completed. |
| 17 /// Always used in conjunction with [_STREAM_CLOSED]. |
| 18 const int _STREAM_COMPLETE = 2; |
| 19 |
| 20 /// Bit that alternates between events, and listeners are updated to the |
| 21 /// current value when they are notified of the event. |
| 22 const int _STREAM_EVENT_ID = 4; |
| 23 const int _STREAM_EVENT_ID_SHIFT = 2; |
| 24 |
| 25 // The activity state of the stream: What is it currently doing. |
| 26 /// Bit set while firing and clear while not. |
| 27 const int _STREAM_FIRING = 8; |
| 28 /// Bit set while calling a pause-state or subscription-state change callback. |
| 29 const int _STREAM_CALLBACK = 16; |
| 30 |
| 31 // The pause state of the stream. |
| 32 /// Bit set when resuming with pending events. Cleared after all pending events |
| 33 /// have been transmitted. Means that the controller still considers the |
| 34 /// stream paused, even if the listener doesn't. |
| 35 const int _STREAM_PENDING_RESUME = 32; |
| 36 /// The count of times a stream has paused is stored in the |
| 37 /// state, shifted by this amount. |
| 38 const int _STREAM_PAUSE_COUNT_SHIFT = 6; |
| 39 |
| 40 // States for listeners. |
| 41 |
| 42 /// The listener is currently not subscribed to its source stream. |
| 43 const int _LISTENER_UNSUBSCRIBED = 0; |
| 44 /// The listener is actively subscribed to its source stream. |
| 45 const int _LISTENER_SUBSCRIBED = 1; |
| 46 /// The listener is subscribed until it has been notified of the current event. |
| 47 /// This flag bit is always used in conjuction with [_LISTENER_SUBSCRIBED]. |
| 48 const int _LISTENER_PENDING_UNSUBSCRIBE = 2; |
| 49 |
| 50 /// Bit that contains the last sent event's "id bit". |
| 51 const int _LISTENER_EVENT_ID = 4; |
| 52 const int _LISTENER_EVENT_ID_SHIFT = 2; |
| 53 |
| 54 /// The count of times a listener has paused is stored in the |
| 55 /// state, shifted by this amount. |
| 56 const int _LISTENER_PAUSE_COUNT_SHIFT = 3; |
| 57 |
7 /** Throws the given error in the next cycle. */ | 58 /** Throws the given error in the next cycle. */ |
8 _throwDelayed(var error, [Object stackTrace]) { | 59 _throwDelayed(var error, [Object stackTrace]) { |
9 // We are going to reach the top-level here, but there might be a global | 60 // We are going to reach the top-level here, but there might be a global |
10 // exception handler. This means that we shouldn't print the stack trace. | 61 // exception handler. This means that we shouldn't print the stack trace. |
11 // TODO(floitsch): Find better solution that doesn't print the stack trace | 62 // TODO(floitsch): Find better solution that doesn't print the stack trace |
12 // if there is a global exception handler. | 63 // if there is a global exception handler. |
13 runAsync(() { | 64 runAsync(() { |
14 if (stackTrace != null) print(stackTrace); | 65 if (stackTrace != null) print(stackTrace); |
15 var trace = getAttachedStackTrace(error); | 66 var trace = getAttachedStackTrace(error); |
16 if (trace != null && trace != stackTrace) print(trace); | 67 if (trace != null && trace != stackTrace) print(trace); |
17 throw error; | 68 throw error; |
18 }); | 69 }); |
19 } | 70 } |
20 | 71 |
21 /** Abstract and private interface for a place to put events. */ | |
22 abstract class _EventSink<T> { | |
23 void _add(T data); | |
24 void _addError(Object error); | |
25 void _close(); | |
26 } | |
27 | |
28 /** | |
29 * Abstract and private interface for a place to send events. | |
30 * | |
31 * Used by event buffering to finally dispatch the pending event, where | |
32 * [_EventSink] is where the event first enters the stream subscription, | |
33 * and may yet be buffered. | |
34 */ | |
35 abstract class _EventDispatch<T> { | |
36 void _sendData(T data); | |
37 void _sendError(Object error); | |
38 void _sendDone(); | |
39 } | |
40 | |
41 /** | |
42 * Default implementation of stream subscription of buffering events. | |
43 * | |
44 * The only public methods are those of [StreamSubscription], so instances of | |
45 * [_BufferingStreamSubscription] can be returned directly as a | |
46 * [StreamSubscription] without exposing internal functionality. | |
47 * | |
48 * The [StreamController] is a public facing version of [Stream] and this class, | |
49 * with some methods made public. | |
50 * | |
51 * The user interface of [_BufferingStreamSubscription] are the following | |
52 * methods: | |
53 * * [_add]: Add a data event to the stream. | |
54 * * [_addError]: Add an error event to the stream. | |
55 * * [_close]: Request to close the stream. | |
56 * * [_onCancel]: Called when the subscription will provide no more events, | |
57 * either due to being actively canceled, or after sending a done event. | |
58 * * [_onPause]: Called when the subscription wants the event source to pause. | |
59 * * [_onResume]: Called when allowing new events after a pause. | |
60 * The user should not add new events when the subscription requests a paused, | |
61 * but if it happens anyway, the subscription will enqueue the events just as | |
62 * when new events arrive while still firing an old event. | |
63 */ | |
64 class _BufferingStreamSubscription<T> implements StreamSubscription<T>, | |
65 _EventSink<T>, | |
66 _EventDispatch<T> { | |
67 /** The `cancelOnError` flag from the `listen` call. */ | |
68 static const int _STATE_CANCEL_ON_ERROR = 1; | |
69 /** | |
70 * Whether the "done" event has been received. | |
71 * No further events are accepted after this. | |
72 */ | |
73 static const int _STATE_CLOSED = 2; | |
74 /** | |
75 * Set if the input has been asked not to send events. | |
76 * | |
77 * This is not the same as being paused, since the input will remain paused | |
78 * after a call to [resume] if there are pending events. | |
79 */ | |
80 static const int _STATE_INPUT_PAUSED = 4; | |
81 /** | |
82 * Whether the subscription has been canceled. | |
83 * | |
84 * Set by calling [cancel], or by handling a "done" event, or an "error" event | |
85 * when `cancelOnError` is true. | |
86 */ | |
87 static const int _STATE_CANCELED = 8; | |
88 static const int _STATE_IN_CALLBACK = 16; | |
89 static const int _STATE_HAS_PENDING = 32; | |
90 static const int _STATE_PAUSE_COUNT = 64; | |
91 static const int _STATE_PAUSE_COUNT_SHIFT = 6; | |
92 | |
93 /* Event handlers provided in constructor. */ | |
94 /* TODO(7733): Fix Function->_DataHandler<T> when dart2js understands | |
95 * parameterized function types. */ | |
96 Function _onData; | |
97 _ErrorHandler _onError; | |
98 _DoneHandler _onDone; | |
99 | |
100 /** Bit vector based on state-constants above. */ | |
101 int _state; | |
102 | |
103 /** | |
104 * Queue of pending events. | |
105 * | |
106 * Is created when necessary, or set in constructor for preconfigured events. | |
107 */ | |
108 _PendingEvents _pending; | |
109 | |
110 _BufferingStreamSubscription(this._onData, | |
111 this._onError, | |
112 this._onDone, | |
113 bool cancelOnError) | |
114 : _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0) { | |
115 assert(_onData != null); | |
116 assert(_onError != null); | |
117 assert(_onDone != null); | |
118 } | |
119 | |
120 /** | |
121 * Sets the subscription's pending events object. | |
122 * | |
123 * This can only be done once. The pending events object is used for the | |
124 * rest of the subscription's life cycle. | |
125 */ | |
126 void _setPendingEvents(_PendingEvents pendingEvents) { | |
127 assert(_pending == null); | |
128 if (pendingEvents == null) return; | |
129 _pending = pendingEvents; | |
130 if (!pendingEvents.isEmpty) { | |
131 _state |= _STATE_HAS_PENDING; | |
132 _pending.schedule(this); | |
133 } | |
134 } | |
135 | |
136 /** | |
137 * Extracts the pending events from a canceled stream. | |
138 * | |
139 * This can only be done during the [_onCancel] method call. After that, | |
140 * any remaining pending events will be cleared. | |
141 */ | |
142 _PendingEvents _extractPending() { | |
143 assert(_isCanceled); | |
144 _PendingEvents events = _pending; | |
145 _pending = null; | |
146 return events; | |
147 } | |
148 | |
149 // StreamSubscription interface. | |
150 | |
151 void onData(void handleData(T event)) { | |
152 if (handleData == null) handleData = _nullDataHandler; | |
153 _onData = handleData; | |
154 } | |
155 | |
156 void onError(void handleError(error)) { | |
157 if (handleError == null) handleError = _nullErrorHandler; | |
158 _onError = handleError; | |
159 } | |
160 | |
161 void onDone(void handleDone()) { | |
162 if (handleDone == null) handleDone = _nullDoneHandler; | |
163 _onDone = handleDone; | |
164 } | |
165 | |
166 void pause([Future resumeSignal]) { | |
167 if (_isCanceled) return; | |
168 bool wasPaused = _isPaused; | |
169 bool wasInputPaused = _isInputPaused; | |
170 // Increment pause count and mark input paused (if it isn't already). | |
171 _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED; | |
172 if (resumeSignal != null) resumeSignal.whenComplete(resume); | |
173 if (!wasPaused && _pending != null) _pending.cancelSchedule(); | |
174 if (!wasInputPaused && !_inCallback) _guardCallback(_onPause); | |
175 } | |
176 | |
177 void resume() { | |
178 if (_isCanceled) return; | |
179 if (_isPaused) { | |
180 _decrementPauseCount(); | |
181 if (!_isPaused) { | |
182 if (_hasPending && !_pending.isEmpty) { | |
183 // Input is still paused. | |
184 _pending.schedule(this); | |
185 } else { | |
186 assert(_mayResumeInput); | |
187 _state &= ~_STATE_INPUT_PAUSED; | |
188 if (!_inCallback) _guardCallback(_onResume); | |
189 } | |
190 } | |
191 } | |
192 } | |
193 | |
194 void cancel() { | |
195 if (_isCanceled) return; | |
196 _cancel(); | |
197 if (!_inCallback) { | |
198 // otherwise checkState will be called after firing or callback completes. | |
199 _state |= _STATE_IN_CALLBACK; | |
200 _onCancel(); | |
201 _pending = null; | |
202 _state &= ~_STATE_IN_CALLBACK; | |
203 } | |
204 } | |
205 | |
206 Future asFuture([var futureValue]) { | |
207 _FutureImpl<T> result = new _FutureImpl<T>(); | |
208 | |
209 // Overwrite the onDone and onError handlers. | |
210 _onDone = () { result._setValue(futureValue); }; | |
211 _onError = (error) { | |
212 cancel(); | |
213 result._setError(error); | |
214 }; | |
215 | |
216 return result; | |
217 } | |
218 | |
219 // State management. | |
220 | |
221 bool get _isInputPaused => (_state & _STATE_INPUT_PAUSED) != 0; | |
222 bool get _isClosed => (_state & _STATE_CLOSED) != 0; | |
223 bool get _isCanceled => (_state & _STATE_CANCELED) != 0; | |
224 bool get _inCallback => (_state & _STATE_IN_CALLBACK) != 0; | |
225 bool get _hasPending => (_state & _STATE_HAS_PENDING) != 0; | |
226 bool get _isPaused => _state >= _STATE_PAUSE_COUNT; | |
227 bool get _canFire => _state < _STATE_IN_CALLBACK; | |
228 bool get _mayResumeInput => | |
229 !_isPaused && (_pending == null || _pending.isEmpty); | |
230 bool get _cancelOnError => (_state & _STATE_CANCEL_ON_ERROR) != 0; | |
231 | |
232 bool get isPaused => _isPaused; | |
233 | |
234 void _cancel() { | |
235 _state |= _STATE_CANCELED; | |
236 if (_hasPending) { | |
237 _pending.cancelSchedule(); | |
238 } | |
239 } | |
240 | |
241 /** | |
242 * Increment the pause count. | |
243 * | |
244 * Also marks input as paused. | |
245 */ | |
246 void _incrementPauseCount() { | |
247 _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED; | |
248 } | |
249 | |
250 /** | |
251 * Decrements the pause count. | |
252 * | |
253 * Does not automatically unpause the input (call [_onResume]) when | |
254 * the pause count reaches zero. This is handled elsewhere, and only | |
255 * if there are no pending events buffered. | |
256 */ | |
257 void _decrementPauseCount() { | |
258 assert(_isPaused); | |
259 _state -= _STATE_PAUSE_COUNT; | |
260 } | |
261 | |
262 // _EventSink interface. | |
263 | |
264 void _add(T data) { | |
265 assert(!_isClosed); | |
266 if (_isCanceled) return; | |
267 if (_canFire) { | |
268 _sendData(data); | |
269 } else { | |
270 _addPending(new _DelayedData(data)); | |
271 } | |
272 } | |
273 | |
274 void _addError(Object error) { | |
275 if (_isCanceled) return; | |
276 if (_canFire) { | |
277 _sendError(error); // Reports cancel after sending. | |
278 } else { | |
279 _addPending(new _DelayedError(error)); | |
280 } | |
281 } | |
282 | |
283 void _close() { | |
284 assert(!_isClosed); | |
285 if (_isCanceled) return; | |
286 _state |= _STATE_CLOSED; | |
287 if (_canFire) { | |
288 _sendDone(); | |
289 } else { | |
290 _addPending(const _DelayedDone()); | |
291 } | |
292 } | |
293 | |
294 // Hooks called when the input is paused, unpaused or canceled. | |
295 // These must not throw. If overwritten to call user code, include suitable | |
296 // try/catch wrapping and send any errors to [_throwDelayed]. | |
297 void _onPause() { | |
298 assert(_isInputPaused); | |
299 } | |
300 | |
301 void _onResume() { | |
302 assert(!_isInputPaused); | |
303 } | |
304 | |
305 void _onCancel() { | |
306 assert(_isCanceled); | |
307 } | |
308 | |
309 // Handle pending events. | |
310 | |
311 /** | |
312 * Add a pending event. | |
313 * | |
314 * If the subscription is not paused, this also schedules a firing | |
315 * of pending events later (if necessary). | |
316 */ | |
317 void _addPending(_DelayedEvent event) { | |
318 _StreamImplEvents pending = _pending; | |
319 if (_pending == null) pending = _pending = new _StreamImplEvents(); | |
320 pending.add(event); | |
321 if (!_hasPending) { | |
322 _state |= _STATE_HAS_PENDING; | |
323 if (!_isPaused) { | |
324 _pending.schedule(this); | |
325 } | |
326 } | |
327 } | |
328 | |
329 /* _EventDispatch interface. */ | |
330 | |
331 void _sendData(T data) { | |
332 assert(!_isCanceled); | |
333 assert(!_isPaused); | |
334 assert(!_inCallback); | |
335 bool wasInputPaused = _isInputPaused; | |
336 _state |= _STATE_IN_CALLBACK; | |
337 try { | |
338 _onData(data); | |
339 } catch (e, s) { | |
340 _throwDelayed(e, s); | |
341 } | |
342 _state &= ~_STATE_IN_CALLBACK; | |
343 _checkState(wasInputPaused); | |
344 } | |
345 | |
346 void _sendError(var error) { | |
347 assert(!_isCanceled); | |
348 assert(!_isPaused); | |
349 assert(!_inCallback); | |
350 bool wasInputPaused = _isInputPaused; | |
351 _state |= _STATE_IN_CALLBACK; | |
352 try { | |
353 _onError(error); | |
354 } catch (e, s) { | |
355 _throwDelayed(e, s); | |
356 } | |
357 _state &= ~_STATE_IN_CALLBACK; | |
358 if (_cancelOnError) { | |
359 _cancel(); | |
360 } | |
361 _checkState(wasInputPaused); | |
362 } | |
363 | |
364 void _sendDone() { | |
365 assert(!_isCanceled); | |
366 assert(!_isPaused); | |
367 assert(!_inCallback); | |
368 _state |= (_STATE_CANCELED | _STATE_CLOSED | _STATE_IN_CALLBACK); | |
369 try { | |
370 _onDone(); | |
371 } catch (e, s) { | |
372 _throwDelayed(e, s); | |
373 } | |
374 _onCancel(); // No checkState after cancel, it is always the last event. | |
375 _state &= ~_STATE_IN_CALLBACK; | |
376 } | |
377 | |
378 /** | |
379 * Call a hook function. | |
380 * | |
381 * The call is properly wrapped in code to avoid other callbacks | |
382 * during the call, and it checks for state changes after the call | |
383 * that should cause further callbacks. | |
384 */ | |
385 void _guardCallback(callback) { | |
386 assert(!_inCallback); | |
387 bool wasInputPaused = _isInputPaused; | |
388 _state |= _STATE_IN_CALLBACK; | |
389 callback(); | |
390 _state &= ~_STATE_IN_CALLBACK; | |
391 _checkState(wasInputPaused); | |
392 } | |
393 | |
394 /** | |
395 * Check if the input needs to be informed of state changes. | |
396 * | |
397 * State changes are pausing, resuming and canceling. | |
398 * | |
399 * After canceling, no further callbacks will happen. | |
400 * | |
401 * The cancel callback is called after a user cancel, or after | |
402 * the final done event is sent. | |
403 */ | |
404 void _checkState(bool wasInputPaused) { | |
405 assert(!_inCallback); | |
406 if (_hasPending && _pending.isEmpty) { | |
407 _state &= ~_STATE_HAS_PENDING; | |
408 if (_isInputPaused && _mayResumeInput) { | |
409 _state &= ~_STATE_INPUT_PAUSED; | |
410 } | |
411 } | |
412 // If the state changes during a callback, we immediately | |
413 // make a new state-change callback. Loop until the state didn't change. | |
414 while (true) { | |
415 if (_isCanceled) { | |
416 _onCancel(); | |
417 _pending = null; | |
418 return; | |
419 } | |
420 bool isInputPaused = _isInputPaused; | |
421 if (wasInputPaused == isInputPaused) break; | |
422 _state ^= _STATE_IN_CALLBACK; | |
423 if (isInputPaused) { | |
424 _onPause(); | |
425 } else { | |
426 _onResume(); | |
427 } | |
428 _state &= ~_STATE_IN_CALLBACK; | |
429 wasInputPaused = isInputPaused; | |
430 } | |
431 if (_hasPending && !_isPaused) { | |
432 _pending.schedule(this); | |
433 } | |
434 } | |
435 } | |
436 | 72 |
437 // ------------------------------------------------------------------- | 73 // ------------------------------------------------------------------- |
438 // Common base class for single and multi-subscription streams. | 74 // Common base class for single and multi-subscription streams. |
439 // ------------------------------------------------------------------- | 75 // ------------------------------------------------------------------- |
440 abstract class _StreamImpl<T> extends Stream<T> { | 76 abstract class _StreamImpl<T> extends Stream<T> { |
| 77 /** Current state of the stream. */ |
| 78 int _state = _STREAM_OPEN; |
| 79 |
| 80 /** |
| 81 * List of pending events. |
| 82 * |
| 83 * If events are added to the stream (using [_add], [_addError] or [_done]) |
| 84 * while the stream is paused, or while another event is firing, events will |
| 85 * stored here. |
| 86 * Also supports scheduling the events for later execution. |
| 87 */ |
| 88 _PendingEvents _pendingEvents; |
| 89 |
441 // ------------------------------------------------------------------ | 90 // ------------------------------------------------------------------ |
442 // Stream interface. | 91 // Stream interface. |
443 | 92 |
444 StreamSubscription<T> listen(void onData(T data), | 93 StreamSubscription<T> listen(void onData(T data), |
445 { void onError(error), | 94 { void onError(error), |
446 void onDone(), | 95 void onDone(), |
447 bool cancelOnError }) { | 96 bool cancelOnError }) { |
| 97 if (_isComplete) { |
| 98 return new _DoneSubscription(onDone); |
| 99 } |
448 if (onData == null) onData = _nullDataHandler; | 100 if (onData == null) onData = _nullDataHandler; |
449 if (onError == null) onError = _nullErrorHandler; | 101 if (onError == null) onError = _nullErrorHandler; |
450 if (onDone == null) onDone = _nullDoneHandler; | 102 if (onDone == null) onDone = _nullDoneHandler; |
451 cancelOnError = identical(true, cancelOnError); | 103 cancelOnError = identical(true, cancelOnError); |
452 StreamSubscription subscription = | 104 _StreamSubscriptionImpl subscription = |
453 _createSubscription(onData, onError, onDone, cancelOnError); | 105 _createSubscription(onData, onError, onDone, cancelOnError); |
454 _onListen(subscription); | 106 _addListener(subscription); |
455 return subscription; | 107 return subscription; |
456 } | 108 } |
457 | 109 |
| 110 // ------------------------------------------------------------------ |
| 111 // EventSink interface-like methods for sending events into the stream. |
| 112 // It's the responsibility of the caller to ensure that the stream is not |
| 113 // paused when adding events. If the stream is paused, the events will be |
| 114 // queued, but it's better to not send events at all. |
| 115 |
| 116 /** |
| 117 * Send or queue a data event. |
| 118 */ |
| 119 void _add(T value) { |
| 120 if (_isClosed) throw new StateError("Sending on closed stream"); |
| 121 if (!_mayFireState) { |
| 122 // Not the time to send events. |
| 123 _addPendingEvent(new _DelayedData<T>(value)); |
| 124 return; |
| 125 } |
| 126 if (_hasPendingEvent) { |
| 127 _addPendingEvent(new _DelayedData<T>(value)); |
| 128 } else { |
| 129 _sendData(value); |
| 130 } |
| 131 _handlePendingEvents(); |
| 132 } |
| 133 |
| 134 /** |
| 135 * Send or enqueue an error event. |
| 136 * |
| 137 * If a subscription has requested to be unsubscribed on errors, |
| 138 * it will be unsubscribed after receiving this event. |
| 139 */ |
| 140 void _addError(error) { |
| 141 if (_isClosed) throw new StateError("Sending on closed stream"); |
| 142 if (!_mayFireState) { |
| 143 // Not the time to send events. |
| 144 _addPendingEvent(new _DelayedError(error)); |
| 145 return; |
| 146 } |
| 147 if (_hasPendingEvent) { |
| 148 _addPendingEvent(new _DelayedError(error)); |
| 149 } else { |
| 150 _sendError(error); |
| 151 } |
| 152 _handlePendingEvents(); |
| 153 } |
| 154 |
| 155 /** |
| 156 * Send or enqueue a "done" message. |
| 157 * |
| 158 * The "done" message should be sent at most once by a stream, and it |
| 159 * should be the last message sent. |
| 160 */ |
| 161 void _close() { |
| 162 if (_isClosed) return; |
| 163 _state |= _STREAM_CLOSED; |
| 164 if (!_mayFireState) { |
| 165 // Not the time to send events. |
| 166 _addPendingEvent(const _DelayedDone()); |
| 167 return; |
| 168 } |
| 169 if (_hasPendingEvent) { |
| 170 _addPendingEvent(new _DelayedDone()); |
| 171 _handlePendingEvents(); |
| 172 } else { |
| 173 _sendDone(); |
| 174 assert(_isComplete); |
| 175 assert(!_hasPendingEvent); |
| 176 } |
| 177 } |
| 178 |
458 // ------------------------------------------------------------------- | 179 // ------------------------------------------------------------------- |
| 180 // Internal implementation. |
| 181 |
| 182 // State predicates. |
| 183 |
| 184 // Lifecycle state. |
| 185 /** Whether the stream is in the default, open, state for events. */ |
| 186 bool get _isOpen => (_state & (_STREAM_CLOSED | _STREAM_COMPLETE)) == 0; |
| 187 |
| 188 /** Whether the stream has been closed (a done event requested). */ |
| 189 bool get _isClosed => (_state & _STREAM_CLOSED) != 0; |
| 190 |
| 191 /** Whether the stream is completed. */ |
| 192 bool get _isComplete => (_state & _STREAM_COMPLETE) != 0; |
| 193 |
| 194 // Pause state. |
| 195 |
| 196 /** Whether one or more active subscribers have requested a pause. */ |
| 197 bool get _isPaused => _state >= (1 << _STREAM_PAUSE_COUNT_SHIFT); |
| 198 |
| 199 /** How many times the stream has been paused. */ |
| 200 int get _pauseCount => _state >> _STREAM_PAUSE_COUNT_SHIFT; |
| 201 |
| 202 /** |
| 203 * Whether a controller thinks the stream is paused. |
| 204 * |
| 205 * When this changes, a pause-state change callback is performed. |
| 206 * |
| 207 * It may differ from [_isPaused] if there are pending events |
| 208 * in the queue when the listeners resume. The controller won't |
| 209 * be informed until all queued events have been fired. |
| 210 */ |
| 211 bool get _isInputPaused => _state >= (_STREAM_PENDING_RESUME); |
| 212 |
| 213 /** Whether we have a pending resume scheduled. */ |
| 214 bool get _hasPendingResume => (_state & _STREAM_PENDING_RESUME) != 0; |
| 215 |
| 216 |
| 217 // Action state. If the stream makes a call-out to external code, |
| 218 // this state tracks it and avoids reentrancy problems. |
| 219 |
| 220 /** Whether the stream is not currently firing or calling a callback. */ |
| 221 bool get _isInactive => (_state & (_STREAM_CALLBACK | _STREAM_FIRING)) == 0; |
| 222 |
| 223 /** Whether we are currently executing a state-chance callback. */ |
| 224 bool get _isInCallback => (_state & _STREAM_CALLBACK) != 0; |
| 225 |
| 226 /** Whether we are currently firing an event. */ |
| 227 bool get _isFiring => (_state & _STREAM_FIRING) != 0; |
| 228 |
| 229 /** Check whether the pending event queue is non-empty */ |
| 230 bool get _hasPendingEvent => |
| 231 _pendingEvents != null && !_pendingEvents.isEmpty; |
| 232 |
| 233 /** |
| 234 * The bit representing the current or last event fired. |
| 235 * |
| 236 * This bit matches a bit on listeners that have received the corresponding |
| 237 * event. It is toggled for each new event being fired. |
| 238 */ |
| 239 int get _currentEventIdBit => |
| 240 (_state & _STREAM_EVENT_ID ) >> _STREAM_EVENT_ID_SHIFT; |
| 241 |
| 242 /** Whether there is currently a subscriber on this [Stream]. */ |
| 243 bool get _hasListener; |
| 244 |
| 245 |
| 246 /** Whether the state bits allow firing. */ |
| 247 bool get _mayFireState { |
| 248 // The state allows firing unless: |
| 249 // - it's currently firing |
| 250 // - it's currently in a callback |
| 251 // - it's paused |
| 252 const int mask = |
| 253 _STREAM_FIRING | |
| 254 _STREAM_CALLBACK | |
| 255 ~((1 << _STREAM_PAUSE_COUNT_SHIFT) - 1); |
| 256 return (_state & mask) == 0; |
| 257 } |
| 258 |
| 259 // State modification. |
| 260 |
| 261 /** Record an increases in the number of times the listener has paused. */ |
| 262 void _incrementPauseCount(_StreamListener<T> listener) { |
| 263 listener._incrementPauseCount(); |
| 264 _state &= ~_STREAM_PENDING_RESUME; |
| 265 _updatePauseCount(1); |
| 266 } |
| 267 |
| 268 /** Record a decrease in the number of times the listener has paused. */ |
| 269 void _decrementPauseCount(_StreamListener<T> listener) { |
| 270 assert(_isPaused); |
| 271 listener._decrementPauseCount(); |
| 272 _updatePauseCount(-1); |
| 273 } |
| 274 |
| 275 /** Update the stream's own pause count only. */ |
| 276 void _updatePauseCount(int by) { |
| 277 int oldState = _state; |
| 278 // We can't just _state += by << _STREAM_PAUSE_COUNT_SHIFT, since dart2js |
| 279 // converts the result of the left-shift to a positive number. |
| 280 if (by >= 0) { |
| 281 _state = oldState + (by << _STREAM_PAUSE_COUNT_SHIFT); |
| 282 } else { |
| 283 _state = oldState - ((-by) << _STREAM_PAUSE_COUNT_SHIFT); |
| 284 } |
| 285 assert(_state >= 0); |
| 286 assert((_state >> _STREAM_PAUSE_COUNT_SHIFT) == |
| 287 (oldState >> _STREAM_PAUSE_COUNT_SHIFT) + by); |
| 288 } |
| 289 |
| 290 void _setClosed() { |
| 291 assert(!_isClosed); |
| 292 _state |= _STREAM_CLOSED; |
| 293 } |
| 294 |
| 295 void _setComplete() { |
| 296 assert(_isClosed); |
| 297 _state = _state |_STREAM_COMPLETE; |
| 298 } |
| 299 |
| 300 void _startFiring() { |
| 301 assert(!_isFiring); |
| 302 assert(!_isInCallback); |
| 303 assert(_hasListener); |
| 304 assert(!_isPaused); |
| 305 // This sets the _STREAM_FIRING bit and toggles the _STREAM_EVENT_ID |
| 306 // bit. All current subscribers will now have a _LISTENER_EVENT_ID |
| 307 // that doesn't match _STREAM_EVENT_ID, and they will receive the |
| 308 // event being fired. |
| 309 _state ^= _STREAM_FIRING | _STREAM_EVENT_ID; |
| 310 } |
| 311 |
| 312 void _endFiring(bool wasInputPaused) { |
| 313 assert(_isFiring); |
| 314 _state ^= _STREAM_FIRING; |
| 315 // Had listeners, or we wouldn't have fired. |
| 316 _checkCallbacks(true, wasInputPaused); |
| 317 } |
| 318 |
| 319 /** |
| 320 * Record that a listener wants a pause from events. |
| 321 * |
| 322 * This methods is called from [_StreamListener.pause()]. |
| 323 * Subclasses can override this method, along with [isPaused] and |
| 324 * [createSubscription], if they want to do a different handling of paused |
| 325 * subscriptions, e.g., a filtering stream pausing its own source if all its |
| 326 * subscribers are paused. |
| 327 */ |
| 328 void _pause(_StreamListener<T> listener, Future resumeSignal) { |
| 329 assert(identical(listener._source, this)); |
| 330 if (!listener._isSubscribed) { |
| 331 throw new StateError("Subscription has been canceled."); |
| 332 } |
| 333 assert(!_isComplete); // There can be no subscribers when complete. |
| 334 bool wasInputPaused = _isInputPaused; |
| 335 bool wasPaused = _isPaused; |
| 336 _incrementPauseCount(listener); |
| 337 if (resumeSignal != null) { |
| 338 resumeSignal.whenComplete(() { this._resume(listener, true); }); |
| 339 } |
| 340 if (!wasPaused && _hasPendingEvent && _pendingEvents.isScheduled) { |
| 341 _pendingEvents.cancelSchedule(); |
| 342 } |
| 343 if (_isInactive && !wasInputPaused) { |
| 344 _checkCallbacks(true, false); |
| 345 if (!_isPaused && _hasPendingEvent) { |
| 346 _schedulePendingEvents(); |
| 347 } |
| 348 } |
| 349 } |
| 350 |
| 351 /** Stops pausing due to one request from the given listener. */ |
| 352 void _resume(_StreamListener<T> listener, bool fromEvent) { |
| 353 if (!listener.isPaused) return; |
| 354 assert(listener._isSubscribed); |
| 355 assert(_isPaused); |
| 356 _decrementPauseCount(listener); |
| 357 if (!_isPaused) { |
| 358 if (_hasPendingEvent) { |
| 359 _state |= _STREAM_PENDING_RESUME; |
| 360 // Controller's pause state hasn't changed. |
| 361 // If we can fire events now, fire any pending events right away. |
| 362 if (_isInactive) { |
| 363 if (fromEvent) { |
| 364 _handlePendingEvents(); |
| 365 } else { |
| 366 _schedulePendingEvents(); |
| 367 } |
| 368 } |
| 369 } else if (_isInactive) { |
| 370 _checkCallbacks(true, true); |
| 371 if (!_isPaused && _hasPendingEvent) { |
| 372 if (fromEvent) { |
| 373 _handlePendingEvents(); |
| 374 } else { |
| 375 _schedulePendingEvents(); |
| 376 } |
| 377 } |
| 378 } |
| 379 } |
| 380 } |
| 381 |
| 382 /** Schedule pending events to be executed. */ |
| 383 void _schedulePendingEvents() { |
| 384 assert(_hasPendingEvent); |
| 385 _pendingEvents.schedule(this); |
| 386 } |
| 387 |
459 /** Create a subscription object. Called by [subcribe]. */ | 388 /** Create a subscription object. Called by [subcribe]. */ |
460 _BufferingStreamSubscription<T> _createSubscription( | 389 _StreamSubscriptionImpl<T> _createSubscription( |
| 390 void onData(T data), |
| 391 void onError(error), |
| 392 void onDone(), |
| 393 bool cancelOnError); |
| 394 |
| 395 /** |
| 396 * Adds a listener to this stream. |
| 397 */ |
| 398 void _addListener(_StreamSubscriptionImpl subscription); |
| 399 |
| 400 /** |
| 401 * Handle a cancel requested from a [_StreamSubscriptionImpl]. |
| 402 * |
| 403 * This method is called from [_StreamSubscriptionImpl.cancel]. |
| 404 * |
| 405 * If an event is currently firing, the cancel is delayed |
| 406 * until after the subscribers have received the event. |
| 407 */ |
| 408 void _cancel(_StreamSubscriptionImpl subscriber); |
| 409 |
| 410 /** |
| 411 * Iterate over all current subscribers and perform an action on each. |
| 412 * |
| 413 * Subscribers added during the iteration will not be visited. |
| 414 * Subscribers unsubscribed during the iteration will only be removed |
| 415 * after they have been acted on. |
| 416 * |
| 417 * Any change in the pause state is only reported after all subscribers have |
| 418 * received the event. |
| 419 * |
| 420 * The [action] must not throw, or the controller will be left in an |
| 421 * invalid state. |
| 422 * |
| 423 * This method must not be called while [isFiring] is true. |
| 424 */ |
| 425 void _forEachSubscriber(void action(_StreamSubscriptionImpl<T> subscription)); |
| 426 |
| 427 /** |
| 428 * Checks whether the subscription/pause state has changed. |
| 429 * |
| 430 * Calls the appropriate callback if the state has changed from the |
| 431 * provided one. Repeats calling callbacks as long as the call changes |
| 432 * the state. |
| 433 */ |
| 434 void _checkCallbacks(bool hadListener, bool wasPaused) { |
| 435 assert(!_isFiring); |
| 436 // Will be handled after the current callback. |
| 437 if (_isInCallback) return; |
| 438 if (_hasPendingResume && !_hasPendingEvent) { |
| 439 _state ^= _STREAM_PENDING_RESUME; |
| 440 } |
| 441 _state |= _STREAM_CALLBACK; |
| 442 while (true) { |
| 443 bool hasListener = _hasListener; |
| 444 bool isPaused = _isInputPaused; |
| 445 if (hadListener != hasListener) { |
| 446 _onSubscriptionStateChange(); |
| 447 } else if (isPaused != wasPaused) { |
| 448 _onPauseStateChange(); |
| 449 } else { |
| 450 _state ^= _STREAM_CALLBACK; |
| 451 return; |
| 452 } |
| 453 wasPaused = isPaused; |
| 454 hadListener = hasListener; |
| 455 } |
| 456 } |
| 457 |
| 458 /** |
| 459 * Called when the first subscriber requests a pause or the last a resume. |
| 460 * |
| 461 * Read [isPaused] to see the new state. |
| 462 */ |
| 463 void _onPauseStateChange() {} |
| 464 |
| 465 /** |
| 466 * Called when the first listener subscribes or the last unsubscribes. |
| 467 * |
| 468 * Read [hasListener] to see what the new state is. |
| 469 */ |
| 470 void _onSubscriptionStateChange() {} |
| 471 |
| 472 /** |
| 473 * Add a pending event at the end of the pending event queue. |
| 474 * |
| 475 * Schedules events if currently not paused and inside a callback. |
| 476 */ |
| 477 void _addPendingEvent(_DelayedEvent event) { |
| 478 if (_pendingEvents == null) _pendingEvents = new _StreamImplEvents(); |
| 479 _StreamImplEvents events = _pendingEvents; |
| 480 events.add(event); |
| 481 if (_isPaused || _isFiring) return; |
| 482 if (_isInCallback) { |
| 483 _schedulePendingEvents(); |
| 484 return; |
| 485 } |
| 486 } |
| 487 |
| 488 /** Fire any pending events until the pending event queue is empty. */ |
| 489 void _handlePendingEvents() { |
| 490 assert(_isInactive); |
| 491 if (!_hasPendingEvent) return; |
| 492 _PendingEvents events = _pendingEvents; |
| 493 do { |
| 494 if (_isPaused) return; |
| 495 if (events.isScheduled) events.cancelSchedule(); |
| 496 events.handleNext(this); |
| 497 } while (!events.isEmpty); |
| 498 } |
| 499 |
| 500 /** |
| 501 * Send a data event directly to each subscriber. |
| 502 */ |
| 503 _sendData(T value) { |
| 504 assert(!_isPaused); |
| 505 assert(!_isComplete); |
| 506 if (!_hasListener) return; |
| 507 _forEachSubscriber((subscriber) { |
| 508 try { |
| 509 subscriber._sendData(value); |
| 510 } catch (e, s) { |
| 511 _throwDelayed(e, s); |
| 512 } |
| 513 }); |
| 514 } |
| 515 |
| 516 /** |
| 517 * Sends an error event directly to each subscriber. |
| 518 */ |
| 519 void _sendError(error) { |
| 520 assert(!_isPaused); |
| 521 assert(!_isComplete); |
| 522 if (!_hasListener) return; |
| 523 _forEachSubscriber((subscriber) { |
| 524 try { |
| 525 subscriber._sendError(error); |
| 526 } catch (e, s) { |
| 527 _throwDelayed(e, s); |
| 528 } |
| 529 }); |
| 530 } |
| 531 |
| 532 /** |
| 533 * Sends the "done" message directly to each subscriber. |
| 534 * This automatically stops further subscription and |
| 535 * unsubscribes all subscribers. |
| 536 */ |
| 537 void _sendDone() { |
| 538 assert(!_isPaused); |
| 539 assert(_isClosed); |
| 540 _setComplete(); |
| 541 if (!_hasListener) return; |
| 542 _forEachSubscriber((subscriber) { |
| 543 _cancel(subscriber); |
| 544 try { |
| 545 subscriber._sendDone(); |
| 546 } catch (e, s) { |
| 547 _throwDelayed(e, s); |
| 548 } |
| 549 }); |
| 550 assert(!_hasListener); |
| 551 } |
| 552 } |
| 553 |
| 554 // ------------------------------------------------------------------- |
| 555 // Default implementation of a stream with a single subscriber. |
| 556 // ------------------------------------------------------------------- |
| 557 /** |
| 558 * Default implementation of stream capable of sending events to one subscriber. |
| 559 * |
| 560 * Any class needing to implement [Stream] can either directly extend this |
| 561 * class, or extend [Stream] and delegate the subscribe method to an instance |
| 562 * of this class. |
| 563 * |
| 564 * The only public methods are those of [Stream], so instances of |
| 565 * [_SingleStreamImpl] can be returned directly as a [Stream] without exposing |
| 566 * internal functionality. |
| 567 * |
| 568 * The [StreamController] is a public facing version of this class, with |
| 569 * some methods made public. |
| 570 * |
| 571 * The user interface of [_SingleStreamImpl] are the following methods: |
| 572 * * [_add]: Add a data event to the stream. |
| 573 * * [_addError]: Add an error event to the stream. |
| 574 * * [_close]: Request to close the stream. |
| 575 * * [_onSubscriberStateChange]: Called when receiving the first subscriber or |
| 576 * when losing the last subscriber. |
| 577 * * [_onPauseStateChange]: Called when entering or leaving paused mode. |
| 578 * * [_hasListener]: Test whether there are currently any subscribers. |
| 579 * * [_isInputPaused]: Test whether the stream is currently paused. |
| 580 * The user should not add new events while the stream is paused, but if it |
| 581 * happens anyway, the stream will enqueue the events just as when new events |
| 582 * arrive while still firing an old event. |
| 583 */ |
| 584 class _SingleStreamImpl<T> extends _StreamImpl<T> { |
| 585 _StreamListener _subscriber = null; |
| 586 |
| 587 /** Whether there is currently a subscriber on this [Stream]. */ |
| 588 bool get _hasListener => _subscriber != null; |
| 589 |
| 590 // ------------------------------------------------------------------- |
| 591 // Internal implementation. |
| 592 |
| 593 _SingleStreamImpl() { |
| 594 // Start out paused. |
| 595 _updatePauseCount(1); |
| 596 } |
| 597 |
| 598 /** |
| 599 * Create the new subscription object. |
| 600 */ |
| 601 _StreamSubscriptionImpl<T> _createSubscription( |
461 void onData(T data), | 602 void onData(T data), |
462 void onError(error), | 603 void onError(error), |
463 void onDone(), | 604 void onDone(), |
464 bool cancelOnError) { | 605 bool cancelOnError) { |
465 return new _BufferingStreamSubscription<T>( | 606 return new _StreamSubscriptionImpl<T>( |
466 onData, onError, onDone, cancelOnError); | 607 this, onData, onError, onDone, cancelOnError); |
467 } | 608 } |
468 | 609 |
469 /** Hook called when the subscription has been created. */ | 610 void _addListener(_StreamListener subscription) { |
470 void _onListen(StreamSubscription subscription) {} | 611 assert(!_isComplete); |
| 612 if (_hasListener) { |
| 613 throw new StateError("Stream already has subscriber."); |
| 614 } |
| 615 assert(_pauseCount == 1); |
| 616 _updatePauseCount(-1); |
| 617 _subscriber = subscription; |
| 618 subscription._setSubscribed(0); |
| 619 if (_isInactive) { |
| 620 _checkCallbacks(false, true); |
| 621 if (!_isPaused && _hasPendingEvent) { |
| 622 _schedulePendingEvents(); |
| 623 } |
| 624 } |
| 625 } |
| 626 |
| 627 /** |
| 628 * Handle a cancel requested from a [_StreamSubscriptionImpl]. |
| 629 * |
| 630 * This method is called from [_StreamSubscriptionImpl.cancel]. |
| 631 */ |
| 632 void _cancel(_StreamListener subscriber) { |
| 633 assert(identical(subscriber._source, this)); |
| 634 // We allow unsubscribing the currently firing subscription during |
| 635 // the event firing, because it is indistinguishable from delaying it since |
| 636 // that event has already received the event. |
| 637 if (!identical(_subscriber, subscriber)) { |
| 638 // You may unsubscribe more than once, only the first one counts. |
| 639 return; |
| 640 } |
| 641 _subscriber = null; |
| 642 // Unsubscribing a paused subscription also cancels its pauses. |
| 643 int resumeCount = subscriber._setUnsubscribed(); |
| 644 // Keep being paused while there is no subscriber and the stream is not |
| 645 // complete. |
| 646 _updatePauseCount(_isComplete ? -resumeCount : -resumeCount + 1); |
| 647 if (_isInactive) { |
| 648 _checkCallbacks(true, resumeCount > 0); |
| 649 if (!_isPaused && _hasPendingEvent) { |
| 650 _schedulePendingEvents(); |
| 651 } |
| 652 } |
| 653 } |
| 654 |
| 655 void _forEachSubscriber( |
| 656 void action(_StreamListener<T> subscription)) { |
| 657 assert(!_isPaused); |
| 658 bool wasInputPaused = _isInputPaused; |
| 659 _StreamListener subscription = _subscriber; |
| 660 assert(subscription != null); |
| 661 _startFiring(); |
| 662 action(subscription); |
| 663 _endFiring(wasInputPaused); |
| 664 } |
471 } | 665 } |
472 | 666 |
473 typedef _PendingEvents _EventGenerator(); | 667 // ------------------------------------------------------------------- |
| 668 // Default implementation of a stream with subscribers. |
| 669 // ------------------------------------------------------------------- |
| 670 |
| 671 /** |
| 672 * Default implementation of stream capable of sending events to subscribers. |
| 673 * |
| 674 * Any class needing to implement [Stream] can either directly extend this |
| 675 * class, or extend [Stream] and delegate the subscribe method to an instance |
| 676 * of this class. |
| 677 * |
| 678 * The only public methods are those of [Stream], so instances of |
| 679 * [_MultiStreamImpl] can be returned directly as a [Stream] without exposing |
| 680 * internal functionality. |
| 681 * |
| 682 * The [StreamController] is a public facing version of this class, with |
| 683 * some methods made public. |
| 684 * |
| 685 * The user interface of [_MultiStreamImpl] are the following methods: |
| 686 * * [_add]: Add a data event to the stream. |
| 687 * * [_addError]: Add an error event to the stream. |
| 688 * * [_close]: Request to close the stream. |
| 689 * * [_onSubscriptionStateChange]: Called when receiving the first subscriber or |
| 690 * when losing the last subscriber. |
| 691 * * [_onPauseStateChange]: Called when entering or leaving paused mode. |
| 692 * * [_hasListener]: Test whether there are currently any subscribers. |
| 693 * * [_isPaused]: Test whether the stream is currently paused. |
| 694 * The user should not add new events while the stream is paused, but if it |
| 695 * happens anyway, the stream will enqueue the events just as when new events |
| 696 * arrive while still firing an old event. |
| 697 */ |
| 698 class _MultiStreamImpl<T> extends _StreamImpl<T> |
| 699 implements _InternalLinkList { |
| 700 // Link list implementation (mixin when possible). |
| 701 _InternalLink _nextLink; |
| 702 _InternalLink _previousLink; |
| 703 |
| 704 _MultiStreamImpl() { |
| 705 _nextLink = _previousLink = this; |
| 706 } |
| 707 |
| 708 bool get isBroadcast => true; |
| 709 |
| 710 Stream<T> asBroadcastStream() => this; |
| 711 |
| 712 // ------------------------------------------------------------------ |
| 713 // Helper functions that can be overridden in subclasses. |
| 714 |
| 715 /** Whether there are currently any subscribers on this [Stream]. */ |
| 716 bool get _hasListener => !_InternalLinkList.isEmpty(this); |
| 717 |
| 718 /** |
| 719 * Create the new subscription object. |
| 720 */ |
| 721 _StreamListener<T> _createSubscription( |
| 722 void onData(T data), |
| 723 void onError(error), |
| 724 void onDone(), |
| 725 bool cancelOnError) { |
| 726 return new _StreamSubscriptionImpl<T>( |
| 727 this, onData, onError, onDone, cancelOnError); |
| 728 } |
| 729 |
| 730 // ------------------------------------------------------------------- |
| 731 // Internal implementation. |
| 732 |
| 733 /** |
| 734 * Iterate over all current subscribers and perform an action on each. |
| 735 * |
| 736 * The set of subscribers cannot be modified during this iteration. |
| 737 * All attempts to add or unsubscribe subscribers will be delayed until |
| 738 * after the iteration is complete. |
| 739 * |
| 740 * The [action] must not throw, or the controller will be left in an |
| 741 * invalid state. |
| 742 * |
| 743 * This method must not be called while [isFiring] is true. |
| 744 */ |
| 745 void _forEachSubscriber( |
| 746 void action(_StreamListener<T> subscription)) { |
| 747 assert(!_isFiring); |
| 748 if (!_hasListener) return; |
| 749 bool wasInputPaused = _isInputPaused; |
| 750 _startFiring(); |
| 751 _InternalLink cursor = this._nextLink; |
| 752 while (!identical(cursor, this)) { |
| 753 _StreamListener<T> current = cursor; |
| 754 if (current._needsEvent(_currentEventIdBit)) { |
| 755 action(current); |
| 756 // Marks as having received the event. |
| 757 current._toggleEventReceived(); |
| 758 } |
| 759 cursor = current._nextLink; |
| 760 if (current._isPendingUnsubscribe) { |
| 761 _removeListener(current); |
| 762 } |
| 763 } |
| 764 _endFiring(wasInputPaused); |
| 765 } |
| 766 |
| 767 void _addListener(_StreamListener listener) { |
| 768 listener._setSubscribed(_currentEventIdBit); |
| 769 bool hadListener = _hasListener; |
| 770 _InternalLinkList.add(this, listener); |
| 771 if (!hadListener && _isInactive) { |
| 772 _checkCallbacks(false, false); |
| 773 if (!_isPaused && _hasPendingEvent) { |
| 774 _schedulePendingEvents(); |
| 775 } |
| 776 } |
| 777 } |
| 778 |
| 779 /** |
| 780 * Handle a cancel requested from a [_StreamListener]. |
| 781 * |
| 782 * This method is called from [_StreamListener.cancel]. |
| 783 * |
| 784 * If an event is currently firing, the cancel is delayed |
| 785 * until after the subscribers have received the event. |
| 786 */ |
| 787 void _cancel(_StreamListener listener) { |
| 788 assert(identical(listener._source, this)); |
| 789 if (_InternalLink.isUnlinked(listener)) { |
| 790 // You may unsubscribe more than once, only the first one counts. |
| 791 return; |
| 792 } |
| 793 if (_isFiring) { |
| 794 if (listener._needsEvent(_currentEventIdBit)) { |
| 795 assert(listener._isSubscribed); |
| 796 listener._setPendingUnsubscribe(_currentEventIdBit); |
| 797 } else { |
| 798 // The listener has been notified of the event (or don't need to, |
| 799 // if it's still pending subscription) so it's safe to remove it. |
| 800 _removeListener(listener); |
| 801 } |
| 802 // Pause and subscription state changes are reported when we end |
| 803 // firing. |
| 804 } else { |
| 805 bool wasInputPaused = _isInputPaused; |
| 806 _removeListener(listener); |
| 807 if (_isInactive) { |
| 808 _checkCallbacks(true, wasInputPaused); |
| 809 if (!_isPaused && _hasPendingEvent) { |
| 810 _schedulePendingEvents(); |
| 811 } |
| 812 } |
| 813 } |
| 814 } |
| 815 |
| 816 /** |
| 817 * Removes a listener from this stream and cancels its pauses. |
| 818 * |
| 819 * This is a low-level action that doesn't call [_onSubscriptionStateChange]. |
| 820 * or [_callOnPauseStateChange]. |
| 821 */ |
| 822 void _removeListener(_StreamListener listener) { |
| 823 int pauseCount = listener._setUnsubscribed(); |
| 824 _InternalLinkList.remove(listener); |
| 825 if (pauseCount > 0) { |
| 826 _updatePauseCount(-pauseCount); |
| 827 if (!_isPaused && _hasPendingEvent) { |
| 828 _state |= _STREAM_PENDING_RESUME; |
| 829 } |
| 830 } |
| 831 } |
| 832 } |
| 833 |
474 | 834 |
475 /** Stream that generates its own events. */ | 835 /** Stream that generates its own events. */ |
476 class _GeneratedStreamImpl<T> extends _StreamImpl<T> { | 836 class _GeneratedSingleStreamImpl<T> extends _SingleStreamImpl<T> { |
477 final _EventGenerator _pending; | 837 /** |
478 /** | 838 * Initializes the stream to have only the events provided by [events]. |
479 * Initializes the stream to have only the events provided by a | 839 * |
480 * [_PendingEvents]. | 840 * A [_PendingEvents] implementation provides events that are handled |
481 * | 841 * by calling [_PendingEvents.handleNext] with the [_StreamImpl]. |
482 * A new [_PendingEvents] must be generated for each listen. | 842 */ |
483 */ | 843 _GeneratedSingleStreamImpl(_PendingEvents events) { |
484 _GeneratedStreamImpl(this._pending); | 844 _pendingEvents = events; |
485 | 845 _setClosed(); // Closed for input since all events are already pending. |
486 StreamSubscription _createSubscription(void onData(T data), | 846 } |
487 void onError(Object error), | 847 |
488 void onDone(), | 848 void _add(T value) { |
489 bool cancelOnError) { | 849 throw new UnsupportedError("Cannot inject events into generated stream"); |
490 _BufferingStreamSubscription<T> subscription = | 850 } |
491 new _BufferingStreamSubscription( | 851 |
492 onData, onError, onDone, cancelOnError); | 852 void _addError(value) { |
493 subscription._setPendingEvents(_pending()); | 853 throw new UnsupportedError("Cannot inject events into generated stream"); |
494 return subscription; | 854 } |
| 855 |
| 856 void _close() { |
| 857 throw new UnsupportedError("Cannot inject events into generated stream"); |
495 } | 858 } |
496 } | 859 } |
497 | 860 |
498 | 861 |
499 /** Pending events object that gets its events from an [Iterable]. */ | 862 /** Pending events object that gets its events from an [Iterable]. */ |
500 class _IterablePendingEvents<T> extends _PendingEvents { | 863 class _IterablePendingEvents<T> extends _PendingEvents { |
501 // The iterator providing data for data events. | 864 final Iterator<T> _iterator; |
502 // Set to null when iteration has completed. | 865 /** |
503 Iterator<T> _iterator; | 866 * Whether there are no more events to be sent. |
| 867 * |
| 868 * This starts out as [:false:] since there is always at least |
| 869 * a 'done' event to be sent. |
| 870 */ |
| 871 bool _isDone = false; |
504 | 872 |
505 _IterablePendingEvents(Iterable<T> data) : _iterator = data.iterator; | 873 _IterablePendingEvents(Iterable<T> data) : _iterator = data.iterator; |
506 | 874 |
507 bool get isEmpty => _iterator == null; | 875 bool get isEmpty => _isDone; |
508 | 876 |
509 void handleNext(_EventDispatch dispatch) { | 877 void handleNext(_StreamImpl<T> stream) { |
510 if (_iterator == null) { | 878 if (_isDone) throw new StateError("No events pending."); |
511 throw new StateError("No events pending."); | |
512 } | |
513 // Send one event per call to moveNext. | |
514 // If moveNext returns true, send the current element as data. | |
515 // If moveNext returns false, send a done event and clear the _iterator. | |
516 // If moveNext throws an error, send an error and clear the _iterator. | |
517 // After an error, no further events will be sent. | |
518 bool isDone; | |
519 try { | 879 try { |
520 isDone = !_iterator.moveNext(); | 880 _isDone = !_iterator.moveNext(); |
| 881 if (!_isDone) { |
| 882 stream._sendData(_iterator.current); |
| 883 } else { |
| 884 stream._sendDone(); |
| 885 } |
521 } catch (e, s) { | 886 } catch (e, s) { |
522 _iterator = null; | 887 stream._sendError(_asyncError(e, s)); |
523 dispatch._sendError(_asyncError(e, s)); | 888 stream._sendDone(); |
524 return; | 889 _isDone = true; |
525 } | 890 } |
526 if (!isDone) { | |
527 dispatch._sendData(_iterator.current); | |
528 } else { | |
529 _iterator = null; | |
530 dispatch._sendDone(); | |
531 } | |
532 } | |
533 | |
534 void clear() { | |
535 if (isScheduled) cancelSchedule(); | |
536 _iterator = null; | |
537 } | 891 } |
538 } | 892 } |
539 | 893 |
540 | 894 |
| 895 /** |
| 896 * The subscription class that the [StreamController] uses. |
| 897 * |
| 898 * The [_StreamImpl.createSubscription] method should |
| 899 * create an object of this type, or another subclass of [_StreamListener]. |
| 900 * A subclass of [_StreamImpl] can specify which subclass |
| 901 * of [_StreamSubscriptionImpl] it uses by overriding |
| 902 * [_StreamImpl.createSubscription]. |
| 903 * |
| 904 * The subscription is in one of three states: |
| 905 * * Subscribed. |
| 906 * * Paused-and-subscribed. |
| 907 * * Unsubscribed. |
| 908 * Unsubscribing also resumes any pauses started by the subscription. |
| 909 */ |
| 910 class _StreamSubscriptionImpl<T> extends _StreamListener<T> |
| 911 implements StreamSubscription<T> { |
| 912 final bool _cancelOnError; |
| 913 // TODO(ahe): Restore type when feature is implemented in dart2js |
| 914 // checked mode. http://dartbug.com/7733 |
| 915 var /* _DataHandler<T> */ _onData; |
| 916 _ErrorHandler _onError; |
| 917 _DoneHandler _onDone; |
| 918 _StreamSubscriptionImpl(_StreamImpl source, |
| 919 this._onData, |
| 920 this._onError, |
| 921 this._onDone, |
| 922 this._cancelOnError) : super(source); |
| 923 |
| 924 void onData(void handleData(T event)) { |
| 925 if (handleData == null) handleData = _nullDataHandler; |
| 926 _onData = handleData; |
| 927 } |
| 928 |
| 929 void onError(void handleError(error)) { |
| 930 if (handleError == null) handleError = _nullErrorHandler; |
| 931 _onError = handleError; |
| 932 } |
| 933 |
| 934 void onDone(void handleDone()) { |
| 935 if (handleDone == null) handleDone = _nullDoneHandler; |
| 936 _onDone = handleDone; |
| 937 } |
| 938 |
| 939 void _sendData(T data) { |
| 940 _onData(data); |
| 941 } |
| 942 |
| 943 void _sendError(error) { |
| 944 _onError(error); |
| 945 if (_cancelOnError) _source._cancel(this); |
| 946 } |
| 947 |
| 948 void _sendDone() { |
| 949 _onDone(); |
| 950 } |
| 951 |
| 952 void cancel() { |
| 953 if (!_isSubscribed) return; |
| 954 _source._cancel(this); |
| 955 } |
| 956 |
| 957 void pause([Future resumeSignal]) { |
| 958 if (!_isSubscribed) return; |
| 959 _source._pause(this, resumeSignal); |
| 960 } |
| 961 |
| 962 void resume() { |
| 963 if (!_isSubscribed || !isPaused) return; |
| 964 _source._resume(this, false); |
| 965 } |
| 966 |
| 967 Future asFuture([var futureValue]) { |
| 968 _FutureImpl<T> result = new _FutureImpl<T>(); |
| 969 |
| 970 // Overwrite the onDone and onError handlers. |
| 971 onDone(() { result._setValue(futureValue); }); |
| 972 onError((error) { |
| 973 cancel(); |
| 974 result._setError(error); |
| 975 }); |
| 976 |
| 977 return result; |
| 978 } |
| 979 } |
| 980 |
541 // Internal helpers. | 981 // Internal helpers. |
542 | 982 |
543 // Types of the different handlers on a stream. Types used to type fields. | 983 // Types of the different handlers on a stream. Types used to type fields. |
544 typedef void _DataHandler<T>(T value); | 984 typedef void _DataHandler<T>(T value); |
545 typedef void _ErrorHandler(error); | 985 typedef void _ErrorHandler(error); |
546 typedef void _DoneHandler(); | 986 typedef void _DoneHandler(); |
547 | 987 |
548 | 988 |
549 /** Default data handler, does nothing. */ | 989 /** Default data handler, does nothing. */ |
550 void _nullDataHandler(var value) {} | 990 void _nullDataHandler(var value) {} |
551 | 991 |
552 /** Default error handler, reports the error to the global handler. */ | 992 /** Default error handler, reports the error to the global handler. */ |
553 void _nullErrorHandler(error) { | 993 void _nullErrorHandler(error) { |
554 _throwDelayed(error); | 994 _throwDelayed(error); |
555 } | 995 } |
556 | 996 |
557 /** Default done handler, does nothing. */ | 997 /** Default done handler, does nothing. */ |
558 void _nullDoneHandler() {} | 998 void _nullDoneHandler() {} |
559 | 999 |
560 | 1000 |
561 /** A delayed event on a buffering stream subscription. */ | 1001 /** A delayed event on a stream implementation. */ |
562 abstract class _DelayedEvent { | 1002 abstract class _DelayedEvent { |
563 /** Added as a linked list on the [StreamController]. */ | 1003 /** Added as a linked list on the [StreamController]. */ |
564 _DelayedEvent next; | 1004 _DelayedEvent next; |
565 /** Execute the delayed event on the [StreamController]. */ | 1005 /** Execute the delayed event on the [StreamController]. */ |
566 void perform(_EventDispatch dispatch); | 1006 void perform(_StreamImpl stream); |
567 } | 1007 } |
568 | 1008 |
569 /** A delayed data event. */ | 1009 /** A delayed data event. */ |
570 class _DelayedData<T> extends _DelayedEvent{ | 1010 class _DelayedData<T> extends _DelayedEvent{ |
571 final T value; | 1011 final T value; |
572 _DelayedData(this.value); | 1012 _DelayedData(this.value); |
573 void perform(_EventDispatch<T> dispatch) { | 1013 void perform(_StreamImpl<T> stream) { |
574 dispatch._sendData(value); | 1014 stream._sendData(value); |
575 } | 1015 } |
576 } | 1016 } |
577 | 1017 |
578 /** A delayed error event. */ | 1018 /** A delayed error event. */ |
579 class _DelayedError extends _DelayedEvent { | 1019 class _DelayedError extends _DelayedEvent { |
580 final error; | 1020 final error; |
581 _DelayedError(this.error); | 1021 _DelayedError(this.error); |
582 void perform(_EventDispatch dispatch) { | 1022 void perform(_StreamImpl stream) { |
583 dispatch._sendError(error); | 1023 stream._sendError(error); |
584 } | 1024 } |
585 } | 1025 } |
586 | 1026 |
587 /** A delayed done event. */ | 1027 /** A delayed done event. */ |
588 class _DelayedDone implements _DelayedEvent { | 1028 class _DelayedDone implements _DelayedEvent { |
589 const _DelayedDone(); | 1029 const _DelayedDone(); |
590 void perform(_EventDispatch dispatch) { | 1030 void perform(_StreamImpl stream) { |
591 dispatch._sendDone(); | 1031 stream._sendDone(); |
592 } | 1032 } |
593 | 1033 |
594 _DelayedEvent get next => null; | 1034 _DelayedEvent get next => null; |
595 | 1035 |
596 void set next(_DelayedEvent _) { | 1036 void set next(_DelayedEvent _) { |
597 throw new StateError("No events after a done."); | 1037 throw new StateError("No events after a done."); |
598 } | 1038 } |
599 } | 1039 } |
600 | 1040 |
601 /** | 1041 /** |
(...skipping 69 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
671 listLast._nextLink = otherNext; | 1111 listLast._nextLink = otherNext; |
672 otherNext._previousLink = listLast; | 1112 otherNext._previousLink = listLast; |
673 _InternalLink otherLast = other._previousLink; | 1113 _InternalLink otherLast = other._previousLink; |
674 list._previousLink = otherLast; | 1114 list._previousLink = otherLast; |
675 otherLast._nextLink = list; | 1115 otherLast._nextLink = list; |
676 // Clean up [other]. | 1116 // Clean up [other]. |
677 other._nextLink = other._previousLink = other; | 1117 other._nextLink = other._previousLink = other; |
678 } | 1118 } |
679 } | 1119 } |
680 | 1120 |
| 1121 /** Abstract type for an internal interface for sending events. */ |
| 1122 abstract class _EventOutputSink<T> { |
| 1123 _sendData(T data); |
| 1124 _sendError(error); |
| 1125 _sendDone(); |
| 1126 } |
| 1127 |
| 1128 abstract class _StreamListener<T> extends _InternalLink |
| 1129 implements _EventOutputSink<T> { |
| 1130 final _StreamImpl _source; |
| 1131 int _state = _LISTENER_UNSUBSCRIBED; |
| 1132 |
| 1133 _StreamListener(this._source); |
| 1134 |
| 1135 bool get isPaused => _state >= (1 << _LISTENER_PAUSE_COUNT_SHIFT); |
| 1136 |
| 1137 bool get _isPendingUnsubscribe => |
| 1138 (_state & _LISTENER_PENDING_UNSUBSCRIBE) != 0; |
| 1139 |
| 1140 bool get _isSubscribed => (_state & _LISTENER_SUBSCRIBED) != 0; |
| 1141 |
| 1142 /** |
| 1143 * Whether the listener still needs to receive the currently firing event. |
| 1144 * |
| 1145 * The currently firing event is identified by a single bit, which alternates |
| 1146 * between events. The [_state] contains the previously sent event's bit in |
| 1147 * the [_LISTENER_EVENT_ID] bit. If the two don't match, this listener |
| 1148 * still need the current event. |
| 1149 */ |
| 1150 bool _needsEvent(int currentEventIdBit) { |
| 1151 int lastEventIdBit = |
| 1152 (_state & _LISTENER_EVENT_ID) >> _LISTENER_EVENT_ID_SHIFT; |
| 1153 return lastEventIdBit != currentEventIdBit; |
| 1154 } |
| 1155 |
| 1156 /// If a subscriber's "firing bit" doesn't match the stream's firing bit, |
| 1157 /// we are currently firing an event and the subscriber still need to receive |
| 1158 /// the event. |
| 1159 void _toggleEventReceived() { |
| 1160 _state ^= _LISTENER_EVENT_ID; |
| 1161 } |
| 1162 |
| 1163 void _setSubscribed(int eventIdBit) { |
| 1164 assert(eventIdBit == 0 || eventIdBit == 1); |
| 1165 _state = _LISTENER_SUBSCRIBED | (eventIdBit << _LISTENER_EVENT_ID_SHIFT); |
| 1166 } |
| 1167 |
| 1168 void _setPendingUnsubscribe(int currentEventIdBit) { |
| 1169 assert(_isSubscribed); |
| 1170 // Sets the pending unsubscribe, and ensures that the listener |
| 1171 // won't get the current event. |
| 1172 _state |= _LISTENER_PENDING_UNSUBSCRIBE | _LISTENER_EVENT_ID; |
| 1173 _state ^= (1 ^ currentEventIdBit) << _LISTENER_EVENT_ID_SHIFT; |
| 1174 assert(!_needsEvent(currentEventIdBit)); |
| 1175 } |
| 1176 |
| 1177 /** |
| 1178 * Marks the listener as unsubscibed. |
| 1179 * |
| 1180 * Returns the number of unresumed pauses for the listener. |
| 1181 */ |
| 1182 int _setUnsubscribed() { |
| 1183 assert(_isSubscribed); |
| 1184 int timesPaused = _state >> _LISTENER_PAUSE_COUNT_SHIFT; |
| 1185 _state = _LISTENER_UNSUBSCRIBED; |
| 1186 return timesPaused; |
| 1187 } |
| 1188 |
| 1189 void _incrementPauseCount() { |
| 1190 _state += 1 << _LISTENER_PAUSE_COUNT_SHIFT; |
| 1191 } |
| 1192 |
| 1193 void _decrementPauseCount() { |
| 1194 assert(isPaused); |
| 1195 _state -= 1 << _LISTENER_PAUSE_COUNT_SHIFT; |
| 1196 } |
| 1197 |
| 1198 _sendData(T data); |
| 1199 _sendError(error); |
| 1200 _sendDone(); |
| 1201 } |
| 1202 |
681 /** Superclass for provider of pending events. */ | 1203 /** Superclass for provider of pending events. */ |
682 abstract class _PendingEvents { | 1204 abstract class _PendingEvents { |
683 // No async event has been scheduled. | |
684 static const int _STATE_UNSCHEDULED = 0; | |
685 // An async event has been scheduled to run a function. | |
686 static const int _STATE_SCHEDULED = 1; | |
687 // An async event has been scheduled, but it will do nothing when it runs. | |
688 // Async events can't be preempted. | |
689 static const int _STATE_CANCELED = 3; | |
690 | |
691 /** | 1205 /** |
692 * State of being scheduled. | 1206 * Timer set when pending events are scheduled for execution. |
693 * | 1207 * |
694 * Set to [_STATE_SCHEDULED] when pending events are scheduled for | 1208 * When scheduling pending events for execution in a later cycle, the timer |
695 * async dispatch. Since we can't cancel a [runAsync] call, if schduling | 1209 * is stored here. If pending events are executed earlier than that, e.g., |
696 * is "canceled", the _state is simply set to [_STATE_CANCELED] which will | 1210 * due to a second event in the current cycle, the timer is canceled again. |
697 * make the async code do nothing except resetting [_state]. | |
698 * | |
699 * If events are scheduled while the state is [_STATE_CANCELED], it is | |
700 * merely switched back to [_STATE_SCHEDULED], but no new call to [runAsync] | |
701 * is performed. | |
702 */ | 1211 */ |
703 int _state = _STATE_UNSCHEDULED; | 1212 Timer scheduleTimer = null; |
704 | 1213 |
705 bool get isEmpty; | 1214 bool get isEmpty; |
706 | 1215 |
707 bool get isScheduled => _state == _STATE_SCHEDULED; | 1216 bool get isScheduled => scheduleTimer != null; |
708 bool get _eventScheduled => _state >= _STATE_SCHEDULED; | |
709 | 1217 |
710 /** | 1218 void schedule(_StreamImpl stream) { |
711 * Schedule an event to run later. | |
712 * | |
713 * If called more than once, it should be called with the same dispatch as | |
714 * argument each time. It may reuse an earlier argument in some cases. | |
715 */ | |
716 void schedule(_EventDispatch dispatch) { | |
717 if (isScheduled) return; | 1219 if (isScheduled) return; |
718 assert(!isEmpty); | 1220 scheduleTimer = new Timer(Duration.ZERO, () { |
719 if (_eventScheduled) { | 1221 scheduleTimer = null; |
720 assert(_state == _STATE_CANCELED); | 1222 stream._handlePendingEvents(); |
721 _state = _STATE_SCHEDULED; | |
722 return; | |
723 } | |
724 runAsync(() { | |
725 int oldState = _state; | |
726 _state = _STATE_UNSCHEDULED; | |
727 if (oldState == _STATE_CANCELED) return; | |
728 handleNext(dispatch); | |
729 }); | 1223 }); |
730 _state = _STATE_SCHEDULED; | |
731 } | 1224 } |
732 | 1225 |
733 void cancelSchedule() { | 1226 void cancelSchedule() { |
734 if (isScheduled) _state = _STATE_CANCELED; | 1227 assert(isScheduled); |
| 1228 scheduleTimer.cancel(); |
| 1229 scheduleTimer = null; |
735 } | 1230 } |
736 | 1231 |
737 void handleNext(_EventDispatch dispatch); | 1232 void handleNext(_StreamImpl stream); |
738 | |
739 /** Throw away any pending events and cancel scheduled events. */ | |
740 void clear(); | |
741 } | 1233 } |
742 | 1234 |
743 | 1235 |
744 /** Class holding pending events for a [_StreamImpl]. */ | 1236 /** Class holding pending events for a [_StreamImpl]. */ |
745 class _StreamImplEvents extends _PendingEvents { | 1237 class _StreamImplEvents extends _PendingEvents { |
746 /// Single linked list of [_DelayedEvent] objects. | 1238 /// Single linked list of [_DelayedEvent] objects. |
747 _DelayedEvent firstPendingEvent = null; | 1239 _DelayedEvent firstPendingEvent = null; |
748 /// Last element in the list of pending events. New events are added after it. | 1240 /// Last element in the list of pending events. New events are added after it. |
749 _DelayedEvent lastPendingEvent = null; | 1241 _DelayedEvent lastPendingEvent = null; |
750 | 1242 |
751 bool get isEmpty => lastPendingEvent == null; | 1243 bool get isEmpty => lastPendingEvent == null; |
752 | 1244 |
| 1245 bool get isScheduled => scheduleTimer != null; |
| 1246 |
753 void add(_DelayedEvent event) { | 1247 void add(_DelayedEvent event) { |
754 if (lastPendingEvent == null) { | 1248 if (lastPendingEvent == null) { |
755 firstPendingEvent = lastPendingEvent = event; | 1249 firstPendingEvent = lastPendingEvent = event; |
756 } else { | 1250 } else { |
757 lastPendingEvent = lastPendingEvent.next = event; | 1251 lastPendingEvent = lastPendingEvent.next = event; |
758 } | 1252 } |
759 } | 1253 } |
760 | 1254 |
761 void handleNext(_EventDispatch dispatch) { | 1255 void handleNext(_StreamImpl stream) { |
762 assert(!isScheduled); | 1256 assert(!isScheduled); |
763 _DelayedEvent event = firstPendingEvent; | 1257 _DelayedEvent event = firstPendingEvent; |
764 firstPendingEvent = event.next; | 1258 firstPendingEvent = event.next; |
765 if (firstPendingEvent == null) { | 1259 if (firstPendingEvent == null) { |
766 lastPendingEvent = null; | 1260 lastPendingEvent = null; |
767 } | 1261 } |
768 event.perform(dispatch); | 1262 event.perform(stream); |
769 } | |
770 | |
771 void clear() { | |
772 if (isScheduled) cancelSchedule(); | |
773 firstPendingEvent = lastPendingEvent = null; | |
774 } | |
775 } | |
776 | |
777 class _MultiplexerLinkedList { | |
778 _MultiplexerLinkedList _next; | |
779 _MultiplexerLinkedList _previous; | |
780 | |
781 void _unlink() { | |
782 _previous._next = _next; | |
783 _next._previous = _previous; | |
784 _next = _previous = this; | |
785 } | |
786 | |
787 void _insertBefore(_MultiplexerLinkedList newNext) { | |
788 _MultiplexerLinkedList newPrevious = newNext._previous; | |
789 newPrevious._next = this; | |
790 newNext._previous = _previous; | |
791 _previous._next = newNext; | |
792 _previous = newPrevious; | |
793 } | |
794 } | |
795 | |
796 /** | |
797 * A subscription used by [_SingleStreamMultiplexer]. | |
798 * | |
799 * The [_SingleStreamMultiplexer] is a [Stream] which allows multiple | |
800 * listeners at a time. It is used to implement [Stream.asBroadcastStream]. | |
801 * | |
802 * It is itself listening to another stream for events, and it forwards all | |
803 * events to all of its simultanous listeners. | |
804 * | |
805 * The listeners are [_MultiplexerSubscription]s and are kept as a linked list. | |
806 */ | |
807 // TODO(lrn): Change "implements" to "with" when automatic mixin constructors | |
808 // are implemented. | |
809 class _MultiplexerSubscription<T> extends _BufferingStreamSubscription<T> | |
810 implements _MultiplexerLinkedList { | |
811 static const int _STATE_NOT_LISTENING = 0; | |
812 // Bit that alternates between event firings. If bit matches the one currently | |
813 // firing, the subscription will not be notified. | |
814 static const int _STATE_EVENT_ID_BIT = 1; | |
815 // Whether the subscription is listening at all. This should be set while | |
816 // it is part of the linked list of listeners of a multiplexer stream. | |
817 static const int _STATE_LISTENING = 2; | |
818 // State bit set while firing an event. | |
819 static const int _STATE_IS_FIRING = 4; | |
820 // Bit set if a subscription is canceled while it's firing (the | |
821 // [_STATE_IS_FIRING] bit is set). | |
822 // If the subscription is canceled while firing, it is not removed from the | |
823 // linked list immediately (to avoid breaking iteration), but is instead | |
824 // removed after it is done firing. | |
825 static const int _STATE_REMOVE_AFTER_FIRING = 8; | |
826 | |
827 // Firing state. | |
828 int _multiplexState; | |
829 | |
830 _SingleStreamMultiplexer _source; | |
831 | |
832 _MultiplexerSubscription(this._source, | |
833 void onData(T data), | |
834 void onError(Object error), | |
835 void onDone(), | |
836 bool cancelOnError, | |
837 int nextEventId) | |
838 : _multiplexState = _STATE_LISTENING | nextEventId, | |
839 super(onData, onError, onDone, cancelOnError) { | |
840 _next = _previous = this; | |
841 } | |
842 | |
843 // Mixin workaround. | |
844 _MultiplexerLinkedList _next; | |
845 _MultiplexerLinkedList _previous; | |
846 | |
847 void _unlink() { | |
848 _previous._next = _next; | |
849 _next._previous = _previous; | |
850 _next = _previous = this; | |
851 } | |
852 | |
853 void _insertBefore(_MultiplexerLinkedList newNext) { | |
854 _MultiplexerLinkedList newPrevious = newNext._previous; | |
855 newPrevious._next = this; | |
856 newNext._previous = _previous; | |
857 _previous._next = newNext; | |
858 _previous = newPrevious; | |
859 } | |
860 // End mixin. | |
861 | |
862 bool get _isListening => _multiplexState >= _STATE_LISTENING; | |
863 bool get _isFiring => _multiplexState >= _STATE_IS_FIRING; | |
864 bool get _removeAfterFiring => _multiplexState >= _STATE_REMOVE_AFTER_FIRING; | |
865 int get _eventId => _multiplexState & _STATE_EVENT_ID_BIT; | |
866 | |
867 void _setRemoveAfterFiring() { | |
868 assert(_isFiring); | |
869 _multiplexState |= _STATE_REMOVE_AFTER_FIRING; | |
870 } | |
871 | |
872 void _startFiring() { | |
873 assert(!_isFiring); | |
874 _multiplexState |= _STATE_IS_FIRING; | |
875 } | |
876 | |
877 /// Marks listener as no longer firing, and toggles its event id. | |
878 void _endFiring() { | |
879 assert(_isFiring); | |
880 _multiplexState ^= (_STATE_IS_FIRING | _STATE_EVENT_ID_BIT); | |
881 } | |
882 | |
883 void _setNotListening() { | |
884 assert(_isListening); | |
885 _multiplexState = _STATE_NOT_LISTENING; | |
886 } | |
887 | |
888 void _onCancel() { | |
889 assert(_isListening); | |
890 _source._removeListener(this); | |
891 } | |
892 } | |
893 | |
894 /** | |
895 * A stream that sends events from another stream to multiple listeners. | |
896 * | |
897 * This is used to implement [Stream.asBroadcastStream]. | |
898 * | |
899 * This stream allows listening more than once. | |
900 * When the first listener is added, it starts listening on its source | |
901 * stream for events. All events from the source stream are sent to all | |
902 * active listeners. The listeners handle their own buffering. | |
903 * When the last listener cancels, the source stream subscription is also | |
904 * canceled, and no further listening is possible. | |
905 */ | |
906 // TODO(lrn): change "implements" to "with" when the VM supports it. | |
907 class _SingleStreamMultiplexer<T> extends Stream<T> | |
908 implements _MultiplexerLinkedList, | |
909 _EventDispatch<T> { | |
910 final Stream<T> _source; | |
911 StreamSubscription<T> _subscription; | |
912 // Alternates between zero and one for each event fired. | |
913 // Listeners are initialized with the next event's id, and will | |
914 // only be notified if they match the event being fired. | |
915 // That way listeners added during event firing will not receive | |
916 // the current event. | |
917 int _eventId = 0; | |
918 | |
919 bool _isFiring = false; | |
920 | |
921 // Remember events added while firing. | |
922 _StreamImplEvents _pending; | |
923 | |
924 _SingleStreamMultiplexer(this._source) { | |
925 _next = _previous = this; | |
926 } | |
927 | |
928 bool get _hasPending => _pending != null && !_pending.isEmpty; | |
929 | |
930 // Should be mixin. | |
931 _MultiplexerLinkedList _next; | |
932 _MultiplexerLinkedList _previous; | |
933 | |
934 void _unlink() { | |
935 _previous._next = _next; | |
936 _next._previous = _previous; | |
937 _next = _previous = this; | |
938 } | |
939 | |
940 void _insertBefore(_MultiplexerLinkedList newNext) { | |
941 _MultiplexerLinkedList newPrevious = newNext._previous; | |
942 newPrevious._next = this; | |
943 newNext._previous = _previous; | |
944 _previous._next = newNext; | |
945 _previous = newPrevious; | |
946 } | |
947 // End of mixin. | |
948 | |
949 StreamSubscription<T> listen(void onData(T data), | |
950 { void onError(Object error), | |
951 void onDone(), | |
952 bool cancelOnError }) { | |
953 if (onData == null) onData = _nullDataHandler; | |
954 if (onError == null) onError = _nullErrorHandler; | |
955 if (onDone == null) onDone = _nullDoneHandler; | |
956 cancelOnError = identical(true, cancelOnError); | |
957 _MultiplexerSubscription subscription = | |
958 new _MultiplexerSubscription(this, onData, onError, onDone, | |
959 cancelOnError, _eventId); | |
960 if (_subscription == null) { | |
961 _subscription = _source.listen(_add, onError: _addError, onDone: _close); | |
962 } | |
963 subscription._insertBefore(this); | |
964 return subscription; | |
965 } | |
966 | |
967 /** Called by [_MultiplexerSubscription.remove]. */ | |
968 void _removeListener(_MultiplexerSubscription listener) { | |
969 if (listener._isFiring) { | |
970 listener._setRemoveAfterFiring(); | |
971 } else { | |
972 _unlinkListener(listener); | |
973 } | |
974 } | |
975 | |
976 /** Remove a listener and close the subscription after the last one. */ | |
977 void _unlinkListener(_MultiplexerSubscription listener) { | |
978 listener._setNotListening(); | |
979 listener._unlink(); | |
980 if (identical(_next, this)) { | |
981 // Last listener removed. | |
982 _cancel(); | |
983 } | |
984 } | |
985 | |
986 void _cancel() { | |
987 StreamSubscription subscription = _subscription; | |
988 _subscription = null; | |
989 subscription.cancel(); | |
990 if (_pending != null) _pending.cancelSchedule(); | |
991 } | |
992 | |
993 void _forEachListener(void action(_MultiplexerSubscription listener)) { | |
994 int eventId = _eventId; | |
995 _eventId ^= 1; | |
996 _isFiring = true; | |
997 _MultiplexerLinkedList entry = _next; | |
998 // Call each listener in order. A listener can be removed during any | |
999 // other listener's event. During its own event it will only be marked | |
1000 // as "to be removed", and it will be handled after the event is done. | |
1001 while (!identical(entry, this)) { | |
1002 _MultiplexerSubscription listener = entry; | |
1003 if (listener._eventId == eventId) { | |
1004 listener._startFiring(); | |
1005 action(listener); | |
1006 listener._endFiring(); // Also toggles the event id. | |
1007 } | |
1008 entry = listener._next; | |
1009 if (listener._removeAfterFiring) { | |
1010 _unlinkListener(listener); | |
1011 } | |
1012 } | |
1013 _isFiring = false; | |
1014 } | |
1015 | |
1016 void _add(T data) { | |
1017 if (_isFiring || _hasPending) { | |
1018 _StreamImplEvents pending = _pending; | |
1019 if (pending == null) pending = _pending = new _StreamImplEvents(); | |
1020 pending.add(new _DelayedData(data)); | |
1021 } else { | |
1022 _sendData(data); | |
1023 } | |
1024 } | |
1025 | |
1026 void _addError(Object error) { | |
1027 if (_isFiring || _hasPending) { | |
1028 _StreamImplEvents pending = _pending; | |
1029 if (pending == null) pending = _pending = new _StreamImplEvents(); | |
1030 pending.add(new _DelayedError(error)); | |
1031 } else { | |
1032 _sendError(error); | |
1033 } | |
1034 } | |
1035 | |
1036 void _close() { | |
1037 if (_isFiring || _hasPending) { | |
1038 _StreamImplEvents pending = _pending; | |
1039 if (pending == null) pending = _pending = new _StreamImplEvents(); | |
1040 pending.add(const _DelayedDone()); | |
1041 } else { | |
1042 _sendDone(); | |
1043 } | |
1044 } | |
1045 | |
1046 void _sendData(T data) { | |
1047 _forEachListener((_MultiplexerSubscription listener) { | |
1048 listener._add(data); | |
1049 }); | |
1050 if (_hasPending) { | |
1051 _pending.schedule(this); | |
1052 } | |
1053 } | |
1054 | |
1055 void _sendError(Object error) { | |
1056 _forEachListener((_MultiplexerSubscription listener) { | |
1057 listener._addError(error); | |
1058 }); | |
1059 if (_hasPending) { | |
1060 _pending.schedule(this); | |
1061 } | |
1062 } | |
1063 | |
1064 void _sendDone() { | |
1065 _forEachListener((_MultiplexerSubscription listener) { | |
1066 listener._setRemoveAfterFiring(); | |
1067 listener._close(); | |
1068 }); | |
1069 } | 1263 } |
1070 } | 1264 } |
1071 | 1265 |
1072 | 1266 |
1073 /** | 1267 class _DoneSubscription<T> implements StreamSubscription<T> { |
1074 * Simple implementation of [StreamIterator]. | 1268 _DoneHandler _handler; |
1075 */ | 1269 Timer _timer; |
1076 class _StreamIteratorImpl<T> implements StreamIterator<T> { | 1270 int _pauseCount = 0; |
1077 // Internal state of the stream iterator. | |
1078 // At any time, it is in one of these states. | |
1079 // The interpretation of the [_futureOrPrefecth] field depends on the state. | |
1080 // In _STATE_MOVING, the _data field holds the most recently returned | |
1081 // future. | |
1082 // When in one of the _STATE_EXTRA_* states, the it may hold the | |
1083 // next data/error object, and the subscription is paused. | |
1084 | 1271 |
1085 /// The simple state where [_data] holds the data to return, and [moveNext] | 1272 _DoneSubscription(this._handler) { |
1086 /// is allowed. The subscription is actively listening. | 1273 _delayDone(); |
1087 static const int _STATE_FOUND = 0; | |
1088 /// State set after [moveNext] has returned false or an error, | |
1089 /// or after calling [cancel]. The subscription is always canceled. | |
1090 static const int _STATE_DONE = 1; | |
1091 /// State set after calling [moveNext], but before its returned future has | |
1092 /// completed. Calling [moveNext] again is not allowed in this state. | |
1093 /// The subscription is actively listening. | |
1094 static const int _STATE_MOVING = 2; | |
1095 /// States set when another event occurs while in _STATE_FOUND. | |
1096 /// This extra overflow event is cached until the next call to [moveNext], | |
1097 /// which will complete as if it received the event normally. | |
1098 /// The subscription is paused in these states, so we only ever get one | |
1099 /// event too many. | |
1100 static const int _STATE_EXTRA_DATA = 3; | |
1101 static const int _STATE_EXTRA_ERROR = 4; | |
1102 static const int _STATE_EXTRA_DONE = 5; | |
1103 | |
1104 /// Subscription being listened to. | |
1105 StreamSubscription _subscription; | |
1106 | |
1107 /// The current element represented by the most recent call to moveNext. | |
1108 /// | |
1109 /// Is null between the time moveNext is called and its future completes. | |
1110 T _current = null; | |
1111 | |
1112 /// The future returned by the most recent call to [moveNext]. | |
1113 /// | |
1114 /// Also used to store the next value/error in case the stream provides an | |
1115 /// event before [moveNext] is called again. In that case, the stream will | |
1116 /// be paused to prevent further events. | |
1117 var _futureOrPrefetch = null; | |
1118 | |
1119 /// The current state. | |
1120 int _state = _STATE_FOUND; | |
1121 | |
1122 _StreamIteratorImpl(final Stream<T> stream) { | |
1123 _subscription = stream.listen(_onData, | |
1124 onError: _onError, | |
1125 onDone: _onDone, | |
1126 cancelOnError: true); | |
1127 } | 1274 } |
1128 | 1275 |
1129 T get current => _current; | 1276 void _delayDone() { |
| 1277 assert(_timer == null && _pauseCount == 0); |
| 1278 _timer = new Timer(Duration.ZERO, () { |
| 1279 if (_handler != null) _handler(); |
| 1280 }); |
| 1281 } |
1130 | 1282 |
1131 Future<bool> moveNext() { | 1283 bool get _isComplete => _timer == null && _pauseCount == 0; |
1132 if (_state == _STATE_DONE) { | 1284 |
1133 return new _FutureImpl<bool>.immediate(false); | 1285 void onData(void handleAction(T value)) {} |
| 1286 |
| 1287 void onError(void handleError(error)) {} |
| 1288 |
| 1289 void onDone(void handleDone()) { |
| 1290 _handler = handleDone; |
| 1291 } |
| 1292 |
| 1293 void pause([Future signal]) { |
| 1294 if (_isComplete) return; |
| 1295 if (_timer != null) { |
| 1296 _timer.cancel(); |
| 1297 _timer = null; |
1134 } | 1298 } |
1135 if (_state == _STATE_MOVING) { | 1299 _pauseCount++; |
1136 throw new StateError("Already waiting for next."); | 1300 if (signal != null) signal.whenComplete(resume); |
| 1301 } |
| 1302 |
| 1303 void resume() { |
| 1304 if (_isComplete) return; |
| 1305 if (_pauseCount == 0) return; |
| 1306 _pauseCount--; |
| 1307 if (_pauseCount == 0) { |
| 1308 _delayDone(); |
1137 } | 1309 } |
1138 if (_state == _STATE_FOUND) { | 1310 } |
1139 _state = _STATE_MOVING; | 1311 |
1140 _futureOrPrefetch = new _FutureImpl<bool>(); | 1312 bool get isPaused => _pauseCount > 0; |
1141 return _futureOrPrefetch; | 1313 |
| 1314 void cancel() { |
| 1315 if (_isComplete) return; |
| 1316 if (_timer != null) { |
| 1317 _timer.cancel(); |
| 1318 _timer = null; |
| 1319 } |
| 1320 _pauseCount = 0; |
| 1321 } |
| 1322 |
| 1323 Future asFuture([var futureValue]) { |
| 1324 // TODO(floitsch): share more code. |
| 1325 _FutureImpl<T> result = new _FutureImpl<T>(); |
| 1326 |
| 1327 // Overwrite the onDone and onError handlers. |
| 1328 onDone(() { result._setValue(futureValue); }); |
| 1329 onError((error) { |
| 1330 cancel(); |
| 1331 result._setError(error); |
| 1332 }); |
| 1333 |
| 1334 return result; |
| 1335 } |
| 1336 } |
| 1337 |
| 1338 class _SingleStreamMultiplexer<T> extends _MultiStreamImpl<T> { |
| 1339 final Stream<T> _source; |
| 1340 StreamSubscription<T> _subscription; |
| 1341 |
| 1342 _SingleStreamMultiplexer(this._source); |
| 1343 |
| 1344 void _callOnPauseStateChange() { |
| 1345 if (_isPaused) { |
| 1346 if (_subscription != null) { |
| 1347 _subscription.pause(); |
| 1348 } |
1142 } else { | 1349 } else { |
1143 assert(_state >= _STATE_EXTRA_DATA); | 1350 if (_subscription != null) { |
1144 switch (_state) { | 1351 _subscription.resume(); |
1145 case _STATE_EXTRA_DATA: | |
1146 _state = _STATE_FOUND; | |
1147 _current = _futureOrPrefetch; | |
1148 _futureOrPrefetch = null; | |
1149 _subscription.resume(); | |
1150 return new _FutureImpl<bool>.immediate(true); | |
1151 case _STATE_EXTRA_ERROR: | |
1152 Object prefetch = _futureOrPrefetch; | |
1153 _clear(); | |
1154 return new _FutureImpl<bool>.immediateError(prefetch); | |
1155 case _STATE_EXTRA_DONE: | |
1156 _clear(); | |
1157 return new _FutureImpl<bool>.immediate(false); | |
1158 } | 1352 } |
1159 } | 1353 } |
1160 } | 1354 } |
1161 | 1355 |
1162 /** Clears up the internal state when the iterator ends. */ | 1356 /** |
1163 void _clear() { | 1357 * Subscribe or unsubscribe on [_source] depending on whether |
1164 _subscription = null; | 1358 * [_stream] has subscribers. |
1165 _futureOrPrefetch = null; | 1359 */ |
1166 _current = null; | 1360 void _onSubscriptionStateChange() { |
1167 _state = _STATE_DONE; | 1361 if (_hasListener) { |
1168 } | 1362 assert(_subscription == null); |
1169 | 1363 _subscription = _source.listen(this._add, |
1170 void cancel() { | 1364 onError: this._addError, |
1171 StreamSubscription subscription = _subscription; | 1365 onDone: this._close); |
1172 if (_state == _STATE_MOVING) { | |
1173 _FutureImpl<bool> hasNext = _futureOrPrefetch; | |
1174 _clear(); | |
1175 hasNext._setValue(false); | |
1176 } else { | 1366 } else { |
1177 _clear(); | 1367 // TODO(lrn): Check why this can happen. |
| 1368 if (_subscription == null) return; |
| 1369 _subscription.cancel(); |
| 1370 _subscription = null; |
1178 } | 1371 } |
1179 subscription.cancel(); | |
1180 } | |
1181 | |
1182 void _onData(T data) { | |
1183 if (_state == _STATE_MOVING) { | |
1184 _current = data; | |
1185 _FutureImpl<bool> hasNext = _futureOrPrefetch; | |
1186 _futureOrPrefetch = null; | |
1187 _state = _STATE_FOUND; | |
1188 hasNext._setValue(true); | |
1189 return; | |
1190 } | |
1191 _subscription.pause(); | |
1192 assert(_futureOrPrefetch == null); | |
1193 _futureOrPrefetch = data; | |
1194 _state = _STATE_EXTRA_DATA; | |
1195 } | |
1196 | |
1197 void _onError(Object error) { | |
1198 if (_state == _STATE_MOVING) { | |
1199 _FutureImpl<bool> hasNext = _futureOrPrefetch; | |
1200 // We have cancelOnError: true, so the subscription is canceled. | |
1201 _clear(); | |
1202 hasNext._setError(error); | |
1203 return; | |
1204 } | |
1205 _subscription.pause(); | |
1206 assert(_futureOrPrefetch == null); | |
1207 _futureOrPrefetch = error; | |
1208 _state = _STATE_EXTRA_ERROR; | |
1209 } | |
1210 | |
1211 void _onDone() { | |
1212 if (_state == _STATE_MOVING) { | |
1213 _FutureImpl<bool> hasNext = _futureOrPrefetch; | |
1214 _clear(); | |
1215 hasNext._setValue(false); | |
1216 return; | |
1217 } | |
1218 _subscription.pause(); | |
1219 _futureOrPrefetch = null; | |
1220 _state = _STATE_EXTRA_DONE; | |
1221 } | 1372 } |
1222 } | 1373 } |
OLD | NEW |