Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(227)

Side by Side Diff: sdk/lib/async/stream_impl.dart

Issue 11783009: Big merge from experimental to bleeding edge. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 7 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « sdk/lib/async/stream_controller.dart ('k') | sdk/lib/async/stream_pipe.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 // part of dart.async;
6
7 // States shared by single/multi stream implementations.
8
9 /// Initial and default state where the stream can receive and send events.
10 const int _STREAM_OPEN = 0;
11 /// The stream has received a request to complete, but hasn't done so yet.
12 /// No further events can be aded to the stream.
13 const int _STREAM_CLOSED = 1;
14 /// The stream has completed and will no longer receive or send events.
15 /// Also counts as closed. The stream must not be paused when it's completed.
16 /// Always used in conjunction with [_STREAM_CLOSED].
17 const int _STREAM_COMPLETE = 2;
18 /// Bit that alternates between events, and listeners are updated to the
19 /// current value when they are notified of the event.
20 const int _STREAM_EVENT_ID = 4;
21 const int _STREAM_EVENT_ID_SHIFT = 2;
22 /// Bit set while firing and clear while not.
23 const int _STREAM_FIRING = 8;
24 /// The count of times a stream has paused is stored in the
25 /// state, shifted by this amount.
26 const int _STREAM_PAUSE_COUNT_SHIFT = 4;
27
28 // States for listeners.
29
30 /// The listener is currently not subscribed to its source stream.
31 const int _LISTENER_UNSUBSCRIBED = 0;
32 /// The listener is actively subscribed to its source stream.
33 const int _LISTENER_SUBSCRIBED = 1;
34 /// The listener is subscribed until it has been notified of the current event.
35 /// This flag bit is always used in conjuction with [_LISTENER_SUBSCRIBED].
36 const int _LISTENER_PENDING_UNSUBSCRIBE = 2;
37 /// Bit that contains the last sent event's "id bit".
38 const int _LISTENER_EVENT_ID = 4;
39 const int _LISTENER_EVENT_ID_SHIFT = 2;
40 /// The count of times a listener has paused is stored in the
41 /// state, shifted by this amount.
42 const int _LISTENER_PAUSE_COUNT_SHIFT = 3;
43
44
45 // -------------------------------------------------------------------
46 // Common base class for single and multi-subscription streams.
47 // -------------------------------------------------------------------
48 abstract class _StreamImpl<T> extends Stream<T> {
49 /** Current state of the stream. */
50 int _state = _STREAM_OPEN;
51
52 /**
53 * List of pending events.
54 *
55 * If events are added to the stream (using [_add], [_signalError] or [_done])
56 * while the stream is paused, or while another event is firing, events will
57 * stored here.
58 * Also supports scheduling the events for later execution.
59 */
60 _StreamImplEvents _pendingEvents;
61
62 // ------------------------------------------------------------------
63 // Stream interface.
64
65 StreamSubscription listen(void onData(T data),
66 { void onError(AsyncError error),
67 void onDone(),
68 bool unsubscribeOnError }) {
69 if (_isComplete) {
70 return new _DoneSubscription(onDone);
71 }
72 if (onData == null) onData = _nullDataHandler;
73 if (onError == null) onError = _nullErrorHandler;
74 if (onDone == null) onDone = _nullDoneHandler;
75 unsubscribeOnError = identical(true, unsubscribeOnError);
76 _StreamListener subscription =
77 _createSubscription(onData, onError, onDone, unsubscribeOnError);
78 _addListener(subscription);
79 return subscription;
80 }
81
82 // ------------------------------------------------------------------
83 // StreamSink interface-like methods for sending events into the stream.
84 // It's the responsibility of the caller to ensure that the stream is not
85 // paused when adding events. If the stream is paused, the events will be
86 // queued, but it's better to not send events at all.
87
88 /**
89 * Send or queue a data event.
90 */
91 void _add(T value) {
92 if (_isClosed) throw new StateError("Sending on closed stream");
93 if (!_canFireEvent) {
94 _addPendingEvent(new _DelayedData<T>(value));
95 return;
96 }
97 _sendData(value);
98 _handlePendingEvents();
99 }
100
101 /**
102 * Send or enqueue an error event.
103 *
104 * If a subscription has requested to be unsubscribed on errors,
105 * it will be unsubscribed after receiving this event.
106 */
107 void _signalError(AsyncError error) {
108 if (_isClosed) throw new StateError("Sending on closed stream");
109 if (!_canFireEvent) {
110 _addPendingEvent(new _DelayedError(error));
111 return;
112 }
113 _sendError(error);
114 _handlePendingEvents();
115 }
116
117 /**
118 * Send or enqueue a "done" message.
119 *
120 * The "done" message should be sent at most once by a stream, and it
121 * should be the last message sent.
122 */
123 void _close() {
124 if (_isClosed) throw new StateError("Sending on closed stream");
125 _state |= _STREAM_CLOSED;
126 if (!_canFireEvent) {
127 // You can't enqueue an event after the Done, so make it const.
128 _addPendingEvent(const _DelayedDone());
129 return;
130 }
131 _sendDone();
132 assert(!_hasPendingEvent);
133 }
134
135 // -------------------------------------------------------------------
136 // Internal implementation.
137
138 // State prediates.
139
140 /** Whether the stream has been closed (a done event requested). */
141 bool get _isClosed => (_state & _STREAM_CLOSED) != 0;
142
143 /** Whether the stream is completed. */
144 bool get _isComplete => (_state & _STREAM_COMPLETE) != 0;
145
146 /** Whether one or more active subscribers have requested a pause. */
147 bool get _isPaused => _state >= (1 << _STREAM_PAUSE_COUNT_SHIFT);
148
149 /** Check whether the pending event queue is non-empty */
150 bool get _hasPendingEvent =>
151 _pendingEvents != null && !_pendingEvents.isEmpty;
152
153 /** Whether we are currently firing an event. */
154 bool get _isFiring => (_state & _STREAM_FIRING) != 0;
155
156 int get _currentEventIdBit =>
157 (_state & _STREAM_EVENT_ID ) >> _STREAM_EVENT_ID_SHIFT;
158
159 /** Whether there is currently a subscriber on this [Stream]. */
160 bool get _hasSubscribers;
161
162 /** Whether the stream can fire a new event. */
163 bool get _canFireEvent => !_isFiring && !_isPaused && !_hasPendingEvent;
164
165 // State modification.
166
167 /** Record an increases in the number of times the listener has paused. */
168 void _incrementPauseCount(_StreamListener<T> listener) {
169 listener._incrementPauseCount();
170 _updatePauseCount(1);
171 }
172
173 /** Record a decrease in the number of times the listener has paused. */
174 void _decrementPauseCount(_StreamListener<T> listener) {
175 assert(_isPaused);
176 listener._decrementPauseCount();
177 _updatePauseCount(-1);
178 }
179
180 /** Update the stream's own pause count only. */
181 void _updatePauseCount(int by) {
182 _state += by << _STREAM_PAUSE_COUNT_SHIFT;
183 assert(_state >= 0);
184 }
185
186 void _setClosed() {
187 assert(!_isClosed);
188 _state |= _STREAM_CLOSED;
189 }
190
191 void _setComplete() {
192 assert(_isClosed);
193 _state = _state |_STREAM_COMPLETE;
194 }
195
196 void _startFiring() {
197 assert(!_isFiring);
198 // This sets the _STREAM_FIRING bit and toggles the _STREAM_EVENT_ID
199 // bit. All current subscribers will now have a _LISTENER_EVENT_ID
200 // that doesn't match _STREAM_EVENT_ID, and they will receive the
201 // event being fired.
202 _state ^= _STREAM_FIRING | _STREAM_EVENT_ID;
203 }
204
205 void _endFiring() {
206 assert(_isFiring);
207 _state ^= _STREAM_FIRING;
208 }
209
210 /**
211 * Record that a listener wants a pause from events.
212 *
213 * This methods is called from [_StreamListener.pause()].
214 * Subclasses can override this method, along with [isPaused] and
215 * [createSubscription], if they want to do a different handling of paused
216 * subscriptions, e.g., a filtering stream pausing its own source if all its
217 * subscribers are paused.
218 */
219 void _pause(_StreamListener<T> listener, Signal resumeSignal) {
220 assert(identical(listener._source, this));
221 if (!listener._isSubscribed) {
222 throw new StateError("Subscription has been canceled.");
223 }
224 assert(!_isComplete); // There can be no subscribers when complete.
225 bool wasPaused = _isPaused;
226 _incrementPauseCount(listener);
227 if (resumeSignal != null) {
228 resumeSignal.then(() { this._resume(listener, true); });
229 }
230 if (!wasPaused) {
231 _onPauseStateChange();
232 }
233 }
234
235 /** Stops pausing due to one request from the given listener. */
236 void _resume(_StreamListener<T> listener, bool fromEvent) {
237 if (!listener.isPaused) return;
238 assert(listener._isSubscribed);
239 assert(_isPaused);
240 _decrementPauseCount(listener);
241 if (!_isPaused) {
242 _onPauseStateChange();
243 if (_hasPendingEvent) {
244 // If we can fire events now, fire any pending events right away.
245 if (fromEvent && !_isFiring) {
246 _handlePendingEvents();
247 } else {
248 _pendingEvents.schedule(this);
249 }
250 }
251 }
252 }
253
254 /** Create a subscription object. Called by [subcribe]. */
255 _StreamSubscriptionImpl<T> _createSubscription(
256 void onData(T data),
257 void onError(AsyncError error),
258 void onDone(),
259 bool unsubscribeOnError);
260
261 /**
262 * Adds a listener to this stream.
263 */
264 void _addListener(_StreamSubscriptionImpl subscription);
265
266 /**
267 * Handle a cancel requested from a [_StreamSubscriptionImpl].
268 *
269 * This method is called from [_StreamSubscriptionImpl.cancel].
270 *
271 * If an event is currently firing, the cancel is delayed
272 * until after the subscribers have received the event.
273 */
274 void _cancel(_StreamSubscriptionImpl subscriber);
275
276 /**
277 * Iterate over all current subscribers and perform an action on each.
278 *
279 * Subscribers added during the iteration will not be visited.
280 * Subscribers unsubscribed during the iteration will only be removed
281 * after they have been acted on.
282 *
283 * Any change in the pause state is only reported after all subscribers have
284 * received the event.
285 *
286 * The [action] must not throw, or the controller will be left in an
287 * invalid state.
288 *
289 * This method must not be called while [isFiring] is true.
290 */
291 void _forEachSubscriber(void action(_StreamSubscriptionImpl<T> subscription));
292
293 /**
294 * Called when the first subscriber requests a pause or the last a resume.
295 *
296 * Read [isPaused] to see the new state.
297 */
298 void _onPauseStateChange() {}
299
300 /**
301 * Called when the first listener subscribes or the last unsubscribes.
302 *
303 * Read [hasSubscribers] to see what the new state is.
304 */
305 void _onSubscriptionStateChange() {}
306
307 /** Add a pending event at the end of the pending event queue. */
308 void _addPendingEvent(_DelayedEvent event) {
309 if (_pendingEvents == null) _pendingEvents = new _StreamImplEvents();
310 _pendingEvents.add(event);
311 }
312
313 /** Fire any pending events until the pending event queue. */
314 void _handlePendingEvents() {
315 _StreamImplEvents events = _pendingEvents;
316 if (events == null) return;
317 while (!events.isEmpty && !_isPaused) {
318 events.removeFirst().perform(this);
319 }
320 }
321
322 /**
323 * Send a data event directly to each subscriber.
324 */
325 _sendData(T value) {
326 assert(!_isPaused);
327 assert(!_isComplete);
328 _forEachSubscriber((subscriber) {
329 try {
330 subscriber._sendData(value);
331 } catch (e, s) {
332 new AsyncError(e, s).throwDelayed();
333 }
334 });
335 }
336
337 /**
338 * Sends an error event directly to each subscriber.
339 */
340 void _sendError(AsyncError error) {
341 assert(!_isPaused);
342 assert(!_isComplete);
343 _forEachSubscriber((subscriber) {
344 try {
345 subscriber._sendError(error);
346 } catch (e, s) {
347 new AsyncError.withCause(e, s, error).throwDelayed();
348 }
349 });
350 }
351
352 /**
353 * Sends the "done" message directly to each subscriber.
354 * This automatically stops further subscription and
355 * unsubscribes all subscribers.
356 */
357 void _sendDone() {
358 assert(!_isPaused);
359 assert(_isClosed);
360 _setComplete();
361 if (!_hasSubscribers) return;
362 _forEachSubscriber((subscriber) {
363 _cancel(subscriber);
364 try {
365 subscriber._sendDone();
366 } catch (e, s) {
367 new AsyncError(e, s).throwDelayed();
368 }
369 });
370 assert(!_hasSubscribers);
371 _onSubscriptionStateChange();
372 }
373 }
374
375 // -------------------------------------------------------------------
376 // Default implementation of a stream with a single subscriber.
377 // -------------------------------------------------------------------
378 /**
379 * Default implementation of stream capable of sending events to one subscriber.
380 *
381 * Any class needing to implement [Stream] can either directly extend this
382 * class, or extend [Stream] and delegate the subscribe method to an instance
383 * of this class.
384 *
385 * The only public methods are those of [Stream], so instances of
386 * [_SingleStreamImpl] can be returned directly as a [Stream] without exposing
387 * internal functionality.
388 *
389 * The [StreamController] is a public facing version of this class, with
390 * some methods made public.
391 *
392 * The user interface of [_SingleStreamImpl] are the following methods:
393 * * [_add]: Add a data event to the stream.
394 * * [_signalError]: Add an error event to the stream.
395 * * [_close]: Request to close the stream.
396 * * [_onSubscriberStateChange]: Called when receiving the first subscriber or
397 * when losing the last subscriber.
398 * * [_onPauseStateChange]: Called when entering or leaving paused mode.
399 * * [_hasSubscribers]: Test whether there are currently any subscribers.
400 * * [_isPaused]: Test whether the stream is currently paused.
401 * The user should not add new events while the stream is paused, but if it
402 * happens anyway, the stream will enqueue the events just as when new events
403 * arrive while still firing an old event.
404 */
405 class _SingleStreamImpl<T> extends _StreamImpl<T> {
406 _StreamSubscriptionImpl _subscriber = null;
407
408 /** Whether one or more active subscribers have requested a pause. */
409 bool get _isPaused => !_hasSubscribers || super._isPaused;
410
411 /** Whether there is currently a subscriber on this [Stream]. */
412 bool get _hasSubscribers => _subscriber != null;
413
414 // -------------------------------------------------------------------
415 // Internal implementation.
416
417 /**
418 * Create the new subscription object.
419 */
420 _StreamSubscriptionImpl<T> _createSubscription(
421 void onData(T data),
422 void onError(AsyncError error),
423 void onDone(),
424 bool unsubscribeOnError) {
425 return new _StreamSubscriptionImpl<T>(
426 this, onData, onError, onDone, unsubscribeOnError);
427 }
428
429 void _addListener(_StreamSubscriptionImpl subscription) {
430 if (_hasSubscribers) {
431 throw new StateError("Stream has already subscriber.");
432 }
433 _subscriber = subscription;
434 subscription._setSubscribed(0);
435 _onSubscriptionStateChange();
436 // TODO(floitsch): Should this be delayed?
437 _handlePendingEvents();
438 }
439
440 /**
441 * Handle a cancel requested from a [_StreamSubscriptionImpl].
442 *
443 * This method is called from [_StreamSubscriptionImpl.cancel].
444 *
445 * If an event is currently firing, the cancel is delayed
446 * until after the subscriber has received the event.
447 */
448 void _cancel(_StreamSubscriptionImpl subscriber) {
449 assert(identical(subscriber._source, this));
450 // We allow unsubscribing the currently firing subscription during
451 // the event firing, because it is indistinguishable from delaying it since
452 // that event has already received the event.
453 if (!identical(_subscriber, subscriber)) {
454 // You may unsubscribe more than once, only the first one counts.
455 return;
456 }
457 _subscriber = null;
458 int timesPaused = subscriber._setUnsubscribed();
459 _updatePauseCount(-timesPaused);
460 if (timesPaused > 0) {
461 _onPauseStateChange();
462 }
463 _onSubscriptionStateChange();
464 }
465
466 void _forEachSubscriber(
467 void action(_StreamSubscriptionImpl<T> subscription)) {
468 _StreamSubscriptionImpl subscription = _subscriber;
469 assert(subscription != null);
470 _startFiring();
471 action(subscription);
472 _endFiring();
473 }
474 }
475
476 // -------------------------------------------------------------------
477 // Default implementation of a stream with subscribers.
478 // -------------------------------------------------------------------
479
480 /**
481 * Default implementation of stream capable of sending events to subscribers.
482 *
483 * Any class needing to implement [Stream] can either directly extend this
484 * class, or extend [Stream] and delegate the subscribe method to an instance
485 * of this class.
486 *
487 * The only public methods are those of [Stream], so instances of
488 * [_MultiStreamImpl] can be returned directly as a [Stream] without exposing
489 * internal functionality.
490 *
491 * The [StreamController] is a public facing version of this class, with
492 * some methods made public.
493 *
494 * The user interface of [_MultiStreamImpl] are the following methods:
495 * * [_add]: Add a data event to the stream.
496 * * [_signalError]: Add an error event to the stream.
497 * * [_close]: Request to close the stream.
498 * * [_onSubscriptionStateChange]: Called when receiving the first subscriber or
499 * when losing the last subscriber.
500 * * [_onPauseStateChange]: Called when entering or leaving paused mode.
501 * * [_hasSubscribers]: Test whether there are currently any subscribers.
502 * * [_isPaused]: Test whether the stream is currently paused.
503 * The user should not add new events while the stream is paused, but if it
504 * happens anyway, the stream will enqueue the events just as when new events
505 * arrive while still firing an old event.
506 */
507 class _MultiStreamImpl<T> extends _StreamImpl<T>
508 implements _InternalLinkList {
509 // Link list implementation (mixin when possible).
510 _InternalLink _nextLink;
511 _InternalLink _previousLink;
512
513 _MultiStreamImpl() {
514 _nextLink = _previousLink = this;
515 }
516
517 // ------------------------------------------------------------------
518 // Helper functions that can be overridden in subclasses.
519
520 /** Whether there are currently any subscribers on this [Stream]. */
521 bool get _hasSubscribers => !_InternalLinkList.isEmpty(this);
522
523 /**
524 * Create the new subscription object.
525 */
526 _StreamListener<T> _createSubscription(
527 void onData(T data),
528 void onError(AsyncError error),
529 void onDone(),
530 bool unsubscribeOnError) {
531 return new _StreamSubscriptionImpl<T>(
532 this, onData, onError, onDone, unsubscribeOnError);
533 }
534
535 // -------------------------------------------------------------------
536 // Internal implementation.
537
538 /**
539 * Iterate over all current subscribers and perform an action on each.
540 *
541 * The set of subscribers cannot be modified during this iteration.
542 * All attempts to add or unsubscribe subscribers will be delayed until
543 * after the iteration is complete.
544 *
545 * The [action] must not throw, or the controller will be left in an
546 * invalid state.
547 *
548 * This method must not be called while [isFiring] is true.
549 */
550 void _forEachSubscriber(
551 void action(_StreamListener<T> subscription)) {
552 assert(!_isFiring);
553 if (!_hasSubscribers) return;
554 _startFiring();
555 _InternalLink cursor = this._nextLink;
556 while (!identical(cursor, this)) {
557 _StreamListener<T> current = cursor;
558 if (current._needsEvent(_currentEventIdBit)) {
559 action(current);
560 // Marks as having received the event.
561 current._toggleEventReceived();
562 }
563 cursor = current._nextLink;
564 if (current._isPendingUnsubscribe) {
565 _removeListener(current);
566 }
567 }
568 _endFiring();
569 if (_isPaused) _onPauseStateChange();
570 if (!_hasSubscribers) _onSubscriptionStateChange();
571 }
572
573 void _addListener(_StreamListener listener) {
574 listener._setSubscribed(_currentEventIdBit);
575 bool firstSubscriber = !_hasSubscribers;
576 _InternalLinkList.add(this, listener);
577 if (firstSubscriber) {
578 _onSubscriptionStateChange();
579 }
580 }
581
582 /**
583 * Handle a cancel requested from a [_StreamListener].
584 *
585 * This method is called from [_StreamListener.cancel].
586 *
587 * If an event is currently firing, the cancel is delayed
588 * until after the subscribers have received the event.
589 */
590 void _cancel(_StreamListener listener) {
591 assert(identical(listener._source, this));
592 if (_InternalLink.isUnlinked(listener)) {
593 // You may unsubscribe more than once, only the first one counts.
594 return;
595 }
596 if (_isFiring) {
597 if (listener._needsEvent(_currentEventIdBit)) {
598 assert(listener._isSubscribed);
599 listener._setPendingUnsubscribe();
600 } else {
601 // The listener has been notified of the event (or don't need to,
602 // if it's still pending subscription) so it's safe to remove it.
603 _removeListener(listener);
604 }
605 // Pause and subscription state changes are reported when we end
606 // firing.
607 } else {
608 bool wasPaused = _isPaused;
609 _removeListener(listener);
610 if (wasPaused != _isPaused) _onPauseStateChange();
611 if (!_hasSubscribers) _onSubscriptionStateChange();
612 }
613 }
614
615 /**
616 * Removes a listener from this stream and cancels its pauses.
617 *
618 * This is a low-level action that doesn't call [_onSubscriptionStateChange].
619 * or [_onPauseStateChange].
620 */
621 void _removeListener(_StreamListener listener) {
622 int pauseCount = listener._setUnsubscribed();
623 _updatePauseCount(-pauseCount);
624 _InternalLinkList.remove(listener);
625 }
626 }
627
628 /**
629 * The subscription class that the [StreamController] uses.
630 *
631 * The [StreamController.createSubscription] method should
632 * create an object of this type, or another subclass of [_StreamListener].
633 * A subclass of [StreamController] can specify which subclass
634 * of [_StreamSubscriptionImpl] it uses by overriding
635 * [StreamController.createSubscription].
636 *
637 * The subscription is in one of three states:
638 * * Subscribed.
639 * * Paused-and-subscribed.
640 * * Unsubscribed.
641 * Unsubscribing also unpauses.
642 */
643 class _StreamSubscriptionImpl<T> extends _StreamListener<T>
644 implements StreamSubscription<T> {
645 final bool _unsubscribeOnError;
646 _DataHandler _onData;
647 _ErrorHandler _onError;
648 _DoneHandler _onDone;
649 _StreamSubscriptionImpl(_StreamImpl source,
650 this._onData,
651 this._onError,
652 this._onDone,
653 this._unsubscribeOnError) : super(source);
654
655 void onData(void handleData(T event)) {
656 if (handleData == null) handleData = _nullDataHandler;
657 _onData = handleData;
658 }
659
660 void onError(void handleError(AsyncError error)) {
661 if (handleError == null) handleError = _nullErrorHandler;
662 _onError = handleError;
663 }
664
665 void onDone(void handleDone()) {
666 if (handleDone == null) handleDone = _nullDoneHandler;
667 _onDone = handleDone;
668 }
669
670 void _sendData(T data) {
671 _onData(data);
672 }
673
674 void _sendError(AsyncError error) {
675 _onError(error);
676 if (_unsubscribeOnError) _source._cancel(this);
677 }
678
679 void _sendDone() {
680 _onDone();
681 }
682
683 void cancel() {
684 _source._cancel(this);
685 }
686
687 void pause([Signal resumeSignal]) {
688 _source._pause(this, resumeSignal);
689 }
690
691 void resume() {
692 if (!isPaused) {
693 throw new StateError("Resuming unpaused subscription");
694 }
695 _source._resume(this, false);
696 }
697 }
698
699 // Internal helpers.
700
701 // Types of the different handlers on a stream. Types used to type fields.
702 typedef void _DataHandler<T>(T value);
703 typedef void _ErrorHandler(AsyncError error);
704 typedef void _DoneHandler();
705
706
707 /** Default data handler, does nothing. */
708 void _nullDataHandler(var value) {}
709
710 /** Default error handler, reports the error to the global handler. */
711 void _nullErrorHandler(AsyncError error) {
712 error.throwDelayed();
713 }
714
715 /** Default done handler, does nothing. */
716 void _nullDoneHandler() {}
717
718
719 /** A delayed event on a stream implementation. */
720 abstract class _DelayedEvent {
721 /** Added as a linked list on the [StreamController]. */
722 _DelayedEvent next;
723 /** Execute the delayed event on the [StreamController]. */
724 void perform(_StreamImpl stream);
725 }
726
727 /** A delayed data event. */
728 class _DelayedData<T> extends _DelayedEvent{
729 T value;
730 _DelayedData(this.value);
731 void perform(_StreamImpl<T> stream) {
732 stream._sendData(value);
733 }
734 }
735
736 /** A delayed error event. */
737 class _DelayedError extends _DelayedEvent {
738 AsyncError error;
739 _DelayedError(this.error);
740 void perform(_StreamImpl stream) {
741 stream._sendError(error);
742 }
743 }
744
745 /** A delayed done event. */
746 class _DelayedDone implements _DelayedEvent {
747 const _DelayedDone();
748 void perform(_StreamImpl stream) {
749 stream._sendDone();
750 }
751
752 _DelayedEvent get next => null;
753
754 void set next(_DelayedEvent _) {
755 throw new StateError("No events after a done.");
756 }
757 }
758
759 /**
760 * Simple internal doubly-linked list implementation.
761 *
762 * In an internal linked list, the links are in the data objects themselves,
763 * instead of in a separate object. That means each element can be in at most
764 * one list at a time.
765 *
766 * All links are always members of an element cycle. At creation it's a
767 * singleton cycle.
768 */
769 abstract class _InternalLink {
770 _InternalLink _nextLink;
771 _InternalLink _previousLink;
772
773 _InternalLink() {
774 this._previousLink = this._nextLink = this;
775 }
776
777 /* Removes a link from any list it may be part of, and links it to itself. */
778 static void unlink(_InternalLink element) {
779 _InternalLink next = element._nextLink;
780 _InternalLink previous = element._previousLink;
781 next._previousLink = previous;
782 previous._nextLink = next;
783 element._nextLink = element._previousLink = element;
784 }
785
786 /** Check whether an element is unattached to other elements. */
787 static bool isUnlinked(_InternalLink element) {
788 return identical(element, element._nextLink);
789 }
790 }
791
792 /**
793 * Marker interface for "list" links.
794 *
795 * An "InternalLinkList" is an abstraction on top of a link cycle, where the
796 * "list" object itself is not considered an element (it's just a header link
797 * created to avoid edge cases).
798 * An element is considered part of a list if it is in the list's cycle.
799 * There should never be more than one "list" object in a cycle.
800 */
801 abstract class _InternalLinkList extends _InternalLink {
802 /**
803 * Adds an element to a list, just before the header link.
804 *
805 * This effectively adds it at the end of the list.
806 */
807 static void add(_InternalLinkList list, _InternalLink element) {
808 if (!_InternalLink.isUnlinked(element)) _InternalLink.unlink(element);
809 _InternalLink listEnd = list._previousLink;
810 listEnd._nextLink = element;
811 list._previousLink = element;
812 element._previousLink = listEnd;
813 element._nextLink = list;
814 }
815
816 /** Removes an element from its list. */
817 static void remove(_InternalLink element) {
818 _InternalLink.unlink(element);
819 }
820
821 /** Check whether a list contains no elements, only the header link. */
822 static bool isEmpty(_InternalLinkList list) => _InternalLink.isUnlinked(list);
823
824 /** Moves all elements from the list [other] to [list]. */
825 static void addAll(_InternalLinkList list, _InternalLinkList other) {
826 if (isEmpty(other)) return;
827 _InternalLink listLast = list._previousLink;
828 _InternalLink otherNext = other._nextLink;
829 listLast._nextLink = otherNext;
830 otherNext._previousLink = listLast;
831 _InternalLink otherLast = other._previousLink;
832 list._previousLink = otherLast;
833 otherLast._nextLink = list;
834 // Clean up [other].
835 other._nextLink = other._previousLink = other;
836 }
837 }
838
839 abstract class _StreamListener<T> extends _InternalLink {
840 final _StreamImpl _source;
841 int _state = _LISTENER_UNSUBSCRIBED;
842
843 _StreamListener(this._source);
844
845 bool get isPaused => _state >= (1 << _LISTENER_PAUSE_COUNT_SHIFT);
846
847 bool get _isPendingUnsubscribe =>
848 (_state & _LISTENER_PENDING_UNSUBSCRIBE) != 0;
849
850 bool get _isSubscribed => (_state & _LISTENER_SUBSCRIBED) != 0;
851
852 /**
853 * Whether the listener still needs to receive the currently firing event.
854 *
855 * The currently firing event is identified by a single bit, which alternates
856 * between events. The [_state] contains the previously sent event's bit in
857 * the [_LISTENER_EVENT_ID] bit. If the two don't match, this listener
858 * still need the current event.
859 */
860 bool _needsEvent(int currentEventIdBit) {
861 int lastEventIdBit =
862 (_state & _LISTENER_EVENT_ID) >> _LISTENER_EVENT_ID_SHIFT;
863 return lastEventIdBit != currentEventIdBit;
864 }
865
866 /// If a subscriber's "firing bit" doesn't match the stream's firing bit,
867 /// we are currently firing an event and the subscriber still need to receive
868 /// the event.
869 void _toggleEventReceived() {
870 _state ^= _LISTENER_EVENT_ID;
871 }
872
873 void _setSubscribed(int eventIdBit) {
874 assert(eventIdBit == 0 || eventIdBit == 1);
875 _state = _LISTENER_SUBSCRIBED | (eventIdBit << _LISTENER_EVENT_ID_SHIFT);
876 }
877
878 void _setPendingUnsubscribe() {
879 assert(_isSubscribed);
880 _state |= _LISTENER_PENDING_UNSUBSCRIBE;
881 }
882
883 /**
884 * Marks the listener as unsubscibed.
885 *
886 * Returns the number of unresumed pauses for the listener.
887 */
888 int _setUnsubscribed() {
889 assert(_isSubscribed);
890 int timesPaused = _state >> _LISTENER_PAUSE_COUNT_SHIFT;
891 _state = _LISTENER_UNSUBSCRIBED;
892 return timesPaused;
893 }
894
895 void _incrementPauseCount() {
896 _state += 1 << _LISTENER_PAUSE_COUNT_SHIFT;
897 }
898
899 void _decrementPauseCount() {
900 assert(isPaused);
901 _state -= 1 << _LISTENER_PAUSE_COUNT_SHIFT;
902 }
903
904 _sendData(T data);
905 _sendError(AsyncError error);
906 _sendDone();
907 }
908
909 /** Class holding pending events for a [_StreamImpl]. */
910 class _StreamImplEvents {
911 /// Single linked list of [_DelayedEvent] objects.
912 _DelayedEvent firstPendingEvent = null;
913 /// Last element in the list of pending events. New events are added after it.
914 _DelayedEvent lastPendingEvent = null;
915 /**
916 * Timer set when pending events are scheduled for execution.
917 *
918 * When scheduling pending events for execution in a later cycle, the timer
919 * is stored here. If pending events are executed earlier than that, e.g.,
920 * due to a second event in the current cycle, the timer is canceled again.
921 */
922 Timer scheduleTimer = null;
923
924 bool get isEmpty => lastPendingEvent == null;
925
926 bool get isScheduled => scheduleTimer != null;
927
928 void schedule(_StreamImpl stream) {
929 if (isScheduled) return;
930 scheduleTimer = new Timer(0, (_) {
931 scheduleTimer = null;
932 stream._handlePendingEvents();
933 });
934 }
935
936 void cancelSchedule() {
937 assert(isScheduled);
938 scheduleTimer.cancel();
939 scheduleTimer = null;
940 }
941
942 void add(_DelayedEvent event) {
943 if (lastPendingEvent == null) {
944 firstPendingEvent = lastPendingEvent = event;
945 } else {
946 lastPendingEvent = lastPendingEvent.next = event;
947 }
948 }
949
950 _DelayedEvent removeFirst() {
951 if (isScheduled) cancelSchedule();
952 _DelayedEvent event = firstPendingEvent;
953 firstPendingEvent = event.next;
954 if (firstPendingEvent == null) {
955 lastPendingEvent = null;
956 }
957 return event;
958 }
959 }
960
961
962 class _DoneSubscription<T> implements StreamSubscription<T> {
963 _DoneHandler _handler;
964 Timer _timer;
965 int _pauseCount = 0;
966
967 _DoneSubscription(this._handler) {
968 _delayDone();
969 }
970
971 void _delayDone() {
972 assert(_timer == null && _pauseCount == 0);
973 _timer = new Timer(0, (_) {
974 if (_handler != null) _handler();
975 });
976 }
977
978 bool get _isComplete => _timer == null && _pauseCount == 0;
979
980 void onData(void handleAction(T value)) {}
981 void onError(void handleError(StateError error)) {}
982 void onDone(void handleDone(T value)) {
983 _handler = handleDone;
984 }
985
986 void pause([Signal signal]) {
987 if (_isComplete) {
988 throw new StateError("Subscription has been canceled.");
989 }
990 if (_timer != null) _timer.cancel();
991 _pauseCount++;
992 }
993
994 void resume() {
995 if (_isComplete) {
996 throw new StateError("Subscription has been canceled.");
997 }
998 if (_pauseCount == 0) return;
999 _pauseCount--;
1000 if (_pauseCount == 0) {
1001 _delayDone();
1002 }
1003 }
1004
1005 bool get isPaused => _pauseCount > 0;
1006
1007 void cancel() {
1008 if (_isComplete) {
1009 throw new StateError("Subscription has been canceled.");
1010 }
1011 if (_timer != null) {
1012 _timer.cancel();
1013 _timer = null;
1014 }
1015 _pauseCount = 0;
1016 }
1017 }
OLDNEW
« no previous file with comments | « sdk/lib/async/stream_controller.dart ('k') | sdk/lib/async/stream_pipe.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698