Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(947)

Side by Side Diff: sdk/lib/async/stream_impl.dart

Issue 14251013: Rename unsubscribeOnError to cancelOnError. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 7 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // States shared by single/multi stream implementations. 7 // States shared by single/multi stream implementations.
8 8
9 // Completion state of the stream. 9 // Completion state of the stream.
10 /// Initial and default state where the stream can receive and send events. 10 /// Initial and default state where the stream can receive and send events.
(...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after
72 * Also supports scheduling the events for later execution. 72 * Also supports scheduling the events for later execution.
73 */ 73 */
74 _PendingEvents _pendingEvents; 74 _PendingEvents _pendingEvents;
75 75
76 // ------------------------------------------------------------------ 76 // ------------------------------------------------------------------
77 // Stream interface. 77 // Stream interface.
78 78
79 StreamSubscription<T> listen(void onData(T data), 79 StreamSubscription<T> listen(void onData(T data),
80 { void onError(AsyncError error), 80 { void onError(AsyncError error),
81 void onDone(), 81 void onDone(),
82 bool unsubscribeOnError }) { 82 bool cancelOnError }) {
83 if (_isComplete) { 83 if (_isComplete) {
84 return new _DoneSubscription(onDone); 84 return new _DoneSubscription(onDone);
85 } 85 }
86 if (onData == null) onData = _nullDataHandler; 86 if (onData == null) onData = _nullDataHandler;
87 if (onError == null) onError = _nullErrorHandler; 87 if (onError == null) onError = _nullErrorHandler;
88 if (onDone == null) onDone = _nullDoneHandler; 88 if (onDone == null) onDone = _nullDoneHandler;
89 unsubscribeOnError = identical(true, unsubscribeOnError); 89 cancelOnError = identical(true, cancelOnError);
90 _StreamSubscriptionImpl subscription = 90 _StreamSubscriptionImpl subscription =
91 _createSubscription(onData, onError, onDone, unsubscribeOnError); 91 _createSubscription(onData, onError, onDone, cancelOnError);
92 _addListener(subscription); 92 _addListener(subscription);
93 return subscription; 93 return subscription;
94 } 94 }
95 95
96 // ------------------------------------------------------------------ 96 // ------------------------------------------------------------------
97 // EventSink interface-like methods for sending events into the stream. 97 // EventSink interface-like methods for sending events into the stream.
98 // It's the responsibility of the caller to ensure that the stream is not 98 // It's the responsibility of the caller to ensure that the stream is not
99 // paused when adding events. If the stream is paused, the events will be 99 // paused when adding events. If the stream is paused, the events will be
100 // queued, but it's better to not send events at all. 100 // queued, but it's better to not send events at all.
101 101
(...skipping 267 matching lines...) Expand 10 before | Expand all | Expand 10 after
369 void _schedulePendingEvents() { 369 void _schedulePendingEvents() {
370 assert(_hasPendingEvent); 370 assert(_hasPendingEvent);
371 _pendingEvents.schedule(this); 371 _pendingEvents.schedule(this);
372 } 372 }
373 373
374 /** Create a subscription object. Called by [subcribe]. */ 374 /** Create a subscription object. Called by [subcribe]. */
375 _StreamSubscriptionImpl<T> _createSubscription( 375 _StreamSubscriptionImpl<T> _createSubscription(
376 void onData(T data), 376 void onData(T data),
377 void onError(AsyncError error), 377 void onError(AsyncError error),
378 void onDone(), 378 void onDone(),
379 bool unsubscribeOnError); 379 bool cancelOnError);
380 380
381 /** 381 /**
382 * Adds a listener to this stream. 382 * Adds a listener to this stream.
383 */ 383 */
384 void _addListener(_StreamSubscriptionImpl subscription); 384 void _addListener(_StreamSubscriptionImpl subscription);
385 385
386 /** 386 /**
387 * Handle a cancel requested from a [_StreamSubscriptionImpl]. 387 * Handle a cancel requested from a [_StreamSubscriptionImpl].
388 * 388 *
389 * This method is called from [_StreamSubscriptionImpl.cancel]. 389 * This method is called from [_StreamSubscriptionImpl.cancel].
(...skipping 197 matching lines...) Expand 10 before | Expand all | Expand 10 after
587 _updatePauseCount(1); 587 _updatePauseCount(1);
588 } 588 }
589 589
590 /** 590 /**
591 * Create the new subscription object. 591 * Create the new subscription object.
592 */ 592 */
593 _StreamSubscriptionImpl<T> _createSubscription( 593 _StreamSubscriptionImpl<T> _createSubscription(
594 void onData(T data), 594 void onData(T data),
595 void onError(AsyncError error), 595 void onError(AsyncError error),
596 void onDone(), 596 void onDone(),
597 bool unsubscribeOnError) { 597 bool cancelOnError) {
598 return new _StreamSubscriptionImpl<T>( 598 return new _StreamSubscriptionImpl<T>(
599 this, onData, onError, onDone, unsubscribeOnError); 599 this, onData, onError, onDone, cancelOnError);
600 } 600 }
601 601
602 void _addListener(_StreamListener subscription) { 602 void _addListener(_StreamListener subscription) {
603 assert(!_isComplete); 603 assert(!_isComplete);
604 if (_hasListener) { 604 if (_hasListener) {
605 throw new StateError("Stream already has subscriber."); 605 throw new StateError("Stream already has subscriber.");
606 } 606 }
607 assert(_pauseCount == 1); 607 assert(_pauseCount == 1);
608 _updatePauseCount(-1); 608 _updatePauseCount(-1);
609 _subscriber = subscription; 609 _subscriber = subscription;
(...skipping 97 matching lines...) Expand 10 before | Expand all | Expand 10 after
707 /** Whether there are currently any subscribers on this [Stream]. */ 707 /** Whether there are currently any subscribers on this [Stream]. */
708 bool get _hasListener => !_InternalLinkList.isEmpty(this); 708 bool get _hasListener => !_InternalLinkList.isEmpty(this);
709 709
710 /** 710 /**
711 * Create the new subscription object. 711 * Create the new subscription object.
712 */ 712 */
713 _StreamListener<T> _createSubscription( 713 _StreamListener<T> _createSubscription(
714 void onData(T data), 714 void onData(T data),
715 void onError(AsyncError error), 715 void onError(AsyncError error),
716 void onDone(), 716 void onDone(),
717 bool unsubscribeOnError) { 717 bool cancelOnError) {
718 return new _StreamSubscriptionImpl<T>( 718 return new _StreamSubscriptionImpl<T>(
719 this, onData, onError, onDone, unsubscribeOnError); 719 this, onData, onError, onDone, cancelOnError);
720 } 720 }
721 721
722 // ------------------------------------------------------------------- 722 // -------------------------------------------------------------------
723 // Internal implementation. 723 // Internal implementation.
724 724
725 /** 725 /**
726 * Iterate over all current subscribers and perform an action on each. 726 * Iterate over all current subscribers and perform an action on each.
727 * 727 *
728 * The set of subscribers cannot be modified during this iteration. 728 * The set of subscribers cannot be modified during this iteration.
729 * All attempts to add or unsubscribe subscribers will be delayed until 729 * All attempts to add or unsubscribe subscribers will be delayed until
(...skipping 164 matching lines...) Expand 10 before | Expand all | Expand 10 after
894 * [_StreamImpl.createSubscription]. 894 * [_StreamImpl.createSubscription].
895 * 895 *
896 * The subscription is in one of three states: 896 * The subscription is in one of three states:
897 * * Subscribed. 897 * * Subscribed.
898 * * Paused-and-subscribed. 898 * * Paused-and-subscribed.
899 * * Unsubscribed. 899 * * Unsubscribed.
900 * Unsubscribing also resumes any pauses started by the subscription. 900 * Unsubscribing also resumes any pauses started by the subscription.
901 */ 901 */
902 class _StreamSubscriptionImpl<T> extends _StreamListener<T> 902 class _StreamSubscriptionImpl<T> extends _StreamListener<T>
903 implements StreamSubscription<T> { 903 implements StreamSubscription<T> {
904 final bool _unsubscribeOnError; 904 final bool _cancelOnError;
905 // TODO(ahe): Restore type when feature is implemented in dart2js 905 // TODO(ahe): Restore type when feature is implemented in dart2js
906 // checked mode. http://dartbug.com/7733 906 // checked mode. http://dartbug.com/7733
907 var /* _DataHandler<T> */ _onData; 907 var /* _DataHandler<T> */ _onData;
908 _ErrorHandler _onError; 908 _ErrorHandler _onError;
909 _DoneHandler _onDone; 909 _DoneHandler _onDone;
910 _StreamSubscriptionImpl(_StreamImpl source, 910 _StreamSubscriptionImpl(_StreamImpl source,
911 this._onData, 911 this._onData,
912 this._onError, 912 this._onError,
913 this._onDone, 913 this._onDone,
914 this._unsubscribeOnError) : super(source); 914 this._cancelOnError) : super(source);
915 915
916 void onData(void handleData(T event)) { 916 void onData(void handleData(T event)) {
917 if (handleData == null) handleData = _nullDataHandler; 917 if (handleData == null) handleData = _nullDataHandler;
918 _onData = handleData; 918 _onData = handleData;
919 } 919 }
920 920
921 void onError(void handleError(AsyncError error)) { 921 void onError(void handleError(AsyncError error)) {
922 if (handleError == null) handleError = _nullErrorHandler; 922 if (handleError == null) handleError = _nullErrorHandler;
923 _onError = handleError; 923 _onError = handleError;
924 } 924 }
925 925
926 void onDone(void handleDone()) { 926 void onDone(void handleDone()) {
927 if (handleDone == null) handleDone = _nullDoneHandler; 927 if (handleDone == null) handleDone = _nullDoneHandler;
928 _onDone = handleDone; 928 _onDone = handleDone;
929 } 929 }
930 930
931 void _sendData(T data) { 931 void _sendData(T data) {
932 _onData(data); 932 _onData(data);
933 } 933 }
934 934
935 void _sendError(AsyncError error) { 935 void _sendError(AsyncError error) {
936 _onError(error); 936 _onError(error);
937 if (_unsubscribeOnError) _source._cancel(this); 937 if (_cancelOnError) _source._cancel(this);
938 } 938 }
939 939
940 void _sendDone() { 940 void _sendDone() {
941 _onDone(); 941 _onDone();
942 } 942 }
943 943
944 void cancel() { 944 void cancel() {
945 if (!_isSubscribed) return; 945 if (!_isSubscribed) return;
946 _source._cancel(this); 946 _source._cancel(this);
947 } 947 }
(...skipping 408 matching lines...) Expand 10 before | Expand all | Expand 10 after
1356 onError: this._addError, 1356 onError: this._addError,
1357 onDone: this._close); 1357 onDone: this._close);
1358 } else { 1358 } else {
1359 // TODO(lrn): Check why this can happen. 1359 // TODO(lrn): Check why this can happen.
1360 if (_subscription == null) return; 1360 if (_subscription == null) return;
1361 _subscription.cancel(); 1361 _subscription.cancel();
1362 _subscription = null; 1362 _subscription = null;
1363 } 1363 }
1364 } 1364 }
1365 } 1365 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698