Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(388)

Side by Side Diff: sdk/lib/async/stream_controller.dart

Issue 16131003: Reapply "Active stream subscriptions". (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Patch from sgjesse fixing file descriptor error. Created 7 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « sdk/lib/async/stream.dart ('k') | sdk/lib/async/stream_impl.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Controller for creating and adding events to a stream. 8 // Controller for creating and adding events to a stream.
9 // ------------------------------------------------------------------- 9 // -------------------------------------------------------------------
10 10
(...skipping 28 matching lines...) Expand all
39 * Whether to invoke a callback depends only on the state before and after 39 * Whether to invoke a callback depends only on the state before and after
40 * a stream action, for example firing an event. If the state changes multiple 40 * a stream action, for example firing an event. If the state changes multiple
41 * times during the action, and then ends up in the same state as before, no 41 * times during the action, and then ends up in the same state as before, no
42 * callback is performed. 42 * callback is performed.
43 * 43 *
44 * If listeners are added after the stream has completed (sent a "done" event), 44 * If listeners are added after the stream has completed (sent a "done" event),
45 * the listeners will be sent a "done" event eventually, but they won't affect 45 * the listeners will be sent a "done" event eventually, but they won't affect
46 * the stream at all, and won't trigger callbacks. From the controller's point 46 * the stream at all, and won't trigger callbacks. From the controller's point
47 * of view, the stream is completely inert when has completed. 47 * of view, the stream is completely inert when has completed.
48 */ 48 */
49 class StreamController<T> extends EventSink<T> { 49 abstract class StreamController<T> implements EventSink<T> {
50 final _StreamImpl<T> stream; 50 /** The stream that this controller is controlling. */
51 Stream<T> get stream;
51 52
52 /** 53 /**
53 * A controller with a [stream] that supports only one single subscriber. 54 * A controller with a [stream] that supports only one single subscriber.
54 * 55 *
55 * The controller will buffer all incoming events until the subscriber is 56 * The controller will buffer all incoming events until the subscriber is
56 * registered. 57 * registered.
57 * 58 *
58 * The [onPause] function is called when the stream becomes 59 * The [onPause] function is called when the stream becomes
59 * paused. [onResume] is called when the stream resumed. 60 * paused. [onResume] is called when the stream resumed.
60 * 61 *
61 * The [onListen] callback is called when the stream 62 * The [onListen] callback is called when the stream
62 * receives its listener. [onCancel] when the listener cancels 63 * receives its listener and [onCancel] when the listener ends
63 * its subscription. 64 * its subscription.
64 * 65 *
65 * If the stream is canceled before the controller needs new data the 66 * If the stream is canceled before the controller needs new data the
66 * [onResume] call might not be executed. 67 * [onResume] call might not be executed.
67 */ 68 */
68 StreamController({void onListen(), 69 factory StreamController({void onListen(),
69 void onPause(), 70 void onPause(),
70 void onResume(), 71 void onResume(),
71 void onCancel()}) 72 void onCancel()})
72 : stream = new _SingleControllerStream<T>( 73 => new _StreamControllerImpl<T>(onListen, onPause, onResume, onCancel);
73 onListen, onPause, onResume, onCancel); 74
75 /**
76 * A controller where [stream] creates new stream each time it is read.
77 *
78 * The controller distributes any events to all currently subscribed streams.
79 *
80 * The [onListen] callback is called when the first listener is subscribed,
81 * and the [onCancel] is called when there is no longer any active listeners.
82 * If a listener is added again later, after the [onCancel] was called,
83 * the [onListen] will be called again.
84 */
85 factory StreamController.multiplex({void onListen(), void onCancel()}) {
86 return new _MultiplexStreamController<T>(onListen, onCancel);
87 }
74 88
75 /** 89 /**
76 * Returns a view of this object that only exposes the [EventSink] interface. 90 * Returns a view of this object that only exposes the [EventSink] interface.
77 */ 91 */
78 EventSink<T> get sink => new _EventSinkView<T>(this); 92 EventSink<T> get sink;
79 93
80 /** 94 /**
81 * Whether the stream is closed for adding more events. 95 * Whether the stream is closed for adding more events.
82 * 96 *
83 * If true, the "done" event might not have fired yet, but it has been 97 * If true, the "done" event might not have fired yet, but it has been
84 * scheduled, and it is too late to add more events. 98 * scheduled, and it is too late to add more events.
85 */ 99 */
86 bool get isClosed => stream._isClosed; 100 bool get isClosed;
87 101
88 /** Whether one or more active subscribers have requested a pause. */ 102 /** Whether the subscription is active and paused. */
89 bool get isPaused => stream._isInputPaused; 103 bool get isPaused;
90 104
91 /** Whether there are currently any subscribers on this [Stream]. */ 105 /** Whether there is a subscriber on the [Stream]. */
92 bool get hasListener => stream._hasListener; 106 bool get hasListener;
107
108 /**
109 * Send or enqueue an error event.
110 *
111 * Also allows an objection stack trace object, on top of what [EventSink]
112 * allows.
113 */
114 void addError(Object error, [Object stackTrace]);
115 }
116
117
118 abstract class _StreamControllerLifecycle<T> {
119 void _recordListen(StreamSubscription<T> subscription) {}
120 void _recordPause(StreamSubscription<T> subscription) {}
121 void _recordResume(StreamSubscription<T> subscription) {}
122 void _recordCancel(StreamSubscription<T> subscription) {}
123 }
124
125 /**
126 * Default implementation of [StreamController].
127 *
128 * Controls a stream that only supports a single controller.
129 */
130 class _StreamControllerImpl<T> implements StreamController<T>,
131 _StreamControllerLifecycle<T> {
132 static const int _STATE_OPEN = 0;
133 static const int _STATE_CANCELLED = 1;
134 static const int _STATE_CLOSED = 2;
135
136 final _NotificationHandler _onListen;
137 final _NotificationHandler _onPause;
138 final _NotificationHandler _onResume;
139 final _NotificationHandler _onCancel;
140 _StreamImpl<T> _stream;
141
142 // An active subscription on the stream, or null if no subscripton is active.
143 _ControllerSubscription<T> _subscription;
144
145 // Whether we have sent a "done" event.
146 int _state = _STATE_OPEN;
147
148 // Events added to the stream before it has an active subscription.
149 _PendingEvents _pendingEvents = null;
150
151 _StreamControllerImpl(this._onListen,
152 this._onPause,
153 this._onResume,
154 this._onCancel) {
155 _stream = new _ControllerStream<T>(this);
156 }
157
158 Stream<T> get stream => _stream;
159
160 /**
161 * Returns a view of this object that only exposes the [EventSink] interface.
162 */
163 EventSink<T> get sink => new _EventSinkView<T>(this);
164
165 /**
166 * Whether a listener has existed and been cancelled.
167 *
168 * After this, adding more events will be ignored.
169 */
170 bool get _isCancelled => (_state & _STATE_CANCELLED) != 0;
171
172 bool get isClosed => (_state & _STATE_CLOSED) != 0;
173
174 bool get isPaused => _subscription != null && _subscription._isInputPaused;
175
176 bool get hasListener => _subscription != null;
93 177
94 /** 178 /**
95 * Send or queue a data event. 179 * Send or queue a data event.
96 */ 180 */
97 void add(T value) => stream._add(value); 181 void add(T value) {
182 if (isClosed) throw new StateError("Adding event after close");
183 if (_subscription != null) {
184 _subscription._add(value);
185 } else if (!_isCancelled) {
186 _addPendingEvent(new _DelayedData<T>(value));
187 }
188 }
98 189
99 /** 190 /**
100 * Send or enqueue an error event. 191 * Send or enqueue an error event.
101 *
102 * If a subscription has requested to be unsubscribed on errors,
103 * it will be unsubscribed after receiving this event.
104 */ 192 */
105 void addError(Object error, [Object stackTrace]) { 193 void addError(Object error, [Object stackTrace]) {
194 if (isClosed) throw new StateError("Adding event after close");
106 if (stackTrace != null) { 195 if (stackTrace != null) {
107 // Force stack trace overwrite. Even if the error already contained 196 // Force stack trace overwrite. Even if the error already contained
108 // a stack trace. 197 // a stack trace.
109 _attachStackTrace(error, stackTrace); 198 _attachStackTrace(error, stackTrace);
110 } 199 }
111 stream._addError(error); 200 if (_subscription != null) {
112 } 201 _subscription._addError(error);
113 202 } else if (!_isCancelled) {
114 /** 203 _addPendingEvent(new _DelayedError(error));
115 * Send or enqueue a "done" message. 204 }
116 * 205 }
117 * The "done" message should be sent at most once by a stream, and it 206
118 * should be the last message sent. 207 /**
119 */ 208 * Closes this controller.
120 void close() { stream._close(); } 209 *
210 * After closing, no further events may be added using [add] or [addError].
211 *
212 * You are allowed to close the controller more than once, but only the first
213 * call has any effect.
214 *
215 * The first time a controller is closed, a "done" event is sent to its
216 * stream.
217 */
218 void close() {
219 if (isClosed) return;
220 _state |= _STATE_CLOSED;
221 if (_subscription != null) {
222 _subscription._close();
223 } else if (!_isCancelled) {
224 _addPendingEvent(const _DelayedDone());
225 }
226 }
227
228 void _addPendingEvent(_DelayedEvent event) {
229 if (_isCancelled) return;
230 _StreamImplEvents events = _pendingEvents;
231 if (events == null) {
232 events = _pendingEvents = new _StreamImplEvents();
233 }
234 events.add(event);
235 }
236
237 void _recordListen(_BufferingStreamSubscription<T> subscription) {
238 assert(_subscription == null);
239 _subscription = subscription;
240 subscription._setPendingEvents(_pendingEvents);
241 _pendingEvents = null;
242 subscription._guardCallback(() {
243 _runGuarded(_onListen);
244 });
245 }
246
247 void _recordCancel(StreamSubscription<T> subscription) {
248 assert(identical(_subscription, subscription));
249 _subscription = null;
250 _state |= _STATE_CANCELLED;
251 _runGuarded(_onCancel);
252 }
253
254 void _recordPause(StreamSubscription<T> subscription) {
255 _runGuarded(_onPause);
256 }
257
258 void _recordResume(StreamSubscription<T> subscription) {
259 _runGuarded(_onResume);
260 }
121 } 261 }
122 262
123 typedef void _NotificationHandler(); 263 typedef void _NotificationHandler();
124 264
125 class _SingleControllerStream<T> extends _SingleStreamImpl<T> { 265 void _runGuarded(_NotificationHandler notificationHandler) {
126 _NotificationHandler _onListen; 266 if (notificationHandler == null) return;
127 _NotificationHandler _onPause; 267 try {
128 _NotificationHandler _onResume; 268 notificationHandler();
129 _NotificationHandler _onCancel; 269 } catch (e, s) {
130 270 _throwDelayed(e, s);
131 // TODO(floitsch): share this code with _MultiControllerStream. 271 }
132 _runGuarded(_NotificationHandler notificationHandler) { 272 }
133 if (notificationHandler == null) return; 273
134 try { 274 class _ControllerStream<T> extends _StreamImpl<T> {
135 notificationHandler(); 275 _StreamControllerLifecycle<T> _controller;
136 } catch (e, s) { 276 bool _hasListener = false;
137 _throwDelayed(e, s); 277
138 } 278 _ControllerStream(this._controller);
139 } 279
140 280 StreamSubscription<T> _createSubscription(
141 _SingleControllerStream(this._onListen, 281 void onData(T data),
142 this._onPause, 282 void onError(Object error),
143 this._onResume, 283 void onDone(),
144 this._onCancel); 284 bool cancelOnError) {
145 285 if (_hasListener) {
146 void _onSubscriptionStateChange() { 286 throw new StateError("The stream has already been listened to.");
147 _runGuarded(_hasListener ? _onListen : _onCancel); 287 }
148 } 288 _hasListener = true;
149 289 return new _ControllerSubscription<T>(
150 void _onPauseStateChange() { 290 _controller, onData, onError, onDone, cancelOnError);
151 _runGuarded(_isPaused ? _onPause : _onResume); 291 }
152 } 292
153 } 293 void _onListen(_BufferingStreamSubscription subscription) {
294 _controller._recordListen(subscription);
295 }
296 }
297
298 class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> {
299 final _StreamControllerLifecycle<T> _controller;
300
301 _ControllerSubscription(this._controller,
302 void onData(T data),
303 void onError(Object error),
304 void onDone(),
305 bool cancelOnError)
306 : super(onData, onError, onDone, cancelOnError);
307
308 void _onCancel() {
309 _controller._recordCancel(this);
310 }
311
312 void _onPause() {
313 _controller._recordPause(this);
314 }
315
316 void _onResume() {
317 _controller._recordResume(this);
318 }
319 }
320
321 class _MultiplexStreamController<T> implements StreamController<T>,
322 _StreamControllerLifecycle<T> {
323 final _NotificationHandler _onListen;
324 final _NotificationHandler _onCancel;
325 /** Set when the [close] method is called. */
326 bool _isClosed = false;
327
328 // TODO(lrn): Make a more efficient implementation of these subscriptions,
329 // e.g., the traditional double-linked list with concurrent add and remove
330 // while firing.
331 Set<_BufferingStreamSubscription<T>> _streams;
332
333 _MultiplexStreamController(this._onListen, this._onCancel)
334 : _streams = new Set<_BufferingStreamSubscription<T>>();
335
336 // StreamController interface.
337
338 Stream<T> get stream => new _ControllerStream<T>(this);
339
340 EventSink<T> get sink => new _EventSinkView<T>(this);
341
342 bool get isClosed => _isClosed;
343
344 /**
345 * A multiplex controller is never paused.
346 *
347 * Each receiving stream may be paused individually, and they handle their
348 * own buffering.
349 */
350 bool get isPaused => false;
351
352 /** Whether there are currently a subscriber on the [Stream]. */
353 bool get hasListener => !_streams.isEmpty;
354
355 // _StreamControllerLifecycle interface.
356
357 void _recordListen(_BufferingStreamSubscription<T> subscription) {
358 bool isFirst = _streams.isEmpty;
359 _streams.add(subscription);
360 if (isFirst) {
361 _runGuarded(_onListen);
362 }
363 }
364
365 void _recordCancel(_BufferingStreamSubscription<T> subscription) {
366 _streams.remove(subscription);
367 if (_streams.isEmpty) {
368 _runGuarded(_onCancel);
369 }
370 }
371
372 void _recordPause(StreamSubscription<T> subscription) {}
373 void _recordResume(StreamSubscription<T> subscription) {}
374
375 // EventSink interface.
376
377 void add(T data) {
378 if (_streams.isEmpty) return;
379 _forEachListener((_BufferingStreamSubscription<T> subscription) {
380 subscription._add(data);
381 });
382 }
383
384 void addError(Object error, [Object stackTrace]) {
385 if (_streams.isEmpty) return;
386 _forEachListener((_BufferingStreamSubscription<T> subscription) {
387 subscription._addError(error);
388 });
389 }
390
391 void close() {
392 _isClosed = true;
393 if (_streams.isEmpty) return;
394 _forEachListener((_BufferingStreamSubscription<T> subscription) {
395 _streams.remove(subscription);
396 subscription._close();
397 });
398 }
399
400 void _forEachListener(
401 void action(_BufferingStreamSubscription<T> subscription)) {
402 List<_BufferingStreamSubscription<T>> subscriptions = _streams.toList();
403 for (_BufferingStreamSubscription<T> subscription in subscriptions) {
404 if (_streams.contains(subscription)) {
405 action(subscription);
406 }
407 }
408 }
409 }
410
OLDNEW
« no previous file with comments | « sdk/lib/async/stream.dart ('k') | sdk/lib/async/stream_impl.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698