Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(6)

Unified Diff: tool/input_sdk/lib/async/broadcast_stream_controller.dart

Issue 1953153002: Update dart:async to match the Dart repo. (Closed) Base URL: https://github.com/dart-lang/dev_compiler.git@master
Patch Set: Remove unneeded calls. Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « tool/input_sdk/lib/async/async_error.dart ('k') | tool/input_sdk/lib/async/future.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: tool/input_sdk/lib/async/broadcast_stream_controller.dart
diff --git a/tool/input_sdk/lib/async/broadcast_stream_controller.dart b/tool/input_sdk/lib/async/broadcast_stream_controller.dart
index 401cf17c80ef978fa2e32c9e519b08de0d8c11f9..c34ec9f78b8c17d9afb4bf11a35f64d30f579405 100644
--- a/tool/input_sdk/lib/async/broadcast_stream_controller.dart
+++ b/tool/input_sdk/lib/async/broadcast_stream_controller.dart
@@ -5,30 +5,25 @@
part of dart.async;
class _BroadcastStream<T> extends _ControllerStream<T> {
- _BroadcastStream(_StreamControllerLifecycle controller) : super(controller);
+ _BroadcastStream(_StreamControllerLifecycle<T> controller)
+ : super(controller);
bool get isBroadcast => true;
}
-abstract class _BroadcastSubscriptionLink {
- _BroadcastSubscriptionLink _next;
- _BroadcastSubscriptionLink _previous;
-}
-
-class _BroadcastSubscription<T> extends _ControllerSubscription<T>
- implements _BroadcastSubscriptionLink {
+class _BroadcastSubscription<T> extends _ControllerSubscription<T> {
static const int _STATE_EVENT_ID = 1;
static const int _STATE_FIRING = 2;
static const int _STATE_REMOVE_AFTER_FIRING = 4;
// TODO(lrn): Use the _state field on _ControllerSubscription to
// also store this state. Requires that the subscription implementation
// does not assume that it's use of the state integer is the only use.
- int _eventState;
+ int _eventState = 0; // Initialized to help dart2js type inference.
- _BroadcastSubscriptionLink _next;
- _BroadcastSubscriptionLink _previous;
+ _BroadcastSubscription<T> _next;
+ _BroadcastSubscription<T> _previous;
- _BroadcastSubscription(_StreamControllerLifecycle controller,
+ _BroadcastSubscription(_StreamControllerLifecycle<T> controller,
void onData(T data),
Function onError,
void onDone(),
@@ -65,11 +60,9 @@ class _BroadcastSubscription<T> extends _ControllerSubscription<T>
// _onCancel is inherited.
}
-
abstract class _BroadcastStreamController<T>
implements StreamController<T>,
_StreamControllerLifecycle<T>,
- _BroadcastSubscriptionLink,
_EventSink<T>,
_EventDispatch<T> {
static const int _STATE_INITIAL = 0;
@@ -78,15 +71,15 @@ abstract class _BroadcastStreamController<T>
static const int _STATE_CLOSED = 4;
static const int _STATE_ADDSTREAM = 8;
- final _NotificationHandler _onListen;
- final _NotificationHandler _onCancel;
+ ControllerCallback onListen;
+ ControllerCancelCallback onCancel;
// State of the controller.
int _state;
// Double-linked list of active listeners.
- _BroadcastSubscriptionLink _next;
- _BroadcastSubscriptionLink _previous;
+ _BroadcastSubscription<T> _firstSubscription;
+ _BroadcastSubscription<T> _lastSubscription;
// Extra state used during an [addStream] call.
_AddStreamState<T> _addStreamState;
@@ -106,9 +99,27 @@ abstract class _BroadcastStreamController<T>
*/
_Future _doneFuture;
- _BroadcastStreamController(this._onListen, this._onCancel)
- : _state = _STATE_INITIAL {
- _next = _previous = this;
+ _BroadcastStreamController(this.onListen, this.onCancel)
+ : _state = _STATE_INITIAL;
+
+ ControllerCallback get onPause {
+ throw new UnsupportedError(
+ "Broadcast stream controllers do not support pause callbacks");
+ }
+
+ void set onPause(void onPauseHandler()) {
+ throw new UnsupportedError(
+ "Broadcast stream controllers do not support pause callbacks");
+ }
+
+ ControllerCallback get onResume {
+ throw new UnsupportedError(
+ "Broadcast stream controllers do not support pause callbacks");
+ }
+
+ void set onResume(void onResumeHandler()) {
+ throw new UnsupportedError(
+ "Broadcast stream controllers do not support pause callbacks");
}
// StreamController interface.
@@ -137,7 +148,7 @@ abstract class _BroadcastStreamController<T>
*/
bool get _hasOneListener {
assert(!_isEmpty);
- return identical(_next._next, this);
+ return identical(_firstSubscription, _lastSubscription);
}
/** Whether an event is being fired (sent to some, but not all, listeners). */
@@ -154,26 +165,42 @@ abstract class _BroadcastStreamController<T>
// Linked list helpers
- bool get _isEmpty => identical(_next, this);
+ bool get _isEmpty => _firstSubscription == null;
/** Adds subscription to linked list of active listeners. */
void _addListener(_BroadcastSubscription<T> subscription) {
assert(identical(subscription._next, subscription));
- // Insert in linked list just before `this`.
- subscription._previous = _previous;
- subscription._next = this;
- this._previous._next = subscription;
- this._previous = subscription;
subscription._eventState = (_state & _STATE_EVENT_ID);
+ // Insert in linked list as last subscription.
+ _BroadcastSubscription<T> oldLast = _lastSubscription;
+ _lastSubscription = subscription;
+ subscription._next = null;
+ subscription._previous = oldLast;
+ if (oldLast == null) {
+ _firstSubscription = subscription;
+ } else {
+ oldLast._next = subscription;
+ }
}
void _removeListener(_BroadcastSubscription<T> subscription) {
assert(identical(subscription._controller, this));
assert(!identical(subscription._next, subscription));
- _BroadcastSubscriptionLink previous = subscription._previous;
- _BroadcastSubscriptionLink next = subscription._next;
- previous._next = next;
- next._previous = previous;
+ _BroadcastSubscription<T> previous = subscription._previous;
+ _BroadcastSubscription<T> next = subscription._next;
+ if (previous == null) {
+ // This was the first subscription.
+ _firstSubscription = next;
+ } else {
+ previous._next = next;
+ }
+ if (next == null) {
+ // This was the last subscription.
+ _lastSubscription = previous;
+ } else {
+ next._previous = previous;
+ }
+
subscription._next = subscription._previous = subscription;
}
@@ -188,26 +215,24 @@ abstract class _BroadcastStreamController<T>
if (onDone == null) onDone = _nullDoneHandler;
return new _DoneStreamSubscription<T>(onDone);
}
- StreamSubscription subscription =
+ StreamSubscription<T> subscription =
new _BroadcastSubscription<T>(this, onData, onError, onDone,
cancelOnError);
_addListener(subscription);
- if (identical(_next, _previous)) {
+ if (identical(_firstSubscription, _lastSubscription)) {
// Only one listener, so it must be the first listener.
- _runGuarded(_onListen);
+ _runGuarded(onListen);
}
return subscription;
}
Future _recordCancel(StreamSubscription<T> sub) {
- var subscription = sub as _BroadcastSubscription<T>;
+ _BroadcastSubscription<T> subscription = sub;
// If already removed by the stream, don't remove it again.
if (identical(subscription._next, subscription)) return null;
- assert(!identical(subscription._next, subscription));
if (subscription._isFiring) {
subscription._setRemoveAfterFiring();
} else {
- assert(!identical(subscription._next, subscription));
_removeListener(subscription);
// If we are currently firing an event, the empty-check is performed at
// the end of the listener loop instead of here.
@@ -296,27 +321,27 @@ abstract class _BroadcastStreamController<T>
// Get event id of this event.
int id = (_state & _STATE_EVENT_ID);
- // Start firing (set the _STATE_FIRING bit). We don't do [_onCancel]
+ // Start firing (set the _STATE_FIRING bit). We don't do [onCancel]
// callbacks while firing, and we prevent reentrancy of this function.
//
// Set [_state]'s event id to the next event's id.
// Any listeners added while firing this event will expect the next event,
// not this one, and won't get notified.
_state ^= _STATE_EVENT_ID | _STATE_FIRING;
- _BroadcastSubscriptionLink link = _next;
- while (!identical(link, this)) {
- _BroadcastSubscription<T> subscription = link;
+ _BroadcastSubscription<T> subscription = _firstSubscription;
+ while (subscription != null) {
if (subscription._expectsEvent(id)) {
subscription._eventState |= _BroadcastSubscription._STATE_FIRING;
action(subscription);
subscription._toggleEventId();
- link = subscription._next;
+ _BroadcastSubscription<T> next = subscription._next;
if (subscription._removeAfterFiring) {
_removeListener(subscription);
}
subscription._eventState &= ~_BroadcastSubscription._STATE_FIRING;
+ subscription = next;
} else {
- link = subscription._next;
+ subscription = subscription._next;
}
}
_state &= ~_STATE_FIRING;
@@ -332,21 +357,32 @@ abstract class _BroadcastStreamController<T>
// When closed, _doneFuture is not null.
_doneFuture._asyncComplete(null);
}
- _runGuarded(_onCancel);
+ _runGuarded(onCancel);
}
}
-class _SyncBroadcastStreamController<T> extends _BroadcastStreamController<T> {
+class _SyncBroadcastStreamController<T> extends _BroadcastStreamController<T>
+ implements SynchronousStreamController<T> {
_SyncBroadcastStreamController(void onListen(), void onCancel())
: super(onListen, onCancel);
// EventDispatch interface.
+ bool get _mayAddEvent => super._mayAddEvent && !_isFiring;
+
+ _addEventError() {
+ if (_isFiring) {
+ return new StateError(
+ "Cannot fire new event. Controller is already firing an event");
+ }
+ return super._addEventError();
+ }
+
void _sendData(T data) {
if (_isEmpty) return;
if (_hasOneListener) {
_state |= _BroadcastStreamController._STATE_FIRING;
- _BroadcastSubscription subscription = _next;
+ _BroadcastSubscription<T> subscription = _firstSubscription;
subscription._add(data);
_state &= ~_BroadcastStreamController._STATE_FIRING;
if (_isEmpty) {
@@ -368,7 +404,7 @@ class _SyncBroadcastStreamController<T> extends _BroadcastStreamController<T> {
void _sendDone() {
if (!_isEmpty) {
- _forEachListener((_BroadcastSubscription<T> subscription) {
+ _forEachListener((_BufferingStreamSubscription<T> subscription) {
subscription._close();
});
} else {
@@ -386,29 +422,26 @@ class _AsyncBroadcastStreamController<T> extends _BroadcastStreamController<T> {
// EventDispatch interface.
void _sendData(T data) {
- for (_BroadcastSubscriptionLink link = _next;
- !identical(link, this);
- link = link._next) {
- _BroadcastSubscription<T> subscription = link;
- subscription._addPending(new _DelayedData(data));
+ for (_BroadcastSubscription<T> subscription = _firstSubscription;
+ subscription != null;
+ subscription = subscription._next) {
+ subscription._addPending(new _DelayedData<T>(data));
}
}
void _sendError(Object error, StackTrace stackTrace) {
- for (_BroadcastSubscriptionLink link = _next;
- !identical(link, this);
- link = link._next) {
- _BroadcastSubscription<T> subscription = link;
+ for (_BroadcastSubscription<T> subscription = _firstSubscription;
+ subscription != null;
+ subscription = subscription._next) {
subscription._addPending(new _DelayedError(error, stackTrace));
}
}
void _sendDone() {
if (!_isEmpty) {
- for (_BroadcastSubscriptionLink link = _next;
- !identical(link, this);
- link = link._next) {
- _BroadcastSubscription<T> subscription = link;
+ for (_BroadcastSubscription<T> subscription = _firstSubscription;
+ subscription != null;
+ subscription = subscription._next) {
subscription._addPending(const _DelayedDone());
}
} else {
@@ -433,7 +466,7 @@ class _AsyncBroadcastStreamController<T> extends _BroadcastStreamController<T> {
class _AsBroadcastStreamController<T>
extends _SyncBroadcastStreamController<T>
implements _EventDispatch<T> {
- _StreamImplEvents _pending;
+ _StreamImplEvents<T> _pending;
_AsBroadcastStreamController(void onListen(), void onCancel())
: super(onListen, onCancel);
@@ -442,7 +475,7 @@ class _AsBroadcastStreamController<T>
void _addPendingEvent(_DelayedEvent event) {
if (_pending == null) {
- _pending = new _StreamImplEvents();
+ _pending = new _StreamImplEvents<T>();
}
_pending.add(event);
}
@@ -507,5 +540,5 @@ class _DoneSubscription<T> implements StreamSubscription<T> {
}
Future cancel() { return new _Future.immediate(null); }
bool get isPaused => _pauseCount > 0;
- Future asFuture([Object value]) => new _Future();
+ Future/*<E>*/ asFuture/*<E>*/([Object/*=E*/ value]) => new _Future/*<E>*/();
}
« no previous file with comments | « tool/input_sdk/lib/async/async_error.dart ('k') | tool/input_sdk/lib/async/future.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698