Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(500)

Side by Side Diff: sdk/lib/async/stream_controller.dart

Issue 16125005: Make new StreamController be async by default. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Address review comments Created 7 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « sdk/lib/async/stream.dart ('k') | sdk/lib/async/stream_impl.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Controller for creating and adding events to a stream. 8 // Controller for creating and adding events to a stream.
9 // ------------------------------------------------------------------- 9 // -------------------------------------------------------------------
10 10
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after
46 * the stream at all, and won't trigger callbacks. From the controller's point 46 * the stream at all, and won't trigger callbacks. From the controller's point
47 * of view, the stream is completely inert when has completed. 47 * of view, the stream is completely inert when has completed.
48 */ 48 */
49 abstract class StreamController<T> implements EventSink<T> { 49 abstract class StreamController<T> implements EventSink<T> {
50 /** The stream that this controller is controlling. */ 50 /** The stream that this controller is controlling. */
51 Stream<T> get stream; 51 Stream<T> get stream;
52 52
53 /** 53 /**
54 * A controller with a [stream] that supports only one single subscriber. 54 * A controller with a [stream] that supports only one single subscriber.
55 * 55 *
56 * If [sync] is true, events may be passed directly to the stream's listener
57 * during an [add], [addError] or [close] call. If [sync] is false, the event
58 * will be passed to the listener at a later time, after the code creating
59 * the event has returned.
60 *
56 * The controller will buffer all incoming events until the subscriber is 61 * The controller will buffer all incoming events until the subscriber is
57 * registered. 62 * registered.
58 * 63 *
59 * The [onPause] function is called when the stream becomes 64 * The [onPause] function is called when the stream becomes
60 * paused. [onResume] is called when the stream resumed. 65 * paused. [onResume] is called when the stream resumed.
61 * 66 *
62 * The [onListen] callback is called when the stream 67 * The [onListen] callback is called when the stream
63 * receives its listener and [onCancel] when the listener ends 68 * receives its listener and [onCancel] when the listener ends
64 * its subscription. 69 * its subscription.
65 * 70 *
66 * If the stream is canceled before the controller needs new data the 71 * If the stream is canceled before the controller needs new data the
67 * [onResume] call might not be executed. 72 * [onResume] call might not be executed.
68 */ 73 */
69 factory StreamController({void onListen(), 74 factory StreamController({void onListen(),
70 void onPause(), 75 void onPause(),
71 void onResume(), 76 void onResume(),
72 void onCancel()}) 77 void onCancel(),
73 => new _StreamControllerImpl<T>(onListen, onPause, onResume, onCancel); 78 bool sync: false})
79 => sync
80 ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel)
81 : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel);
74 82
75 /** 83 /**
76 * A controller where [stream] can be listened to more than once. 84 * A controller where [stream] can be listened to more than once.
77 * 85 *
78 * The [Stream] returned by [stream] is a broadcast stream. It can be listened 86 * The [Stream] returned by [stream] is a broadcast stream. It can be listened
79 * to more than once. 87 * to more than once.
80 * 88 *
81 * The controller distributes any events to all currently subscribed 89 * The controller distributes any events to all currently subscribed
82 * listeners. 90 * listeners.
83 * It is not allowed to call [add], [addError], or [close] before a previous 91 * It is not allowed to call [add], [addError], or [close] before a previous
84 * call has returned. 92 * call has returned.
85 * 93 *
94 * If [sync] is true, events may be passed directly to the stream's listener
95 * during an [add], [addError] or [close] call. If [sync] is false, the event
96 * will be passed to the listener at a later time, after the code creating
97 * the event has returned.
98 *
86 * Each listener is handled independently, and if they pause, only the pausing 99 * Each listener is handled independently, and if they pause, only the pausing
87 * listener is affected. A paused listener will buffer events internally until 100 * listener is affected. A paused listener will buffer events internally until
88 * unpaused or canceled. 101 * unpaused or canceled.
89 * 102 *
103 * If [sync] is false, no guarantees are given with regard to when
104 * multiple listeners get the events, except that each listener will get
105 * all events in the correct order. If two events are sent on an async
106 * controller with two listeners, one of the listeners may get both events
107 * before the other listener gets any.
108 * A listener must be subscribed both when the event is initiated (that is,
109 * when [add] is called) and when the event is later delivered, in order to
110 * get the event.
111 *
90 * The [onListen] callback is called when the first listener is subscribed, 112 * The [onListen] callback is called when the first listener is subscribed,
91 * and the [onCancel] is called when there are no longer any active listeners. 113 * and the [onCancel] is called when there are no longer any active listeners.
92 * If a listener is added again later, after the [onCancel] was called, 114 * If a listener is added again later, after the [onCancel] was called,
93 * the [onListen] will be called again. 115 * the [onListen] will be called again.
94 */ 116 */
95 factory StreamController.broadcast({void onListen(), void onCancel()}) { 117 factory StreamController.broadcast({void onListen(),
96 return new _MultiplexStreamController<T>(onListen, onCancel); 118 void onCancel(),
119 bool sync: false}) {
120 return sync
121 ? new _SyncBroadcastStreamController<T>(onListen, onCancel)
122 : new _AsyncBroadcastStreamController<T>(onListen, onCancel);
97 } 123 }
98 124
99 /** 125 /**
100 * Returns a view of this object that only exposes the [EventSink] interface. 126 * Returns a view of this object that only exposes the [EventSink] interface.
101 */ 127 */
102 EventSink<T> get sink; 128 EventSink<T> get sink;
103 129
104 /** 130 /**
105 * Whether the stream is closed for adding more events. 131 * Whether the stream is closed for adding more events.
106 * 132 *
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after
140 void _recordPause(StreamSubscription<T> subscription) {} 166 void _recordPause(StreamSubscription<T> subscription) {}
141 void _recordResume(StreamSubscription<T> subscription) {} 167 void _recordResume(StreamSubscription<T> subscription) {}
142 void _recordCancel(StreamSubscription<T> subscription) {} 168 void _recordCancel(StreamSubscription<T> subscription) {}
143 } 169 }
144 170
145 /** 171 /**
146 * Default implementation of [StreamController]. 172 * Default implementation of [StreamController].
147 * 173 *
148 * Controls a stream that only supports a single controller. 174 * Controls a stream that only supports a single controller.
149 */ 175 */
150 class _StreamControllerImpl<T> implements StreamController<T>, 176 abstract class _StreamController<T> implements StreamController<T>,
151 _StreamControllerLifecycle<T> { 177 _StreamControllerLifecycle<T>,
178 _EventDispatch<T> {
152 static const int _STATE_OPEN = 0; 179 static const int _STATE_OPEN = 0;
153 static const int _STATE_CANCELLED = 1; 180 static const int _STATE_CANCELLED = 1;
154 static const int _STATE_CLOSED = 2; 181 static const int _STATE_CLOSED = 2;
155 182
156 final _NotificationHandler _onListen; 183 final _NotificationHandler _onListen;
157 final _NotificationHandler _onPause; 184 final _NotificationHandler _onPause;
158 final _NotificationHandler _onResume; 185 final _NotificationHandler _onResume;
159 final _NotificationHandler _onCancel; 186 final _NotificationHandler _onCancel;
160 _StreamImpl<T> _stream; 187 _StreamImpl<T> _stream;
161 188
162 // An active subscription on the stream, or null if no subscripton is active. 189 // An active subscription on the stream, or null if no subscripton is active.
163 _ControllerSubscription<T> _subscription; 190 _ControllerSubscription<T> _subscription;
164 191
165 // Whether we have sent a "done" event. 192 // Whether we have sent a "done" event.
166 int _state = _STATE_OPEN; 193 int _state = _STATE_OPEN;
167 194
168 // Events added to the stream before it has an active subscription. 195 // Events added to the stream before it has an active subscription.
169 _PendingEvents _pendingEvents = null; 196 _PendingEvents _pendingEvents = null;
170 197
171 _StreamControllerImpl(this._onListen, 198 _StreamController(this._onListen,
172 this._onPause, 199 this._onPause,
173 this._onResume, 200 this._onResume,
174 this._onCancel) { 201 this._onCancel) {
175 _stream = new _ControllerStream<T>(this); 202 _stream = new _ControllerStream<T>(this);
176 } 203 }
177 204
178 Stream<T> get stream => _stream; 205 Stream<T> get stream => _stream;
179 206
180 /** 207 /**
181 * Returns a view of this object that only exposes the [EventSink] interface. 208 * Returns a view of this object that only exposes the [EventSink] interface.
182 */ 209 */
183 EventSink<T> get sink => new _EventSinkView<T>(this); 210 EventSink<T> get sink => new _EventSinkView<T>(this);
184 211
(...skipping 10 matching lines...) Expand all
195 : !_isCancelled; 222 : !_isCancelled;
196 223
197 bool get hasListener => _subscription != null; 224 bool get hasListener => _subscription != null;
198 225
199 /** 226 /**
200 * Send or queue a data event. 227 * Send or queue a data event.
201 */ 228 */
202 void add(T value) { 229 void add(T value) {
203 if (isClosed) throw new StateError("Adding event after close"); 230 if (isClosed) throw new StateError("Adding event after close");
204 if (_subscription != null) { 231 if (_subscription != null) {
205 _subscription._add(value); 232 _sendData(value);
206 } else if (!_isCancelled) { 233 } else if (!_isCancelled) {
207 _addPendingEvent(new _DelayedData<T>(value)); 234 _addPendingEvent(new _DelayedData<T>(value));
208 } 235 }
209 } 236 }
210 237
211 /** 238 /**
212 * Send or enqueue an error event. 239 * Send or enqueue an error event.
213 */ 240 */
214 void addError(Object error, [Object stackTrace]) { 241 void addError(Object error, [Object stackTrace]) {
215 if (isClosed) throw new StateError("Adding event after close"); 242 if (isClosed) throw new StateError("Adding event after close");
216 if (stackTrace != null) { 243 if (stackTrace != null) {
217 // Force stack trace overwrite. Even if the error already contained 244 // Force stack trace overwrite. Even if the error already contained
218 // a stack trace. 245 // a stack trace.
219 _attachStackTrace(error, stackTrace); 246 _attachStackTrace(error, stackTrace);
220 } 247 }
221 if (_subscription != null) { 248 if (_subscription != null) {
222 _subscription._addError(error); 249 _sendError(error);
223 } else if (!_isCancelled) { 250 } else if (!_isCancelled) {
224 _addPendingEvent(new _DelayedError(error)); 251 _addPendingEvent(new _DelayedError(error));
225 } 252 }
226 } 253 }
227 254
228 /** 255 /**
229 * Closes this controller. 256 * Closes this controller.
230 * 257 *
231 * After closing, no further events may be added using [add] or [addError]. 258 * After closing, no further events may be added using [add] or [addError].
232 * 259 *
233 * You are allowed to close the controller more than once, but only the first 260 * You are allowed to close the controller more than once, but only the first
234 * call has any effect. 261 * call has any effect.
235 * 262 *
236 * The first time a controller is closed, a "done" event is sent to its 263 * The first time a controller is closed, a "done" event is sent to its
237 * stream. 264 * stream.
238 */ 265 */
239 void close() { 266 void close() {
240 if (isClosed) return; 267 if (isClosed) return;
241 _state |= _STATE_CLOSED; 268 _state |= _STATE_CLOSED;
242 if (_subscription != null) { 269 if (_subscription != null) {
243 _subscription._close(); 270 _sendDone();
244 } else if (!_isCancelled) { 271 } else if (!_isCancelled) {
245 _addPendingEvent(const _DelayedDone()); 272 _addPendingEvent(const _DelayedDone());
246 } 273 }
247 } 274 }
248 275
276 // EventDispatch interface
277
249 void _addPendingEvent(_DelayedEvent event) { 278 void _addPendingEvent(_DelayedEvent event) {
250 if (_isCancelled) return; 279 if (_isCancelled) return;
251 _StreamImplEvents events = _pendingEvents; 280 _StreamImplEvents events = _pendingEvents;
252 if (events == null) { 281 if (events == null) {
253 events = _pendingEvents = new _StreamImplEvents(); 282 events = _pendingEvents = new _StreamImplEvents();
254 } 283 }
255 events.add(event); 284 events.add(event);
256 } 285 }
257 286
258 void _recordListen(_BufferingStreamSubscription<T> subscription) { 287 void _recordListen(_BufferingStreamSubscription<T> subscription) {
(...skipping 15 matching lines...) Expand all
274 303
275 void _recordPause(StreamSubscription<T> subscription) { 304 void _recordPause(StreamSubscription<T> subscription) {
276 _runGuarded(_onPause); 305 _runGuarded(_onPause);
277 } 306 }
278 307
279 void _recordResume(StreamSubscription<T> subscription) { 308 void _recordResume(StreamSubscription<T> subscription) {
280 _runGuarded(_onResume); 309 _runGuarded(_onResume);
281 } 310 }
282 } 311 }
283 312
313 class _SyncStreamController<T> extends _StreamController<T> {
314 _SyncStreamController(void onListen(),
315 void onPause(),
316 void onResume(),
317 void onCancel())
318 : super(onListen, onPause, onResume, onCancel);
319
320 void _sendData(T data) {
321 _subscription._add(data);
322 }
323
324 void _sendError(Object error) {
325 _subscription._addError(error);
326 }
327
328 void _sendDone() {
329 _subscription._close();
330 }
331 }
332
333 class _AsyncStreamController<T> extends _StreamController<T> {
334 _AsyncStreamController(void onListen(),
335 void onPause(),
336 void onResume(),
337 void onCancel())
338 : super(onListen, onPause, onResume, onCancel);
339
340 void _sendData(T data) {
341 _subscription._addPending(new _DelayedData(data));
342 }
343
344 void _sendError(Object error) {
345 _subscription._addPending(new _DelayedError(error));
346 }
347
348 void _sendDone() {
349 _subscription._addPending(const _DelayedDone());
350 }
351 }
352
284 typedef void _NotificationHandler(); 353 typedef void _NotificationHandler();
285 354
286 void _runGuarded(_NotificationHandler notificationHandler) { 355 void _runGuarded(_NotificationHandler notificationHandler) {
287 if (notificationHandler == null) return; 356 if (notificationHandler == null) return;
288 try { 357 try {
289 notificationHandler(); 358 notificationHandler();
290 } catch (e, s) { 359 } catch (e, s) {
291 _throwDelayed(e, s); 360 _throwDelayed(e, s);
292 } 361 }
293 } 362 }
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after
332 401
333 void _onPause() { 402 void _onPause() {
334 _controller._recordPause(this); 403 _controller._recordPause(this);
335 } 404 }
336 405
337 void _onResume() { 406 void _onResume() {
338 _controller._recordResume(this); 407 _controller._recordResume(this);
339 } 408 }
340 } 409 }
341 410
342 class _MultiplexStream<T> extends _StreamImpl<T> { 411 class _BroadcastStream<T> extends _StreamImpl<T> {
343 _MultiplexStreamController _controller; 412 _BroadcastStreamController _controller;
344 413
345 _MultiplexStream(this._controller); 414 _BroadcastStream(this._controller);
346 415
347 bool get isBroadcast => true; 416 bool get isBroadcast => true;
348 417
349 StreamSubscription<T> _createSubscription( 418 StreamSubscription<T> _createSubscription(
350 void onData(T data), 419 void onData(T data),
351 void onError(Object error), 420 void onError(Object error),
352 void onDone(), 421 void onDone(),
353 bool cancelOnError) { 422 bool cancelOnError) {
354 return new _MultiplexSubscription<T>( 423 return new _BroadcastSubscription<T>(
355 _controller, onData, onError, onDone, cancelOnError); 424 _controller, onData, onError, onDone, cancelOnError);
356 } 425 }
357 426
358 void _onListen(_BufferingStreamSubscription subscription) { 427 void _onListen(_BufferingStreamSubscription subscription) {
359 _controller._recordListen(subscription); 428 _controller._recordListen(subscription);
360 } 429 }
361 } 430 }
362 431
363 abstract class _MultiplexSubscriptionLink { 432 abstract class _BroadcastSubscriptionLink {
364 _MultiplexSubscriptionLink _next; 433 _BroadcastSubscriptionLink _next;
365 _MultiplexSubscriptionLink _previous; 434 _BroadcastSubscriptionLink _previous;
366 } 435 }
367 436
368 class _MultiplexSubscription<T> extends _ControllerSubscription<T> 437 class _BroadcastSubscription<T> extends _ControllerSubscription<T>
369 implements _MultiplexSubscriptionLink { 438 implements _BroadcastSubscriptionLink {
370 static const int _STATE_EVENT_ID = 1; 439 static const int _STATE_EVENT_ID = 1;
371 static const int _STATE_FIRING = 2; 440 static const int _STATE_FIRING = 2;
372 static const int _STATE_REMOVE_AFTER_FIRING = 4; 441 static const int _STATE_REMOVE_AFTER_FIRING = 4;
373 int _eventState; 442 int _eventState;
374 443
375 _MultiplexSubscriptionLink _next; 444 _BroadcastSubscriptionLink _next;
376 _MultiplexSubscriptionLink _previous; 445 _BroadcastSubscriptionLink _previous;
377 446
378 _MultiplexSubscription(_StreamControllerLifecycle controller, 447 _BroadcastSubscription(_StreamControllerLifecycle controller,
379 void onData(T data), 448 void onData(T data),
380 void onError(Object error), 449 void onError(Object error),
381 void onDone(), 450 void onDone(),
382 bool cancelOnError) 451 bool cancelOnError)
383 : super(controller, onData, onError, onDone, cancelOnError) { 452 : super(controller, onData, onError, onDone, cancelOnError) {
384 _next = _previous = this; 453 _next = _previous = this;
385 } 454 }
386 455
387 _MultiplexStreamController get _controller => super._controller; 456 _BroadcastStreamController get _controller => super._controller;
388 457
389 bool _expectsEvent(int eventId) { 458 bool _expectsEvent(int eventId) {
390 return (_eventState & _STATE_EVENT_ID) == eventId; 459 return (_eventState & _STATE_EVENT_ID) == eventId;
391 } 460 }
392 461
393 void _toggleEventId() { 462 void _toggleEventId() {
394 _eventState ^= _STATE_EVENT_ID; 463 _eventState ^= _STATE_EVENT_ID;
395 } 464 }
396 465
397 bool get _isFiring => (_eventState & _STATE_FIRING) != 0; 466 bool get _isFiring => (_eventState & _STATE_FIRING) != 0;
398 467
399 bool _setRemoveAfterFiring() { 468 bool _setRemoveAfterFiring() {
400 assert(_isFiring); 469 assert(_isFiring);
401 _eventState |= _STATE_REMOVE_AFTER_FIRING; 470 _eventState |= _STATE_REMOVE_AFTER_FIRING;
402 } 471 }
403 472
404 bool get _removeAfterFiring => 473 bool get _removeAfterFiring =>
405 (_eventState & _STATE_REMOVE_AFTER_FIRING) != 0; 474 (_eventState & _STATE_REMOVE_AFTER_FIRING) != 0;
406 } 475 }
407 476
408 477
409 class _MultiplexStreamController<T> implements StreamController<T>, 478 abstract class _BroadcastStreamController<T>
410 _StreamControllerLifecycle<T>, 479 implements StreamController<T>,
411 _MultiplexSubscriptionLink { 480 _StreamControllerLifecycle<T>,
481 _BroadcastSubscriptionLink,
482 _EventDispatch<T> {
412 static const int _STATE_INITIAL = 0; 483 static const int _STATE_INITIAL = 0;
413 static const int _STATE_EVENT_ID = 1; 484 static const int _STATE_EVENT_ID = 1;
414 static const int _STATE_FIRING = 2; 485 static const int _STATE_FIRING = 2;
415 static const int _STATE_CLOSED = 4; 486 static const int _STATE_CLOSED = 4;
416 487
417 final _NotificationHandler _onListen; 488 final _NotificationHandler _onListen;
418 final _NotificationHandler _onCancel; 489 final _NotificationHandler _onCancel;
419 490
420 // State of the controller. 491 // State of the controller.
421 int _state; 492 int _state;
422 493
423 // Double-linked list of active listeners. 494 // Double-linked list of active listeners.
424 _MultiplexSubscriptionLink _next; 495 _BroadcastSubscriptionLink _next;
425 _MultiplexSubscriptionLink _previous; 496 _BroadcastSubscriptionLink _previous;
426 497
427 _MultiplexStreamController(this._onListen, this._onCancel) 498 _BroadcastStreamController(this._onListen, this._onCancel)
428 : _state = _STATE_INITIAL { 499 : _state = _STATE_INITIAL {
429 _next = _previous = this; 500 _next = _previous = this;
430 } 501 }
431 502
432 // StreamController interface. 503 // StreamController interface.
433 504
434 Stream<T> get stream => new _MultiplexStream<T>(this); 505 Stream<T> get stream => new _BroadcastStream<T>(this);
435 506
436 EventSink<T> get sink => new _EventSinkView<T>(this); 507 EventSink<T> get sink => new _EventSinkView<T>(this);
437 508
438 bool get isClosed => (_state & _STATE_CLOSED) != 0; 509 bool get isClosed => (_state & _STATE_CLOSED) != 0;
439 510
440 /** 511 /**
441 * A multiplex controller is never paused. 512 * A broadcast controller is never paused.
442 * 513 *
443 * Each receiving stream may be paused individually, and they handle their 514 * Each receiving stream may be paused individually, and they handle their
444 * own buffering. 515 * own buffering.
445 */ 516 */
446 bool get isPaused => false; 517 bool get isPaused => false;
447 518
448 /** Whether there are currently a subscriber on the [Stream]. */ 519 /** Whether there are currently a subscriber on the [Stream]. */
449 bool get hasListener => !_isEmpty; 520 bool get hasListener => !_isEmpty;
450 521
451 /** Whether an event is being fired (sent to some, but not all, listeners). */ 522 /** Whether an event is being fired (sent to some, but not all, listeners). */
452 bool get _isFiring => (_state & _STATE_FIRING) != 0; 523 bool get _isFiring => (_state & _STATE_FIRING) != 0;
453 524
454 // Linked list helpers 525 // Linked list helpers
455 526
456 bool get _isEmpty => identical(_next, this); 527 bool get _isEmpty => identical(_next, this);
457 528
458 /** Adds subscription to linked list of active listeners. */ 529 /** Adds subscription to linked list of active listeners. */
459 void _addListener(_MultiplexSubscription<T> subscription) { 530 void _addListener(_BroadcastSubscription<T> subscription) {
460 _MultiplexSubscriptionLink previous = _previous; 531 _BroadcastSubscriptionLink previous = _previous;
461 previous._next = subscription; 532 previous._next = subscription;
462 _previous = subscription._previous; 533 _previous = subscription._previous;
463 subscription._previous._next = this; 534 subscription._previous._next = this;
464 subscription._previous = previous; 535 subscription._previous = previous;
465 subscription._eventState = (_state & _STATE_EVENT_ID); 536 subscription._eventState = (_state & _STATE_EVENT_ID);
466 } 537 }
467 538
468 void _removeListener(_MultiplexSubscription<T> subscription) { 539 void _removeListener(_BroadcastSubscription<T> subscription) {
469 assert(identical(subscription._controller, this)); 540 assert(identical(subscription._controller, this));
470 assert(!identical(subscription._next, subscription)); 541 assert(!identical(subscription._next, subscription));
471 subscription._previous._next = subscription._next; 542 subscription._previous._next = subscription._next;
472 subscription._next._previous = subscription._previous; 543 subscription._next._previous = subscription._previous;
473 subscription._next = subscription._previous = subscription; 544 subscription._next = subscription._previous = subscription;
474 } 545 }
475 546
476 // _StreamControllerLifecycle interface. 547 // _StreamControllerLifecycle interface.
477 548
478 void _recordListen(_MultiplexSubscription<T> subscription) { 549 void _recordListen(_BroadcastSubscription<T> subscription) {
479 _addListener(subscription); 550 _addListener(subscription);
480 if (identical(_next, _previous)) { 551 if (identical(_next, _previous)) {
481 // Only one listener, so it must be the first listener. 552 // Only one listener, so it must be the first listener.
482 _runGuarded(_onListen); 553 _runGuarded(_onListen);
483 } 554 }
484 } 555 }
485 556
486 void _recordCancel(_MultiplexSubscription<T> subscription) { 557 void _recordCancel(_BroadcastSubscription<T> subscription) {
487 if (subscription._isFiring) { 558 if (subscription._isFiring) {
488 subscription._setRemoveAfterFiring(); 559 subscription._setRemoveAfterFiring();
489 } else { 560 } else {
490 _removeListener(subscription); 561 _removeListener(subscription);
491 // If we are currently firing an event, the empty-check is performed at 562 // If we are currently firing an event, the empty-check is performed at
492 // the end of the listener loop instead of here. 563 // the end of the listener loop instead of here.
493 if ((_state & _STATE_FIRING) == 0 && _isEmpty) { 564 if ((_state & _STATE_FIRING) == 0 && _isEmpty) {
494 _callOnCancel(); 565 _callOnCancel();
495 } 566 }
496 } 567 }
(...skipping 20 matching lines...) Expand all
517 } 588 }
518 589
519 void close() { 590 void close() {
520 if (isClosed) { 591 if (isClosed) {
521 throw new StateError("Cannot add new events after calling close()"); 592 throw new StateError("Cannot add new events after calling close()");
522 } 593 }
523 _state |= _STATE_CLOSED; 594 _state |= _STATE_CLOSED;
524 _sendDone(); 595 _sendDone();
525 } 596 }
526 597
527 // EventDispatch interface.
528
529 void _sendData(T data) {
530 if (_isEmpty) return;
531 _forEachListener((_BufferingStreamSubscription<T> subscription) {
532 subscription._add(data);
533 });
534 }
535
536 void _sendError(Object error) {
537 if (_isEmpty) return;
538 _forEachListener((_BufferingStreamSubscription<T> subscription) {
539 subscription._addError(error);
540 });
541 }
542
543 void _sendDone() {
544 if (_isEmpty) return;
545 _forEachListener((_MultiplexSubscription<T> subscription) {
546 subscription._close();
547 subscription._eventState |=
548 _MultiplexSubscription._STATE_REMOVE_AFTER_FIRING;
549 });
550 }
551
552 void _forEachListener( 598 void _forEachListener(
553 void action(_BufferingStreamSubscription<T> subscription)) { 599 void action(_BufferingStreamSubscription<T> subscription)) {
554 if (_isFiring) { 600 if (_isFiring) {
555 throw new StateError( 601 throw new StateError(
556 "Cannot fire new event. Controller is already firing an event"); 602 "Cannot fire new event. Controller is already firing an event");
557 } 603 }
558 if (_isEmpty) return; 604 if (_isEmpty) return;
559 605
560 // Get event id of this event. 606 // Get event id of this event.
561 int id = (_state & _STATE_EVENT_ID); 607 int id = (_state & _STATE_EVENT_ID);
562 // Start firing (set the _STATE_FIRING bit). We don't do [_onCancel] 608 // Start firing (set the _STATE_FIRING bit). We don't do [_onCancel]
563 // callbacks while firing, and we prevent reentrancy of this function. 609 // callbacks while firing, and we prevent reentrancy of this function.
564 // 610 //
565 // Set [_state]'s event id to the next event's id. 611 // Set [_state]'s event id to the next event's id.
566 // Any listeners added while firing this event will expect the next event, 612 // Any listeners added while firing this event will expect the next event,
567 // not this one, and won't get notified. 613 // not this one, and won't get notified.
568 _state ^= _STATE_EVENT_ID | _STATE_FIRING; 614 _state ^= _STATE_EVENT_ID | _STATE_FIRING;
569 _MultiplexSubscriptionLink link = _next; 615 _BroadcastSubscriptionLink link = _next;
570 while (!identical(link, this)) { 616 while (!identical(link, this)) {
571 _MultiplexSubscription<T> subscription = link; 617 _BroadcastSubscription<T> subscription = link;
572 if (subscription._expectsEvent(id)) { 618 if (subscription._expectsEvent(id)) {
573 subscription._eventState |= _MultiplexSubscription._STATE_FIRING; 619 subscription._eventState |= _BroadcastSubscription._STATE_FIRING;
574 action(subscription); 620 action(subscription);
575 subscription._toggleEventId(); 621 subscription._toggleEventId();
576 link = subscription._next; 622 link = subscription._next;
577 if (subscription._removeAfterFiring) { 623 if (subscription._removeAfterFiring) {
578 _removeListener(subscription); 624 _removeListener(subscription);
579 } 625 }
580 subscription._eventState &= ~_MultiplexSubscription._STATE_FIRING; 626 subscription._eventState &= ~_BroadcastSubscription._STATE_FIRING;
581 } else { 627 } else {
582 link = subscription._next; 628 link = subscription._next;
583 } 629 }
584 } 630 }
585 _state &= ~_STATE_FIRING; 631 _state &= ~_STATE_FIRING;
586 632
587 if (_isEmpty) { 633 if (_isEmpty) {
588 _callOnCancel(); 634 _callOnCancel();
589 } 635 }
590 } 636 }
591 637
592 void _callOnCancel() { 638 void _callOnCancel() {
593 _runGuarded(_onCancel); 639 _runGuarded(_onCancel);
594 } 640 }
595 } 641 }
596 642
597 class _BufferingMultiplexStreamController<T> 643 class _SyncBroadcastStreamController<T> extends _BroadcastStreamController<T> {
598 extends _MultiplexStreamController<T> 644 _SyncBroadcastStreamController(void onListen(), void onCancel())
645 : super(onListen, onCancel);
646
647 // EventDispatch interface.
648
649 void _sendData(T data) {
650 if (_isEmpty) return;
651 _forEachListener((_BufferingStreamSubscription<T> subscription) {
652 subscription._add(data);
653 });
654 }
655
656 void _sendError(Object error) {
657 if (_isEmpty) return;
658 _forEachListener((_BufferingStreamSubscription<T> subscription) {
659 subscription._addError(error);
660 });
661 }
662
663 void _sendDone() {
664 if (_isEmpty) return;
665 _forEachListener((_BroadcastSubscription<T> subscription) {
666 subscription._close();
667 subscription._eventState |=
668 _BroadcastSubscription._STATE_REMOVE_AFTER_FIRING;
669 });
670 }
671 }
672
673 class _AsyncBroadcastStreamController<T> extends _BroadcastStreamController<T> {
674 _AsyncBroadcastStreamController(void onListen(), void onCancel())
675 : super(onListen, onCancel);
676
677 // EventDispatch interface.
678
679 void _sendData(T data) {
680 for (_BroadcastSubscriptionLink link = _next;
681 !identical(link, this);
682 link = link._next) {
683 _BroadcastSubscription<T> subscription = link;
684 subscription._addPending(new _DelayedData(data));
685 }
686 }
687
688 void _sendError(Object error) {
689 for (_BroadcastSubscriptionLink link = _next;
690 !identical(link, this);
691 link = link._next) {
692 _BroadcastSubscription<T> subscription = link;
693 subscription._addPending(new _DelayedError(error));
694 }
695 }
696
697 void _sendDone() {
698 for (_BroadcastSubscriptionLink link = _next;
699 !identical(link, this);
700 link = link._next) {
701 _BroadcastSubscription<T> subscription = link;
702 subscription._addPending(const _DelayedDone());
703 }
704 }
705 }
706
707 /**
708 * Stream controller that is used by [Stream.asBroadcastStream].
709 *
710 * This stream controller allows incoming events while it is firing
711 * other events. This is handled by delaying the events until the
712 * current event is done firing, and then fire the pending events.
713 *
714 * This class extends [_SyncBroadcastStreamController]. Events of
715 * an "asBroadcastStream" stream are always initiated by events
716 * on another stream, and it is fine to forward them synchronously.
717 */
718 class _AsBroadcastStreamController<T>
719 extends _SyncBroadcastStreamController<T>
599 implements _EventDispatch<T> { 720 implements _EventDispatch<T> {
600 _StreamImplEvents _pending; 721 _StreamImplEvents _pending;
601 722
602 _BufferingMultiplexStreamController(void onListen(), void onCancel()) 723 _AsBroadcastStreamController(void onListen(), void onCancel())
603 : super(onListen, onCancel); 724 : super(onListen, onCancel);
604 725
605 bool get _hasPending => _pending != null && ! _pending.isEmpty; 726 bool get _hasPending => _pending != null && ! _pending.isEmpty;
606 727
607 void _addPendingEvent(_DelayedEvent event) { 728 void _addPendingEvent(_DelayedEvent event) {
608 if (_pending == null) { 729 if (_pending == null) {
609 _pending = new _StreamImplEvents(); 730 _pending = new _StreamImplEvents();
610 } 731 }
611 _pending.add(event); 732 _pending.add(event);
612 } 733 }
(...skipping 29 matching lines...) Expand all
642 super.close(); 763 super.close();
643 assert(!_hasPending); 764 assert(!_hasPending);
644 } 765 }
645 766
646 void _callOnCancel() { 767 void _callOnCancel() {
647 if (_hasPending) { 768 if (_hasPending) {
648 _pending.clear(); 769 _pending.clear();
649 _pending = null; 770 _pending = null;
650 } 771 }
651 super._callOnCancel(); 772 super._callOnCancel();
652
653 } 773 }
654 } 774 }
OLDNEW
« no previous file with comments | « sdk/lib/async/stream.dart ('k') | sdk/lib/async/stream_impl.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698