Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(403)

Side by Side Diff: sdk/lib/async/stream_controller.dart

Issue 16003002: Add StreamController.multiplex constructor. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Add more test. Created 7 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « no previous file | tests/lib/async/stream_controller_async_test.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Controller for creating and adding events to a stream. 8 // Controller for creating and adding events to a stream.
9 // ------------------------------------------------------------------- 9 // -------------------------------------------------------------------
10 10
(...skipping 28 matching lines...) Expand all
39 * Whether to invoke a callback depends only on the state before and after 39 * Whether to invoke a callback depends only on the state before and after
40 * a stream action, for example firing an event. If the state changes multiple 40 * a stream action, for example firing an event. If the state changes multiple
41 * times during the action, and then ends up in the same state as before, no 41 * times during the action, and then ends up in the same state as before, no
42 * callback is performed. 42 * callback is performed.
43 * 43 *
44 * If listeners are added after the stream has completed (sent a "done" event), 44 * If listeners are added after the stream has completed (sent a "done" event),
45 * the listeners will be sent a "done" event eventually, but they won't affect 45 * the listeners will be sent a "done" event eventually, but they won't affect
46 * the stream at all, and won't trigger callbacks. From the controller's point 46 * the stream at all, and won't trigger callbacks. From the controller's point
47 * of view, the stream is completely inert when has completed. 47 * of view, the stream is completely inert when has completed.
48 */ 48 */
49 class StreamController<T> extends EventSink<T> { 49 abstract class StreamController<T> implements EventSink<T> {
50 /** The stream that this controller is controlling. */
51 Stream<T> get stream;
52
53 /**
54 * A controller with a [stream] that supports only one single subscriber.
55 *
56 * The controller will buffer all incoming events until the subscriber is
57 * registered.
58 *
59 * The [onPause] function is called when the stream becomes
60 * paused. [onResume] is called when the stream resumed.
61 *
62 * The [onListen] callback is called when the stream
63 * receives its listener and [onCancel] when the listener ends
64 * its subscription.
65 *
66 * If the stream is canceled before the controller needs new data the
67 * [onResume] call might not be executed.
68 */
69 factory StreamController({void onListen(),
70 void onPause(),
71 void onResume(),
72 void onCancel()})
73 => new _StreamControllerImpl<T>(onListen, onPause, onResume, onCancel);
74
75 /**
76 * A controller where [stream] creates new stream each time it is read.
77 *
78 * The controller distributes any events to all currently subscribed streams.
79 *
80 * The [onListen] callback is called when the first listener is subscribed,
81 * and the [onCancel] is called when there is no longer any active listeners.
82 * If a listener is added again later, after the [onCancel] was called,
83 * the [onListen] will be called again.
84 */
85 factory StreamController.multiplex({void onListen(), void onCancel()}) {
86 return new _MultiplexStreamController<T>(onListen, onCancel);
87 }
88
89 /**
90 * Returns a view of this object that only exposes the [EventSink] interface.
91 */
92 EventSink<T> get sink;
93 }
94
95
96 abstract class _StreamControllerLifecycle<T> {
97 void _recordListen(StreamSubscription<T> subscription) {}
98 void _recordPause(StreamSubscription<T> subscription) {}
99 void _recordResume(StreamSubscription<T> subscription) {}
100 void _recordCancel(StreamSubscription<T> subscription) {}
101 }
102
103 /**
104 * Default implementation of [StreamController].
105 *
106 * Controls a stream that only supports a single controller.
107 */
108 class _StreamControllerImpl<T> implements StreamController<T>,
109 _StreamControllerLifecycle<T> {
50 static const int _STATE_OPEN = 0; 110 static const int _STATE_OPEN = 0;
51 static const int _STATE_CANCELLED = 1; 111 static const int _STATE_CANCELLED = 1;
52 static const int _STATE_CLOSED = 2; 112 static const int _STATE_CLOSED = 2;
53 113
54 final _NotificationHandler _onListen; 114 final _NotificationHandler _onListen;
55 final _NotificationHandler _onPause; 115 final _NotificationHandler _onPause;
56 final _NotificationHandler _onResume; 116 final _NotificationHandler _onResume;
57 final _NotificationHandler _onCancel; 117 final _NotificationHandler _onCancel;
58 _StreamImpl<T> _stream; 118 _StreamImpl<T> _stream;
59 119
60 // An active subscription on the stream, or null if no subscripton is active. 120 // An active subscription on the stream, or null if no subscripton is active.
61 _ControllerSubscription<T> _subscription; 121 _ControllerSubscription<T> _subscription;
62 122
63 // Whether we have sent a "done" event. 123 // Whether we have sent a "done" event.
64 int _state = _STATE_OPEN; 124 int _state = _STATE_OPEN;
65 125
66 // Events added to the stream before it has an active subscription. 126 // Events added to the stream before it has an active subscription.
67 _PendingEvents _pendingEvents = null; 127 _PendingEvents _pendingEvents = null;
68 128
69 /** 129 _StreamControllerImpl(this._onListen,
70 * A controller with a [stream] that supports only one single subscriber. 130 this._onPause,
71 * 131 this._onResume,
72 * The controller will buffer all incoming events until the subscriber is 132 this._onCancel) {
73 * registered.
74 *
75 * The [onPause] function is called when the stream becomes
76 * paused. [onResume] is called when the stream resumed.
77 *
78 * The [onListen] callback is called when the stream
79 * receives its listener and [onCancel] when the listener ends
80 * its subscription.
81 *
82 * If the stream is canceled before the controller needs new data the
83 * [onResume] call might not be executed.
84 */
85 StreamController({void onListen(),
86 void onPause(),
87 void onResume(),
88 void onCancel()})
89 : _onListen = onListen,
90 _onPause = onPause,
91 _onResume = onResume,
92 _onCancel = onCancel {
93 _stream = new _ControllerStream<T>(this); 133 _stream = new _ControllerStream<T>(this);
94 } 134 }
95 135
96 Stream<T> get stream => _stream; 136 Stream<T> get stream => _stream;
97 137
98 /** 138 /**
99 * Returns a view of this object that only exposes the [EventSink] interface. 139 * Returns a view of this object that only exposes the [EventSink] interface.
100 */ 140 */
101 EventSink<T> get sink => new _EventSinkView<T>(this); 141 EventSink<T> get sink => new _EventSinkView<T>(this);
102 142
(...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after
173 213
174 void _addPendingEvent(_DelayedEvent event) { 214 void _addPendingEvent(_DelayedEvent event) {
175 if (_isCancelled) return; 215 if (_isCancelled) return;
176 _StreamImplEvents events = _pendingEvents; 216 _StreamImplEvents events = _pendingEvents;
177 if (events == null) { 217 if (events == null) {
178 events = _pendingEvents = new _StreamImplEvents(); 218 events = _pendingEvents = new _StreamImplEvents();
179 } 219 }
180 events.add(event); 220 events.add(event);
181 } 221 }
182 222
183 void _recordListen(_BufferingStreamSubscription subscription) { 223 void _recordListen(StreamSubscription<T> subscription) {
floitsch 2013/05/24 20:51:42 Why not keep the type?
Lasse Reichstein Nielsen 2013/05/27 08:04:12 True, it is true for this subclass, and I avoid a
224 _BufferingStreamSubscription bufferingSubscription = subscription;
184 assert(_subscription == null); 225 assert(_subscription == null);
185 _subscription = subscription; 226 _subscription = bufferingSubscription;
186 subscription._setPendingEvents(_pendingEvents); 227 bufferingSubscription._setPendingEvents(_pendingEvents);
187 _pendingEvents = null; 228 _pendingEvents = null;
188 subscription._guardCallback(() { 229 bufferingSubscription._guardCallback(() {
189 _runGuarded(_onListen); 230 _runGuarded(_onListen);
190 }); 231 });
191 } 232 }
192 233
193 void _recordCancel() { 234 void _recordCancel(StreamSubscription<T> subscription) {
194 _subscription = null; 235 _subscription = null;
floitsch 2013/05/24 20:51:42 assert that the subscription is the same?
Lasse Reichstein Nielsen 2013/05/27 08:04:12 Done.
195 _state |= _STATE_CANCELLED; 236 _state |= _STATE_CANCELLED;
196 _runGuarded(_onCancel); 237 _runGuarded(_onCancel);
197 } 238 }
198 239
199 void _recordPause() { 240 void _recordPause(StreamSubscription<T> subscription) {
200 _runGuarded(_onPause); 241 _runGuarded(_onPause);
201 } 242 }
202 243
203 void _recordResume() { 244 void _recordResume(StreamSubscription<T> subscription) {
204 _runGuarded(_onResume); 245 _runGuarded(_onResume);
205 } 246 }
206 } 247 }
207 248
208 typedef void _NotificationHandler(); 249 typedef void _NotificationHandler();
209 250
210 void _runGuarded(_NotificationHandler notificationHandler) { 251 void _runGuarded(_NotificationHandler notificationHandler) {
211 if (notificationHandler == null) return; 252 if (notificationHandler == null) return;
212 try { 253 try {
213 notificationHandler(); 254 notificationHandler();
214 } catch (e, s) { 255 } catch (e, s) {
215 _throwDelayed(e, s); 256 _throwDelayed(e, s);
216 } 257 }
217 } 258 }
218 259
219 class _ControllerStream<T> extends _StreamImpl<T> { 260 class _ControllerStream<T> extends _StreamImpl<T> {
220 StreamController _controller; 261 _StreamControllerLifecycle<T> _controller;
221 bool _hasListener = false; 262 bool _hasListener = false;
222 263
223 _ControllerStream(this._controller); 264 _ControllerStream(this._controller);
224 265
225 StreamSubscription<T> _createSubscription( 266 StreamSubscription<T> _createSubscription(
226 void onData(T data), 267 void onData(T data),
227 void onError(Object error), 268 void onError(Object error),
228 void onDone(), 269 void onDone(),
229 bool cancelOnError) { 270 bool cancelOnError) {
230 if (_hasListener) { 271 if (_hasListener) {
231 throw new StateError("The stream has already been listened to."); 272 throw new StateError("The stream has already been listened to.");
232 } 273 }
233 _hasListener = true; 274 _hasListener = true;
234 return new _ControllerSubscription<T>( 275 return new _ControllerSubscription<T>(
235 _controller, onData, onError, onDone, cancelOnError); 276 _controller, onData, onError, onDone, cancelOnError);
236 } 277 }
237 278
238 void _onListen(_BufferingStreamSubscription subscription) { 279 void _onListen(_BufferingStreamSubscription subscription) {
239 _controller._recordListen(subscription); 280 _controller._recordListen(subscription);
240 } 281 }
241 } 282 }
242 283
243 class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> { 284 class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> {
244 final StreamController _controller; 285 final _StreamControllerLifecycle<T> _controller;
245 286
246 _ControllerSubscription(StreamController controller, 287 _ControllerSubscription(this._controller,
247 void onData(T data), 288 void onData(T data),
248 void onError(Object error), 289 void onError(Object error),
249 void onDone(), 290 void onDone(),
250 bool cancelOnError) 291 bool cancelOnError)
251 : _controller = controller, 292 : super(onData, onError, onDone, cancelOnError);
252 super(onData, onError, onDone, cancelOnError);
253 293
254 void _onCancel() { 294 void _onCancel() {
255 _controller._recordCancel(); 295 _controller._recordCancel(this);
256 } 296 }
257 297
258 void _onPause() { 298 void _onPause() {
259 _controller._recordPause(); 299 _controller._recordPause(this);
260 } 300 }
261 301
262 void _onResume() { 302 void _onResume() {
263 _controller._recordResume(); 303 _controller._recordResume(this);
264 } 304 }
265 } 305 }
306
307 class _MultiplexStreamController<T> implements StreamController<T>,
308 _StreamControllerLifecycle<T> {
309 final _NotificationHandler _onListen;
310 final _NotificationHandler _onCancel;
311 // TODO(lrn): Make a more efficient implementation of these subscriptions,
312 // e.g., the traditional double-linked list with concurrent add and remove
313 // while firing.
314 Set<_BufferingStreamSubscription<T>> _streams;
315
316 _MultiplexStreamController(this._onListen, this._onCancel)
317 : _streams = new Set<_BufferingStreamSubscription<T>>();
318
319 // StreamController interface.
320
321 Stream<T> get stream => new _ControllerStream<T>(this);
322
323 EventSink<T> get sink => new _EventSinkView<T>(this);
324
325 // _StreamControllerLifecycle interface.
326
327 void _recordListen(_BufferingStreamSubscription<T> subscription) {
328 bool isFirst = _streams.isEmpty;
329 _streams.add(subscription);
330 if (isFirst) {
331 _runGuarded(_onListen);
332 }
333 }
334
335 void _recordCancel(_BufferingStreamSubscription<T> subscription) {
336 _streams.remove(subscription);
337 if (_streams.isEmpty) {
338 _runGuarded(_onCancel);
339 }
340 }
341
342 void _recordPause(StreamSubscription<T> subscription) {}
343 void _recordResume(StreamSubscription<T> subscription) {}
344
345 // EventSink interface.
346
347 void add(T data) {
348 if (_streams.isEmpty) return;
349 _forEachListener((_BufferingStreamSubscription<T> subscription) {
350 subscription._add(data);
351 });
352 }
353
354 void addError(Object error) {
355 if (_streams.isEmpty) return;
356 _forEachListener((_BufferingStreamSubscription<T> subscription) {
357 subscription._addError(error);
358 });
359 }
360
361 void close() {
362 if (_streams.isEmpty) return;
363 _forEachListener((_BufferingStreamSubscription<T> subscription) {
364 _streams.remove(subscription);
365 subscription._close();
366 });
367 }
368
369 void _forEachListener(
370 void action(_BufferingStreamSubscription<T> subscription)) {
371 List<_BufferingStreamSubscription<T>> subscriptions = _streams.toList();
372 for (_BufferingStreamSubscription<T> subscription in subscriptions) {
373 if (_streams.contains(subscription)) {
374 action(subscription);
375 }
376 }
377 }
378 }
379
OLDNEW
« no previous file with comments | « no previous file | tests/lib/async/stream_controller_async_test.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698