Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(3)

Unified Diff: sdk/lib/async/stream.dart

Issue 11886013: Make Stream transformation respect the single/multi subscriber nature of the source. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Added missing isSingleSubscription impl. Created 7 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « no previous file | sdk/lib/async/stream_controller.dart » ('j') | sdk/lib/async/stream_impl.dart » ('J')
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: sdk/lib/async/stream.dart
diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart
index e7dedc2d27d14fc06d24f9fc84efd569c9319fd5..f3cab7b8ed6ae6cb29f78a5d8996459f7fb14131 100644
--- a/sdk/lib/async/stream.dart
+++ b/sdk/lib/async/stream.dart
@@ -31,6 +31,9 @@ abstract class Stream<T> {
return new _IterableSingleStreamImpl<T>(data);
}
+ /** Whether the stream is a single-subscription stream. */
+ bool get isSingleSubscription;
floitsch 2013/01/14 16:09:09 different CL, but sometimes we have singleSubscrib
+
/**
* Returns a multi-subscription stream that produces the same events as this.
*
@@ -96,7 +99,7 @@ abstract class Stream<T> {
* but it only sends the data events that satisfy the [test].
*/
Stream<T> where(bool test(T event)) {
- return this.transform(new WhereStream<T>(test));
+ return this.transform(new WhereTransformer<T>(test));
}
/**
@@ -104,7 +107,7 @@ abstract class Stream<T> {
* to a new value using the [convert] function.
*/
Stream mappedBy(convert(T event)) {
- return this.transform(new MapStream<T, dynamic>(convert));
+ return this.transform(new MapTransformer<T, dynamic>(convert));
}
/**
@@ -121,7 +124,7 @@ abstract class Stream<T> {
* or simply return to make the stream forget the error.
floitsch 2013/01/14 16:09:09 different CL: "if the error needs to be transforme
Lasse Reichstein Nielsen 2013/01/15 07:19:51 Done.
*/
Stream<T> handleError(void handle(AsyncError error), { bool test(error) }) {
- return this.transform(new HandleErrorStream<T>(handle, test));
+ return this.transform(new HandleErrorTransformer<T>(handle, test));
}
/**
@@ -133,7 +136,8 @@ abstract class Stream<T> {
* in order.
*/
Stream expand(Iterable convert(T value)) {
- return this.transform(new ExpandStream<T, dynamic>(convert));
+ return this.transform(
+ new ExpandTransformer<T, dynamic>(convert));
}
/**
@@ -435,7 +439,7 @@ abstract class Stream<T> {
* so will the returned stream.
*/
Stream<T> take(int count) {
- return this.transform(new TakeStream(count));
+ return this.transform(new TakeTransformer<T>(count));
}
/**
@@ -447,14 +451,14 @@ abstract class Stream<T> {
* a value that [test] doesn't accept.
*/
Stream<T> takeWhile(bool test(T value)) {
- return this.transform(new TakeWhileStream(test));
+ return this.transform(new TakeWhileTransformer<T>(test));
}
/**
* Skips the first [count] data events from this stream.
*/
Stream<T> skip(int count) {
- return this.transform(new SkipStream(count));
+ return this.transform(new SkipTransformer<T>(count));
}
/**
@@ -466,7 +470,7 @@ abstract class Stream<T> {
* event data, the returned stream will have the same events as this stream.
*/
Stream<T> skipWhile(bool test(T value)) {
- return this.transform(new SkipWhileStream(test));
+ return this.transform(new SkipWhileTransformer<T>(test));
}
/**
@@ -479,7 +483,7 @@ abstract class Stream<T> {
* omitted, the '==' operator on the last provided data element is used.
*/
Stream<T> distinct([bool equals(T previous, T next)]) {
- return this.transform(new DistinctStream(equals));
+ return this.transform(new DistinctTransformer<T>(equals));
}
/**
@@ -489,7 +493,7 @@ abstract class Stream<T> {
* equivalent to [:this.elementAt(0):]
*/
Future<T> get first {
- _FutureImpl<T> future = new _FutureImpl();
+ _FutureImpl<T> future = new _FutureImpl<T>();
StreamSubscription subscription;
subscription = this.listen(
(T value) {
@@ -704,7 +708,7 @@ abstract class Stream<T> {
*/
Future<T> elementAt(int index) {
if (index is! int || index < 0) throw new ArgumentError(index);
- _FutureImpl<T> future = new _FutureImpl();
+ _FutureImpl<T> future = new _FutureImpl<T>();
StreamSubscription subscription;
subscription = this.listen(
(T value) {
@@ -786,6 +790,8 @@ class StreamView<T> extends Stream<T> {
StreamView(this._stream);
+ bool get isSingleSubscription => _stream.isSingleSubscription;
+
StreamSubscription<T> listen(void onData(T value),
{ void onError(AsyncError error),
void onDone(),
@@ -838,7 +844,9 @@ abstract class StreamTransformer<S, T> {
factory StreamTransformer.from({
void onData(S data, StreamSink<T> sink),
void onError(AsyncError error, StreamSink<T> sink),
- void onDone(StreamSink<T> sink)}) = _StreamTransformerFunctionWrapper;
+ void onDone(StreamSink<T> sink)}) {
+ return new _StreamTransformerImpl<S, T>(onData, onError, onDone);
+ }
Stream<T> bind(Stream<S> stream);
}
« no previous file with comments | « no previous file | sdk/lib/async/stream_controller.dart » ('j') | sdk/lib/async/stream_impl.dart » ('J')

Powered by Google App Engine
This is Rietveld 408576698