| Index: sdk/lib/async/stream.dart
|
| diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart
|
| index b2a7a215b81caeee9e762136cb308cfe4f61d213..26343b69b35dee57f03ef60331b095f593b3825c 100644
|
| --- a/sdk/lib/async/stream.dart
|
| +++ b/sdk/lib/async/stream.dart
|
| @@ -104,13 +104,12 @@ abstract class Stream<T> {
|
| // to wait for a listener before doing the `then` on the future.
|
| _StreamController<T> controller = new StreamController<T>(sync: true);
|
| future.then((value) {
|
| - controller._add(value);
|
| - controller._closeUnchecked();
|
| - },
|
| - onError: (error, stackTrace) {
|
| - controller._addError(error, stackTrace);
|
| - controller._closeUnchecked();
|
| - });
|
| + controller._add(value);
|
| + controller._closeUnchecked();
|
| + }, onError: (error, stackTrace) {
|
| + controller._addError(error, stackTrace);
|
| + controller._closeUnchecked();
|
| + });
|
| return controller.stream;
|
| }
|
|
|
| @@ -181,7 +180,7 @@ abstract class Stream<T> {
|
| * If [computation] is omitted the event values will all be `null`.
|
| */
|
| factory Stream.periodic(Duration period,
|
| - [T computation(int computationCount)]) {
|
| + [T computation(int computationCount)]) {
|
| Timer timer;
|
| int computationCount = 0;
|
| StreamController<T> controller;
|
| @@ -209,7 +208,8 @@ abstract class Stream<T> {
|
| });
|
| }
|
|
|
| - controller = new StreamController<T>(sync: true,
|
| + controller = new StreamController<T>(
|
| + sync: true,
|
| onListen: () {
|
| watch.start();
|
| startPeriodicTimer();
|
| @@ -277,8 +277,8 @@ abstract class Stream<T> {
|
| *
|
| * The resulting stream is a broadcast stream if [source] is.
|
| */
|
| - factory Stream.eventTransformed(Stream source,
|
| - EventSink mapSink(EventSink<T> sink)) {
|
| + factory Stream.eventTransformed(
|
| + Stream source, EventSink mapSink(EventSink<T> sink)) {
|
| return new _BoundSinkStream(source, mapSink);
|
| }
|
|
|
| @@ -308,9 +308,9 @@ abstract class Stream<T> {
|
| * while having no subscribers to prevent losing events, or canceling the
|
| * subscription when there are no listeners.
|
| */
|
| - Stream<T> asBroadcastStream({
|
| - void onListen(StreamSubscription<T> subscription),
|
| - void onCancel(StreamSubscription<T> subscription) }) {
|
| + Stream<T> asBroadcastStream(
|
| + {void onListen(StreamSubscription<T> subscription),
|
| + void onCancel(StreamSubscription<T> subscription)}) {
|
| return new _AsBroadcastStream<T>(this, onListen, onCancel);
|
| }
|
|
|
| @@ -350,9 +350,7 @@ abstract class Stream<T> {
|
| * event handler functions are called.
|
| */
|
| StreamSubscription<T> listen(void onData(T event),
|
| - { Function onError,
|
| - void onDone(),
|
| - bool cancelOnError});
|
| + {Function onError, void onDone(), bool cancelOnError});
|
|
|
| /**
|
| * Creates a new stream from this stream that discards some data events.
|
| @@ -405,46 +403,46 @@ abstract class Stream<T> {
|
| void onListen() {
|
| final add = controller.add;
|
| assert(controller is _StreamController ||
|
| - controller is _BroadcastStreamController);
|
| - final _EventSink<E> eventSink =
|
| - controller as Object /*=_EventSink<E>*/;
|
| + controller is _BroadcastStreamController);
|
| + final _EventSink<E> eventSink = controller as Object/*=_EventSink<E>*/;
|
| final addError = eventSink._addError;
|
| - subscription = this.listen(
|
| - (T event) {
|
| - dynamic newValue;
|
| - try {
|
| - newValue = convert(event);
|
| - } catch (e, s) {
|
| - controller.addError(e, s);
|
| - return;
|
| - }
|
| - if (newValue is Future) {
|
| - subscription.pause();
|
| - newValue.then(add, onError: addError)
|
| - .whenComplete(subscription.resume);
|
| - } else {
|
| - controller.add(newValue as Object/*=E*/);
|
| - }
|
| - },
|
| - onError: addError,
|
| - onDone: controller.close
|
| - );
|
| + subscription = this.listen((T event) {
|
| + dynamic newValue;
|
| + try {
|
| + newValue = convert(event);
|
| + } catch (e, s) {
|
| + controller.addError(e, s);
|
| + return;
|
| + }
|
| + if (newValue is Future) {
|
| + subscription.pause();
|
| + newValue
|
| + .then(add, onError: addError)
|
| + .whenComplete(subscription.resume);
|
| + } else {
|
| + controller.add(newValue as Object/*=E*/);
|
| + }
|
| + }, onError: addError, onDone: controller.close);
|
| }
|
|
|
| if (this.isBroadcast) {
|
| controller = new StreamController<E>.broadcast(
|
| - onListen: onListen,
|
| - onCancel: () { subscription.cancel(); },
|
| - sync: true
|
| - );
|
| + onListen: onListen,
|
| + onCancel: () {
|
| + subscription.cancel();
|
| + },
|
| + sync: true);
|
| } else {
|
| controller = new StreamController<E>(
|
| - onListen: onListen,
|
| - onPause: () { subscription.pause(); },
|
| - onResume: () { subscription.resume(); },
|
| - onCancel: () => subscription.cancel(),
|
| - sync: true
|
| - );
|
| + onListen: onListen,
|
| + onPause: () {
|
| + subscription.pause();
|
| + },
|
| + onResume: () {
|
| + subscription.resume();
|
| + },
|
| + onCancel: () => subscription.cancel(),
|
| + sync: true);
|
| }
|
| return controller.stream;
|
| }
|
| @@ -467,42 +465,43 @@ abstract class Stream<T> {
|
| StreamSubscription<T> subscription;
|
| void onListen() {
|
| assert(controller is _StreamController ||
|
| - controller is _BroadcastStreamController);
|
| - final _EventSink<E> eventSink =
|
| - controller as Object /*=_EventSink<E>*/;
|
| - subscription = this.listen(
|
| - (T event) {
|
| - Stream<E> newStream;
|
| - try {
|
| - newStream = convert(event);
|
| - } catch (e, s) {
|
| - controller.addError(e, s);
|
| - return;
|
| - }
|
| - if (newStream != null) {
|
| - subscription.pause();
|
| - controller.addStream(newStream)
|
| - .whenComplete(subscription.resume);
|
| - }
|
| - },
|
| - onError: eventSink._addError, // Avoid Zone error replacement.
|
| - onDone: controller.close
|
| - );
|
| + controller is _BroadcastStreamController);
|
| + final _EventSink<E> eventSink = controller as Object/*=_EventSink<E>*/;
|
| + subscription = this.listen((T event) {
|
| + Stream<E> newStream;
|
| + try {
|
| + newStream = convert(event);
|
| + } catch (e, s) {
|
| + controller.addError(e, s);
|
| + return;
|
| + }
|
| + if (newStream != null) {
|
| + subscription.pause();
|
| + controller.addStream(newStream).whenComplete(subscription.resume);
|
| + }
|
| + },
|
| + onError: eventSink._addError, // Avoid Zone error replacement.
|
| + onDone: controller.close);
|
| }
|
| +
|
| if (this.isBroadcast) {
|
| controller = new StreamController<E>.broadcast(
|
| - onListen: onListen,
|
| - onCancel: () { subscription.cancel(); },
|
| - sync: true
|
| - );
|
| + onListen: onListen,
|
| + onCancel: () {
|
| + subscription.cancel();
|
| + },
|
| + sync: true);
|
| } else {
|
| controller = new StreamController<E>(
|
| - onListen: onListen,
|
| - onPause: () { subscription.pause(); },
|
| - onResume: () { subscription.resume(); },
|
| - onCancel: () => subscription.cancel(),
|
| - sync: true
|
| - );
|
| + onListen: onListen,
|
| + onPause: () {
|
| + subscription.pause();
|
| + },
|
| + onResume: () {
|
| + subscription.resume();
|
| + },
|
| + onCancel: () => subscription.cancel(),
|
| + sync: true);
|
| }
|
| return controller.stream;
|
| }
|
| @@ -535,7 +534,7 @@ abstract class Stream<T> {
|
| * If a broadcast stream is listened to more than once, each subscription
|
| * will individually perform the `test` and handle the error.
|
| */
|
| - Stream<T> handleError(Function onError, { bool test(error) }) {
|
| + Stream<T> handleError(Function onError, {bool test(error)}) {
|
| return new _HandleErrorStream<T>(this, onError, test);
|
| }
|
|
|
| @@ -585,8 +584,7 @@ abstract class Stream<T> {
|
| * The `streamTransformer` can decide whether it wants to return a
|
| * broadcast stream or not.
|
| */
|
| - Stream<S> transform<S>(
|
| - StreamTransformer<T, S > streamTransformer) {
|
| + Stream<S> transform<S>(StreamTransformer<T, S> streamTransformer) {
|
| return streamTransformer.bind(this);
|
| }
|
|
|
| @@ -599,55 +597,46 @@ abstract class Stream<T> {
|
| T value;
|
| StreamSubscription subscription;
|
| subscription = this.listen(
|
| - (T element) {
|
| - if (seenFirst) {
|
| - _runUserCode(() => combine(value, element),
|
| - (T newValue) { value = newValue; },
|
| - _cancelAndErrorClosure(subscription, result));
|
| - } else {
|
| - value = element;
|
| - seenFirst = true;
|
| - }
|
| - },
|
| - onError: result._completeError,
|
| - onDone: () {
|
| - if (!seenFirst) {
|
| - try {
|
| - throw IterableElementError.noElement();
|
| - } catch (e, s) {
|
| - _completeWithErrorCallback(result, e, s);
|
| + (T element) {
|
| + if (seenFirst) {
|
| + _runUserCode(() => combine(value, element), (T newValue) {
|
| + value = newValue;
|
| + }, _cancelAndErrorClosure(subscription, result));
|
| + } else {
|
| + value = element;
|
| + seenFirst = true;
|
| }
|
| - } else {
|
| - result._complete(value);
|
| - }
|
| - },
|
| - cancelOnError: true
|
| - );
|
| + },
|
| + onError: result._completeError,
|
| + onDone: () {
|
| + if (!seenFirst) {
|
| + try {
|
| + throw IterableElementError.noElement();
|
| + } catch (e, s) {
|
| + _completeWithErrorCallback(result, e, s);
|
| + }
|
| + } else {
|
| + result._complete(value);
|
| + }
|
| + },
|
| + cancelOnError: true);
|
| return result;
|
| }
|
|
|
| /** Reduces a sequence of values by repeatedly applying [combine]. */
|
| - Future<S> fold<S>(S initialValue,
|
| - S combine(S previous, T element)) {
|
| -
|
| + Future<S> fold<S>(S initialValue, S combine(S previous, T element)) {
|
| _Future<S> result = new _Future<S>();
|
| S value = initialValue;
|
| StreamSubscription subscription;
|
| - subscription = this.listen(
|
| - (T element) {
|
| - _runUserCode(
|
| - () => combine(value, element),
|
| - (S newValue) { value = newValue; },
|
| - _cancelAndErrorClosure(subscription, result)
|
| - );
|
| - },
|
| - onError: (e, st) {
|
| - result._completeError(e, st);
|
| - },
|
| - onDone: () {
|
| - result._complete(value);
|
| - },
|
| - cancelOnError: true);
|
| + subscription = this.listen((T element) {
|
| + _runUserCode(() => combine(value, element), (S newValue) {
|
| + value = newValue;
|
| + }, _cancelAndErrorClosure(subscription, result));
|
| + }, onError: (e, st) {
|
| + result._completeError(e, st);
|
| + }, onDone: () {
|
| + result._complete(value);
|
| + }, cancelOnError: true);
|
| return result;
|
| }
|
|
|
| @@ -666,25 +655,21 @@ abstract class Stream<T> {
|
| StringBuffer buffer = new StringBuffer();
|
| StreamSubscription subscription;
|
| bool first = true;
|
| - subscription = this.listen(
|
| - (T element) {
|
| - if (!first) {
|
| - buffer.write(separator);
|
| - }
|
| - first = false;
|
| - try {
|
| - buffer.write(element);
|
| - } catch (e, s) {
|
| - _cancelAndErrorWithReplacement(subscription, result, e, s);
|
| - }
|
| - },
|
| - onError: (e) {
|
| - result._completeError(e);
|
| - },
|
| - onDone: () {
|
| - result._complete(buffer.toString());
|
| - },
|
| - cancelOnError: true);
|
| + subscription = this.listen((T element) {
|
| + if (!first) {
|
| + buffer.write(separator);
|
| + }
|
| + first = false;
|
| + try {
|
| + buffer.write(element);
|
| + } catch (e, s) {
|
| + _cancelAndErrorWithReplacement(subscription, result, e, s);
|
| + }
|
| + }, onError: (e) {
|
| + result._completeError(e);
|
| + }, onDone: () {
|
| + result._complete(buffer.toString());
|
| + }, cancelOnError: true);
|
| return result;
|
| }
|
|
|
| @@ -699,15 +684,11 @@ abstract class Stream<T> {
|
| StreamSubscription subscription;
|
| subscription = this.listen(
|
| (T element) {
|
| - _runUserCode(
|
| - () => (element == needle),
|
| - (bool isMatch) {
|
| - if (isMatch) {
|
| - _cancelAndValue(subscription, future, true);
|
| - }
|
| - },
|
| - _cancelAndErrorClosure(subscription, future)
|
| - );
|
| + _runUserCode(() => (element == needle), (bool isMatch) {
|
| + if (isMatch) {
|
| + _cancelAndValue(subscription, future, true);
|
| + }
|
| + }, _cancelAndErrorClosure(subscription, future));
|
| },
|
| onError: future._completeError,
|
| onDone: () {
|
| @@ -729,11 +710,8 @@ abstract class Stream<T> {
|
| StreamSubscription subscription;
|
| subscription = this.listen(
|
| (T element) {
|
| - _runUserCode(
|
| - () => action(element),
|
| - (_) {},
|
| - _cancelAndErrorClosure(subscription, future)
|
| - );
|
| + _runUserCode(() => action(element), (_) {},
|
| + _cancelAndErrorClosure(subscription, future));
|
| },
|
| onError: future._completeError,
|
| onDone: () {
|
| @@ -754,15 +732,11 @@ abstract class Stream<T> {
|
| StreamSubscription subscription;
|
| subscription = this.listen(
|
| (T element) {
|
| - _runUserCode(
|
| - () => test(element),
|
| - (bool isMatch) {
|
| - if (!isMatch) {
|
| - _cancelAndValue(subscription, future, false);
|
| - }
|
| - },
|
| - _cancelAndErrorClosure(subscription, future)
|
| - );
|
| + _runUserCode(() => test(element), (bool isMatch) {
|
| + if (!isMatch) {
|
| + _cancelAndValue(subscription, future, false);
|
| + }
|
| + }, _cancelAndErrorClosure(subscription, future));
|
| },
|
| onError: future._completeError,
|
| onDone: () {
|
| @@ -791,15 +765,11 @@ abstract class Stream<T> {
|
| StreamSubscription subscription;
|
| subscription = this.listen(
|
| (T element) {
|
| - _runUserCode(
|
| - () => test(element),
|
| - (bool isMatch) {
|
| - if (isMatch) {
|
| - _cancelAndValue(subscription, future, true);
|
| - }
|
| - },
|
| - _cancelAndErrorClosure(subscription, future)
|
| - );
|
| + _runUserCode(() => test(element), (bool isMatch) {
|
| + if (isMatch) {
|
| + _cancelAndValue(subscription, future, true);
|
| + }
|
| + }, _cancelAndErrorClosure(subscription, future));
|
| },
|
| onError: future._completeError,
|
| onDone: () {
|
| @@ -809,18 +779,19 @@ abstract class Stream<T> {
|
| return future;
|
| }
|
|
|
| -
|
| /** Counts the elements in the stream. */
|
| Future<int> get length {
|
| _Future<int> future = new _Future<int>();
|
| int count = 0;
|
| this.listen(
|
| - (_) { count++; },
|
| - onError: future._completeError,
|
| - onDone: () {
|
| - future._complete(count);
|
| - },
|
| - cancelOnError: true);
|
| + (_) {
|
| + count++;
|
| + },
|
| + onError: future._completeError,
|
| + onDone: () {
|
| + future._complete(count);
|
| + },
|
| + cancelOnError: true);
|
| return future;
|
| }
|
|
|
| @@ -837,14 +808,14 @@ abstract class Stream<T> {
|
| _Future<bool> future = new _Future<bool>();
|
| StreamSubscription subscription;
|
| subscription = this.listen(
|
| - (_) {
|
| - _cancelAndValue(subscription, future, false);
|
| - },
|
| - onError: future._completeError,
|
| - onDone: () {
|
| - future._complete(true);
|
| - },
|
| - cancelOnError: true);
|
| + (_) {
|
| + _cancelAndValue(subscription, future, false);
|
| + },
|
| + onError: future._completeError,
|
| + onDone: () {
|
| + future._complete(true);
|
| + },
|
| + cancelOnError: true);
|
| return future;
|
| }
|
|
|
| @@ -853,14 +824,14 @@ abstract class Stream<T> {
|
| List<T> result = <T>[];
|
| _Future<List<T>> future = new _Future<List<T>>();
|
| this.listen(
|
| - (T data) {
|
| - result.add(data);
|
| - },
|
| - onError: future._completeError,
|
| - onDone: () {
|
| - future._complete(result);
|
| - },
|
| - cancelOnError: true);
|
| + (T data) {
|
| + result.add(data);
|
| + },
|
| + onError: future._completeError,
|
| + onDone: () {
|
| + future._complete(result);
|
| + },
|
| + cancelOnError: true);
|
| return future;
|
| }
|
|
|
| @@ -877,14 +848,14 @@ abstract class Stream<T> {
|
| Set<T> result = new Set<T>();
|
| _Future<Set<T>> future = new _Future<Set<T>>();
|
| this.listen(
|
| - (T data) {
|
| - result.add(data);
|
| - },
|
| - onError: future._completeError,
|
| - onDone: () {
|
| - future._complete(result);
|
| - },
|
| - cancelOnError: true);
|
| + (T data) {
|
| + result.add(data);
|
| + },
|
| + onError: future._completeError,
|
| + onDone: () {
|
| + future._complete(result);
|
| + },
|
| + cancelOnError: true);
|
| return future;
|
| }
|
|
|
| @@ -899,8 +870,8 @@ abstract class Stream<T> {
|
| * In case of a `done` event the future completes with the given
|
| * [futureValue].
|
| */
|
| - Future<E> drain<E>([E futureValue])
|
| - => listen(null, cancelOnError: true).asFuture<E>(futureValue);
|
| + Future<E> drain<E>([E futureValue]) => listen(null, cancelOnError: true)
|
| + .asFuture<E>(futureValue);
|
|
|
| /**
|
| * Provides at most the first [count] data events of this stream.
|
| @@ -1014,18 +985,18 @@ abstract class Stream<T> {
|
| _Future<T> future = new _Future<T>();
|
| StreamSubscription subscription;
|
| subscription = this.listen(
|
| - (T value) {
|
| - _cancelAndValue(subscription, future, value);
|
| - },
|
| - onError: future._completeError,
|
| - onDone: () {
|
| - try {
|
| - throw IterableElementError.noElement();
|
| - } catch (e, s) {
|
| - _completeWithErrorCallback(future, e, s);
|
| - }
|
| - },
|
| - cancelOnError: true);
|
| + (T value) {
|
| + _cancelAndValue(subscription, future, value);
|
| + },
|
| + onError: future._completeError,
|
| + onDone: () {
|
| + try {
|
| + throw IterableElementError.noElement();
|
| + } catch (e, s) {
|
| + _completeWithErrorCallback(future, e, s);
|
| + }
|
| + },
|
| + cancelOnError: true);
|
| return future;
|
| }
|
|
|
| @@ -1043,23 +1014,23 @@ abstract class Stream<T> {
|
| T result = null;
|
| bool foundResult = false;
|
| listen(
|
| - (T value) {
|
| - foundResult = true;
|
| - result = value;
|
| - },
|
| - onError: future._completeError,
|
| - onDone: () {
|
| - if (foundResult) {
|
| - future._complete(result);
|
| - return;
|
| - }
|
| - try {
|
| - throw IterableElementError.noElement();
|
| - } catch (e, s) {
|
| - _completeWithErrorCallback(future, e, s);
|
| - }
|
| - },
|
| - cancelOnError: true);
|
| + (T value) {
|
| + foundResult = true;
|
| + result = value;
|
| + },
|
| + onError: future._completeError,
|
| + onDone: () {
|
| + if (foundResult) {
|
| + future._complete(result);
|
| + return;
|
| + }
|
| + try {
|
| + throw IterableElementError.noElement();
|
| + } catch (e, s) {
|
| + _completeWithErrorCallback(future, e, s);
|
| + }
|
| + },
|
| + cancelOnError: true);
|
| return future;
|
| }
|
|
|
| @@ -1077,32 +1048,32 @@ abstract class Stream<T> {
|
| bool foundResult = false;
|
| StreamSubscription subscription;
|
| subscription = this.listen(
|
| - (T value) {
|
| - if (foundResult) {
|
| - // This is the second element we get.
|
| + (T value) {
|
| + if (foundResult) {
|
| + // This is the second element we get.
|
| + try {
|
| + throw IterableElementError.tooMany();
|
| + } catch (e, s) {
|
| + _cancelAndErrorWithReplacement(subscription, future, e, s);
|
| + }
|
| + return;
|
| + }
|
| + foundResult = true;
|
| + result = value;
|
| + },
|
| + onError: future._completeError,
|
| + onDone: () {
|
| + if (foundResult) {
|
| + future._complete(result);
|
| + return;
|
| + }
|
| try {
|
| - throw IterableElementError.tooMany();
|
| + throw IterableElementError.noElement();
|
| } catch (e, s) {
|
| - _cancelAndErrorWithReplacement(subscription, future, e, s);
|
| + _completeWithErrorCallback(future, e, s);
|
| }
|
| - return;
|
| - }
|
| - foundResult = true;
|
| - result = value;
|
| - },
|
| - onError: future._completeError,
|
| - onDone: () {
|
| - if (foundResult) {
|
| - future._complete(result);
|
| - return;
|
| - }
|
| - try {
|
| - throw IterableElementError.noElement();
|
| - } catch (e, s) {
|
| - _completeWithErrorCallback(future, e, s);
|
| - }
|
| - },
|
| - cancelOnError: true);
|
| + },
|
| + cancelOnError: true);
|
| return future;
|
| }
|
|
|
| @@ -1131,30 +1102,26 @@ abstract class Stream<T> {
|
| _Future<dynamic> future = new _Future();
|
| StreamSubscription subscription;
|
| subscription = this.listen(
|
| - (T value) {
|
| - _runUserCode(
|
| - () => test(value),
|
| - (bool isMatch) {
|
| + (T value) {
|
| + _runUserCode(() => test(value), (bool isMatch) {
|
| if (isMatch) {
|
| _cancelAndValue(subscription, future, value);
|
| }
|
| - },
|
| - _cancelAndErrorClosure(subscription, future)
|
| - );
|
| - },
|
| - onError: future._completeError,
|
| - onDone: () {
|
| - if (defaultValue != null) {
|
| - _runUserCode(defaultValue, future._complete, future._completeError);
|
| - return;
|
| - }
|
| - try {
|
| - throw IterableElementError.noElement();
|
| - } catch (e, s) {
|
| - _completeWithErrorCallback(future, e, s);
|
| - }
|
| - },
|
| - cancelOnError: true);
|
| + }, _cancelAndErrorClosure(subscription, future));
|
| + },
|
| + onError: future._completeError,
|
| + onDone: () {
|
| + if (defaultValue != null) {
|
| + _runUserCode(defaultValue, future._complete, future._completeError);
|
| + return;
|
| + }
|
| + try {
|
| + throw IterableElementError.noElement();
|
| + } catch (e, s) {
|
| + _completeWithErrorCallback(future, e, s);
|
| + }
|
| + },
|
| + cancelOnError: true);
|
| return future;
|
| }
|
|
|
| @@ -1171,35 +1138,31 @@ abstract class Stream<T> {
|
| bool foundResult = false;
|
| StreamSubscription subscription;
|
| subscription = this.listen(
|
| - (T value) {
|
| - _runUserCode(
|
| - () => true == test(value),
|
| - (bool isMatch) {
|
| + (T value) {
|
| + _runUserCode(() => true == test(value), (bool isMatch) {
|
| if (isMatch) {
|
| foundResult = true;
|
| result = value;
|
| }
|
| - },
|
| - _cancelAndErrorClosure(subscription, future)
|
| - );
|
| - },
|
| - onError: future._completeError,
|
| - onDone: () {
|
| - if (foundResult) {
|
| - future._complete(result);
|
| - return;
|
| - }
|
| - if (defaultValue != null) {
|
| - _runUserCode(defaultValue, future._complete, future._completeError);
|
| - return;
|
| - }
|
| - try {
|
| - throw IterableElementError.noElement();
|
| - } catch (e, s) {
|
| - _completeWithErrorCallback(future, e, s);
|
| - }
|
| - },
|
| - cancelOnError: true);
|
| + }, _cancelAndErrorClosure(subscription, future));
|
| + },
|
| + onError: future._completeError,
|
| + onDone: () {
|
| + if (foundResult) {
|
| + future._complete(result);
|
| + return;
|
| + }
|
| + if (defaultValue != null) {
|
| + _runUserCode(defaultValue, future._complete, future._completeError);
|
| + return;
|
| + }
|
| + try {
|
| + throw IterableElementError.noElement();
|
| + } catch (e, s) {
|
| + _completeWithErrorCallback(future, e, s);
|
| + }
|
| + },
|
| + cancelOnError: true);
|
| return future;
|
| }
|
|
|
| @@ -1215,10 +1178,8 @@ abstract class Stream<T> {
|
| bool foundResult = false;
|
| StreamSubscription subscription;
|
| subscription = this.listen(
|
| - (T value) {
|
| - _runUserCode(
|
| - () => true == test(value),
|
| - (bool isMatch) {
|
| + (T value) {
|
| + _runUserCode(() => true == test(value), (bool isMatch) {
|
| if (isMatch) {
|
| if (foundResult) {
|
| try {
|
| @@ -1231,23 +1192,21 @@ abstract class Stream<T> {
|
| foundResult = true;
|
| result = value;
|
| }
|
| - },
|
| - _cancelAndErrorClosure(subscription, future)
|
| - );
|
| - },
|
| - onError: future._completeError,
|
| - onDone: () {
|
| - if (foundResult) {
|
| - future._complete(result);
|
| - return;
|
| - }
|
| - try {
|
| - throw IterableElementError.noElement();
|
| - } catch (e, s) {
|
| - _completeWithErrorCallback(future, e, s);
|
| - }
|
| - },
|
| - cancelOnError: true);
|
| + }, _cancelAndErrorClosure(subscription, future));
|
| + },
|
| + onError: future._completeError,
|
| + onDone: () {
|
| + if (foundResult) {
|
| + future._complete(result);
|
| + return;
|
| + }
|
| + try {
|
| + throw IterableElementError.noElement();
|
| + } catch (e, s) {
|
| + _completeWithErrorCallback(future, e, s);
|
| + }
|
| + },
|
| + cancelOnError: true);
|
| return future;
|
| }
|
|
|
| @@ -1273,19 +1232,19 @@ abstract class Stream<T> {
|
| StreamSubscription subscription;
|
| int elementIndex = 0;
|
| subscription = this.listen(
|
| - (T value) {
|
| - if (index == elementIndex) {
|
| - _cancelAndValue(subscription, future, value);
|
| - return;
|
| - }
|
| - elementIndex += 1;
|
| - },
|
| - onError: future._completeError,
|
| - onDone: () {
|
| - future._completeError(
|
| - new RangeError.index(index, this, "index", null, elementIndex));
|
| - },
|
| - cancelOnError: true);
|
| + (T value) {
|
| + if (index == elementIndex) {
|
| + _cancelAndValue(subscription, future, value);
|
| + return;
|
| + }
|
| + elementIndex += 1;
|
| + },
|
| + onError: future._completeError,
|
| + onDone: () {
|
| + future._completeError(
|
| + new RangeError.index(index, this, "index", null, elementIndex));
|
| + },
|
| + cancelOnError: true);
|
| return future;
|
| }
|
|
|
| @@ -1324,18 +1283,21 @@ abstract class Stream<T> {
|
| controller.add(event);
|
| timer = zone.createTimer(timeLimit, timeout);
|
| }
|
| +
|
| void onError(error, StackTrace stackTrace) {
|
| timer.cancel();
|
| assert(controller is _StreamController ||
|
| - controller is _BroadcastStreamController);
|
| + controller is _BroadcastStreamController);
|
| dynamic eventSink = controller;
|
| - eventSink._addError(error, stackTrace); // Avoid Zone error replacement.
|
| + eventSink._addError(error, stackTrace); // Avoid Zone error replacement.
|
| timer = zone.createTimer(timeLimit, timeout);
|
| }
|
| +
|
| void onDone() {
|
| timer.cancel();
|
| controller.close();
|
| }
|
| +
|
| void onListen() {
|
| // This is the onListen callback for of controller.
|
| // It runs in the same zone that the subscription was created in.
|
| @@ -1344,8 +1306,8 @@ abstract class Stream<T> {
|
| zone = Zone.current;
|
| if (onTimeout == null) {
|
| timeout = () {
|
| - controller.addError(new TimeoutException("No stream event",
|
| - timeLimit), null);
|
| + controller.addError(
|
| + new TimeoutException("No stream event", timeLimit), null);
|
| };
|
| } else {
|
| // TODO(floitsch): the return type should be 'void', and the type
|
| @@ -1355,7 +1317,7 @@ abstract class Stream<T> {
|
| _ControllerEventSinkWrapper wrapper =
|
| new _ControllerEventSinkWrapper(null);
|
| timeout = () {
|
| - wrapper._sink = controller; // Only valid during call.
|
| + wrapper._sink = controller; // Only valid during call.
|
| zone.runUnaryGuarded(registeredOnTimeout, wrapper);
|
| wrapper._sink = null;
|
| };
|
| @@ -1364,26 +1326,24 @@ abstract class Stream<T> {
|
| subscription = this.listen(onData, onError: onError, onDone: onDone);
|
| timer = zone.createTimer(timeLimit, timeout);
|
| }
|
| +
|
| Future onCancel() {
|
| timer.cancel();
|
| Future result = subscription.cancel();
|
| subscription = null;
|
| return result;
|
| }
|
| +
|
| controller = isBroadcast
|
| ? new _SyncBroadcastStreamController<T>(onListen, onCancel)
|
| - : new _SyncStreamController<T>(
|
| - onListen,
|
| - () {
|
| - // Don't null the timer, onCancel may call cancel again.
|
| - timer.cancel();
|
| - subscription.pause();
|
| - },
|
| - () {
|
| - subscription.resume();
|
| - timer = zone.createTimer(timeLimit, timeout);
|
| - },
|
| - onCancel);
|
| + : new _SyncStreamController<T>(onListen, () {
|
| + // Don't null the timer, onCancel may call cancel again.
|
| + timer.cancel();
|
| + subscription.pause();
|
| + }, () {
|
| + subscription.resume();
|
| + timer = zone.createTimer(timeLimit, timeout);
|
| + }, onCancel);
|
| return controller.stream;
|
| }
|
| }
|
| @@ -1501,7 +1461,6 @@ abstract class StreamSubscription<T> {
|
| Future<E> asFuture<E>([E futureValue]);
|
| }
|
|
|
| -
|
| /**
|
| * An interface that abstracts creation or handling of [Stream] events.
|
| */
|
| @@ -1516,30 +1475,28 @@ abstract class EventSink<T> implements Sink<T> {
|
| void close();
|
| }
|
|
|
| -
|
| /** [Stream] wrapper that only exposes the [Stream] interface. */
|
| class StreamView<T> extends Stream<T> {
|
| final Stream<T> _stream;
|
|
|
| - const StreamView(Stream<T> stream) : _stream = stream, super._internal();
|
| + const StreamView(Stream<T> stream)
|
| + : _stream = stream,
|
| + super._internal();
|
|
|
| bool get isBroadcast => _stream.isBroadcast;
|
|
|
| Stream<T> asBroadcastStream(
|
| - {void onListen(StreamSubscription<T> subscription),
|
| - void onCancel(StreamSubscription<T> subscription)})
|
| - => _stream.asBroadcastStream(onListen: onListen, onCancel: onCancel);
|
| + {void onListen(StreamSubscription<T> subscription),
|
| + void onCancel(StreamSubscription<T> subscription)}) =>
|
| + _stream.asBroadcastStream(onListen: onListen, onCancel: onCancel);
|
|
|
| StreamSubscription<T> listen(void onData(T value),
|
| - { Function onError,
|
| - void onDone(),
|
| - bool cancelOnError }) {
|
| - return _stream.listen(onData, onError: onError, onDone: onDone,
|
| - cancelOnError: cancelOnError);
|
| + {Function onError, void onDone(), bool cancelOnError}) {
|
| + return _stream.listen(onData,
|
| + onError: onError, onDone: onDone, cancelOnError: cancelOnError);
|
| }
|
| }
|
|
|
| -
|
| /**
|
| * Abstract interface for a "sink" accepting multiple entire streams.
|
| *
|
| @@ -1589,7 +1546,6 @@ abstract class StreamConsumer<S> {
|
| Future close();
|
| }
|
|
|
| -
|
| /**
|
| * A object that accepts stream events both synchronously and asynchronously.
|
| *
|
| @@ -1648,7 +1604,6 @@ abstract class StreamSink<S> implements EventSink<S>, StreamConsumer<S> {
|
| Future get done;
|
| }
|
|
|
| -
|
| /**
|
| * The target of a [Stream.transform] call.
|
| *
|
| @@ -1722,8 +1677,9 @@ abstract class StreamTransformer<S, T> {
|
| * });
|
| */
|
| const factory StreamTransformer(
|
| - StreamSubscription<T> transformer(Stream<S> stream, bool cancelOnError))
|
| - = _StreamSubscriptionTransformer<S, T>;
|
| + StreamSubscription<T> transformer(
|
| + Stream<S> stream, bool cancelOnError)) =
|
| + _StreamSubscriptionTransformer<S, T>;
|
|
|
| /**
|
| * Creates a [StreamTransformer] that delegates events to the given functions.
|
| @@ -1736,11 +1692,10 @@ abstract class StreamTransformer<S, T> {
|
| * sink.add(value); // Duplicate the incoming events.
|
| * }));
|
| */
|
| - factory StreamTransformer.fromHandlers({
|
| - void handleData(S data, EventSink<T> sink),
|
| + factory StreamTransformer.fromHandlers(
|
| + {void handleData(S data, EventSink<T> sink),
|
| void handleError(Object error, StackTrace stackTrace, EventSink<T> sink),
|
| - void handleDone(EventSink<T> sink)})
|
| - = _StreamHandlerTransformer<S, T>;
|
| + void handleDone(EventSink<T> sink)}) = _StreamHandlerTransformer<S, T>;
|
|
|
| /**
|
| * Transform the incoming [stream]'s events.
|
| @@ -1765,12 +1720,12 @@ abstract class StreamTransformer<S, T> {
|
| * The stream may be paused between calls to [moveNext].
|
| */
|
| abstract class StreamIterator<T> {
|
| -
|
| /** Create a [StreamIterator] on [stream]. */
|
| factory StreamIterator(Stream<T> stream)
|
| // TODO(lrn): use redirecting factory constructor when type
|
| // arguments are supported.
|
| - => new _StreamIterator<T>(stream);
|
| + =>
|
| + new _StreamIterator<T>(stream);
|
|
|
| /**
|
| * Wait for the next stream value to be available.
|
| @@ -1822,7 +1777,6 @@ abstract class StreamIterator<T> {
|
| Future cancel();
|
| }
|
|
|
| -
|
| /**
|
| * Wraps an [_EventSink] so it exposes only the [EventSink] interface.
|
| */
|
| @@ -1830,9 +1784,15 @@ class _ControllerEventSinkWrapper<T> implements EventSink<T> {
|
| EventSink _sink;
|
| _ControllerEventSinkWrapper(this._sink);
|
|
|
| - void add(T data) { _sink.add(data); }
|
| + void add(T data) {
|
| + _sink.add(data);
|
| + }
|
| +
|
| void addError(error, [StackTrace stackTrace]) {
|
| _sink.addError(error, stackTrace);
|
| }
|
| - void close() { _sink.close(); }
|
| +
|
| + void close() {
|
| + _sink.close();
|
| + }
|
| }
|
|
|