Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2)

Side by Side Diff: sdk/lib/async/stream_impl.dart

Issue 14753009: Make StreamSubscription be the active part of a stream. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Addressed comments. Created 7 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « sdk/lib/async/stream_controller.dart ('k') | sdk/lib/async/stream_pipe.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // States shared by single/multi stream implementations.
8
9 // Completion state of the stream.
10 /// Initial and default state where the stream can receive and send events.
11 const int _STREAM_OPEN = 0;
12 /// The stream has received a request to complete, but hasn't done so yet.
13 /// No further events can be added to the stream.
14 const int _STREAM_CLOSED = 1;
15 /// The stream has completed and will no longer receive or send events.
16 /// Also counts as closed. The stream must not be paused when it's completed.
17 /// Always used in conjunction with [_STREAM_CLOSED].
18 const int _STREAM_COMPLETE = 2;
19
20 /// Bit that alternates between events, and listeners are updated to the
21 /// current value when they are notified of the event.
22 const int _STREAM_EVENT_ID = 4;
23 const int _STREAM_EVENT_ID_SHIFT = 2;
24
25 // The activity state of the stream: What is it currently doing.
26 /// Bit set while firing and clear while not.
27 const int _STREAM_FIRING = 8;
28 /// Bit set while calling a pause-state or subscription-state change callback.
29 const int _STREAM_CALLBACK = 16;
30
31 // The pause state of the stream.
32 /// Bit set when resuming with pending events. Cleared after all pending events
33 /// have been transmitted. Means that the controller still considers the
34 /// stream paused, even if the listener doesn't.
35 const int _STREAM_PENDING_RESUME = 32;
36 /// The count of times a stream has paused is stored in the
37 /// state, shifted by this amount.
38 const int _STREAM_PAUSE_COUNT_SHIFT = 6;
39
40 // States for listeners.
41
42 /// The listener is currently not subscribed to its source stream.
43 const int _LISTENER_UNSUBSCRIBED = 0;
44 /// The listener is actively subscribed to its source stream.
45 const int _LISTENER_SUBSCRIBED = 1;
46 /// The listener is subscribed until it has been notified of the current event.
47 /// This flag bit is always used in conjuction with [_LISTENER_SUBSCRIBED].
48 const int _LISTENER_PENDING_UNSUBSCRIBE = 2;
49
50 /// Bit that contains the last sent event's "id bit".
51 const int _LISTENER_EVENT_ID = 4;
52 const int _LISTENER_EVENT_ID_SHIFT = 2;
53
54 /// The count of times a listener has paused is stored in the
55 /// state, shifted by this amount.
56 const int _LISTENER_PAUSE_COUNT_SHIFT = 3;
57
58 /** Throws the given error in the next cycle. */ 7 /** Throws the given error in the next cycle. */
59 _throwDelayed(var error, [Object stackTrace]) { 8 _throwDelayed(var error, [Object stackTrace]) {
60 // We are going to reach the top-level here, but there might be a global 9 // We are going to reach the top-level here, but there might be a global
61 // exception handler. This means that we shouldn't print the stack trace. 10 // exception handler. This means that we shouldn't print the stack trace.
62 // TODO(floitsch): Find better solution that doesn't print the stack trace 11 // TODO(floitsch): Find better solution that doesn't print the stack trace
63 // if there is a global exception handler. 12 // if there is a global exception handler.
64 runAsync(() { 13 runAsync(() {
65 if (stackTrace != null) print(stackTrace); 14 if (stackTrace != null) print(stackTrace);
66 var trace = getAttachedStackTrace(error); 15 var trace = getAttachedStackTrace(error);
67 if (trace != null && trace != stackTrace) print(trace); 16 if (trace != null && trace != stackTrace) print(trace);
68 throw error; 17 throw error;
69 }); 18 });
70 } 19 }
71 20
21 /** Abstract and private interface for a place to put events. */
22 abstract class _EventSink<T> {
23 void _add(T data);
24 void _addError(Object error);
25 void _close();
26 }
27
28 /**
29 * Abstract and private interface for a place to send events.
30 *
31 * Used by event buffering to finally dispatch the pending event, where
32 * [_EventSink] is where the event first enters the stream subscription,
33 * and may yet be buffered.
34 */
35 abstract class _EventDispatch<T> {
36 void _sendData(T data);
37 void _sendError(Object error);
38 void _sendDone();
39 }
40
41 /**
42 * Default implementation of stream subscription of buffering events.
43 *
44 * The only public methods are those of [StreamSubscription], so instances of
45 * [_BufferingStreamSubscription] can be returned directly as a
46 * [StreamSubscription] without exposing internal functionality.
47 *
48 * The [StreamController] is a public facing version of [Stream] and this class,
49 * with some methods made public.
50 *
51 * The user interface of [_BufferingStreamSubscription] are the following
52 * methods:
53 * * [_add]: Add a data event to the stream.
54 * * [_addError]: Add an error event to the stream.
55 * * [_close]: Request to close the stream.
56 * * [_onCancel]: Called when the subscription will provide no more events,
57 * either due to being actively canceled, or after sending a done event.
58 * * [_onPause]: Called when the subscription wants the event source to pause.
59 * * [_onResume]: Called when allowing new events after a pause.
60 * The user should not add new events when the subscription requests a paused,
61 * but if it happens anyway, the subscription will enqueue the events just as
62 * when new events arrive while still firing an old event.
63 */
64 class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
65 _EventSink<T>,
66 _EventDispatch<T> {
67 /** The `cancelOnError` flag from the `listen` call. */
68 static const int _STATE_CANCEL_ON_ERROR = 1;
69 /**
70 * Whether the "done" event has been received.
71 * No further events are accepted after this.
72 */
73 static const int _STATE_CLOSED = 2;
74 /**
75 * Set if the input has been asked not to send events.
76 *
77 * This is not the same as being paused, since the input will remain paused
78 * after a call to [resume] if there are pending events.
79 */
80 static const int _STATE_INPUT_PAUSED = 4;
81 /**
82 * Whether the subscription has been canceled.
83 *
84 * Set by calling [cancel], or by handling a "done" event, or an "error" event
85 * when `cancelOnError` is true.
86 */
87 static const int _STATE_CANCELED = 8;
88 static const int _STATE_IN_CALLBACK = 16;
89 static const int _STATE_HAS_PENDING = 32;
90 static const int _STATE_PAUSE_COUNT = 64;
91 static const int _STATE_PAUSE_COUNT_SHIFT = 6;
92
93 /* Event handlers provided in constructor. */
94 /* TODO(7733): Fix Function->_DataHandler<T> when dart2js understands
95 * parameterized function types. */
96 Function _onData;
97 _ErrorHandler _onError;
98 _DoneHandler _onDone;
99
100 /** Bit vector based on state-constants above. */
101 int _state;
102
103 /**
104 * Queue of pending events.
105 *
106 * Is created when necessary, or set in constructor for preconfigured events.
107 */
108 _PendingEvents _pending;
109
110 _BufferingStreamSubscription(this._onData,
111 this._onError,
112 this._onDone,
113 bool cancelOnError)
114 : _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0) {
115 assert(_onData != null);
116 assert(_onError != null);
117 assert(_onDone != null);
118 }
119
120 /**
121 * Sets the subscription's pending events object.
122 *
123 * This can only be done once. The pending events object is used for the
124 * rest of the subscription's life cycle.
125 */
126 void _setPendingEvents(_PendingEvents pendingEvents) {
127 assert(_pending == null);
128 if (pendingEvents == null) return;
129 _pending = pendingEvents;
130 if (!pendingEvents.isEmpty) {
131 _state |= _STATE_HAS_PENDING;
132 _pending.schedule(this);
133 }
134 }
135
136 /**
137 * Extracts the pending events from a canceled stream.
138 *
139 * This can only be done during the [_onCancel] method call. After that,
140 * any remaining pending events will be cleared.
141 */
142 _PendingEvents _extractPending() {
143 assert(_isCanceled);
144 _PendingEvents events = _pending;
145 _pending = null;
146 return events;
147 }
148
149 // StreamSubscription interface.
150
151 void onData(void handleData(T event)) {
152 if (handleData == null) handleData = _nullDataHandler;
153 _onData = handleData;
154 }
155
156 void onError(void handleError(error)) {
157 if (handleError == null) handleError = _nullErrorHandler;
158 _onError = handleError;
159 }
160
161 void onDone(void handleDone()) {
162 if (handleDone == null) handleDone = _nullDoneHandler;
163 _onDone = handleDone;
164 }
165
166 void pause([Future resumeSignal]) {
167 if (_isCanceled) return;
168 bool wasPaused = _isPaused;
169 bool wasInputPaused = _isInputPaused;
170 // Increment pause count and mark input paused (if it isn't already).
171 _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED;
172 if (resumeSignal != null) resumeSignal.whenComplete(resume);
173 if (!wasPaused && _pending != null) _pending.cancelSchedule();
174 if (!wasInputPaused && !_inCallback) _guardCallback(_onPause);
175 }
176
177 void resume() {
178 if (_isCanceled) return;
179 if (_isPaused) {
180 _decrementPauseCount();
181 if (!_isPaused) {
182 if (_hasPending && !_pending.isEmpty) {
183 // Input is still paused.
184 _pending.schedule(this);
185 } else {
186 assert(_mayResumeInput);
187 _state &= ~_STATE_INPUT_PAUSED;
188 if (!_inCallback) _guardCallback(_onResume);
189 }
190 }
191 }
192 }
193
194 void cancel() {
195 if (_isCanceled) return;
196 _cancel();
197 if (!_inCallback) {
198 // otherwise checkState will be called after firing or callback completes.
199 _state |= _STATE_IN_CALLBACK;
200 _onCancel();
201 _pending = null;
202 _state &= ~_STATE_IN_CALLBACK;
203 }
204 }
205
206 Future asFuture([var futureValue]) {
207 _FutureImpl<T> result = new _FutureImpl<T>();
208
209 // Overwrite the onDone and onError handlers.
210 _onDone = () { result._setValue(futureValue); };
211 _onError = (error) {
212 cancel();
213 result._setError(error);
214 };
215
216 return result;
217 }
218
219 // State management.
220
221 bool get _isInputPaused => (_state & _STATE_INPUT_PAUSED) != 0;
222 bool get _isClosed => (_state & _STATE_CLOSED) != 0;
223 bool get _isCanceled => (_state & _STATE_CANCELED) != 0;
224 bool get _inCallback => (_state & _STATE_IN_CALLBACK) != 0;
225 bool get _hasPending => (_state & _STATE_HAS_PENDING) != 0;
226 bool get _isPaused => _state >= _STATE_PAUSE_COUNT;
227 bool get _canFire => _state < _STATE_IN_CALLBACK;
228 bool get _mayResumeInput =>
229 !_isPaused && (_pending == null || _pending.isEmpty);
230 bool get _cancelOnError => (_state & _STATE_CANCEL_ON_ERROR) != 0;
231
232 bool get isPaused => _isPaused;
233
234 void _cancel() {
235 _state |= _STATE_CANCELED;
236 if (_hasPending) {
237 _pending.cancelSchedule();
238 }
239 }
240
241 /**
242 * Increment the pause count.
243 *
244 * Also marks input as paused.
245 */
246 void _incrementPauseCount() {
247 _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED;
248 }
249
250 /**
251 * Decrements the pause count.
252 *
253 * Does not automatically unpause the input (call [_onResume]) when
254 * the pause count reaches zero. This is handled elsewhere, and only
255 * if there are no pending events buffered.
256 */
257 void _decrementPauseCount() {
258 assert(_isPaused);
259 _state -= _STATE_PAUSE_COUNT;
260 }
261
262 // _EventSink interface.
263
264 void _add(T data) {
265 assert(!_isClosed);
266 if (_isCanceled) return;
267 if (_canFire) {
268 _sendData(data);
269 } else {
270 _addPending(new _DelayedData(data));
271 }
272 }
273
274 void _addError(Object error) {
275 if (_isCanceled) return;
276 if (_canFire) {
277 _sendError(error); // Reports cancel after sending.
278 } else {
279 _addPending(new _DelayedError(error));
280 }
281 }
282
283 void _close() {
284 assert(!_isClosed);
285 if (_isCanceled) return;
286 _state |= _STATE_CLOSED;
287 if (_canFire) {
288 _sendDone();
289 } else {
290 _addPending(const _DelayedDone());
291 }
292 }
293
294 // Hooks called when the input is paused, unpaused or canceled.
295 // These must not throw. If overwritten to call user code, include suitable
296 // try/catch wrapping and send any errors to [_throwDelayed].
297 void _onPause() {
298 assert(_isInputPaused);
299 }
300
301 void _onResume() {
302 assert(!_isInputPaused);
303 }
304
305 void _onCancel() {
306 assert(_isCanceled);
307 }
308
309 // Handle pending events.
310
311 /**
312 * Add a pending event.
313 *
314 * If the subscription is not paused, this also schedules a firing
315 * of pending events later (if necessary).
316 */
317 void _addPending(_DelayedEvent event) {
318 _StreamImplEvents pending = _pending;
319 if (_pending == null) pending = _pending = new _StreamImplEvents();
320 pending.add(event);
321 if (!_hasPending) {
322 _state |= _STATE_HAS_PENDING;
323 if (!_isPaused) {
324 _pending.schedule(this);
325 }
326 }
327 }
328
329 /* _EventDispatch interface. */
330
331 void _sendData(T data) {
332 assert(!_isCanceled);
333 assert(!_isPaused);
334 assert(!_inCallback);
335 bool wasInputPaused = _isInputPaused;
336 _state |= _STATE_IN_CALLBACK;
337 try {
338 _onData(data);
339 } catch (e, s) {
340 _throwDelayed(e, s);
341 }
342 _state &= ~_STATE_IN_CALLBACK;
343 _checkState(wasInputPaused);
344 }
345
346 void _sendError(var error) {
347 assert(!_isCanceled);
348 assert(!_isPaused);
349 assert(!_inCallback);
350 bool wasInputPaused = _isInputPaused;
351 _state |= _STATE_IN_CALLBACK;
352 try {
353 _onError(error);
354 } catch (e, s) {
355 _throwDelayed(e, s);
356 }
357 _state &= ~_STATE_IN_CALLBACK;
358 if (_cancelOnError) {
359 _cancel();
360 }
361 _checkState(wasInputPaused);
362 }
363
364 void _sendDone() {
365 assert(!_isCanceled);
366 assert(!_isPaused);
367 assert(!_inCallback);
368 _state |= (_STATE_CANCELED | _STATE_CLOSED | _STATE_IN_CALLBACK);
369 try {
370 _onDone();
371 } catch (e, s) {
372 _throwDelayed(e, s);
373 }
374 _onCancel(); // No checkState after cancel, it is always the last event.
375 _state &= ~_STATE_IN_CALLBACK;
376 }
377
378 /**
379 * Call a hook function.
380 *
381 * The call is properly wrapped in code to avoid other callbacks
382 * during the call, and it checks for state changes after the call
383 * that should cause further callbacks.
384 */
385 void _guardCallback(callback) {
386 assert(!_inCallback);
387 bool wasInputPaused = _isInputPaused;
388 _state |= _STATE_IN_CALLBACK;
389 callback();
390 _state &= ~_STATE_IN_CALLBACK;
391 _checkState(wasInputPaused);
392 }
393
394 /**
395 * Check if the input needs to be informed of state changes.
396 *
397 * State changes are pausing, resuming and canceling.
398 *
399 * After ing, no further callbacks will happen.
floitsch 2013/05/27 08:24:43 bad search/replace.
Lasse Reichstein Nielsen 2013/05/27 08:33:44 Done.
400 *
401 * The cancel callback is called after a user cancel, or after
402 * the final done event is sent.
403 */
404 void _checkState(bool wasInputPaused) {
405 assert(!_inCallback);
406 if (_hasPending && _pending.isEmpty) {
407 _state &= ~_STATE_HAS_PENDING;
408 if (_isInputPaused && _mayResumeInput) {
409 _state &= ~_STATE_INPUT_PAUSED;
410 }
411 }
412 // If the state changes during a callback, we immediately
413 // make a new state-change callback. Loop until the state didn't change.
414 while (true) {
415 if (_isCanceled) {
416 _onCancel();
417 _pending = null;
418 return;
419 }
420 bool isInputPaused = _isInputPaused;
421 if (wasInputPaused == isInputPaused) break;
422 _state ^= _STATE_IN_CALLBACK;
423 if (isInputPaused) {
424 _onPause();
425 } else {
426 _onResume();
427 }
428 _state &= ~_STATE_IN_CALLBACK;
429 wasInputPaused = isInputPaused;
430 }
431 if (_hasPending && !_isPaused) {
432 _pending.schedule(this);
433 }
434 }
435 }
72 436
73 // ------------------------------------------------------------------- 437 // -------------------------------------------------------------------
74 // Common base class for single and multi-subscription streams. 438 // Common base class for single and multi-subscription streams.
75 // ------------------------------------------------------------------- 439 // -------------------------------------------------------------------
76 abstract class _StreamImpl<T> extends Stream<T> { 440 abstract class _StreamImpl<T> extends Stream<T> {
77 /** Current state of the stream. */
78 int _state = _STREAM_OPEN;
79
80 /**
81 * List of pending events.
82 *
83 * If events are added to the stream (using [_add], [_addError] or [_done])
84 * while the stream is paused, or while another event is firing, events will
85 * stored here.
86 * Also supports scheduling the events for later execution.
87 */
88 _PendingEvents _pendingEvents;
89
90 // ------------------------------------------------------------------ 441 // ------------------------------------------------------------------
91 // Stream interface. 442 // Stream interface.
92 443
93 StreamSubscription<T> listen(void onData(T data), 444 StreamSubscription<T> listen(void onData(T data),
94 { void onError(error), 445 { void onError(error),
95 void onDone(), 446 void onDone(),
96 bool cancelOnError }) { 447 bool cancelOnError }) {
97 if (_isComplete) {
98 return new _DoneSubscription(onDone);
99 }
100 if (onData == null) onData = _nullDataHandler; 448 if (onData == null) onData = _nullDataHandler;
101 if (onError == null) onError = _nullErrorHandler; 449 if (onError == null) onError = _nullErrorHandler;
102 if (onDone == null) onDone = _nullDoneHandler; 450 if (onDone == null) onDone = _nullDoneHandler;
103 cancelOnError = identical(true, cancelOnError); 451 cancelOnError = identical(true, cancelOnError);
104 _StreamSubscriptionImpl subscription = 452 StreamSubscription subscription =
105 _createSubscription(onData, onError, onDone, cancelOnError); 453 _createSubscription(onData, onError, onDone, cancelOnError);
106 _addListener(subscription); 454 _onListen(subscription);
107 return subscription; 455 return subscription;
108 } 456 }
109 457
110 // ------------------------------------------------------------------
111 // EventSink interface-like methods for sending events into the stream.
112 // It's the responsibility of the caller to ensure that the stream is not
113 // paused when adding events. If the stream is paused, the events will be
114 // queued, but it's better to not send events at all.
115
116 /**
117 * Send or queue a data event.
118 */
119 void _add(T value) {
120 if (_isClosed) throw new StateError("Sending on closed stream");
121 if (!_mayFireState) {
122 // Not the time to send events.
123 _addPendingEvent(new _DelayedData<T>(value));
124 return;
125 }
126 if (_hasPendingEvent) {
127 _addPendingEvent(new _DelayedData<T>(value));
128 } else {
129 _sendData(value);
130 }
131 _handlePendingEvents();
132 }
133
134 /**
135 * Send or enqueue an error event.
136 *
137 * If a subscription has requested to be unsubscribed on errors,
138 * it will be unsubscribed after receiving this event.
139 */
140 void _addError(error) {
141 if (_isClosed) throw new StateError("Sending on closed stream");
142 if (!_mayFireState) {
143 // Not the time to send events.
144 _addPendingEvent(new _DelayedError(error));
145 return;
146 }
147 if (_hasPendingEvent) {
148 _addPendingEvent(new _DelayedError(error));
149 } else {
150 _sendError(error);
151 }
152 _handlePendingEvents();
153 }
154
155 /**
156 * Send or enqueue a "done" message.
157 *
158 * The "done" message should be sent at most once by a stream, and it
159 * should be the last message sent.
160 */
161 void _close() {
162 if (_isClosed) return;
163 _state |= _STREAM_CLOSED;
164 if (!_mayFireState) {
165 // Not the time to send events.
166 _addPendingEvent(const _DelayedDone());
167 return;
168 }
169 if (_hasPendingEvent) {
170 _addPendingEvent(new _DelayedDone());
171 _handlePendingEvents();
172 } else {
173 _sendDone();
174 assert(_isComplete);
175 assert(!_hasPendingEvent);
176 }
177 }
178
179 // ------------------------------------------------------------------- 458 // -------------------------------------------------------------------
180 // Internal implementation.
181
182 // State predicates.
183
184 // Lifecycle state.
185 /** Whether the stream is in the default, open, state for events. */
186 bool get _isOpen => (_state & (_STREAM_CLOSED | _STREAM_COMPLETE)) == 0;
187
188 /** Whether the stream has been closed (a done event requested). */
189 bool get _isClosed => (_state & _STREAM_CLOSED) != 0;
190
191 /** Whether the stream is completed. */
192 bool get _isComplete => (_state & _STREAM_COMPLETE) != 0;
193
194 // Pause state.
195
196 /** Whether one or more active subscribers have requested a pause. */
197 bool get _isPaused => _state >= (1 << _STREAM_PAUSE_COUNT_SHIFT);
198
199 /** How many times the stream has been paused. */
200 int get _pauseCount => _state >> _STREAM_PAUSE_COUNT_SHIFT;
201
202 /**
203 * Whether a controller thinks the stream is paused.
204 *
205 * When this changes, a pause-state change callback is performed.
206 *
207 * It may differ from [_isPaused] if there are pending events
208 * in the queue when the listeners resume. The controller won't
209 * be informed until all queued events have been fired.
210 */
211 bool get _isInputPaused => _state >= (_STREAM_PENDING_RESUME);
212
213 /** Whether we have a pending resume scheduled. */
214 bool get _hasPendingResume => (_state & _STREAM_PENDING_RESUME) != 0;
215
216
217 // Action state. If the stream makes a call-out to external code,
218 // this state tracks it and avoids reentrancy problems.
219
220 /** Whether the stream is not currently firing or calling a callback. */
221 bool get _isInactive => (_state & (_STREAM_CALLBACK | _STREAM_FIRING)) == 0;
222
223 /** Whether we are currently executing a state-chance callback. */
224 bool get _isInCallback => (_state & _STREAM_CALLBACK) != 0;
225
226 /** Whether we are currently firing an event. */
227 bool get _isFiring => (_state & _STREAM_FIRING) != 0;
228
229 /** Check whether the pending event queue is non-empty */
230 bool get _hasPendingEvent =>
231 _pendingEvents != null && !_pendingEvents.isEmpty;
232
233 /**
234 * The bit representing the current or last event fired.
235 *
236 * This bit matches a bit on listeners that have received the corresponding
237 * event. It is toggled for each new event being fired.
238 */
239 int get _currentEventIdBit =>
240 (_state & _STREAM_EVENT_ID ) >> _STREAM_EVENT_ID_SHIFT;
241
242 /** Whether there is currently a subscriber on this [Stream]. */
243 bool get _hasListener;
244
245
246 /** Whether the state bits allow firing. */
247 bool get _mayFireState {
248 // The state allows firing unless:
249 // - it's currently firing
250 // - it's currently in a callback
251 // - it's paused
252 const int mask =
253 _STREAM_FIRING |
254 _STREAM_CALLBACK |
255 ~((1 << _STREAM_PAUSE_COUNT_SHIFT) - 1);
256 return (_state & mask) == 0;
257 }
258
259 // State modification.
260
261 /** Record an increases in the number of times the listener has paused. */
262 void _incrementPauseCount(_StreamListener<T> listener) {
263 listener._incrementPauseCount();
264 _state &= ~_STREAM_PENDING_RESUME;
265 _updatePauseCount(1);
266 }
267
268 /** Record a decrease in the number of times the listener has paused. */
269 void _decrementPauseCount(_StreamListener<T> listener) {
270 assert(_isPaused);
271 listener._decrementPauseCount();
272 _updatePauseCount(-1);
273 }
274
275 /** Update the stream's own pause count only. */
276 void _updatePauseCount(int by) {
277 int oldState = _state;
278 // We can't just _state += by << _STREAM_PAUSE_COUNT_SHIFT, since dart2js
279 // converts the result of the left-shift to a positive number.
280 if (by >= 0) {
281 _state = oldState + (by << _STREAM_PAUSE_COUNT_SHIFT);
282 } else {
283 _state = oldState - ((-by) << _STREAM_PAUSE_COUNT_SHIFT);
284 }
285 assert(_state >= 0);
286 assert((_state >> _STREAM_PAUSE_COUNT_SHIFT) ==
287 (oldState >> _STREAM_PAUSE_COUNT_SHIFT) + by);
288 }
289
290 void _setClosed() {
291 assert(!_isClosed);
292 _state |= _STREAM_CLOSED;
293 }
294
295 void _setComplete() {
296 assert(_isClosed);
297 _state = _state |_STREAM_COMPLETE;
298 }
299
300 void _startFiring() {
301 assert(!_isFiring);
302 assert(!_isInCallback);
303 assert(_hasListener);
304 assert(!_isPaused);
305 // This sets the _STREAM_FIRING bit and toggles the _STREAM_EVENT_ID
306 // bit. All current subscribers will now have a _LISTENER_EVENT_ID
307 // that doesn't match _STREAM_EVENT_ID, and they will receive the
308 // event being fired.
309 _state ^= _STREAM_FIRING | _STREAM_EVENT_ID;
310 }
311
312 void _endFiring(bool wasInputPaused) {
313 assert(_isFiring);
314 _state ^= _STREAM_FIRING;
315 // Had listeners, or we wouldn't have fired.
316 _checkCallbacks(true, wasInputPaused);
317 }
318
319 /**
320 * Record that a listener wants a pause from events.
321 *
322 * This methods is called from [_StreamListener.pause()].
323 * Subclasses can override this method, along with [isPaused] and
324 * [createSubscription], if they want to do a different handling of paused
325 * subscriptions, e.g., a filtering stream pausing its own source if all its
326 * subscribers are paused.
327 */
328 void _pause(_StreamListener<T> listener, Future resumeSignal) {
329 assert(identical(listener._source, this));
330 if (!listener._isSubscribed) {
331 throw new StateError("Subscription has been canceled.");
332 }
333 assert(!_isComplete); // There can be no subscribers when complete.
334 bool wasInputPaused = _isInputPaused;
335 bool wasPaused = _isPaused;
336 _incrementPauseCount(listener);
337 if (resumeSignal != null) {
338 resumeSignal.whenComplete(() { this._resume(listener, true); });
339 }
340 if (!wasPaused && _hasPendingEvent && _pendingEvents.isScheduled) {
341 _pendingEvents.cancelSchedule();
342 }
343 if (_isInactive && !wasInputPaused) {
344 _checkCallbacks(true, false);
345 if (!_isPaused && _hasPendingEvent) {
346 _schedulePendingEvents();
347 }
348 }
349 }
350
351 /** Stops pausing due to one request from the given listener. */
352 void _resume(_StreamListener<T> listener, bool fromEvent) {
353 if (!listener.isPaused) return;
354 assert(listener._isSubscribed);
355 assert(_isPaused);
356 _decrementPauseCount(listener);
357 if (!_isPaused) {
358 if (_hasPendingEvent) {
359 _state |= _STREAM_PENDING_RESUME;
360 // Controller's pause state hasn't changed.
361 // If we can fire events now, fire any pending events right away.
362 if (_isInactive) {
363 if (fromEvent) {
364 _handlePendingEvents();
365 } else {
366 _schedulePendingEvents();
367 }
368 }
369 } else if (_isInactive) {
370 _checkCallbacks(true, true);
371 if (!_isPaused && _hasPendingEvent) {
372 if (fromEvent) {
373 _handlePendingEvents();
374 } else {
375 _schedulePendingEvents();
376 }
377 }
378 }
379 }
380 }
381
382 /** Schedule pending events to be executed. */
383 void _schedulePendingEvents() {
384 assert(_hasPendingEvent);
385 _pendingEvents.schedule(this);
386 }
387
388 /** Create a subscription object. Called by [subcribe]. */ 459 /** Create a subscription object. Called by [subcribe]. */
389 _StreamSubscriptionImpl<T> _createSubscription( 460 _BufferingStreamSubscription<T> _createSubscription(
390 void onData(T data),
391 void onError(error),
392 void onDone(),
393 bool cancelOnError);
394
395 /**
396 * Adds a listener to this stream.
397 */
398 void _addListener(_StreamSubscriptionImpl subscription);
399
400 /**
401 * Handle a cancel requested from a [_StreamSubscriptionImpl].
402 *
403 * This method is called from [_StreamSubscriptionImpl.cancel].
404 *
405 * If an event is currently firing, the cancel is delayed
406 * until after the subscribers have received the event.
407 */
408 void _cancel(_StreamSubscriptionImpl subscriber);
409
410 /**
411 * Iterate over all current subscribers and perform an action on each.
412 *
413 * Subscribers added during the iteration will not be visited.
414 * Subscribers unsubscribed during the iteration will only be removed
415 * after they have been acted on.
416 *
417 * Any change in the pause state is only reported after all subscribers have
418 * received the event.
419 *
420 * The [action] must not throw, or the controller will be left in an
421 * invalid state.
422 *
423 * This method must not be called while [isFiring] is true.
424 */
425 void _forEachSubscriber(void action(_StreamSubscriptionImpl<T> subscription));
426
427 /**
428 * Checks whether the subscription/pause state has changed.
429 *
430 * Calls the appropriate callback if the state has changed from the
431 * provided one. Repeats calling callbacks as long as the call changes
432 * the state.
433 */
434 void _checkCallbacks(bool hadListener, bool wasPaused) {
435 assert(!_isFiring);
436 // Will be handled after the current callback.
437 if (_isInCallback) return;
438 if (_hasPendingResume && !_hasPendingEvent) {
439 _state ^= _STREAM_PENDING_RESUME;
440 }
441 _state |= _STREAM_CALLBACK;
442 while (true) {
443 bool hasListener = _hasListener;
444 bool isPaused = _isInputPaused;
445 if (hadListener != hasListener) {
446 _onSubscriptionStateChange();
447 } else if (isPaused != wasPaused) {
448 _onPauseStateChange();
449 } else {
450 _state ^= _STREAM_CALLBACK;
451 return;
452 }
453 wasPaused = isPaused;
454 hadListener = hasListener;
455 }
456 }
457
458 /**
459 * Called when the first subscriber requests a pause or the last a resume.
460 *
461 * Read [isPaused] to see the new state.
462 */
463 void _onPauseStateChange() {}
464
465 /**
466 * Called when the first listener subscribes or the last unsubscribes.
467 *
468 * Read [hasListener] to see what the new state is.
469 */
470 void _onSubscriptionStateChange() {}
471
472 /**
473 * Add a pending event at the end of the pending event queue.
474 *
475 * Schedules events if currently not paused and inside a callback.
476 */
477 void _addPendingEvent(_DelayedEvent event) {
478 if (_pendingEvents == null) _pendingEvents = new _StreamImplEvents();
479 _StreamImplEvents events = _pendingEvents;
480 events.add(event);
481 if (_isPaused || _isFiring) return;
482 if (_isInCallback) {
483 _schedulePendingEvents();
484 return;
485 }
486 }
487
488 /** Fire any pending events until the pending event queue is empty. */
489 void _handlePendingEvents() {
490 assert(_isInactive);
491 if (!_hasPendingEvent) return;
492 _PendingEvents events = _pendingEvents;
493 do {
494 if (_isPaused) return;
495 if (events.isScheduled) events.cancelSchedule();
496 events.handleNext(this);
497 } while (!events.isEmpty);
498 }
499
500 /**
501 * Send a data event directly to each subscriber.
502 */
503 _sendData(T value) {
504 assert(!_isPaused);
505 assert(!_isComplete);
506 if (!_hasListener) return;
507 _forEachSubscriber((subscriber) {
508 try {
509 subscriber._sendData(value);
510 } catch (e, s) {
511 _throwDelayed(e, s);
512 }
513 });
514 }
515
516 /**
517 * Sends an error event directly to each subscriber.
518 */
519 void _sendError(error) {
520 assert(!_isPaused);
521 assert(!_isComplete);
522 if (!_hasListener) return;
523 _forEachSubscriber((subscriber) {
524 try {
525 subscriber._sendError(error);
526 } catch (e, s) {
527 _throwDelayed(e, s);
528 }
529 });
530 }
531
532 /**
533 * Sends the "done" message directly to each subscriber.
534 * This automatically stops further subscription and
535 * unsubscribes all subscribers.
536 */
537 void _sendDone() {
538 assert(!_isPaused);
539 assert(_isClosed);
540 _setComplete();
541 if (!_hasListener) return;
542 _forEachSubscriber((subscriber) {
543 _cancel(subscriber);
544 try {
545 subscriber._sendDone();
546 } catch (e, s) {
547 _throwDelayed(e, s);
548 }
549 });
550 assert(!_hasListener);
551 }
552 }
553
554 // -------------------------------------------------------------------
555 // Default implementation of a stream with a single subscriber.
556 // -------------------------------------------------------------------
557 /**
558 * Default implementation of stream capable of sending events to one subscriber.
559 *
560 * Any class needing to implement [Stream] can either directly extend this
561 * class, or extend [Stream] and delegate the subscribe method to an instance
562 * of this class.
563 *
564 * The only public methods are those of [Stream], so instances of
565 * [_SingleStreamImpl] can be returned directly as a [Stream] without exposing
566 * internal functionality.
567 *
568 * The [StreamController] is a public facing version of this class, with
569 * some methods made public.
570 *
571 * The user interface of [_SingleStreamImpl] are the following methods:
572 * * [_add]: Add a data event to the stream.
573 * * [_addError]: Add an error event to the stream.
574 * * [_close]: Request to close the stream.
575 * * [_onSubscriberStateChange]: Called when receiving the first subscriber or
576 * when losing the last subscriber.
577 * * [_onPauseStateChange]: Called when entering or leaving paused mode.
578 * * [_hasListener]: Test whether there are currently any subscribers.
579 * * [_isInputPaused]: Test whether the stream is currently paused.
580 * The user should not add new events while the stream is paused, but if it
581 * happens anyway, the stream will enqueue the events just as when new events
582 * arrive while still firing an old event.
583 */
584 class _SingleStreamImpl<T> extends _StreamImpl<T> {
585 _StreamListener _subscriber = null;
586
587 /** Whether there is currently a subscriber on this [Stream]. */
588 bool get _hasListener => _subscriber != null;
589
590 // -------------------------------------------------------------------
591 // Internal implementation.
592
593 _SingleStreamImpl() {
594 // Start out paused.
595 _updatePauseCount(1);
596 }
597
598 /**
599 * Create the new subscription object.
600 */
601 _StreamSubscriptionImpl<T> _createSubscription(
602 void onData(T data), 461 void onData(T data),
603 void onError(error), 462 void onError(error),
604 void onDone(), 463 void onDone(),
605 bool cancelOnError) { 464 bool cancelOnError) {
606 return new _StreamSubscriptionImpl<T>( 465 return new _BufferingStreamSubscription<T>(
607 this, onData, onError, onDone, cancelOnError); 466 onData, onError, onDone, cancelOnError);
608 } 467 }
609 468
610 void _addListener(_StreamListener subscription) { 469 /** Hook called when the subscription has been created. */
611 assert(!_isComplete); 470 void _onListen(StreamSubscription subscription) {}
612 if (_hasListener) { 471 }
613 throw new StateError("Stream already has subscriber."); 472
614 } 473 typedef _PendingEvents _EventGenerator();
615 assert(_pauseCount == 1);
616 _updatePauseCount(-1);
617 _subscriber = subscription;
618 subscription._setSubscribed(0);
619 if (_isInactive) {
620 _checkCallbacks(false, true);
621 if (!_isPaused && _hasPendingEvent) {
622 _schedulePendingEvents();
623 }
624 }
625 }
626
627 /**
628 * Handle a cancel requested from a [_StreamSubscriptionImpl].
629 *
630 * This method is called from [_StreamSubscriptionImpl.cancel].
631 */
632 void _cancel(_StreamListener subscriber) {
633 assert(identical(subscriber._source, this));
634 // We allow unsubscribing the currently firing subscription during
635 // the event firing, because it is indistinguishable from delaying it since
636 // that event has already received the event.
637 if (!identical(_subscriber, subscriber)) {
638 // You may unsubscribe more than once, only the first one counts.
639 return;
640 }
641 _subscriber = null;
642 // Unsubscribing a paused subscription also cancels its pauses.
643 int resumeCount = subscriber._setUnsubscribed();
644 // Keep being paused while there is no subscriber and the stream is not
645 // complete.
646 _updatePauseCount(_isComplete ? -resumeCount : -resumeCount + 1);
647 if (_isInactive) {
648 _checkCallbacks(true, resumeCount > 0);
649 if (!_isPaused && _hasPendingEvent) {
650 _schedulePendingEvents();
651 }
652 }
653 }
654
655 void _forEachSubscriber(
656 void action(_StreamListener<T> subscription)) {
657 assert(!_isPaused);
658 bool wasInputPaused = _isInputPaused;
659 _StreamListener subscription = _subscriber;
660 assert(subscription != null);
661 _startFiring();
662 action(subscription);
663 _endFiring(wasInputPaused);
664 }
665 }
666
667 // -------------------------------------------------------------------
668 // Default implementation of a stream with subscribers.
669 // -------------------------------------------------------------------
670
671 /**
672 * Default implementation of stream capable of sending events to subscribers.
673 *
674 * Any class needing to implement [Stream] can either directly extend this
675 * class, or extend [Stream] and delegate the subscribe method to an instance
676 * of this class.
677 *
678 * The only public methods are those of [Stream], so instances of
679 * [_MultiStreamImpl] can be returned directly as a [Stream] without exposing
680 * internal functionality.
681 *
682 * The [StreamController] is a public facing version of this class, with
683 * some methods made public.
684 *
685 * The user interface of [_MultiStreamImpl] are the following methods:
686 * * [_add]: Add a data event to the stream.
687 * * [_addError]: Add an error event to the stream.
688 * * [_close]: Request to close the stream.
689 * * [_onSubscriptionStateChange]: Called when receiving the first subscriber or
690 * when losing the last subscriber.
691 * * [_onPauseStateChange]: Called when entering or leaving paused mode.
692 * * [_hasListener]: Test whether there are currently any subscribers.
693 * * [_isPaused]: Test whether the stream is currently paused.
694 * The user should not add new events while the stream is paused, but if it
695 * happens anyway, the stream will enqueue the events just as when new events
696 * arrive while still firing an old event.
697 */
698 class _MultiStreamImpl<T> extends _StreamImpl<T>
699 implements _InternalLinkList {
700 // Link list implementation (mixin when possible).
701 _InternalLink _nextLink;
702 _InternalLink _previousLink;
703
704 _MultiStreamImpl() {
705 _nextLink = _previousLink = this;
706 }
707
708 bool get isBroadcast => true;
709
710 Stream<T> asBroadcastStream() => this;
711
712 // ------------------------------------------------------------------
713 // Helper functions that can be overridden in subclasses.
714
715 /** Whether there are currently any subscribers on this [Stream]. */
716 bool get _hasListener => !_InternalLinkList.isEmpty(this);
717
718 /**
719 * Create the new subscription object.
720 */
721 _StreamListener<T> _createSubscription(
722 void onData(T data),
723 void onError(error),
724 void onDone(),
725 bool cancelOnError) {
726 return new _StreamSubscriptionImpl<T>(
727 this, onData, onError, onDone, cancelOnError);
728 }
729
730 // -------------------------------------------------------------------
731 // Internal implementation.
732
733 /**
734 * Iterate over all current subscribers and perform an action on each.
735 *
736 * The set of subscribers cannot be modified during this iteration.
737 * All attempts to add or unsubscribe subscribers will be delayed until
738 * after the iteration is complete.
739 *
740 * The [action] must not throw, or the controller will be left in an
741 * invalid state.
742 *
743 * This method must not be called while [isFiring] is true.
744 */
745 void _forEachSubscriber(
746 void action(_StreamListener<T> subscription)) {
747 assert(!_isFiring);
748 if (!_hasListener) return;
749 bool wasInputPaused = _isInputPaused;
750 _startFiring();
751 _InternalLink cursor = this._nextLink;
752 while (!identical(cursor, this)) {
753 _StreamListener<T> current = cursor;
754 if (current._needsEvent(_currentEventIdBit)) {
755 action(current);
756 // Marks as having received the event.
757 current._toggleEventReceived();
758 }
759 cursor = current._nextLink;
760 if (current._isPendingUnsubscribe) {
761 _removeListener(current);
762 }
763 }
764 _endFiring(wasInputPaused);
765 }
766
767 void _addListener(_StreamListener listener) {
768 listener._setSubscribed(_currentEventIdBit);
769 bool hadListener = _hasListener;
770 _InternalLinkList.add(this, listener);
771 if (!hadListener && _isInactive) {
772 _checkCallbacks(false, false);
773 if (!_isPaused && _hasPendingEvent) {
774 _schedulePendingEvents();
775 }
776 }
777 }
778
779 /**
780 * Handle a cancel requested from a [_StreamListener].
781 *
782 * This method is called from [_StreamListener.cancel].
783 *
784 * If an event is currently firing, the cancel is delayed
785 * until after the subscribers have received the event.
786 */
787 void _cancel(_StreamListener listener) {
788 assert(identical(listener._source, this));
789 if (_InternalLink.isUnlinked(listener)) {
790 // You may unsubscribe more than once, only the first one counts.
791 return;
792 }
793 if (_isFiring) {
794 if (listener._needsEvent(_currentEventIdBit)) {
795 assert(listener._isSubscribed);
796 listener._setPendingUnsubscribe(_currentEventIdBit);
797 } else {
798 // The listener has been notified of the event (or don't need to,
799 // if it's still pending subscription) so it's safe to remove it.
800 _removeListener(listener);
801 }
802 // Pause and subscription state changes are reported when we end
803 // firing.
804 } else {
805 bool wasInputPaused = _isInputPaused;
806 _removeListener(listener);
807 if (_isInactive) {
808 _checkCallbacks(true, wasInputPaused);
809 if (!_isPaused && _hasPendingEvent) {
810 _schedulePendingEvents();
811 }
812 }
813 }
814 }
815
816 /**
817 * Removes a listener from this stream and cancels its pauses.
818 *
819 * This is a low-level action that doesn't call [_onSubscriptionStateChange].
820 * or [_callOnPauseStateChange].
821 */
822 void _removeListener(_StreamListener listener) {
823 int pauseCount = listener._setUnsubscribed();
824 _InternalLinkList.remove(listener);
825 if (pauseCount > 0) {
826 _updatePauseCount(-pauseCount);
827 if (!_isPaused && _hasPendingEvent) {
828 _state |= _STREAM_PENDING_RESUME;
829 }
830 }
831 }
832 }
833
834 474
835 /** Stream that generates its own events. */ 475 /** Stream that generates its own events. */
836 class _GeneratedSingleStreamImpl<T> extends _SingleStreamImpl<T> { 476 class _GeneratedStreamImpl<T> extends _StreamImpl<T> {
837 /** 477 final _EventGenerator _pending;
838 * Initializes the stream to have only the events provided by [events]. 478 /**
839 * 479 * Initializes the stream to have only the events provided by a
840 * A [_PendingEvents] implementation provides events that are handled 480 * [_PendingEvents].
841 * by calling [_PendingEvents.handleNext] with the [_StreamImpl]. 481 *
842 */ 482 * A new [_PendingEvents] must be generated for each listen.
843 _GeneratedSingleStreamImpl(_PendingEvents events) { 483 */
844 _pendingEvents = events; 484 _GeneratedStreamImpl(this._pending);
845 _setClosed(); // Closed for input since all events are already pending. 485
846 } 486 StreamSubscription _createSubscription(void onData(T data),
847 487 void onError(Object error),
848 void _add(T value) { 488 void onDone(),
849 throw new UnsupportedError("Cannot inject events into generated stream"); 489 bool cancelOnError) {
850 } 490 _BufferingStreamSubscription<T> subscription =
851 491 new _BufferingStreamSubscription(
852 void _addError(value) { 492 onData, onError, onDone, cancelOnError);
853 throw new UnsupportedError("Cannot inject events into generated stream"); 493 subscription._setPendingEvents(_pending());
854 } 494 return subscription;
855
856 void _close() {
857 throw new UnsupportedError("Cannot inject events into generated stream");
858 } 495 }
859 } 496 }
860 497
861 498
862 /** Pending events object that gets its events from an [Iterable]. */ 499 /** Pending events object that gets its events from an [Iterable]. */
863 class _IterablePendingEvents<T> extends _PendingEvents { 500 class _IterablePendingEvents<T> extends _PendingEvents {
864 final Iterator<T> _iterator; 501 // The iterator providing data for data events.
865 /** 502 // Set to null when iteration has completed.
866 * Whether there are no more events to be sent. 503 Iterator<T> _iterator;
867 *
868 * This starts out as [:false:] since there is always at least
869 * a 'done' event to be sent.
870 */
871 bool _isDone = false;
872 504
873 _IterablePendingEvents(Iterable<T> data) : _iterator = data.iterator; 505 _IterablePendingEvents(Iterable<T> data) : _iterator = data.iterator;
874 506
875 bool get isEmpty => _isDone; 507 bool get isEmpty => _iterator == null;
876 508
877 void handleNext(_StreamImpl<T> stream) { 509 void handleNext(_EventDispatch dispatch) {
878 if (_isDone) throw new StateError("No events pending."); 510 if (_iterator == null) {
511 throw new StateError("No events pending.");
512 }
513 // Send one event per call to moveNext.
514 // If moveNext returns true, send the current element as data.
515 // If moveNext returns false, send a done event and clear the _iterator.
516 // If moveNext throws an error, send an error and clear the _iterator.
517 // After an error, no further events will be sent.
518 bool isDone;
879 try { 519 try {
880 _isDone = !_iterator.moveNext(); 520 isDone = !_iterator.moveNext();
881 if (!_isDone) {
882 stream._sendData(_iterator.current);
883 } else {
884 stream._sendDone();
885 }
886 } catch (e, s) { 521 } catch (e, s) {
887 stream._sendError(_asyncError(e, s)); 522 _iterator = null;
888 stream._sendDone(); 523 dispatch._sendError(_asyncError(e, s));
889 _isDone = true; 524 return;
890 } 525 }
891 } 526 if (!isDone) {
892 } 527 dispatch._sendData(_iterator.current);
893 528 } else {
894 529 _iterator = null;
895 /** 530 dispatch._sendDone();
896 * The subscription class that the [StreamController] uses. 531 }
897 * 532 }
898 * The [_StreamImpl.createSubscription] method should 533
899 * create an object of this type, or another subclass of [_StreamListener]. 534 void clear() {
900 * A subclass of [_StreamImpl] can specify which subclass 535 if (isScheduled) cancelSchedule();
901 * of [_StreamSubscriptionImpl] it uses by overriding 536 _iterator = null;
902 * [_StreamImpl.createSubscription]. 537 }
903 * 538 }
904 * The subscription is in one of three states: 539
905 * * Subscribed.
906 * * Paused-and-subscribed.
907 * * Unsubscribed.
908 * Unsubscribing also resumes any pauses started by the subscription.
909 */
910 class _StreamSubscriptionImpl<T> extends _StreamListener<T>
911 implements StreamSubscription<T> {
912 final bool _cancelOnError;
913 // TODO(ahe): Restore type when feature is implemented in dart2js
914 // checked mode. http://dartbug.com/7733
915 var /* _DataHandler<T> */ _onData;
916 _ErrorHandler _onError;
917 _DoneHandler _onDone;
918 _StreamSubscriptionImpl(_StreamImpl source,
919 this._onData,
920 this._onError,
921 this._onDone,
922 this._cancelOnError) : super(source);
923
924 void onData(void handleData(T event)) {
925 if (handleData == null) handleData = _nullDataHandler;
926 _onData = handleData;
927 }
928
929 void onError(void handleError(error)) {
930 if (handleError == null) handleError = _nullErrorHandler;
931 _onError = handleError;
932 }
933
934 void onDone(void handleDone()) {
935 if (handleDone == null) handleDone = _nullDoneHandler;
936 _onDone = handleDone;
937 }
938
939 void _sendData(T data) {
940 _onData(data);
941 }
942
943 void _sendError(error) {
944 _onError(error);
945 if (_cancelOnError) _source._cancel(this);
946 }
947
948 void _sendDone() {
949 _onDone();
950 }
951
952 void cancel() {
953 if (!_isSubscribed) return;
954 _source._cancel(this);
955 }
956
957 void pause([Future resumeSignal]) {
958 if (!_isSubscribed) return;
959 _source._pause(this, resumeSignal);
960 }
961
962 void resume() {
963 if (!_isSubscribed || !isPaused) return;
964 _source._resume(this, false);
965 }
966
967 Future asFuture([var futureValue]) {
968 _FutureImpl<T> result = new _FutureImpl<T>();
969
970 // Overwrite the onDone and onError handlers.
971 onDone(() { result._setValue(futureValue); });
972 onError((error) {
973 cancel();
974 result._setError(error);
975 });
976
977 return result;
978 }
979 }
980 540
981 // Internal helpers. 541 // Internal helpers.
982 542
983 // Types of the different handlers on a stream. Types used to type fields. 543 // Types of the different handlers on a stream. Types used to type fields.
984 typedef void _DataHandler<T>(T value); 544 typedef void _DataHandler<T>(T value);
985 typedef void _ErrorHandler(error); 545 typedef void _ErrorHandler(error);
986 typedef void _DoneHandler(); 546 typedef void _DoneHandler();
987 547
988 548
989 /** Default data handler, does nothing. */ 549 /** Default data handler, does nothing. */
990 void _nullDataHandler(var value) {} 550 void _nullDataHandler(var value) {}
991 551
992 /** Default error handler, reports the error to the global handler. */ 552 /** Default error handler, reports the error to the global handler. */
993 void _nullErrorHandler(error) { 553 void _nullErrorHandler(error) {
994 _throwDelayed(error); 554 _throwDelayed(error);
995 } 555 }
996 556
997 /** Default done handler, does nothing. */ 557 /** Default done handler, does nothing. */
998 void _nullDoneHandler() {} 558 void _nullDoneHandler() {}
999 559
1000 560
1001 /** A delayed event on a stream implementation. */ 561 /** A delayed event on a buffering stream subscription. */
1002 abstract class _DelayedEvent { 562 abstract class _DelayedEvent {
1003 /** Added as a linked list on the [StreamController]. */ 563 /** Added as a linked list on the [StreamController]. */
1004 _DelayedEvent next; 564 _DelayedEvent next;
1005 /** Execute the delayed event on the [StreamController]. */ 565 /** Execute the delayed event on the [StreamController]. */
1006 void perform(_StreamImpl stream); 566 void perform(_EventDispatch dispatch);
1007 } 567 }
1008 568
1009 /** A delayed data event. */ 569 /** A delayed data event. */
1010 class _DelayedData<T> extends _DelayedEvent{ 570 class _DelayedData<T> extends _DelayedEvent{
1011 final T value; 571 final T value;
1012 _DelayedData(this.value); 572 _DelayedData(this.value);
1013 void perform(_StreamImpl<T> stream) { 573 void perform(_EventDispatch<T> dispatch) {
1014 stream._sendData(value); 574 dispatch._sendData(value);
1015 } 575 }
1016 } 576 }
1017 577
1018 /** A delayed error event. */ 578 /** A delayed error event. */
1019 class _DelayedError extends _DelayedEvent { 579 class _DelayedError extends _DelayedEvent {
1020 final error; 580 final error;
1021 _DelayedError(this.error); 581 _DelayedError(this.error);
1022 void perform(_StreamImpl stream) { 582 void perform(_EventDispatch dispatch) {
1023 stream._sendError(error); 583 dispatch._sendError(error);
1024 } 584 }
1025 } 585 }
1026 586
1027 /** A delayed done event. */ 587 /** A delayed done event. */
1028 class _DelayedDone implements _DelayedEvent { 588 class _DelayedDone implements _DelayedEvent {
1029 const _DelayedDone(); 589 const _DelayedDone();
1030 void perform(_StreamImpl stream) { 590 void perform(_EventDispatch dispatch) {
1031 stream._sendDone(); 591 dispatch._sendDone();
1032 } 592 }
1033 593
1034 _DelayedEvent get next => null; 594 _DelayedEvent get next => null;
1035 595
1036 void set next(_DelayedEvent _) { 596 void set next(_DelayedEvent _) {
1037 throw new StateError("No events after a done."); 597 throw new StateError("No events after a done.");
1038 } 598 }
1039 } 599 }
1040 600
1041 /** 601 /**
(...skipping 69 matching lines...) Expand 10 before | Expand all | Expand 10 after
1111 listLast._nextLink = otherNext; 671 listLast._nextLink = otherNext;
1112 otherNext._previousLink = listLast; 672 otherNext._previousLink = listLast;
1113 _InternalLink otherLast = other._previousLink; 673 _InternalLink otherLast = other._previousLink;
1114 list._previousLink = otherLast; 674 list._previousLink = otherLast;
1115 otherLast._nextLink = list; 675 otherLast._nextLink = list;
1116 // Clean up [other]. 676 // Clean up [other].
1117 other._nextLink = other._previousLink = other; 677 other._nextLink = other._previousLink = other;
1118 } 678 }
1119 } 679 }
1120 680
1121 /** Abstract type for an internal interface for sending events. */
1122 abstract class _EventOutputSink<T> {
1123 _sendData(T data);
1124 _sendError(error);
1125 _sendDone();
1126 }
1127
1128 abstract class _StreamListener<T> extends _InternalLink
1129 implements _EventOutputSink<T> {
1130 final _StreamImpl _source;
1131 int _state = _LISTENER_UNSUBSCRIBED;
1132
1133 _StreamListener(this._source);
1134
1135 bool get isPaused => _state >= (1 << _LISTENER_PAUSE_COUNT_SHIFT);
1136
1137 bool get _isPendingUnsubscribe =>
1138 (_state & _LISTENER_PENDING_UNSUBSCRIBE) != 0;
1139
1140 bool get _isSubscribed => (_state & _LISTENER_SUBSCRIBED) != 0;
1141
1142 /**
1143 * Whether the listener still needs to receive the currently firing event.
1144 *
1145 * The currently firing event is identified by a single bit, which alternates
1146 * between events. The [_state] contains the previously sent event's bit in
1147 * the [_LISTENER_EVENT_ID] bit. If the two don't match, this listener
1148 * still need the current event.
1149 */
1150 bool _needsEvent(int currentEventIdBit) {
1151 int lastEventIdBit =
1152 (_state & _LISTENER_EVENT_ID) >> _LISTENER_EVENT_ID_SHIFT;
1153 return lastEventIdBit != currentEventIdBit;
1154 }
1155
1156 /// If a subscriber's "firing bit" doesn't match the stream's firing bit,
1157 /// we are currently firing an event and the subscriber still need to receive
1158 /// the event.
1159 void _toggleEventReceived() {
1160 _state ^= _LISTENER_EVENT_ID;
1161 }
1162
1163 void _setSubscribed(int eventIdBit) {
1164 assert(eventIdBit == 0 || eventIdBit == 1);
1165 _state = _LISTENER_SUBSCRIBED | (eventIdBit << _LISTENER_EVENT_ID_SHIFT);
1166 }
1167
1168 void _setPendingUnsubscribe(int currentEventIdBit) {
1169 assert(_isSubscribed);
1170 // Sets the pending unsubscribe, and ensures that the listener
1171 // won't get the current event.
1172 _state |= _LISTENER_PENDING_UNSUBSCRIBE | _LISTENER_EVENT_ID;
1173 _state ^= (1 ^ currentEventIdBit) << _LISTENER_EVENT_ID_SHIFT;
1174 assert(!_needsEvent(currentEventIdBit));
1175 }
1176
1177 /**
1178 * Marks the listener as unsubscibed.
1179 *
1180 * Returns the number of unresumed pauses for the listener.
1181 */
1182 int _setUnsubscribed() {
1183 assert(_isSubscribed);
1184 int timesPaused = _state >> _LISTENER_PAUSE_COUNT_SHIFT;
1185 _state = _LISTENER_UNSUBSCRIBED;
1186 return timesPaused;
1187 }
1188
1189 void _incrementPauseCount() {
1190 _state += 1 << _LISTENER_PAUSE_COUNT_SHIFT;
1191 }
1192
1193 void _decrementPauseCount() {
1194 assert(isPaused);
1195 _state -= 1 << _LISTENER_PAUSE_COUNT_SHIFT;
1196 }
1197
1198 _sendData(T data);
1199 _sendError(error);
1200 _sendDone();
1201 }
1202
1203 /** Superclass for provider of pending events. */ 681 /** Superclass for provider of pending events. */
1204 abstract class _PendingEvents { 682 abstract class _PendingEvents {
683 // No async event has been scheduled.
684 static const int _STATE_UNSCHEDULED = 0;
685 // An async event has been scheduled to run a function.
686 static const int _STATE_SCHEDULED = 1;
687 // An async event has been scheduled, but it will do nothing when it runs.
688 // Async events can't be preempted.
689 static const int _STATE_CANCELED = 3;
690
1205 /** 691 /**
1206 * Timer set when pending events are scheduled for execution. 692 * State of being scheduled.
1207 * 693 *
1208 * When scheduling pending events for execution in a later cycle, the timer 694 * Set to [_STATE_SCHEDULED] when pending events are scheduled for
1209 * is stored here. If pending events are executed earlier than that, e.g., 695 * async dispatch. Since we can't cancel a [runAsync] call, if schduling
1210 * due to a second event in the current cycle, the timer is canceled again. 696 * is "canceled", the _state is simply set to [_STATE_CANCELED] which will
697 * make the async code do nothing except resetting [_state].
698 *
699 * If events are scheduled while the state is [_STATE_CANCELED], it is
700 * merely switched back to [_STATE_SCHEDULED], but no new call to [runAsync]
701 * is performed.
1211 */ 702 */
1212 Timer scheduleTimer = null; 703 int _state = _STATE_UNSCHEDULED;
1213 704
1214 bool get isEmpty; 705 bool get isEmpty;
1215 706
1216 bool get isScheduled => scheduleTimer != null; 707 bool get isScheduled => _state == _STATE_SCHEDULED;
708 bool get _eventScheduled => _state >= _STATE_SCHEDULED;
1217 709
1218 void schedule(_StreamImpl stream) { 710 /**
711 * Schedule an event to run later.
712 *
713 * If called more than once, it should be called with the same dispatch as
714 * argument each time. It may reuse an earlier argument in some cases.
715 */
716 void schedule(_EventDispatch dispatch) {
1219 if (isScheduled) return; 717 if (isScheduled) return;
1220 scheduleTimer = new Timer(Duration.ZERO, () { 718 assert(!isEmpty);
1221 scheduleTimer = null; 719 if (_eventScheduled) {
1222 stream._handlePendingEvents(); 720 assert(_state == _STATE_CANCELED);
721 _state = _STATE_SCHEDULED;
722 return;
723 }
724 runAsync(() {
725 int oldState = _state;
726 _state = _STATE_UNSCHEDULED;
727 if (oldState == _STATE_CANCELED) return;
728 handleNext(dispatch);
1223 }); 729 });
730 _state = _STATE_SCHEDULED;
1224 } 731 }
1225 732
1226 void cancelSchedule() { 733 void cancelSchedule() {
1227 assert(isScheduled); 734 if (isScheduled) _state = _STATE_CANCELED;
1228 scheduleTimer.cancel();
1229 scheduleTimer = null;
1230 } 735 }
1231 736
1232 void handleNext(_StreamImpl stream); 737 void handleNext(_EventDispatch dispatch);
738
739 /** Throw away any pending events and cancel scheduled events. */
740 void clear();
1233 } 741 }
1234 742
1235 743
1236 /** Class holding pending events for a [_StreamImpl]. */ 744 /** Class holding pending events for a [_StreamImpl]. */
1237 class _StreamImplEvents extends _PendingEvents { 745 class _StreamImplEvents extends _PendingEvents {
1238 /// Single linked list of [_DelayedEvent] objects. 746 /// Single linked list of [_DelayedEvent] objects.
1239 _DelayedEvent firstPendingEvent = null; 747 _DelayedEvent firstPendingEvent = null;
1240 /// Last element in the list of pending events. New events are added after it. 748 /// Last element in the list of pending events. New events are added after it.
1241 _DelayedEvent lastPendingEvent = null; 749 _DelayedEvent lastPendingEvent = null;
1242 750
1243 bool get isEmpty => lastPendingEvent == null; 751 bool get isEmpty => lastPendingEvent == null;
1244 752
1245 bool get isScheduled => scheduleTimer != null;
1246
1247 void add(_DelayedEvent event) { 753 void add(_DelayedEvent event) {
1248 if (lastPendingEvent == null) { 754 if (lastPendingEvent == null) {
1249 firstPendingEvent = lastPendingEvent = event; 755 firstPendingEvent = lastPendingEvent = event;
1250 } else { 756 } else {
1251 lastPendingEvent = lastPendingEvent.next = event; 757 lastPendingEvent = lastPendingEvent.next = event;
1252 } 758 }
1253 } 759 }
1254 760
1255 void handleNext(_StreamImpl stream) { 761 void handleNext(_EventDispatch dispatch) {
1256 assert(!isScheduled); 762 assert(!isScheduled);
1257 _DelayedEvent event = firstPendingEvent; 763 _DelayedEvent event = firstPendingEvent;
1258 firstPendingEvent = event.next; 764 firstPendingEvent = event.next;
1259 if (firstPendingEvent == null) { 765 if (firstPendingEvent == null) {
1260 lastPendingEvent = null; 766 lastPendingEvent = null;
1261 } 767 }
1262 event.perform(stream); 768 event.perform(dispatch);
769 }
770
771 void clear() {
772 if (isScheduled) cancelSchedule();
773 firstPendingEvent = lastPendingEvent = null;
1263 } 774 }
1264 } 775 }
1265 776
1266 777 class _MultiplexerLinkedList {
1267 class _DoneSubscription<T> implements StreamSubscription<T> { 778 _MultiplexerLinkedList _next;
1268 _DoneHandler _handler; 779 _MultiplexerLinkedList _previous;
1269 Timer _timer; 780
1270 int _pauseCount = 0; 781 void _unlink() {
1271 782 _previous._next = _next;
1272 _DoneSubscription(this._handler) { 783 _next._previous = _previous;
1273 _delayDone(); 784 _next = _previous = this;
1274 } 785 }
1275 786
1276 void _delayDone() { 787 void _insertBefore(_MultiplexerLinkedList newNext) {
1277 assert(_timer == null && _pauseCount == 0); 788 _MultiplexerLinkedList newPrevious = newNext._previous;
1278 _timer = new Timer(Duration.ZERO, () { 789 newPrevious._next = this;
1279 if (_handler != null) _handler(); 790 newNext._previous = _previous;
1280 }); 791 _previous._next = newNext;
1281 } 792 _previous = newPrevious;
1282
1283 bool get _isComplete => _timer == null && _pauseCount == 0;
1284
1285 void onData(void handleAction(T value)) {}
1286
1287 void onError(void handleError(error)) {}
1288
1289 void onDone(void handleDone()) {
1290 _handler = handleDone;
1291 }
1292
1293 void pause([Future signal]) {
1294 if (_isComplete) return;
1295 if (_timer != null) {
1296 _timer.cancel();
1297 _timer = null;
1298 }
1299 _pauseCount++;
1300 if (signal != null) signal.whenComplete(resume);
1301 }
1302
1303 void resume() {
1304 if (_isComplete) return;
1305 if (_pauseCount == 0) return;
1306 _pauseCount--;
1307 if (_pauseCount == 0) {
1308 _delayDone();
1309 }
1310 }
1311
1312 bool get isPaused => _pauseCount > 0;
1313
1314 void cancel() {
1315 if (_isComplete) return;
1316 if (_timer != null) {
1317 _timer.cancel();
1318 _timer = null;
1319 }
1320 _pauseCount = 0;
1321 }
1322
1323 Future asFuture([var futureValue]) {
1324 // TODO(floitsch): share more code.
1325 _FutureImpl<T> result = new _FutureImpl<T>();
1326
1327 // Overwrite the onDone and onError handlers.
1328 onDone(() { result._setValue(futureValue); });
1329 onError((error) {
1330 cancel();
1331 result._setError(error);
1332 });
1333
1334 return result;
1335 } 793 }
1336 } 794 }
1337 795
1338 class _SingleStreamMultiplexer<T> extends _MultiStreamImpl<T> { 796 /**
797 * A subscription used by [_SingleStreamMultiplexer].
798 *
799 * The [_SingleStreamMultiplexer] is a [Stream] which allows multiple
800 * listeners at a time. It is used to implement [Stream.asBroadcastStream].
801 *
802 * It is itself listening to another stream for events, and it forwards all
803 * events to all of its simultanous listeners.
804 *
805 * The listeners are [_MultiplexerSubscription]s and are kept as a linked list.
806 */
807 // TODO(lrn): Change "implements" to "with" when automatic mixin constructors
808 // are implemented.
809 class _MultiplexerSubscription<T> extends _BufferingStreamSubscription<T>
810 implements _MultiplexerLinkedList {
811 static const int _STATE_NOT_LISTENING = 0;
812 // Bit that alternates between event firings. If bit matches the one currently
813 // firing, the subscription will not be notified.
814 static const int _STATE_EVENT_ID_BIT = 1;
815 // Whether the subscription is listening at all. This should be set while
816 // it is part of the linked list of listeners of a multiplexer stream.
817 static const int _STATE_LISTENING = 2;
818 // State bit set while firing an event.
819 static const int _STATE_IS_FIRING = 4;
820 // Bit set if a subscription is canceled while it's firing (the
821 // [_STATE_IS_FIRING] bit is set).
822 // If the subscription is canceled while firing, it is not removed from the
823 // linked list immediately (to avoid breaking iteration), but is instead
824 // removed after it is done firing.
825 static const int _STATE_REMOVE_AFTER_FIRING = 8;
826
827 // Firing state.
828 int _multiplexState;
829
830 _SingleStreamMultiplexer _source;
831
832 _MultiplexerSubscription(this._source,
833 void onData(T data),
834 void onError(Object error),
835 void onDone(),
836 bool cancelOnError,
837 int nextEventId)
838 : _multiplexState = _STATE_LISTENING | nextEventId,
839 super(onData, onError, onDone, cancelOnError) {
840 _next = _previous = this;
841 }
842
843 // Mixin workaround.
844 _MultiplexerLinkedList _next;
845 _MultiplexerLinkedList _previous;
846
847 void _unlink() {
848 _previous._next = _next;
849 _next._previous = _previous;
850 _next = _previous = this;
851 }
852
853 void _insertBefore(_MultiplexerLinkedList newNext) {
854 _MultiplexerLinkedList newPrevious = newNext._previous;
855 newPrevious._next = this;
856 newNext._previous = _previous;
857 _previous._next = newNext;
858 _previous = newPrevious;
859 }
860 // End mixin.
861
862 bool get _isListening => _multiplexState >= _STATE_LISTENING;
863 bool get _isFiring => _multiplexState >= _STATE_IS_FIRING;
864 bool get _removeAfterFiring => _multiplexState >= _STATE_REMOVE_AFTER_FIRING;
865 int get _eventId => _multiplexState & _STATE_EVENT_ID_BIT;
866
867 void _setRemoveAfterFiring() {
868 assert(_isFiring);
869 _multiplexState |= _STATE_REMOVE_AFTER_FIRING;
870 }
871
872 void _startFiring() {
873 assert(!_isFiring);
874 _multiplexState |= _STATE_IS_FIRING;
875 }
876
877 /// Marks listener as no longer firing, and toggles its event id.
878 void _endFiring() {
879 assert(_isFiring);
880 _multiplexState ^= (_STATE_IS_FIRING | _STATE_EVENT_ID_BIT);
881 }
882
883 void _setNotListening() {
884 assert(_isListening);
885 _multiplexState = _STATE_NOT_LISTENING;
886 }
887
888 void _onCancel() {
889 assert(_isListening);
890 _source._removeListener(this);
891 }
892 }
893
894 /**
895 * A stream that sends events from another stream to multiple listeners.
896 *
897 * This is used to implement [Stream.asBroadcastStream].
898 *
899 * This stream allows listening more than once.
900 * When the first listener is added, it starts listening on its source
901 * stream for events. All events from the source stream are sent to all
902 * active listeners. The listeners handle their own buffering.
903 * When the last listener cancels, the source stream subscription is also
904 * canceled, and no further listening is possible.
905 */
906 // TODO(lrn): change "implements" to "with" when the VM supports it.
907 class _SingleStreamMultiplexer<T> extends Stream<T>
908 implements _MultiplexerLinkedList,
909 _EventDispatch<T> {
1339 final Stream<T> _source; 910 final Stream<T> _source;
1340 StreamSubscription<T> _subscription; 911 StreamSubscription<T> _subscription;
1341 912 // Alternates between zero and one for each event fired.
1342 _SingleStreamMultiplexer(this._source); 913 // Listeners are initialized with the next event's id, and will
1343 914 // only be notified if they match the event being fired.
1344 void _callOnPauseStateChange() { 915 // That way listeners added during event firing will not receive
1345 if (_isPaused) { 916 // the current event.
1346 if (_subscription != null) { 917 int _eventId = 0;
1347 _subscription.pause(); 918
919 bool _isFiring = false;
920
921 // Remember events added while firing.
922 _StreamImplEvents _pending;
923
924 _SingleStreamMultiplexer(this._source) {
925 _next = _previous = this;
926 }
927
928 bool get _hasPending => _pending != null && !_pending.isEmpty;
929
930 // Should be mixin.
931 _MultiplexerLinkedList _next;
932 _MultiplexerLinkedList _previous;
933
934 void _unlink() {
935 _previous._next = _next;
936 _next._previous = _previous;
937 _next = _previous = this;
938 }
939
940 void _insertBefore(_MultiplexerLinkedList newNext) {
941 _MultiplexerLinkedList newPrevious = newNext._previous;
942 newPrevious._next = this;
943 newNext._previous = _previous;
944 _previous._next = newNext;
945 _previous = newPrevious;
946 }
947 // End of mixin.
948
949 StreamSubscription<T> listen(void onData(T data),
950 { void onError(Object error),
951 void onDone(),
952 bool cancelOnError }) {
953 if (onData == null) onData = _nullDataHandler;
954 if (onError == null) onError = _nullErrorHandler;
955 if (onDone == null) onDone = _nullDoneHandler;
956 cancelOnError = identical(true, cancelOnError);
957 _MultiplexerSubscription subscription =
958 new _MultiplexerSubscription(this, onData, onError, onDone,
959 cancelOnError, _eventId);
960 if (_subscription == null) {
961 _subscription = _source.listen(_add, onError: _addError, onDone: _close);
962 }
963 subscription._insertBefore(this);
964 return subscription;
965 }
966
967 /** Called by [_MultiplexerSubscription.remove]. */
968 void _removeListener(_MultiplexerSubscription listener) {
969 if (listener._isFiring) {
970 listener._setRemoveAfterFiring();
971 } else {
972 _unlinkListener(listener);
973 }
974 }
975
976 /** Remove a listener and close the subscription after the last one. */
977 void _unlinkListener(_MultiplexerSubscription listener) {
978 listener._setNotListening();
979 listener._unlink();
980 if (identical(_next, this)) {
981 // Last listener removed.
982 _cancel();
983 }
984 }
985
986 void _cancel() {
987 StreamSubscription subscription = _subscription;
988 _subscription = null;
989 subscription.cancel();
990 if (_pending != null) _pending.cancelSchedule();
991 }
992
993 void _forEachListener(void action(_MultiplexerSubscription listener)) {
994 int eventId = _eventId;
995 _eventId ^= 1;
996 _isFiring = true;
997 _MultiplexerLinkedList entry = _next;
998 // Call each listener in order. A listener can be removed during any
999 // other listener's event. During its own event it will only be marked
1000 // as "to be removed", and it will be handled after the event is done.
1001 while (!identical(entry, this)) {
1002 _MultiplexerSubscription listener = entry;
1003 if (listener._eventId == eventId) {
1004 listener._startFiring();
1005 action(listener);
1006 listener._endFiring(); // Also toggles the event id.
1348 } 1007 }
1349 } else { 1008 entry = listener._next;
1350 if (_subscription != null) { 1009 if (listener._removeAfterFiring) {
1351 _subscription.resume(); 1010 _unlinkListener(listener);
1352 } 1011 }
1353 } 1012 }
1354 } 1013 _isFiring = false;
1355 1014 }
1356 /** 1015
1357 * Subscribe or unsubscribe on [_source] depending on whether 1016 void _add(T data) {
1358 * [_stream] has subscribers. 1017 if (_isFiring || _hasPending) {
1359 */ 1018 _StreamImplEvents pending = _pending;
1360 void _onSubscriptionStateChange() { 1019 if (pending == null) pending = _pending = new _StreamImplEvents();
1361 if (_hasListener) { 1020 pending.add(new _DelayedData(data));
1362 assert(_subscription == null); 1021 } else {
1363 _subscription = _source.listen(this._add, 1022 _sendData(data);
1364 onError: this._addError, 1023 }
1365 onDone: this._close); 1024 }
1366 } else { 1025
1367 // TODO(lrn): Check why this can happen. 1026 void _addError(Object error) {
1368 if (_subscription == null) return; 1027 if (_isFiring || _hasPending) {
1369 _subscription.cancel(); 1028 _StreamImplEvents pending = _pending;
1370 _subscription = null; 1029 if (pending == null) pending = _pending = new _StreamImplEvents();
1371 } 1030 pending.add(new _DelayedError(error));
1031 } else {
1032 _sendError(error);
1033 }
1034 }
1035
1036 void _close() {
1037 if (_isFiring || _hasPending) {
1038 _StreamImplEvents pending = _pending;
1039 if (pending == null) pending = _pending = new _StreamImplEvents();
1040 pending.add(const _DelayedDone());
1041 } else {
1042 _sendDone();
1043 }
1044 }
1045
1046 void _sendData(T data) {
1047 _forEachListener((_MultiplexerSubscription listener) {
1048 listener._add(data);
1049 });
1050 if (_hasPending) {
1051 _pending.schedule(this);
1052 }
1053 }
1054
1055 void _sendError(Object error) {
1056 _forEachListener((_MultiplexerSubscription listener) {
1057 listener._addError(error);
1058 });
1059 if (_hasPending) {
1060 _pending.schedule(this);
1061 }
1062 }
1063
1064 void _sendDone() {
1065 _forEachListener((_MultiplexerSubscription listener) {
1066 listener._setRemoveAfterFiring();
1067 listener._close();
1068 });
1372 } 1069 }
1373 } 1070 }
1071
1072
1073 /**
1074 * Simple implementation of [StreamIterator].
1075 */
1076 class _StreamIteratorImpl<T> implements StreamIterator<T> {
1077 // Internal state of the stream iterator.
1078 // At any time, it is in one of these states.
1079 // The interpretation of the [_data] field depends on the state.
1080 // In _STATE_MOVING, the _data field holds the most recently returned
1081 // future.
1082 // In other _STATE_FOUND and _STATE_EXTRA_* it contains the value returned
1083 // by [current].
1084 // If in on of the _STATE_EXTRA_* states, the _cache object may hold the
floitsch 2013/05/27 08:24:43 When in one...
Lasse Reichstein Nielsen 2013/05/27 08:33:44 Done.
1085 // next data/error object, and the subscription is paused.
1086
1087 /// The simple state where [_data] holds the data to return, and [moveNext]
1088 /// is allowed. The subscription is actively listening.
1089 static const int _STATE_FOUND = 0;
1090 /// State set after [moveNext] has returned false or an error,
1091 /// or after calling [cancel]. The subscription is always canceled.
1092 static const int _STATE_DONE = 1;
1093 /// State set after calling [moveNext], but before its returned future has
1094 /// completed. Calling [moveNext] again is not allowed in this state.
1095 /// The subscription is actively listening.
1096 static const int _STATE_MOVING = 2;
1097 /// States set when another event occurs while in _STATE_FOUND.
1098 /// This extra overflow event is cached until the next call to [moveNext],
1099 /// which will complete as if it received the event normally.
1100 /// The subscription is paused in these states, so we only ever get one
1101 /// event too many.
1102 static const int _STATE_EXTRA_DATA = 3;
1103 static const int _STATE_EXTRA_ERROR = 4;
1104 static const int _STATE_EXTRA_DONE = 5;
1105
1106 /// Subscription being listened to.
1107 StreamSubscription _subscription;
1108
1109 /// The current element represented by the most recent call to moveNext.
1110 ///
1111 /// Is null between the time moveNext is called and its future completes.
1112 T _current = null;
1113
1114 /// The future returned by the most recent call to [moveNext].
1115 ///
1116 /// Also used to store the next value/error in case the stream provides an
1117 /// event before [moveNext] is called again. In that case, the stream will
1118 /// be paused to prevent further events.
1119 var _futureOrPrefetch = null;
1120
1121 /// The current state.
1122 int _state = _STATE_FOUND;
1123
1124 _StreamIteratorImpl(final Stream<T> stream) {
1125 _subscription = stream.listen(_onData,
1126 onError: _onError,
1127 onDone: _onDone,
1128 cancelOnError: true);
1129 }
1130
1131 T get current => _current;
1132
1133 Future<bool> moveNext() {
floitsch 2013/05/27 08:24:43 comment.
Lasse Reichstein Nielsen 2013/05/27 08:33:44 In the interface.
1134 if (_state == _STATE_DONE) {
1135 return new _FutureImpl<bool>.immediate(false);
1136 }
1137 if (_state == _STATE_MOVING) {
1138 throw new StateError("Already waiting for next.");
1139 }
1140 if (_state == _STATE_FOUND) {
1141 _state = _STATE_MOVING;
1142 _futureOrPrefetch = new _FutureImpl<bool>();
1143 return _futureOrPrefetch;
1144 } else {
1145 assert(_state >= _STATE_EXTRA_DATA);
1146 switch (_state) {
1147 case _STATE_EXTRA_DATA:
1148 _state = _STATE_FOUND;
1149 _current = _futureOrPrefetch;
1150 _futureOrPrefetch = null;
1151 _subscription.resume();
1152 return new FutureImpl<bool>.immediate(true);
1153 case _STATE_EXTRA_ERROR:
1154 Object prefetch = _futureOrPrefetch;
1155 _cancel();
1156 return new FutureImpl<bool>.error(prefetch);
1157 case _STATE_EXTRA_DONE:
1158 _cancel();
1159 return new FutureImpl<bool>.immediate(false);
1160 }
1161 }
1162 }
1163
1164 void _clear() {
floitsch 2013/05/27 08:24:43 Comment.
Lasse Reichstein Nielsen 2013/05/27 08:33:44 Done.
1165 _subscription = null;
1166 _futureOrPrefetch = null;
1167 _current = null;
1168 _state = _STATE_DONE;
1169 }
1170
1171 void cancel() {
floitsch 2013/05/27 08:24:43 comment.
Lasse Reichstein Nielsen 2013/05/27 08:33:44 In the interface.
1172 StreamSubscription subscription = _subscription;
1173 if (_state == _STATE_MOVING) {
1174 _FutureImpl<bool> hasNext = _futureOrPrefetch;
1175 _clear();
1176 hasNext._setValue(false);
1177 } else {
1178 _clear();
1179 }
1180 subscription.cancel();
1181 }
1182
1183 void _onData(T data) {
1184 if (_state == _STATE_MOVING) {
1185 _current = data;
1186 _FutureImpl<bool> hasNext = _futureOrPrefetch;
1187 _futureOrPrefetch = null;
1188 _state = _STATE_FOUND;
1189 hasNext._setValue(true);
1190 return;
1191 }
1192 _subscription.pause();
1193 _futureOrPrefetch = data;
floitsch 2013/05/27 08:24:43 assert that _futureOrPrefetch == null
Lasse Reichstein Nielsen 2013/05/27 08:33:44 Done.
1194 _state = _STATE_EXTRA_DATA;
1195 }
1196
1197 void _onError(Object error) {
1198 if (_state == _STATE_MOVING) {
1199 _FutureImpl<bool> hasNext = _futureOrPrefetch;
1200 // We have cancelOnError: true, so the subscription is canceled.
1201 _clear();
1202 hasNext._setError(error);
1203 return;
1204 }
1205 _subscription.pause();
1206 _futureOrPrefetch = error;
1207 _state = _STATE_EXTRA_ERROR;
1208 }
1209
1210 void _onDone() {
1211 if (_state == _STATE_MOVING) {
1212 _FutureImpl<bool> hasNext = _futureOrPrefetch;
1213 _clear();
1214 hasNext._setValue(false);
1215 return;
1216 }
1217 _subscription.pause();
1218 _futureOrPrefetch = null;
1219 _state = _STATE_EXTRA_DONE;
1220 }
1221 }
OLDNEW
« no previous file with comments | « sdk/lib/async/stream_controller.dart ('k') | sdk/lib/async/stream_pipe.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698