Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(380)

Side by Side Diff: sdk/lib/async/stream_impl.dart

Issue 12610006: Renamed StreamSink to EventSink. Renamed signalError to addError. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Changed inheritance back! Now create StreamSink instead of EventSink where we create them. Created 7 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // States shared by single/multi stream implementations. 7 // States shared by single/multi stream implementations.
8 8
9 // Completion state of the stream. 9 // Completion state of the stream.
10 /// Initial and default state where the stream can receive and send events. 10 /// Initial and default state where the stream can receive and send events.
(...skipping 48 matching lines...) Expand 10 before | Expand all | Expand 10 after
59 // ------------------------------------------------------------------- 59 // -------------------------------------------------------------------
60 // Common base class for single and multi-subscription streams. 60 // Common base class for single and multi-subscription streams.
61 // ------------------------------------------------------------------- 61 // -------------------------------------------------------------------
62 abstract class _StreamImpl<T> extends Stream<T> { 62 abstract class _StreamImpl<T> extends Stream<T> {
63 /** Current state of the stream. */ 63 /** Current state of the stream. */
64 int _state = _STREAM_OPEN; 64 int _state = _STREAM_OPEN;
65 65
66 /** 66 /**
67 * List of pending events. 67 * List of pending events.
68 * 68 *
69 * If events are added to the stream (using [_add], [_signalError] or [_done]) 69 * If events are added to the stream (using [_add], [_addError] or [_done])
70 * while the stream is paused, or while another event is firing, events will 70 * while the stream is paused, or while another event is firing, events will
71 * stored here. 71 * stored here.
72 * Also supports scheduling the events for later execution. 72 * Also supports scheduling the events for later execution.
73 */ 73 */
74 _PendingEvents _pendingEvents; 74 _PendingEvents _pendingEvents;
75 75
76 // ------------------------------------------------------------------ 76 // ------------------------------------------------------------------
77 // Stream interface. 77 // Stream interface.
78 78
79 StreamSubscription<T> listen(void onData(T data), 79 StreamSubscription<T> listen(void onData(T data),
80 { void onError(AsyncError error), 80 { void onError(AsyncError error),
81 void onDone(), 81 void onDone(),
82 bool unsubscribeOnError }) { 82 bool unsubscribeOnError }) {
83 if (_isComplete) { 83 if (_isComplete) {
84 return new _DoneSubscription(onDone); 84 return new _DoneSubscription(onDone);
85 } 85 }
86 if (onData == null) onData = _nullDataHandler; 86 if (onData == null) onData = _nullDataHandler;
87 if (onError == null) onError = _nullErrorHandler; 87 if (onError == null) onError = _nullErrorHandler;
88 if (onDone == null) onDone = _nullDoneHandler; 88 if (onDone == null) onDone = _nullDoneHandler;
89 unsubscribeOnError = identical(true, unsubscribeOnError); 89 unsubscribeOnError = identical(true, unsubscribeOnError);
90 _StreamSubscriptionImpl subscription = 90 _StreamSubscriptionImpl subscription =
91 _createSubscription(onData, onError, onDone, unsubscribeOnError); 91 _createSubscription(onData, onError, onDone, unsubscribeOnError);
92 _addListener(subscription); 92 _addListener(subscription);
93 return subscription; 93 return subscription;
94 } 94 }
95 95
96 // ------------------------------------------------------------------ 96 // ------------------------------------------------------------------
97 // StreamSink interface-like methods for sending events into the stream. 97 // EventSink interface-like methods for sending events into the stream.
98 // It's the responsibility of the caller to ensure that the stream is not 98 // It's the responsibility of the caller to ensure that the stream is not
99 // paused when adding events. If the stream is paused, the events will be 99 // paused when adding events. If the stream is paused, the events will be
100 // queued, but it's better to not send events at all. 100 // queued, but it's better to not send events at all.
101 101
102 /** 102 /**
103 * Send or queue a data event. 103 * Send or queue a data event.
104 */ 104 */
105 void _add(T value) { 105 void _add(T value) {
106 if (_isClosed) throw new StateError("Sending on closed stream"); 106 if (_isClosed) throw new StateError("Sending on closed stream");
107 if (!_mayFireState) { 107 if (!_mayFireState) {
108 // Not the time to send events. 108 // Not the time to send events.
109 _addPendingEvent(new _DelayedData<T>(value)); 109 _addPendingEvent(new _DelayedData<T>(value));
110 return; 110 return;
111 } 111 }
112 if (_hasPendingEvent) { 112 if (_hasPendingEvent) {
113 _addPendingEvent(new _DelayedData<T>(value)); 113 _addPendingEvent(new _DelayedData<T>(value));
114 } else { 114 } else {
115 _sendData(value); 115 _sendData(value);
116 } 116 }
117 _handlePendingEvents(); 117 _handlePendingEvents();
118 } 118 }
119 119
120 /** 120 /**
121 * Send or enqueue an error event. 121 * Send or enqueue an error event.
122 * 122 *
123 * If a subscription has requested to be unsubscribed on errors, 123 * If a subscription has requested to be unsubscribed on errors,
124 * it will be unsubscribed after receiving this event. 124 * it will be unsubscribed after receiving this event.
125 */ 125 */
126 void _signalError(AsyncError error) { 126 void _addError(AsyncError error) {
127 if (_isClosed) throw new StateError("Sending on closed stream"); 127 if (_isClosed) throw new StateError("Sending on closed stream");
128 if (!_mayFireState) { 128 if (!_mayFireState) {
129 // Not the time to send events. 129 // Not the time to send events.
130 _addPendingEvent(new _DelayedError(error)); 130 _addPendingEvent(new _DelayedError(error));
131 return; 131 return;
132 } 132 }
133 if (_hasPendingEvent) { 133 if (_hasPendingEvent) {
134 _addPendingEvent(new _DelayedError(error)); 134 _addPendingEvent(new _DelayedError(error));
135 } else { 135 } else {
136 _sendError(error); 136 _sendError(error);
(...skipping 418 matching lines...) Expand 10 before | Expand all | Expand 10 after
555 * 555 *
556 * The only public methods are those of [Stream], so instances of 556 * The only public methods are those of [Stream], so instances of
557 * [_SingleStreamImpl] can be returned directly as a [Stream] without exposing 557 * [_SingleStreamImpl] can be returned directly as a [Stream] without exposing
558 * internal functionality. 558 * internal functionality.
559 * 559 *
560 * The [StreamController] is a public facing version of this class, with 560 * The [StreamController] is a public facing version of this class, with
561 * some methods made public. 561 * some methods made public.
562 * 562 *
563 * The user interface of [_SingleStreamImpl] are the following methods: 563 * The user interface of [_SingleStreamImpl] are the following methods:
564 * * [_add]: Add a data event to the stream. 564 * * [_add]: Add a data event to the stream.
565 * * [_signalError]: Add an error event to the stream. 565 * * [_addError]: Add an error event to the stream.
566 * * [_close]: Request to close the stream. 566 * * [_close]: Request to close the stream.
567 * * [_onSubscriberStateChange]: Called when receiving the first subscriber or 567 * * [_onSubscriberStateChange]: Called when receiving the first subscriber or
568 * when losing the last subscriber. 568 * when losing the last subscriber.
569 * * [_onPauseStateChange]: Called when entering or leaving paused mode. 569 * * [_onPauseStateChange]: Called when entering or leaving paused mode.
570 * * [_hasSubscribers]: Test whether there are currently any subscribers. 570 * * [_hasSubscribers]: Test whether there are currently any subscribers.
571 * * [_isInputPaused]: Test whether the stream is currently paused. 571 * * [_isInputPaused]: Test whether the stream is currently paused.
572 * The user should not add new events while the stream is paused, but if it 572 * The user should not add new events while the stream is paused, but if it
573 * happens anyway, the stream will enqueue the events just as when new events 573 * happens anyway, the stream will enqueue the events just as when new events
574 * arrive while still firing an old event. 574 * arrive while still firing an old event.
575 */ 575 */
(...skipping 93 matching lines...) Expand 10 before | Expand all | Expand 10 after
669 * 669 *
670 * The only public methods are those of [Stream], so instances of 670 * The only public methods are those of [Stream], so instances of
671 * [_MultiStreamImpl] can be returned directly as a [Stream] without exposing 671 * [_MultiStreamImpl] can be returned directly as a [Stream] without exposing
672 * internal functionality. 672 * internal functionality.
673 * 673 *
674 * The [StreamController] is a public facing version of this class, with 674 * The [StreamController] is a public facing version of this class, with
675 * some methods made public. 675 * some methods made public.
676 * 676 *
677 * The user interface of [_MultiStreamImpl] are the following methods: 677 * The user interface of [_MultiStreamImpl] are the following methods:
678 * * [_add]: Add a data event to the stream. 678 * * [_add]: Add a data event to the stream.
679 * * [_signalError]: Add an error event to the stream. 679 * * [_addError]: Add an error event to the stream.
680 * * [_close]: Request to close the stream. 680 * * [_close]: Request to close the stream.
681 * * [_onSubscriptionStateChange]: Called when receiving the first subscriber or 681 * * [_onSubscriptionStateChange]: Called when receiving the first subscriber or
682 * when losing the last subscriber. 682 * when losing the last subscriber.
683 * * [_onPauseStateChange]: Called when entering or leaving paused mode. 683 * * [_onPauseStateChange]: Called when entering or leaving paused mode.
684 * * [_hasSubscribers]: Test whether there are currently any subscribers. 684 * * [_hasSubscribers]: Test whether there are currently any subscribers.
685 * * [_isPaused]: Test whether the stream is currently paused. 685 * * [_isPaused]: Test whether the stream is currently paused.
686 * The user should not add new events while the stream is paused, but if it 686 * The user should not add new events while the stream is paused, but if it
687 * happens anyway, the stream will enqueue the events just as when new events 687 * happens anyway, the stream will enqueue the events just as when new events
688 * arrive while still firing an old event. 688 * arrive while still firing an old event.
689 */ 689 */
(...skipping 144 matching lines...) Expand 10 before | Expand all | Expand 10 after
834 */ 834 */
835 _GeneratedSingleStreamImpl(_PendingEvents events) { 835 _GeneratedSingleStreamImpl(_PendingEvents events) {
836 _pendingEvents = events; 836 _pendingEvents = events;
837 _setClosed(); // Closed for input since all events are already pending. 837 _setClosed(); // Closed for input since all events are already pending.
838 } 838 }
839 839
840 void _add(T value) { 840 void _add(T value) {
841 throw new UnsupportedError("Cannot inject events into generated stream"); 841 throw new UnsupportedError("Cannot inject events into generated stream");
842 } 842 }
843 843
844 void _signalError(AsyncError value) { 844 void _addError(AsyncError value) {
845 throw new UnsupportedError("Cannot inject events into generated stream"); 845 throw new UnsupportedError("Cannot inject events into generated stream");
846 } 846 }
847 847
848 void _close() { 848 void _close() {
849 throw new UnsupportedError("Cannot inject events into generated stream"); 849 throw new UnsupportedError("Cannot inject events into generated stream");
850 } 850 }
851 } 851 }
852 852
853 853
854 /** Pending events object that gets its events from an [Iterable]. */ 854 /** Pending events object that gets its events from an [Iterable]. */
(...skipping 236 matching lines...) Expand 10 before | Expand all | Expand 10 after
1091 otherNext._previousLink = listLast; 1091 otherNext._previousLink = listLast;
1092 _InternalLink otherLast = other._previousLink; 1092 _InternalLink otherLast = other._previousLink;
1093 list._previousLink = otherLast; 1093 list._previousLink = otherLast;
1094 otherLast._nextLink = list; 1094 otherLast._nextLink = list;
1095 // Clean up [other]. 1095 // Clean up [other].
1096 other._nextLink = other._previousLink = other; 1096 other._nextLink = other._previousLink = other;
1097 } 1097 }
1098 } 1098 }
1099 1099
1100 /** Abstract type for an internal interface for sending events. */ 1100 /** Abstract type for an internal interface for sending events. */
1101 abstract class _StreamOutputSink<T> { 1101 abstract class _EventOutputSink<T> {
1102 _sendData(T data); 1102 _sendData(T data);
1103 _sendError(AsyncError error); 1103 _sendError(AsyncError error);
1104 _sendDone(); 1104 _sendDone();
1105 } 1105 }
1106 1106
1107 abstract class _StreamListener<T> extends _InternalLink 1107 abstract class _StreamListener<T> extends _InternalLink
1108 implements _StreamOutputSink<T> { 1108 implements _EventOutputSink<T> {
1109 final _StreamImpl _source; 1109 final _StreamImpl _source;
1110 int _state = _LISTENER_UNSUBSCRIBED; 1110 int _state = _LISTENER_UNSUBSCRIBED;
1111 1111
1112 _StreamListener(this._source); 1112 _StreamListener(this._source);
1113 1113
1114 bool get isPaused => _state >= (1 << _LISTENER_PAUSE_COUNT_SHIFT); 1114 bool get isPaused => _state >= (1 << _LISTENER_PAUSE_COUNT_SHIFT);
1115 1115
1116 bool get _isPendingUnsubscribe => 1116 bool get _isPendingUnsubscribe =>
1117 (_state & _LISTENER_PENDING_UNSUBSCRIBE) != 0; 1117 (_state & _LISTENER_PENDING_UNSUBSCRIBE) != 0;
1118 1118
(...skipping 206 matching lines...) Expand 10 before | Expand all | Expand 10 after
1325 } 1325 }
1326 1326
1327 /** 1327 /**
1328 * Subscribe or unsubscribe on [_source] depending on whether 1328 * Subscribe or unsubscribe on [_source] depending on whether
1329 * [_stream] has subscribers. 1329 * [_stream] has subscribers.
1330 */ 1330 */
1331 void _onSubscriptionStateChange() { 1331 void _onSubscriptionStateChange() {
1332 if (_hasSubscribers) { 1332 if (_hasSubscribers) {
1333 assert(_subscription == null); 1333 assert(_subscription == null);
1334 _subscription = _source.listen(this._add, 1334 _subscription = _source.listen(this._add,
1335 onError: this._signalError, 1335 onError: this._addError,
1336 onDone: this._close); 1336 onDone: this._close);
1337 } else { 1337 } else {
1338 // TODO(lrn): Check why this can happen. 1338 // TODO(lrn): Check why this can happen.
1339 if (_subscription == null) return; 1339 if (_subscription == null) return;
1340 _subscription.cancel(); 1340 _subscription.cancel();
1341 _subscription = null; 1341 _subscription = null;
1342 } 1342 }
1343 } 1343 }
1344 } 1344 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698