Index: sdk/lib/async/stream_pipe.dart |
diff --git a/sdk/lib/async/stream_pipe.dart b/sdk/lib/async/stream_pipe.dart |
index 07b0c19cb7379db1c6b19d39a9a8418b01d5f9fc..0b4b6ab2cf4d7b1a3ebc90a04bb7258788ad917a 100644 |
--- a/sdk/lib/async/stream_pipe.dart |
+++ b/sdk/lib/async/stream_pipe.dart |
@@ -38,11 +38,11 @@ _cancelAndError(StreamSubscription subscription, _FutureImpl future) => |
/** |
- * A wrapper around a stream that allows independent subscribers. |
+ * A [StreamTransformer] that forwards events and subscriptions. |
* |
- * By default [this] subscribes to [_source] and forwards all events to its own |
- * subscribers. It does not subscribe until there is a subscriber, and |
- * unsubscribes again when there are no subscribers left. |
+ * By default this transformer subscribes to [_source] and forwards all events |
+ * to [_stream]. It does not subscribe to [_source] until there is a subscriber, |
+ * on [_stream] and unsubscribes again when there are no subscribers left. |
* |
* The events are passed through the [_handleData], [_handleError] and |
* [_handleDone] methods. Subclasses are supposed to add handling of some of |
@@ -50,83 +50,112 @@ _cancelAndError(StreamSubscription subscription, _FutureImpl future) => |
* |
* This class is intended for internal use only. |
*/ |
-class _ForwardingMultiStream<S, T> extends _MultiStreamImpl<T> { |
- Stream<S> _source = null; |
- StreamSubscription _subscription = null; |
- |
- void _subscribeToSource() { |
- _subscription = _source.listen(this._handleData, |
- onError: this._handleError, |
- onDone: this._handleDone); |
- if (_isPaused) { |
- _subscription.pause(); |
+/** |
+ * |
+ * Handles backwards propagation of subscription and pause. |
+ */ |
+class _ForwardingStreamTransformer<S, T> implements StreamTransformer<S, T> { |
+ Stream<T> _stream; |
+ Stream<S> _source; |
+ StreamSubscription<S> _subscription; |
+ |
+ Stream<T> _createOutputStream() { |
+ if (_source.isSingleSubscription) { |
+ return new _ForwardingSingleStream<T>(this); |
} |
+ return new _ForwardingMultiStream<T>(this); |
} |
- /** |
- * Subscribe or unsubscribe on [source] depending on whether |
- * [stream] has subscribers. |
- */ |
- void _onSubscriptionStateChange() { |
- if (_hasSubscribers) { |
- assert(_subscription == null); |
- if (_source != null) { |
- _subscribeToSource(); |
+ Stream<T> bind(Stream<S> source) { |
+ if (_source != null) { |
+ throw new StateError("Transformer source already bound"); |
+ } |
+ _source = source; |
+ _stream = _createOutputStream(); |
+ return _stream; |
+ } |
+ |
+ void _onPauseStateChange(bool isPaused) { |
+ if (isPaused) { |
+ if (_subscription != null) { |
+ _subscription.pause(); |
} |
} else { |
if (_subscription != null) { |
- _subscription.cancel(); |
- _subscription = null; |
+ _subscription.resume(); |
} |
} |
} |
- void _onPauseStateChange() { |
- if (_subscription == null) return; |
- if (isPaused) { |
- _subscription.pause(); |
+ /** |
+ * Subscribe or unsubscribe on [_source] depending on whether |
+ * [_stream] has subscribers. |
+ */ |
+ void _onSubscriptionStateChange(bool hasSubscribers) { |
+ if (hasSubscribers) { |
+ assert(_subscription == null); |
+ _subscription = _source.listen(this._handleData, |
+ onError: this._handleError, |
+ onDone: this._handleDone); |
} else { |
- _subscription.resume(); |
+ // TODO(lrn): Check why this can happen. |
+ if (_subscription == null) return; |
+ _subscription.cancel(); |
+ _subscription = null; |
} |
} |
void _handleData(S inputEvent) { |
var outputEvent = inputEvent; |
- _add(outputEvent); |
+ _stream._add(outputEvent); |
} |
void _handleError(AsyncError error) { |
- _signalError(error); |
+ _stream._signalError(error); |
} |
void _handleDone() { |
- _close(); |
+ _stream._close(); |
} |
} |
+class _ForwardingMultiStream<T> extends _MultiStreamImpl<T> { |
+ _ForwardingStreamTransformer _transformer; |
+ _ForwardingMultiStream(this._transformer); |
-abstract class _ForwardingTransformer<S, T> extends _ForwardingMultiStream<S, T> |
- implements StreamTransformer<S, T> { |
- Stream<T> bind(Stream<S> source) { |
- if (_source != null) throw new StateError("Already bound to source."); |
- _source = source; |
- if (_hasSubscribers) { |
- _subscribeToSource(); |
- } |
- return this; |
+ _onSubscriptionStateChange() { |
+ _transformer._onSubscriptionStateChange(_hasSubscribers); |
+ } |
+ |
+ _onPauseStateChange() { |
+ _transformer._onPauseStateChange(_isPaused); |
} |
} |
+class _ForwardingSingleStream<T> extends _SingleStreamImpl<T> { |
+ _ForwardingStreamTransformer _transformer; |
+ _ForwardingSingleStream(this._transformer); |
+ |
+ _onSubscriptionStateChange() { |
+ _transformer._onSubscriptionStateChange(_hasSubscribers); |
+ } |
+ |
+ _onPauseStateChange() { |
+ _transformer._onPauseStateChange(_isPaused); |
+ } |
+} |
+ |
+ |
// ------------------------------------------------------------------- |
// Stream transformers used by the default Stream implementation. |
// ------------------------------------------------------------------- |
typedef bool _Predicate<T>(T value); |
-class WhereStream<T> extends _ForwardingTransformer<T, T> { |
+class WhereTransformer<T> extends _ForwardingStreamTransformer<T, T> { |
final _Predicate<T> _test; |
- WhereStream(bool test(T value)) |
+ WhereTransformer(bool test(T value)) |
: this._test = test; |
void _handleData(T inputEvent) { |
@@ -134,11 +163,11 @@ class WhereStream<T> extends _ForwardingTransformer<T, T> { |
try { |
satisfies = _test(inputEvent); |
} catch (e, s) { |
- _signalError(_asyncError(e, s)); |
+ _stream._signalError(_asyncError(e, s)); |
return; |
} |
if (satisfies) { |
- _add(inputEvent); |
+ _stream._add(inputEvent); |
} |
} |
} |
@@ -149,10 +178,10 @@ typedef T _Transformation<S, T>(S value); |
/** |
* A stream pipe that converts data events before passing them on. |
*/ |
-class MapStream<S, T> extends _ForwardingTransformer<S, T> { |
+class MapTransformer<S, T> extends _ForwardingStreamTransformer<S, T> { |
final _Transformation _transform; |
- MapStream(T transform(S event)) |
+ MapTransformer(T transform(S event)) |
: this._transform = transform; |
void _handleData(S inputEvent) { |
@@ -160,31 +189,31 @@ class MapStream<S, T> extends _ForwardingTransformer<S, T> { |
try { |
outputEvent = _transform(inputEvent); |
} catch (e, s) { |
- _signalError(_asyncError(e, s)); |
+ _stream._signalError(_asyncError(e, s)); |
return; |
} |
- _add(outputEvent); |
+ _stream._add(outputEvent); |
} |
} |
/** |
* A stream pipe that converts data events before passing them on. |
*/ |
-class ExpandStream<S, T> extends _ForwardingTransformer<S, T> { |
+class ExpandTransformer<S, T> extends _ForwardingStreamTransformer<S, T> { |
final _Transformation<S, Iterable<T>> _expand; |
- ExpandStream(Iterable<T> expand(S event)) |
+ ExpandTransformer(Iterable<T> expand(S event)) |
: this._expand = expand; |
void _handleData(S inputEvent) { |
try { |
for (T value in _expand(inputEvent)) { |
- _add(value); |
+ _stream._add(value); |
} |
} catch (e, s) { |
// If either _expand or iterating the generated iterator throws, |
// we abort the iteration. |
- _signalError(_asyncError(e, s)); |
+ _stream._signalError(_asyncError(e, s)); |
} |
} |
} |
@@ -197,11 +226,11 @@ typedef bool _ErrorTest(error); |
* A stream pipe that converts or disposes error events |
* before passing them on. |
*/ |
-class HandleErrorStream<T> extends _ForwardingTransformer<T, T> { |
+class HandleErrorTransformer<T> extends _ForwardingStreamTransformer<T, T> { |
final _ErrorTransformation _transform; |
final _ErrorTest _test; |
- HandleErrorStream(void transform(AsyncError event), bool test(error)) |
+ HandleErrorTransformer(void transform(AsyncError event), bool test(error)) |
: this._transform = transform, this._test = test; |
void _handleError(AsyncError error) { |
@@ -210,7 +239,7 @@ class HandleErrorStream<T> extends _ForwardingTransformer<T, T> { |
try { |
matches = _test(error.error); |
} catch (e, s) { |
- _signalError(_asyncError(e, s, error)); |
+ _stream._signalError(_asyncError(e, s, error)); |
return; |
} |
} |
@@ -218,11 +247,11 @@ class HandleErrorStream<T> extends _ForwardingTransformer<T, T> { |
try { |
_transform(error); |
} catch (e, s) { |
- _signalError(_asyncError(e, s, error)); |
+ _stream._signalError(_asyncError(e, s, error)); |
return; |
} |
} else { |
- _signalError(error); |
+ _stream._signalError(error); |
} |
} |
} |
@@ -233,35 +262,39 @@ typedef void _TransformErrorHandler<T>(AsyncError data, StreamSink<T> sink); |
typedef void _TransformDoneHandler<T>(StreamSink<T> sink); |
/** |
- * A stream pipe that intercepts all events and can generate any event as |
+ * A stream transfomer that intercepts all events and can generate any event as |
* output. |
* |
- * Each incoming event on this [StreamSink] is passed to the corresponding |
- * provided event handler, along with a [StreamSink] linked to the [output] of |
- * this pipe. |
- * The handler can then decide which events to send to the output |
+ * Each incoming event on the source stream is passed to the corresponding |
+ * provided event handler, along with a [StreamSink] linked to the output |
+ * Stream. |
+ * The handler can then decide exactly which events to send to the output. |
*/ |
-class PipeStream<S, T> extends _ForwardingTransformer<S, T> { |
+class _StreamTransformerImpl<S, T> extends _ForwardingStreamTransformer<S, T> { |
final _TransformDataHandler<S, T> _onData; |
final _TransformErrorHandler<T> _onError; |
final _TransformDoneHandler<T> _onDone; |
StreamSink<T> _sink; |
- PipeStream({void onData(S data, StreamSink<T> sink), |
- void onError(AsyncError data, StreamSink<T> sink), |
- void onDone(StreamSink<T> sink)}) |
+ _StreamTransformerImpl(void onData(S data, StreamSink<T> sink), |
+ void onError(AsyncError data, StreamSink<T> sink), |
+ void onDone(StreamSink<T> sink)) |
: this._onData = (onData == null ? _defaultHandleData : onData), |
this._onError = (onError == null ? _defaultHandleError : onError), |
- this._onDone = (onDone == null ? _defaultHandleDone : onDone) { |
- // Cache the sink wrapper to avoid creating a new one for each event. |
- this._sink = new _StreamImplSink(this); |
+ this._onDone = (onDone == null ? _defaultHandleDone : onDone); |
+ |
+ Stream<T> bind(Stream<S> source) { |
+ Stream<T> stream = super.bind(source); |
+ // Cache a Sink object to avoid creating a new one for each event. |
+ _sink = new _StreamImplSink(stream); |
+ return stream; |
} |
void _handleData(S data) { |
try { |
- return _onData(data, _sink); |
+ _onData(data, _sink); |
} catch (e, s) { |
- _signalError(_asyncError(e, s)); |
+ _stream._signalError(_asyncError(e, s)); |
} |
} |
@@ -269,7 +302,7 @@ class PipeStream<S, T> extends _ForwardingTransformer<S, T> { |
try { |
_onError(error, _sink); |
} catch (e, s) { |
- _signalError(_asyncError(e, s, error)); |
+ _stream._signalError(_asyncError(e, s, error)); |
} |
} |
@@ -277,12 +310,12 @@ class PipeStream<S, T> extends _ForwardingTransformer<S, T> { |
try { |
_onDone(_sink); |
} catch (e, s) { |
- _signalError(_asyncError(e, s)); |
+ _stream._signalError(_asyncError(e, s)); |
} |
} |
/** Default data handler forwards all data. */ |
- static void _defaultHandleData(dynamic data, StreamSink sink) { |
+ static void _defaultHandleData(var data, StreamSink sink) { |
sink.add(data); |
} |
/** Default error handler forwards all errors. */ |
@@ -304,85 +337,11 @@ class _StreamImplSink<T> implements StreamSink<T> { |
void close() { _target._close(); } |
} |
-/** |
- * A stream pipe that intercepts all events and can generate any event as |
- * output. |
- * |
- * Each incoming event on this [StreamSink] is passed to the corresponding |
- * method on [transform], along with a [StreamSink] linked to the [output] of |
- * this pipe. |
- * The handler can then decide which events to send to the output |
- */ |
-class TransformStream<S, T> extends _ForwardingTransformer<S, T> { |
- final StreamTransformer<S, T> _transform; |
- StreamSink<T> _sink; |
- TransformStream(StreamTransformer<S, T> transform) |
- : this._transform = transform { |
- // Cache the sink wrapper to avoid creating a new one for each event. |
- this._sink = new _StreamImplSink(this); |
- } |
- |
- void _handleData(S data) { |
- try { |
- return _transform.handleData(data, _sink); |
- } catch (e, s) { |
- _controller.signalError(_asyncError(e, s)); |
- } |
- } |
- |
- void _handleError(AsyncError error) { |
- try { |
- _transform.handleError(error, _sink); |
- } catch (e, s) { |
- _controller.signalError(_asyncError(e, s, error)); |
- } |
- } |
- |
- void _handleDone() { |
- try { |
- _transform.handleDone(_sink); |
- } catch (e, s) { |
- _controller.signalError(_asyncError(e, s)); |
- } |
- } |
-} |
- |
- |
-/** Helper class for transforming three functions into a StreamTransformer. */ |
-class _StreamTransformerFunctionWrapper<S, T> |
- extends _StreamTransformer<S, T> { |
- final _TransformDataHandler<S, T> _handleData; |
- final _TransformErrorHandler<T> _handleError; |
- final _TransformDoneHandler<T> _handleDone; |
- |
- _StreamTransformerFunctionWrapper({ |
- void onData(S data, StreamSink<T> sink), |
- void onError(AsyncError data, StreamSink<T> sink), |
- void onDone(StreamSink<T> sink)}) |
- : _handleData = onData != null ? onData : PipeStream._defaultHandleData, |
- _handleError = onError != null ? onError |
- : PipeStream._defaultHandleError, |
- _handleDone = onDone != null ? onDone : PipeStream._defaultHandleDone; |
- |
- void handleData(S data, StreamSink<T> sink) { |
- return _handleData(data, sink); |
- } |
- |
- void handleError(AsyncError error, StreamSink<T> sink) { |
- _handleError(error, sink); |
- } |
- |
- void handleDone(StreamSink<T> sink) { |
- _handleDone(sink); |
- } |
-} |
- |
- |
-class TakeStream<T> extends _ForwardingTransformer<T, T> { |
+class TakeTransformer<T> extends _ForwardingStreamTransformer<T, T> { |
int _remaining; |
- TakeStream(int count) |
+ TakeTransformer(int count) |
: this._remaining = count { |
// This test is done early to avoid handling an async error |
// in the _handleData method. |
@@ -391,22 +350,22 @@ class TakeStream<T> extends _ForwardingTransformer<T, T> { |
void _handleData(T inputEvent) { |
if (_remaining > 0) { |
- _add(inputEvent); |
+ _stream._add(inputEvent); |
_remaining -= 1; |
if (_remaining == 0) { |
// Closing also unsubscribes all subscribers, which unsubscribes |
// this from source. |
- _close(); |
+ _stream._close(); |
} |
} |
} |
} |
-class TakeWhileStream<T> extends _ForwardingTransformer<T, T> { |
+class TakeWhileTransformer<T> extends _ForwardingStreamTransformer<T, T> { |
final _Predicate<T> _test; |
- TakeWhileStream(bool test(T value)) |
+ TakeWhileTransformer(bool test(T value)) |
: this._test = test; |
void _handleData(T inputEvent) { |
@@ -414,27 +373,27 @@ class TakeWhileStream<T> extends _ForwardingTransformer<T, T> { |
try { |
satisfies = _test(inputEvent); |
} catch (e, s) { |
- _signalError(_asyncError(e, s)); |
+ _stream._signalError(_asyncError(e, s)); |
// The test didn't say true. Didn't say false either, but we stop anyway. |
- _close(); |
+ _stream._close(); |
return; |
} |
if (satisfies) { |
- _add(inputEvent); |
+ _stream._add(inputEvent); |
} else { |
- _close(); |
+ _stream._close(); |
} |
} |
} |
-class SkipStream<T> extends _ForwardingTransformer<T, T> { |
+class SkipTransformer<T> extends _ForwardingStreamTransformer<T, T> { |
int _remaining; |
- SkipStream(int count) |
+ SkipTransformer(int count) |
: this._remaining = count{ |
// This test is done early to avoid handling an async error |
// in the _handleData method. |
- if (count is! int) throw new ArgumentError(count); |
+ if (count is! int || count < 0) throw new ArgumentError(count); |
} |
void _handleData(T inputEvent) { |
@@ -442,52 +401,52 @@ class SkipStream<T> extends _ForwardingTransformer<T, T> { |
_remaining--; |
return; |
} |
- return _add(inputEvent); |
+ return _stream._add(inputEvent); |
} |
} |
-class SkipWhileStream<T> extends _ForwardingTransformer<T, T> { |
+class SkipWhileTransformer<T> extends _ForwardingStreamTransformer<T, T> { |
final _Predicate<T> _test; |
bool _hasFailed = false; |
- SkipWhileStream(bool test(T value)) |
+ SkipWhileTransformer(bool test(T value)) |
: this._test = test; |
void _handleData(T inputEvent) { |
if (_hasFailed) { |
- _add(inputEvent); |
+ _stream._add(inputEvent); |
} |
bool satisfies; |
try { |
satisfies = _test(inputEvent); |
} catch (e, s) { |
- _signalError(_asyncError(e, s)); |
+ _stream._signalError(_asyncError(e, s)); |
// A failure to return a boolean is considered "not matching". |
_hasFailed = true; |
return; |
} |
if (!satisfies) { |
_hasFailed = true; |
- _add(inputEvent); |
+ _stream._add(inputEvent); |
} |
} |
} |
typedef bool _Equality<T>(T a, T b); |
-class DistinctStream<T> extends _ForwardingTransformer<T, T> { |
+class DistinctTransformer<T> extends _ForwardingStreamTransformer<T, T> { |
static var _SENTINEL = new Object(); |
_Equality<T> _equals; |
var _previous = _SENTINEL; |
- DistinctStream(bool equals(T a, T b)) |
+ DistinctTransformer(bool equals(T a, T b)) |
: _equals = equals; |
void _handleData(T inputEvent) { |
if (identical(_previous, _SENTINEL)) { |
_previous = inputEvent; |
- return _add(inputEvent); |
+ return _stream._add(inputEvent); |
} else { |
bool isEqual; |
try { |
@@ -497,11 +456,11 @@ class DistinctStream<T> extends _ForwardingTransformer<T, T> { |
isEqual = _equals(_previous, inputEvent); |
} |
} catch (e, s) { |
- _signalError(_asyncError(e, s)); |
+ _stream._signalError(_asyncError(e, s)); |
return null; |
} |
if (!isEqual) { |
- _add(inputEvent); |
+ _stream._add(inputEvent); |
_previous = inputEvent; |
} |
} |