Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(215)

Side by Side Diff: sdk/lib/async/broadcast_stream_controller.dart

Issue 1278873008: Add getters for callbacks on StreamController. (Closed) Base URL: https://github.com/dart-lang/sdk.git@master
Patch Set: Fix incorrect return type on broadcast stream controller. Created 5 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | sdk/lib/async/stream_controller.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 class _BroadcastStream<T> extends _ControllerStream<T> { 7 class _BroadcastStream<T> extends _ControllerStream<T> {
8 _BroadcastStream(_StreamControllerLifecycle controller) : super(controller); 8 _BroadcastStream(_StreamControllerLifecycle controller) : super(controller);
9 9
10 bool get isBroadcast => true; 10 bool get isBroadcast => true;
(...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after
72 _StreamControllerLifecycle<T>, 72 _StreamControllerLifecycle<T>,
73 _BroadcastSubscriptionLink, 73 _BroadcastSubscriptionLink,
74 _EventSink<T>, 74 _EventSink<T>,
75 _EventDispatch<T> { 75 _EventDispatch<T> {
76 static const int _STATE_INITIAL = 0; 76 static const int _STATE_INITIAL = 0;
77 static const int _STATE_EVENT_ID = 1; 77 static const int _STATE_EVENT_ID = 1;
78 static const int _STATE_FIRING = 2; 78 static const int _STATE_FIRING = 2;
79 static const int _STATE_CLOSED = 4; 79 static const int _STATE_CLOSED = 4;
80 static const int _STATE_ADDSTREAM = 8; 80 static const int _STATE_ADDSTREAM = 8;
81 81
82 _NotificationHandler _onListen; 82 ControllerCallback onListen;
83 _NotificationHandler _onCancel; 83 ControllerCancelCallback onCancel;
84 84
85 // State of the controller. 85 // State of the controller.
86 int _state; 86 int _state;
87 87
88 // Double-linked list of active listeners. 88 // Double-linked list of active listeners.
89 _BroadcastSubscriptionLink _next; 89 _BroadcastSubscriptionLink _next;
90 _BroadcastSubscriptionLink _previous; 90 _BroadcastSubscriptionLink _previous;
91 91
92 // Extra state used during an [addStream] call. 92 // Extra state used during an [addStream] call.
93 _AddStreamState<T> _addStreamState; 93 _AddStreamState<T> _addStreamState;
94 94
95 /** 95 /**
96 * Future returned by [close] and [done]. 96 * Future returned by [close] and [done].
97 * 97 *
98 * The future is completed whenever the done event has been sent to all 98 * The future is completed whenever the done event has been sent to all
99 * relevant listeners. 99 * relevant listeners.
100 * The relevant listeners are the ones that were listening when [close] was 100 * The relevant listeners are the ones that were listening when [close] was
101 * called. When all of these have been canceled (sending the done event makes 101 * called. When all of these have been canceled (sending the done event makes
102 * them cancel, but they can also be canceled before sending the event), 102 * them cancel, but they can also be canceled before sending the event),
103 * this future completes. 103 * this future completes.
104 * 104 *
105 * Any attempt to listen after calling [close] will throw, so there won't 105 * Any attempt to listen after calling [close] will throw, so there won't
106 * be any further listeners. 106 * be any further listeners.
107 */ 107 */
108 _Future _doneFuture; 108 _Future _doneFuture;
109 109
110 _BroadcastStreamController(this._onListen, this._onCancel) 110 _BroadcastStreamController(this.onListen, this.onCancel)
111 : _state = _STATE_INITIAL { 111 : _state = _STATE_INITIAL {
112 _next = _previous = this; 112 _next = _previous = this;
113 } 113 }
114 114
115 void set onListen(void onListenHandler()) { _onListen = onListenHandler; } 115 ControllerCallback get onPause {
116 throw new UnsupportedError(
117 "Broadcast stream controllers do not support pause callbacks");
118 }
116 119
117 void set onPause(void onPauseHandler()) { 120 void set onPause(void onPauseHandler()) {
118 throw new UnsupportedError( 121 throw new UnsupportedError(
119 "Broadcast stream controllers do not support pause callbacks"); 122 "Broadcast stream controllers do not support pause callbacks");
120 } 123 }
121 124
125 ControllerCallback get onResume {
126 throw new UnsupportedError(
127 "Broadcast stream controllers do not support pause callbacks");
128 }
129
122 void set onResume(void onResumeHandler()) { 130 void set onResume(void onResumeHandler()) {
123 throw new UnsupportedError( 131 throw new UnsupportedError(
124 "Broadcast stream controllers do not support pause callbacks"); 132 "Broadcast stream controllers do not support pause callbacks");
125 } 133 }
126 134
127 void set onCancel(onCancelHandler()) { _onCancel = onCancelHandler; }
128
129 // StreamController interface. 135 // StreamController interface.
130 136
131 Stream<T> get stream => new _BroadcastStream<T>(this); 137 Stream<T> get stream => new _BroadcastStream<T>(this);
132 138
133 StreamSink<T> get sink => new _StreamSinkWrapper<T>(this); 139 StreamSink<T> get sink => new _StreamSinkWrapper<T>(this);
134 140
135 bool get isClosed => (_state & _STATE_CLOSED) != 0; 141 bool get isClosed => (_state & _STATE_CLOSED) != 0;
136 142
137 /** 143 /**
138 * A broadcast controller is never paused. 144 * A broadcast controller is never paused.
(...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after
202 if (isClosed) { 208 if (isClosed) {
203 if (onDone == null) onDone = _nullDoneHandler; 209 if (onDone == null) onDone = _nullDoneHandler;
204 return new _DoneStreamSubscription<T>(onDone); 210 return new _DoneStreamSubscription<T>(onDone);
205 } 211 }
206 StreamSubscription subscription = 212 StreamSubscription subscription =
207 new _BroadcastSubscription<T>(this, onData, onError, onDone, 213 new _BroadcastSubscription<T>(this, onData, onError, onDone,
208 cancelOnError); 214 cancelOnError);
209 _addListener(subscription); 215 _addListener(subscription);
210 if (identical(_next, _previous)) { 216 if (identical(_next, _previous)) {
211 // Only one listener, so it must be the first listener. 217 // Only one listener, so it must be the first listener.
212 _runGuarded(_onListen); 218 _runGuarded(onListen);
213 } 219 }
214 return subscription; 220 return subscription;
215 } 221 }
216 222
217 Future _recordCancel(_BroadcastSubscription<T> subscription) { 223 Future _recordCancel(_BroadcastSubscription<T> subscription) {
218 // If already removed by the stream, don't remove it again. 224 // If already removed by the stream, don't remove it again.
219 if (identical(subscription._next, subscription)) return null; 225 if (identical(subscription._next, subscription)) return null;
220 assert(!identical(subscription._next, subscription)); 226 assert(!identical(subscription._next, subscription));
221 if (subscription._isFiring) { 227 if (subscription._isFiring) {
222 subscription._setRemoveAfterFiring(); 228 subscription._setRemoveAfterFiring();
(...skipping 80 matching lines...) Expand 10 before | Expand all | Expand 10 after
303 void _forEachListener( 309 void _forEachListener(
304 void action(_BufferingStreamSubscription<T> subscription)) { 310 void action(_BufferingStreamSubscription<T> subscription)) {
305 if (_isFiring) { 311 if (_isFiring) {
306 throw new StateError( 312 throw new StateError(
307 "Cannot fire new event. Controller is already firing an event"); 313 "Cannot fire new event. Controller is already firing an event");
308 } 314 }
309 if (_isEmpty) return; 315 if (_isEmpty) return;
310 316
311 // Get event id of this event. 317 // Get event id of this event.
312 int id = (_state & _STATE_EVENT_ID); 318 int id = (_state & _STATE_EVENT_ID);
313 // Start firing (set the _STATE_FIRING bit). We don't do [_onCancel] 319 // Start firing (set the _STATE_FIRING bit). We don't do [onCancel]
314 // callbacks while firing, and we prevent reentrancy of this function. 320 // callbacks while firing, and we prevent reentrancy of this function.
315 // 321 //
316 // Set [_state]'s event id to the next event's id. 322 // Set [_state]'s event id to the next event's id.
317 // Any listeners added while firing this event will expect the next event, 323 // Any listeners added while firing this event will expect the next event,
318 // not this one, and won't get notified. 324 // not this one, and won't get notified.
319 _state ^= _STATE_EVENT_ID | _STATE_FIRING; 325 _state ^= _STATE_EVENT_ID | _STATE_FIRING;
320 _BroadcastSubscriptionLink link = _next; 326 _BroadcastSubscriptionLink link = _next;
321 while (!identical(link, this)) { 327 while (!identical(link, this)) {
322 _BroadcastSubscription<T> subscription = link; 328 _BroadcastSubscription<T> subscription = link;
323 if (subscription._expectsEvent(id)) { 329 if (subscription._expectsEvent(id)) {
(...skipping 15 matching lines...) Expand all
339 _callOnCancel(); 345 _callOnCancel();
340 } 346 }
341 } 347 }
342 348
343 void _callOnCancel() { 349 void _callOnCancel() {
344 assert(_isEmpty); 350 assert(_isEmpty);
345 if (isClosed && _doneFuture._mayComplete) { 351 if (isClosed && _doneFuture._mayComplete) {
346 // When closed, _doneFuture is not null. 352 // When closed, _doneFuture is not null.
347 _doneFuture._asyncComplete(null); 353 _doneFuture._asyncComplete(null);
348 } 354 }
349 _runGuarded(_onCancel); 355 _runGuarded(onCancel);
350 } 356 }
351 } 357 }
352 358
353 class _SyncBroadcastStreamController<T> extends _BroadcastStreamController<T> 359 class _SyncBroadcastStreamController<T> extends _BroadcastStreamController<T>
354 implements SynchronousStreamController<T> { 360 implements SynchronousStreamController<T> {
355 _SyncBroadcastStreamController(void onListen(), void onCancel()) 361 _SyncBroadcastStreamController(void onListen(), void onCancel())
356 : super(onListen, onCancel); 362 : super(onListen, onCancel);
357 363
358 // EventDispatch interface. 364 // EventDispatch interface.
359 365
(...skipping 167 matching lines...) Expand 10 before | Expand all | Expand 10 after
527 _pauseCount++; 533 _pauseCount++;
528 } 534 }
529 void resume() { _resume(null); } 535 void resume() { _resume(null); }
530 void _resume(_) { 536 void _resume(_) {
531 if (_pauseCount > 0) _pauseCount--; 537 if (_pauseCount > 0) _pauseCount--;
532 } 538 }
533 Future cancel() { return new _Future.immediate(null); } 539 Future cancel() { return new _Future.immediate(null); }
534 bool get isPaused => _pauseCount > 0; 540 bool get isPaused => _pauseCount > 0;
535 Future asFuture([Object value]) => new _Future(); 541 Future asFuture([Object value]) => new _Future();
536 } 542 }
OLDNEW
« no previous file with comments | « no previous file | sdk/lib/async/stream_controller.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698