Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(473)

Unified Diff: sdk/lib/async/stream_pipe.dart

Issue 11783009: Big merge from experimental to bleeding edge. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 7 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « sdk/lib/async/stream_impl.dart ('k') | sdk/lib/async/string_transform.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: sdk/lib/async/stream_pipe.dart
diff --git a/sdk/lib/async/stream_pipe.dart b/sdk/lib/async/stream_pipe.dart
new file mode 100644
index 0000000000000000000000000000000000000000..e0da85d379364ca9003a5d169485937630576c64
--- /dev/null
+++ b/sdk/lib/async/stream_pipe.dart
@@ -0,0 +1,460 @@
+// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+// part of dart.async;
+
+/**
+ * A pipe between two streams.
+ *
+ * The default pipe subscribes to the [source] and sends on the
+ * [stream].
+ *
+ * The events are passed through the [_handleData], [_handleError] and
+ * [_handleDone] methods. Subclasses are supposed to add handling of some of
+ * the events by overriding these methods.
+ *
+ * This class is intended for internal use only. Users can use the [PipeStream]
+ * to configure similar behavior.
+ */
+abstract class _ForwardingStream<S, T> extends _MultiStreamImpl<T>
+ implements StreamTransformer<S, T> {
+ Stream<S> _source = null;
+ StreamSubscription _subscription = null;
+
+ StreamController<T> _createController() {
+ return new _BaseForwardingController<T>(this);
+ }
+
+ void _subscribeToSource() {
+ _subscription = _source.listen(this._handleData,
+ onError: this._handleError,
+ onDone: this._handleDone);
+ if (_isPaused) {
+ _subscription.pause();
+ }
+ }
+
+ Stream<T> bind(Stream<S> source) {
+ assert(_source == null);
+ _source = source;
+ if (_hasSubscribers) {
+ _subscribeToSource();
+ }
+ return this;
+ }
+
+ /**
+ * Subscribe or unsubscribe on [source] depending on whether
+ * [stream] has subscribers.
+ */
+ void _onSubscriptionStateChange() {
+ if (_hasSubscribers) {
+ assert(_subscription == null);
+ if (_source != null) {
+ _subscribeToSource();
+ }
+ } else {
+ if (_subscription != null) {
+ _subscription.cancel();
+ _subscription = null;
+ }
+ }
+ }
+
+ void _onPauseStateChange() {
+ if (_subscription == null) return;
+ if (isPaused) {
+ _subscription.pause();
+ } else {
+ _subscription.resume();
+ }
+ }
+
+ void _handleData(S inputEvent) {
+ var outputEvent = inputEvent;
+ _add(outputEvent);
+ }
+
+ void _handleError(AsyncError error) {
+ _signalError(error);
+ }
+
+ void _handleDone() {
+ _close();
+ }
+}
+
+
+// -------------------------------------------------------------------
+// Stream pipes used by the default Stream implementation.
+// -------------------------------------------------------------------
+
+typedef bool _Predicate<T>(T value);
+
+class WhereStream<T> extends _ForwardingStream<T, T> {
+ final _Predicate<T> _test;
+
+ WhereStream(bool test(T value))
+ : this._test = test;
+
+ void _handleData(T inputEvent) {
+ bool satisfies;
+ try {
+ satisfies = _test(inputEvent);
+ } catch (e, s) {
+ _signalError(new AsyncError(e, s));
+ return;
+ }
+ if (satisfies) {
+ _add(inputEvent);
+ }
+ }
+}
+
+
+typedef T _Transformation<S, T>(S value);
+
+/**
+ * A stream pipe that converts data events before passing them on.
+ */
+class MapStream<S, T> extends _ForwardingStream<S, T> {
+ final _Transformation _transform;
+
+ MapStream(T transform(S event))
+ : this._transform = transform;
+
+ void _handleData(S inputEvent) {
+ T outputEvent;
+ try {
+ outputEvent = _transform(inputEvent);
+ } catch (e, s) {
+ _signalError(new AsyncError(e, s));
+ return;
+ }
+ _add(outputEvent);
+ }
+}
+
+/**
+ * A stream pipe that converts data events before passing them on.
+ */
+class ExpandStream<S, T> extends _ForwardingStream<S, T> {
+ final _Transformation<S, Iterable<T>> _expand;
+
+ ExpandStream(Iterable<T> expand(S event))
+ : this._expand = expand;
+
+ void _handleData(S inputEvent) {
+ try {
+ for (T value in _expand(inputEvent)) {
+ _add(value);
+ }
+ } catch (e, s) {
+ // If either _expand or iterating the generated iterator throws,
+ // we abort the iteration.
+ _signalError(new AsyncError(e, s));
+ }
+ }
+}
+
+
+typedef AsyncError _ErrorTransformation(AsyncError error);
+
+/**
+ * A stream pipe that converts or disposes error events
+ * before passing them on.
+ */
+class HandleErrorStream<T> extends _ForwardingStream<T, T> {
+ final _ErrorTransformation _transform;
+
+ HandleErrorStream(AsyncError transform(AsyncError event))
+ : this._transform = transform;
+
+ void _handleError(AsyncError error) {
+ try {
+ error = _transform(error);
+ if (error == null) return;
+ } catch (e, s) {
+ error = new AsyncError.withCause(e, s, error);
+ }
+ _signalError(error);
+ }
+}
+
+
+typedef void _TransformDataHandler<S, T>(S data, StreamSink<T> sink);
+typedef void _TransformErrorHandler<T>(AsyncError data, StreamSink<T> sink);
+typedef void _TransformDoneHandler<T>(StreamSink<T> sink);
+
+/**
+ * A stream pipe that intercepts all events and can generate any event as
+ * output.
+ *
+ * Each incoming event on this [StreamSink] is passed to the corresponding
+ * provided event handler, along with a [StreamSink] linked to the [output] of
+ * this pipe.
+ * The handler can then decide which events to send to the output
+ */
+class PipeStream<S, T> extends _ForwardingStream<S, T> {
+ final _TransformDataHandler<S, T> _onData;
+ final _TransformErrorHandler<T> _onError;
+ final _TransformDoneHandler<T> _onDone;
+ StreamSink<T> _sink;
+
+ PipeStream({void onData(S data, StreamSink<T> sink),
+ void onError(AsyncError data, StreamSink<T> sink),
+ void onDone(StreamSink<T> sink)})
+ : this._onData = (onData == null ? _defaultHandleData : onData),
+ this._onError = (onError == null ? _defaultHandleError : onError),
+ this._onDone = (onDone == null ? _defaultHandleDone : onDone) {
+ // Cache the sink wrapper to avoid creating a new one for each event.
+ this._sink = new _StreamImplSink(this);
+ }
+
+ void _handleData(S data) {
+ try {
+ return _onData(data, _sink);
+ } catch (e, s) {
+ _signalError(new AsyncError(e, s));
+ }
+ }
+
+ void _handleError(AsyncError error) {
+ try {
+ _onError(error, _sink);
+ } catch (e, s) {
+ _signalError(new AsyncError.withCause(e, s, error));
+ }
+ }
+
+ void _handleDone() {
+ try {
+ _onDone(_sink);
+ } catch (e, s) {
+ _signalError(new AsyncError(e, s));
+ }
+ }
+
+ /** Default data handler forwards all data. */
+ static void _defaultHandleData(dynamic data, StreamSink sink) {
+ sink.add(data);
+ }
+ /** Default error handler forwards all errors. */
+ static void _defaultHandleError(AsyncError error, StreamSink sink) {
+ sink.signalError(error);
+ }
+ /** Default done handler forwards done. */
+ static void _defaultHandleDone(StreamSink sink) {
+ sink.close();
+ }
+}
+
+/** Creates a [StreamSink] from a [_StreamImpl]'s input methods. */
+class _StreamImplSink<T> implements StreamSink<T> {
+ _StreamImpl<T> _target;
+ _StreamImplSink(this._target);
+ void add(T data) { _target._add(data); }
+ void signalError(AsyncError error) { _target._signalError(error); }
+ void close() { _target._close(); }
+}
+
+/**
+ * A stream pipe that intercepts all events and can generate any event as
+ * output.
+ *
+ * Each incoming event on this [StreamSink] is passed to the corresponding
+ * method on [transform], along with a [StreamSink] linked to the [output] of
+ * this pipe.
+ * The handler can then decide which events to send to the output
+ */
+class TransformStream<S, T> extends _ForwardingStream<S, T> {
+ final StreamTransformer<S, T> _transform;
+ StreamSink<T> _sink;
+
+ TransformStream(StreamTransformer<S, T> transform)
+ : this._transform = transform {
+ // Cache the sink wrapper to avoid creating a new one for each event.
+ this._sink = new _StreamImplSink(this);
+ }
+
+ void _handleData(S data) {
+ try {
+ return _transform.handleData(data, _sink);
+ } catch (e, s) {
+ _controller.signalError(new AsyncError(e, s));
+ }
+ }
+
+ void _handleError(AsyncError error) {
+ try {
+ _transform.handleError(error, _sink);
+ } catch (e, s) {
+ _controller.signalError(new AsyncError.withCause(e, s, error));
+ }
+ }
+
+ void _handleDone() {
+ try {
+ _transform.handleDone(_sink);
+ } catch (e, s) {
+ _controller.signalError(new AsyncError(e, s));
+ }
+ }
+}
+
+
+/** Helper class for transforming three functions into a StreamTransformer. */
+class _StreamTransformerFunctionWrapper<S, T>
+ extends _StreamTransformer<S, T> {
+ final _TransformDataHandler<S, T> _handleData;
+ final _TransformErrorHandler<T> _handleError;
+ final _TransformDoneHandler<T> _handleDone;
+
+ _StreamTransformerFunctionWrapper({
+ void onData(S data, StreamSink<T> sink),
+ void onError(AsyncError data, StreamSink<T> sink),
+ void onDone(StreamSink<T> sink)})
+ : _handleData = onData != null ? onData : PipeStream._defaultHandleData,
+ _handleError = onError != null ? onError
+ : PipeStream._defaultHandleError,
+ _handleDone = onDone != null ? onDone : PipeStream._defaultHandleDone;
+
+ void handleData(S data, StreamSink<T> sink) {
+ return _handleData(data, sink);
+ }
+
+ void handleError(AsyncError error, StreamSink<T> sink) {
+ _handleError(error, sink);
+ }
+
+ void handleDone(StreamSink<T> sink) {
+ _handleDone(sink);
+ }
+}
+
+
+class TakeStream<T> extends _ForwardingStream<T, T> {
+ int _remaining;
+
+ TakeStream(int count)
+ : this._remaining = count {
+ if (count is! int) throw new ArgumentError(count);
+ }
+
+ void _handleData(T inputEvent) {
+ if (_remaining > 0) {
+ _add(inputEvent);
+ _remaining -= 1;
+ if (_remaining == 0) {
+ // Closing also unsubscribes all subscribers, which unsubscribes
+ // this from source.
+ _close();
+ }
+ }
+ }
+}
+
+
+class TakeWhileStream<T> extends _ForwardingStream<T, T> {
+ final _Predicate<T> _test;
+
+ TakeWhileStream(bool test(T value))
+ : this._test = test;
+
+ void _handleData(T inputEvent) {
+ bool satisfies;
+ try {
+ satisfies = _test(inputEvent);
+ } catch (e, s) {
+ _signalError(new AsyncError(e, s));
+ // The test didn't say true. Didn't say false either, but we stop anyway.
+ _close();
+ return;
+ }
+ if (satisfies) {
+ _add(inputEvent);
+ } else {
+ _close();
+ }
+ }
+}
+
+class SkipStream<T> extends _ForwardingStream<T, T> {
+ int _remaining;
+
+ SkipStream(int count)
+ : this._remaining = count{
+ if (count is! int) throw new ArgumentError(count);
+ }
+
+ void _handleData(T inputEvent) {
+ if (_remaining > 0) {
+ _remaining--;
+ return;
+ }
+ return _add(inputEvent);
+ }
+}
+
+class SkipWhileStream<T> extends _ForwardingStream<T, T> {
+ final _Predicate<T> _test;
+ bool _hasFailed = false;
+
+ SkipWhileStream(bool test(T value))
+ : this._test = test;
+
+ void _handleData(T inputEvent) {
+ if (_hasFailed) {
+ _add(inputEvent);
+ }
+ bool satisfies;
+ try {
+ satisfies = _test(inputEvent);
+ } catch (e, s) {
+ _signalError(new AsyncError(e, s));
+ // A failure to return a boolean is considered "not matching".
+ _hasFailed = true;
+ return;
+ }
+ if (!satisfies) {
+ _hasFailed = true;
+ _add(inputEvent);
+ }
+ }
+}
+
+typedef bool _Equality<T>(T a, T b);
+
+class DistinctStream<T> extends _ForwardingStream<T, T> {
+ static var _SENTINEL = new Object();
+
+ _Equality<T> _equals;
+ var _previous = _SENTINEL;
+
+ DistinctStream(bool equals(T a, T b))
+ : _equals = equals;
+
+ void _handleData(T inputEvent) {
+ if (identical(_previous, _SENTINEL)) {
+ _previous = inputEvent;
+ return _add(inputEvent);
+ } else {
+ bool isEqual;
+ try {
+ if (_equals == null) {
+ isEqual = (_previous == inputEvent);
+ } else {
+ isEqual = _equals(_previous, inputEvent);
+ }
+ } catch (e, s) {
+ _signalError(new AsyncError(e, s));
+ return null;
+ }
+ if (!isEqual) {
+ _add(inputEvent);
+ _previous = inputEvent;
+ }
+ }
+ }
+}
« no previous file with comments | « sdk/lib/async/stream_impl.dart ('k') | sdk/lib/async/string_transform.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698