Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(332)

Unified Diff: sdk/lib/async/stream_pipe.dart

Issue 11886013: Make Stream transformation respect the single/multi subscriber nature of the source. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Added missing isSingleSubscription impl. Created 7 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« sdk/lib/async/stream_impl.dart ('K') | « sdk/lib/async/stream_impl.dart ('k') | no next file » | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: sdk/lib/async/stream_pipe.dart
diff --git a/sdk/lib/async/stream_pipe.dart b/sdk/lib/async/stream_pipe.dart
index 07b0c19cb7379db1c6b19d39a9a8418b01d5f9fc..0b4b6ab2cf4d7b1a3ebc90a04bb7258788ad917a 100644
--- a/sdk/lib/async/stream_pipe.dart
+++ b/sdk/lib/async/stream_pipe.dart
@@ -38,11 +38,11 @@ _cancelAndError(StreamSubscription subscription, _FutureImpl future) =>
/**
- * A wrapper around a stream that allows independent subscribers.
+ * A [StreamTransformer] that forwards events and subscriptions.
*
- * By default [this] subscribes to [_source] and forwards all events to its own
- * subscribers. It does not subscribe until there is a subscriber, and
- * unsubscribes again when there are no subscribers left.
+ * By default this transformer subscribes to [_source] and forwards all events
+ * to [_stream]. It does not subscribe to [_source] until there is a subscriber,
+ * on [_stream] and unsubscribes again when there are no subscribers left.
*
* The events are passed through the [_handleData], [_handleError] and
* [_handleDone] methods. Subclasses are supposed to add handling of some of
@@ -50,83 +50,112 @@ _cancelAndError(StreamSubscription subscription, _FutureImpl future) =>
*
* This class is intended for internal use only.
*/
-class _ForwardingMultiStream<S, T> extends _MultiStreamImpl<T> {
- Stream<S> _source = null;
- StreamSubscription _subscription = null;
-
- void _subscribeToSource() {
- _subscription = _source.listen(this._handleData,
- onError: this._handleError,
- onDone: this._handleDone);
- if (_isPaused) {
- _subscription.pause();
+/**
+ *
+ * Handles backwards propagation of subscription and pause.
+ */
+class _ForwardingStreamTransformer<S, T> implements StreamTransformer<S, T> {
+ Stream<T> _stream;
+ Stream<S> _source;
+ StreamSubscription<S> _subscription;
+
+ Stream<T> _createOutputStream() {
+ if (_source.isSingleSubscription) {
+ return new _ForwardingSingleStream<T>(this);
}
+ return new _ForwardingMultiStream<T>(this);
}
- /**
- * Subscribe or unsubscribe on [source] depending on whether
- * [stream] has subscribers.
- */
- void _onSubscriptionStateChange() {
- if (_hasSubscribers) {
- assert(_subscription == null);
- if (_source != null) {
- _subscribeToSource();
+ Stream<T> bind(Stream<S> source) {
+ if (_source != null) {
+ throw new StateError("Transformer source already bound");
+ }
+ _source = source;
+ _stream = _createOutputStream();
+ return _stream;
+ }
+
+ void _onPauseStateChange(bool isPaused) {
+ if (isPaused) {
+ if (_subscription != null) {
+ _subscription.pause();
}
} else {
if (_subscription != null) {
- _subscription.cancel();
- _subscription = null;
+ _subscription.resume();
}
}
}
- void _onPauseStateChange() {
- if (_subscription == null) return;
- if (isPaused) {
- _subscription.pause();
+ /**
+ * Subscribe or unsubscribe on [_source] depending on whether
+ * [_stream] has subscribers.
+ */
+ void _onSubscriptionStateChange(bool hasSubscribers) {
+ if (hasSubscribers) {
+ assert(_subscription == null);
+ _subscription = _source.listen(this._handleData,
+ onError: this._handleError,
+ onDone: this._handleDone);
} else {
- _subscription.resume();
+ // TODO(lrn): Check why this can happen.
+ if (_subscription == null) return;
+ _subscription.cancel();
+ _subscription = null;
}
}
void _handleData(S inputEvent) {
var outputEvent = inputEvent;
- _add(outputEvent);
+ _stream._add(outputEvent);
}
void _handleError(AsyncError error) {
- _signalError(error);
+ _stream._signalError(error);
}
void _handleDone() {
- _close();
+ _stream._close();
}
}
+class _ForwardingMultiStream<T> extends _MultiStreamImpl<T> {
+ _ForwardingStreamTransformer _transformer;
+ _ForwardingMultiStream(this._transformer);
-abstract class _ForwardingTransformer<S, T> extends _ForwardingMultiStream<S, T>
- implements StreamTransformer<S, T> {
- Stream<T> bind(Stream<S> source) {
- if (_source != null) throw new StateError("Already bound to source.");
- _source = source;
- if (_hasSubscribers) {
- _subscribeToSource();
- }
- return this;
+ _onSubscriptionStateChange() {
+ _transformer._onSubscriptionStateChange(_hasSubscribers);
+ }
+
+ _onPauseStateChange() {
+ _transformer._onPauseStateChange(_isPaused);
}
}
+class _ForwardingSingleStream<T> extends _SingleStreamImpl<T> {
+ _ForwardingStreamTransformer _transformer;
+ _ForwardingSingleStream(this._transformer);
+
+ _onSubscriptionStateChange() {
+ _transformer._onSubscriptionStateChange(_hasSubscribers);
+ }
+
+ _onPauseStateChange() {
+ _transformer._onPauseStateChange(_isPaused);
+ }
+}
+
+
// -------------------------------------------------------------------
// Stream transformers used by the default Stream implementation.
// -------------------------------------------------------------------
typedef bool _Predicate<T>(T value);
-class WhereStream<T> extends _ForwardingTransformer<T, T> {
+class WhereTransformer<T> extends _ForwardingStreamTransformer<T, T> {
final _Predicate<T> _test;
- WhereStream(bool test(T value))
+ WhereTransformer(bool test(T value))
: this._test = test;
void _handleData(T inputEvent) {
@@ -134,11 +163,11 @@ class WhereStream<T> extends _ForwardingTransformer<T, T> {
try {
satisfies = _test(inputEvent);
} catch (e, s) {
- _signalError(_asyncError(e, s));
+ _stream._signalError(_asyncError(e, s));
return;
}
if (satisfies) {
- _add(inputEvent);
+ _stream._add(inputEvent);
}
}
}
@@ -149,10 +178,10 @@ typedef T _Transformation<S, T>(S value);
/**
* A stream pipe that converts data events before passing them on.
*/
-class MapStream<S, T> extends _ForwardingTransformer<S, T> {
+class MapTransformer<S, T> extends _ForwardingStreamTransformer<S, T> {
final _Transformation _transform;
- MapStream(T transform(S event))
+ MapTransformer(T transform(S event))
: this._transform = transform;
void _handleData(S inputEvent) {
@@ -160,31 +189,31 @@ class MapStream<S, T> extends _ForwardingTransformer<S, T> {
try {
outputEvent = _transform(inputEvent);
} catch (e, s) {
- _signalError(_asyncError(e, s));
+ _stream._signalError(_asyncError(e, s));
return;
}
- _add(outputEvent);
+ _stream._add(outputEvent);
}
}
/**
* A stream pipe that converts data events before passing them on.
*/
-class ExpandStream<S, T> extends _ForwardingTransformer<S, T> {
+class ExpandTransformer<S, T> extends _ForwardingStreamTransformer<S, T> {
final _Transformation<S, Iterable<T>> _expand;
- ExpandStream(Iterable<T> expand(S event))
+ ExpandTransformer(Iterable<T> expand(S event))
: this._expand = expand;
void _handleData(S inputEvent) {
try {
for (T value in _expand(inputEvent)) {
- _add(value);
+ _stream._add(value);
}
} catch (e, s) {
// If either _expand or iterating the generated iterator throws,
// we abort the iteration.
- _signalError(_asyncError(e, s));
+ _stream._signalError(_asyncError(e, s));
}
}
}
@@ -197,11 +226,11 @@ typedef bool _ErrorTest(error);
* A stream pipe that converts or disposes error events
* before passing them on.
*/
-class HandleErrorStream<T> extends _ForwardingTransformer<T, T> {
+class HandleErrorTransformer<T> extends _ForwardingStreamTransformer<T, T> {
final _ErrorTransformation _transform;
final _ErrorTest _test;
- HandleErrorStream(void transform(AsyncError event), bool test(error))
+ HandleErrorTransformer(void transform(AsyncError event), bool test(error))
: this._transform = transform, this._test = test;
void _handleError(AsyncError error) {
@@ -210,7 +239,7 @@ class HandleErrorStream<T> extends _ForwardingTransformer<T, T> {
try {
matches = _test(error.error);
} catch (e, s) {
- _signalError(_asyncError(e, s, error));
+ _stream._signalError(_asyncError(e, s, error));
return;
}
}
@@ -218,11 +247,11 @@ class HandleErrorStream<T> extends _ForwardingTransformer<T, T> {
try {
_transform(error);
} catch (e, s) {
- _signalError(_asyncError(e, s, error));
+ _stream._signalError(_asyncError(e, s, error));
return;
}
} else {
- _signalError(error);
+ _stream._signalError(error);
}
}
}
@@ -233,35 +262,39 @@ typedef void _TransformErrorHandler<T>(AsyncError data, StreamSink<T> sink);
typedef void _TransformDoneHandler<T>(StreamSink<T> sink);
/**
- * A stream pipe that intercepts all events and can generate any event as
+ * A stream transfomer that intercepts all events and can generate any event as
* output.
*
- * Each incoming event on this [StreamSink] is passed to the corresponding
- * provided event handler, along with a [StreamSink] linked to the [output] of
- * this pipe.
- * The handler can then decide which events to send to the output
+ * Each incoming event on the source stream is passed to the corresponding
+ * provided event handler, along with a [StreamSink] linked to the output
+ * Stream.
+ * The handler can then decide exactly which events to send to the output.
*/
-class PipeStream<S, T> extends _ForwardingTransformer<S, T> {
+class _StreamTransformerImpl<S, T> extends _ForwardingStreamTransformer<S, T> {
final _TransformDataHandler<S, T> _onData;
final _TransformErrorHandler<T> _onError;
final _TransformDoneHandler<T> _onDone;
StreamSink<T> _sink;
- PipeStream({void onData(S data, StreamSink<T> sink),
- void onError(AsyncError data, StreamSink<T> sink),
- void onDone(StreamSink<T> sink)})
+ _StreamTransformerImpl(void onData(S data, StreamSink<T> sink),
+ void onError(AsyncError data, StreamSink<T> sink),
+ void onDone(StreamSink<T> sink))
: this._onData = (onData == null ? _defaultHandleData : onData),
this._onError = (onError == null ? _defaultHandleError : onError),
- this._onDone = (onDone == null ? _defaultHandleDone : onDone) {
- // Cache the sink wrapper to avoid creating a new one for each event.
- this._sink = new _StreamImplSink(this);
+ this._onDone = (onDone == null ? _defaultHandleDone : onDone);
+
+ Stream<T> bind(Stream<S> source) {
+ Stream<T> stream = super.bind(source);
+ // Cache a Sink object to avoid creating a new one for each event.
+ _sink = new _StreamImplSink(stream);
+ return stream;
}
void _handleData(S data) {
try {
- return _onData(data, _sink);
+ _onData(data, _sink);
} catch (e, s) {
- _signalError(_asyncError(e, s));
+ _stream._signalError(_asyncError(e, s));
}
}
@@ -269,7 +302,7 @@ class PipeStream<S, T> extends _ForwardingTransformer<S, T> {
try {
_onError(error, _sink);
} catch (e, s) {
- _signalError(_asyncError(e, s, error));
+ _stream._signalError(_asyncError(e, s, error));
}
}
@@ -277,12 +310,12 @@ class PipeStream<S, T> extends _ForwardingTransformer<S, T> {
try {
_onDone(_sink);
} catch (e, s) {
- _signalError(_asyncError(e, s));
+ _stream._signalError(_asyncError(e, s));
}
}
/** Default data handler forwards all data. */
- static void _defaultHandleData(dynamic data, StreamSink sink) {
+ static void _defaultHandleData(var data, StreamSink sink) {
sink.add(data);
}
/** Default error handler forwards all errors. */
@@ -304,85 +337,11 @@ class _StreamImplSink<T> implements StreamSink<T> {
void close() { _target._close(); }
}
-/**
- * A stream pipe that intercepts all events and can generate any event as
- * output.
- *
- * Each incoming event on this [StreamSink] is passed to the corresponding
- * method on [transform], along with a [StreamSink] linked to the [output] of
- * this pipe.
- * The handler can then decide which events to send to the output
- */
-class TransformStream<S, T> extends _ForwardingTransformer<S, T> {
- final StreamTransformer<S, T> _transform;
- StreamSink<T> _sink;
- TransformStream(StreamTransformer<S, T> transform)
- : this._transform = transform {
- // Cache the sink wrapper to avoid creating a new one for each event.
- this._sink = new _StreamImplSink(this);
- }
-
- void _handleData(S data) {
- try {
- return _transform.handleData(data, _sink);
- } catch (e, s) {
- _controller.signalError(_asyncError(e, s));
- }
- }
-
- void _handleError(AsyncError error) {
- try {
- _transform.handleError(error, _sink);
- } catch (e, s) {
- _controller.signalError(_asyncError(e, s, error));
- }
- }
-
- void _handleDone() {
- try {
- _transform.handleDone(_sink);
- } catch (e, s) {
- _controller.signalError(_asyncError(e, s));
- }
- }
-}
-
-
-/** Helper class for transforming three functions into a StreamTransformer. */
-class _StreamTransformerFunctionWrapper<S, T>
- extends _StreamTransformer<S, T> {
- final _TransformDataHandler<S, T> _handleData;
- final _TransformErrorHandler<T> _handleError;
- final _TransformDoneHandler<T> _handleDone;
-
- _StreamTransformerFunctionWrapper({
- void onData(S data, StreamSink<T> sink),
- void onError(AsyncError data, StreamSink<T> sink),
- void onDone(StreamSink<T> sink)})
- : _handleData = onData != null ? onData : PipeStream._defaultHandleData,
- _handleError = onError != null ? onError
- : PipeStream._defaultHandleError,
- _handleDone = onDone != null ? onDone : PipeStream._defaultHandleDone;
-
- void handleData(S data, StreamSink<T> sink) {
- return _handleData(data, sink);
- }
-
- void handleError(AsyncError error, StreamSink<T> sink) {
- _handleError(error, sink);
- }
-
- void handleDone(StreamSink<T> sink) {
- _handleDone(sink);
- }
-}
-
-
-class TakeStream<T> extends _ForwardingTransformer<T, T> {
+class TakeTransformer<T> extends _ForwardingStreamTransformer<T, T> {
int _remaining;
- TakeStream(int count)
+ TakeTransformer(int count)
: this._remaining = count {
// This test is done early to avoid handling an async error
// in the _handleData method.
@@ -391,22 +350,22 @@ class TakeStream<T> extends _ForwardingTransformer<T, T> {
void _handleData(T inputEvent) {
if (_remaining > 0) {
- _add(inputEvent);
+ _stream._add(inputEvent);
_remaining -= 1;
if (_remaining == 0) {
// Closing also unsubscribes all subscribers, which unsubscribes
// this from source.
- _close();
+ _stream._close();
}
}
}
}
-class TakeWhileStream<T> extends _ForwardingTransformer<T, T> {
+class TakeWhileTransformer<T> extends _ForwardingStreamTransformer<T, T> {
final _Predicate<T> _test;
- TakeWhileStream(bool test(T value))
+ TakeWhileTransformer(bool test(T value))
: this._test = test;
void _handleData(T inputEvent) {
@@ -414,27 +373,27 @@ class TakeWhileStream<T> extends _ForwardingTransformer<T, T> {
try {
satisfies = _test(inputEvent);
} catch (e, s) {
- _signalError(_asyncError(e, s));
+ _stream._signalError(_asyncError(e, s));
// The test didn't say true. Didn't say false either, but we stop anyway.
- _close();
+ _stream._close();
return;
}
if (satisfies) {
- _add(inputEvent);
+ _stream._add(inputEvent);
} else {
- _close();
+ _stream._close();
}
}
}
-class SkipStream<T> extends _ForwardingTransformer<T, T> {
+class SkipTransformer<T> extends _ForwardingStreamTransformer<T, T> {
int _remaining;
- SkipStream(int count)
+ SkipTransformer(int count)
: this._remaining = count{
// This test is done early to avoid handling an async error
// in the _handleData method.
- if (count is! int) throw new ArgumentError(count);
+ if (count is! int || count < 0) throw new ArgumentError(count);
}
void _handleData(T inputEvent) {
@@ -442,52 +401,52 @@ class SkipStream<T> extends _ForwardingTransformer<T, T> {
_remaining--;
return;
}
- return _add(inputEvent);
+ return _stream._add(inputEvent);
}
}
-class SkipWhileStream<T> extends _ForwardingTransformer<T, T> {
+class SkipWhileTransformer<T> extends _ForwardingStreamTransformer<T, T> {
final _Predicate<T> _test;
bool _hasFailed = false;
- SkipWhileStream(bool test(T value))
+ SkipWhileTransformer(bool test(T value))
: this._test = test;
void _handleData(T inputEvent) {
if (_hasFailed) {
- _add(inputEvent);
+ _stream._add(inputEvent);
}
bool satisfies;
try {
satisfies = _test(inputEvent);
} catch (e, s) {
- _signalError(_asyncError(e, s));
+ _stream._signalError(_asyncError(e, s));
// A failure to return a boolean is considered "not matching".
_hasFailed = true;
return;
}
if (!satisfies) {
_hasFailed = true;
- _add(inputEvent);
+ _stream._add(inputEvent);
}
}
}
typedef bool _Equality<T>(T a, T b);
-class DistinctStream<T> extends _ForwardingTransformer<T, T> {
+class DistinctTransformer<T> extends _ForwardingStreamTransformer<T, T> {
static var _SENTINEL = new Object();
_Equality<T> _equals;
var _previous = _SENTINEL;
- DistinctStream(bool equals(T a, T b))
+ DistinctTransformer(bool equals(T a, T b))
: _equals = equals;
void _handleData(T inputEvent) {
if (identical(_previous, _SENTINEL)) {
_previous = inputEvent;
- return _add(inputEvent);
+ return _stream._add(inputEvent);
} else {
bool isEqual;
try {
@@ -497,11 +456,11 @@ class DistinctStream<T> extends _ForwardingTransformer<T, T> {
isEqual = _equals(_previous, inputEvent);
}
} catch (e, s) {
- _signalError(_asyncError(e, s));
+ _stream._signalError(_asyncError(e, s));
return null;
}
if (!isEqual) {
- _add(inputEvent);
+ _stream._add(inputEvent);
_previous = inputEvent;
}
}
« sdk/lib/async/stream_impl.dart ('K') | « sdk/lib/async/stream_impl.dart ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698