Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2)

Side by Side Diff: sdk/lib/async/stream_controller.dart

Issue 14753009: Make StreamSubscription be the active part of a stream. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Made tests run (mostly) Created 7 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Controller for creating and adding events to a stream. 8 // Controller for creating and adding events to a stream.
9 // ------------------------------------------------------------------- 9 // -------------------------------------------------------------------
10 10
(...skipping 29 matching lines...) Expand all
40 * a stream action, for example firing an event. If the state changes multiple 40 * a stream action, for example firing an event. If the state changes multiple
41 * times during the action, and then ends up in the same state as before, no 41 * times during the action, and then ends up in the same state as before, no
42 * callback is performed. 42 * callback is performed.
43 * 43 *
44 * If listeners are added after the stream has completed (sent a "done" event), 44 * If listeners are added after the stream has completed (sent a "done" event),
45 * the listeners will be sent a "done" event eventually, but they won't affect 45 * the listeners will be sent a "done" event eventually, but they won't affect
46 * the stream at all, and won't trigger callbacks. From the controller's point 46 * the stream at all, and won't trigger callbacks. From the controller's point
47 * of view, the stream is completely inert when has completed. 47 * of view, the stream is completely inert when has completed.
48 */ 48 */
49 class StreamController<T> extends EventSink<T> { 49 class StreamController<T> extends EventSink<T> {
50 final _StreamImpl<T> stream; 50 static const int _STATE_CANCELLED = 1;
51 static const int _STATE_CLOSED = 2;
52
53 final _NotificationHandler _onListen;
54 final _NotificationHandler _onPause;
55 final _NotificationHandler _onResume;
56 final _NotificationHandler _onCancel;
57 _StreamImpl<T> _stream;
58
59 // An active subscription on the stream, or null if no subscripton is active.
60 _ControllerSubscription<T> _subscription;
61
62 // Whether we have sent a "done" event.
floitsch 2013/05/22 16:26:29 "Wether" is not right. Also, what does "0" mean?
Lasse Reichstein Nielsen 2013/05/24 06:02:49 I see "Whether"?
floitsch 2013/05/24 13:53:41 I meant that this doesn't look a boolean. "Whether
63 int _state = 0;
64
65 // Events added to the stream before it has an active subscription.
66 _PendingEvents _pendingEvents = null;
51 67
52 /** 68 /**
53 *
54 * If the stream is canceled before the controller needs new data the 69 * If the stream is canceled before the controller needs new data the
55 * [onResume] call might not be executed. 70 * [onResume] call might not be executed.
56 : stream = new _MultiControllerStream<T>(
57 onListen, onPause, onResume, onCancel);
floitsch 2013/05/22 16:26:29 You will have to merge this. I removed lines 53 to
Lasse Reichstein Nielsen 2013/05/24 06:02:49 ACK.
58 * A controller with a [stream] that supports only one single subscriber. 71 * A controller with a [stream] that supports only one single subscriber.
59 * 72 *
60 * The controller will buffer all incoming events until the subscriber is 73 * The controller will buffer all incoming events until the subscriber is
61 * registered. 74 * registered.
62 * 75 *
63 * The [onPause] function is called when the stream becomes 76 * The [onPause] function is called when the stream becomes
64 * paused. [onResume] is called when the stream resumed. 77 * paused. [onResume] is called when the stream resumed.
65 * 78 *
66 * The [onListen] callback is called when the stream 79 * The [onListen] callback is called when the stream
67 * receives its listener. [onCancel] when the listener cancels 80 * receives its listener. [onCancel] when the listener cancels
68 * its subscription. 81 * its subscription.
69 * 82 *
70 * If the stream is canceled before the controller needs new data the 83 * If the stream is canceled before the controller needs new data the
71 * [onResume] call might not be executed. 84 * [onResume] call might not be executed.
72 */ 85 */
73 StreamController({void onListen(), 86 StreamController({void onListen(),
74 void onPause(), 87 void onPause(),
75 void onResume(), 88 void onResume(),
76 void onCancel()}) 89 void onCancel()})
77 : stream = new _SingleControllerStream<T>( 90 : _onListen = onListen,
78 onListen, onPause, onResume, onCancel); 91 _onPause = onPause,
92 _onResume = onResume,
93 _onCancel = onCancel {
94 _stream = new _ControllerStream<T>(this);
95 }
96
97 Stream<T> get stream => _stream;
79 98
80 /** 99 /**
81 * Returns a view of this object that only exposes the [EventSink] interface. 100 * Returns a view of this object that only exposes the [EventSink] interface.
82 */ 101 */
83 EventSink<T> get sink => new _EventSinkView<T>(this); 102 EventSink<T> get sink => new _EventSinkView<T>(this);
84 103
85 /** 104 /**
105 * Whether a listener has existed and been cancelled.
106 *
107 * After this, adding more events will be ignored.
108 */
109 bool get _isCancelled => (_state & _STATE_CANCELLED) != 0;
110
111 /**
86 * Whether the stream is closed for adding more events. 112 * Whether the stream is closed for adding more events.
87 * 113 *
88 * If true, the "done" event might not have fired yet, but it has been 114 * If true, the "done" event might not have fired yet, but it has been
89 * scheduled, and it is too late to add more events. 115 * scheduled, and it is too late to add more events.
90 */ 116 */
91 bool get isClosed => stream._isClosed; 117 bool get isClosed => (_state & _STATE_CLOSED) != 0;
92 118
93 /** Whether one or more active subscribers have requested a pause. */ 119 /** Whether the subscription is active and paused. */
94 bool get isPaused => stream._isInputPaused; 120 bool get isPaused => _subscription != null && _subscription._isInputPaused;
95 121
96 /** Whether there are currently any subscribers on this [Stream]. */ 122 /** Whether there are currently any subscribers on the [Stream]. */
floitsch 2013/05/22 16:26:29 there is currently a subscriber on the [Stream].
Lasse Reichstein Nielsen 2013/05/24 06:02:49 Done.
97 bool get hasListener => stream._hasListener; 123 bool get hasListener => _subscription != null;
98 124
99 /** 125 /**
100 * Send or queue a data event. 126 * Send or queue a data event.
101 */ 127 */
102 void add(T value) => stream._add(value); 128 void add(T value) {
129 if (isClosed) throw new StateError("Adding event after close");
130 if (_subscription != null) {
131 _subscription._add(value);
132 } else if (!_isCancelled) {
133 _addPendingEvent(new _DelayedData<T>(value));
134 }
135 }
103 136
104 /** 137 /**
105 * Send or enqueue an error event. 138 * Send or enqueue an error event.
106 * 139 *
107 * If a subscription has requested to be unsubscribed on errors, 140 * If the subscription has requested to be unsubscribed on errors,
floitsch 2013/05/22 16:26:29 Should we keep this here? It's not really a Contro
Lasse Reichstein Nielsen 2013/05/24 06:02:49 I'm fine with removing it.
108 * it will be unsubscribed after receiving this event. 141 * it will be unsubscribed after receiving this event.
109 */ 142 */
110 void addError(Object error, [Object stackTrace]) { 143 void addError(Object error, [Object stackTrace]) {
144 if (isClosed) throw new StateError("Adding event after close");
111 if (stackTrace != null) { 145 if (stackTrace != null) {
112 // Force stack trace overwrite. Even if the error already contained 146 // Force stack trace overwrite. Even if the error already contained
113 // a stack trace. 147 // a stack trace.
114 _attachStackTrace(error, stackTrace); 148 _attachStackTrace(error, stackTrace);
115 } 149 }
116 stream._addError(error); 150 if (_subscription != null) {
151 _subscription._addError(error);
152 } else if (!_isCancelled) {
153 _addPendingEvent(new _DelayedError(error));
154 }
117 } 155 }
118 156
119 /** 157 /**
120 * Send or enqueue a "done" message. 158 * Send or enqueue a "done" message.
121 * 159 *
122 * The "done" message should be sent at most once by a stream, and it 160 * The "done" message should be sent at most once by a stream, and it
floitsch 2013/05/22 16:26:29 Too many "should"s. Maybe: Closes this controller.
Lasse Reichstein Nielsen 2013/05/24 06:02:49 sounds fine.
123 * should be the last message sent. 161 * should be the last message sent.
124 */ 162 */
125 void close() { stream._close(); } 163 void close() {
164 if (isClosed) return;
165 _state |= _STATE_CLOSED;
166 if (_subscription != null) {
167 _subscription._close();
168 } else if (!_isCancelled) {
169 _addPendingEvent(const _DelayedDone());
170 }
171 }
172
173 void _addPendingEvent(_DelayedEvent event) {
174 if (_isCancelled) return;
175 _StreamImplEvents events = _pendingEvents;
176 if (events == null) {
177 events = _pendingEvents = new _StreamImplEvents();
178 }
179 events.add(event);
180 }
181
182 void _recordListen(_BufferingStreamSubscription subscription) {
183 assert(_subscription == null);
184 _subscription = subscription;
185 _pendingEvents = null; // These have been taken over by the stream.
floitsch 2013/05/22 16:26:29 by the subscription. I would prefer if we transfe
Lasse Reichstein Nielsen 2013/05/24 06:02:49 ok, will change to not pass them in the constructo
186 _subscription._guardCallback(() {
187 _runGuarded(_onListen);
188 });
189 }
190
191 void _recordCancel() {
192 _subscription = null;
193 _state |= _STATE_CANCELLED;
194 _runGuarded(_onCancel);
195 }
196
197 void _recordPause() {
198 _runGuarded(_onPause);
199 }
200
201 void _recordResume() {
202 _runGuarded(_onResume);
203 }
126 } 204 }
127 205
128 typedef void _NotificationHandler(); 206 typedef void _NotificationHandler();
129 207
130 class _SingleControllerStream<T> extends _SingleStreamImpl<T> { 208 void _runGuarded(_NotificationHandler notificationHandler) {
131 _NotificationHandler _onListen; 209 if (notificationHandler == null) return;
132 _NotificationHandler _onPause; 210 try {
133 _NotificationHandler _onResume; 211 notificationHandler();
134 _NotificationHandler _onCancel; 212 } catch (e, s) {
213 _throwDelayed(e, s);
214 }
215 }
135 216
136 // TODO(floitsch): share this code with _MultiControllerStream. 217 class _ControllerStream<T> extends _StreamImpl<T> {
137 _runGuarded(_NotificationHandler notificationHandler) { 218 StreamController _controller;
138 if (notificationHandler == null) return; 219 bool _hasListener = false;
139 try { 220
140 notificationHandler(); 221 _ControllerStream(this._controller);
141 } catch (e, s) { 222
142 _throwDelayed(e, s); 223 StreamSubscription<T> _createSubscription(
224 void onData(T data),
225 void onError(Object error),
226 void onDone(),
227 bool cancelOnError) {
228 if (_hasListener) {
229 try {
230 throw 0;
231 } catch (e, s) {
232 print("LISTEN TWICE(#$hashCode)\n$s");
233 }
234 throw new StateError("The stream has already been listened to.");
143 } 235 }
236 //try { throw 0; } catch (e, s) { print("LISTEN ONCE(#$hashCode):\n$s"); }
237 _hasListener = true;
238 return new _ControllerSubscription<T>(
239 _controller, onData, onError, onDone, cancelOnError);
144 } 240 }
145 241
146 _SingleControllerStream(this._onListen, 242 void _onListen(_BufferingStreamSubscription subscription) {
147 this._onPause, 243 _controller._recordListen(subscription);
148 this._onResume, 244 }
149 this._onCancel); 245 }
150 246
151 void _onSubscriptionStateChange() { 247 class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> {
152 _runGuarded(_hasListener ? _onListen : _onCancel); 248 final StreamController _controller;
249
250 _ControllerSubscription(StreamController controller,
251 void onData(T data),
252 void onError(Object error),
253 void onDone(),
254 bool cancelOnError)
255 : _controller = controller,
256 super(onData, onError, onDone, cancelOnError,
257 controller._pendingEvents);
258
259 void _onCancel() {
260 _controller._recordCancel();
153 } 261 }
154 262
155 void _onPauseStateChange() { 263 void _onPause() {
156 _runGuarded(_isPaused ? _onPause : _onResume); 264 _controller._recordPause();
265 }
266
267 void _onResume() {
268 _controller._recordResume();
157 } 269 }
158 } 270 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698