Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(148)

Side by Side Diff: tool/input_sdk/lib/async/broadcast_stream_controller.dart

Issue 1953153002: Update dart:async to match the Dart repo. (Closed) Base URL: https://github.com/dart-lang/dev_compiler.git@master
Patch Set: Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 class _BroadcastStream<T> extends _ControllerStream<T> { 7 class _BroadcastStream<T> extends _ControllerStream<T> {
8 _BroadcastStream(_StreamControllerLifecycle controller) : super(controller); 8 _BroadcastStream(_StreamControllerLifecycle<T> controller)
9 : super(controller);
9 10
10 bool get isBroadcast => true; 11 bool get isBroadcast => true;
11 } 12 }
12 13
13 abstract class _BroadcastSubscriptionLink { 14 class _BroadcastSubscription<T> extends _ControllerSubscription<T> {
14 _BroadcastSubscriptionLink _next;
15 _BroadcastSubscriptionLink _previous;
16 }
17
18 class _BroadcastSubscription<T> extends _ControllerSubscription<T>
19 implements _BroadcastSubscriptionLink {
20 static const int _STATE_EVENT_ID = 1; 15 static const int _STATE_EVENT_ID = 1;
21 static const int _STATE_FIRING = 2; 16 static const int _STATE_FIRING = 2;
22 static const int _STATE_REMOVE_AFTER_FIRING = 4; 17 static const int _STATE_REMOVE_AFTER_FIRING = 4;
23 // TODO(lrn): Use the _state field on _ControllerSubscription to 18 // TODO(lrn): Use the _state field on _ControllerSubscription to
24 // also store this state. Requires that the subscription implementation 19 // also store this state. Requires that the subscription implementation
25 // does not assume that it's use of the state integer is the only use. 20 // does not assume that it's use of the state integer is the only use.
26 int _eventState; 21 int _eventState = 0; // Initialized to help dart2js type inference.
27 22
28 _BroadcastSubscriptionLink _next; 23 _BroadcastSubscription<T> _next;
29 _BroadcastSubscriptionLink _previous; 24 _BroadcastSubscription<T> _previous;
30 25
31 _BroadcastSubscription(_StreamControllerLifecycle controller, 26 _BroadcastSubscription(_StreamControllerLifecycle<T> controller,
32 void onData(T data), 27 void onData(T data),
33 Function onError, 28 Function onError,
34 void onDone(), 29 void onDone(),
35 bool cancelOnError) 30 bool cancelOnError)
36 : super(controller, onData, onError, onDone, cancelOnError) { 31 : super(controller, onData, onError, onDone, cancelOnError) {
37 _next = _previous = this; 32 _next = _previous = this;
38 } 33 }
39 34
40 bool _expectsEvent(int eventId) => 35 bool _expectsEvent(int eventId) =>
41 (_eventState & _STATE_EVENT_ID) == eventId; 36 (_eventState & _STATE_EVENT_ID) == eventId;
(...skipping 16 matching lines...) Expand all
58 // so we don't bother calling it. 53 // so we don't bother calling it.
59 void _onPause() { } 54 void _onPause() { }
60 55
61 // The controller._recordResume doesn't do anything for a broadcast 56 // The controller._recordResume doesn't do anything for a broadcast
62 // controller, so we don't bother calling it. 57 // controller, so we don't bother calling it.
63 void _onResume() { } 58 void _onResume() { }
64 59
65 // _onCancel is inherited. 60 // _onCancel is inherited.
66 } 61 }
67 62
68
69 abstract class _BroadcastStreamController<T> 63 abstract class _BroadcastStreamController<T>
70 implements StreamController<T>, 64 implements StreamController<T>,
71 _StreamControllerLifecycle<T>, 65 _StreamControllerLifecycle<T>,
72 _BroadcastSubscriptionLink,
73 _EventSink<T>, 66 _EventSink<T>,
74 _EventDispatch<T> { 67 _EventDispatch<T> {
75 static const int _STATE_INITIAL = 0; 68 static const int _STATE_INITIAL = 0;
76 static const int _STATE_EVENT_ID = 1; 69 static const int _STATE_EVENT_ID = 1;
77 static const int _STATE_FIRING = 2; 70 static const int _STATE_FIRING = 2;
78 static const int _STATE_CLOSED = 4; 71 static const int _STATE_CLOSED = 4;
79 static const int _STATE_ADDSTREAM = 8; 72 static const int _STATE_ADDSTREAM = 8;
80 73
81 final _NotificationHandler _onListen; 74 ControllerCallback onListen;
82 final _NotificationHandler _onCancel; 75 ControllerCancelCallback onCancel;
83 76
84 // State of the controller. 77 // State of the controller.
85 int _state; 78 int _state;
86 79
87 // Double-linked list of active listeners. 80 // Double-linked list of active listeners.
88 _BroadcastSubscriptionLink _next; 81 _BroadcastSubscription<T> _firstSubscription;
89 _BroadcastSubscriptionLink _previous; 82 _BroadcastSubscription<T> _lastSubscription;
90 83
91 // Extra state used during an [addStream] call. 84 // Extra state used during an [addStream] call.
92 _AddStreamState<T> _addStreamState; 85 _AddStreamState<T> _addStreamState;
93 86
94 /** 87 /**
95 * Future returned by [close] and [done]. 88 * Future returned by [close] and [done].
96 * 89 *
97 * The future is completed whenever the done event has been sent to all 90 * The future is completed whenever the done event has been sent to all
98 * relevant listeners. 91 * relevant listeners.
99 * The relevant listeners are the ones that were listening when [close] was 92 * The relevant listeners are the ones that were listening when [close] was
100 * called. When all of these have been canceled (sending the done event makes 93 * called. When all of these have been canceled (sending the done event makes
101 * them cancel, but they can also be canceled before sending the event), 94 * them cancel, but they can also be canceled before sending the event),
102 * this future completes. 95 * this future completes.
103 * 96 *
104 * Any attempt to listen after calling [close] will throw, so there won't 97 * Any attempt to listen after calling [close] will throw, so there won't
105 * be any further listeners. 98 * be any further listeners.
106 */ 99 */
107 _Future _doneFuture; 100 _Future _doneFuture;
108 101
109 _BroadcastStreamController(this._onListen, this._onCancel) 102 _BroadcastStreamController(this.onListen, this.onCancel)
110 : _state = _STATE_INITIAL { 103 : _state = _STATE_INITIAL;
111 _next = _previous = this; 104
105 ControllerCallback get onPause {
106 throw new UnsupportedError(
107 "Broadcast stream controllers do not support pause callbacks");
108 }
109
110 void set onPause(void onPauseHandler()) {
111 throw new UnsupportedError(
112 "Broadcast stream controllers do not support pause callbacks");
113 }
114
115 ControllerCallback get onResume {
116 throw new UnsupportedError(
117 "Broadcast stream controllers do not support pause callbacks");
118 }
119
120 void set onResume(void onResumeHandler()) {
121 throw new UnsupportedError(
122 "Broadcast stream controllers do not support pause callbacks");
112 } 123 }
113 124
114 // StreamController interface. 125 // StreamController interface.
115 126
116 Stream<T> get stream => new _BroadcastStream<T>(this); 127 Stream<T> get stream => new _BroadcastStream<T>(this);
117 128
118 StreamSink<T> get sink => new _StreamSinkWrapper<T>(this); 129 StreamSink<T> get sink => new _StreamSinkWrapper<T>(this);
119 130
120 bool get isClosed => (_state & _STATE_CLOSED) != 0; 131 bool get isClosed => (_state & _STATE_CLOSED) != 0;
121 132
122 /** 133 /**
123 * A broadcast controller is never paused. 134 * A broadcast controller is never paused.
124 * 135 *
125 * Each receiving stream may be paused individually, and they handle their 136 * Each receiving stream may be paused individually, and they handle their
126 * own buffering. 137 * own buffering.
127 */ 138 */
128 bool get isPaused => false; 139 bool get isPaused => false;
129 140
130 /** Whether there are currently one or more subscribers. */ 141 /** Whether there are currently one or more subscribers. */
131 bool get hasListener => !_isEmpty; 142 bool get hasListener => !_isEmpty;
132 143
133 /** 144 /**
134 * Test whether the stream has exactly one listener. 145 * Test whether the stream has exactly one listener.
135 * 146 *
136 * Assumes that the stream has a listener (not [_isEmpty]). 147 * Assumes that the stream has a listener (not [_isEmpty]).
137 */ 148 */
138 bool get _hasOneListener { 149 bool get _hasOneListener {
139 assert(!_isEmpty); 150 assert(!_isEmpty);
140 return identical(_next._next, this); 151 return identical(_firstSubscription, _lastSubscription);
141 } 152 }
142 153
143 /** Whether an event is being fired (sent to some, but not all, listeners). */ 154 /** Whether an event is being fired (sent to some, but not all, listeners). */
144 bool get _isFiring => (_state & _STATE_FIRING) != 0; 155 bool get _isFiring => (_state & _STATE_FIRING) != 0;
145 156
146 bool get _isAddingStream => (_state & _STATE_ADDSTREAM) != 0; 157 bool get _isAddingStream => (_state & _STATE_ADDSTREAM) != 0;
147 158
148 bool get _mayAddEvent => (_state < _STATE_CLOSED); 159 bool get _mayAddEvent => (_state < _STATE_CLOSED);
149 160
150 _Future _ensureDoneFuture() { 161 _Future _ensureDoneFuture() {
151 if (_doneFuture != null) return _doneFuture; 162 if (_doneFuture != null) return _doneFuture;
152 return _doneFuture = new _Future(); 163 return _doneFuture = new _Future();
153 } 164 }
154 165
155 // Linked list helpers 166 // Linked list helpers
156 167
157 bool get _isEmpty => identical(_next, this); 168 bool get _isEmpty => _firstSubscription == null;
158 169
159 /** Adds subscription to linked list of active listeners. */ 170 /** Adds subscription to linked list of active listeners. */
160 void _addListener(_BroadcastSubscription<T> subscription) { 171 void _addListener(_BroadcastSubscription<T> subscription) {
161 assert(identical(subscription._next, subscription)); 172 assert(identical(subscription._next, subscription));
162 // Insert in linked list just before `this`.
163 subscription._previous = _previous;
164 subscription._next = this;
165 this._previous._next = subscription;
166 this._previous = subscription;
167 subscription._eventState = (_state & _STATE_EVENT_ID); 173 subscription._eventState = (_state & _STATE_EVENT_ID);
174 // Insert in linked list as last subscription.
175 _BroadcastSubscription<T> oldLast = _lastSubscription;
176 _lastSubscription = subscription;
177 subscription._next = null;
178 subscription._previous = oldLast;
179 if (oldLast == null) {
180 _firstSubscription = subscription;
181 } else {
182 oldLast._next = subscription;
183 }
168 } 184 }
169 185
170 void _removeListener(_BroadcastSubscription<T> subscription) { 186 void _removeListener(_BroadcastSubscription<T> subscription) {
171 assert(identical(subscription._controller, this)); 187 assert(identical(subscription._controller, this));
172 assert(!identical(subscription._next, subscription)); 188 assert(!identical(subscription._next, subscription));
173 _BroadcastSubscriptionLink previous = subscription._previous; 189 _BroadcastSubscription<T> previous = subscription._previous;
174 _BroadcastSubscriptionLink next = subscription._next; 190 _BroadcastSubscription<T> next = subscription._next;
175 previous._next = next; 191 if (previous == null) {
176 next._previous = previous; 192 // This was the first subscription.
193 _firstSubscription = next;
194 } else {
195 previous._next = next;
196 }
197 if (next == null) {
198 // This was the last subscription.
199 _lastSubscription = previous;
200 } else {
201 next._previous = previous;
202 }
203
177 subscription._next = subscription._previous = subscription; 204 subscription._next = subscription._previous = subscription;
178 } 205 }
179 206
180 // _StreamControllerLifecycle interface. 207 // _StreamControllerLifecycle interface.
181 208
182 StreamSubscription<T> _subscribe( 209 StreamSubscription<T> _subscribe(
183 void onData(T data), 210 void onData(T data),
184 Function onError, 211 Function onError,
185 void onDone(), 212 void onDone(),
186 bool cancelOnError) { 213 bool cancelOnError) {
187 if (isClosed) { 214 if (isClosed) {
188 if (onDone == null) onDone = _nullDoneHandler; 215 if (onDone == null) onDone = _nullDoneHandler;
189 return new _DoneStreamSubscription<T>(onDone); 216 return new _DoneStreamSubscription<T>(onDone);
190 } 217 }
191 StreamSubscription subscription = 218 StreamSubscription<T> subscription =
192 new _BroadcastSubscription<T>(this, onData, onError, onDone, 219 new _BroadcastSubscription<T>(this, onData, onError, onDone,
193 cancelOnError); 220 cancelOnError);
194 _addListener(subscription); 221 _addListener(subscription);
195 if (identical(_next, _previous)) { 222 if (identical(_firstSubscription, _lastSubscription)) {
196 // Only one listener, so it must be the first listener. 223 // Only one listener, so it must be the first listener.
197 _runGuarded(_onListen); 224 _runGuarded(onListen);
198 } 225 }
199 return subscription; 226 return subscription;
200 } 227 }
201 228
202 Future _recordCancel(StreamSubscription<T> sub) { 229 Future _recordCancel(StreamSubscription<T> sub) {
203 var subscription = sub as _BroadcastSubscription<T>; 230 _BroadcastSubscription<T> subscription = sub;
204 // If already removed by the stream, don't remove it again. 231 // If already removed by the stream, don't remove it again.
205 if (identical(subscription._next, subscription)) return null; 232 if (identical(subscription._next, subscription)) return null;
206 assert(!identical(subscription._next, subscription));
207 if (subscription._isFiring) { 233 if (subscription._isFiring) {
208 subscription._setRemoveAfterFiring(); 234 subscription._setRemoveAfterFiring();
209 } else { 235 } else {
210 assert(!identical(subscription._next, subscription));
211 _removeListener(subscription); 236 _removeListener(subscription);
212 // If we are currently firing an event, the empty-check is performed at 237 // If we are currently firing an event, the empty-check is performed at
213 // the end of the listener loop instead of here. 238 // the end of the listener loop instead of here.
214 if (!_isFiring && _isEmpty) { 239 if (!_isFiring && _isEmpty) {
215 _callOnCancel(); 240 _callOnCancel();
216 } 241 }
217 } 242 }
218 return null; 243 return null;
219 } 244 }
220 245
(...skipping 68 matching lines...) Expand 10 before | Expand all | Expand 10 after
289 void _forEachListener( 314 void _forEachListener(
290 void action(_BufferingStreamSubscription<T> subscription)) { 315 void action(_BufferingStreamSubscription<T> subscription)) {
291 if (_isFiring) { 316 if (_isFiring) {
292 throw new StateError( 317 throw new StateError(
293 "Cannot fire new event. Controller is already firing an event"); 318 "Cannot fire new event. Controller is already firing an event");
294 } 319 }
295 if (_isEmpty) return; 320 if (_isEmpty) return;
296 321
297 // Get event id of this event. 322 // Get event id of this event.
298 int id = (_state & _STATE_EVENT_ID); 323 int id = (_state & _STATE_EVENT_ID);
299 // Start firing (set the _STATE_FIRING bit). We don't do [_onCancel] 324 // Start firing (set the _STATE_FIRING bit). We don't do [onCancel]
300 // callbacks while firing, and we prevent reentrancy of this function. 325 // callbacks while firing, and we prevent reentrancy of this function.
301 // 326 //
302 // Set [_state]'s event id to the next event's id. 327 // Set [_state]'s event id to the next event's id.
303 // Any listeners added while firing this event will expect the next event, 328 // Any listeners added while firing this event will expect the next event,
304 // not this one, and won't get notified. 329 // not this one, and won't get notified.
305 _state ^= _STATE_EVENT_ID | _STATE_FIRING; 330 _state ^= _STATE_EVENT_ID | _STATE_FIRING;
306 _BroadcastSubscriptionLink link = _next; 331 _BroadcastSubscription<T> subscription = _firstSubscription;
307 while (!identical(link, this)) { 332 while (subscription != null) {
308 _BroadcastSubscription<T> subscription = link;
309 if (subscription._expectsEvent(id)) { 333 if (subscription._expectsEvent(id)) {
310 subscription._eventState |= _BroadcastSubscription._STATE_FIRING; 334 subscription._eventState |= _BroadcastSubscription._STATE_FIRING;
311 action(subscription); 335 action(subscription);
312 subscription._toggleEventId(); 336 subscription._toggleEventId();
313 link = subscription._next; 337 _BroadcastSubscription<T> next = subscription._next;
314 if (subscription._removeAfterFiring) { 338 if (subscription._removeAfterFiring) {
315 _removeListener(subscription); 339 _removeListener(subscription);
316 } 340 }
317 subscription._eventState &= ~_BroadcastSubscription._STATE_FIRING; 341 subscription._eventState &= ~_BroadcastSubscription._STATE_FIRING;
342 subscription = next;
318 } else { 343 } else {
319 link = subscription._next; 344 subscription = subscription._next;
320 } 345 }
321 } 346 }
322 _state &= ~_STATE_FIRING; 347 _state &= ~_STATE_FIRING;
323 348
324 if (_isEmpty) { 349 if (_isEmpty) {
325 _callOnCancel(); 350 _callOnCancel();
326 } 351 }
327 } 352 }
328 353
329 void _callOnCancel() { 354 void _callOnCancel() {
330 assert(_isEmpty); 355 assert(_isEmpty);
331 if (isClosed && _doneFuture._mayComplete) { 356 if (isClosed && _doneFuture._mayComplete) {
332 // When closed, _doneFuture is not null. 357 // When closed, _doneFuture is not null.
333 _doneFuture._asyncComplete(null); 358 _doneFuture._asyncComplete(null);
334 } 359 }
335 _runGuarded(_onCancel); 360 _runGuarded(onCancel);
336 } 361 }
337 } 362 }
338 363
339 class _SyncBroadcastStreamController<T> extends _BroadcastStreamController<T> { 364 class _SyncBroadcastStreamController<T> extends _BroadcastStreamController<T>
365 implements SynchronousStreamController<T> {
340 _SyncBroadcastStreamController(void onListen(), void onCancel()) 366 _SyncBroadcastStreamController(void onListen(), void onCancel())
341 : super(onListen, onCancel); 367 : super(onListen, onCancel);
342 368
343 // EventDispatch interface. 369 // EventDispatch interface.
344 370
371 bool get _mayAddEvent => super._mayAddEvent && !_isFiring;
372
373 _addEventError() {
374 if (_isFiring) {
375 return new StateError(
376 "Cannot fire new event. Controller is already firing an event");
377 }
378 return super._addEventError();
379 }
380
345 void _sendData(T data) { 381 void _sendData(T data) {
346 if (_isEmpty) return; 382 if (_isEmpty) return;
347 if (_hasOneListener) { 383 if (_hasOneListener) {
348 _state |= _BroadcastStreamController._STATE_FIRING; 384 _state |= _BroadcastStreamController._STATE_FIRING;
349 _BroadcastSubscription subscription = _next; 385 _BroadcastSubscription<T> subscription = _firstSubscription;
350 subscription._add(data); 386 subscription._add(data);
351 _state &= ~_BroadcastStreamController._STATE_FIRING; 387 _state &= ~_BroadcastStreamController._STATE_FIRING;
352 if (_isEmpty) { 388 if (_isEmpty) {
353 _callOnCancel(); 389 _callOnCancel();
354 } 390 }
355 return; 391 return;
356 } 392 }
357 _forEachListener((_BufferingStreamSubscription<T> subscription) { 393 _forEachListener((_BufferingStreamSubscription<T> subscription) {
358 subscription._add(data); 394 subscription._add(data);
359 }); 395 });
360 } 396 }
361 397
362 void _sendError(Object error, StackTrace stackTrace) { 398 void _sendError(Object error, StackTrace stackTrace) {
363 if (_isEmpty) return; 399 if (_isEmpty) return;
364 _forEachListener((_BufferingStreamSubscription<T> subscription) { 400 _forEachListener((_BufferingStreamSubscription<T> subscription) {
365 subscription._addError(error, stackTrace); 401 subscription._addError(error, stackTrace);
366 }); 402 });
367 } 403 }
368 404
369 void _sendDone() { 405 void _sendDone() {
370 if (!_isEmpty) { 406 if (!_isEmpty) {
371 _forEachListener((_BroadcastSubscription<T> subscription) { 407 _forEachListener((_BufferingStreamSubscription<T> subscription) {
372 subscription._close(); 408 subscription._close();
373 }); 409 });
374 } else { 410 } else {
375 assert(_doneFuture != null); 411 assert(_doneFuture != null);
376 assert(_doneFuture._mayComplete); 412 assert(_doneFuture._mayComplete);
377 _doneFuture._asyncComplete(null); 413 _doneFuture._asyncComplete(null);
378 } 414 }
379 } 415 }
380 } 416 }
381 417
382 class _AsyncBroadcastStreamController<T> extends _BroadcastStreamController<T> { 418 class _AsyncBroadcastStreamController<T> extends _BroadcastStreamController<T> {
383 _AsyncBroadcastStreamController(void onListen(), void onCancel()) 419 _AsyncBroadcastStreamController(void onListen(), void onCancel())
384 : super(onListen, onCancel); 420 : super(onListen, onCancel);
385 421
386 // EventDispatch interface. 422 // EventDispatch interface.
387 423
388 void _sendData(T data) { 424 void _sendData(T data) {
389 for (_BroadcastSubscriptionLink link = _next; 425 for (_BroadcastSubscription<T> subscription = _firstSubscription;
390 !identical(link, this); 426 subscription != null;
391 link = link._next) { 427 subscription = subscription._next) {
392 _BroadcastSubscription<T> subscription = link; 428 subscription._addPending(new _DelayedData<T>(data));
393 subscription._addPending(new _DelayedData(data));
394 } 429 }
395 } 430 }
396 431
397 void _sendError(Object error, StackTrace stackTrace) { 432 void _sendError(Object error, StackTrace stackTrace) {
398 for (_BroadcastSubscriptionLink link = _next; 433 for (_BroadcastSubscription<T> subscription = _firstSubscription;
399 !identical(link, this); 434 subscription != null;
400 link = link._next) { 435 subscription = subscription._next) {
401 _BroadcastSubscription<T> subscription = link;
402 subscription._addPending(new _DelayedError(error, stackTrace)); 436 subscription._addPending(new _DelayedError(error, stackTrace));
403 } 437 }
404 } 438 }
405 439
406 void _sendDone() { 440 void _sendDone() {
407 if (!_isEmpty) { 441 if (!_isEmpty) {
408 for (_BroadcastSubscriptionLink link = _next; 442 for (_BroadcastSubscription<T> subscription = _firstSubscription;
409 !identical(link, this); 443 subscription != null;
410 link = link._next) { 444 subscription = subscription._next) {
411 _BroadcastSubscription<T> subscription = link;
412 subscription._addPending(const _DelayedDone()); 445 subscription._addPending(const _DelayedDone());
413 } 446 }
414 } else { 447 } else {
415 assert(_doneFuture != null); 448 assert(_doneFuture != null);
416 assert(_doneFuture._mayComplete); 449 assert(_doneFuture._mayComplete);
417 _doneFuture._asyncComplete(null); 450 _doneFuture._asyncComplete(null);
418 } 451 }
419 } 452 }
420 } 453 }
421 454
422 /** 455 /**
423 * Stream controller that is used by [Stream.asBroadcastStream]. 456 * Stream controller that is used by [Stream.asBroadcastStream].
424 * 457 *
425 * This stream controller allows incoming events while it is firing 458 * This stream controller allows incoming events while it is firing
426 * other events. This is handled by delaying the events until the 459 * other events. This is handled by delaying the events until the
427 * current event is done firing, and then fire the pending events. 460 * current event is done firing, and then fire the pending events.
428 * 461 *
429 * This class extends [_SyncBroadcastStreamController]. Events of 462 * This class extends [_SyncBroadcastStreamController]. Events of
430 * an "asBroadcastStream" stream are always initiated by events 463 * an "asBroadcastStream" stream are always initiated by events
431 * on another stream, and it is fine to forward them synchronously. 464 * on another stream, and it is fine to forward them synchronously.
432 */ 465 */
433 class _AsBroadcastStreamController<T> 466 class _AsBroadcastStreamController<T>
434 extends _SyncBroadcastStreamController<T> 467 extends _SyncBroadcastStreamController<T>
435 implements _EventDispatch<T> { 468 implements _EventDispatch<T> {
436 _StreamImplEvents _pending; 469 _StreamImplEvents<T> _pending;
437 470
438 _AsBroadcastStreamController(void onListen(), void onCancel()) 471 _AsBroadcastStreamController(void onListen(), void onCancel())
439 : super(onListen, onCancel); 472 : super(onListen, onCancel);
440 473
441 bool get _hasPending => _pending != null && ! _pending.isEmpty; 474 bool get _hasPending => _pending != null && ! _pending.isEmpty;
442 475
443 void _addPendingEvent(_DelayedEvent event) { 476 void _addPendingEvent(_DelayedEvent event) {
444 if (_pending == null) { 477 if (_pending == null) {
445 _pending = new _StreamImplEvents(); 478 _pending = new _StreamImplEvents<T>();
446 } 479 }
447 _pending.add(event); 480 _pending.add(event);
448 } 481 }
449 482
450 void add(T data) { 483 void add(T data) {
451 if (!isClosed && _isFiring) { 484 if (!isClosed && _isFiring) {
452 _addPendingEvent(new _DelayedData<T>(data)); 485 _addPendingEvent(new _DelayedData<T>(data));
453 return; 486 return;
454 } 487 }
455 super.add(data); 488 super.add(data);
(...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after
500 void pause([Future resumeSignal]) { 533 void pause([Future resumeSignal]) {
501 if (resumeSignal != null) resumeSignal.then(_resume); 534 if (resumeSignal != null) resumeSignal.then(_resume);
502 _pauseCount++; 535 _pauseCount++;
503 } 536 }
504 void resume() { _resume(null); } 537 void resume() { _resume(null); }
505 void _resume(_) { 538 void _resume(_) {
506 if (_pauseCount > 0) _pauseCount--; 539 if (_pauseCount > 0) _pauseCount--;
507 } 540 }
508 Future cancel() { return new _Future.immediate(null); } 541 Future cancel() { return new _Future.immediate(null); }
509 bool get isPaused => _pauseCount > 0; 542 bool get isPaused => _pauseCount > 0;
510 Future asFuture([Object value]) => new _Future(); 543 Future/*<E>*/ asFuture/*<E>*/([Object/*=E*/ value]) => new _Future/*<E>*/();
511 } 544 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698