OLD | NEW |
| (Empty) |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | |
2 // for details. All rights reserved. Use of this source code is governed by a | |
3 // BSD-style license that can be found in the LICENSE file. | |
4 | |
5 part of dart.async; | |
6 | |
7 class _BroadcastStream<T> extends _ControllerStream<T> { | |
8 _BroadcastStream(_StreamControllerLifecycle<T> controller) | |
9 : super(controller); | |
10 | |
11 bool get isBroadcast => true; | |
12 } | |
13 | |
14 class _BroadcastSubscription<T> extends _ControllerSubscription<T> { | |
15 static const int _STATE_EVENT_ID = 1; | |
16 static const int _STATE_FIRING = 2; | |
17 static const int _STATE_REMOVE_AFTER_FIRING = 4; | |
18 // TODO(lrn): Use the _state field on _ControllerSubscription to | |
19 // also store this state. Requires that the subscription implementation | |
20 // does not assume that it's use of the state integer is the only use. | |
21 int _eventState = 0; // Initialized to help dart2js type inference. | |
22 | |
23 _BroadcastSubscription<T> _next; | |
24 _BroadcastSubscription<T> _previous; | |
25 | |
26 _BroadcastSubscription(_StreamControllerLifecycle<T> controller, | |
27 void onData(T data), | |
28 Function onError, | |
29 void onDone(), | |
30 bool cancelOnError) | |
31 : super(controller, onData, onError, onDone, cancelOnError) { | |
32 _next = _previous = this; | |
33 } | |
34 | |
35 bool _expectsEvent(int eventId) => | |
36 (_eventState & _STATE_EVENT_ID) == eventId; | |
37 | |
38 void _toggleEventId() { | |
39 _eventState ^= _STATE_EVENT_ID; | |
40 } | |
41 | |
42 bool get _isFiring => (_eventState & _STATE_FIRING) != 0; | |
43 | |
44 void _setRemoveAfterFiring() { | |
45 assert(_isFiring); | |
46 _eventState |= _STATE_REMOVE_AFTER_FIRING; | |
47 } | |
48 | |
49 bool get _removeAfterFiring => | |
50 (_eventState & _STATE_REMOVE_AFTER_FIRING) != 0; | |
51 | |
52 // The controller._recordPause doesn't do anything for a broadcast controller, | |
53 // so we don't bother calling it. | |
54 void _onPause() { } | |
55 | |
56 // The controller._recordResume doesn't do anything for a broadcast | |
57 // controller, so we don't bother calling it. | |
58 void _onResume() { } | |
59 | |
60 // _onCancel is inherited. | |
61 } | |
62 | |
63 abstract class _BroadcastStreamController<T> | |
64 implements StreamController<T>, | |
65 _StreamControllerLifecycle<T>, | |
66 _EventSink<T>, | |
67 _EventDispatch<T> { | |
68 static const int _STATE_INITIAL = 0; | |
69 static const int _STATE_EVENT_ID = 1; | |
70 static const int _STATE_FIRING = 2; | |
71 static const int _STATE_CLOSED = 4; | |
72 static const int _STATE_ADDSTREAM = 8; | |
73 | |
74 ControllerCallback onListen; | |
75 ControllerCancelCallback onCancel; | |
76 | |
77 // State of the controller. | |
78 int _state; | |
79 | |
80 // Double-linked list of active listeners. | |
81 _BroadcastSubscription<T> _firstSubscription; | |
82 _BroadcastSubscription<T> _lastSubscription; | |
83 | |
84 // Extra state used during an [addStream] call. | |
85 _AddStreamState<T> _addStreamState; | |
86 | |
87 /** | |
88 * Future returned by [close] and [done]. | |
89 * | |
90 * The future is completed whenever the done event has been sent to all | |
91 * relevant listeners. | |
92 * The relevant listeners are the ones that were listening when [close] was | |
93 * called. When all of these have been canceled (sending the done event makes | |
94 * them cancel, but they can also be canceled before sending the event), | |
95 * this future completes. | |
96 * | |
97 * Any attempt to listen after calling [close] will throw, so there won't | |
98 * be any further listeners. | |
99 */ | |
100 _Future _doneFuture; | |
101 | |
102 _BroadcastStreamController(this.onListen, this.onCancel) | |
103 : _state = _STATE_INITIAL; | |
104 | |
105 ControllerCallback get onPause { | |
106 throw new UnsupportedError( | |
107 "Broadcast stream controllers do not support pause callbacks"); | |
108 } | |
109 | |
110 void set onPause(void onPauseHandler()) { | |
111 throw new UnsupportedError( | |
112 "Broadcast stream controllers do not support pause callbacks"); | |
113 } | |
114 | |
115 ControllerCallback get onResume { | |
116 throw new UnsupportedError( | |
117 "Broadcast stream controllers do not support pause callbacks"); | |
118 } | |
119 | |
120 void set onResume(void onResumeHandler()) { | |
121 throw new UnsupportedError( | |
122 "Broadcast stream controllers do not support pause callbacks"); | |
123 } | |
124 | |
125 // StreamController interface. | |
126 | |
127 Stream<T> get stream => new _BroadcastStream<T>(this); | |
128 | |
129 StreamSink<T> get sink => new _StreamSinkWrapper<T>(this); | |
130 | |
131 bool get isClosed => (_state & _STATE_CLOSED) != 0; | |
132 | |
133 /** | |
134 * A broadcast controller is never paused. | |
135 * | |
136 * Each receiving stream may be paused individually, and they handle their | |
137 * own buffering. | |
138 */ | |
139 bool get isPaused => false; | |
140 | |
141 /** Whether there are currently one or more subscribers. */ | |
142 bool get hasListener => !_isEmpty; | |
143 | |
144 /** | |
145 * Test whether the stream has exactly one listener. | |
146 * | |
147 * Assumes that the stream has a listener (not [_isEmpty]). | |
148 */ | |
149 bool get _hasOneListener { | |
150 assert(!_isEmpty); | |
151 return identical(_firstSubscription, _lastSubscription); | |
152 } | |
153 | |
154 /** Whether an event is being fired (sent to some, but not all, listeners). */ | |
155 bool get _isFiring => (_state & _STATE_FIRING) != 0; | |
156 | |
157 bool get _isAddingStream => (_state & _STATE_ADDSTREAM) != 0; | |
158 | |
159 bool get _mayAddEvent => (_state < _STATE_CLOSED); | |
160 | |
161 _Future _ensureDoneFuture() { | |
162 if (_doneFuture != null) return _doneFuture; | |
163 return _doneFuture = new _Future(); | |
164 } | |
165 | |
166 // Linked list helpers | |
167 | |
168 bool get _isEmpty => _firstSubscription == null; | |
169 | |
170 /** Adds subscription to linked list of active listeners. */ | |
171 void _addListener(_BroadcastSubscription<T> subscription) { | |
172 assert(identical(subscription._next, subscription)); | |
173 subscription._eventState = (_state & _STATE_EVENT_ID); | |
174 // Insert in linked list as last subscription. | |
175 _BroadcastSubscription<T> oldLast = _lastSubscription; | |
176 _lastSubscription = subscription; | |
177 subscription._next = null; | |
178 subscription._previous = oldLast; | |
179 if (oldLast == null) { | |
180 _firstSubscription = subscription; | |
181 } else { | |
182 oldLast._next = subscription; | |
183 } | |
184 } | |
185 | |
186 void _removeListener(_BroadcastSubscription<T> subscription) { | |
187 assert(identical(subscription._controller, this)); | |
188 assert(!identical(subscription._next, subscription)); | |
189 _BroadcastSubscription<T> previous = subscription._previous; | |
190 _BroadcastSubscription<T> next = subscription._next; | |
191 if (previous == null) { | |
192 // This was the first subscription. | |
193 _firstSubscription = next; | |
194 } else { | |
195 previous._next = next; | |
196 } | |
197 if (next == null) { | |
198 // This was the last subscription. | |
199 _lastSubscription = previous; | |
200 } else { | |
201 next._previous = previous; | |
202 } | |
203 | |
204 subscription._next = subscription._previous = subscription; | |
205 } | |
206 | |
207 // _StreamControllerLifecycle interface. | |
208 | |
209 StreamSubscription<T> _subscribe( | |
210 void onData(T data), | |
211 Function onError, | |
212 void onDone(), | |
213 bool cancelOnError) { | |
214 if (isClosed) { | |
215 if (onDone == null) onDone = _nullDoneHandler; | |
216 return new _DoneStreamSubscription<T>(onDone); | |
217 } | |
218 StreamSubscription<T> subscription = | |
219 new _BroadcastSubscription<T>(this, onData, onError, onDone, | |
220 cancelOnError); | |
221 _addListener(subscription); | |
222 if (identical(_firstSubscription, _lastSubscription)) { | |
223 // Only one listener, so it must be the first listener. | |
224 _runGuarded(onListen); | |
225 } | |
226 return subscription; | |
227 } | |
228 | |
229 Future _recordCancel(StreamSubscription<T> sub) { | |
230 _BroadcastSubscription<T> subscription = sub; | |
231 // If already removed by the stream, don't remove it again. | |
232 if (identical(subscription._next, subscription)) return null; | |
233 if (subscription._isFiring) { | |
234 subscription._setRemoveAfterFiring(); | |
235 } else { | |
236 _removeListener(subscription); | |
237 // If we are currently firing an event, the empty-check is performed at | |
238 // the end of the listener loop instead of here. | |
239 if (!_isFiring && _isEmpty) { | |
240 _callOnCancel(); | |
241 } | |
242 } | |
243 return null; | |
244 } | |
245 | |
246 void _recordPause(StreamSubscription<T> subscription) {} | |
247 void _recordResume(StreamSubscription<T> subscription) {} | |
248 | |
249 // EventSink interface. | |
250 | |
251 Error _addEventError() { | |
252 if (isClosed) { | |
253 return new StateError("Cannot add new events after calling close"); | |
254 } | |
255 assert(_isAddingStream); | |
256 return new StateError("Cannot add new events while doing an addStream"); | |
257 } | |
258 | |
259 void add(T data) { | |
260 if (!_mayAddEvent) throw _addEventError(); | |
261 _sendData(data); | |
262 } | |
263 | |
264 void addError(Object error, [StackTrace stackTrace]) { | |
265 error = _nonNullError(error); | |
266 if (!_mayAddEvent) throw _addEventError(); | |
267 AsyncError replacement = Zone.current.errorCallback(error, stackTrace); | |
268 if (replacement != null) { | |
269 error = _nonNullError(replacement.error); | |
270 stackTrace = replacement.stackTrace; | |
271 } | |
272 _sendError(error, stackTrace); | |
273 } | |
274 | |
275 Future close() { | |
276 if (isClosed) { | |
277 assert(_doneFuture != null); | |
278 return _doneFuture; | |
279 } | |
280 if (!_mayAddEvent) throw _addEventError(); | |
281 _state |= _STATE_CLOSED; | |
282 Future doneFuture = _ensureDoneFuture(); | |
283 _sendDone(); | |
284 return doneFuture; | |
285 } | |
286 | |
287 Future get done => _ensureDoneFuture(); | |
288 | |
289 Future addStream(Stream<T> stream, {bool cancelOnError: true}) { | |
290 if (!_mayAddEvent) throw _addEventError(); | |
291 _state |= _STATE_ADDSTREAM; | |
292 _addStreamState = new _AddStreamState(this, stream, cancelOnError); | |
293 return _addStreamState.addStreamFuture; | |
294 } | |
295 | |
296 // _EventSink interface, called from AddStreamState. | |
297 void _add(T data) { | |
298 _sendData(data); | |
299 } | |
300 | |
301 void _addError(Object error, StackTrace stackTrace) { | |
302 _sendError(error, stackTrace); | |
303 } | |
304 | |
305 void _close() { | |
306 assert(_isAddingStream); | |
307 _AddStreamState addState = _addStreamState; | |
308 _addStreamState = null; | |
309 _state &= ~_STATE_ADDSTREAM; | |
310 addState.complete(); | |
311 } | |
312 | |
313 // Event handling. | |
314 void _forEachListener( | |
315 void action(_BufferingStreamSubscription<T> subscription)) { | |
316 if (_isFiring) { | |
317 throw new StateError( | |
318 "Cannot fire new event. Controller is already firing an event"); | |
319 } | |
320 if (_isEmpty) return; | |
321 | |
322 // Get event id of this event. | |
323 int id = (_state & _STATE_EVENT_ID); | |
324 // Start firing (set the _STATE_FIRING bit). We don't do [onCancel] | |
325 // callbacks while firing, and we prevent reentrancy of this function. | |
326 // | |
327 // Set [_state]'s event id to the next event's id. | |
328 // Any listeners added while firing this event will expect the next event, | |
329 // not this one, and won't get notified. | |
330 _state ^= _STATE_EVENT_ID | _STATE_FIRING; | |
331 _BroadcastSubscription<T> subscription = _firstSubscription; | |
332 while (subscription != null) { | |
333 if (subscription._expectsEvent(id)) { | |
334 subscription._eventState |= _BroadcastSubscription._STATE_FIRING; | |
335 action(subscription); | |
336 subscription._toggleEventId(); | |
337 _BroadcastSubscription<T> next = subscription._next; | |
338 if (subscription._removeAfterFiring) { | |
339 _removeListener(subscription); | |
340 } | |
341 subscription._eventState &= ~_BroadcastSubscription._STATE_FIRING; | |
342 subscription = next; | |
343 } else { | |
344 subscription = subscription._next; | |
345 } | |
346 } | |
347 _state &= ~_STATE_FIRING; | |
348 | |
349 if (_isEmpty) { | |
350 _callOnCancel(); | |
351 } | |
352 } | |
353 | |
354 void _callOnCancel() { | |
355 assert(_isEmpty); | |
356 if (isClosed && _doneFuture._mayComplete) { | |
357 // When closed, _doneFuture is not null. | |
358 _doneFuture._asyncComplete(null); | |
359 } | |
360 _runGuarded(onCancel); | |
361 } | |
362 } | |
363 | |
364 class _SyncBroadcastStreamController<T> extends _BroadcastStreamController<T> | |
365 implements SynchronousStreamController<T> { | |
366 _SyncBroadcastStreamController(void onListen(), void onCancel()) | |
367 : super(onListen, onCancel); | |
368 | |
369 // EventDispatch interface. | |
370 | |
371 bool get _mayAddEvent => super._mayAddEvent && !_isFiring; | |
372 | |
373 _addEventError() { | |
374 if (_isFiring) { | |
375 return new StateError( | |
376 "Cannot fire new event. Controller is already firing an event"); | |
377 } | |
378 return super._addEventError(); | |
379 } | |
380 | |
381 void _sendData(T data) { | |
382 if (_isEmpty) return; | |
383 if (_hasOneListener) { | |
384 _state |= _BroadcastStreamController._STATE_FIRING; | |
385 _BroadcastSubscription<T> subscription = _firstSubscription; | |
386 subscription._add(data); | |
387 _state &= ~_BroadcastStreamController._STATE_FIRING; | |
388 if (_isEmpty) { | |
389 _callOnCancel(); | |
390 } | |
391 return; | |
392 } | |
393 _forEachListener((_BufferingStreamSubscription<T> subscription) { | |
394 subscription._add(data); | |
395 }); | |
396 } | |
397 | |
398 void _sendError(Object error, StackTrace stackTrace) { | |
399 if (_isEmpty) return; | |
400 _forEachListener((_BufferingStreamSubscription<T> subscription) { | |
401 subscription._addError(error, stackTrace); | |
402 }); | |
403 } | |
404 | |
405 void _sendDone() { | |
406 if (!_isEmpty) { | |
407 _forEachListener((_BufferingStreamSubscription<T> subscription) { | |
408 subscription._close(); | |
409 }); | |
410 } else { | |
411 assert(_doneFuture != null); | |
412 assert(_doneFuture._mayComplete); | |
413 _doneFuture._asyncComplete(null); | |
414 } | |
415 } | |
416 } | |
417 | |
418 class _AsyncBroadcastStreamController<T> extends _BroadcastStreamController<T> { | |
419 _AsyncBroadcastStreamController(void onListen(), void onCancel()) | |
420 : super(onListen, onCancel); | |
421 | |
422 // EventDispatch interface. | |
423 | |
424 void _sendData(T data) { | |
425 for (_BroadcastSubscription<T> subscription = _firstSubscription; | |
426 subscription != null; | |
427 subscription = subscription._next) { | |
428 subscription._addPending(new _DelayedData<T>(data)); | |
429 } | |
430 } | |
431 | |
432 void _sendError(Object error, StackTrace stackTrace) { | |
433 for (_BroadcastSubscription<T> subscription = _firstSubscription; | |
434 subscription != null; | |
435 subscription = subscription._next) { | |
436 subscription._addPending(new _DelayedError(error, stackTrace)); | |
437 } | |
438 } | |
439 | |
440 void _sendDone() { | |
441 if (!_isEmpty) { | |
442 for (_BroadcastSubscription<T> subscription = _firstSubscription; | |
443 subscription != null; | |
444 subscription = subscription._next) { | |
445 subscription._addPending(const _DelayedDone()); | |
446 } | |
447 } else { | |
448 assert(_doneFuture != null); | |
449 assert(_doneFuture._mayComplete); | |
450 _doneFuture._asyncComplete(null); | |
451 } | |
452 } | |
453 } | |
454 | |
455 /** | |
456 * Stream controller that is used by [Stream.asBroadcastStream]. | |
457 * | |
458 * This stream controller allows incoming events while it is firing | |
459 * other events. This is handled by delaying the events until the | |
460 * current event is done firing, and then fire the pending events. | |
461 * | |
462 * This class extends [_SyncBroadcastStreamController]. Events of | |
463 * an "asBroadcastStream" stream are always initiated by events | |
464 * on another stream, and it is fine to forward them synchronously. | |
465 */ | |
466 class _AsBroadcastStreamController<T> | |
467 extends _SyncBroadcastStreamController<T> | |
468 implements _EventDispatch<T> { | |
469 _StreamImplEvents<T> _pending; | |
470 | |
471 _AsBroadcastStreamController(void onListen(), void onCancel()) | |
472 : super(onListen, onCancel); | |
473 | |
474 bool get _hasPending => _pending != null && ! _pending.isEmpty; | |
475 | |
476 void _addPendingEvent(_DelayedEvent event) { | |
477 if (_pending == null) { | |
478 _pending = new _StreamImplEvents<T>(); | |
479 } | |
480 _pending.add(event); | |
481 } | |
482 | |
483 void add(T data) { | |
484 if (!isClosed && _isFiring) { | |
485 _addPendingEvent(new _DelayedData<T>(data)); | |
486 return; | |
487 } | |
488 super.add(data); | |
489 while (_hasPending) { | |
490 _pending.handleNext(this); | |
491 } | |
492 } | |
493 | |
494 void addError(Object error, [StackTrace stackTrace]) { | |
495 if (!isClosed && _isFiring) { | |
496 _addPendingEvent(new _DelayedError(error, stackTrace)); | |
497 return; | |
498 } | |
499 if (!_mayAddEvent) throw _addEventError(); | |
500 _sendError(error, stackTrace); | |
501 while (_hasPending) { | |
502 _pending.handleNext(this); | |
503 } | |
504 } | |
505 | |
506 Future close() { | |
507 if (!isClosed && _isFiring) { | |
508 _addPendingEvent(const _DelayedDone()); | |
509 _state |= _BroadcastStreamController._STATE_CLOSED; | |
510 return super.done; | |
511 } | |
512 Future result = super.close(); | |
513 assert(!_hasPending); | |
514 return result; | |
515 } | |
516 | |
517 void _callOnCancel() { | |
518 if (_hasPending) { | |
519 _pending.clear(); | |
520 _pending = null; | |
521 } | |
522 super._callOnCancel(); | |
523 } | |
524 } | |
525 | |
526 // A subscription that never receives any events. | |
527 // It can simulate pauses, but otherwise does nothing. | |
528 class _DoneSubscription<T> implements StreamSubscription<T> { | |
529 int _pauseCount = 0; | |
530 void onData(void handleData(T data)) {} | |
531 void onError(Function handleError) {} | |
532 void onDone(void handleDone()) {} | |
533 void pause([Future resumeSignal]) { | |
534 if (resumeSignal != null) resumeSignal.then(_resume); | |
535 _pauseCount++; | |
536 } | |
537 void resume() { _resume(null); } | |
538 void _resume(_) { | |
539 if (_pauseCount > 0) _pauseCount--; | |
540 } | |
541 Future cancel() { return new _Future.immediate(null); } | |
542 bool get isPaused => _pauseCount > 0; | |
543 Future/*<E>*/ asFuture/*<E>*/([Object/*=E*/ value]) => new _Future/*<E>*/(); | |
544 } | |
OLD | NEW |