Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2)

Side by Side Diff: sdk/lib/async/stream_controller.dart

Issue 16125005: Make new StreamController be async by default. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 7 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Controller for creating and adding events to a stream. 8 // Controller for creating and adding events to a stream.
9 // ------------------------------------------------------------------- 9 // -------------------------------------------------------------------
10 10
(...skipping 46 matching lines...) Expand 10 before | Expand all | Expand 10 after
57 * registered. 57 * registered.
58 * 58 *
59 * The [onPause] function is called when the stream becomes 59 * The [onPause] function is called when the stream becomes
60 * paused. [onResume] is called when the stream resumed. 60 * paused. [onResume] is called when the stream resumed.
61 * 61 *
62 * The [onListen] callback is called when the stream 62 * The [onListen] callback is called when the stream
63 * receives its listener and [onCancel] when the listener ends 63 * receives its listener and [onCancel] when the listener ends
64 * its subscription. 64 * its subscription.
65 * 65 *
66 * If the stream is canceled before the controller needs new data the 66 * If the stream is canceled before the controller needs new data the
67 * [onResume] call might not be executed. 67 * [onResume] call might not be executed.
floitsch 2013/05/30 12:13:48 Comment `sync`.
Lasse Reichstein Nielsen 2013/05/31 05:51:59 Done.
68 */ 68 */
69 factory StreamController({void onListen(), 69 factory StreamController({void onListen(),
70 void onPause(), 70 void onPause(),
71 void onResume(), 71 void onResume(),
72 void onCancel()}) 72 void onCancel(),
73 => new _StreamControllerImpl<T>(onListen, onPause, onResume, onCancel); 73 bool sync: false})
74 => sync
75 ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel)
76 : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel);
74 77
75 /** 78 /**
76 * A controller where [stream] can be listened to more than once. 79 * A controller where [stream] can be listened to more than once.
77 * 80 *
78 * The [Stream] returned by [stream] is a broadcast stream. It can be listened 81 * The [Stream] returned by [stream] is a broadcast stream. It can be listened
79 * to more than once. 82 * to more than once.
80 * 83 *
81 * The controller distributes any events to all currently subscribed 84 * The controller distributes any events to all currently subscribed
82 * listeners. 85 * listeners.
83 * It is not allowed to call [add], [addError], or [close] before a previous 86 * It is not allowed to call [add], [addError], or [close] before a previous
84 * call has returned. 87 * call has returned.
85 * 88 *
86 * Each listener is handled independently, and if they pause, only the pausing 89 * Each listener is handled independently, and if they pause, only the pausing
87 * listener is affected. A paused listener will buffer events internally until 90 * listener is affected. A paused listener will buffer events internally until
88 * unpaused or canceled. 91 * unpaused or canceled.
89 * 92 *
90 * The [onListen] callback is called when the first listener is subscribed, 93 * The [onListen] callback is called when the first listener is subscribed,
91 * and the [onCancel] is called when there are no longer any active listeners. 94 * and the [onCancel] is called when there are no longer any active listeners.
92 * If a listener is added again later, after the [onCancel] was called, 95 * If a listener is added again later, after the [onCancel] was called,
93 * the [onListen] will be called again. 96 * the [onListen] will be called again.
94 */ 97 */
95 factory StreamController.broadcast({void onListen(), void onCancel()}) { 98 factory StreamController.broadcast({void onListen(),
96 return new _MultiplexStreamController<T>(onListen, onCancel); 99 void onCancel(),
100 bool sync: false}) {
101 return sync
102 ? new _SyncBroadcastStreamController<T>(onListen, onCancel)
103 : new _AsyncBroadcastStreamController<T>(onListen, onCancel);
97 } 104 }
98 105
99 /** 106 /**
100 * Returns a view of this object that only exposes the [EventSink] interface. 107 * Returns a view of this object that only exposes the [EventSink] interface.
101 */ 108 */
102 EventSink<T> get sink; 109 EventSink<T> get sink;
103 110
104 /** 111 /**
105 * Whether the stream is closed for adding more events. 112 * Whether the stream is closed for adding more events.
106 * 113 *
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after
140 void _recordPause(StreamSubscription<T> subscription) {} 147 void _recordPause(StreamSubscription<T> subscription) {}
141 void _recordResume(StreamSubscription<T> subscription) {} 148 void _recordResume(StreamSubscription<T> subscription) {}
142 void _recordCancel(StreamSubscription<T> subscription) {} 149 void _recordCancel(StreamSubscription<T> subscription) {}
143 } 150 }
144 151
145 /** 152 /**
146 * Default implementation of [StreamController]. 153 * Default implementation of [StreamController].
147 * 154 *
148 * Controls a stream that only supports a single controller. 155 * Controls a stream that only supports a single controller.
149 */ 156 */
150 class _StreamControllerImpl<T> implements StreamController<T>, 157 abstract class _StreamController<T> implements StreamController<T>,
151 _StreamControllerLifecycle<T> { 158 _StreamControllerLifecycle<T>,
159 _EventDispatch<T> {
152 static const int _STATE_OPEN = 0; 160 static const int _STATE_OPEN = 0;
153 static const int _STATE_CANCELLED = 1; 161 static const int _STATE_CANCELLED = 1;
154 static const int _STATE_CLOSED = 2; 162 static const int _STATE_CLOSED = 2;
155 163
156 final _NotificationHandler _onListen; 164 final _NotificationHandler _onListen;
157 final _NotificationHandler _onPause; 165 final _NotificationHandler _onPause;
158 final _NotificationHandler _onResume; 166 final _NotificationHandler _onResume;
159 final _NotificationHandler _onCancel; 167 final _NotificationHandler _onCancel;
160 _StreamImpl<T> _stream; 168 _StreamImpl<T> _stream;
161 169
162 // An active subscription on the stream, or null if no subscripton is active. 170 // An active subscription on the stream, or null if no subscripton is active.
163 _ControllerSubscription<T> _subscription; 171 _ControllerSubscription<T> _subscription;
164 172
165 // Whether we have sent a "done" event. 173 // Whether we have sent a "done" event.
166 int _state = _STATE_OPEN; 174 int _state = _STATE_OPEN;
167 175
168 // Events added to the stream before it has an active subscription. 176 // Events added to the stream before it has an active subscription.
169 _PendingEvents _pendingEvents = null; 177 _PendingEvents _pendingEvents = null;
170 178
171 _StreamControllerImpl(this._onListen, 179 _StreamController(this._onListen,
172 this._onPause, 180 this._onPause,
173 this._onResume, 181 this._onResume,
174 this._onCancel) { 182 this._onCancel) {
175 _stream = new _ControllerStream<T>(this); 183 _stream = new _ControllerStream<T>(this);
176 } 184 }
177 185
178 Stream<T> get stream => _stream; 186 Stream<T> get stream => _stream;
179 187
180 /** 188 /**
181 * Returns a view of this object that only exposes the [EventSink] interface. 189 * Returns a view of this object that only exposes the [EventSink] interface.
182 */ 190 */
183 EventSink<T> get sink => new _EventSinkView<T>(this); 191 EventSink<T> get sink => new _EventSinkView<T>(this);
184 192
(...skipping 10 matching lines...) Expand all
195 : !_isCancelled; 203 : !_isCancelled;
196 204
197 bool get hasListener => _subscription != null; 205 bool get hasListener => _subscription != null;
198 206
199 /** 207 /**
200 * Send or queue a data event. 208 * Send or queue a data event.
201 */ 209 */
202 void add(T value) { 210 void add(T value) {
203 if (isClosed) throw new StateError("Adding event after close"); 211 if (isClosed) throw new StateError("Adding event after close");
204 if (_subscription != null) { 212 if (_subscription != null) {
205 _subscription._add(value); 213 _sendData(value);
206 } else if (!_isCancelled) { 214 } else if (!_isCancelled) {
207 _addPendingEvent(new _DelayedData<T>(value)); 215 _addPendingEvent(new _DelayedData<T>(value));
208 } 216 }
209 } 217 }
210 218
211 /** 219 /**
212 * Send or enqueue an error event. 220 * Send or enqueue an error event.
213 */ 221 */
214 void addError(Object error, [Object stackTrace]) { 222 void addError(Object error, [Object stackTrace]) {
215 if (isClosed) throw new StateError("Adding event after close"); 223 if (isClosed) throw new StateError("Adding event after close");
216 if (stackTrace != null) { 224 if (stackTrace != null) {
217 // Force stack trace overwrite. Even if the error already contained 225 // Force stack trace overwrite. Even if the error already contained
218 // a stack trace. 226 // a stack trace.
219 _attachStackTrace(error, stackTrace); 227 _attachStackTrace(error, stackTrace);
220 } 228 }
221 if (_subscription != null) { 229 if (_subscription != null) {
222 _subscription._addError(error); 230 _sendError(error);
223 } else if (!_isCancelled) { 231 } else if (!_isCancelled) {
224 _addPendingEvent(new _DelayedError(error)); 232 _addPendingEvent(new _DelayedError(error));
225 } 233 }
226 } 234 }
227 235
228 /** 236 /**
229 * Closes this controller. 237 * Closes this controller.
230 * 238 *
231 * After closing, no further events may be added using [add] or [addError]. 239 * After closing, no further events may be added using [add] or [addError].
232 * 240 *
233 * You are allowed to close the controller more than once, but only the first 241 * You are allowed to close the controller more than once, but only the first
234 * call has any effect. 242 * call has any effect.
235 * 243 *
236 * The first time a controller is closed, a "done" event is sent to its 244 * The first time a controller is closed, a "done" event is sent to its
237 * stream. 245 * stream.
238 */ 246 */
239 void close() { 247 void close() {
240 if (isClosed) return; 248 if (isClosed) return;
241 _state |= _STATE_CLOSED; 249 _state |= _STATE_CLOSED;
242 if (_subscription != null) { 250 if (_subscription != null) {
243 _subscription._close(); 251 _sendDone();
244 } else if (!_isCancelled) { 252 } else if (!_isCancelled) {
245 _addPendingEvent(const _DelayedDone()); 253 _addPendingEvent(const _DelayedDone());
246 } 254 }
247 } 255 }
248 256
257 // EventDispatch interface
258
249 void _addPendingEvent(_DelayedEvent event) { 259 void _addPendingEvent(_DelayedEvent event) {
250 if (_isCancelled) return; 260 if (_isCancelled) return;
251 _StreamImplEvents events = _pendingEvents; 261 _StreamImplEvents events = _pendingEvents;
252 if (events == null) { 262 if (events == null) {
253 events = _pendingEvents = new _StreamImplEvents(); 263 events = _pendingEvents = new _StreamImplEvents();
254 } 264 }
255 events.add(event); 265 events.add(event);
256 } 266 }
257 267
258 void _recordListen(_BufferingStreamSubscription<T> subscription) { 268 void _recordListen(_BufferingStreamSubscription<T> subscription) {
(...skipping 15 matching lines...) Expand all
274 284
275 void _recordPause(StreamSubscription<T> subscription) { 285 void _recordPause(StreamSubscription<T> subscription) {
276 _runGuarded(_onPause); 286 _runGuarded(_onPause);
277 } 287 }
278 288
279 void _recordResume(StreamSubscription<T> subscription) { 289 void _recordResume(StreamSubscription<T> subscription) {
280 _runGuarded(_onResume); 290 _runGuarded(_onResume);
281 } 291 }
282 } 292 }
283 293
294 class _SyncStreamController<T> extends _StreamController<T> {
295 _SyncStreamController(void onListen(),
296 void onPause(),
297 void onResume(),
298 void onCancel())
299 : super(onListen, onPause, onResume, onCancel);
300
301 void _sendData(T data) {
302 _subscription._add(data);
303 }
304
305 void _sendError(Object error) {
306 _subscription._addError(error);
307 }
308
309 void _sendDone() {
310 _subscription._close();
311 }
312 }
313
314 class _AsyncStreamController<T> extends _StreamController<T> {
315 _AsyncStreamController(void onListen(),
316 void onPause(),
317 void onResume(),
318 void onCancel())
319 : super(onListen, onPause, onResume, onCancel);
320
321 void _sendData(T data) {
322 runAsync(() {
floitsch 2013/05/30 12:13:48 Try to use addPending instead.
Lasse Reichstein Nielsen 2013/05/31 05:51:59 Done.
323 if (_subscription == null) return;
324 _subscription._add(data);
325 });
326 }
327
328 void _sendError(Object error) {
329 runAsync(() {
330 if (_subscription == null) return;
331 _subscription._addError(error);
332 });
333 }
334
335 void _sendDone() {
336 runAsync(_subscription._close);
337 }
338 }
339
284 typedef void _NotificationHandler(); 340 typedef void _NotificationHandler();
285 341
286 void _runGuarded(_NotificationHandler notificationHandler) { 342 void _runGuarded(_NotificationHandler notificationHandler) {
287 if (notificationHandler == null) return; 343 if (notificationHandler == null) return;
288 try { 344 try {
289 notificationHandler(); 345 notificationHandler();
290 } catch (e, s) { 346 } catch (e, s) {
291 _throwDelayed(e, s); 347 _throwDelayed(e, s);
292 } 348 }
293 } 349 }
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after
332 388
333 void _onPause() { 389 void _onPause() {
334 _controller._recordPause(this); 390 _controller._recordPause(this);
335 } 391 }
336 392
337 void _onResume() { 393 void _onResume() {
338 _controller._recordResume(this); 394 _controller._recordResume(this);
339 } 395 }
340 } 396 }
341 397
342 class _MultiplexStream<T> extends _StreamImpl<T> { 398 class _BroadcastStream<T> extends _StreamImpl<T> {
343 _MultiplexStreamController _controller; 399 _BroadcastStreamController _controller;
344 400
345 _MultiplexStream(this._controller); 401 _BroadcastStream(this._controller);
346 402
347 bool get isBroadcast => true; 403 bool get isBroadcast => true;
348 404
349 StreamSubscription<T> _createSubscription( 405 StreamSubscription<T> _createSubscription(
350 void onData(T data), 406 void onData(T data),
351 void onError(Object error), 407 void onError(Object error),
352 void onDone(), 408 void onDone(),
353 bool cancelOnError) { 409 bool cancelOnError) {
354 return new _MultiplexSubscription<T>( 410 return new _BroadcastSubscription<T>(
355 _controller, onData, onError, onDone, cancelOnError); 411 _controller, onData, onError, onDone, cancelOnError);
356 } 412 }
357 413
358 void _onListen(_BufferingStreamSubscription subscription) { 414 void _onListen(_BufferingStreamSubscription subscription) {
359 _controller._recordListen(subscription); 415 _controller._recordListen(subscription);
360 } 416 }
361 } 417 }
362 418
363 abstract class _MultiplexSubscriptionLink { 419 abstract class _BroadcastSubscriptionLink {
364 _MultiplexSubscriptionLink _next; 420 _BroadcastSubscriptionLink _next;
365 _MultiplexSubscriptionLink _previous; 421 _BroadcastSubscriptionLink _previous;
366 } 422 }
367 423
368 class _MultiplexSubscription<T> extends _ControllerSubscription<T> 424 class _BroadcastSubscription<T> extends _ControllerSubscription<T>
369 implements _MultiplexSubscriptionLink { 425 implements _BroadcastSubscriptionLink {
370 static const int _STATE_EVENT_ID = 1; 426 static const int _STATE_EVENT_ID = 1;
371 static const int _STATE_FIRING = 2; 427 static const int _STATE_FIRING = 2;
372 static const int _STATE_REMOVE_AFTER_FIRING = 4; 428 static const int _STATE_REMOVE_AFTER_FIRING = 4;
373 int _eventState; 429 int _eventState;
374 430
375 _MultiplexSubscriptionLink _next; 431 _BroadcastSubscriptionLink _next;
376 _MultiplexSubscriptionLink _previous; 432 _BroadcastSubscriptionLink _previous;
377 433
378 _MultiplexSubscription(_StreamControllerLifecycle controller, 434 _BroadcastSubscription(_StreamControllerLifecycle controller,
379 void onData(T data), 435 void onData(T data),
380 void onError(Object error), 436 void onError(Object error),
381 void onDone(), 437 void onDone(),
382 bool cancelOnError) 438 bool cancelOnError)
383 : super(controller, onData, onError, onDone, cancelOnError) { 439 : super(controller, onData, onError, onDone, cancelOnError) {
384 _next = _previous = this; 440 _next = _previous = this;
385 } 441 }
386 442
387 _MultiplexStreamController get _controller => super._controller; 443 _BroadcastStreamController get _controller => super._controller;
388 444
389 bool _expectsEvent(int eventId) { 445 bool _expectsEvent(int eventId) {
390 return (_eventState & _STATE_EVENT_ID) == eventId; 446 return (_eventState & _STATE_EVENT_ID) == eventId;
391 } 447 }
392 448
393 void _toggleEventId() { 449 void _toggleEventId() {
394 _eventState ^= _STATE_EVENT_ID; 450 _eventState ^= _STATE_EVENT_ID;
395 } 451 }
396 452
397 bool get _isFiring => (_eventState & _STATE_FIRING) != 0; 453 bool get _isFiring => (_eventState & _STATE_FIRING) != 0;
398 454
399 bool _setRemoveAfterFiring() { 455 bool _setRemoveAfterFiring() {
400 assert(_isFiring); 456 assert(_isFiring);
401 _eventState |= _STATE_REMOVE_AFTER_FIRING; 457 _eventState |= _STATE_REMOVE_AFTER_FIRING;
402 } 458 }
403 459
404 bool get _removeAfterFiring => 460 bool get _removeAfterFiring =>
405 (_eventState & _STATE_REMOVE_AFTER_FIRING) != 0; 461 (_eventState & _STATE_REMOVE_AFTER_FIRING) != 0;
406 } 462 }
407 463
408 464
409 class _MultiplexStreamController<T> implements StreamController<T>, 465 abstract class _BroadcastStreamController<T>
410 _StreamControllerLifecycle<T>, 466 implements StreamController<T>,
411 _MultiplexSubscriptionLink { 467 _StreamControllerLifecycle<T>,
468 _BroadcastSubscriptionLink,
469 _EventDispatch<T> {
412 static const int _STATE_INITIAL = 0; 470 static const int _STATE_INITIAL = 0;
413 static const int _STATE_EVENT_ID = 1; 471 static const int _STATE_EVENT_ID = 1;
414 static const int _STATE_FIRING = 2; 472 static const int _STATE_FIRING = 2;
415 static const int _STATE_CLOSED = 4; 473 static const int _STATE_CLOSED = 4;
416 474
417 final _NotificationHandler _onListen; 475 final _NotificationHandler _onListen;
418 final _NotificationHandler _onCancel; 476 final _NotificationHandler _onCancel;
419 477
420 // State of the controller. 478 // State of the controller.
421 int _state; 479 int _state;
422 480
423 // Double-linked list of active listeners. 481 // Double-linked list of active listeners.
424 _MultiplexSubscriptionLink _next; 482 _BroadcastSubscriptionLink _next;
425 _MultiplexSubscriptionLink _previous; 483 _BroadcastSubscriptionLink _previous;
426 484
427 _MultiplexStreamController(this._onListen, this._onCancel) 485 _BroadcastStreamController(this._onListen, this._onCancel)
428 : _state = _STATE_INITIAL { 486 : _state = _STATE_INITIAL {
429 _next = _previous = this; 487 _next = _previous = this;
430 } 488 }
431 489
432 // StreamController interface. 490 // StreamController interface.
433 491
434 Stream<T> get stream => new _MultiplexStream<T>(this); 492 Stream<T> get stream => new _BroadcastStream<T>(this);
435 493
436 EventSink<T> get sink => new _EventSinkView<T>(this); 494 EventSink<T> get sink => new _EventSinkView<T>(this);
437 495
438 bool get isClosed => (_state & _STATE_CLOSED) != 0; 496 bool get isClosed => (_state & _STATE_CLOSED) != 0;
439 497
440 /** 498 /**
441 * A multiplex controller is never paused. 499 * A broadcast controller is never paused.
442 * 500 *
443 * Each receiving stream may be paused individually, and they handle their 501 * Each receiving stream may be paused individually, and they handle their
444 * own buffering. 502 * own buffering.
445 */ 503 */
446 bool get isPaused => false; 504 bool get isPaused => false;
447 505
448 /** Whether there are currently a subscriber on the [Stream]. */ 506 /** Whether there are currently a subscriber on the [Stream]. */
449 bool get hasListener => !_isEmpty; 507 bool get hasListener => !_isEmpty;
450 508
451 /** Whether an event is being fired (sent to some, but not all, listeners). */ 509 /** Whether an event is being fired (sent to some, but not all, listeners). */
452 bool get _isFiring => (_state & _STATE_FIRING) != 0; 510 bool get _isFiring => (_state & _STATE_FIRING) != 0;
453 511
454 // Linked list helpers 512 // Linked list helpers
455 513
456 bool get _isEmpty => identical(_next, this); 514 bool get _isEmpty => identical(_next, this);
457 515
458 /** Adds subscription to linked list of active listeners. */ 516 /** Adds subscription to linked list of active listeners. */
459 void _addListener(_MultiplexSubscription<T> subscription) { 517 void _addListener(_BroadcastSubscription<T> subscription) {
460 _MultiplexSubscriptionLink previous = _previous; 518 _BroadcastSubscriptionLink previous = _previous;
461 previous._next = subscription; 519 previous._next = subscription;
462 _previous = subscription._previous; 520 _previous = subscription._previous;
463 subscription._previous._next = this; 521 subscription._previous._next = this;
464 subscription._previous = previous; 522 subscription._previous = previous;
465 subscription._eventState = (_state & _STATE_EVENT_ID); 523 subscription._eventState = (_state & _STATE_EVENT_ID);
466 } 524 }
467 525
468 void _removeListener(_MultiplexSubscription<T> subscription) { 526 void _removeListener(_BroadcastSubscription<T> subscription) {
469 assert(identical(subscription._controller, this)); 527 assert(identical(subscription._controller, this));
470 assert(!identical(subscription._next, subscription)); 528 assert(!identical(subscription._next, subscription));
471 subscription._previous._next = subscription._next; 529 subscription._previous._next = subscription._next;
472 subscription._next._previous = subscription._previous; 530 subscription._next._previous = subscription._previous;
473 subscription._next = subscription._previous = subscription; 531 subscription._next = subscription._previous = subscription;
474 } 532 }
475 533
476 // _StreamControllerLifecycle interface. 534 // _StreamControllerLifecycle interface.
477 535
478 void _recordListen(_MultiplexSubscription<T> subscription) { 536 void _recordListen(_BroadcastSubscription<T> subscription) {
479 _addListener(subscription); 537 _addListener(subscription);
480 if (identical(_next, _previous)) { 538 if (identical(_next, _previous)) {
481 // Only one listener, so it must be the first listener. 539 // Only one listener, so it must be the first listener.
482 _runGuarded(_onListen); 540 _runGuarded(_onListen);
483 } 541 }
484 } 542 }
485 543
486 void _recordCancel(_MultiplexSubscription<T> subscription) { 544 void _recordCancel(_BroadcastSubscription<T> subscription) {
487 if (subscription._isFiring) { 545 if (subscription._isFiring) {
488 subscription._setRemoveAfterFiring(); 546 subscription._setRemoveAfterFiring();
489 } else { 547 } else {
490 _removeListener(subscription); 548 _removeListener(subscription);
491 // If we are currently firing an event, the empty-check is performed at 549 // If we are currently firing an event, the empty-check is performed at
492 // the end of the listener loop instead of here. 550 // the end of the listener loop instead of here.
493 if ((_state & _STATE_FIRING) == 0 && _isEmpty) { 551 if ((_state & _STATE_FIRING) == 0 && _isEmpty) {
494 _callOnCancel(); 552 _callOnCancel();
495 } 553 }
496 } 554 }
(...skipping 20 matching lines...) Expand all
517 } 575 }
518 576
519 void close() { 577 void close() {
520 if (isClosed) { 578 if (isClosed) {
521 throw new StateError("Cannot add new events after calling close()"); 579 throw new StateError("Cannot add new events after calling close()");
522 } 580 }
523 _state |= _STATE_CLOSED; 581 _state |= _STATE_CLOSED;
524 _sendDone(); 582 _sendDone();
525 } 583 }
526 584
527 // EventDispatch interface.
528
529 void _sendData(T data) {
530 if (_isEmpty) return;
531 _forEachListener((_BufferingStreamSubscription<T> subscription) {
532 subscription._add(data);
533 });
534 }
535
536 void _sendError(Object error) {
537 if (_isEmpty) return;
538 _forEachListener((_BufferingStreamSubscription<T> subscription) {
539 subscription._addError(error);
540 });
541 }
542
543 void _sendDone() {
544 if (_isEmpty) return;
545 _forEachListener((_MultiplexSubscription<T> subscription) {
546 subscription._close();
547 subscription._eventState |=
548 _MultiplexSubscription._STATE_REMOVE_AFTER_FIRING;
549 });
550 }
551
552 void _forEachListener( 585 void _forEachListener(
553 void action(_BufferingStreamSubscription<T> subscription)) { 586 void action(_BufferingStreamSubscription<T> subscription)) {
554 if (_isFiring) { 587 if (_isFiring) {
555 throw new StateError( 588 throw new StateError(
556 "Cannot fire new event. Controller is already firing an event"); 589 "Cannot fire new event. Controller is already firing an event");
557 } 590 }
558 if (_isEmpty) return; 591 if (_isEmpty) return;
559 592
560 // Get event id of this event. 593 // Get event id of this event.
561 int id = (_state & _STATE_EVENT_ID); 594 int id = (_state & _STATE_EVENT_ID);
562 // Start firing (set the _STATE_FIRING bit). We don't do [_onCancel] 595 // Start firing (set the _STATE_FIRING bit). We don't do [_onCancel]
563 // callbacks while firing, and we prevent reentrancy of this function. 596 // callbacks while firing, and we prevent reentrancy of this function.
564 // 597 //
565 // Set [_state]'s event id to the next event's id. 598 // Set [_state]'s event id to the next event's id.
566 // Any listeners added while firing this event will expect the next event, 599 // Any listeners added while firing this event will expect the next event,
567 // not this one, and won't get notified. 600 // not this one, and won't get notified.
568 _state ^= _STATE_EVENT_ID | _STATE_FIRING; 601 _state ^= _STATE_EVENT_ID | _STATE_FIRING;
569 _MultiplexSubscriptionLink link = _next; 602 _BroadcastSubscriptionLink link = _next;
570 while (!identical(link, this)) { 603 while (!identical(link, this)) {
571 _MultiplexSubscription<T> subscription = link; 604 _BroadcastSubscription<T> subscription = link;
572 if (subscription._expectsEvent(id)) { 605 if (subscription._expectsEvent(id)) {
573 subscription._eventState |= _MultiplexSubscription._STATE_FIRING; 606 subscription._eventState |= _BroadcastSubscription._STATE_FIRING;
574 action(subscription); 607 action(subscription);
575 subscription._toggleEventId(); 608 subscription._toggleEventId();
576 link = subscription._next; 609 link = subscription._next;
577 if (subscription._removeAfterFiring) { 610 if (subscription._removeAfterFiring) {
578 _removeListener(subscription); 611 _removeListener(subscription);
579 } 612 }
580 subscription._eventState &= ~_MultiplexSubscription._STATE_FIRING; 613 subscription._eventState &= ~_BroadcastSubscription._STATE_FIRING;
581 } else { 614 } else {
582 link = subscription._next; 615 link = subscription._next;
583 } 616 }
584 } 617 }
585 _state &= ~_STATE_FIRING; 618 _state &= ~_STATE_FIRING;
586 619
587 if (_isEmpty) { 620 if (_isEmpty) {
588 _callOnCancel(); 621 _callOnCancel();
589 } 622 }
590 } 623 }
591 624
592 void _callOnCancel() { 625 void _callOnCancel() {
593 _runGuarded(_onCancel); 626 _runGuarded(_onCancel);
594 } 627 }
595 } 628 }
596 629
597 class _BufferingMultiplexStreamController<T> 630 class _SyncBroadcastStreamController<T> extends _BroadcastStreamController<T> {
598 extends _MultiplexStreamController<T> 631 _SyncBroadcastStreamController(void onListen(), void onCancel())
632 : super(onListen, onCancel);
633
634 // EventDispatch interface.
635
636 void _sendData(T data) {
637 if (_isEmpty) return;
638 _forEachListener((_BufferingStreamSubscription<T> subscription) {
639 subscription._add(data);
640 });
641 }
642
643 void _sendError(Object error) {
644 if (_isEmpty) return;
645 _forEachListener((_BufferingStreamSubscription<T> subscription) {
646 subscription._addError(error);
647 });
648 }
649
650 void _sendDone() {
651 if (_isEmpty) return;
652 _forEachListener((_BroadcastSubscription<T> subscription) {
653 subscription._close();
654 subscription._eventState |=
655 _BroadcastSubscription._STATE_REMOVE_AFTER_FIRING;
656 });
657 }
658 }
659
660 class _AsyncBroadcastStreamController<T> extends _BroadcastStreamController<T> {
661 _AsyncBroadcastStreamController(void onListen(), void onCancel())
662 : super(onListen, onCancel);
663
664 // EventDispatch interface.
665
666 void _sendData(T data) {
667 runAsync(() {
668 if (_isEmpty) return;
669 _forEachListener((_BufferingStreamSubscription<T> subscription) {
670 subscription._add(data);
671 });
672 });
673 }
674
675 void _sendError(Object error) {
676 runAsync(() {
677 if (_isEmpty) return;
678 _forEachListener((_BufferingStreamSubscription<T> subscription) {
679 subscription._addError(error);
680 });
681 });
682 }
683
684 void _sendDone() {
685 runAsync(() {
686 if (_isEmpty) return;
687 _forEachListener((_BroadcastSubscription<T> subscription) {
688 subscription._close();
689 subscription._eventState |=
690 _BroadcastSubscription._STATE_REMOVE_AFTER_FIRING;
691 });
692 });
693 }
694 }
695
696 /**
697 * Stream controller that is used by [Stream.asBroadcastStream].
698 *
699 * This stream controller allows incoming events while it is firing
700 * other events. This is handled by delaying the events until the
701 * current event is done firing, and then fire the pending events.
702 *
703 * This class extends [_SyncBroadcastStreamController]. Events of
704 * an "asBroadcastStream" stream are always initiated by events
705 * on another stream, and it is fine to forward them synchronously.
706 */
707 class _AsBroadcastStreamController<T>
708 extends _SyncBroadcastStreamController<T>
599 implements _EventDispatch<T> { 709 implements _EventDispatch<T> {
600 _StreamImplEvents _pending; 710 _StreamImplEvents _pending;
601 711
602 _BufferingMultiplexStreamController(void onListen(), void onCancel()) 712 _AsBroadcastStreamController(void onListen(), void onCancel())
603 : super(onListen, onCancel); 713 : super(onListen, onCancel);
604 714
605 bool get _hasPending => _pending != null && ! _pending.isEmpty; 715 bool get _hasPending => _pending != null && ! _pending.isEmpty;
606 716
607 void _addPendingEvent(_DelayedEvent event) { 717 void _addPendingEvent(_DelayedEvent event) {
608 if (_pending == null) { 718 if (_pending == null) {
609 _pending = new _StreamImplEvents(); 719 _pending = new _StreamImplEvents();
610 } 720 }
611 _pending.add(event); 721 _pending.add(event);
612 } 722 }
(...skipping 29 matching lines...) Expand all
642 super.close(); 752 super.close();
643 assert(!_hasPending); 753 assert(!_hasPending);
644 } 754 }
645 755
646 void _callOnCancel() { 756 void _callOnCancel() {
647 if (_hasPending) { 757 if (_hasPending) {
648 _pending.clear(); 758 _pending.clear();
649 _pending = null; 759 _pending = null;
650 } 760 }
651 super._callOnCancel(); 761 super._callOnCancel();
652
653 } 762 }
654 } 763 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698