Index: sdk/lib/async/stream.dart |
diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart |
index e7dedc2d27d14fc06d24f9fc84efd569c9319fd5..f3cab7b8ed6ae6cb29f78a5d8996459f7fb14131 100644 |
--- a/sdk/lib/async/stream.dart |
+++ b/sdk/lib/async/stream.dart |
@@ -31,6 +31,9 @@ abstract class Stream<T> { |
return new _IterableSingleStreamImpl<T>(data); |
} |
+ /** Whether the stream is a single-subscription stream. */ |
+ bool get isSingleSubscription; |
floitsch
2013/01/14 16:09:09
different CL, but sometimes we have singleSubscrib
|
+ |
/** |
* Returns a multi-subscription stream that produces the same events as this. |
* |
@@ -96,7 +99,7 @@ abstract class Stream<T> { |
* but it only sends the data events that satisfy the [test]. |
*/ |
Stream<T> where(bool test(T event)) { |
- return this.transform(new WhereStream<T>(test)); |
+ return this.transform(new WhereTransformer<T>(test)); |
} |
/** |
@@ -104,7 +107,7 @@ abstract class Stream<T> { |
* to a new value using the [convert] function. |
*/ |
Stream mappedBy(convert(T event)) { |
- return this.transform(new MapStream<T, dynamic>(convert)); |
+ return this.transform(new MapTransformer<T, dynamic>(convert)); |
} |
/** |
@@ -121,7 +124,7 @@ abstract class Stream<T> { |
* or simply return to make the stream forget the error. |
floitsch
2013/01/14 16:09:09
different CL: "if the error needs to be transforme
Lasse Reichstein Nielsen
2013/01/15 07:19:51
Done.
|
*/ |
Stream<T> handleError(void handle(AsyncError error), { bool test(error) }) { |
- return this.transform(new HandleErrorStream<T>(handle, test)); |
+ return this.transform(new HandleErrorTransformer<T>(handle, test)); |
} |
/** |
@@ -133,7 +136,8 @@ abstract class Stream<T> { |
* in order. |
*/ |
Stream expand(Iterable convert(T value)) { |
- return this.transform(new ExpandStream<T, dynamic>(convert)); |
+ return this.transform( |
+ new ExpandTransformer<T, dynamic>(convert)); |
} |
/** |
@@ -435,7 +439,7 @@ abstract class Stream<T> { |
* so will the returned stream. |
*/ |
Stream<T> take(int count) { |
- return this.transform(new TakeStream(count)); |
+ return this.transform(new TakeTransformer<T>(count)); |
} |
/** |
@@ -447,14 +451,14 @@ abstract class Stream<T> { |
* a value that [test] doesn't accept. |
*/ |
Stream<T> takeWhile(bool test(T value)) { |
- return this.transform(new TakeWhileStream(test)); |
+ return this.transform(new TakeWhileTransformer<T>(test)); |
} |
/** |
* Skips the first [count] data events from this stream. |
*/ |
Stream<T> skip(int count) { |
- return this.transform(new SkipStream(count)); |
+ return this.transform(new SkipTransformer<T>(count)); |
} |
/** |
@@ -466,7 +470,7 @@ abstract class Stream<T> { |
* event data, the returned stream will have the same events as this stream. |
*/ |
Stream<T> skipWhile(bool test(T value)) { |
- return this.transform(new SkipWhileStream(test)); |
+ return this.transform(new SkipWhileTransformer<T>(test)); |
} |
/** |
@@ -479,7 +483,7 @@ abstract class Stream<T> { |
* omitted, the '==' operator on the last provided data element is used. |
*/ |
Stream<T> distinct([bool equals(T previous, T next)]) { |
- return this.transform(new DistinctStream(equals)); |
+ return this.transform(new DistinctTransformer<T>(equals)); |
} |
/** |
@@ -489,7 +493,7 @@ abstract class Stream<T> { |
* equivalent to [:this.elementAt(0):] |
*/ |
Future<T> get first { |
- _FutureImpl<T> future = new _FutureImpl(); |
+ _FutureImpl<T> future = new _FutureImpl<T>(); |
StreamSubscription subscription; |
subscription = this.listen( |
(T value) { |
@@ -704,7 +708,7 @@ abstract class Stream<T> { |
*/ |
Future<T> elementAt(int index) { |
if (index is! int || index < 0) throw new ArgumentError(index); |
- _FutureImpl<T> future = new _FutureImpl(); |
+ _FutureImpl<T> future = new _FutureImpl<T>(); |
StreamSubscription subscription; |
subscription = this.listen( |
(T value) { |
@@ -786,6 +790,8 @@ class StreamView<T> extends Stream<T> { |
StreamView(this._stream); |
+ bool get isSingleSubscription => _stream.isSingleSubscription; |
+ |
StreamSubscription<T> listen(void onData(T value), |
{ void onError(AsyncError error), |
void onDone(), |
@@ -838,7 +844,9 @@ abstract class StreamTransformer<S, T> { |
factory StreamTransformer.from({ |
void onData(S data, StreamSink<T> sink), |
void onError(AsyncError error, StreamSink<T> sink), |
- void onDone(StreamSink<T> sink)}) = _StreamTransformerFunctionWrapper; |
+ void onDone(StreamSink<T> sink)}) { |
+ return new _StreamTransformerImpl<S, T>(onData, onError, onDone); |
+ } |
Stream<T> bind(Stream<S> stream); |
} |