Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(148)

Side by Side Diff: sdk/lib/async/broadcast_stream_controller.dart

Issue 16240008: Make StreamController be a StreamSink, not just an EventSink. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Complete rewrite. StreamController is now itself a StreamSink. Created 7 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 part of dart.async;
6
7 class _BroadcastStream<T> extends _StreamImpl<T> {
8 _BroadcastStreamController _controller;
9
10 _BroadcastStream(this._controller);
11
12 bool get isBroadcast => true;
13
14 StreamSubscription<T> _createSubscription(
15 void onData(T data),
16 void onError(Object error),
17 void onDone(),
18 bool cancelOnError) =>
19 _controller._subscribe(onData, onError, onDone, cancelOnError);
20 }
21
22 abstract class _BroadcastSubscriptionLink {
23 _BroadcastSubscriptionLink _next;
24 _BroadcastSubscriptionLink _previous;
25 }
26
27 class _BroadcastSubscription<T> extends _ControllerSubscription<T>
28 implements _BroadcastSubscriptionLink {
29 static const int _STATE_EVENT_ID = 1;
30 static const int _STATE_FIRING = 2;
31 static const int _STATE_REMOVE_AFTER_FIRING = 4;
32 // TODO(lrn): Use the _state field on _ControllerSubscription to
33 // also store this state. Requires that the subscription implementation
34 // does not assume that it's use of the state integer is the only use.
35 int _eventState;
36
37 _BroadcastSubscriptionLink _next;
38 _BroadcastSubscriptionLink _previous;
39
40 _BroadcastSubscription(_StreamControllerLifecycle controller,
41 void onData(T data),
42 void onError(Object error),
43 void onDone(),
44 bool cancelOnError)
45 : super(controller, onData, onError, onDone, cancelOnError) {
46 _next = _previous = this;
47 }
48
49 _BroadcastStreamController get _controller => super._controller;
50
51 bool _expectsEvent(int eventId) {
floitsch 2013/06/27 15:15:19 => ?
Lasse Reichstein Nielsen 2013/06/28 12:57:38 Will do. Probably added the block to be able to d
52 return (_eventState & _STATE_EVENT_ID) == eventId;
53 }
54
55 void _toggleEventId() {
floitsch 2013/06/27 15:15:19 => ?
Lasse Reichstein Nielsen 2013/06/28 12:57:38 doesn't return a value.
56 _eventState ^= _STATE_EVENT_ID;
57 }
58
59 bool get _isFiring => (_eventState & _STATE_FIRING) != 0;
60
61 bool _setRemoveAfterFiring() {
62 assert(_isFiring);
63 _eventState |= _STATE_REMOVE_AFTER_FIRING;
64 }
65
66 bool get _removeAfterFiring =>
floitsch 2013/06/27 15:15:19 _shouldRemoveAfterFiring
Lasse Reichstein Nielsen 2013/06/28 12:57:38 This is an imperative. It must be removed after fi
67 (_eventState & _STATE_REMOVE_AFTER_FIRING) != 0;
68
69 void _onPause() { }
70
71 void _onResume() { }
72 }
floitsch 2013/06/27 15:15:19 missing _onCancel() from the _ControllerSubscripti
Lasse Reichstein Nielsen 2013/06/28 12:57:38 It's inherited. We overwrite _onPause and _onResum
73
74
75 abstract class _BroadcastStreamController<T>
76 implements StreamController<T>,
77 _StreamControllerLifecycle<T>,
78 _BroadcastSubscriptionLink,
79 _EventSink<T>,
80 _EventDispatch<T> {
81 static const int _STATE_INITIAL = 0;
82 static const int _STATE_EVENT_ID = 1;
83 static const int _STATE_FIRING = 2;
84 static const int _STATE_CLOSED = 4;
85 static const int _STATE_ADDSTREAM = 8;
86
87 final _NotificationHandler _onListen;
88 final _NotificationHandler _onCancel;
89
90 // State of the controller.
91 int _state;
92
93 // Double-linked list of active listeners.
94 _BroadcastSubscriptionLink _next;
95 _BroadcastSubscriptionLink _previous;
96
97 // Extra state used during an [addStream] call.
98 _AddStreamState<T> _addStreamState;
99
100 /**
101 * Future returned by [close] and [done].
102 *
103 * The future is completed whenever the done event has been sent to all
104 * relevant listeners.
105 * This means when all listeners at the time when the done event was
floitsch 2013/06/27 15:15:19 bad English sentence.
Lasse Reichstein Nielsen 2013/06/28 12:57:38 Reworded.
106 * scheduled have been canceled (sending the done event makes them cancel,
107 * but they can also be canceled before sending the event).
108 *
109 * To make this easier to handle, all listeners added after calling "close"
110 * will never receive any events, so we don't remember them. That means that
111 * this future can be completed whenever the controller [isClosed] and
112 * [hasListener] is false. This is checked in [close] and [_callOnCancel].
113 */
114 _FutureImpl _doneFuture;
115
116 _BroadcastStreamController(this._onListen, this._onCancel)
117 : _state = _STATE_INITIAL {
118 _next = _previous = this;
119 }
120
121 // StreamController interface.
122
123 Stream<T> get stream => new _BroadcastStream<T>(this);
124
125 StreamSink<T> get sink => new _StreamSinkWrapper<T>(this);
126
127 bool get isClosed => (_state & _STATE_CLOSED) != 0;
128
129 /**
130 * A broadcast controller is never paused.
131 *
132 * Each receiving stream may be paused individually, and they handle their
133 * own buffering.
134 */
135 bool get isPaused => false;
136
137 /** Whether there are currently one or more subscribers. */
138 bool get hasListener => !_isEmpty;
139
140 /** Whether an event is being fired (sent to some, but not all, listeners). */
141 bool get _isFiring => (_state & _STATE_FIRING) != 0;
142
143 bool get _isAddingStream => (_state & _STATE_ADDSTREAM) != 0;
144
145 bool get _mayAddEvent => (_state < _STATE_CLOSED);
146
147 _FutureImpl _ensureDoneFuture() {
148 if (_doneFuture != null) return _doneFuture;
149 return _doneFuture = new _FutureImpl();
150 }
151
152 // Linked list helpers
153
154 bool get _isEmpty => identical(_next, this);
155
156 /** Adds subscription to linked list of active listeners. */
157 void _addListener(_BroadcastSubscription<T> subscription) {
158 _BroadcastSubscriptionLink previous = _previous;
159 previous._next = subscription;
floitsch 2013/06/27 15:15:19 needs comments. Either explain that you want to ac
Lasse Reichstein Nielsen 2013/06/28 12:57:38 Done.
160 _previous = subscription._previous;
161 subscription._previous._next = this;
162 subscription._previous = previous;
163 subscription._eventState = (_state & _STATE_EVENT_ID);
164 }
165
166 void _removeListener(_BroadcastSubscription<T> subscription) {
167 assert(identical(subscription._controller, this));
168 assert(!identical(subscription._next, subscription));
169 subscription._previous._next = subscription._next;
floitsch 2013/06/27 15:15:19 please make this nicer to read: var prev = sub.pre
Lasse Reichstein Nielsen 2013/06/28 12:57:38 Done.
170 subscription._next._previous = subscription._previous;
171 subscription._next = subscription._previous = subscription;
172 }
173
174 // _StreamControllerLifecycle interface.
175
176 StreamSubscription<T> _subscribe(void onData(T data),
177 void onError(Object error),
178 void onDone(),
179 bool cancelOnError) {
180 if (isClosed) {
181 // No events will ever reach the new subscription, so we don't attach
182 // it to anything.
183 return new _DoneSubscription<T>();
floitsch 2013/06/27 15:15:19 Let's throw instead.
Lasse Reichstein Nielsen 2013/06/28 12:57:38 Done.
184 }
185 StreamSubscription subscription = new _BroadcastSubscription<T>(
186 this, onData, onError, onDone, cancelOnError);
187 _addListener(subscription);
188 if (identical(_next, _previous)) {
189 // Only one listener, so it must be the first listener.
190 _runGuarded(_onListen);
191 }
192 return subscription;
193 }
194
195 void _recordCancel(_BroadcastSubscription<T> subscription) {
196 // If already removed by the stream, don't remove it again.
197 if (identical(subscription._next, subscription)) return;
198 assert(!identical(subscription._next, subscription));
199 if (subscription._isFiring) {
200 subscription._setRemoveAfterFiring();
201 } else {
202 assert(!identical(subscription._next, subscription));
203 _removeListener(subscription);
204 // If we are currently firing an event, the empty-check is performed at
205 // the end of the listener loop instead of here.
206 if ((_state & _STATE_FIRING) == 0 && _isEmpty) {
floitsch 2013/06/27 15:15:19 if (!_isFiring && _isEmpty)
Lasse Reichstein Nielsen 2013/06/28 12:57:38 Done.
207 _callOnCancel();
208 }
209 }
210 }
211
212 void _recordPause(StreamSubscription<T> subscription) {}
213 void _recordResume(StreamSubscription<T> subscription) {}
214
215 // EventSink interface.
216
217 Error _addEventError() {
218 if (isClosed) {
219 return new StateError("Cannot add new events after calling close");
220 }
221 assert(_isAddingStream);
222 return new StateError("Cannot add new events while doing an addStream");
223 }
224
225 void add(T data) {
226 if (!_mayAddEvent) throw _addEventError();
227 _sendData(data);
228 }
229
230 void addError(Object error, [Object stackTrace]) {
231 if (!_mayAddEvent) throw _addEventError();
232 if (stackTrace != null) _attachStackTrace(error, stackTrace);
233 _sendError(error);
234 }
235
236 Future close() {
237 if (isClosed) {
238 assert(_doneFuture != null);
239 return _doneFuture;
240 }
241 if (!_mayAddEvent) throw _addEventError();
242 _state |= _STATE_CLOSED;
243 Future doneFuture = _ensureDoneFuture();
244 _sendDone();
245 return doneFuture;
246 }
247
248 Future get done => _ensureDoneFuture();
249
250 Future addStream(Stream<T> stream) {
251 if (!_mayAddEvent) throw _addEventError();
252 _state |= _STATE_ADDSTREAM;
253 _addStreamState = new _AddStreamState(this, stream);
254 return _addStreamState.addStreamFuture;
255 }
256
257 // _EventSink interface, called from AddStramState.
floitsch 2013/06/27 15:15:19 AddStreamState
Lasse Reichstein Nielsen 2013/06/28 12:57:38 Done.
258 void _add(T data) {
259 _sendData(data);
260 }
261
262 void _addError(Object error) {
263 assert(_isAddingStream);
floitsch 2013/06/27 15:15:19 Why is an error fatal? Isn't it just passed throug
Lasse Reichstein Nielsen 2013/06/28 12:57:38 Let's pass it through, the controller can handle i
264 _AddStreamState addState = _addStreamState;
265 _addStreamState = null;
266 _state &= ~_STATE_ADDSTREAM;
267 addState.completeWithError(error);
268 }
269
270 void _close() {
271 assert(_isAddingStream);
272 _AddStreamState addState = _addStreamState;
273 _addStreamState = null;
274 _state &= ~_STATE_ADDSTREAM;
275 addState.complete();
276 }
277
278 // Event handling.
279 void _forEachListener(
280 void action(_BufferingStreamSubscription<T> subscription)) {
281 if (_isFiring) {
282 throw new StateError(
283 "Cannot fire new event. Controller is already firing an event");
284 }
285 if (_isEmpty) return;
286
287 // Get event id of this event.
288 int id = (_state & _STATE_EVENT_ID);
289 // Start firing (set the _STATE_FIRING bit). We don't do [_onCancel]
290 // callbacks while firing, and we prevent reentrancy of this function.
291 //
292 // Set [_state]'s event id to the next event's id.
293 // Any listeners added while firing this event will expect the next event,
294 // not this one, and won't get notified.
295 _state ^= _STATE_EVENT_ID | _STATE_FIRING;
296 _BroadcastSubscriptionLink link = _next;
297 while (!identical(link, this)) {
298 _BroadcastSubscription<T> subscription = link;
299 if (subscription._expectsEvent(id)) {
300 subscription._eventState |= _BroadcastSubscription._STATE_FIRING;
301 action(subscription);
302 subscription._toggleEventId();
303 link = subscription._next;
304 if (subscription._removeAfterFiring) {
305 _removeListener(subscription);
306 }
307 subscription._eventState &= ~_BroadcastSubscription._STATE_FIRING;
308 } else {
309 link = subscription._next;
310 }
311 }
312 _state &= ~_STATE_FIRING;
313
314 if (_isEmpty) {
315 _callOnCancel();
316 }
317 }
318
319 void _callOnCancel() {
320 assert(_isEmpty);
321 if (isClosed && _doneFuture._mayComplete) {
322 // When closed, _doneFuture is not null.
323 _doneFuture._asyncSetValue(null);
324 }
325 _runGuarded(_onCancel);
326 }
327 }
328
329 class _SyncBroadcastStreamController<T> extends _BroadcastStreamController<T> {
330 _SyncBroadcastStreamController(void onListen(), void onCancel())
331 : super(onListen, onCancel);
332
333 // EventDispatch interface.
334
335 void _sendData(T data) {
336 if (_isEmpty) return;
337 _forEachListener((_BufferingStreamSubscription<T> subscription) {
338 subscription._add(data);
339 });
340 }
341
342 void _sendError(Object error) {
343 if (_isEmpty) return;
344 _forEachListener((_BufferingStreamSubscription<T> subscription) {
345 subscription._addError(error);
346 });
347 }
348
349 void _sendDone() {
350 if (!_isEmpty) {
351 _forEachListener((_BroadcastSubscription<T> subscription) {
352 subscription._close();
353 });
354 } else {
355 assert(_doneFuture != null);
356 assert(_doneFuture._mayComplete);
357 _doneFuture._asyncSetValue(null);
358 }
359 }
360 }
361
362 class _AsyncBroadcastStreamController<T> extends _BroadcastStreamController<T> {
363 _AsyncBroadcastStreamController(void onListen(), void onCancel())
364 : super(onListen, onCancel);
365
366 // EventDispatch interface.
367
368 void _sendData(T data) {
369 for (_BroadcastSubscriptionLink link = _next;
370 !identical(link, this);
371 link = link._next) {
372 _BroadcastSubscription<T> subscription = link;
373 subscription._addPending(new _DelayedData(data));
374 }
375 }
376
377 void _sendError(Object error) {
378 for (_BroadcastSubscriptionLink link = _next;
379 !identical(link, this);
380 link = link._next) {
381 _BroadcastSubscription<T> subscription = link;
382 subscription._addPending(new _DelayedError(error));
383 }
384 }
385
386 void _sendDone() {
387 if (!_isEmpty) {
388 for (_BroadcastSubscriptionLink link = _next;
389 !identical(link, this);
390 link = link._next) {
391 _BroadcastSubscription<T> subscription = link;
392 subscription._addPending(const _DelayedDone());
393 }
394 } else {
395 assert(_doneFuture != null);
396 assert(_doneFuture._mayComplete);
397 _doneFuture._asyncSetValue(null);
398 }
399 }
400 }
401
402 /**
403 * Stream controller that is used by [Stream.asBroadcastStream].
404 *
405 * This stream controller allows incoming events while it is firing
406 * other events. This is handled by delaying the events until the
407 * current event is done firing, and then fire the pending events.
408 *
409 * This class extends [_SyncBroadcastStreamController]. Events of
410 * an "asBroadcastStream" stream are always initiated by events
411 * on another stream, and it is fine to forward them synchronously.
412 */
413 class _AsBroadcastStreamController<T>
414 extends _SyncBroadcastStreamController<T>
415 implements _EventDispatch<T> {
416 _StreamImplEvents _pending;
417
418 _AsBroadcastStreamController(void onListen(), void onCancel())
419 : super(onListen, onCancel);
420
421 bool get _hasPending => _pending != null && ! _pending.isEmpty;
422
423 void _addPendingEvent(_DelayedEvent event) {
424 if (_pending == null) {
425 _pending = new _StreamImplEvents();
426 }
427 _pending.add(event);
428 }
429
430 void add(T data) {
431 if (!isClosed && _isFiring) {
432 _addPendingEvent(new _DelayedData<T>(data));
433 return;
434 }
435 super.add(data);
436 while (_hasPending) {
437 _pending.handleNext(this);
438 }
439 }
440
441 void addError(Object error, [StackTrace stackTrace]) {
442 if (!isClosed && _isFiring) {
443 _addPendingEvent(new _DelayedError(error));
444 return;
445 }
446 super.addError(error, stackTrace);
447 while (_hasPending) {
448 _pending.handleNext(this);
449 }
450 }
451
452 void close() {
453 if (!isClosed && _isFiring) {
454 _addPendingEvent(const _DelayedDone());
455 _state |= _STATE_CLOSED;
456 return;
457 }
458 super.close();
459 assert(!_hasPending);
460 }
461
462 void _callOnCancel() {
463 if (_hasPending) {
464 _pending.clear();
465 _pending = null;
466 }
467 super._callOnCancel();
468 }
469 }
470
471 // A subscription that never receives any events.
472 // It can simulate pauses, but otherwise does nothing.
473 class _DoneSubscription<T> implements StreamSubscription<T> {
474 int _pauseCount = 0;
475 void onData(void handleData(T data)) {}
476 void onError(void handleErrr(Object error)) {}
477 void onDone(void handleDone()) {}
478 void pause([Future resumeSignal]) {
479 if (resumeSignal != null) resumeSignal.then(_resume);
480 _pauseCount++;
481 }
482 void resume() { _resume(null); }
483 void _resume(_) {
484 if (_pauseCount > 0) _pauseCount--;
485 }
486 void cancel() {}
487 bool get isPaused => _pauseCount > 0;
488 Future asFuture(Object value) => new _FutureImpl();
489 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698