Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(127)

Side by Side Diff: sdk/lib/async/stream_controller.dart

Issue 16240008: Make StreamController be a StreamSink, not just an EventSink. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Address review comments. Created 7 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « sdk/lib/async/stream.dart ('k') | sdk/lib/async/stream_impl.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Controller for creating and adding events to a stream. 8 // Controller for creating and adding events to a stream.
9 // ------------------------------------------------------------------- 9 // -------------------------------------------------------------------
10 10
(...skipping 28 matching lines...) Expand all
39 * Whether to invoke a callback depends only on the state before and after 39 * Whether to invoke a callback depends only on the state before and after
40 * a stream action, for example firing an event. If the state changes multiple 40 * a stream action, for example firing an event. If the state changes multiple
41 * times during the action, and then ends up in the same state as before, no 41 * times during the action, and then ends up in the same state as before, no
42 * callback is performed. 42 * callback is performed.
43 * 43 *
44 * If listeners are added after the stream has completed (sent a "done" event), 44 * If listeners are added after the stream has completed (sent a "done" event),
45 * the listeners will be sent a "done" event eventually, but they won't affect 45 * the listeners will be sent a "done" event eventually, but they won't affect
46 * the stream at all, and won't trigger callbacks. From the controller's point 46 * the stream at all, and won't trigger callbacks. From the controller's point
47 * of view, the stream is completely inert when has completed. 47 * of view, the stream is completely inert when has completed.
48 */ 48 */
49 abstract class StreamController<T> implements EventSink<T> { 49 abstract class StreamController<T> implements StreamSink<T> {
50 /** The stream that this controller is controlling. */ 50 /** The stream that this controller is controlling. */
51 Stream<T> get stream; 51 Stream<T> get stream;
52 52
53 /** 53 /**
54 * A controller with a [stream] that supports only one single subscriber. 54 * A controller with a [stream] that supports only one single subscriber.
55 * 55 *
56 * If [sync] is true, events may be passed directly to the stream's listener 56 * If [sync] is true, events may be passed directly to the stream's listener
57 * during an [add], [addError] or [close] call. If [sync] is false, the event 57 * during an [add], [addError] or [close] call. If [sync] is false, the event
58 * will be passed to the listener at a later time, after the code creating 58 * will be passed to the listener at a later time, after the code creating
59 * the event has returned. 59 * the event has returned.
60 * 60 *
61 * The controller will buffer all incoming events until the subscriber is 61 * The controller will buffer all incoming events until the subscriber is
62 * registered. 62 * registered.
63 * 63 *
64 * The [onPause] function is called when the stream becomes 64 * The [onPause] function is called when the stream becomes
65 * paused. [onResume] is called when the stream resumed. 65 * paused. [onResume] is called when the stream resumed.
66 * 66 *
67 * The [onListen] callback is called when the stream 67 * The [onListen] callback is called when the stream
68 * receives its listener and [onCancel] when the listener ends 68 * receives its listener and [onCancel] when the listener ends
69 * its subscription. 69 * its subscription.
70 * 70 *
71 * If the stream is canceled before the controller needs new data the 71 * If the stream is canceled before the controller needs new data the
72 * [onResume] call might not be executed. 72 * [onResume] call might not be executed.
73 */ 73 */
74 factory StreamController({void onListen(), 74 factory StreamController({void onListen(),
75 void onPause(), 75 void onPause(),
76 void onResume(), 76 void onResume(),
77 void onCancel(), 77 void onCancel(),
78 bool sync: false}) 78 bool sync: false}) {
79 => sync 79 if (onListen == null && onPause == null &&
80 onResume == null && onCancel == null) {
81 return sync
82 ? new _NoCallbackSyncStreamController<T>()
83 : new _NoCallbackAsyncStreamController<T>();
84 }
85 return sync
80 ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel) 86 ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel)
81 : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel); 87 : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel);
88 }
82 89
83 /** 90 /**
84 * A controller where [stream] can be listened to more than once. 91 * A controller where [stream] can be listened to more than once.
85 * 92 *
86 * The [Stream] returned by [stream] is a broadcast stream. It can be listened 93 * The [Stream] returned by [stream] is a broadcast stream. It can be listened
87 * to more than once. 94 * to more than once.
88 * 95 *
89 * The controller distributes any events to all currently subscribed 96 * The controller distributes any events to all currently subscribed
90 * listeners. 97 * listeners.
91 * It is not allowed to call [add], [addError], or [close] before a previous 98 * It is not allowed to call [add], [addError], or [close] before a previous
(...skipping 24 matching lines...) Expand all
116 */ 123 */
117 factory StreamController.broadcast({void onListen(), 124 factory StreamController.broadcast({void onListen(),
118 void onCancel(), 125 void onCancel(),
119 bool sync: false}) { 126 bool sync: false}) {
120 return sync 127 return sync
121 ? new _SyncBroadcastStreamController<T>(onListen, onCancel) 128 ? new _SyncBroadcastStreamController<T>(onListen, onCancel)
122 : new _AsyncBroadcastStreamController<T>(onListen, onCancel); 129 : new _AsyncBroadcastStreamController<T>(onListen, onCancel);
123 } 130 }
124 131
125 /** 132 /**
126 * Returns a view of this object that only exposes the [EventSink] interface. 133 * Returns a view of this object that only exposes the [StreamSink] interface.
127 */ 134 */
128 EventSink<T> get sink; 135 StreamSink<T> get sink;
129 136
130 /** 137 /**
131 * Whether the stream is closed for adding more events. 138 * Whether the stream is closed for adding more events.
132 * 139 *
133 * If true, the "done" event might not have fired yet, but it has been 140 * If true, the "done" event might not have fired yet, but it has been
134 * scheduled, and it is too late to add more events. 141 * scheduled, and it is too late to add more events.
135 */ 142 */
136 bool get isClosed; 143 bool get isClosed;
137 144
138 /** 145 /**
(...skipping 16 matching lines...) Expand all
155 * Send or enqueue an error event. 162 * Send or enqueue an error event.
156 * 163 *
157 * Also allows an objection stack trace object, on top of what [EventSink] 164 * Also allows an objection stack trace object, on top of what [EventSink]
158 * allows. 165 * allows.
159 */ 166 */
160 void addError(Object error, [Object stackTrace]); 167 void addError(Object error, [Object stackTrace]);
161 } 168 }
162 169
163 170
164 abstract class _StreamControllerLifecycle<T> { 171 abstract class _StreamControllerLifecycle<T> {
165 void _recordListen(StreamSubscription<T> subscription) {} 172 StreamSubscription<T> _subscribe(void onData(T data),
173 void onError(Object error),
174 void onDone(),
175 bool cancelOnError);
166 void _recordPause(StreamSubscription<T> subscription) {} 176 void _recordPause(StreamSubscription<T> subscription) {}
167 void _recordResume(StreamSubscription<T> subscription) {} 177 void _recordResume(StreamSubscription<T> subscription) {}
168 void _recordCancel(StreamSubscription<T> subscription) {} 178 void _recordCancel(StreamSubscription<T> subscription) {}
169 } 179 }
170 180
171 /** 181 /**
172 * Default implementation of [StreamController]. 182 * Default implementation of [StreamController].
173 * 183 *
174 * Controls a stream that only supports a single controller. 184 * Controls a stream that only supports a single controller.
175 */ 185 */
176 abstract class _StreamController<T> implements StreamController<T>, 186 abstract class _StreamController<T> implements StreamController<T>,
177 _StreamControllerLifecycle<T>, 187 _StreamControllerLifecycle<T>,
188 _EventSink<T>,
178 _EventDispatch<T> { 189 _EventDispatch<T> {
179 static const int _STATE_OPEN = 0; 190 // The states are bit-flags. More than one can be set at a time.
180 static const int _STATE_CANCELLED = 1; 191 //
181 static const int _STATE_CLOSED = 2; 192 // The "subscription state" goes through the states:
182 193 // initial -> subscribed -> canceled.
183 final _NotificationHandler _onListen; 194 // These are mutually exclusive.
184 final _NotificationHandler _onPause; 195 // The "closed" state records whether the [close] method has been called
185 final _NotificationHandler _onResume; 196 // on the controller. This can be done at any time. If done before
186 final _NotificationHandler _onCancel; 197 // subscription, the done event is queued. If done after cancel, the done
187 _StreamImpl<T> _stream; 198 // event is ignored (just as any other event after a cancel).
188 199
189 // An active subscription on the stream, or null if no subscripton is active. 200 /** The controller is in its initial state with no subscription. */
190 _ControllerSubscription<T> _subscription; 201 static const int _STATE_INITIAL = 0;
191 202 /** The controller has a subscription, but hasn't been closed or canceled. */
192 // Whether we have sent a "done" event. 203 static const int _STATE_SUBSCRIBED = 1;
193 int _state = _STATE_OPEN; 204 /** The subscription is canceled. */
194 205 static const int _STATE_CANCELED = 2;
195 // Events added to the stream before it has an active subscription. 206 /** Mask for the subscription state. */
196 _PendingEvents _pendingEvents = null; 207 static const int _STATE_SUBSCRIPTION_MASK = 3;
197 208
198 _StreamController(this._onListen, 209 // The following state relate to the controller, not the subscription.
199 this._onPause, 210 // If closed, adding more events is not allowed.
200 this._onResume, 211 // If executing an [addStream], new events are not allowed either, but will
201 this._onCancel) { 212 // be added by the stream.
202 _stream = new _ControllerStream<T>(this); 213
203 } 214 /**
204 215 * The controller is closed due to calling [close].
205 Stream<T> get stream => _stream; 216 *
206 217 * When the stream is closed, you can neither add new events nor add new
207 /** 218 * listeners.
208 * Returns a view of this object that only exposes the [EventSink] interface. 219 */
209 */ 220 static const int _STATE_CLOSED = 4;
210 EventSink<T> get sink => new _EventSinkView<T>(this); 221 /**
211 222 * The controller is in the middle of an [addStream] operation.
212 /** 223 *
213 * Whether a listener has existed and been cancelled. 224 * While adding events from a stream, no new events can be added directly
225 * on the controller.
226 */
227 static const int _STATE_ADDSTREAM = 8;
228
229 /**
230 * Field containing different data depending on the current subscription
231 * state.
232 *
233 * If [_state] is [_STATE_INITIAL], the field may contain a [_PendingEvents]
234 * for events added to the controller before a subscription.
235 *
236 * While [_state] is [_STATE_SUBSCRIBED], the field contains the subscription.
237 *
238 * When [_state] is [_STATE_CANCELED] the field is currently not used.
239 */
240 var _varData;
241
242 /** Current state of the controller. */
243 int _state = _STATE_INITIAL;
244
245 /**
246 * Future completed when the stream sends its last event.
247 *
248 * This is also the future returned by [close].
249 */
250 // TODO(lrn): Could this be stored in the varData field too, if it's not
251 // accessed until the call to "close"? Then we need to special case if it's
252 // accessed earlier, or if close is called before subscribing.
253 _FutureImpl _doneFuture;
254
255 _StreamController();
256
257 _NotificationHandler get _onListen;
258 _NotificationHandler get _onPause;
259 _NotificationHandler get _onResume;
260 _NotificationHandler get _onCancel;
261
262 // Return a new stream every time. The streams are equal, but not identical.
263 Stream<T> get stream => new _ControllerStream(this);
264
265 /**
266 * Returns a view of this object that only exposes the [StreamSink] interface.
267 */
268 StreamSink<T> get sink => new _StreamSinkWrapper<T>(this);
269
270 /**
271 * Whether a listener has existed and been canceled.
214 * 272 *
215 * After this, adding more events will be ignored. 273 * After this, adding more events will be ignored.
216 */ 274 */
217 bool get _isCancelled => (_state & _STATE_CANCELLED) != 0; 275 bool get _isCanceled => (_state & _STATE_CANCELED) != 0;
276
277 /** Whether there is an active listener. */
278 bool get hasListener => (_state & _STATE_SUBSCRIBED) != 0;
279
280 /** Whether there has not been a listener yet. */
281 bool get _isInitialState =>
282 (_state & _STATE_SUBSCRIPTION_MASK) == _STATE_INITIAL;
218 283
219 bool get isClosed => (_state & _STATE_CLOSED) != 0; 284 bool get isClosed => (_state & _STATE_CLOSED) != 0;
220 285
221 bool get isPaused => hasListener ? _subscription._isInputPaused 286 bool get isPaused => hasListener ? _subscription._isInputPaused
222 : !_isCancelled; 287 : !_isCanceled;
223 288
224 bool get hasListener => _subscription != null; 289 bool get _isAddingStream => (_state & _STATE_ADDSTREAM) != 0;
225 290
226 /** 291 /** New events may not be added after close, or during addStream. */
227 * Send or queue a data event. 292 bool get _mayAddEvent => (_state < _STATE_CLOSED);
293
294 // Returns the pending events.
295 // Pending events are events added before a subscription exists.
296 // They are added to the subscription when it is created.
297 // Pending events, if any, are kept in the _varData field until the
298 // stream is listened to.
299 // While adding a stream, pending events are moved into the
300 // state object to allow the state object to use the _varData field.
301 _PendingEvents get _pendingEvents {
302 assert(_isInitialState);
303 if (!_isAddingStream) {
304 return _varData;
305 }
306 _StreamControllerAddStreamState state = _varData;
307 return state.varData;
308 }
309
310 // Returns the pending events, and creates the object if necessary.
311 _StreamImplEvents _ensurePendingEvents() {
312 assert(_isInitialState);
313 if (!_isAddingStream) {
314 if (_varData == null) _varData = new _StreamImplEvents();
315 return _varData;
316 }
317 _StreamControllerAddStreamState state = _varData;
318 if (state.varData == null) state.varData = new _StreamImplEvents();
319 return state.varData;
320 }
321
322 // Get the current subscription.
323 // If we are adding a stream, the subscription is moved into the state
324 // object to allow the state object to use the _varData field.
325 _ControllerSubscription get _subscription {
326 assert(hasListener);
327 if (_isAddingStream) {
328 _StreamControllerAddStreamState addState = _varData;
329 return addState.varData;
330 }
331 return _varData;
332 }
333
334 /**
335 * Creates an error describing why an event cannot be added.
336 *
337 * The reason, and therefore the error message, depends on the current state.
338 */
339 Error _badEventState() {
340 if (isClosed) {
341 return new StateError("Cannot add event after closing");
342 }
343 assert(_isAddingStream);
344 return new StateError("Cannot add event while adding a stream");
345 }
346
347 // StreamSink interface.
348 Future addStream(Stream<T> source) {
349 if (!_mayAddEvent) throw _badEventState();
350 if (_isCanceled) return new _FutureImpl.immediate(null);
351 _StreamControllerAddStreamState addState =
352 new _StreamControllerAddStreamState(this, _varData, source);
353 _varData = addState;
354 _state |= _STATE_ADDSTREAM;
355 return addState.addStreamFuture;
356 }
357
358 Future get done => _ensureDoneFuture();
359
360 Future _ensureDoneFuture() {
361 if (_doneFuture == null) {
362 _doneFuture = new _FutureImpl();
363 if (_isCanceled) _doneFuture._setValue(null);
364 }
365 return _doneFuture;
366 }
367
368 /**
369 * Send or enqueue a data event.
228 */ 370 */
229 void add(T value) { 371 void add(T value) {
230 if (isClosed) throw new StateError("Adding event after close"); 372 if (!_mayAddEvent) throw _badEventState();
231 if (_subscription != null) { 373 _add(value);
232 _sendData(value);
233 } else if (!_isCancelled) {
234 _addPendingEvent(new _DelayedData<T>(value));
235 }
236 } 374 }
237 375
238 /** 376 /**
239 * Send or enqueue an error event. 377 * Send or enqueue an error event.
240 */ 378 */
241 void addError(Object error, [Object stackTrace]) { 379 void addError(Object error, [Object stackTrace]) {
242 if (isClosed) throw new StateError("Adding event after close"); 380 if (!_mayAddEvent) throw _badEventState();
243 if (stackTrace != null) { 381 if (stackTrace != null) {
244 // Force stack trace overwrite. Even if the error already contained 382 // Force stack trace overwrite. Even if the error already contained
245 // a stack trace. 383 // a stack trace.
246 _attachStackTrace(error, stackTrace); 384 _attachStackTrace(error, stackTrace);
247 } 385 }
248 if (_subscription != null) { 386 _addError(error);
249 _sendError(error); 387 }
250 } else if (!_isCancelled) { 388
251 _addPendingEvent(new _DelayedError(error)); 389 /**
252 }
253 }
254
255 /**
256 * Closes this controller. 390 * Closes this controller.
257 * 391 *
258 * After closing, no further events may be added using [add] or [addError]. 392 * After closing, no further events may be added using [add] or [addError].
259 * 393 *
260 * You are allowed to close the controller more than once, but only the first 394 * You are allowed to close the controller more than once, but only the first
261 * call has any effect. 395 * call has any effect.
262 * 396 *
263 * The first time a controller is closed, a "done" event is sent to its 397 * The first time a controller is closed, a "done" event is sent to its
264 * stream. 398 * stream.
265 */ 399 */
266 void close() { 400 Future close() {
267 if (isClosed) return; 401 if (isClosed) {
402 assert(_doneFuture != null); // Was set when close was first called.
403 return _doneFuture;
404 }
405 if (!_mayAddEvent) throw _badEventState();
268 _state |= _STATE_CLOSED; 406 _state |= _STATE_CLOSED;
269 if (_subscription != null) { 407 _ensureDoneFuture();
408 if (hasListener) {
270 _sendDone(); 409 _sendDone();
271 } else if (!_isCancelled) { 410 } else if (_isInitialState) {
272 _addPendingEvent(const _DelayedDone()); 411 _ensurePendingEvents().add(const _DelayedDone());
412 }
413 return _doneFuture;
414 }
415
416 // EventSink interface. Used by the [addStream] events.
417
418 // Add data event, used both by the [addStream] events and by [add].
419 void _add(T value) {
420 if (hasListener) {
421 _sendData(value);
422 } else if (_isInitialState) {
423 _ensurePendingEvents().add(new _DelayedData<T>(value));
273 } 424 }
274 } 425 }
275 426
276 // EventDispatch interface 427 void _addError(Object error) {
277 428 if (hasListener) {
278 void _addPendingEvent(_DelayedEvent event) { 429 _sendError(error);
279 if (_isCancelled) return; 430 } else if (_isInitialState) {
280 _StreamImplEvents events = _pendingEvents; 431 _ensurePendingEvents().add(new _DelayedError(error));
281 if (events == null) {
282 events = _pendingEvents = new _StreamImplEvents();
283 } 432 }
284 events.add(event);
285 } 433 }
286 434
287 void _recordListen(_BufferingStreamSubscription<T> subscription) { 435 void _close() {
288 assert(_subscription == null); 436 // End of addStream stream.
289 _subscription = subscription; 437 assert(_isAddingStream);
290 subscription._setPendingEvents(_pendingEvents); 438 _StreamControllerAddStreamState addState = _varData;
291 _pendingEvents = null; 439 _varData = addState.varData;
440 _state &= ~_STATE_ADDSTREAM;
441 addState.complete();
442 }
443
444 // _StreamControllerLifeCycle interface
445
446 StreamSubscription<T> _subscribe(void onData(T data),
447 void onError(Object error),
448 void onDone(),
449 bool cancelOnError) {
450 if (!_isInitialState) {
451 throw new StateError("Stream has already been listened to.");
452 }
453 _ControllerSubscription subscription = new _ControllerSubscription(
454 this, onData, onError, onDone, cancelOnError);
455
456 _PendingEvents pendingEvents = _pendingEvents;
457 _state |= _STATE_SUBSCRIBED;
458 if (_isAddingStream) {
459 _StreamControllerAddStreamState addState = _varData;
460 addState.varData = subscription;
461 } else {
462 _varData = subscription;
463 }
464 subscription._setPendingEvents(pendingEvents);
292 subscription._guardCallback(() { 465 subscription._guardCallback(() {
293 _runGuarded(_onListen); 466 _runGuarded(_onListen);
294 }); 467 });
468
469 return subscription;
295 } 470 }
296 471
297 void _recordCancel(StreamSubscription<T> subscription) { 472 void _recordCancel(StreamSubscription<T> subscription) {
298 assert(identical(_subscription, subscription)); 473 if (_isAddingStream) {
299 _subscription = null; 474 _StreamControllerAddStreamState addState = _varData;
300 _state |= _STATE_CANCELLED; 475 addState.cancel();
476 }
477 _varData = null;
478 _state =
479 (_state & ~(_STATE_SUBSCRIBED | _STATE_ADDSTREAM)) | _STATE_CANCELED;
301 _runGuarded(_onCancel); 480 _runGuarded(_onCancel);
481 if (_doneFuture != null && _doneFuture._mayComplete) {
482 _doneFuture._asyncSetValue(null);
483 }
302 } 484 }
303 485
304 void _recordPause(StreamSubscription<T> subscription) { 486 void _recordPause(StreamSubscription<T> subscription) {
487 if (_isAddingStream) {
488 _StreamControllerAddStreamState addState = _varData;
489 addState.pause();
490 }
305 _runGuarded(_onPause); 491 _runGuarded(_onPause);
306 } 492 }
307 493
308 void _recordResume(StreamSubscription<T> subscription) { 494 void _recordResume(StreamSubscription<T> subscription) {
495 if (_isAddingStream) {
496 _StreamControllerAddStreamState addState = _varData;
497 addState.resume();
498 }
309 _runGuarded(_onResume); 499 _runGuarded(_onResume);
310 } 500 }
311 } 501 }
312 502
313 class _SyncStreamController<T> extends _StreamController<T> { 503 abstract class _SyncStreamControllerDispatch<T>
314 _SyncStreamController(void onListen(), 504 implements _StreamController<T> {
315 void onPause(),
316 void onResume(),
317 void onCancel())
318 : super(onListen, onPause, onResume, onCancel);
319
320 void _sendData(T data) { 505 void _sendData(T data) {
321 _subscription._add(data); 506 _subscription._add(data);
322 } 507 }
323 508
324 void _sendError(Object error) { 509 void _sendError(Object error) {
325 _subscription._addError(error); 510 _subscription._addError(error);
326 } 511 }
327 512
328 void _sendDone() { 513 void _sendDone() {
329 _subscription._close(); 514 _subscription._close();
330 } 515 }
331 } 516 }
332 517
333 class _AsyncStreamController<T> extends _StreamController<T> { 518 abstract class _AsyncStreamControllerDispatch<T>
334 _AsyncStreamController(void onListen(), 519 implements _StreamController<T> {
335 void onPause(),
336 void onResume(),
337 void onCancel())
338 : super(onListen, onPause, onResume, onCancel);
339
340 void _sendData(T data) { 520 void _sendData(T data) {
341 _subscription._addPending(new _DelayedData(data)); 521 _subscription._addPending(new _DelayedData(data));
342 } 522 }
343 523
344 void _sendError(Object error) { 524 void _sendError(Object error) {
345 _subscription._addPending(new _DelayedError(error)); 525 _subscription._addPending(new _DelayedError(error));
346 } 526 }
347 527
348 void _sendDone() { 528 void _sendDone() {
349 _subscription._addPending(const _DelayedDone()); 529 _subscription._addPending(const _DelayedDone());
350 } 530 }
351 } 531 }
352 532
533 // TODO(lrn): Use common superclass for callback-controllers when VM supports
534 // constructors in mixin superclasses.
535
536 class _AsyncStreamController<T> extends _StreamController<T>
537 with _AsyncStreamControllerDispatch<T> {
538 final _NotificationHandler _onListen;
539 final _NotificationHandler _onPause;
540 final _NotificationHandler _onResume;
541 final _NotificationHandler _onCancel;
542
543 _AsyncStreamController(void this._onListen(),
544 void this._onPause(),
545 void this._onResume(),
546 void this._onCancel());
547 }
548
549 class _SyncStreamController<T> extends _StreamController<T>
550 with _SyncStreamControllerDispatch<T> {
551 final _NotificationHandler _onListen;
552 final _NotificationHandler _onPause;
553 final _NotificationHandler _onResume;
554 final _NotificationHandler _onCancel;
555
556 _SyncStreamController(void this._onListen(),
557 void this._onPause(),
558 void this._onResume(),
559 void this._onCancel());
560 }
561
562 abstract class _NoCallbacks {
563 _NotificationHandler get _onListen => null;
564 _NotificationHandler get _onPause => null;
565 _NotificationHandler get _onResume => null;
566 _NotificationHandler get _onCancel => null;
567 }
568
569 typedef _NoCallbackAsyncStreamController<T> = _StreamController<T>
570 with _AsyncStreamControllerDispatch/*<T>*/, _NoCallbacks;
571
572 typedef _NoCallbackSyncStreamController<T> = _StreamController<T>
573 with _SyncStreamControllerDispatch/*<T>*/, _NoCallbacks;
574
353 typedef void _NotificationHandler(); 575 typedef void _NotificationHandler();
354 576
355 void _runGuarded(_NotificationHandler notificationHandler) { 577 void _runGuarded(_NotificationHandler notificationHandler) {
356 if (notificationHandler == null) return; 578 if (notificationHandler == null) return;
357 try { 579 try {
358 notificationHandler(); 580 notificationHandler();
359 } catch (e, s) { 581 } catch (e, s) {
360 _Zone.current.handleUncaughtError(_asyncError(e, s)); 582 _Zone.current.handleUncaughtError(_asyncError(e, s));
361 } 583 }
362 } 584 }
363 585
364 class _ControllerStream<T> extends _StreamImpl<T> { 586 class _ControllerStream<T> extends _StreamImpl<T> {
365 _StreamControllerLifecycle<T> _controller; 587 _StreamControllerLifecycle<T> _controller;
366 bool _hasListener = false;
367 588
368 _ControllerStream(this._controller); 589 _ControllerStream(this._controller);
369 590
370 StreamSubscription<T> _createSubscription( 591 StreamSubscription<T> _createSubscription(
371 void onData(T data), 592 void onData(T data),
372 void onError(Object error), 593 void onError(Object error),
373 void onDone(), 594 void onDone(),
374 bool cancelOnError) { 595 bool cancelOnError) =>
375 if (_hasListener) { 596 _controller._subscribe(onData, onError, onDone, cancelOnError);
376 throw new StateError("The stream has already been listened to.");
377 }
378 _hasListener = true;
379 return new _ControllerSubscription<T>(
380 _controller, onData, onError, onDone, cancelOnError);
381 }
382 597
383 void _onListen(_BufferingStreamSubscription subscription) { 598 // Override == and hashCode so that new streams returned by the same
384 _controller._recordListen(subscription); 599 // controller are considered equal. The controller returns a new stream
600 // each time it's queried, but doesn't have to cache the result.
601
602 int get hashCode => _controller.hashCode ^ 0x35323532;
603
604 bool operator==(Object other) {
605 if (other is! _ControllerStream) return false;
606 _ControllerStream otherStream = other;
607 return identical(otherStream._controller, this);
385 } 608 }
386 } 609 }
387 610
388 class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> { 611 class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> {
389 final _StreamControllerLifecycle<T> _controller; 612 final _StreamControllerLifecycle<T> _controller;
390 613
391 _ControllerSubscription(this._controller, 614 _ControllerSubscription(this._controller,
392 void onData(T data), 615 void onData(T data),
393 void onError(Object error), 616 void onError(Object error),
394 void onDone(), 617 void onDone(),
395 bool cancelOnError) 618 bool cancelOnError)
396 : super(onData, onError, onDone, cancelOnError); 619 : super(onData, onError, onDone, cancelOnError);
397 620
398 void _onCancel() { 621 void _onCancel() {
399 _controller._recordCancel(this); 622 _controller._recordCancel(this);
400 } 623 }
401 624
402 void _onPause() { 625 void _onPause() {
403 _controller._recordPause(this); 626 _controller._recordPause(this);
404 } 627 }
405 628
406 void _onResume() { 629 void _onResume() {
407 _controller._recordResume(this); 630 _controller._recordResume(this);
408 } 631 }
409 } 632 }
410 633
411 class _BroadcastStream<T> extends _StreamImpl<T> {
412 _BroadcastStreamController _controller;
413 634
414 _BroadcastStream(this._controller); 635 /** A class that exposes only the [StreamSink] interface of an object. */
636 class _StreamSinkWrapper<T> implements StreamSink<T> {
637 final StreamSink _target;
638 _StreamSinkWrapper(this._target);
639 void add(T data) { _target.add(data); }
640 void addError(Object error) { _target.addError(error); }
641 Future close() => _target.close();
642 Future addStream(Stream<T> source) => _target.addStream(source);
643 Future get done => _target.done;
644 }
415 645
416 bool get isBroadcast => true; 646 /**
647 * Object containing the state used to handle [StreamController.addStream].
648 */
649 class _AddStreamState<T> {
650 // [_FutureImpl] returned by call to addStream.
651 _FutureImpl addStreamFuture;
417 652
418 StreamSubscription<T> _createSubscription( 653 // Subscription on stream argument to addStream.
419 void onData(T data), 654 StreamSubscription addSubscription;
420 void onError(Object error), 655
421 void onDone(), 656 _AddStreamState(StreamSink controller, Stream source)
422 bool cancelOnError) { 657 : addStreamFuture = new _FutureImpl(),
423 return new _BroadcastSubscription<T>( 658 addSubscription = source.listen(controller._add,
424 _controller, onData, onError, onDone, cancelOnError); 659 onError: controller._addError,
660 onDone: controller._close,
661 cancelOnError: true);
662
663 void pause() {
664 addSubscription.pause();
425 } 665 }
426 666
427 void _onListen(_BufferingStreamSubscription subscription) { 667 void resume() {
428 _controller._recordListen(subscription); 668 addSubscription.resume();
669 }
670
671 void cancel() {
672 addSubscription.cancel();
673 complete();
674 }
675
676 void complete() {
677 addStreamFuture._asyncSetValue(null);
429 } 678 }
430 } 679 }
431 680
432 abstract class _BroadcastSubscriptionLink { 681 class _StreamControllerAddStreamState<T> extends _AddStreamState<T> {
433 _BroadcastSubscriptionLink _next; 682 // The subscription or pending data of a _StreamController.
434 _BroadcastSubscriptionLink _previous; 683 // Stored here because we reuse the `_varData` field in the _StreamController
435 } 684 // to store this state object.
685 var varData;
436 686
437 class _BroadcastSubscription<T> extends _ControllerSubscription<T> 687 _StreamControllerAddStreamState(_StreamController controller,
438 implements _BroadcastSubscriptionLink { 688 this.varData,
439 static const int _STATE_EVENT_ID = 1; 689 Stream source) : super(controller, source) {
440 static const int _STATE_FIRING = 2; 690 if (controller.isPaused) {
441 static const int _STATE_REMOVE_AFTER_FIRING = 4; 691 addSubscription.pause();
442 int _eventState;
443
444 _BroadcastSubscriptionLink _next;
445 _BroadcastSubscriptionLink _previous;
446
447 _BroadcastSubscription(_StreamControllerLifecycle controller,
448 void onData(T data),
449 void onError(Object error),
450 void onDone(),
451 bool cancelOnError)
452 : super(controller, onData, onError, onDone, cancelOnError) {
453 _next = _previous = this;
454 }
455
456 _BroadcastStreamController get _controller => super._controller;
457
458 bool _expectsEvent(int eventId) {
459 return (_eventState & _STATE_EVENT_ID) == eventId;
460 }
461
462 void _toggleEventId() {
463 _eventState ^= _STATE_EVENT_ID;
464 }
465
466 bool get _isFiring => (_eventState & _STATE_FIRING) != 0;
467
468 bool _setRemoveAfterFiring() {
469 assert(_isFiring);
470 _eventState |= _STATE_REMOVE_AFTER_FIRING;
471 }
472
473 bool get _removeAfterFiring =>
474 (_eventState & _STATE_REMOVE_AFTER_FIRING) != 0;
475 }
476
477
478 abstract class _BroadcastStreamController<T>
479 implements StreamController<T>,
480 _StreamControllerLifecycle<T>,
481 _BroadcastSubscriptionLink,
482 _EventDispatch<T> {
483 static const int _STATE_INITIAL = 0;
484 static const int _STATE_EVENT_ID = 1;
485 static const int _STATE_FIRING = 2;
486 static const int _STATE_CLOSED = 4;
487
488 final _NotificationHandler _onListen;
489 final _NotificationHandler _onCancel;
490
491 // State of the controller.
492 int _state;
493
494 // Double-linked list of active listeners.
495 _BroadcastSubscriptionLink _next;
496 _BroadcastSubscriptionLink _previous;
497
498 _BroadcastStreamController(this._onListen, this._onCancel)
499 : _state = _STATE_INITIAL {
500 _next = _previous = this;
501 }
502
503 // StreamController interface.
504
505 Stream<T> get stream => new _BroadcastStream<T>(this);
506
507 EventSink<T> get sink => new _EventSinkView<T>(this);
508
509 bool get isClosed => (_state & _STATE_CLOSED) != 0;
510
511 /**
512 * A broadcast controller is never paused.
513 *
514 * Each receiving stream may be paused individually, and they handle their
515 * own buffering.
516 */
517 bool get isPaused => false;
518
519 /** Whether there are currently a subscriber on the [Stream]. */
520 bool get hasListener => !_isEmpty;
521
522 /** Whether an event is being fired (sent to some, but not all, listeners). */
523 bool get _isFiring => (_state & _STATE_FIRING) != 0;
524
525 // Linked list helpers
526
527 bool get _isEmpty => identical(_next, this);
528
529 /** Adds subscription to linked list of active listeners. */
530 void _addListener(_BroadcastSubscription<T> subscription) {
531 _BroadcastSubscriptionLink previous = _previous;
532 previous._next = subscription;
533 _previous = subscription._previous;
534 subscription._previous._next = this;
535 subscription._previous = previous;
536 subscription._eventState = (_state & _STATE_EVENT_ID);
537 }
538
539 void _removeListener(_BroadcastSubscription<T> subscription) {
540 assert(identical(subscription._controller, this));
541 assert(!identical(subscription._next, subscription));
542 subscription._previous._next = subscription._next;
543 subscription._next._previous = subscription._previous;
544 subscription._next = subscription._previous = subscription;
545 }
546
547 // _StreamControllerLifecycle interface.
548
549 void _recordListen(_BroadcastSubscription<T> subscription) {
550 _addListener(subscription);
551 if (identical(_next, _previous)) {
552 // Only one listener, so it must be the first listener.
553 _runGuarded(_onListen);
554 }
555 }
556
557 void _recordCancel(_BroadcastSubscription<T> subscription) {
558 if (subscription._isFiring) {
559 subscription._setRemoveAfterFiring();
560 } else {
561 _removeListener(subscription);
562 // If we are currently firing an event, the empty-check is performed at
563 // the end of the listener loop instead of here.
564 if ((_state & _STATE_FIRING) == 0 && _isEmpty) {
565 _callOnCancel();
566 }
567 }
568 }
569
570 void _recordPause(StreamSubscription<T> subscription) {}
571 void _recordResume(StreamSubscription<T> subscription) {}
572
573 // EventSink interface.
574
575 void add(T data) {
576 if (isClosed) {
577 throw new StateError("Cannot add new events after calling close()");
578 }
579 _sendData(data);
580 }
581
582 void addError(Object error, [Object stackTrace]) {
583 if (isClosed) {
584 throw new StateError("Cannot add new events after calling close()");
585 }
586 if (stackTrace != null) _attachStackTrace(error, stackTrace);
587 _sendError(error);
588 }
589
590 void close() {
591 if (isClosed) {
592 throw new StateError("Cannot add new events after calling close()");
593 }
594 _state |= _STATE_CLOSED;
595 _sendDone();
596 }
597
598 void _forEachListener(
599 void action(_BufferingStreamSubscription<T> subscription)) {
600 if (_isFiring) {
601 throw new StateError(
602 "Cannot fire new event. Controller is already firing an event");
603 }
604 if (_isEmpty) return;
605
606 // Get event id of this event.
607 int id = (_state & _STATE_EVENT_ID);
608 // Start firing (set the _STATE_FIRING bit). We don't do [_onCancel]
609 // callbacks while firing, and we prevent reentrancy of this function.
610 //
611 // Set [_state]'s event id to the next event's id.
612 // Any listeners added while firing this event will expect the next event,
613 // not this one, and won't get notified.
614 _state ^= _STATE_EVENT_ID | _STATE_FIRING;
615 _BroadcastSubscriptionLink link = _next;
616 while (!identical(link, this)) {
617 _BroadcastSubscription<T> subscription = link;
618 if (subscription._expectsEvent(id)) {
619 subscription._eventState |= _BroadcastSubscription._STATE_FIRING;
620 action(subscription);
621 subscription._toggleEventId();
622 link = subscription._next;
623 if (subscription._removeAfterFiring) {
624 _removeListener(subscription);
625 }
626 subscription._eventState &= ~_BroadcastSubscription._STATE_FIRING;
627 } else {
628 link = subscription._next;
629 }
630 }
631 _state &= ~_STATE_FIRING;
632
633 if (_isEmpty) {
634 _callOnCancel();
635 }
636 }
637
638 void _callOnCancel() {
639 _runGuarded(_onCancel);
640 }
641 }
642
643 class _SyncBroadcastStreamController<T> extends _BroadcastStreamController<T> {
644 _SyncBroadcastStreamController(void onListen(), void onCancel())
645 : super(onListen, onCancel);
646
647 // EventDispatch interface.
648
649 void _sendData(T data) {
650 if (_isEmpty) return;
651 _forEachListener((_BufferingStreamSubscription<T> subscription) {
652 subscription._add(data);
653 });
654 }
655
656 void _sendError(Object error) {
657 if (_isEmpty) return;
658 _forEachListener((_BufferingStreamSubscription<T> subscription) {
659 subscription._addError(error);
660 });
661 }
662
663 void _sendDone() {
664 if (_isEmpty) return;
665 _forEachListener((_BroadcastSubscription<T> subscription) {
666 subscription._close();
667 subscription._eventState |=
668 _BroadcastSubscription._STATE_REMOVE_AFTER_FIRING;
669 });
670 }
671 }
672
673 class _AsyncBroadcastStreamController<T> extends _BroadcastStreamController<T> {
674 _AsyncBroadcastStreamController(void onListen(), void onCancel())
675 : super(onListen, onCancel);
676
677 // EventDispatch interface.
678
679 void _sendData(T data) {
680 for (_BroadcastSubscriptionLink link = _next;
681 !identical(link, this);
682 link = link._next) {
683 _BroadcastSubscription<T> subscription = link;
684 subscription._addPending(new _DelayedData(data));
685 }
686 }
687
688 void _sendError(Object error) {
689 for (_BroadcastSubscriptionLink link = _next;
690 !identical(link, this);
691 link = link._next) {
692 _BroadcastSubscription<T> subscription = link;
693 subscription._addPending(new _DelayedError(error));
694 }
695 }
696
697 void _sendDone() {
698 for (_BroadcastSubscriptionLink link = _next;
699 !identical(link, this);
700 link = link._next) {
701 _BroadcastSubscription<T> subscription = link;
702 subscription._addPending(const _DelayedDone());
703 } 692 }
704 } 693 }
705 } 694 }
706
707 /**
708 * Stream controller that is used by [Stream.asBroadcastStream].
709 *
710 * This stream controller allows incoming events while it is firing
711 * other events. This is handled by delaying the events until the
712 * current event is done firing, and then fire the pending events.
713 *
714 * This class extends [_SyncBroadcastStreamController]. Events of
715 * an "asBroadcastStream" stream are always initiated by events
716 * on another stream, and it is fine to forward them synchronously.
717 */
718 class _AsBroadcastStreamController<T>
719 extends _SyncBroadcastStreamController<T>
720 implements _EventDispatch<T> {
721 _StreamImplEvents _pending;
722
723 _AsBroadcastStreamController(void onListen(), void onCancel())
724 : super(onListen, onCancel);
725
726 bool get _hasPending => _pending != null && ! _pending.isEmpty;
727
728 void _addPendingEvent(_DelayedEvent event) {
729 if (_pending == null) {
730 _pending = new _StreamImplEvents();
731 }
732 _pending.add(event);
733 }
734
735 void add(T data) {
736 if (_isFiring) {
737 _addPendingEvent(new _DelayedData<T>(data));
738 return;
739 }
740 super.add(data);
741 while (_hasPending) {
742 _pending.handleNext(this);
743 }
744 }
745
746 void addError(Object error, [StackTrace stackTrace]) {
747 if (_isFiring) {
748 _addPendingEvent(new _DelayedError(error));
749 return;
750 }
751 super.addError(error, stackTrace);
752 while (_hasPending) {
753 _pending.handleNext(this);
754 }
755 }
756
757 void close() {
758 if (_isFiring) {
759 _addPendingEvent(const _DelayedDone());
760 _state |= _STATE_CLOSED;
761 return;
762 }
763 super.close();
764 assert(!_hasPending);
765 }
766
767 void _callOnCancel() {
768 if (_hasPending) {
769 _pending.clear();
770 _pending = null;
771 }
772 super._callOnCancel();
773 }
774 }
OLDNEW
« no previous file with comments | « sdk/lib/async/stream.dart ('k') | sdk/lib/async/stream_impl.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698