Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(929)

Side by Side Diff: sdk/lib/async/stream_impl.dart

Issue 14753009: Make StreamSubscription be the active part of a stream. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Made tests run (mostly) Created 7 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // States shared by single/multi stream implementations.
8
9 // Completion state of the stream.
10 /// Initial and default state where the stream can receive and send events.
11 const int _STREAM_OPEN = 0;
12 /// The stream has received a request to complete, but hasn't done so yet.
13 /// No further events can be added to the stream.
14 const int _STREAM_CLOSED = 1;
15 /// The stream has completed and will no longer receive or send events.
16 /// Also counts as closed. The stream must not be paused when it's completed.
17 /// Always used in conjunction with [_STREAM_CLOSED].
18 const int _STREAM_COMPLETE = 2;
19
20 /// Bit that alternates between events, and listeners are updated to the
21 /// current value when they are notified of the event.
22 const int _STREAM_EVENT_ID = 4;
23 const int _STREAM_EVENT_ID_SHIFT = 2;
24
25 // The activity state of the stream: What is it currently doing.
26 /// Bit set while firing and clear while not.
27 const int _STREAM_FIRING = 8;
28 /// Bit set while calling a pause-state or subscription-state change callback.
29 const int _STREAM_CALLBACK = 16;
30
31 // The pause state of the stream.
32 /// Bit set when resuming with pending events. Cleared after all pending events
33 /// have been transmitted. Means that the controller still considers the
34 /// stream paused, even if the listener doesn't.
35 const int _STREAM_PENDING_RESUME = 32;
36 /// The count of times a stream has paused is stored in the
37 /// state, shifted by this amount.
38 const int _STREAM_PAUSE_COUNT_SHIFT = 6;
39
40 // States for listeners.
41
42 /// The listener is currently not subscribed to its source stream.
43 const int _LISTENER_UNSUBSCRIBED = 0;
44 /// The listener is actively subscribed to its source stream.
45 const int _LISTENER_SUBSCRIBED = 1;
46 /// The listener is subscribed until it has been notified of the current event.
47 /// This flag bit is always used in conjuction with [_LISTENER_SUBSCRIBED].
48 const int _LISTENER_PENDING_UNSUBSCRIBE = 2;
49
50 /// Bit that contains the last sent event's "id bit".
51 const int _LISTENER_EVENT_ID = 4;
52 const int _LISTENER_EVENT_ID_SHIFT = 2;
53
54 /// The count of times a listener has paused is stored in the
55 /// state, shifted by this amount.
56 const int _LISTENER_PAUSE_COUNT_SHIFT = 3;
57
58 /** Throws the given error in the next cycle. */ 7 /** Throws the given error in the next cycle. */
59 _throwDelayed(var error, [Object stackTrace]) { 8 _throwDelayed(var error, [Object stackTrace]) {
60 // We are going to reach the top-level here, but there might be a global 9 // We are going to reach the top-level here, but there might be a global
61 // exception handler. This means that we shouldn't print the stack trace. 10 // exception handler. This means that we shouldn't print the stack trace.
62 // TODO(floitsch): Find better solution that doesn't print the stack trace 11 // TODO(floitsch): Find better solution that doesn't print the stack trace
63 // if there is a global exception handler. 12 // if there is a global exception handler.
64 runAsync(() { 13 runAsync(() {
65 if (stackTrace != null) print(stackTrace); 14 if (stackTrace != null) print(stackTrace);
66 var trace = getAttachedStackTrace(error); 15 var trace = getAttachedStackTrace(error);
67 if (trace != null && trace != stackTrace) print(trace); 16 if (trace != null && trace != stackTrace) print(trace);
68 throw error; 17 throw error;
69 }); 18 });
70 } 19 }
71 20
21 /** Abstract and private interface for a place to put events. */
22 abstract class _EventSink<T> {
23 void _add(T data);
24 void _addError(Object error);
25 void _close();
26 }
27
28 /**
29 * Abstract and private interface for a place to send events.
30 *
31 * Used by event buffering to finally dispatch the pending event, where
32 * [_EventSink] is where the event first enters the stream subscription,
33 * and may yet be buffered.
34 */
35 abstract class _EventDispatch<T> {
36 void _sendData(T data);
37 void _sendError(Object error);
38 void _sendDone();
39 }
40
41 /**
42 * Default implementation of stream subscription of buffering events.
43 *
44 * The only public methods are those of [StreamSubscription], so instances of
45 * [_BufferingStreamSubscription] can be returned directly as a
46 * [StreamSubscription] without exposing internal functionality.
47 *
48 * The [StreamController] is a public facing version of [Stream] and this class,
49 * with some methods made public.
50 *
51 * The user interface of [_BufferingStreamSubscription] are the following
52 * methods:
53 * * [_add]: Add a data event to the stream.
54 * * [_addError]: Add an error event to the stream.
55 * * [_close]: Request to close the stream.
56 * * [_onCancel]: Called when the subscription stops listening.
57 * * [_onPause]: Called when the subscription wants the event source to pause.
58 * * [_onResume]: Called when allowing new events after a pause.
59 * The user should not add new eventswhen the subscription requests a paused,
floitsch 2013/05/22 16:26:29 space missing.
Lasse Reichstein Nielsen 2013/05/24 06:02:49 Done.
60 * but if it happens anyway, the subscription will enqueue the events just as
61 * when new events arrive while still firing an old event.
62 */
63 class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
64 _EventSink<T>,
65 _EventDispatch<T> {
66 /** The `cancelOnError` flag from the `listen` call. */
67 static const int _STATE_CANCEL_ON_ERROR = 1;
68 /**
69 * Whether the "done" event has been received.
70 * No further events are accepted after this.
71 */
72 static const int _STATE_CLOSED = 2;
73 /**
74 * Set if the input has been asked not to send events.
75 *
76 * This is not the same as being paused, since the input will remain paused
77 * after a call to [resume] if there are pending events.
78 */
79 static const int _STATE_INPUT_PAUSED = 4;
80 /**
81 * Whether the subscription has been cancelled.
82 *
83 * Set by calling [cancel], or by handling a "done" event, or an "error" event
84 * when `cancelOnError` is true.
85 */
86 static const int _STATE_CANCELLED = 8;
floitsch 2013/05/22 16:26:29 _STATE_CANCELED
Lasse Reichstein Nielsen 2013/05/24 06:02:49 Bowdlerizing constants! :(
87 static const int _STATE_IN_CALLBACK = 16;
88 static const int _STATE_HAS_PENDING = 32;
89 static const int _STATE_PAUSE_COUNT = 64;
90 static const int _STATE_PAUSE_COUNT_SHIFT = 6;
91
92 /** Event handlers provided in constructor. */
93 _DataHandler<T> _onData;
94 _ErrorHandler _onError;
95 _DoneHandler _onDone;
96
97 /** Bit vector based on state-constants above. */
98 int _state;
99
100 /**
101 * Queue of pending events.
102 *
103 * Is created when necessary, or set in constructor for preconfigured events.
104 */
105 _PendingEvents _pending;
106
107 _BufferingStreamSubscription(this._onData,
108 this._onError,
109 this._onDone,
110 bool cancelOnError,
111 this._pending)
112 : _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0) {
113 assert(_onData != null);
114 assert(_onError != null);
115 assert(_onDone != null);
116 if (_pending != null && !_pending.isEmpty) {
117 _state |= _STATE_HAS_PENDING;
118 _pending.schedule(this);
119 }
120 }
121
122 // StreamSubscription interface.
123
124 void onData(void handleData(T event)) {
125 if (handleData == null) handleData = _nullDataHandler;
126 _onData = handleData;
127 }
128
129 void onError(void handleError(error)) {
130 if (handleError == null) handleError = _nullErrorHandler;
131 _onError = handleError;
132 }
133
134 void onDone(void handleDone()) {
135 if (handleDone == null) handleDone = _nullDoneHandler;
136 _onDone = handleDone;
137 }
138
139 void pause([Future resumeSignal]) {
140 if (_cancelled) return;
141 bool wasPaused = _isPaused;
142 bool wasInputPaused = _isInputPaused;
143 // Increment pause count and mark input paused (if it isn't already).
144 _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED;
145 if (resumeSignal != null) resumeSignal.whenComplete(resume);
146 if (!wasPaused && _pending != null) _pending.cancelSchedule();
147 if (!wasInputPaused && !_inCallback) _guardCallback(_onPause);
148 }
149
150 void resume() {
151 if (_cancelled) return;
152 if (_isPaused) {
153 _decrementPauseCount();
154 if (!_isPaused) {
155 if (_hasPending && !_pending.isEmpty) {
156 // Input is still paused.
157 _pending.schedule(this);
158 } else {
159 assert(_mayResumeInput);
160 _state &= ~_STATE_INPUT_PAUSED;
161 if (!_inCallback) _guardCallback(_onResume);
162 }
163 }
164 }
165 }
166
167 void cancel() {
168 if (_cancelled) return;
169 _cancel();
170 if (!_inCallback) {
171 // otherwise checkState will be called after firing or callback completes.
172 _state |= _STATE_IN_CALLBACK;
173 try {
174 _onCancel();
175 } catch (e, s) {
176 print("BAD THROW: \n$s");
floitsch 2013/05/22 16:26:29 So _onCancel is not allowed to throw? Should this
Lasse Reichstein Nielsen 2013/05/24 06:02:49 _onCancel is an internal method only. It should ne
177 }
178 _state &= ~_STATE_IN_CALLBACK;
179 }
180 }
181
182 Future asFuture([var futureValue]) {
183 _FutureImpl<T> result = new _FutureImpl<T>();
184
185 // Overwrite the onDone and onError handlers.
186 _onDone = () { result._setValue(futureValue); };
187 _onError = (error) {
188 cancel();
189 result._setError(error);
190 };
191
192 return result;
193 }
194
195 // State management.
196
197 bool get _isInputPaused => (_state & _STATE_INPUT_PAUSED) != 0;
198 bool get _isClosed => (_state & _STATE_CLOSED) != 0;
199 bool get _cancelled => (_state & _STATE_CANCELLED) != 0;
floitsch 2013/05/22 16:26:29 isCanceled
Lasse Reichstein Nielsen 2013/05/24 06:02:49 Done.
200 bool get _inCallback => (_state & _STATE_IN_CALLBACK) != 0;
201 bool get _hasPending => (_state & _STATE_HAS_PENDING) != 0;
202 bool get _isPaused => _state >= _STATE_PAUSE_COUNT;
203 bool get _canFire => _state < _STATE_IN_CALLBACK;
204 bool get _mayResumeInput =>
205 !_isPaused && (_pending == null || _pending.isEmpty);
floitsch 2013/05/22 16:26:29 shouldn't _hasPending do the null-check too?
Lasse Reichstein Nielsen 2013/05/24 06:02:49 _hasPending is accessing a cached bit in the state
206 bool get _cancelOnError => (_state & _STATE_CANCEL_ON_ERROR) != 0;
207
208 bool get isPaused => _isPaused;
209
210 void _cancel() {
211 _state |= _STATE_CANCELLED;
212 if (_hasPending) {
213 _pending.clear();
214 }
215 }
216
217 /**
218 * Increment the pause count.
219 *
220 * Also marks input as paused.
221 */
222 void _incrementPauseCount() {
223 _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED;
224 }
225
226 /**
227 * Decrements the pause count.
228 *
229 * Does not automatically unpause the input (call [_onResume]) when
230 * the pause count reaches zero. This is handled elsewhere, and only
231 * if there are no pending events buffered.
232 */
233 void _decrementPauseCount() {
234 assert(_isPaused);
235 _state -= _STATE_PAUSE_COUNT;
236 }
237
238 // _EventSink interface.
239
240 void _add(T data) {
241 assert(!_isClosed);
242 if (_cancelled) return;
243 if (_canFire) {
244 _sendData(data);
245 } else {
246 _addPending(new _DelayedData(data));
247 }
248 }
249
250 void _addError(Object error) {
251 if (_cancelled) return;
252 if (_cancelOnError) {
253 /// TODO: handle here?
254 }
255 if (_canFire) {
256 _sendError(error); // Reports cancel after sending.
257 } else {
258 _addPending(new _DelayedError(error));
259 }
260 }
261
262 void _close() {
263 assert(!_isClosed);
264 if (_cancelled) return;
265 _state |= _STATE_CLOSED;
266 if (_canFire) {
267 _sendDone();
268 } else {
269 _addPending(const _DelayedDone());
270 }
271 }
272
273 // Hooks called when the input is paused, unpaused or cancelled.
274 // These must not throw. If overwritten to call user code, include suitable
275 // try/catch wrapping and send any errors to [_throwDelayed].
276 void _onPause() {
277 assert(_isInputPaused);
278 }
279
280 void _onResume() {
281 assert(!_isInputPaused);
282 }
283
284 void _onCancel() {
285 assert(_cancelled);
286 }
287
288 // Handle pending events.
289
290 /**
291 * Add a pending event.
292 *
293 * If the subscription is not paused, this also schedules a firing
294 * of pending events later (if necessary).
295 */
296 void _addPending(_DelayedEvent event) {
297 _StreamImplEvents pending = _pending;
298 if (_pending == null) pending = _pending = new _StreamImplEvents();
299 pending.add(event);
300 if (!_hasPending) {
301 _state |= _STATE_HAS_PENDING;
302 if (!_isPaused) {
303 _pending.schedule(this);
304 }
305 }
306 }
307
308 /* _EventDispatch interface. */
309
310 void _sendData(T data) {
311 assert(!_cancelled);
312 assert(!_isPaused);
313 assert(!_inCallback);
314 bool wasInputPaused = _isInputPaused;
315 _state |= _STATE_IN_CALLBACK;
316 try {
floitsch 2013/05/22 16:26:29 move the `try`s into a separate function?
Lasse Reichstein Nielsen 2013/05/24 06:02:49 Rather not. It would only be used for _send{Data,E
317 _onData(data);
318 } catch (e, s) {
319 _throwDelayed(e, s);
320 }
321 _state &= ~_STATE_IN_CALLBACK;
322 _checkState(wasInputPaused);
323 }
324
325 void _sendError(var error) {
326 assert(!_cancelled);
327 assert(!_isPaused);
328 assert(!_inCallback);
329 bool wasInputPaused = _isInputPaused;
330 _state |= _STATE_IN_CALLBACK;
331 try {
332 _onError(error);
333 } catch (e, s) {
334 _throwDelayed(e, s);
335 }
336 _state &= ~_STATE_IN_CALLBACK;
337 if (_cancelOnError) {
338 _cancel();
339 }
340 _checkState(wasInputPaused);
341 }
342
343 void _sendDone() {
344 assert(!_cancelled);
345 assert(!_isPaused);
346 assert(!_inCallback);
347 _state |= (_STATE_CANCELLED | _STATE_CLOSED | _STATE_IN_CALLBACK);
348 try {
349 _onDone();
350 } catch (e, s) {
351 _throwDelayed(e, s);
352 }
353 try {
354 _onCancel(); // No checkState after cancel, it is always the last event.
355 } catch (e, s) {
356 print("BAD THROW 5: \n$s");
357 }
358 _state &= ~_STATE_IN_CALLBACK;
359 }
360
361 /**
362 * Call a hook function.
363 *
364 * The call is properly wrapped in code to avoid other callbacks
365 * during the call, and it checks for state changes after the call
366 * that should cause further callbacks.
367 */
368 void _guardCallback(callback) {
369 assert(!_inCallback);
370 bool wasInputPaused = _isInputPaused;
371 _state |= _STATE_IN_CALLBACK;
372 try {
373 callback();
374 } catch (e, s) {
375 print("BAD THROW 2: \n$s");
376 }
377 _state &= ~_STATE_IN_CALLBACK;
378 _checkState(wasInputPaused);
379 }
380
381 /**
382 * Check if the input needs to be informed of state changes.
383 *
384 * State changes are pausing, resuming and cancelling.
385 * After cancelling, no further callbacks will happen.
floitsch 2013/05/22 16:26:29 separate with more new lines.
Lasse Reichstein Nielsen 2013/05/24 06:02:49 Done.
386 * The cancel callback is called after a user cancel, or after
387 * the final done event is sent.
388 */
389 void _checkState(bool wasInputPaused) {
390 assert(!_inCallback);
391 if (_hasPending && _pending.isEmpty) {
392 _state &= ~_STATE_HAS_PENDING;
393 if (_isInputPaused && _mayResumeInput) {
394 _state &= ~_STATE_INPUT_PAUSED;
395 }
396 }
397 while (true) {
floitsch 2013/05/22 16:26:29 Add comment why we need a loop.
Lasse Reichstein Nielsen 2013/05/24 06:02:49 Ok.
398 if (_cancelled) {
399 try {
400 _onCancel();
401 } catch (e, s) {
402 print("BAD THROW 3: \n$s");
403 }
404 return;
405 }
406 bool isInputPaused = _isInputPaused;
407 if (wasInputPaused == isInputPaused) break;
408 _state ^= _STATE_IN_CALLBACK;
409 try {
410 if (isInputPaused) {
411 _onPause();
412 } else {
413 _onResume();
414 }
415 } catch (e, s) {
416 print("BAD THROW 4: \n$s");
417 }
418 _state &= ~_STATE_IN_CALLBACK;
419 wasInputPaused = isInputPaused;
420 }
421 if (_hasPending && !_isPaused) {
422 _pending.schedule(this);
423 }
424 }
425 }
72 426
73 // ------------------------------------------------------------------- 427 // -------------------------------------------------------------------
74 // Common base class for single and multi-subscription streams. 428 // Common base class for single and multi-subscription streams.
75 // ------------------------------------------------------------------- 429 // -------------------------------------------------------------------
76 abstract class _StreamImpl<T> extends Stream<T> { 430 abstract class _StreamImpl<T> extends Stream<T> {
77 /** Current state of the stream. */
78 int _state = _STREAM_OPEN;
79
80 /**
81 * List of pending events.
82 *
83 * If events are added to the stream (using [_add], [_addError] or [_done])
84 * while the stream is paused, or while another event is firing, events will
85 * stored here.
86 * Also supports scheduling the events for later execution.
87 */
88 _PendingEvents _pendingEvents;
89
90 // ------------------------------------------------------------------ 431 // ------------------------------------------------------------------
91 // Stream interface. 432 // Stream interface.
92 433
93 StreamSubscription<T> listen(void onData(T data), 434 StreamSubscription<T> listen(void onData(T data),
94 { void onError(error), 435 { void onError(error),
95 void onDone(), 436 void onDone(),
96 bool cancelOnError }) { 437 bool cancelOnError }) {
97 if (_isComplete) {
98 return new _DoneSubscription(onDone);
99 }
100 if (onData == null) onData = _nullDataHandler; 438 if (onData == null) onData = _nullDataHandler;
101 if (onError == null) onError = _nullErrorHandler; 439 if (onError == null) onError = _nullErrorHandler;
102 if (onDone == null) onDone = _nullDoneHandler; 440 if (onDone == null) onDone = _nullDoneHandler;
103 cancelOnError = identical(true, cancelOnError); 441 cancelOnError = identical(true, cancelOnError);
104 _StreamSubscriptionImpl subscription = 442 StreamSubscription subscription =
105 _createSubscription(onData, onError, onDone, cancelOnError); 443 _createSubscription(onData, onError, onDone, cancelOnError);
106 _addListener(subscription); 444 _onListen(subscription);
107 return subscription; 445 return subscription;
108 } 446 }
109 447
110 // ------------------------------------------------------------------
111 // EventSink interface-like methods for sending events into the stream.
112 // It's the responsibility of the caller to ensure that the stream is not
113 // paused when adding events. If the stream is paused, the events will be
114 // queued, but it's better to not send events at all.
115
116 /**
117 * Send or queue a data event.
118 */
119 void _add(T value) {
120 if (_isClosed) throw new StateError("Sending on closed stream");
121 if (!_mayFireState) {
122 // Not the time to send events.
123 _addPendingEvent(new _DelayedData<T>(value));
124 return;
125 }
126 if (_hasPendingEvent) {
127 _addPendingEvent(new _DelayedData<T>(value));
128 } else {
129 _sendData(value);
130 }
131 _handlePendingEvents();
132 }
133
134 /**
135 * Send or enqueue an error event.
136 *
137 * If a subscription has requested to be unsubscribed on errors,
138 * it will be unsubscribed after receiving this event.
139 */
140 void _addError(error) {
141 if (_isClosed) throw new StateError("Sending on closed stream");
142 if (!_mayFireState) {
143 // Not the time to send events.
144 _addPendingEvent(new _DelayedError(error));
145 return;
146 }
147 if (_hasPendingEvent) {
148 _addPendingEvent(new _DelayedError(error));
149 } else {
150 _sendError(error);
151 }
152 _handlePendingEvents();
153 }
154
155 /**
156 * Send or enqueue a "done" message.
157 *
158 * The "done" message should be sent at most once by a stream, and it
159 * should be the last message sent.
160 */
161 void _close() {
162 if (_isClosed) return;
163 _state |= _STREAM_CLOSED;
164 if (!_mayFireState) {
165 // Not the time to send events.
166 _addPendingEvent(const _DelayedDone());
167 return;
168 }
169 if (_hasPendingEvent) {
170 _addPendingEvent(new _DelayedDone());
171 _handlePendingEvents();
172 } else {
173 _sendDone();
174 assert(_isComplete);
175 assert(!_hasPendingEvent);
176 }
177 }
178
179 // ------------------------------------------------------------------- 448 // -------------------------------------------------------------------
180 // Internal implementation.
181
182 // State predicates.
183
184 // Lifecycle state.
185 /** Whether the stream is in the default, open, state for events. */
186 bool get _isOpen => (_state & (_STREAM_CLOSED | _STREAM_COMPLETE)) == 0;
187
188 /** Whether the stream has been closed (a done event requested). */
189 bool get _isClosed => (_state & _STREAM_CLOSED) != 0;
190
191 /** Whether the stream is completed. */
192 bool get _isComplete => (_state & _STREAM_COMPLETE) != 0;
193
194 // Pause state.
195
196 /** Whether one or more active subscribers have requested a pause. */
197 bool get _isPaused => _state >= (1 << _STREAM_PAUSE_COUNT_SHIFT);
198
199 /** How many times the stream has been paused. */
200 int get _pauseCount => _state >> _STREAM_PAUSE_COUNT_SHIFT;
201
202 /**
203 * Whether a controller thinks the stream is paused.
204 *
205 * When this changes, a pause-state change callback is performed.
206 *
207 * It may differ from [_isPaused] if there are pending events
208 * in the queue when the listeners resume. The controller won't
209 * be informed until all queued events have been fired.
210 */
211 bool get _isInputPaused => _state >= (_STREAM_PENDING_RESUME);
212
213 /** Whether we have a pending resume scheduled. */
214 bool get _hasPendingResume => (_state & _STREAM_PENDING_RESUME) != 0;
215
216
217 // Action state. If the stream makes a call-out to external code,
218 // this state tracks it and avoids reentrancy problems.
219
220 /** Whether the stream is not currently firing or calling a callback. */
221 bool get _isInactive => (_state & (_STREAM_CALLBACK | _STREAM_FIRING)) == 0;
222
223 /** Whether we are currently executing a state-chance callback. */
224 bool get _isInCallback => (_state & _STREAM_CALLBACK) != 0;
225
226 /** Whether we are currently firing an event. */
227 bool get _isFiring => (_state & _STREAM_FIRING) != 0;
228
229 /** Check whether the pending event queue is non-empty */
230 bool get _hasPendingEvent =>
231 _pendingEvents != null && !_pendingEvents.isEmpty;
232
233 /**
234 * The bit representing the current or last event fired.
235 *
236 * This bit matches a bit on listeners that have received the corresponding
237 * event. It is toggled for each new event being fired.
238 */
239 int get _currentEventIdBit =>
240 (_state & _STREAM_EVENT_ID ) >> _STREAM_EVENT_ID_SHIFT;
241
242 /** Whether there is currently a subscriber on this [Stream]. */
243 bool get _hasListener;
244
245
246 /** Whether the state bits allow firing. */
247 bool get _mayFireState {
248 // The state allows firing unless:
249 // - it's currently firing
250 // - it's currently in a callback
251 // - it's paused
252 const int mask =
253 _STREAM_FIRING |
254 _STREAM_CALLBACK |
255 ~((1 << _STREAM_PAUSE_COUNT_SHIFT) - 1);
256 return (_state & mask) == 0;
257 }
258
259 // State modification.
260
261 /** Record an increases in the number of times the listener has paused. */
262 void _incrementPauseCount(_StreamListener<T> listener) {
263 listener._incrementPauseCount();
264 _state &= ~_STREAM_PENDING_RESUME;
265 _updatePauseCount(1);
266 }
267
268 /** Record a decrease in the number of times the listener has paused. */
269 void _decrementPauseCount(_StreamListener<T> listener) {
270 assert(_isPaused);
271 listener._decrementPauseCount();
272 _updatePauseCount(-1);
273 }
274
275 /** Update the stream's own pause count only. */
276 void _updatePauseCount(int by) {
277 int oldState = _state;
278 // We can't just _state += by << _STREAM_PAUSE_COUNT_SHIFT, since dart2js
279 // converts the result of the left-shift to a positive number.
280 if (by >= 0) {
281 _state = oldState + (by << _STREAM_PAUSE_COUNT_SHIFT);
282 } else {
283 _state = oldState - ((-by) << _STREAM_PAUSE_COUNT_SHIFT);
284 }
285 assert(_state >= 0);
286 assert((_state >> _STREAM_PAUSE_COUNT_SHIFT) ==
287 (oldState >> _STREAM_PAUSE_COUNT_SHIFT) + by);
288 }
289
290 void _setClosed() {
291 assert(!_isClosed);
292 _state |= _STREAM_CLOSED;
293 }
294
295 void _setComplete() {
296 assert(_isClosed);
297 _state = _state |_STREAM_COMPLETE;
298 }
299
300 void _startFiring() {
301 assert(!_isFiring);
302 assert(!_isInCallback);
303 assert(_hasListener);
304 assert(!_isPaused);
305 // This sets the _STREAM_FIRING bit and toggles the _STREAM_EVENT_ID
306 // bit. All current subscribers will now have a _LISTENER_EVENT_ID
307 // that doesn't match _STREAM_EVENT_ID, and they will receive the
308 // event being fired.
309 _state ^= _STREAM_FIRING | _STREAM_EVENT_ID;
310 }
311
312 void _endFiring(bool wasInputPaused) {
313 assert(_isFiring);
314 _state ^= _STREAM_FIRING;
315 // Had listeners, or we wouldn't have fired.
316 _checkCallbacks(true, wasInputPaused);
317 }
318
319 /**
320 * Record that a listener wants a pause from events.
321 *
322 * This methods is called from [_StreamListener.pause()].
323 * Subclasses can override this method, along with [isPaused] and
324 * [createSubscription], if they want to do a different handling of paused
325 * subscriptions, e.g., a filtering stream pausing its own source if all its
326 * subscribers are paused.
327 */
328 void _pause(_StreamListener<T> listener, Future resumeSignal) {
329 assert(identical(listener._source, this));
330 if (!listener._isSubscribed) {
331 throw new StateError("Subscription has been canceled.");
332 }
333 assert(!_isComplete); // There can be no subscribers when complete.
334 bool wasInputPaused = _isInputPaused;
335 bool wasPaused = _isPaused;
336 _incrementPauseCount(listener);
337 if (resumeSignal != null) {
338 resumeSignal.whenComplete(() { this._resume(listener, true); });
339 }
340 if (!wasPaused && _hasPendingEvent && _pendingEvents.isScheduled) {
341 _pendingEvents.cancelSchedule();
342 }
343 if (_isInactive && !wasInputPaused) {
344 _checkCallbacks(true, false);
345 if (!_isPaused && _hasPendingEvent) {
346 _schedulePendingEvents();
347 }
348 }
349 }
350
351 /** Stops pausing due to one request from the given listener. */
352 void _resume(_StreamListener<T> listener, bool fromEvent) {
353 if (!listener.isPaused) return;
354 assert(listener._isSubscribed);
355 assert(_isPaused);
356 _decrementPauseCount(listener);
357 if (!_isPaused) {
358 if (_hasPendingEvent) {
359 _state |= _STREAM_PENDING_RESUME;
360 // Controller's pause state hasn't changed.
361 // If we can fire events now, fire any pending events right away.
362 if (_isInactive) {
363 if (fromEvent) {
364 _handlePendingEvents();
365 } else {
366 _schedulePendingEvents();
367 }
368 }
369 } else if (_isInactive) {
370 _checkCallbacks(true, true);
371 if (!_isPaused && _hasPendingEvent) {
372 if (fromEvent) {
373 _handlePendingEvents();
374 } else {
375 _schedulePendingEvents();
376 }
377 }
378 }
379 }
380 }
381
382 /** Schedule pending events to be executed. */
383 void _schedulePendingEvents() {
384 assert(_hasPendingEvent);
385 _pendingEvents.schedule(this);
386 }
387
388 /** Create a subscription object. Called by [subcribe]. */ 449 /** Create a subscription object. Called by [subcribe]. */
389 _StreamSubscriptionImpl<T> _createSubscription( 450 _BufferingStreamSubscription<T> _createSubscription(
390 void onData(T data),
391 void onError(error),
392 void onDone(),
393 bool cancelOnError);
394
395 /**
396 * Adds a listener to this stream.
397 */
398 void _addListener(_StreamSubscriptionImpl subscription);
399
400 /**
401 * Handle a cancel requested from a [_StreamSubscriptionImpl].
402 *
403 * This method is called from [_StreamSubscriptionImpl.cancel].
404 *
405 * If an event is currently firing, the cancel is delayed
406 * until after the subscribers have received the event.
407 */
408 void _cancel(_StreamSubscriptionImpl subscriber);
409
410 /**
411 * Iterate over all current subscribers and perform an action on each.
412 *
413 * Subscribers added during the iteration will not be visited.
414 * Subscribers unsubscribed during the iteration will only be removed
415 * after they have been acted on.
416 *
417 * Any change in the pause state is only reported after all subscribers have
418 * received the event.
419 *
420 * The [action] must not throw, or the controller will be left in an
421 * invalid state.
422 *
423 * This method must not be called while [isFiring] is true.
424 */
425 void _forEachSubscriber(void action(_StreamSubscriptionImpl<T> subscription));
426
427 /**
428 * Checks whether the subscription/pause state has changed.
429 *
430 * Calls the appropriate callback if the state has changed from the
431 * provided one. Repeats calling callbacks as long as the call changes
432 * the state.
433 */
434 void _checkCallbacks(bool hadListener, bool wasPaused) {
435 assert(!_isFiring);
436 // Will be handled after the current callback.
437 if (_isInCallback) return;
438 if (_hasPendingResume && !_hasPendingEvent) {
439 _state ^= _STREAM_PENDING_RESUME;
440 }
441 _state |= _STREAM_CALLBACK;
442 while (true) {
443 bool hasListener = _hasListener;
444 bool isPaused = _isInputPaused;
445 if (hadListener != hasListener) {
446 _onSubscriptionStateChange();
447 } else if (isPaused != wasPaused) {
448 _onPauseStateChange();
449 } else {
450 _state ^= _STREAM_CALLBACK;
451 return;
452 }
453 wasPaused = isPaused;
454 hadListener = hasListener;
455 }
456 }
457
458 /**
459 * Called when the first subscriber requests a pause or the last a resume.
460 *
461 * Read [isPaused] to see the new state.
462 */
463 void _onPauseStateChange() {}
464
465 /**
466 * Called when the first listener subscribes or the last unsubscribes.
467 *
468 * Read [hasListener] to see what the new state is.
469 */
470 void _onSubscriptionStateChange() {}
471
472 /**
473 * Add a pending event at the end of the pending event queue.
474 *
475 * Schedules events if currently not paused and inside a callback.
476 */
477 void _addPendingEvent(_DelayedEvent event) {
478 if (_pendingEvents == null) _pendingEvents = new _StreamImplEvents();
479 _StreamImplEvents events = _pendingEvents;
480 events.add(event);
481 if (_isPaused || _isFiring) return;
482 if (_isInCallback) {
483 _schedulePendingEvents();
484 return;
485 }
486 }
487
488 /** Fire any pending events until the pending event queue is empty. */
489 void _handlePendingEvents() {
490 assert(_isInactive);
491 if (!_hasPendingEvent) return;
492 _PendingEvents events = _pendingEvents;
493 do {
494 if (_isPaused) return;
495 if (events.isScheduled) events.cancelSchedule();
496 events.handleNext(this);
497 } while (!events.isEmpty);
498 }
499
500 /**
501 * Send a data event directly to each subscriber.
502 */
503 _sendData(T value) {
504 assert(!_isPaused);
505 assert(!_isComplete);
506 if (!_hasListener) return;
507 _forEachSubscriber((subscriber) {
508 try {
509 subscriber._sendData(value);
510 } catch (e, s) {
511 _throwDelayed(e, s);
512 }
513 });
514 }
515
516 /**
517 * Sends an error event directly to each subscriber.
518 */
519 void _sendError(error) {
520 assert(!_isPaused);
521 assert(!_isComplete);
522 if (!_hasListener) return;
523 _forEachSubscriber((subscriber) {
524 try {
525 subscriber._sendError(error);
526 } catch (e, s) {
527 _throwDelayed(e, s);
528 }
529 });
530 }
531
532 /**
533 * Sends the "done" message directly to each subscriber.
534 * This automatically stops further subscription and
535 * unsubscribes all subscribers.
536 */
537 void _sendDone() {
538 assert(!_isPaused);
539 assert(_isClosed);
540 _setComplete();
541 if (!_hasListener) return;
542 _forEachSubscriber((subscriber) {
543 _cancel(subscriber);
544 try {
545 subscriber._sendDone();
546 } catch (e, s) {
547 _throwDelayed(e, s);
548 }
549 });
550 assert(!_hasListener);
551 }
552 }
553
554 // -------------------------------------------------------------------
555 // Default implementation of a stream with a single subscriber.
556 // -------------------------------------------------------------------
557 /**
558 * Default implementation of stream capable of sending events to one subscriber.
559 *
560 * Any class needing to implement [Stream] can either directly extend this
561 * class, or extend [Stream] and delegate the subscribe method to an instance
562 * of this class.
563 *
564 * The only public methods are those of [Stream], so instances of
565 * [_SingleStreamImpl] can be returned directly as a [Stream] without exposing
566 * internal functionality.
567 *
568 * The [StreamController] is a public facing version of this class, with
569 * some methods made public.
570 *
571 * The user interface of [_SingleStreamImpl] are the following methods:
572 * * [_add]: Add a data event to the stream.
573 * * [_addError]: Add an error event to the stream.
574 * * [_close]: Request to close the stream.
575 * * [_onSubscriberStateChange]: Called when receiving the first subscriber or
576 * when losing the last subscriber.
577 * * [_onPauseStateChange]: Called when entering or leaving paused mode.
578 * * [_hasListener]: Test whether there are currently any subscribers.
579 * * [_isInputPaused]: Test whether the stream is currently paused.
580 * The user should not add new events while the stream is paused, but if it
581 * happens anyway, the stream will enqueue the events just as when new events
582 * arrive while still firing an old event.
583 */
584 class _SingleStreamImpl<T> extends _StreamImpl<T> {
585 _StreamListener _subscriber = null;
586
587 /** Whether there is currently a subscriber on this [Stream]. */
588 bool get _hasListener => _subscriber != null;
589
590 // -------------------------------------------------------------------
591 // Internal implementation.
592
593 _SingleStreamImpl() {
594 // Start out paused.
595 _updatePauseCount(1);
596 }
597
598 /**
599 * Create the new subscription object.
600 */
601 _StreamSubscriptionImpl<T> _createSubscription(
602 void onData(T data), 451 void onData(T data),
603 void onError(error), 452 void onError(error),
604 void onDone(), 453 void onDone(),
605 bool cancelOnError) { 454 bool cancelOnError) {
606 return new _StreamSubscriptionImpl<T>( 455 return new _BufferingStreamSubscription<T>(
607 this, onData, onError, onDone, cancelOnError); 456 onData, onError, onDone, cancelOnError, null);
608 } 457 }
609 458
610 void _addListener(_StreamListener subscription) { 459 /** Hook called when the subscription has been created. */
611 assert(!_isComplete); 460 void _onListen(StreamSubscription subscription) {}
612 if (_hasListener) { 461 }
613 throw new StateError("Stream already has subscriber."); 462
614 } 463 typedef _PendingEvents _EventGenerator();
615 assert(_pauseCount == 1);
616 _updatePauseCount(-1);
617 _subscriber = subscription;
618 subscription._setSubscribed(0);
619 if (_isInactive) {
620 _checkCallbacks(false, true);
621 if (!_isPaused && _hasPendingEvent) {
622 _schedulePendingEvents();
623 }
624 }
625 }
626
627 /**
628 * Handle a cancel requested from a [_StreamSubscriptionImpl].
629 *
630 * This method is called from [_StreamSubscriptionImpl.cancel].
631 */
632 void _cancel(_StreamListener subscriber) {
633 assert(identical(subscriber._source, this));
634 // We allow unsubscribing the currently firing subscription during
635 // the event firing, because it is indistinguishable from delaying it since
636 // that event has already received the event.
637 if (!identical(_subscriber, subscriber)) {
638 // You may unsubscribe more than once, only the first one counts.
639 return;
640 }
641 _subscriber = null;
642 // Unsubscribing a paused subscription also cancels its pauses.
643 int resumeCount = subscriber._setUnsubscribed();
644 // Keep being paused while there is no subscriber and the stream is not
645 // complete.
646 _updatePauseCount(_isComplete ? -resumeCount : -resumeCount + 1);
647 if (_isInactive) {
648 _checkCallbacks(true, resumeCount > 0);
649 if (!_isPaused && _hasPendingEvent) {
650 _schedulePendingEvents();
651 }
652 }
653 }
654
655 void _forEachSubscriber(
656 void action(_StreamListener<T> subscription)) {
657 assert(!_isPaused);
658 bool wasInputPaused = _isInputPaused;
659 _StreamListener subscription = _subscriber;
660 assert(subscription != null);
661 _startFiring();
662 action(subscription);
663 _endFiring(wasInputPaused);
664 }
665 }
666
667 // -------------------------------------------------------------------
668 // Default implementation of a stream with subscribers.
669 // -------------------------------------------------------------------
670
671 /**
672 * Default implementation of stream capable of sending events to subscribers.
673 *
674 * Any class needing to implement [Stream] can either directly extend this
675 * class, or extend [Stream] and delegate the subscribe method to an instance
676 * of this class.
677 *
678 * The only public methods are those of [Stream], so instances of
679 * [_MultiStreamImpl] can be returned directly as a [Stream] without exposing
680 * internal functionality.
681 *
682 * The [StreamController] is a public facing version of this class, with
683 * some methods made public.
684 *
685 * The user interface of [_MultiStreamImpl] are the following methods:
686 * * [_add]: Add a data event to the stream.
687 * * [_addError]: Add an error event to the stream.
688 * * [_close]: Request to close the stream.
689 * * [_onSubscriptionStateChange]: Called when receiving the first subscriber or
690 * when losing the last subscriber.
691 * * [_onPauseStateChange]: Called when entering or leaving paused mode.
692 * * [_hasListener]: Test whether there are currently any subscribers.
693 * * [_isPaused]: Test whether the stream is currently paused.
694 * The user should not add new events while the stream is paused, but if it
695 * happens anyway, the stream will enqueue the events just as when new events
696 * arrive while still firing an old event.
697 */
698 class _MultiStreamImpl<T> extends _StreamImpl<T>
699 implements _InternalLinkList {
700 // Link list implementation (mixin when possible).
701 _InternalLink _nextLink;
702 _InternalLink _previousLink;
703
704 _MultiStreamImpl() {
705 _nextLink = _previousLink = this;
706 }
707
708 bool get isBroadcast => true;
709
710 Stream<T> asBroadcastStream() => this;
711
712 // ------------------------------------------------------------------
713 // Helper functions that can be overridden in subclasses.
714
715 /** Whether there are currently any subscribers on this [Stream]. */
716 bool get _hasListener => !_InternalLinkList.isEmpty(this);
717
718 /**
719 * Create the new subscription object.
720 */
721 _StreamListener<T> _createSubscription(
722 void onData(T data),
723 void onError(error),
724 void onDone(),
725 bool cancelOnError) {
726 return new _StreamSubscriptionImpl<T>(
727 this, onData, onError, onDone, cancelOnError);
728 }
729
730 // -------------------------------------------------------------------
731 // Internal implementation.
732
733 /**
734 * Iterate over all current subscribers and perform an action on each.
735 *
736 * The set of subscribers cannot be modified during this iteration.
737 * All attempts to add or unsubscribe subscribers will be delayed until
738 * after the iteration is complete.
739 *
740 * The [action] must not throw, or the controller will be left in an
741 * invalid state.
742 *
743 * This method must not be called while [isFiring] is true.
744 */
745 void _forEachSubscriber(
746 void action(_StreamListener<T> subscription)) {
747 assert(!_isFiring);
748 if (!_hasListener) return;
749 bool wasInputPaused = _isInputPaused;
750 _startFiring();
751 _InternalLink cursor = this._nextLink;
752 while (!identical(cursor, this)) {
753 _StreamListener<T> current = cursor;
754 if (current._needsEvent(_currentEventIdBit)) {
755 action(current);
756 // Marks as having received the event.
757 current._toggleEventReceived();
758 }
759 cursor = current._nextLink;
760 if (current._isPendingUnsubscribe) {
761 _removeListener(current);
762 }
763 }
764 _endFiring(wasInputPaused);
765 }
766
767 void _addListener(_StreamListener listener) {
768 listener._setSubscribed(_currentEventIdBit);
769 bool hadListener = _hasListener;
770 _InternalLinkList.add(this, listener);
771 if (!hadListener && _isInactive) {
772 _checkCallbacks(false, false);
773 if (!_isPaused && _hasPendingEvent) {
774 _schedulePendingEvents();
775 }
776 }
777 }
778
779 /**
780 * Handle a cancel requested from a [_StreamListener].
781 *
782 * This method is called from [_StreamListener.cancel].
783 *
784 * If an event is currently firing, the cancel is delayed
785 * until after the subscribers have received the event.
786 */
787 void _cancel(_StreamListener listener) {
788 assert(identical(listener._source, this));
789 if (_InternalLink.isUnlinked(listener)) {
790 // You may unsubscribe more than once, only the first one counts.
791 return;
792 }
793 if (_isFiring) {
794 if (listener._needsEvent(_currentEventIdBit)) {
795 assert(listener._isSubscribed);
796 listener._setPendingUnsubscribe(_currentEventIdBit);
797 } else {
798 // The listener has been notified of the event (or don't need to,
799 // if it's still pending subscription) so it's safe to remove it.
800 _removeListener(listener);
801 }
802 // Pause and subscription state changes are reported when we end
803 // firing.
804 } else {
805 bool wasInputPaused = _isInputPaused;
806 _removeListener(listener);
807 if (_isInactive) {
808 _checkCallbacks(true, wasInputPaused);
809 if (!_isPaused && _hasPendingEvent) {
810 _schedulePendingEvents();
811 }
812 }
813 }
814 }
815
816 /**
817 * Removes a listener from this stream and cancels its pauses.
818 *
819 * This is a low-level action that doesn't call [_onSubscriptionStateChange].
820 * or [_callOnPauseStateChange].
821 */
822 void _removeListener(_StreamListener listener) {
823 int pauseCount = listener._setUnsubscribed();
824 _InternalLinkList.remove(listener);
825 if (pauseCount > 0) {
826 _updatePauseCount(-pauseCount);
827 if (!_isPaused && _hasPendingEvent) {
828 _state |= _STREAM_PENDING_RESUME;
829 }
830 }
831 }
832 }
833
834 464
835 /** Stream that generates its own events. */ 465 /** Stream that generates its own events. */
836 class _GeneratedSingleStreamImpl<T> extends _SingleStreamImpl<T> { 466 class _GeneratedStreamImpl<T> extends _StreamImpl<T> {
837 /** 467 final _EventGenerator _pending;
838 * Initializes the stream to have only the events provided by [events]. 468 /**
839 * 469 * Initializes the stream to have only the events provided by a
840 * A [_PendingEvents] implementation provides events that are handled 470 * [_PendingEvents].
841 * by calling [_PendingEvents.handleNext] with the [_StreamImpl]. 471 *
842 */ 472 * A new [_PendingEvents] must be generated for each listen.
843 _GeneratedSingleStreamImpl(_PendingEvents events) { 473 */
844 _pendingEvents = events; 474 _GeneratedStreamImpl(this._pending);
845 _setClosed(); // Closed for input since all events are already pending. 475
846 } 476 StreamSubscription _createSubscription(void onData(T data),
847 477 void onError(Object error),
848 void _add(T value) { 478 void onDone(),
849 throw new UnsupportedError("Cannot inject events into generated stream"); 479 bool cancelOnError) {
850 } 480 return new _BufferingStreamSubscription(
851 481 onData, onError, onDone, cancelOnError, _pending());
852 void _addError(value) {
853 throw new UnsupportedError("Cannot inject events into generated stream");
854 }
855
856 void _close() {
857 throw new UnsupportedError("Cannot inject events into generated stream");
858 } 482 }
859 } 483 }
860 484
861 485
862 /** Pending events object that gets its events from an [Iterable]. */ 486 /** Pending events object that gets its events from an [Iterable]. */
863 class _IterablePendingEvents<T> extends _PendingEvents { 487 class _IterablePendingEvents<T> extends _PendingEvents {
488 // The stream has been cancelled by an error, but hasn't sent a final
489 // "Done" event yet.
490 static const int _STATE_CANCELLED = 1;
491 // The stream is completely done.
492 static const int _STATE_CLOSED = 3;
493
864 final Iterator<T> _iterator; 494 final Iterator<T> _iterator;
495 int _iterationState = 0;
496
497 _IterablePendingEvents(Iterable<T> data) : _iterator = data.iterator;
498
865 /** 499 /**
866 * Whether there are no more events to be sent. 500 * Whether there are no more events to be sent.
867 * 501 *
868 * This starts out as [:false:] since there is always at least 502 * This starts out as [:false:] since there is always at least
869 * a 'done' event to be sent. 503 * a 'done' event to be sent.
870 */ 504 */
871 bool _isDone = false; 505 bool get _isDone => _iterationState == _STATE_CLOSED;
872 506 bool get _isCancelled => _iterationState == _STATE_CANCELLED;
873 _IterablePendingEvents(Iterable<T> data) : _iterator = data.iterator;
874 507
875 bool get isEmpty => _isDone; 508 bool get isEmpty => _isDone;
876 509
877 void handleNext(_StreamImpl<T> stream) { 510 void handleNext(_EventDispatch dispatch) {
878 if (_isDone) throw new StateError("No events pending."); 511 if (_isCancelled) {
512 _iterationState = _STATE_CLOSED;
513 dispatch._sendDone();
514 return;
515 }
516 if (_isDone) {
517 throw new StateError("No events pending.");
518 }
519 bool isDone;
879 try { 520 try {
880 _isDone = !_iterator.moveNext(); 521 isDone = !_iterator.moveNext();
881 if (!_isDone) {
882 stream._sendData(_iterator.current);
883 } else {
884 stream._sendDone();
885 }
886 } catch (e, s) { 522 } catch (e, s) {
887 stream._sendError(_asyncError(e, s)); 523 _iterationState = _STATE_CANCELLED; // Will send a single done after this .
floitsch 2013/05/22 16:26:29 long line. I don't think it makes sense to send a
Lasse Reichstein Nielsen 2013/05/24 06:02:49 Good point. Some listeners might be left hanging,
888 stream._sendDone(); 524 dispatch._sendError(_asyncError(e, s));
889 _isDone = true; 525 return;
890 } 526 }
891 } 527 if (!isDone) {
892 } 528 dispatch._sendData(_iterator.current);
893 529 } else {
894 530 _iterationState = _STATE_CLOSED;
895 /** 531 dispatch._sendDone();
896 * The subscription class that the [StreamController] uses. 532 }
897 * 533 }
898 * The [_StreamImpl.createSubscription] method should 534
899 * create an object of this type, or another subclass of [_StreamListener]. 535 void clear() {
900 * A subclass of [_StreamImpl] can specify which subclass 536 if (isScheduled) cancelSchedule();
901 * of [_StreamSubscriptionImpl] it uses by overriding 537 _state = _STATE_CLOSED;
902 * [_StreamImpl.createSubscription]. 538 }
903 * 539 }
904 * The subscription is in one of three states: 540
905 * * Subscribed.
906 * * Paused-and-subscribed.
907 * * Unsubscribed.
908 * Unsubscribing also resumes any pauses started by the subscription.
909 */
910 class _StreamSubscriptionImpl<T> extends _StreamListener<T>
911 implements StreamSubscription<T> {
912 final bool _cancelOnError;
913 // TODO(ahe): Restore type when feature is implemented in dart2js
914 // checked mode. http://dartbug.com/7733
915 var /* _DataHandler<T> */ _onData;
916 _ErrorHandler _onError;
917 _DoneHandler _onDone;
918 _StreamSubscriptionImpl(_StreamImpl source,
919 this._onData,
920 this._onError,
921 this._onDone,
922 this._cancelOnError) : super(source);
923
924 void onData(void handleData(T event)) {
925 if (handleData == null) handleData = _nullDataHandler;
926 _onData = handleData;
927 }
928
929 void onError(void handleError(error)) {
930 if (handleError == null) handleError = _nullErrorHandler;
931 _onError = handleError;
932 }
933
934 void onDone(void handleDone()) {
935 if (handleDone == null) handleDone = _nullDoneHandler;
936 _onDone = handleDone;
937 }
938
939 void _sendData(T data) {
940 _onData(data);
941 }
942
943 void _sendError(error) {
944 _onError(error);
945 if (_cancelOnError) _source._cancel(this);
946 }
947
948 void _sendDone() {
949 _onDone();
950 }
951
952 void cancel() {
953 if (!_isSubscribed) return;
954 _source._cancel(this);
955 }
956
957 void pause([Future resumeSignal]) {
958 if (!_isSubscribed) return;
959 _source._pause(this, resumeSignal);
960 }
961
962 void resume() {
963 if (!_isSubscribed || !isPaused) return;
964 _source._resume(this, false);
965 }
966
967 Future asFuture([var futureValue]) {
968 _FutureImpl<T> result = new _FutureImpl<T>();
969
970 // Overwrite the onDone and onError handlers.
971 onDone(() { result._setValue(futureValue); });
972 onError((error) {
973 cancel();
974 result._setError(error);
975 });
976
977 return result;
978 }
979 }
980 541
981 // Internal helpers. 542 // Internal helpers.
982 543
983 // Types of the different handlers on a stream. Types used to type fields. 544 // Types of the different handlers on a stream. Types used to type fields.
984 typedef void _DataHandler<T>(T value); 545 typedef void _DataHandler<T>(T value);
985 typedef void _ErrorHandler(error); 546 typedef void _ErrorHandler(error);
986 typedef void _DoneHandler(); 547 typedef void _DoneHandler();
987 548
988 549
989 /** Default data handler, does nothing. */ 550 /** Default data handler, does nothing. */
990 void _nullDataHandler(var value) {} 551 void _nullDataHandler(var value) {}
991 552
992 /** Default error handler, reports the error to the global handler. */ 553 /** Default error handler, reports the error to the global handler. */
993 void _nullErrorHandler(error) { 554 void _nullErrorHandler(error) {
994 _throwDelayed(error); 555 _throwDelayed(error);
995 } 556 }
996 557
997 /** Default done handler, does nothing. */ 558 /** Default done handler, does nothing. */
998 void _nullDoneHandler() {} 559 void _nullDoneHandler() {}
999 560
1000 561
1001 /** A delayed event on a stream implementation. */ 562 /** A delayed event on a buffering stream subscription. */
1002 abstract class _DelayedEvent { 563 abstract class _DelayedEvent {
1003 /** Added as a linked list on the [StreamController]. */ 564 /** Added as a linked list on the [StreamController]. */
1004 _DelayedEvent next; 565 _DelayedEvent next;
1005 /** Execute the delayed event on the [StreamController]. */ 566 /** Execute the delayed event on the [StreamController]. */
1006 void perform(_StreamImpl stream); 567 void perform(_EventDispatch dispatch);
1007 } 568 }
1008 569
1009 /** A delayed data event. */ 570 /** A delayed data event. */
1010 class _DelayedData<T> extends _DelayedEvent{ 571 class _DelayedData<T> extends _DelayedEvent{
1011 final T value; 572 final T value;
1012 _DelayedData(this.value); 573 _DelayedData(this.value);
1013 void perform(_StreamImpl<T> stream) { 574 void perform(_EventDispatch<T> dispatch) {
1014 stream._sendData(value); 575 dispatch._sendData(value);
1015 } 576 }
1016 } 577 }
1017 578
1018 /** A delayed error event. */ 579 /** A delayed error event. */
1019 class _DelayedError extends _DelayedEvent { 580 class _DelayedError extends _DelayedEvent {
1020 final error; 581 final error;
1021 _DelayedError(this.error); 582 _DelayedError(this.error);
1022 void perform(_StreamImpl stream) { 583 void perform(_EventDispatch dispatch) {
1023 stream._sendError(error); 584 dispatch._sendError(error);
1024 } 585 }
1025 } 586 }
1026 587
1027 /** A delayed done event. */ 588 /** A delayed done event. */
1028 class _DelayedDone implements _DelayedEvent { 589 class _DelayedDone implements _DelayedEvent {
1029 const _DelayedDone(); 590 const _DelayedDone();
1030 void perform(_StreamImpl stream) { 591 void perform(_EventDispatch dispatch) {
1031 stream._sendDone(); 592 dispatch._sendDone();
1032 } 593 }
1033 594
1034 _DelayedEvent get next => null; 595 _DelayedEvent get next => null;
1035 596
1036 void set next(_DelayedEvent _) { 597 void set next(_DelayedEvent _) {
1037 throw new StateError("No events after a done."); 598 throw new StateError("No events after a done.");
1038 } 599 }
1039 } 600 }
1040 601
1041 /** 602 /**
(...skipping 69 matching lines...) Expand 10 before | Expand all | Expand 10 after
1111 listLast._nextLink = otherNext; 672 listLast._nextLink = otherNext;
1112 otherNext._previousLink = listLast; 673 otherNext._previousLink = listLast;
1113 _InternalLink otherLast = other._previousLink; 674 _InternalLink otherLast = other._previousLink;
1114 list._previousLink = otherLast; 675 list._previousLink = otherLast;
1115 otherLast._nextLink = list; 676 otherLast._nextLink = list;
1116 // Clean up [other]. 677 // Clean up [other].
1117 other._nextLink = other._previousLink = other; 678 other._nextLink = other._previousLink = other;
1118 } 679 }
1119 } 680 }
1120 681
1121 /** Abstract type for an internal interface for sending events. */
1122 abstract class _EventOutputSink<T> {
1123 _sendData(T data);
1124 _sendError(error);
1125 _sendDone();
1126 }
1127
1128 abstract class _StreamListener<T> extends _InternalLink
1129 implements _EventOutputSink<T> {
1130 final _StreamImpl _source;
1131 int _state = _LISTENER_UNSUBSCRIBED;
1132
1133 _StreamListener(this._source);
1134
1135 bool get isPaused => _state >= (1 << _LISTENER_PAUSE_COUNT_SHIFT);
1136
1137 bool get _isPendingUnsubscribe =>
1138 (_state & _LISTENER_PENDING_UNSUBSCRIBE) != 0;
1139
1140 bool get _isSubscribed => (_state & _LISTENER_SUBSCRIBED) != 0;
1141
1142 /**
1143 * Whether the listener still needs to receive the currently firing event.
1144 *
1145 * The currently firing event is identified by a single bit, which alternates
1146 * between events. The [_state] contains the previously sent event's bit in
1147 * the [_LISTENER_EVENT_ID] bit. If the two don't match, this listener
1148 * still need the current event.
1149 */
1150 bool _needsEvent(int currentEventIdBit) {
1151 int lastEventIdBit =
1152 (_state & _LISTENER_EVENT_ID) >> _LISTENER_EVENT_ID_SHIFT;
1153 return lastEventIdBit != currentEventIdBit;
1154 }
1155
1156 /// If a subscriber's "firing bit" doesn't match the stream's firing bit,
1157 /// we are currently firing an event and the subscriber still need to receive
1158 /// the event.
1159 void _toggleEventReceived() {
1160 _state ^= _LISTENER_EVENT_ID;
1161 }
1162
1163 void _setSubscribed(int eventIdBit) {
1164 assert(eventIdBit == 0 || eventIdBit == 1);
1165 _state = _LISTENER_SUBSCRIBED | (eventIdBit << _LISTENER_EVENT_ID_SHIFT);
1166 }
1167
1168 void _setPendingUnsubscribe(int currentEventIdBit) {
1169 assert(_isSubscribed);
1170 // Sets the pending unsubscribe, and ensures that the listener
1171 // won't get the current event.
1172 _state |= _LISTENER_PENDING_UNSUBSCRIBE | _LISTENER_EVENT_ID;
1173 _state ^= (1 ^ currentEventIdBit) << _LISTENER_EVENT_ID_SHIFT;
1174 assert(!_needsEvent(currentEventIdBit));
1175 }
1176
1177 /**
1178 * Marks the listener as unsubscibed.
1179 *
1180 * Returns the number of unresumed pauses for the listener.
1181 */
1182 int _setUnsubscribed() {
1183 assert(_isSubscribed);
1184 int timesPaused = _state >> _LISTENER_PAUSE_COUNT_SHIFT;
1185 _state = _LISTENER_UNSUBSCRIBED;
1186 return timesPaused;
1187 }
1188
1189 void _incrementPauseCount() {
1190 _state += 1 << _LISTENER_PAUSE_COUNT_SHIFT;
1191 }
1192
1193 void _decrementPauseCount() {
1194 assert(isPaused);
1195 _state -= 1 << _LISTENER_PAUSE_COUNT_SHIFT;
1196 }
1197
1198 _sendData(T data);
1199 _sendError(error);
1200 _sendDone();
1201 }
1202
1203 /** Superclass for provider of pending events. */ 682 /** Superclass for provider of pending events. */
1204 abstract class _PendingEvents { 683 abstract class _PendingEvents {
684 // No async event has been scheduled.
685 static const int _STATE_UNSCHEDULED = 0;
686 // An async event has been scheduled to run a function.
687 static const int _STATE_SCHEDULED = 1;
688 // An async event has been scheduled, but it will do nothing when it runs.
689 // Async events can't be preempted.
690 static const int _STATE_CANCELLED = 3;
691
1205 /** 692 /**
1206 * Timer set when pending events are scheduled for execution. 693 * Timer set when pending events are scheduled for execution.
floitsch 2013/05/22 16:26:29 Update comments.
Lasse Reichstein Nielsen 2013/05/24 06:02:49 Done.
1207 * 694 *
1208 * When scheduling pending events for execution in a later cycle, the timer 695 * When scheduling pending events for execution in a later cycle, the timer
1209 * is stored here. If pending events are executed earlier than that, e.g., 696 * is stored here. If pending events are executed earlier than that, e.g.,
1210 * due to a second event in the current cycle, the timer is canceled again. 697 * due to a second event in the current cycle, the timer is canceled again.
1211 */ 698 */
1212 Timer scheduleTimer = null; 699 int _state = _STATE_UNSCHEDULED;
1213 700
1214 bool get isEmpty; 701 bool get isEmpty;
1215 702
1216 bool get isScheduled => scheduleTimer != null; 703 bool get isScheduled => _state == _STATE_SCHEDULED;
704 bool get _eventScheduled => _state >= _STATE_SCHEDULED;
1217 705
1218 void schedule(_StreamImpl stream) { 706 /**
707 * Schedule an event to run later.
708 *
709 * If called more than once, it should be called with the same dispatch as
710 * argument each time. It may reuse an earlier argument in some cases.
711 */
712 void schedule(_EventDispatch dispatch) {
1219 if (isScheduled) return; 713 if (isScheduled) return;
1220 scheduleTimer = new Timer(Duration.ZERO, () { 714 assert(!isEmpty);
1221 scheduleTimer = null; 715 if (_eventScheduled) {
1222 stream._handlePendingEvents(); 716 assert(_state == _STATE_CANCELLED);
717 _state = _STATE_SCHEDULED;
718 return;
719 }
720 runAsync(() {
721 int oldState = _state;
722 _state = _STATE_UNSCHEDULED;
723 if (oldState == _STATE_CANCELLED) return;
724 handleNext(dispatch);
1223 }); 725 });
726 _state = _STATE_SCHEDULED;
1224 } 727 }
1225 728
1226 void cancelSchedule() { 729 void cancelSchedule() {
1227 assert(isScheduled); 730 if (isScheduled) _state = _STATE_CANCELLED;
1228 scheduleTimer.cancel();
1229 scheduleTimer = null;
1230 } 731 }
1231 732
1232 void handleNext(_StreamImpl stream); 733 void handleNext(_EventDispatch dispatch);
734
735 /** Throw away any pending events and cancel scheduled events. */
736 void clear();
1233 } 737 }
1234 738
1235 739
1236 /** Class holding pending events for a [_StreamImpl]. */ 740 /** Class holding pending events for a [_StreamImpl]. */
1237 class _StreamImplEvents extends _PendingEvents { 741 class _StreamImplEvents extends _PendingEvents {
1238 /// Single linked list of [_DelayedEvent] objects. 742 /// Single linked list of [_DelayedEvent] objects.
1239 _DelayedEvent firstPendingEvent = null; 743 _DelayedEvent firstPendingEvent = null;
1240 /// Last element in the list of pending events. New events are added after it. 744 /// Last element in the list of pending events. New events are added after it.
1241 _DelayedEvent lastPendingEvent = null; 745 _DelayedEvent lastPendingEvent = null;
1242 746
1243 bool get isEmpty => lastPendingEvent == null; 747 bool get isEmpty => lastPendingEvent == null;
1244 748
1245 bool get isScheduled => scheduleTimer != null;
1246
1247 void add(_DelayedEvent event) { 749 void add(_DelayedEvent event) {
1248 if (lastPendingEvent == null) { 750 if (lastPendingEvent == null) {
1249 firstPendingEvent = lastPendingEvent = event; 751 firstPendingEvent = lastPendingEvent = event;
1250 } else { 752 } else {
1251 lastPendingEvent = lastPendingEvent.next = event; 753 lastPendingEvent = lastPendingEvent.next = event;
1252 } 754 }
1253 } 755 }
1254 756
1255 void handleNext(_StreamImpl stream) { 757 void handleNext(_EventDispatch dispatch) {
1256 assert(!isScheduled); 758 assert(!isScheduled);
1257 _DelayedEvent event = firstPendingEvent; 759 _DelayedEvent event = firstPendingEvent;
1258 firstPendingEvent = event.next; 760 firstPendingEvent = event.next;
1259 if (firstPendingEvent == null) { 761 if (firstPendingEvent == null) {
1260 lastPendingEvent = null; 762 lastPendingEvent = null;
1261 } 763 }
1262 event.perform(stream); 764 event.perform(dispatch);
1263 } 765 }
1264 } 766
1265 767 void clear() {
1266 768 if (isScheduled) cancelSchedule();
1267 class _DoneSubscription<T> implements StreamSubscription<T> { 769 firstPendingEvent = lastPendingEvent = null;
1268 _DoneHandler _handler; 770 }
1269 Timer _timer; 771 }
1270 int _pauseCount = 0; 772
1271 773 class _MultiplexerLinkedList {
1272 _DoneSubscription(this._handler) { 774 _MultiplexerLinkedList _next;
1273 _delayDone(); 775 _MultiplexerLinkedList _previous;
1274 } 776
1275 777 void _unlink() {
1276 void _delayDone() { 778 _previous._next = _next;
1277 assert(_timer == null && _pauseCount == 0); 779 _next._previous = _previous;
1278 _timer = new Timer(Duration.ZERO, () { 780 _next = _previous = this;
1279 if (_handler != null) _handler(); 781 }
1280 }); 782
1281 } 783 void _insertBefore(_MultiplexerLinkedList newNext) {
1282 784 _MultiplexerLinkedList newPrevious = newNext._previous;
1283 bool get _isComplete => _timer == null && _pauseCount == 0; 785 newPrevious._next = this;
1284 786 newNext._previous = _previous;
1285 void onData(void handleAction(T value)) {} 787 _previous._next = newNext;
1286 788 _previous = newPrevious;
1287 void onError(void handleError(error)) {} 789 }
1288 790 }
1289 void onDone(void handleDone()) { 791
1290 _handler = handleDone; 792 // TODO(lrn): Change "implements" to "with" when automatic mixin constructors
1291 } 793 // are implemented.
1292 794 class _MultiplexerSubscription<T> extends _BufferingStreamSubscription<T>
floitsch 2013/05/22 16:26:29 Add comment what this class does.
Lasse Reichstein Nielsen 2013/05/24 06:02:49 Done.
1293 void pause([Future signal]) { 795 implements _MultiplexerLinkedList {
1294 if (_isComplete) return; 796 static const int _STATE_NOT_LISTENING = 0;
1295 if (_timer != null) { 797 // Bit that alternates between event firings. If bit matches the one currently
1296 _timer.cancel(); 798 // firing, the subscription will not be notified.
1297 _timer = null; 799 static const int _STATE_EVENT_ID_BIT = 1;
1298 } 800 static const int _STATE_LISTENING = 2;
1299 _pauseCount++; 801 static const int _STATE_IS_FIRING = 4;
1300 if (signal != null) signal.whenComplete(resume); 802 static const int _STATE_REMOVE_AFTER_FIRING = 8;
1301 } 803
1302 804 // Firing state.
1303 void resume() { 805 int _multiplexState;
1304 if (_isComplete) return; 806
1305 if (_pauseCount == 0) return; 807 _SingleStreamMultiplexer _source;
1306 _pauseCount--; 808
1307 if (_pauseCount == 0) { 809 _MultiplexerSubscription(this._source,
1308 _delayDone(); 810 void onData(T data),
1309 } 811 void onError(Object error),
1310 } 812 void onDone(),
1311 813 bool cancelOnError,
1312 bool get isPaused => _pauseCount > 0; 814 int nextEventId)
1313 815 : _multiplexState = _STATE_LISTENING | nextEventId,
1314 void cancel() { 816 super(onData, onError, onDone, cancelOnError, null) {
1315 if (_isComplete) return; 817 _next = _previous = this;
1316 if (_timer != null) { 818 }
1317 _timer.cancel(); 819
1318 _timer = null; 820 // Mixin workaround.
1319 } 821 _MultiplexerLinkedList _next;
1320 _pauseCount = 0; 822 _MultiplexerLinkedList _previous;
1321 } 823
1322 824 void _unlink() {
1323 Future asFuture([var futureValue]) { 825 _previous._next = _next;
1324 // TODO(floitsch): share more code. 826 _next._previous = _previous;
1325 _FutureImpl<T> result = new _FutureImpl<T>(); 827 _next = _previous = this;
1326 828 }
1327 // Overwrite the onDone and onError handlers. 829
1328 onDone(() { result._setValue(futureValue); }); 830 void _insertBefore(_MultiplexerLinkedList newNext) {
1329 onError((error) { 831 _MultiplexerLinkedList newPrevious = newNext._previous;
1330 cancel(); 832 newPrevious._next = this;
1331 result._setError(error); 833 newNext._previous = _previous;
1332 }); 834 _previous._next = newNext;
1333 835 _previous = newPrevious;
1334 return result; 836 }
1335 } 837 // End mixin.
1336 } 838
1337 839 bool get _isListening => _multiplexState >= _STATE_LISTENING;
1338 class _SingleStreamMultiplexer<T> extends _MultiStreamImpl<T> { 840 bool get _isFiring => _multiplexState >= _STATE_IS_FIRING;
841 bool get _removeAfterFiring => _multiplexState >= _STATE_REMOVE_AFTER_FIRING;
842 int get _eventId => _multiplexState & _STATE_EVENT_ID_BIT;
843
844 void _setRemoveAfterFiring() {
845 assert(_isFiring);
846 _multiplexState |= _STATE_REMOVE_AFTER_FIRING;
847 }
848
849 void _startFiring() {
850 assert(!_isFiring);
851 _multiplexState |= _STATE_IS_FIRING;
852 }
853
854 /// Marks listener as no longer firing, and toggles its event id.
855 void _endFiring() {
856 assert(_isFiring);
857 _multiplexState ^= (_STATE_IS_FIRING | _STATE_EVENT_ID_BIT);
858 }
859
860 void _setNotListening() {
861 assert(_isListening);
862 _multiplexState = _STATE_NOT_LISTENING;
863 }
864
865 void _onCancel() {
866 assert(_isListening);
867 _source._removeListener(this);
868 }
869 }
870
871 // TODO(lrn): change "implements" to "with" when the VM supports it.
872 class _SingleStreamMultiplexer<T> extends Stream<T>
floitsch 2013/05/22 16:26:29 Add comment.
Lasse Reichstein Nielsen 2013/05/24 06:02:49 Done.
873 implements _MultiplexerLinkedList,
874 _EventDispatch<T> {
1339 final Stream<T> _source; 875 final Stream<T> _source;
1340 StreamSubscription<T> _subscription; 876 StreamSubscription<T> _subscription;
1341 877 // Alternates between zero and one for each event fired.
1342 _SingleStreamMultiplexer(this._source); 878 // Listeners are initialized with the next event's id, and will
1343 879 // only be notified if they match the event being fired.
1344 void _callOnPauseStateChange() { 880 // That way listeners added during event firing will not receive
1345 if (_isPaused) { 881 // the current event.
1346 if (_subscription != null) { 882 int _eventId = 0;
1347 _subscription.pause(); 883
884 bool _isFiring = false;
885
886 // Remember events added while firing.
887 _StreamImplEvents _pending;
888
889 _SingleStreamMultiplexer(this._source) {
890 _next = _previous = this;
891 }
892
893 bool get _hasPending => _pending != null && !_pending.isEmpty;
894
895 // Should be mixin.
896 _MultiplexerLinkedList _next;
897 _MultiplexerLinkedList _previous;
898
899 void _unlink() {
900 _previous._next = _next;
901 _next._previous = _previous;
902 _next = _previous = this;
903 }
904
905 void _insertBefore(_MultiplexerLinkedList newNext) {
906 _MultiplexerLinkedList newPrevious = newNext._previous;
907 newPrevious._next = this;
908 newNext._previous = _previous;
909 _previous._next = newNext;
910 _previous = newPrevious;
911 }
912 // End of mixin.
913
914 StreamSubscription<T> listen(void onData(T data),
915 { void onError(Object error),
916 void onDone(),
917 bool cancelOnError }) {
918 if (onData == null) onData = _nullDataHandler;
919 if (onError == null) onError = _nullErrorHandler;
920 if (onDone == null) onDone = _nullDoneHandler;
921 cancelOnError = identical(true, cancelOnError);
922 _MultiplexerSubscription subscription =
923 new _MultiplexerSubscription(this, onData, onError, onDone,
924 cancelOnError, _eventId);
925 if (_subscription == null) {
926 _subscription = _source.listen(_add, onError: _addError, onDone: _close);
927 }
928 subscription._insertBefore(this);
929 return subscription;
930 }
931
932 /** Called by [_MultiplexerSubscription.remove]. */
933 void _removeListener(_MultiplexerSubscription listener) {
934 if (listener._isFiring) {
935 listener._setRemoveAfterFiring();
936 } else {
937 _unlinkListener(listener);
938 }
939 }
940
941 /** Remove a listener and close the subscription after the last one. */
942 void _unlinkListener(_MultiplexerSubscription listener) {
943 listener._setNotListening();
944 listener._unlink();
945 if (identical(_next, this)) {
946 // Last listener removed.
947 _cancel();
948 }
949 }
950
951 void _cancel() {
952 StreamSubscription subscription = _subscription;
953 _subscription = null;
954 subscription.cancel();
955 if (_pending != null) _pending.cancelSchedule();
956 }
957
958 void _forEachListener(void action(_MultiplexerSubscription listener)) {
959 int eventId = _eventId;
960 _eventId ^= 1;
961 _isFiring = true;
962 _MultiplexerLinkedList entry = _next;
963 // Call each listener in order. A listener can be removed during any
964 // other listener's event. During its own event it will only be marked
965 // as "to be removed", and it will be handled after the event is done.
966 while (!identical(entry, this)) {
967 _MultiplexerSubscription listener = entry;
968 if (listener._eventId == eventId) {
969 listener._startFiring();
970 action(listener);
971 listener._endFiring(); // Also toggles the event id.
1348 } 972 }
973 entry = listener._next;
974 if (listener._removeAfterFiring) {
975 _unlinkListener(listener);
976 }
977 }
978 _isFiring = false;
979 }
980
981 void _add(T data) {
floitsch 2013/05/22 16:26:29 Why is this not handled by the bufferingStreamSubs
Lasse Reichstein Nielsen 2013/05/24 06:02:49 This isn't a StreamSubscription - it's a Stream, m
982 if (_isFiring || _hasPending) {
983 _StreamImplEvents pending = _pending;
984 if (pending == null) pending = _pending = new _StreamImplEvents();
985 pending.add(new _DelayedData(data));
1349 } else { 986 } else {
1350 if (_subscription != null) { 987 _sendData(data);
1351 _subscription.resume(); 988 }
1352 } 989 }
1353 } 990
1354 } 991 void _addError(Object error) {
1355 992 if (_isFiring || _hasPending) {
floitsch 2013/05/22 16:26:29 ditto.
Lasse Reichstein Nielsen 2013/05/24 06:02:49 ditto too.
1356 /** 993 _StreamImplEvents pending = _pending;
1357 * Subscribe or unsubscribe on [_source] depending on whether 994 if (pending == null) pending = _pending = new _StreamImplEvents();
1358 * [_stream] has subscribers. 995 pending.add(new _DelayedError(error));
1359 */
1360 void _onSubscriptionStateChange() {
1361 if (_hasListener) {
1362 assert(_subscription == null);
1363 _subscription = _source.listen(this._add,
1364 onError: this._addError,
1365 onDone: this._close);
1366 } else { 996 } else {
1367 // TODO(lrn): Check why this can happen. 997 _sendError(error);
1368 if (_subscription == null) return; 998 }
1369 _subscription.cancel(); 999 }
1370 _subscription = null; 1000
1371 } 1001 void _close() {
1372 } 1002 if (_isFiring || _hasPending) {
floitsch 2013/05/22 16:26:29 ditto.
1373 } 1003 _StreamImplEvents pending = _pending;
1004 if (pending == null) pending = _pending = new _StreamImplEvents();
1005 pending.add(const _DelayedDone());
1006 } else {
1007 _sendDone();
1008 }
1009 }
1010
1011 void _sendData(T data) {
1012 _forEachListener((_MultiplexerSubscription listener) {
1013 listener._add(data);
1014 });
1015 if (_hasPending) {
1016 _pending.schedule(this);
1017 }
1018 }
1019
1020 void _sendError(Object error) {
1021 _forEachListener((_MultiplexerSubscription listener) {
1022 listener._addError(error);
1023 });
1024 if (_hasPending) {
1025 _pending.schedule(this);
1026 }
1027 }
1028
1029 void _sendDone() {
1030 _forEachListener((_MultiplexerSubscription listener) {
1031 listener._setRemoveAfterFiring();
1032 listener._close();
1033 });
1034 }
1035 }
1036
1037
1038 /**
1039 * Simple implementation of [StreamIterator].
1040 */
1041 class _StreamIteratorImpl<T> implements StreamIterator<T> {
1042 // TODO(lrn): Keep a one (or two) element buffer to avoid pausing when not
floitsch 2013/05/22 16:26:29 I guess the ideal solution would be if the constru
Lasse Reichstein Nielsen 2013/05/24 06:02:49 I don't want this to be about buffering, it should
1043 // necessary.
1044 StreamSubscription _subscription;
1045 _FutureImpl<bool> _hasNext;
1046 T _current;
1047
1048 _StreamIteratorImpl(final Stream<T> stream) {
1049 _subscription = stream.listen(_onData,
1050 onError: _onError,
1051 onDone: _onDone,
1052 cancelOnError: true);
1053 _subscription.pause();
1054 }
1055
1056 Future<bool> moveNext() {
1057 if (_hasNext != null) throw new StateError("Already waiting for next.");
1058 if (_subscription == null) {
1059 return new _FutureImpl<bool>.immediate(false);
1060 }
1061 _current = null;
1062 _subscription.resume();
1063 _hasNext = new _FutureImpl<bool>();
1064 return _hasNext;
1065 }
1066
1067 T get current => _current;
1068
1069 void cancel() {
1070 StreamSubscription subscription = _subscription;
1071 _subscription = null;
1072 _current = null;
1073 subscription.cancel();
1074 if (_hasNext != null) {
1075 _FutureImpl<bool> hasNext = _hasNext;
1076 _hasNext = null;
1077 hasNext._setValue(false);
1078 }
1079 }
1080
1081 void _onData(T data) {
1082 assert(_hasNext != null);
1083 _FutureImpl<bool> hasNext = _hasNext;
1084 _hasNext = null;
1085 _current = data;
1086 _subscription.pause();
1087 hasNext._setValue(true);
1088 }
1089
1090 void _onError(Object error) {
1091 assert(_hasNext != null);
1092 _FutureImpl<bool> hasNext = _hasNext;
1093 _hasNext = null;
1094 _subscription = null;
1095 hasNext._setError(error);
1096 // We have cancelOnError: true, so the subscription is cancelled.
floitsch 2013/05/22 16:26:29 canceled.
Lasse Reichstein Nielsen 2013/05/24 06:02:49 Done.
1097 }
1098
1099 void _onDone() {
1100 assert(_hasNext != null);
1101 _FutureImpl<bool> hasNext = _hasNext;
1102 _hasNext = null;
1103 _subscription = null;
1104 hasNext._setValue(false);
1105 }
1106 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698