OLD | NEW |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 class _BroadcastStream<T> extends _ControllerStream<T> { | 7 class _BroadcastStream<T> extends _ControllerStream<T> { |
8 _BroadcastStream(_StreamControllerLifecycle controller) : super(controller); | 8 _BroadcastStream(_StreamControllerLifecycle controller) : super(controller); |
9 | 9 |
10 bool get isBroadcast => true; | 10 bool get isBroadcast => true; |
(...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
72 _StreamControllerLifecycle<T>, | 72 _StreamControllerLifecycle<T>, |
73 _BroadcastSubscriptionLink, | 73 _BroadcastSubscriptionLink, |
74 _EventSink<T>, | 74 _EventSink<T>, |
75 _EventDispatch<T> { | 75 _EventDispatch<T> { |
76 static const int _STATE_INITIAL = 0; | 76 static const int _STATE_INITIAL = 0; |
77 static const int _STATE_EVENT_ID = 1; | 77 static const int _STATE_EVENT_ID = 1; |
78 static const int _STATE_FIRING = 2; | 78 static const int _STATE_FIRING = 2; |
79 static const int _STATE_CLOSED = 4; | 79 static const int _STATE_CLOSED = 4; |
80 static const int _STATE_ADDSTREAM = 8; | 80 static const int _STATE_ADDSTREAM = 8; |
81 | 81 |
82 _NotificationHandler _onListen; | 82 ControllerCallback onListen; |
83 _NotificationHandler _onCancel; | 83 ControllerCancelCallback onCancel; |
84 | 84 |
85 // State of the controller. | 85 // State of the controller. |
86 int _state; | 86 int _state; |
87 | 87 |
88 // Double-linked list of active listeners. | 88 // Double-linked list of active listeners. |
89 _BroadcastSubscriptionLink _next; | 89 _BroadcastSubscriptionLink _next; |
90 _BroadcastSubscriptionLink _previous; | 90 _BroadcastSubscriptionLink _previous; |
91 | 91 |
92 // Extra state used during an [addStream] call. | 92 // Extra state used during an [addStream] call. |
93 _AddStreamState<T> _addStreamState; | 93 _AddStreamState<T> _addStreamState; |
94 | 94 |
95 /** | 95 /** |
96 * Future returned by [close] and [done]. | 96 * Future returned by [close] and [done]. |
97 * | 97 * |
98 * The future is completed whenever the done event has been sent to all | 98 * The future is completed whenever the done event has been sent to all |
99 * relevant listeners. | 99 * relevant listeners. |
100 * The relevant listeners are the ones that were listening when [close] was | 100 * The relevant listeners are the ones that were listening when [close] was |
101 * called. When all of these have been canceled (sending the done event makes | 101 * called. When all of these have been canceled (sending the done event makes |
102 * them cancel, but they can also be canceled before sending the event), | 102 * them cancel, but they can also be canceled before sending the event), |
103 * this future completes. | 103 * this future completes. |
104 * | 104 * |
105 * Any attempt to listen after calling [close] will throw, so there won't | 105 * Any attempt to listen after calling [close] will throw, so there won't |
106 * be any further listeners. | 106 * be any further listeners. |
107 */ | 107 */ |
108 _Future _doneFuture; | 108 _Future _doneFuture; |
109 | 109 |
110 _BroadcastStreamController(this._onListen, this._onCancel) | 110 _BroadcastStreamController(this.onListen, this.onCancel) |
111 : _state = _STATE_INITIAL { | 111 : _state = _STATE_INITIAL { |
112 _next = _previous = this; | 112 _next = _previous = this; |
113 } | 113 } |
114 | 114 |
115 void set onListen(void onListenHandler()) { _onListen = onListenHandler; } | 115 ControllerCallback get onPause { |
| 116 throw new UnsupportedError( |
| 117 "Broadcast stream controllers do not support pause callbacks"); |
| 118 } |
116 | 119 |
117 void set onPause(void onPauseHandler()) { | 120 void set onPause(void onPauseHandler()) { |
118 throw new UnsupportedError( | 121 throw new UnsupportedError( |
119 "Broadcast stream controllers do not support pause callbacks"); | 122 "Broadcast stream controllers do not support pause callbacks"); |
120 } | 123 } |
121 | 124 |
| 125 ControllerCallback get onResume { |
| 126 throw new UnsupportedError( |
| 127 "Broadcast stream controllers do not support pause callbacks"); |
| 128 } |
| 129 |
122 void set onResume(void onResumeHandler()) { | 130 void set onResume(void onResumeHandler()) { |
123 throw new UnsupportedError( | 131 throw new UnsupportedError( |
124 "Broadcast stream controllers do not support pause callbacks"); | 132 "Broadcast stream controllers do not support pause callbacks"); |
125 } | 133 } |
126 | 134 |
127 void set onCancel(onCancelHandler()) { _onCancel = onCancelHandler; } | |
128 | |
129 // StreamController interface. | 135 // StreamController interface. |
130 | 136 |
131 Stream<T> get stream => new _BroadcastStream<T>(this); | 137 Stream<T> get stream => new _BroadcastStream<T>(this); |
132 | 138 |
133 StreamSink<T> get sink => new _StreamSinkWrapper<T>(this); | 139 StreamSink<T> get sink => new _StreamSinkWrapper<T>(this); |
134 | 140 |
135 bool get isClosed => (_state & _STATE_CLOSED) != 0; | 141 bool get isClosed => (_state & _STATE_CLOSED) != 0; |
136 | 142 |
137 /** | 143 /** |
138 * A broadcast controller is never paused. | 144 * A broadcast controller is never paused. |
(...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
202 if (isClosed) { | 208 if (isClosed) { |
203 if (onDone == null) onDone = _nullDoneHandler; | 209 if (onDone == null) onDone = _nullDoneHandler; |
204 return new _DoneStreamSubscription<T>(onDone); | 210 return new _DoneStreamSubscription<T>(onDone); |
205 } | 211 } |
206 StreamSubscription subscription = | 212 StreamSubscription subscription = |
207 new _BroadcastSubscription<T>(this, onData, onError, onDone, | 213 new _BroadcastSubscription<T>(this, onData, onError, onDone, |
208 cancelOnError); | 214 cancelOnError); |
209 _addListener(subscription); | 215 _addListener(subscription); |
210 if (identical(_next, _previous)) { | 216 if (identical(_next, _previous)) { |
211 // Only one listener, so it must be the first listener. | 217 // Only one listener, so it must be the first listener. |
212 _runGuarded(_onListen); | 218 _runGuarded(onListen); |
213 } | 219 } |
214 return subscription; | 220 return subscription; |
215 } | 221 } |
216 | 222 |
217 Future _recordCancel(_BroadcastSubscription<T> subscription) { | 223 Future _recordCancel(_BroadcastSubscription<T> subscription) { |
218 // If already removed by the stream, don't remove it again. | 224 // If already removed by the stream, don't remove it again. |
219 if (identical(subscription._next, subscription)) return null; | 225 if (identical(subscription._next, subscription)) return null; |
220 assert(!identical(subscription._next, subscription)); | 226 assert(!identical(subscription._next, subscription)); |
221 if (subscription._isFiring) { | 227 if (subscription._isFiring) { |
222 subscription._setRemoveAfterFiring(); | 228 subscription._setRemoveAfterFiring(); |
(...skipping 80 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
303 void _forEachListener( | 309 void _forEachListener( |
304 void action(_BufferingStreamSubscription<T> subscription)) { | 310 void action(_BufferingStreamSubscription<T> subscription)) { |
305 if (_isFiring) { | 311 if (_isFiring) { |
306 throw new StateError( | 312 throw new StateError( |
307 "Cannot fire new event. Controller is already firing an event"); | 313 "Cannot fire new event. Controller is already firing an event"); |
308 } | 314 } |
309 if (_isEmpty) return; | 315 if (_isEmpty) return; |
310 | 316 |
311 // Get event id of this event. | 317 // Get event id of this event. |
312 int id = (_state & _STATE_EVENT_ID); | 318 int id = (_state & _STATE_EVENT_ID); |
313 // Start firing (set the _STATE_FIRING bit). We don't do [_onCancel] | 319 // Start firing (set the _STATE_FIRING bit). We don't do [onCancel] |
314 // callbacks while firing, and we prevent reentrancy of this function. | 320 // callbacks while firing, and we prevent reentrancy of this function. |
315 // | 321 // |
316 // Set [_state]'s event id to the next event's id. | 322 // Set [_state]'s event id to the next event's id. |
317 // Any listeners added while firing this event will expect the next event, | 323 // Any listeners added while firing this event will expect the next event, |
318 // not this one, and won't get notified. | 324 // not this one, and won't get notified. |
319 _state ^= _STATE_EVENT_ID | _STATE_FIRING; | 325 _state ^= _STATE_EVENT_ID | _STATE_FIRING; |
320 _BroadcastSubscriptionLink link = _next; | 326 _BroadcastSubscriptionLink link = _next; |
321 while (!identical(link, this)) { | 327 while (!identical(link, this)) { |
322 _BroadcastSubscription<T> subscription = link; | 328 _BroadcastSubscription<T> subscription = link; |
323 if (subscription._expectsEvent(id)) { | 329 if (subscription._expectsEvent(id)) { |
(...skipping 15 matching lines...) Expand all Loading... |
339 _callOnCancel(); | 345 _callOnCancel(); |
340 } | 346 } |
341 } | 347 } |
342 | 348 |
343 void _callOnCancel() { | 349 void _callOnCancel() { |
344 assert(_isEmpty); | 350 assert(_isEmpty); |
345 if (isClosed && _doneFuture._mayComplete) { | 351 if (isClosed && _doneFuture._mayComplete) { |
346 // When closed, _doneFuture is not null. | 352 // When closed, _doneFuture is not null. |
347 _doneFuture._asyncComplete(null); | 353 _doneFuture._asyncComplete(null); |
348 } | 354 } |
349 _runGuarded(_onCancel); | 355 _runGuarded(onCancel); |
350 } | 356 } |
351 } | 357 } |
352 | 358 |
353 class _SyncBroadcastStreamController<T> extends _BroadcastStreamController<T> | 359 class _SyncBroadcastStreamController<T> extends _BroadcastStreamController<T> |
354 implements SynchronousStreamController<T> { | 360 implements SynchronousStreamController<T> { |
355 _SyncBroadcastStreamController(void onListen(), void onCancel()) | 361 _SyncBroadcastStreamController(void onListen(), void onCancel()) |
356 : super(onListen, onCancel); | 362 : super(onListen, onCancel); |
357 | 363 |
358 // EventDispatch interface. | 364 // EventDispatch interface. |
359 | 365 |
(...skipping 167 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
527 _pauseCount++; | 533 _pauseCount++; |
528 } | 534 } |
529 void resume() { _resume(null); } | 535 void resume() { _resume(null); } |
530 void _resume(_) { | 536 void _resume(_) { |
531 if (_pauseCount > 0) _pauseCount--; | 537 if (_pauseCount > 0) _pauseCount--; |
532 } | 538 } |
533 Future cancel() { return new _Future.immediate(null); } | 539 Future cancel() { return new _Future.immediate(null); } |
534 bool get isPaused => _pauseCount > 0; | 540 bool get isPaused => _pauseCount > 0; |
535 Future asFuture([Object value]) => new _Future(); | 541 Future asFuture([Object value]) => new _Future(); |
536 } | 542 } |
OLD | NEW |