OLD | NEW |
---|---|
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 // States shared by single/multi stream implementations. | |
8 | |
9 // Completion state of the stream. | |
10 /// Initial and default state where the stream can receive and send events. | |
11 const int _STREAM_OPEN = 0; | |
12 /// The stream has received a request to complete, but hasn't done so yet. | |
13 /// No further events can be added to the stream. | |
14 const int _STREAM_CLOSED = 1; | |
15 /// The stream has completed and will no longer receive or send events. | |
16 /// Also counts as closed. The stream must not be paused when it's completed. | |
17 /// Always used in conjunction with [_STREAM_CLOSED]. | |
18 const int _STREAM_COMPLETE = 2; | |
19 | |
20 /// Bit that alternates between events, and listeners are updated to the | |
21 /// current value when they are notified of the event. | |
22 const int _STREAM_EVENT_ID = 4; | |
23 const int _STREAM_EVENT_ID_SHIFT = 2; | |
24 | |
25 // The activity state of the stream: What is it currently doing. | |
26 /// Bit set while firing and clear while not. | |
27 const int _STREAM_FIRING = 8; | |
28 /// Bit set while calling a pause-state or subscription-state change callback. | |
29 const int _STREAM_CALLBACK = 16; | |
30 | |
31 // The pause state of the stream. | |
32 /// Bit set when resuming with pending events. Cleared after all pending events | |
33 /// have been transmitted. Means that the controller still considers the | |
34 /// stream paused, even if the listener doesn't. | |
35 const int _STREAM_PENDING_RESUME = 32; | |
36 /// The count of times a stream has paused is stored in the | |
37 /// state, shifted by this amount. | |
38 const int _STREAM_PAUSE_COUNT_SHIFT = 6; | |
39 | |
40 // States for listeners. | |
41 | |
42 /// The listener is currently not subscribed to its source stream. | |
43 const int _LISTENER_UNSUBSCRIBED = 0; | |
44 /// The listener is actively subscribed to its source stream. | |
45 const int _LISTENER_SUBSCRIBED = 1; | |
46 /// The listener is subscribed until it has been notified of the current event. | |
47 /// This flag bit is always used in conjuction with [_LISTENER_SUBSCRIBED]. | |
48 const int _LISTENER_PENDING_UNSUBSCRIBE = 2; | |
49 | |
50 /// Bit that contains the last sent event's "id bit". | |
51 const int _LISTENER_EVENT_ID = 4; | |
52 const int _LISTENER_EVENT_ID_SHIFT = 2; | |
53 | |
54 /// The count of times a listener has paused is stored in the | |
55 /// state, shifted by this amount. | |
56 const int _LISTENER_PAUSE_COUNT_SHIFT = 3; | |
57 | |
58 /** Throws the given error in the next cycle. */ | 7 /** Throws the given error in the next cycle. */ |
59 _throwDelayed(var error, [Object stackTrace]) { | 8 _throwDelayed(var error, [Object stackTrace]) { |
60 // We are going to reach the top-level here, but there might be a global | 9 // We are going to reach the top-level here, but there might be a global |
61 // exception handler. This means that we shouldn't print the stack trace. | 10 // exception handler. This means that we shouldn't print the stack trace. |
62 // TODO(floitsch): Find better solution that doesn't print the stack trace | 11 // TODO(floitsch): Find better solution that doesn't print the stack trace |
63 // if there is a global exception handler. | 12 // if there is a global exception handler. |
64 runAsync(() { | 13 runAsync(() { |
65 if (stackTrace != null) print(stackTrace); | 14 if (stackTrace != null) print(stackTrace); |
66 var trace = getAttachedStackTrace(error); | 15 var trace = getAttachedStackTrace(error); |
67 if (trace != null && trace != stackTrace) print(trace); | 16 if (trace != null && trace != stackTrace) print(trace); |
68 throw error; | 17 throw error; |
69 }); | 18 }); |
70 } | 19 } |
71 | 20 |
21 /** Abstract and private interface for a place to put events. */ | |
22 abstract class _EventSink<T> { | |
23 void _add(T data); | |
24 void _addError(Object error); | |
25 void _close(); | |
26 } | |
27 | |
28 /** | |
29 * Abstract and private interface for a place to send events. | |
30 * | |
31 * Used by event buffering to finally dispatch the pending event, where | |
32 * [_EventSink] is where the event first enters the stream subscription, | |
33 * and may yet be buffered. | |
34 */ | |
35 abstract class _EventDispatch<T> { | |
36 void _sendData(T data); | |
37 void _sendError(Object error); | |
38 void _sendDone(); | |
39 } | |
40 | |
41 /** | |
42 * Default implementation of stream subscription of buffering events. | |
43 * | |
44 * The only public methods are those of [StreamSubscription], so instances of | |
45 * [_BufferingStreamSubscription] can be returned directly as a | |
46 * [StreamSubscription] without exposing internal functionality. | |
47 * | |
48 * The [StreamController] is a public facing version of [Stream] and this class, | |
49 * with some methods made public. | |
50 * | |
51 * The user interface of [_BufferingStreamSubscription] are the following | |
52 * methods: | |
53 * * [_add]: Add a data event to the stream. | |
54 * * [_addError]: Add an error event to the stream. | |
55 * * [_close]: Request to close the stream. | |
56 * * [_onCancel]: Called when the subscription stops listening. | |
57 * * [_onPause]: Called when the subscription wants the event source to pause. | |
58 * * [_onResume]: Called when allowing new events after a pause. | |
59 * The user should not add new eventswhen the subscription requests a paused, | |
floitsch
2013/05/22 16:26:29
space missing.
Lasse Reichstein Nielsen
2013/05/24 06:02:49
Done.
| |
60 * but if it happens anyway, the subscription will enqueue the events just as | |
61 * when new events arrive while still firing an old event. | |
62 */ | |
63 class _BufferingStreamSubscription<T> implements StreamSubscription<T>, | |
64 _EventSink<T>, | |
65 _EventDispatch<T> { | |
66 /** The `cancelOnError` flag from the `listen` call. */ | |
67 static const int _STATE_CANCEL_ON_ERROR = 1; | |
68 /** | |
69 * Whether the "done" event has been received. | |
70 * No further events are accepted after this. | |
71 */ | |
72 static const int _STATE_CLOSED = 2; | |
73 /** | |
74 * Set if the input has been asked not to send events. | |
75 * | |
76 * This is not the same as being paused, since the input will remain paused | |
77 * after a call to [resume] if there are pending events. | |
78 */ | |
79 static const int _STATE_INPUT_PAUSED = 4; | |
80 /** | |
81 * Whether the subscription has been cancelled. | |
82 * | |
83 * Set by calling [cancel], or by handling a "done" event, or an "error" event | |
84 * when `cancelOnError` is true. | |
85 */ | |
86 static const int _STATE_CANCELLED = 8; | |
floitsch
2013/05/22 16:26:29
_STATE_CANCELED
Lasse Reichstein Nielsen
2013/05/24 06:02:49
Bowdlerizing constants! :(
| |
87 static const int _STATE_IN_CALLBACK = 16; | |
88 static const int _STATE_HAS_PENDING = 32; | |
89 static const int _STATE_PAUSE_COUNT = 64; | |
90 static const int _STATE_PAUSE_COUNT_SHIFT = 6; | |
91 | |
92 /** Event handlers provided in constructor. */ | |
93 _DataHandler<T> _onData; | |
94 _ErrorHandler _onError; | |
95 _DoneHandler _onDone; | |
96 | |
97 /** Bit vector based on state-constants above. */ | |
98 int _state; | |
99 | |
100 /** | |
101 * Queue of pending events. | |
102 * | |
103 * Is created when necessary, or set in constructor for preconfigured events. | |
104 */ | |
105 _PendingEvents _pending; | |
106 | |
107 _BufferingStreamSubscription(this._onData, | |
108 this._onError, | |
109 this._onDone, | |
110 bool cancelOnError, | |
111 this._pending) | |
112 : _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0) { | |
113 assert(_onData != null); | |
114 assert(_onError != null); | |
115 assert(_onDone != null); | |
116 if (_pending != null && !_pending.isEmpty) { | |
117 _state |= _STATE_HAS_PENDING; | |
118 _pending.schedule(this); | |
119 } | |
120 } | |
121 | |
122 // StreamSubscription interface. | |
123 | |
124 void onData(void handleData(T event)) { | |
125 if (handleData == null) handleData = _nullDataHandler; | |
126 _onData = handleData; | |
127 } | |
128 | |
129 void onError(void handleError(error)) { | |
130 if (handleError == null) handleError = _nullErrorHandler; | |
131 _onError = handleError; | |
132 } | |
133 | |
134 void onDone(void handleDone()) { | |
135 if (handleDone == null) handleDone = _nullDoneHandler; | |
136 _onDone = handleDone; | |
137 } | |
138 | |
139 void pause([Future resumeSignal]) { | |
140 if (_cancelled) return; | |
141 bool wasPaused = _isPaused; | |
142 bool wasInputPaused = _isInputPaused; | |
143 // Increment pause count and mark input paused (if it isn't already). | |
144 _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED; | |
145 if (resumeSignal != null) resumeSignal.whenComplete(resume); | |
146 if (!wasPaused && _pending != null) _pending.cancelSchedule(); | |
147 if (!wasInputPaused && !_inCallback) _guardCallback(_onPause); | |
148 } | |
149 | |
150 void resume() { | |
151 if (_cancelled) return; | |
152 if (_isPaused) { | |
153 _decrementPauseCount(); | |
154 if (!_isPaused) { | |
155 if (_hasPending && !_pending.isEmpty) { | |
156 // Input is still paused. | |
157 _pending.schedule(this); | |
158 } else { | |
159 assert(_mayResumeInput); | |
160 _state &= ~_STATE_INPUT_PAUSED; | |
161 if (!_inCallback) _guardCallback(_onResume); | |
162 } | |
163 } | |
164 } | |
165 } | |
166 | |
167 void cancel() { | |
168 if (_cancelled) return; | |
169 _cancel(); | |
170 if (!_inCallback) { | |
171 // otherwise checkState will be called after firing or callback completes. | |
172 _state |= _STATE_IN_CALLBACK; | |
173 try { | |
174 _onCancel(); | |
175 } catch (e, s) { | |
176 print("BAD THROW: \n$s"); | |
floitsch
2013/05/22 16:26:29
So _onCancel is not allowed to throw? Should this
Lasse Reichstein Nielsen
2013/05/24 06:02:49
_onCancel is an internal method only. It should ne
| |
177 } | |
178 _state &= ~_STATE_IN_CALLBACK; | |
179 } | |
180 } | |
181 | |
182 Future asFuture([var futureValue]) { | |
183 _FutureImpl<T> result = new _FutureImpl<T>(); | |
184 | |
185 // Overwrite the onDone and onError handlers. | |
186 _onDone = () { result._setValue(futureValue); }; | |
187 _onError = (error) { | |
188 cancel(); | |
189 result._setError(error); | |
190 }; | |
191 | |
192 return result; | |
193 } | |
194 | |
195 // State management. | |
196 | |
197 bool get _isInputPaused => (_state & _STATE_INPUT_PAUSED) != 0; | |
198 bool get _isClosed => (_state & _STATE_CLOSED) != 0; | |
199 bool get _cancelled => (_state & _STATE_CANCELLED) != 0; | |
floitsch
2013/05/22 16:26:29
isCanceled
Lasse Reichstein Nielsen
2013/05/24 06:02:49
Done.
| |
200 bool get _inCallback => (_state & _STATE_IN_CALLBACK) != 0; | |
201 bool get _hasPending => (_state & _STATE_HAS_PENDING) != 0; | |
202 bool get _isPaused => _state >= _STATE_PAUSE_COUNT; | |
203 bool get _canFire => _state < _STATE_IN_CALLBACK; | |
204 bool get _mayResumeInput => | |
205 !_isPaused && (_pending == null || _pending.isEmpty); | |
floitsch
2013/05/22 16:26:29
shouldn't _hasPending do the null-check too?
Lasse Reichstein Nielsen
2013/05/24 06:02:49
_hasPending is accessing a cached bit in the state
| |
206 bool get _cancelOnError => (_state & _STATE_CANCEL_ON_ERROR) != 0; | |
207 | |
208 bool get isPaused => _isPaused; | |
209 | |
210 void _cancel() { | |
211 _state |= _STATE_CANCELLED; | |
212 if (_hasPending) { | |
213 _pending.clear(); | |
214 } | |
215 } | |
216 | |
217 /** | |
218 * Increment the pause count. | |
219 * | |
220 * Also marks input as paused. | |
221 */ | |
222 void _incrementPauseCount() { | |
223 _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED; | |
224 } | |
225 | |
226 /** | |
227 * Decrements the pause count. | |
228 * | |
229 * Does not automatically unpause the input (call [_onResume]) when | |
230 * the pause count reaches zero. This is handled elsewhere, and only | |
231 * if there are no pending events buffered. | |
232 */ | |
233 void _decrementPauseCount() { | |
234 assert(_isPaused); | |
235 _state -= _STATE_PAUSE_COUNT; | |
236 } | |
237 | |
238 // _EventSink interface. | |
239 | |
240 void _add(T data) { | |
241 assert(!_isClosed); | |
242 if (_cancelled) return; | |
243 if (_canFire) { | |
244 _sendData(data); | |
245 } else { | |
246 _addPending(new _DelayedData(data)); | |
247 } | |
248 } | |
249 | |
250 void _addError(Object error) { | |
251 if (_cancelled) return; | |
252 if (_cancelOnError) { | |
253 /// TODO: handle here? | |
254 } | |
255 if (_canFire) { | |
256 _sendError(error); // Reports cancel after sending. | |
257 } else { | |
258 _addPending(new _DelayedError(error)); | |
259 } | |
260 } | |
261 | |
262 void _close() { | |
263 assert(!_isClosed); | |
264 if (_cancelled) return; | |
265 _state |= _STATE_CLOSED; | |
266 if (_canFire) { | |
267 _sendDone(); | |
268 } else { | |
269 _addPending(const _DelayedDone()); | |
270 } | |
271 } | |
272 | |
273 // Hooks called when the input is paused, unpaused or cancelled. | |
274 // These must not throw. If overwritten to call user code, include suitable | |
275 // try/catch wrapping and send any errors to [_throwDelayed]. | |
276 void _onPause() { | |
277 assert(_isInputPaused); | |
278 } | |
279 | |
280 void _onResume() { | |
281 assert(!_isInputPaused); | |
282 } | |
283 | |
284 void _onCancel() { | |
285 assert(_cancelled); | |
286 } | |
287 | |
288 // Handle pending events. | |
289 | |
290 /** | |
291 * Add a pending event. | |
292 * | |
293 * If the subscription is not paused, this also schedules a firing | |
294 * of pending events later (if necessary). | |
295 */ | |
296 void _addPending(_DelayedEvent event) { | |
297 _StreamImplEvents pending = _pending; | |
298 if (_pending == null) pending = _pending = new _StreamImplEvents(); | |
299 pending.add(event); | |
300 if (!_hasPending) { | |
301 _state |= _STATE_HAS_PENDING; | |
302 if (!_isPaused) { | |
303 _pending.schedule(this); | |
304 } | |
305 } | |
306 } | |
307 | |
308 /* _EventDispatch interface. */ | |
309 | |
310 void _sendData(T data) { | |
311 assert(!_cancelled); | |
312 assert(!_isPaused); | |
313 assert(!_inCallback); | |
314 bool wasInputPaused = _isInputPaused; | |
315 _state |= _STATE_IN_CALLBACK; | |
316 try { | |
floitsch
2013/05/22 16:26:29
move the `try`s into a separate function?
Lasse Reichstein Nielsen
2013/05/24 06:02:49
Rather not. It would only be used for _send{Data,E
| |
317 _onData(data); | |
318 } catch (e, s) { | |
319 _throwDelayed(e, s); | |
320 } | |
321 _state &= ~_STATE_IN_CALLBACK; | |
322 _checkState(wasInputPaused); | |
323 } | |
324 | |
325 void _sendError(var error) { | |
326 assert(!_cancelled); | |
327 assert(!_isPaused); | |
328 assert(!_inCallback); | |
329 bool wasInputPaused = _isInputPaused; | |
330 _state |= _STATE_IN_CALLBACK; | |
331 try { | |
332 _onError(error); | |
333 } catch (e, s) { | |
334 _throwDelayed(e, s); | |
335 } | |
336 _state &= ~_STATE_IN_CALLBACK; | |
337 if (_cancelOnError) { | |
338 _cancel(); | |
339 } | |
340 _checkState(wasInputPaused); | |
341 } | |
342 | |
343 void _sendDone() { | |
344 assert(!_cancelled); | |
345 assert(!_isPaused); | |
346 assert(!_inCallback); | |
347 _state |= (_STATE_CANCELLED | _STATE_CLOSED | _STATE_IN_CALLBACK); | |
348 try { | |
349 _onDone(); | |
350 } catch (e, s) { | |
351 _throwDelayed(e, s); | |
352 } | |
353 try { | |
354 _onCancel(); // No checkState after cancel, it is always the last event. | |
355 } catch (e, s) { | |
356 print("BAD THROW 5: \n$s"); | |
357 } | |
358 _state &= ~_STATE_IN_CALLBACK; | |
359 } | |
360 | |
361 /** | |
362 * Call a hook function. | |
363 * | |
364 * The call is properly wrapped in code to avoid other callbacks | |
365 * during the call, and it checks for state changes after the call | |
366 * that should cause further callbacks. | |
367 */ | |
368 void _guardCallback(callback) { | |
369 assert(!_inCallback); | |
370 bool wasInputPaused = _isInputPaused; | |
371 _state |= _STATE_IN_CALLBACK; | |
372 try { | |
373 callback(); | |
374 } catch (e, s) { | |
375 print("BAD THROW 2: \n$s"); | |
376 } | |
377 _state &= ~_STATE_IN_CALLBACK; | |
378 _checkState(wasInputPaused); | |
379 } | |
380 | |
381 /** | |
382 * Check if the input needs to be informed of state changes. | |
383 * | |
384 * State changes are pausing, resuming and cancelling. | |
385 * After cancelling, no further callbacks will happen. | |
floitsch
2013/05/22 16:26:29
separate with more new lines.
Lasse Reichstein Nielsen
2013/05/24 06:02:49
Done.
| |
386 * The cancel callback is called after a user cancel, or after | |
387 * the final done event is sent. | |
388 */ | |
389 void _checkState(bool wasInputPaused) { | |
390 assert(!_inCallback); | |
391 if (_hasPending && _pending.isEmpty) { | |
392 _state &= ~_STATE_HAS_PENDING; | |
393 if (_isInputPaused && _mayResumeInput) { | |
394 _state &= ~_STATE_INPUT_PAUSED; | |
395 } | |
396 } | |
397 while (true) { | |
floitsch
2013/05/22 16:26:29
Add comment why we need a loop.
Lasse Reichstein Nielsen
2013/05/24 06:02:49
Ok.
| |
398 if (_cancelled) { | |
399 try { | |
400 _onCancel(); | |
401 } catch (e, s) { | |
402 print("BAD THROW 3: \n$s"); | |
403 } | |
404 return; | |
405 } | |
406 bool isInputPaused = _isInputPaused; | |
407 if (wasInputPaused == isInputPaused) break; | |
408 _state ^= _STATE_IN_CALLBACK; | |
409 try { | |
410 if (isInputPaused) { | |
411 _onPause(); | |
412 } else { | |
413 _onResume(); | |
414 } | |
415 } catch (e, s) { | |
416 print("BAD THROW 4: \n$s"); | |
417 } | |
418 _state &= ~_STATE_IN_CALLBACK; | |
419 wasInputPaused = isInputPaused; | |
420 } | |
421 if (_hasPending && !_isPaused) { | |
422 _pending.schedule(this); | |
423 } | |
424 } | |
425 } | |
72 | 426 |
73 // ------------------------------------------------------------------- | 427 // ------------------------------------------------------------------- |
74 // Common base class for single and multi-subscription streams. | 428 // Common base class for single and multi-subscription streams. |
75 // ------------------------------------------------------------------- | 429 // ------------------------------------------------------------------- |
76 abstract class _StreamImpl<T> extends Stream<T> { | 430 abstract class _StreamImpl<T> extends Stream<T> { |
77 /** Current state of the stream. */ | |
78 int _state = _STREAM_OPEN; | |
79 | |
80 /** | |
81 * List of pending events. | |
82 * | |
83 * If events are added to the stream (using [_add], [_addError] or [_done]) | |
84 * while the stream is paused, or while another event is firing, events will | |
85 * stored here. | |
86 * Also supports scheduling the events for later execution. | |
87 */ | |
88 _PendingEvents _pendingEvents; | |
89 | |
90 // ------------------------------------------------------------------ | 431 // ------------------------------------------------------------------ |
91 // Stream interface. | 432 // Stream interface. |
92 | 433 |
93 StreamSubscription<T> listen(void onData(T data), | 434 StreamSubscription<T> listen(void onData(T data), |
94 { void onError(error), | 435 { void onError(error), |
95 void onDone(), | 436 void onDone(), |
96 bool cancelOnError }) { | 437 bool cancelOnError }) { |
97 if (_isComplete) { | |
98 return new _DoneSubscription(onDone); | |
99 } | |
100 if (onData == null) onData = _nullDataHandler; | 438 if (onData == null) onData = _nullDataHandler; |
101 if (onError == null) onError = _nullErrorHandler; | 439 if (onError == null) onError = _nullErrorHandler; |
102 if (onDone == null) onDone = _nullDoneHandler; | 440 if (onDone == null) onDone = _nullDoneHandler; |
103 cancelOnError = identical(true, cancelOnError); | 441 cancelOnError = identical(true, cancelOnError); |
104 _StreamSubscriptionImpl subscription = | 442 StreamSubscription subscription = |
105 _createSubscription(onData, onError, onDone, cancelOnError); | 443 _createSubscription(onData, onError, onDone, cancelOnError); |
106 _addListener(subscription); | 444 _onListen(subscription); |
107 return subscription; | 445 return subscription; |
108 } | 446 } |
109 | 447 |
110 // ------------------------------------------------------------------ | |
111 // EventSink interface-like methods for sending events into the stream. | |
112 // It's the responsibility of the caller to ensure that the stream is not | |
113 // paused when adding events. If the stream is paused, the events will be | |
114 // queued, but it's better to not send events at all. | |
115 | |
116 /** | |
117 * Send or queue a data event. | |
118 */ | |
119 void _add(T value) { | |
120 if (_isClosed) throw new StateError("Sending on closed stream"); | |
121 if (!_mayFireState) { | |
122 // Not the time to send events. | |
123 _addPendingEvent(new _DelayedData<T>(value)); | |
124 return; | |
125 } | |
126 if (_hasPendingEvent) { | |
127 _addPendingEvent(new _DelayedData<T>(value)); | |
128 } else { | |
129 _sendData(value); | |
130 } | |
131 _handlePendingEvents(); | |
132 } | |
133 | |
134 /** | |
135 * Send or enqueue an error event. | |
136 * | |
137 * If a subscription has requested to be unsubscribed on errors, | |
138 * it will be unsubscribed after receiving this event. | |
139 */ | |
140 void _addError(error) { | |
141 if (_isClosed) throw new StateError("Sending on closed stream"); | |
142 if (!_mayFireState) { | |
143 // Not the time to send events. | |
144 _addPendingEvent(new _DelayedError(error)); | |
145 return; | |
146 } | |
147 if (_hasPendingEvent) { | |
148 _addPendingEvent(new _DelayedError(error)); | |
149 } else { | |
150 _sendError(error); | |
151 } | |
152 _handlePendingEvents(); | |
153 } | |
154 | |
155 /** | |
156 * Send or enqueue a "done" message. | |
157 * | |
158 * The "done" message should be sent at most once by a stream, and it | |
159 * should be the last message sent. | |
160 */ | |
161 void _close() { | |
162 if (_isClosed) return; | |
163 _state |= _STREAM_CLOSED; | |
164 if (!_mayFireState) { | |
165 // Not the time to send events. | |
166 _addPendingEvent(const _DelayedDone()); | |
167 return; | |
168 } | |
169 if (_hasPendingEvent) { | |
170 _addPendingEvent(new _DelayedDone()); | |
171 _handlePendingEvents(); | |
172 } else { | |
173 _sendDone(); | |
174 assert(_isComplete); | |
175 assert(!_hasPendingEvent); | |
176 } | |
177 } | |
178 | |
179 // ------------------------------------------------------------------- | 448 // ------------------------------------------------------------------- |
180 // Internal implementation. | |
181 | |
182 // State predicates. | |
183 | |
184 // Lifecycle state. | |
185 /** Whether the stream is in the default, open, state for events. */ | |
186 bool get _isOpen => (_state & (_STREAM_CLOSED | _STREAM_COMPLETE)) == 0; | |
187 | |
188 /** Whether the stream has been closed (a done event requested). */ | |
189 bool get _isClosed => (_state & _STREAM_CLOSED) != 0; | |
190 | |
191 /** Whether the stream is completed. */ | |
192 bool get _isComplete => (_state & _STREAM_COMPLETE) != 0; | |
193 | |
194 // Pause state. | |
195 | |
196 /** Whether one or more active subscribers have requested a pause. */ | |
197 bool get _isPaused => _state >= (1 << _STREAM_PAUSE_COUNT_SHIFT); | |
198 | |
199 /** How many times the stream has been paused. */ | |
200 int get _pauseCount => _state >> _STREAM_PAUSE_COUNT_SHIFT; | |
201 | |
202 /** | |
203 * Whether a controller thinks the stream is paused. | |
204 * | |
205 * When this changes, a pause-state change callback is performed. | |
206 * | |
207 * It may differ from [_isPaused] if there are pending events | |
208 * in the queue when the listeners resume. The controller won't | |
209 * be informed until all queued events have been fired. | |
210 */ | |
211 bool get _isInputPaused => _state >= (_STREAM_PENDING_RESUME); | |
212 | |
213 /** Whether we have a pending resume scheduled. */ | |
214 bool get _hasPendingResume => (_state & _STREAM_PENDING_RESUME) != 0; | |
215 | |
216 | |
217 // Action state. If the stream makes a call-out to external code, | |
218 // this state tracks it and avoids reentrancy problems. | |
219 | |
220 /** Whether the stream is not currently firing or calling a callback. */ | |
221 bool get _isInactive => (_state & (_STREAM_CALLBACK | _STREAM_FIRING)) == 0; | |
222 | |
223 /** Whether we are currently executing a state-chance callback. */ | |
224 bool get _isInCallback => (_state & _STREAM_CALLBACK) != 0; | |
225 | |
226 /** Whether we are currently firing an event. */ | |
227 bool get _isFiring => (_state & _STREAM_FIRING) != 0; | |
228 | |
229 /** Check whether the pending event queue is non-empty */ | |
230 bool get _hasPendingEvent => | |
231 _pendingEvents != null && !_pendingEvents.isEmpty; | |
232 | |
233 /** | |
234 * The bit representing the current or last event fired. | |
235 * | |
236 * This bit matches a bit on listeners that have received the corresponding | |
237 * event. It is toggled for each new event being fired. | |
238 */ | |
239 int get _currentEventIdBit => | |
240 (_state & _STREAM_EVENT_ID ) >> _STREAM_EVENT_ID_SHIFT; | |
241 | |
242 /** Whether there is currently a subscriber on this [Stream]. */ | |
243 bool get _hasListener; | |
244 | |
245 | |
246 /** Whether the state bits allow firing. */ | |
247 bool get _mayFireState { | |
248 // The state allows firing unless: | |
249 // - it's currently firing | |
250 // - it's currently in a callback | |
251 // - it's paused | |
252 const int mask = | |
253 _STREAM_FIRING | | |
254 _STREAM_CALLBACK | | |
255 ~((1 << _STREAM_PAUSE_COUNT_SHIFT) - 1); | |
256 return (_state & mask) == 0; | |
257 } | |
258 | |
259 // State modification. | |
260 | |
261 /** Record an increases in the number of times the listener has paused. */ | |
262 void _incrementPauseCount(_StreamListener<T> listener) { | |
263 listener._incrementPauseCount(); | |
264 _state &= ~_STREAM_PENDING_RESUME; | |
265 _updatePauseCount(1); | |
266 } | |
267 | |
268 /** Record a decrease in the number of times the listener has paused. */ | |
269 void _decrementPauseCount(_StreamListener<T> listener) { | |
270 assert(_isPaused); | |
271 listener._decrementPauseCount(); | |
272 _updatePauseCount(-1); | |
273 } | |
274 | |
275 /** Update the stream's own pause count only. */ | |
276 void _updatePauseCount(int by) { | |
277 int oldState = _state; | |
278 // We can't just _state += by << _STREAM_PAUSE_COUNT_SHIFT, since dart2js | |
279 // converts the result of the left-shift to a positive number. | |
280 if (by >= 0) { | |
281 _state = oldState + (by << _STREAM_PAUSE_COUNT_SHIFT); | |
282 } else { | |
283 _state = oldState - ((-by) << _STREAM_PAUSE_COUNT_SHIFT); | |
284 } | |
285 assert(_state >= 0); | |
286 assert((_state >> _STREAM_PAUSE_COUNT_SHIFT) == | |
287 (oldState >> _STREAM_PAUSE_COUNT_SHIFT) + by); | |
288 } | |
289 | |
290 void _setClosed() { | |
291 assert(!_isClosed); | |
292 _state |= _STREAM_CLOSED; | |
293 } | |
294 | |
295 void _setComplete() { | |
296 assert(_isClosed); | |
297 _state = _state |_STREAM_COMPLETE; | |
298 } | |
299 | |
300 void _startFiring() { | |
301 assert(!_isFiring); | |
302 assert(!_isInCallback); | |
303 assert(_hasListener); | |
304 assert(!_isPaused); | |
305 // This sets the _STREAM_FIRING bit and toggles the _STREAM_EVENT_ID | |
306 // bit. All current subscribers will now have a _LISTENER_EVENT_ID | |
307 // that doesn't match _STREAM_EVENT_ID, and they will receive the | |
308 // event being fired. | |
309 _state ^= _STREAM_FIRING | _STREAM_EVENT_ID; | |
310 } | |
311 | |
312 void _endFiring(bool wasInputPaused) { | |
313 assert(_isFiring); | |
314 _state ^= _STREAM_FIRING; | |
315 // Had listeners, or we wouldn't have fired. | |
316 _checkCallbacks(true, wasInputPaused); | |
317 } | |
318 | |
319 /** | |
320 * Record that a listener wants a pause from events. | |
321 * | |
322 * This methods is called from [_StreamListener.pause()]. | |
323 * Subclasses can override this method, along with [isPaused] and | |
324 * [createSubscription], if they want to do a different handling of paused | |
325 * subscriptions, e.g., a filtering stream pausing its own source if all its | |
326 * subscribers are paused. | |
327 */ | |
328 void _pause(_StreamListener<T> listener, Future resumeSignal) { | |
329 assert(identical(listener._source, this)); | |
330 if (!listener._isSubscribed) { | |
331 throw new StateError("Subscription has been canceled."); | |
332 } | |
333 assert(!_isComplete); // There can be no subscribers when complete. | |
334 bool wasInputPaused = _isInputPaused; | |
335 bool wasPaused = _isPaused; | |
336 _incrementPauseCount(listener); | |
337 if (resumeSignal != null) { | |
338 resumeSignal.whenComplete(() { this._resume(listener, true); }); | |
339 } | |
340 if (!wasPaused && _hasPendingEvent && _pendingEvents.isScheduled) { | |
341 _pendingEvents.cancelSchedule(); | |
342 } | |
343 if (_isInactive && !wasInputPaused) { | |
344 _checkCallbacks(true, false); | |
345 if (!_isPaused && _hasPendingEvent) { | |
346 _schedulePendingEvents(); | |
347 } | |
348 } | |
349 } | |
350 | |
351 /** Stops pausing due to one request from the given listener. */ | |
352 void _resume(_StreamListener<T> listener, bool fromEvent) { | |
353 if (!listener.isPaused) return; | |
354 assert(listener._isSubscribed); | |
355 assert(_isPaused); | |
356 _decrementPauseCount(listener); | |
357 if (!_isPaused) { | |
358 if (_hasPendingEvent) { | |
359 _state |= _STREAM_PENDING_RESUME; | |
360 // Controller's pause state hasn't changed. | |
361 // If we can fire events now, fire any pending events right away. | |
362 if (_isInactive) { | |
363 if (fromEvent) { | |
364 _handlePendingEvents(); | |
365 } else { | |
366 _schedulePendingEvents(); | |
367 } | |
368 } | |
369 } else if (_isInactive) { | |
370 _checkCallbacks(true, true); | |
371 if (!_isPaused && _hasPendingEvent) { | |
372 if (fromEvent) { | |
373 _handlePendingEvents(); | |
374 } else { | |
375 _schedulePendingEvents(); | |
376 } | |
377 } | |
378 } | |
379 } | |
380 } | |
381 | |
382 /** Schedule pending events to be executed. */ | |
383 void _schedulePendingEvents() { | |
384 assert(_hasPendingEvent); | |
385 _pendingEvents.schedule(this); | |
386 } | |
387 | |
388 /** Create a subscription object. Called by [subcribe]. */ | 449 /** Create a subscription object. Called by [subcribe]. */ |
389 _StreamSubscriptionImpl<T> _createSubscription( | 450 _BufferingStreamSubscription<T> _createSubscription( |
390 void onData(T data), | |
391 void onError(error), | |
392 void onDone(), | |
393 bool cancelOnError); | |
394 | |
395 /** | |
396 * Adds a listener to this stream. | |
397 */ | |
398 void _addListener(_StreamSubscriptionImpl subscription); | |
399 | |
400 /** | |
401 * Handle a cancel requested from a [_StreamSubscriptionImpl]. | |
402 * | |
403 * This method is called from [_StreamSubscriptionImpl.cancel]. | |
404 * | |
405 * If an event is currently firing, the cancel is delayed | |
406 * until after the subscribers have received the event. | |
407 */ | |
408 void _cancel(_StreamSubscriptionImpl subscriber); | |
409 | |
410 /** | |
411 * Iterate over all current subscribers and perform an action on each. | |
412 * | |
413 * Subscribers added during the iteration will not be visited. | |
414 * Subscribers unsubscribed during the iteration will only be removed | |
415 * after they have been acted on. | |
416 * | |
417 * Any change in the pause state is only reported after all subscribers have | |
418 * received the event. | |
419 * | |
420 * The [action] must not throw, or the controller will be left in an | |
421 * invalid state. | |
422 * | |
423 * This method must not be called while [isFiring] is true. | |
424 */ | |
425 void _forEachSubscriber(void action(_StreamSubscriptionImpl<T> subscription)); | |
426 | |
427 /** | |
428 * Checks whether the subscription/pause state has changed. | |
429 * | |
430 * Calls the appropriate callback if the state has changed from the | |
431 * provided one. Repeats calling callbacks as long as the call changes | |
432 * the state. | |
433 */ | |
434 void _checkCallbacks(bool hadListener, bool wasPaused) { | |
435 assert(!_isFiring); | |
436 // Will be handled after the current callback. | |
437 if (_isInCallback) return; | |
438 if (_hasPendingResume && !_hasPendingEvent) { | |
439 _state ^= _STREAM_PENDING_RESUME; | |
440 } | |
441 _state |= _STREAM_CALLBACK; | |
442 while (true) { | |
443 bool hasListener = _hasListener; | |
444 bool isPaused = _isInputPaused; | |
445 if (hadListener != hasListener) { | |
446 _onSubscriptionStateChange(); | |
447 } else if (isPaused != wasPaused) { | |
448 _onPauseStateChange(); | |
449 } else { | |
450 _state ^= _STREAM_CALLBACK; | |
451 return; | |
452 } | |
453 wasPaused = isPaused; | |
454 hadListener = hasListener; | |
455 } | |
456 } | |
457 | |
458 /** | |
459 * Called when the first subscriber requests a pause or the last a resume. | |
460 * | |
461 * Read [isPaused] to see the new state. | |
462 */ | |
463 void _onPauseStateChange() {} | |
464 | |
465 /** | |
466 * Called when the first listener subscribes or the last unsubscribes. | |
467 * | |
468 * Read [hasListener] to see what the new state is. | |
469 */ | |
470 void _onSubscriptionStateChange() {} | |
471 | |
472 /** | |
473 * Add a pending event at the end of the pending event queue. | |
474 * | |
475 * Schedules events if currently not paused and inside a callback. | |
476 */ | |
477 void _addPendingEvent(_DelayedEvent event) { | |
478 if (_pendingEvents == null) _pendingEvents = new _StreamImplEvents(); | |
479 _StreamImplEvents events = _pendingEvents; | |
480 events.add(event); | |
481 if (_isPaused || _isFiring) return; | |
482 if (_isInCallback) { | |
483 _schedulePendingEvents(); | |
484 return; | |
485 } | |
486 } | |
487 | |
488 /** Fire any pending events until the pending event queue is empty. */ | |
489 void _handlePendingEvents() { | |
490 assert(_isInactive); | |
491 if (!_hasPendingEvent) return; | |
492 _PendingEvents events = _pendingEvents; | |
493 do { | |
494 if (_isPaused) return; | |
495 if (events.isScheduled) events.cancelSchedule(); | |
496 events.handleNext(this); | |
497 } while (!events.isEmpty); | |
498 } | |
499 | |
500 /** | |
501 * Send a data event directly to each subscriber. | |
502 */ | |
503 _sendData(T value) { | |
504 assert(!_isPaused); | |
505 assert(!_isComplete); | |
506 if (!_hasListener) return; | |
507 _forEachSubscriber((subscriber) { | |
508 try { | |
509 subscriber._sendData(value); | |
510 } catch (e, s) { | |
511 _throwDelayed(e, s); | |
512 } | |
513 }); | |
514 } | |
515 | |
516 /** | |
517 * Sends an error event directly to each subscriber. | |
518 */ | |
519 void _sendError(error) { | |
520 assert(!_isPaused); | |
521 assert(!_isComplete); | |
522 if (!_hasListener) return; | |
523 _forEachSubscriber((subscriber) { | |
524 try { | |
525 subscriber._sendError(error); | |
526 } catch (e, s) { | |
527 _throwDelayed(e, s); | |
528 } | |
529 }); | |
530 } | |
531 | |
532 /** | |
533 * Sends the "done" message directly to each subscriber. | |
534 * This automatically stops further subscription and | |
535 * unsubscribes all subscribers. | |
536 */ | |
537 void _sendDone() { | |
538 assert(!_isPaused); | |
539 assert(_isClosed); | |
540 _setComplete(); | |
541 if (!_hasListener) return; | |
542 _forEachSubscriber((subscriber) { | |
543 _cancel(subscriber); | |
544 try { | |
545 subscriber._sendDone(); | |
546 } catch (e, s) { | |
547 _throwDelayed(e, s); | |
548 } | |
549 }); | |
550 assert(!_hasListener); | |
551 } | |
552 } | |
553 | |
554 // ------------------------------------------------------------------- | |
555 // Default implementation of a stream with a single subscriber. | |
556 // ------------------------------------------------------------------- | |
557 /** | |
558 * Default implementation of stream capable of sending events to one subscriber. | |
559 * | |
560 * Any class needing to implement [Stream] can either directly extend this | |
561 * class, or extend [Stream] and delegate the subscribe method to an instance | |
562 * of this class. | |
563 * | |
564 * The only public methods are those of [Stream], so instances of | |
565 * [_SingleStreamImpl] can be returned directly as a [Stream] without exposing | |
566 * internal functionality. | |
567 * | |
568 * The [StreamController] is a public facing version of this class, with | |
569 * some methods made public. | |
570 * | |
571 * The user interface of [_SingleStreamImpl] are the following methods: | |
572 * * [_add]: Add a data event to the stream. | |
573 * * [_addError]: Add an error event to the stream. | |
574 * * [_close]: Request to close the stream. | |
575 * * [_onSubscriberStateChange]: Called when receiving the first subscriber or | |
576 * when losing the last subscriber. | |
577 * * [_onPauseStateChange]: Called when entering or leaving paused mode. | |
578 * * [_hasListener]: Test whether there are currently any subscribers. | |
579 * * [_isInputPaused]: Test whether the stream is currently paused. | |
580 * The user should not add new events while the stream is paused, but if it | |
581 * happens anyway, the stream will enqueue the events just as when new events | |
582 * arrive while still firing an old event. | |
583 */ | |
584 class _SingleStreamImpl<T> extends _StreamImpl<T> { | |
585 _StreamListener _subscriber = null; | |
586 | |
587 /** Whether there is currently a subscriber on this [Stream]. */ | |
588 bool get _hasListener => _subscriber != null; | |
589 | |
590 // ------------------------------------------------------------------- | |
591 // Internal implementation. | |
592 | |
593 _SingleStreamImpl() { | |
594 // Start out paused. | |
595 _updatePauseCount(1); | |
596 } | |
597 | |
598 /** | |
599 * Create the new subscription object. | |
600 */ | |
601 _StreamSubscriptionImpl<T> _createSubscription( | |
602 void onData(T data), | 451 void onData(T data), |
603 void onError(error), | 452 void onError(error), |
604 void onDone(), | 453 void onDone(), |
605 bool cancelOnError) { | 454 bool cancelOnError) { |
606 return new _StreamSubscriptionImpl<T>( | 455 return new _BufferingStreamSubscription<T>( |
607 this, onData, onError, onDone, cancelOnError); | 456 onData, onError, onDone, cancelOnError, null); |
608 } | 457 } |
609 | 458 |
610 void _addListener(_StreamListener subscription) { | 459 /** Hook called when the subscription has been created. */ |
611 assert(!_isComplete); | 460 void _onListen(StreamSubscription subscription) {} |
612 if (_hasListener) { | 461 } |
613 throw new StateError("Stream already has subscriber."); | 462 |
614 } | 463 typedef _PendingEvents _EventGenerator(); |
615 assert(_pauseCount == 1); | |
616 _updatePauseCount(-1); | |
617 _subscriber = subscription; | |
618 subscription._setSubscribed(0); | |
619 if (_isInactive) { | |
620 _checkCallbacks(false, true); | |
621 if (!_isPaused && _hasPendingEvent) { | |
622 _schedulePendingEvents(); | |
623 } | |
624 } | |
625 } | |
626 | |
627 /** | |
628 * Handle a cancel requested from a [_StreamSubscriptionImpl]. | |
629 * | |
630 * This method is called from [_StreamSubscriptionImpl.cancel]. | |
631 */ | |
632 void _cancel(_StreamListener subscriber) { | |
633 assert(identical(subscriber._source, this)); | |
634 // We allow unsubscribing the currently firing subscription during | |
635 // the event firing, because it is indistinguishable from delaying it since | |
636 // that event has already received the event. | |
637 if (!identical(_subscriber, subscriber)) { | |
638 // You may unsubscribe more than once, only the first one counts. | |
639 return; | |
640 } | |
641 _subscriber = null; | |
642 // Unsubscribing a paused subscription also cancels its pauses. | |
643 int resumeCount = subscriber._setUnsubscribed(); | |
644 // Keep being paused while there is no subscriber and the stream is not | |
645 // complete. | |
646 _updatePauseCount(_isComplete ? -resumeCount : -resumeCount + 1); | |
647 if (_isInactive) { | |
648 _checkCallbacks(true, resumeCount > 0); | |
649 if (!_isPaused && _hasPendingEvent) { | |
650 _schedulePendingEvents(); | |
651 } | |
652 } | |
653 } | |
654 | |
655 void _forEachSubscriber( | |
656 void action(_StreamListener<T> subscription)) { | |
657 assert(!_isPaused); | |
658 bool wasInputPaused = _isInputPaused; | |
659 _StreamListener subscription = _subscriber; | |
660 assert(subscription != null); | |
661 _startFiring(); | |
662 action(subscription); | |
663 _endFiring(wasInputPaused); | |
664 } | |
665 } | |
666 | |
667 // ------------------------------------------------------------------- | |
668 // Default implementation of a stream with subscribers. | |
669 // ------------------------------------------------------------------- | |
670 | |
671 /** | |
672 * Default implementation of stream capable of sending events to subscribers. | |
673 * | |
674 * Any class needing to implement [Stream] can either directly extend this | |
675 * class, or extend [Stream] and delegate the subscribe method to an instance | |
676 * of this class. | |
677 * | |
678 * The only public methods are those of [Stream], so instances of | |
679 * [_MultiStreamImpl] can be returned directly as a [Stream] without exposing | |
680 * internal functionality. | |
681 * | |
682 * The [StreamController] is a public facing version of this class, with | |
683 * some methods made public. | |
684 * | |
685 * The user interface of [_MultiStreamImpl] are the following methods: | |
686 * * [_add]: Add a data event to the stream. | |
687 * * [_addError]: Add an error event to the stream. | |
688 * * [_close]: Request to close the stream. | |
689 * * [_onSubscriptionStateChange]: Called when receiving the first subscriber or | |
690 * when losing the last subscriber. | |
691 * * [_onPauseStateChange]: Called when entering or leaving paused mode. | |
692 * * [_hasListener]: Test whether there are currently any subscribers. | |
693 * * [_isPaused]: Test whether the stream is currently paused. | |
694 * The user should not add new events while the stream is paused, but if it | |
695 * happens anyway, the stream will enqueue the events just as when new events | |
696 * arrive while still firing an old event. | |
697 */ | |
698 class _MultiStreamImpl<T> extends _StreamImpl<T> | |
699 implements _InternalLinkList { | |
700 // Link list implementation (mixin when possible). | |
701 _InternalLink _nextLink; | |
702 _InternalLink _previousLink; | |
703 | |
704 _MultiStreamImpl() { | |
705 _nextLink = _previousLink = this; | |
706 } | |
707 | |
708 bool get isBroadcast => true; | |
709 | |
710 Stream<T> asBroadcastStream() => this; | |
711 | |
712 // ------------------------------------------------------------------ | |
713 // Helper functions that can be overridden in subclasses. | |
714 | |
715 /** Whether there are currently any subscribers on this [Stream]. */ | |
716 bool get _hasListener => !_InternalLinkList.isEmpty(this); | |
717 | |
718 /** | |
719 * Create the new subscription object. | |
720 */ | |
721 _StreamListener<T> _createSubscription( | |
722 void onData(T data), | |
723 void onError(error), | |
724 void onDone(), | |
725 bool cancelOnError) { | |
726 return new _StreamSubscriptionImpl<T>( | |
727 this, onData, onError, onDone, cancelOnError); | |
728 } | |
729 | |
730 // ------------------------------------------------------------------- | |
731 // Internal implementation. | |
732 | |
733 /** | |
734 * Iterate over all current subscribers and perform an action on each. | |
735 * | |
736 * The set of subscribers cannot be modified during this iteration. | |
737 * All attempts to add or unsubscribe subscribers will be delayed until | |
738 * after the iteration is complete. | |
739 * | |
740 * The [action] must not throw, or the controller will be left in an | |
741 * invalid state. | |
742 * | |
743 * This method must not be called while [isFiring] is true. | |
744 */ | |
745 void _forEachSubscriber( | |
746 void action(_StreamListener<T> subscription)) { | |
747 assert(!_isFiring); | |
748 if (!_hasListener) return; | |
749 bool wasInputPaused = _isInputPaused; | |
750 _startFiring(); | |
751 _InternalLink cursor = this._nextLink; | |
752 while (!identical(cursor, this)) { | |
753 _StreamListener<T> current = cursor; | |
754 if (current._needsEvent(_currentEventIdBit)) { | |
755 action(current); | |
756 // Marks as having received the event. | |
757 current._toggleEventReceived(); | |
758 } | |
759 cursor = current._nextLink; | |
760 if (current._isPendingUnsubscribe) { | |
761 _removeListener(current); | |
762 } | |
763 } | |
764 _endFiring(wasInputPaused); | |
765 } | |
766 | |
767 void _addListener(_StreamListener listener) { | |
768 listener._setSubscribed(_currentEventIdBit); | |
769 bool hadListener = _hasListener; | |
770 _InternalLinkList.add(this, listener); | |
771 if (!hadListener && _isInactive) { | |
772 _checkCallbacks(false, false); | |
773 if (!_isPaused && _hasPendingEvent) { | |
774 _schedulePendingEvents(); | |
775 } | |
776 } | |
777 } | |
778 | |
779 /** | |
780 * Handle a cancel requested from a [_StreamListener]. | |
781 * | |
782 * This method is called from [_StreamListener.cancel]. | |
783 * | |
784 * If an event is currently firing, the cancel is delayed | |
785 * until after the subscribers have received the event. | |
786 */ | |
787 void _cancel(_StreamListener listener) { | |
788 assert(identical(listener._source, this)); | |
789 if (_InternalLink.isUnlinked(listener)) { | |
790 // You may unsubscribe more than once, only the first one counts. | |
791 return; | |
792 } | |
793 if (_isFiring) { | |
794 if (listener._needsEvent(_currentEventIdBit)) { | |
795 assert(listener._isSubscribed); | |
796 listener._setPendingUnsubscribe(_currentEventIdBit); | |
797 } else { | |
798 // The listener has been notified of the event (or don't need to, | |
799 // if it's still pending subscription) so it's safe to remove it. | |
800 _removeListener(listener); | |
801 } | |
802 // Pause and subscription state changes are reported when we end | |
803 // firing. | |
804 } else { | |
805 bool wasInputPaused = _isInputPaused; | |
806 _removeListener(listener); | |
807 if (_isInactive) { | |
808 _checkCallbacks(true, wasInputPaused); | |
809 if (!_isPaused && _hasPendingEvent) { | |
810 _schedulePendingEvents(); | |
811 } | |
812 } | |
813 } | |
814 } | |
815 | |
816 /** | |
817 * Removes a listener from this stream and cancels its pauses. | |
818 * | |
819 * This is a low-level action that doesn't call [_onSubscriptionStateChange]. | |
820 * or [_callOnPauseStateChange]. | |
821 */ | |
822 void _removeListener(_StreamListener listener) { | |
823 int pauseCount = listener._setUnsubscribed(); | |
824 _InternalLinkList.remove(listener); | |
825 if (pauseCount > 0) { | |
826 _updatePauseCount(-pauseCount); | |
827 if (!_isPaused && _hasPendingEvent) { | |
828 _state |= _STREAM_PENDING_RESUME; | |
829 } | |
830 } | |
831 } | |
832 } | |
833 | |
834 | 464 |
835 /** Stream that generates its own events. */ | 465 /** Stream that generates its own events. */ |
836 class _GeneratedSingleStreamImpl<T> extends _SingleStreamImpl<T> { | 466 class _GeneratedStreamImpl<T> extends _StreamImpl<T> { |
837 /** | 467 final _EventGenerator _pending; |
838 * Initializes the stream to have only the events provided by [events]. | 468 /** |
839 * | 469 * Initializes the stream to have only the events provided by a |
840 * A [_PendingEvents] implementation provides events that are handled | 470 * [_PendingEvents]. |
841 * by calling [_PendingEvents.handleNext] with the [_StreamImpl]. | 471 * |
842 */ | 472 * A new [_PendingEvents] must be generated for each listen. |
843 _GeneratedSingleStreamImpl(_PendingEvents events) { | 473 */ |
844 _pendingEvents = events; | 474 _GeneratedStreamImpl(this._pending); |
845 _setClosed(); // Closed for input since all events are already pending. | 475 |
846 } | 476 StreamSubscription _createSubscription(void onData(T data), |
847 | 477 void onError(Object error), |
848 void _add(T value) { | 478 void onDone(), |
849 throw new UnsupportedError("Cannot inject events into generated stream"); | 479 bool cancelOnError) { |
850 } | 480 return new _BufferingStreamSubscription( |
851 | 481 onData, onError, onDone, cancelOnError, _pending()); |
852 void _addError(value) { | |
853 throw new UnsupportedError("Cannot inject events into generated stream"); | |
854 } | |
855 | |
856 void _close() { | |
857 throw new UnsupportedError("Cannot inject events into generated stream"); | |
858 } | 482 } |
859 } | 483 } |
860 | 484 |
861 | 485 |
862 /** Pending events object that gets its events from an [Iterable]. */ | 486 /** Pending events object that gets its events from an [Iterable]. */ |
863 class _IterablePendingEvents<T> extends _PendingEvents { | 487 class _IterablePendingEvents<T> extends _PendingEvents { |
488 // The stream has been cancelled by an error, but hasn't sent a final | |
489 // "Done" event yet. | |
490 static const int _STATE_CANCELLED = 1; | |
491 // The stream is completely done. | |
492 static const int _STATE_CLOSED = 3; | |
493 | |
864 final Iterator<T> _iterator; | 494 final Iterator<T> _iterator; |
495 int _iterationState = 0; | |
496 | |
497 _IterablePendingEvents(Iterable<T> data) : _iterator = data.iterator; | |
498 | |
865 /** | 499 /** |
866 * Whether there are no more events to be sent. | 500 * Whether there are no more events to be sent. |
867 * | 501 * |
868 * This starts out as [:false:] since there is always at least | 502 * This starts out as [:false:] since there is always at least |
869 * a 'done' event to be sent. | 503 * a 'done' event to be sent. |
870 */ | 504 */ |
871 bool _isDone = false; | 505 bool get _isDone => _iterationState == _STATE_CLOSED; |
872 | 506 bool get _isCancelled => _iterationState == _STATE_CANCELLED; |
873 _IterablePendingEvents(Iterable<T> data) : _iterator = data.iterator; | |
874 | 507 |
875 bool get isEmpty => _isDone; | 508 bool get isEmpty => _isDone; |
876 | 509 |
877 void handleNext(_StreamImpl<T> stream) { | 510 void handleNext(_EventDispatch dispatch) { |
878 if (_isDone) throw new StateError("No events pending."); | 511 if (_isCancelled) { |
512 _iterationState = _STATE_CLOSED; | |
513 dispatch._sendDone(); | |
514 return; | |
515 } | |
516 if (_isDone) { | |
517 throw new StateError("No events pending."); | |
518 } | |
519 bool isDone; | |
879 try { | 520 try { |
880 _isDone = !_iterator.moveNext(); | 521 isDone = !_iterator.moveNext(); |
881 if (!_isDone) { | |
882 stream._sendData(_iterator.current); | |
883 } else { | |
884 stream._sendDone(); | |
885 } | |
886 } catch (e, s) { | 522 } catch (e, s) { |
887 stream._sendError(_asyncError(e, s)); | 523 _iterationState = _STATE_CANCELLED; // Will send a single done after this . |
floitsch
2013/05/22 16:26:29
long line.
I don't think it makes sense to send a
Lasse Reichstein Nielsen
2013/05/24 06:02:49
Good point. Some listeners might be left hanging,
| |
888 stream._sendDone(); | 524 dispatch._sendError(_asyncError(e, s)); |
889 _isDone = true; | 525 return; |
890 } | 526 } |
891 } | 527 if (!isDone) { |
892 } | 528 dispatch._sendData(_iterator.current); |
893 | 529 } else { |
894 | 530 _iterationState = _STATE_CLOSED; |
895 /** | 531 dispatch._sendDone(); |
896 * The subscription class that the [StreamController] uses. | 532 } |
897 * | 533 } |
898 * The [_StreamImpl.createSubscription] method should | 534 |
899 * create an object of this type, or another subclass of [_StreamListener]. | 535 void clear() { |
900 * A subclass of [_StreamImpl] can specify which subclass | 536 if (isScheduled) cancelSchedule(); |
901 * of [_StreamSubscriptionImpl] it uses by overriding | 537 _state = _STATE_CLOSED; |
902 * [_StreamImpl.createSubscription]. | 538 } |
903 * | 539 } |
904 * The subscription is in one of three states: | 540 |
905 * * Subscribed. | |
906 * * Paused-and-subscribed. | |
907 * * Unsubscribed. | |
908 * Unsubscribing also resumes any pauses started by the subscription. | |
909 */ | |
910 class _StreamSubscriptionImpl<T> extends _StreamListener<T> | |
911 implements StreamSubscription<T> { | |
912 final bool _cancelOnError; | |
913 // TODO(ahe): Restore type when feature is implemented in dart2js | |
914 // checked mode. http://dartbug.com/7733 | |
915 var /* _DataHandler<T> */ _onData; | |
916 _ErrorHandler _onError; | |
917 _DoneHandler _onDone; | |
918 _StreamSubscriptionImpl(_StreamImpl source, | |
919 this._onData, | |
920 this._onError, | |
921 this._onDone, | |
922 this._cancelOnError) : super(source); | |
923 | |
924 void onData(void handleData(T event)) { | |
925 if (handleData == null) handleData = _nullDataHandler; | |
926 _onData = handleData; | |
927 } | |
928 | |
929 void onError(void handleError(error)) { | |
930 if (handleError == null) handleError = _nullErrorHandler; | |
931 _onError = handleError; | |
932 } | |
933 | |
934 void onDone(void handleDone()) { | |
935 if (handleDone == null) handleDone = _nullDoneHandler; | |
936 _onDone = handleDone; | |
937 } | |
938 | |
939 void _sendData(T data) { | |
940 _onData(data); | |
941 } | |
942 | |
943 void _sendError(error) { | |
944 _onError(error); | |
945 if (_cancelOnError) _source._cancel(this); | |
946 } | |
947 | |
948 void _sendDone() { | |
949 _onDone(); | |
950 } | |
951 | |
952 void cancel() { | |
953 if (!_isSubscribed) return; | |
954 _source._cancel(this); | |
955 } | |
956 | |
957 void pause([Future resumeSignal]) { | |
958 if (!_isSubscribed) return; | |
959 _source._pause(this, resumeSignal); | |
960 } | |
961 | |
962 void resume() { | |
963 if (!_isSubscribed || !isPaused) return; | |
964 _source._resume(this, false); | |
965 } | |
966 | |
967 Future asFuture([var futureValue]) { | |
968 _FutureImpl<T> result = new _FutureImpl<T>(); | |
969 | |
970 // Overwrite the onDone and onError handlers. | |
971 onDone(() { result._setValue(futureValue); }); | |
972 onError((error) { | |
973 cancel(); | |
974 result._setError(error); | |
975 }); | |
976 | |
977 return result; | |
978 } | |
979 } | |
980 | 541 |
981 // Internal helpers. | 542 // Internal helpers. |
982 | 543 |
983 // Types of the different handlers on a stream. Types used to type fields. | 544 // Types of the different handlers on a stream. Types used to type fields. |
984 typedef void _DataHandler<T>(T value); | 545 typedef void _DataHandler<T>(T value); |
985 typedef void _ErrorHandler(error); | 546 typedef void _ErrorHandler(error); |
986 typedef void _DoneHandler(); | 547 typedef void _DoneHandler(); |
987 | 548 |
988 | 549 |
989 /** Default data handler, does nothing. */ | 550 /** Default data handler, does nothing. */ |
990 void _nullDataHandler(var value) {} | 551 void _nullDataHandler(var value) {} |
991 | 552 |
992 /** Default error handler, reports the error to the global handler. */ | 553 /** Default error handler, reports the error to the global handler. */ |
993 void _nullErrorHandler(error) { | 554 void _nullErrorHandler(error) { |
994 _throwDelayed(error); | 555 _throwDelayed(error); |
995 } | 556 } |
996 | 557 |
997 /** Default done handler, does nothing. */ | 558 /** Default done handler, does nothing. */ |
998 void _nullDoneHandler() {} | 559 void _nullDoneHandler() {} |
999 | 560 |
1000 | 561 |
1001 /** A delayed event on a stream implementation. */ | 562 /** A delayed event on a buffering stream subscription. */ |
1002 abstract class _DelayedEvent { | 563 abstract class _DelayedEvent { |
1003 /** Added as a linked list on the [StreamController]. */ | 564 /** Added as a linked list on the [StreamController]. */ |
1004 _DelayedEvent next; | 565 _DelayedEvent next; |
1005 /** Execute the delayed event on the [StreamController]. */ | 566 /** Execute the delayed event on the [StreamController]. */ |
1006 void perform(_StreamImpl stream); | 567 void perform(_EventDispatch dispatch); |
1007 } | 568 } |
1008 | 569 |
1009 /** A delayed data event. */ | 570 /** A delayed data event. */ |
1010 class _DelayedData<T> extends _DelayedEvent{ | 571 class _DelayedData<T> extends _DelayedEvent{ |
1011 final T value; | 572 final T value; |
1012 _DelayedData(this.value); | 573 _DelayedData(this.value); |
1013 void perform(_StreamImpl<T> stream) { | 574 void perform(_EventDispatch<T> dispatch) { |
1014 stream._sendData(value); | 575 dispatch._sendData(value); |
1015 } | 576 } |
1016 } | 577 } |
1017 | 578 |
1018 /** A delayed error event. */ | 579 /** A delayed error event. */ |
1019 class _DelayedError extends _DelayedEvent { | 580 class _DelayedError extends _DelayedEvent { |
1020 final error; | 581 final error; |
1021 _DelayedError(this.error); | 582 _DelayedError(this.error); |
1022 void perform(_StreamImpl stream) { | 583 void perform(_EventDispatch dispatch) { |
1023 stream._sendError(error); | 584 dispatch._sendError(error); |
1024 } | 585 } |
1025 } | 586 } |
1026 | 587 |
1027 /** A delayed done event. */ | 588 /** A delayed done event. */ |
1028 class _DelayedDone implements _DelayedEvent { | 589 class _DelayedDone implements _DelayedEvent { |
1029 const _DelayedDone(); | 590 const _DelayedDone(); |
1030 void perform(_StreamImpl stream) { | 591 void perform(_EventDispatch dispatch) { |
1031 stream._sendDone(); | 592 dispatch._sendDone(); |
1032 } | 593 } |
1033 | 594 |
1034 _DelayedEvent get next => null; | 595 _DelayedEvent get next => null; |
1035 | 596 |
1036 void set next(_DelayedEvent _) { | 597 void set next(_DelayedEvent _) { |
1037 throw new StateError("No events after a done."); | 598 throw new StateError("No events after a done."); |
1038 } | 599 } |
1039 } | 600 } |
1040 | 601 |
1041 /** | 602 /** |
(...skipping 69 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1111 listLast._nextLink = otherNext; | 672 listLast._nextLink = otherNext; |
1112 otherNext._previousLink = listLast; | 673 otherNext._previousLink = listLast; |
1113 _InternalLink otherLast = other._previousLink; | 674 _InternalLink otherLast = other._previousLink; |
1114 list._previousLink = otherLast; | 675 list._previousLink = otherLast; |
1115 otherLast._nextLink = list; | 676 otherLast._nextLink = list; |
1116 // Clean up [other]. | 677 // Clean up [other]. |
1117 other._nextLink = other._previousLink = other; | 678 other._nextLink = other._previousLink = other; |
1118 } | 679 } |
1119 } | 680 } |
1120 | 681 |
1121 /** Abstract type for an internal interface for sending events. */ | |
1122 abstract class _EventOutputSink<T> { | |
1123 _sendData(T data); | |
1124 _sendError(error); | |
1125 _sendDone(); | |
1126 } | |
1127 | |
1128 abstract class _StreamListener<T> extends _InternalLink | |
1129 implements _EventOutputSink<T> { | |
1130 final _StreamImpl _source; | |
1131 int _state = _LISTENER_UNSUBSCRIBED; | |
1132 | |
1133 _StreamListener(this._source); | |
1134 | |
1135 bool get isPaused => _state >= (1 << _LISTENER_PAUSE_COUNT_SHIFT); | |
1136 | |
1137 bool get _isPendingUnsubscribe => | |
1138 (_state & _LISTENER_PENDING_UNSUBSCRIBE) != 0; | |
1139 | |
1140 bool get _isSubscribed => (_state & _LISTENER_SUBSCRIBED) != 0; | |
1141 | |
1142 /** | |
1143 * Whether the listener still needs to receive the currently firing event. | |
1144 * | |
1145 * The currently firing event is identified by a single bit, which alternates | |
1146 * between events. The [_state] contains the previously sent event's bit in | |
1147 * the [_LISTENER_EVENT_ID] bit. If the two don't match, this listener | |
1148 * still need the current event. | |
1149 */ | |
1150 bool _needsEvent(int currentEventIdBit) { | |
1151 int lastEventIdBit = | |
1152 (_state & _LISTENER_EVENT_ID) >> _LISTENER_EVENT_ID_SHIFT; | |
1153 return lastEventIdBit != currentEventIdBit; | |
1154 } | |
1155 | |
1156 /// If a subscriber's "firing bit" doesn't match the stream's firing bit, | |
1157 /// we are currently firing an event and the subscriber still need to receive | |
1158 /// the event. | |
1159 void _toggleEventReceived() { | |
1160 _state ^= _LISTENER_EVENT_ID; | |
1161 } | |
1162 | |
1163 void _setSubscribed(int eventIdBit) { | |
1164 assert(eventIdBit == 0 || eventIdBit == 1); | |
1165 _state = _LISTENER_SUBSCRIBED | (eventIdBit << _LISTENER_EVENT_ID_SHIFT); | |
1166 } | |
1167 | |
1168 void _setPendingUnsubscribe(int currentEventIdBit) { | |
1169 assert(_isSubscribed); | |
1170 // Sets the pending unsubscribe, and ensures that the listener | |
1171 // won't get the current event. | |
1172 _state |= _LISTENER_PENDING_UNSUBSCRIBE | _LISTENER_EVENT_ID; | |
1173 _state ^= (1 ^ currentEventIdBit) << _LISTENER_EVENT_ID_SHIFT; | |
1174 assert(!_needsEvent(currentEventIdBit)); | |
1175 } | |
1176 | |
1177 /** | |
1178 * Marks the listener as unsubscibed. | |
1179 * | |
1180 * Returns the number of unresumed pauses for the listener. | |
1181 */ | |
1182 int _setUnsubscribed() { | |
1183 assert(_isSubscribed); | |
1184 int timesPaused = _state >> _LISTENER_PAUSE_COUNT_SHIFT; | |
1185 _state = _LISTENER_UNSUBSCRIBED; | |
1186 return timesPaused; | |
1187 } | |
1188 | |
1189 void _incrementPauseCount() { | |
1190 _state += 1 << _LISTENER_PAUSE_COUNT_SHIFT; | |
1191 } | |
1192 | |
1193 void _decrementPauseCount() { | |
1194 assert(isPaused); | |
1195 _state -= 1 << _LISTENER_PAUSE_COUNT_SHIFT; | |
1196 } | |
1197 | |
1198 _sendData(T data); | |
1199 _sendError(error); | |
1200 _sendDone(); | |
1201 } | |
1202 | |
1203 /** Superclass for provider of pending events. */ | 682 /** Superclass for provider of pending events. */ |
1204 abstract class _PendingEvents { | 683 abstract class _PendingEvents { |
684 // No async event has been scheduled. | |
685 static const int _STATE_UNSCHEDULED = 0; | |
686 // An async event has been scheduled to run a function. | |
687 static const int _STATE_SCHEDULED = 1; | |
688 // An async event has been scheduled, but it will do nothing when it runs. | |
689 // Async events can't be preempted. | |
690 static const int _STATE_CANCELLED = 3; | |
691 | |
1205 /** | 692 /** |
1206 * Timer set when pending events are scheduled for execution. | 693 * Timer set when pending events are scheduled for execution. |
floitsch
2013/05/22 16:26:29
Update comments.
Lasse Reichstein Nielsen
2013/05/24 06:02:49
Done.
| |
1207 * | 694 * |
1208 * When scheduling pending events for execution in a later cycle, the timer | 695 * When scheduling pending events for execution in a later cycle, the timer |
1209 * is stored here. If pending events are executed earlier than that, e.g., | 696 * is stored here. If pending events are executed earlier than that, e.g., |
1210 * due to a second event in the current cycle, the timer is canceled again. | 697 * due to a second event in the current cycle, the timer is canceled again. |
1211 */ | 698 */ |
1212 Timer scheduleTimer = null; | 699 int _state = _STATE_UNSCHEDULED; |
1213 | 700 |
1214 bool get isEmpty; | 701 bool get isEmpty; |
1215 | 702 |
1216 bool get isScheduled => scheduleTimer != null; | 703 bool get isScheduled => _state == _STATE_SCHEDULED; |
704 bool get _eventScheduled => _state >= _STATE_SCHEDULED; | |
1217 | 705 |
1218 void schedule(_StreamImpl stream) { | 706 /** |
707 * Schedule an event to run later. | |
708 * | |
709 * If called more than once, it should be called with the same dispatch as | |
710 * argument each time. It may reuse an earlier argument in some cases. | |
711 */ | |
712 void schedule(_EventDispatch dispatch) { | |
1219 if (isScheduled) return; | 713 if (isScheduled) return; |
1220 scheduleTimer = new Timer(Duration.ZERO, () { | 714 assert(!isEmpty); |
1221 scheduleTimer = null; | 715 if (_eventScheduled) { |
1222 stream._handlePendingEvents(); | 716 assert(_state == _STATE_CANCELLED); |
717 _state = _STATE_SCHEDULED; | |
718 return; | |
719 } | |
720 runAsync(() { | |
721 int oldState = _state; | |
722 _state = _STATE_UNSCHEDULED; | |
723 if (oldState == _STATE_CANCELLED) return; | |
724 handleNext(dispatch); | |
1223 }); | 725 }); |
726 _state = _STATE_SCHEDULED; | |
1224 } | 727 } |
1225 | 728 |
1226 void cancelSchedule() { | 729 void cancelSchedule() { |
1227 assert(isScheduled); | 730 if (isScheduled) _state = _STATE_CANCELLED; |
1228 scheduleTimer.cancel(); | |
1229 scheduleTimer = null; | |
1230 } | 731 } |
1231 | 732 |
1232 void handleNext(_StreamImpl stream); | 733 void handleNext(_EventDispatch dispatch); |
734 | |
735 /** Throw away any pending events and cancel scheduled events. */ | |
736 void clear(); | |
1233 } | 737 } |
1234 | 738 |
1235 | 739 |
1236 /** Class holding pending events for a [_StreamImpl]. */ | 740 /** Class holding pending events for a [_StreamImpl]. */ |
1237 class _StreamImplEvents extends _PendingEvents { | 741 class _StreamImplEvents extends _PendingEvents { |
1238 /// Single linked list of [_DelayedEvent] objects. | 742 /// Single linked list of [_DelayedEvent] objects. |
1239 _DelayedEvent firstPendingEvent = null; | 743 _DelayedEvent firstPendingEvent = null; |
1240 /// Last element in the list of pending events. New events are added after it. | 744 /// Last element in the list of pending events. New events are added after it. |
1241 _DelayedEvent lastPendingEvent = null; | 745 _DelayedEvent lastPendingEvent = null; |
1242 | 746 |
1243 bool get isEmpty => lastPendingEvent == null; | 747 bool get isEmpty => lastPendingEvent == null; |
1244 | 748 |
1245 bool get isScheduled => scheduleTimer != null; | |
1246 | |
1247 void add(_DelayedEvent event) { | 749 void add(_DelayedEvent event) { |
1248 if (lastPendingEvent == null) { | 750 if (lastPendingEvent == null) { |
1249 firstPendingEvent = lastPendingEvent = event; | 751 firstPendingEvent = lastPendingEvent = event; |
1250 } else { | 752 } else { |
1251 lastPendingEvent = lastPendingEvent.next = event; | 753 lastPendingEvent = lastPendingEvent.next = event; |
1252 } | 754 } |
1253 } | 755 } |
1254 | 756 |
1255 void handleNext(_StreamImpl stream) { | 757 void handleNext(_EventDispatch dispatch) { |
1256 assert(!isScheduled); | 758 assert(!isScheduled); |
1257 _DelayedEvent event = firstPendingEvent; | 759 _DelayedEvent event = firstPendingEvent; |
1258 firstPendingEvent = event.next; | 760 firstPendingEvent = event.next; |
1259 if (firstPendingEvent == null) { | 761 if (firstPendingEvent == null) { |
1260 lastPendingEvent = null; | 762 lastPendingEvent = null; |
1261 } | 763 } |
1262 event.perform(stream); | 764 event.perform(dispatch); |
1263 } | 765 } |
1264 } | 766 |
1265 | 767 void clear() { |
1266 | 768 if (isScheduled) cancelSchedule(); |
1267 class _DoneSubscription<T> implements StreamSubscription<T> { | 769 firstPendingEvent = lastPendingEvent = null; |
1268 _DoneHandler _handler; | 770 } |
1269 Timer _timer; | 771 } |
1270 int _pauseCount = 0; | 772 |
1271 | 773 class _MultiplexerLinkedList { |
1272 _DoneSubscription(this._handler) { | 774 _MultiplexerLinkedList _next; |
1273 _delayDone(); | 775 _MultiplexerLinkedList _previous; |
1274 } | 776 |
1275 | 777 void _unlink() { |
1276 void _delayDone() { | 778 _previous._next = _next; |
1277 assert(_timer == null && _pauseCount == 0); | 779 _next._previous = _previous; |
1278 _timer = new Timer(Duration.ZERO, () { | 780 _next = _previous = this; |
1279 if (_handler != null) _handler(); | 781 } |
1280 }); | 782 |
1281 } | 783 void _insertBefore(_MultiplexerLinkedList newNext) { |
1282 | 784 _MultiplexerLinkedList newPrevious = newNext._previous; |
1283 bool get _isComplete => _timer == null && _pauseCount == 0; | 785 newPrevious._next = this; |
1284 | 786 newNext._previous = _previous; |
1285 void onData(void handleAction(T value)) {} | 787 _previous._next = newNext; |
1286 | 788 _previous = newPrevious; |
1287 void onError(void handleError(error)) {} | 789 } |
1288 | 790 } |
1289 void onDone(void handleDone()) { | 791 |
1290 _handler = handleDone; | 792 // TODO(lrn): Change "implements" to "with" when automatic mixin constructors |
1291 } | 793 // are implemented. |
1292 | 794 class _MultiplexerSubscription<T> extends _BufferingStreamSubscription<T> |
floitsch
2013/05/22 16:26:29
Add comment what this class does.
Lasse Reichstein Nielsen
2013/05/24 06:02:49
Done.
| |
1293 void pause([Future signal]) { | 795 implements _MultiplexerLinkedList { |
1294 if (_isComplete) return; | 796 static const int _STATE_NOT_LISTENING = 0; |
1295 if (_timer != null) { | 797 // Bit that alternates between event firings. If bit matches the one currently |
1296 _timer.cancel(); | 798 // firing, the subscription will not be notified. |
1297 _timer = null; | 799 static const int _STATE_EVENT_ID_BIT = 1; |
1298 } | 800 static const int _STATE_LISTENING = 2; |
1299 _pauseCount++; | 801 static const int _STATE_IS_FIRING = 4; |
1300 if (signal != null) signal.whenComplete(resume); | 802 static const int _STATE_REMOVE_AFTER_FIRING = 8; |
1301 } | 803 |
1302 | 804 // Firing state. |
1303 void resume() { | 805 int _multiplexState; |
1304 if (_isComplete) return; | 806 |
1305 if (_pauseCount == 0) return; | 807 _SingleStreamMultiplexer _source; |
1306 _pauseCount--; | 808 |
1307 if (_pauseCount == 0) { | 809 _MultiplexerSubscription(this._source, |
1308 _delayDone(); | 810 void onData(T data), |
1309 } | 811 void onError(Object error), |
1310 } | 812 void onDone(), |
1311 | 813 bool cancelOnError, |
1312 bool get isPaused => _pauseCount > 0; | 814 int nextEventId) |
1313 | 815 : _multiplexState = _STATE_LISTENING | nextEventId, |
1314 void cancel() { | 816 super(onData, onError, onDone, cancelOnError, null) { |
1315 if (_isComplete) return; | 817 _next = _previous = this; |
1316 if (_timer != null) { | 818 } |
1317 _timer.cancel(); | 819 |
1318 _timer = null; | 820 // Mixin workaround. |
1319 } | 821 _MultiplexerLinkedList _next; |
1320 _pauseCount = 0; | 822 _MultiplexerLinkedList _previous; |
1321 } | 823 |
1322 | 824 void _unlink() { |
1323 Future asFuture([var futureValue]) { | 825 _previous._next = _next; |
1324 // TODO(floitsch): share more code. | 826 _next._previous = _previous; |
1325 _FutureImpl<T> result = new _FutureImpl<T>(); | 827 _next = _previous = this; |
1326 | 828 } |
1327 // Overwrite the onDone and onError handlers. | 829 |
1328 onDone(() { result._setValue(futureValue); }); | 830 void _insertBefore(_MultiplexerLinkedList newNext) { |
1329 onError((error) { | 831 _MultiplexerLinkedList newPrevious = newNext._previous; |
1330 cancel(); | 832 newPrevious._next = this; |
1331 result._setError(error); | 833 newNext._previous = _previous; |
1332 }); | 834 _previous._next = newNext; |
1333 | 835 _previous = newPrevious; |
1334 return result; | 836 } |
1335 } | 837 // End mixin. |
1336 } | 838 |
1337 | 839 bool get _isListening => _multiplexState >= _STATE_LISTENING; |
1338 class _SingleStreamMultiplexer<T> extends _MultiStreamImpl<T> { | 840 bool get _isFiring => _multiplexState >= _STATE_IS_FIRING; |
841 bool get _removeAfterFiring => _multiplexState >= _STATE_REMOVE_AFTER_FIRING; | |
842 int get _eventId => _multiplexState & _STATE_EVENT_ID_BIT; | |
843 | |
844 void _setRemoveAfterFiring() { | |
845 assert(_isFiring); | |
846 _multiplexState |= _STATE_REMOVE_AFTER_FIRING; | |
847 } | |
848 | |
849 void _startFiring() { | |
850 assert(!_isFiring); | |
851 _multiplexState |= _STATE_IS_FIRING; | |
852 } | |
853 | |
854 /// Marks listener as no longer firing, and toggles its event id. | |
855 void _endFiring() { | |
856 assert(_isFiring); | |
857 _multiplexState ^= (_STATE_IS_FIRING | _STATE_EVENT_ID_BIT); | |
858 } | |
859 | |
860 void _setNotListening() { | |
861 assert(_isListening); | |
862 _multiplexState = _STATE_NOT_LISTENING; | |
863 } | |
864 | |
865 void _onCancel() { | |
866 assert(_isListening); | |
867 _source._removeListener(this); | |
868 } | |
869 } | |
870 | |
871 // TODO(lrn): change "implements" to "with" when the VM supports it. | |
872 class _SingleStreamMultiplexer<T> extends Stream<T> | |
floitsch
2013/05/22 16:26:29
Add comment.
Lasse Reichstein Nielsen
2013/05/24 06:02:49
Done.
| |
873 implements _MultiplexerLinkedList, | |
874 _EventDispatch<T> { | |
1339 final Stream<T> _source; | 875 final Stream<T> _source; |
1340 StreamSubscription<T> _subscription; | 876 StreamSubscription<T> _subscription; |
1341 | 877 // Alternates between zero and one for each event fired. |
1342 _SingleStreamMultiplexer(this._source); | 878 // Listeners are initialized with the next event's id, and will |
1343 | 879 // only be notified if they match the event being fired. |
1344 void _callOnPauseStateChange() { | 880 // That way listeners added during event firing will not receive |
1345 if (_isPaused) { | 881 // the current event. |
1346 if (_subscription != null) { | 882 int _eventId = 0; |
1347 _subscription.pause(); | 883 |
884 bool _isFiring = false; | |
885 | |
886 // Remember events added while firing. | |
887 _StreamImplEvents _pending; | |
888 | |
889 _SingleStreamMultiplexer(this._source) { | |
890 _next = _previous = this; | |
891 } | |
892 | |
893 bool get _hasPending => _pending != null && !_pending.isEmpty; | |
894 | |
895 // Should be mixin. | |
896 _MultiplexerLinkedList _next; | |
897 _MultiplexerLinkedList _previous; | |
898 | |
899 void _unlink() { | |
900 _previous._next = _next; | |
901 _next._previous = _previous; | |
902 _next = _previous = this; | |
903 } | |
904 | |
905 void _insertBefore(_MultiplexerLinkedList newNext) { | |
906 _MultiplexerLinkedList newPrevious = newNext._previous; | |
907 newPrevious._next = this; | |
908 newNext._previous = _previous; | |
909 _previous._next = newNext; | |
910 _previous = newPrevious; | |
911 } | |
912 // End of mixin. | |
913 | |
914 StreamSubscription<T> listen(void onData(T data), | |
915 { void onError(Object error), | |
916 void onDone(), | |
917 bool cancelOnError }) { | |
918 if (onData == null) onData = _nullDataHandler; | |
919 if (onError == null) onError = _nullErrorHandler; | |
920 if (onDone == null) onDone = _nullDoneHandler; | |
921 cancelOnError = identical(true, cancelOnError); | |
922 _MultiplexerSubscription subscription = | |
923 new _MultiplexerSubscription(this, onData, onError, onDone, | |
924 cancelOnError, _eventId); | |
925 if (_subscription == null) { | |
926 _subscription = _source.listen(_add, onError: _addError, onDone: _close); | |
927 } | |
928 subscription._insertBefore(this); | |
929 return subscription; | |
930 } | |
931 | |
932 /** Called by [_MultiplexerSubscription.remove]. */ | |
933 void _removeListener(_MultiplexerSubscription listener) { | |
934 if (listener._isFiring) { | |
935 listener._setRemoveAfterFiring(); | |
936 } else { | |
937 _unlinkListener(listener); | |
938 } | |
939 } | |
940 | |
941 /** Remove a listener and close the subscription after the last one. */ | |
942 void _unlinkListener(_MultiplexerSubscription listener) { | |
943 listener._setNotListening(); | |
944 listener._unlink(); | |
945 if (identical(_next, this)) { | |
946 // Last listener removed. | |
947 _cancel(); | |
948 } | |
949 } | |
950 | |
951 void _cancel() { | |
952 StreamSubscription subscription = _subscription; | |
953 _subscription = null; | |
954 subscription.cancel(); | |
955 if (_pending != null) _pending.cancelSchedule(); | |
956 } | |
957 | |
958 void _forEachListener(void action(_MultiplexerSubscription listener)) { | |
959 int eventId = _eventId; | |
960 _eventId ^= 1; | |
961 _isFiring = true; | |
962 _MultiplexerLinkedList entry = _next; | |
963 // Call each listener in order. A listener can be removed during any | |
964 // other listener's event. During its own event it will only be marked | |
965 // as "to be removed", and it will be handled after the event is done. | |
966 while (!identical(entry, this)) { | |
967 _MultiplexerSubscription listener = entry; | |
968 if (listener._eventId == eventId) { | |
969 listener._startFiring(); | |
970 action(listener); | |
971 listener._endFiring(); // Also toggles the event id. | |
1348 } | 972 } |
973 entry = listener._next; | |
974 if (listener._removeAfterFiring) { | |
975 _unlinkListener(listener); | |
976 } | |
977 } | |
978 _isFiring = false; | |
979 } | |
980 | |
981 void _add(T data) { | |
floitsch
2013/05/22 16:26:29
Why is this not handled by the bufferingStreamSubs
Lasse Reichstein Nielsen
2013/05/24 06:02:49
This isn't a StreamSubscription - it's a Stream, m
| |
982 if (_isFiring || _hasPending) { | |
983 _StreamImplEvents pending = _pending; | |
984 if (pending == null) pending = _pending = new _StreamImplEvents(); | |
985 pending.add(new _DelayedData(data)); | |
1349 } else { | 986 } else { |
1350 if (_subscription != null) { | 987 _sendData(data); |
1351 _subscription.resume(); | 988 } |
1352 } | 989 } |
1353 } | 990 |
1354 } | 991 void _addError(Object error) { |
1355 | 992 if (_isFiring || _hasPending) { |
floitsch
2013/05/22 16:26:29
ditto.
Lasse Reichstein Nielsen
2013/05/24 06:02:49
ditto too.
| |
1356 /** | 993 _StreamImplEvents pending = _pending; |
1357 * Subscribe or unsubscribe on [_source] depending on whether | 994 if (pending == null) pending = _pending = new _StreamImplEvents(); |
1358 * [_stream] has subscribers. | 995 pending.add(new _DelayedError(error)); |
1359 */ | |
1360 void _onSubscriptionStateChange() { | |
1361 if (_hasListener) { | |
1362 assert(_subscription == null); | |
1363 _subscription = _source.listen(this._add, | |
1364 onError: this._addError, | |
1365 onDone: this._close); | |
1366 } else { | 996 } else { |
1367 // TODO(lrn): Check why this can happen. | 997 _sendError(error); |
1368 if (_subscription == null) return; | 998 } |
1369 _subscription.cancel(); | 999 } |
1370 _subscription = null; | 1000 |
1371 } | 1001 void _close() { |
1372 } | 1002 if (_isFiring || _hasPending) { |
floitsch
2013/05/22 16:26:29
ditto.
| |
1373 } | 1003 _StreamImplEvents pending = _pending; |
1004 if (pending == null) pending = _pending = new _StreamImplEvents(); | |
1005 pending.add(const _DelayedDone()); | |
1006 } else { | |
1007 _sendDone(); | |
1008 } | |
1009 } | |
1010 | |
1011 void _sendData(T data) { | |
1012 _forEachListener((_MultiplexerSubscription listener) { | |
1013 listener._add(data); | |
1014 }); | |
1015 if (_hasPending) { | |
1016 _pending.schedule(this); | |
1017 } | |
1018 } | |
1019 | |
1020 void _sendError(Object error) { | |
1021 _forEachListener((_MultiplexerSubscription listener) { | |
1022 listener._addError(error); | |
1023 }); | |
1024 if (_hasPending) { | |
1025 _pending.schedule(this); | |
1026 } | |
1027 } | |
1028 | |
1029 void _sendDone() { | |
1030 _forEachListener((_MultiplexerSubscription listener) { | |
1031 listener._setRemoveAfterFiring(); | |
1032 listener._close(); | |
1033 }); | |
1034 } | |
1035 } | |
1036 | |
1037 | |
1038 /** | |
1039 * Simple implementation of [StreamIterator]. | |
1040 */ | |
1041 class _StreamIteratorImpl<T> implements StreamIterator<T> { | |
1042 // TODO(lrn): Keep a one (or two) element buffer to avoid pausing when not | |
floitsch
2013/05/22 16:26:29
I guess the ideal solution would be if the constru
Lasse Reichstein Nielsen
2013/05/24 06:02:49
I don't want this to be about buffering, it should
| |
1043 // necessary. | |
1044 StreamSubscription _subscription; | |
1045 _FutureImpl<bool> _hasNext; | |
1046 T _current; | |
1047 | |
1048 _StreamIteratorImpl(final Stream<T> stream) { | |
1049 _subscription = stream.listen(_onData, | |
1050 onError: _onError, | |
1051 onDone: _onDone, | |
1052 cancelOnError: true); | |
1053 _subscription.pause(); | |
1054 } | |
1055 | |
1056 Future<bool> moveNext() { | |
1057 if (_hasNext != null) throw new StateError("Already waiting for next."); | |
1058 if (_subscription == null) { | |
1059 return new _FutureImpl<bool>.immediate(false); | |
1060 } | |
1061 _current = null; | |
1062 _subscription.resume(); | |
1063 _hasNext = new _FutureImpl<bool>(); | |
1064 return _hasNext; | |
1065 } | |
1066 | |
1067 T get current => _current; | |
1068 | |
1069 void cancel() { | |
1070 StreamSubscription subscription = _subscription; | |
1071 _subscription = null; | |
1072 _current = null; | |
1073 subscription.cancel(); | |
1074 if (_hasNext != null) { | |
1075 _FutureImpl<bool> hasNext = _hasNext; | |
1076 _hasNext = null; | |
1077 hasNext._setValue(false); | |
1078 } | |
1079 } | |
1080 | |
1081 void _onData(T data) { | |
1082 assert(_hasNext != null); | |
1083 _FutureImpl<bool> hasNext = _hasNext; | |
1084 _hasNext = null; | |
1085 _current = data; | |
1086 _subscription.pause(); | |
1087 hasNext._setValue(true); | |
1088 } | |
1089 | |
1090 void _onError(Object error) { | |
1091 assert(_hasNext != null); | |
1092 _FutureImpl<bool> hasNext = _hasNext; | |
1093 _hasNext = null; | |
1094 _subscription = null; | |
1095 hasNext._setError(error); | |
1096 // We have cancelOnError: true, so the subscription is cancelled. | |
floitsch
2013/05/22 16:26:29
canceled.
Lasse Reichstein Nielsen
2013/05/24 06:02:49
Done.
| |
1097 } | |
1098 | |
1099 void _onDone() { | |
1100 assert(_hasNext != null); | |
1101 _FutureImpl<bool> hasNext = _hasNext; | |
1102 _hasNext = null; | |
1103 _subscription = null; | |
1104 hasNext._setValue(false); | |
1105 } | |
1106 } | |
OLD | NEW |