Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(227)

Unified Diff: test/codegen/expect/dart/async.js

Issue 1016003003: sort classes in dependency order, or load lazily if needed, fixes #78 (Closed) Base URL: git@github.com:dart-lang/dev_compiler.git@master
Patch Set: Created 5 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: test/codegen/expect/dart/async.js
diff --git a/test/codegen/expect/dart/async.js b/test/codegen/expect/dart/async.js
index 64113b19d2b79119295dbae9389742bac628eb7d..1505f152ad355aff5554190e932ddbe801e2da94 100644
--- a/test/codegen/expect/dart/async.js
+++ b/test/codegen/expect/dart/async.js
@@ -18,6 +18,15 @@ var async;
}
}
let _getBestStackTrace = Symbol('_getBestStackTrace');
+ class AsyncError extends core.Object {
+ AsyncError(error, stackTrace) {
+ this.error = error;
+ this.stackTrace = stackTrace;
+ }
+ toString() {
+ return dart.as(dart.dinvoke(this.error, 'toString'), core.String);
+ }
+ }
class _UncaughtAsyncError extends AsyncError {
_UncaughtAsyncError(error, stackTrace) {
super.AsyncError(error, _getBestStackTrace(error, stackTrace));
@@ -38,2970 +47,2970 @@ var async;
return result;
}
}
- let _BroadcastStream$ = dart.generic(function(T) {
- class _BroadcastStream extends _ControllerStream$(T) {
- _BroadcastStream(controller) {
- super._ControllerStream(dart.as(controller, _StreamControllerLifecycle$(T)));
- }
- get isBroadcast() {
- return true;
- }
- }
- return _BroadcastStream;
- });
- let _BroadcastStream = _BroadcastStream$(dart.dynamic);
- let _next = Symbol('_next');
- let _previous = Symbol('_previous');
- class _BroadcastSubscriptionLink extends core.Object {
- _BroadcastSubscriptionLink() {
- this[_next] = null;
- this[_previous] = null;
- }
- }
- let _eventState = Symbol('_eventState');
let _controller = Symbol('_controller');
- let _expectsEvent = Symbol('_expectsEvent');
- let _toggleEventId = Symbol('_toggleEventId');
- let _isFiring = Symbol('_isFiring');
- let _setRemoveAfterFiring = Symbol('_setRemoveAfterFiring');
- let _removeAfterFiring = Symbol('_removeAfterFiring');
- let _onPause = Symbol('_onPause');
- let _onResume = Symbol('_onResume');
- let _BroadcastSubscription$ = dart.generic(function(T) {
- class _BroadcastSubscription extends _ControllerSubscription$(T) {
- _BroadcastSubscription(controller, onData, onError, onDone, cancelOnError) {
- this[_eventState] = null;
- this[_next] = null;
- this[_previous] = null;
- super._ControllerSubscription(dart.as(controller, _StreamControllerLifecycle$(T)), onData, onError, onDone, cancelOnError);
- this[_next] = this[_previous] = this;
- }
- get [_controller]() {
- return dart.as(super[_controller], _BroadcastStreamController);
- }
- [_expectsEvent](eventId) {
- return (dart.notNull(this[_eventState]) & dart.notNull(_BroadcastSubscription._STATE_EVENT_ID)) === eventId;
- }
- [_toggleEventId]() {
- this[_eventState] = _BroadcastSubscription._STATE_EVENT_ID;
- }
- get [_isFiring]() {
- return (dart.notNull(this[_eventState]) & dart.notNull(_BroadcastSubscription._STATE_FIRING)) !== 0;
- }
- [_setRemoveAfterFiring]() {
- dart.assert(this[_isFiring]);
- this[_eventState] = _BroadcastSubscription._STATE_REMOVE_AFTER_FIRING;
- }
- get [_removeAfterFiring]() {
- return (dart.notNull(this[_eventState]) & dart.notNull(_BroadcastSubscription._STATE_REMOVE_AFTER_FIRING)) !== 0;
- }
- [_onPause]() {}
- [_onResume]() {}
- }
- _BroadcastSubscription._STATE_EVENT_ID = 1;
- _BroadcastSubscription._STATE_FIRING = 2;
- _BroadcastSubscription._STATE_REMOVE_AFTER_FIRING = 4;
- return _BroadcastSubscription;
- });
- let _BroadcastSubscription = _BroadcastSubscription$(dart.dynamic);
+ let _createSubscription = Symbol('_createSubscription');
let _onListen = Symbol('_onListen');
- let _onCancel = Symbol('_onCancel');
- let _state = Symbol('_state');
- let _addStreamState = Symbol('_addStreamState');
- let _doneFuture = Symbol('_doneFuture');
- let _isEmpty = Symbol('_isEmpty');
- let _hasOneListener = Symbol('_hasOneListener');
- let _isAddingStream = Symbol('_isAddingStream');
- let _mayAddEvent = Symbol('_mayAddEvent');
- let _ensureDoneFuture = Symbol('_ensureDoneFuture');
- let _addListener = Symbol('_addListener');
- let _removeListener = Symbol('_removeListener');
- let _subscribe = Symbol('_subscribe');
- let _recordCancel = Symbol('_recordCancel');
- let _callOnCancel = Symbol('_callOnCancel');
- let _recordPause = Symbol('_recordPause');
- let _recordResume = Symbol('_recordResume');
- let _addEventError = Symbol('_addEventError');
- let _sendData = Symbol('_sendData');
- let _sendError = Symbol('_sendError');
- let _sendDone = Symbol('_sendDone');
- let _add = Symbol('_add');
let _addError = Symbol('_addError');
- let _close = Symbol('_close');
- let _forEachListener = Symbol('_forEachListener');
- let _STATE_FIRING = Symbol('_STATE_FIRING');
- let _mayComplete = Symbol('_mayComplete');
- let _BroadcastStreamController$ = dart.generic(function(T) {
- class _BroadcastStreamController extends core.Object {
- _BroadcastStreamController($_onListen, $_onCancel) {
- this[_onListen] = $_onListen;
- this[_onCancel] = $_onCancel;
- this[_state] = _BroadcastStreamController._STATE_INITIAL;
- this[_next] = null;
- this[_previous] = null;
- this[_addStreamState] = null;
- this[_doneFuture] = null;
- this[_next] = this[_previous] = this;
- }
- get stream() {
- return new _BroadcastStream(this);
- }
- get sink() {
- return new _StreamSinkWrapper(this);
+ let _completeError = Symbol('_completeError');
+ let _complete = Symbol('_complete');
+ let _sink = Symbol('_sink');
+ let Stream$ = dart.generic(function(T) {
+ class Stream extends core.Object {
+ Stream() {
}
- get isClosed() {
- return (dart.notNull(this[_state]) & dart.notNull(_BroadcastStreamController._STATE_CLOSED)) !== 0;
+ Stream$fromFuture(future) {
+ let controller = dart.as(new StreamController({sync: true}), _StreamController$(T));
+ future.then(((value) => {
+ controller._add(dart.as(value, T));
+ controller._closeUnchecked();
+ }).bind(this), {onError: ((error, stackTrace) => {
+ controller._addError(error, dart.as(stackTrace, core.StackTrace));
+ controller._closeUnchecked();
+ }).bind(this)});
+ return controller.stream;
}
- get isPaused() {
- return false;
+ Stream$fromIterable(data) {
+ return new _GeneratedStreamImpl(() => new _IterablePendingEvents(data));
}
- get hasListener() {
- return !dart.notNull(this[_isEmpty]);
+ Stream$periodic(period, computation) {
+ if (computation === void 0)
+ computation = null;
+ if (computation === null)
+ computation = (i) => null;
+ let timer = null;
+ let computationCount = 0;
+ let controller = null;
+ let watch = new core.Stopwatch();
+ // Function sendEvent: () → void
+ function sendEvent() {
+ watch.reset();
+ let data = computation((($tmp) => computationCount = dart.notNull($tmp) + 1, $tmp)(computationCount));
+ controller.add(data);
+ }
+ // Function startPeriodicTimer: () → void
+ function startPeriodicTimer() {
+ dart.assert(timer === null);
+ timer = new Timer.periodic(period, (timer) => {
+ sendEvent();
+ });
+ }
+ controller = new StreamController({sync: true, onListen: (() => {
+ watch.start();
+ startPeriodicTimer();
+ }).bind(this), onPause: (() => {
+ timer.cancel();
+ timer = null;
+ watch.stop();
+ }).bind(this), onResume: (() => {
+ dart.assert(timer === null);
+ let elapsed = watch.elapsed;
+ watch.start();
+ timer = new Timer(period['-'](elapsed), () => {
+ timer = null;
+ startPeriodicTimer();
+ sendEvent();
+ });
+ }).bind(this), onCancel: (() => {
+ if (timer !== null)
+ timer.cancel();
+ timer = null;
+ }).bind(this)});
+ return controller.stream;
}
- get [_hasOneListener]() {
- dart.assert(!dart.notNull(this[_isEmpty]));
- return core.identical(this[_next][_next], this);
+ Stream$eventTransformed(source, mapSink) {
+ return dart.as(new _BoundSinkStream(source, dart.as(mapSink, _SinkMapper)), Stream$(T));
}
- get [_isFiring]() {
- return (dart.notNull(this[_state]) & dart.notNull(_BroadcastStreamController._STATE_FIRING)) !== 0;
+ get isBroadcast() {
+ return false;
}
- get [_isAddingStream]() {
- return (dart.notNull(this[_state]) & dart.notNull(_BroadcastStreamController._STATE_ADDSTREAM)) !== 0;
+ asBroadcastStream(opt$) {
+ let onListen = opt$.onListen === void 0 ? null : opt$.onListen;
+ let onCancel = opt$.onCancel === void 0 ? null : opt$.onCancel;
+ return new _AsBroadcastStream(this, dart.as(onListen, dart.throw_("Unimplemented type (StreamSubscription<dynamic>) → void")), dart.as(onCancel, dart.throw_("Unimplemented type (StreamSubscription<dynamic>) → void")));
}
- get [_mayAddEvent]() {
- return dart.notNull(this[_state]) < dart.notNull(_BroadcastStreamController._STATE_CLOSED);
+ where(test) {
+ return new _WhereStream(this, test);
}
- [_ensureDoneFuture]() {
- if (this[_doneFuture] !== null)
- return this[_doneFuture];
- return this[_doneFuture] = new _Future();
+ map(convert) {
+ return new _MapStream(this, convert);
}
- get [_isEmpty]() {
- return core.identical(this[_next], this);
+ asyncMap(convert) {
+ let controller = null;
+ let subscription = null;
+ // Function onListen: () → void
+ function onListen() {
+ let add = controller.add;
+ dart.assert(dart.notNull(dart.is(controller, _StreamController)) || dart.notNull(dart.is(controller, _BroadcastStreamController)));
+ let eventSink = controller;
+ let addError = eventSink[_addError];
+ subscription = this.listen(((event) => {
+ let newValue = null;
+ try {
+ newValue = convert(event);
+ } catch (e) {
+ let s = dart.stackTrace(e);
+ controller.addError(e, s);
+ return;
+ }
+
+ if (dart.is(newValue, Future)) {
+ subscription.pause();
+ dart.dinvoke(dart.dinvoke(newValue, 'then', add, {onError: addError}), 'whenComplete', subscription.resume);
+ } else {
+ controller.add(newValue);
+ }
+ }).bind(this), {onError: dart.as(addError, core.Function), onDone: controller.close});
+ }
+ if (this.isBroadcast) {
+ controller = new StreamController.broadcast({onListen: onListen, onCancel: (() => {
+ subscription.cancel();
+ }).bind(this), sync: true});
+ } else {
+ controller = new StreamController({onListen: onListen, onPause: (() => {
+ subscription.pause();
+ }).bind(this), onResume: (() => {
+ subscription.resume();
+ }).bind(this), onCancel: (() => {
+ subscription.cancel();
+ }).bind(this), sync: true});
+ }
+ return controller.stream;
}
- [_addListener](subscription) {
- dart.assert(core.identical(subscription[_next], subscription));
- subscription[_previous] = this[_previous];
- subscription[_next] = this;
- this[_previous][_next] = subscription;
- this[_previous] = subscription;
- subscription[_eventState] = dart.notNull(this[_state]) & dart.notNull(_BroadcastStreamController._STATE_EVENT_ID);
- }
- [_removeListener](subscription) {
- dart.assert(core.identical(subscription[_controller], this));
- dart.assert(!dart.notNull(core.identical(subscription[_next], subscription)));
- let previous = subscription[_previous];
- let next = subscription[_next];
- previous[_next] = next;
- next[_previous] = previous;
- subscription[_next] = subscription[_previous] = subscription;
- }
- [_subscribe](onData, onError, onDone, cancelOnError) {
- if (this.isClosed) {
- if (onDone === null)
- onDone = _nullDoneHandler;
- return new _DoneStreamSubscription(onDone);
- }
- let subscription = new _BroadcastSubscription(this, onData, onError, onDone, cancelOnError);
- this[_addListener](dart.as(subscription, _BroadcastSubscription$(T)));
- if (core.identical(this[_next], this[_previous])) {
- _runGuarded(this[_onListen]);
+ asyncExpand(convert) {
+ let controller = null;
+ let subscription = null;
+ // Function onListen: () → void
+ function onListen() {
+ dart.assert(dart.notNull(dart.is(controller, _StreamController)) || dart.notNull(dart.is(controller, _BroadcastStreamController)));
+ let eventSink = controller;
+ subscription = this.listen(((event) => {
+ let newStream = null;
+ try {
+ newStream = convert(event);
+ } catch (e) {
+ let s = dart.stackTrace(e);
+ controller.addError(e, s);
+ return;
+ }
+
+ if (newStream !== null) {
+ subscription.pause();
+ controller.addStream(newStream).whenComplete(subscription.resume);
+ }
+ }).bind(this), {onError: dart.as(eventSink[_addError], core.Function), onDone: controller.close});
}
- return dart.as(subscription, StreamSubscription$(T));
- }
- [_recordCancel](subscription) {
- if (core.identical(subscription[_next], subscription))
- return null;
- dart.assert(!dart.notNull(core.identical(subscription[_next], subscription)));
- if (subscription[_isFiring]) {
- subscription._setRemoveAfterFiring();
+ if (this.isBroadcast) {
+ controller = new StreamController.broadcast({onListen: onListen, onCancel: (() => {
+ subscription.cancel();
+ }).bind(this), sync: true});
} else {
- dart.assert(!dart.notNull(core.identical(subscription[_next], subscription)));
- this[_removeListener](subscription);
- if (!dart.notNull(this[_isFiring]) && dart.notNull(this[_isEmpty])) {
- this[_callOnCancel]();
- }
+ controller = new StreamController({onListen: onListen, onPause: (() => {
+ subscription.pause();
+ }).bind(this), onResume: (() => {
+ subscription.resume();
+ }).bind(this), onCancel: (() => {
+ subscription.cancel();
+ }).bind(this), sync: true});
}
- return null;
+ return controller.stream;
}
- [_recordPause](subscription) {}
- [_recordResume](subscription) {}
- [_addEventError]() {
- if (this.isClosed) {
- return new core.StateError("Cannot add new events after calling close");
- }
- dart.assert(this[_isAddingStream]);
- return new core.StateError("Cannot add new events while doing an addStream");
+ handleError(onError, opt$) {
+ let test = opt$.test === void 0 ? null : opt$.test;
+ return new _HandleErrorStream(this, onError, test);
}
- add(data) {
- if (!dart.notNull(this[_mayAddEvent]))
- throw this[_addEventError]();
- this[_sendData](data);
+ expand(convert) {
+ return new _ExpandStream(this, convert);
}
- addError(error, stackTrace) {
- if (stackTrace === void 0)
- stackTrace = null;
- error = _nonNullError(error);
- if (!dart.notNull(this[_mayAddEvent]))
- throw this[_addEventError]();
- let replacement = Zone.current.errorCallback(error, stackTrace);
- if (replacement !== null) {
- error = _nonNullError(replacement.error);
- stackTrace = replacement.stackTrace;
- }
- this[_sendError](error, stackTrace);
+ pipe(streamConsumer) {
+ return streamConsumer.addStream(this).then(((_) => streamConsumer.close()).bind(this));
}
- close() {
- if (this.isClosed) {
- dart.assert(this[_doneFuture] !== null);
- return this[_doneFuture];
- }
- if (!dart.notNull(this[_mayAddEvent]))
- throw this[_addEventError]();
- this[_state] = _BroadcastStreamController._STATE_CLOSED;
- let doneFuture = this[_ensureDoneFuture]();
- this[_sendDone]();
- return doneFuture;
+ transform(streamTransformer) {
+ return streamTransformer.bind(this);
}
- get done() {
- return this[_ensureDoneFuture]();
+ reduce(combine) {
+ let result = new _Future();
+ let seenFirst = false;
+ let value = null;
+ let subscription = null;
+ subscription = this.listen((element) => {
+ if (seenFirst) {
+ _runUserCode(() => combine(value, element), dart.as((newValue) => {
+ value = newValue;
+ }, dart.throw_("Unimplemented type (dynamic) → dynamic")), dart.as(_cancelAndErrorClosure(subscription, result), dart.throw_("Unimplemented type (dynamic, StackTrace) → dynamic")));
+ } else {
+ value = element;
+ seenFirst = true;
+ }
+ }, {onError: result[_completeError], onDone: (() => {
+ if (!dart.notNull(seenFirst)) {
+ try {
+ throw _internal.IterableElementError.noElement();
+ } catch (e) {
+ let s = dart.stackTrace(e);
+ _completeWithErrorCallback(result, e, s);
+ }
+
+ } else {
+ result._complete(value);
+ }
+ }).bind(this), cancelOnError: true});
+ return result;
}
- addStream(stream, opt$) {
- let cancelOnError = opt$.cancelOnError === void 0 ? true : opt$.cancelOnError;
- if (!dart.notNull(this[_mayAddEvent]))
- throw this[_addEventError]();
- this[_state] = _BroadcastStreamController._STATE_ADDSTREAM;
- this[_addStreamState] = dart.as(new _AddStreamState(this, stream, cancelOnError), _AddStreamState$(T));
- return this[_addStreamState].addStreamFuture;
+ fold(initialValue, combine) {
+ let result = new _Future();
+ let value = initialValue;
+ let subscription = null;
+ subscription = this.listen((element) => {
+ _runUserCode(() => combine(value, element), (newValue) => {
+ value = newValue;
+ }, dart.as(_cancelAndErrorClosure(subscription, result), dart.throw_("Unimplemented type (dynamic, StackTrace) → dynamic")));
+ }, {onError: ((e, st) => {
+ result._completeError(e, dart.as(st, core.StackTrace));
+ }).bind(this), onDone: (() => {
+ result._complete(value);
+ }).bind(this), cancelOnError: true});
+ return result;
}
- [_add](data) {
- this[_sendData](data);
+ join(separator) {
+ if (separator === void 0)
+ separator = "";
+ let result = new _Future();
+ let buffer = new core.StringBuffer();
+ let subscription = null;
+ let first = true;
+ subscription = this.listen(((element) => {
+ if (!dart.notNull(first)) {
+ buffer.write(separator);
+ }
+ first = false;
+ try {
+ buffer.write(element);
+ } catch (e) {
+ let s = dart.stackTrace(e);
+ _cancelAndErrorWithReplacement(subscription, result, e, s);
+ }
+
+ }).bind(this), {onError: ((e) => {
+ result._completeError(e);
+ }).bind(this), onDone: (() => {
+ result._complete(buffer.toString());
+ }).bind(this), cancelOnError: true});
+ return result;
}
- [_addError](error, stackTrace) {
- this[_sendError](error, stackTrace);
+ contains(needle) {
+ let future = new _Future();
+ let subscription = null;
+ subscription = this.listen((element) => {
+ _runUserCode(() => dart.equals(element, needle), dart.as((isMatch) => {
+ if (isMatch) {
+ _cancelAndValue(subscription, future, true);
+ }
+ }, dart.throw_("Unimplemented type (dynamic) → dynamic")), dart.as(_cancelAndErrorClosure(subscription, future), dart.throw_("Unimplemented type (dynamic, StackTrace) → dynamic")));
+ }, {onError: future[_completeError], onDone: (() => {
+ future._complete(false);
+ }).bind(this), cancelOnError: true});
+ return future;
}
- [_close]() {
- dart.assert(this[_isAddingStream]);
- let addState = this[_addStreamState];
- this[_addStreamState] = null;
- this[_state] = ~dart.notNull(_BroadcastStreamController._STATE_ADDSTREAM);
- addState.complete();
+ forEach(action) {
+ let future = new _Future();
+ let subscription = null;
+ subscription = this.listen((element) => {
+ _runUserCode(() => action(element), (_) => {
+ }, dart.as(_cancelAndErrorClosure(subscription, future), dart.throw_("Unimplemented type (dynamic, StackTrace) → dynamic")));
+ }, {onError: future[_completeError], onDone: (() => {
+ future._complete(null);
+ }).bind(this), cancelOnError: true});
+ return future;
}
- [_forEachListener](action) {
- if (this[_isFiring]) {
- throw new core.StateError("Cannot fire new event. Controller is already firing an event");
- }
- if (this[_isEmpty])
- return;
- let id = dart.notNull(this[_state]) & dart.notNull(_BroadcastStreamController._STATE_EVENT_ID);
- this[_state] = dart.notNull(_BroadcastStreamController._STATE_EVENT_ID) | dart.notNull(_BroadcastStreamController._STATE_FIRING);
- let link = this[_next];
- while (!dart.notNull(core.identical(link, this))) {
- let subscription = dart.as(link, _BroadcastSubscription$(T));
- if (subscription._expectsEvent(id)) {
- subscription[_eventState] = _BroadcastSubscription[_STATE_FIRING];
- action(subscription);
- subscription._toggleEventId();
- link = subscription[_next];
- if (subscription[_removeAfterFiring]) {
- this[_removeListener](subscription);
+ every(test) {
+ let future = new _Future();
+ let subscription = null;
+ subscription = this.listen((element) => {
+ _runUserCode(() => test(element), dart.as((isMatch) => {
+ if (!dart.notNull(isMatch)) {
+ _cancelAndValue(subscription, future, false);
}
- subscription[_eventState] = ~dart.notNull(_BroadcastSubscription[_STATE_FIRING]);
- } else {
- link = subscription[_next];
- }
- }
- this[_state] = ~dart.notNull(_BroadcastStreamController._STATE_FIRING);
- if (this[_isEmpty]) {
- this[_callOnCancel]();
- }
- }
- [_callOnCancel]() {
- dart.assert(this[_isEmpty]);
- if (dart.notNull(this.isClosed) && dart.notNull(this[_doneFuture][_mayComplete])) {
- this[_doneFuture]._asyncComplete(null);
- }
- _runGuarded(this[_onCancel]);
- }
- }
- _BroadcastStreamController._STATE_INITIAL = 0;
- _BroadcastStreamController._STATE_EVENT_ID = 1;
- _BroadcastStreamController._STATE_FIRING = 2;
- _BroadcastStreamController._STATE_CLOSED = 4;
- _BroadcastStreamController._STATE_ADDSTREAM = 8;
- return _BroadcastStreamController;
- });
- let _BroadcastStreamController = _BroadcastStreamController$(dart.dynamic);
- let _SyncBroadcastStreamController$ = dart.generic(function(T) {
- class _SyncBroadcastStreamController extends _BroadcastStreamController$(T) {
- _SyncBroadcastStreamController(onListen, onCancel) {
- super._BroadcastStreamController(onListen, onCancel);
- }
- [_sendData](data) {
- if (this[_isEmpty])
- return;
- if (this[_hasOneListener]) {
- this[_state] = _BroadcastStreamController[_STATE_FIRING];
- let subscription = dart.as(this[_next], _BroadcastSubscription);
- subscription._add(data);
- this[_state] = ~dart.notNull(_BroadcastStreamController[_STATE_FIRING]);
- if (this[_isEmpty]) {
- this[_callOnCancel]();
- }
- return;
- }
- this[_forEachListener](((subscription) => {
- subscription._add(data);
- }).bind(this));
+ }, dart.throw_("Unimplemented type (dynamic) → dynamic")), dart.as(_cancelAndErrorClosure(subscription, future), dart.throw_("Unimplemented type (dynamic, StackTrace) → dynamic")));
+ }, {onError: future[_completeError], onDone: (() => {
+ future._complete(true);
+ }).bind(this), cancelOnError: true});
+ return future;
}
- [_sendError](error, stackTrace) {
- if (this[_isEmpty])
- return;
- this[_forEachListener](((subscription) => {
- subscription._addError(error, stackTrace);
- }).bind(this));
+ any(test) {
+ let future = new _Future();
+ let subscription = null;
+ subscription = this.listen((element) => {
+ _runUserCode(() => test(element), dart.as((isMatch) => {
+ if (isMatch) {
+ _cancelAndValue(subscription, future, true);
+ }
+ }, dart.throw_("Unimplemented type (dynamic) → dynamic")), dart.as(_cancelAndErrorClosure(subscription, future), dart.throw_("Unimplemented type (dynamic, StackTrace) → dynamic")));
+ }, {onError: future[_completeError], onDone: (() => {
+ future._complete(false);
+ }).bind(this), cancelOnError: true});
+ return future;
}
- [_sendDone]() {
- if (!dart.notNull(this[_isEmpty])) {
- this[_forEachListener](dart.as(((subscription) => {
- subscription._close();
- }).bind(this), dart.throw_("Unimplemented type (_BufferingStreamSubscription<T>) → void")));
- } else {
- dart.assert(this[_doneFuture] !== null);
- dart.assert(this[_doneFuture][_mayComplete]);
- this[_doneFuture]._asyncComplete(null);
- }
+ get length() {
+ let future = new _Future();
+ let count = 0;
+ this.listen((_) => {
+ count = dart.notNull(count) + 1;
+ }, {onError: future[_completeError], onDone: (() => {
+ future._complete(count);
+ }).bind(this), cancelOnError: true});
+ return future;
}
- }
- return _SyncBroadcastStreamController;
- });
- let _SyncBroadcastStreamController = _SyncBroadcastStreamController$(dart.dynamic);
- let _AsyncBroadcastStreamController$ = dart.generic(function(T) {
- class _AsyncBroadcastStreamController extends _BroadcastStreamController$(T) {
- _AsyncBroadcastStreamController(onListen, onCancel) {
- super._BroadcastStreamController(onListen, onCancel);
+ get isEmpty() {
+ let future = new _Future();
+ let subscription = null;
+ subscription = this.listen((_) => {
+ _cancelAndValue(subscription, future, false);
+ }, {onError: future[_completeError], onDone: (() => {
+ future._complete(true);
+ }).bind(this), cancelOnError: true});
+ return future;
}
- [_sendData](data) {
- for (let link = this[_next]; !dart.notNull(core.identical(link, this)); link = link[_next]) {
- let subscription = dart.as(link, _BroadcastSubscription$(T));
- subscription._addPending(new _DelayedData(data));
- }
+ toList() {
+ let result = new List.from([]);
+ let future = new _Future();
+ this.listen(((data) => {
+ result.add(data);
+ }).bind(this), {onError: future[_completeError], onDone: (() => {
+ future._complete(result);
+ }).bind(this), cancelOnError: true});
+ return future;
}
- [_sendError](error, stackTrace) {
- for (let link = this[_next]; !dart.notNull(core.identical(link, this)); link = link[_next]) {
- let subscription = dart.as(link, _BroadcastSubscription$(T));
- subscription._addPending(new _DelayedError(error, stackTrace));
- }
+ toSet() {
+ let result = new core.Set();
+ let future = new _Future();
+ this.listen(((data) => {
+ result.add(data);
+ }).bind(this), {onError: future[_completeError], onDone: (() => {
+ future._complete(result);
+ }).bind(this), cancelOnError: true});
+ return future;
}
- [_sendDone]() {
- if (!dart.notNull(this[_isEmpty])) {
- for (let link = this[_next]; !dart.notNull(core.identical(link, this)); link = link[_next]) {
- let subscription = dart.as(link, _BroadcastSubscription$(T));
- subscription._addPending(new _DelayedDone());
- }
- } else {
- dart.assert(this[_doneFuture] !== null);
- dart.assert(this[_doneFuture][_mayComplete]);
- this[_doneFuture]._asyncComplete(null);
- }
+ drain(futureValue) {
+ if (futureValue === void 0)
+ futureValue = null;
+ return this.listen(null, {cancelOnError: true}).asFuture(futureValue);
}
- }
- return _AsyncBroadcastStreamController;
- });
- let _AsyncBroadcastStreamController = _AsyncBroadcastStreamController$(dart.dynamic);
- let _pending = Symbol('_pending');
- let _hasPending = Symbol('_hasPending');
- let _addPendingEvent = Symbol('_addPendingEvent');
- let _STATE_CLOSED = Symbol('_STATE_CLOSED');
- let _AsBroadcastStreamController$ = dart.generic(function(T) {
- class _AsBroadcastStreamController extends _SyncBroadcastStreamController$(T) {
- _AsBroadcastStreamController(onListen, onCancel) {
- this[_pending] = null;
- super._SyncBroadcastStreamController(onListen, onCancel);
+ take(count) {
+ return dart.as(new _TakeStream(this, count), Stream$(T));
}
- get [_hasPending]() {
- return dart.notNull(this[_pending] !== null) && !dart.notNull(this[_pending].isEmpty);
+ takeWhile(test) {
+ return dart.as(new _TakeWhileStream(this, dart.as(test, dart.throw_("Unimplemented type (dynamic) → bool"))), Stream$(T));
}
- [_addPendingEvent](event) {
- if (this[_pending] === null) {
- this[_pending] = new _StreamImplEvents();
- }
- this[_pending].add(event);
+ skip(count) {
+ return dart.as(new _SkipStream(this, count), Stream$(T));
}
- add(data) {
- if (!dart.notNull(this.isClosed) && dart.notNull(this[_isFiring])) {
- this[_addPendingEvent](new _DelayedData(data));
- return;
- }
- super.add(data);
- while (this[_hasPending]) {
- this[_pending].handleNext(this);
- }
+ skipWhile(test) {
+ return dart.as(new _SkipWhileStream(this, dart.as(test, dart.throw_("Unimplemented type (dynamic) → bool"))), Stream$(T));
}
- addError(error, stackTrace) {
- if (stackTrace === void 0)
- stackTrace = null;
- if (!dart.notNull(this.isClosed) && dart.notNull(this[_isFiring])) {
- this[_addPendingEvent](new _DelayedError(error, stackTrace));
- return;
- }
- if (!dart.notNull(this[_mayAddEvent]))
- throw this[_addEventError]();
- this[_sendError](error, stackTrace);
- while (this[_hasPending]) {
- this[_pending].handleNext(this);
- }
+ distinct(equals) {
+ if (equals === void 0)
+ equals = null;
+ return dart.as(new _DistinctStream(this, dart.as(equals, dart.throw_("Unimplemented type (dynamic, dynamic) → bool"))), Stream$(T));
}
- close() {
- if (!dart.notNull(this.isClosed) && dart.notNull(this[_isFiring])) {
- this[_addPendingEvent](new _DelayedDone());
- this[_state] = _BroadcastStreamController[_STATE_CLOSED];
- return super.done;
- }
- let result = super.close();
- dart.assert(!dart.notNull(this[_hasPending]));
- return result;
+ get first() {
+ let future = new _Future();
+ let subscription = null;
+ subscription = this.listen((value) => {
+ _cancelAndValue(subscription, future, value);
+ }, {
+ onError: future[_completeError],
+ onDone: () => {
+ try {
+ throw _internal.IterableElementError.noElement();
+ } catch (e) {
+ let s = dart.stackTrace(e);
+ _completeWithErrorCallback(future, e, s);
+ }
+
+ },
+ cancelOnError: true
+ });
+ return future;
}
- [_callOnCancel]() {
- if (this[_hasPending]) {
- this[_pending].clear();
- this[_pending] = null;
- }
- super._callOnCancel();
+ get last() {
+ let future = new _Future();
+ let result = null;
+ let foundResult = false;
+ let subscription = null;
+ subscription = this.listen((value) => {
+ foundResult = true;
+ result = value;
+ }, {onError: future[_completeError], onDone: (() => {
+ if (foundResult) {
+ future._complete(result);
+ return;
+ }
+ try {
+ throw _internal.IterableElementError.noElement();
+ } catch (e) {
+ let s = dart.stackTrace(e);
+ _completeWithErrorCallback(future, e, s);
+ }
+
+ }).bind(this), cancelOnError: true});
+ return future;
}
- }
- return _AsBroadcastStreamController;
- });
- let _AsBroadcastStreamController = _AsBroadcastStreamController$(dart.dynamic);
- let _pauseCount = Symbol('_pauseCount');
- let _resume = Symbol('_resume');
- let _DoneSubscription$ = dart.generic(function(T) {
- class _DoneSubscription extends core.Object {
- _DoneSubscription() {
- this[_pauseCount] = 0;
- }
- onData(handleData) {}
- onError(handleError) {}
- onDone(handleDone) {}
- pause(resumeSignal) {
- if (resumeSignal === void 0)
- resumeSignal = null;
- if (resumeSignal !== null)
- resumeSignal.then(this[_resume]);
- this[_pauseCount] = dart.notNull(this[_pauseCount]) + 1;
- }
- resume() {
- this[_resume](null);
- }
- [_resume](_) {
- if (dart.notNull(this[_pauseCount]) > 0)
- this[_pauseCount] = dart.notNull(this[_pauseCount]) - 1;
- }
- cancel() {
- return new _Future.immediate(null);
- }
- get isPaused() {
- return dart.notNull(this[_pauseCount]) > 0;
- }
- asFuture(value) {
- if (value === void 0)
- value = null;
- return new _Future();
- }
- }
- return _DoneSubscription;
- });
- let _DoneSubscription = _DoneSubscription$(dart.dynamic);
- class DeferredLibrary extends core.Object {
- DeferredLibrary(libraryName, opt$) {
- let uri = opt$.uri === void 0 ? null : opt$.uri;
- this.libraryName = libraryName;
- this.uri = uri;
- }
- load() {
- throw 'DeferredLibrary not supported. ' + 'please use the `import "lib.dart" deferred as lib` syntax.';
- }
- }
- let _s = Symbol('_s');
- class DeferredLoadException extends core.Object {
- DeferredLoadException($_s) {
- this[_s] = $_s;
- }
- toString() {
- return `DeferredLoadException: '${this[_s]}'`;
- }
- }
- let _completeError = Symbol('_completeError');
- let Future$ = dart.generic(function(T) {
- class Future extends core.Object {
- Future(computation) {
- let result = new _Future();
- Timer.run((() => {
- try {
- result._complete(computation());
- } catch (e) {
- let s = dart.stackTrace(e);
- _completeWithErrorCallback(result, e, s);
- }
+ get single() {
+ let future = new _Future();
+ let result = null;
+ let foundResult = false;
+ let subscription = null;
+ subscription = this.listen((value) => {
+ if (foundResult) {
+ try {
+ throw _internal.IterableElementError.tooMany();
+ } catch (e) {
+ let s = dart.stackTrace(e);
+ _cancelAndErrorWithReplacement(subscription, future, e, s);
+ }
- }).bind(this));
- return dart.as(result, Future$(T));
- }
- Future$microtask(computation) {
- let result = new _Future();
- scheduleMicrotask((() => {
- try {
- result._complete(computation());
- } catch (e) {
- let s = dart.stackTrace(e);
- _completeWithErrorCallback(result, e, s);
+ return;
}
+ foundResult = true;
+ result = value;
+ }, {onError: future[_completeError], onDone: (() => {
+ if (foundResult) {
+ future._complete(result);
+ return;
+ }
+ try {
+ throw _internal.IterableElementError.noElement();
+ } catch (e) {
+ let s = dart.stackTrace(e);
+ _completeWithErrorCallback(future, e, s);
+ }
- }).bind(this));
- return dart.as(result, Future$(T));
+ }).bind(this), cancelOnError: true});
+ return future;
}
- Future$sync(computation) {
- try {
- let result = computation();
- return new Future.value(result);
- } catch (error) {
- let stackTrace = dart.stackTrace(error);
- return new Future.error(error, stackTrace);
- }
+ firstWhere(test, opt$) {
+ let defaultValue = opt$.defaultValue === void 0 ? null : opt$.defaultValue;
+ let future = new _Future();
+ let subscription = null;
+ subscription = this.listen((value) => {
+ _runUserCode(() => test(value), dart.as((isMatch) => {
+ if (isMatch) {
+ _cancelAndValue(subscription, future, value);
+ }
+ }, dart.throw_("Unimplemented type (dynamic) → dynamic")), dart.as(_cancelAndErrorClosure(subscription, future), dart.throw_("Unimplemented type (dynamic, StackTrace) → dynamic")));
+ }, {onError: future[_completeError], onDone: (() => {
+ if (defaultValue !== null) {
+ _runUserCode(defaultValue, future[_complete], future[_completeError]);
+ return;
+ }
+ try {
+ throw _internal.IterableElementError.noElement();
+ } catch (e) {
+ let s = dart.stackTrace(e);
+ _completeWithErrorCallback(future, e, s);
+ }
+ }).bind(this), cancelOnError: true});
+ return future;
}
- Future$value(value) {
- if (value === void 0)
- value = null;
- return new _Future.immediate(value);
- }
- Future$error(error, stackTrace) {
- if (stackTrace === void 0)
- stackTrace = null;
- error = _nonNullError(error);
- if (!dart.notNull(core.identical(Zone.current, _ROOT_ZONE))) {
- let replacement = Zone.current.errorCallback(error, stackTrace);
- if (replacement !== null) {
- error = _nonNullError(replacement.error);
- stackTrace = replacement.stackTrace;
- }
- }
- return new _Future.immediateError(error, stackTrace);
- }
- Future$delayed(duration, computation) {
- if (computation === void 0)
- computation = null;
- let result = new _Future();
- new Timer(duration, (() => {
- try {
- result._complete(computation === null ? null : computation());
- } catch (e) {
- let s = dart.stackTrace(e);
- _completeWithErrorCallback(result, e, s);
- }
+ lastWhere(test, opt$) {
+ let defaultValue = opt$.defaultValue === void 0 ? null : opt$.defaultValue;
+ let future = new _Future();
+ let result = null;
+ let foundResult = false;
+ let subscription = null;
+ subscription = this.listen((value) => {
+ _runUserCode(() => true === test(value), dart.as((isMatch) => {
+ if (isMatch) {
+ foundResult = true;
+ result = value;
+ }
+ }, dart.throw_("Unimplemented type (dynamic) → dynamic")), dart.as(_cancelAndErrorClosure(subscription, future), dart.throw_("Unimplemented type (dynamic, StackTrace) → dynamic")));
+ }, {onError: future[_completeError], onDone: (() => {
+ if (foundResult) {
+ future._complete(result);
+ return;
+ }
+ if (defaultValue !== null) {
+ _runUserCode(defaultValue, future[_complete], future[_completeError]);
+ return;
+ }
+ try {
+ throw _internal.IterableElementError.noElement();
+ } catch (e) {
+ let s = dart.stackTrace(e);
+ _completeWithErrorCallback(future, e, s);
+ }
- }).bind(this));
- return dart.as(result, Future$(T));
+ }).bind(this), cancelOnError: true});
+ return future;
}
- static wait(futures, opt$) {
- let eagerError = opt$.eagerError === void 0 ? false : opt$.eagerError;
- let cleanUp = opt$.cleanUp === void 0 ? null : opt$.cleanUp;
- let result = new _Future();
- let values = null;
- let remaining = 0;
- let error = null;
- let stackTrace = null;
- // Function handleError: (dynamic, dynamic) → void
- function handleError(theError, theStackTrace) {
- remaining = dart.notNull(remaining) - 1;
- if (values !== null) {
- if (cleanUp !== null) {
- for (let value of values) {
- if (value !== null) {
- new Future.sync(() => {
- cleanUp(value);
- });
+ singleWhere(test) {
+ let future = new _Future();
+ let result = null;
+ let foundResult = false;
+ let subscription = null;
+ subscription = this.listen((value) => {
+ _runUserCode(() => true === test(value), dart.as((isMatch) => {
+ if (isMatch) {
+ if (foundResult) {
+ try {
+ throw _internal.IterableElementError.tooMany();
+ } catch (e) {
+ let s = dart.stackTrace(e);
+ _cancelAndErrorWithReplacement(subscription, future, e, s);
}
+
+ return;
}
+ foundResult = true;
+ result = value;
}
- values = null;
- if (remaining === 0 || dart.notNull(eagerError)) {
- result._completeError(theError, dart.as(theStackTrace, core.StackTrace));
- } else {
- error = theError;
- stackTrace = dart.as(theStackTrace, core.StackTrace);
+ }, dart.throw_("Unimplemented type (dynamic) → dynamic")), dart.as(_cancelAndErrorClosure(subscription, future), dart.throw_("Unimplemented type (dynamic, StackTrace) → dynamic")));
+ }, {onError: future[_completeError], onDone: (() => {
+ if (foundResult) {
+ future._complete(result);
+ return;
}
- } else if (remaining === 0 && !dart.notNull(eagerError)) {
- result._completeError(error, stackTrace);
- }
- }
- for (let future of futures) {
- let pos = (($tmp) => remaining = dart.notNull($tmp) + 1, $tmp)(remaining);
- future.then(dart.as(((value) => {
- remaining = dart.notNull(remaining) - 1;
- if (values !== null) {
- values.set(pos, value);
- if (remaining === 0) {
- result._completeWithValue(values);
- }
- } else {
- if (dart.notNull(cleanUp !== null) && dart.notNull(value !== null)) {
- new Future.sync(() => {
- cleanUp(value);
- });
- }
- if (remaining === 0 && !dart.notNull(eagerError)) {
- result._completeError(error, stackTrace);
- }
+ try {
+ throw _internal.IterableElementError.noElement();
+ } catch (e) {
+ let s = dart.stackTrace(e);
+ _completeWithErrorCallback(future, e, s);
}
- }).bind(this), dart.throw_("Unimplemented type (dynamic) → dynamic")), {onError: handleError});
- }
- if (remaining === 0) {
- return dart.as(new Future.value(/* Unimplemented const */new List.from([])), Future$(core.List));
- }
- values = new core.List(remaining);
- return result;
+
+ }).bind(this), cancelOnError: true});
+ return future;
}
- static forEach(input, f) {
- let iterator = input.iterator;
- return doWhile((() => {
- if (!dart.notNull(iterator.moveNext()))
- return false;
- return new Future.sync((() => f(iterator.current)).bind(this)).then((_) => true);
- }).bind(this));
+ elementAt(index) {
+ if (dart.notNull(!(typeof index == number)) || dart.notNull(index) < 0)
+ throw new core.ArgumentError(index);
+ let future = new _Future();
+ let subscription = null;
+ let elementIndex = 0;
+ subscription = this.listen((value) => {
+ if (index === elementIndex) {
+ _cancelAndValue(subscription, future, value);
+ return;
+ }
+ elementIndex = 1;
+ }, {onError: future[_completeError], onDone: (() => {
+ future._completeError(new core.RangeError.index(index, this, "index", null, elementIndex));
+ }).bind(this), cancelOnError: true});
+ return future;
}
- static doWhile(f) {
- let doneSignal = new _Future();
- let nextIteration = null;
- nextIteration = Zone.current.bindUnaryCallback(dart.as(((keepGoing) => {
- if (keepGoing) {
- new Future.sync(f).then(dart.as(nextIteration, dart.throw_("Unimplemented type (dynamic) → dynamic")), {onError: doneSignal[_completeError]});
+ timeout(timeLimit, opt$) {
+ let onTimeout = opt$.onTimeout === void 0 ? null : opt$.onTimeout;
+ let controller = null;
+ let subscription = null;
+ let timer = null;
+ let zone = null;
+ let timeout = null;
+ // Function onData: (T) → void
+ function onData(event) {
+ timer.cancel();
+ controller.add(event);
+ timer = zone.createTimer(timeLimit, dart.as(timeout, dart.throw_("Unimplemented type () → void")));
+ }
+ // Function onError: (dynamic, StackTrace) → void
+ function onError(error, stackTrace) {
+ timer.cancel();
+ dart.assert(dart.notNull(dart.is(controller, _StreamController)) || dart.notNull(dart.is(controller, _BroadcastStreamController)));
+ let eventSink = controller;
+ dart.dinvoke(eventSink, '_addError', error, stackTrace);
+ timer = zone.createTimer(timeLimit, dart.as(timeout, dart.throw_("Unimplemented type () → void")));
+ }
+ // Function onDone: () → void
+ function onDone() {
+ timer.cancel();
+ controller.close();
+ }
+ // Function onListen: () → void
+ function onListen() {
+ zone = Zone.current;
+ if (onTimeout === null) {
+ timeout = (() => {
+ controller.addError(new TimeoutException("No stream event", timeLimit), null);
+ }).bind(this);
} else {
- doneSignal._complete(null);
+ onTimeout = zone.registerUnaryCallback(dart.as(onTimeout, dart.throw_("Unimplemented type (dynamic) → dynamic")));
+ let wrapper = new _ControllerEventSinkWrapper(null);
+ timeout = (() => {
+ wrapper[_sink] = controller;
+ zone.runUnaryGuarded(dart.as(onTimeout, dart.throw_("Unimplemented type (dynamic) → dynamic")), wrapper);
+ wrapper[_sink] = null;
+ }).bind(this);
}
- }).bind(this), dart.throw_("Unimplemented type (dynamic) → dynamic")), {runGuarded: true});
- dart.dinvokef(nextIteration, true);
- return doneSignal;
+ subscription = this.listen(onData, {onError: onError, onDone: onDone});
+ timer = zone.createTimer(timeLimit, dart.as(timeout, dart.throw_("Unimplemented type () → void")));
+ }
+ // Function onCancel: () → Future<dynamic>
+ function onCancel() {
+ timer.cancel();
+ let result = subscription.cancel();
+ subscription = null;
+ return result;
+ }
+ controller = this.isBroadcast ? new _SyncBroadcastStreamController(onListen, onCancel) : new _SyncStreamController(onListen, (() => {
+ timer.cancel();
+ subscription.pause();
+ }).bind(this), (() => {
+ subscription.resume();
+ timer = zone.createTimer(timeLimit, dart.as(timeout, dart.throw_("Unimplemented type () → void")));
+ }).bind(this), onCancel);
+ return controller.stream;
}
}
- dart.defineNamedConstructor(Future, 'microtask');
- dart.defineNamedConstructor(Future, 'sync');
- dart.defineNamedConstructor(Future, 'value');
- dart.defineNamedConstructor(Future, 'error');
- dart.defineNamedConstructor(Future, 'delayed');
- dart.defineLazyProperties(Future, {
- get _nullFuture() {
- return dart.as(new Future.value(null), _Future);
- }
- });
- return Future;
+ dart.defineNamedConstructor(Stream, 'fromFuture');
+ dart.defineNamedConstructor(Stream, 'fromIterable');
+ dart.defineNamedConstructor(Stream, 'periodic');
+ dart.defineNamedConstructor(Stream, 'eventTransformed');
+ return Stream;
});
- let Future = Future$(dart.dynamic);
- class TimeoutException extends core.Object {
- TimeoutException(message, duration) {
- if (duration === void 0)
- duration = null;
- this.message = message;
- this.duration = duration;
- }
- toString() {
- let result = "TimeoutException";
- if (this.duration !== null)
- result = `TimeoutException after ${this.duration}`;
- if (this.message !== null)
- result = `${result}: ${this.message}`;
- return result;
- }
- }
- let Completer$ = dart.generic(function(T) {
- class Completer extends core.Object {
- Completer() {
- return new _AsyncCompleter();
+ let Stream = Stream$(dart.dynamic);
+ let _StreamImpl$ = dart.generic(function(T) {
+ class _StreamImpl extends Stream$(T) {
+ listen(onData, opt$) {
+ let onError = opt$.onError === void 0 ? null : opt$.onError;
+ let onDone = opt$.onDone === void 0 ? null : opt$.onDone;
+ let cancelOnError = opt$.cancelOnError === void 0 ? null : opt$.cancelOnError;
+ cancelOnError = core.identical(true, cancelOnError);
+ let subscription = this[_createSubscription](onData, onError, onDone, cancelOnError);
+ this[_onListen](subscription);
+ return dart.as(subscription, StreamSubscription$(T));
}
- Completer$sync() {
- return new _SyncCompleter();
+ [_createSubscription](onData, onError, onDone, cancelOnError) {
+ return new _BufferingStreamSubscription(onData, onError, onDone, cancelOnError);
}
+ [_onListen](subscription) {}
}
- dart.defineNamedConstructor(Completer, 'sync');
- return Completer;
+ return _StreamImpl;
});
- let Completer = Completer$(dart.dynamic);
- // Function _completeWithErrorCallback: (_Future<dynamic>, dynamic, dynamic) → void
- function _completeWithErrorCallback(result, error, stackTrace) {
- let replacement = Zone.current.errorCallback(error, dart.as(stackTrace, core.StackTrace));
- if (replacement !== null) {
- error = _nonNullError(replacement.error);
- stackTrace = replacement.stackTrace;
- }
- result._completeError(error, dart.as(stackTrace, core.StackTrace));
- }
- // Function _nonNullError: (Object) → Object
- function _nonNullError(error) {
- return error !== null ? error : new core.NullThrownError();
- }
- let _Completer$ = dart.generic(function(T) {
- class _Completer extends core.Object {
- _Completer() {
- this.future = new _Future();
+ let _StreamImpl = _StreamImpl$(dart.dynamic);
+ let _ControllerStream$ = dart.generic(function(T) {
+ class _ControllerStream extends _StreamImpl$(T) {
+ _ControllerStream($_controller) {
+ this[_controller] = $_controller;
+ super._StreamImpl();
}
- completeError(error, stackTrace) {
- if (stackTrace === void 0)
- stackTrace = null;
- error = _nonNullError(error);
- if (!dart.notNull(this.future[_mayComplete]))
- throw new core.StateError("Future already completed");
- let replacement = Zone.current.errorCallback(error, stackTrace);
- if (replacement !== null) {
- error = _nonNullError(replacement.error);
- stackTrace = replacement.stackTrace;
- }
- this[_completeError](error, stackTrace);
+ [_createSubscription](onData, onError, onDone, cancelOnError) {
+ return this[_controller]._subscribe(onData, onError, onDone, cancelOnError);
}
- get isCompleted() {
- return !dart.notNull(this.future[_mayComplete]);
+ get hashCode() {
+ return dart.notNull(this[_controller].hashCode) ^ 892482866;
+ }
+ ['=='](other) {
+ if (core.identical(this, other))
+ return true;
+ if (!dart.is(other, _ControllerStream))
+ return false;
+ let otherStream = dart.as(other, _ControllerStream);
+ return core.identical(otherStream[_controller], this[_controller]);
}
}
- return _Completer;
+ return _ControllerStream;
});
- let _Completer = _Completer$(dart.dynamic);
- let _AsyncCompleter$ = dart.generic(function(T) {
- class _AsyncCompleter extends _Completer$(T) {
- complete(value) {
- if (value === void 0)
- value = null;
- if (!dart.notNull(this.future[_mayComplete]))
- throw new core.StateError("Future already completed");
- this.future._asyncComplete(value);
+ let _ControllerStream = _ControllerStream$(dart.dynamic);
+ let _BroadcastStream$ = dart.generic(function(T) {
+ class _BroadcastStream extends _ControllerStream$(T) {
+ _BroadcastStream(controller) {
+ super._ControllerStream(dart.as(controller, _StreamControllerLifecycle$(T)));
}
- [_completeError](error, stackTrace) {
- this.future._asyncCompleteError(error, stackTrace);
+ get isBroadcast() {
+ return true;
}
}
- return _AsyncCompleter;
+ return _BroadcastStream;
});
- let _AsyncCompleter = _AsyncCompleter$(dart.dynamic);
- let _SyncCompleter$ = dart.generic(function(T) {
- class _SyncCompleter extends _Completer$(T) {
- complete(value) {
- if (value === void 0)
- value = null;
- if (!dart.notNull(this.future[_mayComplete]))
- throw new core.StateError("Future already completed");
- this.future._complete(value);
- }
- [_completeError](error, stackTrace) {
- this.future._completeError(error, stackTrace);
- }
+ let _BroadcastStream = _BroadcastStream$(dart.dynamic);
+ let _next = Symbol('_next');
+ let _previous = Symbol('_previous');
+ class _BroadcastSubscriptionLink extends core.Object {
+ _BroadcastSubscriptionLink() {
+ this[_next] = null;
+ this[_previous] = null;
}
- return _SyncCompleter;
- });
- let _SyncCompleter = _SyncCompleter$(dart.dynamic);
- let _nextListener = Symbol('_nextListener');
+ }
+ let _eventState = Symbol('_eventState');
+ let _expectsEvent = Symbol('_expectsEvent');
+ let _toggleEventId = Symbol('_toggleEventId');
+ let _isFiring = Symbol('_isFiring');
+ let _setRemoveAfterFiring = Symbol('_setRemoveAfterFiring');
+ let _removeAfterFiring = Symbol('_removeAfterFiring');
+ let _onPause = Symbol('_onPause');
+ let _onResume = Symbol('_onResume');
+ let _onCancel = Symbol('_onCancel');
let _zone = Symbol('_zone');
- let _onValue = Symbol('_onValue');
+ let _state = Symbol('_state');
+ let _onData = Symbol('_onData');
let _onError = Symbol('_onError');
- let _errorTest = Symbol('_errorTest');
- let _whenCompleteAction = Symbol('_whenCompleteAction');
- class _FutureListener extends core.Object {
- _FutureListener$then(result, onValue, errorCallback) {
- this.result = result;
- this.callback = onValue;
- this.errorCallback = errorCallback;
- this.state = errorCallback === null ? _FutureListener.STATE_THEN : _FutureListener.STATE_THEN_ONERROR;
- this[_nextListener] = null;
- }
- _FutureListener$catchError(result, errorCallback, test) {
- this.result = result;
- this.errorCallback = errorCallback;
- this.callback = test;
- this.state = test === null ? _FutureListener.STATE_CATCHERROR : _FutureListener.STATE_CATCHERROR_TEST;
- this[_nextListener] = null;
- }
- _FutureListener$whenComplete(result, onComplete) {
- this.result = result;
- this.callback = onComplete;
- this.errorCallback = null;
- this.state = _FutureListener.STATE_WHENCOMPLETE;
- this[_nextListener] = null;
- }
- _FutureListener$chain(result) {
- this.result = result;
- this.callback = null;
- this.errorCallback = null;
- this.state = _FutureListener.STATE_CHAIN;
- this[_nextListener] = null;
- }
- get [_zone]() {
- return this.result[_zone];
- }
- get handlesValue() {
- return (dart.notNull(this.state) & dart.notNull(_FutureListener.MASK_VALUE)) !== 0;
- }
- get handlesError() {
- return (dart.notNull(this.state) & dart.notNull(_FutureListener.MASK_ERROR)) !== 0;
- }
- get hasErrorTest() {
- return this.state === _FutureListener.STATE_CATCHERROR_TEST;
- }
- get handlesComplete() {
- return this.state === _FutureListener.STATE_WHENCOMPLETE;
- }
- get [_onValue]() {
- dart.assert(this.handlesValue);
- return dart.as(this.callback, _FutureOnValue);
- }
- get [_onError]() {
- return this.errorCallback;
- }
- get [_errorTest]() {
- dart.assert(this.hasErrorTest);
- return dart.as(this.callback, _FutureErrorTest);
- }
- get [_whenCompleteAction]() {
- dart.assert(this.handlesComplete);
- return dart.as(this.callback, _FutureAction);
- }
- }
- dart.defineNamedConstructor(_FutureListener, 'then');
- dart.defineNamedConstructor(_FutureListener, 'catchError');
- dart.defineNamedConstructor(_FutureListener, 'whenComplete');
- dart.defineNamedConstructor(_FutureListener, 'chain');
- _FutureListener.MASK_VALUE = 1;
- _FutureListener.MASK_ERROR = 2;
- _FutureListener.MASK_TEST_ERROR = 4;
- _FutureListener.MASK_WHENCOMPLETE = 8;
- _FutureListener.STATE_CHAIN = 0;
- _FutureListener.STATE_THEN = _FutureListener.MASK_VALUE;
- _FutureListener.STATE_THEN_ONERROR = dart.notNull(_FutureListener.MASK_VALUE) | dart.notNull(_FutureListener.MASK_ERROR);
- _FutureListener.STATE_CATCHERROR = _FutureListener.MASK_ERROR;
- _FutureListener.STATE_CATCHERROR_TEST = dart.notNull(_FutureListener.MASK_ERROR) | dart.notNull(_FutureListener.MASK_TEST_ERROR);
- _FutureListener.STATE_WHENCOMPLETE = _FutureListener.MASK_WHENCOMPLETE;
- let _resultOrListeners = Symbol('_resultOrListeners');
- let _asyncComplete = Symbol('_asyncComplete');
- let _asyncCompleteError = Symbol('_asyncCompleteError');
- let _isChained = Symbol('_isChained');
- let _isComplete = Symbol('_isComplete');
- let _hasValue = Symbol('_hasValue');
- let _hasError = Symbol('_hasError');
- let _markPendingCompletion = Symbol('_markPendingCompletion');
- let _value = Symbol('_value');
- let _error = Symbol('_error');
- let _setValue = Symbol('_setValue');
- let _setErrorObject = Symbol('_setErrorObject');
- let _setError = Symbol('_setError');
- let _removeListeners = Symbol('_removeListeners');
- let _chainForeignFuture = Symbol('_chainForeignFuture');
- let _chainCoreFuture = Symbol('_chainCoreFuture');
- let _complete = Symbol('_complete');
- let _completeWithValue = Symbol('_completeWithValue');
- let _propagateToListeners = Symbol('_propagateToListeners');
- let _Future$ = dart.generic(function(T) {
- class _Future extends core.Object {
- _Future() {
+ let _onDone = Symbol('_onDone');
+ let _cancelFuture = Symbol('_cancelFuture');
+ let _pending = Symbol('_pending');
+ let _setPendingEvents = Symbol('_setPendingEvents');
+ let _extractPending = Symbol('_extractPending');
+ let _isCanceled = Symbol('_isCanceled');
+ let _isPaused = Symbol('_isPaused');
+ let _isInputPaused = Symbol('_isInputPaused');
+ let _inCallback = Symbol('_inCallback');
+ let _guardCallback = Symbol('_guardCallback');
+ let _decrementPauseCount = Symbol('_decrementPauseCount');
+ let _hasPending = Symbol('_hasPending');
+ let _mayResumeInput = Symbol('_mayResumeInput');
+ let _cancel = Symbol('_cancel');
+ let _isClosed = Symbol('_isClosed');
+ let _waitsForCancel = Symbol('_waitsForCancel');
+ let _canFire = Symbol('_canFire');
+ let _cancelOnError = Symbol('_cancelOnError');
+ let _incrementPauseCount = Symbol('_incrementPauseCount');
+ let _add = Symbol('_add');
+ let _sendData = Symbol('_sendData');
+ let _addPending = Symbol('_addPending');
+ let _sendError = Symbol('_sendError');
+ let _close = Symbol('_close');
+ let _sendDone = Symbol('_sendDone');
+ let _checkState = Symbol('_checkState');
+ let _BufferingStreamSubscription$ = dart.generic(function(T) {
+ class _BufferingStreamSubscription extends core.Object {
+ _BufferingStreamSubscription(onData, onError, onDone, cancelOnError) {
this[_zone] = Zone.current;
- this[_state] = _Future._INCOMPLETE;
- this[_resultOrListeners] = null;
+ this[_state] = cancelOnError ? _BufferingStreamSubscription._STATE_CANCEL_ON_ERROR : 0;
+ this[_onData] = null;
+ this[_onError] = null;
+ this[_onDone] = null;
+ this[_cancelFuture] = null;
+ this[_pending] = null;
+ this.onData(onData);
+ this.onError(onError);
+ this.onDone(onDone);
}
- _Future$immediate(value) {
- this[_zone] = Zone.current;
- this[_state] = _Future._INCOMPLETE;
- this[_resultOrListeners] = null;
- this[_asyncComplete](value);
+ [_setPendingEvents](pendingEvents) {
+ dart.assert(this[_pending] === null);
+ if (pendingEvents === null)
+ return;
+ this[_pending] = pendingEvents;
+ if (!dart.notNull(pendingEvents.isEmpty)) {
+ this[_state] = _BufferingStreamSubscription._STATE_HAS_PENDING;
+ this[_pending].schedule(this);
+ }
}
- _Future$immediateError(error, stackTrace) {
- if (stackTrace === void 0)
- stackTrace = null;
- this[_zone] = Zone.current;
- this[_state] = _Future._INCOMPLETE;
- this[_resultOrListeners] = null;
- this[_asyncCompleteError](error, stackTrace);
+ [_extractPending]() {
+ dart.assert(this[_isCanceled]);
+ let events = this[_pending];
+ this[_pending] = null;
+ return events;
}
- get [_mayComplete]() {
- return this[_state] === _Future._INCOMPLETE;
+ onData(handleData) {
+ if (handleData === null)
+ handleData = _nullDataHandler;
+ this[_onData] = this[_zone].registerUnaryCallback(dart.as(handleData, dart.throw_("Unimplemented type (dynamic) → dynamic")));
}
- get [_isChained]() {
- return this[_state] === _Future._CHAINED;
+ onError(handleError) {
+ if (handleError === null)
+ handleError = _nullErrorHandler;
+ this[_onError] = _registerErrorHandler(handleError, this[_zone]);
}
- get [_isComplete]() {
- return dart.notNull(this[_state]) >= dart.notNull(_Future._VALUE);
+ onDone(handleDone) {
+ if (handleDone === null)
+ handleDone = _nullDoneHandler;
+ this[_onDone] = this[_zone].registerCallback(handleDone);
}
- get [_hasValue]() {
- return this[_state] === _Future._VALUE;
+ pause(resumeSignal) {
+ if (resumeSignal === void 0)
+ resumeSignal = null;
+ if (this[_isCanceled])
+ return;
+ let wasPaused = this[_isPaused];
+ let wasInputPaused = this[_isInputPaused];
+ this[_state] = dart.notNull(this[_state]) + dart.notNull(_BufferingStreamSubscription._STATE_PAUSE_COUNT) | dart.notNull(_BufferingStreamSubscription._STATE_INPUT_PAUSED);
+ if (resumeSignal !== null)
+ resumeSignal.whenComplete(this.resume);
+ if (!dart.notNull(wasPaused) && dart.notNull(this[_pending] !== null))
+ this[_pending].cancelSchedule();
+ if (!dart.notNull(wasInputPaused) && !dart.notNull(this[_inCallback]))
+ this[_guardCallback](this[_onPause]);
}
- get [_hasError]() {
- return this[_state] === _Future._ERROR;
+ resume() {
+ if (this[_isCanceled])
+ return;
+ if (this[_isPaused]) {
+ this[_decrementPauseCount]();
+ if (!dart.notNull(this[_isPaused])) {
+ if (dart.notNull(this[_hasPending]) && !dart.notNull(this[_pending].isEmpty)) {
+ this[_pending].schedule(this);
+ } else {
+ dart.assert(this[_mayResumeInput]);
+ this[_state] = ~dart.notNull(_BufferingStreamSubscription._STATE_INPUT_PAUSED);
+ if (!dart.notNull(this[_inCallback]))
+ this[_guardCallback](this[_onResume]);
+ }
+ }
+ }
}
- set [_isChained](value) {
- if (value) {
- dart.assert(!dart.notNull(this[_isComplete]));
- this[_state] = _Future._CHAINED;
- } else {
- dart.assert(this[_isChained]);
- this[_state] = _Future._INCOMPLETE;
- }
+ cancel() {
+ this[_state] = ~dart.notNull(_BufferingStreamSubscription._STATE_WAIT_FOR_CANCEL);
+ if (this[_isCanceled])
+ return this[_cancelFuture];
+ this[_cancel]();
+ return this[_cancelFuture];
}
- then(f, opt$) {
- let onError = opt$.onError === void 0 ? null : opt$.onError;
+ asFuture(futureValue) {
+ if (futureValue === void 0)
+ futureValue = null;
let result = new _Future();
- if (!dart.notNull(core.identical(result[_zone], _ROOT_ZONE))) {
- f = result[_zone].registerUnaryCallback(dart.as(f, dart.throw_("Unimplemented type (dynamic) → dynamic")));
- if (onError !== null) {
- onError = _registerErrorHandler(onError, result[_zone]);
- }
- }
- this[_addListener](new _FutureListener.then(result, dart.as(f, _FutureOnValue), onError));
+ this[_onDone] = (() => {
+ result._complete(futureValue);
+ }).bind(this);
+ this[_onError] = ((error, stackTrace) => {
+ this.cancel();
+ result._completeError(error, dart.as(stackTrace, core.StackTrace));
+ }).bind(this);
return result;
}
- catchError(onError, opt$) {
- let test = opt$.test === void 0 ? null : opt$.test;
- let result = new _Future();
- if (!dart.notNull(core.identical(result[_zone], _ROOT_ZONE))) {
- onError = _registerErrorHandler(onError, result[_zone]);
- if (test !== null)
- test = dart.as(result[_zone].registerUnaryCallback(test), dart.throw_("Unimplemented type (dynamic) → bool"));
- }
- this[_addListener](new _FutureListener.catchError(result, onError, test));
- return result;
+ get [_isInputPaused]() {
+ return (dart.notNull(this[_state]) & dart.notNull(_BufferingStreamSubscription._STATE_INPUT_PAUSED)) !== 0;
}
- whenComplete(action) {
- let result = new _Future();
- if (!dart.notNull(core.identical(result[_zone], _ROOT_ZONE))) {
- action = result[_zone].registerCallback(action);
- }
- this[_addListener](new _FutureListener.whenComplete(result, action));
- return dart.as(result, Future$(T));
+ get [_isClosed]() {
+ return (dart.notNull(this[_state]) & dart.notNull(_BufferingStreamSubscription._STATE_CLOSED)) !== 0;
}
- asStream() {
- return dart.as(new Stream.fromFuture(this), Stream$(T));
+ get [_isCanceled]() {
+ return (dart.notNull(this[_state]) & dart.notNull(_BufferingStreamSubscription._STATE_CANCELED)) !== 0;
}
- [_markPendingCompletion]() {
- if (!dart.notNull(this[_mayComplete]))
- throw new core.StateError("Future already completed");
- this[_state] = _Future._PENDING_COMPLETE;
+ get [_waitsForCancel]() {
+ return (dart.notNull(this[_state]) & dart.notNull(_BufferingStreamSubscription._STATE_WAIT_FOR_CANCEL)) !== 0;
}
- get [_value]() {
- dart.assert(dart.notNull(this[_isComplete]) && dart.notNull(this[_hasValue]));
- return dart.as(this[_resultOrListeners], T);
+ get [_inCallback]() {
+ return (dart.notNull(this[_state]) & dart.notNull(_BufferingStreamSubscription._STATE_IN_CALLBACK)) !== 0;
}
- get [_error]() {
- dart.assert(dart.notNull(this[_isComplete]) && dart.notNull(this[_hasError]));
- return dart.as(this[_resultOrListeners], AsyncError);
+ get [_hasPending]() {
+ return (dart.notNull(this[_state]) & dart.notNull(_BufferingStreamSubscription._STATE_HAS_PENDING)) !== 0;
}
- [_setValue](value) {
- dart.assert(!dart.notNull(this[_isComplete]));
- this[_state] = _Future._VALUE;
- this[_resultOrListeners] = value;
+ get [_isPaused]() {
+ return dart.notNull(this[_state]) >= dart.notNull(_BufferingStreamSubscription._STATE_PAUSE_COUNT);
}
- [_setErrorObject](error) {
- dart.assert(!dart.notNull(this[_isComplete]));
- this[_state] = _Future._ERROR;
- this[_resultOrListeners] = error;
+ get [_canFire]() {
+ return dart.notNull(this[_state]) < dart.notNull(_BufferingStreamSubscription._STATE_IN_CALLBACK);
}
- [_setError](error, stackTrace) {
- this[_setErrorObject](new AsyncError(error, stackTrace));
+ get [_mayResumeInput]() {
+ return !dart.notNull(this[_isPaused]) && (dart.notNull(this[_pending] === null) || dart.notNull(this[_pending].isEmpty));
}
- [_addListener](listener) {
- dart.assert(listener[_nextListener] === null);
- if (this[_isComplete]) {
- this[_zone].scheduleMicrotask((() => {
- _propagateToListeners(this, listener);
- }).bind(this));
- } else {
- listener[_nextListener] = dart.as(this[_resultOrListeners], _FutureListener);
- this[_resultOrListeners] = listener;
- }
+ get [_cancelOnError]() {
+ return (dart.notNull(this[_state]) & dart.notNull(_BufferingStreamSubscription._STATE_CANCEL_ON_ERROR)) !== 0;
}
- [_removeListeners]() {
- dart.assert(!dart.notNull(this[_isComplete]));
- let current = dart.as(this[_resultOrListeners], _FutureListener);
- this[_resultOrListeners] = null;
- let prev = null;
- while (current !== null) {
- let next = current[_nextListener];
- current[_nextListener] = prev;
- prev = current;
- current = next;
+ get isPaused() {
+ return this[_isPaused];
+ }
+ [_cancel]() {
+ this[_state] = _BufferingStreamSubscription._STATE_CANCELED;
+ if (this[_hasPending]) {
+ this[_pending].cancelSchedule();
}
- return prev;
+ if (!dart.notNull(this[_inCallback]))
+ this[_pending] = null;
+ this[_cancelFuture] = this[_onCancel]();
}
- static [_chainForeignFuture](source, target) {
- dart.assert(!dart.notNull(target[_isComplete]));
- dart.assert(!dart.is(source, _Future));
- target[_isChained] = true;
- source.then(((value) => {
- dart.assert(target[_isChained]);
- target._completeWithValue(value);
- }).bind(this), {onError: ((error, stackTrace) => {
- if (stackTrace === void 0)
- stackTrace = null;
- dart.assert(target[_isChained]);
- target._completeError(error, dart.as(stackTrace, core.StackTrace));
- }).bind(this)});
+ [_incrementPauseCount]() {
+ this[_state] = dart.notNull(this[_state]) + dart.notNull(_BufferingStreamSubscription._STATE_PAUSE_COUNT) | dart.notNull(_BufferingStreamSubscription._STATE_INPUT_PAUSED);
}
- static [_chainCoreFuture](source, target) {
- dart.assert(!dart.notNull(target[_isComplete]));
- dart.assert(dart.is(source, _Future));
- target[_isChained] = true;
- let listener = new _FutureListener.chain(target);
- if (source[_isComplete]) {
- _propagateToListeners(source, listener);
+ [_decrementPauseCount]() {
+ dart.assert(this[_isPaused]);
+ this[_state] = _BufferingStreamSubscription._STATE_PAUSE_COUNT;
+ }
+ [_add](data) {
+ dart.assert(!dart.notNull(this[_isClosed]));
+ if (this[_isCanceled])
+ return;
+ if (this[_canFire]) {
+ this[_sendData](data);
} else {
- source._addListener(listener);
+ this[_addPending](new _DelayedData(data));
}
}
- [_complete](value) {
- dart.assert(!dart.notNull(this[_isComplete]));
- if (dart.is(value, Future)) {
- if (dart.is(value, _Future)) {
- _chainCoreFuture(dart.as(value, _Future), this);
- } else {
- _chainForeignFuture(dart.as(value, Future), this);
- }
+ [_addError](error, stackTrace) {
+ if (this[_isCanceled])
+ return;
+ if (this[_canFire]) {
+ this[_sendError](error, stackTrace);
} else {
- let listeners = this[_removeListeners]();
- this[_setValue](dart.as(value, T));
- _propagateToListeners(this, listeners);
+ this[_addPending](new _DelayedError(error, stackTrace));
}
}
- [_completeWithValue](value) {
- dart.assert(!dart.notNull(this[_isComplete]));
- dart.assert(!dart.is(value, Future));
- let listeners = this[_removeListeners]();
- this[_setValue](dart.as(value, T));
- _propagateToListeners(this, listeners);
- }
- [_completeError](error, stackTrace) {
- if (stackTrace === void 0)
- stackTrace = null;
- dart.assert(!dart.notNull(this[_isComplete]));
- let listeners = this[_removeListeners]();
- this[_setError](error, stackTrace);
- _propagateToListeners(this, listeners);
- }
- [_asyncComplete](value) {
- dart.assert(!dart.notNull(this[_isComplete]));
- if (value === null) {
- } else if (dart.is(value, Future)) {
- let typedFuture = dart.as(value, Future$(T));
- if (dart.is(typedFuture, _Future)) {
- let coreFuture = dart.as(typedFuture, _Future$(T));
- if (dart.notNull(coreFuture[_isComplete]) && dart.notNull(coreFuture[_hasError])) {
- this[_markPendingCompletion]();
- this[_zone].scheduleMicrotask((() => {
- _chainCoreFuture(coreFuture, this);
- }).bind(this));
- } else {
- _chainCoreFuture(coreFuture, this);
- }
- } else {
- _chainForeignFuture(typedFuture, this);
- }
+ [_close]() {
+ dart.assert(!dart.notNull(this[_isClosed]));
+ if (this[_isCanceled])
return;
+ this[_state] = _BufferingStreamSubscription._STATE_CLOSED;
+ if (this[_canFire]) {
+ this[_sendDone]();
} else {
- let typedValue = dart.as(value, T);
+ this[_addPending](new _DelayedDone());
}
- this[_markPendingCompletion]();
- this[_zone].scheduleMicrotask((() => {
- this[_completeWithValue](value);
- }).bind(this));
}
- [_asyncCompleteError](error, stackTrace) {
- dart.assert(!dart.notNull(this[_isComplete]));
- this[_markPendingCompletion]();
- this[_zone].scheduleMicrotask((() => {
- this[_completeError](error, stackTrace);
- }).bind(this));
+ [_onPause]() {
+ dart.assert(this[_isInputPaused]);
}
- static [_propagateToListeners](source, listeners) {
- while (true) {
- dart.assert(source[_isComplete]);
- let hasError = source[_hasError];
- if (listeners === null) {
- if (hasError) {
- let asyncError = source[_error];
- source[_zone].handleUncaughtError(asyncError.error, asyncError.stackTrace);
- }
- return;
- }
- while (listeners[_nextListener] !== null) {
- let listener = listeners;
- listeners = listener[_nextListener];
- listener[_nextListener] = null;
- _propagateToListeners(source, listener);
+ [_onResume]() {
+ dart.assert(!dart.notNull(this[_isInputPaused]));
+ }
+ [_onCancel]() {
+ dart.assert(this[_isCanceled]);
+ return null;
+ }
+ [_addPending](event) {
+ let pending = dart.as(this[_pending], _StreamImplEvents);
+ if (this[_pending] === null)
+ pending = this[_pending] = new _StreamImplEvents();
+ pending.add(event);
+ if (!dart.notNull(this[_hasPending])) {
+ this[_state] = _BufferingStreamSubscription._STATE_HAS_PENDING;
+ if (!dart.notNull(this[_isPaused])) {
+ this[_pending].schedule(this);
}
- let listener = listeners;
- let listenerHasValue = true;
- let sourceValue = hasError ? null : source[_value];
- let listenerValueOrError = sourceValue;
- let isPropagationAborted = false;
- if (dart.notNull(hasError) || dart.notNull(listener.handlesValue) || dart.notNull(listener.handlesComplete)) {
- let zone = listener[_zone];
- if (dart.notNull(hasError) && !dart.notNull(source[_zone].inSameErrorZone(zone))) {
- let asyncError = source[_error];
- source[_zone].handleUncaughtError(asyncError.error, asyncError.stackTrace);
- return;
- }
- let oldZone = null;
- if (!dart.notNull(core.identical(Zone.current, zone))) {
- oldZone = Zone._enter(zone);
- }
- // Function handleValueCallback: () → bool
- function handleValueCallback() {
- try {
- listenerValueOrError = zone.runUnary(listener[_onValue], sourceValue);
- return true;
- } catch (e) {
- let s = dart.stackTrace(e);
- listenerValueOrError = new AsyncError(e, s);
- return false;
- }
-
- }
- // Function handleError: () → void
- function handleError() {
- let asyncError = source[_error];
- let matchesTest = true;
- if (listener.hasErrorTest) {
- let test = listener[_errorTest];
- try {
- matchesTest = dart.as(zone.runUnary(test, asyncError.error), core.bool);
- } catch (e) {
- let s = dart.stackTrace(e);
- listenerValueOrError = core.identical(asyncError.error, e) ? asyncError : new AsyncError(e, s);
- listenerHasValue = false;
- return;
- }
-
- }
- let errorCallback = listener[_onError];
- if (dart.notNull(matchesTest) && dart.notNull(errorCallback !== null)) {
- try {
- if (dart.is(errorCallback, ZoneBinaryCallback)) {
- listenerValueOrError = zone.runBinary(errorCallback, asyncError.error, asyncError.stackTrace);
- } else {
- listenerValueOrError = zone.runUnary(dart.as(errorCallback, dart.throw_("Unimplemented type (dynamic) → dynamic")), asyncError.error);
- }
- } catch (e) {
- let s = dart.stackTrace(e);
- listenerValueOrError = core.identical(asyncError.error, e) ? asyncError : new AsyncError(e, s);
- listenerHasValue = false;
- return;
- }
-
- listenerHasValue = true;
- } else {
- listenerValueOrError = asyncError;
- listenerHasValue = false;
- }
- }
- // Function handleWhenCompleteCallback: () → void
- function handleWhenCompleteCallback() {
- let completeResult = null;
- try {
- completeResult = zone.run(listener[_whenCompleteAction]);
- } catch (e) {
- let s = dart.stackTrace(e);
- if (dart.notNull(hasError) && dart.notNull(core.identical(source[_error].error, e))) {
- listenerValueOrError = source[_error];
- } else {
- listenerValueOrError = new AsyncError(e, s);
- }
- listenerHasValue = false;
- return;
- }
-
- if (dart.is(completeResult, Future)) {
- let result = listener.result;
- result[_isChained] = true;
- isPropagationAborted = true;
- dart.dinvoke(completeResult, 'then', (ignored) => {
- _propagateToListeners(source, new _FutureListener.chain(result));
- }, {
- onError: (error, stackTrace) => {
- if (stackTrace === void 0)
- stackTrace = null;
- if (!dart.is(completeResult, _Future)) {
- completeResult = new _Future();
- dart.dinvoke(completeResult, '_setError', error, stackTrace);
- }
- _propagateToListeners(dart.as(completeResult, _Future), new _FutureListener.chain(result));
- }
- });
- }
- }
- if (!dart.notNull(hasError)) {
- if (listener.handlesValue) {
- listenerHasValue = handleValueCallback();
- }
- } else {
- handleError();
- }
- if (listener.handlesComplete) {
- handleWhenCompleteCallback();
- }
- if (oldZone !== null)
- Zone._leave(oldZone);
- if (isPropagationAborted)
- return;
- if (dart.notNull(listenerHasValue) && !dart.notNull(core.identical(sourceValue, listenerValueOrError)) && dart.notNull(dart.is(listenerValueOrError, Future))) {
- let chainSource = dart.as(listenerValueOrError, Future);
- let result = listener.result;
- if (dart.is(chainSource, _Future)) {
- if (chainSource[_isComplete]) {
- result[_isChained] = true;
- source = chainSource;
- listeners = new _FutureListener.chain(result);
- continue;
- } else {
- _chainCoreFuture(chainSource, result);
- }
- } else {
- _chainForeignFuture(chainSource, result);
- }
- return;
- }
+ }
+ }
+ [_sendData](data) {
+ dart.assert(!dart.notNull(this[_isCanceled]));
+ dart.assert(!dart.notNull(this[_isPaused]));
+ dart.assert(!dart.notNull(this[_inCallback]));
+ let wasInputPaused = this[_isInputPaused];
+ this[_state] = _BufferingStreamSubscription._STATE_IN_CALLBACK;
+ this[_zone].runUnaryGuarded(dart.as(this[_onData], dart.throw_("Unimplemented type (dynamic) → dynamic")), data);
+ this[_state] = ~dart.notNull(_BufferingStreamSubscription._STATE_IN_CALLBACK);
+ this[_checkState](wasInputPaused);
+ }
+ [_sendError](error, stackTrace) {
+ dart.assert(!dart.notNull(this[_isCanceled]));
+ dart.assert(!dart.notNull(this[_isPaused]));
+ dart.assert(!dart.notNull(this[_inCallback]));
+ let wasInputPaused = this[_isInputPaused];
+ // Function sendError: () → void
+ function sendError() {
+ if (dart.notNull(this[_isCanceled]) && !dart.notNull(this[_waitsForCancel]))
+ return;
+ this[_state] = _BufferingStreamSubscription._STATE_IN_CALLBACK;
+ if (dart.is(this[_onError], ZoneBinaryCallback)) {
+ this[_zone].runBinaryGuarded(dart.as(this[_onError], dart.throw_("Unimplemented type (dynamic, dynamic) → dynamic")), error, stackTrace);
+ } else {
+ this[_zone].runUnaryGuarded(dart.as(this[_onError], dart.throw_("Unimplemented type (dynamic) → dynamic")), error);
}
- let result = listener.result;
- listeners = result._removeListeners();
- if (listenerHasValue) {
- result._setValue(listenerValueOrError);
+ this[_state] = ~dart.notNull(_BufferingStreamSubscription._STATE_IN_CALLBACK);
+ }
+ if (this[_cancelOnError]) {
+ this[_state] = _BufferingStreamSubscription._STATE_WAIT_FOR_CANCEL;
+ this[_cancel]();
+ if (dart.is(this[_cancelFuture], Future)) {
+ this[_cancelFuture].whenComplete(sendError);
} else {
- let asyncError = dart.as(listenerValueOrError, AsyncError);
- result._setErrorObject(asyncError);
+ sendError();
}
- source = result;
+ } else {
+ sendError();
+ this[_checkState](wasInputPaused);
}
}
- timeout(timeLimit, opt$) {
- let onTimeout = opt$.onTimeout === void 0 ? null : opt$.onTimeout;
- if (this[_isComplete])
- return new _Future.immediate(this);
- let result = new _Future();
- let timer = null;
- if (onTimeout === null) {
- timer = new Timer(timeLimit, (() => {
- result._completeError(new TimeoutException("Future not completed", timeLimit));
- }).bind(this));
+ [_sendDone]() {
+ dart.assert(!dart.notNull(this[_isCanceled]));
+ dart.assert(!dart.notNull(this[_isPaused]));
+ dart.assert(!dart.notNull(this[_inCallback]));
+ // Function sendDone: () → void
+ function sendDone() {
+ if (!dart.notNull(this[_waitsForCancel]))
+ return;
+ this[_state] = dart.notNull(_BufferingStreamSubscription._STATE_CANCELED) | dart.notNull(_BufferingStreamSubscription._STATE_CLOSED) | dart.notNull(_BufferingStreamSubscription._STATE_IN_CALLBACK);
+ this[_zone].runGuarded(this[_onDone]);
+ this[_state] = ~dart.notNull(_BufferingStreamSubscription._STATE_IN_CALLBACK);
+ }
+ this[_cancel]();
+ this[_state] = _BufferingStreamSubscription._STATE_WAIT_FOR_CANCEL;
+ if (dart.is(this[_cancelFuture], Future)) {
+ this[_cancelFuture].whenComplete(sendDone);
} else {
- let zone = Zone.current;
- onTimeout = zone.registerCallback(onTimeout);
- timer = new Timer(timeLimit, (() => {
- try {
- result._complete(zone.run(onTimeout));
- } catch (e) {
- let s = dart.stackTrace(e);
- result._completeError(e, s);
- }
-
- }).bind(this));
+ sendDone();
}
- this.then(((v) => {
- if (timer.isActive) {
- timer.cancel();
- result._completeWithValue(v);
+ }
+ [_guardCallback](callback) {
+ dart.assert(!dart.notNull(this[_inCallback]));
+ let wasInputPaused = this[_isInputPaused];
+ this[_state] = _BufferingStreamSubscription._STATE_IN_CALLBACK;
+ dart.dinvokef(callback);
+ this[_state] = ~dart.notNull(_BufferingStreamSubscription._STATE_IN_CALLBACK);
+ this[_checkState](wasInputPaused);
+ }
+ [_checkState](wasInputPaused) {
+ dart.assert(!dart.notNull(this[_inCallback]));
+ if (dart.notNull(this[_hasPending]) && dart.notNull(this[_pending].isEmpty)) {
+ this[_state] = ~dart.notNull(_BufferingStreamSubscription._STATE_HAS_PENDING);
+ if (dart.notNull(this[_isInputPaused]) && dart.notNull(this[_mayResumeInput])) {
+ this[_state] = ~dart.notNull(_BufferingStreamSubscription._STATE_INPUT_PAUSED);
}
- }).bind(this), {onError: ((e, s) => {
- if (timer.isActive) {
- timer.cancel();
- result._completeError(e, dart.as(s, core.StackTrace));
- }
- }).bind(this)});
- return result;
+ }
+ while (true) {
+ if (this[_isCanceled]) {
+ this[_pending] = null;
+ return;
+ }
+ let isInputPaused = this[_isInputPaused];
+ if (wasInputPaused === isInputPaused)
+ break;
+ this[_state] = _BufferingStreamSubscription._STATE_IN_CALLBACK;
+ if (isInputPaused) {
+ this[_onPause]();
+ } else {
+ this[_onResume]();
+ }
+ this[_state] = ~dart.notNull(_BufferingStreamSubscription._STATE_IN_CALLBACK);
+ wasInputPaused = isInputPaused;
+ }
+ if (dart.notNull(this[_hasPending]) && !dart.notNull(this[_isPaused])) {
+ this[_pending].schedule(this);
+ }
}
}
- dart.defineNamedConstructor(_Future, 'immediate');
- dart.defineNamedConstructor(_Future, 'immediateError');
- _Future._INCOMPLETE = 0;
- _Future._PENDING_COMPLETE = 1;
- _Future._CHAINED = 2;
- _Future._VALUE = 4;
- _Future._ERROR = 8;
- return _Future;
+ _BufferingStreamSubscription._STATE_CANCEL_ON_ERROR = 1;
+ _BufferingStreamSubscription._STATE_CLOSED = 2;
+ _BufferingStreamSubscription._STATE_INPUT_PAUSED = 4;
+ _BufferingStreamSubscription._STATE_CANCELED = 8;
+ _BufferingStreamSubscription._STATE_WAIT_FOR_CANCEL = 16;
+ _BufferingStreamSubscription._STATE_IN_CALLBACK = 32;
+ _BufferingStreamSubscription._STATE_HAS_PENDING = 64;
+ _BufferingStreamSubscription._STATE_PAUSE_COUNT = 128;
+ _BufferingStreamSubscription._STATE_PAUSE_COUNT_SHIFT = 7;
+ return _BufferingStreamSubscription;
});
- let _Future = _Future$(dart.dynamic);
- class _AsyncCallbackEntry extends core.Object {
- _AsyncCallbackEntry(callback) {
- this.callback = callback;
- this.next = null;
- }
- }
- exports._nextCallback = null;
- exports._lastCallback = null;
- exports._lastPriorityCallback = null;
- exports._isInCallbackLoop = false;
- // Function _asyncRunCallbackLoop: () → void
- function _asyncRunCallbackLoop() {
- while (exports._nextCallback !== null) {
- exports._lastPriorityCallback = null;
- let entry = exports._nextCallback;
- exports._nextCallback = entry.next;
- if (exports._nextCallback === null)
- exports._lastCallback = null;
- entry.callback();
- }
- }
- // Function _asyncRunCallback: () → void
- function _asyncRunCallback() {
- exports._isInCallbackLoop = true;
- try {
- _asyncRunCallbackLoop();
- } finally {
- exports._lastPriorityCallback = null;
- exports._isInCallbackLoop = false;
- if (exports._nextCallback !== null)
- _AsyncRun._scheduleImmediate(_asyncRunCallback);
- }
- }
- // Function _scheduleAsyncCallback: (dynamic) → void
- function _scheduleAsyncCallback(callback) {
- if (exports._nextCallback === null) {
- exports._nextCallback = exports._lastCallback = new _AsyncCallbackEntry(dart.as(callback, _AsyncCallback));
- if (!dart.notNull(exports._isInCallbackLoop)) {
- _AsyncRun._scheduleImmediate(_asyncRunCallback);
+ let _BufferingStreamSubscription = _BufferingStreamSubscription$(dart.dynamic);
+ let _ControllerSubscription$ = dart.generic(function(T) {
+ class _ControllerSubscription extends _BufferingStreamSubscription$(T) {
+ _ControllerSubscription($_controller, onData, onError, onDone, cancelOnError) {
+ this[_controller] = $_controller;
+ super._BufferingStreamSubscription(onData, onError, onDone, cancelOnError);
}
- } else {
- let newEntry = new _AsyncCallbackEntry(dart.as(callback, _AsyncCallback));
- exports._lastCallback.next = newEntry;
- exports._lastCallback = newEntry;
- }
- }
- // Function _schedulePriorityAsyncCallback: (dynamic) → void
- function _schedulePriorityAsyncCallback(callback) {
- let entry = new _AsyncCallbackEntry(dart.as(callback, _AsyncCallback));
- if (exports._nextCallback === null) {
- _scheduleAsyncCallback(callback);
- exports._lastPriorityCallback = exports._lastCallback;
- } else if (exports._lastPriorityCallback === null) {
- entry.next = exports._nextCallback;
- exports._nextCallback = exports._lastPriorityCallback = entry;
- } else {
- entry.next = exports._lastPriorityCallback.next;
- exports._lastPriorityCallback.next = entry;
- exports._lastPriorityCallback = entry;
- if (entry.next === null) {
- exports._lastCallback = entry;
+ [_onCancel]() {
+ return this[_controller]._recordCancel(this);
}
- }
- }
- // Function scheduleMicrotask: (() → void) → void
- function scheduleMicrotask(callback) {
- if (core.identical(_ROOT_ZONE, Zone.current)) {
- _rootScheduleMicrotask(null, null, dart.as(_ROOT_ZONE, Zone), callback);
- return;
- }
- Zone.current.scheduleMicrotask(Zone.current.bindCallback(callback, {runGuarded: true}));
- }
- let _scheduleImmediate = Symbol('_scheduleImmediate');
- let _initializeScheduleImmediate = Symbol('_initializeScheduleImmediate');
- let _scheduleImmediateJsOverride = Symbol('_scheduleImmediateJsOverride');
- let _scheduleImmediateWithSetImmediate = Symbol('_scheduleImmediateWithSetImmediate');
- let _scheduleImmediateWithTimer = Symbol('_scheduleImmediateWithTimer');
- class _AsyncRun extends core.Object {
- static [_scheduleImmediate](callback) {
- dart.dinvokef(scheduleImmediateClosure, callback);
- }
- static [_initializeScheduleImmediate]() {
- _js_helper.requiresPreamble();
- if (self.scheduleImmediate !== null) {
- return _scheduleImmediateJsOverride;
+ [_onPause]() {
+ this[_controller]._recordPause(this);
}
- if (dart.notNull(self.MutationObserver !== null) && dart.notNull(self.document !== null)) {
- let div = self.document.createElement("div");
- let span = self.document.createElement("span");
- let storedCallback = null;
- // Function internalCallback: (dynamic) → dynamic
- function internalCallback(_) {
- _isolate_helper.leaveJsAsync();
- let f = storedCallback;
- storedCallback = null;
- dart.dinvokef(f);
- }
- ;
- let observer = new self.MutationObserver(_js_helper.convertDartClosureToJS(internalCallback, 1));
- observer.observe(div, {childList: true});
- return (callback) => {
- dart.assert(storedCallback === null);
- _isolate_helper.enterJsAsync();
- storedCallback = callback;
- div.firstChild ? div.removeChild(span) : div.appendChild(span);
- };
- } else if (self.setImmediate !== null) {
- return _scheduleImmediateWithSetImmediate;
+ [_onResume]() {
+ this[_controller]._recordResume(this);
}
- return _scheduleImmediateWithTimer;
}
- static [_scheduleImmediateJsOverride](callback) {
- // Function internalCallback: () → dynamic
- function internalCallback() {
- _isolate_helper.leaveJsAsync();
- callback();
+ return _ControllerSubscription;
+ });
+ let _ControllerSubscription = _ControllerSubscription$(dart.dynamic);
+ let _BroadcastSubscription$ = dart.generic(function(T) {
+ class _BroadcastSubscription extends _ControllerSubscription$(T) {
+ _BroadcastSubscription(controller, onData, onError, onDone, cancelOnError) {
+ this[_eventState] = null;
+ this[_next] = null;
+ this[_previous] = null;
+ super._ControllerSubscription(dart.as(controller, _StreamControllerLifecycle$(T)), onData, onError, onDone, cancelOnError);
+ this[_next] = this[_previous] = this;
}
- ;
- _isolate_helper.enterJsAsync();
- self.scheduleImmediate(_js_helper.convertDartClosureToJS(internalCallback, 0));
+ get [_controller]() {
+ return dart.as(super[_controller], _BroadcastStreamController);
+ }
+ [_expectsEvent](eventId) {
+ return (dart.notNull(this[_eventState]) & dart.notNull(_BroadcastSubscription._STATE_EVENT_ID)) === eventId;
+ }
+ [_toggleEventId]() {
+ this[_eventState] = _BroadcastSubscription._STATE_EVENT_ID;
+ }
+ get [_isFiring]() {
+ return (dart.notNull(this[_eventState]) & dart.notNull(_BroadcastSubscription._STATE_FIRING)) !== 0;
+ }
+ [_setRemoveAfterFiring]() {
+ dart.assert(this[_isFiring]);
+ this[_eventState] = _BroadcastSubscription._STATE_REMOVE_AFTER_FIRING;
+ }
+ get [_removeAfterFiring]() {
+ return (dart.notNull(this[_eventState]) & dart.notNull(_BroadcastSubscription._STATE_REMOVE_AFTER_FIRING)) !== 0;
+ }
+ [_onPause]() {}
+ [_onResume]() {}
}
- static [_scheduleImmediateWithSetImmediate](callback) {
- // Function internalCallback: () → dynamic
- function internalCallback() {
- _isolate_helper.leaveJsAsync();
- callback();
+ _BroadcastSubscription._STATE_EVENT_ID = 1;
+ _BroadcastSubscription._STATE_FIRING = 2;
+ _BroadcastSubscription._STATE_REMOVE_AFTER_FIRING = 4;
+ return _BroadcastSubscription;
+ });
+ let _BroadcastSubscription = _BroadcastSubscription$(dart.dynamic);
+ let _addStreamState = Symbol('_addStreamState');
+ let _doneFuture = Symbol('_doneFuture');
+ let _isEmpty = Symbol('_isEmpty');
+ let _hasOneListener = Symbol('_hasOneListener');
+ let _isAddingStream = Symbol('_isAddingStream');
+ let _mayAddEvent = Symbol('_mayAddEvent');
+ let _ensureDoneFuture = Symbol('_ensureDoneFuture');
+ let _addListener = Symbol('_addListener');
+ let _removeListener = Symbol('_removeListener');
+ let _subscribe = Symbol('_subscribe');
+ let _recordCancel = Symbol('_recordCancel');
+ let _callOnCancel = Symbol('_callOnCancel');
+ let _recordPause = Symbol('_recordPause');
+ let _recordResume = Symbol('_recordResume');
+ let _addEventError = Symbol('_addEventError');
+ let _forEachListener = Symbol('_forEachListener');
+ let _STATE_FIRING = Symbol('_STATE_FIRING');
+ let _mayComplete = Symbol('_mayComplete');
+ let _BroadcastStreamController$ = dart.generic(function(T) {
+ class _BroadcastStreamController extends core.Object {
+ _BroadcastStreamController($_onListen, $_onCancel) {
+ this[_onListen] = $_onListen;
+ this[_onCancel] = $_onCancel;
+ this[_state] = _BroadcastStreamController._STATE_INITIAL;
+ this[_next] = null;
+ this[_previous] = null;
+ this[_addStreamState] = null;
+ this[_doneFuture] = null;
+ this[_next] = this[_previous] = this;
+ }
+ get stream() {
+ return new _BroadcastStream(this);
+ }
+ get sink() {
+ return new _StreamSinkWrapper(this);
+ }
+ get isClosed() {
+ return (dart.notNull(this[_state]) & dart.notNull(_BroadcastStreamController._STATE_CLOSED)) !== 0;
+ }
+ get isPaused() {
+ return false;
}
- ;
- _isolate_helper.enterJsAsync();
- self.setImmediate(_js_helper.convertDartClosureToJS(internalCallback, 0));
- }
- static [_scheduleImmediateWithTimer](callback) {
- Timer._createTimer(core.Duration.ZERO, callback);
- }
- }
- dart.defineLazyProperties(_AsyncRun, {
- get scheduleImmediateClosure() {
- return _initializeScheduleImmediate();
- }
- });
- let _sink = Symbol('_sink');
- let Stream$ = dart.generic(function(T) {
- class Stream extends core.Object {
- Stream() {
+ get hasListener() {
+ return !dart.notNull(this[_isEmpty]);
}
- Stream$fromFuture(future) {
- let controller = dart.as(new StreamController({sync: true}), _StreamController$(T));
- future.then(((value) => {
- controller._add(dart.as(value, T));
- controller._closeUnchecked();
- }).bind(this), {onError: ((error, stackTrace) => {
- controller._addError(error, dart.as(stackTrace, core.StackTrace));
- controller._closeUnchecked();
- }).bind(this)});
- return controller.stream;
+ get [_hasOneListener]() {
+ dart.assert(!dart.notNull(this[_isEmpty]));
+ return core.identical(this[_next][_next], this);
}
- Stream$fromIterable(data) {
- return new _GeneratedStreamImpl(() => new _IterablePendingEvents(data));
+ get [_isFiring]() {
+ return (dart.notNull(this[_state]) & dart.notNull(_BroadcastStreamController._STATE_FIRING)) !== 0;
}
- Stream$periodic(period, computation) {
- if (computation === void 0)
- computation = null;
- if (computation === null)
- computation = (i) => null;
- let timer = null;
- let computationCount = 0;
- let controller = null;
- let watch = new core.Stopwatch();
- // Function sendEvent: () → void
- function sendEvent() {
- watch.reset();
- let data = computation((($tmp) => computationCount = dart.notNull($tmp) + 1, $tmp)(computationCount));
- controller.add(data);
- }
- // Function startPeriodicTimer: () → void
- function startPeriodicTimer() {
- dart.assert(timer === null);
- timer = new Timer.periodic(period, (timer) => {
- sendEvent();
- });
- }
- controller = new StreamController({sync: true, onListen: (() => {
- watch.start();
- startPeriodicTimer();
- }).bind(this), onPause: (() => {
- timer.cancel();
- timer = null;
- watch.stop();
- }).bind(this), onResume: (() => {
- dart.assert(timer === null);
- let elapsed = watch.elapsed;
- watch.start();
- timer = new Timer(period['-'](elapsed), () => {
- timer = null;
- startPeriodicTimer();
- sendEvent();
- });
- }).bind(this), onCancel: (() => {
- if (timer !== null)
- timer.cancel();
- timer = null;
- }).bind(this)});
- return controller.stream;
+ get [_isAddingStream]() {
+ return (dart.notNull(this[_state]) & dart.notNull(_BroadcastStreamController._STATE_ADDSTREAM)) !== 0;
}
- Stream$eventTransformed(source, mapSink) {
- return dart.as(new _BoundSinkStream(source, dart.as(mapSink, _SinkMapper)), Stream$(T));
+ get [_mayAddEvent]() {
+ return dart.notNull(this[_state]) < dart.notNull(_BroadcastStreamController._STATE_CLOSED);
}
- get isBroadcast() {
- return false;
+ [_ensureDoneFuture]() {
+ if (this[_doneFuture] !== null)
+ return this[_doneFuture];
+ return this[_doneFuture] = new _Future();
}
- asBroadcastStream(opt$) {
- let onListen = opt$.onListen === void 0 ? null : opt$.onListen;
- let onCancel = opt$.onCancel === void 0 ? null : opt$.onCancel;
- return new _AsBroadcastStream(this, dart.as(onListen, dart.throw_("Unimplemented type (StreamSubscription<dynamic>) → void")), dart.as(onCancel, dart.throw_("Unimplemented type (StreamSubscription<dynamic>) → void")));
+ get [_isEmpty]() {
+ return core.identical(this[_next], this);
}
- where(test) {
- return new _WhereStream(this, test);
+ [_addListener](subscription) {
+ dart.assert(core.identical(subscription[_next], subscription));
+ subscription[_previous] = this[_previous];
+ subscription[_next] = this;
+ this[_previous][_next] = subscription;
+ this[_previous] = subscription;
+ subscription[_eventState] = dart.notNull(this[_state]) & dart.notNull(_BroadcastStreamController._STATE_EVENT_ID);
}
- map(convert) {
- return new _MapStream(this, convert);
+ [_removeListener](subscription) {
+ dart.assert(core.identical(subscription[_controller], this));
+ dart.assert(!dart.notNull(core.identical(subscription[_next], subscription)));
+ let previous = subscription[_previous];
+ let next = subscription[_next];
+ previous[_next] = next;
+ next[_previous] = previous;
+ subscription[_next] = subscription[_previous] = subscription;
}
- asyncMap(convert) {
- let controller = null;
- let subscription = null;
- // Function onListen: () → void
- function onListen() {
- let add = controller.add;
- dart.assert(dart.notNull(dart.is(controller, _StreamController)) || dart.notNull(dart.is(controller, _BroadcastStreamController)));
- let eventSink = controller;
- let addError = eventSink[_addError];
- subscription = this.listen(((event) => {
- let newValue = null;
- try {
- newValue = convert(event);
- } catch (e) {
- let s = dart.stackTrace(e);
- controller.addError(e, s);
- return;
- }
-
- if (dart.is(newValue, Future)) {
- subscription.pause();
- dart.dinvoke(dart.dinvoke(newValue, 'then', add, {onError: addError}), 'whenComplete', subscription.resume);
- } else {
- controller.add(newValue);
- }
- }).bind(this), {onError: dart.as(addError, core.Function), onDone: controller.close});
+ [_subscribe](onData, onError, onDone, cancelOnError) {
+ if (this.isClosed) {
+ if (onDone === null)
+ onDone = _nullDoneHandler;
+ return new _DoneStreamSubscription(onDone);
}
- if (this.isBroadcast) {
- controller = new StreamController.broadcast({onListen: onListen, onCancel: (() => {
- subscription.cancel();
- }).bind(this), sync: true});
+ let subscription = new _BroadcastSubscription(this, onData, onError, onDone, cancelOnError);
+ this[_addListener](dart.as(subscription, _BroadcastSubscription$(T)));
+ if (core.identical(this[_next], this[_previous])) {
+ _runGuarded(this[_onListen]);
+ }
+ return dart.as(subscription, StreamSubscription$(T));
+ }
+ [_recordCancel](subscription) {
+ if (core.identical(subscription[_next], subscription))
+ return null;
+ dart.assert(!dart.notNull(core.identical(subscription[_next], subscription)));
+ if (subscription[_isFiring]) {
+ subscription._setRemoveAfterFiring();
} else {
- controller = new StreamController({onListen: onListen, onPause: (() => {
- subscription.pause();
- }).bind(this), onResume: (() => {
- subscription.resume();
- }).bind(this), onCancel: (() => {
- subscription.cancel();
- }).bind(this), sync: true});
+ dart.assert(!dart.notNull(core.identical(subscription[_next], subscription)));
+ this[_removeListener](subscription);
+ if (!dart.notNull(this[_isFiring]) && dart.notNull(this[_isEmpty])) {
+ this[_callOnCancel]();
+ }
}
- return controller.stream;
+ return null;
}
- asyncExpand(convert) {
- let controller = null;
- let subscription = null;
- // Function onListen: () → void
- function onListen() {
- dart.assert(dart.notNull(dart.is(controller, _StreamController)) || dart.notNull(dart.is(controller, _BroadcastStreamController)));
- let eventSink = controller;
- subscription = this.listen(((event) => {
- let newStream = null;
- try {
- newStream = convert(event);
- } catch (e) {
- let s = dart.stackTrace(e);
- controller.addError(e, s);
- return;
- }
-
- if (newStream !== null) {
- subscription.pause();
- controller.addStream(newStream).whenComplete(subscription.resume);
- }
- }).bind(this), {onError: dart.as(eventSink[_addError], core.Function), onDone: controller.close});
+ [_recordPause](subscription) {}
+ [_recordResume](subscription) {}
+ [_addEventError]() {
+ if (this.isClosed) {
+ return new core.StateError("Cannot add new events after calling close");
}
- if (this.isBroadcast) {
- controller = new StreamController.broadcast({onListen: onListen, onCancel: (() => {
- subscription.cancel();
- }).bind(this), sync: true});
- } else {
- controller = new StreamController({onListen: onListen, onPause: (() => {
- subscription.pause();
- }).bind(this), onResume: (() => {
- subscription.resume();
- }).bind(this), onCancel: (() => {
- subscription.cancel();
- }).bind(this), sync: true});
+ dart.assert(this[_isAddingStream]);
+ return new core.StateError("Cannot add new events while doing an addStream");
+ }
+ add(data) {
+ if (!dart.notNull(this[_mayAddEvent]))
+ throw this[_addEventError]();
+ this[_sendData](data);
+ }
+ addError(error, stackTrace) {
+ if (stackTrace === void 0)
+ stackTrace = null;
+ error = _nonNullError(error);
+ if (!dart.notNull(this[_mayAddEvent]))
+ throw this[_addEventError]();
+ let replacement = Zone.current.errorCallback(error, stackTrace);
+ if (replacement !== null) {
+ error = _nonNullError(replacement.error);
+ stackTrace = replacement.stackTrace;
}
- return controller.stream;
+ this[_sendError](error, stackTrace);
}
- handleError(onError, opt$) {
- let test = opt$.test === void 0 ? null : opt$.test;
- return new _HandleErrorStream(this, onError, test);
+ close() {
+ if (this.isClosed) {
+ dart.assert(this[_doneFuture] !== null);
+ return this[_doneFuture];
+ }
+ if (!dart.notNull(this[_mayAddEvent]))
+ throw this[_addEventError]();
+ this[_state] = _BroadcastStreamController._STATE_CLOSED;
+ let doneFuture = this[_ensureDoneFuture]();
+ this[_sendDone]();
+ return doneFuture;
+ }
+ get done() {
+ return this[_ensureDoneFuture]();
}
- expand(convert) {
- return new _ExpandStream(this, convert);
+ addStream(stream, opt$) {
+ let cancelOnError = opt$.cancelOnError === void 0 ? true : opt$.cancelOnError;
+ if (!dart.notNull(this[_mayAddEvent]))
+ throw this[_addEventError]();
+ this[_state] = _BroadcastStreamController._STATE_ADDSTREAM;
+ this[_addStreamState] = dart.as(new _AddStreamState(this, stream, cancelOnError), _AddStreamState$(T));
+ return this[_addStreamState].addStreamFuture;
}
- pipe(streamConsumer) {
- return streamConsumer.addStream(this).then(((_) => streamConsumer.close()).bind(this));
+ [_add](data) {
+ this[_sendData](data);
}
- transform(streamTransformer) {
- return streamTransformer.bind(this);
+ [_addError](error, stackTrace) {
+ this[_sendError](error, stackTrace);
}
- reduce(combine) {
- let result = new _Future();
- let seenFirst = false;
- let value = null;
- let subscription = null;
- subscription = this.listen((element) => {
- if (seenFirst) {
- _runUserCode(() => combine(value, element), dart.as((newValue) => {
- value = newValue;
- }, dart.throw_("Unimplemented type (dynamic) → dynamic")), dart.as(_cancelAndErrorClosure(subscription, result), dart.throw_("Unimplemented type (dynamic, StackTrace) → dynamic")));
+ [_close]() {
+ dart.assert(this[_isAddingStream]);
+ let addState = this[_addStreamState];
+ this[_addStreamState] = null;
+ this[_state] = ~dart.notNull(_BroadcastStreamController._STATE_ADDSTREAM);
+ addState.complete();
+ }
+ [_forEachListener](action) {
+ if (this[_isFiring]) {
+ throw new core.StateError("Cannot fire new event. Controller is already firing an event");
+ }
+ if (this[_isEmpty])
+ return;
+ let id = dart.notNull(this[_state]) & dart.notNull(_BroadcastStreamController._STATE_EVENT_ID);
+ this[_state] = dart.notNull(_BroadcastStreamController._STATE_EVENT_ID) | dart.notNull(_BroadcastStreamController._STATE_FIRING);
+ let link = this[_next];
+ while (!dart.notNull(core.identical(link, this))) {
+ let subscription = dart.as(link, _BroadcastSubscription$(T));
+ if (subscription._expectsEvent(id)) {
+ subscription[_eventState] = _BroadcastSubscription[_STATE_FIRING];
+ action(subscription);
+ subscription._toggleEventId();
+ link = subscription[_next];
+ if (subscription[_removeAfterFiring]) {
+ this[_removeListener](subscription);
+ }
+ subscription[_eventState] = ~dart.notNull(_BroadcastSubscription[_STATE_FIRING]);
} else {
- value = element;
- seenFirst = true;
+ link = subscription[_next];
}
- }, {onError: result[_completeError], onDone: (() => {
- if (!dart.notNull(seenFirst)) {
- try {
- throw _internal.IterableElementError.noElement();
- } catch (e) {
- let s = dart.stackTrace(e);
- _completeWithErrorCallback(result, e, s);
- }
-
- } else {
- result._complete(value);
- }
- }).bind(this), cancelOnError: true});
- return result;
+ }
+ this[_state] = ~dart.notNull(_BroadcastStreamController._STATE_FIRING);
+ if (this[_isEmpty]) {
+ this[_callOnCancel]();
+ }
}
- fold(initialValue, combine) {
- let result = new _Future();
- let value = initialValue;
- let subscription = null;
- subscription = this.listen((element) => {
- _runUserCode(() => combine(value, element), (newValue) => {
- value = newValue;
- }, dart.as(_cancelAndErrorClosure(subscription, result), dart.throw_("Unimplemented type (dynamic, StackTrace) → dynamic")));
- }, {onError: ((e, st) => {
- result._completeError(e, dart.as(st, core.StackTrace));
- }).bind(this), onDone: (() => {
- result._complete(value);
- }).bind(this), cancelOnError: true});
- return result;
+ [_callOnCancel]() {
+ dart.assert(this[_isEmpty]);
+ if (dart.notNull(this.isClosed) && dart.notNull(this[_doneFuture][_mayComplete])) {
+ this[_doneFuture]._asyncComplete(null);
+ }
+ _runGuarded(this[_onCancel]);
}
- join(separator) {
- if (separator === void 0)
- separator = "";
- let result = new _Future();
- let buffer = new core.StringBuffer();
- let subscription = null;
- let first = true;
- subscription = this.listen(((element) => {
- if (!dart.notNull(first)) {
- buffer.write(separator);
- }
- first = false;
- try {
- buffer.write(element);
- } catch (e) {
- let s = dart.stackTrace(e);
- _cancelAndErrorWithReplacement(subscription, result, e, s);
+ }
+ _BroadcastStreamController._STATE_INITIAL = 0;
+ _BroadcastStreamController._STATE_EVENT_ID = 1;
+ _BroadcastStreamController._STATE_FIRING = 2;
+ _BroadcastStreamController._STATE_CLOSED = 4;
+ _BroadcastStreamController._STATE_ADDSTREAM = 8;
+ return _BroadcastStreamController;
+ });
+ let _BroadcastStreamController = _BroadcastStreamController$(dart.dynamic);
+ let _SyncBroadcastStreamController$ = dart.generic(function(T) {
+ class _SyncBroadcastStreamController extends _BroadcastStreamController$(T) {
+ _SyncBroadcastStreamController(onListen, onCancel) {
+ super._BroadcastStreamController(onListen, onCancel);
+ }
+ [_sendData](data) {
+ if (this[_isEmpty])
+ return;
+ if (this[_hasOneListener]) {
+ this[_state] = _BroadcastStreamController[_STATE_FIRING];
+ let subscription = dart.as(this[_next], _BroadcastSubscription);
+ subscription._add(data);
+ this[_state] = ~dart.notNull(_BroadcastStreamController[_STATE_FIRING]);
+ if (this[_isEmpty]) {
+ this[_callOnCancel]();
}
-
- }).bind(this), {onError: ((e) => {
- result._completeError(e);
- }).bind(this), onDone: (() => {
- result._complete(buffer.toString());
- }).bind(this), cancelOnError: true});
- return result;
+ return;
+ }
+ this[_forEachListener](((subscription) => {
+ subscription._add(data);
+ }).bind(this));
}
- contains(needle) {
- let future = new _Future();
- let subscription = null;
- subscription = this.listen((element) => {
- _runUserCode(() => dart.equals(element, needle), dart.as((isMatch) => {
- if (isMatch) {
- _cancelAndValue(subscription, future, true);
- }
- }, dart.throw_("Unimplemented type (dynamic) → dynamic")), dart.as(_cancelAndErrorClosure(subscription, future), dart.throw_("Unimplemented type (dynamic, StackTrace) → dynamic")));
- }, {onError: future[_completeError], onDone: (() => {
- future._complete(false);
- }).bind(this), cancelOnError: true});
- return future;
+ [_sendError](error, stackTrace) {
+ if (this[_isEmpty])
+ return;
+ this[_forEachListener](((subscription) => {
+ subscription._addError(error, stackTrace);
+ }).bind(this));
}
- forEach(action) {
- let future = new _Future();
- let subscription = null;
- subscription = this.listen((element) => {
- _runUserCode(() => action(element), (_) => {
- }, dart.as(_cancelAndErrorClosure(subscription, future), dart.throw_("Unimplemented type (dynamic, StackTrace) → dynamic")));
- }, {onError: future[_completeError], onDone: (() => {
- future._complete(null);
- }).bind(this), cancelOnError: true});
- return future;
+ [_sendDone]() {
+ if (!dart.notNull(this[_isEmpty])) {
+ this[_forEachListener](dart.as(((subscription) => {
+ subscription._close();
+ }).bind(this), dart.throw_("Unimplemented type (_BufferingStreamSubscription<T>) → void")));
+ } else {
+ dart.assert(this[_doneFuture] !== null);
+ dart.assert(this[_doneFuture][_mayComplete]);
+ this[_doneFuture]._asyncComplete(null);
+ }
}
- every(test) {
- let future = new _Future();
- let subscription = null;
- subscription = this.listen((element) => {
- _runUserCode(() => test(element), dart.as((isMatch) => {
- if (!dart.notNull(isMatch)) {
- _cancelAndValue(subscription, future, false);
- }
- }, dart.throw_("Unimplemented type (dynamic) → dynamic")), dart.as(_cancelAndErrorClosure(subscription, future), dart.throw_("Unimplemented type (dynamic, StackTrace) → dynamic")));
- }, {onError: future[_completeError], onDone: (() => {
- future._complete(true);
- }).bind(this), cancelOnError: true});
- return future;
+ }
+ return _SyncBroadcastStreamController;
+ });
+ let _SyncBroadcastStreamController = _SyncBroadcastStreamController$(dart.dynamic);
+ let _AsyncBroadcastStreamController$ = dart.generic(function(T) {
+ class _AsyncBroadcastStreamController extends _BroadcastStreamController$(T) {
+ _AsyncBroadcastStreamController(onListen, onCancel) {
+ super._BroadcastStreamController(onListen, onCancel);
}
- any(test) {
- let future = new _Future();
- let subscription = null;
- subscription = this.listen((element) => {
- _runUserCode(() => test(element), dart.as((isMatch) => {
- if (isMatch) {
- _cancelAndValue(subscription, future, true);
- }
- }, dart.throw_("Unimplemented type (dynamic) → dynamic")), dart.as(_cancelAndErrorClosure(subscription, future), dart.throw_("Unimplemented type (dynamic, StackTrace) → dynamic")));
- }, {onError: future[_completeError], onDone: (() => {
- future._complete(false);
- }).bind(this), cancelOnError: true});
- return future;
+ [_sendData](data) {
+ for (let link = this[_next]; !dart.notNull(core.identical(link, this)); link = link[_next]) {
+ let subscription = dart.as(link, _BroadcastSubscription$(T));
+ subscription._addPending(new _DelayedData(data));
+ }
+ }
+ [_sendError](error, stackTrace) {
+ for (let link = this[_next]; !dart.notNull(core.identical(link, this)); link = link[_next]) {
+ let subscription = dart.as(link, _BroadcastSubscription$(T));
+ subscription._addPending(new _DelayedError(error, stackTrace));
+ }
+ }
+ [_sendDone]() {
+ if (!dart.notNull(this[_isEmpty])) {
+ for (let link = this[_next]; !dart.notNull(core.identical(link, this)); link = link[_next]) {
+ let subscription = dart.as(link, _BroadcastSubscription$(T));
+ subscription._addPending(new _DelayedDone());
+ }
+ } else {
+ dart.assert(this[_doneFuture] !== null);
+ dart.assert(this[_doneFuture][_mayComplete]);
+ this[_doneFuture]._asyncComplete(null);
+ }
+ }
+ }
+ return _AsyncBroadcastStreamController;
+ });
+ let _AsyncBroadcastStreamController = _AsyncBroadcastStreamController$(dart.dynamic);
+ let _addPendingEvent = Symbol('_addPendingEvent');
+ let _STATE_CLOSED = Symbol('_STATE_CLOSED');
+ let _AsBroadcastStreamController$ = dart.generic(function(T) {
+ class _AsBroadcastStreamController extends _SyncBroadcastStreamController$(T) {
+ _AsBroadcastStreamController(onListen, onCancel) {
+ this[_pending] = null;
+ super._SyncBroadcastStreamController(onListen, onCancel);
}
- get length() {
- let future = new _Future();
- let count = 0;
- this.listen((_) => {
- count = dart.notNull(count) + 1;
- }, {onError: future[_completeError], onDone: (() => {
- future._complete(count);
- }).bind(this), cancelOnError: true});
- return future;
+ get [_hasPending]() {
+ return dart.notNull(this[_pending] !== null) && !dart.notNull(this[_pending].isEmpty);
}
- get isEmpty() {
- let future = new _Future();
- let subscription = null;
- subscription = this.listen((_) => {
- _cancelAndValue(subscription, future, false);
- }, {onError: future[_completeError], onDone: (() => {
- future._complete(true);
- }).bind(this), cancelOnError: true});
- return future;
+ [_addPendingEvent](event) {
+ if (this[_pending] === null) {
+ this[_pending] = new _StreamImplEvents();
+ }
+ this[_pending].add(event);
}
- toList() {
- let result = new List.from([]);
- let future = new _Future();
- this.listen(((data) => {
- result.add(data);
- }).bind(this), {onError: future[_completeError], onDone: (() => {
- future._complete(result);
- }).bind(this), cancelOnError: true});
- return future;
+ add(data) {
+ if (!dart.notNull(this.isClosed) && dart.notNull(this[_isFiring])) {
+ this[_addPendingEvent](new _DelayedData(data));
+ return;
+ }
+ super.add(data);
+ while (this[_hasPending]) {
+ this[_pending].handleNext(this);
+ }
}
- toSet() {
- let result = new core.Set();
- let future = new _Future();
- this.listen(((data) => {
- result.add(data);
- }).bind(this), {onError: future[_completeError], onDone: (() => {
- future._complete(result);
- }).bind(this), cancelOnError: true});
- return future;
+ addError(error, stackTrace) {
+ if (stackTrace === void 0)
+ stackTrace = null;
+ if (!dart.notNull(this.isClosed) && dart.notNull(this[_isFiring])) {
+ this[_addPendingEvent](new _DelayedError(error, stackTrace));
+ return;
+ }
+ if (!dart.notNull(this[_mayAddEvent]))
+ throw this[_addEventError]();
+ this[_sendError](error, stackTrace);
+ while (this[_hasPending]) {
+ this[_pending].handleNext(this);
+ }
}
- drain(futureValue) {
- if (futureValue === void 0)
- futureValue = null;
- return this.listen(null, {cancelOnError: true}).asFuture(futureValue);
+ close() {
+ if (!dart.notNull(this.isClosed) && dart.notNull(this[_isFiring])) {
+ this[_addPendingEvent](new _DelayedDone());
+ this[_state] = _BroadcastStreamController[_STATE_CLOSED];
+ return super.done;
+ }
+ let result = super.close();
+ dart.assert(!dart.notNull(this[_hasPending]));
+ return result;
}
- take(count) {
- return dart.as(new _TakeStream(this, count), Stream$(T));
+ [_callOnCancel]() {
+ if (this[_hasPending]) {
+ this[_pending].clear();
+ this[_pending] = null;
+ }
+ super._callOnCancel();
}
- takeWhile(test) {
- return dart.as(new _TakeWhileStream(this, dart.as(test, dart.throw_("Unimplemented type (dynamic) → bool"))), Stream$(T));
+ }
+ return _AsBroadcastStreamController;
+ });
+ let _AsBroadcastStreamController = _AsBroadcastStreamController$(dart.dynamic);
+ let _pauseCount = Symbol('_pauseCount');
+ let _resume = Symbol('_resume');
+ let _DoneSubscription$ = dart.generic(function(T) {
+ class _DoneSubscription extends core.Object {
+ _DoneSubscription() {
+ this[_pauseCount] = 0;
}
- skip(count) {
- return dart.as(new _SkipStream(this, count), Stream$(T));
+ onData(handleData) {}
+ onError(handleError) {}
+ onDone(handleDone) {}
+ pause(resumeSignal) {
+ if (resumeSignal === void 0)
+ resumeSignal = null;
+ if (resumeSignal !== null)
+ resumeSignal.then(this[_resume]);
+ this[_pauseCount] = dart.notNull(this[_pauseCount]) + 1;
}
- skipWhile(test) {
- return dart.as(new _SkipWhileStream(this, dart.as(test, dart.throw_("Unimplemented type (dynamic) → bool"))), Stream$(T));
+ resume() {
+ this[_resume](null);
}
- distinct(equals) {
- if (equals === void 0)
- equals = null;
- return dart.as(new _DistinctStream(this, dart.as(equals, dart.throw_("Unimplemented type (dynamic, dynamic) → bool"))), Stream$(T));
+ [_resume](_) {
+ if (dart.notNull(this[_pauseCount]) > 0)
+ this[_pauseCount] = dart.notNull(this[_pauseCount]) - 1;
}
- get first() {
- let future = new _Future();
- let subscription = null;
- subscription = this.listen((value) => {
- _cancelAndValue(subscription, future, value);
- }, {
- onError: future[_completeError],
- onDone: () => {
- try {
- throw _internal.IterableElementError.noElement();
- } catch (e) {
- let s = dart.stackTrace(e);
- _completeWithErrorCallback(future, e, s);
- }
-
- },
- cancelOnError: true
- });
- return future;
+ cancel() {
+ return new _Future.immediate(null);
}
- get last() {
- let future = new _Future();
- let result = null;
- let foundResult = false;
- let subscription = null;
- subscription = this.listen((value) => {
- foundResult = true;
- result = value;
- }, {onError: future[_completeError], onDone: (() => {
- if (foundResult) {
- future._complete(result);
- return;
- }
- try {
- throw _internal.IterableElementError.noElement();
- } catch (e) {
- let s = dart.stackTrace(e);
- _completeWithErrorCallback(future, e, s);
- }
-
- }).bind(this), cancelOnError: true});
- return future;
+ get isPaused() {
+ return dart.notNull(this[_pauseCount]) > 0;
}
- get single() {
- let future = new _Future();
- let result = null;
- let foundResult = false;
- let subscription = null;
- subscription = this.listen((value) => {
- if (foundResult) {
- try {
- throw _internal.IterableElementError.tooMany();
- } catch (e) {
- let s = dart.stackTrace(e);
- _cancelAndErrorWithReplacement(subscription, future, e, s);
- }
+ asFuture(value) {
+ if (value === void 0)
+ value = null;
+ return new _Future();
+ }
+ }
+ return _DoneSubscription;
+ });
+ let _DoneSubscription = _DoneSubscription$(dart.dynamic);
+ class DeferredLibrary extends core.Object {
+ DeferredLibrary(libraryName, opt$) {
+ let uri = opt$.uri === void 0 ? null : opt$.uri;
+ this.libraryName = libraryName;
+ this.uri = uri;
+ }
+ load() {
+ throw 'DeferredLibrary not supported. ' + 'please use the `import "lib.dart" deferred as lib` syntax.';
+ }
+ }
+ let _s = Symbol('_s');
+ class DeferredLoadException extends core.Object {
+ DeferredLoadException($_s) {
+ this[_s] = $_s;
+ }
+ toString() {
+ return `DeferredLoadException: '${this[_s]}'`;
+ }
+ }
+ let Future$ = dart.generic(function(T) {
+ class Future extends core.Object {
+ Future(computation) {
+ let result = new _Future();
+ Timer.run((() => {
+ try {
+ result._complete(computation());
+ } catch (e) {
+ let s = dart.stackTrace(e);
+ _completeWithErrorCallback(result, e, s);
+ }
- return;
+ }).bind(this));
+ return dart.as(result, Future$(T));
+ }
+ Future$microtask(computation) {
+ let result = new _Future();
+ scheduleMicrotask((() => {
+ try {
+ result._complete(computation());
+ } catch (e) {
+ let s = dart.stackTrace(e);
+ _completeWithErrorCallback(result, e, s);
}
- foundResult = true;
- result = value;
- }, {onError: future[_completeError], onDone: (() => {
- if (foundResult) {
- future._complete(result);
- return;
- }
- try {
- throw _internal.IterableElementError.noElement();
- } catch (e) {
- let s = dart.stackTrace(e);
- _completeWithErrorCallback(future, e, s);
- }
- }).bind(this), cancelOnError: true});
- return future;
+ }).bind(this));
+ return dart.as(result, Future$(T));
+ }
+ Future$sync(computation) {
+ try {
+ let result = computation();
+ return new Future.value(result);
+ } catch (error) {
+ let stackTrace = dart.stackTrace(error);
+ return new Future.error(error, stackTrace);
+ }
+
}
- firstWhere(test, opt$) {
- let defaultValue = opt$.defaultValue === void 0 ? null : opt$.defaultValue;
- let future = new _Future();
- let subscription = null;
- subscription = this.listen((value) => {
- _runUserCode(() => test(value), dart.as((isMatch) => {
- if (isMatch) {
- _cancelAndValue(subscription, future, value);
- }
- }, dart.throw_("Unimplemented type (dynamic) → dynamic")), dart.as(_cancelAndErrorClosure(subscription, future), dart.throw_("Unimplemented type (dynamic, StackTrace) → dynamic")));
- }, {onError: future[_completeError], onDone: (() => {
- if (defaultValue !== null) {
- _runUserCode(defaultValue, future[_complete], future[_completeError]);
- return;
- }
- try {
- throw _internal.IterableElementError.noElement();
- } catch (e) {
- let s = dart.stackTrace(e);
- _completeWithErrorCallback(future, e, s);
- }
-
- }).bind(this), cancelOnError: true});
- return future;
+ Future$value(value) {
+ if (value === void 0)
+ value = null;
+ return new _Future.immediate(value);
}
- lastWhere(test, opt$) {
- let defaultValue = opt$.defaultValue === void 0 ? null : opt$.defaultValue;
- let future = new _Future();
- let result = null;
- let foundResult = false;
- let subscription = null;
- subscription = this.listen((value) => {
- _runUserCode(() => true === test(value), dart.as((isMatch) => {
- if (isMatch) {
- foundResult = true;
- result = value;
- }
- }, dart.throw_("Unimplemented type (dynamic) → dynamic")), dart.as(_cancelAndErrorClosure(subscription, future), dart.throw_("Unimplemented type (dynamic, StackTrace) → dynamic")));
- }, {onError: future[_completeError], onDone: (() => {
- if (foundResult) {
- future._complete(result);
- return;
- }
- if (defaultValue !== null) {
- _runUserCode(defaultValue, future[_complete], future[_completeError]);
- return;
- }
- try {
- throw _internal.IterableElementError.noElement();
- } catch (e) {
- let s = dart.stackTrace(e);
- _completeWithErrorCallback(future, e, s);
- }
+ Future$error(error, stackTrace) {
+ if (stackTrace === void 0)
+ stackTrace = null;
+ error = _nonNullError(error);
+ if (!dart.notNull(core.identical(Zone.current, _ROOT_ZONE))) {
+ let replacement = Zone.current.errorCallback(error, stackTrace);
+ if (replacement !== null) {
+ error = _nonNullError(replacement.error);
+ stackTrace = replacement.stackTrace;
+ }
+ }
+ return new _Future.immediateError(error, stackTrace);
+ }
+ Future$delayed(duration, computation) {
+ if (computation === void 0)
+ computation = null;
+ let result = new _Future();
+ new Timer(duration, (() => {
+ try {
+ result._complete(computation === null ? null : computation());
+ } catch (e) {
+ let s = dart.stackTrace(e);
+ _completeWithErrorCallback(result, e, s);
+ }
- }).bind(this), cancelOnError: true});
- return future;
+ }).bind(this));
+ return dart.as(result, Future$(T));
}
- singleWhere(test) {
- let future = new _Future();
- let result = null;
- let foundResult = false;
- let subscription = null;
- subscription = this.listen((value) => {
- _runUserCode(() => true === test(value), dart.as((isMatch) => {
- if (isMatch) {
- if (foundResult) {
- try {
- throw _internal.IterableElementError.tooMany();
- } catch (e) {
- let s = dart.stackTrace(e);
- _cancelAndErrorWithReplacement(subscription, future, e, s);
+ static wait(futures, opt$) {
+ let eagerError = opt$.eagerError === void 0 ? false : opt$.eagerError;
+ let cleanUp = opt$.cleanUp === void 0 ? null : opt$.cleanUp;
+ let result = new _Future();
+ let values = null;
+ let remaining = 0;
+ let error = null;
+ let stackTrace = null;
+ // Function handleError: (dynamic, dynamic) → void
+ function handleError(theError, theStackTrace) {
+ remaining = dart.notNull(remaining) - 1;
+ if (values !== null) {
+ if (cleanUp !== null) {
+ for (let value of values) {
+ if (value !== null) {
+ new Future.sync(() => {
+ cleanUp(value);
+ });
}
-
- return;
}
- foundResult = true;
- result = value;
- }
- }, dart.throw_("Unimplemented type (dynamic) → dynamic")), dart.as(_cancelAndErrorClosure(subscription, future), dart.throw_("Unimplemented type (dynamic, StackTrace) → dynamic")));
- }, {onError: future[_completeError], onDone: (() => {
- if (foundResult) {
- future._complete(result);
- return;
}
- try {
- throw _internal.IterableElementError.noElement();
- } catch (e) {
- let s = dart.stackTrace(e);
- _completeWithErrorCallback(future, e, s);
+ values = null;
+ if (remaining === 0 || dart.notNull(eagerError)) {
+ result._completeError(theError, dart.as(theStackTrace, core.StackTrace));
+ } else {
+ error = theError;
+ stackTrace = dart.as(theStackTrace, core.StackTrace);
}
-
- }).bind(this), cancelOnError: true});
- return future;
- }
- elementAt(index) {
- if (dart.notNull(!(typeof index == number)) || dart.notNull(index) < 0)
- throw new core.ArgumentError(index);
- let future = new _Future();
- let subscription = null;
- let elementIndex = 0;
- subscription = this.listen((value) => {
- if (index === elementIndex) {
- _cancelAndValue(subscription, future, value);
- return;
+ } else if (remaining === 0 && !dart.notNull(eagerError)) {
+ result._completeError(error, stackTrace);
}
- elementIndex = 1;
- }, {onError: future[_completeError], onDone: (() => {
- future._completeError(new core.RangeError.index(index, this, "index", null, elementIndex));
- }).bind(this), cancelOnError: true});
- return future;
- }
- timeout(timeLimit, opt$) {
- let onTimeout = opt$.onTimeout === void 0 ? null : opt$.onTimeout;
- let controller = null;
- let subscription = null;
- let timer = null;
- let zone = null;
- let timeout = null;
- // Function onData: (T) → void
- function onData(event) {
- timer.cancel();
- controller.add(event);
- timer = zone.createTimer(timeLimit, dart.as(timeout, dart.throw_("Unimplemented type () → void")));
}
- // Function onError: (dynamic, StackTrace) → void
- function onError(error, stackTrace) {
- timer.cancel();
- dart.assert(dart.notNull(dart.is(controller, _StreamController)) || dart.notNull(dart.is(controller, _BroadcastStreamController)));
- let eventSink = controller;
- dart.dinvoke(eventSink, '_addError', error, stackTrace);
- timer = zone.createTimer(timeLimit, dart.as(timeout, dart.throw_("Unimplemented type () → void")));
+ for (let future of futures) {
+ let pos = (($tmp) => remaining = dart.notNull($tmp) + 1, $tmp)(remaining);
+ future.then(dart.as(((value) => {
+ remaining = dart.notNull(remaining) - 1;
+ if (values !== null) {
+ values.set(pos, value);
+ if (remaining === 0) {
+ result._completeWithValue(values);
+ }
+ } else {
+ if (dart.notNull(cleanUp !== null) && dart.notNull(value !== null)) {
+ new Future.sync(() => {
+ cleanUp(value);
+ });
+ }
+ if (remaining === 0 && !dart.notNull(eagerError)) {
+ result._completeError(error, stackTrace);
+ }
+ }
+ }).bind(this), dart.throw_("Unimplemented type (dynamic) → dynamic")), {onError: handleError});
}
- // Function onDone: () → void
- function onDone() {
- timer.cancel();
- controller.close();
+ if (remaining === 0) {
+ return dart.as(new Future.value(/* Unimplemented const */new List.from([])), Future$(core.List));
}
- // Function onListen: () → void
- function onListen() {
- zone = Zone.current;
- if (onTimeout === null) {
- timeout = (() => {
- controller.addError(new TimeoutException("No stream event", timeLimit), null);
- }).bind(this);
+ values = new core.List(remaining);
+ return result;
+ }
+ static forEach(input, f) {
+ let iterator = input.iterator;
+ return doWhile((() => {
+ if (!dart.notNull(iterator.moveNext()))
+ return false;
+ return new Future.sync((() => f(iterator.current)).bind(this)).then((_) => true);
+ }).bind(this));
+ }
+ static doWhile(f) {
+ let doneSignal = new _Future();
+ let nextIteration = null;
+ nextIteration = Zone.current.bindUnaryCallback(dart.as(((keepGoing) => {
+ if (keepGoing) {
+ new Future.sync(f).then(dart.as(nextIteration, dart.throw_("Unimplemented type (dynamic) → dynamic")), {onError: doneSignal[_completeError]});
} else {
- onTimeout = zone.registerUnaryCallback(dart.as(onTimeout, dart.throw_("Unimplemented type (dynamic) → dynamic")));
- let wrapper = new _ControllerEventSinkWrapper(null);
- timeout = (() => {
- wrapper[_sink] = controller;
- zone.runUnaryGuarded(dart.as(onTimeout, dart.throw_("Unimplemented type (dynamic) → dynamic")), wrapper);
- wrapper[_sink] = null;
- }).bind(this);
- }
- subscription = this.listen(onData, {onError: onError, onDone: onDone});
- timer = zone.createTimer(timeLimit, dart.as(timeout, dart.throw_("Unimplemented type () → void")));
- }
- // Function onCancel: () → Future<dynamic>
- function onCancel() {
- timer.cancel();
- let result = subscription.cancel();
- subscription = null;
- return result;
- }
- controller = this.isBroadcast ? new _SyncBroadcastStreamController(onListen, onCancel) : new _SyncStreamController(onListen, (() => {
- timer.cancel();
- subscription.pause();
- }).bind(this), (() => {
- subscription.resume();
- timer = zone.createTimer(timeLimit, dart.as(timeout, dart.throw_("Unimplemented type () → void")));
- }).bind(this), onCancel);
- return controller.stream;
+ doneSignal._complete(null);
+ }
+ }).bind(this), dart.throw_("Unimplemented type (dynamic) → dynamic")), {runGuarded: true});
+ dart.dinvokef(nextIteration, true);
+ return doneSignal;
}
}
- dart.defineNamedConstructor(Stream, 'fromFuture');
- dart.defineNamedConstructor(Stream, 'fromIterable');
- dart.defineNamedConstructor(Stream, 'periodic');
- dart.defineNamedConstructor(Stream, 'eventTransformed');
- return Stream;
+ dart.defineNamedConstructor(Future, 'microtask');
+ dart.defineNamedConstructor(Future, 'sync');
+ dart.defineNamedConstructor(Future, 'value');
+ dart.defineNamedConstructor(Future, 'error');
+ dart.defineNamedConstructor(Future, 'delayed');
+ dart.defineLazyProperties(Future, {
+ get _nullFuture() {
+ return dart.as(new Future.value(null), _Future);
+ }
+ });
+ return Future;
});
- let Stream = Stream$(dart.dynamic);
- let StreamSubscription$ = dart.generic(function(T) {
- class StreamSubscription extends core.Object {
+ let Future = Future$(dart.dynamic);
+ class TimeoutException extends core.Object {
+ TimeoutException(message, duration) {
+ if (duration === void 0)
+ duration = null;
+ this.message = message;
+ this.duration = duration;
}
- return StreamSubscription;
- });
- let StreamSubscription = StreamSubscription$(dart.dynamic);
- let EventSink$ = dart.generic(function(T) {
- class EventSink extends core.Object {
+ toString() {
+ let result = "TimeoutException";
+ if (this.duration !== null)
+ result = `TimeoutException after ${this.duration}`;
+ if (this.message !== null)
+ result = `${result}: ${this.message}`;
+ return result;
}
- return EventSink;
- });
- let EventSink = EventSink$(dart.dynamic);
- let _stream = Symbol('_stream');
- let StreamView$ = dart.generic(function(T) {
- class StreamView extends Stream$(T) {
- StreamView($_stream) {
- this[_stream] = $_stream;
- super.Stream();
- }
- get isBroadcast() {
- return this[_stream].isBroadcast;
- }
- asBroadcastStream(opt$) {
- let onListen = opt$.onListen === void 0 ? null : opt$.onListen;
- let onCancel = opt$.onCancel === void 0 ? null : opt$.onCancel;
- return this[_stream].asBroadcastStream({onListen: onListen, onCancel: onCancel});
+ }
+ let Completer$ = dart.generic(function(T) {
+ class Completer extends core.Object {
+ Completer() {
+ return new _AsyncCompleter();
}
- listen(onData, opt$) {
- let onError = opt$.onError === void 0 ? null : opt$.onError;
- let onDone = opt$.onDone === void 0 ? null : opt$.onDone;
- let cancelOnError = opt$.cancelOnError === void 0 ? null : opt$.cancelOnError;
- return this[_stream].listen(onData, {onError: onError, onDone: onDone, cancelOnError: cancelOnError});
+ Completer$sync() {
+ return new _SyncCompleter();
}
}
- return StreamView;
+ dart.defineNamedConstructor(Completer, 'sync');
+ return Completer;
});
- let StreamView = StreamView$(dart.dynamic);
- let StreamConsumer$ = dart.generic(function(S) {
- class StreamConsumer extends core.Object {
+ let Completer = Completer$(dart.dynamic);
+ // Function _completeWithErrorCallback: (_Future<dynamic>, dynamic, dynamic) → void
+ function _completeWithErrorCallback(result, error, stackTrace) {
+ let replacement = Zone.current.errorCallback(error, dart.as(stackTrace, core.StackTrace));
+ if (replacement !== null) {
+ error = _nonNullError(replacement.error);
+ stackTrace = replacement.stackTrace;
}
- return StreamConsumer;
- });
- let StreamConsumer = StreamConsumer$(dart.dynamic);
- let StreamSink$ = dart.generic(function(S) {
- class StreamSink extends core.Object {
+ result._completeError(error, dart.as(stackTrace, core.StackTrace));
+ }
+ // Function _nonNullError: (Object) → Object
+ function _nonNullError(error) {
+ return error !== null ? error : new core.NullThrownError();
+ }
+ let _Completer$ = dart.generic(function(T) {
+ class _Completer extends core.Object {
+ _Completer() {
+ this.future = new _Future();
+ }
+ completeError(error, stackTrace) {
+ if (stackTrace === void 0)
+ stackTrace = null;
+ error = _nonNullError(error);
+ if (!dart.notNull(this.future[_mayComplete]))
+ throw new core.StateError("Future already completed");
+ let replacement = Zone.current.errorCallback(error, stackTrace);
+ if (replacement !== null) {
+ error = _nonNullError(replacement.error);
+ stackTrace = replacement.stackTrace;
+ }
+ this[_completeError](error, stackTrace);
+ }
+ get isCompleted() {
+ return !dart.notNull(this.future[_mayComplete]);
+ }
}
- return StreamSink;
+ return _Completer;
});
- let StreamSink = StreamSink$(dart.dynamic);
- let StreamTransformer$ = dart.generic(function(S, T) {
- class StreamTransformer extends core.Object {
- StreamTransformer(transformer) {
- return new _StreamSubscriptionTransformer(transformer);
+ let _Completer = _Completer$(dart.dynamic);
+ let _AsyncCompleter$ = dart.generic(function(T) {
+ class _AsyncCompleter extends _Completer$(T) {
+ complete(value) {
+ if (value === void 0)
+ value = null;
+ if (!dart.notNull(this.future[_mayComplete]))
+ throw new core.StateError("Future already completed");
+ this.future._asyncComplete(value);
}
- StreamTransformer$fromHandlers(opt$) {
- return new _StreamHandlerTransformer(opt$);
+ [_completeError](error, stackTrace) {
+ this.future._asyncCompleteError(error, stackTrace);
}
}
- dart.defineNamedConstructor(StreamTransformer, 'fromHandlers');
- return StreamTransformer;
+ return _AsyncCompleter;
});
- let StreamTransformer = StreamTransformer$(dart.dynamic, dart.dynamic);
- let StreamIterator$ = dart.generic(function(T) {
- class StreamIterator extends core.Object {
- StreamIterator(stream) {
- return new _StreamIteratorImpl(stream);
+ let _AsyncCompleter = _AsyncCompleter$(dart.dynamic);
+ let _SyncCompleter$ = dart.generic(function(T) {
+ class _SyncCompleter extends _Completer$(T) {
+ complete(value) {
+ if (value === void 0)
+ value = null;
+ if (!dart.notNull(this.future[_mayComplete]))
+ throw new core.StateError("Future already completed");
+ this.future._complete(value);
+ }
+ [_completeError](error, stackTrace) {
+ this.future._completeError(error, stackTrace);
}
}
- return StreamIterator;
+ return _SyncCompleter;
});
- let StreamIterator = StreamIterator$(dart.dynamic);
- let _ControllerEventSinkWrapper$ = dart.generic(function(T) {
- class _ControllerEventSinkWrapper extends core.Object {
- _ControllerEventSinkWrapper($_sink) {
- this[_sink] = $_sink;
+ let _SyncCompleter = _SyncCompleter$(dart.dynamic);
+ let _nextListener = Symbol('_nextListener');
+ let _onValue = Symbol('_onValue');
+ let _errorTest = Symbol('_errorTest');
+ let _whenCompleteAction = Symbol('_whenCompleteAction');
+ class _FutureListener extends core.Object {
+ _FutureListener$then(result, onValue, errorCallback) {
+ this.result = result;
+ this.callback = onValue;
+ this.errorCallback = errorCallback;
+ this.state = errorCallback === null ? _FutureListener.STATE_THEN : _FutureListener.STATE_THEN_ONERROR;
+ this[_nextListener] = null;
+ }
+ _FutureListener$catchError(result, errorCallback, test) {
+ this.result = result;
+ this.errorCallback = errorCallback;
+ this.callback = test;
+ this.state = test === null ? _FutureListener.STATE_CATCHERROR : _FutureListener.STATE_CATCHERROR_TEST;
+ this[_nextListener] = null;
+ }
+ _FutureListener$whenComplete(result, onComplete) {
+ this.result = result;
+ this.callback = onComplete;
+ this.errorCallback = null;
+ this.state = _FutureListener.STATE_WHENCOMPLETE;
+ this[_nextListener] = null;
+ }
+ _FutureListener$chain(result) {
+ this.result = result;
+ this.callback = null;
+ this.errorCallback = null;
+ this.state = _FutureListener.STATE_CHAIN;
+ this[_nextListener] = null;
+ }
+ get [_zone]() {
+ return this.result[_zone];
+ }
+ get handlesValue() {
+ return (dart.notNull(this.state) & dart.notNull(_FutureListener.MASK_VALUE)) !== 0;
+ }
+ get handlesError() {
+ return (dart.notNull(this.state) & dart.notNull(_FutureListener.MASK_ERROR)) !== 0;
+ }
+ get hasErrorTest() {
+ return this.state === _FutureListener.STATE_CATCHERROR_TEST;
+ }
+ get handlesComplete() {
+ return this.state === _FutureListener.STATE_WHENCOMPLETE;
+ }
+ get [_onValue]() {
+ dart.assert(this.handlesValue);
+ return dart.as(this.callback, _FutureOnValue);
+ }
+ get [_onError]() {
+ return this.errorCallback;
+ }
+ get [_errorTest]() {
+ dart.assert(this.hasErrorTest);
+ return dart.as(this.callback, _FutureErrorTest);
+ }
+ get [_whenCompleteAction]() {
+ dart.assert(this.handlesComplete);
+ return dart.as(this.callback, _FutureAction);
+ }
+ }
+ dart.defineNamedConstructor(_FutureListener, 'then');
+ dart.defineNamedConstructor(_FutureListener, 'catchError');
+ dart.defineNamedConstructor(_FutureListener, 'whenComplete');
+ dart.defineNamedConstructor(_FutureListener, 'chain');
+ _FutureListener.MASK_VALUE = 1;
+ _FutureListener.MASK_ERROR = 2;
+ _FutureListener.MASK_TEST_ERROR = 4;
+ _FutureListener.MASK_WHENCOMPLETE = 8;
+ _FutureListener.STATE_CHAIN = 0;
+ _FutureListener.STATE_THEN = _FutureListener.MASK_VALUE;
+ _FutureListener.STATE_THEN_ONERROR = dart.notNull(_FutureListener.MASK_VALUE) | dart.notNull(_FutureListener.MASK_ERROR);
+ _FutureListener.STATE_CATCHERROR = _FutureListener.MASK_ERROR;
+ _FutureListener.STATE_CATCHERROR_TEST = dart.notNull(_FutureListener.MASK_ERROR) | dart.notNull(_FutureListener.MASK_TEST_ERROR);
+ _FutureListener.STATE_WHENCOMPLETE = _FutureListener.MASK_WHENCOMPLETE;
+ let _resultOrListeners = Symbol('_resultOrListeners');
+ let _asyncComplete = Symbol('_asyncComplete');
+ let _asyncCompleteError = Symbol('_asyncCompleteError');
+ let _isChained = Symbol('_isChained');
+ let _isComplete = Symbol('_isComplete');
+ let _hasValue = Symbol('_hasValue');
+ let _hasError = Symbol('_hasError');
+ let _markPendingCompletion = Symbol('_markPendingCompletion');
+ let _value = Symbol('_value');
+ let _error = Symbol('_error');
+ let _setValue = Symbol('_setValue');
+ let _setErrorObject = Symbol('_setErrorObject');
+ let _setError = Symbol('_setError');
+ let _removeListeners = Symbol('_removeListeners');
+ let _chainForeignFuture = Symbol('_chainForeignFuture');
+ let _chainCoreFuture = Symbol('_chainCoreFuture');
+ let _completeWithValue = Symbol('_completeWithValue');
+ let _propagateToListeners = Symbol('_propagateToListeners');
+ let _Future$ = dart.generic(function(T) {
+ class _Future extends core.Object {
+ _Future() {
+ this[_zone] = Zone.current;
+ this[_state] = _Future._INCOMPLETE;
+ this[_resultOrListeners] = null;
}
- add(data) {
- this[_sink].add(data);
+ _Future$immediate(value) {
+ this[_zone] = Zone.current;
+ this[_state] = _Future._INCOMPLETE;
+ this[_resultOrListeners] = null;
+ this[_asyncComplete](value);
}
- addError(error, stackTrace) {
+ _Future$immediateError(error, stackTrace) {
if (stackTrace === void 0)
stackTrace = null;
- this[_sink].addError(error, stackTrace);
- }
- close() {
- this[_sink].close();
+ this[_zone] = Zone.current;
+ this[_state] = _Future._INCOMPLETE;
+ this[_resultOrListeners] = null;
+ this[_asyncCompleteError](error, stackTrace);
}
- }
- return _ControllerEventSinkWrapper;
- });
- let _ControllerEventSinkWrapper = _ControllerEventSinkWrapper$(dart.dynamic);
- let StreamController$ = dart.generic(function(T) {
- class StreamController extends core.Object {
- StreamController(opt$) {
- let onListen = opt$.onListen === void 0 ? null : opt$.onListen;
- let onPause = opt$.onPause === void 0 ? null : opt$.onPause;
- let onResume = opt$.onResume === void 0 ? null : opt$.onResume;
- let onCancel = opt$.onCancel === void 0 ? null : opt$.onCancel;
- let sync = opt$.sync === void 0 ? false : opt$.sync;
- if (dart.notNull(onListen === null) && dart.notNull(onPause === null) && dart.notNull(onResume === null) && dart.notNull(onCancel === null)) {
- return dart.as(sync ? new _NoCallbackSyncStreamController() : new _NoCallbackAsyncStreamController(), StreamController$(T));
- }
- return sync ? new _SyncStreamController(onListen, onPause, onResume, onCancel) : new _AsyncStreamController(onListen, onPause, onResume, onCancel);
+ get [_mayComplete]() {
+ return this[_state] === _Future._INCOMPLETE;
}
- StreamController$broadcast(opt$) {
- let onListen = opt$.onListen === void 0 ? null : opt$.onListen;
- let onCancel = opt$.onCancel === void 0 ? null : opt$.onCancel;
- let sync = opt$.sync === void 0 ? false : opt$.sync;
- return sync ? new _SyncBroadcastStreamController(onListen, onCancel) : new _AsyncBroadcastStreamController(onListen, onCancel);
+ get [_isChained]() {
+ return this[_state] === _Future._CHAINED;
}
- }
- dart.defineNamedConstructor(StreamController, 'broadcast');
- return StreamController;
- });
- let StreamController = StreamController$(dart.dynamic);
- let _StreamControllerLifecycle$ = dart.generic(function(T) {
- class _StreamControllerLifecycle extends core.Object {
- [_recordPause](subscription) {}
- [_recordResume](subscription) {}
- [_recordCancel](subscription) {
- return null;
+ get [_isComplete]() {
+ return dart.notNull(this[_state]) >= dart.notNull(_Future._VALUE);
}
- }
- return _StreamControllerLifecycle;
- });
- let _StreamControllerLifecycle = _StreamControllerLifecycle$(dart.dynamic);
- let _varData = Symbol('_varData');
- let _isCanceled = Symbol('_isCanceled');
- let _isInitialState = Symbol('_isInitialState');
- let _subscription = Symbol('_subscription');
- let _isInputPaused = Symbol('_isInputPaused');
- let _pendingEvents = Symbol('_pendingEvents');
- let _ensurePendingEvents = Symbol('_ensurePendingEvents');
- let _badEventState = Symbol('_badEventState');
- let _nullFuture = Symbol('_nullFuture');
- let _closeUnchecked = Symbol('_closeUnchecked');
- let _StreamController$ = dart.generic(function(T) {
- class _StreamController extends core.Object {
- _StreamController() {
- this[_varData] = null;
- this[_state] = _StreamController._STATE_INITIAL;
- this[_doneFuture] = null;
+ get [_hasValue]() {
+ return this[_state] === _Future._VALUE;
}
- get stream() {
- return dart.as(new _ControllerStream(this), Stream$(T));
+ get [_hasError]() {
+ return this[_state] === _Future._ERROR;
}
- get sink() {
- return new _StreamSinkWrapper(this);
+ set [_isChained](value) {
+ if (value) {
+ dart.assert(!dart.notNull(this[_isComplete]));
+ this[_state] = _Future._CHAINED;
+ } else {
+ dart.assert(this[_isChained]);
+ this[_state] = _Future._INCOMPLETE;
+ }
}
- get [_isCanceled]() {
- return (dart.notNull(this[_state]) & dart.notNull(_StreamController._STATE_CANCELED)) !== 0;
+ then(f, opt$) {
+ let onError = opt$.onError === void 0 ? null : opt$.onError;
+ let result = new _Future();
+ if (!dart.notNull(core.identical(result[_zone], _ROOT_ZONE))) {
+ f = result[_zone].registerUnaryCallback(dart.as(f, dart.throw_("Unimplemented type (dynamic) → dynamic")));
+ if (onError !== null) {
+ onError = _registerErrorHandler(onError, result[_zone]);
+ }
+ }
+ this[_addListener](new _FutureListener.then(result, dart.as(f, _FutureOnValue), onError));
+ return result;
}
- get hasListener() {
- return (dart.notNull(this[_state]) & dart.notNull(_StreamController._STATE_SUBSCRIBED)) !== 0;
+ catchError(onError, opt$) {
+ let test = opt$.test === void 0 ? null : opt$.test;
+ let result = new _Future();
+ if (!dart.notNull(core.identical(result[_zone], _ROOT_ZONE))) {
+ onError = _registerErrorHandler(onError, result[_zone]);
+ if (test !== null)
+ test = dart.as(result[_zone].registerUnaryCallback(test), dart.throw_("Unimplemented type (dynamic) → bool"));
+ }
+ this[_addListener](new _FutureListener.catchError(result, onError, test));
+ return result;
}
- get [_isInitialState]() {
- return (dart.notNull(this[_state]) & dart.notNull(_StreamController._STATE_SUBSCRIPTION_MASK)) === _StreamController._STATE_INITIAL;
+ whenComplete(action) {
+ let result = new _Future();
+ if (!dart.notNull(core.identical(result[_zone], _ROOT_ZONE))) {
+ action = result[_zone].registerCallback(action);
+ }
+ this[_addListener](new _FutureListener.whenComplete(result, action));
+ return dart.as(result, Future$(T));
}
- get isClosed() {
- return (dart.notNull(this[_state]) & dart.notNull(_StreamController._STATE_CLOSED)) !== 0;
+ asStream() {
+ return dart.as(new Stream.fromFuture(this), Stream$(T));
}
- get isPaused() {
- return this.hasListener ? this[_subscription][_isInputPaused] : !dart.notNull(this[_isCanceled]);
+ [_markPendingCompletion]() {
+ if (!dart.notNull(this[_mayComplete]))
+ throw new core.StateError("Future already completed");
+ this[_state] = _Future._PENDING_COMPLETE;
}
- get [_isAddingStream]() {
- return (dart.notNull(this[_state]) & dart.notNull(_StreamController._STATE_ADDSTREAM)) !== 0;
+ get [_value]() {
+ dart.assert(dart.notNull(this[_isComplete]) && dart.notNull(this[_hasValue]));
+ return dart.as(this[_resultOrListeners], T);
}
- get [_mayAddEvent]() {
- return dart.notNull(this[_state]) < dart.notNull(_StreamController._STATE_CLOSED);
+ get [_error]() {
+ dart.assert(dart.notNull(this[_isComplete]) && dart.notNull(this[_hasError]));
+ return dart.as(this[_resultOrListeners], AsyncError);
}
- get [_pendingEvents]() {
- dart.assert(this[_isInitialState]);
- if (!dart.notNull(this[_isAddingStream])) {
- return dart.as(this[_varData], _PendingEvents);
- }
- let state = dart.as(this[_varData], _StreamControllerAddStreamState);
- return dart.as(state.varData, _PendingEvents);
+ [_setValue](value) {
+ dart.assert(!dart.notNull(this[_isComplete]));
+ this[_state] = _Future._VALUE;
+ this[_resultOrListeners] = value;
}
- [_ensurePendingEvents]() {
- dart.assert(this[_isInitialState]);
- if (!dart.notNull(this[_isAddingStream])) {
- if (this[_varData] === null)
- this[_varData] = new _StreamImplEvents();
- return dart.as(this[_varData], _StreamImplEvents);
- }
- let state = dart.as(this[_varData], _StreamControllerAddStreamState);
- if (state.varData === null)
- state.varData = new _StreamImplEvents();
- return dart.as(state.varData, _StreamImplEvents);
+ [_setErrorObject](error) {
+ dart.assert(!dart.notNull(this[_isComplete]));
+ this[_state] = _Future._ERROR;
+ this[_resultOrListeners] = error;
}
- get [_subscription]() {
- dart.assert(this.hasListener);
- if (this[_isAddingStream]) {
- let addState = dart.as(this[_varData], _StreamControllerAddStreamState);
- return dart.as(addState.varData, _ControllerSubscription);
- }
- return dart.as(this[_varData], _ControllerSubscription);
+ [_setError](error, stackTrace) {
+ this[_setErrorObject](new AsyncError(error, stackTrace));
}
- [_badEventState]() {
- if (this.isClosed) {
- return new core.StateError("Cannot add event after closing");
+ [_addListener](listener) {
+ dart.assert(listener[_nextListener] === null);
+ if (this[_isComplete]) {
+ this[_zone].scheduleMicrotask((() => {
+ _propagateToListeners(this, listener);
+ }).bind(this));
+ } else {
+ listener[_nextListener] = dart.as(this[_resultOrListeners], _FutureListener);
+ this[_resultOrListeners] = listener;
}
- dart.assert(this[_isAddingStream]);
- return new core.StateError("Cannot add event while adding a stream");
- }
- addStream(source, opt$) {
- let cancelOnError = opt$.cancelOnError === void 0 ? true : opt$.cancelOnError;
- if (!dart.notNull(this[_mayAddEvent]))
- throw this[_badEventState]();
- if (this[_isCanceled])
- return new _Future.immediate(null);
- let addState = new _StreamControllerAddStreamState(this, this[_varData], source, cancelOnError);
- this[_varData] = addState;
- this[_state] = _StreamController._STATE_ADDSTREAM;
- return addState.addStreamFuture;
}
- get done() {
- return this[_ensureDoneFuture]();
- }
- [_ensureDoneFuture]() {
- if (this[_doneFuture] === null) {
- this[_doneFuture] = this[_isCanceled] ? Future[_nullFuture] : new _Future();
+ [_removeListeners]() {
+ dart.assert(!dart.notNull(this[_isComplete]));
+ let current = dart.as(this[_resultOrListeners], _FutureListener);
+ this[_resultOrListeners] = null;
+ let prev = null;
+ while (current !== null) {
+ let next = current[_nextListener];
+ current[_nextListener] = prev;
+ prev = current;
+ current = next;
}
- return this[_doneFuture];
- }
- add(value) {
- if (!dart.notNull(this[_mayAddEvent]))
- throw this[_badEventState]();
- this[_add](value);
+ return prev;
}
- addError(error, stackTrace) {
- if (stackTrace === void 0)
- stackTrace = null;
- error = _nonNullError(error);
- if (!dart.notNull(this[_mayAddEvent]))
- throw this[_badEventState]();
- let replacement = Zone.current.errorCallback(error, stackTrace);
- if (replacement !== null) {
- error = _nonNullError(replacement.error);
- stackTrace = replacement.stackTrace;
- }
- this[_addError](error, stackTrace);
+ static [_chainForeignFuture](source, target) {
+ dart.assert(!dart.notNull(target[_isComplete]));
+ dart.assert(!dart.is(source, _Future));
+ target[_isChained] = true;
+ source.then(((value) => {
+ dart.assert(target[_isChained]);
+ target._completeWithValue(value);
+ }).bind(this), {onError: ((error, stackTrace) => {
+ if (stackTrace === void 0)
+ stackTrace = null;
+ dart.assert(target[_isChained]);
+ target._completeError(error, dart.as(stackTrace, core.StackTrace));
+ }).bind(this)});
}
- close() {
- if (this.isClosed) {
- return this[_ensureDoneFuture]();
+ static [_chainCoreFuture](source, target) {
+ dart.assert(!dart.notNull(target[_isComplete]));
+ dart.assert(dart.is(source, _Future));
+ target[_isChained] = true;
+ let listener = new _FutureListener.chain(target);
+ if (source[_isComplete]) {
+ _propagateToListeners(source, listener);
+ } else {
+ source._addListener(listener);
}
- if (!dart.notNull(this[_mayAddEvent]))
- throw this[_badEventState]();
- this[_closeUnchecked]();
- return this[_ensureDoneFuture]();
}
- [_closeUnchecked]() {
- this[_state] = _StreamController._STATE_CLOSED;
- if (this.hasListener) {
- this[_sendDone]();
- } else if (this[_isInitialState]) {
- this[_ensurePendingEvents]().add(new _DelayedDone());
+ [_complete](value) {
+ dart.assert(!dart.notNull(this[_isComplete]));
+ if (dart.is(value, Future)) {
+ if (dart.is(value, _Future)) {
+ _chainCoreFuture(dart.as(value, _Future), this);
+ } else {
+ _chainForeignFuture(dart.as(value, Future), this);
+ }
+ } else {
+ let listeners = this[_removeListeners]();
+ this[_setValue](dart.as(value, T));
+ _propagateToListeners(this, listeners);
}
}
- [_add](value) {
- if (this.hasListener) {
- this[_sendData](value);
- } else if (this[_isInitialState]) {
- this[_ensurePendingEvents]().add(new _DelayedData(value));
- }
+ [_completeWithValue](value) {
+ dart.assert(!dart.notNull(this[_isComplete]));
+ dart.assert(!dart.is(value, Future));
+ let listeners = this[_removeListeners]();
+ this[_setValue](dart.as(value, T));
+ _propagateToListeners(this, listeners);
}
- [_addError](error, stackTrace) {
- if (this.hasListener) {
- this[_sendError](error, stackTrace);
- } else if (this[_isInitialState]) {
- this[_ensurePendingEvents]().add(new _DelayedError(error, stackTrace));
+ [_completeError](error, stackTrace) {
+ if (stackTrace === void 0)
+ stackTrace = null;
+ dart.assert(!dart.notNull(this[_isComplete]));
+ let listeners = this[_removeListeners]();
+ this[_setError](error, stackTrace);
+ _propagateToListeners(this, listeners);
+ }
+ [_asyncComplete](value) {
+ dart.assert(!dart.notNull(this[_isComplete]));
+ if (value === null) {
+ } else if (dart.is(value, Future)) {
+ let typedFuture = dart.as(value, Future$(T));
+ if (dart.is(typedFuture, _Future)) {
+ let coreFuture = dart.as(typedFuture, _Future$(T));
+ if (dart.notNull(coreFuture[_isComplete]) && dart.notNull(coreFuture[_hasError])) {
+ this[_markPendingCompletion]();
+ this[_zone].scheduleMicrotask((() => {
+ _chainCoreFuture(coreFuture, this);
+ }).bind(this));
+ } else {
+ _chainCoreFuture(coreFuture, this);
+ }
+ } else {
+ _chainForeignFuture(typedFuture, this);
+ }
+ return;
+ } else {
+ let typedValue = dart.as(value, T);
}
+ this[_markPendingCompletion]();
+ this[_zone].scheduleMicrotask((() => {
+ this[_completeWithValue](value);
+ }).bind(this));
}
- [_close]() {
- dart.assert(this[_isAddingStream]);
- let addState = dart.as(this[_varData], _StreamControllerAddStreamState);
- this[_varData] = addState.varData;
- this[_state] = ~dart.notNull(_StreamController._STATE_ADDSTREAM);
- addState.complete();
+ [_asyncCompleteError](error, stackTrace) {
+ dart.assert(!dart.notNull(this[_isComplete]));
+ this[_markPendingCompletion]();
+ this[_zone].scheduleMicrotask((() => {
+ this[_completeError](error, stackTrace);
+ }).bind(this));
}
- [_subscribe](onData, onError, onDone, cancelOnError) {
- if (!dart.notNull(this[_isInitialState])) {
- throw new core.StateError("Stream has already been listened to.");
- }
- let subscription = new _ControllerSubscription(this, dart.as(onData, dart.throw_("Unimplemented type (dynamic) → void")), onError, onDone, cancelOnError);
- let pendingEvents = this[_pendingEvents];
- this[_state] = _StreamController._STATE_SUBSCRIBED;
- if (this[_isAddingStream]) {
- let addState = dart.as(this[_varData], _StreamControllerAddStreamState);
- addState.varData = subscription;
- addState.resume();
- } else {
- this[_varData] = subscription;
+ static [_propagateToListeners](source, listeners) {
+ while (true) {
+ dart.assert(source[_isComplete]);
+ let hasError = source[_hasError];
+ if (listeners === null) {
+ if (hasError) {
+ let asyncError = source[_error];
+ source[_zone].handleUncaughtError(asyncError.error, asyncError.stackTrace);
+ }
+ return;
+ }
+ while (listeners[_nextListener] !== null) {
+ let listener = listeners;
+ listeners = listener[_nextListener];
+ listener[_nextListener] = null;
+ _propagateToListeners(source, listener);
+ }
+ let listener = listeners;
+ let listenerHasValue = true;
+ let sourceValue = hasError ? null : source[_value];
+ let listenerValueOrError = sourceValue;
+ let isPropagationAborted = false;
+ if (dart.notNull(hasError) || dart.notNull(listener.handlesValue) || dart.notNull(listener.handlesComplete)) {
+ let zone = listener[_zone];
+ if (dart.notNull(hasError) && !dart.notNull(source[_zone].inSameErrorZone(zone))) {
+ let asyncError = source[_error];
+ source[_zone].handleUncaughtError(asyncError.error, asyncError.stackTrace);
+ return;
+ }
+ let oldZone = null;
+ if (!dart.notNull(core.identical(Zone.current, zone))) {
+ oldZone = Zone._enter(zone);
+ }
+ // Function handleValueCallback: () → bool
+ function handleValueCallback() {
+ try {
+ listenerValueOrError = zone.runUnary(listener[_onValue], sourceValue);
+ return true;
+ } catch (e) {
+ let s = dart.stackTrace(e);
+ listenerValueOrError = new AsyncError(e, s);
+ return false;
+ }
+
+ }
+ // Function handleError: () → void
+ function handleError() {
+ let asyncError = source[_error];
+ let matchesTest = true;
+ if (listener.hasErrorTest) {
+ let test = listener[_errorTest];
+ try {
+ matchesTest = dart.as(zone.runUnary(test, asyncError.error), core.bool);
+ } catch (e) {
+ let s = dart.stackTrace(e);
+ listenerValueOrError = core.identical(asyncError.error, e) ? asyncError : new AsyncError(e, s);
+ listenerHasValue = false;
+ return;
+ }
+
+ }
+ let errorCallback = listener[_onError];
+ if (dart.notNull(matchesTest) && dart.notNull(errorCallback !== null)) {
+ try {
+ if (dart.is(errorCallback, ZoneBinaryCallback)) {
+ listenerValueOrError = zone.runBinary(errorCallback, asyncError.error, asyncError.stackTrace);
+ } else {
+ listenerValueOrError = zone.runUnary(dart.as(errorCallback, dart.throw_("Unimplemented type (dynamic) → dynamic")), asyncError.error);
+ }
+ } catch (e) {
+ let s = dart.stackTrace(e);
+ listenerValueOrError = core.identical(asyncError.error, e) ? asyncError : new AsyncError(e, s);
+ listenerHasValue = false;
+ return;
+ }
+
+ listenerHasValue = true;
+ } else {
+ listenerValueOrError = asyncError;
+ listenerHasValue = false;
+ }
+ }
+ // Function handleWhenCompleteCallback: () → void
+ function handleWhenCompleteCallback() {
+ let completeResult = null;
+ try {
+ completeResult = zone.run(listener[_whenCompleteAction]);
+ } catch (e) {
+ let s = dart.stackTrace(e);
+ if (dart.notNull(hasError) && dart.notNull(core.identical(source[_error].error, e))) {
+ listenerValueOrError = source[_error];
+ } else {
+ listenerValueOrError = new AsyncError(e, s);
+ }
+ listenerHasValue = false;
+ return;
+ }
+
+ if (dart.is(completeResult, Future)) {
+ let result = listener.result;
+ result[_isChained] = true;
+ isPropagationAborted = true;
+ dart.dinvoke(completeResult, 'then', (ignored) => {
+ _propagateToListeners(source, new _FutureListener.chain(result));
+ }, {
+ onError: (error, stackTrace) => {
+ if (stackTrace === void 0)
+ stackTrace = null;
+ if (!dart.is(completeResult, _Future)) {
+ completeResult = new _Future();
+ dart.dinvoke(completeResult, '_setError', error, stackTrace);
+ }
+ _propagateToListeners(dart.as(completeResult, _Future), new _FutureListener.chain(result));
+ }
+ });
+ }
+ }
+ if (!dart.notNull(hasError)) {
+ if (listener.handlesValue) {
+ listenerHasValue = handleValueCallback();
+ }
+ } else {
+ handleError();
+ }
+ if (listener.handlesComplete) {
+ handleWhenCompleteCallback();
+ }
+ if (oldZone !== null)
+ Zone._leave(oldZone);
+ if (isPropagationAborted)
+ return;
+ if (dart.notNull(listenerHasValue) && !dart.notNull(core.identical(sourceValue, listenerValueOrError)) && dart.notNull(dart.is(listenerValueOrError, Future))) {
+ let chainSource = dart.as(listenerValueOrError, Future);
+ let result = listener.result;
+ if (dart.is(chainSource, _Future)) {
+ if (chainSource[_isComplete]) {
+ result[_isChained] = true;
+ source = chainSource;
+ listeners = new _FutureListener.chain(result);
+ continue;
+ } else {
+ _chainCoreFuture(chainSource, result);
+ }
+ } else {
+ _chainForeignFuture(chainSource, result);
+ }
+ return;
+ }
+ }
+ let result = listener.result;
+ listeners = result._removeListeners();
+ if (listenerHasValue) {
+ result._setValue(listenerValueOrError);
+ } else {
+ let asyncError = dart.as(listenerValueOrError, AsyncError);
+ result._setErrorObject(asyncError);
+ }
+ source = result;
}
- subscription._setPendingEvents(pendingEvents);
- subscription._guardCallback((() => {
- _runGuarded(this[_onListen]);
- }).bind(this));
- return dart.as(subscription, StreamSubscription$(T));
}
- [_recordCancel](subscription) {
- let result = null;
- if (this[_isAddingStream]) {
- let addState = dart.as(this[_varData], _StreamControllerAddStreamState);
- result = addState.cancel();
- }
- this[_varData] = null;
- this[_state] = dart.notNull(this[_state]) & ~(dart.notNull(_StreamController._STATE_SUBSCRIBED) | dart.notNull(_StreamController._STATE_ADDSTREAM)) | dart.notNull(_StreamController._STATE_CANCELED);
- if (this[_onCancel] !== null) {
- if (result === null) {
+ timeout(timeLimit, opt$) {
+ let onTimeout = opt$.onTimeout === void 0 ? null : opt$.onTimeout;
+ if (this[_isComplete])
+ return new _Future.immediate(this);
+ let result = new _Future();
+ let timer = null;
+ if (onTimeout === null) {
+ timer = new Timer(timeLimit, (() => {
+ result._completeError(new TimeoutException("Future not completed", timeLimit));
+ }).bind(this));
+ } else {
+ let zone = Zone.current;
+ onTimeout = zone.registerCallback(onTimeout);
+ timer = new Timer(timeLimit, (() => {
try {
- result = dart.as(this[_onCancel](), Future);
+ result._complete(zone.run(onTimeout));
} catch (e) {
let s = dart.stackTrace(e);
- result = ((_) => {
- _._asyncCompleteError(e, s);
- return _;
- }).bind(this)(new _Future());
+ result._completeError(e, s);
}
- } else {
- result = result.whenComplete(this[_onCancel]);
- }
+ }).bind(this));
}
- // Function complete: () → void
- function complete() {
- if (dart.notNull(this[_doneFuture] !== null) && dart.notNull(this[_doneFuture][_mayComplete])) {
- this[_doneFuture]._asyncComplete(null);
+ this.then(((v) => {
+ if (timer.isActive) {
+ timer.cancel();
+ result._completeWithValue(v);
}
- }
- if (result !== null) {
- result = result.whenComplete(complete);
- } else {
- complete();
- }
+ }).bind(this), {onError: ((e, s) => {
+ if (timer.isActive) {
+ timer.cancel();
+ result._completeError(e, dart.as(s, core.StackTrace));
+ }
+ }).bind(this)});
return result;
}
- [_recordPause](subscription) {
- if (this[_isAddingStream]) {
- let addState = dart.as(this[_varData], _StreamControllerAddStreamState);
- addState.pause();
+ }
+ dart.defineNamedConstructor(_Future, 'immediate');
+ dart.defineNamedConstructor(_Future, 'immediateError');
+ _Future._INCOMPLETE = 0;
+ _Future._PENDING_COMPLETE = 1;
+ _Future._CHAINED = 2;
+ _Future._VALUE = 4;
+ _Future._ERROR = 8;
+ return _Future;
+ });
+ let _Future = _Future$(dart.dynamic);
+ class _AsyncCallbackEntry extends core.Object {
+ _AsyncCallbackEntry(callback) {
+ this.callback = callback;
+ this.next = null;
+ }
+ }
+ exports._nextCallback = null;
+ exports._lastCallback = null;
+ exports._lastPriorityCallback = null;
+ exports._isInCallbackLoop = false;
+ // Function _asyncRunCallbackLoop: () → void
+ function _asyncRunCallbackLoop() {
+ while (exports._nextCallback !== null) {
+ exports._lastPriorityCallback = null;
+ let entry = exports._nextCallback;
+ exports._nextCallback = entry.next;
+ if (exports._nextCallback === null)
+ exports._lastCallback = null;
+ entry.callback();
+ }
+ }
+ // Function _asyncRunCallback: () → void
+ function _asyncRunCallback() {
+ exports._isInCallbackLoop = true;
+ try {
+ _asyncRunCallbackLoop();
+ } finally {
+ exports._lastPriorityCallback = null;
+ exports._isInCallbackLoop = false;
+ if (exports._nextCallback !== null)
+ _AsyncRun._scheduleImmediate(_asyncRunCallback);
+ }
+ }
+ // Function _scheduleAsyncCallback: (dynamic) → void
+ function _scheduleAsyncCallback(callback) {
+ if (exports._nextCallback === null) {
+ exports._nextCallback = exports._lastCallback = new _AsyncCallbackEntry(dart.as(callback, _AsyncCallback));
+ if (!dart.notNull(exports._isInCallbackLoop)) {
+ _AsyncRun._scheduleImmediate(_asyncRunCallback);
+ }
+ } else {
+ let newEntry = new _AsyncCallbackEntry(dart.as(callback, _AsyncCallback));
+ exports._lastCallback.next = newEntry;
+ exports._lastCallback = newEntry;
+ }
+ }
+ // Function _schedulePriorityAsyncCallback: (dynamic) → void
+ function _schedulePriorityAsyncCallback(callback) {
+ let entry = new _AsyncCallbackEntry(dart.as(callback, _AsyncCallback));
+ if (exports._nextCallback === null) {
+ _scheduleAsyncCallback(callback);
+ exports._lastPriorityCallback = exports._lastCallback;
+ } else if (exports._lastPriorityCallback === null) {
+ entry.next = exports._nextCallback;
+ exports._nextCallback = exports._lastPriorityCallback = entry;
+ } else {
+ entry.next = exports._lastPriorityCallback.next;
+ exports._lastPriorityCallback.next = entry;
+ exports._lastPriorityCallback = entry;
+ if (entry.next === null) {
+ exports._lastCallback = entry;
+ }
+ }
+ }
+ // Function scheduleMicrotask: (() → void) → void
+ function scheduleMicrotask(callback) {
+ if (core.identical(_ROOT_ZONE, Zone.current)) {
+ _rootScheduleMicrotask(null, null, dart.as(_ROOT_ZONE, Zone), callback);
+ return;
+ }
+ Zone.current.scheduleMicrotask(Zone.current.bindCallback(callback, {runGuarded: true}));
+ }
+ let _scheduleImmediate = Symbol('_scheduleImmediate');
+ let _initializeScheduleImmediate = Symbol('_initializeScheduleImmediate');
+ let _scheduleImmediateJsOverride = Symbol('_scheduleImmediateJsOverride');
+ let _scheduleImmediateWithSetImmediate = Symbol('_scheduleImmediateWithSetImmediate');
+ let _scheduleImmediateWithTimer = Symbol('_scheduleImmediateWithTimer');
+ class _AsyncRun extends core.Object {
+ static [_scheduleImmediate](callback) {
+ dart.dinvokef(scheduleImmediateClosure, callback);
+ }
+ static [_initializeScheduleImmediate]() {
+ _js_helper.requiresPreamble();
+ if (self.scheduleImmediate !== null) {
+ return _scheduleImmediateJsOverride;
+ }
+ if (dart.notNull(self.MutationObserver !== null) && dart.notNull(self.document !== null)) {
+ let div = self.document.createElement("div");
+ let span = self.document.createElement("span");
+ let storedCallback = null;
+ // Function internalCallback: (dynamic) → dynamic
+ function internalCallback(_) {
+ _isolate_helper.leaveJsAsync();
+ let f = storedCallback;
+ storedCallback = null;
+ dart.dinvokef(f);
}
- _runGuarded(this[_onPause]);
+ ;
+ let observer = new self.MutationObserver(_js_helper.convertDartClosureToJS(internalCallback, 1));
+ observer.observe(div, {childList: true});
+ return (callback) => {
+ dart.assert(storedCallback === null);
+ _isolate_helper.enterJsAsync();
+ storedCallback = callback;
+ div.firstChild ? div.removeChild(span) : div.appendChild(span);
+ };
+ } else if (self.setImmediate !== null) {
+ return _scheduleImmediateWithSetImmediate;
+ }
+ return _scheduleImmediateWithTimer;
+ }
+ static [_scheduleImmediateJsOverride](callback) {
+ // Function internalCallback: () → dynamic
+ function internalCallback() {
+ _isolate_helper.leaveJsAsync();
+ callback();
+ }
+ ;
+ _isolate_helper.enterJsAsync();
+ self.scheduleImmediate(_js_helper.convertDartClosureToJS(internalCallback, 0));
+ }
+ static [_scheduleImmediateWithSetImmediate](callback) {
+ // Function internalCallback: () → dynamic
+ function internalCallback() {
+ _isolate_helper.leaveJsAsync();
+ callback();
+ }
+ ;
+ _isolate_helper.enterJsAsync();
+ self.setImmediate(_js_helper.convertDartClosureToJS(internalCallback, 0));
+ }
+ static [_scheduleImmediateWithTimer](callback) {
+ Timer._createTimer(core.Duration.ZERO, callback);
+ }
+ }
+ dart.defineLazyProperties(_AsyncRun, {
+ get scheduleImmediateClosure() {
+ return _initializeScheduleImmediate();
+ }
+ });
+ let StreamSubscription$ = dart.generic(function(T) {
+ class StreamSubscription extends core.Object {
+ }
+ return StreamSubscription;
+ });
+ let StreamSubscription = StreamSubscription$(dart.dynamic);
+ let EventSink$ = dart.generic(function(T) {
+ class EventSink extends core.Object {
+ }
+ return EventSink;
+ });
+ let EventSink = EventSink$(dart.dynamic);
+ let _stream = Symbol('_stream');
+ let StreamView$ = dart.generic(function(T) {
+ class StreamView extends Stream$(T) {
+ StreamView($_stream) {
+ this[_stream] = $_stream;
+ super.Stream();
+ }
+ get isBroadcast() {
+ return this[_stream].isBroadcast;
+ }
+ asBroadcastStream(opt$) {
+ let onListen = opt$.onListen === void 0 ? null : opt$.onListen;
+ let onCancel = opt$.onCancel === void 0 ? null : opt$.onCancel;
+ return this[_stream].asBroadcastStream({onListen: onListen, onCancel: onCancel});
}
- [_recordResume](subscription) {
- if (this[_isAddingStream]) {
- let addState = dart.as(this[_varData], _StreamControllerAddStreamState);
- addState.resume();
- }
- _runGuarded(this[_onResume]);
+ listen(onData, opt$) {
+ let onError = opt$.onError === void 0 ? null : opt$.onError;
+ let onDone = opt$.onDone === void 0 ? null : opt$.onDone;
+ let cancelOnError = opt$.cancelOnError === void 0 ? null : opt$.cancelOnError;
+ return this[_stream].listen(onData, {onError: onError, onDone: onDone, cancelOnError: cancelOnError});
}
}
- _StreamController._STATE_INITIAL = 0;
- _StreamController._STATE_SUBSCRIBED = 1;
- _StreamController._STATE_CANCELED = 2;
- _StreamController._STATE_SUBSCRIPTION_MASK = 3;
- _StreamController._STATE_CLOSED = 4;
- _StreamController._STATE_ADDSTREAM = 8;
- return _StreamController;
+ return StreamView;
});
- let _StreamController = _StreamController$(dart.dynamic);
- let _SyncStreamControllerDispatch$ = dart.generic(function(T) {
- class _SyncStreamControllerDispatch extends core.Object {
- [_sendData](data) {
- this[_subscription]._add(data);
+ let StreamView = StreamView$(dart.dynamic);
+ let StreamConsumer$ = dart.generic(function(S) {
+ class StreamConsumer extends core.Object {
+ }
+ return StreamConsumer;
+ });
+ let StreamConsumer = StreamConsumer$(dart.dynamic);
+ let StreamSink$ = dart.generic(function(S) {
+ class StreamSink extends core.Object {
+ }
+ return StreamSink;
+ });
+ let StreamSink = StreamSink$(dart.dynamic);
+ let StreamTransformer$ = dart.generic(function(S, T) {
+ class StreamTransformer extends core.Object {
+ StreamTransformer(transformer) {
+ return new _StreamSubscriptionTransformer(transformer);
}
- [_sendError](error, stackTrace) {
- this[_subscription]._addError(error, stackTrace);
+ StreamTransformer$fromHandlers(opt$) {
+ return new _StreamHandlerTransformer(opt$);
}
- [_sendDone]() {
- this[_subscription]._close();
+ }
+ dart.defineNamedConstructor(StreamTransformer, 'fromHandlers');
+ return StreamTransformer;
+ });
+ let StreamTransformer = StreamTransformer$(dart.dynamic, dart.dynamic);
+ let StreamIterator$ = dart.generic(function(T) {
+ class StreamIterator extends core.Object {
+ StreamIterator(stream) {
+ return new _StreamIteratorImpl(stream);
}
}
- return _SyncStreamControllerDispatch;
+ return StreamIterator;
});
- let _SyncStreamControllerDispatch = _SyncStreamControllerDispatch$(dart.dynamic);
- let _AsyncStreamControllerDispatch$ = dart.generic(function(T) {
- class _AsyncStreamControllerDispatch extends core.Object {
- [_sendData](data) {
- this[_subscription]._addPending(new _DelayedData(data));
+ let StreamIterator = StreamIterator$(dart.dynamic);
+ let _ControllerEventSinkWrapper$ = dart.generic(function(T) {
+ class _ControllerEventSinkWrapper extends core.Object {
+ _ControllerEventSinkWrapper($_sink) {
+ this[_sink] = $_sink;
}
- [_sendError](error, stackTrace) {
- this[_subscription]._addPending(new _DelayedError(error, stackTrace));
+ add(data) {
+ this[_sink].add(data);
}
- [_sendDone]() {
- this[_subscription]._addPending(new _DelayedDone());
+ addError(error, stackTrace) {
+ if (stackTrace === void 0)
+ stackTrace = null;
+ this[_sink].addError(error, stackTrace);
+ }
+ close() {
+ this[_sink].close();
}
}
- return _AsyncStreamControllerDispatch;
+ return _ControllerEventSinkWrapper;
});
- let _AsyncStreamControllerDispatch = _AsyncStreamControllerDispatch$(dart.dynamic);
- let _AsyncStreamController$ = dart.generic(function(T) {
- class _AsyncStreamController extends dart.mixin(_StreamController$(T), _AsyncStreamControllerDispatch$(T)) {
- _AsyncStreamController($_onListen, $_onPause, $_onResume, $_onCancel) {
- this[_onListen] = $_onListen;
- this[_onPause] = $_onPause;
- this[_onResume] = $_onResume;
- this[_onCancel] = $_onCancel;
- super._StreamController();
+ let _ControllerEventSinkWrapper = _ControllerEventSinkWrapper$(dart.dynamic);
+ let StreamController$ = dart.generic(function(T) {
+ class StreamController extends core.Object {
+ StreamController(opt$) {
+ let onListen = opt$.onListen === void 0 ? null : opt$.onListen;
+ let onPause = opt$.onPause === void 0 ? null : opt$.onPause;
+ let onResume = opt$.onResume === void 0 ? null : opt$.onResume;
+ let onCancel = opt$.onCancel === void 0 ? null : opt$.onCancel;
+ let sync = opt$.sync === void 0 ? false : opt$.sync;
+ if (dart.notNull(onListen === null) && dart.notNull(onPause === null) && dart.notNull(onResume === null) && dart.notNull(onCancel === null)) {
+ return dart.as(sync ? new _NoCallbackSyncStreamController() : new _NoCallbackAsyncStreamController(), StreamController$(T));
+ }
+ return sync ? new _SyncStreamController(onListen, onPause, onResume, onCancel) : new _AsyncStreamController(onListen, onPause, onResume, onCancel);
+ }
+ StreamController$broadcast(opt$) {
+ let onListen = opt$.onListen === void 0 ? null : opt$.onListen;
+ let onCancel = opt$.onCancel === void 0 ? null : opt$.onCancel;
+ let sync = opt$.sync === void 0 ? false : opt$.sync;
+ return sync ? new _SyncBroadcastStreamController(onListen, onCancel) : new _AsyncBroadcastStreamController(onListen, onCancel);
}
}
- return _AsyncStreamController;
+ dart.defineNamedConstructor(StreamController, 'broadcast');
+ return StreamController;
});
- let _AsyncStreamController = _AsyncStreamController$(dart.dynamic);
- let _SyncStreamController$ = dart.generic(function(T) {
- class _SyncStreamController extends dart.mixin(_StreamController$(T), _SyncStreamControllerDispatch$(T)) {
- _SyncStreamController($_onListen, $_onPause, $_onResume, $_onCancel) {
- this[_onListen] = $_onListen;
- this[_onPause] = $_onPause;
- this[_onResume] = $_onResume;
- this[_onCancel] = $_onCancel;
- super._StreamController();
+ let StreamController = StreamController$(dart.dynamic);
+ let _StreamControllerLifecycle$ = dart.generic(function(T) {
+ class _StreamControllerLifecycle extends core.Object {
+ [_recordPause](subscription) {}
+ [_recordResume](subscription) {}
+ [_recordCancel](subscription) {
+ return null;
}
}
- return _SyncStreamController;
+ return _StreamControllerLifecycle;
});
- let _SyncStreamController = _SyncStreamController$(dart.dynamic);
- class _NoCallbacks extends core.Object {
- get [_onListen]() {
- return null;
- }
- get [_onPause]() {
- return null;
- }
- get [_onResume]() {
- return null;
- }
- get [_onCancel]() {
- return null;
- }
- }
- class _NoCallbackAsyncStreamController extends dart.mixin(_AsyncStreamControllerDispatch, _NoCallbacks) {
- }
- class _NoCallbackSyncStreamController extends dart.mixin(_SyncStreamControllerDispatch, _NoCallbacks) {
- }
- // Function _runGuarded: (() → dynamic) → Future<dynamic>
- function _runGuarded(notificationHandler) {
- if (notificationHandler === null)
- return null;
- try {
- let result = notificationHandler();
- if (dart.is(result, Future))
- return dart.as(result, Future);
- return null;
- } catch (e) {
- let s = dart.stackTrace(e);
- Zone.current.handleUncaughtError(e, s);
- }
-
- }
- let _createSubscription = Symbol('_createSubscription');
- let _ControllerStream$ = dart.generic(function(T) {
- class _ControllerStream extends _StreamImpl$(T) {
- _ControllerStream($_controller) {
- this[_controller] = $_controller;
- super._StreamImpl();
+ let _StreamControllerLifecycle = _StreamControllerLifecycle$(dart.dynamic);
+ let _varData = Symbol('_varData');
+ let _isInitialState = Symbol('_isInitialState');
+ let _subscription = Symbol('_subscription');
+ let _pendingEvents = Symbol('_pendingEvents');
+ let _ensurePendingEvents = Symbol('_ensurePendingEvents');
+ let _badEventState = Symbol('_badEventState');
+ let _nullFuture = Symbol('_nullFuture');
+ let _closeUnchecked = Symbol('_closeUnchecked');
+ let _StreamController$ = dart.generic(function(T) {
+ class _StreamController extends core.Object {
+ _StreamController() {
+ this[_varData] = null;
+ this[_state] = _StreamController._STATE_INITIAL;
+ this[_doneFuture] = null;
}
- [_createSubscription](onData, onError, onDone, cancelOnError) {
- return this[_controller]._subscribe(onData, onError, onDone, cancelOnError);
+ get stream() {
+ return dart.as(new _ControllerStream(this), Stream$(T));
}
- get hashCode() {
- return dart.notNull(this[_controller].hashCode) ^ 892482866;
+ get sink() {
+ return new _StreamSinkWrapper(this);
}
- ['=='](other) {
- if (core.identical(this, other))
- return true;
- if (!dart.is(other, _ControllerStream))
- return false;
- let otherStream = dart.as(other, _ControllerStream);
- return core.identical(otherStream[_controller], this[_controller]);
+ get [_isCanceled]() {
+ return (dart.notNull(this[_state]) & dart.notNull(_StreamController._STATE_CANCELED)) !== 0;
}
- }
- return _ControllerStream;
- });
- let _ControllerStream = _ControllerStream$(dart.dynamic);
- let _ControllerSubscription$ = dart.generic(function(T) {
- class _ControllerSubscription extends _BufferingStreamSubscription$(T) {
- _ControllerSubscription($_controller, onData, onError, onDone, cancelOnError) {
- this[_controller] = $_controller;
- super._BufferingStreamSubscription(onData, onError, onDone, cancelOnError);
+ get hasListener() {
+ return (dart.notNull(this[_state]) & dart.notNull(_StreamController._STATE_SUBSCRIBED)) !== 0;
}
- [_onCancel]() {
- return this[_controller]._recordCancel(this);
+ get [_isInitialState]() {
+ return (dart.notNull(this[_state]) & dart.notNull(_StreamController._STATE_SUBSCRIPTION_MASK)) === _StreamController._STATE_INITIAL;
}
- [_onPause]() {
- this[_controller]._recordPause(this);
+ get isClosed() {
+ return (dart.notNull(this[_state]) & dart.notNull(_StreamController._STATE_CLOSED)) !== 0;
}
- [_onResume]() {
- this[_controller]._recordResume(this);
+ get isPaused() {
+ return this.hasListener ? this[_subscription][_isInputPaused] : !dart.notNull(this[_isCanceled]);
}
- }
- return _ControllerSubscription;
- });
- let _ControllerSubscription = _ControllerSubscription$(dart.dynamic);
- let _target = Symbol('_target');
- let _StreamSinkWrapper$ = dart.generic(function(T) {
- class _StreamSinkWrapper extends core.Object {
- _StreamSinkWrapper($_target) {
- this[_target] = $_target;
+ get [_isAddingStream]() {
+ return (dart.notNull(this[_state]) & dart.notNull(_StreamController._STATE_ADDSTREAM)) !== 0;
}
- add(data) {
- this[_target].add(data);
+ get [_mayAddEvent]() {
+ return dart.notNull(this[_state]) < dart.notNull(_StreamController._STATE_CLOSED);
+ }
+ get [_pendingEvents]() {
+ dart.assert(this[_isInitialState]);
+ if (!dart.notNull(this[_isAddingStream])) {
+ return dart.as(this[_varData], _PendingEvents);
+ }
+ let state = dart.as(this[_varData], _StreamControllerAddStreamState);
+ return dart.as(state.varData, _PendingEvents);
+ }
+ [_ensurePendingEvents]() {
+ dart.assert(this[_isInitialState]);
+ if (!dart.notNull(this[_isAddingStream])) {
+ if (this[_varData] === null)
+ this[_varData] = new _StreamImplEvents();
+ return dart.as(this[_varData], _StreamImplEvents);
+ }
+ let state = dart.as(this[_varData], _StreamControllerAddStreamState);
+ if (state.varData === null)
+ state.varData = new _StreamImplEvents();
+ return dart.as(state.varData, _StreamImplEvents);
}
- addError(error, stackTrace) {
- if (stackTrace === void 0)
- stackTrace = null;
- this[_target].addError(error, stackTrace);
+ get [_subscription]() {
+ dart.assert(this.hasListener);
+ if (this[_isAddingStream]) {
+ let addState = dart.as(this[_varData], _StreamControllerAddStreamState);
+ return dart.as(addState.varData, _ControllerSubscription);
+ }
+ return dart.as(this[_varData], _ControllerSubscription);
}
- close() {
- return this[_target].close();
+ [_badEventState]() {
+ if (this.isClosed) {
+ return new core.StateError("Cannot add event after closing");
+ }
+ dart.assert(this[_isAddingStream]);
+ return new core.StateError("Cannot add event while adding a stream");
}
addStream(source, opt$) {
let cancelOnError = opt$.cancelOnError === void 0 ? true : opt$.cancelOnError;
- return this[_target].addStream(source, {cancelOnError: cancelOnError});
+ if (!dart.notNull(this[_mayAddEvent]))
+ throw this[_badEventState]();
+ if (this[_isCanceled])
+ return new _Future.immediate(null);
+ let addState = new _StreamControllerAddStreamState(this, this[_varData], source, cancelOnError);
+ this[_varData] = addState;
+ this[_state] = _StreamController._STATE_ADDSTREAM;
+ return addState.addStreamFuture;
}
get done() {
- return this[_target].done;
- }
- }
- return _StreamSinkWrapper;
- });
- let _StreamSinkWrapper = _StreamSinkWrapper$(dart.dynamic);
- let _AddStreamState$ = dart.generic(function(T) {
- class _AddStreamState extends core.Object {
- _AddStreamState(controller, source, cancelOnError) {
- this.addStreamFuture = new _Future();
- this.addSubscription = source.listen(dart.as(controller[_add], dart.throw_("Unimplemented type (dynamic) → void")), {onError: dart.as(cancelOnError ? makeErrorHandler(controller) : controller[_addError], core.Function), onDone: controller[_close], cancelOnError: cancelOnError});
- }
- static makeErrorHandler(controller) {
- return ((e, s) => {
- controller._addError(e, s);
- controller._close();
- }).bind(this);
- }
- pause() {
- this.addSubscription.pause();
- }
- resume() {
- this.addSubscription.resume();
+ return this[_ensureDoneFuture]();
}
- cancel() {
- let cancel = this.addSubscription.cancel();
- if (cancel === null) {
- this.addStreamFuture._asyncComplete(null);
- return null;
+ [_ensureDoneFuture]() {
+ if (this[_doneFuture] === null) {
+ this[_doneFuture] = this[_isCanceled] ? Future[_nullFuture] : new _Future();
}
- return cancel.whenComplete((() => {
- this.addStreamFuture._asyncComplete(null);
- }).bind(this));
+ return this[_doneFuture];
}
- complete() {
- this.addStreamFuture._asyncComplete(null);
+ add(value) {
+ if (!dart.notNull(this[_mayAddEvent]))
+ throw this[_badEventState]();
+ this[_add](value);
}
- }
- return _AddStreamState;
- });
- let _AddStreamState = _AddStreamState$(dart.dynamic);
- let _StreamControllerAddStreamState$ = dart.generic(function(T) {
- class _StreamControllerAddStreamState extends _AddStreamState$(T) {
- _StreamControllerAddStreamState(controller, varData, source, cancelOnError) {
- this.varData = varData;
- super._AddStreamState(dart.as(controller, _EventSink$(T)), source, cancelOnError);
- if (controller.isPaused) {
- this.addSubscription.pause();
+ addError(error, stackTrace) {
+ if (stackTrace === void 0)
+ stackTrace = null;
+ error = _nonNullError(error);
+ if (!dart.notNull(this[_mayAddEvent]))
+ throw this[_badEventState]();
+ let replacement = Zone.current.errorCallback(error, stackTrace);
+ if (replacement !== null) {
+ error = _nonNullError(replacement.error);
+ stackTrace = replacement.stackTrace;
}
+ this[_addError](error, stackTrace);
}
- }
- return _StreamControllerAddStreamState;
- });
- let _StreamControllerAddStreamState = _StreamControllerAddStreamState$(dart.dynamic);
- let _EventSink$ = dart.generic(function(T) {
- class _EventSink extends core.Object {
- }
- return _EventSink;
- });
- let _EventSink = _EventSink$(dart.dynamic);
- let _EventDispatch$ = dart.generic(function(T) {
- class _EventDispatch extends core.Object {
- }
- return _EventDispatch;
- });
- let _EventDispatch = _EventDispatch$(dart.dynamic);
- let _onData = Symbol('_onData');
- let _onDone = Symbol('_onDone');
- let _cancelFuture = Symbol('_cancelFuture');
- let _setPendingEvents = Symbol('_setPendingEvents');
- let _extractPending = Symbol('_extractPending');
- let _isPaused = Symbol('_isPaused');
- let _inCallback = Symbol('_inCallback');
- let _guardCallback = Symbol('_guardCallback');
- let _decrementPauseCount = Symbol('_decrementPauseCount');
- let _mayResumeInput = Symbol('_mayResumeInput');
- let _cancel = Symbol('_cancel');
- let _isClosed = Symbol('_isClosed');
- let _waitsForCancel = Symbol('_waitsForCancel');
- let _canFire = Symbol('_canFire');
- let _cancelOnError = Symbol('_cancelOnError');
- let _incrementPauseCount = Symbol('_incrementPauseCount');
- let _addPending = Symbol('_addPending');
- let _checkState = Symbol('_checkState');
- let _BufferingStreamSubscription$ = dart.generic(function(T) {
- class _BufferingStreamSubscription extends core.Object {
- _BufferingStreamSubscription(onData, onError, onDone, cancelOnError) {
- this[_zone] = Zone.current;
- this[_state] = cancelOnError ? _BufferingStreamSubscription._STATE_CANCEL_ON_ERROR : 0;
- this[_onData] = null;
- this[_onError] = null;
- this[_onDone] = null;
- this[_cancelFuture] = null;
- this[_pending] = null;
- this.onData(onData);
- this.onError(onError);
- this.onDone(onDone);
- }
- [_setPendingEvents](pendingEvents) {
- dart.assert(this[_pending] === null);
- if (pendingEvents === null)
- return;
- this[_pending] = pendingEvents;
- if (!dart.notNull(pendingEvents.isEmpty)) {
- this[_state] = _BufferingStreamSubscription._STATE_HAS_PENDING;
- this[_pending].schedule(this);
+ close() {
+ if (this.isClosed) {
+ return this[_ensureDoneFuture]();
}
+ if (!dart.notNull(this[_mayAddEvent]))
+ throw this[_badEventState]();
+ this[_closeUnchecked]();
+ return this[_ensureDoneFuture]();
}
- [_extractPending]() {
- dart.assert(this[_isCanceled]);
- let events = this[_pending];
- this[_pending] = null;
- return events;
+ [_closeUnchecked]() {
+ this[_state] = _StreamController._STATE_CLOSED;
+ if (this.hasListener) {
+ this[_sendDone]();
+ } else if (this[_isInitialState]) {
+ this[_ensurePendingEvents]().add(new _DelayedDone());
+ }
}
- onData(handleData) {
- if (handleData === null)
- handleData = _nullDataHandler;
- this[_onData] = this[_zone].registerUnaryCallback(dart.as(handleData, dart.throw_("Unimplemented type (dynamic) → dynamic")));
+ [_add](value) {
+ if (this.hasListener) {
+ this[_sendData](value);
+ } else if (this[_isInitialState]) {
+ this[_ensurePendingEvents]().add(new _DelayedData(value));
+ }
}
- onError(handleError) {
- if (handleError === null)
- handleError = _nullErrorHandler;
- this[_onError] = _registerErrorHandler(handleError, this[_zone]);
+ [_addError](error, stackTrace) {
+ if (this.hasListener) {
+ this[_sendError](error, stackTrace);
+ } else if (this[_isInitialState]) {
+ this[_ensurePendingEvents]().add(new _DelayedError(error, stackTrace));
+ }
}
- onDone(handleDone) {
- if (handleDone === null)
- handleDone = _nullDoneHandler;
- this[_onDone] = this[_zone].registerCallback(handleDone);
+ [_close]() {
+ dart.assert(this[_isAddingStream]);
+ let addState = dart.as(this[_varData], _StreamControllerAddStreamState);
+ this[_varData] = addState.varData;
+ this[_state] = ~dart.notNull(_StreamController._STATE_ADDSTREAM);
+ addState.complete();
}
- pause(resumeSignal) {
- if (resumeSignal === void 0)
- resumeSignal = null;
- if (this[_isCanceled])
- return;
- let wasPaused = this[_isPaused];
- let wasInputPaused = this[_isInputPaused];
- this[_state] = dart.notNull(this[_state]) + dart.notNull(_BufferingStreamSubscription._STATE_PAUSE_COUNT) | dart.notNull(_BufferingStreamSubscription._STATE_INPUT_PAUSED);
- if (resumeSignal !== null)
- resumeSignal.whenComplete(this.resume);
- if (!dart.notNull(wasPaused) && dart.notNull(this[_pending] !== null))
- this[_pending].cancelSchedule();
- if (!dart.notNull(wasInputPaused) && !dart.notNull(this[_inCallback]))
- this[_guardCallback](this[_onPause]);
+ [_subscribe](onData, onError, onDone, cancelOnError) {
+ if (!dart.notNull(this[_isInitialState])) {
+ throw new core.StateError("Stream has already been listened to.");
+ }
+ let subscription = new _ControllerSubscription(this, dart.as(onData, dart.throw_("Unimplemented type (dynamic) → void")), onError, onDone, cancelOnError);
+ let pendingEvents = this[_pendingEvents];
+ this[_state] = _StreamController._STATE_SUBSCRIBED;
+ if (this[_isAddingStream]) {
+ let addState = dart.as(this[_varData], _StreamControllerAddStreamState);
+ addState.varData = subscription;
+ addState.resume();
+ } else {
+ this[_varData] = subscription;
+ }
+ subscription._setPendingEvents(pendingEvents);
+ subscription._guardCallback((() => {
+ _runGuarded(this[_onListen]);
+ }).bind(this));
+ return dart.as(subscription, StreamSubscription$(T));
}
- resume() {
- if (this[_isCanceled])
- return;
- if (this[_isPaused]) {
- this[_decrementPauseCount]();
- if (!dart.notNull(this[_isPaused])) {
- if (dart.notNull(this[_hasPending]) && !dart.notNull(this[_pending].isEmpty)) {
- this[_pending].schedule(this);
- } else {
- dart.assert(this[_mayResumeInput]);
- this[_state] = ~dart.notNull(_BufferingStreamSubscription._STATE_INPUT_PAUSED);
- if (!dart.notNull(this[_inCallback]))
- this[_guardCallback](this[_onResume]);
+ [_recordCancel](subscription) {
+ let result = null;
+ if (this[_isAddingStream]) {
+ let addState = dart.as(this[_varData], _StreamControllerAddStreamState);
+ result = addState.cancel();
+ }
+ this[_varData] = null;
+ this[_state] = dart.notNull(this[_state]) & ~(dart.notNull(_StreamController._STATE_SUBSCRIBED) | dart.notNull(_StreamController._STATE_ADDSTREAM)) | dart.notNull(_StreamController._STATE_CANCELED);
+ if (this[_onCancel] !== null) {
+ if (result === null) {
+ try {
+ result = dart.as(this[_onCancel](), Future);
+ } catch (e) {
+ let s = dart.stackTrace(e);
+ result = ((_) => {
+ _._asyncCompleteError(e, s);
+ return _;
+ }).bind(this)(new _Future());
}
+
+ } else {
+ result = result.whenComplete(this[_onCancel]);
}
}
- }
- cancel() {
- this[_state] = ~dart.notNull(_BufferingStreamSubscription._STATE_WAIT_FOR_CANCEL);
- if (this[_isCanceled])
- return this[_cancelFuture];
- this[_cancel]();
- return this[_cancelFuture];
- }
- asFuture(futureValue) {
- if (futureValue === void 0)
- futureValue = null;
- let result = new _Future();
- this[_onDone] = (() => {
- result._complete(futureValue);
- }).bind(this);
- this[_onError] = ((error, stackTrace) => {
- this.cancel();
- result._completeError(error, dart.as(stackTrace, core.StackTrace));
- }).bind(this);
+ // Function complete: () → void
+ function complete() {
+ if (dart.notNull(this[_doneFuture] !== null) && dart.notNull(this[_doneFuture][_mayComplete])) {
+ this[_doneFuture]._asyncComplete(null);
+ }
+ }
+ if (result !== null) {
+ result = result.whenComplete(complete);
+ } else {
+ complete();
+ }
return result;
}
- get [_isInputPaused]() {
- return (dart.notNull(this[_state]) & dart.notNull(_BufferingStreamSubscription._STATE_INPUT_PAUSED)) !== 0;
- }
- get [_isClosed]() {
- return (dart.notNull(this[_state]) & dart.notNull(_BufferingStreamSubscription._STATE_CLOSED)) !== 0;
- }
- get [_isCanceled]() {
- return (dart.notNull(this[_state]) & dart.notNull(_BufferingStreamSubscription._STATE_CANCELED)) !== 0;
- }
- get [_waitsForCancel]() {
- return (dart.notNull(this[_state]) & dart.notNull(_BufferingStreamSubscription._STATE_WAIT_FOR_CANCEL)) !== 0;
- }
- get [_inCallback]() {
- return (dart.notNull(this[_state]) & dart.notNull(_BufferingStreamSubscription._STATE_IN_CALLBACK)) !== 0;
+ [_recordPause](subscription) {
+ if (this[_isAddingStream]) {
+ let addState = dart.as(this[_varData], _StreamControllerAddStreamState);
+ addState.pause();
+ }
+ _runGuarded(this[_onPause]);
}
- get [_hasPending]() {
- return (dart.notNull(this[_state]) & dart.notNull(_BufferingStreamSubscription._STATE_HAS_PENDING)) !== 0;
+ [_recordResume](subscription) {
+ if (this[_isAddingStream]) {
+ let addState = dart.as(this[_varData], _StreamControllerAddStreamState);
+ addState.resume();
+ }
+ _runGuarded(this[_onResume]);
}
- get [_isPaused]() {
- return dart.notNull(this[_state]) >= dart.notNull(_BufferingStreamSubscription._STATE_PAUSE_COUNT);
+ }
+ _StreamController._STATE_INITIAL = 0;
+ _StreamController._STATE_SUBSCRIBED = 1;
+ _StreamController._STATE_CANCELED = 2;
+ _StreamController._STATE_SUBSCRIPTION_MASK = 3;
+ _StreamController._STATE_CLOSED = 4;
+ _StreamController._STATE_ADDSTREAM = 8;
+ return _StreamController;
+ });
+ let _StreamController = _StreamController$(dart.dynamic);
+ let _SyncStreamControllerDispatch$ = dart.generic(function(T) {
+ class _SyncStreamControllerDispatch extends core.Object {
+ [_sendData](data) {
+ this[_subscription]._add(data);
}
- get [_canFire]() {
- return dart.notNull(this[_state]) < dart.notNull(_BufferingStreamSubscription._STATE_IN_CALLBACK);
+ [_sendError](error, stackTrace) {
+ this[_subscription]._addError(error, stackTrace);
}
- get [_mayResumeInput]() {
- return !dart.notNull(this[_isPaused]) && (dart.notNull(this[_pending] === null) || dart.notNull(this[_pending].isEmpty));
+ [_sendDone]() {
+ this[_subscription]._close();
}
- get [_cancelOnError]() {
- return (dart.notNull(this[_state]) & dart.notNull(_BufferingStreamSubscription._STATE_CANCEL_ON_ERROR)) !== 0;
+ }
+ return _SyncStreamControllerDispatch;
+ });
+ let _SyncStreamControllerDispatch = _SyncStreamControllerDispatch$(dart.dynamic);
+ let _AsyncStreamControllerDispatch$ = dart.generic(function(T) {
+ class _AsyncStreamControllerDispatch extends core.Object {
+ [_sendData](data) {
+ this[_subscription]._addPending(new _DelayedData(data));
}
- get isPaused() {
- return this[_isPaused];
+ [_sendError](error, stackTrace) {
+ this[_subscription]._addPending(new _DelayedError(error, stackTrace));
}
- [_cancel]() {
- this[_state] = _BufferingStreamSubscription._STATE_CANCELED;
- if (this[_hasPending]) {
- this[_pending].cancelSchedule();
- }
- if (!dart.notNull(this[_inCallback]))
- this[_pending] = null;
- this[_cancelFuture] = this[_onCancel]();
+ [_sendDone]() {
+ this[_subscription]._addPending(new _DelayedDone());
}
- [_incrementPauseCount]() {
- this[_state] = dart.notNull(this[_state]) + dart.notNull(_BufferingStreamSubscription._STATE_PAUSE_COUNT) | dart.notNull(_BufferingStreamSubscription._STATE_INPUT_PAUSED);
+ }
+ return _AsyncStreamControllerDispatch;
+ });
+ let _AsyncStreamControllerDispatch = _AsyncStreamControllerDispatch$(dart.dynamic);
+ let _AsyncStreamController$ = dart.generic(function(T) {
+ class _AsyncStreamController extends dart.mixin(_StreamController$(T), _AsyncStreamControllerDispatch$(T)) {
+ _AsyncStreamController($_onListen, $_onPause, $_onResume, $_onCancel) {
+ this[_onListen] = $_onListen;
+ this[_onPause] = $_onPause;
+ this[_onResume] = $_onResume;
+ this[_onCancel] = $_onCancel;
+ super._StreamController();
}
- [_decrementPauseCount]() {
- dart.assert(this[_isPaused]);
- this[_state] = _BufferingStreamSubscription._STATE_PAUSE_COUNT;
+ }
+ return _AsyncStreamController;
+ });
+ let _AsyncStreamController = _AsyncStreamController$(dart.dynamic);
+ let _SyncStreamController$ = dart.generic(function(T) {
+ class _SyncStreamController extends dart.mixin(_StreamController$(T), _SyncStreamControllerDispatch$(T)) {
+ _SyncStreamController($_onListen, $_onPause, $_onResume, $_onCancel) {
+ this[_onListen] = $_onListen;
+ this[_onPause] = $_onPause;
+ this[_onResume] = $_onResume;
+ this[_onCancel] = $_onCancel;
+ super._StreamController();
}
- [_add](data) {
- dart.assert(!dart.notNull(this[_isClosed]));
- if (this[_isCanceled])
- return;
- if (this[_canFire]) {
- this[_sendData](data);
- } else {
- this[_addPending](new _DelayedData(data));
- }
+ }
+ return _SyncStreamController;
+ });
+ let _SyncStreamController = _SyncStreamController$(dart.dynamic);
+ class _NoCallbacks extends core.Object {
+ get [_onListen]() {
+ return null;
+ }
+ get [_onPause]() {
+ return null;
+ }
+ get [_onResume]() {
+ return null;
+ }
+ get [_onCancel]() {
+ return null;
+ }
+ }
+ class _NoCallbackAsyncStreamController extends dart.mixin(_AsyncStreamControllerDispatch, _NoCallbacks) {
+ }
+ class _NoCallbackSyncStreamController extends dart.mixin(_SyncStreamControllerDispatch, _NoCallbacks) {
+ }
+ // Function _runGuarded: (() → dynamic) → Future<dynamic>
+ function _runGuarded(notificationHandler) {
+ if (notificationHandler === null)
+ return null;
+ try {
+ let result = notificationHandler();
+ if (dart.is(result, Future))
+ return dart.as(result, Future);
+ return null;
+ } catch (e) {
+ let s = dart.stackTrace(e);
+ Zone.current.handleUncaughtError(e, s);
+ }
+
+ }
+ let _target = Symbol('_target');
+ let _StreamSinkWrapper$ = dart.generic(function(T) {
+ class _StreamSinkWrapper extends core.Object {
+ _StreamSinkWrapper($_target) {
+ this[_target] = $_target;
}
- [_addError](error, stackTrace) {
- if (this[_isCanceled])
- return;
- if (this[_canFire]) {
- this[_sendError](error, stackTrace);
- } else {
- this[_addPending](new _DelayedError(error, stackTrace));
- }
+ add(data) {
+ this[_target].add(data);
}
- [_close]() {
- dart.assert(!dart.notNull(this[_isClosed]));
- if (this[_isCanceled])
- return;
- this[_state] = _BufferingStreamSubscription._STATE_CLOSED;
- if (this[_canFire]) {
- this[_sendDone]();
- } else {
- this[_addPending](new _DelayedDone());
- }
+ addError(error, stackTrace) {
+ if (stackTrace === void 0)
+ stackTrace = null;
+ this[_target].addError(error, stackTrace);
}
- [_onPause]() {
- dart.assert(this[_isInputPaused]);
+ close() {
+ return this[_target].close();
}
- [_onResume]() {
- dart.assert(!dart.notNull(this[_isInputPaused]));
+ addStream(source, opt$) {
+ let cancelOnError = opt$.cancelOnError === void 0 ? true : opt$.cancelOnError;
+ return this[_target].addStream(source, {cancelOnError: cancelOnError});
}
- [_onCancel]() {
- dart.assert(this[_isCanceled]);
- return null;
+ get done() {
+ return this[_target].done;
}
- [_addPending](event) {
- let pending = dart.as(this[_pending], _StreamImplEvents);
- if (this[_pending] === null)
- pending = this[_pending] = new _StreamImplEvents();
- pending.add(event);
- if (!dart.notNull(this[_hasPending])) {
- this[_state] = _BufferingStreamSubscription._STATE_HAS_PENDING;
- if (!dart.notNull(this[_isPaused])) {
- this[_pending].schedule(this);
- }
- }
+ }
+ return _StreamSinkWrapper;
+ });
+ let _StreamSinkWrapper = _StreamSinkWrapper$(dart.dynamic);
+ let _AddStreamState$ = dart.generic(function(T) {
+ class _AddStreamState extends core.Object {
+ _AddStreamState(controller, source, cancelOnError) {
+ this.addStreamFuture = new _Future();
+ this.addSubscription = source.listen(dart.as(controller[_add], dart.throw_("Unimplemented type (dynamic) → void")), {onError: dart.as(cancelOnError ? makeErrorHandler(controller) : controller[_addError], core.Function), onDone: controller[_close], cancelOnError: cancelOnError});
}
- [_sendData](data) {
- dart.assert(!dart.notNull(this[_isCanceled]));
- dart.assert(!dart.notNull(this[_isPaused]));
- dart.assert(!dart.notNull(this[_inCallback]));
- let wasInputPaused = this[_isInputPaused];
- this[_state] = _BufferingStreamSubscription._STATE_IN_CALLBACK;
- this[_zone].runUnaryGuarded(dart.as(this[_onData], dart.throw_("Unimplemented type (dynamic) → dynamic")), data);
- this[_state] = ~dart.notNull(_BufferingStreamSubscription._STATE_IN_CALLBACK);
- this[_checkState](wasInputPaused);
+ static makeErrorHandler(controller) {
+ return ((e, s) => {
+ controller._addError(e, s);
+ controller._close();
+ }).bind(this);
}
- [_sendError](error, stackTrace) {
- dart.assert(!dart.notNull(this[_isCanceled]));
- dart.assert(!dart.notNull(this[_isPaused]));
- dart.assert(!dart.notNull(this[_inCallback]));
- let wasInputPaused = this[_isInputPaused];
- // Function sendError: () → void
- function sendError() {
- if (dart.notNull(this[_isCanceled]) && !dart.notNull(this[_waitsForCancel]))
- return;
- this[_state] = _BufferingStreamSubscription._STATE_IN_CALLBACK;
- if (dart.is(this[_onError], ZoneBinaryCallback)) {
- this[_zone].runBinaryGuarded(dart.as(this[_onError], dart.throw_("Unimplemented type (dynamic, dynamic) → dynamic")), error, stackTrace);
- } else {
- this[_zone].runUnaryGuarded(dart.as(this[_onError], dart.throw_("Unimplemented type (dynamic) → dynamic")), error);
- }
- this[_state] = ~dart.notNull(_BufferingStreamSubscription._STATE_IN_CALLBACK);
- }
- if (this[_cancelOnError]) {
- this[_state] = _BufferingStreamSubscription._STATE_WAIT_FOR_CANCEL;
- this[_cancel]();
- if (dart.is(this[_cancelFuture], Future)) {
- this[_cancelFuture].whenComplete(sendError);
- } else {
- sendError();
- }
- } else {
- sendError();
- this[_checkState](wasInputPaused);
- }
+ pause() {
+ this.addSubscription.pause();
}
- [_sendDone]() {
- dart.assert(!dart.notNull(this[_isCanceled]));
- dart.assert(!dart.notNull(this[_isPaused]));
- dart.assert(!dart.notNull(this[_inCallback]));
- // Function sendDone: () → void
- function sendDone() {
- if (!dart.notNull(this[_waitsForCancel]))
- return;
- this[_state] = dart.notNull(_BufferingStreamSubscription._STATE_CANCELED) | dart.notNull(_BufferingStreamSubscription._STATE_CLOSED) | dart.notNull(_BufferingStreamSubscription._STATE_IN_CALLBACK);
- this[_zone].runGuarded(this[_onDone]);
- this[_state] = ~dart.notNull(_BufferingStreamSubscription._STATE_IN_CALLBACK);
- }
- this[_cancel]();
- this[_state] = _BufferingStreamSubscription._STATE_WAIT_FOR_CANCEL;
- if (dart.is(this[_cancelFuture], Future)) {
- this[_cancelFuture].whenComplete(sendDone);
- } else {
- sendDone();
+ resume() {
+ this.addSubscription.resume();
+ }
+ cancel() {
+ let cancel = this.addSubscription.cancel();
+ if (cancel === null) {
+ this.addStreamFuture._asyncComplete(null);
+ return null;
}
+ return cancel.whenComplete((() => {
+ this.addStreamFuture._asyncComplete(null);
+ }).bind(this));
}
- [_guardCallback](callback) {
- dart.assert(!dart.notNull(this[_inCallback]));
- let wasInputPaused = this[_isInputPaused];
- this[_state] = _BufferingStreamSubscription._STATE_IN_CALLBACK;
- dart.dinvokef(callback);
- this[_state] = ~dart.notNull(_BufferingStreamSubscription._STATE_IN_CALLBACK);
- this[_checkState](wasInputPaused);
+ complete() {
+ this.addStreamFuture._asyncComplete(null);
}
- [_checkState](wasInputPaused) {
- dart.assert(!dart.notNull(this[_inCallback]));
- if (dart.notNull(this[_hasPending]) && dart.notNull(this[_pending].isEmpty)) {
- this[_state] = ~dart.notNull(_BufferingStreamSubscription._STATE_HAS_PENDING);
- if (dart.notNull(this[_isInputPaused]) && dart.notNull(this[_mayResumeInput])) {
- this[_state] = ~dart.notNull(_BufferingStreamSubscription._STATE_INPUT_PAUSED);
- }
- }
- while (true) {
- if (this[_isCanceled]) {
- this[_pending] = null;
- return;
- }
- let isInputPaused = this[_isInputPaused];
- if (wasInputPaused === isInputPaused)
- break;
- this[_state] = _BufferingStreamSubscription._STATE_IN_CALLBACK;
- if (isInputPaused) {
- this[_onPause]();
- } else {
- this[_onResume]();
- }
- this[_state] = ~dart.notNull(_BufferingStreamSubscription._STATE_IN_CALLBACK);
- wasInputPaused = isInputPaused;
- }
- if (dart.notNull(this[_hasPending]) && !dart.notNull(this[_isPaused])) {
- this[_pending].schedule(this);
+ }
+ return _AddStreamState;
+ });
+ let _AddStreamState = _AddStreamState$(dart.dynamic);
+ let _StreamControllerAddStreamState$ = dart.generic(function(T) {
+ class _StreamControllerAddStreamState extends _AddStreamState$(T) {
+ _StreamControllerAddStreamState(controller, varData, source, cancelOnError) {
+ this.varData = varData;
+ super._AddStreamState(dart.as(controller, _EventSink$(T)), source, cancelOnError);
+ if (controller.isPaused) {
+ this.addSubscription.pause();
}
}
}
- _BufferingStreamSubscription._STATE_CANCEL_ON_ERROR = 1;
- _BufferingStreamSubscription._STATE_CLOSED = 2;
- _BufferingStreamSubscription._STATE_INPUT_PAUSED = 4;
- _BufferingStreamSubscription._STATE_CANCELED = 8;
- _BufferingStreamSubscription._STATE_WAIT_FOR_CANCEL = 16;
- _BufferingStreamSubscription._STATE_IN_CALLBACK = 32;
- _BufferingStreamSubscription._STATE_HAS_PENDING = 64;
- _BufferingStreamSubscription._STATE_PAUSE_COUNT = 128;
- _BufferingStreamSubscription._STATE_PAUSE_COUNT_SHIFT = 7;
- return _BufferingStreamSubscription;
+ return _StreamControllerAddStreamState;
});
- let _BufferingStreamSubscription = _BufferingStreamSubscription$(dart.dynamic);
- let _StreamImpl$ = dart.generic(function(T) {
- class _StreamImpl extends Stream$(T) {
- listen(onData, opt$) {
- let onError = opt$.onError === void 0 ? null : opt$.onError;
- let onDone = opt$.onDone === void 0 ? null : opt$.onDone;
- let cancelOnError = opt$.cancelOnError === void 0 ? null : opt$.cancelOnError;
- cancelOnError = core.identical(true, cancelOnError);
- let subscription = this[_createSubscription](onData, onError, onDone, cancelOnError);
- this[_onListen](subscription);
- return dart.as(subscription, StreamSubscription$(T));
- }
- [_createSubscription](onData, onError, onDone, cancelOnError) {
- return new _BufferingStreamSubscription(onData, onError, onDone, cancelOnError);
- }
- [_onListen](subscription) {}
+ let _StreamControllerAddStreamState = _StreamControllerAddStreamState$(dart.dynamic);
+ let _EventSink$ = dart.generic(function(T) {
+ class _EventSink extends core.Object {
}
- return _StreamImpl;
+ return _EventSink;
});
- let _StreamImpl = _StreamImpl$(dart.dynamic);
+ let _EventSink = _EventSink$(dart.dynamic);
+ let _EventDispatch$ = dart.generic(function(T) {
+ class _EventDispatch extends core.Object {
+ }
+ return _EventDispatch;
+ });
+ let _EventDispatch = _EventDispatch$(dart.dynamic);
let _isUsed = Symbol('_isUsed');
let _GeneratedStreamImpl$ = dart.generic(function(T) {
class _GeneratedStreamImpl extends _StreamImpl$(T) {
@@ -3024,6 +3033,43 @@ var async;
});
let _GeneratedStreamImpl = _GeneratedStreamImpl$(dart.dynamic);
let _iterator = Symbol('_iterator');
+ let _eventScheduled = Symbol('_eventScheduled');
+ class _PendingEvents extends core.Object {
+ _PendingEvents() {
+ this[_state] = _PendingEvents._STATE_UNSCHEDULED;
+ }
+ get isScheduled() {
+ return this[_state] === _PendingEvents._STATE_SCHEDULED;
+ }
+ get [_eventScheduled]() {
+ return dart.notNull(this[_state]) >= dart.notNull(_PendingEvents._STATE_SCHEDULED);
+ }
+ schedule(dispatch) {
+ if (this.isScheduled)
+ return;
+ dart.assert(!dart.notNull(this.isEmpty));
+ if (this[_eventScheduled]) {
+ dart.assert(this[_state] === _PendingEvents._STATE_CANCELED);
+ this[_state] = _PendingEvents._STATE_SCHEDULED;
+ return;
+ }
+ scheduleMicrotask((() => {
+ let oldState = this[_state];
+ this[_state] = _PendingEvents._STATE_UNSCHEDULED;
+ if (oldState === _PendingEvents._STATE_CANCELED)
+ return;
+ this.handleNext(dispatch);
+ }).bind(this));
+ this[_state] = _PendingEvents._STATE_SCHEDULED;
+ }
+ cancelSchedule() {
+ if (this.isScheduled)
+ this[_state] = _PendingEvents._STATE_CANCELED;
+ }
+ }
+ _PendingEvents._STATE_UNSCHEDULED = 0;
+ _PendingEvents._STATE_SCHEDULED = 1;
+ _PendingEvents._STATE_CANCELED = 3;
let _IterablePendingEvents$ = dart.generic(function(T) {
class _IterablePendingEvents extends _PendingEvents {
_IterablePendingEvents(data) {
@@ -3116,43 +3162,6 @@ var async;
throw new core.StateError("No events after a done.");
}
}
- let _eventScheduled = Symbol('_eventScheduled');
- class _PendingEvents extends core.Object {
- _PendingEvents() {
- this[_state] = _PendingEvents._STATE_UNSCHEDULED;
- }
- get isScheduled() {
- return this[_state] === _PendingEvents._STATE_SCHEDULED;
- }
- get [_eventScheduled]() {
- return dart.notNull(this[_state]) >= dart.notNull(_PendingEvents._STATE_SCHEDULED);
- }
- schedule(dispatch) {
- if (this.isScheduled)
- return;
- dart.assert(!dart.notNull(this.isEmpty));
- if (this[_eventScheduled]) {
- dart.assert(this[_state] === _PendingEvents._STATE_CANCELED);
- this[_state] = _PendingEvents._STATE_SCHEDULED;
- return;
- }
- scheduleMicrotask((() => {
- let oldState = this[_state];
- this[_state] = _PendingEvents._STATE_UNSCHEDULED;
- if (oldState === _PendingEvents._STATE_CANCELED)
- return;
- this.handleNext(dispatch);
- }).bind(this));
- this[_state] = _PendingEvents._STATE_SCHEDULED;
- }
- cancelSchedule() {
- if (this.isScheduled)
- this[_state] = _PendingEvents._STATE_CANCELED;
- }
- }
- _PendingEvents._STATE_UNSCHEDULED = 0;
- _PendingEvents._STATE_SCHEDULED = 1;
- _PendingEvents._STATE_CANCELED = 3;
class _StreamImplEvents extends _PendingEvents {
_StreamImplEvents() {
this.firstPendingEvent = null;
@@ -4167,15 +4176,6 @@ var async;
}
}
dart.defineNamedConstructor(Timer, 'periodic');
- class AsyncError extends core.Object {
- AsyncError(error, stackTrace) {
- this.error = error;
- this.stackTrace = stackTrace;
- }
- toString() {
- return dart.as(dart.dinvoke(this.error, 'toString'), core.String);
- }
- }
class _ZoneFunction extends core.Object {
_ZoneFunction(zone, function) {
this.zone = zone;
@@ -4917,6 +4917,9 @@ var async;
}
});
// Exports:
+ exports.AsyncError = AsyncError;
+ exports.Stream = Stream;
+ exports.Stream$ = Stream$;
exports.DeferredLibrary = DeferredLibrary;
exports.DeferredLoadException = DeferredLoadException;
exports.Future = Future;
@@ -4925,8 +4928,6 @@ var async;
exports.Completer = Completer;
exports.Completer$ = Completer$;
exports.scheduleMicrotask = scheduleMicrotask;
- exports.Stream = Stream;
- exports.Stream$ = Stream$;
exports.StreamSubscription = StreamSubscription;
exports.StreamSubscription$ = StreamSubscription$;
exports.EventSink = EventSink;
@@ -4944,7 +4945,6 @@ var async;
exports.StreamController = StreamController;
exports.StreamController$ = StreamController$;
exports.Timer = Timer;
- exports.AsyncError = AsyncError;
exports.ZoneSpecification = ZoneSpecification;
exports.ZoneDelegate = ZoneDelegate;
exports.Zone = Zone;

Powered by Google App Engine
This is Rietveld 408576698