Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(429)

Side by Side Diff: pkg/dev_compiler/tool/input_sdk/lib/async/broadcast_stream_controller.dart

Issue 2698353003: unfork DDC's copy of most SDK libraries (Closed)
Patch Set: revert core_patch Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 part of dart.async;
6
7 class _BroadcastStream<T> extends _ControllerStream<T> {
8 _BroadcastStream(_StreamControllerLifecycle<T> controller)
9 : super(controller);
10
11 bool get isBroadcast => true;
12 }
13
14 class _BroadcastSubscription<T> extends _ControllerSubscription<T> {
15 static const int _STATE_EVENT_ID = 1;
16 static const int _STATE_FIRING = 2;
17 static const int _STATE_REMOVE_AFTER_FIRING = 4;
18 // TODO(lrn): Use the _state field on _ControllerSubscription to
19 // also store this state. Requires that the subscription implementation
20 // does not assume that it's use of the state integer is the only use.
21 int _eventState = 0; // Initialized to help dart2js type inference.
22
23 _BroadcastSubscription<T> _next;
24 _BroadcastSubscription<T> _previous;
25
26 _BroadcastSubscription(_StreamControllerLifecycle<T> controller,
27 void onData(T data),
28 Function onError,
29 void onDone(),
30 bool cancelOnError)
31 : super(controller, onData, onError, onDone, cancelOnError) {
32 _next = _previous = this;
33 }
34
35 bool _expectsEvent(int eventId) =>
36 (_eventState & _STATE_EVENT_ID) == eventId;
37
38 void _toggleEventId() {
39 _eventState ^= _STATE_EVENT_ID;
40 }
41
42 bool get _isFiring => (_eventState & _STATE_FIRING) != 0;
43
44 void _setRemoveAfterFiring() {
45 assert(_isFiring);
46 _eventState |= _STATE_REMOVE_AFTER_FIRING;
47 }
48
49 bool get _removeAfterFiring =>
50 (_eventState & _STATE_REMOVE_AFTER_FIRING) != 0;
51
52 // The controller._recordPause doesn't do anything for a broadcast controller,
53 // so we don't bother calling it.
54 void _onPause() { }
55
56 // The controller._recordResume doesn't do anything for a broadcast
57 // controller, so we don't bother calling it.
58 void _onResume() { }
59
60 // _onCancel is inherited.
61 }
62
63 abstract class _BroadcastStreamController<T>
64 implements StreamController<T>,
65 _StreamControllerLifecycle<T>,
66 _EventSink<T>,
67 _EventDispatch<T> {
68 static const int _STATE_INITIAL = 0;
69 static const int _STATE_EVENT_ID = 1;
70 static const int _STATE_FIRING = 2;
71 static const int _STATE_CLOSED = 4;
72 static const int _STATE_ADDSTREAM = 8;
73
74 ControllerCallback onListen;
75 ControllerCancelCallback onCancel;
76
77 // State of the controller.
78 int _state;
79
80 // Double-linked list of active listeners.
81 _BroadcastSubscription<T> _firstSubscription;
82 _BroadcastSubscription<T> _lastSubscription;
83
84 // Extra state used during an [addStream] call.
85 _AddStreamState<T> _addStreamState;
86
87 /**
88 * Future returned by [close] and [done].
89 *
90 * The future is completed whenever the done event has been sent to all
91 * relevant listeners.
92 * The relevant listeners are the ones that were listening when [close] was
93 * called. When all of these have been canceled (sending the done event makes
94 * them cancel, but they can also be canceled before sending the event),
95 * this future completes.
96 *
97 * Any attempt to listen after calling [close] will throw, so there won't
98 * be any further listeners.
99 */
100 _Future _doneFuture;
101
102 _BroadcastStreamController(this.onListen, this.onCancel)
103 : _state = _STATE_INITIAL;
104
105 ControllerCallback get onPause {
106 throw new UnsupportedError(
107 "Broadcast stream controllers do not support pause callbacks");
108 }
109
110 void set onPause(void onPauseHandler()) {
111 throw new UnsupportedError(
112 "Broadcast stream controllers do not support pause callbacks");
113 }
114
115 ControllerCallback get onResume {
116 throw new UnsupportedError(
117 "Broadcast stream controllers do not support pause callbacks");
118 }
119
120 void set onResume(void onResumeHandler()) {
121 throw new UnsupportedError(
122 "Broadcast stream controllers do not support pause callbacks");
123 }
124
125 // StreamController interface.
126
127 Stream<T> get stream => new _BroadcastStream<T>(this);
128
129 StreamSink<T> get sink => new _StreamSinkWrapper<T>(this);
130
131 bool get isClosed => (_state & _STATE_CLOSED) != 0;
132
133 /**
134 * A broadcast controller is never paused.
135 *
136 * Each receiving stream may be paused individually, and they handle their
137 * own buffering.
138 */
139 bool get isPaused => false;
140
141 /** Whether there are currently one or more subscribers. */
142 bool get hasListener => !_isEmpty;
143
144 /**
145 * Test whether the stream has exactly one listener.
146 *
147 * Assumes that the stream has a listener (not [_isEmpty]).
148 */
149 bool get _hasOneListener {
150 assert(!_isEmpty);
151 return identical(_firstSubscription, _lastSubscription);
152 }
153
154 /** Whether an event is being fired (sent to some, but not all, listeners). */
155 bool get _isFiring => (_state & _STATE_FIRING) != 0;
156
157 bool get _isAddingStream => (_state & _STATE_ADDSTREAM) != 0;
158
159 bool get _mayAddEvent => (_state < _STATE_CLOSED);
160
161 _Future _ensureDoneFuture() {
162 if (_doneFuture != null) return _doneFuture;
163 return _doneFuture = new _Future();
164 }
165
166 // Linked list helpers
167
168 bool get _isEmpty => _firstSubscription == null;
169
170 /** Adds subscription to linked list of active listeners. */
171 void _addListener(_BroadcastSubscription<T> subscription) {
172 assert(identical(subscription._next, subscription));
173 subscription._eventState = (_state & _STATE_EVENT_ID);
174 // Insert in linked list as last subscription.
175 _BroadcastSubscription<T> oldLast = _lastSubscription;
176 _lastSubscription = subscription;
177 subscription._next = null;
178 subscription._previous = oldLast;
179 if (oldLast == null) {
180 _firstSubscription = subscription;
181 } else {
182 oldLast._next = subscription;
183 }
184 }
185
186 void _removeListener(_BroadcastSubscription<T> subscription) {
187 assert(identical(subscription._controller, this));
188 assert(!identical(subscription._next, subscription));
189 _BroadcastSubscription<T> previous = subscription._previous;
190 _BroadcastSubscription<T> next = subscription._next;
191 if (previous == null) {
192 // This was the first subscription.
193 _firstSubscription = next;
194 } else {
195 previous._next = next;
196 }
197 if (next == null) {
198 // This was the last subscription.
199 _lastSubscription = previous;
200 } else {
201 next._previous = previous;
202 }
203
204 subscription._next = subscription._previous = subscription;
205 }
206
207 // _StreamControllerLifecycle interface.
208
209 StreamSubscription<T> _subscribe(
210 void onData(T data),
211 Function onError,
212 void onDone(),
213 bool cancelOnError) {
214 if (isClosed) {
215 if (onDone == null) onDone = _nullDoneHandler;
216 return new _DoneStreamSubscription<T>(onDone);
217 }
218 StreamSubscription<T> subscription =
219 new _BroadcastSubscription<T>(this, onData, onError, onDone,
220 cancelOnError);
221 _addListener(subscription);
222 if (identical(_firstSubscription, _lastSubscription)) {
223 // Only one listener, so it must be the first listener.
224 _runGuarded(onListen);
225 }
226 return subscription;
227 }
228
229 Future _recordCancel(StreamSubscription<T> sub) {
230 _BroadcastSubscription<T> subscription = sub;
231 // If already removed by the stream, don't remove it again.
232 if (identical(subscription._next, subscription)) return null;
233 if (subscription._isFiring) {
234 subscription._setRemoveAfterFiring();
235 } else {
236 _removeListener(subscription);
237 // If we are currently firing an event, the empty-check is performed at
238 // the end of the listener loop instead of here.
239 if (!_isFiring && _isEmpty) {
240 _callOnCancel();
241 }
242 }
243 return null;
244 }
245
246 void _recordPause(StreamSubscription<T> subscription) {}
247 void _recordResume(StreamSubscription<T> subscription) {}
248
249 // EventSink interface.
250
251 Error _addEventError() {
252 if (isClosed) {
253 return new StateError("Cannot add new events after calling close");
254 }
255 assert(_isAddingStream);
256 return new StateError("Cannot add new events while doing an addStream");
257 }
258
259 void add(T data) {
260 if (!_mayAddEvent) throw _addEventError();
261 _sendData(data);
262 }
263
264 void addError(Object error, [StackTrace stackTrace]) {
265 error = _nonNullError(error);
266 if (!_mayAddEvent) throw _addEventError();
267 AsyncError replacement = Zone.current.errorCallback(error, stackTrace);
268 if (replacement != null) {
269 error = _nonNullError(replacement.error);
270 stackTrace = replacement.stackTrace;
271 }
272 _sendError(error, stackTrace);
273 }
274
275 Future close() {
276 if (isClosed) {
277 assert(_doneFuture != null);
278 return _doneFuture;
279 }
280 if (!_mayAddEvent) throw _addEventError();
281 _state |= _STATE_CLOSED;
282 Future doneFuture = _ensureDoneFuture();
283 _sendDone();
284 return doneFuture;
285 }
286
287 Future get done => _ensureDoneFuture();
288
289 Future addStream(Stream<T> stream, {bool cancelOnError: true}) {
290 if (!_mayAddEvent) throw _addEventError();
291 _state |= _STATE_ADDSTREAM;
292 _addStreamState = new _AddStreamState(this, stream, cancelOnError);
293 return _addStreamState.addStreamFuture;
294 }
295
296 // _EventSink interface, called from AddStreamState.
297 void _add(T data) {
298 _sendData(data);
299 }
300
301 void _addError(Object error, StackTrace stackTrace) {
302 _sendError(error, stackTrace);
303 }
304
305 void _close() {
306 assert(_isAddingStream);
307 _AddStreamState addState = _addStreamState;
308 _addStreamState = null;
309 _state &= ~_STATE_ADDSTREAM;
310 addState.complete();
311 }
312
313 // Event handling.
314 void _forEachListener(
315 void action(_BufferingStreamSubscription<T> subscription)) {
316 if (_isFiring) {
317 throw new StateError(
318 "Cannot fire new event. Controller is already firing an event");
319 }
320 if (_isEmpty) return;
321
322 // Get event id of this event.
323 int id = (_state & _STATE_EVENT_ID);
324 // Start firing (set the _STATE_FIRING bit). We don't do [onCancel]
325 // callbacks while firing, and we prevent reentrancy of this function.
326 //
327 // Set [_state]'s event id to the next event's id.
328 // Any listeners added while firing this event will expect the next event,
329 // not this one, and won't get notified.
330 _state ^= _STATE_EVENT_ID | _STATE_FIRING;
331 _BroadcastSubscription<T> subscription = _firstSubscription;
332 while (subscription != null) {
333 if (subscription._expectsEvent(id)) {
334 subscription._eventState |= _BroadcastSubscription._STATE_FIRING;
335 action(subscription);
336 subscription._toggleEventId();
337 _BroadcastSubscription<T> next = subscription._next;
338 if (subscription._removeAfterFiring) {
339 _removeListener(subscription);
340 }
341 subscription._eventState &= ~_BroadcastSubscription._STATE_FIRING;
342 subscription = next;
343 } else {
344 subscription = subscription._next;
345 }
346 }
347 _state &= ~_STATE_FIRING;
348
349 if (_isEmpty) {
350 _callOnCancel();
351 }
352 }
353
354 void _callOnCancel() {
355 assert(_isEmpty);
356 if (isClosed && _doneFuture._mayComplete) {
357 // When closed, _doneFuture is not null.
358 _doneFuture._asyncComplete(null);
359 }
360 _runGuarded(onCancel);
361 }
362 }
363
364 class _SyncBroadcastStreamController<T> extends _BroadcastStreamController<T>
365 implements SynchronousStreamController<T> {
366 _SyncBroadcastStreamController(void onListen(), void onCancel())
367 : super(onListen, onCancel);
368
369 // EventDispatch interface.
370
371 bool get _mayAddEvent => super._mayAddEvent && !_isFiring;
372
373 _addEventError() {
374 if (_isFiring) {
375 return new StateError(
376 "Cannot fire new event. Controller is already firing an event");
377 }
378 return super._addEventError();
379 }
380
381 void _sendData(T data) {
382 if (_isEmpty) return;
383 if (_hasOneListener) {
384 _state |= _BroadcastStreamController._STATE_FIRING;
385 _BroadcastSubscription<T> subscription = _firstSubscription;
386 subscription._add(data);
387 _state &= ~_BroadcastStreamController._STATE_FIRING;
388 if (_isEmpty) {
389 _callOnCancel();
390 }
391 return;
392 }
393 _forEachListener((_BufferingStreamSubscription<T> subscription) {
394 subscription._add(data);
395 });
396 }
397
398 void _sendError(Object error, StackTrace stackTrace) {
399 if (_isEmpty) return;
400 _forEachListener((_BufferingStreamSubscription<T> subscription) {
401 subscription._addError(error, stackTrace);
402 });
403 }
404
405 void _sendDone() {
406 if (!_isEmpty) {
407 _forEachListener((_BufferingStreamSubscription<T> subscription) {
408 subscription._close();
409 });
410 } else {
411 assert(_doneFuture != null);
412 assert(_doneFuture._mayComplete);
413 _doneFuture._asyncComplete(null);
414 }
415 }
416 }
417
418 class _AsyncBroadcastStreamController<T> extends _BroadcastStreamController<T> {
419 _AsyncBroadcastStreamController(void onListen(), void onCancel())
420 : super(onListen, onCancel);
421
422 // EventDispatch interface.
423
424 void _sendData(T data) {
425 for (_BroadcastSubscription<T> subscription = _firstSubscription;
426 subscription != null;
427 subscription = subscription._next) {
428 subscription._addPending(new _DelayedData<T>(data));
429 }
430 }
431
432 void _sendError(Object error, StackTrace stackTrace) {
433 for (_BroadcastSubscription<T> subscription = _firstSubscription;
434 subscription != null;
435 subscription = subscription._next) {
436 subscription._addPending(new _DelayedError(error, stackTrace));
437 }
438 }
439
440 void _sendDone() {
441 if (!_isEmpty) {
442 for (_BroadcastSubscription<T> subscription = _firstSubscription;
443 subscription != null;
444 subscription = subscription._next) {
445 subscription._addPending(const _DelayedDone());
446 }
447 } else {
448 assert(_doneFuture != null);
449 assert(_doneFuture._mayComplete);
450 _doneFuture._asyncComplete(null);
451 }
452 }
453 }
454
455 /**
456 * Stream controller that is used by [Stream.asBroadcastStream].
457 *
458 * This stream controller allows incoming events while it is firing
459 * other events. This is handled by delaying the events until the
460 * current event is done firing, and then fire the pending events.
461 *
462 * This class extends [_SyncBroadcastStreamController]. Events of
463 * an "asBroadcastStream" stream are always initiated by events
464 * on another stream, and it is fine to forward them synchronously.
465 */
466 class _AsBroadcastStreamController<T>
467 extends _SyncBroadcastStreamController<T>
468 implements _EventDispatch<T> {
469 _StreamImplEvents<T> _pending;
470
471 _AsBroadcastStreamController(void onListen(), void onCancel())
472 : super(onListen, onCancel);
473
474 bool get _hasPending => _pending != null && ! _pending.isEmpty;
475
476 void _addPendingEvent(_DelayedEvent event) {
477 if (_pending == null) {
478 _pending = new _StreamImplEvents<T>();
479 }
480 _pending.add(event);
481 }
482
483 void add(T data) {
484 if (!isClosed && _isFiring) {
485 _addPendingEvent(new _DelayedData<T>(data));
486 return;
487 }
488 super.add(data);
489 while (_hasPending) {
490 _pending.handleNext(this);
491 }
492 }
493
494 void addError(Object error, [StackTrace stackTrace]) {
495 if (!isClosed && _isFiring) {
496 _addPendingEvent(new _DelayedError(error, stackTrace));
497 return;
498 }
499 if (!_mayAddEvent) throw _addEventError();
500 _sendError(error, stackTrace);
501 while (_hasPending) {
502 _pending.handleNext(this);
503 }
504 }
505
506 Future close() {
507 if (!isClosed && _isFiring) {
508 _addPendingEvent(const _DelayedDone());
509 _state |= _BroadcastStreamController._STATE_CLOSED;
510 return super.done;
511 }
512 Future result = super.close();
513 assert(!_hasPending);
514 return result;
515 }
516
517 void _callOnCancel() {
518 if (_hasPending) {
519 _pending.clear();
520 _pending = null;
521 }
522 super._callOnCancel();
523 }
524 }
525
526 // A subscription that never receives any events.
527 // It can simulate pauses, but otherwise does nothing.
528 class _DoneSubscription<T> implements StreamSubscription<T> {
529 int _pauseCount = 0;
530 void onData(void handleData(T data)) {}
531 void onError(Function handleError) {}
532 void onDone(void handleDone()) {}
533 void pause([Future resumeSignal]) {
534 if (resumeSignal != null) resumeSignal.then(_resume);
535 _pauseCount++;
536 }
537 void resume() { _resume(null); }
538 void _resume(_) {
539 if (_pauseCount > 0) _pauseCount--;
540 }
541 Future cancel() { return new _Future.immediate(null); }
542 bool get isPaused => _pauseCount > 0;
543 Future/*<E>*/ asFuture/*<E>*/([Object/*=E*/ value]) => new _Future/*<E>*/();
544 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698