| OLD | NEW |
| 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.convert; | 5 part of dart.convert; |
| 6 | 6 |
| 7 typedef void _ChunkedConversionCallback<T>(T accumulated); | 7 typedef void _ChunkedConversionCallback<T>(T accumulated); |
| 8 | 8 |
| 9 /** | 9 /** |
| 10 * A [ChunkedConversionSink] is used to transmit data more efficiently between | 10 * A [ChunkedConversionSink] is used to transmit data more efficiently between |
| (...skipping 29 matching lines...) Expand all Loading... |
| 40 class _SimpleCallbackSink<T> extends ChunkedConversionSink<T> { | 40 class _SimpleCallbackSink<T> extends ChunkedConversionSink<T> { |
| 41 final _ChunkedConversionCallback<List<T>> _callback; | 41 final _ChunkedConversionCallback<List<T>> _callback; |
| 42 final List<T> _accumulated = <T>[]; | 42 final List<T> _accumulated = <T>[]; |
| 43 | 43 |
| 44 _SimpleCallbackSink(this._callback); | 44 _SimpleCallbackSink(this._callback); |
| 45 | 45 |
| 46 void add(T chunk) { _accumulated.add(chunk); } | 46 void add(T chunk) { _accumulated.add(chunk); } |
| 47 void close() { _callback(_accumulated); } | 47 void close() { _callback(_accumulated); } |
| 48 } | 48 } |
| 49 | 49 |
| 50 /** | 50 class _EventSinkAdapter<T> implements ChunkedConversionSink<T> { |
| 51 * This class wraps a [Converter] for use as a [StreamTransformer]. | 51 final EventSink<T> _sink; |
| 52 */ | |
| 53 class _ConverterTransformStream<S, T> extends EventTransformStream<S, T> { | |
| 54 final _ConverterStreamEventTransformer<S, T> _eventTransformer; | |
| 55 | 52 |
| 56 _ConverterTransformStream(Stream<S> source, Converter converter) | 53 _EventSinkAdapter(this._sink); |
| 57 : this._withEventTransformer( | |
| 58 source, | |
| 59 new _ConverterStreamEventTransformer<S, T>(converter)); | |
| 60 | 54 |
| 61 _ConverterTransformStream._withEventTransformer( | 55 void add(T data) => _sink.add(data); |
| 62 Stream<S> source, | 56 void close() => _sink.close(); |
| 63 _ConverterStreamEventTransformer<S, T> eventTransformer) | |
| 64 : _eventTransformer = eventTransformer, | |
| 65 super(source, eventTransformer); | |
| 66 | |
| 67 /** | |
| 68 * Starts listening to `this`. | |
| 69 * | |
| 70 * This starts the chunked conversion. | |
| 71 */ | |
| 72 StreamSubscription<T> listen(void onData(T data), | |
| 73 { Function onError, | |
| 74 void onDone(), | |
| 75 bool cancelOnError }) { | |
| 76 _eventTransformer._startChunkedConversion(); | |
| 77 return super.listen(onData, onError: onError, onDone: onDone, | |
| 78 cancelOnError: cancelOnError); | |
| 79 } | |
| 80 } | 57 } |
| 81 | 58 |
| 82 /** | 59 /** |
| 83 * This class converts implements the logic for a chunked conversion as a | 60 * This class converts implements the logic for a chunked conversion as a |
| 84 * stream transformer. | 61 * stream transformer. |
| 85 * | 62 * |
| 86 * It is used as strategy in the [EventTransformStream]. | 63 * It is used as strategy in the [EventTransformStream]. |
| 87 * | 64 * |
| 88 * It also implements the [ChunkedConversionSink] interface so that it | 65 * It also implements the [ChunkedConversionSink] interface so that it |
| 89 * can be used as output sink in a chunked conversion. | 66 * can be used as output sink in a chunked conversion. |
| 90 */ | 67 */ |
| 91 class _ConverterStreamEventTransformer<S, T> | 68 class _ConverterStreamEventSink<S, T> implements EventSink<S> { |
| 92 implements ChunkedConversionSink<T>, StreamEventTransformer<S, T> { | 69 /** The output sink for the converter. */ |
| 93 final Converter _converter; | 70 final EventSink<T> _eventSink; |
| 94 | |
| 95 /** At every [handleData] this field is updated with the new event sink. */ | |
| 96 EventSink<T> _eventSink; | |
| 97 | 71 |
| 98 /** | 72 /** |
| 99 * The input sink for new data. All data that is received with | 73 * The input sink for new data. All data that is received with |
| 100 * [handleData] is added into this sink. | 74 * [handleData] is added into this sink. |
| 101 */ | 75 */ |
| 102 ChunkedConversionSink _chunkedSink; | 76 ChunkedConversionSink _chunkedSink; |
| 103 | 77 |
| 104 _ConverterStreamEventTransformer(this._converter); | 78 _ConverterStreamEventSink(Converter converter, EventSink<T> sink) |
| 79 : this._eventSink = sink, |
| 80 _chunkedSink = |
| 81 converter.startChunkedConversion(new _EventSinkAdapter(sink)); |
| 105 | 82 |
| 106 /** | 83 void add(T o) => _chunkedSink.add(o); |
| 107 * Starts the chunked conversion. | 84 void addError(Object error, [StackTrace stackTrace]) { |
| 108 */ | 85 _eventSink.addError(error, stackTrace); |
| 109 void _startChunkedConversion() { | |
| 110 _chunkedSink = _converter.startChunkedConversion(this); | |
| 111 } | 86 } |
| 112 | 87 void close() => _chunkedSink.close(); |
| 113 /** | |
| 114 * Not supported. | |
| 115 */ | |
| 116 Stream bind(Stream otherStream) { | |
| 117 throw new UnsupportedError("Converter streams must not call bind"); | |
| 118 } | |
| 119 | |
| 120 void add(T o) => _eventSink.add(o); | |
| 121 void close() => _eventSink.close(); | |
| 122 | |
| 123 void handleData(S event, EventSink<T> eventSink) { | |
| 124 _eventSink = eventSink; | |
| 125 try { | |
| 126 _chunkedSink.add(event); | |
| 127 } catch(e) { | |
| 128 // TODO(floitsch): capture stack trace. | |
| 129 eventSink.addError(e); | |
| 130 } finally { | |
| 131 _eventSink = null; | |
| 132 } | |
| 133 } | |
| 134 | |
| 135 void handleDone(EventSink<T> eventSink) { | |
| 136 _eventSink = eventSink; | |
| 137 try { | |
| 138 _chunkedSink.close(); | |
| 139 } catch(e) { | |
| 140 // TODO(floitsch): capture stack trace. | |
| 141 eventSink.addError(e); | |
| 142 } finally { | |
| 143 _eventSink = null; | |
| 144 } | |
| 145 } | |
| 146 | |
| 147 void handleError(var errorEvent, EventSink<T> eventSink) { | |
| 148 // TODO(floitsch): capture stack trace. | |
| 149 eventSink.addError(errorEvent); | |
| 150 } | |
| 151 } | 88 } |
| OLD | NEW |