Index: test/codegen/expect/dart/async.js |
diff --git a/test/codegen/expect/dart/async.js b/test/codegen/expect/dart/async.js |
index 64113b19d2b79119295dbae9389742bac628eb7d..1505f152ad355aff5554190e932ddbe801e2da94 100644 |
--- a/test/codegen/expect/dart/async.js |
+++ b/test/codegen/expect/dart/async.js |
@@ -18,6 +18,15 @@ var async; |
} |
} |
let _getBestStackTrace = Symbol('_getBestStackTrace'); |
+ class AsyncError extends core.Object { |
+ AsyncError(error, stackTrace) { |
+ this.error = error; |
+ this.stackTrace = stackTrace; |
+ } |
+ toString() { |
+ return dart.as(dart.dinvoke(this.error, 'toString'), core.String); |
+ } |
+ } |
class _UncaughtAsyncError extends AsyncError { |
_UncaughtAsyncError(error, stackTrace) { |
super.AsyncError(error, _getBestStackTrace(error, stackTrace)); |
@@ -38,2970 +47,2970 @@ var async; |
return result; |
} |
} |
- let _BroadcastStream$ = dart.generic(function(T) { |
- class _BroadcastStream extends _ControllerStream$(T) { |
- _BroadcastStream(controller) { |
- super._ControllerStream(dart.as(controller, _StreamControllerLifecycle$(T))); |
- } |
- get isBroadcast() { |
- return true; |
- } |
- } |
- return _BroadcastStream; |
- }); |
- let _BroadcastStream = _BroadcastStream$(dart.dynamic); |
- let _next = Symbol('_next'); |
- let _previous = Symbol('_previous'); |
- class _BroadcastSubscriptionLink extends core.Object { |
- _BroadcastSubscriptionLink() { |
- this[_next] = null; |
- this[_previous] = null; |
- } |
- } |
- let _eventState = Symbol('_eventState'); |
let _controller = Symbol('_controller'); |
- let _expectsEvent = Symbol('_expectsEvent'); |
- let _toggleEventId = Symbol('_toggleEventId'); |
- let _isFiring = Symbol('_isFiring'); |
- let _setRemoveAfterFiring = Symbol('_setRemoveAfterFiring'); |
- let _removeAfterFiring = Symbol('_removeAfterFiring'); |
- let _onPause = Symbol('_onPause'); |
- let _onResume = Symbol('_onResume'); |
- let _BroadcastSubscription$ = dart.generic(function(T) { |
- class _BroadcastSubscription extends _ControllerSubscription$(T) { |
- _BroadcastSubscription(controller, onData, onError, onDone, cancelOnError) { |
- this[_eventState] = null; |
- this[_next] = null; |
- this[_previous] = null; |
- super._ControllerSubscription(dart.as(controller, _StreamControllerLifecycle$(T)), onData, onError, onDone, cancelOnError); |
- this[_next] = this[_previous] = this; |
- } |
- get [_controller]() { |
- return dart.as(super[_controller], _BroadcastStreamController); |
- } |
- [_expectsEvent](eventId) { |
- return (dart.notNull(this[_eventState]) & dart.notNull(_BroadcastSubscription._STATE_EVENT_ID)) === eventId; |
- } |
- [_toggleEventId]() { |
- this[_eventState] = _BroadcastSubscription._STATE_EVENT_ID; |
- } |
- get [_isFiring]() { |
- return (dart.notNull(this[_eventState]) & dart.notNull(_BroadcastSubscription._STATE_FIRING)) !== 0; |
- } |
- [_setRemoveAfterFiring]() { |
- dart.assert(this[_isFiring]); |
- this[_eventState] = _BroadcastSubscription._STATE_REMOVE_AFTER_FIRING; |
- } |
- get [_removeAfterFiring]() { |
- return (dart.notNull(this[_eventState]) & dart.notNull(_BroadcastSubscription._STATE_REMOVE_AFTER_FIRING)) !== 0; |
- } |
- [_onPause]() {} |
- [_onResume]() {} |
- } |
- _BroadcastSubscription._STATE_EVENT_ID = 1; |
- _BroadcastSubscription._STATE_FIRING = 2; |
- _BroadcastSubscription._STATE_REMOVE_AFTER_FIRING = 4; |
- return _BroadcastSubscription; |
- }); |
- let _BroadcastSubscription = _BroadcastSubscription$(dart.dynamic); |
+ let _createSubscription = Symbol('_createSubscription'); |
let _onListen = Symbol('_onListen'); |
- let _onCancel = Symbol('_onCancel'); |
- let _state = Symbol('_state'); |
- let _addStreamState = Symbol('_addStreamState'); |
- let _doneFuture = Symbol('_doneFuture'); |
- let _isEmpty = Symbol('_isEmpty'); |
- let _hasOneListener = Symbol('_hasOneListener'); |
- let _isAddingStream = Symbol('_isAddingStream'); |
- let _mayAddEvent = Symbol('_mayAddEvent'); |
- let _ensureDoneFuture = Symbol('_ensureDoneFuture'); |
- let _addListener = Symbol('_addListener'); |
- let _removeListener = Symbol('_removeListener'); |
- let _subscribe = Symbol('_subscribe'); |
- let _recordCancel = Symbol('_recordCancel'); |
- let _callOnCancel = Symbol('_callOnCancel'); |
- let _recordPause = Symbol('_recordPause'); |
- let _recordResume = Symbol('_recordResume'); |
- let _addEventError = Symbol('_addEventError'); |
- let _sendData = Symbol('_sendData'); |
- let _sendError = Symbol('_sendError'); |
- let _sendDone = Symbol('_sendDone'); |
- let _add = Symbol('_add'); |
let _addError = Symbol('_addError'); |
- let _close = Symbol('_close'); |
- let _forEachListener = Symbol('_forEachListener'); |
- let _STATE_FIRING = Symbol('_STATE_FIRING'); |
- let _mayComplete = Symbol('_mayComplete'); |
- let _BroadcastStreamController$ = dart.generic(function(T) { |
- class _BroadcastStreamController extends core.Object { |
- _BroadcastStreamController($_onListen, $_onCancel) { |
- this[_onListen] = $_onListen; |
- this[_onCancel] = $_onCancel; |
- this[_state] = _BroadcastStreamController._STATE_INITIAL; |
- this[_next] = null; |
- this[_previous] = null; |
- this[_addStreamState] = null; |
- this[_doneFuture] = null; |
- this[_next] = this[_previous] = this; |
- } |
- get stream() { |
- return new _BroadcastStream(this); |
- } |
- get sink() { |
- return new _StreamSinkWrapper(this); |
+ let _completeError = Symbol('_completeError'); |
+ let _complete = Symbol('_complete'); |
+ let _sink = Symbol('_sink'); |
+ let Stream$ = dart.generic(function(T) { |
+ class Stream extends core.Object { |
+ Stream() { |
} |
- get isClosed() { |
- return (dart.notNull(this[_state]) & dart.notNull(_BroadcastStreamController._STATE_CLOSED)) !== 0; |
+ Stream$fromFuture(future) { |
+ let controller = dart.as(new StreamController({sync: true}), _StreamController$(T)); |
+ future.then(((value) => { |
+ controller._add(dart.as(value, T)); |
+ controller._closeUnchecked(); |
+ }).bind(this), {onError: ((error, stackTrace) => { |
+ controller._addError(error, dart.as(stackTrace, core.StackTrace)); |
+ controller._closeUnchecked(); |
+ }).bind(this)}); |
+ return controller.stream; |
} |
- get isPaused() { |
- return false; |
+ Stream$fromIterable(data) { |
+ return new _GeneratedStreamImpl(() => new _IterablePendingEvents(data)); |
} |
- get hasListener() { |
- return !dart.notNull(this[_isEmpty]); |
+ Stream$periodic(period, computation) { |
+ if (computation === void 0) |
+ computation = null; |
+ if (computation === null) |
+ computation = (i) => null; |
+ let timer = null; |
+ let computationCount = 0; |
+ let controller = null; |
+ let watch = new core.Stopwatch(); |
+ // Function sendEvent: () → void |
+ function sendEvent() { |
+ watch.reset(); |
+ let data = computation((($tmp) => computationCount = dart.notNull($tmp) + 1, $tmp)(computationCount)); |
+ controller.add(data); |
+ } |
+ // Function startPeriodicTimer: () → void |
+ function startPeriodicTimer() { |
+ dart.assert(timer === null); |
+ timer = new Timer.periodic(period, (timer) => { |
+ sendEvent(); |
+ }); |
+ } |
+ controller = new StreamController({sync: true, onListen: (() => { |
+ watch.start(); |
+ startPeriodicTimer(); |
+ }).bind(this), onPause: (() => { |
+ timer.cancel(); |
+ timer = null; |
+ watch.stop(); |
+ }).bind(this), onResume: (() => { |
+ dart.assert(timer === null); |
+ let elapsed = watch.elapsed; |
+ watch.start(); |
+ timer = new Timer(period['-'](elapsed), () => { |
+ timer = null; |
+ startPeriodicTimer(); |
+ sendEvent(); |
+ }); |
+ }).bind(this), onCancel: (() => { |
+ if (timer !== null) |
+ timer.cancel(); |
+ timer = null; |
+ }).bind(this)}); |
+ return controller.stream; |
} |
- get [_hasOneListener]() { |
- dart.assert(!dart.notNull(this[_isEmpty])); |
- return core.identical(this[_next][_next], this); |
+ Stream$eventTransformed(source, mapSink) { |
+ return dart.as(new _BoundSinkStream(source, dart.as(mapSink, _SinkMapper)), Stream$(T)); |
} |
- get [_isFiring]() { |
- return (dart.notNull(this[_state]) & dart.notNull(_BroadcastStreamController._STATE_FIRING)) !== 0; |
+ get isBroadcast() { |
+ return false; |
} |
- get [_isAddingStream]() { |
- return (dart.notNull(this[_state]) & dart.notNull(_BroadcastStreamController._STATE_ADDSTREAM)) !== 0; |
+ asBroadcastStream(opt$) { |
+ let onListen = opt$.onListen === void 0 ? null : opt$.onListen; |
+ let onCancel = opt$.onCancel === void 0 ? null : opt$.onCancel; |
+ return new _AsBroadcastStream(this, dart.as(onListen, dart.throw_("Unimplemented type (StreamSubscription<dynamic>) → void")), dart.as(onCancel, dart.throw_("Unimplemented type (StreamSubscription<dynamic>) → void"))); |
} |
- get [_mayAddEvent]() { |
- return dart.notNull(this[_state]) < dart.notNull(_BroadcastStreamController._STATE_CLOSED); |
+ where(test) { |
+ return new _WhereStream(this, test); |
} |
- [_ensureDoneFuture]() { |
- if (this[_doneFuture] !== null) |
- return this[_doneFuture]; |
- return this[_doneFuture] = new _Future(); |
+ map(convert) { |
+ return new _MapStream(this, convert); |
} |
- get [_isEmpty]() { |
- return core.identical(this[_next], this); |
+ asyncMap(convert) { |
+ let controller = null; |
+ let subscription = null; |
+ // Function onListen: () → void |
+ function onListen() { |
+ let add = controller.add; |
+ dart.assert(dart.notNull(dart.is(controller, _StreamController)) || dart.notNull(dart.is(controller, _BroadcastStreamController))); |
+ let eventSink = controller; |
+ let addError = eventSink[_addError]; |
+ subscription = this.listen(((event) => { |
+ let newValue = null; |
+ try { |
+ newValue = convert(event); |
+ } catch (e) { |
+ let s = dart.stackTrace(e); |
+ controller.addError(e, s); |
+ return; |
+ } |
+ |
+ if (dart.is(newValue, Future)) { |
+ subscription.pause(); |
+ dart.dinvoke(dart.dinvoke(newValue, 'then', add, {onError: addError}), 'whenComplete', subscription.resume); |
+ } else { |
+ controller.add(newValue); |
+ } |
+ }).bind(this), {onError: dart.as(addError, core.Function), onDone: controller.close}); |
+ } |
+ if (this.isBroadcast) { |
+ controller = new StreamController.broadcast({onListen: onListen, onCancel: (() => { |
+ subscription.cancel(); |
+ }).bind(this), sync: true}); |
+ } else { |
+ controller = new StreamController({onListen: onListen, onPause: (() => { |
+ subscription.pause(); |
+ }).bind(this), onResume: (() => { |
+ subscription.resume(); |
+ }).bind(this), onCancel: (() => { |
+ subscription.cancel(); |
+ }).bind(this), sync: true}); |
+ } |
+ return controller.stream; |
} |
- [_addListener](subscription) { |
- dart.assert(core.identical(subscription[_next], subscription)); |
- subscription[_previous] = this[_previous]; |
- subscription[_next] = this; |
- this[_previous][_next] = subscription; |
- this[_previous] = subscription; |
- subscription[_eventState] = dart.notNull(this[_state]) & dart.notNull(_BroadcastStreamController._STATE_EVENT_ID); |
- } |
- [_removeListener](subscription) { |
- dart.assert(core.identical(subscription[_controller], this)); |
- dart.assert(!dart.notNull(core.identical(subscription[_next], subscription))); |
- let previous = subscription[_previous]; |
- let next = subscription[_next]; |
- previous[_next] = next; |
- next[_previous] = previous; |
- subscription[_next] = subscription[_previous] = subscription; |
- } |
- [_subscribe](onData, onError, onDone, cancelOnError) { |
- if (this.isClosed) { |
- if (onDone === null) |
- onDone = _nullDoneHandler; |
- return new _DoneStreamSubscription(onDone); |
- } |
- let subscription = new _BroadcastSubscription(this, onData, onError, onDone, cancelOnError); |
- this[_addListener](dart.as(subscription, _BroadcastSubscription$(T))); |
- if (core.identical(this[_next], this[_previous])) { |
- _runGuarded(this[_onListen]); |
+ asyncExpand(convert) { |
+ let controller = null; |
+ let subscription = null; |
+ // Function onListen: () → void |
+ function onListen() { |
+ dart.assert(dart.notNull(dart.is(controller, _StreamController)) || dart.notNull(dart.is(controller, _BroadcastStreamController))); |
+ let eventSink = controller; |
+ subscription = this.listen(((event) => { |
+ let newStream = null; |
+ try { |
+ newStream = convert(event); |
+ } catch (e) { |
+ let s = dart.stackTrace(e); |
+ controller.addError(e, s); |
+ return; |
+ } |
+ |
+ if (newStream !== null) { |
+ subscription.pause(); |
+ controller.addStream(newStream).whenComplete(subscription.resume); |
+ } |
+ }).bind(this), {onError: dart.as(eventSink[_addError], core.Function), onDone: controller.close}); |
} |
- return dart.as(subscription, StreamSubscription$(T)); |
- } |
- [_recordCancel](subscription) { |
- if (core.identical(subscription[_next], subscription)) |
- return null; |
- dart.assert(!dart.notNull(core.identical(subscription[_next], subscription))); |
- if (subscription[_isFiring]) { |
- subscription._setRemoveAfterFiring(); |
+ if (this.isBroadcast) { |
+ controller = new StreamController.broadcast({onListen: onListen, onCancel: (() => { |
+ subscription.cancel(); |
+ }).bind(this), sync: true}); |
} else { |
- dart.assert(!dart.notNull(core.identical(subscription[_next], subscription))); |
- this[_removeListener](subscription); |
- if (!dart.notNull(this[_isFiring]) && dart.notNull(this[_isEmpty])) { |
- this[_callOnCancel](); |
- } |
+ controller = new StreamController({onListen: onListen, onPause: (() => { |
+ subscription.pause(); |
+ }).bind(this), onResume: (() => { |
+ subscription.resume(); |
+ }).bind(this), onCancel: (() => { |
+ subscription.cancel(); |
+ }).bind(this), sync: true}); |
} |
- return null; |
+ return controller.stream; |
} |
- [_recordPause](subscription) {} |
- [_recordResume](subscription) {} |
- [_addEventError]() { |
- if (this.isClosed) { |
- return new core.StateError("Cannot add new events after calling close"); |
- } |
- dart.assert(this[_isAddingStream]); |
- return new core.StateError("Cannot add new events while doing an addStream"); |
+ handleError(onError, opt$) { |
+ let test = opt$.test === void 0 ? null : opt$.test; |
+ return new _HandleErrorStream(this, onError, test); |
} |
- add(data) { |
- if (!dart.notNull(this[_mayAddEvent])) |
- throw this[_addEventError](); |
- this[_sendData](data); |
+ expand(convert) { |
+ return new _ExpandStream(this, convert); |
} |
- addError(error, stackTrace) { |
- if (stackTrace === void 0) |
- stackTrace = null; |
- error = _nonNullError(error); |
- if (!dart.notNull(this[_mayAddEvent])) |
- throw this[_addEventError](); |
- let replacement = Zone.current.errorCallback(error, stackTrace); |
- if (replacement !== null) { |
- error = _nonNullError(replacement.error); |
- stackTrace = replacement.stackTrace; |
- } |
- this[_sendError](error, stackTrace); |
+ pipe(streamConsumer) { |
+ return streamConsumer.addStream(this).then(((_) => streamConsumer.close()).bind(this)); |
} |
- close() { |
- if (this.isClosed) { |
- dart.assert(this[_doneFuture] !== null); |
- return this[_doneFuture]; |
- } |
- if (!dart.notNull(this[_mayAddEvent])) |
- throw this[_addEventError](); |
- this[_state] = _BroadcastStreamController._STATE_CLOSED; |
- let doneFuture = this[_ensureDoneFuture](); |
- this[_sendDone](); |
- return doneFuture; |
+ transform(streamTransformer) { |
+ return streamTransformer.bind(this); |
} |
- get done() { |
- return this[_ensureDoneFuture](); |
+ reduce(combine) { |
+ let result = new _Future(); |
+ let seenFirst = false; |
+ let value = null; |
+ let subscription = null; |
+ subscription = this.listen((element) => { |
+ if (seenFirst) { |
+ _runUserCode(() => combine(value, element), dart.as((newValue) => { |
+ value = newValue; |
+ }, dart.throw_("Unimplemented type (dynamic) → dynamic")), dart.as(_cancelAndErrorClosure(subscription, result), dart.throw_("Unimplemented type (dynamic, StackTrace) → dynamic"))); |
+ } else { |
+ value = element; |
+ seenFirst = true; |
+ } |
+ }, {onError: result[_completeError], onDone: (() => { |
+ if (!dart.notNull(seenFirst)) { |
+ try { |
+ throw _internal.IterableElementError.noElement(); |
+ } catch (e) { |
+ let s = dart.stackTrace(e); |
+ _completeWithErrorCallback(result, e, s); |
+ } |
+ |
+ } else { |
+ result._complete(value); |
+ } |
+ }).bind(this), cancelOnError: true}); |
+ return result; |
} |
- addStream(stream, opt$) { |
- let cancelOnError = opt$.cancelOnError === void 0 ? true : opt$.cancelOnError; |
- if (!dart.notNull(this[_mayAddEvent])) |
- throw this[_addEventError](); |
- this[_state] = _BroadcastStreamController._STATE_ADDSTREAM; |
- this[_addStreamState] = dart.as(new _AddStreamState(this, stream, cancelOnError), _AddStreamState$(T)); |
- return this[_addStreamState].addStreamFuture; |
+ fold(initialValue, combine) { |
+ let result = new _Future(); |
+ let value = initialValue; |
+ let subscription = null; |
+ subscription = this.listen((element) => { |
+ _runUserCode(() => combine(value, element), (newValue) => { |
+ value = newValue; |
+ }, dart.as(_cancelAndErrorClosure(subscription, result), dart.throw_("Unimplemented type (dynamic, StackTrace) → dynamic"))); |
+ }, {onError: ((e, st) => { |
+ result._completeError(e, dart.as(st, core.StackTrace)); |
+ }).bind(this), onDone: (() => { |
+ result._complete(value); |
+ }).bind(this), cancelOnError: true}); |
+ return result; |
} |
- [_add](data) { |
- this[_sendData](data); |
+ join(separator) { |
+ if (separator === void 0) |
+ separator = ""; |
+ let result = new _Future(); |
+ let buffer = new core.StringBuffer(); |
+ let subscription = null; |
+ let first = true; |
+ subscription = this.listen(((element) => { |
+ if (!dart.notNull(first)) { |
+ buffer.write(separator); |
+ } |
+ first = false; |
+ try { |
+ buffer.write(element); |
+ } catch (e) { |
+ let s = dart.stackTrace(e); |
+ _cancelAndErrorWithReplacement(subscription, result, e, s); |
+ } |
+ |
+ }).bind(this), {onError: ((e) => { |
+ result._completeError(e); |
+ }).bind(this), onDone: (() => { |
+ result._complete(buffer.toString()); |
+ }).bind(this), cancelOnError: true}); |
+ return result; |
} |
- [_addError](error, stackTrace) { |
- this[_sendError](error, stackTrace); |
+ contains(needle) { |
+ let future = new _Future(); |
+ let subscription = null; |
+ subscription = this.listen((element) => { |
+ _runUserCode(() => dart.equals(element, needle), dart.as((isMatch) => { |
+ if (isMatch) { |
+ _cancelAndValue(subscription, future, true); |
+ } |
+ }, dart.throw_("Unimplemented type (dynamic) → dynamic")), dart.as(_cancelAndErrorClosure(subscription, future), dart.throw_("Unimplemented type (dynamic, StackTrace) → dynamic"))); |
+ }, {onError: future[_completeError], onDone: (() => { |
+ future._complete(false); |
+ }).bind(this), cancelOnError: true}); |
+ return future; |
} |
- [_close]() { |
- dart.assert(this[_isAddingStream]); |
- let addState = this[_addStreamState]; |
- this[_addStreamState] = null; |
- this[_state] = ~dart.notNull(_BroadcastStreamController._STATE_ADDSTREAM); |
- addState.complete(); |
+ forEach(action) { |
+ let future = new _Future(); |
+ let subscription = null; |
+ subscription = this.listen((element) => { |
+ _runUserCode(() => action(element), (_) => { |
+ }, dart.as(_cancelAndErrorClosure(subscription, future), dart.throw_("Unimplemented type (dynamic, StackTrace) → dynamic"))); |
+ }, {onError: future[_completeError], onDone: (() => { |
+ future._complete(null); |
+ }).bind(this), cancelOnError: true}); |
+ return future; |
} |
- [_forEachListener](action) { |
- if (this[_isFiring]) { |
- throw new core.StateError("Cannot fire new event. Controller is already firing an event"); |
- } |
- if (this[_isEmpty]) |
- return; |
- let id = dart.notNull(this[_state]) & dart.notNull(_BroadcastStreamController._STATE_EVENT_ID); |
- this[_state] = dart.notNull(_BroadcastStreamController._STATE_EVENT_ID) | dart.notNull(_BroadcastStreamController._STATE_FIRING); |
- let link = this[_next]; |
- while (!dart.notNull(core.identical(link, this))) { |
- let subscription = dart.as(link, _BroadcastSubscription$(T)); |
- if (subscription._expectsEvent(id)) { |
- subscription[_eventState] = _BroadcastSubscription[_STATE_FIRING]; |
- action(subscription); |
- subscription._toggleEventId(); |
- link = subscription[_next]; |
- if (subscription[_removeAfterFiring]) { |
- this[_removeListener](subscription); |
+ every(test) { |
+ let future = new _Future(); |
+ let subscription = null; |
+ subscription = this.listen((element) => { |
+ _runUserCode(() => test(element), dart.as((isMatch) => { |
+ if (!dart.notNull(isMatch)) { |
+ _cancelAndValue(subscription, future, false); |
} |
- subscription[_eventState] = ~dart.notNull(_BroadcastSubscription[_STATE_FIRING]); |
- } else { |
- link = subscription[_next]; |
- } |
- } |
- this[_state] = ~dart.notNull(_BroadcastStreamController._STATE_FIRING); |
- if (this[_isEmpty]) { |
- this[_callOnCancel](); |
- } |
- } |
- [_callOnCancel]() { |
- dart.assert(this[_isEmpty]); |
- if (dart.notNull(this.isClosed) && dart.notNull(this[_doneFuture][_mayComplete])) { |
- this[_doneFuture]._asyncComplete(null); |
- } |
- _runGuarded(this[_onCancel]); |
- } |
- } |
- _BroadcastStreamController._STATE_INITIAL = 0; |
- _BroadcastStreamController._STATE_EVENT_ID = 1; |
- _BroadcastStreamController._STATE_FIRING = 2; |
- _BroadcastStreamController._STATE_CLOSED = 4; |
- _BroadcastStreamController._STATE_ADDSTREAM = 8; |
- return _BroadcastStreamController; |
- }); |
- let _BroadcastStreamController = _BroadcastStreamController$(dart.dynamic); |
- let _SyncBroadcastStreamController$ = dart.generic(function(T) { |
- class _SyncBroadcastStreamController extends _BroadcastStreamController$(T) { |
- _SyncBroadcastStreamController(onListen, onCancel) { |
- super._BroadcastStreamController(onListen, onCancel); |
- } |
- [_sendData](data) { |
- if (this[_isEmpty]) |
- return; |
- if (this[_hasOneListener]) { |
- this[_state] = _BroadcastStreamController[_STATE_FIRING]; |
- let subscription = dart.as(this[_next], _BroadcastSubscription); |
- subscription._add(data); |
- this[_state] = ~dart.notNull(_BroadcastStreamController[_STATE_FIRING]); |
- if (this[_isEmpty]) { |
- this[_callOnCancel](); |
- } |
- return; |
- } |
- this[_forEachListener](((subscription) => { |
- subscription._add(data); |
- }).bind(this)); |
+ }, dart.throw_("Unimplemented type (dynamic) → dynamic")), dart.as(_cancelAndErrorClosure(subscription, future), dart.throw_("Unimplemented type (dynamic, StackTrace) → dynamic"))); |
+ }, {onError: future[_completeError], onDone: (() => { |
+ future._complete(true); |
+ }).bind(this), cancelOnError: true}); |
+ return future; |
} |
- [_sendError](error, stackTrace) { |
- if (this[_isEmpty]) |
- return; |
- this[_forEachListener](((subscription) => { |
- subscription._addError(error, stackTrace); |
- }).bind(this)); |
+ any(test) { |
+ let future = new _Future(); |
+ let subscription = null; |
+ subscription = this.listen((element) => { |
+ _runUserCode(() => test(element), dart.as((isMatch) => { |
+ if (isMatch) { |
+ _cancelAndValue(subscription, future, true); |
+ } |
+ }, dart.throw_("Unimplemented type (dynamic) → dynamic")), dart.as(_cancelAndErrorClosure(subscription, future), dart.throw_("Unimplemented type (dynamic, StackTrace) → dynamic"))); |
+ }, {onError: future[_completeError], onDone: (() => { |
+ future._complete(false); |
+ }).bind(this), cancelOnError: true}); |
+ return future; |
} |
- [_sendDone]() { |
- if (!dart.notNull(this[_isEmpty])) { |
- this[_forEachListener](dart.as(((subscription) => { |
- subscription._close(); |
- }).bind(this), dart.throw_("Unimplemented type (_BufferingStreamSubscription<T>) → void"))); |
- } else { |
- dart.assert(this[_doneFuture] !== null); |
- dart.assert(this[_doneFuture][_mayComplete]); |
- this[_doneFuture]._asyncComplete(null); |
- } |
+ get length() { |
+ let future = new _Future(); |
+ let count = 0; |
+ this.listen((_) => { |
+ count = dart.notNull(count) + 1; |
+ }, {onError: future[_completeError], onDone: (() => { |
+ future._complete(count); |
+ }).bind(this), cancelOnError: true}); |
+ return future; |
} |
- } |
- return _SyncBroadcastStreamController; |
- }); |
- let _SyncBroadcastStreamController = _SyncBroadcastStreamController$(dart.dynamic); |
- let _AsyncBroadcastStreamController$ = dart.generic(function(T) { |
- class _AsyncBroadcastStreamController extends _BroadcastStreamController$(T) { |
- _AsyncBroadcastStreamController(onListen, onCancel) { |
- super._BroadcastStreamController(onListen, onCancel); |
+ get isEmpty() { |
+ let future = new _Future(); |
+ let subscription = null; |
+ subscription = this.listen((_) => { |
+ _cancelAndValue(subscription, future, false); |
+ }, {onError: future[_completeError], onDone: (() => { |
+ future._complete(true); |
+ }).bind(this), cancelOnError: true}); |
+ return future; |
} |
- [_sendData](data) { |
- for (let link = this[_next]; !dart.notNull(core.identical(link, this)); link = link[_next]) { |
- let subscription = dart.as(link, _BroadcastSubscription$(T)); |
- subscription._addPending(new _DelayedData(data)); |
- } |
+ toList() { |
+ let result = new List.from([]); |
+ let future = new _Future(); |
+ this.listen(((data) => { |
+ result.add(data); |
+ }).bind(this), {onError: future[_completeError], onDone: (() => { |
+ future._complete(result); |
+ }).bind(this), cancelOnError: true}); |
+ return future; |
} |
- [_sendError](error, stackTrace) { |
- for (let link = this[_next]; !dart.notNull(core.identical(link, this)); link = link[_next]) { |
- let subscription = dart.as(link, _BroadcastSubscription$(T)); |
- subscription._addPending(new _DelayedError(error, stackTrace)); |
- } |
+ toSet() { |
+ let result = new core.Set(); |
+ let future = new _Future(); |
+ this.listen(((data) => { |
+ result.add(data); |
+ }).bind(this), {onError: future[_completeError], onDone: (() => { |
+ future._complete(result); |
+ }).bind(this), cancelOnError: true}); |
+ return future; |
} |
- [_sendDone]() { |
- if (!dart.notNull(this[_isEmpty])) { |
- for (let link = this[_next]; !dart.notNull(core.identical(link, this)); link = link[_next]) { |
- let subscription = dart.as(link, _BroadcastSubscription$(T)); |
- subscription._addPending(new _DelayedDone()); |
- } |
- } else { |
- dart.assert(this[_doneFuture] !== null); |
- dart.assert(this[_doneFuture][_mayComplete]); |
- this[_doneFuture]._asyncComplete(null); |
- } |
+ drain(futureValue) { |
+ if (futureValue === void 0) |
+ futureValue = null; |
+ return this.listen(null, {cancelOnError: true}).asFuture(futureValue); |
} |
- } |
- return _AsyncBroadcastStreamController; |
- }); |
- let _AsyncBroadcastStreamController = _AsyncBroadcastStreamController$(dart.dynamic); |
- let _pending = Symbol('_pending'); |
- let _hasPending = Symbol('_hasPending'); |
- let _addPendingEvent = Symbol('_addPendingEvent'); |
- let _STATE_CLOSED = Symbol('_STATE_CLOSED'); |
- let _AsBroadcastStreamController$ = dart.generic(function(T) { |
- class _AsBroadcastStreamController extends _SyncBroadcastStreamController$(T) { |
- _AsBroadcastStreamController(onListen, onCancel) { |
- this[_pending] = null; |
- super._SyncBroadcastStreamController(onListen, onCancel); |
+ take(count) { |
+ return dart.as(new _TakeStream(this, count), Stream$(T)); |
} |
- get [_hasPending]() { |
- return dart.notNull(this[_pending] !== null) && !dart.notNull(this[_pending].isEmpty); |
+ takeWhile(test) { |
+ return dart.as(new _TakeWhileStream(this, dart.as(test, dart.throw_("Unimplemented type (dynamic) → bool"))), Stream$(T)); |
} |
- [_addPendingEvent](event) { |
- if (this[_pending] === null) { |
- this[_pending] = new _StreamImplEvents(); |
- } |
- this[_pending].add(event); |
+ skip(count) { |
+ return dart.as(new _SkipStream(this, count), Stream$(T)); |
} |
- add(data) { |
- if (!dart.notNull(this.isClosed) && dart.notNull(this[_isFiring])) { |
- this[_addPendingEvent](new _DelayedData(data)); |
- return; |
- } |
- super.add(data); |
- while (this[_hasPending]) { |
- this[_pending].handleNext(this); |
- } |
+ skipWhile(test) { |
+ return dart.as(new _SkipWhileStream(this, dart.as(test, dart.throw_("Unimplemented type (dynamic) → bool"))), Stream$(T)); |
} |
- addError(error, stackTrace) { |
- if (stackTrace === void 0) |
- stackTrace = null; |
- if (!dart.notNull(this.isClosed) && dart.notNull(this[_isFiring])) { |
- this[_addPendingEvent](new _DelayedError(error, stackTrace)); |
- return; |
- } |
- if (!dart.notNull(this[_mayAddEvent])) |
- throw this[_addEventError](); |
- this[_sendError](error, stackTrace); |
- while (this[_hasPending]) { |
- this[_pending].handleNext(this); |
- } |
+ distinct(equals) { |
+ if (equals === void 0) |
+ equals = null; |
+ return dart.as(new _DistinctStream(this, dart.as(equals, dart.throw_("Unimplemented type (dynamic, dynamic) → bool"))), Stream$(T)); |
} |
- close() { |
- if (!dart.notNull(this.isClosed) && dart.notNull(this[_isFiring])) { |
- this[_addPendingEvent](new _DelayedDone()); |
- this[_state] = _BroadcastStreamController[_STATE_CLOSED]; |
- return super.done; |
- } |
- let result = super.close(); |
- dart.assert(!dart.notNull(this[_hasPending])); |
- return result; |
+ get first() { |
+ let future = new _Future(); |
+ let subscription = null; |
+ subscription = this.listen((value) => { |
+ _cancelAndValue(subscription, future, value); |
+ }, { |
+ onError: future[_completeError], |
+ onDone: () => { |
+ try { |
+ throw _internal.IterableElementError.noElement(); |
+ } catch (e) { |
+ let s = dart.stackTrace(e); |
+ _completeWithErrorCallback(future, e, s); |
+ } |
+ |
+ }, |
+ cancelOnError: true |
+ }); |
+ return future; |
} |
- [_callOnCancel]() { |
- if (this[_hasPending]) { |
- this[_pending].clear(); |
- this[_pending] = null; |
- } |
- super._callOnCancel(); |
+ get last() { |
+ let future = new _Future(); |
+ let result = null; |
+ let foundResult = false; |
+ let subscription = null; |
+ subscription = this.listen((value) => { |
+ foundResult = true; |
+ result = value; |
+ }, {onError: future[_completeError], onDone: (() => { |
+ if (foundResult) { |
+ future._complete(result); |
+ return; |
+ } |
+ try { |
+ throw _internal.IterableElementError.noElement(); |
+ } catch (e) { |
+ let s = dart.stackTrace(e); |
+ _completeWithErrorCallback(future, e, s); |
+ } |
+ |
+ }).bind(this), cancelOnError: true}); |
+ return future; |
} |
- } |
- return _AsBroadcastStreamController; |
- }); |
- let _AsBroadcastStreamController = _AsBroadcastStreamController$(dart.dynamic); |
- let _pauseCount = Symbol('_pauseCount'); |
- let _resume = Symbol('_resume'); |
- let _DoneSubscription$ = dart.generic(function(T) { |
- class _DoneSubscription extends core.Object { |
- _DoneSubscription() { |
- this[_pauseCount] = 0; |
- } |
- onData(handleData) {} |
- onError(handleError) {} |
- onDone(handleDone) {} |
- pause(resumeSignal) { |
- if (resumeSignal === void 0) |
- resumeSignal = null; |
- if (resumeSignal !== null) |
- resumeSignal.then(this[_resume]); |
- this[_pauseCount] = dart.notNull(this[_pauseCount]) + 1; |
- } |
- resume() { |
- this[_resume](null); |
- } |
- [_resume](_) { |
- if (dart.notNull(this[_pauseCount]) > 0) |
- this[_pauseCount] = dart.notNull(this[_pauseCount]) - 1; |
- } |
- cancel() { |
- return new _Future.immediate(null); |
- } |
- get isPaused() { |
- return dart.notNull(this[_pauseCount]) > 0; |
- } |
- asFuture(value) { |
- if (value === void 0) |
- value = null; |
- return new _Future(); |
- } |
- } |
- return _DoneSubscription; |
- }); |
- let _DoneSubscription = _DoneSubscription$(dart.dynamic); |
- class DeferredLibrary extends core.Object { |
- DeferredLibrary(libraryName, opt$) { |
- let uri = opt$.uri === void 0 ? null : opt$.uri; |
- this.libraryName = libraryName; |
- this.uri = uri; |
- } |
- load() { |
- throw 'DeferredLibrary not supported. ' + 'please use the `import "lib.dart" deferred as lib` syntax.'; |
- } |
- } |
- let _s = Symbol('_s'); |
- class DeferredLoadException extends core.Object { |
- DeferredLoadException($_s) { |
- this[_s] = $_s; |
- } |
- toString() { |
- return `DeferredLoadException: '${this[_s]}'`; |
- } |
- } |
- let _completeError = Symbol('_completeError'); |
- let Future$ = dart.generic(function(T) { |
- class Future extends core.Object { |
- Future(computation) { |
- let result = new _Future(); |
- Timer.run((() => { |
- try { |
- result._complete(computation()); |
- } catch (e) { |
- let s = dart.stackTrace(e); |
- _completeWithErrorCallback(result, e, s); |
- } |
+ get single() { |
+ let future = new _Future(); |
+ let result = null; |
+ let foundResult = false; |
+ let subscription = null; |
+ subscription = this.listen((value) => { |
+ if (foundResult) { |
+ try { |
+ throw _internal.IterableElementError.tooMany(); |
+ } catch (e) { |
+ let s = dart.stackTrace(e); |
+ _cancelAndErrorWithReplacement(subscription, future, e, s); |
+ } |
- }).bind(this)); |
- return dart.as(result, Future$(T)); |
- } |
- Future$microtask(computation) { |
- let result = new _Future(); |
- scheduleMicrotask((() => { |
- try { |
- result._complete(computation()); |
- } catch (e) { |
- let s = dart.stackTrace(e); |
- _completeWithErrorCallback(result, e, s); |
+ return; |
} |
+ foundResult = true; |
+ result = value; |
+ }, {onError: future[_completeError], onDone: (() => { |
+ if (foundResult) { |
+ future._complete(result); |
+ return; |
+ } |
+ try { |
+ throw _internal.IterableElementError.noElement(); |
+ } catch (e) { |
+ let s = dart.stackTrace(e); |
+ _completeWithErrorCallback(future, e, s); |
+ } |
- }).bind(this)); |
- return dart.as(result, Future$(T)); |
+ }).bind(this), cancelOnError: true}); |
+ return future; |
} |
- Future$sync(computation) { |
- try { |
- let result = computation(); |
- return new Future.value(result); |
- } catch (error) { |
- let stackTrace = dart.stackTrace(error); |
- return new Future.error(error, stackTrace); |
- } |
+ firstWhere(test, opt$) { |
+ let defaultValue = opt$.defaultValue === void 0 ? null : opt$.defaultValue; |
+ let future = new _Future(); |
+ let subscription = null; |
+ subscription = this.listen((value) => { |
+ _runUserCode(() => test(value), dart.as((isMatch) => { |
+ if (isMatch) { |
+ _cancelAndValue(subscription, future, value); |
+ } |
+ }, dart.throw_("Unimplemented type (dynamic) → dynamic")), dart.as(_cancelAndErrorClosure(subscription, future), dart.throw_("Unimplemented type (dynamic, StackTrace) → dynamic"))); |
+ }, {onError: future[_completeError], onDone: (() => { |
+ if (defaultValue !== null) { |
+ _runUserCode(defaultValue, future[_complete], future[_completeError]); |
+ return; |
+ } |
+ try { |
+ throw _internal.IterableElementError.noElement(); |
+ } catch (e) { |
+ let s = dart.stackTrace(e); |
+ _completeWithErrorCallback(future, e, s); |
+ } |
+ }).bind(this), cancelOnError: true}); |
+ return future; |
} |
- Future$value(value) { |
- if (value === void 0) |
- value = null; |
- return new _Future.immediate(value); |
- } |
- Future$error(error, stackTrace) { |
- if (stackTrace === void 0) |
- stackTrace = null; |
- error = _nonNullError(error); |
- if (!dart.notNull(core.identical(Zone.current, _ROOT_ZONE))) { |
- let replacement = Zone.current.errorCallback(error, stackTrace); |
- if (replacement !== null) { |
- error = _nonNullError(replacement.error); |
- stackTrace = replacement.stackTrace; |
- } |
- } |
- return new _Future.immediateError(error, stackTrace); |
- } |
- Future$delayed(duration, computation) { |
- if (computation === void 0) |
- computation = null; |
- let result = new _Future(); |
- new Timer(duration, (() => { |
- try { |
- result._complete(computation === null ? null : computation()); |
- } catch (e) { |
- let s = dart.stackTrace(e); |
- _completeWithErrorCallback(result, e, s); |
- } |
+ lastWhere(test, opt$) { |
+ let defaultValue = opt$.defaultValue === void 0 ? null : opt$.defaultValue; |
+ let future = new _Future(); |
+ let result = null; |
+ let foundResult = false; |
+ let subscription = null; |
+ subscription = this.listen((value) => { |
+ _runUserCode(() => true === test(value), dart.as((isMatch) => { |
+ if (isMatch) { |
+ foundResult = true; |
+ result = value; |
+ } |
+ }, dart.throw_("Unimplemented type (dynamic) → dynamic")), dart.as(_cancelAndErrorClosure(subscription, future), dart.throw_("Unimplemented type (dynamic, StackTrace) → dynamic"))); |
+ }, {onError: future[_completeError], onDone: (() => { |
+ if (foundResult) { |
+ future._complete(result); |
+ return; |
+ } |
+ if (defaultValue !== null) { |
+ _runUserCode(defaultValue, future[_complete], future[_completeError]); |
+ return; |
+ } |
+ try { |
+ throw _internal.IterableElementError.noElement(); |
+ } catch (e) { |
+ let s = dart.stackTrace(e); |
+ _completeWithErrorCallback(future, e, s); |
+ } |
- }).bind(this)); |
- return dart.as(result, Future$(T)); |
+ }).bind(this), cancelOnError: true}); |
+ return future; |
} |
- static wait(futures, opt$) { |
- let eagerError = opt$.eagerError === void 0 ? false : opt$.eagerError; |
- let cleanUp = opt$.cleanUp === void 0 ? null : opt$.cleanUp; |
- let result = new _Future(); |
- let values = null; |
- let remaining = 0; |
- let error = null; |
- let stackTrace = null; |
- // Function handleError: (dynamic, dynamic) → void |
- function handleError(theError, theStackTrace) { |
- remaining = dart.notNull(remaining) - 1; |
- if (values !== null) { |
- if (cleanUp !== null) { |
- for (let value of values) { |
- if (value !== null) { |
- new Future.sync(() => { |
- cleanUp(value); |
- }); |
+ singleWhere(test) { |
+ let future = new _Future(); |
+ let result = null; |
+ let foundResult = false; |
+ let subscription = null; |
+ subscription = this.listen((value) => { |
+ _runUserCode(() => true === test(value), dart.as((isMatch) => { |
+ if (isMatch) { |
+ if (foundResult) { |
+ try { |
+ throw _internal.IterableElementError.tooMany(); |
+ } catch (e) { |
+ let s = dart.stackTrace(e); |
+ _cancelAndErrorWithReplacement(subscription, future, e, s); |
} |
+ |
+ return; |
} |
+ foundResult = true; |
+ result = value; |
} |
- values = null; |
- if (remaining === 0 || dart.notNull(eagerError)) { |
- result._completeError(theError, dart.as(theStackTrace, core.StackTrace)); |
- } else { |
- error = theError; |
- stackTrace = dart.as(theStackTrace, core.StackTrace); |
+ }, dart.throw_("Unimplemented type (dynamic) → dynamic")), dart.as(_cancelAndErrorClosure(subscription, future), dart.throw_("Unimplemented type (dynamic, StackTrace) → dynamic"))); |
+ }, {onError: future[_completeError], onDone: (() => { |
+ if (foundResult) { |
+ future._complete(result); |
+ return; |
} |
- } else if (remaining === 0 && !dart.notNull(eagerError)) { |
- result._completeError(error, stackTrace); |
- } |
- } |
- for (let future of futures) { |
- let pos = (($tmp) => remaining = dart.notNull($tmp) + 1, $tmp)(remaining); |
- future.then(dart.as(((value) => { |
- remaining = dart.notNull(remaining) - 1; |
- if (values !== null) { |
- values.set(pos, value); |
- if (remaining === 0) { |
- result._completeWithValue(values); |
- } |
- } else { |
- if (dart.notNull(cleanUp !== null) && dart.notNull(value !== null)) { |
- new Future.sync(() => { |
- cleanUp(value); |
- }); |
- } |
- if (remaining === 0 && !dart.notNull(eagerError)) { |
- result._completeError(error, stackTrace); |
- } |
+ try { |
+ throw _internal.IterableElementError.noElement(); |
+ } catch (e) { |
+ let s = dart.stackTrace(e); |
+ _completeWithErrorCallback(future, e, s); |
} |
- }).bind(this), dart.throw_("Unimplemented type (dynamic) → dynamic")), {onError: handleError}); |
- } |
- if (remaining === 0) { |
- return dart.as(new Future.value(/* Unimplemented const */new List.from([])), Future$(core.List)); |
- } |
- values = new core.List(remaining); |
- return result; |
+ |
+ }).bind(this), cancelOnError: true}); |
+ return future; |
} |
- static forEach(input, f) { |
- let iterator = input.iterator; |
- return doWhile((() => { |
- if (!dart.notNull(iterator.moveNext())) |
- return false; |
- return new Future.sync((() => f(iterator.current)).bind(this)).then((_) => true); |
- }).bind(this)); |
+ elementAt(index) { |
+ if (dart.notNull(!(typeof index == number)) || dart.notNull(index) < 0) |
+ throw new core.ArgumentError(index); |
+ let future = new _Future(); |
+ let subscription = null; |
+ let elementIndex = 0; |
+ subscription = this.listen((value) => { |
+ if (index === elementIndex) { |
+ _cancelAndValue(subscription, future, value); |
+ return; |
+ } |
+ elementIndex = 1; |
+ }, {onError: future[_completeError], onDone: (() => { |
+ future._completeError(new core.RangeError.index(index, this, "index", null, elementIndex)); |
+ }).bind(this), cancelOnError: true}); |
+ return future; |
} |
- static doWhile(f) { |
- let doneSignal = new _Future(); |
- let nextIteration = null; |
- nextIteration = Zone.current.bindUnaryCallback(dart.as(((keepGoing) => { |
- if (keepGoing) { |
- new Future.sync(f).then(dart.as(nextIteration, dart.throw_("Unimplemented type (dynamic) → dynamic")), {onError: doneSignal[_completeError]}); |
+ timeout(timeLimit, opt$) { |
+ let onTimeout = opt$.onTimeout === void 0 ? null : opt$.onTimeout; |
+ let controller = null; |
+ let subscription = null; |
+ let timer = null; |
+ let zone = null; |
+ let timeout = null; |
+ // Function onData: (T) → void |
+ function onData(event) { |
+ timer.cancel(); |
+ controller.add(event); |
+ timer = zone.createTimer(timeLimit, dart.as(timeout, dart.throw_("Unimplemented type () → void"))); |
+ } |
+ // Function onError: (dynamic, StackTrace) → void |
+ function onError(error, stackTrace) { |
+ timer.cancel(); |
+ dart.assert(dart.notNull(dart.is(controller, _StreamController)) || dart.notNull(dart.is(controller, _BroadcastStreamController))); |
+ let eventSink = controller; |
+ dart.dinvoke(eventSink, '_addError', error, stackTrace); |
+ timer = zone.createTimer(timeLimit, dart.as(timeout, dart.throw_("Unimplemented type () → void"))); |
+ } |
+ // Function onDone: () → void |
+ function onDone() { |
+ timer.cancel(); |
+ controller.close(); |
+ } |
+ // Function onListen: () → void |
+ function onListen() { |
+ zone = Zone.current; |
+ if (onTimeout === null) { |
+ timeout = (() => { |
+ controller.addError(new TimeoutException("No stream event", timeLimit), null); |
+ }).bind(this); |
} else { |
- doneSignal._complete(null); |
+ onTimeout = zone.registerUnaryCallback(dart.as(onTimeout, dart.throw_("Unimplemented type (dynamic) → dynamic"))); |
+ let wrapper = new _ControllerEventSinkWrapper(null); |
+ timeout = (() => { |
+ wrapper[_sink] = controller; |
+ zone.runUnaryGuarded(dart.as(onTimeout, dart.throw_("Unimplemented type (dynamic) → dynamic")), wrapper); |
+ wrapper[_sink] = null; |
+ }).bind(this); |
} |
- }).bind(this), dart.throw_("Unimplemented type (dynamic) → dynamic")), {runGuarded: true}); |
- dart.dinvokef(nextIteration, true); |
- return doneSignal; |
+ subscription = this.listen(onData, {onError: onError, onDone: onDone}); |
+ timer = zone.createTimer(timeLimit, dart.as(timeout, dart.throw_("Unimplemented type () → void"))); |
+ } |
+ // Function onCancel: () → Future<dynamic> |
+ function onCancel() { |
+ timer.cancel(); |
+ let result = subscription.cancel(); |
+ subscription = null; |
+ return result; |
+ } |
+ controller = this.isBroadcast ? new _SyncBroadcastStreamController(onListen, onCancel) : new _SyncStreamController(onListen, (() => { |
+ timer.cancel(); |
+ subscription.pause(); |
+ }).bind(this), (() => { |
+ subscription.resume(); |
+ timer = zone.createTimer(timeLimit, dart.as(timeout, dart.throw_("Unimplemented type () → void"))); |
+ }).bind(this), onCancel); |
+ return controller.stream; |
} |
} |
- dart.defineNamedConstructor(Future, 'microtask'); |
- dart.defineNamedConstructor(Future, 'sync'); |
- dart.defineNamedConstructor(Future, 'value'); |
- dart.defineNamedConstructor(Future, 'error'); |
- dart.defineNamedConstructor(Future, 'delayed'); |
- dart.defineLazyProperties(Future, { |
- get _nullFuture() { |
- return dart.as(new Future.value(null), _Future); |
- } |
- }); |
- return Future; |
+ dart.defineNamedConstructor(Stream, 'fromFuture'); |
+ dart.defineNamedConstructor(Stream, 'fromIterable'); |
+ dart.defineNamedConstructor(Stream, 'periodic'); |
+ dart.defineNamedConstructor(Stream, 'eventTransformed'); |
+ return Stream; |
}); |
- let Future = Future$(dart.dynamic); |
- class TimeoutException extends core.Object { |
- TimeoutException(message, duration) { |
- if (duration === void 0) |
- duration = null; |
- this.message = message; |
- this.duration = duration; |
- } |
- toString() { |
- let result = "TimeoutException"; |
- if (this.duration !== null) |
- result = `TimeoutException after ${this.duration}`; |
- if (this.message !== null) |
- result = `${result}: ${this.message}`; |
- return result; |
- } |
- } |
- let Completer$ = dart.generic(function(T) { |
- class Completer extends core.Object { |
- Completer() { |
- return new _AsyncCompleter(); |
+ let Stream = Stream$(dart.dynamic); |
+ let _StreamImpl$ = dart.generic(function(T) { |
+ class _StreamImpl extends Stream$(T) { |
+ listen(onData, opt$) { |
+ let onError = opt$.onError === void 0 ? null : opt$.onError; |
+ let onDone = opt$.onDone === void 0 ? null : opt$.onDone; |
+ let cancelOnError = opt$.cancelOnError === void 0 ? null : opt$.cancelOnError; |
+ cancelOnError = core.identical(true, cancelOnError); |
+ let subscription = this[_createSubscription](onData, onError, onDone, cancelOnError); |
+ this[_onListen](subscription); |
+ return dart.as(subscription, StreamSubscription$(T)); |
} |
- Completer$sync() { |
- return new _SyncCompleter(); |
+ [_createSubscription](onData, onError, onDone, cancelOnError) { |
+ return new _BufferingStreamSubscription(onData, onError, onDone, cancelOnError); |
} |
+ [_onListen](subscription) {} |
} |
- dart.defineNamedConstructor(Completer, 'sync'); |
- return Completer; |
+ return _StreamImpl; |
}); |
- let Completer = Completer$(dart.dynamic); |
- // Function _completeWithErrorCallback: (_Future<dynamic>, dynamic, dynamic) → void |
- function _completeWithErrorCallback(result, error, stackTrace) { |
- let replacement = Zone.current.errorCallback(error, dart.as(stackTrace, core.StackTrace)); |
- if (replacement !== null) { |
- error = _nonNullError(replacement.error); |
- stackTrace = replacement.stackTrace; |
- } |
- result._completeError(error, dart.as(stackTrace, core.StackTrace)); |
- } |
- // Function _nonNullError: (Object) → Object |
- function _nonNullError(error) { |
- return error !== null ? error : new core.NullThrownError(); |
- } |
- let _Completer$ = dart.generic(function(T) { |
- class _Completer extends core.Object { |
- _Completer() { |
- this.future = new _Future(); |
+ let _StreamImpl = _StreamImpl$(dart.dynamic); |
+ let _ControllerStream$ = dart.generic(function(T) { |
+ class _ControllerStream extends _StreamImpl$(T) { |
+ _ControllerStream($_controller) { |
+ this[_controller] = $_controller; |
+ super._StreamImpl(); |
} |
- completeError(error, stackTrace) { |
- if (stackTrace === void 0) |
- stackTrace = null; |
- error = _nonNullError(error); |
- if (!dart.notNull(this.future[_mayComplete])) |
- throw new core.StateError("Future already completed"); |
- let replacement = Zone.current.errorCallback(error, stackTrace); |
- if (replacement !== null) { |
- error = _nonNullError(replacement.error); |
- stackTrace = replacement.stackTrace; |
- } |
- this[_completeError](error, stackTrace); |
+ [_createSubscription](onData, onError, onDone, cancelOnError) { |
+ return this[_controller]._subscribe(onData, onError, onDone, cancelOnError); |
} |
- get isCompleted() { |
- return !dart.notNull(this.future[_mayComplete]); |
+ get hashCode() { |
+ return dart.notNull(this[_controller].hashCode) ^ 892482866; |
+ } |
+ ['=='](other) { |
+ if (core.identical(this, other)) |
+ return true; |
+ if (!dart.is(other, _ControllerStream)) |
+ return false; |
+ let otherStream = dart.as(other, _ControllerStream); |
+ return core.identical(otherStream[_controller], this[_controller]); |
} |
} |
- return _Completer; |
+ return _ControllerStream; |
}); |
- let _Completer = _Completer$(dart.dynamic); |
- let _AsyncCompleter$ = dart.generic(function(T) { |
- class _AsyncCompleter extends _Completer$(T) { |
- complete(value) { |
- if (value === void 0) |
- value = null; |
- if (!dart.notNull(this.future[_mayComplete])) |
- throw new core.StateError("Future already completed"); |
- this.future._asyncComplete(value); |
+ let _ControllerStream = _ControllerStream$(dart.dynamic); |
+ let _BroadcastStream$ = dart.generic(function(T) { |
+ class _BroadcastStream extends _ControllerStream$(T) { |
+ _BroadcastStream(controller) { |
+ super._ControllerStream(dart.as(controller, _StreamControllerLifecycle$(T))); |
} |
- [_completeError](error, stackTrace) { |
- this.future._asyncCompleteError(error, stackTrace); |
+ get isBroadcast() { |
+ return true; |
} |
} |
- return _AsyncCompleter; |
+ return _BroadcastStream; |
}); |
- let _AsyncCompleter = _AsyncCompleter$(dart.dynamic); |
- let _SyncCompleter$ = dart.generic(function(T) { |
- class _SyncCompleter extends _Completer$(T) { |
- complete(value) { |
- if (value === void 0) |
- value = null; |
- if (!dart.notNull(this.future[_mayComplete])) |
- throw new core.StateError("Future already completed"); |
- this.future._complete(value); |
- } |
- [_completeError](error, stackTrace) { |
- this.future._completeError(error, stackTrace); |
- } |
+ let _BroadcastStream = _BroadcastStream$(dart.dynamic); |
+ let _next = Symbol('_next'); |
+ let _previous = Symbol('_previous'); |
+ class _BroadcastSubscriptionLink extends core.Object { |
+ _BroadcastSubscriptionLink() { |
+ this[_next] = null; |
+ this[_previous] = null; |
} |
- return _SyncCompleter; |
- }); |
- let _SyncCompleter = _SyncCompleter$(dart.dynamic); |
- let _nextListener = Symbol('_nextListener'); |
+ } |
+ let _eventState = Symbol('_eventState'); |
+ let _expectsEvent = Symbol('_expectsEvent'); |
+ let _toggleEventId = Symbol('_toggleEventId'); |
+ let _isFiring = Symbol('_isFiring'); |
+ let _setRemoveAfterFiring = Symbol('_setRemoveAfterFiring'); |
+ let _removeAfterFiring = Symbol('_removeAfterFiring'); |
+ let _onPause = Symbol('_onPause'); |
+ let _onResume = Symbol('_onResume'); |
+ let _onCancel = Symbol('_onCancel'); |
let _zone = Symbol('_zone'); |
- let _onValue = Symbol('_onValue'); |
+ let _state = Symbol('_state'); |
+ let _onData = Symbol('_onData'); |
let _onError = Symbol('_onError'); |
- let _errorTest = Symbol('_errorTest'); |
- let _whenCompleteAction = Symbol('_whenCompleteAction'); |
- class _FutureListener extends core.Object { |
- _FutureListener$then(result, onValue, errorCallback) { |
- this.result = result; |
- this.callback = onValue; |
- this.errorCallback = errorCallback; |
- this.state = errorCallback === null ? _FutureListener.STATE_THEN : _FutureListener.STATE_THEN_ONERROR; |
- this[_nextListener] = null; |
- } |
- _FutureListener$catchError(result, errorCallback, test) { |
- this.result = result; |
- this.errorCallback = errorCallback; |
- this.callback = test; |
- this.state = test === null ? _FutureListener.STATE_CATCHERROR : _FutureListener.STATE_CATCHERROR_TEST; |
- this[_nextListener] = null; |
- } |
- _FutureListener$whenComplete(result, onComplete) { |
- this.result = result; |
- this.callback = onComplete; |
- this.errorCallback = null; |
- this.state = _FutureListener.STATE_WHENCOMPLETE; |
- this[_nextListener] = null; |
- } |
- _FutureListener$chain(result) { |
- this.result = result; |
- this.callback = null; |
- this.errorCallback = null; |
- this.state = _FutureListener.STATE_CHAIN; |
- this[_nextListener] = null; |
- } |
- get [_zone]() { |
- return this.result[_zone]; |
- } |
- get handlesValue() { |
- return (dart.notNull(this.state) & dart.notNull(_FutureListener.MASK_VALUE)) !== 0; |
- } |
- get handlesError() { |
- return (dart.notNull(this.state) & dart.notNull(_FutureListener.MASK_ERROR)) !== 0; |
- } |
- get hasErrorTest() { |
- return this.state === _FutureListener.STATE_CATCHERROR_TEST; |
- } |
- get handlesComplete() { |
- return this.state === _FutureListener.STATE_WHENCOMPLETE; |
- } |
- get [_onValue]() { |
- dart.assert(this.handlesValue); |
- return dart.as(this.callback, _FutureOnValue); |
- } |
- get [_onError]() { |
- return this.errorCallback; |
- } |
- get [_errorTest]() { |
- dart.assert(this.hasErrorTest); |
- return dart.as(this.callback, _FutureErrorTest); |
- } |
- get [_whenCompleteAction]() { |
- dart.assert(this.handlesComplete); |
- return dart.as(this.callback, _FutureAction); |
- } |
- } |
- dart.defineNamedConstructor(_FutureListener, 'then'); |
- dart.defineNamedConstructor(_FutureListener, 'catchError'); |
- dart.defineNamedConstructor(_FutureListener, 'whenComplete'); |
- dart.defineNamedConstructor(_FutureListener, 'chain'); |
- _FutureListener.MASK_VALUE = 1; |
- _FutureListener.MASK_ERROR = 2; |
- _FutureListener.MASK_TEST_ERROR = 4; |
- _FutureListener.MASK_WHENCOMPLETE = 8; |
- _FutureListener.STATE_CHAIN = 0; |
- _FutureListener.STATE_THEN = _FutureListener.MASK_VALUE; |
- _FutureListener.STATE_THEN_ONERROR = dart.notNull(_FutureListener.MASK_VALUE) | dart.notNull(_FutureListener.MASK_ERROR); |
- _FutureListener.STATE_CATCHERROR = _FutureListener.MASK_ERROR; |
- _FutureListener.STATE_CATCHERROR_TEST = dart.notNull(_FutureListener.MASK_ERROR) | dart.notNull(_FutureListener.MASK_TEST_ERROR); |
- _FutureListener.STATE_WHENCOMPLETE = _FutureListener.MASK_WHENCOMPLETE; |
- let _resultOrListeners = Symbol('_resultOrListeners'); |
- let _asyncComplete = Symbol('_asyncComplete'); |
- let _asyncCompleteError = Symbol('_asyncCompleteError'); |
- let _isChained = Symbol('_isChained'); |
- let _isComplete = Symbol('_isComplete'); |
- let _hasValue = Symbol('_hasValue'); |
- let _hasError = Symbol('_hasError'); |
- let _markPendingCompletion = Symbol('_markPendingCompletion'); |
- let _value = Symbol('_value'); |
- let _error = Symbol('_error'); |
- let _setValue = Symbol('_setValue'); |
- let _setErrorObject = Symbol('_setErrorObject'); |
- let _setError = Symbol('_setError'); |
- let _removeListeners = Symbol('_removeListeners'); |
- let _chainForeignFuture = Symbol('_chainForeignFuture'); |
- let _chainCoreFuture = Symbol('_chainCoreFuture'); |
- let _complete = Symbol('_complete'); |
- let _completeWithValue = Symbol('_completeWithValue'); |
- let _propagateToListeners = Symbol('_propagateToListeners'); |
- let _Future$ = dart.generic(function(T) { |
- class _Future extends core.Object { |
- _Future() { |
+ let _onDone = Symbol('_onDone'); |
+ let _cancelFuture = Symbol('_cancelFuture'); |
+ let _pending = Symbol('_pending'); |
+ let _setPendingEvents = Symbol('_setPendingEvents'); |
+ let _extractPending = Symbol('_extractPending'); |
+ let _isCanceled = Symbol('_isCanceled'); |
+ let _isPaused = Symbol('_isPaused'); |
+ let _isInputPaused = Symbol('_isInputPaused'); |
+ let _inCallback = Symbol('_inCallback'); |
+ let _guardCallback = Symbol('_guardCallback'); |
+ let _decrementPauseCount = Symbol('_decrementPauseCount'); |
+ let _hasPending = Symbol('_hasPending'); |
+ let _mayResumeInput = Symbol('_mayResumeInput'); |
+ let _cancel = Symbol('_cancel'); |
+ let _isClosed = Symbol('_isClosed'); |
+ let _waitsForCancel = Symbol('_waitsForCancel'); |
+ let _canFire = Symbol('_canFire'); |
+ let _cancelOnError = Symbol('_cancelOnError'); |
+ let _incrementPauseCount = Symbol('_incrementPauseCount'); |
+ let _add = Symbol('_add'); |
+ let _sendData = Symbol('_sendData'); |
+ let _addPending = Symbol('_addPending'); |
+ let _sendError = Symbol('_sendError'); |
+ let _close = Symbol('_close'); |
+ let _sendDone = Symbol('_sendDone'); |
+ let _checkState = Symbol('_checkState'); |
+ let _BufferingStreamSubscription$ = dart.generic(function(T) { |
+ class _BufferingStreamSubscription extends core.Object { |
+ _BufferingStreamSubscription(onData, onError, onDone, cancelOnError) { |
this[_zone] = Zone.current; |
- this[_state] = _Future._INCOMPLETE; |
- this[_resultOrListeners] = null; |
+ this[_state] = cancelOnError ? _BufferingStreamSubscription._STATE_CANCEL_ON_ERROR : 0; |
+ this[_onData] = null; |
+ this[_onError] = null; |
+ this[_onDone] = null; |
+ this[_cancelFuture] = null; |
+ this[_pending] = null; |
+ this.onData(onData); |
+ this.onError(onError); |
+ this.onDone(onDone); |
} |
- _Future$immediate(value) { |
- this[_zone] = Zone.current; |
- this[_state] = _Future._INCOMPLETE; |
- this[_resultOrListeners] = null; |
- this[_asyncComplete](value); |
+ [_setPendingEvents](pendingEvents) { |
+ dart.assert(this[_pending] === null); |
+ if (pendingEvents === null) |
+ return; |
+ this[_pending] = pendingEvents; |
+ if (!dart.notNull(pendingEvents.isEmpty)) { |
+ this[_state] = _BufferingStreamSubscription._STATE_HAS_PENDING; |
+ this[_pending].schedule(this); |
+ } |
} |
- _Future$immediateError(error, stackTrace) { |
- if (stackTrace === void 0) |
- stackTrace = null; |
- this[_zone] = Zone.current; |
- this[_state] = _Future._INCOMPLETE; |
- this[_resultOrListeners] = null; |
- this[_asyncCompleteError](error, stackTrace); |
+ [_extractPending]() { |
+ dart.assert(this[_isCanceled]); |
+ let events = this[_pending]; |
+ this[_pending] = null; |
+ return events; |
} |
- get [_mayComplete]() { |
- return this[_state] === _Future._INCOMPLETE; |
+ onData(handleData) { |
+ if (handleData === null) |
+ handleData = _nullDataHandler; |
+ this[_onData] = this[_zone].registerUnaryCallback(dart.as(handleData, dart.throw_("Unimplemented type (dynamic) → dynamic"))); |
} |
- get [_isChained]() { |
- return this[_state] === _Future._CHAINED; |
+ onError(handleError) { |
+ if (handleError === null) |
+ handleError = _nullErrorHandler; |
+ this[_onError] = _registerErrorHandler(handleError, this[_zone]); |
} |
- get [_isComplete]() { |
- return dart.notNull(this[_state]) >= dart.notNull(_Future._VALUE); |
+ onDone(handleDone) { |
+ if (handleDone === null) |
+ handleDone = _nullDoneHandler; |
+ this[_onDone] = this[_zone].registerCallback(handleDone); |
} |
- get [_hasValue]() { |
- return this[_state] === _Future._VALUE; |
+ pause(resumeSignal) { |
+ if (resumeSignal === void 0) |
+ resumeSignal = null; |
+ if (this[_isCanceled]) |
+ return; |
+ let wasPaused = this[_isPaused]; |
+ let wasInputPaused = this[_isInputPaused]; |
+ this[_state] = dart.notNull(this[_state]) + dart.notNull(_BufferingStreamSubscription._STATE_PAUSE_COUNT) | dart.notNull(_BufferingStreamSubscription._STATE_INPUT_PAUSED); |
+ if (resumeSignal !== null) |
+ resumeSignal.whenComplete(this.resume); |
+ if (!dart.notNull(wasPaused) && dart.notNull(this[_pending] !== null)) |
+ this[_pending].cancelSchedule(); |
+ if (!dart.notNull(wasInputPaused) && !dart.notNull(this[_inCallback])) |
+ this[_guardCallback](this[_onPause]); |
} |
- get [_hasError]() { |
- return this[_state] === _Future._ERROR; |
+ resume() { |
+ if (this[_isCanceled]) |
+ return; |
+ if (this[_isPaused]) { |
+ this[_decrementPauseCount](); |
+ if (!dart.notNull(this[_isPaused])) { |
+ if (dart.notNull(this[_hasPending]) && !dart.notNull(this[_pending].isEmpty)) { |
+ this[_pending].schedule(this); |
+ } else { |
+ dart.assert(this[_mayResumeInput]); |
+ this[_state] = ~dart.notNull(_BufferingStreamSubscription._STATE_INPUT_PAUSED); |
+ if (!dart.notNull(this[_inCallback])) |
+ this[_guardCallback](this[_onResume]); |
+ } |
+ } |
+ } |
} |
- set [_isChained](value) { |
- if (value) { |
- dart.assert(!dart.notNull(this[_isComplete])); |
- this[_state] = _Future._CHAINED; |
- } else { |
- dart.assert(this[_isChained]); |
- this[_state] = _Future._INCOMPLETE; |
- } |
+ cancel() { |
+ this[_state] = ~dart.notNull(_BufferingStreamSubscription._STATE_WAIT_FOR_CANCEL); |
+ if (this[_isCanceled]) |
+ return this[_cancelFuture]; |
+ this[_cancel](); |
+ return this[_cancelFuture]; |
} |
- then(f, opt$) { |
- let onError = opt$.onError === void 0 ? null : opt$.onError; |
+ asFuture(futureValue) { |
+ if (futureValue === void 0) |
+ futureValue = null; |
let result = new _Future(); |
- if (!dart.notNull(core.identical(result[_zone], _ROOT_ZONE))) { |
- f = result[_zone].registerUnaryCallback(dart.as(f, dart.throw_("Unimplemented type (dynamic) → dynamic"))); |
- if (onError !== null) { |
- onError = _registerErrorHandler(onError, result[_zone]); |
- } |
- } |
- this[_addListener](new _FutureListener.then(result, dart.as(f, _FutureOnValue), onError)); |
+ this[_onDone] = (() => { |
+ result._complete(futureValue); |
+ }).bind(this); |
+ this[_onError] = ((error, stackTrace) => { |
+ this.cancel(); |
+ result._completeError(error, dart.as(stackTrace, core.StackTrace)); |
+ }).bind(this); |
return result; |
} |
- catchError(onError, opt$) { |
- let test = opt$.test === void 0 ? null : opt$.test; |
- let result = new _Future(); |
- if (!dart.notNull(core.identical(result[_zone], _ROOT_ZONE))) { |
- onError = _registerErrorHandler(onError, result[_zone]); |
- if (test !== null) |
- test = dart.as(result[_zone].registerUnaryCallback(test), dart.throw_("Unimplemented type (dynamic) → bool")); |
- } |
- this[_addListener](new _FutureListener.catchError(result, onError, test)); |
- return result; |
+ get [_isInputPaused]() { |
+ return (dart.notNull(this[_state]) & dart.notNull(_BufferingStreamSubscription._STATE_INPUT_PAUSED)) !== 0; |
} |
- whenComplete(action) { |
- let result = new _Future(); |
- if (!dart.notNull(core.identical(result[_zone], _ROOT_ZONE))) { |
- action = result[_zone].registerCallback(action); |
- } |
- this[_addListener](new _FutureListener.whenComplete(result, action)); |
- return dart.as(result, Future$(T)); |
+ get [_isClosed]() { |
+ return (dart.notNull(this[_state]) & dart.notNull(_BufferingStreamSubscription._STATE_CLOSED)) !== 0; |
} |
- asStream() { |
- return dart.as(new Stream.fromFuture(this), Stream$(T)); |
+ get [_isCanceled]() { |
+ return (dart.notNull(this[_state]) & dart.notNull(_BufferingStreamSubscription._STATE_CANCELED)) !== 0; |
} |
- [_markPendingCompletion]() { |
- if (!dart.notNull(this[_mayComplete])) |
- throw new core.StateError("Future already completed"); |
- this[_state] = _Future._PENDING_COMPLETE; |
+ get [_waitsForCancel]() { |
+ return (dart.notNull(this[_state]) & dart.notNull(_BufferingStreamSubscription._STATE_WAIT_FOR_CANCEL)) !== 0; |
} |
- get [_value]() { |
- dart.assert(dart.notNull(this[_isComplete]) && dart.notNull(this[_hasValue])); |
- return dart.as(this[_resultOrListeners], T); |
+ get [_inCallback]() { |
+ return (dart.notNull(this[_state]) & dart.notNull(_BufferingStreamSubscription._STATE_IN_CALLBACK)) !== 0; |
} |
- get [_error]() { |
- dart.assert(dart.notNull(this[_isComplete]) && dart.notNull(this[_hasError])); |
- return dart.as(this[_resultOrListeners], AsyncError); |
+ get [_hasPending]() { |
+ return (dart.notNull(this[_state]) & dart.notNull(_BufferingStreamSubscription._STATE_HAS_PENDING)) !== 0; |
} |
- [_setValue](value) { |
- dart.assert(!dart.notNull(this[_isComplete])); |
- this[_state] = _Future._VALUE; |
- this[_resultOrListeners] = value; |
+ get [_isPaused]() { |
+ return dart.notNull(this[_state]) >= dart.notNull(_BufferingStreamSubscription._STATE_PAUSE_COUNT); |
} |
- [_setErrorObject](error) { |
- dart.assert(!dart.notNull(this[_isComplete])); |
- this[_state] = _Future._ERROR; |
- this[_resultOrListeners] = error; |
+ get [_canFire]() { |
+ return dart.notNull(this[_state]) < dart.notNull(_BufferingStreamSubscription._STATE_IN_CALLBACK); |
} |
- [_setError](error, stackTrace) { |
- this[_setErrorObject](new AsyncError(error, stackTrace)); |
+ get [_mayResumeInput]() { |
+ return !dart.notNull(this[_isPaused]) && (dart.notNull(this[_pending] === null) || dart.notNull(this[_pending].isEmpty)); |
} |
- [_addListener](listener) { |
- dart.assert(listener[_nextListener] === null); |
- if (this[_isComplete]) { |
- this[_zone].scheduleMicrotask((() => { |
- _propagateToListeners(this, listener); |
- }).bind(this)); |
- } else { |
- listener[_nextListener] = dart.as(this[_resultOrListeners], _FutureListener); |
- this[_resultOrListeners] = listener; |
- } |
+ get [_cancelOnError]() { |
+ return (dart.notNull(this[_state]) & dart.notNull(_BufferingStreamSubscription._STATE_CANCEL_ON_ERROR)) !== 0; |
} |
- [_removeListeners]() { |
- dart.assert(!dart.notNull(this[_isComplete])); |
- let current = dart.as(this[_resultOrListeners], _FutureListener); |
- this[_resultOrListeners] = null; |
- let prev = null; |
- while (current !== null) { |
- let next = current[_nextListener]; |
- current[_nextListener] = prev; |
- prev = current; |
- current = next; |
+ get isPaused() { |
+ return this[_isPaused]; |
+ } |
+ [_cancel]() { |
+ this[_state] = _BufferingStreamSubscription._STATE_CANCELED; |
+ if (this[_hasPending]) { |
+ this[_pending].cancelSchedule(); |
} |
- return prev; |
+ if (!dart.notNull(this[_inCallback])) |
+ this[_pending] = null; |
+ this[_cancelFuture] = this[_onCancel](); |
} |
- static [_chainForeignFuture](source, target) { |
- dart.assert(!dart.notNull(target[_isComplete])); |
- dart.assert(!dart.is(source, _Future)); |
- target[_isChained] = true; |
- source.then(((value) => { |
- dart.assert(target[_isChained]); |
- target._completeWithValue(value); |
- }).bind(this), {onError: ((error, stackTrace) => { |
- if (stackTrace === void 0) |
- stackTrace = null; |
- dart.assert(target[_isChained]); |
- target._completeError(error, dart.as(stackTrace, core.StackTrace)); |
- }).bind(this)}); |
+ [_incrementPauseCount]() { |
+ this[_state] = dart.notNull(this[_state]) + dart.notNull(_BufferingStreamSubscription._STATE_PAUSE_COUNT) | dart.notNull(_BufferingStreamSubscription._STATE_INPUT_PAUSED); |
} |
- static [_chainCoreFuture](source, target) { |
- dart.assert(!dart.notNull(target[_isComplete])); |
- dart.assert(dart.is(source, _Future)); |
- target[_isChained] = true; |
- let listener = new _FutureListener.chain(target); |
- if (source[_isComplete]) { |
- _propagateToListeners(source, listener); |
+ [_decrementPauseCount]() { |
+ dart.assert(this[_isPaused]); |
+ this[_state] = _BufferingStreamSubscription._STATE_PAUSE_COUNT; |
+ } |
+ [_add](data) { |
+ dart.assert(!dart.notNull(this[_isClosed])); |
+ if (this[_isCanceled]) |
+ return; |
+ if (this[_canFire]) { |
+ this[_sendData](data); |
} else { |
- source._addListener(listener); |
+ this[_addPending](new _DelayedData(data)); |
} |
} |
- [_complete](value) { |
- dart.assert(!dart.notNull(this[_isComplete])); |
- if (dart.is(value, Future)) { |
- if (dart.is(value, _Future)) { |
- _chainCoreFuture(dart.as(value, _Future), this); |
- } else { |
- _chainForeignFuture(dart.as(value, Future), this); |
- } |
+ [_addError](error, stackTrace) { |
+ if (this[_isCanceled]) |
+ return; |
+ if (this[_canFire]) { |
+ this[_sendError](error, stackTrace); |
} else { |
- let listeners = this[_removeListeners](); |
- this[_setValue](dart.as(value, T)); |
- _propagateToListeners(this, listeners); |
+ this[_addPending](new _DelayedError(error, stackTrace)); |
} |
} |
- [_completeWithValue](value) { |
- dart.assert(!dart.notNull(this[_isComplete])); |
- dart.assert(!dart.is(value, Future)); |
- let listeners = this[_removeListeners](); |
- this[_setValue](dart.as(value, T)); |
- _propagateToListeners(this, listeners); |
- } |
- [_completeError](error, stackTrace) { |
- if (stackTrace === void 0) |
- stackTrace = null; |
- dart.assert(!dart.notNull(this[_isComplete])); |
- let listeners = this[_removeListeners](); |
- this[_setError](error, stackTrace); |
- _propagateToListeners(this, listeners); |
- } |
- [_asyncComplete](value) { |
- dart.assert(!dart.notNull(this[_isComplete])); |
- if (value === null) { |
- } else if (dart.is(value, Future)) { |
- let typedFuture = dart.as(value, Future$(T)); |
- if (dart.is(typedFuture, _Future)) { |
- let coreFuture = dart.as(typedFuture, _Future$(T)); |
- if (dart.notNull(coreFuture[_isComplete]) && dart.notNull(coreFuture[_hasError])) { |
- this[_markPendingCompletion](); |
- this[_zone].scheduleMicrotask((() => { |
- _chainCoreFuture(coreFuture, this); |
- }).bind(this)); |
- } else { |
- _chainCoreFuture(coreFuture, this); |
- } |
- } else { |
- _chainForeignFuture(typedFuture, this); |
- } |
+ [_close]() { |
+ dart.assert(!dart.notNull(this[_isClosed])); |
+ if (this[_isCanceled]) |
return; |
+ this[_state] = _BufferingStreamSubscription._STATE_CLOSED; |
+ if (this[_canFire]) { |
+ this[_sendDone](); |
} else { |
- let typedValue = dart.as(value, T); |
+ this[_addPending](new _DelayedDone()); |
} |
- this[_markPendingCompletion](); |
- this[_zone].scheduleMicrotask((() => { |
- this[_completeWithValue](value); |
- }).bind(this)); |
} |
- [_asyncCompleteError](error, stackTrace) { |
- dart.assert(!dart.notNull(this[_isComplete])); |
- this[_markPendingCompletion](); |
- this[_zone].scheduleMicrotask((() => { |
- this[_completeError](error, stackTrace); |
- }).bind(this)); |
+ [_onPause]() { |
+ dart.assert(this[_isInputPaused]); |
} |
- static [_propagateToListeners](source, listeners) { |
- while (true) { |
- dart.assert(source[_isComplete]); |
- let hasError = source[_hasError]; |
- if (listeners === null) { |
- if (hasError) { |
- let asyncError = source[_error]; |
- source[_zone].handleUncaughtError(asyncError.error, asyncError.stackTrace); |
- } |
- return; |
- } |
- while (listeners[_nextListener] !== null) { |
- let listener = listeners; |
- listeners = listener[_nextListener]; |
- listener[_nextListener] = null; |
- _propagateToListeners(source, listener); |
+ [_onResume]() { |
+ dart.assert(!dart.notNull(this[_isInputPaused])); |
+ } |
+ [_onCancel]() { |
+ dart.assert(this[_isCanceled]); |
+ return null; |
+ } |
+ [_addPending](event) { |
+ let pending = dart.as(this[_pending], _StreamImplEvents); |
+ if (this[_pending] === null) |
+ pending = this[_pending] = new _StreamImplEvents(); |
+ pending.add(event); |
+ if (!dart.notNull(this[_hasPending])) { |
+ this[_state] = _BufferingStreamSubscription._STATE_HAS_PENDING; |
+ if (!dart.notNull(this[_isPaused])) { |
+ this[_pending].schedule(this); |
} |
- let listener = listeners; |
- let listenerHasValue = true; |
- let sourceValue = hasError ? null : source[_value]; |
- let listenerValueOrError = sourceValue; |
- let isPropagationAborted = false; |
- if (dart.notNull(hasError) || dart.notNull(listener.handlesValue) || dart.notNull(listener.handlesComplete)) { |
- let zone = listener[_zone]; |
- if (dart.notNull(hasError) && !dart.notNull(source[_zone].inSameErrorZone(zone))) { |
- let asyncError = source[_error]; |
- source[_zone].handleUncaughtError(asyncError.error, asyncError.stackTrace); |
- return; |
- } |
- let oldZone = null; |
- if (!dart.notNull(core.identical(Zone.current, zone))) { |
- oldZone = Zone._enter(zone); |
- } |
- // Function handleValueCallback: () → bool |
- function handleValueCallback() { |
- try { |
- listenerValueOrError = zone.runUnary(listener[_onValue], sourceValue); |
- return true; |
- } catch (e) { |
- let s = dart.stackTrace(e); |
- listenerValueOrError = new AsyncError(e, s); |
- return false; |
- } |
- |
- } |
- // Function handleError: () → void |
- function handleError() { |
- let asyncError = source[_error]; |
- let matchesTest = true; |
- if (listener.hasErrorTest) { |
- let test = listener[_errorTest]; |
- try { |
- matchesTest = dart.as(zone.runUnary(test, asyncError.error), core.bool); |
- } catch (e) { |
- let s = dart.stackTrace(e); |
- listenerValueOrError = core.identical(asyncError.error, e) ? asyncError : new AsyncError(e, s); |
- listenerHasValue = false; |
- return; |
- } |
- |
- } |
- let errorCallback = listener[_onError]; |
- if (dart.notNull(matchesTest) && dart.notNull(errorCallback !== null)) { |
- try { |
- if (dart.is(errorCallback, ZoneBinaryCallback)) { |
- listenerValueOrError = zone.runBinary(errorCallback, asyncError.error, asyncError.stackTrace); |
- } else { |
- listenerValueOrError = zone.runUnary(dart.as(errorCallback, dart.throw_("Unimplemented type (dynamic) → dynamic")), asyncError.error); |
- } |
- } catch (e) { |
- let s = dart.stackTrace(e); |
- listenerValueOrError = core.identical(asyncError.error, e) ? asyncError : new AsyncError(e, s); |
- listenerHasValue = false; |
- return; |
- } |
- |
- listenerHasValue = true; |
- } else { |
- listenerValueOrError = asyncError; |
- listenerHasValue = false; |
- } |
- } |
- // Function handleWhenCompleteCallback: () → void |
- function handleWhenCompleteCallback() { |
- let completeResult = null; |
- try { |
- completeResult = zone.run(listener[_whenCompleteAction]); |
- } catch (e) { |
- let s = dart.stackTrace(e); |
- if (dart.notNull(hasError) && dart.notNull(core.identical(source[_error].error, e))) { |
- listenerValueOrError = source[_error]; |
- } else { |
- listenerValueOrError = new AsyncError(e, s); |
- } |
- listenerHasValue = false; |
- return; |
- } |
- |
- if (dart.is(completeResult, Future)) { |
- let result = listener.result; |
- result[_isChained] = true; |
- isPropagationAborted = true; |
- dart.dinvoke(completeResult, 'then', (ignored) => { |
- _propagateToListeners(source, new _FutureListener.chain(result)); |
- }, { |
- onError: (error, stackTrace) => { |
- if (stackTrace === void 0) |
- stackTrace = null; |
- if (!dart.is(completeResult, _Future)) { |
- completeResult = new _Future(); |
- dart.dinvoke(completeResult, '_setError', error, stackTrace); |
- } |
- _propagateToListeners(dart.as(completeResult, _Future), new _FutureListener.chain(result)); |
- } |
- }); |
- } |
- } |
- if (!dart.notNull(hasError)) { |
- if (listener.handlesValue) { |
- listenerHasValue = handleValueCallback(); |
- } |
- } else { |
- handleError(); |
- } |
- if (listener.handlesComplete) { |
- handleWhenCompleteCallback(); |
- } |
- if (oldZone !== null) |
- Zone._leave(oldZone); |
- if (isPropagationAborted) |
- return; |
- if (dart.notNull(listenerHasValue) && !dart.notNull(core.identical(sourceValue, listenerValueOrError)) && dart.notNull(dart.is(listenerValueOrError, Future))) { |
- let chainSource = dart.as(listenerValueOrError, Future); |
- let result = listener.result; |
- if (dart.is(chainSource, _Future)) { |
- if (chainSource[_isComplete]) { |
- result[_isChained] = true; |
- source = chainSource; |
- listeners = new _FutureListener.chain(result); |
- continue; |
- } else { |
- _chainCoreFuture(chainSource, result); |
- } |
- } else { |
- _chainForeignFuture(chainSource, result); |
- } |
- return; |
- } |
+ } |
+ } |
+ [_sendData](data) { |
+ dart.assert(!dart.notNull(this[_isCanceled])); |
+ dart.assert(!dart.notNull(this[_isPaused])); |
+ dart.assert(!dart.notNull(this[_inCallback])); |
+ let wasInputPaused = this[_isInputPaused]; |
+ this[_state] = _BufferingStreamSubscription._STATE_IN_CALLBACK; |
+ this[_zone].runUnaryGuarded(dart.as(this[_onData], dart.throw_("Unimplemented type (dynamic) → dynamic")), data); |
+ this[_state] = ~dart.notNull(_BufferingStreamSubscription._STATE_IN_CALLBACK); |
+ this[_checkState](wasInputPaused); |
+ } |
+ [_sendError](error, stackTrace) { |
+ dart.assert(!dart.notNull(this[_isCanceled])); |
+ dart.assert(!dart.notNull(this[_isPaused])); |
+ dart.assert(!dart.notNull(this[_inCallback])); |
+ let wasInputPaused = this[_isInputPaused]; |
+ // Function sendError: () → void |
+ function sendError() { |
+ if (dart.notNull(this[_isCanceled]) && !dart.notNull(this[_waitsForCancel])) |
+ return; |
+ this[_state] = _BufferingStreamSubscription._STATE_IN_CALLBACK; |
+ if (dart.is(this[_onError], ZoneBinaryCallback)) { |
+ this[_zone].runBinaryGuarded(dart.as(this[_onError], dart.throw_("Unimplemented type (dynamic, dynamic) → dynamic")), error, stackTrace); |
+ } else { |
+ this[_zone].runUnaryGuarded(dart.as(this[_onError], dart.throw_("Unimplemented type (dynamic) → dynamic")), error); |
} |
- let result = listener.result; |
- listeners = result._removeListeners(); |
- if (listenerHasValue) { |
- result._setValue(listenerValueOrError); |
+ this[_state] = ~dart.notNull(_BufferingStreamSubscription._STATE_IN_CALLBACK); |
+ } |
+ if (this[_cancelOnError]) { |
+ this[_state] = _BufferingStreamSubscription._STATE_WAIT_FOR_CANCEL; |
+ this[_cancel](); |
+ if (dart.is(this[_cancelFuture], Future)) { |
+ this[_cancelFuture].whenComplete(sendError); |
} else { |
- let asyncError = dart.as(listenerValueOrError, AsyncError); |
- result._setErrorObject(asyncError); |
+ sendError(); |
} |
- source = result; |
+ } else { |
+ sendError(); |
+ this[_checkState](wasInputPaused); |
} |
} |
- timeout(timeLimit, opt$) { |
- let onTimeout = opt$.onTimeout === void 0 ? null : opt$.onTimeout; |
- if (this[_isComplete]) |
- return new _Future.immediate(this); |
- let result = new _Future(); |
- let timer = null; |
- if (onTimeout === null) { |
- timer = new Timer(timeLimit, (() => { |
- result._completeError(new TimeoutException("Future not completed", timeLimit)); |
- }).bind(this)); |
+ [_sendDone]() { |
+ dart.assert(!dart.notNull(this[_isCanceled])); |
+ dart.assert(!dart.notNull(this[_isPaused])); |
+ dart.assert(!dart.notNull(this[_inCallback])); |
+ // Function sendDone: () → void |
+ function sendDone() { |
+ if (!dart.notNull(this[_waitsForCancel])) |
+ return; |
+ this[_state] = dart.notNull(_BufferingStreamSubscription._STATE_CANCELED) | dart.notNull(_BufferingStreamSubscription._STATE_CLOSED) | dart.notNull(_BufferingStreamSubscription._STATE_IN_CALLBACK); |
+ this[_zone].runGuarded(this[_onDone]); |
+ this[_state] = ~dart.notNull(_BufferingStreamSubscription._STATE_IN_CALLBACK); |
+ } |
+ this[_cancel](); |
+ this[_state] = _BufferingStreamSubscription._STATE_WAIT_FOR_CANCEL; |
+ if (dart.is(this[_cancelFuture], Future)) { |
+ this[_cancelFuture].whenComplete(sendDone); |
} else { |
- let zone = Zone.current; |
- onTimeout = zone.registerCallback(onTimeout); |
- timer = new Timer(timeLimit, (() => { |
- try { |
- result._complete(zone.run(onTimeout)); |
- } catch (e) { |
- let s = dart.stackTrace(e); |
- result._completeError(e, s); |
- } |
- |
- }).bind(this)); |
+ sendDone(); |
} |
- this.then(((v) => { |
- if (timer.isActive) { |
- timer.cancel(); |
- result._completeWithValue(v); |
+ } |
+ [_guardCallback](callback) { |
+ dart.assert(!dart.notNull(this[_inCallback])); |
+ let wasInputPaused = this[_isInputPaused]; |
+ this[_state] = _BufferingStreamSubscription._STATE_IN_CALLBACK; |
+ dart.dinvokef(callback); |
+ this[_state] = ~dart.notNull(_BufferingStreamSubscription._STATE_IN_CALLBACK); |
+ this[_checkState](wasInputPaused); |
+ } |
+ [_checkState](wasInputPaused) { |
+ dart.assert(!dart.notNull(this[_inCallback])); |
+ if (dart.notNull(this[_hasPending]) && dart.notNull(this[_pending].isEmpty)) { |
+ this[_state] = ~dart.notNull(_BufferingStreamSubscription._STATE_HAS_PENDING); |
+ if (dart.notNull(this[_isInputPaused]) && dart.notNull(this[_mayResumeInput])) { |
+ this[_state] = ~dart.notNull(_BufferingStreamSubscription._STATE_INPUT_PAUSED); |
} |
- }).bind(this), {onError: ((e, s) => { |
- if (timer.isActive) { |
- timer.cancel(); |
- result._completeError(e, dart.as(s, core.StackTrace)); |
- } |
- }).bind(this)}); |
- return result; |
+ } |
+ while (true) { |
+ if (this[_isCanceled]) { |
+ this[_pending] = null; |
+ return; |
+ } |
+ let isInputPaused = this[_isInputPaused]; |
+ if (wasInputPaused === isInputPaused) |
+ break; |
+ this[_state] = _BufferingStreamSubscription._STATE_IN_CALLBACK; |
+ if (isInputPaused) { |
+ this[_onPause](); |
+ } else { |
+ this[_onResume](); |
+ } |
+ this[_state] = ~dart.notNull(_BufferingStreamSubscription._STATE_IN_CALLBACK); |
+ wasInputPaused = isInputPaused; |
+ } |
+ if (dart.notNull(this[_hasPending]) && !dart.notNull(this[_isPaused])) { |
+ this[_pending].schedule(this); |
+ } |
} |
} |
- dart.defineNamedConstructor(_Future, 'immediate'); |
- dart.defineNamedConstructor(_Future, 'immediateError'); |
- _Future._INCOMPLETE = 0; |
- _Future._PENDING_COMPLETE = 1; |
- _Future._CHAINED = 2; |
- _Future._VALUE = 4; |
- _Future._ERROR = 8; |
- return _Future; |
+ _BufferingStreamSubscription._STATE_CANCEL_ON_ERROR = 1; |
+ _BufferingStreamSubscription._STATE_CLOSED = 2; |
+ _BufferingStreamSubscription._STATE_INPUT_PAUSED = 4; |
+ _BufferingStreamSubscription._STATE_CANCELED = 8; |
+ _BufferingStreamSubscription._STATE_WAIT_FOR_CANCEL = 16; |
+ _BufferingStreamSubscription._STATE_IN_CALLBACK = 32; |
+ _BufferingStreamSubscription._STATE_HAS_PENDING = 64; |
+ _BufferingStreamSubscription._STATE_PAUSE_COUNT = 128; |
+ _BufferingStreamSubscription._STATE_PAUSE_COUNT_SHIFT = 7; |
+ return _BufferingStreamSubscription; |
}); |
- let _Future = _Future$(dart.dynamic); |
- class _AsyncCallbackEntry extends core.Object { |
- _AsyncCallbackEntry(callback) { |
- this.callback = callback; |
- this.next = null; |
- } |
- } |
- exports._nextCallback = null; |
- exports._lastCallback = null; |
- exports._lastPriorityCallback = null; |
- exports._isInCallbackLoop = false; |
- // Function _asyncRunCallbackLoop: () → void |
- function _asyncRunCallbackLoop() { |
- while (exports._nextCallback !== null) { |
- exports._lastPriorityCallback = null; |
- let entry = exports._nextCallback; |
- exports._nextCallback = entry.next; |
- if (exports._nextCallback === null) |
- exports._lastCallback = null; |
- entry.callback(); |
- } |
- } |
- // Function _asyncRunCallback: () → void |
- function _asyncRunCallback() { |
- exports._isInCallbackLoop = true; |
- try { |
- _asyncRunCallbackLoop(); |
- } finally { |
- exports._lastPriorityCallback = null; |
- exports._isInCallbackLoop = false; |
- if (exports._nextCallback !== null) |
- _AsyncRun._scheduleImmediate(_asyncRunCallback); |
- } |
- } |
- // Function _scheduleAsyncCallback: (dynamic) → void |
- function _scheduleAsyncCallback(callback) { |
- if (exports._nextCallback === null) { |
- exports._nextCallback = exports._lastCallback = new _AsyncCallbackEntry(dart.as(callback, _AsyncCallback)); |
- if (!dart.notNull(exports._isInCallbackLoop)) { |
- _AsyncRun._scheduleImmediate(_asyncRunCallback); |
+ let _BufferingStreamSubscription = _BufferingStreamSubscription$(dart.dynamic); |
+ let _ControllerSubscription$ = dart.generic(function(T) { |
+ class _ControllerSubscription extends _BufferingStreamSubscription$(T) { |
+ _ControllerSubscription($_controller, onData, onError, onDone, cancelOnError) { |
+ this[_controller] = $_controller; |
+ super._BufferingStreamSubscription(onData, onError, onDone, cancelOnError); |
} |
- } else { |
- let newEntry = new _AsyncCallbackEntry(dart.as(callback, _AsyncCallback)); |
- exports._lastCallback.next = newEntry; |
- exports._lastCallback = newEntry; |
- } |
- } |
- // Function _schedulePriorityAsyncCallback: (dynamic) → void |
- function _schedulePriorityAsyncCallback(callback) { |
- let entry = new _AsyncCallbackEntry(dart.as(callback, _AsyncCallback)); |
- if (exports._nextCallback === null) { |
- _scheduleAsyncCallback(callback); |
- exports._lastPriorityCallback = exports._lastCallback; |
- } else if (exports._lastPriorityCallback === null) { |
- entry.next = exports._nextCallback; |
- exports._nextCallback = exports._lastPriorityCallback = entry; |
- } else { |
- entry.next = exports._lastPriorityCallback.next; |
- exports._lastPriorityCallback.next = entry; |
- exports._lastPriorityCallback = entry; |
- if (entry.next === null) { |
- exports._lastCallback = entry; |
+ [_onCancel]() { |
+ return this[_controller]._recordCancel(this); |
} |
- } |
- } |
- // Function scheduleMicrotask: (() → void) → void |
- function scheduleMicrotask(callback) { |
- if (core.identical(_ROOT_ZONE, Zone.current)) { |
- _rootScheduleMicrotask(null, null, dart.as(_ROOT_ZONE, Zone), callback); |
- return; |
- } |
- Zone.current.scheduleMicrotask(Zone.current.bindCallback(callback, {runGuarded: true})); |
- } |
- let _scheduleImmediate = Symbol('_scheduleImmediate'); |
- let _initializeScheduleImmediate = Symbol('_initializeScheduleImmediate'); |
- let _scheduleImmediateJsOverride = Symbol('_scheduleImmediateJsOverride'); |
- let _scheduleImmediateWithSetImmediate = Symbol('_scheduleImmediateWithSetImmediate'); |
- let _scheduleImmediateWithTimer = Symbol('_scheduleImmediateWithTimer'); |
- class _AsyncRun extends core.Object { |
- static [_scheduleImmediate](callback) { |
- dart.dinvokef(scheduleImmediateClosure, callback); |
- } |
- static [_initializeScheduleImmediate]() { |
- _js_helper.requiresPreamble(); |
- if (self.scheduleImmediate !== null) { |
- return _scheduleImmediateJsOverride; |
+ [_onPause]() { |
+ this[_controller]._recordPause(this); |
} |
- if (dart.notNull(self.MutationObserver !== null) && dart.notNull(self.document !== null)) { |
- let div = self.document.createElement("div"); |
- let span = self.document.createElement("span"); |
- let storedCallback = null; |
- // Function internalCallback: (dynamic) → dynamic |
- function internalCallback(_) { |
- _isolate_helper.leaveJsAsync(); |
- let f = storedCallback; |
- storedCallback = null; |
- dart.dinvokef(f); |
- } |
- ; |
- let observer = new self.MutationObserver(_js_helper.convertDartClosureToJS(internalCallback, 1)); |
- observer.observe(div, {childList: true}); |
- return (callback) => { |
- dart.assert(storedCallback === null); |
- _isolate_helper.enterJsAsync(); |
- storedCallback = callback; |
- div.firstChild ? div.removeChild(span) : div.appendChild(span); |
- }; |
- } else if (self.setImmediate !== null) { |
- return _scheduleImmediateWithSetImmediate; |
+ [_onResume]() { |
+ this[_controller]._recordResume(this); |
} |
- return _scheduleImmediateWithTimer; |
} |
- static [_scheduleImmediateJsOverride](callback) { |
- // Function internalCallback: () → dynamic |
- function internalCallback() { |
- _isolate_helper.leaveJsAsync(); |
- callback(); |
+ return _ControllerSubscription; |
+ }); |
+ let _ControllerSubscription = _ControllerSubscription$(dart.dynamic); |
+ let _BroadcastSubscription$ = dart.generic(function(T) { |
+ class _BroadcastSubscription extends _ControllerSubscription$(T) { |
+ _BroadcastSubscription(controller, onData, onError, onDone, cancelOnError) { |
+ this[_eventState] = null; |
+ this[_next] = null; |
+ this[_previous] = null; |
+ super._ControllerSubscription(dart.as(controller, _StreamControllerLifecycle$(T)), onData, onError, onDone, cancelOnError); |
+ this[_next] = this[_previous] = this; |
} |
- ; |
- _isolate_helper.enterJsAsync(); |
- self.scheduleImmediate(_js_helper.convertDartClosureToJS(internalCallback, 0)); |
+ get [_controller]() { |
+ return dart.as(super[_controller], _BroadcastStreamController); |
+ } |
+ [_expectsEvent](eventId) { |
+ return (dart.notNull(this[_eventState]) & dart.notNull(_BroadcastSubscription._STATE_EVENT_ID)) === eventId; |
+ } |
+ [_toggleEventId]() { |
+ this[_eventState] = _BroadcastSubscription._STATE_EVENT_ID; |
+ } |
+ get [_isFiring]() { |
+ return (dart.notNull(this[_eventState]) & dart.notNull(_BroadcastSubscription._STATE_FIRING)) !== 0; |
+ } |
+ [_setRemoveAfterFiring]() { |
+ dart.assert(this[_isFiring]); |
+ this[_eventState] = _BroadcastSubscription._STATE_REMOVE_AFTER_FIRING; |
+ } |
+ get [_removeAfterFiring]() { |
+ return (dart.notNull(this[_eventState]) & dart.notNull(_BroadcastSubscription._STATE_REMOVE_AFTER_FIRING)) !== 0; |
+ } |
+ [_onPause]() {} |
+ [_onResume]() {} |
} |
- static [_scheduleImmediateWithSetImmediate](callback) { |
- // Function internalCallback: () → dynamic |
- function internalCallback() { |
- _isolate_helper.leaveJsAsync(); |
- callback(); |
+ _BroadcastSubscription._STATE_EVENT_ID = 1; |
+ _BroadcastSubscription._STATE_FIRING = 2; |
+ _BroadcastSubscription._STATE_REMOVE_AFTER_FIRING = 4; |
+ return _BroadcastSubscription; |
+ }); |
+ let _BroadcastSubscription = _BroadcastSubscription$(dart.dynamic); |
+ let _addStreamState = Symbol('_addStreamState'); |
+ let _doneFuture = Symbol('_doneFuture'); |
+ let _isEmpty = Symbol('_isEmpty'); |
+ let _hasOneListener = Symbol('_hasOneListener'); |
+ let _isAddingStream = Symbol('_isAddingStream'); |
+ let _mayAddEvent = Symbol('_mayAddEvent'); |
+ let _ensureDoneFuture = Symbol('_ensureDoneFuture'); |
+ let _addListener = Symbol('_addListener'); |
+ let _removeListener = Symbol('_removeListener'); |
+ let _subscribe = Symbol('_subscribe'); |
+ let _recordCancel = Symbol('_recordCancel'); |
+ let _callOnCancel = Symbol('_callOnCancel'); |
+ let _recordPause = Symbol('_recordPause'); |
+ let _recordResume = Symbol('_recordResume'); |
+ let _addEventError = Symbol('_addEventError'); |
+ let _forEachListener = Symbol('_forEachListener'); |
+ let _STATE_FIRING = Symbol('_STATE_FIRING'); |
+ let _mayComplete = Symbol('_mayComplete'); |
+ let _BroadcastStreamController$ = dart.generic(function(T) { |
+ class _BroadcastStreamController extends core.Object { |
+ _BroadcastStreamController($_onListen, $_onCancel) { |
+ this[_onListen] = $_onListen; |
+ this[_onCancel] = $_onCancel; |
+ this[_state] = _BroadcastStreamController._STATE_INITIAL; |
+ this[_next] = null; |
+ this[_previous] = null; |
+ this[_addStreamState] = null; |
+ this[_doneFuture] = null; |
+ this[_next] = this[_previous] = this; |
+ } |
+ get stream() { |
+ return new _BroadcastStream(this); |
+ } |
+ get sink() { |
+ return new _StreamSinkWrapper(this); |
+ } |
+ get isClosed() { |
+ return (dart.notNull(this[_state]) & dart.notNull(_BroadcastStreamController._STATE_CLOSED)) !== 0; |
+ } |
+ get isPaused() { |
+ return false; |
} |
- ; |
- _isolate_helper.enterJsAsync(); |
- self.setImmediate(_js_helper.convertDartClosureToJS(internalCallback, 0)); |
- } |
- static [_scheduleImmediateWithTimer](callback) { |
- Timer._createTimer(core.Duration.ZERO, callback); |
- } |
- } |
- dart.defineLazyProperties(_AsyncRun, { |
- get scheduleImmediateClosure() { |
- return _initializeScheduleImmediate(); |
- } |
- }); |
- let _sink = Symbol('_sink'); |
- let Stream$ = dart.generic(function(T) { |
- class Stream extends core.Object { |
- Stream() { |
+ get hasListener() { |
+ return !dart.notNull(this[_isEmpty]); |
} |
- Stream$fromFuture(future) { |
- let controller = dart.as(new StreamController({sync: true}), _StreamController$(T)); |
- future.then(((value) => { |
- controller._add(dart.as(value, T)); |
- controller._closeUnchecked(); |
- }).bind(this), {onError: ((error, stackTrace) => { |
- controller._addError(error, dart.as(stackTrace, core.StackTrace)); |
- controller._closeUnchecked(); |
- }).bind(this)}); |
- return controller.stream; |
+ get [_hasOneListener]() { |
+ dart.assert(!dart.notNull(this[_isEmpty])); |
+ return core.identical(this[_next][_next], this); |
} |
- Stream$fromIterable(data) { |
- return new _GeneratedStreamImpl(() => new _IterablePendingEvents(data)); |
+ get [_isFiring]() { |
+ return (dart.notNull(this[_state]) & dart.notNull(_BroadcastStreamController._STATE_FIRING)) !== 0; |
} |
- Stream$periodic(period, computation) { |
- if (computation === void 0) |
- computation = null; |
- if (computation === null) |
- computation = (i) => null; |
- let timer = null; |
- let computationCount = 0; |
- let controller = null; |
- let watch = new core.Stopwatch(); |
- // Function sendEvent: () → void |
- function sendEvent() { |
- watch.reset(); |
- let data = computation((($tmp) => computationCount = dart.notNull($tmp) + 1, $tmp)(computationCount)); |
- controller.add(data); |
- } |
- // Function startPeriodicTimer: () → void |
- function startPeriodicTimer() { |
- dart.assert(timer === null); |
- timer = new Timer.periodic(period, (timer) => { |
- sendEvent(); |
- }); |
- } |
- controller = new StreamController({sync: true, onListen: (() => { |
- watch.start(); |
- startPeriodicTimer(); |
- }).bind(this), onPause: (() => { |
- timer.cancel(); |
- timer = null; |
- watch.stop(); |
- }).bind(this), onResume: (() => { |
- dart.assert(timer === null); |
- let elapsed = watch.elapsed; |
- watch.start(); |
- timer = new Timer(period['-'](elapsed), () => { |
- timer = null; |
- startPeriodicTimer(); |
- sendEvent(); |
- }); |
- }).bind(this), onCancel: (() => { |
- if (timer !== null) |
- timer.cancel(); |
- timer = null; |
- }).bind(this)}); |
- return controller.stream; |
+ get [_isAddingStream]() { |
+ return (dart.notNull(this[_state]) & dart.notNull(_BroadcastStreamController._STATE_ADDSTREAM)) !== 0; |
} |
- Stream$eventTransformed(source, mapSink) { |
- return dart.as(new _BoundSinkStream(source, dart.as(mapSink, _SinkMapper)), Stream$(T)); |
+ get [_mayAddEvent]() { |
+ return dart.notNull(this[_state]) < dart.notNull(_BroadcastStreamController._STATE_CLOSED); |
} |
- get isBroadcast() { |
- return false; |
+ [_ensureDoneFuture]() { |
+ if (this[_doneFuture] !== null) |
+ return this[_doneFuture]; |
+ return this[_doneFuture] = new _Future(); |
} |
- asBroadcastStream(opt$) { |
- let onListen = opt$.onListen === void 0 ? null : opt$.onListen; |
- let onCancel = opt$.onCancel === void 0 ? null : opt$.onCancel; |
- return new _AsBroadcastStream(this, dart.as(onListen, dart.throw_("Unimplemented type (StreamSubscription<dynamic>) → void")), dart.as(onCancel, dart.throw_("Unimplemented type (StreamSubscription<dynamic>) → void"))); |
+ get [_isEmpty]() { |
+ return core.identical(this[_next], this); |
} |
- where(test) { |
- return new _WhereStream(this, test); |
+ [_addListener](subscription) { |
+ dart.assert(core.identical(subscription[_next], subscription)); |
+ subscription[_previous] = this[_previous]; |
+ subscription[_next] = this; |
+ this[_previous][_next] = subscription; |
+ this[_previous] = subscription; |
+ subscription[_eventState] = dart.notNull(this[_state]) & dart.notNull(_BroadcastStreamController._STATE_EVENT_ID); |
} |
- map(convert) { |
- return new _MapStream(this, convert); |
+ [_removeListener](subscription) { |
+ dart.assert(core.identical(subscription[_controller], this)); |
+ dart.assert(!dart.notNull(core.identical(subscription[_next], subscription))); |
+ let previous = subscription[_previous]; |
+ let next = subscription[_next]; |
+ previous[_next] = next; |
+ next[_previous] = previous; |
+ subscription[_next] = subscription[_previous] = subscription; |
} |
- asyncMap(convert) { |
- let controller = null; |
- let subscription = null; |
- // Function onListen: () → void |
- function onListen() { |
- let add = controller.add; |
- dart.assert(dart.notNull(dart.is(controller, _StreamController)) || dart.notNull(dart.is(controller, _BroadcastStreamController))); |
- let eventSink = controller; |
- let addError = eventSink[_addError]; |
- subscription = this.listen(((event) => { |
- let newValue = null; |
- try { |
- newValue = convert(event); |
- } catch (e) { |
- let s = dart.stackTrace(e); |
- controller.addError(e, s); |
- return; |
- } |
- |
- if (dart.is(newValue, Future)) { |
- subscription.pause(); |
- dart.dinvoke(dart.dinvoke(newValue, 'then', add, {onError: addError}), 'whenComplete', subscription.resume); |
- } else { |
- controller.add(newValue); |
- } |
- }).bind(this), {onError: dart.as(addError, core.Function), onDone: controller.close}); |
+ [_subscribe](onData, onError, onDone, cancelOnError) { |
+ if (this.isClosed) { |
+ if (onDone === null) |
+ onDone = _nullDoneHandler; |
+ return new _DoneStreamSubscription(onDone); |
} |
- if (this.isBroadcast) { |
- controller = new StreamController.broadcast({onListen: onListen, onCancel: (() => { |
- subscription.cancel(); |
- }).bind(this), sync: true}); |
+ let subscription = new _BroadcastSubscription(this, onData, onError, onDone, cancelOnError); |
+ this[_addListener](dart.as(subscription, _BroadcastSubscription$(T))); |
+ if (core.identical(this[_next], this[_previous])) { |
+ _runGuarded(this[_onListen]); |
+ } |
+ return dart.as(subscription, StreamSubscription$(T)); |
+ } |
+ [_recordCancel](subscription) { |
+ if (core.identical(subscription[_next], subscription)) |
+ return null; |
+ dart.assert(!dart.notNull(core.identical(subscription[_next], subscription))); |
+ if (subscription[_isFiring]) { |
+ subscription._setRemoveAfterFiring(); |
} else { |
- controller = new StreamController({onListen: onListen, onPause: (() => { |
- subscription.pause(); |
- }).bind(this), onResume: (() => { |
- subscription.resume(); |
- }).bind(this), onCancel: (() => { |
- subscription.cancel(); |
- }).bind(this), sync: true}); |
+ dart.assert(!dart.notNull(core.identical(subscription[_next], subscription))); |
+ this[_removeListener](subscription); |
+ if (!dart.notNull(this[_isFiring]) && dart.notNull(this[_isEmpty])) { |
+ this[_callOnCancel](); |
+ } |
} |
- return controller.stream; |
+ return null; |
} |
- asyncExpand(convert) { |
- let controller = null; |
- let subscription = null; |
- // Function onListen: () → void |
- function onListen() { |
- dart.assert(dart.notNull(dart.is(controller, _StreamController)) || dart.notNull(dart.is(controller, _BroadcastStreamController))); |
- let eventSink = controller; |
- subscription = this.listen(((event) => { |
- let newStream = null; |
- try { |
- newStream = convert(event); |
- } catch (e) { |
- let s = dart.stackTrace(e); |
- controller.addError(e, s); |
- return; |
- } |
- |
- if (newStream !== null) { |
- subscription.pause(); |
- controller.addStream(newStream).whenComplete(subscription.resume); |
- } |
- }).bind(this), {onError: dart.as(eventSink[_addError], core.Function), onDone: controller.close}); |
+ [_recordPause](subscription) {} |
+ [_recordResume](subscription) {} |
+ [_addEventError]() { |
+ if (this.isClosed) { |
+ return new core.StateError("Cannot add new events after calling close"); |
} |
- if (this.isBroadcast) { |
- controller = new StreamController.broadcast({onListen: onListen, onCancel: (() => { |
- subscription.cancel(); |
- }).bind(this), sync: true}); |
- } else { |
- controller = new StreamController({onListen: onListen, onPause: (() => { |
- subscription.pause(); |
- }).bind(this), onResume: (() => { |
- subscription.resume(); |
- }).bind(this), onCancel: (() => { |
- subscription.cancel(); |
- }).bind(this), sync: true}); |
+ dart.assert(this[_isAddingStream]); |
+ return new core.StateError("Cannot add new events while doing an addStream"); |
+ } |
+ add(data) { |
+ if (!dart.notNull(this[_mayAddEvent])) |
+ throw this[_addEventError](); |
+ this[_sendData](data); |
+ } |
+ addError(error, stackTrace) { |
+ if (stackTrace === void 0) |
+ stackTrace = null; |
+ error = _nonNullError(error); |
+ if (!dart.notNull(this[_mayAddEvent])) |
+ throw this[_addEventError](); |
+ let replacement = Zone.current.errorCallback(error, stackTrace); |
+ if (replacement !== null) { |
+ error = _nonNullError(replacement.error); |
+ stackTrace = replacement.stackTrace; |
} |
- return controller.stream; |
+ this[_sendError](error, stackTrace); |
} |
- handleError(onError, opt$) { |
- let test = opt$.test === void 0 ? null : opt$.test; |
- return new _HandleErrorStream(this, onError, test); |
+ close() { |
+ if (this.isClosed) { |
+ dart.assert(this[_doneFuture] !== null); |
+ return this[_doneFuture]; |
+ } |
+ if (!dart.notNull(this[_mayAddEvent])) |
+ throw this[_addEventError](); |
+ this[_state] = _BroadcastStreamController._STATE_CLOSED; |
+ let doneFuture = this[_ensureDoneFuture](); |
+ this[_sendDone](); |
+ return doneFuture; |
+ } |
+ get done() { |
+ return this[_ensureDoneFuture](); |
} |
- expand(convert) { |
- return new _ExpandStream(this, convert); |
+ addStream(stream, opt$) { |
+ let cancelOnError = opt$.cancelOnError === void 0 ? true : opt$.cancelOnError; |
+ if (!dart.notNull(this[_mayAddEvent])) |
+ throw this[_addEventError](); |
+ this[_state] = _BroadcastStreamController._STATE_ADDSTREAM; |
+ this[_addStreamState] = dart.as(new _AddStreamState(this, stream, cancelOnError), _AddStreamState$(T)); |
+ return this[_addStreamState].addStreamFuture; |
} |
- pipe(streamConsumer) { |
- return streamConsumer.addStream(this).then(((_) => streamConsumer.close()).bind(this)); |
+ [_add](data) { |
+ this[_sendData](data); |
} |
- transform(streamTransformer) { |
- return streamTransformer.bind(this); |
+ [_addError](error, stackTrace) { |
+ this[_sendError](error, stackTrace); |
} |
- reduce(combine) { |
- let result = new _Future(); |
- let seenFirst = false; |
- let value = null; |
- let subscription = null; |
- subscription = this.listen((element) => { |
- if (seenFirst) { |
- _runUserCode(() => combine(value, element), dart.as((newValue) => { |
- value = newValue; |
- }, dart.throw_("Unimplemented type (dynamic) → dynamic")), dart.as(_cancelAndErrorClosure(subscription, result), dart.throw_("Unimplemented type (dynamic, StackTrace) → dynamic"))); |
+ [_close]() { |
+ dart.assert(this[_isAddingStream]); |
+ let addState = this[_addStreamState]; |
+ this[_addStreamState] = null; |
+ this[_state] = ~dart.notNull(_BroadcastStreamController._STATE_ADDSTREAM); |
+ addState.complete(); |
+ } |
+ [_forEachListener](action) { |
+ if (this[_isFiring]) { |
+ throw new core.StateError("Cannot fire new event. Controller is already firing an event"); |
+ } |
+ if (this[_isEmpty]) |
+ return; |
+ let id = dart.notNull(this[_state]) & dart.notNull(_BroadcastStreamController._STATE_EVENT_ID); |
+ this[_state] = dart.notNull(_BroadcastStreamController._STATE_EVENT_ID) | dart.notNull(_BroadcastStreamController._STATE_FIRING); |
+ let link = this[_next]; |
+ while (!dart.notNull(core.identical(link, this))) { |
+ let subscription = dart.as(link, _BroadcastSubscription$(T)); |
+ if (subscription._expectsEvent(id)) { |
+ subscription[_eventState] = _BroadcastSubscription[_STATE_FIRING]; |
+ action(subscription); |
+ subscription._toggleEventId(); |
+ link = subscription[_next]; |
+ if (subscription[_removeAfterFiring]) { |
+ this[_removeListener](subscription); |
+ } |
+ subscription[_eventState] = ~dart.notNull(_BroadcastSubscription[_STATE_FIRING]); |
} else { |
- value = element; |
- seenFirst = true; |
+ link = subscription[_next]; |
} |
- }, {onError: result[_completeError], onDone: (() => { |
- if (!dart.notNull(seenFirst)) { |
- try { |
- throw _internal.IterableElementError.noElement(); |
- } catch (e) { |
- let s = dart.stackTrace(e); |
- _completeWithErrorCallback(result, e, s); |
- } |
- |
- } else { |
- result._complete(value); |
- } |
- }).bind(this), cancelOnError: true}); |
- return result; |
+ } |
+ this[_state] = ~dart.notNull(_BroadcastStreamController._STATE_FIRING); |
+ if (this[_isEmpty]) { |
+ this[_callOnCancel](); |
+ } |
} |
- fold(initialValue, combine) { |
- let result = new _Future(); |
- let value = initialValue; |
- let subscription = null; |
- subscription = this.listen((element) => { |
- _runUserCode(() => combine(value, element), (newValue) => { |
- value = newValue; |
- }, dart.as(_cancelAndErrorClosure(subscription, result), dart.throw_("Unimplemented type (dynamic, StackTrace) → dynamic"))); |
- }, {onError: ((e, st) => { |
- result._completeError(e, dart.as(st, core.StackTrace)); |
- }).bind(this), onDone: (() => { |
- result._complete(value); |
- }).bind(this), cancelOnError: true}); |
- return result; |
+ [_callOnCancel]() { |
+ dart.assert(this[_isEmpty]); |
+ if (dart.notNull(this.isClosed) && dart.notNull(this[_doneFuture][_mayComplete])) { |
+ this[_doneFuture]._asyncComplete(null); |
+ } |
+ _runGuarded(this[_onCancel]); |
} |
- join(separator) { |
- if (separator === void 0) |
- separator = ""; |
- let result = new _Future(); |
- let buffer = new core.StringBuffer(); |
- let subscription = null; |
- let first = true; |
- subscription = this.listen(((element) => { |
- if (!dart.notNull(first)) { |
- buffer.write(separator); |
- } |
- first = false; |
- try { |
- buffer.write(element); |
- } catch (e) { |
- let s = dart.stackTrace(e); |
- _cancelAndErrorWithReplacement(subscription, result, e, s); |
+ } |
+ _BroadcastStreamController._STATE_INITIAL = 0; |
+ _BroadcastStreamController._STATE_EVENT_ID = 1; |
+ _BroadcastStreamController._STATE_FIRING = 2; |
+ _BroadcastStreamController._STATE_CLOSED = 4; |
+ _BroadcastStreamController._STATE_ADDSTREAM = 8; |
+ return _BroadcastStreamController; |
+ }); |
+ let _BroadcastStreamController = _BroadcastStreamController$(dart.dynamic); |
+ let _SyncBroadcastStreamController$ = dart.generic(function(T) { |
+ class _SyncBroadcastStreamController extends _BroadcastStreamController$(T) { |
+ _SyncBroadcastStreamController(onListen, onCancel) { |
+ super._BroadcastStreamController(onListen, onCancel); |
+ } |
+ [_sendData](data) { |
+ if (this[_isEmpty]) |
+ return; |
+ if (this[_hasOneListener]) { |
+ this[_state] = _BroadcastStreamController[_STATE_FIRING]; |
+ let subscription = dart.as(this[_next], _BroadcastSubscription); |
+ subscription._add(data); |
+ this[_state] = ~dart.notNull(_BroadcastStreamController[_STATE_FIRING]); |
+ if (this[_isEmpty]) { |
+ this[_callOnCancel](); |
} |
- |
- }).bind(this), {onError: ((e) => { |
- result._completeError(e); |
- }).bind(this), onDone: (() => { |
- result._complete(buffer.toString()); |
- }).bind(this), cancelOnError: true}); |
- return result; |
+ return; |
+ } |
+ this[_forEachListener](((subscription) => { |
+ subscription._add(data); |
+ }).bind(this)); |
} |
- contains(needle) { |
- let future = new _Future(); |
- let subscription = null; |
- subscription = this.listen((element) => { |
- _runUserCode(() => dart.equals(element, needle), dart.as((isMatch) => { |
- if (isMatch) { |
- _cancelAndValue(subscription, future, true); |
- } |
- }, dart.throw_("Unimplemented type (dynamic) → dynamic")), dart.as(_cancelAndErrorClosure(subscription, future), dart.throw_("Unimplemented type (dynamic, StackTrace) → dynamic"))); |
- }, {onError: future[_completeError], onDone: (() => { |
- future._complete(false); |
- }).bind(this), cancelOnError: true}); |
- return future; |
+ [_sendError](error, stackTrace) { |
+ if (this[_isEmpty]) |
+ return; |
+ this[_forEachListener](((subscription) => { |
+ subscription._addError(error, stackTrace); |
+ }).bind(this)); |
} |
- forEach(action) { |
- let future = new _Future(); |
- let subscription = null; |
- subscription = this.listen((element) => { |
- _runUserCode(() => action(element), (_) => { |
- }, dart.as(_cancelAndErrorClosure(subscription, future), dart.throw_("Unimplemented type (dynamic, StackTrace) → dynamic"))); |
- }, {onError: future[_completeError], onDone: (() => { |
- future._complete(null); |
- }).bind(this), cancelOnError: true}); |
- return future; |
+ [_sendDone]() { |
+ if (!dart.notNull(this[_isEmpty])) { |
+ this[_forEachListener](dart.as(((subscription) => { |
+ subscription._close(); |
+ }).bind(this), dart.throw_("Unimplemented type (_BufferingStreamSubscription<T>) → void"))); |
+ } else { |
+ dart.assert(this[_doneFuture] !== null); |
+ dart.assert(this[_doneFuture][_mayComplete]); |
+ this[_doneFuture]._asyncComplete(null); |
+ } |
} |
- every(test) { |
- let future = new _Future(); |
- let subscription = null; |
- subscription = this.listen((element) => { |
- _runUserCode(() => test(element), dart.as((isMatch) => { |
- if (!dart.notNull(isMatch)) { |
- _cancelAndValue(subscription, future, false); |
- } |
- }, dart.throw_("Unimplemented type (dynamic) → dynamic")), dart.as(_cancelAndErrorClosure(subscription, future), dart.throw_("Unimplemented type (dynamic, StackTrace) → dynamic"))); |
- }, {onError: future[_completeError], onDone: (() => { |
- future._complete(true); |
- }).bind(this), cancelOnError: true}); |
- return future; |
+ } |
+ return _SyncBroadcastStreamController; |
+ }); |
+ let _SyncBroadcastStreamController = _SyncBroadcastStreamController$(dart.dynamic); |
+ let _AsyncBroadcastStreamController$ = dart.generic(function(T) { |
+ class _AsyncBroadcastStreamController extends _BroadcastStreamController$(T) { |
+ _AsyncBroadcastStreamController(onListen, onCancel) { |
+ super._BroadcastStreamController(onListen, onCancel); |
} |
- any(test) { |
- let future = new _Future(); |
- let subscription = null; |
- subscription = this.listen((element) => { |
- _runUserCode(() => test(element), dart.as((isMatch) => { |
- if (isMatch) { |
- _cancelAndValue(subscription, future, true); |
- } |
- }, dart.throw_("Unimplemented type (dynamic) → dynamic")), dart.as(_cancelAndErrorClosure(subscription, future), dart.throw_("Unimplemented type (dynamic, StackTrace) → dynamic"))); |
- }, {onError: future[_completeError], onDone: (() => { |
- future._complete(false); |
- }).bind(this), cancelOnError: true}); |
- return future; |
+ [_sendData](data) { |
+ for (let link = this[_next]; !dart.notNull(core.identical(link, this)); link = link[_next]) { |
+ let subscription = dart.as(link, _BroadcastSubscription$(T)); |
+ subscription._addPending(new _DelayedData(data)); |
+ } |
+ } |
+ [_sendError](error, stackTrace) { |
+ for (let link = this[_next]; !dart.notNull(core.identical(link, this)); link = link[_next]) { |
+ let subscription = dart.as(link, _BroadcastSubscription$(T)); |
+ subscription._addPending(new _DelayedError(error, stackTrace)); |
+ } |
+ } |
+ [_sendDone]() { |
+ if (!dart.notNull(this[_isEmpty])) { |
+ for (let link = this[_next]; !dart.notNull(core.identical(link, this)); link = link[_next]) { |
+ let subscription = dart.as(link, _BroadcastSubscription$(T)); |
+ subscription._addPending(new _DelayedDone()); |
+ } |
+ } else { |
+ dart.assert(this[_doneFuture] !== null); |
+ dart.assert(this[_doneFuture][_mayComplete]); |
+ this[_doneFuture]._asyncComplete(null); |
+ } |
+ } |
+ } |
+ return _AsyncBroadcastStreamController; |
+ }); |
+ let _AsyncBroadcastStreamController = _AsyncBroadcastStreamController$(dart.dynamic); |
+ let _addPendingEvent = Symbol('_addPendingEvent'); |
+ let _STATE_CLOSED = Symbol('_STATE_CLOSED'); |
+ let _AsBroadcastStreamController$ = dart.generic(function(T) { |
+ class _AsBroadcastStreamController extends _SyncBroadcastStreamController$(T) { |
+ _AsBroadcastStreamController(onListen, onCancel) { |
+ this[_pending] = null; |
+ super._SyncBroadcastStreamController(onListen, onCancel); |
} |
- get length() { |
- let future = new _Future(); |
- let count = 0; |
- this.listen((_) => { |
- count = dart.notNull(count) + 1; |
- }, {onError: future[_completeError], onDone: (() => { |
- future._complete(count); |
- }).bind(this), cancelOnError: true}); |
- return future; |
+ get [_hasPending]() { |
+ return dart.notNull(this[_pending] !== null) && !dart.notNull(this[_pending].isEmpty); |
} |
- get isEmpty() { |
- let future = new _Future(); |
- let subscription = null; |
- subscription = this.listen((_) => { |
- _cancelAndValue(subscription, future, false); |
- }, {onError: future[_completeError], onDone: (() => { |
- future._complete(true); |
- }).bind(this), cancelOnError: true}); |
- return future; |
+ [_addPendingEvent](event) { |
+ if (this[_pending] === null) { |
+ this[_pending] = new _StreamImplEvents(); |
+ } |
+ this[_pending].add(event); |
} |
- toList() { |
- let result = new List.from([]); |
- let future = new _Future(); |
- this.listen(((data) => { |
- result.add(data); |
- }).bind(this), {onError: future[_completeError], onDone: (() => { |
- future._complete(result); |
- }).bind(this), cancelOnError: true}); |
- return future; |
+ add(data) { |
+ if (!dart.notNull(this.isClosed) && dart.notNull(this[_isFiring])) { |
+ this[_addPendingEvent](new _DelayedData(data)); |
+ return; |
+ } |
+ super.add(data); |
+ while (this[_hasPending]) { |
+ this[_pending].handleNext(this); |
+ } |
} |
- toSet() { |
- let result = new core.Set(); |
- let future = new _Future(); |
- this.listen(((data) => { |
- result.add(data); |
- }).bind(this), {onError: future[_completeError], onDone: (() => { |
- future._complete(result); |
- }).bind(this), cancelOnError: true}); |
- return future; |
+ addError(error, stackTrace) { |
+ if (stackTrace === void 0) |
+ stackTrace = null; |
+ if (!dart.notNull(this.isClosed) && dart.notNull(this[_isFiring])) { |
+ this[_addPendingEvent](new _DelayedError(error, stackTrace)); |
+ return; |
+ } |
+ if (!dart.notNull(this[_mayAddEvent])) |
+ throw this[_addEventError](); |
+ this[_sendError](error, stackTrace); |
+ while (this[_hasPending]) { |
+ this[_pending].handleNext(this); |
+ } |
} |
- drain(futureValue) { |
- if (futureValue === void 0) |
- futureValue = null; |
- return this.listen(null, {cancelOnError: true}).asFuture(futureValue); |
+ close() { |
+ if (!dart.notNull(this.isClosed) && dart.notNull(this[_isFiring])) { |
+ this[_addPendingEvent](new _DelayedDone()); |
+ this[_state] = _BroadcastStreamController[_STATE_CLOSED]; |
+ return super.done; |
+ } |
+ let result = super.close(); |
+ dart.assert(!dart.notNull(this[_hasPending])); |
+ return result; |
} |
- take(count) { |
- return dart.as(new _TakeStream(this, count), Stream$(T)); |
+ [_callOnCancel]() { |
+ if (this[_hasPending]) { |
+ this[_pending].clear(); |
+ this[_pending] = null; |
+ } |
+ super._callOnCancel(); |
} |
- takeWhile(test) { |
- return dart.as(new _TakeWhileStream(this, dart.as(test, dart.throw_("Unimplemented type (dynamic) → bool"))), Stream$(T)); |
+ } |
+ return _AsBroadcastStreamController; |
+ }); |
+ let _AsBroadcastStreamController = _AsBroadcastStreamController$(dart.dynamic); |
+ let _pauseCount = Symbol('_pauseCount'); |
+ let _resume = Symbol('_resume'); |
+ let _DoneSubscription$ = dart.generic(function(T) { |
+ class _DoneSubscription extends core.Object { |
+ _DoneSubscription() { |
+ this[_pauseCount] = 0; |
} |
- skip(count) { |
- return dart.as(new _SkipStream(this, count), Stream$(T)); |
+ onData(handleData) {} |
+ onError(handleError) {} |
+ onDone(handleDone) {} |
+ pause(resumeSignal) { |
+ if (resumeSignal === void 0) |
+ resumeSignal = null; |
+ if (resumeSignal !== null) |
+ resumeSignal.then(this[_resume]); |
+ this[_pauseCount] = dart.notNull(this[_pauseCount]) + 1; |
} |
- skipWhile(test) { |
- return dart.as(new _SkipWhileStream(this, dart.as(test, dart.throw_("Unimplemented type (dynamic) → bool"))), Stream$(T)); |
+ resume() { |
+ this[_resume](null); |
} |
- distinct(equals) { |
- if (equals === void 0) |
- equals = null; |
- return dart.as(new _DistinctStream(this, dart.as(equals, dart.throw_("Unimplemented type (dynamic, dynamic) → bool"))), Stream$(T)); |
+ [_resume](_) { |
+ if (dart.notNull(this[_pauseCount]) > 0) |
+ this[_pauseCount] = dart.notNull(this[_pauseCount]) - 1; |
} |
- get first() { |
- let future = new _Future(); |
- let subscription = null; |
- subscription = this.listen((value) => { |
- _cancelAndValue(subscription, future, value); |
- }, { |
- onError: future[_completeError], |
- onDone: () => { |
- try { |
- throw _internal.IterableElementError.noElement(); |
- } catch (e) { |
- let s = dart.stackTrace(e); |
- _completeWithErrorCallback(future, e, s); |
- } |
- |
- }, |
- cancelOnError: true |
- }); |
- return future; |
+ cancel() { |
+ return new _Future.immediate(null); |
} |
- get last() { |
- let future = new _Future(); |
- let result = null; |
- let foundResult = false; |
- let subscription = null; |
- subscription = this.listen((value) => { |
- foundResult = true; |
- result = value; |
- }, {onError: future[_completeError], onDone: (() => { |
- if (foundResult) { |
- future._complete(result); |
- return; |
- } |
- try { |
- throw _internal.IterableElementError.noElement(); |
- } catch (e) { |
- let s = dart.stackTrace(e); |
- _completeWithErrorCallback(future, e, s); |
- } |
- |
- }).bind(this), cancelOnError: true}); |
- return future; |
+ get isPaused() { |
+ return dart.notNull(this[_pauseCount]) > 0; |
} |
- get single() { |
- let future = new _Future(); |
- let result = null; |
- let foundResult = false; |
- let subscription = null; |
- subscription = this.listen((value) => { |
- if (foundResult) { |
- try { |
- throw _internal.IterableElementError.tooMany(); |
- } catch (e) { |
- let s = dart.stackTrace(e); |
- _cancelAndErrorWithReplacement(subscription, future, e, s); |
- } |
+ asFuture(value) { |
+ if (value === void 0) |
+ value = null; |
+ return new _Future(); |
+ } |
+ } |
+ return _DoneSubscription; |
+ }); |
+ let _DoneSubscription = _DoneSubscription$(dart.dynamic); |
+ class DeferredLibrary extends core.Object { |
+ DeferredLibrary(libraryName, opt$) { |
+ let uri = opt$.uri === void 0 ? null : opt$.uri; |
+ this.libraryName = libraryName; |
+ this.uri = uri; |
+ } |
+ load() { |
+ throw 'DeferredLibrary not supported. ' + 'please use the `import "lib.dart" deferred as lib` syntax.'; |
+ } |
+ } |
+ let _s = Symbol('_s'); |
+ class DeferredLoadException extends core.Object { |
+ DeferredLoadException($_s) { |
+ this[_s] = $_s; |
+ } |
+ toString() { |
+ return `DeferredLoadException: '${this[_s]}'`; |
+ } |
+ } |
+ let Future$ = dart.generic(function(T) { |
+ class Future extends core.Object { |
+ Future(computation) { |
+ let result = new _Future(); |
+ Timer.run((() => { |
+ try { |
+ result._complete(computation()); |
+ } catch (e) { |
+ let s = dart.stackTrace(e); |
+ _completeWithErrorCallback(result, e, s); |
+ } |
- return; |
+ }).bind(this)); |
+ return dart.as(result, Future$(T)); |
+ } |
+ Future$microtask(computation) { |
+ let result = new _Future(); |
+ scheduleMicrotask((() => { |
+ try { |
+ result._complete(computation()); |
+ } catch (e) { |
+ let s = dart.stackTrace(e); |
+ _completeWithErrorCallback(result, e, s); |
} |
- foundResult = true; |
- result = value; |
- }, {onError: future[_completeError], onDone: (() => { |
- if (foundResult) { |
- future._complete(result); |
- return; |
- } |
- try { |
- throw _internal.IterableElementError.noElement(); |
- } catch (e) { |
- let s = dart.stackTrace(e); |
- _completeWithErrorCallback(future, e, s); |
- } |
- }).bind(this), cancelOnError: true}); |
- return future; |
+ }).bind(this)); |
+ return dart.as(result, Future$(T)); |
+ } |
+ Future$sync(computation) { |
+ try { |
+ let result = computation(); |
+ return new Future.value(result); |
+ } catch (error) { |
+ let stackTrace = dart.stackTrace(error); |
+ return new Future.error(error, stackTrace); |
+ } |
+ |
} |
- firstWhere(test, opt$) { |
- let defaultValue = opt$.defaultValue === void 0 ? null : opt$.defaultValue; |
- let future = new _Future(); |
- let subscription = null; |
- subscription = this.listen((value) => { |
- _runUserCode(() => test(value), dart.as((isMatch) => { |
- if (isMatch) { |
- _cancelAndValue(subscription, future, value); |
- } |
- }, dart.throw_("Unimplemented type (dynamic) → dynamic")), dart.as(_cancelAndErrorClosure(subscription, future), dart.throw_("Unimplemented type (dynamic, StackTrace) → dynamic"))); |
- }, {onError: future[_completeError], onDone: (() => { |
- if (defaultValue !== null) { |
- _runUserCode(defaultValue, future[_complete], future[_completeError]); |
- return; |
- } |
- try { |
- throw _internal.IterableElementError.noElement(); |
- } catch (e) { |
- let s = dart.stackTrace(e); |
- _completeWithErrorCallback(future, e, s); |
- } |
- |
- }).bind(this), cancelOnError: true}); |
- return future; |
+ Future$value(value) { |
+ if (value === void 0) |
+ value = null; |
+ return new _Future.immediate(value); |
} |
- lastWhere(test, opt$) { |
- let defaultValue = opt$.defaultValue === void 0 ? null : opt$.defaultValue; |
- let future = new _Future(); |
- let result = null; |
- let foundResult = false; |
- let subscription = null; |
- subscription = this.listen((value) => { |
- _runUserCode(() => true === test(value), dart.as((isMatch) => { |
- if (isMatch) { |
- foundResult = true; |
- result = value; |
- } |
- }, dart.throw_("Unimplemented type (dynamic) → dynamic")), dart.as(_cancelAndErrorClosure(subscription, future), dart.throw_("Unimplemented type (dynamic, StackTrace) → dynamic"))); |
- }, {onError: future[_completeError], onDone: (() => { |
- if (foundResult) { |
- future._complete(result); |
- return; |
- } |
- if (defaultValue !== null) { |
- _runUserCode(defaultValue, future[_complete], future[_completeError]); |
- return; |
- } |
- try { |
- throw _internal.IterableElementError.noElement(); |
- } catch (e) { |
- let s = dart.stackTrace(e); |
- _completeWithErrorCallback(future, e, s); |
- } |
+ Future$error(error, stackTrace) { |
+ if (stackTrace === void 0) |
+ stackTrace = null; |
+ error = _nonNullError(error); |
+ if (!dart.notNull(core.identical(Zone.current, _ROOT_ZONE))) { |
+ let replacement = Zone.current.errorCallback(error, stackTrace); |
+ if (replacement !== null) { |
+ error = _nonNullError(replacement.error); |
+ stackTrace = replacement.stackTrace; |
+ } |
+ } |
+ return new _Future.immediateError(error, stackTrace); |
+ } |
+ Future$delayed(duration, computation) { |
+ if (computation === void 0) |
+ computation = null; |
+ let result = new _Future(); |
+ new Timer(duration, (() => { |
+ try { |
+ result._complete(computation === null ? null : computation()); |
+ } catch (e) { |
+ let s = dart.stackTrace(e); |
+ _completeWithErrorCallback(result, e, s); |
+ } |
- }).bind(this), cancelOnError: true}); |
- return future; |
+ }).bind(this)); |
+ return dart.as(result, Future$(T)); |
} |
- singleWhere(test) { |
- let future = new _Future(); |
- let result = null; |
- let foundResult = false; |
- let subscription = null; |
- subscription = this.listen((value) => { |
- _runUserCode(() => true === test(value), dart.as((isMatch) => { |
- if (isMatch) { |
- if (foundResult) { |
- try { |
- throw _internal.IterableElementError.tooMany(); |
- } catch (e) { |
- let s = dart.stackTrace(e); |
- _cancelAndErrorWithReplacement(subscription, future, e, s); |
+ static wait(futures, opt$) { |
+ let eagerError = opt$.eagerError === void 0 ? false : opt$.eagerError; |
+ let cleanUp = opt$.cleanUp === void 0 ? null : opt$.cleanUp; |
+ let result = new _Future(); |
+ let values = null; |
+ let remaining = 0; |
+ let error = null; |
+ let stackTrace = null; |
+ // Function handleError: (dynamic, dynamic) → void |
+ function handleError(theError, theStackTrace) { |
+ remaining = dart.notNull(remaining) - 1; |
+ if (values !== null) { |
+ if (cleanUp !== null) { |
+ for (let value of values) { |
+ if (value !== null) { |
+ new Future.sync(() => { |
+ cleanUp(value); |
+ }); |
} |
- |
- return; |
} |
- foundResult = true; |
- result = value; |
- } |
- }, dart.throw_("Unimplemented type (dynamic) → dynamic")), dart.as(_cancelAndErrorClosure(subscription, future), dart.throw_("Unimplemented type (dynamic, StackTrace) → dynamic"))); |
- }, {onError: future[_completeError], onDone: (() => { |
- if (foundResult) { |
- future._complete(result); |
- return; |
} |
- try { |
- throw _internal.IterableElementError.noElement(); |
- } catch (e) { |
- let s = dart.stackTrace(e); |
- _completeWithErrorCallback(future, e, s); |
+ values = null; |
+ if (remaining === 0 || dart.notNull(eagerError)) { |
+ result._completeError(theError, dart.as(theStackTrace, core.StackTrace)); |
+ } else { |
+ error = theError; |
+ stackTrace = dart.as(theStackTrace, core.StackTrace); |
} |
- |
- }).bind(this), cancelOnError: true}); |
- return future; |
- } |
- elementAt(index) { |
- if (dart.notNull(!(typeof index == number)) || dart.notNull(index) < 0) |
- throw new core.ArgumentError(index); |
- let future = new _Future(); |
- let subscription = null; |
- let elementIndex = 0; |
- subscription = this.listen((value) => { |
- if (index === elementIndex) { |
- _cancelAndValue(subscription, future, value); |
- return; |
+ } else if (remaining === 0 && !dart.notNull(eagerError)) { |
+ result._completeError(error, stackTrace); |
} |
- elementIndex = 1; |
- }, {onError: future[_completeError], onDone: (() => { |
- future._completeError(new core.RangeError.index(index, this, "index", null, elementIndex)); |
- }).bind(this), cancelOnError: true}); |
- return future; |
- } |
- timeout(timeLimit, opt$) { |
- let onTimeout = opt$.onTimeout === void 0 ? null : opt$.onTimeout; |
- let controller = null; |
- let subscription = null; |
- let timer = null; |
- let zone = null; |
- let timeout = null; |
- // Function onData: (T) → void |
- function onData(event) { |
- timer.cancel(); |
- controller.add(event); |
- timer = zone.createTimer(timeLimit, dart.as(timeout, dart.throw_("Unimplemented type () → void"))); |
} |
- // Function onError: (dynamic, StackTrace) → void |
- function onError(error, stackTrace) { |
- timer.cancel(); |
- dart.assert(dart.notNull(dart.is(controller, _StreamController)) || dart.notNull(dart.is(controller, _BroadcastStreamController))); |
- let eventSink = controller; |
- dart.dinvoke(eventSink, '_addError', error, stackTrace); |
- timer = zone.createTimer(timeLimit, dart.as(timeout, dart.throw_("Unimplemented type () → void"))); |
+ for (let future of futures) { |
+ let pos = (($tmp) => remaining = dart.notNull($tmp) + 1, $tmp)(remaining); |
+ future.then(dart.as(((value) => { |
+ remaining = dart.notNull(remaining) - 1; |
+ if (values !== null) { |
+ values.set(pos, value); |
+ if (remaining === 0) { |
+ result._completeWithValue(values); |
+ } |
+ } else { |
+ if (dart.notNull(cleanUp !== null) && dart.notNull(value !== null)) { |
+ new Future.sync(() => { |
+ cleanUp(value); |
+ }); |
+ } |
+ if (remaining === 0 && !dart.notNull(eagerError)) { |
+ result._completeError(error, stackTrace); |
+ } |
+ } |
+ }).bind(this), dart.throw_("Unimplemented type (dynamic) → dynamic")), {onError: handleError}); |
} |
- // Function onDone: () → void |
- function onDone() { |
- timer.cancel(); |
- controller.close(); |
+ if (remaining === 0) { |
+ return dart.as(new Future.value(/* Unimplemented const */new List.from([])), Future$(core.List)); |
} |
- // Function onListen: () → void |
- function onListen() { |
- zone = Zone.current; |
- if (onTimeout === null) { |
- timeout = (() => { |
- controller.addError(new TimeoutException("No stream event", timeLimit), null); |
- }).bind(this); |
+ values = new core.List(remaining); |
+ return result; |
+ } |
+ static forEach(input, f) { |
+ let iterator = input.iterator; |
+ return doWhile((() => { |
+ if (!dart.notNull(iterator.moveNext())) |
+ return false; |
+ return new Future.sync((() => f(iterator.current)).bind(this)).then((_) => true); |
+ }).bind(this)); |
+ } |
+ static doWhile(f) { |
+ let doneSignal = new _Future(); |
+ let nextIteration = null; |
+ nextIteration = Zone.current.bindUnaryCallback(dart.as(((keepGoing) => { |
+ if (keepGoing) { |
+ new Future.sync(f).then(dart.as(nextIteration, dart.throw_("Unimplemented type (dynamic) → dynamic")), {onError: doneSignal[_completeError]}); |
} else { |
- onTimeout = zone.registerUnaryCallback(dart.as(onTimeout, dart.throw_("Unimplemented type (dynamic) → dynamic"))); |
- let wrapper = new _ControllerEventSinkWrapper(null); |
- timeout = (() => { |
- wrapper[_sink] = controller; |
- zone.runUnaryGuarded(dart.as(onTimeout, dart.throw_("Unimplemented type (dynamic) → dynamic")), wrapper); |
- wrapper[_sink] = null; |
- }).bind(this); |
- } |
- subscription = this.listen(onData, {onError: onError, onDone: onDone}); |
- timer = zone.createTimer(timeLimit, dart.as(timeout, dart.throw_("Unimplemented type () → void"))); |
- } |
- // Function onCancel: () → Future<dynamic> |
- function onCancel() { |
- timer.cancel(); |
- let result = subscription.cancel(); |
- subscription = null; |
- return result; |
- } |
- controller = this.isBroadcast ? new _SyncBroadcastStreamController(onListen, onCancel) : new _SyncStreamController(onListen, (() => { |
- timer.cancel(); |
- subscription.pause(); |
- }).bind(this), (() => { |
- subscription.resume(); |
- timer = zone.createTimer(timeLimit, dart.as(timeout, dart.throw_("Unimplemented type () → void"))); |
- }).bind(this), onCancel); |
- return controller.stream; |
+ doneSignal._complete(null); |
+ } |
+ }).bind(this), dart.throw_("Unimplemented type (dynamic) → dynamic")), {runGuarded: true}); |
+ dart.dinvokef(nextIteration, true); |
+ return doneSignal; |
} |
} |
- dart.defineNamedConstructor(Stream, 'fromFuture'); |
- dart.defineNamedConstructor(Stream, 'fromIterable'); |
- dart.defineNamedConstructor(Stream, 'periodic'); |
- dart.defineNamedConstructor(Stream, 'eventTransformed'); |
- return Stream; |
+ dart.defineNamedConstructor(Future, 'microtask'); |
+ dart.defineNamedConstructor(Future, 'sync'); |
+ dart.defineNamedConstructor(Future, 'value'); |
+ dart.defineNamedConstructor(Future, 'error'); |
+ dart.defineNamedConstructor(Future, 'delayed'); |
+ dart.defineLazyProperties(Future, { |
+ get _nullFuture() { |
+ return dart.as(new Future.value(null), _Future); |
+ } |
+ }); |
+ return Future; |
}); |
- let Stream = Stream$(dart.dynamic); |
- let StreamSubscription$ = dart.generic(function(T) { |
- class StreamSubscription extends core.Object { |
+ let Future = Future$(dart.dynamic); |
+ class TimeoutException extends core.Object { |
+ TimeoutException(message, duration) { |
+ if (duration === void 0) |
+ duration = null; |
+ this.message = message; |
+ this.duration = duration; |
} |
- return StreamSubscription; |
- }); |
- let StreamSubscription = StreamSubscription$(dart.dynamic); |
- let EventSink$ = dart.generic(function(T) { |
- class EventSink extends core.Object { |
+ toString() { |
+ let result = "TimeoutException"; |
+ if (this.duration !== null) |
+ result = `TimeoutException after ${this.duration}`; |
+ if (this.message !== null) |
+ result = `${result}: ${this.message}`; |
+ return result; |
} |
- return EventSink; |
- }); |
- let EventSink = EventSink$(dart.dynamic); |
- let _stream = Symbol('_stream'); |
- let StreamView$ = dart.generic(function(T) { |
- class StreamView extends Stream$(T) { |
- StreamView($_stream) { |
- this[_stream] = $_stream; |
- super.Stream(); |
- } |
- get isBroadcast() { |
- return this[_stream].isBroadcast; |
- } |
- asBroadcastStream(opt$) { |
- let onListen = opt$.onListen === void 0 ? null : opt$.onListen; |
- let onCancel = opt$.onCancel === void 0 ? null : opt$.onCancel; |
- return this[_stream].asBroadcastStream({onListen: onListen, onCancel: onCancel}); |
+ } |
+ let Completer$ = dart.generic(function(T) { |
+ class Completer extends core.Object { |
+ Completer() { |
+ return new _AsyncCompleter(); |
} |
- listen(onData, opt$) { |
- let onError = opt$.onError === void 0 ? null : opt$.onError; |
- let onDone = opt$.onDone === void 0 ? null : opt$.onDone; |
- let cancelOnError = opt$.cancelOnError === void 0 ? null : opt$.cancelOnError; |
- return this[_stream].listen(onData, {onError: onError, onDone: onDone, cancelOnError: cancelOnError}); |
+ Completer$sync() { |
+ return new _SyncCompleter(); |
} |
} |
- return StreamView; |
+ dart.defineNamedConstructor(Completer, 'sync'); |
+ return Completer; |
}); |
- let StreamView = StreamView$(dart.dynamic); |
- let StreamConsumer$ = dart.generic(function(S) { |
- class StreamConsumer extends core.Object { |
+ let Completer = Completer$(dart.dynamic); |
+ // Function _completeWithErrorCallback: (_Future<dynamic>, dynamic, dynamic) → void |
+ function _completeWithErrorCallback(result, error, stackTrace) { |
+ let replacement = Zone.current.errorCallback(error, dart.as(stackTrace, core.StackTrace)); |
+ if (replacement !== null) { |
+ error = _nonNullError(replacement.error); |
+ stackTrace = replacement.stackTrace; |
} |
- return StreamConsumer; |
- }); |
- let StreamConsumer = StreamConsumer$(dart.dynamic); |
- let StreamSink$ = dart.generic(function(S) { |
- class StreamSink extends core.Object { |
+ result._completeError(error, dart.as(stackTrace, core.StackTrace)); |
+ } |
+ // Function _nonNullError: (Object) → Object |
+ function _nonNullError(error) { |
+ return error !== null ? error : new core.NullThrownError(); |
+ } |
+ let _Completer$ = dart.generic(function(T) { |
+ class _Completer extends core.Object { |
+ _Completer() { |
+ this.future = new _Future(); |
+ } |
+ completeError(error, stackTrace) { |
+ if (stackTrace === void 0) |
+ stackTrace = null; |
+ error = _nonNullError(error); |
+ if (!dart.notNull(this.future[_mayComplete])) |
+ throw new core.StateError("Future already completed"); |
+ let replacement = Zone.current.errorCallback(error, stackTrace); |
+ if (replacement !== null) { |
+ error = _nonNullError(replacement.error); |
+ stackTrace = replacement.stackTrace; |
+ } |
+ this[_completeError](error, stackTrace); |
+ } |
+ get isCompleted() { |
+ return !dart.notNull(this.future[_mayComplete]); |
+ } |
} |
- return StreamSink; |
+ return _Completer; |
}); |
- let StreamSink = StreamSink$(dart.dynamic); |
- let StreamTransformer$ = dart.generic(function(S, T) { |
- class StreamTransformer extends core.Object { |
- StreamTransformer(transformer) { |
- return new _StreamSubscriptionTransformer(transformer); |
+ let _Completer = _Completer$(dart.dynamic); |
+ let _AsyncCompleter$ = dart.generic(function(T) { |
+ class _AsyncCompleter extends _Completer$(T) { |
+ complete(value) { |
+ if (value === void 0) |
+ value = null; |
+ if (!dart.notNull(this.future[_mayComplete])) |
+ throw new core.StateError("Future already completed"); |
+ this.future._asyncComplete(value); |
} |
- StreamTransformer$fromHandlers(opt$) { |
- return new _StreamHandlerTransformer(opt$); |
+ [_completeError](error, stackTrace) { |
+ this.future._asyncCompleteError(error, stackTrace); |
} |
} |
- dart.defineNamedConstructor(StreamTransformer, 'fromHandlers'); |
- return StreamTransformer; |
+ return _AsyncCompleter; |
}); |
- let StreamTransformer = StreamTransformer$(dart.dynamic, dart.dynamic); |
- let StreamIterator$ = dart.generic(function(T) { |
- class StreamIterator extends core.Object { |
- StreamIterator(stream) { |
- return new _StreamIteratorImpl(stream); |
+ let _AsyncCompleter = _AsyncCompleter$(dart.dynamic); |
+ let _SyncCompleter$ = dart.generic(function(T) { |
+ class _SyncCompleter extends _Completer$(T) { |
+ complete(value) { |
+ if (value === void 0) |
+ value = null; |
+ if (!dart.notNull(this.future[_mayComplete])) |
+ throw new core.StateError("Future already completed"); |
+ this.future._complete(value); |
+ } |
+ [_completeError](error, stackTrace) { |
+ this.future._completeError(error, stackTrace); |
} |
} |
- return StreamIterator; |
+ return _SyncCompleter; |
}); |
- let StreamIterator = StreamIterator$(dart.dynamic); |
- let _ControllerEventSinkWrapper$ = dart.generic(function(T) { |
- class _ControllerEventSinkWrapper extends core.Object { |
- _ControllerEventSinkWrapper($_sink) { |
- this[_sink] = $_sink; |
+ let _SyncCompleter = _SyncCompleter$(dart.dynamic); |
+ let _nextListener = Symbol('_nextListener'); |
+ let _onValue = Symbol('_onValue'); |
+ let _errorTest = Symbol('_errorTest'); |
+ let _whenCompleteAction = Symbol('_whenCompleteAction'); |
+ class _FutureListener extends core.Object { |
+ _FutureListener$then(result, onValue, errorCallback) { |
+ this.result = result; |
+ this.callback = onValue; |
+ this.errorCallback = errorCallback; |
+ this.state = errorCallback === null ? _FutureListener.STATE_THEN : _FutureListener.STATE_THEN_ONERROR; |
+ this[_nextListener] = null; |
+ } |
+ _FutureListener$catchError(result, errorCallback, test) { |
+ this.result = result; |
+ this.errorCallback = errorCallback; |
+ this.callback = test; |
+ this.state = test === null ? _FutureListener.STATE_CATCHERROR : _FutureListener.STATE_CATCHERROR_TEST; |
+ this[_nextListener] = null; |
+ } |
+ _FutureListener$whenComplete(result, onComplete) { |
+ this.result = result; |
+ this.callback = onComplete; |
+ this.errorCallback = null; |
+ this.state = _FutureListener.STATE_WHENCOMPLETE; |
+ this[_nextListener] = null; |
+ } |
+ _FutureListener$chain(result) { |
+ this.result = result; |
+ this.callback = null; |
+ this.errorCallback = null; |
+ this.state = _FutureListener.STATE_CHAIN; |
+ this[_nextListener] = null; |
+ } |
+ get [_zone]() { |
+ return this.result[_zone]; |
+ } |
+ get handlesValue() { |
+ return (dart.notNull(this.state) & dart.notNull(_FutureListener.MASK_VALUE)) !== 0; |
+ } |
+ get handlesError() { |
+ return (dart.notNull(this.state) & dart.notNull(_FutureListener.MASK_ERROR)) !== 0; |
+ } |
+ get hasErrorTest() { |
+ return this.state === _FutureListener.STATE_CATCHERROR_TEST; |
+ } |
+ get handlesComplete() { |
+ return this.state === _FutureListener.STATE_WHENCOMPLETE; |
+ } |
+ get [_onValue]() { |
+ dart.assert(this.handlesValue); |
+ return dart.as(this.callback, _FutureOnValue); |
+ } |
+ get [_onError]() { |
+ return this.errorCallback; |
+ } |
+ get [_errorTest]() { |
+ dart.assert(this.hasErrorTest); |
+ return dart.as(this.callback, _FutureErrorTest); |
+ } |
+ get [_whenCompleteAction]() { |
+ dart.assert(this.handlesComplete); |
+ return dart.as(this.callback, _FutureAction); |
+ } |
+ } |
+ dart.defineNamedConstructor(_FutureListener, 'then'); |
+ dart.defineNamedConstructor(_FutureListener, 'catchError'); |
+ dart.defineNamedConstructor(_FutureListener, 'whenComplete'); |
+ dart.defineNamedConstructor(_FutureListener, 'chain'); |
+ _FutureListener.MASK_VALUE = 1; |
+ _FutureListener.MASK_ERROR = 2; |
+ _FutureListener.MASK_TEST_ERROR = 4; |
+ _FutureListener.MASK_WHENCOMPLETE = 8; |
+ _FutureListener.STATE_CHAIN = 0; |
+ _FutureListener.STATE_THEN = _FutureListener.MASK_VALUE; |
+ _FutureListener.STATE_THEN_ONERROR = dart.notNull(_FutureListener.MASK_VALUE) | dart.notNull(_FutureListener.MASK_ERROR); |
+ _FutureListener.STATE_CATCHERROR = _FutureListener.MASK_ERROR; |
+ _FutureListener.STATE_CATCHERROR_TEST = dart.notNull(_FutureListener.MASK_ERROR) | dart.notNull(_FutureListener.MASK_TEST_ERROR); |
+ _FutureListener.STATE_WHENCOMPLETE = _FutureListener.MASK_WHENCOMPLETE; |
+ let _resultOrListeners = Symbol('_resultOrListeners'); |
+ let _asyncComplete = Symbol('_asyncComplete'); |
+ let _asyncCompleteError = Symbol('_asyncCompleteError'); |
+ let _isChained = Symbol('_isChained'); |
+ let _isComplete = Symbol('_isComplete'); |
+ let _hasValue = Symbol('_hasValue'); |
+ let _hasError = Symbol('_hasError'); |
+ let _markPendingCompletion = Symbol('_markPendingCompletion'); |
+ let _value = Symbol('_value'); |
+ let _error = Symbol('_error'); |
+ let _setValue = Symbol('_setValue'); |
+ let _setErrorObject = Symbol('_setErrorObject'); |
+ let _setError = Symbol('_setError'); |
+ let _removeListeners = Symbol('_removeListeners'); |
+ let _chainForeignFuture = Symbol('_chainForeignFuture'); |
+ let _chainCoreFuture = Symbol('_chainCoreFuture'); |
+ let _completeWithValue = Symbol('_completeWithValue'); |
+ let _propagateToListeners = Symbol('_propagateToListeners'); |
+ let _Future$ = dart.generic(function(T) { |
+ class _Future extends core.Object { |
+ _Future() { |
+ this[_zone] = Zone.current; |
+ this[_state] = _Future._INCOMPLETE; |
+ this[_resultOrListeners] = null; |
} |
- add(data) { |
- this[_sink].add(data); |
+ _Future$immediate(value) { |
+ this[_zone] = Zone.current; |
+ this[_state] = _Future._INCOMPLETE; |
+ this[_resultOrListeners] = null; |
+ this[_asyncComplete](value); |
} |
- addError(error, stackTrace) { |
+ _Future$immediateError(error, stackTrace) { |
if (stackTrace === void 0) |
stackTrace = null; |
- this[_sink].addError(error, stackTrace); |
- } |
- close() { |
- this[_sink].close(); |
+ this[_zone] = Zone.current; |
+ this[_state] = _Future._INCOMPLETE; |
+ this[_resultOrListeners] = null; |
+ this[_asyncCompleteError](error, stackTrace); |
} |
- } |
- return _ControllerEventSinkWrapper; |
- }); |
- let _ControllerEventSinkWrapper = _ControllerEventSinkWrapper$(dart.dynamic); |
- let StreamController$ = dart.generic(function(T) { |
- class StreamController extends core.Object { |
- StreamController(opt$) { |
- let onListen = opt$.onListen === void 0 ? null : opt$.onListen; |
- let onPause = opt$.onPause === void 0 ? null : opt$.onPause; |
- let onResume = opt$.onResume === void 0 ? null : opt$.onResume; |
- let onCancel = opt$.onCancel === void 0 ? null : opt$.onCancel; |
- let sync = opt$.sync === void 0 ? false : opt$.sync; |
- if (dart.notNull(onListen === null) && dart.notNull(onPause === null) && dart.notNull(onResume === null) && dart.notNull(onCancel === null)) { |
- return dart.as(sync ? new _NoCallbackSyncStreamController() : new _NoCallbackAsyncStreamController(), StreamController$(T)); |
- } |
- return sync ? new _SyncStreamController(onListen, onPause, onResume, onCancel) : new _AsyncStreamController(onListen, onPause, onResume, onCancel); |
+ get [_mayComplete]() { |
+ return this[_state] === _Future._INCOMPLETE; |
} |
- StreamController$broadcast(opt$) { |
- let onListen = opt$.onListen === void 0 ? null : opt$.onListen; |
- let onCancel = opt$.onCancel === void 0 ? null : opt$.onCancel; |
- let sync = opt$.sync === void 0 ? false : opt$.sync; |
- return sync ? new _SyncBroadcastStreamController(onListen, onCancel) : new _AsyncBroadcastStreamController(onListen, onCancel); |
+ get [_isChained]() { |
+ return this[_state] === _Future._CHAINED; |
} |
- } |
- dart.defineNamedConstructor(StreamController, 'broadcast'); |
- return StreamController; |
- }); |
- let StreamController = StreamController$(dart.dynamic); |
- let _StreamControllerLifecycle$ = dart.generic(function(T) { |
- class _StreamControllerLifecycle extends core.Object { |
- [_recordPause](subscription) {} |
- [_recordResume](subscription) {} |
- [_recordCancel](subscription) { |
- return null; |
+ get [_isComplete]() { |
+ return dart.notNull(this[_state]) >= dart.notNull(_Future._VALUE); |
} |
- } |
- return _StreamControllerLifecycle; |
- }); |
- let _StreamControllerLifecycle = _StreamControllerLifecycle$(dart.dynamic); |
- let _varData = Symbol('_varData'); |
- let _isCanceled = Symbol('_isCanceled'); |
- let _isInitialState = Symbol('_isInitialState'); |
- let _subscription = Symbol('_subscription'); |
- let _isInputPaused = Symbol('_isInputPaused'); |
- let _pendingEvents = Symbol('_pendingEvents'); |
- let _ensurePendingEvents = Symbol('_ensurePendingEvents'); |
- let _badEventState = Symbol('_badEventState'); |
- let _nullFuture = Symbol('_nullFuture'); |
- let _closeUnchecked = Symbol('_closeUnchecked'); |
- let _StreamController$ = dart.generic(function(T) { |
- class _StreamController extends core.Object { |
- _StreamController() { |
- this[_varData] = null; |
- this[_state] = _StreamController._STATE_INITIAL; |
- this[_doneFuture] = null; |
+ get [_hasValue]() { |
+ return this[_state] === _Future._VALUE; |
} |
- get stream() { |
- return dart.as(new _ControllerStream(this), Stream$(T)); |
+ get [_hasError]() { |
+ return this[_state] === _Future._ERROR; |
} |
- get sink() { |
- return new _StreamSinkWrapper(this); |
+ set [_isChained](value) { |
+ if (value) { |
+ dart.assert(!dart.notNull(this[_isComplete])); |
+ this[_state] = _Future._CHAINED; |
+ } else { |
+ dart.assert(this[_isChained]); |
+ this[_state] = _Future._INCOMPLETE; |
+ } |
} |
- get [_isCanceled]() { |
- return (dart.notNull(this[_state]) & dart.notNull(_StreamController._STATE_CANCELED)) !== 0; |
+ then(f, opt$) { |
+ let onError = opt$.onError === void 0 ? null : opt$.onError; |
+ let result = new _Future(); |
+ if (!dart.notNull(core.identical(result[_zone], _ROOT_ZONE))) { |
+ f = result[_zone].registerUnaryCallback(dart.as(f, dart.throw_("Unimplemented type (dynamic) → dynamic"))); |
+ if (onError !== null) { |
+ onError = _registerErrorHandler(onError, result[_zone]); |
+ } |
+ } |
+ this[_addListener](new _FutureListener.then(result, dart.as(f, _FutureOnValue), onError)); |
+ return result; |
} |
- get hasListener() { |
- return (dart.notNull(this[_state]) & dart.notNull(_StreamController._STATE_SUBSCRIBED)) !== 0; |
+ catchError(onError, opt$) { |
+ let test = opt$.test === void 0 ? null : opt$.test; |
+ let result = new _Future(); |
+ if (!dart.notNull(core.identical(result[_zone], _ROOT_ZONE))) { |
+ onError = _registerErrorHandler(onError, result[_zone]); |
+ if (test !== null) |
+ test = dart.as(result[_zone].registerUnaryCallback(test), dart.throw_("Unimplemented type (dynamic) → bool")); |
+ } |
+ this[_addListener](new _FutureListener.catchError(result, onError, test)); |
+ return result; |
} |
- get [_isInitialState]() { |
- return (dart.notNull(this[_state]) & dart.notNull(_StreamController._STATE_SUBSCRIPTION_MASK)) === _StreamController._STATE_INITIAL; |
+ whenComplete(action) { |
+ let result = new _Future(); |
+ if (!dart.notNull(core.identical(result[_zone], _ROOT_ZONE))) { |
+ action = result[_zone].registerCallback(action); |
+ } |
+ this[_addListener](new _FutureListener.whenComplete(result, action)); |
+ return dart.as(result, Future$(T)); |
} |
- get isClosed() { |
- return (dart.notNull(this[_state]) & dart.notNull(_StreamController._STATE_CLOSED)) !== 0; |
+ asStream() { |
+ return dart.as(new Stream.fromFuture(this), Stream$(T)); |
} |
- get isPaused() { |
- return this.hasListener ? this[_subscription][_isInputPaused] : !dart.notNull(this[_isCanceled]); |
+ [_markPendingCompletion]() { |
+ if (!dart.notNull(this[_mayComplete])) |
+ throw new core.StateError("Future already completed"); |
+ this[_state] = _Future._PENDING_COMPLETE; |
} |
- get [_isAddingStream]() { |
- return (dart.notNull(this[_state]) & dart.notNull(_StreamController._STATE_ADDSTREAM)) !== 0; |
+ get [_value]() { |
+ dart.assert(dart.notNull(this[_isComplete]) && dart.notNull(this[_hasValue])); |
+ return dart.as(this[_resultOrListeners], T); |
} |
- get [_mayAddEvent]() { |
- return dart.notNull(this[_state]) < dart.notNull(_StreamController._STATE_CLOSED); |
+ get [_error]() { |
+ dart.assert(dart.notNull(this[_isComplete]) && dart.notNull(this[_hasError])); |
+ return dart.as(this[_resultOrListeners], AsyncError); |
} |
- get [_pendingEvents]() { |
- dart.assert(this[_isInitialState]); |
- if (!dart.notNull(this[_isAddingStream])) { |
- return dart.as(this[_varData], _PendingEvents); |
- } |
- let state = dart.as(this[_varData], _StreamControllerAddStreamState); |
- return dart.as(state.varData, _PendingEvents); |
+ [_setValue](value) { |
+ dart.assert(!dart.notNull(this[_isComplete])); |
+ this[_state] = _Future._VALUE; |
+ this[_resultOrListeners] = value; |
} |
- [_ensurePendingEvents]() { |
- dart.assert(this[_isInitialState]); |
- if (!dart.notNull(this[_isAddingStream])) { |
- if (this[_varData] === null) |
- this[_varData] = new _StreamImplEvents(); |
- return dart.as(this[_varData], _StreamImplEvents); |
- } |
- let state = dart.as(this[_varData], _StreamControllerAddStreamState); |
- if (state.varData === null) |
- state.varData = new _StreamImplEvents(); |
- return dart.as(state.varData, _StreamImplEvents); |
+ [_setErrorObject](error) { |
+ dart.assert(!dart.notNull(this[_isComplete])); |
+ this[_state] = _Future._ERROR; |
+ this[_resultOrListeners] = error; |
} |
- get [_subscription]() { |
- dart.assert(this.hasListener); |
- if (this[_isAddingStream]) { |
- let addState = dart.as(this[_varData], _StreamControllerAddStreamState); |
- return dart.as(addState.varData, _ControllerSubscription); |
- } |
- return dart.as(this[_varData], _ControllerSubscription); |
+ [_setError](error, stackTrace) { |
+ this[_setErrorObject](new AsyncError(error, stackTrace)); |
} |
- [_badEventState]() { |
- if (this.isClosed) { |
- return new core.StateError("Cannot add event after closing"); |
+ [_addListener](listener) { |
+ dart.assert(listener[_nextListener] === null); |
+ if (this[_isComplete]) { |
+ this[_zone].scheduleMicrotask((() => { |
+ _propagateToListeners(this, listener); |
+ }).bind(this)); |
+ } else { |
+ listener[_nextListener] = dart.as(this[_resultOrListeners], _FutureListener); |
+ this[_resultOrListeners] = listener; |
} |
- dart.assert(this[_isAddingStream]); |
- return new core.StateError("Cannot add event while adding a stream"); |
- } |
- addStream(source, opt$) { |
- let cancelOnError = opt$.cancelOnError === void 0 ? true : opt$.cancelOnError; |
- if (!dart.notNull(this[_mayAddEvent])) |
- throw this[_badEventState](); |
- if (this[_isCanceled]) |
- return new _Future.immediate(null); |
- let addState = new _StreamControllerAddStreamState(this, this[_varData], source, cancelOnError); |
- this[_varData] = addState; |
- this[_state] = _StreamController._STATE_ADDSTREAM; |
- return addState.addStreamFuture; |
} |
- get done() { |
- return this[_ensureDoneFuture](); |
- } |
- [_ensureDoneFuture]() { |
- if (this[_doneFuture] === null) { |
- this[_doneFuture] = this[_isCanceled] ? Future[_nullFuture] : new _Future(); |
+ [_removeListeners]() { |
+ dart.assert(!dart.notNull(this[_isComplete])); |
+ let current = dart.as(this[_resultOrListeners], _FutureListener); |
+ this[_resultOrListeners] = null; |
+ let prev = null; |
+ while (current !== null) { |
+ let next = current[_nextListener]; |
+ current[_nextListener] = prev; |
+ prev = current; |
+ current = next; |
} |
- return this[_doneFuture]; |
- } |
- add(value) { |
- if (!dart.notNull(this[_mayAddEvent])) |
- throw this[_badEventState](); |
- this[_add](value); |
+ return prev; |
} |
- addError(error, stackTrace) { |
- if (stackTrace === void 0) |
- stackTrace = null; |
- error = _nonNullError(error); |
- if (!dart.notNull(this[_mayAddEvent])) |
- throw this[_badEventState](); |
- let replacement = Zone.current.errorCallback(error, stackTrace); |
- if (replacement !== null) { |
- error = _nonNullError(replacement.error); |
- stackTrace = replacement.stackTrace; |
- } |
- this[_addError](error, stackTrace); |
+ static [_chainForeignFuture](source, target) { |
+ dart.assert(!dart.notNull(target[_isComplete])); |
+ dart.assert(!dart.is(source, _Future)); |
+ target[_isChained] = true; |
+ source.then(((value) => { |
+ dart.assert(target[_isChained]); |
+ target._completeWithValue(value); |
+ }).bind(this), {onError: ((error, stackTrace) => { |
+ if (stackTrace === void 0) |
+ stackTrace = null; |
+ dart.assert(target[_isChained]); |
+ target._completeError(error, dart.as(stackTrace, core.StackTrace)); |
+ }).bind(this)}); |
} |
- close() { |
- if (this.isClosed) { |
- return this[_ensureDoneFuture](); |
+ static [_chainCoreFuture](source, target) { |
+ dart.assert(!dart.notNull(target[_isComplete])); |
+ dart.assert(dart.is(source, _Future)); |
+ target[_isChained] = true; |
+ let listener = new _FutureListener.chain(target); |
+ if (source[_isComplete]) { |
+ _propagateToListeners(source, listener); |
+ } else { |
+ source._addListener(listener); |
} |
- if (!dart.notNull(this[_mayAddEvent])) |
- throw this[_badEventState](); |
- this[_closeUnchecked](); |
- return this[_ensureDoneFuture](); |
} |
- [_closeUnchecked]() { |
- this[_state] = _StreamController._STATE_CLOSED; |
- if (this.hasListener) { |
- this[_sendDone](); |
- } else if (this[_isInitialState]) { |
- this[_ensurePendingEvents]().add(new _DelayedDone()); |
+ [_complete](value) { |
+ dart.assert(!dart.notNull(this[_isComplete])); |
+ if (dart.is(value, Future)) { |
+ if (dart.is(value, _Future)) { |
+ _chainCoreFuture(dart.as(value, _Future), this); |
+ } else { |
+ _chainForeignFuture(dart.as(value, Future), this); |
+ } |
+ } else { |
+ let listeners = this[_removeListeners](); |
+ this[_setValue](dart.as(value, T)); |
+ _propagateToListeners(this, listeners); |
} |
} |
- [_add](value) { |
- if (this.hasListener) { |
- this[_sendData](value); |
- } else if (this[_isInitialState]) { |
- this[_ensurePendingEvents]().add(new _DelayedData(value)); |
- } |
+ [_completeWithValue](value) { |
+ dart.assert(!dart.notNull(this[_isComplete])); |
+ dart.assert(!dart.is(value, Future)); |
+ let listeners = this[_removeListeners](); |
+ this[_setValue](dart.as(value, T)); |
+ _propagateToListeners(this, listeners); |
} |
- [_addError](error, stackTrace) { |
- if (this.hasListener) { |
- this[_sendError](error, stackTrace); |
- } else if (this[_isInitialState]) { |
- this[_ensurePendingEvents]().add(new _DelayedError(error, stackTrace)); |
+ [_completeError](error, stackTrace) { |
+ if (stackTrace === void 0) |
+ stackTrace = null; |
+ dart.assert(!dart.notNull(this[_isComplete])); |
+ let listeners = this[_removeListeners](); |
+ this[_setError](error, stackTrace); |
+ _propagateToListeners(this, listeners); |
+ } |
+ [_asyncComplete](value) { |
+ dart.assert(!dart.notNull(this[_isComplete])); |
+ if (value === null) { |
+ } else if (dart.is(value, Future)) { |
+ let typedFuture = dart.as(value, Future$(T)); |
+ if (dart.is(typedFuture, _Future)) { |
+ let coreFuture = dart.as(typedFuture, _Future$(T)); |
+ if (dart.notNull(coreFuture[_isComplete]) && dart.notNull(coreFuture[_hasError])) { |
+ this[_markPendingCompletion](); |
+ this[_zone].scheduleMicrotask((() => { |
+ _chainCoreFuture(coreFuture, this); |
+ }).bind(this)); |
+ } else { |
+ _chainCoreFuture(coreFuture, this); |
+ } |
+ } else { |
+ _chainForeignFuture(typedFuture, this); |
+ } |
+ return; |
+ } else { |
+ let typedValue = dart.as(value, T); |
} |
+ this[_markPendingCompletion](); |
+ this[_zone].scheduleMicrotask((() => { |
+ this[_completeWithValue](value); |
+ }).bind(this)); |
} |
- [_close]() { |
- dart.assert(this[_isAddingStream]); |
- let addState = dart.as(this[_varData], _StreamControllerAddStreamState); |
- this[_varData] = addState.varData; |
- this[_state] = ~dart.notNull(_StreamController._STATE_ADDSTREAM); |
- addState.complete(); |
+ [_asyncCompleteError](error, stackTrace) { |
+ dart.assert(!dart.notNull(this[_isComplete])); |
+ this[_markPendingCompletion](); |
+ this[_zone].scheduleMicrotask((() => { |
+ this[_completeError](error, stackTrace); |
+ }).bind(this)); |
} |
- [_subscribe](onData, onError, onDone, cancelOnError) { |
- if (!dart.notNull(this[_isInitialState])) { |
- throw new core.StateError("Stream has already been listened to."); |
- } |
- let subscription = new _ControllerSubscription(this, dart.as(onData, dart.throw_("Unimplemented type (dynamic) → void")), onError, onDone, cancelOnError); |
- let pendingEvents = this[_pendingEvents]; |
- this[_state] = _StreamController._STATE_SUBSCRIBED; |
- if (this[_isAddingStream]) { |
- let addState = dart.as(this[_varData], _StreamControllerAddStreamState); |
- addState.varData = subscription; |
- addState.resume(); |
- } else { |
- this[_varData] = subscription; |
+ static [_propagateToListeners](source, listeners) { |
+ while (true) { |
+ dart.assert(source[_isComplete]); |
+ let hasError = source[_hasError]; |
+ if (listeners === null) { |
+ if (hasError) { |
+ let asyncError = source[_error]; |
+ source[_zone].handleUncaughtError(asyncError.error, asyncError.stackTrace); |
+ } |
+ return; |
+ } |
+ while (listeners[_nextListener] !== null) { |
+ let listener = listeners; |
+ listeners = listener[_nextListener]; |
+ listener[_nextListener] = null; |
+ _propagateToListeners(source, listener); |
+ } |
+ let listener = listeners; |
+ let listenerHasValue = true; |
+ let sourceValue = hasError ? null : source[_value]; |
+ let listenerValueOrError = sourceValue; |
+ let isPropagationAborted = false; |
+ if (dart.notNull(hasError) || dart.notNull(listener.handlesValue) || dart.notNull(listener.handlesComplete)) { |
+ let zone = listener[_zone]; |
+ if (dart.notNull(hasError) && !dart.notNull(source[_zone].inSameErrorZone(zone))) { |
+ let asyncError = source[_error]; |
+ source[_zone].handleUncaughtError(asyncError.error, asyncError.stackTrace); |
+ return; |
+ } |
+ let oldZone = null; |
+ if (!dart.notNull(core.identical(Zone.current, zone))) { |
+ oldZone = Zone._enter(zone); |
+ } |
+ // Function handleValueCallback: () → bool |
+ function handleValueCallback() { |
+ try { |
+ listenerValueOrError = zone.runUnary(listener[_onValue], sourceValue); |
+ return true; |
+ } catch (e) { |
+ let s = dart.stackTrace(e); |
+ listenerValueOrError = new AsyncError(e, s); |
+ return false; |
+ } |
+ |
+ } |
+ // Function handleError: () → void |
+ function handleError() { |
+ let asyncError = source[_error]; |
+ let matchesTest = true; |
+ if (listener.hasErrorTest) { |
+ let test = listener[_errorTest]; |
+ try { |
+ matchesTest = dart.as(zone.runUnary(test, asyncError.error), core.bool); |
+ } catch (e) { |
+ let s = dart.stackTrace(e); |
+ listenerValueOrError = core.identical(asyncError.error, e) ? asyncError : new AsyncError(e, s); |
+ listenerHasValue = false; |
+ return; |
+ } |
+ |
+ } |
+ let errorCallback = listener[_onError]; |
+ if (dart.notNull(matchesTest) && dart.notNull(errorCallback !== null)) { |
+ try { |
+ if (dart.is(errorCallback, ZoneBinaryCallback)) { |
+ listenerValueOrError = zone.runBinary(errorCallback, asyncError.error, asyncError.stackTrace); |
+ } else { |
+ listenerValueOrError = zone.runUnary(dart.as(errorCallback, dart.throw_("Unimplemented type (dynamic) → dynamic")), asyncError.error); |
+ } |
+ } catch (e) { |
+ let s = dart.stackTrace(e); |
+ listenerValueOrError = core.identical(asyncError.error, e) ? asyncError : new AsyncError(e, s); |
+ listenerHasValue = false; |
+ return; |
+ } |
+ |
+ listenerHasValue = true; |
+ } else { |
+ listenerValueOrError = asyncError; |
+ listenerHasValue = false; |
+ } |
+ } |
+ // Function handleWhenCompleteCallback: () → void |
+ function handleWhenCompleteCallback() { |
+ let completeResult = null; |
+ try { |
+ completeResult = zone.run(listener[_whenCompleteAction]); |
+ } catch (e) { |
+ let s = dart.stackTrace(e); |
+ if (dart.notNull(hasError) && dart.notNull(core.identical(source[_error].error, e))) { |
+ listenerValueOrError = source[_error]; |
+ } else { |
+ listenerValueOrError = new AsyncError(e, s); |
+ } |
+ listenerHasValue = false; |
+ return; |
+ } |
+ |
+ if (dart.is(completeResult, Future)) { |
+ let result = listener.result; |
+ result[_isChained] = true; |
+ isPropagationAborted = true; |
+ dart.dinvoke(completeResult, 'then', (ignored) => { |
+ _propagateToListeners(source, new _FutureListener.chain(result)); |
+ }, { |
+ onError: (error, stackTrace) => { |
+ if (stackTrace === void 0) |
+ stackTrace = null; |
+ if (!dart.is(completeResult, _Future)) { |
+ completeResult = new _Future(); |
+ dart.dinvoke(completeResult, '_setError', error, stackTrace); |
+ } |
+ _propagateToListeners(dart.as(completeResult, _Future), new _FutureListener.chain(result)); |
+ } |
+ }); |
+ } |
+ } |
+ if (!dart.notNull(hasError)) { |
+ if (listener.handlesValue) { |
+ listenerHasValue = handleValueCallback(); |
+ } |
+ } else { |
+ handleError(); |
+ } |
+ if (listener.handlesComplete) { |
+ handleWhenCompleteCallback(); |
+ } |
+ if (oldZone !== null) |
+ Zone._leave(oldZone); |
+ if (isPropagationAborted) |
+ return; |
+ if (dart.notNull(listenerHasValue) && !dart.notNull(core.identical(sourceValue, listenerValueOrError)) && dart.notNull(dart.is(listenerValueOrError, Future))) { |
+ let chainSource = dart.as(listenerValueOrError, Future); |
+ let result = listener.result; |
+ if (dart.is(chainSource, _Future)) { |
+ if (chainSource[_isComplete]) { |
+ result[_isChained] = true; |
+ source = chainSource; |
+ listeners = new _FutureListener.chain(result); |
+ continue; |
+ } else { |
+ _chainCoreFuture(chainSource, result); |
+ } |
+ } else { |
+ _chainForeignFuture(chainSource, result); |
+ } |
+ return; |
+ } |
+ } |
+ let result = listener.result; |
+ listeners = result._removeListeners(); |
+ if (listenerHasValue) { |
+ result._setValue(listenerValueOrError); |
+ } else { |
+ let asyncError = dart.as(listenerValueOrError, AsyncError); |
+ result._setErrorObject(asyncError); |
+ } |
+ source = result; |
} |
- subscription._setPendingEvents(pendingEvents); |
- subscription._guardCallback((() => { |
- _runGuarded(this[_onListen]); |
- }).bind(this)); |
- return dart.as(subscription, StreamSubscription$(T)); |
} |
- [_recordCancel](subscription) { |
- let result = null; |
- if (this[_isAddingStream]) { |
- let addState = dart.as(this[_varData], _StreamControllerAddStreamState); |
- result = addState.cancel(); |
- } |
- this[_varData] = null; |
- this[_state] = dart.notNull(this[_state]) & ~(dart.notNull(_StreamController._STATE_SUBSCRIBED) | dart.notNull(_StreamController._STATE_ADDSTREAM)) | dart.notNull(_StreamController._STATE_CANCELED); |
- if (this[_onCancel] !== null) { |
- if (result === null) { |
+ timeout(timeLimit, opt$) { |
+ let onTimeout = opt$.onTimeout === void 0 ? null : opt$.onTimeout; |
+ if (this[_isComplete]) |
+ return new _Future.immediate(this); |
+ let result = new _Future(); |
+ let timer = null; |
+ if (onTimeout === null) { |
+ timer = new Timer(timeLimit, (() => { |
+ result._completeError(new TimeoutException("Future not completed", timeLimit)); |
+ }).bind(this)); |
+ } else { |
+ let zone = Zone.current; |
+ onTimeout = zone.registerCallback(onTimeout); |
+ timer = new Timer(timeLimit, (() => { |
try { |
- result = dart.as(this[_onCancel](), Future); |
+ result._complete(zone.run(onTimeout)); |
} catch (e) { |
let s = dart.stackTrace(e); |
- result = ((_) => { |
- _._asyncCompleteError(e, s); |
- return _; |
- }).bind(this)(new _Future()); |
+ result._completeError(e, s); |
} |
- } else { |
- result = result.whenComplete(this[_onCancel]); |
- } |
+ }).bind(this)); |
} |
- // Function complete: () → void |
- function complete() { |
- if (dart.notNull(this[_doneFuture] !== null) && dart.notNull(this[_doneFuture][_mayComplete])) { |
- this[_doneFuture]._asyncComplete(null); |
+ this.then(((v) => { |
+ if (timer.isActive) { |
+ timer.cancel(); |
+ result._completeWithValue(v); |
} |
- } |
- if (result !== null) { |
- result = result.whenComplete(complete); |
- } else { |
- complete(); |
- } |
+ }).bind(this), {onError: ((e, s) => { |
+ if (timer.isActive) { |
+ timer.cancel(); |
+ result._completeError(e, dart.as(s, core.StackTrace)); |
+ } |
+ }).bind(this)}); |
return result; |
} |
- [_recordPause](subscription) { |
- if (this[_isAddingStream]) { |
- let addState = dart.as(this[_varData], _StreamControllerAddStreamState); |
- addState.pause(); |
+ } |
+ dart.defineNamedConstructor(_Future, 'immediate'); |
+ dart.defineNamedConstructor(_Future, 'immediateError'); |
+ _Future._INCOMPLETE = 0; |
+ _Future._PENDING_COMPLETE = 1; |
+ _Future._CHAINED = 2; |
+ _Future._VALUE = 4; |
+ _Future._ERROR = 8; |
+ return _Future; |
+ }); |
+ let _Future = _Future$(dart.dynamic); |
+ class _AsyncCallbackEntry extends core.Object { |
+ _AsyncCallbackEntry(callback) { |
+ this.callback = callback; |
+ this.next = null; |
+ } |
+ } |
+ exports._nextCallback = null; |
+ exports._lastCallback = null; |
+ exports._lastPriorityCallback = null; |
+ exports._isInCallbackLoop = false; |
+ // Function _asyncRunCallbackLoop: () → void |
+ function _asyncRunCallbackLoop() { |
+ while (exports._nextCallback !== null) { |
+ exports._lastPriorityCallback = null; |
+ let entry = exports._nextCallback; |
+ exports._nextCallback = entry.next; |
+ if (exports._nextCallback === null) |
+ exports._lastCallback = null; |
+ entry.callback(); |
+ } |
+ } |
+ // Function _asyncRunCallback: () → void |
+ function _asyncRunCallback() { |
+ exports._isInCallbackLoop = true; |
+ try { |
+ _asyncRunCallbackLoop(); |
+ } finally { |
+ exports._lastPriorityCallback = null; |
+ exports._isInCallbackLoop = false; |
+ if (exports._nextCallback !== null) |
+ _AsyncRun._scheduleImmediate(_asyncRunCallback); |
+ } |
+ } |
+ // Function _scheduleAsyncCallback: (dynamic) → void |
+ function _scheduleAsyncCallback(callback) { |
+ if (exports._nextCallback === null) { |
+ exports._nextCallback = exports._lastCallback = new _AsyncCallbackEntry(dart.as(callback, _AsyncCallback)); |
+ if (!dart.notNull(exports._isInCallbackLoop)) { |
+ _AsyncRun._scheduleImmediate(_asyncRunCallback); |
+ } |
+ } else { |
+ let newEntry = new _AsyncCallbackEntry(dart.as(callback, _AsyncCallback)); |
+ exports._lastCallback.next = newEntry; |
+ exports._lastCallback = newEntry; |
+ } |
+ } |
+ // Function _schedulePriorityAsyncCallback: (dynamic) → void |
+ function _schedulePriorityAsyncCallback(callback) { |
+ let entry = new _AsyncCallbackEntry(dart.as(callback, _AsyncCallback)); |
+ if (exports._nextCallback === null) { |
+ _scheduleAsyncCallback(callback); |
+ exports._lastPriorityCallback = exports._lastCallback; |
+ } else if (exports._lastPriorityCallback === null) { |
+ entry.next = exports._nextCallback; |
+ exports._nextCallback = exports._lastPriorityCallback = entry; |
+ } else { |
+ entry.next = exports._lastPriorityCallback.next; |
+ exports._lastPriorityCallback.next = entry; |
+ exports._lastPriorityCallback = entry; |
+ if (entry.next === null) { |
+ exports._lastCallback = entry; |
+ } |
+ } |
+ } |
+ // Function scheduleMicrotask: (() → void) → void |
+ function scheduleMicrotask(callback) { |
+ if (core.identical(_ROOT_ZONE, Zone.current)) { |
+ _rootScheduleMicrotask(null, null, dart.as(_ROOT_ZONE, Zone), callback); |
+ return; |
+ } |
+ Zone.current.scheduleMicrotask(Zone.current.bindCallback(callback, {runGuarded: true})); |
+ } |
+ let _scheduleImmediate = Symbol('_scheduleImmediate'); |
+ let _initializeScheduleImmediate = Symbol('_initializeScheduleImmediate'); |
+ let _scheduleImmediateJsOverride = Symbol('_scheduleImmediateJsOverride'); |
+ let _scheduleImmediateWithSetImmediate = Symbol('_scheduleImmediateWithSetImmediate'); |
+ let _scheduleImmediateWithTimer = Symbol('_scheduleImmediateWithTimer'); |
+ class _AsyncRun extends core.Object { |
+ static [_scheduleImmediate](callback) { |
+ dart.dinvokef(scheduleImmediateClosure, callback); |
+ } |
+ static [_initializeScheduleImmediate]() { |
+ _js_helper.requiresPreamble(); |
+ if (self.scheduleImmediate !== null) { |
+ return _scheduleImmediateJsOverride; |
+ } |
+ if (dart.notNull(self.MutationObserver !== null) && dart.notNull(self.document !== null)) { |
+ let div = self.document.createElement("div"); |
+ let span = self.document.createElement("span"); |
+ let storedCallback = null; |
+ // Function internalCallback: (dynamic) → dynamic |
+ function internalCallback(_) { |
+ _isolate_helper.leaveJsAsync(); |
+ let f = storedCallback; |
+ storedCallback = null; |
+ dart.dinvokef(f); |
} |
- _runGuarded(this[_onPause]); |
+ ; |
+ let observer = new self.MutationObserver(_js_helper.convertDartClosureToJS(internalCallback, 1)); |
+ observer.observe(div, {childList: true}); |
+ return (callback) => { |
+ dart.assert(storedCallback === null); |
+ _isolate_helper.enterJsAsync(); |
+ storedCallback = callback; |
+ div.firstChild ? div.removeChild(span) : div.appendChild(span); |
+ }; |
+ } else if (self.setImmediate !== null) { |
+ return _scheduleImmediateWithSetImmediate; |
+ } |
+ return _scheduleImmediateWithTimer; |
+ } |
+ static [_scheduleImmediateJsOverride](callback) { |
+ // Function internalCallback: () → dynamic |
+ function internalCallback() { |
+ _isolate_helper.leaveJsAsync(); |
+ callback(); |
+ } |
+ ; |
+ _isolate_helper.enterJsAsync(); |
+ self.scheduleImmediate(_js_helper.convertDartClosureToJS(internalCallback, 0)); |
+ } |
+ static [_scheduleImmediateWithSetImmediate](callback) { |
+ // Function internalCallback: () → dynamic |
+ function internalCallback() { |
+ _isolate_helper.leaveJsAsync(); |
+ callback(); |
+ } |
+ ; |
+ _isolate_helper.enterJsAsync(); |
+ self.setImmediate(_js_helper.convertDartClosureToJS(internalCallback, 0)); |
+ } |
+ static [_scheduleImmediateWithTimer](callback) { |
+ Timer._createTimer(core.Duration.ZERO, callback); |
+ } |
+ } |
+ dart.defineLazyProperties(_AsyncRun, { |
+ get scheduleImmediateClosure() { |
+ return _initializeScheduleImmediate(); |
+ } |
+ }); |
+ let StreamSubscription$ = dart.generic(function(T) { |
+ class StreamSubscription extends core.Object { |
+ } |
+ return StreamSubscription; |
+ }); |
+ let StreamSubscription = StreamSubscription$(dart.dynamic); |
+ let EventSink$ = dart.generic(function(T) { |
+ class EventSink extends core.Object { |
+ } |
+ return EventSink; |
+ }); |
+ let EventSink = EventSink$(dart.dynamic); |
+ let _stream = Symbol('_stream'); |
+ let StreamView$ = dart.generic(function(T) { |
+ class StreamView extends Stream$(T) { |
+ StreamView($_stream) { |
+ this[_stream] = $_stream; |
+ super.Stream(); |
+ } |
+ get isBroadcast() { |
+ return this[_stream].isBroadcast; |
+ } |
+ asBroadcastStream(opt$) { |
+ let onListen = opt$.onListen === void 0 ? null : opt$.onListen; |
+ let onCancel = opt$.onCancel === void 0 ? null : opt$.onCancel; |
+ return this[_stream].asBroadcastStream({onListen: onListen, onCancel: onCancel}); |
} |
- [_recordResume](subscription) { |
- if (this[_isAddingStream]) { |
- let addState = dart.as(this[_varData], _StreamControllerAddStreamState); |
- addState.resume(); |
- } |
- _runGuarded(this[_onResume]); |
+ listen(onData, opt$) { |
+ let onError = opt$.onError === void 0 ? null : opt$.onError; |
+ let onDone = opt$.onDone === void 0 ? null : opt$.onDone; |
+ let cancelOnError = opt$.cancelOnError === void 0 ? null : opt$.cancelOnError; |
+ return this[_stream].listen(onData, {onError: onError, onDone: onDone, cancelOnError: cancelOnError}); |
} |
} |
- _StreamController._STATE_INITIAL = 0; |
- _StreamController._STATE_SUBSCRIBED = 1; |
- _StreamController._STATE_CANCELED = 2; |
- _StreamController._STATE_SUBSCRIPTION_MASK = 3; |
- _StreamController._STATE_CLOSED = 4; |
- _StreamController._STATE_ADDSTREAM = 8; |
- return _StreamController; |
+ return StreamView; |
}); |
- let _StreamController = _StreamController$(dart.dynamic); |
- let _SyncStreamControllerDispatch$ = dart.generic(function(T) { |
- class _SyncStreamControllerDispatch extends core.Object { |
- [_sendData](data) { |
- this[_subscription]._add(data); |
+ let StreamView = StreamView$(dart.dynamic); |
+ let StreamConsumer$ = dart.generic(function(S) { |
+ class StreamConsumer extends core.Object { |
+ } |
+ return StreamConsumer; |
+ }); |
+ let StreamConsumer = StreamConsumer$(dart.dynamic); |
+ let StreamSink$ = dart.generic(function(S) { |
+ class StreamSink extends core.Object { |
+ } |
+ return StreamSink; |
+ }); |
+ let StreamSink = StreamSink$(dart.dynamic); |
+ let StreamTransformer$ = dart.generic(function(S, T) { |
+ class StreamTransformer extends core.Object { |
+ StreamTransformer(transformer) { |
+ return new _StreamSubscriptionTransformer(transformer); |
} |
- [_sendError](error, stackTrace) { |
- this[_subscription]._addError(error, stackTrace); |
+ StreamTransformer$fromHandlers(opt$) { |
+ return new _StreamHandlerTransformer(opt$); |
} |
- [_sendDone]() { |
- this[_subscription]._close(); |
+ } |
+ dart.defineNamedConstructor(StreamTransformer, 'fromHandlers'); |
+ return StreamTransformer; |
+ }); |
+ let StreamTransformer = StreamTransformer$(dart.dynamic, dart.dynamic); |
+ let StreamIterator$ = dart.generic(function(T) { |
+ class StreamIterator extends core.Object { |
+ StreamIterator(stream) { |
+ return new _StreamIteratorImpl(stream); |
} |
} |
- return _SyncStreamControllerDispatch; |
+ return StreamIterator; |
}); |
- let _SyncStreamControllerDispatch = _SyncStreamControllerDispatch$(dart.dynamic); |
- let _AsyncStreamControllerDispatch$ = dart.generic(function(T) { |
- class _AsyncStreamControllerDispatch extends core.Object { |
- [_sendData](data) { |
- this[_subscription]._addPending(new _DelayedData(data)); |
+ let StreamIterator = StreamIterator$(dart.dynamic); |
+ let _ControllerEventSinkWrapper$ = dart.generic(function(T) { |
+ class _ControllerEventSinkWrapper extends core.Object { |
+ _ControllerEventSinkWrapper($_sink) { |
+ this[_sink] = $_sink; |
} |
- [_sendError](error, stackTrace) { |
- this[_subscription]._addPending(new _DelayedError(error, stackTrace)); |
+ add(data) { |
+ this[_sink].add(data); |
} |
- [_sendDone]() { |
- this[_subscription]._addPending(new _DelayedDone()); |
+ addError(error, stackTrace) { |
+ if (stackTrace === void 0) |
+ stackTrace = null; |
+ this[_sink].addError(error, stackTrace); |
+ } |
+ close() { |
+ this[_sink].close(); |
} |
} |
- return _AsyncStreamControllerDispatch; |
+ return _ControllerEventSinkWrapper; |
}); |
- let _AsyncStreamControllerDispatch = _AsyncStreamControllerDispatch$(dart.dynamic); |
- let _AsyncStreamController$ = dart.generic(function(T) { |
- class _AsyncStreamController extends dart.mixin(_StreamController$(T), _AsyncStreamControllerDispatch$(T)) { |
- _AsyncStreamController($_onListen, $_onPause, $_onResume, $_onCancel) { |
- this[_onListen] = $_onListen; |
- this[_onPause] = $_onPause; |
- this[_onResume] = $_onResume; |
- this[_onCancel] = $_onCancel; |
- super._StreamController(); |
+ let _ControllerEventSinkWrapper = _ControllerEventSinkWrapper$(dart.dynamic); |
+ let StreamController$ = dart.generic(function(T) { |
+ class StreamController extends core.Object { |
+ StreamController(opt$) { |
+ let onListen = opt$.onListen === void 0 ? null : opt$.onListen; |
+ let onPause = opt$.onPause === void 0 ? null : opt$.onPause; |
+ let onResume = opt$.onResume === void 0 ? null : opt$.onResume; |
+ let onCancel = opt$.onCancel === void 0 ? null : opt$.onCancel; |
+ let sync = opt$.sync === void 0 ? false : opt$.sync; |
+ if (dart.notNull(onListen === null) && dart.notNull(onPause === null) && dart.notNull(onResume === null) && dart.notNull(onCancel === null)) { |
+ return dart.as(sync ? new _NoCallbackSyncStreamController() : new _NoCallbackAsyncStreamController(), StreamController$(T)); |
+ } |
+ return sync ? new _SyncStreamController(onListen, onPause, onResume, onCancel) : new _AsyncStreamController(onListen, onPause, onResume, onCancel); |
+ } |
+ StreamController$broadcast(opt$) { |
+ let onListen = opt$.onListen === void 0 ? null : opt$.onListen; |
+ let onCancel = opt$.onCancel === void 0 ? null : opt$.onCancel; |
+ let sync = opt$.sync === void 0 ? false : opt$.sync; |
+ return sync ? new _SyncBroadcastStreamController(onListen, onCancel) : new _AsyncBroadcastStreamController(onListen, onCancel); |
} |
} |
- return _AsyncStreamController; |
+ dart.defineNamedConstructor(StreamController, 'broadcast'); |
+ return StreamController; |
}); |
- let _AsyncStreamController = _AsyncStreamController$(dart.dynamic); |
- let _SyncStreamController$ = dart.generic(function(T) { |
- class _SyncStreamController extends dart.mixin(_StreamController$(T), _SyncStreamControllerDispatch$(T)) { |
- _SyncStreamController($_onListen, $_onPause, $_onResume, $_onCancel) { |
- this[_onListen] = $_onListen; |
- this[_onPause] = $_onPause; |
- this[_onResume] = $_onResume; |
- this[_onCancel] = $_onCancel; |
- super._StreamController(); |
+ let StreamController = StreamController$(dart.dynamic); |
+ let _StreamControllerLifecycle$ = dart.generic(function(T) { |
+ class _StreamControllerLifecycle extends core.Object { |
+ [_recordPause](subscription) {} |
+ [_recordResume](subscription) {} |
+ [_recordCancel](subscription) { |
+ return null; |
} |
} |
- return _SyncStreamController; |
+ return _StreamControllerLifecycle; |
}); |
- let _SyncStreamController = _SyncStreamController$(dart.dynamic); |
- class _NoCallbacks extends core.Object { |
- get [_onListen]() { |
- return null; |
- } |
- get [_onPause]() { |
- return null; |
- } |
- get [_onResume]() { |
- return null; |
- } |
- get [_onCancel]() { |
- return null; |
- } |
- } |
- class _NoCallbackAsyncStreamController extends dart.mixin(_AsyncStreamControllerDispatch, _NoCallbacks) { |
- } |
- class _NoCallbackSyncStreamController extends dart.mixin(_SyncStreamControllerDispatch, _NoCallbacks) { |
- } |
- // Function _runGuarded: (() → dynamic) → Future<dynamic> |
- function _runGuarded(notificationHandler) { |
- if (notificationHandler === null) |
- return null; |
- try { |
- let result = notificationHandler(); |
- if (dart.is(result, Future)) |
- return dart.as(result, Future); |
- return null; |
- } catch (e) { |
- let s = dart.stackTrace(e); |
- Zone.current.handleUncaughtError(e, s); |
- } |
- |
- } |
- let _createSubscription = Symbol('_createSubscription'); |
- let _ControllerStream$ = dart.generic(function(T) { |
- class _ControllerStream extends _StreamImpl$(T) { |
- _ControllerStream($_controller) { |
- this[_controller] = $_controller; |
- super._StreamImpl(); |
+ let _StreamControllerLifecycle = _StreamControllerLifecycle$(dart.dynamic); |
+ let _varData = Symbol('_varData'); |
+ let _isInitialState = Symbol('_isInitialState'); |
+ let _subscription = Symbol('_subscription'); |
+ let _pendingEvents = Symbol('_pendingEvents'); |
+ let _ensurePendingEvents = Symbol('_ensurePendingEvents'); |
+ let _badEventState = Symbol('_badEventState'); |
+ let _nullFuture = Symbol('_nullFuture'); |
+ let _closeUnchecked = Symbol('_closeUnchecked'); |
+ let _StreamController$ = dart.generic(function(T) { |
+ class _StreamController extends core.Object { |
+ _StreamController() { |
+ this[_varData] = null; |
+ this[_state] = _StreamController._STATE_INITIAL; |
+ this[_doneFuture] = null; |
} |
- [_createSubscription](onData, onError, onDone, cancelOnError) { |
- return this[_controller]._subscribe(onData, onError, onDone, cancelOnError); |
+ get stream() { |
+ return dart.as(new _ControllerStream(this), Stream$(T)); |
} |
- get hashCode() { |
- return dart.notNull(this[_controller].hashCode) ^ 892482866; |
+ get sink() { |
+ return new _StreamSinkWrapper(this); |
} |
- ['=='](other) { |
- if (core.identical(this, other)) |
- return true; |
- if (!dart.is(other, _ControllerStream)) |
- return false; |
- let otherStream = dart.as(other, _ControllerStream); |
- return core.identical(otherStream[_controller], this[_controller]); |
+ get [_isCanceled]() { |
+ return (dart.notNull(this[_state]) & dart.notNull(_StreamController._STATE_CANCELED)) !== 0; |
} |
- } |
- return _ControllerStream; |
- }); |
- let _ControllerStream = _ControllerStream$(dart.dynamic); |
- let _ControllerSubscription$ = dart.generic(function(T) { |
- class _ControllerSubscription extends _BufferingStreamSubscription$(T) { |
- _ControllerSubscription($_controller, onData, onError, onDone, cancelOnError) { |
- this[_controller] = $_controller; |
- super._BufferingStreamSubscription(onData, onError, onDone, cancelOnError); |
+ get hasListener() { |
+ return (dart.notNull(this[_state]) & dart.notNull(_StreamController._STATE_SUBSCRIBED)) !== 0; |
} |
- [_onCancel]() { |
- return this[_controller]._recordCancel(this); |
+ get [_isInitialState]() { |
+ return (dart.notNull(this[_state]) & dart.notNull(_StreamController._STATE_SUBSCRIPTION_MASK)) === _StreamController._STATE_INITIAL; |
} |
- [_onPause]() { |
- this[_controller]._recordPause(this); |
+ get isClosed() { |
+ return (dart.notNull(this[_state]) & dart.notNull(_StreamController._STATE_CLOSED)) !== 0; |
} |
- [_onResume]() { |
- this[_controller]._recordResume(this); |
+ get isPaused() { |
+ return this.hasListener ? this[_subscription][_isInputPaused] : !dart.notNull(this[_isCanceled]); |
} |
- } |
- return _ControllerSubscription; |
- }); |
- let _ControllerSubscription = _ControllerSubscription$(dart.dynamic); |
- let _target = Symbol('_target'); |
- let _StreamSinkWrapper$ = dart.generic(function(T) { |
- class _StreamSinkWrapper extends core.Object { |
- _StreamSinkWrapper($_target) { |
- this[_target] = $_target; |
+ get [_isAddingStream]() { |
+ return (dart.notNull(this[_state]) & dart.notNull(_StreamController._STATE_ADDSTREAM)) !== 0; |
} |
- add(data) { |
- this[_target].add(data); |
+ get [_mayAddEvent]() { |
+ return dart.notNull(this[_state]) < dart.notNull(_StreamController._STATE_CLOSED); |
+ } |
+ get [_pendingEvents]() { |
+ dart.assert(this[_isInitialState]); |
+ if (!dart.notNull(this[_isAddingStream])) { |
+ return dart.as(this[_varData], _PendingEvents); |
+ } |
+ let state = dart.as(this[_varData], _StreamControllerAddStreamState); |
+ return dart.as(state.varData, _PendingEvents); |
+ } |
+ [_ensurePendingEvents]() { |
+ dart.assert(this[_isInitialState]); |
+ if (!dart.notNull(this[_isAddingStream])) { |
+ if (this[_varData] === null) |
+ this[_varData] = new _StreamImplEvents(); |
+ return dart.as(this[_varData], _StreamImplEvents); |
+ } |
+ let state = dart.as(this[_varData], _StreamControllerAddStreamState); |
+ if (state.varData === null) |
+ state.varData = new _StreamImplEvents(); |
+ return dart.as(state.varData, _StreamImplEvents); |
} |
- addError(error, stackTrace) { |
- if (stackTrace === void 0) |
- stackTrace = null; |
- this[_target].addError(error, stackTrace); |
+ get [_subscription]() { |
+ dart.assert(this.hasListener); |
+ if (this[_isAddingStream]) { |
+ let addState = dart.as(this[_varData], _StreamControllerAddStreamState); |
+ return dart.as(addState.varData, _ControllerSubscription); |
+ } |
+ return dart.as(this[_varData], _ControllerSubscription); |
} |
- close() { |
- return this[_target].close(); |
+ [_badEventState]() { |
+ if (this.isClosed) { |
+ return new core.StateError("Cannot add event after closing"); |
+ } |
+ dart.assert(this[_isAddingStream]); |
+ return new core.StateError("Cannot add event while adding a stream"); |
} |
addStream(source, opt$) { |
let cancelOnError = opt$.cancelOnError === void 0 ? true : opt$.cancelOnError; |
- return this[_target].addStream(source, {cancelOnError: cancelOnError}); |
+ if (!dart.notNull(this[_mayAddEvent])) |
+ throw this[_badEventState](); |
+ if (this[_isCanceled]) |
+ return new _Future.immediate(null); |
+ let addState = new _StreamControllerAddStreamState(this, this[_varData], source, cancelOnError); |
+ this[_varData] = addState; |
+ this[_state] = _StreamController._STATE_ADDSTREAM; |
+ return addState.addStreamFuture; |
} |
get done() { |
- return this[_target].done; |
- } |
- } |
- return _StreamSinkWrapper; |
- }); |
- let _StreamSinkWrapper = _StreamSinkWrapper$(dart.dynamic); |
- let _AddStreamState$ = dart.generic(function(T) { |
- class _AddStreamState extends core.Object { |
- _AddStreamState(controller, source, cancelOnError) { |
- this.addStreamFuture = new _Future(); |
- this.addSubscription = source.listen(dart.as(controller[_add], dart.throw_("Unimplemented type (dynamic) → void")), {onError: dart.as(cancelOnError ? makeErrorHandler(controller) : controller[_addError], core.Function), onDone: controller[_close], cancelOnError: cancelOnError}); |
- } |
- static makeErrorHandler(controller) { |
- return ((e, s) => { |
- controller._addError(e, s); |
- controller._close(); |
- }).bind(this); |
- } |
- pause() { |
- this.addSubscription.pause(); |
- } |
- resume() { |
- this.addSubscription.resume(); |
+ return this[_ensureDoneFuture](); |
} |
- cancel() { |
- let cancel = this.addSubscription.cancel(); |
- if (cancel === null) { |
- this.addStreamFuture._asyncComplete(null); |
- return null; |
+ [_ensureDoneFuture]() { |
+ if (this[_doneFuture] === null) { |
+ this[_doneFuture] = this[_isCanceled] ? Future[_nullFuture] : new _Future(); |
} |
- return cancel.whenComplete((() => { |
- this.addStreamFuture._asyncComplete(null); |
- }).bind(this)); |
+ return this[_doneFuture]; |
} |
- complete() { |
- this.addStreamFuture._asyncComplete(null); |
+ add(value) { |
+ if (!dart.notNull(this[_mayAddEvent])) |
+ throw this[_badEventState](); |
+ this[_add](value); |
} |
- } |
- return _AddStreamState; |
- }); |
- let _AddStreamState = _AddStreamState$(dart.dynamic); |
- let _StreamControllerAddStreamState$ = dart.generic(function(T) { |
- class _StreamControllerAddStreamState extends _AddStreamState$(T) { |
- _StreamControllerAddStreamState(controller, varData, source, cancelOnError) { |
- this.varData = varData; |
- super._AddStreamState(dart.as(controller, _EventSink$(T)), source, cancelOnError); |
- if (controller.isPaused) { |
- this.addSubscription.pause(); |
+ addError(error, stackTrace) { |
+ if (stackTrace === void 0) |
+ stackTrace = null; |
+ error = _nonNullError(error); |
+ if (!dart.notNull(this[_mayAddEvent])) |
+ throw this[_badEventState](); |
+ let replacement = Zone.current.errorCallback(error, stackTrace); |
+ if (replacement !== null) { |
+ error = _nonNullError(replacement.error); |
+ stackTrace = replacement.stackTrace; |
} |
+ this[_addError](error, stackTrace); |
} |
- } |
- return _StreamControllerAddStreamState; |
- }); |
- let _StreamControllerAddStreamState = _StreamControllerAddStreamState$(dart.dynamic); |
- let _EventSink$ = dart.generic(function(T) { |
- class _EventSink extends core.Object { |
- } |
- return _EventSink; |
- }); |
- let _EventSink = _EventSink$(dart.dynamic); |
- let _EventDispatch$ = dart.generic(function(T) { |
- class _EventDispatch extends core.Object { |
- } |
- return _EventDispatch; |
- }); |
- let _EventDispatch = _EventDispatch$(dart.dynamic); |
- let _onData = Symbol('_onData'); |
- let _onDone = Symbol('_onDone'); |
- let _cancelFuture = Symbol('_cancelFuture'); |
- let _setPendingEvents = Symbol('_setPendingEvents'); |
- let _extractPending = Symbol('_extractPending'); |
- let _isPaused = Symbol('_isPaused'); |
- let _inCallback = Symbol('_inCallback'); |
- let _guardCallback = Symbol('_guardCallback'); |
- let _decrementPauseCount = Symbol('_decrementPauseCount'); |
- let _mayResumeInput = Symbol('_mayResumeInput'); |
- let _cancel = Symbol('_cancel'); |
- let _isClosed = Symbol('_isClosed'); |
- let _waitsForCancel = Symbol('_waitsForCancel'); |
- let _canFire = Symbol('_canFire'); |
- let _cancelOnError = Symbol('_cancelOnError'); |
- let _incrementPauseCount = Symbol('_incrementPauseCount'); |
- let _addPending = Symbol('_addPending'); |
- let _checkState = Symbol('_checkState'); |
- let _BufferingStreamSubscription$ = dart.generic(function(T) { |
- class _BufferingStreamSubscription extends core.Object { |
- _BufferingStreamSubscription(onData, onError, onDone, cancelOnError) { |
- this[_zone] = Zone.current; |
- this[_state] = cancelOnError ? _BufferingStreamSubscription._STATE_CANCEL_ON_ERROR : 0; |
- this[_onData] = null; |
- this[_onError] = null; |
- this[_onDone] = null; |
- this[_cancelFuture] = null; |
- this[_pending] = null; |
- this.onData(onData); |
- this.onError(onError); |
- this.onDone(onDone); |
- } |
- [_setPendingEvents](pendingEvents) { |
- dart.assert(this[_pending] === null); |
- if (pendingEvents === null) |
- return; |
- this[_pending] = pendingEvents; |
- if (!dart.notNull(pendingEvents.isEmpty)) { |
- this[_state] = _BufferingStreamSubscription._STATE_HAS_PENDING; |
- this[_pending].schedule(this); |
+ close() { |
+ if (this.isClosed) { |
+ return this[_ensureDoneFuture](); |
} |
+ if (!dart.notNull(this[_mayAddEvent])) |
+ throw this[_badEventState](); |
+ this[_closeUnchecked](); |
+ return this[_ensureDoneFuture](); |
} |
- [_extractPending]() { |
- dart.assert(this[_isCanceled]); |
- let events = this[_pending]; |
- this[_pending] = null; |
- return events; |
+ [_closeUnchecked]() { |
+ this[_state] = _StreamController._STATE_CLOSED; |
+ if (this.hasListener) { |
+ this[_sendDone](); |
+ } else if (this[_isInitialState]) { |
+ this[_ensurePendingEvents]().add(new _DelayedDone()); |
+ } |
} |
- onData(handleData) { |
- if (handleData === null) |
- handleData = _nullDataHandler; |
- this[_onData] = this[_zone].registerUnaryCallback(dart.as(handleData, dart.throw_("Unimplemented type (dynamic) → dynamic"))); |
+ [_add](value) { |
+ if (this.hasListener) { |
+ this[_sendData](value); |
+ } else if (this[_isInitialState]) { |
+ this[_ensurePendingEvents]().add(new _DelayedData(value)); |
+ } |
} |
- onError(handleError) { |
- if (handleError === null) |
- handleError = _nullErrorHandler; |
- this[_onError] = _registerErrorHandler(handleError, this[_zone]); |
+ [_addError](error, stackTrace) { |
+ if (this.hasListener) { |
+ this[_sendError](error, stackTrace); |
+ } else if (this[_isInitialState]) { |
+ this[_ensurePendingEvents]().add(new _DelayedError(error, stackTrace)); |
+ } |
} |
- onDone(handleDone) { |
- if (handleDone === null) |
- handleDone = _nullDoneHandler; |
- this[_onDone] = this[_zone].registerCallback(handleDone); |
+ [_close]() { |
+ dart.assert(this[_isAddingStream]); |
+ let addState = dart.as(this[_varData], _StreamControllerAddStreamState); |
+ this[_varData] = addState.varData; |
+ this[_state] = ~dart.notNull(_StreamController._STATE_ADDSTREAM); |
+ addState.complete(); |
} |
- pause(resumeSignal) { |
- if (resumeSignal === void 0) |
- resumeSignal = null; |
- if (this[_isCanceled]) |
- return; |
- let wasPaused = this[_isPaused]; |
- let wasInputPaused = this[_isInputPaused]; |
- this[_state] = dart.notNull(this[_state]) + dart.notNull(_BufferingStreamSubscription._STATE_PAUSE_COUNT) | dart.notNull(_BufferingStreamSubscription._STATE_INPUT_PAUSED); |
- if (resumeSignal !== null) |
- resumeSignal.whenComplete(this.resume); |
- if (!dart.notNull(wasPaused) && dart.notNull(this[_pending] !== null)) |
- this[_pending].cancelSchedule(); |
- if (!dart.notNull(wasInputPaused) && !dart.notNull(this[_inCallback])) |
- this[_guardCallback](this[_onPause]); |
+ [_subscribe](onData, onError, onDone, cancelOnError) { |
+ if (!dart.notNull(this[_isInitialState])) { |
+ throw new core.StateError("Stream has already been listened to."); |
+ } |
+ let subscription = new _ControllerSubscription(this, dart.as(onData, dart.throw_("Unimplemented type (dynamic) → void")), onError, onDone, cancelOnError); |
+ let pendingEvents = this[_pendingEvents]; |
+ this[_state] = _StreamController._STATE_SUBSCRIBED; |
+ if (this[_isAddingStream]) { |
+ let addState = dart.as(this[_varData], _StreamControllerAddStreamState); |
+ addState.varData = subscription; |
+ addState.resume(); |
+ } else { |
+ this[_varData] = subscription; |
+ } |
+ subscription._setPendingEvents(pendingEvents); |
+ subscription._guardCallback((() => { |
+ _runGuarded(this[_onListen]); |
+ }).bind(this)); |
+ return dart.as(subscription, StreamSubscription$(T)); |
} |
- resume() { |
- if (this[_isCanceled]) |
- return; |
- if (this[_isPaused]) { |
- this[_decrementPauseCount](); |
- if (!dart.notNull(this[_isPaused])) { |
- if (dart.notNull(this[_hasPending]) && !dart.notNull(this[_pending].isEmpty)) { |
- this[_pending].schedule(this); |
- } else { |
- dart.assert(this[_mayResumeInput]); |
- this[_state] = ~dart.notNull(_BufferingStreamSubscription._STATE_INPUT_PAUSED); |
- if (!dart.notNull(this[_inCallback])) |
- this[_guardCallback](this[_onResume]); |
+ [_recordCancel](subscription) { |
+ let result = null; |
+ if (this[_isAddingStream]) { |
+ let addState = dart.as(this[_varData], _StreamControllerAddStreamState); |
+ result = addState.cancel(); |
+ } |
+ this[_varData] = null; |
+ this[_state] = dart.notNull(this[_state]) & ~(dart.notNull(_StreamController._STATE_SUBSCRIBED) | dart.notNull(_StreamController._STATE_ADDSTREAM)) | dart.notNull(_StreamController._STATE_CANCELED); |
+ if (this[_onCancel] !== null) { |
+ if (result === null) { |
+ try { |
+ result = dart.as(this[_onCancel](), Future); |
+ } catch (e) { |
+ let s = dart.stackTrace(e); |
+ result = ((_) => { |
+ _._asyncCompleteError(e, s); |
+ return _; |
+ }).bind(this)(new _Future()); |
} |
+ |
+ } else { |
+ result = result.whenComplete(this[_onCancel]); |
} |
} |
- } |
- cancel() { |
- this[_state] = ~dart.notNull(_BufferingStreamSubscription._STATE_WAIT_FOR_CANCEL); |
- if (this[_isCanceled]) |
- return this[_cancelFuture]; |
- this[_cancel](); |
- return this[_cancelFuture]; |
- } |
- asFuture(futureValue) { |
- if (futureValue === void 0) |
- futureValue = null; |
- let result = new _Future(); |
- this[_onDone] = (() => { |
- result._complete(futureValue); |
- }).bind(this); |
- this[_onError] = ((error, stackTrace) => { |
- this.cancel(); |
- result._completeError(error, dart.as(stackTrace, core.StackTrace)); |
- }).bind(this); |
+ // Function complete: () → void |
+ function complete() { |
+ if (dart.notNull(this[_doneFuture] !== null) && dart.notNull(this[_doneFuture][_mayComplete])) { |
+ this[_doneFuture]._asyncComplete(null); |
+ } |
+ } |
+ if (result !== null) { |
+ result = result.whenComplete(complete); |
+ } else { |
+ complete(); |
+ } |
return result; |
} |
- get [_isInputPaused]() { |
- return (dart.notNull(this[_state]) & dart.notNull(_BufferingStreamSubscription._STATE_INPUT_PAUSED)) !== 0; |
- } |
- get [_isClosed]() { |
- return (dart.notNull(this[_state]) & dart.notNull(_BufferingStreamSubscription._STATE_CLOSED)) !== 0; |
- } |
- get [_isCanceled]() { |
- return (dart.notNull(this[_state]) & dart.notNull(_BufferingStreamSubscription._STATE_CANCELED)) !== 0; |
- } |
- get [_waitsForCancel]() { |
- return (dart.notNull(this[_state]) & dart.notNull(_BufferingStreamSubscription._STATE_WAIT_FOR_CANCEL)) !== 0; |
- } |
- get [_inCallback]() { |
- return (dart.notNull(this[_state]) & dart.notNull(_BufferingStreamSubscription._STATE_IN_CALLBACK)) !== 0; |
+ [_recordPause](subscription) { |
+ if (this[_isAddingStream]) { |
+ let addState = dart.as(this[_varData], _StreamControllerAddStreamState); |
+ addState.pause(); |
+ } |
+ _runGuarded(this[_onPause]); |
} |
- get [_hasPending]() { |
- return (dart.notNull(this[_state]) & dart.notNull(_BufferingStreamSubscription._STATE_HAS_PENDING)) !== 0; |
+ [_recordResume](subscription) { |
+ if (this[_isAddingStream]) { |
+ let addState = dart.as(this[_varData], _StreamControllerAddStreamState); |
+ addState.resume(); |
+ } |
+ _runGuarded(this[_onResume]); |
} |
- get [_isPaused]() { |
- return dart.notNull(this[_state]) >= dart.notNull(_BufferingStreamSubscription._STATE_PAUSE_COUNT); |
+ } |
+ _StreamController._STATE_INITIAL = 0; |
+ _StreamController._STATE_SUBSCRIBED = 1; |
+ _StreamController._STATE_CANCELED = 2; |
+ _StreamController._STATE_SUBSCRIPTION_MASK = 3; |
+ _StreamController._STATE_CLOSED = 4; |
+ _StreamController._STATE_ADDSTREAM = 8; |
+ return _StreamController; |
+ }); |
+ let _StreamController = _StreamController$(dart.dynamic); |
+ let _SyncStreamControllerDispatch$ = dart.generic(function(T) { |
+ class _SyncStreamControllerDispatch extends core.Object { |
+ [_sendData](data) { |
+ this[_subscription]._add(data); |
} |
- get [_canFire]() { |
- return dart.notNull(this[_state]) < dart.notNull(_BufferingStreamSubscription._STATE_IN_CALLBACK); |
+ [_sendError](error, stackTrace) { |
+ this[_subscription]._addError(error, stackTrace); |
} |
- get [_mayResumeInput]() { |
- return !dart.notNull(this[_isPaused]) && (dart.notNull(this[_pending] === null) || dart.notNull(this[_pending].isEmpty)); |
+ [_sendDone]() { |
+ this[_subscription]._close(); |
} |
- get [_cancelOnError]() { |
- return (dart.notNull(this[_state]) & dart.notNull(_BufferingStreamSubscription._STATE_CANCEL_ON_ERROR)) !== 0; |
+ } |
+ return _SyncStreamControllerDispatch; |
+ }); |
+ let _SyncStreamControllerDispatch = _SyncStreamControllerDispatch$(dart.dynamic); |
+ let _AsyncStreamControllerDispatch$ = dart.generic(function(T) { |
+ class _AsyncStreamControllerDispatch extends core.Object { |
+ [_sendData](data) { |
+ this[_subscription]._addPending(new _DelayedData(data)); |
} |
- get isPaused() { |
- return this[_isPaused]; |
+ [_sendError](error, stackTrace) { |
+ this[_subscription]._addPending(new _DelayedError(error, stackTrace)); |
} |
- [_cancel]() { |
- this[_state] = _BufferingStreamSubscription._STATE_CANCELED; |
- if (this[_hasPending]) { |
- this[_pending].cancelSchedule(); |
- } |
- if (!dart.notNull(this[_inCallback])) |
- this[_pending] = null; |
- this[_cancelFuture] = this[_onCancel](); |
+ [_sendDone]() { |
+ this[_subscription]._addPending(new _DelayedDone()); |
} |
- [_incrementPauseCount]() { |
- this[_state] = dart.notNull(this[_state]) + dart.notNull(_BufferingStreamSubscription._STATE_PAUSE_COUNT) | dart.notNull(_BufferingStreamSubscription._STATE_INPUT_PAUSED); |
+ } |
+ return _AsyncStreamControllerDispatch; |
+ }); |
+ let _AsyncStreamControllerDispatch = _AsyncStreamControllerDispatch$(dart.dynamic); |
+ let _AsyncStreamController$ = dart.generic(function(T) { |
+ class _AsyncStreamController extends dart.mixin(_StreamController$(T), _AsyncStreamControllerDispatch$(T)) { |
+ _AsyncStreamController($_onListen, $_onPause, $_onResume, $_onCancel) { |
+ this[_onListen] = $_onListen; |
+ this[_onPause] = $_onPause; |
+ this[_onResume] = $_onResume; |
+ this[_onCancel] = $_onCancel; |
+ super._StreamController(); |
} |
- [_decrementPauseCount]() { |
- dart.assert(this[_isPaused]); |
- this[_state] = _BufferingStreamSubscription._STATE_PAUSE_COUNT; |
+ } |
+ return _AsyncStreamController; |
+ }); |
+ let _AsyncStreamController = _AsyncStreamController$(dart.dynamic); |
+ let _SyncStreamController$ = dart.generic(function(T) { |
+ class _SyncStreamController extends dart.mixin(_StreamController$(T), _SyncStreamControllerDispatch$(T)) { |
+ _SyncStreamController($_onListen, $_onPause, $_onResume, $_onCancel) { |
+ this[_onListen] = $_onListen; |
+ this[_onPause] = $_onPause; |
+ this[_onResume] = $_onResume; |
+ this[_onCancel] = $_onCancel; |
+ super._StreamController(); |
} |
- [_add](data) { |
- dart.assert(!dart.notNull(this[_isClosed])); |
- if (this[_isCanceled]) |
- return; |
- if (this[_canFire]) { |
- this[_sendData](data); |
- } else { |
- this[_addPending](new _DelayedData(data)); |
- } |
+ } |
+ return _SyncStreamController; |
+ }); |
+ let _SyncStreamController = _SyncStreamController$(dart.dynamic); |
+ class _NoCallbacks extends core.Object { |
+ get [_onListen]() { |
+ return null; |
+ } |
+ get [_onPause]() { |
+ return null; |
+ } |
+ get [_onResume]() { |
+ return null; |
+ } |
+ get [_onCancel]() { |
+ return null; |
+ } |
+ } |
+ class _NoCallbackAsyncStreamController extends dart.mixin(_AsyncStreamControllerDispatch, _NoCallbacks) { |
+ } |
+ class _NoCallbackSyncStreamController extends dart.mixin(_SyncStreamControllerDispatch, _NoCallbacks) { |
+ } |
+ // Function _runGuarded: (() → dynamic) → Future<dynamic> |
+ function _runGuarded(notificationHandler) { |
+ if (notificationHandler === null) |
+ return null; |
+ try { |
+ let result = notificationHandler(); |
+ if (dart.is(result, Future)) |
+ return dart.as(result, Future); |
+ return null; |
+ } catch (e) { |
+ let s = dart.stackTrace(e); |
+ Zone.current.handleUncaughtError(e, s); |
+ } |
+ |
+ } |
+ let _target = Symbol('_target'); |
+ let _StreamSinkWrapper$ = dart.generic(function(T) { |
+ class _StreamSinkWrapper extends core.Object { |
+ _StreamSinkWrapper($_target) { |
+ this[_target] = $_target; |
} |
- [_addError](error, stackTrace) { |
- if (this[_isCanceled]) |
- return; |
- if (this[_canFire]) { |
- this[_sendError](error, stackTrace); |
- } else { |
- this[_addPending](new _DelayedError(error, stackTrace)); |
- } |
+ add(data) { |
+ this[_target].add(data); |
} |
- [_close]() { |
- dart.assert(!dart.notNull(this[_isClosed])); |
- if (this[_isCanceled]) |
- return; |
- this[_state] = _BufferingStreamSubscription._STATE_CLOSED; |
- if (this[_canFire]) { |
- this[_sendDone](); |
- } else { |
- this[_addPending](new _DelayedDone()); |
- } |
+ addError(error, stackTrace) { |
+ if (stackTrace === void 0) |
+ stackTrace = null; |
+ this[_target].addError(error, stackTrace); |
} |
- [_onPause]() { |
- dart.assert(this[_isInputPaused]); |
+ close() { |
+ return this[_target].close(); |
} |
- [_onResume]() { |
- dart.assert(!dart.notNull(this[_isInputPaused])); |
+ addStream(source, opt$) { |
+ let cancelOnError = opt$.cancelOnError === void 0 ? true : opt$.cancelOnError; |
+ return this[_target].addStream(source, {cancelOnError: cancelOnError}); |
} |
- [_onCancel]() { |
- dart.assert(this[_isCanceled]); |
- return null; |
+ get done() { |
+ return this[_target].done; |
} |
- [_addPending](event) { |
- let pending = dart.as(this[_pending], _StreamImplEvents); |
- if (this[_pending] === null) |
- pending = this[_pending] = new _StreamImplEvents(); |
- pending.add(event); |
- if (!dart.notNull(this[_hasPending])) { |
- this[_state] = _BufferingStreamSubscription._STATE_HAS_PENDING; |
- if (!dart.notNull(this[_isPaused])) { |
- this[_pending].schedule(this); |
- } |
- } |
+ } |
+ return _StreamSinkWrapper; |
+ }); |
+ let _StreamSinkWrapper = _StreamSinkWrapper$(dart.dynamic); |
+ let _AddStreamState$ = dart.generic(function(T) { |
+ class _AddStreamState extends core.Object { |
+ _AddStreamState(controller, source, cancelOnError) { |
+ this.addStreamFuture = new _Future(); |
+ this.addSubscription = source.listen(dart.as(controller[_add], dart.throw_("Unimplemented type (dynamic) → void")), {onError: dart.as(cancelOnError ? makeErrorHandler(controller) : controller[_addError], core.Function), onDone: controller[_close], cancelOnError: cancelOnError}); |
} |
- [_sendData](data) { |
- dart.assert(!dart.notNull(this[_isCanceled])); |
- dart.assert(!dart.notNull(this[_isPaused])); |
- dart.assert(!dart.notNull(this[_inCallback])); |
- let wasInputPaused = this[_isInputPaused]; |
- this[_state] = _BufferingStreamSubscription._STATE_IN_CALLBACK; |
- this[_zone].runUnaryGuarded(dart.as(this[_onData], dart.throw_("Unimplemented type (dynamic) → dynamic")), data); |
- this[_state] = ~dart.notNull(_BufferingStreamSubscription._STATE_IN_CALLBACK); |
- this[_checkState](wasInputPaused); |
+ static makeErrorHandler(controller) { |
+ return ((e, s) => { |
+ controller._addError(e, s); |
+ controller._close(); |
+ }).bind(this); |
} |
- [_sendError](error, stackTrace) { |
- dart.assert(!dart.notNull(this[_isCanceled])); |
- dart.assert(!dart.notNull(this[_isPaused])); |
- dart.assert(!dart.notNull(this[_inCallback])); |
- let wasInputPaused = this[_isInputPaused]; |
- // Function sendError: () → void |
- function sendError() { |
- if (dart.notNull(this[_isCanceled]) && !dart.notNull(this[_waitsForCancel])) |
- return; |
- this[_state] = _BufferingStreamSubscription._STATE_IN_CALLBACK; |
- if (dart.is(this[_onError], ZoneBinaryCallback)) { |
- this[_zone].runBinaryGuarded(dart.as(this[_onError], dart.throw_("Unimplemented type (dynamic, dynamic) → dynamic")), error, stackTrace); |
- } else { |
- this[_zone].runUnaryGuarded(dart.as(this[_onError], dart.throw_("Unimplemented type (dynamic) → dynamic")), error); |
- } |
- this[_state] = ~dart.notNull(_BufferingStreamSubscription._STATE_IN_CALLBACK); |
- } |
- if (this[_cancelOnError]) { |
- this[_state] = _BufferingStreamSubscription._STATE_WAIT_FOR_CANCEL; |
- this[_cancel](); |
- if (dart.is(this[_cancelFuture], Future)) { |
- this[_cancelFuture].whenComplete(sendError); |
- } else { |
- sendError(); |
- } |
- } else { |
- sendError(); |
- this[_checkState](wasInputPaused); |
- } |
+ pause() { |
+ this.addSubscription.pause(); |
} |
- [_sendDone]() { |
- dart.assert(!dart.notNull(this[_isCanceled])); |
- dart.assert(!dart.notNull(this[_isPaused])); |
- dart.assert(!dart.notNull(this[_inCallback])); |
- // Function sendDone: () → void |
- function sendDone() { |
- if (!dart.notNull(this[_waitsForCancel])) |
- return; |
- this[_state] = dart.notNull(_BufferingStreamSubscription._STATE_CANCELED) | dart.notNull(_BufferingStreamSubscription._STATE_CLOSED) | dart.notNull(_BufferingStreamSubscription._STATE_IN_CALLBACK); |
- this[_zone].runGuarded(this[_onDone]); |
- this[_state] = ~dart.notNull(_BufferingStreamSubscription._STATE_IN_CALLBACK); |
- } |
- this[_cancel](); |
- this[_state] = _BufferingStreamSubscription._STATE_WAIT_FOR_CANCEL; |
- if (dart.is(this[_cancelFuture], Future)) { |
- this[_cancelFuture].whenComplete(sendDone); |
- } else { |
- sendDone(); |
+ resume() { |
+ this.addSubscription.resume(); |
+ } |
+ cancel() { |
+ let cancel = this.addSubscription.cancel(); |
+ if (cancel === null) { |
+ this.addStreamFuture._asyncComplete(null); |
+ return null; |
} |
+ return cancel.whenComplete((() => { |
+ this.addStreamFuture._asyncComplete(null); |
+ }).bind(this)); |
} |
- [_guardCallback](callback) { |
- dart.assert(!dart.notNull(this[_inCallback])); |
- let wasInputPaused = this[_isInputPaused]; |
- this[_state] = _BufferingStreamSubscription._STATE_IN_CALLBACK; |
- dart.dinvokef(callback); |
- this[_state] = ~dart.notNull(_BufferingStreamSubscription._STATE_IN_CALLBACK); |
- this[_checkState](wasInputPaused); |
+ complete() { |
+ this.addStreamFuture._asyncComplete(null); |
} |
- [_checkState](wasInputPaused) { |
- dart.assert(!dart.notNull(this[_inCallback])); |
- if (dart.notNull(this[_hasPending]) && dart.notNull(this[_pending].isEmpty)) { |
- this[_state] = ~dart.notNull(_BufferingStreamSubscription._STATE_HAS_PENDING); |
- if (dart.notNull(this[_isInputPaused]) && dart.notNull(this[_mayResumeInput])) { |
- this[_state] = ~dart.notNull(_BufferingStreamSubscription._STATE_INPUT_PAUSED); |
- } |
- } |
- while (true) { |
- if (this[_isCanceled]) { |
- this[_pending] = null; |
- return; |
- } |
- let isInputPaused = this[_isInputPaused]; |
- if (wasInputPaused === isInputPaused) |
- break; |
- this[_state] = _BufferingStreamSubscription._STATE_IN_CALLBACK; |
- if (isInputPaused) { |
- this[_onPause](); |
- } else { |
- this[_onResume](); |
- } |
- this[_state] = ~dart.notNull(_BufferingStreamSubscription._STATE_IN_CALLBACK); |
- wasInputPaused = isInputPaused; |
- } |
- if (dart.notNull(this[_hasPending]) && !dart.notNull(this[_isPaused])) { |
- this[_pending].schedule(this); |
+ } |
+ return _AddStreamState; |
+ }); |
+ let _AddStreamState = _AddStreamState$(dart.dynamic); |
+ let _StreamControllerAddStreamState$ = dart.generic(function(T) { |
+ class _StreamControllerAddStreamState extends _AddStreamState$(T) { |
+ _StreamControllerAddStreamState(controller, varData, source, cancelOnError) { |
+ this.varData = varData; |
+ super._AddStreamState(dart.as(controller, _EventSink$(T)), source, cancelOnError); |
+ if (controller.isPaused) { |
+ this.addSubscription.pause(); |
} |
} |
} |
- _BufferingStreamSubscription._STATE_CANCEL_ON_ERROR = 1; |
- _BufferingStreamSubscription._STATE_CLOSED = 2; |
- _BufferingStreamSubscription._STATE_INPUT_PAUSED = 4; |
- _BufferingStreamSubscription._STATE_CANCELED = 8; |
- _BufferingStreamSubscription._STATE_WAIT_FOR_CANCEL = 16; |
- _BufferingStreamSubscription._STATE_IN_CALLBACK = 32; |
- _BufferingStreamSubscription._STATE_HAS_PENDING = 64; |
- _BufferingStreamSubscription._STATE_PAUSE_COUNT = 128; |
- _BufferingStreamSubscription._STATE_PAUSE_COUNT_SHIFT = 7; |
- return _BufferingStreamSubscription; |
+ return _StreamControllerAddStreamState; |
}); |
- let _BufferingStreamSubscription = _BufferingStreamSubscription$(dart.dynamic); |
- let _StreamImpl$ = dart.generic(function(T) { |
- class _StreamImpl extends Stream$(T) { |
- listen(onData, opt$) { |
- let onError = opt$.onError === void 0 ? null : opt$.onError; |
- let onDone = opt$.onDone === void 0 ? null : opt$.onDone; |
- let cancelOnError = opt$.cancelOnError === void 0 ? null : opt$.cancelOnError; |
- cancelOnError = core.identical(true, cancelOnError); |
- let subscription = this[_createSubscription](onData, onError, onDone, cancelOnError); |
- this[_onListen](subscription); |
- return dart.as(subscription, StreamSubscription$(T)); |
- } |
- [_createSubscription](onData, onError, onDone, cancelOnError) { |
- return new _BufferingStreamSubscription(onData, onError, onDone, cancelOnError); |
- } |
- [_onListen](subscription) {} |
+ let _StreamControllerAddStreamState = _StreamControllerAddStreamState$(dart.dynamic); |
+ let _EventSink$ = dart.generic(function(T) { |
+ class _EventSink extends core.Object { |
} |
- return _StreamImpl; |
+ return _EventSink; |
}); |
- let _StreamImpl = _StreamImpl$(dart.dynamic); |
+ let _EventSink = _EventSink$(dart.dynamic); |
+ let _EventDispatch$ = dart.generic(function(T) { |
+ class _EventDispatch extends core.Object { |
+ } |
+ return _EventDispatch; |
+ }); |
+ let _EventDispatch = _EventDispatch$(dart.dynamic); |
let _isUsed = Symbol('_isUsed'); |
let _GeneratedStreamImpl$ = dart.generic(function(T) { |
class _GeneratedStreamImpl extends _StreamImpl$(T) { |
@@ -3024,6 +3033,43 @@ var async; |
}); |
let _GeneratedStreamImpl = _GeneratedStreamImpl$(dart.dynamic); |
let _iterator = Symbol('_iterator'); |
+ let _eventScheduled = Symbol('_eventScheduled'); |
+ class _PendingEvents extends core.Object { |
+ _PendingEvents() { |
+ this[_state] = _PendingEvents._STATE_UNSCHEDULED; |
+ } |
+ get isScheduled() { |
+ return this[_state] === _PendingEvents._STATE_SCHEDULED; |
+ } |
+ get [_eventScheduled]() { |
+ return dart.notNull(this[_state]) >= dart.notNull(_PendingEvents._STATE_SCHEDULED); |
+ } |
+ schedule(dispatch) { |
+ if (this.isScheduled) |
+ return; |
+ dart.assert(!dart.notNull(this.isEmpty)); |
+ if (this[_eventScheduled]) { |
+ dart.assert(this[_state] === _PendingEvents._STATE_CANCELED); |
+ this[_state] = _PendingEvents._STATE_SCHEDULED; |
+ return; |
+ } |
+ scheduleMicrotask((() => { |
+ let oldState = this[_state]; |
+ this[_state] = _PendingEvents._STATE_UNSCHEDULED; |
+ if (oldState === _PendingEvents._STATE_CANCELED) |
+ return; |
+ this.handleNext(dispatch); |
+ }).bind(this)); |
+ this[_state] = _PendingEvents._STATE_SCHEDULED; |
+ } |
+ cancelSchedule() { |
+ if (this.isScheduled) |
+ this[_state] = _PendingEvents._STATE_CANCELED; |
+ } |
+ } |
+ _PendingEvents._STATE_UNSCHEDULED = 0; |
+ _PendingEvents._STATE_SCHEDULED = 1; |
+ _PendingEvents._STATE_CANCELED = 3; |
let _IterablePendingEvents$ = dart.generic(function(T) { |
class _IterablePendingEvents extends _PendingEvents { |
_IterablePendingEvents(data) { |
@@ -3116,43 +3162,6 @@ var async; |
throw new core.StateError("No events after a done."); |
} |
} |
- let _eventScheduled = Symbol('_eventScheduled'); |
- class _PendingEvents extends core.Object { |
- _PendingEvents() { |
- this[_state] = _PendingEvents._STATE_UNSCHEDULED; |
- } |
- get isScheduled() { |
- return this[_state] === _PendingEvents._STATE_SCHEDULED; |
- } |
- get [_eventScheduled]() { |
- return dart.notNull(this[_state]) >= dart.notNull(_PendingEvents._STATE_SCHEDULED); |
- } |
- schedule(dispatch) { |
- if (this.isScheduled) |
- return; |
- dart.assert(!dart.notNull(this.isEmpty)); |
- if (this[_eventScheduled]) { |
- dart.assert(this[_state] === _PendingEvents._STATE_CANCELED); |
- this[_state] = _PendingEvents._STATE_SCHEDULED; |
- return; |
- } |
- scheduleMicrotask((() => { |
- let oldState = this[_state]; |
- this[_state] = _PendingEvents._STATE_UNSCHEDULED; |
- if (oldState === _PendingEvents._STATE_CANCELED) |
- return; |
- this.handleNext(dispatch); |
- }).bind(this)); |
- this[_state] = _PendingEvents._STATE_SCHEDULED; |
- } |
- cancelSchedule() { |
- if (this.isScheduled) |
- this[_state] = _PendingEvents._STATE_CANCELED; |
- } |
- } |
- _PendingEvents._STATE_UNSCHEDULED = 0; |
- _PendingEvents._STATE_SCHEDULED = 1; |
- _PendingEvents._STATE_CANCELED = 3; |
class _StreamImplEvents extends _PendingEvents { |
_StreamImplEvents() { |
this.firstPendingEvent = null; |
@@ -4167,15 +4176,6 @@ var async; |
} |
} |
dart.defineNamedConstructor(Timer, 'periodic'); |
- class AsyncError extends core.Object { |
- AsyncError(error, stackTrace) { |
- this.error = error; |
- this.stackTrace = stackTrace; |
- } |
- toString() { |
- return dart.as(dart.dinvoke(this.error, 'toString'), core.String); |
- } |
- } |
class _ZoneFunction extends core.Object { |
_ZoneFunction(zone, function) { |
this.zone = zone; |
@@ -4917,6 +4917,9 @@ var async; |
} |
}); |
// Exports: |
+ exports.AsyncError = AsyncError; |
+ exports.Stream = Stream; |
+ exports.Stream$ = Stream$; |
exports.DeferredLibrary = DeferredLibrary; |
exports.DeferredLoadException = DeferredLoadException; |
exports.Future = Future; |
@@ -4925,8 +4928,6 @@ var async; |
exports.Completer = Completer; |
exports.Completer$ = Completer$; |
exports.scheduleMicrotask = scheduleMicrotask; |
- exports.Stream = Stream; |
- exports.Stream$ = Stream$; |
exports.StreamSubscription = StreamSubscription; |
exports.StreamSubscription$ = StreamSubscription$; |
exports.EventSink = EventSink; |
@@ -4944,7 +4945,6 @@ var async; |
exports.StreamController = StreamController; |
exports.StreamController$ = StreamController$; |
exports.Timer = Timer; |
- exports.AsyncError = AsyncError; |
exports.ZoneSpecification = ZoneSpecification; |
exports.ZoneDelegate = ZoneDelegate; |
exports.Zone = Zone; |