OLD | NEW |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 // States shared by single/multi stream implementations. | 7 // States shared by single/multi stream implementations. |
8 | 8 |
9 // Completion state of the stream. | 9 // Completion state of the stream. |
10 /// Initial and default state where the stream can receive and send events. | 10 /// Initial and default state where the stream can receive and send events. |
(...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
72 * Also supports scheduling the events for later execution. | 72 * Also supports scheduling the events for later execution. |
73 */ | 73 */ |
74 _PendingEvents _pendingEvents; | 74 _PendingEvents _pendingEvents; |
75 | 75 |
76 // ------------------------------------------------------------------ | 76 // ------------------------------------------------------------------ |
77 // Stream interface. | 77 // Stream interface. |
78 | 78 |
79 StreamSubscription<T> listen(void onData(T data), | 79 StreamSubscription<T> listen(void onData(T data), |
80 { void onError(AsyncError error), | 80 { void onError(AsyncError error), |
81 void onDone(), | 81 void onDone(), |
82 bool unsubscribeOnError }) { | 82 bool cancelOnError }) { |
83 if (_isComplete) { | 83 if (_isComplete) { |
84 return new _DoneSubscription(onDone); | 84 return new _DoneSubscription(onDone); |
85 } | 85 } |
86 if (onData == null) onData = _nullDataHandler; | 86 if (onData == null) onData = _nullDataHandler; |
87 if (onError == null) onError = _nullErrorHandler; | 87 if (onError == null) onError = _nullErrorHandler; |
88 if (onDone == null) onDone = _nullDoneHandler; | 88 if (onDone == null) onDone = _nullDoneHandler; |
89 unsubscribeOnError = identical(true, unsubscribeOnError); | 89 cancelOnError = identical(true, cancelOnError); |
90 _StreamSubscriptionImpl subscription = | 90 _StreamSubscriptionImpl subscription = |
91 _createSubscription(onData, onError, onDone, unsubscribeOnError); | 91 _createSubscription(onData, onError, onDone, cancelOnError); |
92 _addListener(subscription); | 92 _addListener(subscription); |
93 return subscription; | 93 return subscription; |
94 } | 94 } |
95 | 95 |
96 // ------------------------------------------------------------------ | 96 // ------------------------------------------------------------------ |
97 // EventSink interface-like methods for sending events into the stream. | 97 // EventSink interface-like methods for sending events into the stream. |
98 // It's the responsibility of the caller to ensure that the stream is not | 98 // It's the responsibility of the caller to ensure that the stream is not |
99 // paused when adding events. If the stream is paused, the events will be | 99 // paused when adding events. If the stream is paused, the events will be |
100 // queued, but it's better to not send events at all. | 100 // queued, but it's better to not send events at all. |
101 | 101 |
(...skipping 267 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
369 void _schedulePendingEvents() { | 369 void _schedulePendingEvents() { |
370 assert(_hasPendingEvent); | 370 assert(_hasPendingEvent); |
371 _pendingEvents.schedule(this); | 371 _pendingEvents.schedule(this); |
372 } | 372 } |
373 | 373 |
374 /** Create a subscription object. Called by [subcribe]. */ | 374 /** Create a subscription object. Called by [subcribe]. */ |
375 _StreamSubscriptionImpl<T> _createSubscription( | 375 _StreamSubscriptionImpl<T> _createSubscription( |
376 void onData(T data), | 376 void onData(T data), |
377 void onError(AsyncError error), | 377 void onError(AsyncError error), |
378 void onDone(), | 378 void onDone(), |
379 bool unsubscribeOnError); | 379 bool cancelOnError); |
380 | 380 |
381 /** | 381 /** |
382 * Adds a listener to this stream. | 382 * Adds a listener to this stream. |
383 */ | 383 */ |
384 void _addListener(_StreamSubscriptionImpl subscription); | 384 void _addListener(_StreamSubscriptionImpl subscription); |
385 | 385 |
386 /** | 386 /** |
387 * Handle a cancel requested from a [_StreamSubscriptionImpl]. | 387 * Handle a cancel requested from a [_StreamSubscriptionImpl]. |
388 * | 388 * |
389 * This method is called from [_StreamSubscriptionImpl.cancel]. | 389 * This method is called from [_StreamSubscriptionImpl.cancel]. |
(...skipping 197 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
587 _updatePauseCount(1); | 587 _updatePauseCount(1); |
588 } | 588 } |
589 | 589 |
590 /** | 590 /** |
591 * Create the new subscription object. | 591 * Create the new subscription object. |
592 */ | 592 */ |
593 _StreamSubscriptionImpl<T> _createSubscription( | 593 _StreamSubscriptionImpl<T> _createSubscription( |
594 void onData(T data), | 594 void onData(T data), |
595 void onError(AsyncError error), | 595 void onError(AsyncError error), |
596 void onDone(), | 596 void onDone(), |
597 bool unsubscribeOnError) { | 597 bool cancelOnError) { |
598 return new _StreamSubscriptionImpl<T>( | 598 return new _StreamSubscriptionImpl<T>( |
599 this, onData, onError, onDone, unsubscribeOnError); | 599 this, onData, onError, onDone, cancelOnError); |
600 } | 600 } |
601 | 601 |
602 void _addListener(_StreamListener subscription) { | 602 void _addListener(_StreamListener subscription) { |
603 assert(!_isComplete); | 603 assert(!_isComplete); |
604 if (_hasListener) { | 604 if (_hasListener) { |
605 throw new StateError("Stream already has subscriber."); | 605 throw new StateError("Stream already has subscriber."); |
606 } | 606 } |
607 assert(_pauseCount == 1); | 607 assert(_pauseCount == 1); |
608 _updatePauseCount(-1); | 608 _updatePauseCount(-1); |
609 _subscriber = subscription; | 609 _subscriber = subscription; |
(...skipping 97 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
707 /** Whether there are currently any subscribers on this [Stream]. */ | 707 /** Whether there are currently any subscribers on this [Stream]. */ |
708 bool get _hasListener => !_InternalLinkList.isEmpty(this); | 708 bool get _hasListener => !_InternalLinkList.isEmpty(this); |
709 | 709 |
710 /** | 710 /** |
711 * Create the new subscription object. | 711 * Create the new subscription object. |
712 */ | 712 */ |
713 _StreamListener<T> _createSubscription( | 713 _StreamListener<T> _createSubscription( |
714 void onData(T data), | 714 void onData(T data), |
715 void onError(AsyncError error), | 715 void onError(AsyncError error), |
716 void onDone(), | 716 void onDone(), |
717 bool unsubscribeOnError) { | 717 bool cancelOnError) { |
718 return new _StreamSubscriptionImpl<T>( | 718 return new _StreamSubscriptionImpl<T>( |
719 this, onData, onError, onDone, unsubscribeOnError); | 719 this, onData, onError, onDone, cancelOnError); |
720 } | 720 } |
721 | 721 |
722 // ------------------------------------------------------------------- | 722 // ------------------------------------------------------------------- |
723 // Internal implementation. | 723 // Internal implementation. |
724 | 724 |
725 /** | 725 /** |
726 * Iterate over all current subscribers and perform an action on each. | 726 * Iterate over all current subscribers and perform an action on each. |
727 * | 727 * |
728 * The set of subscribers cannot be modified during this iteration. | 728 * The set of subscribers cannot be modified during this iteration. |
729 * All attempts to add or unsubscribe subscribers will be delayed until | 729 * All attempts to add or unsubscribe subscribers will be delayed until |
(...skipping 164 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
894 * [_StreamImpl.createSubscription]. | 894 * [_StreamImpl.createSubscription]. |
895 * | 895 * |
896 * The subscription is in one of three states: | 896 * The subscription is in one of three states: |
897 * * Subscribed. | 897 * * Subscribed. |
898 * * Paused-and-subscribed. | 898 * * Paused-and-subscribed. |
899 * * Unsubscribed. | 899 * * Unsubscribed. |
900 * Unsubscribing also resumes any pauses started by the subscription. | 900 * Unsubscribing also resumes any pauses started by the subscription. |
901 */ | 901 */ |
902 class _StreamSubscriptionImpl<T> extends _StreamListener<T> | 902 class _StreamSubscriptionImpl<T> extends _StreamListener<T> |
903 implements StreamSubscription<T> { | 903 implements StreamSubscription<T> { |
904 final bool _unsubscribeOnError; | 904 final bool _cancelOnError; |
905 // TODO(ahe): Restore type when feature is implemented in dart2js | 905 // TODO(ahe): Restore type when feature is implemented in dart2js |
906 // checked mode. http://dartbug.com/7733 | 906 // checked mode. http://dartbug.com/7733 |
907 var /* _DataHandler<T> */ _onData; | 907 var /* _DataHandler<T> */ _onData; |
908 _ErrorHandler _onError; | 908 _ErrorHandler _onError; |
909 _DoneHandler _onDone; | 909 _DoneHandler _onDone; |
910 _StreamSubscriptionImpl(_StreamImpl source, | 910 _StreamSubscriptionImpl(_StreamImpl source, |
911 this._onData, | 911 this._onData, |
912 this._onError, | 912 this._onError, |
913 this._onDone, | 913 this._onDone, |
914 this._unsubscribeOnError) : super(source); | 914 this._cancelOnError) : super(source); |
915 | 915 |
916 void onData(void handleData(T event)) { | 916 void onData(void handleData(T event)) { |
917 if (handleData == null) handleData = _nullDataHandler; | 917 if (handleData == null) handleData = _nullDataHandler; |
918 _onData = handleData; | 918 _onData = handleData; |
919 } | 919 } |
920 | 920 |
921 void onError(void handleError(AsyncError error)) { | 921 void onError(void handleError(AsyncError error)) { |
922 if (handleError == null) handleError = _nullErrorHandler; | 922 if (handleError == null) handleError = _nullErrorHandler; |
923 _onError = handleError; | 923 _onError = handleError; |
924 } | 924 } |
925 | 925 |
926 void onDone(void handleDone()) { | 926 void onDone(void handleDone()) { |
927 if (handleDone == null) handleDone = _nullDoneHandler; | 927 if (handleDone == null) handleDone = _nullDoneHandler; |
928 _onDone = handleDone; | 928 _onDone = handleDone; |
929 } | 929 } |
930 | 930 |
931 void _sendData(T data) { | 931 void _sendData(T data) { |
932 _onData(data); | 932 _onData(data); |
933 } | 933 } |
934 | 934 |
935 void _sendError(AsyncError error) { | 935 void _sendError(AsyncError error) { |
936 _onError(error); | 936 _onError(error); |
937 if (_unsubscribeOnError) _source._cancel(this); | 937 if (_cancelOnError) _source._cancel(this); |
938 } | 938 } |
939 | 939 |
940 void _sendDone() { | 940 void _sendDone() { |
941 _onDone(); | 941 _onDone(); |
942 } | 942 } |
943 | 943 |
944 void cancel() { | 944 void cancel() { |
945 if (!_isSubscribed) return; | 945 if (!_isSubscribed) return; |
946 _source._cancel(this); | 946 _source._cancel(this); |
947 } | 947 } |
(...skipping 408 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1356 onError: this._addError, | 1356 onError: this._addError, |
1357 onDone: this._close); | 1357 onDone: this._close); |
1358 } else { | 1358 } else { |
1359 // TODO(lrn): Check why this can happen. | 1359 // TODO(lrn): Check why this can happen. |
1360 if (_subscription == null) return; | 1360 if (_subscription == null) return; |
1361 _subscription.cancel(); | 1361 _subscription.cancel(); |
1362 _subscription = null; | 1362 _subscription = null; |
1363 } | 1363 } |
1364 } | 1364 } |
1365 } | 1365 } |
OLD | NEW |