Chromium Code Reviews| Index: sdk/lib/async/stream.dart |
| diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart |
| index e7dedc2d27d14fc06d24f9fc84efd569c9319fd5..f3cab7b8ed6ae6cb29f78a5d8996459f7fb14131 100644 |
| --- a/sdk/lib/async/stream.dart |
| +++ b/sdk/lib/async/stream.dart |
| @@ -31,6 +31,9 @@ abstract class Stream<T> { |
| return new _IterableSingleStreamImpl<T>(data); |
| } |
| + /** Whether the stream is a single-subscription stream. */ |
| + bool get isSingleSubscription; |
|
floitsch
2013/01/14 16:09:09
different CL, but sometimes we have singleSubscrib
|
| + |
| /** |
| * Returns a multi-subscription stream that produces the same events as this. |
| * |
| @@ -96,7 +99,7 @@ abstract class Stream<T> { |
| * but it only sends the data events that satisfy the [test]. |
| */ |
| Stream<T> where(bool test(T event)) { |
| - return this.transform(new WhereStream<T>(test)); |
| + return this.transform(new WhereTransformer<T>(test)); |
| } |
| /** |
| @@ -104,7 +107,7 @@ abstract class Stream<T> { |
| * to a new value using the [convert] function. |
| */ |
| Stream mappedBy(convert(T event)) { |
| - return this.transform(new MapStream<T, dynamic>(convert)); |
| + return this.transform(new MapTransformer<T, dynamic>(convert)); |
| } |
| /** |
| @@ -121,7 +124,7 @@ abstract class Stream<T> { |
| * or simply return to make the stream forget the error. |
|
floitsch
2013/01/14 16:09:09
different CL: "if the error needs to be transforme
Lasse Reichstein Nielsen
2013/01/15 07:19:51
Done.
|
| */ |
| Stream<T> handleError(void handle(AsyncError error), { bool test(error) }) { |
| - return this.transform(new HandleErrorStream<T>(handle, test)); |
| + return this.transform(new HandleErrorTransformer<T>(handle, test)); |
| } |
| /** |
| @@ -133,7 +136,8 @@ abstract class Stream<T> { |
| * in order. |
| */ |
| Stream expand(Iterable convert(T value)) { |
| - return this.transform(new ExpandStream<T, dynamic>(convert)); |
| + return this.transform( |
| + new ExpandTransformer<T, dynamic>(convert)); |
| } |
| /** |
| @@ -435,7 +439,7 @@ abstract class Stream<T> { |
| * so will the returned stream. |
| */ |
| Stream<T> take(int count) { |
| - return this.transform(new TakeStream(count)); |
| + return this.transform(new TakeTransformer<T>(count)); |
| } |
| /** |
| @@ -447,14 +451,14 @@ abstract class Stream<T> { |
| * a value that [test] doesn't accept. |
| */ |
| Stream<T> takeWhile(bool test(T value)) { |
| - return this.transform(new TakeWhileStream(test)); |
| + return this.transform(new TakeWhileTransformer<T>(test)); |
| } |
| /** |
| * Skips the first [count] data events from this stream. |
| */ |
| Stream<T> skip(int count) { |
| - return this.transform(new SkipStream(count)); |
| + return this.transform(new SkipTransformer<T>(count)); |
| } |
| /** |
| @@ -466,7 +470,7 @@ abstract class Stream<T> { |
| * event data, the returned stream will have the same events as this stream. |
| */ |
| Stream<T> skipWhile(bool test(T value)) { |
| - return this.transform(new SkipWhileStream(test)); |
| + return this.transform(new SkipWhileTransformer<T>(test)); |
| } |
| /** |
| @@ -479,7 +483,7 @@ abstract class Stream<T> { |
| * omitted, the '==' operator on the last provided data element is used. |
| */ |
| Stream<T> distinct([bool equals(T previous, T next)]) { |
| - return this.transform(new DistinctStream(equals)); |
| + return this.transform(new DistinctTransformer<T>(equals)); |
| } |
| /** |
| @@ -489,7 +493,7 @@ abstract class Stream<T> { |
| * equivalent to [:this.elementAt(0):] |
| */ |
| Future<T> get first { |
| - _FutureImpl<T> future = new _FutureImpl(); |
| + _FutureImpl<T> future = new _FutureImpl<T>(); |
| StreamSubscription subscription; |
| subscription = this.listen( |
| (T value) { |
| @@ -704,7 +708,7 @@ abstract class Stream<T> { |
| */ |
| Future<T> elementAt(int index) { |
| if (index is! int || index < 0) throw new ArgumentError(index); |
| - _FutureImpl<T> future = new _FutureImpl(); |
| + _FutureImpl<T> future = new _FutureImpl<T>(); |
| StreamSubscription subscription; |
| subscription = this.listen( |
| (T value) { |
| @@ -786,6 +790,8 @@ class StreamView<T> extends Stream<T> { |
| StreamView(this._stream); |
| + bool get isSingleSubscription => _stream.isSingleSubscription; |
| + |
| StreamSubscription<T> listen(void onData(T value), |
| { void onError(AsyncError error), |
| void onDone(), |
| @@ -838,7 +844,9 @@ abstract class StreamTransformer<S, T> { |
| factory StreamTransformer.from({ |
| void onData(S data, StreamSink<T> sink), |
| void onError(AsyncError error, StreamSink<T> sink), |
| - void onDone(StreamSink<T> sink)}) = _StreamTransformerFunctionWrapper; |
| + void onDone(StreamSink<T> sink)}) { |
| + return new _StreamTransformerImpl<S, T>(onData, onError, onDone); |
| + } |
| Stream<T> bind(Stream<S> stream); |
| } |