Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(277)

Side by Side Diff: sdk/lib/async/broadcast_stream_controller.dart

Issue 2754013002: Format all dart: library files (Closed)
Patch Set: Format all dart: library files Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « sdk/lib/async/async_error.dart ('k') | sdk/lib/async/deferred_load.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 class _BroadcastStream<T> extends _ControllerStream<T> { 7 class _BroadcastStream<T> extends _ControllerStream<T> {
8 _BroadcastStream(_StreamControllerLifecycle<T> controller) 8 _BroadcastStream(_StreamControllerLifecycle<T> controller)
9 : super(controller); 9 : super(controller);
10 10
11 bool get isBroadcast => true; 11 bool get isBroadcast => true;
12 } 12 }
13 13
14 class _BroadcastSubscription<T> extends _ControllerSubscription<T> { 14 class _BroadcastSubscription<T> extends _ControllerSubscription<T> {
15 static const int _STATE_EVENT_ID = 1; 15 static const int _STATE_EVENT_ID = 1;
16 static const int _STATE_FIRING = 2; 16 static const int _STATE_FIRING = 2;
17 static const int _STATE_REMOVE_AFTER_FIRING = 4; 17 static const int _STATE_REMOVE_AFTER_FIRING = 4;
18 // TODO(lrn): Use the _state field on _ControllerSubscription to 18 // TODO(lrn): Use the _state field on _ControllerSubscription to
19 // also store this state. Requires that the subscription implementation 19 // also store this state. Requires that the subscription implementation
20 // does not assume that it's use of the state integer is the only use. 20 // does not assume that it's use of the state integer is the only use.
21 int _eventState = 0; // Initialized to help dart2js type inference. 21 int _eventState = 0; // Initialized to help dart2js type inference.
22 22
23 _BroadcastSubscription<T> _next; 23 _BroadcastSubscription<T> _next;
24 _BroadcastSubscription<T> _previous; 24 _BroadcastSubscription<T> _previous;
25 25
26 _BroadcastSubscription(_StreamControllerLifecycle<T> controller, 26 _BroadcastSubscription(_StreamControllerLifecycle<T> controller,
27 void onData(T data), 27 void onData(T data), Function onError, void onDone(), bool cancelOnError)
28 Function onError,
29 void onDone(),
30 bool cancelOnError)
31 : super(controller, onData, onError, onDone, cancelOnError) { 28 : super(controller, onData, onError, onDone, cancelOnError) {
32 _next = _previous = this; 29 _next = _previous = this;
33 } 30 }
34 31
35 bool _expectsEvent(int eventId) => 32 bool _expectsEvent(int eventId) => (_eventState & _STATE_EVENT_ID) == eventId;
36 (_eventState & _STATE_EVENT_ID) == eventId;
37 33
38 void _toggleEventId() { 34 void _toggleEventId() {
39 _eventState ^= _STATE_EVENT_ID; 35 _eventState ^= _STATE_EVENT_ID;
40 } 36 }
41 37
42 bool get _isFiring => (_eventState & _STATE_FIRING) != 0; 38 bool get _isFiring => (_eventState & _STATE_FIRING) != 0;
43 39
44 void _setRemoveAfterFiring() { 40 void _setRemoveAfterFiring() {
45 assert(_isFiring); 41 assert(_isFiring);
46 _eventState |= _STATE_REMOVE_AFTER_FIRING; 42 _eventState |= _STATE_REMOVE_AFTER_FIRING;
47 } 43 }
48 44
49 bool get _removeAfterFiring => 45 bool get _removeAfterFiring =>
50 (_eventState & _STATE_REMOVE_AFTER_FIRING) != 0; 46 (_eventState & _STATE_REMOVE_AFTER_FIRING) != 0;
51 47
52 // The controller._recordPause doesn't do anything for a broadcast controller, 48 // The controller._recordPause doesn't do anything for a broadcast controller,
53 // so we don't bother calling it. 49 // so we don't bother calling it.
54 void _onPause() { } 50 void _onPause() {}
55 51
56 // The controller._recordResume doesn't do anything for a broadcast 52 // The controller._recordResume doesn't do anything for a broadcast
57 // controller, so we don't bother calling it. 53 // controller, so we don't bother calling it.
58 void _onResume() { } 54 void _onResume() {}
59 55
60 // _onCancel is inherited. 56 // _onCancel is inherited.
61 } 57 }
62 58
63 abstract class _BroadcastStreamController<T> 59 abstract class _BroadcastStreamController<T>
64 implements StreamController<T>, 60 implements
65 _StreamControllerLifecycle<T>, 61 StreamController<T>,
66 _EventSink<T>, 62 _StreamControllerLifecycle<T>,
67 _EventDispatch<T> { 63 _EventSink<T>,
64 _EventDispatch<T> {
68 static const int _STATE_INITIAL = 0; 65 static const int _STATE_INITIAL = 0;
69 static const int _STATE_EVENT_ID = 1; 66 static const int _STATE_EVENT_ID = 1;
70 static const int _STATE_FIRING = 2; 67 static const int _STATE_FIRING = 2;
71 static const int _STATE_CLOSED = 4; 68 static const int _STATE_CLOSED = 4;
72 static const int _STATE_ADDSTREAM = 8; 69 static const int _STATE_ADDSTREAM = 8;
73 70
74 ControllerCallback onListen; 71 ControllerCallback onListen;
75 ControllerCancelCallback onCancel; 72 ControllerCancelCallback onCancel;
76 73
77 // State of the controller. 74 // State of the controller.
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after
110 void set onPause(void onPauseHandler()) { 107 void set onPause(void onPauseHandler()) {
111 throw new UnsupportedError( 108 throw new UnsupportedError(
112 "Broadcast stream controllers do not support pause callbacks"); 109 "Broadcast stream controllers do not support pause callbacks");
113 } 110 }
114 111
115 ControllerCallback get onResume { 112 ControllerCallback get onResume {
116 throw new UnsupportedError( 113 throw new UnsupportedError(
117 "Broadcast stream controllers do not support pause callbacks"); 114 "Broadcast stream controllers do not support pause callbacks");
118 } 115 }
119 116
120 void set onResume(void onResumeHandler()) { 117 void set onResume(void onResumeHandler()) {
121 throw new UnsupportedError( 118 throw new UnsupportedError(
122 "Broadcast stream controllers do not support pause callbacks"); 119 "Broadcast stream controllers do not support pause callbacks");
123 } 120 }
124 121
125 // StreamController interface. 122 // StreamController interface.
126 123
127 Stream<T> get stream => new _BroadcastStream<T>(this); 124 Stream<T> get stream => new _BroadcastStream<T>(this);
128 125
129 StreamSink<T> get sink => new _StreamSinkWrapper<T>(this); 126 StreamSink<T> get sink => new _StreamSinkWrapper<T>(this);
130 127
(...skipping 68 matching lines...) Expand 10 before | Expand all | Expand 10 after
199 _lastSubscription = previous; 196 _lastSubscription = previous;
200 } else { 197 } else {
201 next._previous = previous; 198 next._previous = previous;
202 } 199 }
203 200
204 subscription._next = subscription._previous = subscription; 201 subscription._next = subscription._previous = subscription;
205 } 202 }
206 203
207 // _StreamControllerLifecycle interface. 204 // _StreamControllerLifecycle interface.
208 205
209 StreamSubscription<T> _subscribe( 206 StreamSubscription<T> _subscribe(void onData(T data), Function onError,
210 void onData(T data), 207 void onDone(), bool cancelOnError) {
211 Function onError,
212 void onDone(),
213 bool cancelOnError) {
214 if (isClosed) { 208 if (isClosed) {
215 if (onDone == null) onDone = _nullDoneHandler; 209 if (onDone == null) onDone = _nullDoneHandler;
216 return new _DoneStreamSubscription<T>(onDone); 210 return new _DoneStreamSubscription<T>(onDone);
217 } 211 }
218 StreamSubscription<T> subscription = 212 StreamSubscription<T> subscription = new _BroadcastSubscription<T>(
219 new _BroadcastSubscription<T>(this, onData, onError, onDone, 213 this, onData, onError, onDone, cancelOnError);
220 cancelOnError);
221 _addListener(subscription); 214 _addListener(subscription);
222 if (identical(_firstSubscription, _lastSubscription)) { 215 if (identical(_firstSubscription, _lastSubscription)) {
223 // Only one listener, so it must be the first listener. 216 // Only one listener, so it must be the first listener.
224 _runGuarded(onListen); 217 _runGuarded(onListen);
225 } 218 }
226 return subscription; 219 return subscription;
227 } 220 }
228 221
229 Future _recordCancel(StreamSubscription<T> sub) { 222 Future _recordCancel(StreamSubscription<T> sub) {
230 _BroadcastSubscription<T> subscription = sub; 223 _BroadcastSubscription<T> subscription = sub;
(...skipping 124 matching lines...) Expand 10 before | Expand all | Expand 10 after
355 assert(_isEmpty); 348 assert(_isEmpty);
356 if (isClosed && _doneFuture._mayComplete) { 349 if (isClosed && _doneFuture._mayComplete) {
357 // When closed, _doneFuture is not null. 350 // When closed, _doneFuture is not null.
358 _doneFuture._asyncComplete(null); 351 _doneFuture._asyncComplete(null);
359 } 352 }
360 _runGuarded(onCancel); 353 _runGuarded(onCancel);
361 } 354 }
362 } 355 }
363 356
364 class _SyncBroadcastStreamController<T> extends _BroadcastStreamController<T> 357 class _SyncBroadcastStreamController<T> extends _BroadcastStreamController<T>
365 implements SynchronousStreamController<T> { 358 implements SynchronousStreamController<T> {
366 _SyncBroadcastStreamController(void onListen(), void onCancel()) 359 _SyncBroadcastStreamController(void onListen(), void onCancel())
367 : super(onListen, onCancel); 360 : super(onListen, onCancel);
368 361
369 // EventDispatch interface. 362 // EventDispatch interface.
370 363
371 bool get _mayAddEvent => super._mayAddEvent && !_isFiring; 364 bool get _mayAddEvent => super._mayAddEvent && !_isFiring;
372 365
373 _addEventError() { 366 _addEventError() {
374 if (_isFiring) { 367 if (_isFiring) {
375 return new StateError( 368 return new StateError(
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after
416 } 409 }
417 410
418 class _AsyncBroadcastStreamController<T> extends _BroadcastStreamController<T> { 411 class _AsyncBroadcastStreamController<T> extends _BroadcastStreamController<T> {
419 _AsyncBroadcastStreamController(void onListen(), void onCancel()) 412 _AsyncBroadcastStreamController(void onListen(), void onCancel())
420 : super(onListen, onCancel); 413 : super(onListen, onCancel);
421 414
422 // EventDispatch interface. 415 // EventDispatch interface.
423 416
424 void _sendData(T data) { 417 void _sendData(T data) {
425 for (_BroadcastSubscription<T> subscription = _firstSubscription; 418 for (_BroadcastSubscription<T> subscription = _firstSubscription;
426 subscription != null; 419 subscription != null;
427 subscription = subscription._next) { 420 subscription = subscription._next) {
428 subscription._addPending(new _DelayedData<T>(data)); 421 subscription._addPending(new _DelayedData<T>(data));
429 } 422 }
430 } 423 }
431 424
432 void _sendError(Object error, StackTrace stackTrace) { 425 void _sendError(Object error, StackTrace stackTrace) {
433 for (_BroadcastSubscription<T> subscription = _firstSubscription; 426 for (_BroadcastSubscription<T> subscription = _firstSubscription;
434 subscription != null; 427 subscription != null;
435 subscription = subscription._next) { 428 subscription = subscription._next) {
436 subscription._addPending(new _DelayedError(error, stackTrace)); 429 subscription._addPending(new _DelayedError(error, stackTrace));
437 } 430 }
438 } 431 }
439 432
440 void _sendDone() { 433 void _sendDone() {
441 if (!_isEmpty) { 434 if (!_isEmpty) {
442 for (_BroadcastSubscription<T> subscription = _firstSubscription; 435 for (_BroadcastSubscription<T> subscription = _firstSubscription;
443 subscription != null; 436 subscription != null;
444 subscription = subscription._next) { 437 subscription = subscription._next) {
445 subscription._addPending(const _DelayedDone()); 438 subscription._addPending(const _DelayedDone());
446 } 439 }
447 } else { 440 } else {
448 assert(_doneFuture != null); 441 assert(_doneFuture != null);
449 assert(_doneFuture._mayComplete); 442 assert(_doneFuture._mayComplete);
450 _doneFuture._asyncComplete(null); 443 _doneFuture._asyncComplete(null);
451 } 444 }
452 } 445 }
453 } 446 }
454 447
455 /** 448 /**
456 * Stream controller that is used by [Stream.asBroadcastStream]. 449 * Stream controller that is used by [Stream.asBroadcastStream].
457 * 450 *
458 * This stream controller allows incoming events while it is firing 451 * This stream controller allows incoming events while it is firing
459 * other events. This is handled by delaying the events until the 452 * other events. This is handled by delaying the events until the
460 * current event is done firing, and then fire the pending events. 453 * current event is done firing, and then fire the pending events.
461 * 454 *
462 * This class extends [_SyncBroadcastStreamController]. Events of 455 * This class extends [_SyncBroadcastStreamController]. Events of
463 * an "asBroadcastStream" stream are always initiated by events 456 * an "asBroadcastStream" stream are always initiated by events
464 * on another stream, and it is fine to forward them synchronously. 457 * on another stream, and it is fine to forward them synchronously.
465 */ 458 */
466 class _AsBroadcastStreamController<T> 459 class _AsBroadcastStreamController<T> extends _SyncBroadcastStreamController<T>
467 extends _SyncBroadcastStreamController<T>
468 implements _EventDispatch<T> { 460 implements _EventDispatch<T> {
469 _StreamImplEvents<T> _pending; 461 _StreamImplEvents<T> _pending;
470 462
471 _AsBroadcastStreamController(void onListen(), void onCancel()) 463 _AsBroadcastStreamController(void onListen(), void onCancel())
472 : super(onListen, onCancel); 464 : super(onListen, onCancel);
473 465
474 bool get _hasPending => _pending != null && ! _pending.isEmpty; 466 bool get _hasPending => _pending != null && !_pending.isEmpty;
475 467
476 void _addPendingEvent(_DelayedEvent event) { 468 void _addPendingEvent(_DelayedEvent event) {
477 if (_pending == null) { 469 if (_pending == null) {
478 _pending = new _StreamImplEvents<T>(); 470 _pending = new _StreamImplEvents<T>();
479 } 471 }
480 _pending.add(event); 472 _pending.add(event);
481 } 473 }
482 474
483 void add(T data) { 475 void add(T data) {
484 if (!isClosed && _isFiring) { 476 if (!isClosed && _isFiring) {
(...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after
527 // It can simulate pauses, but otherwise does nothing. 519 // It can simulate pauses, but otherwise does nothing.
528 class _DoneSubscription<T> implements StreamSubscription<T> { 520 class _DoneSubscription<T> implements StreamSubscription<T> {
529 int _pauseCount = 0; 521 int _pauseCount = 0;
530 void onData(void handleData(T data)) {} 522 void onData(void handleData(T data)) {}
531 void onError(Function handleError) {} 523 void onError(Function handleError) {}
532 void onDone(void handleDone()) {} 524 void onDone(void handleDone()) {}
533 void pause([Future resumeSignal]) { 525 void pause([Future resumeSignal]) {
534 if (resumeSignal != null) resumeSignal.then(_resume); 526 if (resumeSignal != null) resumeSignal.then(_resume);
535 _pauseCount++; 527 _pauseCount++;
536 } 528 }
537 void resume() { _resume(null); } 529
530 void resume() {
531 _resume(null);
532 }
533
538 void _resume(_) { 534 void _resume(_) {
539 if (_pauseCount > 0) _pauseCount--; 535 if (_pauseCount > 0) _pauseCount--;
540 } 536 }
541 Future cancel() { return new _Future.immediate(null); } 537
538 Future cancel() {
539 return new _Future.immediate(null);
540 }
541
542 bool get isPaused => _pauseCount > 0; 542 bool get isPaused => _pauseCount > 0;
543 Future<E> asFuture<E>([E value]) => new _Future<E>(); 543 Future<E> asFuture<E>([E value]) => new _Future<E>();
544 } 544 }
OLDNEW
« no previous file with comments | « sdk/lib/async/async_error.dart ('k') | sdk/lib/async/deferred_load.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698