Index: sdk/lib/async/stream_pipe.dart |
diff --git a/sdk/lib/async/stream_pipe.dart b/sdk/lib/async/stream_pipe.dart |
index e089f6ce88dce9180a110615d26ec6df7d2b2ab0..93ed5866d908770b5acfcb32156d2ddab99272a5 100644 |
--- a/sdk/lib/async/stream_pipe.dart |
+++ b/sdk/lib/async/stream_pipe.dart |
@@ -69,84 +69,26 @@ abstract class _ForwardingStream<S, T> extends Stream<T> { |
// Override the following methods in subclasses to change the behavior. |
- void _handleData(S data, _EventOutputSink<T> sink) { |
+ void _handleData(S data, _EventSink<T> sink) { |
var outputData = data; |
- sink._sendData(outputData); |
+ sink._add(outputData); |
} |
- void _handleError(error, _EventOutputSink<T> sink) { |
- sink._sendError(error); |
+ void _handleError(error, _EventSink<T> sink) { |
+ sink._addError(error); |
} |
- void _handleDone(_EventOutputSink<T> sink) { |
- sink._sendDone(); |
+ void _handleDone(_EventSink<T> sink) { |
+ sink._close(); |
} |
} |
/** |
- * Common behavior of [StreamSubscription] classes. |
- * |
- * Stores and allows updating of the event handlers of a [StreamSubscription]. |
- */ |
-abstract class _BaseStreamSubscription<T> implements StreamSubscription<T> { |
- // TODO(ahe): Restore type when feature is implemented in dart2js |
- // checked mode. http://dartbug.com/7733 |
- var /* _DataHandler<T> */ _onData; |
- _ErrorHandler _onError; |
- _DoneHandler _onDone; |
- |
- _BaseStreamSubscription(this._onData, |
- this._onError, |
- this._onDone) { |
- if (_onData == null) _onData = _nullDataHandler; |
- if (_onError == null) _onError = _nullErrorHandler; |
- if (_onDone == null) _onDone = _nullDoneHandler; |
- } |
- |
- // StreamSubscription interface. |
- void onData(void handleData(T event)) { |
- if (handleData == null) handleData = _nullDataHandler; |
- _onData = handleData; |
- } |
- |
- void onError(void handleError(error)) { |
- if (handleError == null) handleError = _nullErrorHandler; |
- _onError = handleError; |
- } |
- |
- void onDone(void handleDone()) { |
- if (handleDone == null) handleDone = _nullDoneHandler; |
- _onDone = handleDone; |
- } |
- |
- void pause([Future resumeSignal]); |
- |
- void resume(); |
- |
- void cancel(); |
- |
- Future asFuture([var futureValue]) { |
- _FutureImpl<T> result = new _FutureImpl<T>(); |
- |
- // Overwrite the onDone and onError handlers. |
- onDone(() { result._setValue(futureValue); }); |
- onError((error) { |
- cancel(); |
- result._setError(error); |
- }); |
- |
- return result; |
- } |
-} |
- |
- |
-/** |
* Abstract superclass for subscriptions that forward to other subscriptions. |
*/ |
class _ForwardingStreamSubscription<S, T> |
- extends _BaseStreamSubscription<T> implements _EventOutputSink<T> { |
+ extends _BufferingStreamSubscription<T> { |
final _ForwardingStream<S, T> _stream; |
- final bool _cancelOnError; |
StreamSubscription<S> _subscription; |
@@ -154,60 +96,46 @@ class _ForwardingStreamSubscription<S, T> |
void onData(T data), |
void onError(error), |
void onDone(), |
- this._cancelOnError) |
- : super(onData, onError, onDone) { |
- // Don't unsubscribe on incoming error, only if we send an error forwards. |
+ bool cancelOnError) |
+ : super(onData, onError, onDone, cancelOnError, null) { |
_subscription = |
_stream._source.listen(_handleData, |
onError: _handleError, |
onDone: _handleDone); |
} |
- // StreamSubscription interface. |
- |
- void pause([Future resumeSignal]) { |
- if (_subscription == null) return; |
- _subscription.pause(resumeSignal); |
- } |
- |
- void resume() { |
- if (_subscription == null) return; |
- _subscription.resume(); |
- } |
+ // _StreamSink interface. |
+ // Transformers sending more than one event have no way to know if the stream |
+ // is cancelled or closed after the first, so we just ignore remaining events. |
floitsch
2013/05/22 16:26:29
canceled.
Lasse Reichstein Nielsen
2013/05/24 06:02:49
Done.
|
- bool get isPaused { |
- if (_subscription == null) return false; |
- return _subscription.isPaused; |
+ void _add(T data) { |
+ if (_isClosed) return; |
+ super._add(data); |
} |
- void cancel() { |
- if (_subscription != null) { |
- _subscription.cancel(); |
- _subscription = null; |
- } |
+ void _addError(Object error) { |
+ if (_isClosed) return; |
+ super._addError(error); |
} |
- // _EventOutputSink interface. Sends data to this subscription. |
+ // StreamSubscription callbacks. |
- void _sendData(T data) { |
- _onData(data); |
+ void _onPause() { |
+ if (_subscription == null) return; |
+ _subscription.pause(); |
} |
- void _sendError(error) { |
- _onError(error); |
- if (_cancelOnError) { |
- _subscription.cancel(); |
- _subscription = null; |
- } |
+ void _onResume() { |
+ if (_subscription == null) return; |
+ _subscription.resume(); |
} |
- void _sendDone() { |
- // If the transformation sends a done signal, we stop the subscription. |
+ void _onCancel() { |
if (_subscription != null) { |
- _subscription.cancel(); |
+ StreamSubscription subscription = _subscription; |
_subscription = null; |
+ subscription.cancel(); |
} |
- _onDone(); |
} |
// Methods used as listener on source subscription. |
@@ -241,16 +169,16 @@ class _WhereStream<T> extends _ForwardingStream<T, T> { |
_WhereStream(Stream<T> source, bool test(T value)) |
: _test = test, super(source); |
- void _handleData(T inputEvent, _EventOutputSink<T> sink) { |
+ void _handleData(T inputEvent, _EventSink<T> sink) { |
bool satisfies; |
try { |
satisfies = _test(inputEvent); |
} catch (e, s) { |
- sink._sendError(_asyncError(e, s)); |
+ sink._addError(_asyncError(e, s)); |
return; |
} |
if (satisfies) { |
- sink._sendData(inputEvent); |
+ sink._add(inputEvent); |
} |
} |
} |
@@ -267,15 +195,15 @@ class _MapStream<S, T> extends _ForwardingStream<S, T> { |
_MapStream(Stream<S> source, T transform(S event)) |
: this._transform = transform, super(source); |
- void _handleData(S inputEvent, _EventOutputSink<T> sink) { |
+ void _handleData(S inputEvent, _EventSink<T> sink) { |
T outputEvent; |
try { |
outputEvent = _transform(inputEvent); |
} catch (e, s) { |
- sink._sendError(_asyncError(e, s)); |
+ sink._addError(_asyncError(e, s)); |
return; |
} |
- sink._sendData(outputEvent); |
+ sink._add(outputEvent); |
} |
} |
@@ -288,15 +216,15 @@ class _ExpandStream<S, T> extends _ForwardingStream<S, T> { |
_ExpandStream(Stream<S> source, Iterable<T> expand(S event)) |
: this._expand = expand, super(source); |
- void _handleData(S inputEvent, _EventOutputSink<T> sink) { |
+ void _handleData(S inputEvent, _EventSink<T> sink) { |
try { |
for (T value in _expand(inputEvent)) { |
- sink._sendData(value); |
+ sink._add(value); |
} |
} catch (e, s) { |
// If either _expand or iterating the generated iterator throws, |
// we abort the iteration. |
- sink._sendError(_asyncError(e, s)); |
+ sink._addError(_asyncError(e, s)); |
} |
} |
} |
@@ -318,13 +246,13 @@ class _HandleErrorStream<T> extends _ForwardingStream<T, T> { |
bool test(error)) |
: this._transform = transform, this._test = test, super(source); |
- void _handleError(Object error, _EventOutputSink<T> sink) { |
+ void _handleError(Object error, _EventSink<T> sink) { |
bool matches = true; |
if (_test != null) { |
try { |
matches = _test(error); |
} catch (e, s) { |
- sink._sendError(_asyncError(e, s)); |
+ sink._addError(_asyncError(e, s)); |
return; |
} |
} |
@@ -332,11 +260,11 @@ class _HandleErrorStream<T> extends _ForwardingStream<T, T> { |
try { |
_transform(error); |
} catch (e, s) { |
- sink._sendError(_asyncError(e, s)); |
+ sink._addError(_asyncError(e, s)); |
return; |
} |
} else { |
- sink._sendError(error); |
+ sink._addError(error); |
} |
} |
} |
@@ -352,14 +280,14 @@ class _TakeStream<T> extends _ForwardingStream<T, T> { |
if (count is! int) throw new ArgumentError(count); |
} |
- void _handleData(T inputEvent, _EventOutputSink<T> sink) { |
+ void _handleData(T inputEvent, _EventSink<T> sink) { |
if (_remaining > 0) { |
- sink._sendData(inputEvent); |
+ sink._add(inputEvent); |
_remaining -= 1; |
if (_remaining == 0) { |
// Closing also unsubscribes all subscribers, which unsubscribes |
// this from source. |
- sink._sendDone(); |
+ sink._close(); |
} |
} |
} |
@@ -372,20 +300,20 @@ class _TakeWhileStream<T> extends _ForwardingStream<T, T> { |
_TakeWhileStream(Stream<T> source, bool test(T value)) |
: this._test = test, super(source); |
- void _handleData(T inputEvent, _EventOutputSink<T> sink) { |
+ void _handleData(T inputEvent, _EventSink<T> sink) { |
bool satisfies; |
try { |
satisfies = _test(inputEvent); |
} catch (e, s) { |
- sink._sendError(_asyncError(e, s)); |
+ sink._addError(_asyncError(e, s)); |
// The test didn't say true. Didn't say false either, but we stop anyway. |
- sink._sendDone(); |
+ sink._close(); |
return; |
} |
if (satisfies) { |
- sink._sendData(inputEvent); |
+ sink._add(inputEvent); |
} else { |
- sink._sendDone(); |
+ sink._close(); |
} |
} |
} |
@@ -400,12 +328,12 @@ class _SkipStream<T> extends _ForwardingStream<T, T> { |
if (count is! int || count < 0) throw new ArgumentError(count); |
} |
- void _handleData(T inputEvent, _EventOutputSink<T> sink) { |
+ void _handleData(T inputEvent, _EventSink<T> sink) { |
if (_remaining > 0) { |
_remaining--; |
return; |
} |
- return sink._sendData(inputEvent); |
+ return sink._add(inputEvent); |
} |
} |
@@ -416,23 +344,23 @@ class _SkipWhileStream<T> extends _ForwardingStream<T, T> { |
_SkipWhileStream(Stream<T> source, bool test(T value)) |
: this._test = test, super(source); |
- void _handleData(T inputEvent, _EventOutputSink<T> sink) { |
+ void _handleData(T inputEvent, _EventSink<T> sink) { |
if (_hasFailed) { |
- sink._sendData(inputEvent); |
+ sink._add(inputEvent); |
return; |
} |
bool satisfies; |
try { |
satisfies = _test(inputEvent); |
} catch (e, s) { |
- sink._sendError(_asyncError(e, s)); |
+ sink._addError(_asyncError(e, s)); |
// A failure to return a boolean is considered "not matching". |
_hasFailed = true; |
return; |
} |
if (!satisfies) { |
_hasFailed = true; |
- sink._sendData(inputEvent); |
+ sink._add(inputEvent); |
} |
} |
} |
@@ -448,10 +376,10 @@ class _DistinctStream<T> extends _ForwardingStream<T, T> { |
_DistinctStream(Stream<T> source, bool equals(T a, T b)) |
: _equals = equals, super(source); |
- void _handleData(T inputEvent, _EventOutputSink<T> sink) { |
+ void _handleData(T inputEvent, _EventSink<T> sink) { |
if (identical(_previous, _SENTINEL)) { |
_previous = inputEvent; |
- return sink._sendData(inputEvent); |
+ return sink._add(inputEvent); |
} else { |
bool isEqual; |
try { |
@@ -461,11 +389,11 @@ class _DistinctStream<T> extends _ForwardingStream<T, T> { |
isEqual = _equals(_previous, inputEvent); |
} |
} catch (e, s) { |
- sink._sendError(_asyncError(e, s)); |
+ sink._addError(_asyncError(e, s)); |
return null; |
} |
if (!isEqual) { |
- sink._sendData(inputEvent); |
+ sink._add(inputEvent); |
_previous = inputEvent; |
} |
} |