Index: sdk/lib/async/stream_pipe.dart |
diff --git a/sdk/lib/async/stream_pipe.dart b/sdk/lib/async/stream_pipe.dart |
index e5aca172415406ea65dbe4da256ade70f2649958..e089f6ce88dce9180a110615d26ec6df7d2b2ab0 100644 |
--- a/sdk/lib/async/stream_pipe.dart |
+++ b/sdk/lib/async/stream_pipe.dart |
@@ -69,26 +69,84 @@ abstract class _ForwardingStream<S, T> extends Stream<T> { |
// Override the following methods in subclasses to change the behavior. |
- void _handleData(S data, _EventSink<T> sink) { |
+ void _handleData(S data, _EventOutputSink<T> sink) { |
var outputData = data; |
- sink._add(outputData); |
+ sink._sendData(outputData); |
} |
- void _handleError(error, _EventSink<T> sink) { |
- sink._addError(error); |
+ void _handleError(error, _EventOutputSink<T> sink) { |
+ sink._sendError(error); |
} |
- void _handleDone(_EventSink<T> sink) { |
- sink._close(); |
+ void _handleDone(_EventOutputSink<T> sink) { |
+ sink._sendDone(); |
} |
} |
/** |
+ * Common behavior of [StreamSubscription] classes. |
+ * |
+ * Stores and allows updating of the event handlers of a [StreamSubscription]. |
+ */ |
+abstract class _BaseStreamSubscription<T> implements StreamSubscription<T> { |
+ // TODO(ahe): Restore type when feature is implemented in dart2js |
+ // checked mode. http://dartbug.com/7733 |
+ var /* _DataHandler<T> */ _onData; |
+ _ErrorHandler _onError; |
+ _DoneHandler _onDone; |
+ |
+ _BaseStreamSubscription(this._onData, |
+ this._onError, |
+ this._onDone) { |
+ if (_onData == null) _onData = _nullDataHandler; |
+ if (_onError == null) _onError = _nullErrorHandler; |
+ if (_onDone == null) _onDone = _nullDoneHandler; |
+ } |
+ |
+ // StreamSubscription interface. |
+ void onData(void handleData(T event)) { |
+ if (handleData == null) handleData = _nullDataHandler; |
+ _onData = handleData; |
+ } |
+ |
+ void onError(void handleError(error)) { |
+ if (handleError == null) handleError = _nullErrorHandler; |
+ _onError = handleError; |
+ } |
+ |
+ void onDone(void handleDone()) { |
+ if (handleDone == null) handleDone = _nullDoneHandler; |
+ _onDone = handleDone; |
+ } |
+ |
+ void pause([Future resumeSignal]); |
+ |
+ void resume(); |
+ |
+ void cancel(); |
+ |
+ Future asFuture([var futureValue]) { |
+ _FutureImpl<T> result = new _FutureImpl<T>(); |
+ |
+ // Overwrite the onDone and onError handlers. |
+ onDone(() { result._setValue(futureValue); }); |
+ onError((error) { |
+ cancel(); |
+ result._setError(error); |
+ }); |
+ |
+ return result; |
+ } |
+} |
+ |
+ |
+/** |
* Abstract superclass for subscriptions that forward to other subscriptions. |
*/ |
class _ForwardingStreamSubscription<S, T> |
- extends _BufferingStreamSubscription<T> { |
+ extends _BaseStreamSubscription<T> implements _EventOutputSink<T> { |
final _ForwardingStream<S, T> _stream; |
+ final bool _cancelOnError; |
StreamSubscription<S> _subscription; |
@@ -96,46 +154,60 @@ class _ForwardingStreamSubscription<S, T> |
void onData(T data), |
void onError(error), |
void onDone(), |
- bool cancelOnError) |
- : super(onData, onError, onDone, cancelOnError) { |
+ this._cancelOnError) |
+ : super(onData, onError, onDone) { |
+ // Don't unsubscribe on incoming error, only if we send an error forwards. |
_subscription = |
_stream._source.listen(_handleData, |
onError: _handleError, |
onDone: _handleDone); |
} |
- // _StreamSink interface. |
- // Transformers sending more than one event have no way to know if the stream |
- // is canceled or closed after the first, so we just ignore remaining events. |
+ // StreamSubscription interface. |
- void _add(T data) { |
- if (_isClosed) return; |
- super._add(data); |
+ void pause([Future resumeSignal]) { |
+ if (_subscription == null) return; |
+ _subscription.pause(resumeSignal); |
} |
- void _addError(Object error) { |
- if (_isClosed) return; |
- super._addError(error); |
+ void resume() { |
+ if (_subscription == null) return; |
+ _subscription.resume(); |
} |
- // StreamSubscription callbacks. |
+ bool get isPaused { |
+ if (_subscription == null) return false; |
+ return _subscription.isPaused; |
+ } |
- void _onPause() { |
- if (_subscription == null) return; |
- _subscription.pause(); |
+ void cancel() { |
+ if (_subscription != null) { |
+ _subscription.cancel(); |
+ _subscription = null; |
+ } |
} |
- void _onResume() { |
- if (_subscription == null) return; |
- _subscription.resume(); |
+ // _EventOutputSink interface. Sends data to this subscription. |
+ |
+ void _sendData(T data) { |
+ _onData(data); |
+ } |
+ |
+ void _sendError(error) { |
+ _onError(error); |
+ if (_cancelOnError) { |
+ _subscription.cancel(); |
+ _subscription = null; |
+ } |
} |
- void _onCancel() { |
+ void _sendDone() { |
+ // If the transformation sends a done signal, we stop the subscription. |
if (_subscription != null) { |
- StreamSubscription subscription = _subscription; |
+ _subscription.cancel(); |
_subscription = null; |
- subscription.cancel(); |
} |
+ _onDone(); |
} |
// Methods used as listener on source subscription. |
@@ -169,16 +241,16 @@ class _WhereStream<T> extends _ForwardingStream<T, T> { |
_WhereStream(Stream<T> source, bool test(T value)) |
: _test = test, super(source); |
- void _handleData(T inputEvent, _EventSink<T> sink) { |
+ void _handleData(T inputEvent, _EventOutputSink<T> sink) { |
bool satisfies; |
try { |
satisfies = _test(inputEvent); |
} catch (e, s) { |
- sink._addError(_asyncError(e, s)); |
+ sink._sendError(_asyncError(e, s)); |
return; |
} |
if (satisfies) { |
- sink._add(inputEvent); |
+ sink._sendData(inputEvent); |
} |
} |
} |
@@ -195,15 +267,15 @@ class _MapStream<S, T> extends _ForwardingStream<S, T> { |
_MapStream(Stream<S> source, T transform(S event)) |
: this._transform = transform, super(source); |
- void _handleData(S inputEvent, _EventSink<T> sink) { |
+ void _handleData(S inputEvent, _EventOutputSink<T> sink) { |
T outputEvent; |
try { |
outputEvent = _transform(inputEvent); |
} catch (e, s) { |
- sink._addError(_asyncError(e, s)); |
+ sink._sendError(_asyncError(e, s)); |
return; |
} |
- sink._add(outputEvent); |
+ sink._sendData(outputEvent); |
} |
} |
@@ -216,15 +288,15 @@ class _ExpandStream<S, T> extends _ForwardingStream<S, T> { |
_ExpandStream(Stream<S> source, Iterable<T> expand(S event)) |
: this._expand = expand, super(source); |
- void _handleData(S inputEvent, _EventSink<T> sink) { |
+ void _handleData(S inputEvent, _EventOutputSink<T> sink) { |
try { |
for (T value in _expand(inputEvent)) { |
- sink._add(value); |
+ sink._sendData(value); |
} |
} catch (e, s) { |
// If either _expand or iterating the generated iterator throws, |
// we abort the iteration. |
- sink._addError(_asyncError(e, s)); |
+ sink._sendError(_asyncError(e, s)); |
} |
} |
} |
@@ -246,13 +318,13 @@ class _HandleErrorStream<T> extends _ForwardingStream<T, T> { |
bool test(error)) |
: this._transform = transform, this._test = test, super(source); |
- void _handleError(Object error, _EventSink<T> sink) { |
+ void _handleError(Object error, _EventOutputSink<T> sink) { |
bool matches = true; |
if (_test != null) { |
try { |
matches = _test(error); |
} catch (e, s) { |
- sink._addError(_asyncError(e, s)); |
+ sink._sendError(_asyncError(e, s)); |
return; |
} |
} |
@@ -260,11 +332,11 @@ class _HandleErrorStream<T> extends _ForwardingStream<T, T> { |
try { |
_transform(error); |
} catch (e, s) { |
- sink._addError(_asyncError(e, s)); |
+ sink._sendError(_asyncError(e, s)); |
return; |
} |
} else { |
- sink._addError(error); |
+ sink._sendError(error); |
} |
} |
} |
@@ -280,14 +352,14 @@ class _TakeStream<T> extends _ForwardingStream<T, T> { |
if (count is! int) throw new ArgumentError(count); |
} |
- void _handleData(T inputEvent, _EventSink<T> sink) { |
+ void _handleData(T inputEvent, _EventOutputSink<T> sink) { |
if (_remaining > 0) { |
- sink._add(inputEvent); |
+ sink._sendData(inputEvent); |
_remaining -= 1; |
if (_remaining == 0) { |
// Closing also unsubscribes all subscribers, which unsubscribes |
// this from source. |
- sink._close(); |
+ sink._sendDone(); |
} |
} |
} |
@@ -300,20 +372,20 @@ class _TakeWhileStream<T> extends _ForwardingStream<T, T> { |
_TakeWhileStream(Stream<T> source, bool test(T value)) |
: this._test = test, super(source); |
- void _handleData(T inputEvent, _EventSink<T> sink) { |
+ void _handleData(T inputEvent, _EventOutputSink<T> sink) { |
bool satisfies; |
try { |
satisfies = _test(inputEvent); |
} catch (e, s) { |
- sink._addError(_asyncError(e, s)); |
+ sink._sendError(_asyncError(e, s)); |
// The test didn't say true. Didn't say false either, but we stop anyway. |
- sink._close(); |
+ sink._sendDone(); |
return; |
} |
if (satisfies) { |
- sink._add(inputEvent); |
+ sink._sendData(inputEvent); |
} else { |
- sink._close(); |
+ sink._sendDone(); |
} |
} |
} |
@@ -328,12 +400,12 @@ class _SkipStream<T> extends _ForwardingStream<T, T> { |
if (count is! int || count < 0) throw new ArgumentError(count); |
} |
- void _handleData(T inputEvent, _EventSink<T> sink) { |
+ void _handleData(T inputEvent, _EventOutputSink<T> sink) { |
if (_remaining > 0) { |
_remaining--; |
return; |
} |
- return sink._add(inputEvent); |
+ return sink._sendData(inputEvent); |
} |
} |
@@ -344,23 +416,23 @@ class _SkipWhileStream<T> extends _ForwardingStream<T, T> { |
_SkipWhileStream(Stream<T> source, bool test(T value)) |
: this._test = test, super(source); |
- void _handleData(T inputEvent, _EventSink<T> sink) { |
+ void _handleData(T inputEvent, _EventOutputSink<T> sink) { |
if (_hasFailed) { |
- sink._add(inputEvent); |
+ sink._sendData(inputEvent); |
return; |
} |
bool satisfies; |
try { |
satisfies = _test(inputEvent); |
} catch (e, s) { |
- sink._addError(_asyncError(e, s)); |
+ sink._sendError(_asyncError(e, s)); |
// A failure to return a boolean is considered "not matching". |
_hasFailed = true; |
return; |
} |
if (!satisfies) { |
_hasFailed = true; |
- sink._add(inputEvent); |
+ sink._sendData(inputEvent); |
} |
} |
} |
@@ -376,10 +448,10 @@ class _DistinctStream<T> extends _ForwardingStream<T, T> { |
_DistinctStream(Stream<T> source, bool equals(T a, T b)) |
: _equals = equals, super(source); |
- void _handleData(T inputEvent, _EventSink<T> sink) { |
+ void _handleData(T inputEvent, _EventOutputSink<T> sink) { |
if (identical(_previous, _SENTINEL)) { |
_previous = inputEvent; |
- return sink._add(inputEvent); |
+ return sink._sendData(inputEvent); |
} else { |
bool isEqual; |
try { |
@@ -389,11 +461,11 @@ class _DistinctStream<T> extends _ForwardingStream<T, T> { |
isEqual = _equals(_previous, inputEvent); |
} |
} catch (e, s) { |
- sink._addError(_asyncError(e, s)); |
+ sink._sendError(_asyncError(e, s)); |
return null; |
} |
if (!isEqual) { |
- sink._add(inputEvent); |
+ sink._sendData(inputEvent); |
_previous = inputEvent; |
} |
} |