Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(66)

Side by Side Diff: sdk/lib/async/stream_controller.dart

Issue 16240008: Make StreamController be a StreamSink, not just an EventSink. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Complete rewrite. StreamController is now itself a StreamSink. Created 7 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Controller for creating and adding events to a stream. 8 // Controller for creating and adding events to a stream.
9 // ------------------------------------------------------------------- 9 // -------------------------------------------------------------------
10 10
(...skipping 28 matching lines...) Expand all
39 * Whether to invoke a callback depends only on the state before and after 39 * Whether to invoke a callback depends only on the state before and after
40 * a stream action, for example firing an event. If the state changes multiple 40 * a stream action, for example firing an event. If the state changes multiple
41 * times during the action, and then ends up in the same state as before, no 41 * times during the action, and then ends up in the same state as before, no
42 * callback is performed. 42 * callback is performed.
43 * 43 *
44 * If listeners are added after the stream has completed (sent a "done" event), 44 * If listeners are added after the stream has completed (sent a "done" event),
45 * the listeners will be sent a "done" event eventually, but they won't affect 45 * the listeners will be sent a "done" event eventually, but they won't affect
46 * the stream at all, and won't trigger callbacks. From the controller's point 46 * the stream at all, and won't trigger callbacks. From the controller's point
47 * of view, the stream is completely inert when has completed. 47 * of view, the stream is completely inert when has completed.
48 */ 48 */
49 abstract class StreamController<T> implements EventSink<T> { 49 abstract class StreamController<T> implements StreamSink<T> {
50 /** The stream that this controller is controlling. */ 50 /** The stream that this controller is controlling. */
51 Stream<T> get stream; 51 Stream<T> get stream;
52 52
53 /** 53 /**
54 * A controller with a [stream] that supports only one single subscriber. 54 * A controller with a [stream] that supports only one single subscriber.
55 * 55 *
56 * If [sync] is true, events may be passed directly to the stream's listener 56 * If [sync] is true, events may be passed directly to the stream's listener
57 * during an [add], [addError] or [close] call. If [sync] is false, the event 57 * during an [add], [addError] or [close] call. If [sync] is false, the event
58 * will be passed to the listener at a later time, after the code creating 58 * will be passed to the listener at a later time, after the code creating
59 * the event has returned. 59 * the event has returned.
60 * 60 *
61 * The controller will buffer all incoming events until the subscriber is 61 * The controller will buffer all incoming events until the subscriber is
62 * registered. 62 * registered.
63 * 63 *
64 * The [onPause] function is called when the stream becomes 64 * The [onPause] function is called when the stream becomes
65 * paused. [onResume] is called when the stream resumed. 65 * paused. [onResume] is called when the stream resumed.
66 * 66 *
67 * The [onListen] callback is called when the stream 67 * The [onListen] callback is called when the stream
68 * receives its listener and [onCancel] when the listener ends 68 * receives its listener and [onCancel] when the listener ends
69 * its subscription. 69 * its subscription.
70 * 70 *
71 * If the stream is canceled before the controller needs new data the 71 * If the stream is canceled before the controller needs new data the
72 * [onResume] call might not be executed. 72 * [onResume] call might not be executed.
73 */ 73 */
74 factory StreamController({void onListen(), 74 factory StreamController({void onListen(),
75 void onPause(), 75 void onPause(),
76 void onResume(), 76 void onResume(),
77 void onCancel(), 77 void onCancel(),
78 bool sync: false}) 78 bool sync: false}) {
79 => sync 79 if (onListen == null && onPause == null &&
80 onResume == null && onCancel == null) {
81 return sync
82 ? new _NoCallbackSyncStreamController<T>()
83 : new _NoCallbackAsyncStreamController<T>();
84 }
85 return sync
80 ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel) 86 ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel)
81 : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel); 87 : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel);
88 }
82 89
83 /** 90 /**
84 * A controller where [stream] can be listened to more than once. 91 * A controller where [stream] can be listened to more than once.
85 * 92 *
86 * The [Stream] returned by [stream] is a broadcast stream. It can be listened 93 * The [Stream] returned by [stream] is a broadcast stream. It can be listened
87 * to more than once. 94 * to more than once.
88 * 95 *
89 * The controller distributes any events to all currently subscribed 96 * The controller distributes any events to all currently subscribed
90 * listeners. 97 * listeners.
91 * It is not allowed to call [add], [addError], or [close] before a previous 98 * It is not allowed to call [add], [addError], or [close] before a previous
(...skipping 24 matching lines...) Expand all
116 */ 123 */
117 factory StreamController.broadcast({void onListen(), 124 factory StreamController.broadcast({void onListen(),
118 void onCancel(), 125 void onCancel(),
119 bool sync: false}) { 126 bool sync: false}) {
120 return sync 127 return sync
121 ? new _SyncBroadcastStreamController<T>(onListen, onCancel) 128 ? new _SyncBroadcastStreamController<T>(onListen, onCancel)
122 : new _AsyncBroadcastStreamController<T>(onListen, onCancel); 129 : new _AsyncBroadcastStreamController<T>(onListen, onCancel);
123 } 130 }
124 131
125 /** 132 /**
126 * Returns a view of this object that only exposes the [EventSink] interface. 133 * Returns a view of this object that only exposes the [StreamSink] interface.
127 */ 134 */
128 EventSink<T> get sink; 135 StreamSink<T> get sink;
129 136
130 /** 137 /**
131 * Whether the stream is closed for adding more events. 138 * Whether the stream is closed for adding more events.
132 * 139 *
133 * If true, the "done" event might not have fired yet, but it has been 140 * If true, the "done" event might not have _ yet, but it has been
floitsch 2013/06/27 15:15:19 undo change?
Lasse Reichstein Nielsen 2013/06/28 12:57:38 Done.
134 * scheduled, and it is too late to add more events. 141 * scheduled, and it is too late to add more events.
135 */ 142 */
136 bool get isClosed; 143 bool get isClosed;
137 144
138 /** 145 /**
139 * Whether the subscription would need to buffer events. 146 * Whether the subscription would need to buffer events.
140 * 147 *
141 * This is the case if the controller's stream has a listener and it is 148 * This is the case if the controller's stream has a listener and it is
142 * paused, or if it has not received a listener yet. In that case, the 149 * paused, or if it has not received a listener yet. In that case, the
143 * controller is considered paused as well. 150 * controller is considered paused as well.
(...skipping 11 matching lines...) Expand all
155 * Send or enqueue an error event. 162 * Send or enqueue an error event.
156 * 163 *
157 * Also allows an objection stack trace object, on top of what [EventSink] 164 * Also allows an objection stack trace object, on top of what [EventSink]
158 * allows. 165 * allows.
159 */ 166 */
160 void addError(Object error, [Object stackTrace]); 167 void addError(Object error, [Object stackTrace]);
161 } 168 }
162 169
163 170
164 abstract class _StreamControllerLifecycle<T> { 171 abstract class _StreamControllerLifecycle<T> {
165 void _recordListen(StreamSubscription<T> subscription) {} 172 StreamSubscription<T> _subscribe(void onData(T data),
173 void onError(Object error),
174 void onDone(),
175 bool cancelOnError);
166 void _recordPause(StreamSubscription<T> subscription) {} 176 void _recordPause(StreamSubscription<T> subscription) {}
167 void _recordResume(StreamSubscription<T> subscription) {} 177 void _recordResume(StreamSubscription<T> subscription) {}
168 void _recordCancel(StreamSubscription<T> subscription) {} 178 void _recordCancel(StreamSubscription<T> subscription) {}
169 } 179 }
170 180
171 /** 181 /**
172 * Default implementation of [StreamController]. 182 * Default implementation of [StreamController].
173 * 183 *
174 * Controls a stream that only supports a single controller. 184 * Controls a stream that only supports a single controller.
175 */ 185 */
176 abstract class _StreamController<T> implements StreamController<T>, 186 abstract class _StreamController<T> implements StreamController<T>,
177 _StreamControllerLifecycle<T>, 187 _StreamControllerLifecycle<T>,
188 _EventSink<T>,
178 _EventDispatch<T> { 189 _EventDispatch<T> {
179 static const int _STATE_OPEN = 0; 190 // The states are bit-flags. More than one can be set at a time.
180 static const int _STATE_CANCELLED = 1; 191 //
181 static const int _STATE_CLOSED = 2; 192 // The "subscription state" goes through the states:
193 // initial -> subscribed -> canceled.
194 // These are mutually exclusive.
195 // The "closed" state records whether the [close] method has been called
196 // on the controller. This can be done at any time. If done before
197 // subscription, the done event is queued. If done after cancel, the done
198 // event is ignored (just as any other event after a cancel).
182 199
183 final _NotificationHandler _onListen; 200 /** The controller is in its initial state with no subscription. */
184 final _NotificationHandler _onPause; 201 static const int _STATE_INITIAL = 0;
185 final _NotificationHandler _onResume; 202 /** The controller has a subscription, but hasn't been closed or canceled. */
186 final _NotificationHandler _onCancel; 203 static const int _STATE_SUBSCRIBED = 1;
187 _StreamImpl<T> _stream; 204 /** The subscription is canceled. */
205 static const int _STATE_CANCELED = 2;
206 /** Mask for the subscription state. */
207 static const int _STATE_SUBSCRIPTION_MASK = 3;
188 208
189 // An active subscription on the stream, or null if no subscripton is active. 209 // The following state relate to the controller, not the subscription.
190 _ControllerSubscription<T> _subscription; 210 // If closed, adding more events is not allowed.
191 211 // If executing an [addStream], now events are not allowed either, but will
floitsch 2013/06/27 15:15:19 -now- Move description of _STATE_ADDSTREAM below.
Lasse Reichstein Nielsen 2013/06/28 12:57:38 now -> new. Reworded and reordered.
192 // Whether we have sent a "done" event. 212 // be added by the stream.
193 int _state = _STATE_OPEN; 213 /** The controller is closed due to calling [close]. */
194 214 static const int _STATE_CLOSED = 4;
195 // Events added to the stream before it has an active subscription. 215 /** The controller is in the middle of an [addStream] call. */
196 _PendingEvents _pendingEvents = null; 216 static const int _STATE_ADDSTREAM = 8;
197
198 _StreamController(this._onListen,
199 this._onPause,
200 this._onResume,
201 this._onCancel) {
202 _stream = new _ControllerStream<T>(this);
203 }
204
205 Stream<T> get stream => _stream;
206 217
207 /** 218 /**
208 * Returns a view of this object that only exposes the [EventSink] interface. 219 * Field containing different data depending on the current subscription
220 * state.
221 *
222 * If [_state] is [_STATE_INITIAL], the field may contain a [_PendingEvents]
223 * for events added to the controller before a subscription.
224 *
225 * While [_state] is [_STATE_SUBSCRIBED], the field contains the subscription.
226 *
227 * When [_state] is [_STATE_CANCELED] the field is currently not used.
209 */ 228 */
210 EventSink<T> get sink => new _EventSinkView<T>(this); 229 var _varData;
230
231 /** Current state of the controller. */
232 int _state = _STATE_INITIAL;
211 233
212 /** 234 /**
213 * Whether a listener has existed and been cancelled. 235 * Future completed when the stream sends its last event.
236 *
237 * This is also the future returned by [close].
238 */
239 // TODO(lrn): Could this be stored in the varData field too, if it's not
240 // accessed until the call to "close"? Then we need to special case if it's
241 // accessed earlier, or if close is called before subscribing.
242 _FutureImpl _doneFuture;
243
244 _StreamController();
245
246 _NotificationHandler get _onListen;
247 _NotificationHandler get _onPause;
248 _NotificationHandler get _onResume;
249 _NotificationHandler get _onCancel;
250
251 // Return a new stream every time. The streams are equal, but not identical.
252 Stream<T> get stream => new _ControllerStream(this);
253
254 /**
255 * Returns a view of this object that only exposes the [StreamSink] interface.
256 */
257 StreamSink<T> get sink => new _StreamSinkWrapper<T>(this);
258
259 /**
260 * Whether a listener has existed and been canceled.
214 * 261 *
215 * After this, adding more events will be ignored. 262 * After this, adding more events will be ignored.
216 */ 263 */
217 bool get _isCancelled => (_state & _STATE_CANCELLED) != 0; 264 bool get _isCanceled => (_state & _STATE_CANCELED) != 0;
265
266 /** Whether there is an active listener. */
267 bool get hasListener => (_state & _STATE_SUBSCRIBED) != 0;
268
269 /** Whether there has not been a listener yet. */
270 bool get _isInitialState =>
271 (_state & _STATE_SUBSCRIPTION_MASK) == _STATE_INITIAL;
218 272
219 bool get isClosed => (_state & _STATE_CLOSED) != 0; 273 bool get isClosed => (_state & _STATE_CLOSED) != 0;
220 274
221 bool get isPaused => hasListener ? _subscription._isInputPaused 275 bool get isPaused => hasListener ? _subscription._isInputPaused
222 : !_isCancelled; 276 : !_isCanceled;
223 277
224 bool get hasListener => _subscription != null; 278 bool get _isAddingStream => (_state & _STATE_ADDSTREAM) != 0;
279
280 /** New events may not be added after close, or during addStream. */
281 bool get _mayAddEvent => (_state < _STATE_CLOSED);
282
283 // Returns the pending events.
284 // Pending events are events added before a subscription exists.
285 // They are added to the subscription when it is created.
286 // Pending events, if any, are kept in the _varData field until the
287 // stream is listened to.
288 // While adding a stream, pending events are moved into the
289 // state object to allow the state object to use the _varData field.
290 _PendingEvents get _pendingEvents {
291 assert(_isInitialState);
292 if (!_isAddingStream) {
293 return _varData;
294 }
295 _StreamControllerAddStreamState state = _varData;
296 return state.varData;
297 }
298
299 // Returns the pending events, and creates the object if necessary.
300 _StreamImplEvents _ensurePendingEvents() {
301 assert(_isInitialState);
302 if (!_isAddingStream) {
303 if (_varData == null) _varData = new _StreamImplEvents();
304 return _varData;
305 }
306 _StreamControllerAddStreamState state = _varData;
307 if (state.varData == null) state.varData = new _StreamImplEvents();
308 return state.varData;
309 }
310
311 // Get the current subscription.
312 // If we are adding a stream, the subscription is moved into the state
313 // object to allow the state object to use the _varData field.
314 _ControllerSubscription get _subscription {
315 assert(hasListener);
316 if (_isAddingStream) {
317 _StreamControllerAddStreamState addState = _varData;
318 return addState.varData;
319 }
320 return _varData;
321 }
225 322
226 /** 323 /**
227 * Send or queue a data event. 324 * Creates an error describing why an event cannot be added.
325 *
326 * The reason, and therefore the error message, depends on the current state.
327 */
328 Error _badEventState() {
329 if (isClosed) {
330 return new StateError("Cannot add event after closing");
331 }
332 assert(_isAddingStream);
333 return new StateError("Cannot add event while adding a stream");
334 }
335
336 // StreamSink interface.
337 Future addStream(Stream<T> source) {
338 if (!_mayAddEvent) throw _badEventState();
339 if (_isCanceled) return new _FutureImpl.immediate(null);
340 _StreamControllerAddStreamState addState =
341 new _StreamControllerAddStreamState(this, _varData, source);
342 _varData = addState;
343 _state |= _STATE_ADDSTREAM;
344 return addState.addStreamFuture;
345 }
346
347 Future get done => _ensureDoneFuture();
348
349 Future _ensureDoneFuture() {
350 if (_doneFuture == null) {
351 _doneFuture = new _FutureImpl();
352 if (_isCanceled) _doneFuture._setValue(null);
353 }
354 return _doneFuture;
355 }
356
357 /**
358 * Send or enqueue a data event.
228 */ 359 */
229 void add(T value) { 360 void add(T value) {
230 if (isClosed) throw new StateError("Adding event after close"); 361 if (!_mayAddEvent) throw _badEventState();
231 if (_subscription != null) { 362 _add(value);
232 _sendData(value);
233 } else if (!_isCancelled) {
234 _addPendingEvent(new _DelayedData<T>(value));
235 }
236 } 363 }
237 364
238 /** 365 /**
239 * Send or enqueue an error event. 366 * Send or enqueue an error event.
240 */ 367 */
241 void addError(Object error, [Object stackTrace]) { 368 void addError(Object error, [Object stackTrace]) {
242 if (isClosed) throw new StateError("Adding event after close"); 369 if (!_mayAddEvent) throw _badEventState();
243 if (stackTrace != null) { 370 if (stackTrace != null) {
244 // Force stack trace overwrite. Even if the error already contained 371 // Force stack trace overwrite. Even if the error already contained
245 // a stack trace. 372 // a stack trace.
246 _attachStackTrace(error, stackTrace); 373 _attachStackTrace(error, stackTrace);
247 } 374 }
248 if (_subscription != null) { 375 if (hasListener) {
249 _sendError(error); 376 _sendError(error);
250 } else if (!_isCancelled) { 377 } else if (_isInitialState) {
251 _addPendingEvent(new _DelayedError(error)); 378 _ensurePendingEvents().add(new _DelayedError(error));
252 } 379 }
253 } 380 }
254 381
255 /** 382 /**
256 * Closes this controller. 383 * Closes this controller.
257 * 384 *
258 * After closing, no further events may be added using [add] or [addError]. 385 * After closing, no further events may be added using [add] or [addError].
259 * 386 *
260 * You are allowed to close the controller more than once, but only the first 387 * You are allowed to close the controller more than once, but only the first
261 * call has any effect. 388 * call has any effect.
262 * 389 *
263 * The first time a controller is closed, a "done" event is sent to its 390 * The first time a controller is closed, a "done" event is sent to its
264 * stream. 391 * stream.
265 */ 392 */
266 void close() { 393 Future close() {
267 if (isClosed) return; 394 if (isClosed) {
395 assert(_doneFuture != null); // Was set when close was first called.
396 return _doneFuture;
397 }
398 if (!_mayAddEvent) throw _badEventState();
268 _state |= _STATE_CLOSED; 399 _state |= _STATE_CLOSED;
269 if (_subscription != null) { 400 _ensureDoneFuture();
401 if (hasListener) {
270 _sendDone(); 402 _sendDone();
271 } else if (!_isCancelled) { 403 } else if (_isInitialState) {
272 _addPendingEvent(const _DelayedDone()); 404 _ensurePendingEvents().add(const _DelayedDone());
405 }
406 return _doneFuture;
407 }
408
409 // EventSink interface. Used by the [addStream] events.
410
411 // Add data event, used both by the [addStream] events and by [add].
412 void _add(T value) {
413 if (hasListener) {
414 _sendData(value);
415 } else if (_isInitialState) {
416 _ensurePendingEvents().add(new _DelayedData<T>(value));
273 } 417 }
274 } 418 }
275 419
276 // EventDispatch interface 420 void _addError(Object error) {
277 421 // Error from addStream. Stop the addStream and complete its future with the
floitsch 2013/06/27 15:15:19 I don't think that's the right thing to do. Just p
Lasse Reichstein Nielsen 2013/06/28 12:57:38 Done.
278 void _addPendingEvent(_DelayedEvent event) { 422 // error.
279 if (_isCancelled) return; 423 assert(_isAddingStream);
280 _StreamImplEvents events = _pendingEvents; 424 _StreamControllerAddStreamState addState = _varData;
281 if (events == null) { 425 _varData = addState.varData;
282 events = _pendingEvents = new _StreamImplEvents(); 426 _state &= ~_STATE_ADDSTREAM;
283 } 427 addState.completeWithError(error);
284 events.add(event);
285 } 428 }
286 429
287 void _recordListen(_BufferingStreamSubscription<T> subscription) { 430 void _close() {
288 assert(_subscription == null); 431 // End of addStream stream.
289 _subscription = subscription; 432 assert(_isAddingStream);
290 subscription._setPendingEvents(_pendingEvents); 433 _StreamControllerAddStreamState addState = _varData;
291 _pendingEvents = null; 434 _varData = addState.varData;
435 _state &= ~_STATE_ADDSTREAM;
436 addState.complete();
437 }
438
439 // _StreamControllerLifeCycle interface
440
441 StreamSubscription<T> _subscribe(void onData(T data),
442 void onError(Object error),
443 void onDone(),
444 bool cancelOnError) {
445 if (!_isInitialState) {
446 throw new StateError("Stream has already been listened to.");
447 }
448 _ControllerSubscription subscription = new _ControllerSubscription(
449 this, onData, onError, onDone, cancelOnError);
450
451 _PendingEvents pendingEvents = _pendingEvents;
452 _state |= _STATE_SUBSCRIBED;
453 if (_isAddingStream) {
454 _StreamControllerAddStreamState addState = _varData;
455 addState.varData = subscription;
456 } else {
457 _varData = subscription;
458 }
459 subscription._setPendingEvents(pendingEvents);
292 subscription._guardCallback(() { 460 subscription._guardCallback(() {
293 _runGuarded(_onListen); 461 _runGuarded(_onListen);
294 }); 462 });
463
464 return subscription;
295 } 465 }
296 466
297 void _recordCancel(StreamSubscription<T> subscription) { 467 void _recordCancel(StreamSubscription<T> subscription) {
298 assert(identical(_subscription, subscription)); 468 if (_isAddingStream) {
299 _subscription = null; 469 _StreamControllerAddStreamState addState = _varData;
300 _state |= _STATE_CANCELLED; 470 addState.cancel();
471 }
472 _varData = null;
473 _state =
474 (_state & ~(_STATE_SUBSCRIBED | _STATE_ADDSTREAM)) | _STATE_CANCELED;
301 _runGuarded(_onCancel); 475 _runGuarded(_onCancel);
476 if (_doneFuture != null && _doneFuture._mayComplete) {
477 _doneFuture._asyncSetValue(null);
478 }
302 } 479 }
303 480
304 void _recordPause(StreamSubscription<T> subscription) { 481 void _recordPause(StreamSubscription<T> subscription) {
482 if (_isAddingStream) {
483 _StreamControllerAddStreamState addState = _varData;
484 addState.pause();
485 }
305 _runGuarded(_onPause); 486 _runGuarded(_onPause);
306 } 487 }
307 488
308 void _recordResume(StreamSubscription<T> subscription) { 489 void _recordResume(StreamSubscription<T> subscription) {
490 if (_isAddingStream) {
491 _StreamControllerAddStreamState addState = _varData;
492 addState.resume();
493 }
309 _runGuarded(_onResume); 494 _runGuarded(_onResume);
310 } 495 }
311 } 496 }
312 497
313 class _SyncStreamController<T> extends _StreamController<T> { 498 abstract class _SyncStreamControllerDispatch<T>
314 _SyncStreamController(void onListen(), 499 implements _StreamController<T> {
315 void onPause(),
316 void onResume(),
317 void onCancel())
318 : super(onListen, onPause, onResume, onCancel);
319
320 void _sendData(T data) { 500 void _sendData(T data) {
321 _subscription._add(data); 501 _subscription._add(data);
322 } 502 }
323 503
324 void _sendError(Object error) { 504 void _sendError(Object error) {
325 _subscription._addError(error); 505 _subscription._addError(error);
326 } 506 }
327 507
328 void _sendDone() { 508 void _sendDone() {
329 _subscription._close(); 509 _subscription._close();
330 } 510 }
331 } 511 }
332 512
333 class _AsyncStreamController<T> extends _StreamController<T> { 513 abstract class _AsyncStreamControllerDispatch<T>
334 _AsyncStreamController(void onListen(), 514 implements _StreamController<T> {
335 void onPause(),
336 void onResume(),
337 void onCancel())
338 : super(onListen, onPause, onResume, onCancel);
339
340 void _sendData(T data) { 515 void _sendData(T data) {
341 _subscription._addPending(new _DelayedData(data)); 516 _subscription._addPending(new _DelayedData(data));
342 } 517 }
343 518
344 void _sendError(Object error) { 519 void _sendError(Object error) {
345 _subscription._addPending(new _DelayedError(error)); 520 _subscription._addPending(new _DelayedError(error));
346 } 521 }
347 522
348 void _sendDone() { 523 void _sendDone() {
349 _subscription._addPending(const _DelayedDone()); 524 _subscription._addPending(const _DelayedDone());
350 } 525 }
351 } 526 }
352 527
528 // TODO(lrn): Use common superclass for callback-controllers when VM supports
529 // constructors in mixin superclasses.
530
531 class _AsyncStreamController<T> extends _StreamController<T>
532 with _AsyncStreamControllerDispatch<T> {
533 final _NotificationHandler _onListen;
534 final _NotificationHandler _onPause;
535 final _NotificationHandler _onResume;
536 final _NotificationHandler _onCancel;
537
538 _AsyncStreamController(void this._onListen(),
539 void this._onPause(),
540 void this._onResume(),
541 void this._onCancel());
542 }
543
544 class _SyncStreamController<T> extends _StreamController<T>
545 with _SyncStreamControllerDispatch<T> {
546 final _NotificationHandler _onListen;
547 final _NotificationHandler _onPause;
548 final _NotificationHandler _onResume;
549 final _NotificationHandler _onCancel;
550
551 _SyncStreamController(void this._onListen(),
552 void this._onPause(),
553 void this._onResume(),
554 void this._onCancel());
555 }
556
557 abstract class _NoCallbacks {
558 _NotificationHandler get _onListen => null;
559 _NotificationHandler get _onPause => null;
560 _NotificationHandler get _onResume => null;
561 _NotificationHandler get _onCancel => null;
562 }
563
564 typedef _NoCallbackAsyncStreamController<T> = _StreamController<T>
565 with _AsyncStreamControllerDispatch/*<T>*/, _NoCallbacks;
566
567 typedef _NoCallbackSyncStreamController<T> = _StreamController<T>
568 with _SyncStreamControllerDispatch/*<T>*/, _NoCallbacks;
569
353 typedef void _NotificationHandler(); 570 typedef void _NotificationHandler();
354 571
355 void _runGuarded(_NotificationHandler notificationHandler) { 572 void _runGuarded(_NotificationHandler notificationHandler) {
356 if (notificationHandler == null) return; 573 if (notificationHandler == null) return;
357 try { 574 try {
358 notificationHandler(); 575 notificationHandler();
359 } catch (e, s) { 576 } catch (e, s) {
360 _Zone.current.handleUncaughtError(_asyncError(e, s)); 577 _Zone.current.handleUncaughtError(_asyncError(e, s));
361 } 578 }
362 } 579 }
363 580
364 class _ControllerStream<T> extends _StreamImpl<T> { 581 class _ControllerStream<T> extends _StreamImpl<T> {
365 _StreamControllerLifecycle<T> _controller; 582 _StreamControllerLifecycle<T> _controller;
366 bool _hasListener = false;
367 583
368 _ControllerStream(this._controller); 584 _ControllerStream(this._controller);
369 585
370 StreamSubscription<T> _createSubscription( 586 StreamSubscription<T> _createSubscription(
371 void onData(T data), 587 void onData(T data),
372 void onError(Object error), 588 void onError(Object error),
373 void onDone(), 589 void onDone(),
374 bool cancelOnError) { 590 bool cancelOnError) =>
375 if (_hasListener) { 591 _controller._subscribe(onData, onError, onDone, cancelOnError);
376 throw new StateError("The stream has already been listened to.");
377 }
378 _hasListener = true;
379 return new _ControllerSubscription<T>(
380 _controller, onData, onError, onDone, cancelOnError);
381 }
382 592
383 void _onListen(_BufferingStreamSubscription subscription) { 593 // Override == and hashCode so that new streams returned by the same
384 _controller._recordListen(subscription); 594 // controller are considered equal. The controller returns a new stream
595 // each time it's queried, but doesn't have to cache the result.
596
597 int get hashCode => _controller.hashCode ^ 0x35323532;
598
599 bool operator==(Object other) {
600 if (other is! _ControllerStream) return false;
601 _ControllerStream otherStream = other;
602 return identical(otherStream._controller, this);
385 } 603 }
386 } 604 }
387 605
388 class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> { 606 class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> {
389 final _StreamControllerLifecycle<T> _controller; 607 final _StreamControllerLifecycle<T> _controller;
390 608
391 _ControllerSubscription(this._controller, 609 _ControllerSubscription(this._controller,
392 void onData(T data), 610 void onData(T data),
393 void onError(Object error), 611 void onError(Object error),
394 void onDone(), 612 void onDone(),
395 bool cancelOnError) 613 bool cancelOnError)
396 : super(onData, onError, onDone, cancelOnError); 614 : super(onData, onError, onDone, cancelOnError);
397 615
398 void _onCancel() { 616 void _onCancel() {
399 _controller._recordCancel(this); 617 _controller._recordCancel(this);
400 } 618 }
401 619
402 void _onPause() { 620 void _onPause() {
403 _controller._recordPause(this); 621 _controller._recordPause(this);
404 } 622 }
405 623
406 void _onResume() { 624 void _onResume() {
407 _controller._recordResume(this); 625 _controller._recordResume(this);
408 } 626 }
409 } 627 }
410 628
411 class _BroadcastStream<T> extends _StreamImpl<T> {
412 _BroadcastStreamController _controller;
413 629
414 _BroadcastStream(this._controller); 630 /** A class that exposes only the [StreamSink] interface of an object. */
631 class _StreamSinkWrapper<T> implements StreamSink<T> {
632 StreamSink _target;
floitsch 2013/06/27 15:15:19 final.
Lasse Reichstein Nielsen 2013/06/28 12:57:38 Done.
633 _StreamSinkWrapper(this._target);
634 void add(T data) { _target.add(data); }
635 void addError(Object error) { _target.addError(error); }
636 Future close() => _target.close();
637 Future addStream(Stream<T> source) => _target.addStream(source);
638 Future get done => _target.done;
639 }
415 640
416 bool get isBroadcast => true; 641 /**
642 * Object containing the state used to handle [StreamController.addStream].
643 */
644 class _AddStreamState<T> {
645 // [_FutureImpl] returned by call to addStream.
646 _FutureImpl addStreamFuture;
417 647
418 StreamSubscription<T> _createSubscription( 648 // Subscription on stream argument to addStream.
419 void onData(T data), 649 StreamSubscription addSubscription;
420 void onError(Object error), 650
421 void onDone(), 651 _AddStreamState(StreamSink controller, Stream source)
422 bool cancelOnError) { 652 : addStreamFuture = new _FutureImpl(),
423 return new _BroadcastSubscription<T>( 653 addSubscription = source.listen(controller._add,
424 _controller, onData, onError, onDone, cancelOnError); 654 onError: controller._addError,
655 onDone: controller._close,
656 cancelOnError: true);
657
658 void pause() {
659 addSubscription.pause();
425 } 660 }
426 661
427 void _onListen(_BufferingStreamSubscription subscription) { 662 void resume() {
428 _controller._recordListen(subscription); 663 addSubscription.resume();
664 }
665
666 void cancel() {
667 addSubscription.cancel();
668 complete();
669 }
670
671 void completeWithError(Object error) {
672 addStreamFuture._asyncSetError(error);
673 }
674
675 void complete() {
676 addStreamFuture._asyncSetValue(null);
429 } 677 }
430 } 678 }
431 679
432 abstract class _BroadcastSubscriptionLink { 680 class _StreamControllerAddStreamState<T> extends _AddStreamState<T> {
433 _BroadcastSubscriptionLink _next; 681 // The subscription or pending data of a _StreamController.
434 _BroadcastSubscriptionLink _previous; 682 // Stored here because we reuse the `_varData` field in the _StreamController
435 } 683 // to store this state object.
684 var varData;
436 685
437 class _BroadcastSubscription<T> extends _ControllerSubscription<T> 686 _StreamControllerAddStreamState(_StreamController controller,
438 implements _BroadcastSubscriptionLink { 687 this.varData,
439 static const int _STATE_EVENT_ID = 1; 688 Stream source) : super(controller, source) {
440 static const int _STATE_FIRING = 2; 689 if (controller.isPaused) {
441 static const int _STATE_REMOVE_AFTER_FIRING = 4; 690 addSubscription.pause();
442 int _eventState;
443
444 _BroadcastSubscriptionLink _next;
445 _BroadcastSubscriptionLink _previous;
446
447 _BroadcastSubscription(_StreamControllerLifecycle controller,
448 void onData(T data),
449 void onError(Object error),
450 void onDone(),
451 bool cancelOnError)
452 : super(controller, onData, onError, onDone, cancelOnError) {
453 _next = _previous = this;
454 }
455
456 _BroadcastStreamController get _controller => super._controller;
457
458 bool _expectsEvent(int eventId) {
459 return (_eventState & _STATE_EVENT_ID) == eventId;
460 }
461
462 void _toggleEventId() {
463 _eventState ^= _STATE_EVENT_ID;
464 }
465
466 bool get _isFiring => (_eventState & _STATE_FIRING) != 0;
467
468 bool _setRemoveAfterFiring() {
469 assert(_isFiring);
470 _eventState |= _STATE_REMOVE_AFTER_FIRING;
471 }
472
473 bool get _removeAfterFiring =>
474 (_eventState & _STATE_REMOVE_AFTER_FIRING) != 0;
475 }
476
477
478 abstract class _BroadcastStreamController<T>
479 implements StreamController<T>,
480 _StreamControllerLifecycle<T>,
481 _BroadcastSubscriptionLink,
482 _EventDispatch<T> {
483 static const int _STATE_INITIAL = 0;
484 static const int _STATE_EVENT_ID = 1;
485 static const int _STATE_FIRING = 2;
486 static const int _STATE_CLOSED = 4;
487
488 final _NotificationHandler _onListen;
489 final _NotificationHandler _onCancel;
490
491 // State of the controller.
492 int _state;
493
494 // Double-linked list of active listeners.
495 _BroadcastSubscriptionLink _next;
496 _BroadcastSubscriptionLink _previous;
497
498 _BroadcastStreamController(this._onListen, this._onCancel)
499 : _state = _STATE_INITIAL {
500 _next = _previous = this;
501 }
502
503 // StreamController interface.
504
505 Stream<T> get stream => new _BroadcastStream<T>(this);
506
507 EventSink<T> get sink => new _EventSinkView<T>(this);
508
509 bool get isClosed => (_state & _STATE_CLOSED) != 0;
510
511 /**
512 * A broadcast controller is never paused.
513 *
514 * Each receiving stream may be paused individually, and they handle their
515 * own buffering.
516 */
517 bool get isPaused => false;
518
519 /** Whether there are currently a subscriber on the [Stream]. */
520 bool get hasListener => !_isEmpty;
521
522 /** Whether an event is being fired (sent to some, but not all, listeners). */
523 bool get _isFiring => (_state & _STATE_FIRING) != 0;
524
525 // Linked list helpers
526
527 bool get _isEmpty => identical(_next, this);
528
529 /** Adds subscription to linked list of active listeners. */
530 void _addListener(_BroadcastSubscription<T> subscription) {
531 _BroadcastSubscriptionLink previous = _previous;
532 previous._next = subscription;
533 _previous = subscription._previous;
534 subscription._previous._next = this;
535 subscription._previous = previous;
536 subscription._eventState = (_state & _STATE_EVENT_ID);
537 }
538
539 void _removeListener(_BroadcastSubscription<T> subscription) {
540 assert(identical(subscription._controller, this));
541 assert(!identical(subscription._next, subscription));
542 subscription._previous._next = subscription._next;
543 subscription._next._previous = subscription._previous;
544 subscription._next = subscription._previous = subscription;
545 }
546
547 // _StreamControllerLifecycle interface.
548
549 void _recordListen(_BroadcastSubscription<T> subscription) {
550 _addListener(subscription);
551 if (identical(_next, _previous)) {
552 // Only one listener, so it must be the first listener.
553 _runGuarded(_onListen);
554 }
555 }
556
557 void _recordCancel(_BroadcastSubscription<T> subscription) {
558 if (subscription._isFiring) {
559 subscription._setRemoveAfterFiring();
560 } else {
561 _removeListener(subscription);
562 // If we are currently firing an event, the empty-check is performed at
563 // the end of the listener loop instead of here.
564 if ((_state & _STATE_FIRING) == 0 && _isEmpty) {
565 _callOnCancel();
566 }
567 }
568 }
569
570 void _recordPause(StreamSubscription<T> subscription) {}
571 void _recordResume(StreamSubscription<T> subscription) {}
572
573 // EventSink interface.
574
575 void add(T data) {
576 if (isClosed) {
577 throw new StateError("Cannot add new events after calling close()");
578 }
579 _sendData(data);
580 }
581
582 void addError(Object error, [Object stackTrace]) {
583 if (isClosed) {
584 throw new StateError("Cannot add new events after calling close()");
585 }
586 if (stackTrace != null) _attachStackTrace(error, stackTrace);
587 _sendError(error);
588 }
589
590 void close() {
591 if (isClosed) {
592 throw new StateError("Cannot add new events after calling close()");
593 }
594 _state |= _STATE_CLOSED;
595 _sendDone();
596 }
597
598 void _forEachListener(
599 void action(_BufferingStreamSubscription<T> subscription)) {
600 if (_isFiring) {
601 throw new StateError(
602 "Cannot fire new event. Controller is already firing an event");
603 }
604 if (_isEmpty) return;
605
606 // Get event id of this event.
607 int id = (_state & _STATE_EVENT_ID);
608 // Start firing (set the _STATE_FIRING bit). We don't do [_onCancel]
609 // callbacks while firing, and we prevent reentrancy of this function.
610 //
611 // Set [_state]'s event id to the next event's id.
612 // Any listeners added while firing this event will expect the next event,
613 // not this one, and won't get notified.
614 _state ^= _STATE_EVENT_ID | _STATE_FIRING;
615 _BroadcastSubscriptionLink link = _next;
616 while (!identical(link, this)) {
617 _BroadcastSubscription<T> subscription = link;
618 if (subscription._expectsEvent(id)) {
619 subscription._eventState |= _BroadcastSubscription._STATE_FIRING;
620 action(subscription);
621 subscription._toggleEventId();
622 link = subscription._next;
623 if (subscription._removeAfterFiring) {
624 _removeListener(subscription);
625 }
626 subscription._eventState &= ~_BroadcastSubscription._STATE_FIRING;
627 } else {
628 link = subscription._next;
629 }
630 }
631 _state &= ~_STATE_FIRING;
632
633 if (_isEmpty) {
634 _callOnCancel();
635 }
636 }
637
638 void _callOnCancel() {
639 _runGuarded(_onCancel);
640 }
641 }
642
643 class _SyncBroadcastStreamController<T> extends _BroadcastStreamController<T> {
644 _SyncBroadcastStreamController(void onListen(), void onCancel())
645 : super(onListen, onCancel);
646
647 // EventDispatch interface.
648
649 void _sendData(T data) {
650 if (_isEmpty) return;
651 _forEachListener((_BufferingStreamSubscription<T> subscription) {
652 subscription._add(data);
653 });
654 }
655
656 void _sendError(Object error) {
657 if (_isEmpty) return;
658 _forEachListener((_BufferingStreamSubscription<T> subscription) {
659 subscription._addError(error);
660 });
661 }
662
663 void _sendDone() {
664 if (_isEmpty) return;
665 _forEachListener((_BroadcastSubscription<T> subscription) {
666 subscription._close();
667 subscription._eventState |=
668 _BroadcastSubscription._STATE_REMOVE_AFTER_FIRING;
669 });
670 }
671 }
672
673 class _AsyncBroadcastStreamController<T> extends _BroadcastStreamController<T> {
674 _AsyncBroadcastStreamController(void onListen(), void onCancel())
675 : super(onListen, onCancel);
676
677 // EventDispatch interface.
678
679 void _sendData(T data) {
680 for (_BroadcastSubscriptionLink link = _next;
681 !identical(link, this);
682 link = link._next) {
683 _BroadcastSubscription<T> subscription = link;
684 subscription._addPending(new _DelayedData(data));
685 }
686 }
687
688 void _sendError(Object error) {
689 for (_BroadcastSubscriptionLink link = _next;
690 !identical(link, this);
691 link = link._next) {
692 _BroadcastSubscription<T> subscription = link;
693 subscription._addPending(new _DelayedError(error));
694 }
695 }
696
697 void _sendDone() {
698 for (_BroadcastSubscriptionLink link = _next;
699 !identical(link, this);
700 link = link._next) {
701 _BroadcastSubscription<T> subscription = link;
702 subscription._addPending(const _DelayedDone());
703 } 691 }
704 } 692 }
705 } 693 }
706
707 /**
708 * Stream controller that is used by [Stream.asBroadcastStream].
709 *
710 * This stream controller allows incoming events while it is firing
711 * other events. This is handled by delaying the events until the
712 * current event is done firing, and then fire the pending events.
713 *
714 * This class extends [_SyncBroadcastStreamController]. Events of
715 * an "asBroadcastStream" stream are always initiated by events
716 * on another stream, and it is fine to forward them synchronously.
717 */
718 class _AsBroadcastStreamController<T>
719 extends _SyncBroadcastStreamController<T>
720 implements _EventDispatch<T> {
721 _StreamImplEvents _pending;
722
723 _AsBroadcastStreamController(void onListen(), void onCancel())
724 : super(onListen, onCancel);
725
726 bool get _hasPending => _pending != null && ! _pending.isEmpty;
727
728 void _addPendingEvent(_DelayedEvent event) {
729 if (_pending == null) {
730 _pending = new _StreamImplEvents();
731 }
732 _pending.add(event);
733 }
734
735 void add(T data) {
736 if (_isFiring) {
737 _addPendingEvent(new _DelayedData<T>(data));
738 return;
739 }
740 super.add(data);
741 while (_hasPending) {
742 _pending.handleNext(this);
743 }
744 }
745
746 void addError(Object error, [StackTrace stackTrace]) {
747 if (_isFiring) {
748 _addPendingEvent(new _DelayedError(error));
749 return;
750 }
751 super.addError(error, stackTrace);
752 while (_hasPending) {
753 _pending.handleNext(this);
754 }
755 }
756
757 void close() {
758 if (_isFiring) {
759 _addPendingEvent(const _DelayedDone());
760 _state |= _STATE_CLOSED;
761 return;
762 }
763 super.close();
764 assert(!_hasPending);
765 }
766
767 void _callOnCancel() {
768 if (_hasPending) {
769 _pending.clear();
770 _pending = null;
771 }
772 super._callOnCancel();
773 }
774 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698