Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(579)

Unified Diff: sdk/lib/async/stream_transformers.dart

Issue 25354003: Redo StreamTransformers so they work with Stack traces. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Address comments. Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « sdk/lib/async/stream_pipe.dart ('k') | sdk/lib/convert/chunked_conversion.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: sdk/lib/async/stream_transformers.dart
diff --git a/sdk/lib/async/stream_transformers.dart b/sdk/lib/async/stream_transformers.dart
new file mode 100644
index 0000000000000000000000000000000000000000..1e060f9f94a43a56c072143cf2751355eca9b88d
--- /dev/null
+++ b/sdk/lib/async/stream_transformers.dart
@@ -0,0 +1,307 @@
+// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+part of dart.async;
+
+/**
+ * Wraps an [_EventSink] so it exposes only the [EventSink] interface.
+ */
+class _EventSinkWrapper<T> implements EventSink<T> {
+ _EventSink _sink;
+ _EventSinkWrapper(this._sink);
+
+ void add(T data) { _sink._add(data); }
+ void addError(error, [StackTrace stackTrace]) {
+ _sink._addError(error, stackTrace);
+ }
+ void close() { _sink._close(); }
+}
+
+/**
+ * A StreamSubscription that pipes data through a sink.
+ *
+ * The constructor of this class takes a [_SinkMapper] which maps from
+ * [EventSink] to [EventSink]. The input to the mapper is the output of
+ * the transformation. The returned sink is the transformation's input.
+ */
+class _SinkTransformerStreamSubscription<S, T>
+ extends _BufferingStreamSubscription<T> {
+ /// The transformer's input sink.
+ EventSink _transformerSink;
+
+ /// The subscription to the input stream.
+ StreamSubscription<S> _subscription;
+
+ _SinkTransformerStreamSubscription(Stream<S> source,
+ _SinkMapper mapper,
+ void onData(T data),
+ Function onError,
+ void onDone(),
+ bool cancelOnError)
+ // We set the adapter's target only when the user is allowed to send data.
+ : super(onData, onError, onDone, cancelOnError) {
+ _EventSinkWrapper<T> eventSink = new _EventSinkWrapper<T>(this);
+ _transformerSink = mapper(eventSink);
+ _subscription = source.listen(_handleData,
+ onError: _handleError,
+ onDone: _handleDone);
+ }
+
+ /** Whether this subscription is still subscribed to its source. */
+ bool get _isSubscribed => _subscription != null;
+
+ // _EventSink interface.
+
+ /**
+ * Adds an event to this subscriptions.
+ *
+ * Contrary to normal [_BufferingStreamSubscription]s we may receive
+ * events when the stream is already closed. Report them as state
+ * error.
+ */
+ void _add(T data) {
+ if (_isClosed) {
+ throw new StateError("Stream is already closed");
+ }
+ super._add(data);
+ }
+
+ /**
+ * Adds an error event to this subscriptions.
+ *
+ * Contrary to normal [_BufferingStreamSubscription]s we may receive
+ * events when the stream is already closed. Report them as state
+ * error.
+ */
+ void _addError(Object error, StackTrace stackTrace) {
+ if (_isClosed) {
+ throw new StateError("Stream is already closed");
+ }
+ super._addError(error, stackTrace);
+ }
+
+ /**
+ * Adds a close event to this subscriptions.
+ *
+ * Contrary to normal [_BufferingStreamSubscription]s we may receive
+ * events when the stream is already closed. Report them as state
+ * error.
+ */
+ void _close() {
+ if (_isClosed) {
+ throw new StateError("Stream is already closed");
+ }
+ super._close();
+ }
+
+ // _BufferingStreamSubscription hooks.
+
+ void _onPause() {
+ if (_isSubscribed) _subscription.pause();
+ }
+
+ void _onResume() {
+ if (_isSubscribed) _subscription.resume();
+ }
+
+ void _onCancel() {
+ if (_isSubscribed) {
+ StreamSubscription subscription = _subscription;
+ _subscription = null;
+ subscription.cancel();
+ }
+ }
+
+ void _handleData(S data) {
+ try {
+ _transformerSink.add(data);
+ } catch (e, s) {
+ _addError(_asyncError(e, s), s);
+ }
+ }
+
+ void _handleError(error, [stackTrace]) {
+ try {
+ _transformerSink.addError(error, stackTrace);
+ } catch (e, s) {
+ if (identical(e, error)) {
+ _addError(error, stackTrace);
+ } else {
+ _addError(_asyncError(e, s), s);
+ }
+ }
+ }
+
+ void _handleDone() {
+ try {
+ _subscription = null;
+ _transformerSink.close();
+ } catch (e, s) {
+ _addError(_asyncError(e, s), s);
+ }
+ }
+}
+
+
+typedef EventSink<S> _SinkMapper<S, T>(EventSink<T> output);
+
+/**
+ * A StreamTransformer for Sink-mappers.
+ *
+ * A Sink-mapper takes an [EventSink] (its output) and returns another
+ * EventSink (its input).
+ *
+ * Note that this class can be `const`.
+ */
+class _StreamSinkTransformer<S, T> implements StreamTransformer<S, T> {
+ final _SinkMapper<S, T> _sinkMapper;
+ const _StreamSinkTransformer(this._sinkMapper);
+
+ Stream<T> bind(Stream<S> stream)
+ => new _BoundSinkStream<S, T>(stream, _sinkMapper);
+}
+
+/**
+ * The result of binding a StreamTransformer for Sink-mappers.
+ *
+ * It contains the bound Stream and the sink-mapper. Only when the user starts
+ * listening to this stream is the sink-mapper invoked. The result is used
+ * to create a StreamSubscription that transforms events.
+ */
+class _BoundSinkStream<S, T> extends Stream<T> {
+ final _SinkMapper<S, T> _sinkMapper;
+ final Stream<S> _stream;
+
+ _BoundSinkStream(this._stream, this._sinkMapper);
+
+ StreamSubscription<T> listen(void onData(T event),
+ { Function onError,
+ void onDone(),
+ bool cancelOnError }) {
+ cancelOnError = identical(true, cancelOnError);
+ return new _SinkTransformerStreamSubscription(
+ _stream, _sinkMapper, onData, onError, onDone, cancelOnError);
+ }
+}
+
+/// Data-handler coming from [StreamTransformer.fromHandlers].
+typedef void _TransformDataHandler<S, T>(S data, EventSink<T> sink);
+/// Error-handler coming from [StreamTransformer.fromHandlers].
+typedef void _TransformErrorHandler<T>(
+ Object error, StackTrace stackTrace, EventSink<T> sink);
+/// Done-handler coming from [StreamTransformer.fromHandlers].
+typedef void _TransformDoneHandler<T>(EventSink<T> sink);
+
+/**
+ * Wraps handlers (from [StreamTransformer.fromHandlers]) into an `EventSink`.
+ *
+ * This way we can reuse the code from [_StreamSinkTransformer].
+ */
+class _HandlerEventSink<S, T> implements EventSink<T> {
+ final _TransformDataHandler<S, T> _handleData;
+ final _TransformErrorHandler<T> _handleError;
+ final _TransformDoneHandler<T> _handleDone;
+
+ /// The output sink where the handlers should send their data into.
+ final EventSink<T> _sink;
+
+ _HandlerEventSink(this._handleData, this._handleError, this._handleDone,
+ this._sink);
+
+ void add(S data) => _handleData(data, _sink);
+ void addError(Object error, [StackTrace stackTrace])
+ => _handleError(error, stackTrace, _sink);
+ void close() => _handleDone(_sink);
+}
+
+/**
+ * A StreamTransformer that transformers events with the given handlers.
+ *
+ * Note that this transformer can only be used once.
+ */
+class _StreamHandlerTransformer<S, T> extends _StreamSinkTransformer<S, T> {
+
+ _StreamHandlerTransformer({
+ void handleData(S data, EventSink<T> sink),
+ void handleError(Object error, StackTrace stackTrace, EventSink<T> sink),
+ void handleDone(EventSink<T> sink)})
+ : super((EventSink<T> outputSink) {
+ if (handleData == null) handleData = _defaultHandleData;
+ if (handleError == null) handleError = _defaultHandleError;
+ if (handleDone == null) handleDone = _defaultHandleDone;
+ return new _HandlerEventSink<S, T>(
+ handleData, handleError, handleDone, outputSink);
+ });
+
+ Stream<T> bind(Stream<S> stream) {
+ return super.bind(stream);
+ }
+
+ /** Default data handler forwards all data. */
+ static void _defaultHandleData(var data, EventSink sink) {
+ sink.add(data);
+ }
+
+ /** Default error handler forwards all errors. */
+ static void _defaultHandleError(error, StackTrace stackTrace,
+ EventSink sink) {
+ sink.addError(error);
+ }
+
+ /** Default done handler forwards done. */
+ static void _defaultHandleDone(EventSink sink) {
+ sink.close();
+ }
+}
+
+/// A closure mapping a stream and cancelOnError to a StreamSubscription.
+typedef StreamSubscription<T> _SubscriptionTransformer<S, T>(
+ Stream<S> stream, bool cancelOnError);
+
+/**
+ * A [StreamTransformer] that minimizes the number of additional classes.
+ *
+ * Instead of implementing three classes: a [StreamTransformer], a [Stream]
+ * (as the result of a `bind` call) and a [StreamSubscription] (which does the
+ * actual work), this class only requires a fincution that is invoked when the
+ * last bit (the subscription) of the transformer-workflow is needed.
+ *
+ * The given transformer function maps from Stream and cancelOnError to a
+ * `StreamSubscription`. As such it can also act on `cancel` events, making it
+ * fully general.
+ */
+class _StreamSubscriptionTransformer<S, T> implements StreamTransformer<S, T> {
+ final _SubscriptionTransformer<S, T> _transformer;
+
+ const _StreamSubscriptionTransformer(this._transformer);
+
+ Stream<T> bind(Stream<S> stream) =>
+ new _BoundSubscriptionStream<S, T>(stream, _transformer);
+}
+
+/**
+ * A stream transformed by a [_StreamSubscriptionTransformer].
+ *
+ * When this stream is listened to it invokes the [_transformer] function with
+ * the stored [_stream]. Usually the transformer starts listening at this
+ * moment.
+ */
+class _BoundSubscriptionStream<S, T> extends Stream<T> {
+ final _SubscriptionTransformer<S, T> _transformer;
+ final Stream<S> _stream;
+
+ _BoundSubscriptionStream(this._stream, this._transformer);
+
+ StreamSubscription<T> listen(void onData(T event),
+ { Function onError,
+ void onDone(),
+ bool cancelOnError }) {
+ cancelOnError = identical(true, cancelOnError);
+ StreamSubscription<T> result = _transformer(_stream, cancelOnError);
+ result.onData(onData);
+ result.onError(onError);
+ result.onDone(onDone);
+ return result;
+ }
+}
« no previous file with comments | « sdk/lib/async/stream_pipe.dart ('k') | sdk/lib/convert/chunked_conversion.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698