Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(191)

Side by Side Diff: sdk/lib/async/broadcast_stream_controller.dart

Issue 16240008: Make StreamController be a StreamSink, not just an EventSink. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Address review comments. Created 7 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « sdk/lib/async/async_sources.gypi ('k') | sdk/lib/async/future_impl.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 part of dart.async;
6
7 class _BroadcastStream<T> extends _StreamImpl<T> {
8 _BroadcastStreamController _controller;
9
10 _BroadcastStream(this._controller);
11
12 bool get isBroadcast => true;
13
14 StreamSubscription<T> _createSubscription(
15 void onData(T data),
16 void onError(Object error),
17 void onDone(),
18 bool cancelOnError) =>
19 _controller._subscribe(onData, onError, onDone, cancelOnError);
20 }
21
22 abstract class _BroadcastSubscriptionLink {
23 _BroadcastSubscriptionLink _next;
24 _BroadcastSubscriptionLink _previous;
25 }
26
27 class _BroadcastSubscription<T> extends _ControllerSubscription<T>
28 implements _BroadcastSubscriptionLink {
29 static const int _STATE_EVENT_ID = 1;
30 static const int _STATE_FIRING = 2;
31 static const int _STATE_REMOVE_AFTER_FIRING = 4;
32 // TODO(lrn): Use the _state field on _ControllerSubscription to
33 // also store this state. Requires that the subscription implementation
34 // does not assume that it's use of the state integer is the only use.
35 int _eventState;
36
37 _BroadcastSubscriptionLink _next;
38 _BroadcastSubscriptionLink _previous;
39
40 _BroadcastSubscription(_StreamControllerLifecycle controller,
41 void onData(T data),
42 void onError(Object error),
43 void onDone(),
44 bool cancelOnError)
45 : super(controller, onData, onError, onDone, cancelOnError) {
46 _next = _previous = this;
47 }
48
49 _BroadcastStreamController get _controller => super._controller;
50
51 bool _expectsEvent(int eventId) =>
52 (_eventState & _STATE_EVENT_ID) == eventId;
53
54
55 void _toggleEventId() {
56 _eventState ^= _STATE_EVENT_ID;
57 }
58
59 bool get _isFiring => (_eventState & _STATE_FIRING) != 0;
60
61 bool _setRemoveAfterFiring() {
62 assert(_isFiring);
63 _eventState |= _STATE_REMOVE_AFTER_FIRING;
64 }
65
66 bool get _removeAfterFiring =>
67 (_eventState & _STATE_REMOVE_AFTER_FIRING) != 0;
68
69 // The controller._recordPause doesn't do anything for a broadcast controller,
70 // so we don't bother calling it.
71 void _onPause() { }
72
73 // The controller._recordResume doesn't do anything for a broadcast
74 // controller, so we don't bother calling it.
75 void _onResume() { }
76
77 // _onCancel is inherited.
78 }
79
80
81 abstract class _BroadcastStreamController<T>
82 implements StreamController<T>,
83 _StreamControllerLifecycle<T>,
84 _BroadcastSubscriptionLink,
85 _EventSink<T>,
86 _EventDispatch<T> {
87 static const int _STATE_INITIAL = 0;
88 static const int _STATE_EVENT_ID = 1;
89 static const int _STATE_FIRING = 2;
90 static const int _STATE_CLOSED = 4;
91 static const int _STATE_ADDSTREAM = 8;
92
93 final _NotificationHandler _onListen;
94 final _NotificationHandler _onCancel;
95
96 // State of the controller.
97 int _state;
98
99 // Double-linked list of active listeners.
100 _BroadcastSubscriptionLink _next;
101 _BroadcastSubscriptionLink _previous;
102
103 // Extra state used during an [addStream] call.
104 _AddStreamState<T> _addStreamState;
105
106 /**
107 * Future returned by [close] and [done].
108 *
109 * The future is completed whenever the done event has been sent to all
110 * relevant listeners.
111 * The relevant listeners are the ones that were listening when [close] was
112 * called. When all of these have been canceled (sending the done event makes
113 * them cancel, but they can also be canceled before sending the event),
114 * this future completes.
115 *
116 * Any attempt to listen after calling [close] will throw, so there won't
117 * be any further listeners.
118 */
119 _FutureImpl _doneFuture;
120
121 _BroadcastStreamController(this._onListen, this._onCancel)
122 : _state = _STATE_INITIAL {
123 _next = _previous = this;
124 }
125
126 // StreamController interface.
127
128 Stream<T> get stream => new _BroadcastStream<T>(this);
129
130 StreamSink<T> get sink => new _StreamSinkWrapper<T>(this);
131
132 bool get isClosed => (_state & _STATE_CLOSED) != 0;
133
134 /**
135 * A broadcast controller is never paused.
136 *
137 * Each receiving stream may be paused individually, and they handle their
138 * own buffering.
139 */
140 bool get isPaused => false;
141
142 /** Whether there are currently one or more subscribers. */
143 bool get hasListener => !_isEmpty;
144
145 /** Whether an event is being fired (sent to some, but not all, listeners). */
146 bool get _isFiring => (_state & _STATE_FIRING) != 0;
147
148 bool get _isAddingStream => (_state & _STATE_ADDSTREAM) != 0;
149
150 bool get _mayAddEvent => (_state < _STATE_CLOSED);
151
152 _FutureImpl _ensureDoneFuture() {
153 if (_doneFuture != null) return _doneFuture;
154 return _doneFuture = new _FutureImpl();
155 }
156
157 // Linked list helpers
158
159 bool get _isEmpty => identical(_next, this);
160
161 /** Adds subscription to linked list of active listeners. */
162 void _addListener(_BroadcastSubscription<T> subscription) {
163 assert(identical(subscription._next, subscription));
164 // Insert in linked list just before `this`.
165 subscription._previous = _previous;
166 subscription._next = this;
167 this._previous._next = subscription;
168 this._previous = subscription;
169 subscription._eventState = (_state & _STATE_EVENT_ID);
170 }
171
172 void _removeListener(_BroadcastSubscription<T> subscription) {
173 assert(identical(subscription._controller, this));
174 assert(!identical(subscription._next, subscription));
175 _BroadcastSubscriptionLink previous = subscription._previous;
176 _BroadcastSubscriptionLink next = subscription._next;
177 previous._next = next;
178 next._previous = previous;
179 subscription._next = subscription._previous = subscription;
180 }
181
182 // _StreamControllerLifecycle interface.
183
184 StreamSubscription<T> _subscribe(void onData(T data),
185 void onError(Object error),
186 void onDone(),
187 bool cancelOnError) {
188 if (isClosed) {
189 throw new StateError("Subscribing to closed stream");
190 }
191 StreamSubscription subscription = new _BroadcastSubscription<T>(
192 this, onData, onError, onDone, cancelOnError);
193 _addListener(subscription);
194 if (identical(_next, _previous)) {
195 // Only one listener, so it must be the first listener.
196 _runGuarded(_onListen);
197 }
198 return subscription;
199 }
200
201 void _recordCancel(_BroadcastSubscription<T> subscription) {
202 // If already removed by the stream, don't remove it again.
203 if (identical(subscription._next, subscription)) return;
204 assert(!identical(subscription._next, subscription));
205 if (subscription._isFiring) {
206 subscription._setRemoveAfterFiring();
207 } else {
208 assert(!identical(subscription._next, subscription));
209 _removeListener(subscription);
210 // If we are currently firing an event, the empty-check is performed at
211 // the end of the listener loop instead of here.
212 if (!_isFiring && _isEmpty) {
213 _callOnCancel();
214 }
215 }
216 }
217
218 void _recordPause(StreamSubscription<T> subscription) {}
219 void _recordResume(StreamSubscription<T> subscription) {}
220
221 // EventSink interface.
222
223 Error _addEventError() {
224 if (isClosed) {
225 return new StateError("Cannot add new events after calling close");
226 }
227 assert(_isAddingStream);
228 return new StateError("Cannot add new events while doing an addStream");
229 }
230
231 void add(T data) {
232 if (!_mayAddEvent) throw _addEventError();
233 _sendData(data);
234 }
235
236 void addError(Object error, [Object stackTrace]) {
237 if (!_mayAddEvent) throw _addEventError();
238 if (stackTrace != null) _attachStackTrace(error, stackTrace);
239 _sendError(error);
240 }
241
242 Future close() {
243 if (isClosed) {
244 assert(_doneFuture != null);
245 return _doneFuture;
246 }
247 if (!_mayAddEvent) throw _addEventError();
248 _state |= _STATE_CLOSED;
249 Future doneFuture = _ensureDoneFuture();
250 _sendDone();
251 return doneFuture;
252 }
253
254 Future get done => _ensureDoneFuture();
255
256 Future addStream(Stream<T> stream) {
257 if (!_mayAddEvent) throw _addEventError();
258 _state |= _STATE_ADDSTREAM;
259 _addStreamState = new _AddStreamState(this, stream);
260 return _addStreamState.addStreamFuture;
261 }
262
263 // _EventSink interface, called from AddStreamState.
264 void _add(T data) {
265 _sendData(data);
266 }
267
268 void _addError(Object error) {
269 assert(_isAddingStream);
270 _sendError(error);
271 }
272
273 void _close() {
274 assert(_isAddingStream);
275 _AddStreamState addState = _addStreamState;
276 _addStreamState = null;
277 _state &= ~_STATE_ADDSTREAM;
278 addState.complete();
279 }
280
281 // Event handling.
282 void _forEachListener(
283 void action(_BufferingStreamSubscription<T> subscription)) {
284 if (_isFiring) {
285 throw new StateError(
286 "Cannot fire new event. Controller is already firing an event");
287 }
288 if (_isEmpty) return;
289
290 // Get event id of this event.
291 int id = (_state & _STATE_EVENT_ID);
292 // Start firing (set the _STATE_FIRING bit). We don't do [_onCancel]
293 // callbacks while firing, and we prevent reentrancy of this function.
294 //
295 // Set [_state]'s event id to the next event's id.
296 // Any listeners added while firing this event will expect the next event,
297 // not this one, and won't get notified.
298 _state ^= _STATE_EVENT_ID | _STATE_FIRING;
299 _BroadcastSubscriptionLink link = _next;
300 while (!identical(link, this)) {
301 _BroadcastSubscription<T> subscription = link;
302 if (subscription._expectsEvent(id)) {
303 subscription._eventState |= _BroadcastSubscription._STATE_FIRING;
304 action(subscription);
305 subscription._toggleEventId();
306 link = subscription._next;
307 if (subscription._removeAfterFiring) {
308 _removeListener(subscription);
309 }
310 subscription._eventState &= ~_BroadcastSubscription._STATE_FIRING;
311 } else {
312 link = subscription._next;
313 }
314 }
315 _state &= ~_STATE_FIRING;
316
317 if (_isEmpty) {
318 _callOnCancel();
319 }
320 }
321
322 void _callOnCancel() {
323 assert(_isEmpty);
324 if (isClosed && _doneFuture._mayComplete) {
325 // When closed, _doneFuture is not null.
326 _doneFuture._asyncSetValue(null);
327 }
328 _runGuarded(_onCancel);
329 }
330 }
331
332 class _SyncBroadcastStreamController<T> extends _BroadcastStreamController<T> {
333 _SyncBroadcastStreamController(void onListen(), void onCancel())
334 : super(onListen, onCancel);
335
336 // EventDispatch interface.
337
338 void _sendData(T data) {
339 if (_isEmpty) return;
340 _forEachListener((_BufferingStreamSubscription<T> subscription) {
341 subscription._add(data);
342 });
343 }
344
345 void _sendError(Object error) {
346 if (_isEmpty) return;
347 _forEachListener((_BufferingStreamSubscription<T> subscription) {
348 subscription._addError(error);
349 });
350 }
351
352 void _sendDone() {
353 if (!_isEmpty) {
354 _forEachListener((_BroadcastSubscription<T> subscription) {
355 subscription._close();
356 });
357 } else {
358 assert(_doneFuture != null);
359 assert(_doneFuture._mayComplete);
360 _doneFuture._asyncSetValue(null);
361 }
362 }
363 }
364
365 class _AsyncBroadcastStreamController<T> extends _BroadcastStreamController<T> {
366 _AsyncBroadcastStreamController(void onListen(), void onCancel())
367 : super(onListen, onCancel);
368
369 // EventDispatch interface.
370
371 void _sendData(T data) {
372 for (_BroadcastSubscriptionLink link = _next;
373 !identical(link, this);
374 link = link._next) {
375 _BroadcastSubscription<T> subscription = link;
376 subscription._addPending(new _DelayedData(data));
377 }
378 }
379
380 void _sendError(Object error) {
381 for (_BroadcastSubscriptionLink link = _next;
382 !identical(link, this);
383 link = link._next) {
384 _BroadcastSubscription<T> subscription = link;
385 subscription._addPending(new _DelayedError(error));
386 }
387 }
388
389 void _sendDone() {
390 if (!_isEmpty) {
391 for (_BroadcastSubscriptionLink link = _next;
392 !identical(link, this);
393 link = link._next) {
394 _BroadcastSubscription<T> subscription = link;
395 subscription._addPending(const _DelayedDone());
396 }
397 } else {
398 assert(_doneFuture != null);
399 assert(_doneFuture._mayComplete);
400 _doneFuture._asyncSetValue(null);
401 }
402 }
403 }
404
405 /**
406 * Stream controller that is used by [Stream.asBroadcastStream].
407 *
408 * This stream controller allows incoming events while it is firing
409 * other events. This is handled by delaying the events until the
410 * current event is done firing, and then fire the pending events.
411 *
412 * This class extends [_SyncBroadcastStreamController]. Events of
413 * an "asBroadcastStream" stream are always initiated by events
414 * on another stream, and it is fine to forward them synchronously.
415 */
416 class _AsBroadcastStreamController<T>
417 extends _SyncBroadcastStreamController<T>
418 implements _EventDispatch<T> {
419 _StreamImplEvents _pending;
420
421 _AsBroadcastStreamController(void onListen(), void onCancel())
422 : super(onListen, onCancel);
423
424 bool get _hasPending => _pending != null && ! _pending.isEmpty;
425
426 void _addPendingEvent(_DelayedEvent event) {
427 if (_pending == null) {
428 _pending = new _StreamImplEvents();
429 }
430 _pending.add(event);
431 }
432
433 void add(T data) {
434 if (!isClosed && _isFiring) {
435 _addPendingEvent(new _DelayedData<T>(data));
436 return;
437 }
438 super.add(data);
439 while (_hasPending) {
440 _pending.handleNext(this);
441 }
442 }
443
444 void addError(Object error, [StackTrace stackTrace]) {
445 if (!isClosed && _isFiring) {
446 _addPendingEvent(new _DelayedError(error));
447 return;
448 }
449 super.addError(error, stackTrace);
450 while (_hasPending) {
451 _pending.handleNext(this);
452 }
453 }
454
455 void close() {
456 if (!isClosed && _isFiring) {
457 _addPendingEvent(const _DelayedDone());
458 _state |= _STATE_CLOSED;
459 return;
460 }
461 super.close();
462 assert(!_hasPending);
463 }
464
465 void _callOnCancel() {
466 if (_hasPending) {
467 _pending.clear();
468 _pending = null;
469 }
470 super._callOnCancel();
471 }
472 }
473
474 // A subscription that never receives any events.
475 // It can simulate pauses, but otherwise does nothing.
476 class _DoneSubscription<T> implements StreamSubscription<T> {
477 int _pauseCount = 0;
478 void onData(void handleData(T data)) {}
479 void onError(void handleErrr(Object error)) {}
480 void onDone(void handleDone()) {}
481 void pause([Future resumeSignal]) {
482 if (resumeSignal != null) resumeSignal.then(_resume);
483 _pauseCount++;
484 }
485 void resume() { _resume(null); }
486 void _resume(_) {
487 if (_pauseCount > 0) _pauseCount--;
488 }
489 void cancel() {}
490 bool get isPaused => _pauseCount > 0;
491 Future asFuture(Object value) => new _FutureImpl();
492 }
OLDNEW
« no previous file with comments | « sdk/lib/async/async_sources.gypi ('k') | sdk/lib/async/future_impl.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698