Index: sdk/lib/async/stream_pipe.dart |
diff --git a/sdk/lib/async/stream_pipe.dart b/sdk/lib/async/stream_pipe.dart |
new file mode 100644 |
index 0000000000000000000000000000000000000000..e0da85d379364ca9003a5d169485937630576c64 |
--- /dev/null |
+++ b/sdk/lib/async/stream_pipe.dart |
@@ -0,0 +1,460 @@ |
+// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
+// for details. All rights reserved. Use of this source code is governed by a |
+// BSD-style license that can be found in the LICENSE file. |
+ |
+// part of dart.async; |
+ |
+/** |
+ * A pipe between two streams. |
+ * |
+ * The default pipe subscribes to the [source] and sends on the |
+ * [stream]. |
+ * |
+ * The events are passed through the [_handleData], [_handleError] and |
+ * [_handleDone] methods. Subclasses are supposed to add handling of some of |
+ * the events by overriding these methods. |
+ * |
+ * This class is intended for internal use only. Users can use the [PipeStream] |
+ * to configure similar behavior. |
+ */ |
+abstract class _ForwardingStream<S, T> extends _MultiStreamImpl<T> |
+ implements StreamTransformer<S, T> { |
+ Stream<S> _source = null; |
+ StreamSubscription _subscription = null; |
+ |
+ StreamController<T> _createController() { |
+ return new _BaseForwardingController<T>(this); |
+ } |
+ |
+ void _subscribeToSource() { |
+ _subscription = _source.listen(this._handleData, |
+ onError: this._handleError, |
+ onDone: this._handleDone); |
+ if (_isPaused) { |
+ _subscription.pause(); |
+ } |
+ } |
+ |
+ Stream<T> bind(Stream<S> source) { |
+ assert(_source == null); |
+ _source = source; |
+ if (_hasSubscribers) { |
+ _subscribeToSource(); |
+ } |
+ return this; |
+ } |
+ |
+ /** |
+ * Subscribe or unsubscribe on [source] depending on whether |
+ * [stream] has subscribers. |
+ */ |
+ void _onSubscriptionStateChange() { |
+ if (_hasSubscribers) { |
+ assert(_subscription == null); |
+ if (_source != null) { |
+ _subscribeToSource(); |
+ } |
+ } else { |
+ if (_subscription != null) { |
+ _subscription.cancel(); |
+ _subscription = null; |
+ } |
+ } |
+ } |
+ |
+ void _onPauseStateChange() { |
+ if (_subscription == null) return; |
+ if (isPaused) { |
+ _subscription.pause(); |
+ } else { |
+ _subscription.resume(); |
+ } |
+ } |
+ |
+ void _handleData(S inputEvent) { |
+ var outputEvent = inputEvent; |
+ _add(outputEvent); |
+ } |
+ |
+ void _handleError(AsyncError error) { |
+ _signalError(error); |
+ } |
+ |
+ void _handleDone() { |
+ _close(); |
+ } |
+} |
+ |
+ |
+// ------------------------------------------------------------------- |
+// Stream pipes used by the default Stream implementation. |
+// ------------------------------------------------------------------- |
+ |
+typedef bool _Predicate<T>(T value); |
+ |
+class WhereStream<T> extends _ForwardingStream<T, T> { |
+ final _Predicate<T> _test; |
+ |
+ WhereStream(bool test(T value)) |
+ : this._test = test; |
+ |
+ void _handleData(T inputEvent) { |
+ bool satisfies; |
+ try { |
+ satisfies = _test(inputEvent); |
+ } catch (e, s) { |
+ _signalError(new AsyncError(e, s)); |
+ return; |
+ } |
+ if (satisfies) { |
+ _add(inputEvent); |
+ } |
+ } |
+} |
+ |
+ |
+typedef T _Transformation<S, T>(S value); |
+ |
+/** |
+ * A stream pipe that converts data events before passing them on. |
+ */ |
+class MapStream<S, T> extends _ForwardingStream<S, T> { |
+ final _Transformation _transform; |
+ |
+ MapStream(T transform(S event)) |
+ : this._transform = transform; |
+ |
+ void _handleData(S inputEvent) { |
+ T outputEvent; |
+ try { |
+ outputEvent = _transform(inputEvent); |
+ } catch (e, s) { |
+ _signalError(new AsyncError(e, s)); |
+ return; |
+ } |
+ _add(outputEvent); |
+ } |
+} |
+ |
+/** |
+ * A stream pipe that converts data events before passing them on. |
+ */ |
+class ExpandStream<S, T> extends _ForwardingStream<S, T> { |
+ final _Transformation<S, Iterable<T>> _expand; |
+ |
+ ExpandStream(Iterable<T> expand(S event)) |
+ : this._expand = expand; |
+ |
+ void _handleData(S inputEvent) { |
+ try { |
+ for (T value in _expand(inputEvent)) { |
+ _add(value); |
+ } |
+ } catch (e, s) { |
+ // If either _expand or iterating the generated iterator throws, |
+ // we abort the iteration. |
+ _signalError(new AsyncError(e, s)); |
+ } |
+ } |
+} |
+ |
+ |
+typedef AsyncError _ErrorTransformation(AsyncError error); |
+ |
+/** |
+ * A stream pipe that converts or disposes error events |
+ * before passing them on. |
+ */ |
+class HandleErrorStream<T> extends _ForwardingStream<T, T> { |
+ final _ErrorTransformation _transform; |
+ |
+ HandleErrorStream(AsyncError transform(AsyncError event)) |
+ : this._transform = transform; |
+ |
+ void _handleError(AsyncError error) { |
+ try { |
+ error = _transform(error); |
+ if (error == null) return; |
+ } catch (e, s) { |
+ error = new AsyncError.withCause(e, s, error); |
+ } |
+ _signalError(error); |
+ } |
+} |
+ |
+ |
+typedef void _TransformDataHandler<S, T>(S data, StreamSink<T> sink); |
+typedef void _TransformErrorHandler<T>(AsyncError data, StreamSink<T> sink); |
+typedef void _TransformDoneHandler<T>(StreamSink<T> sink); |
+ |
+/** |
+ * A stream pipe that intercepts all events and can generate any event as |
+ * output. |
+ * |
+ * Each incoming event on this [StreamSink] is passed to the corresponding |
+ * provided event handler, along with a [StreamSink] linked to the [output] of |
+ * this pipe. |
+ * The handler can then decide which events to send to the output |
+ */ |
+class PipeStream<S, T> extends _ForwardingStream<S, T> { |
+ final _TransformDataHandler<S, T> _onData; |
+ final _TransformErrorHandler<T> _onError; |
+ final _TransformDoneHandler<T> _onDone; |
+ StreamSink<T> _sink; |
+ |
+ PipeStream({void onData(S data, StreamSink<T> sink), |
+ void onError(AsyncError data, StreamSink<T> sink), |
+ void onDone(StreamSink<T> sink)}) |
+ : this._onData = (onData == null ? _defaultHandleData : onData), |
+ this._onError = (onError == null ? _defaultHandleError : onError), |
+ this._onDone = (onDone == null ? _defaultHandleDone : onDone) { |
+ // Cache the sink wrapper to avoid creating a new one for each event. |
+ this._sink = new _StreamImplSink(this); |
+ } |
+ |
+ void _handleData(S data) { |
+ try { |
+ return _onData(data, _sink); |
+ } catch (e, s) { |
+ _signalError(new AsyncError(e, s)); |
+ } |
+ } |
+ |
+ void _handleError(AsyncError error) { |
+ try { |
+ _onError(error, _sink); |
+ } catch (e, s) { |
+ _signalError(new AsyncError.withCause(e, s, error)); |
+ } |
+ } |
+ |
+ void _handleDone() { |
+ try { |
+ _onDone(_sink); |
+ } catch (e, s) { |
+ _signalError(new AsyncError(e, s)); |
+ } |
+ } |
+ |
+ /** Default data handler forwards all data. */ |
+ static void _defaultHandleData(dynamic data, StreamSink sink) { |
+ sink.add(data); |
+ } |
+ /** Default error handler forwards all errors. */ |
+ static void _defaultHandleError(AsyncError error, StreamSink sink) { |
+ sink.signalError(error); |
+ } |
+ /** Default done handler forwards done. */ |
+ static void _defaultHandleDone(StreamSink sink) { |
+ sink.close(); |
+ } |
+} |
+ |
+/** Creates a [StreamSink] from a [_StreamImpl]'s input methods. */ |
+class _StreamImplSink<T> implements StreamSink<T> { |
+ _StreamImpl<T> _target; |
+ _StreamImplSink(this._target); |
+ void add(T data) { _target._add(data); } |
+ void signalError(AsyncError error) { _target._signalError(error); } |
+ void close() { _target._close(); } |
+} |
+ |
+/** |
+ * A stream pipe that intercepts all events and can generate any event as |
+ * output. |
+ * |
+ * Each incoming event on this [StreamSink] is passed to the corresponding |
+ * method on [transform], along with a [StreamSink] linked to the [output] of |
+ * this pipe. |
+ * The handler can then decide which events to send to the output |
+ */ |
+class TransformStream<S, T> extends _ForwardingStream<S, T> { |
+ final StreamTransformer<S, T> _transform; |
+ StreamSink<T> _sink; |
+ |
+ TransformStream(StreamTransformer<S, T> transform) |
+ : this._transform = transform { |
+ // Cache the sink wrapper to avoid creating a new one for each event. |
+ this._sink = new _StreamImplSink(this); |
+ } |
+ |
+ void _handleData(S data) { |
+ try { |
+ return _transform.handleData(data, _sink); |
+ } catch (e, s) { |
+ _controller.signalError(new AsyncError(e, s)); |
+ } |
+ } |
+ |
+ void _handleError(AsyncError error) { |
+ try { |
+ _transform.handleError(error, _sink); |
+ } catch (e, s) { |
+ _controller.signalError(new AsyncError.withCause(e, s, error)); |
+ } |
+ } |
+ |
+ void _handleDone() { |
+ try { |
+ _transform.handleDone(_sink); |
+ } catch (e, s) { |
+ _controller.signalError(new AsyncError(e, s)); |
+ } |
+ } |
+} |
+ |
+ |
+/** Helper class for transforming three functions into a StreamTransformer. */ |
+class _StreamTransformerFunctionWrapper<S, T> |
+ extends _StreamTransformer<S, T> { |
+ final _TransformDataHandler<S, T> _handleData; |
+ final _TransformErrorHandler<T> _handleError; |
+ final _TransformDoneHandler<T> _handleDone; |
+ |
+ _StreamTransformerFunctionWrapper({ |
+ void onData(S data, StreamSink<T> sink), |
+ void onError(AsyncError data, StreamSink<T> sink), |
+ void onDone(StreamSink<T> sink)}) |
+ : _handleData = onData != null ? onData : PipeStream._defaultHandleData, |
+ _handleError = onError != null ? onError |
+ : PipeStream._defaultHandleError, |
+ _handleDone = onDone != null ? onDone : PipeStream._defaultHandleDone; |
+ |
+ void handleData(S data, StreamSink<T> sink) { |
+ return _handleData(data, sink); |
+ } |
+ |
+ void handleError(AsyncError error, StreamSink<T> sink) { |
+ _handleError(error, sink); |
+ } |
+ |
+ void handleDone(StreamSink<T> sink) { |
+ _handleDone(sink); |
+ } |
+} |
+ |
+ |
+class TakeStream<T> extends _ForwardingStream<T, T> { |
+ int _remaining; |
+ |
+ TakeStream(int count) |
+ : this._remaining = count { |
+ if (count is! int) throw new ArgumentError(count); |
+ } |
+ |
+ void _handleData(T inputEvent) { |
+ if (_remaining > 0) { |
+ _add(inputEvent); |
+ _remaining -= 1; |
+ if (_remaining == 0) { |
+ // Closing also unsubscribes all subscribers, which unsubscribes |
+ // this from source. |
+ _close(); |
+ } |
+ } |
+ } |
+} |
+ |
+ |
+class TakeWhileStream<T> extends _ForwardingStream<T, T> { |
+ final _Predicate<T> _test; |
+ |
+ TakeWhileStream(bool test(T value)) |
+ : this._test = test; |
+ |
+ void _handleData(T inputEvent) { |
+ bool satisfies; |
+ try { |
+ satisfies = _test(inputEvent); |
+ } catch (e, s) { |
+ _signalError(new AsyncError(e, s)); |
+ // The test didn't say true. Didn't say false either, but we stop anyway. |
+ _close(); |
+ return; |
+ } |
+ if (satisfies) { |
+ _add(inputEvent); |
+ } else { |
+ _close(); |
+ } |
+ } |
+} |
+ |
+class SkipStream<T> extends _ForwardingStream<T, T> { |
+ int _remaining; |
+ |
+ SkipStream(int count) |
+ : this._remaining = count{ |
+ if (count is! int) throw new ArgumentError(count); |
+ } |
+ |
+ void _handleData(T inputEvent) { |
+ if (_remaining > 0) { |
+ _remaining--; |
+ return; |
+ } |
+ return _add(inputEvent); |
+ } |
+} |
+ |
+class SkipWhileStream<T> extends _ForwardingStream<T, T> { |
+ final _Predicate<T> _test; |
+ bool _hasFailed = false; |
+ |
+ SkipWhileStream(bool test(T value)) |
+ : this._test = test; |
+ |
+ void _handleData(T inputEvent) { |
+ if (_hasFailed) { |
+ _add(inputEvent); |
+ } |
+ bool satisfies; |
+ try { |
+ satisfies = _test(inputEvent); |
+ } catch (e, s) { |
+ _signalError(new AsyncError(e, s)); |
+ // A failure to return a boolean is considered "not matching". |
+ _hasFailed = true; |
+ return; |
+ } |
+ if (!satisfies) { |
+ _hasFailed = true; |
+ _add(inputEvent); |
+ } |
+ } |
+} |
+ |
+typedef bool _Equality<T>(T a, T b); |
+ |
+class DistinctStream<T> extends _ForwardingStream<T, T> { |
+ static var _SENTINEL = new Object(); |
+ |
+ _Equality<T> _equals; |
+ var _previous = _SENTINEL; |
+ |
+ DistinctStream(bool equals(T a, T b)) |
+ : _equals = equals; |
+ |
+ void _handleData(T inputEvent) { |
+ if (identical(_previous, _SENTINEL)) { |
+ _previous = inputEvent; |
+ return _add(inputEvent); |
+ } else { |
+ bool isEqual; |
+ try { |
+ if (_equals == null) { |
+ isEqual = (_previous == inputEvent); |
+ } else { |
+ isEqual = _equals(_previous, inputEvent); |
+ } |
+ } catch (e, s) { |
+ _signalError(new AsyncError(e, s)); |
+ return null; |
+ } |
+ if (!isEqual) { |
+ _add(inputEvent); |
+ _previous = inputEvent; |
+ } |
+ } |
+ } |
+} |