Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(239)

Side by Side Diff: sdk/lib/async/stream_controller.dart

Issue 16003002: Add StreamController.multiplex constructor. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: More methods on interface. Use multiplexer. Created 7 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Controller for creating and adding events to a stream. 8 // Controller for creating and adding events to a stream.
9 // ------------------------------------------------------------------- 9 // -------------------------------------------------------------------
10 10
(...skipping 28 matching lines...) Expand all
39 * Whether to invoke a callback depends only on the state before and after 39 * Whether to invoke a callback depends only on the state before and after
40 * a stream action, for example firing an event. If the state changes multiple 40 * a stream action, for example firing an event. If the state changes multiple
41 * times during the action, and then ends up in the same state as before, no 41 * times during the action, and then ends up in the same state as before, no
42 * callback is performed. 42 * callback is performed.
43 * 43 *
44 * If listeners are added after the stream has completed (sent a "done" event), 44 * If listeners are added after the stream has completed (sent a "done" event),
45 * the listeners will be sent a "done" event eventually, but they won't affect 45 * the listeners will be sent a "done" event eventually, but they won't affect
46 * the stream at all, and won't trigger callbacks. From the controller's point 46 * the stream at all, and won't trigger callbacks. From the controller's point
47 * of view, the stream is completely inert when has completed. 47 * of view, the stream is completely inert when has completed.
48 */ 48 */
49 class StreamController<T> extends EventSink<T> { 49 abstract class StreamController<T> implements EventSink<T> {
50 /** The stream that this controller is controlling. */
51 Stream<T> get stream;
52
53 /**
54 * A controller with a [stream] that supports only one single subscriber.
55 *
56 * The controller will buffer all incoming events until the subscriber is
57 * registered.
58 *
59 * The [onPause] function is called when the stream becomes
60 * paused. [onResume] is called when the stream resumed.
61 *
62 * The [onListen] callback is called when the stream
63 * receives its listener and [onCancel] when the listener ends
64 * its subscription.
65 *
66 * If the stream is canceled before the controller needs new data the
67 * [onResume] call might not be executed.
68 */
69 factory StreamController({void onListen(),
70 void onPause(),
71 void onResume(),
72 void onCancel()})
73 => new _StreamControllerImpl<T>(onListen, onPause, onResume, onCancel);
74
75 /**
76 * A controller where [stream] creates new stream each time it is read.
77 *
78 * The controller distributes any events to all currently subscribed streams.
79 *
80 * The [onListen] callback is called when the first listener is subscribed,
81 * and the [onCancel] is called when there is no longer any active listeners.
82 * If a listener is added again later, after the [onCancel] was called,
83 * the [onListen] will be called again.
84 */
85 factory StreamController.multiplex({void onListen(), void onCancel()}) {
86 return new _MultiplexStreamController<T>(onListen, onCancel);
87 }
88
89 /**
90 * Returns a view of this object that only exposes the [EventSink] interface.
91 */
92 EventSink<T> get sink;
93
94 /**
95 * Whether the stream is closed for adding more events.
96 *
97 * If true, the "done" event might not have fired yet, but it has been
98 * scheduled, and it is too late to add more events.
99 */
100 bool get isClosed;
101
102 /** Whether the subscription is active and paused. */
103 bool get isPaused;
104
105 /** Whether there is currently a subscriber on the [Stream]. */
floitsch 2013/05/27 08:27:09 -currently-
Lasse Reichstein Nielsen 2013/05/27 11:17:29 Done.
Lasse Reichstein Nielsen 2013/05/27 11:17:29 Done.
106 bool get hasListener;
107 }
108
109
110 abstract class _StreamControllerLifecycle<T> {
111 void _recordListen(StreamSubscription<T> subscription) {}
112 void _recordPause(StreamSubscription<T> subscription) {}
113 void _recordResume(StreamSubscription<T> subscription) {}
114 void _recordCancel(StreamSubscription<T> subscription) {}
115 }
116
117 /**
118 * Default implementation of [StreamController].
119 *
120 * Controls a stream that only supports a single controller.
121 */
122 class _StreamControllerImpl<T> implements StreamController<T>,
123 _StreamControllerLifecycle<T> {
50 static const int _STATE_OPEN = 0; 124 static const int _STATE_OPEN = 0;
51 static const int _STATE_CANCELLED = 1; 125 static const int _STATE_CANCELLED = 1;
52 static const int _STATE_CLOSED = 2; 126 static const int _STATE_CLOSED = 2;
53 127
54 final _NotificationHandler _onListen; 128 final _NotificationHandler _onListen;
55 final _NotificationHandler _onPause; 129 final _NotificationHandler _onPause;
56 final _NotificationHandler _onResume; 130 final _NotificationHandler _onResume;
57 final _NotificationHandler _onCancel; 131 final _NotificationHandler _onCancel;
58 _StreamImpl<T> _stream; 132 _StreamImpl<T> _stream;
59 133
60 // An active subscription on the stream, or null if no subscripton is active. 134 // An active subscription on the stream, or null if no subscripton is active.
61 _ControllerSubscription<T> _subscription; 135 _ControllerSubscription<T> _subscription;
62 136
63 // Whether we have sent a "done" event. 137 // Whether we have sent a "done" event.
64 int _state = _STATE_OPEN; 138 int _state = _STATE_OPEN;
65 139
66 // Events added to the stream before it has an active subscription. 140 // Events added to the stream before it has an active subscription.
67 _PendingEvents _pendingEvents = null; 141 _PendingEvents _pendingEvents = null;
68 142
69 /** 143 _StreamControllerImpl(this._onListen,
70 * A controller with a [stream] that supports only one single subscriber. 144 this._onPause,
71 * 145 this._onResume,
72 * The controller will buffer all incoming events until the subscriber is 146 this._onCancel) {
73 * registered.
74 *
75 * The [onPause] function is called when the stream becomes
76 * paused. [onResume] is called when the stream resumed.
77 *
78 * The [onListen] callback is called when the stream
79 * receives its listener and [onCancel] when the listener ends
80 * its subscription.
81 *
82 * If the stream is canceled before the controller needs new data the
83 * [onResume] call might not be executed.
84 */
85 StreamController({void onListen(),
86 void onPause(),
87 void onResume(),
88 void onCancel()})
89 : _onListen = onListen,
90 _onPause = onPause,
91 _onResume = onResume,
92 _onCancel = onCancel {
93 _stream = new _ControllerStream<T>(this); 147 _stream = new _ControllerStream<T>(this);
94 } 148 }
95 149
96 Stream<T> get stream => _stream; 150 Stream<T> get stream => _stream;
97 151
98 /** 152 /**
99 * Returns a view of this object that only exposes the [EventSink] interface. 153 * Returns a view of this object that only exposes the [EventSink] interface.
100 */ 154 */
101 EventSink<T> get sink => new _EventSinkView<T>(this); 155 EventSink<T> get sink => new _EventSinkView<T>(this);
102 156
103 /** 157 /**
104 * Whether a listener has existed and been cancelled. 158 * Whether a listener has existed and been cancelled.
105 * 159 *
106 * After this, adding more events will be ignored. 160 * After this, adding more events will be ignored.
107 */ 161 */
108 bool get _isCancelled => (_state & _STATE_CANCELLED) != 0; 162 bool get _isCancelled => (_state & _STATE_CANCELLED) != 0;
109 163
110 /**
111 * Whether the stream is closed for adding more events.
112 *
113 * If true, the "done" event might not have fired yet, but it has been
114 * scheduled, and it is too late to add more events.
115 */
116 bool get isClosed => (_state & _STATE_CLOSED) != 0; 164 bool get isClosed => (_state & _STATE_CLOSED) != 0;
117 165
118 /** Whether the subscription is active and paused. */
119 bool get isPaused => _subscription != null && _subscription._isInputPaused; 166 bool get isPaused => _subscription != null && _subscription._isInputPaused;
120 167
121 /** Whether there are currently a subscriber on the [Stream]. */
122 bool get hasListener => _subscription != null; 168 bool get hasListener => _subscription != null;
123 169
124 /** 170 /**
125 * Send or queue a data event. 171 * Send or queue a data event.
126 */ 172 */
127 void add(T value) { 173 void add(T value) {
128 if (isClosed) throw new StateError("Adding event after close"); 174 if (isClosed) throw new StateError("Adding event after close");
129 if (_subscription != null) { 175 if (_subscription != null) {
130 _subscription._add(value); 176 _subscription._add(value);
131 } else if (!_isCancelled) { 177 } else if (!_isCancelled) {
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after
173 219
174 void _addPendingEvent(_DelayedEvent event) { 220 void _addPendingEvent(_DelayedEvent event) {
175 if (_isCancelled) return; 221 if (_isCancelled) return;
176 _StreamImplEvents events = _pendingEvents; 222 _StreamImplEvents events = _pendingEvents;
177 if (events == null) { 223 if (events == null) {
178 events = _pendingEvents = new _StreamImplEvents(); 224 events = _pendingEvents = new _StreamImplEvents();
179 } 225 }
180 events.add(event); 226 events.add(event);
181 } 227 }
182 228
183 void _recordListen(_BufferingStreamSubscription subscription) { 229 void _recordListen(_BufferingStreamSubscription<T> subscription) {
184 assert(_subscription == null); 230 assert(_subscription == null);
185 _subscription = subscription; 231 _subscription = subscription;
186 subscription._setPendingEvents(_pendingEvents); 232 subscription._setPendingEvents(_pendingEvents);
187 _pendingEvents = null; 233 _pendingEvents = null;
188 subscription._guardCallback(() { 234 subscription._guardCallback(() {
189 _runGuarded(_onListen); 235 _runGuarded(_onListen);
190 }); 236 });
191 } 237 }
192 238
193 void _recordCancel() { 239 void _recordCancel(StreamSubscription<T> subscription) {
240 assert(identical(_subscription, subscription));
194 _subscription = null; 241 _subscription = null;
195 _state |= _STATE_CANCELLED; 242 _state |= _STATE_CANCELLED;
196 _runGuarded(_onCancel); 243 _runGuarded(_onCancel);
197 } 244 }
198 245
199 void _recordPause() { 246 void _recordPause(StreamSubscription<T> subscription) {
200 _runGuarded(_onPause); 247 _runGuarded(_onPause);
201 } 248 }
202 249
203 void _recordResume() { 250 void _recordResume(StreamSubscription<T> subscription) {
204 _runGuarded(_onResume); 251 _runGuarded(_onResume);
205 } 252 }
206 } 253 }
207 254
208 typedef void _NotificationHandler(); 255 typedef void _NotificationHandler();
209 256
210 void _runGuarded(_NotificationHandler notificationHandler) { 257 void _runGuarded(_NotificationHandler notificationHandler) {
211 if (notificationHandler == null) return; 258 if (notificationHandler == null) return;
212 try { 259 try {
213 notificationHandler(); 260 notificationHandler();
214 } catch (e, s) { 261 } catch (e, s) {
215 _throwDelayed(e, s); 262 _throwDelayed(e, s);
216 } 263 }
217 } 264 }
218 265
219 class _ControllerStream<T> extends _StreamImpl<T> { 266 class _ControllerStream<T> extends _StreamImpl<T> {
220 StreamController _controller; 267 _StreamControllerLifecycle<T> _controller;
221 bool _hasListener = false; 268 bool _hasListener = false;
222 269
223 _ControllerStream(this._controller); 270 _ControllerStream(this._controller);
224 271
225 StreamSubscription<T> _createSubscription( 272 StreamSubscription<T> _createSubscription(
226 void onData(T data), 273 void onData(T data),
227 void onError(Object error), 274 void onError(Object error),
228 void onDone(), 275 void onDone(),
229 bool cancelOnError) { 276 bool cancelOnError) {
230 if (_hasListener) { 277 if (_hasListener) {
231 throw new StateError("The stream has already been listened to."); 278 throw new StateError("The stream has already been listened to.");
232 } 279 }
233 _hasListener = true; 280 _hasListener = true;
234 return new _ControllerSubscription<T>( 281 return new _ControllerSubscription<T>(
235 _controller, onData, onError, onDone, cancelOnError); 282 _controller, onData, onError, onDone, cancelOnError);
236 } 283 }
237 284
238 void _onListen(_BufferingStreamSubscription subscription) { 285 void _onListen(_BufferingStreamSubscription subscription) {
239 _controller._recordListen(subscription); 286 _controller._recordListen(subscription);
240 } 287 }
241 } 288 }
242 289
243 class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> { 290 class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> {
244 final StreamController _controller; 291 final _StreamControllerLifecycle<T> _controller;
245 292
246 _ControllerSubscription(StreamController controller, 293 _ControllerSubscription(this._controller,
247 void onData(T data), 294 void onData(T data),
248 void onError(Object error), 295 void onError(Object error),
249 void onDone(), 296 void onDone(),
250 bool cancelOnError) 297 bool cancelOnError)
251 : _controller = controller, 298 : super(onData, onError, onDone, cancelOnError);
252 super(onData, onError, onDone, cancelOnError);
253 299
254 void _onCancel() { 300 void _onCancel() {
255 _controller._recordCancel(); 301 _controller._recordCancel(this);
256 } 302 }
257 303
258 void _onPause() { 304 void _onPause() {
259 _controller._recordPause(); 305 _controller._recordPause(this);
260 } 306 }
261 307
262 void _onResume() { 308 void _onResume() {
263 _controller._recordResume(); 309 _controller._recordResume(this);
264 } 310 }
265 } 311 }
312
313 class _MultiplexStreamController<T> implements StreamController<T>,
314 _StreamControllerLifecycle<T> {
315 final _NotificationHandler _onListen;
316 final _NotificationHandler _onCancel;
317 /** Set when the [close] method is called. */
318 bool _isClosed = false;
319
320 // TODO(lrn): Make a more efficient implementation of these subscriptions,
321 // e.g., the traditional double-linked list with concurrent add and remove
322 // while firing.
323 Set<_BufferingStreamSubscription<T>> _streams;
324
325 _MultiplexStreamController(this._onListen, this._onCancel)
326 : _streams = new Set<_BufferingStreamSubscription<T>>();
327
328 // StreamController interface.
329
330 Stream<T> get stream => new _ControllerStream<T>(this);
331
332 EventSink<T> get sink => new _EventSinkView<T>(this);
333
334 bool get isClosed => _isClosed;
335
336 /**
337 * A multiplex controller is never paused.
338 *
339 * Each receiving stream may be paused individually, and they handle their
340 * own buffering.
341 */
342 bool get isPaused => false;
343
344 /** Whether there are currently a subscriber on the [Stream]. */
345 bool get hasListener => !_streams.isEmpty;
346
347 // _StreamControllerLifecycle interface.
348
349 void _recordListen(_BufferingStreamSubscription<T> subscription) {
350 bool isFirst = _streams.isEmpty;
351 _streams.add(subscription);
352 if (isFirst) {
353 _runGuarded(_onListen);
354 }
355 }
356
357 void _recordCancel(_BufferingStreamSubscription<T> subscription) {
358 _streams.remove(subscription);
359 if (_streams.isEmpty) {
360 _runGuarded(_onCancel);
361 }
362 }
363
364 void _recordPause(StreamSubscription<T> subscription) {}
365 void _recordResume(StreamSubscription<T> subscription) {}
366
367 // EventSink interface.
368
369 void add(T data) {
370 if (_streams.isEmpty) return;
371 _forEachListener((_BufferingStreamSubscription<T> subscription) {
372 subscription._add(data);
373 });
374 }
375
376 void addError(Object error) {
377 if (_streams.isEmpty) return;
378 _forEachListener((_BufferingStreamSubscription<T> subscription) {
379 subscription._addError(error);
380 });
381 }
382
383 void close() {
384 _isClosed = true;
385 if (_streams.isEmpty) return;
386 _forEachListener((_BufferingStreamSubscription<T> subscription) {
387 _streams.remove(subscription);
388 subscription._close();
389 });
390 }
391
392 void _forEachListener(
393 void action(_BufferingStreamSubscription<T> subscription)) {
394 List<_BufferingStreamSubscription<T>> subscriptions = _streams.toList();
395 for (_BufferingStreamSubscription<T> subscription in subscriptions) {
396 if (_streams.contains(subscription)) {
397 action(subscription);
398 }
399 }
400 }
401 }
402
OLDNEW
« no previous file with comments | « pkg/scheduled_test/lib/src/mock_clock.dart ('k') | sdk/lib/mdv_observe_impl/mdv_observe_impl.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698