Index: sdk/lib/convert/chunked_conversion.dart |
diff --git a/sdk/lib/convert/chunked_conversion.dart b/sdk/lib/convert/chunked_conversion.dart |
index 67c017bdccdbb7e8c110a38d6a8a85ea023f55b8..ad5afc3c45895faa0f4c346c9afe23b76afc69e6 100644 |
--- a/sdk/lib/convert/chunked_conversion.dart |
+++ b/sdk/lib/convert/chunked_conversion.dart |
@@ -47,36 +47,13 @@ class _SimpleCallbackSink<T> extends ChunkedConversionSink<T> { |
void close() { _callback(_accumulated); } |
} |
-/** |
- * This class wraps a [Converter] for use as a [StreamTransformer]. |
- */ |
-class _ConverterTransformStream<S, T> extends EventTransformStream<S, T> { |
- final _ConverterStreamEventTransformer<S, T> _eventTransformer; |
- |
- _ConverterTransformStream(Stream<S> source, Converter converter) |
- : this._withEventTransformer( |
- source, |
- new _ConverterStreamEventTransformer<S, T>(converter)); |
+class _EventSinkAdapter<T> implements ChunkedConversionSink<T> { |
+ final EventSink<T> _sink; |
- _ConverterTransformStream._withEventTransformer( |
- Stream<S> source, |
- _ConverterStreamEventTransformer<S, T> eventTransformer) |
- : _eventTransformer = eventTransformer, |
- super(source, eventTransformer); |
+ _EventSinkAdapter(this._sink); |
- /** |
- * Starts listening to `this`. |
- * |
- * This starts the chunked conversion. |
- */ |
- StreamSubscription<T> listen(void onData(T data), |
- { Function onError, |
- void onDone(), |
- bool cancelOnError }) { |
- _eventTransformer._startChunkedConversion(); |
- return super.listen(onData, onError: onError, onDone: onDone, |
- cancelOnError: cancelOnError); |
- } |
+ void add(T data) => _sink.add(data); |
+ void close() => _sink.close(); |
} |
/** |
@@ -88,12 +65,9 @@ class _ConverterTransformStream<S, T> extends EventTransformStream<S, T> { |
* It also implements the [ChunkedConversionSink] interface so that it |
* can be used as output sink in a chunked conversion. |
*/ |
-class _ConverterStreamEventTransformer<S, T> |
- implements ChunkedConversionSink<T>, StreamEventTransformer<S, T> { |
- final Converter _converter; |
- |
- /** At every [handleData] this field is updated with the new event sink. */ |
- EventSink<T> _eventSink; |
+class _ConverterStreamEventSink<S, T> implements EventSink<S> { |
+ /** The output sink for the converter. */ |
+ final EventSink<T> _eventSink; |
/** |
* The input sink for new data. All data that is received with |
@@ -101,51 +75,14 @@ class _ConverterStreamEventTransformer<S, T> |
*/ |
ChunkedConversionSink _chunkedSink; |
- _ConverterStreamEventTransformer(this._converter); |
- |
- /** |
- * Starts the chunked conversion. |
- */ |
- void _startChunkedConversion() { |
- _chunkedSink = _converter.startChunkedConversion(this); |
- } |
- |
- /** |
- * Not supported. |
- */ |
- Stream bind(Stream otherStream) { |
- throw new UnsupportedError("Converter streams must not call bind"); |
- } |
- |
- void add(T o) => _eventSink.add(o); |
- void close() => _eventSink.close(); |
- |
- void handleData(S event, EventSink<T> eventSink) { |
- _eventSink = eventSink; |
- try { |
- _chunkedSink.add(event); |
- } catch(e) { |
- // TODO(floitsch): capture stack trace. |
- eventSink.addError(e); |
- } finally { |
- _eventSink = null; |
- } |
- } |
- |
- void handleDone(EventSink<T> eventSink) { |
- _eventSink = eventSink; |
- try { |
- _chunkedSink.close(); |
- } catch(e) { |
- // TODO(floitsch): capture stack trace. |
- eventSink.addError(e); |
- } finally { |
- _eventSink = null; |
- } |
- } |
+ _ConverterStreamEventSink(Converter converter, EventSink<T> sink) |
+ : this._eventSink = sink, |
+ _chunkedSink = |
+ converter.startChunkedConversion(new _EventSinkAdapter(sink)); |
- void handleError(var errorEvent, EventSink<T> eventSink) { |
- // TODO(floitsch): capture stack trace. |
- eventSink.addError(errorEvent); |
+ void add(T o) => _chunkedSink.add(o); |
+ void addError(Object error, [StackTrace stackTrace]) { |
+ _eventSink.addError(error, stackTrace); |
} |
+ void close() => _chunkedSink.close(); |
} |