Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(380)

Side by Side Diff: sdk/lib/async/stream_controller.dart

Issue 14753009: Make StreamSubscription be the active part of a stream. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Remove remaining debugging prints. Created 7 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Controller for creating and adding events to a stream. 8 // Controller for creating and adding events to a stream.
9 // ------------------------------------------------------------------- 9 // -------------------------------------------------------------------
10 10
(...skipping 29 matching lines...) Expand all
40 * a stream action, for example firing an event. If the state changes multiple 40 * a stream action, for example firing an event. If the state changes multiple
41 * times during the action, and then ends up in the same state as before, no 41 * times during the action, and then ends up in the same state as before, no
42 * callback is performed. 42 * callback is performed.
43 * 43 *
44 * If listeners are added after the stream has completed (sent a "done" event), 44 * If listeners are added after the stream has completed (sent a "done" event),
45 * the listeners will be sent a "done" event eventually, but they won't affect 45 * the listeners will be sent a "done" event eventually, but they won't affect
46 * the stream at all, and won't trigger callbacks. From the controller's point 46 * the stream at all, and won't trigger callbacks. From the controller's point
47 * of view, the stream is completely inert when has completed. 47 * of view, the stream is completely inert when has completed.
48 */ 48 */
49 class StreamController<T> extends EventSink<T> { 49 class StreamController<T> extends EventSink<T> {
50 final _StreamImpl<T> stream; 50 static const int _STATE_OPEN = 0;
51 static const int _STATE_CANCELLED = 1;
52 static const int _STATE_CLOSED = 2;
53
54 final _NotificationHandler _onListen;
55 final _NotificationHandler _onPause;
56 final _NotificationHandler _onResume;
57 final _NotificationHandler _onCancel;
58 _StreamImpl<T> _stream;
59
60 // An active subscription on the stream, or null if no subscripton is active.
61 _ControllerSubscription<T> _subscription;
62
63 // Whether we have sent a "done" event.
64 int _state = _STATE_OPEN;
65
66 // Events added to the stream before it has an active subscription.
67 _PendingEvents _pendingEvents = null;
51 68
52 /** 69 /**
53 * A controller with a [stream] that supports only one single subscriber. 70 * A controller with a [stream] that supports only one single subscriber.
54 * 71 *
55 * The controller will buffer all incoming events until the subscriber is 72 * The controller will buffer all incoming events until the subscriber is
56 * registered. 73 * registered.
57 * 74 *
58 * The [onPause] function is called when the stream becomes 75 * The [onPause] function is called when the stream becomes
59 * paused. [onResume] is called when the stream resumed. 76 * paused. [onResume] is called when the stream resumed.
60 * 77 *
61 * The [onListen] callback is called when the stream 78 * The [onListen] callback is called when the stream
62 * receives its listener. [onCancel] when the listener cancels 79 * receives its listener and [onCancel] when the listener ends
63 * its subscription. 80 * its subscription.
64 * 81 *
65 * If the stream is canceled before the controller needs new data the 82 * If the stream is canceled before the controller needs new data the
66 * [onResume] call might not be executed. 83 * [onResume] call might not be executed.
67 */ 84 */
68 StreamController({void onListen(), 85 StreamController({void onListen(),
69 void onPause(), 86 void onPause(),
70 void onResume(), 87 void onResume(),
71 void onCancel()}) 88 void onCancel()})
72 : stream = new _SingleControllerStream<T>( 89 : _onListen = onListen,
73 onListen, onPause, onResume, onCancel); 90 _onPause = onPause,
91 _onResume = onResume,
92 _onCancel = onCancel {
93 _stream = new _ControllerStream<T>(this);
94 }
95
96 Stream<T> get stream => _stream;
74 97
75 /** 98 /**
76 * Returns a view of this object that only exposes the [EventSink] interface. 99 * Returns a view of this object that only exposes the [EventSink] interface.
77 */ 100 */
78 EventSink<T> get sink => new _EventSinkView<T>(this); 101 EventSink<T> get sink => new _EventSinkView<T>(this);
79 102
80 /** 103 /**
104 * Whether a listener has existed and been cancelled.
105 *
106 * After this, adding more events will be ignored.
107 */
108 bool get _isCancelled => (_state & _STATE_CANCELLED) != 0;
109
110 /**
81 * Whether the stream is closed for adding more events. 111 * Whether the stream is closed for adding more events.
82 * 112 *
83 * If true, the "done" event might not have fired yet, but it has been 113 * If true, the "done" event might not have fired yet, but it has been
84 * scheduled, and it is too late to add more events. 114 * scheduled, and it is too late to add more events.
85 */ 115 */
86 bool get isClosed => stream._isClosed; 116 bool get isClosed => (_state & _STATE_CLOSED) != 0;
87 117
88 /** Whether one or more active subscribers have requested a pause. */ 118 /** Whether the subscription is active and paused. */
89 bool get isPaused => stream._isInputPaused; 119 bool get isPaused => _subscription != null && _subscription._isInputPaused;
90 120
91 /** Whether there are currently any subscribers on this [Stream]. */ 121 /** Whether there are currently a subscriber on the [Stream]. */
92 bool get hasListener => stream._hasListener; 122 bool get hasListener => _subscription != null;
93 123
94 /** 124 /**
95 * Send or queue a data event. 125 * Send or queue a data event.
96 */ 126 */
97 void add(T value) => stream._add(value); 127 void add(T value) {
128 if (isClosed) throw new StateError("Adding event after close");
129 if (_subscription != null) {
130 _subscription._add(value);
131 } else if (!_isCancelled) {
132 _addPendingEvent(new _DelayedData<T>(value));
133 }
134 }
98 135
99 /** 136 /**
100 * Send or enqueue an error event. 137 * Send or enqueue an error event.
101 *
102 * If a subscription has requested to be unsubscribed on errors,
103 * it will be unsubscribed after receiving this event.
104 */ 138 */
105 void addError(Object error, [Object stackTrace]) { 139 void addError(Object error, [Object stackTrace]) {
140 if (isClosed) throw new StateError("Adding event after close");
106 if (stackTrace != null) { 141 if (stackTrace != null) {
107 // Force stack trace overwrite. Even if the error already contained 142 // Force stack trace overwrite. Even if the error already contained
108 // a stack trace. 143 // a stack trace.
109 _attachStackTrace(error, stackTrace); 144 _attachStackTrace(error, stackTrace);
110 } 145 }
111 stream._addError(error); 146 if (_subscription != null) {
147 _subscription._addError(error);
148 } else if (!_isCancelled) {
149 _addPendingEvent(new _DelayedError(error));
150 }
112 } 151 }
113 152
114 /** 153 /**
115 * Send or enqueue a "done" message. 154 * Closes this controller.
116 * 155 *
117 * The "done" message should be sent at most once by a stream, and it 156 * After closing, no further events may be added using [add] or [addError].
118 * should be the last message sent. 157 *
158 * You are allowed to close the controller more than once, but only the first
159 * call has any effect.
160 *
161 * The first time a controller is closed, a "done" event is sent to its
162 * stream.
119 */ 163 */
120 void close() { stream._close(); } 164 void close() {
165 if (isClosed) return;
166 _state |= _STATE_CLOSED;
167 if (_subscription != null) {
168 _subscription._close();
169 } else if (!_isCancelled) {
170 _addPendingEvent(const _DelayedDone());
171 }
172 }
173
174 void _addPendingEvent(_DelayedEvent event) {
175 if (_isCancelled) return;
176 _StreamImplEvents events = _pendingEvents;
177 if (events == null) {
178 events = _pendingEvents = new _StreamImplEvents();
179 }
180 events.add(event);
181 }
182
183 void _recordListen(_BufferingStreamSubscription subscription) {
floitsch 2013/05/24 15:53:17 add function comment.
184 assert(_subscription == null);
185 _subscription = subscription;
186 subscription._setPendingEvents(_pendingEvents);
187 _pendingEvents = null;
188 subscription._guardCallback(() {
floitsch 2013/05/24 15:53:17 Explain why you don't call _runGuarded directly.
189 _runGuarded(_onListen);
190 });
191 }
192
193 void _recordCancel() {
194 _subscription = null;
195 _state |= _STATE_CANCELLED;
196 _runGuarded(_onCancel);
197 }
198
199 void _recordPause() {
200 _runGuarded(_onPause);
201 }
202
203 void _recordResume() {
204 _runGuarded(_onResume);
205 }
121 } 206 }
122 207
123 typedef void _NotificationHandler(); 208 typedef void _NotificationHandler();
124 209
125 class _SingleControllerStream<T> extends _SingleStreamImpl<T> { 210 void _runGuarded(_NotificationHandler notificationHandler) {
126 _NotificationHandler _onListen; 211 if (notificationHandler == null) return;
127 _NotificationHandler _onPause; 212 try {
128 _NotificationHandler _onResume; 213 notificationHandler();
129 _NotificationHandler _onCancel; 214 } catch (e, s) {
215 _throwDelayed(e, s);
216 }
217 }
130 218
131 // TODO(floitsch): share this code with _MultiControllerStream. 219 class _ControllerStream<T> extends _StreamImpl<T> {
132 _runGuarded(_NotificationHandler notificationHandler) { 220 StreamController _controller;
133 if (notificationHandler == null) return; 221 bool _hasListener = false;
134 try { 222
135 notificationHandler(); 223 _ControllerStream(this._controller);
136 } catch (e, s) { 224
137 _throwDelayed(e, s); 225 StreamSubscription<T> _createSubscription(
226 void onData(T data),
227 void onError(Object error),
228 void onDone(),
229 bool cancelOnError) {
230 if (_hasListener) {
231 throw new StateError("The stream has already been listened to.");
138 } 232 }
233 _hasListener = true;
234 return new _ControllerSubscription<T>(
235 _controller, onData, onError, onDone, cancelOnError);
139 } 236 }
140 237
141 _SingleControllerStream(this._onListen, 238 void _onListen(_BufferingStreamSubscription subscription) {
142 this._onPause, 239 _controller._recordListen(subscription);
143 this._onResume, 240 }
144 this._onCancel); 241 }
145 242
146 void _onSubscriptionStateChange() { 243 class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> {
147 _runGuarded(_hasListener ? _onListen : _onCancel); 244 final StreamController _controller;
245
246 _ControllerSubscription(StreamController controller,
247 void onData(T data),
248 void onError(Object error),
249 void onDone(),
250 bool cancelOnError)
251 : _controller = controller,
252 super(onData, onError, onDone, cancelOnError);
253
254 void _onCancel() {
255 _controller._recordCancel();
148 } 256 }
149 257
150 void _onPauseStateChange() { 258 void _onPause() {
151 _runGuarded(_isPaused ? _onPause : _onResume); 259 _controller._recordPause();
260 }
261
262 void _onResume() {
263 _controller._recordResume();
152 } 264 }
153 } 265 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698