Index: sdk/lib/async/stream.dart |
diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart |
index b2a7a215b81caeee9e762136cb308cfe4f61d213..26343b69b35dee57f03ef60331b095f593b3825c 100644 |
--- a/sdk/lib/async/stream.dart |
+++ b/sdk/lib/async/stream.dart |
@@ -104,13 +104,12 @@ abstract class Stream<T> { |
// to wait for a listener before doing the `then` on the future. |
_StreamController<T> controller = new StreamController<T>(sync: true); |
future.then((value) { |
- controller._add(value); |
- controller._closeUnchecked(); |
- }, |
- onError: (error, stackTrace) { |
- controller._addError(error, stackTrace); |
- controller._closeUnchecked(); |
- }); |
+ controller._add(value); |
+ controller._closeUnchecked(); |
+ }, onError: (error, stackTrace) { |
+ controller._addError(error, stackTrace); |
+ controller._closeUnchecked(); |
+ }); |
return controller.stream; |
} |
@@ -181,7 +180,7 @@ abstract class Stream<T> { |
* If [computation] is omitted the event values will all be `null`. |
*/ |
factory Stream.periodic(Duration period, |
- [T computation(int computationCount)]) { |
+ [T computation(int computationCount)]) { |
Timer timer; |
int computationCount = 0; |
StreamController<T> controller; |
@@ -209,7 +208,8 @@ abstract class Stream<T> { |
}); |
} |
- controller = new StreamController<T>(sync: true, |
+ controller = new StreamController<T>( |
+ sync: true, |
onListen: () { |
watch.start(); |
startPeriodicTimer(); |
@@ -277,8 +277,8 @@ abstract class Stream<T> { |
* |
* The resulting stream is a broadcast stream if [source] is. |
*/ |
- factory Stream.eventTransformed(Stream source, |
- EventSink mapSink(EventSink<T> sink)) { |
+ factory Stream.eventTransformed( |
+ Stream source, EventSink mapSink(EventSink<T> sink)) { |
return new _BoundSinkStream(source, mapSink); |
} |
@@ -308,9 +308,9 @@ abstract class Stream<T> { |
* while having no subscribers to prevent losing events, or canceling the |
* subscription when there are no listeners. |
*/ |
- Stream<T> asBroadcastStream({ |
- void onListen(StreamSubscription<T> subscription), |
- void onCancel(StreamSubscription<T> subscription) }) { |
+ Stream<T> asBroadcastStream( |
+ {void onListen(StreamSubscription<T> subscription), |
+ void onCancel(StreamSubscription<T> subscription)}) { |
return new _AsBroadcastStream<T>(this, onListen, onCancel); |
} |
@@ -350,9 +350,7 @@ abstract class Stream<T> { |
* event handler functions are called. |
*/ |
StreamSubscription<T> listen(void onData(T event), |
- { Function onError, |
- void onDone(), |
- bool cancelOnError}); |
+ {Function onError, void onDone(), bool cancelOnError}); |
/** |
* Creates a new stream from this stream that discards some data events. |
@@ -405,46 +403,46 @@ abstract class Stream<T> { |
void onListen() { |
final add = controller.add; |
assert(controller is _StreamController || |
- controller is _BroadcastStreamController); |
- final _EventSink<E> eventSink = |
- controller as Object /*=_EventSink<E>*/; |
+ controller is _BroadcastStreamController); |
+ final _EventSink<E> eventSink = controller as Object/*=_EventSink<E>*/; |
final addError = eventSink._addError; |
- subscription = this.listen( |
- (T event) { |
- dynamic newValue; |
- try { |
- newValue = convert(event); |
- } catch (e, s) { |
- controller.addError(e, s); |
- return; |
- } |
- if (newValue is Future) { |
- subscription.pause(); |
- newValue.then(add, onError: addError) |
- .whenComplete(subscription.resume); |
- } else { |
- controller.add(newValue as Object/*=E*/); |
- } |
- }, |
- onError: addError, |
- onDone: controller.close |
- ); |
+ subscription = this.listen((T event) { |
+ dynamic newValue; |
+ try { |
+ newValue = convert(event); |
+ } catch (e, s) { |
+ controller.addError(e, s); |
+ return; |
+ } |
+ if (newValue is Future) { |
+ subscription.pause(); |
+ newValue |
+ .then(add, onError: addError) |
+ .whenComplete(subscription.resume); |
+ } else { |
+ controller.add(newValue as Object/*=E*/); |
+ } |
+ }, onError: addError, onDone: controller.close); |
} |
if (this.isBroadcast) { |
controller = new StreamController<E>.broadcast( |
- onListen: onListen, |
- onCancel: () { subscription.cancel(); }, |
- sync: true |
- ); |
+ onListen: onListen, |
+ onCancel: () { |
+ subscription.cancel(); |
+ }, |
+ sync: true); |
} else { |
controller = new StreamController<E>( |
- onListen: onListen, |
- onPause: () { subscription.pause(); }, |
- onResume: () { subscription.resume(); }, |
- onCancel: () => subscription.cancel(), |
- sync: true |
- ); |
+ onListen: onListen, |
+ onPause: () { |
+ subscription.pause(); |
+ }, |
+ onResume: () { |
+ subscription.resume(); |
+ }, |
+ onCancel: () => subscription.cancel(), |
+ sync: true); |
} |
return controller.stream; |
} |
@@ -467,42 +465,43 @@ abstract class Stream<T> { |
StreamSubscription<T> subscription; |
void onListen() { |
assert(controller is _StreamController || |
- controller is _BroadcastStreamController); |
- final _EventSink<E> eventSink = |
- controller as Object /*=_EventSink<E>*/; |
- subscription = this.listen( |
- (T event) { |
- Stream<E> newStream; |
- try { |
- newStream = convert(event); |
- } catch (e, s) { |
- controller.addError(e, s); |
- return; |
- } |
- if (newStream != null) { |
- subscription.pause(); |
- controller.addStream(newStream) |
- .whenComplete(subscription.resume); |
- } |
- }, |
- onError: eventSink._addError, // Avoid Zone error replacement. |
- onDone: controller.close |
- ); |
+ controller is _BroadcastStreamController); |
+ final _EventSink<E> eventSink = controller as Object/*=_EventSink<E>*/; |
+ subscription = this.listen((T event) { |
+ Stream<E> newStream; |
+ try { |
+ newStream = convert(event); |
+ } catch (e, s) { |
+ controller.addError(e, s); |
+ return; |
+ } |
+ if (newStream != null) { |
+ subscription.pause(); |
+ controller.addStream(newStream).whenComplete(subscription.resume); |
+ } |
+ }, |
+ onError: eventSink._addError, // Avoid Zone error replacement. |
+ onDone: controller.close); |
} |
+ |
if (this.isBroadcast) { |
controller = new StreamController<E>.broadcast( |
- onListen: onListen, |
- onCancel: () { subscription.cancel(); }, |
- sync: true |
- ); |
+ onListen: onListen, |
+ onCancel: () { |
+ subscription.cancel(); |
+ }, |
+ sync: true); |
} else { |
controller = new StreamController<E>( |
- onListen: onListen, |
- onPause: () { subscription.pause(); }, |
- onResume: () { subscription.resume(); }, |
- onCancel: () => subscription.cancel(), |
- sync: true |
- ); |
+ onListen: onListen, |
+ onPause: () { |
+ subscription.pause(); |
+ }, |
+ onResume: () { |
+ subscription.resume(); |
+ }, |
+ onCancel: () => subscription.cancel(), |
+ sync: true); |
} |
return controller.stream; |
} |
@@ -535,7 +534,7 @@ abstract class Stream<T> { |
* If a broadcast stream is listened to more than once, each subscription |
* will individually perform the `test` and handle the error. |
*/ |
- Stream<T> handleError(Function onError, { bool test(error) }) { |
+ Stream<T> handleError(Function onError, {bool test(error)}) { |
return new _HandleErrorStream<T>(this, onError, test); |
} |
@@ -585,8 +584,7 @@ abstract class Stream<T> { |
* The `streamTransformer` can decide whether it wants to return a |
* broadcast stream or not. |
*/ |
- Stream<S> transform<S>( |
- StreamTransformer<T, S > streamTransformer) { |
+ Stream<S> transform<S>(StreamTransformer<T, S> streamTransformer) { |
return streamTransformer.bind(this); |
} |
@@ -599,55 +597,46 @@ abstract class Stream<T> { |
T value; |
StreamSubscription subscription; |
subscription = this.listen( |
- (T element) { |
- if (seenFirst) { |
- _runUserCode(() => combine(value, element), |
- (T newValue) { value = newValue; }, |
- _cancelAndErrorClosure(subscription, result)); |
- } else { |
- value = element; |
- seenFirst = true; |
- } |
- }, |
- onError: result._completeError, |
- onDone: () { |
- if (!seenFirst) { |
- try { |
- throw IterableElementError.noElement(); |
- } catch (e, s) { |
- _completeWithErrorCallback(result, e, s); |
+ (T element) { |
+ if (seenFirst) { |
+ _runUserCode(() => combine(value, element), (T newValue) { |
+ value = newValue; |
+ }, _cancelAndErrorClosure(subscription, result)); |
+ } else { |
+ value = element; |
+ seenFirst = true; |
} |
- } else { |
- result._complete(value); |
- } |
- }, |
- cancelOnError: true |
- ); |
+ }, |
+ onError: result._completeError, |
+ onDone: () { |
+ if (!seenFirst) { |
+ try { |
+ throw IterableElementError.noElement(); |
+ } catch (e, s) { |
+ _completeWithErrorCallback(result, e, s); |
+ } |
+ } else { |
+ result._complete(value); |
+ } |
+ }, |
+ cancelOnError: true); |
return result; |
} |
/** Reduces a sequence of values by repeatedly applying [combine]. */ |
- Future<S> fold<S>(S initialValue, |
- S combine(S previous, T element)) { |
- |
+ Future<S> fold<S>(S initialValue, S combine(S previous, T element)) { |
_Future<S> result = new _Future<S>(); |
S value = initialValue; |
StreamSubscription subscription; |
- subscription = this.listen( |
- (T element) { |
- _runUserCode( |
- () => combine(value, element), |
- (S newValue) { value = newValue; }, |
- _cancelAndErrorClosure(subscription, result) |
- ); |
- }, |
- onError: (e, st) { |
- result._completeError(e, st); |
- }, |
- onDone: () { |
- result._complete(value); |
- }, |
- cancelOnError: true); |
+ subscription = this.listen((T element) { |
+ _runUserCode(() => combine(value, element), (S newValue) { |
+ value = newValue; |
+ }, _cancelAndErrorClosure(subscription, result)); |
+ }, onError: (e, st) { |
+ result._completeError(e, st); |
+ }, onDone: () { |
+ result._complete(value); |
+ }, cancelOnError: true); |
return result; |
} |
@@ -666,25 +655,21 @@ abstract class Stream<T> { |
StringBuffer buffer = new StringBuffer(); |
StreamSubscription subscription; |
bool first = true; |
- subscription = this.listen( |
- (T element) { |
- if (!first) { |
- buffer.write(separator); |
- } |
- first = false; |
- try { |
- buffer.write(element); |
- } catch (e, s) { |
- _cancelAndErrorWithReplacement(subscription, result, e, s); |
- } |
- }, |
- onError: (e) { |
- result._completeError(e); |
- }, |
- onDone: () { |
- result._complete(buffer.toString()); |
- }, |
- cancelOnError: true); |
+ subscription = this.listen((T element) { |
+ if (!first) { |
+ buffer.write(separator); |
+ } |
+ first = false; |
+ try { |
+ buffer.write(element); |
+ } catch (e, s) { |
+ _cancelAndErrorWithReplacement(subscription, result, e, s); |
+ } |
+ }, onError: (e) { |
+ result._completeError(e); |
+ }, onDone: () { |
+ result._complete(buffer.toString()); |
+ }, cancelOnError: true); |
return result; |
} |
@@ -699,15 +684,11 @@ abstract class Stream<T> { |
StreamSubscription subscription; |
subscription = this.listen( |
(T element) { |
- _runUserCode( |
- () => (element == needle), |
- (bool isMatch) { |
- if (isMatch) { |
- _cancelAndValue(subscription, future, true); |
- } |
- }, |
- _cancelAndErrorClosure(subscription, future) |
- ); |
+ _runUserCode(() => (element == needle), (bool isMatch) { |
+ if (isMatch) { |
+ _cancelAndValue(subscription, future, true); |
+ } |
+ }, _cancelAndErrorClosure(subscription, future)); |
}, |
onError: future._completeError, |
onDone: () { |
@@ -729,11 +710,8 @@ abstract class Stream<T> { |
StreamSubscription subscription; |
subscription = this.listen( |
(T element) { |
- _runUserCode( |
- () => action(element), |
- (_) {}, |
- _cancelAndErrorClosure(subscription, future) |
- ); |
+ _runUserCode(() => action(element), (_) {}, |
+ _cancelAndErrorClosure(subscription, future)); |
}, |
onError: future._completeError, |
onDone: () { |
@@ -754,15 +732,11 @@ abstract class Stream<T> { |
StreamSubscription subscription; |
subscription = this.listen( |
(T element) { |
- _runUserCode( |
- () => test(element), |
- (bool isMatch) { |
- if (!isMatch) { |
- _cancelAndValue(subscription, future, false); |
- } |
- }, |
- _cancelAndErrorClosure(subscription, future) |
- ); |
+ _runUserCode(() => test(element), (bool isMatch) { |
+ if (!isMatch) { |
+ _cancelAndValue(subscription, future, false); |
+ } |
+ }, _cancelAndErrorClosure(subscription, future)); |
}, |
onError: future._completeError, |
onDone: () { |
@@ -791,15 +765,11 @@ abstract class Stream<T> { |
StreamSubscription subscription; |
subscription = this.listen( |
(T element) { |
- _runUserCode( |
- () => test(element), |
- (bool isMatch) { |
- if (isMatch) { |
- _cancelAndValue(subscription, future, true); |
- } |
- }, |
- _cancelAndErrorClosure(subscription, future) |
- ); |
+ _runUserCode(() => test(element), (bool isMatch) { |
+ if (isMatch) { |
+ _cancelAndValue(subscription, future, true); |
+ } |
+ }, _cancelAndErrorClosure(subscription, future)); |
}, |
onError: future._completeError, |
onDone: () { |
@@ -809,18 +779,19 @@ abstract class Stream<T> { |
return future; |
} |
- |
/** Counts the elements in the stream. */ |
Future<int> get length { |
_Future<int> future = new _Future<int>(); |
int count = 0; |
this.listen( |
- (_) { count++; }, |
- onError: future._completeError, |
- onDone: () { |
- future._complete(count); |
- }, |
- cancelOnError: true); |
+ (_) { |
+ count++; |
+ }, |
+ onError: future._completeError, |
+ onDone: () { |
+ future._complete(count); |
+ }, |
+ cancelOnError: true); |
return future; |
} |
@@ -837,14 +808,14 @@ abstract class Stream<T> { |
_Future<bool> future = new _Future<bool>(); |
StreamSubscription subscription; |
subscription = this.listen( |
- (_) { |
- _cancelAndValue(subscription, future, false); |
- }, |
- onError: future._completeError, |
- onDone: () { |
- future._complete(true); |
- }, |
- cancelOnError: true); |
+ (_) { |
+ _cancelAndValue(subscription, future, false); |
+ }, |
+ onError: future._completeError, |
+ onDone: () { |
+ future._complete(true); |
+ }, |
+ cancelOnError: true); |
return future; |
} |
@@ -853,14 +824,14 @@ abstract class Stream<T> { |
List<T> result = <T>[]; |
_Future<List<T>> future = new _Future<List<T>>(); |
this.listen( |
- (T data) { |
- result.add(data); |
- }, |
- onError: future._completeError, |
- onDone: () { |
- future._complete(result); |
- }, |
- cancelOnError: true); |
+ (T data) { |
+ result.add(data); |
+ }, |
+ onError: future._completeError, |
+ onDone: () { |
+ future._complete(result); |
+ }, |
+ cancelOnError: true); |
return future; |
} |
@@ -877,14 +848,14 @@ abstract class Stream<T> { |
Set<T> result = new Set<T>(); |
_Future<Set<T>> future = new _Future<Set<T>>(); |
this.listen( |
- (T data) { |
- result.add(data); |
- }, |
- onError: future._completeError, |
- onDone: () { |
- future._complete(result); |
- }, |
- cancelOnError: true); |
+ (T data) { |
+ result.add(data); |
+ }, |
+ onError: future._completeError, |
+ onDone: () { |
+ future._complete(result); |
+ }, |
+ cancelOnError: true); |
return future; |
} |
@@ -899,8 +870,8 @@ abstract class Stream<T> { |
* In case of a `done` event the future completes with the given |
* [futureValue]. |
*/ |
- Future<E> drain<E>([E futureValue]) |
- => listen(null, cancelOnError: true).asFuture<E>(futureValue); |
+ Future<E> drain<E>([E futureValue]) => listen(null, cancelOnError: true) |
+ .asFuture<E>(futureValue); |
/** |
* Provides at most the first [count] data events of this stream. |
@@ -1014,18 +985,18 @@ abstract class Stream<T> { |
_Future<T> future = new _Future<T>(); |
StreamSubscription subscription; |
subscription = this.listen( |
- (T value) { |
- _cancelAndValue(subscription, future, value); |
- }, |
- onError: future._completeError, |
- onDone: () { |
- try { |
- throw IterableElementError.noElement(); |
- } catch (e, s) { |
- _completeWithErrorCallback(future, e, s); |
- } |
- }, |
- cancelOnError: true); |
+ (T value) { |
+ _cancelAndValue(subscription, future, value); |
+ }, |
+ onError: future._completeError, |
+ onDone: () { |
+ try { |
+ throw IterableElementError.noElement(); |
+ } catch (e, s) { |
+ _completeWithErrorCallback(future, e, s); |
+ } |
+ }, |
+ cancelOnError: true); |
return future; |
} |
@@ -1043,23 +1014,23 @@ abstract class Stream<T> { |
T result = null; |
bool foundResult = false; |
listen( |
- (T value) { |
- foundResult = true; |
- result = value; |
- }, |
- onError: future._completeError, |
- onDone: () { |
- if (foundResult) { |
- future._complete(result); |
- return; |
- } |
- try { |
- throw IterableElementError.noElement(); |
- } catch (e, s) { |
- _completeWithErrorCallback(future, e, s); |
- } |
- }, |
- cancelOnError: true); |
+ (T value) { |
+ foundResult = true; |
+ result = value; |
+ }, |
+ onError: future._completeError, |
+ onDone: () { |
+ if (foundResult) { |
+ future._complete(result); |
+ return; |
+ } |
+ try { |
+ throw IterableElementError.noElement(); |
+ } catch (e, s) { |
+ _completeWithErrorCallback(future, e, s); |
+ } |
+ }, |
+ cancelOnError: true); |
return future; |
} |
@@ -1077,32 +1048,32 @@ abstract class Stream<T> { |
bool foundResult = false; |
StreamSubscription subscription; |
subscription = this.listen( |
- (T value) { |
- if (foundResult) { |
- // This is the second element we get. |
+ (T value) { |
+ if (foundResult) { |
+ // This is the second element we get. |
+ try { |
+ throw IterableElementError.tooMany(); |
+ } catch (e, s) { |
+ _cancelAndErrorWithReplacement(subscription, future, e, s); |
+ } |
+ return; |
+ } |
+ foundResult = true; |
+ result = value; |
+ }, |
+ onError: future._completeError, |
+ onDone: () { |
+ if (foundResult) { |
+ future._complete(result); |
+ return; |
+ } |
try { |
- throw IterableElementError.tooMany(); |
+ throw IterableElementError.noElement(); |
} catch (e, s) { |
- _cancelAndErrorWithReplacement(subscription, future, e, s); |
+ _completeWithErrorCallback(future, e, s); |
} |
- return; |
- } |
- foundResult = true; |
- result = value; |
- }, |
- onError: future._completeError, |
- onDone: () { |
- if (foundResult) { |
- future._complete(result); |
- return; |
- } |
- try { |
- throw IterableElementError.noElement(); |
- } catch (e, s) { |
- _completeWithErrorCallback(future, e, s); |
- } |
- }, |
- cancelOnError: true); |
+ }, |
+ cancelOnError: true); |
return future; |
} |
@@ -1131,30 +1102,26 @@ abstract class Stream<T> { |
_Future<dynamic> future = new _Future(); |
StreamSubscription subscription; |
subscription = this.listen( |
- (T value) { |
- _runUserCode( |
- () => test(value), |
- (bool isMatch) { |
+ (T value) { |
+ _runUserCode(() => test(value), (bool isMatch) { |
if (isMatch) { |
_cancelAndValue(subscription, future, value); |
} |
- }, |
- _cancelAndErrorClosure(subscription, future) |
- ); |
- }, |
- onError: future._completeError, |
- onDone: () { |
- if (defaultValue != null) { |
- _runUserCode(defaultValue, future._complete, future._completeError); |
- return; |
- } |
- try { |
- throw IterableElementError.noElement(); |
- } catch (e, s) { |
- _completeWithErrorCallback(future, e, s); |
- } |
- }, |
- cancelOnError: true); |
+ }, _cancelAndErrorClosure(subscription, future)); |
+ }, |
+ onError: future._completeError, |
+ onDone: () { |
+ if (defaultValue != null) { |
+ _runUserCode(defaultValue, future._complete, future._completeError); |
+ return; |
+ } |
+ try { |
+ throw IterableElementError.noElement(); |
+ } catch (e, s) { |
+ _completeWithErrorCallback(future, e, s); |
+ } |
+ }, |
+ cancelOnError: true); |
return future; |
} |
@@ -1171,35 +1138,31 @@ abstract class Stream<T> { |
bool foundResult = false; |
StreamSubscription subscription; |
subscription = this.listen( |
- (T value) { |
- _runUserCode( |
- () => true == test(value), |
- (bool isMatch) { |
+ (T value) { |
+ _runUserCode(() => true == test(value), (bool isMatch) { |
if (isMatch) { |
foundResult = true; |
result = value; |
} |
- }, |
- _cancelAndErrorClosure(subscription, future) |
- ); |
- }, |
- onError: future._completeError, |
- onDone: () { |
- if (foundResult) { |
- future._complete(result); |
- return; |
- } |
- if (defaultValue != null) { |
- _runUserCode(defaultValue, future._complete, future._completeError); |
- return; |
- } |
- try { |
- throw IterableElementError.noElement(); |
- } catch (e, s) { |
- _completeWithErrorCallback(future, e, s); |
- } |
- }, |
- cancelOnError: true); |
+ }, _cancelAndErrorClosure(subscription, future)); |
+ }, |
+ onError: future._completeError, |
+ onDone: () { |
+ if (foundResult) { |
+ future._complete(result); |
+ return; |
+ } |
+ if (defaultValue != null) { |
+ _runUserCode(defaultValue, future._complete, future._completeError); |
+ return; |
+ } |
+ try { |
+ throw IterableElementError.noElement(); |
+ } catch (e, s) { |
+ _completeWithErrorCallback(future, e, s); |
+ } |
+ }, |
+ cancelOnError: true); |
return future; |
} |
@@ -1215,10 +1178,8 @@ abstract class Stream<T> { |
bool foundResult = false; |
StreamSubscription subscription; |
subscription = this.listen( |
- (T value) { |
- _runUserCode( |
- () => true == test(value), |
- (bool isMatch) { |
+ (T value) { |
+ _runUserCode(() => true == test(value), (bool isMatch) { |
if (isMatch) { |
if (foundResult) { |
try { |
@@ -1231,23 +1192,21 @@ abstract class Stream<T> { |
foundResult = true; |
result = value; |
} |
- }, |
- _cancelAndErrorClosure(subscription, future) |
- ); |
- }, |
- onError: future._completeError, |
- onDone: () { |
- if (foundResult) { |
- future._complete(result); |
- return; |
- } |
- try { |
- throw IterableElementError.noElement(); |
- } catch (e, s) { |
- _completeWithErrorCallback(future, e, s); |
- } |
- }, |
- cancelOnError: true); |
+ }, _cancelAndErrorClosure(subscription, future)); |
+ }, |
+ onError: future._completeError, |
+ onDone: () { |
+ if (foundResult) { |
+ future._complete(result); |
+ return; |
+ } |
+ try { |
+ throw IterableElementError.noElement(); |
+ } catch (e, s) { |
+ _completeWithErrorCallback(future, e, s); |
+ } |
+ }, |
+ cancelOnError: true); |
return future; |
} |
@@ -1273,19 +1232,19 @@ abstract class Stream<T> { |
StreamSubscription subscription; |
int elementIndex = 0; |
subscription = this.listen( |
- (T value) { |
- if (index == elementIndex) { |
- _cancelAndValue(subscription, future, value); |
- return; |
- } |
- elementIndex += 1; |
- }, |
- onError: future._completeError, |
- onDone: () { |
- future._completeError( |
- new RangeError.index(index, this, "index", null, elementIndex)); |
- }, |
- cancelOnError: true); |
+ (T value) { |
+ if (index == elementIndex) { |
+ _cancelAndValue(subscription, future, value); |
+ return; |
+ } |
+ elementIndex += 1; |
+ }, |
+ onError: future._completeError, |
+ onDone: () { |
+ future._completeError( |
+ new RangeError.index(index, this, "index", null, elementIndex)); |
+ }, |
+ cancelOnError: true); |
return future; |
} |
@@ -1324,18 +1283,21 @@ abstract class Stream<T> { |
controller.add(event); |
timer = zone.createTimer(timeLimit, timeout); |
} |
+ |
void onError(error, StackTrace stackTrace) { |
timer.cancel(); |
assert(controller is _StreamController || |
- controller is _BroadcastStreamController); |
+ controller is _BroadcastStreamController); |
dynamic eventSink = controller; |
- eventSink._addError(error, stackTrace); // Avoid Zone error replacement. |
+ eventSink._addError(error, stackTrace); // Avoid Zone error replacement. |
timer = zone.createTimer(timeLimit, timeout); |
} |
+ |
void onDone() { |
timer.cancel(); |
controller.close(); |
} |
+ |
void onListen() { |
// This is the onListen callback for of controller. |
// It runs in the same zone that the subscription was created in. |
@@ -1344,8 +1306,8 @@ abstract class Stream<T> { |
zone = Zone.current; |
if (onTimeout == null) { |
timeout = () { |
- controller.addError(new TimeoutException("No stream event", |
- timeLimit), null); |
+ controller.addError( |
+ new TimeoutException("No stream event", timeLimit), null); |
}; |
} else { |
// TODO(floitsch): the return type should be 'void', and the type |
@@ -1355,7 +1317,7 @@ abstract class Stream<T> { |
_ControllerEventSinkWrapper wrapper = |
new _ControllerEventSinkWrapper(null); |
timeout = () { |
- wrapper._sink = controller; // Only valid during call. |
+ wrapper._sink = controller; // Only valid during call. |
zone.runUnaryGuarded(registeredOnTimeout, wrapper); |
wrapper._sink = null; |
}; |
@@ -1364,26 +1326,24 @@ abstract class Stream<T> { |
subscription = this.listen(onData, onError: onError, onDone: onDone); |
timer = zone.createTimer(timeLimit, timeout); |
} |
+ |
Future onCancel() { |
timer.cancel(); |
Future result = subscription.cancel(); |
subscription = null; |
return result; |
} |
+ |
controller = isBroadcast |
? new _SyncBroadcastStreamController<T>(onListen, onCancel) |
- : new _SyncStreamController<T>( |
- onListen, |
- () { |
- // Don't null the timer, onCancel may call cancel again. |
- timer.cancel(); |
- subscription.pause(); |
- }, |
- () { |
- subscription.resume(); |
- timer = zone.createTimer(timeLimit, timeout); |
- }, |
- onCancel); |
+ : new _SyncStreamController<T>(onListen, () { |
+ // Don't null the timer, onCancel may call cancel again. |
+ timer.cancel(); |
+ subscription.pause(); |
+ }, () { |
+ subscription.resume(); |
+ timer = zone.createTimer(timeLimit, timeout); |
+ }, onCancel); |
return controller.stream; |
} |
} |
@@ -1501,7 +1461,6 @@ abstract class StreamSubscription<T> { |
Future<E> asFuture<E>([E futureValue]); |
} |
- |
/** |
* An interface that abstracts creation or handling of [Stream] events. |
*/ |
@@ -1516,30 +1475,28 @@ abstract class EventSink<T> implements Sink<T> { |
void close(); |
} |
- |
/** [Stream] wrapper that only exposes the [Stream] interface. */ |
class StreamView<T> extends Stream<T> { |
final Stream<T> _stream; |
- const StreamView(Stream<T> stream) : _stream = stream, super._internal(); |
+ const StreamView(Stream<T> stream) |
+ : _stream = stream, |
+ super._internal(); |
bool get isBroadcast => _stream.isBroadcast; |
Stream<T> asBroadcastStream( |
- {void onListen(StreamSubscription<T> subscription), |
- void onCancel(StreamSubscription<T> subscription)}) |
- => _stream.asBroadcastStream(onListen: onListen, onCancel: onCancel); |
+ {void onListen(StreamSubscription<T> subscription), |
+ void onCancel(StreamSubscription<T> subscription)}) => |
+ _stream.asBroadcastStream(onListen: onListen, onCancel: onCancel); |
StreamSubscription<T> listen(void onData(T value), |
- { Function onError, |
- void onDone(), |
- bool cancelOnError }) { |
- return _stream.listen(onData, onError: onError, onDone: onDone, |
- cancelOnError: cancelOnError); |
+ {Function onError, void onDone(), bool cancelOnError}) { |
+ return _stream.listen(onData, |
+ onError: onError, onDone: onDone, cancelOnError: cancelOnError); |
} |
} |
- |
/** |
* Abstract interface for a "sink" accepting multiple entire streams. |
* |
@@ -1589,7 +1546,6 @@ abstract class StreamConsumer<S> { |
Future close(); |
} |
- |
/** |
* A object that accepts stream events both synchronously and asynchronously. |
* |
@@ -1648,7 +1604,6 @@ abstract class StreamSink<S> implements EventSink<S>, StreamConsumer<S> { |
Future get done; |
} |
- |
/** |
* The target of a [Stream.transform] call. |
* |
@@ -1722,8 +1677,9 @@ abstract class StreamTransformer<S, T> { |
* }); |
*/ |
const factory StreamTransformer( |
- StreamSubscription<T> transformer(Stream<S> stream, bool cancelOnError)) |
- = _StreamSubscriptionTransformer<S, T>; |
+ StreamSubscription<T> transformer( |
+ Stream<S> stream, bool cancelOnError)) = |
+ _StreamSubscriptionTransformer<S, T>; |
/** |
* Creates a [StreamTransformer] that delegates events to the given functions. |
@@ -1736,11 +1692,10 @@ abstract class StreamTransformer<S, T> { |
* sink.add(value); // Duplicate the incoming events. |
* })); |
*/ |
- factory StreamTransformer.fromHandlers({ |
- void handleData(S data, EventSink<T> sink), |
+ factory StreamTransformer.fromHandlers( |
+ {void handleData(S data, EventSink<T> sink), |
void handleError(Object error, StackTrace stackTrace, EventSink<T> sink), |
- void handleDone(EventSink<T> sink)}) |
- = _StreamHandlerTransformer<S, T>; |
+ void handleDone(EventSink<T> sink)}) = _StreamHandlerTransformer<S, T>; |
/** |
* Transform the incoming [stream]'s events. |
@@ -1765,12 +1720,12 @@ abstract class StreamTransformer<S, T> { |
* The stream may be paused between calls to [moveNext]. |
*/ |
abstract class StreamIterator<T> { |
- |
/** Create a [StreamIterator] on [stream]. */ |
factory StreamIterator(Stream<T> stream) |
// TODO(lrn): use redirecting factory constructor when type |
// arguments are supported. |
- => new _StreamIterator<T>(stream); |
+ => |
+ new _StreamIterator<T>(stream); |
/** |
* Wait for the next stream value to be available. |
@@ -1822,7 +1777,6 @@ abstract class StreamIterator<T> { |
Future cancel(); |
} |
- |
/** |
* Wraps an [_EventSink] so it exposes only the [EventSink] interface. |
*/ |
@@ -1830,9 +1784,15 @@ class _ControllerEventSinkWrapper<T> implements EventSink<T> { |
EventSink _sink; |
_ControllerEventSinkWrapper(this._sink); |
- void add(T data) { _sink.add(data); } |
+ void add(T data) { |
+ _sink.add(data); |
+ } |
+ |
void addError(error, [StackTrace stackTrace]) { |
_sink.addError(error, stackTrace); |
} |
- void close() { _sink.close(); } |
+ |
+ void close() { |
+ _sink.close(); |
+ } |
} |