OLD | NEW |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.convert; | 5 part of dart.convert; |
6 | 6 |
7 typedef void _ChunkedConversionCallback<T>(T accumulated); | 7 typedef void _ChunkedConversionCallback<T>(T accumulated); |
8 | 8 |
9 /** | 9 /** |
10 * A [ChunkedConversionSink] is used to transmit data more efficiently between | 10 * A [ChunkedConversionSink] is used to transmit data more efficiently between |
(...skipping 29 matching lines...) Expand all Loading... |
40 class _SimpleCallbackSink<T> extends ChunkedConversionSink<T> { | 40 class _SimpleCallbackSink<T> extends ChunkedConversionSink<T> { |
41 final _ChunkedConversionCallback<List<T>> _callback; | 41 final _ChunkedConversionCallback<List<T>> _callback; |
42 final List<T> _accumulated = <T>[]; | 42 final List<T> _accumulated = <T>[]; |
43 | 43 |
44 _SimpleCallbackSink(this._callback); | 44 _SimpleCallbackSink(this._callback); |
45 | 45 |
46 void add(T chunk) { _accumulated.add(chunk); } | 46 void add(T chunk) { _accumulated.add(chunk); } |
47 void close() { _callback(_accumulated); } | 47 void close() { _callback(_accumulated); } |
48 } | 48 } |
49 | 49 |
50 /** | 50 class _EventSinkAdapter<T> implements ChunkedConversionSink<T> { |
51 * This class wraps a [Converter] for use as a [StreamTransformer]. | 51 final EventSink<T> _sink; |
52 */ | |
53 class _ConverterTransformStream<S, T> extends EventTransformStream<S, T> { | |
54 final _ConverterStreamEventTransformer<S, T> _eventTransformer; | |
55 | 52 |
56 _ConverterTransformStream(Stream<S> source, Converter converter) | 53 _EventSinkAdapter(this._sink); |
57 : this._withEventTransformer( | |
58 source, | |
59 new _ConverterStreamEventTransformer<S, T>(converter)); | |
60 | 54 |
61 _ConverterTransformStream._withEventTransformer( | 55 void add(T data) => _sink.add(data); |
62 Stream<S> source, | 56 void close() => _sink.close(); |
63 _ConverterStreamEventTransformer<S, T> eventTransformer) | |
64 : _eventTransformer = eventTransformer, | |
65 super(source, eventTransformer); | |
66 | |
67 /** | |
68 * Starts listening to `this`. | |
69 * | |
70 * This starts the chunked conversion. | |
71 */ | |
72 StreamSubscription<T> listen(void onData(T data), | |
73 { Function onError, | |
74 void onDone(), | |
75 bool cancelOnError }) { | |
76 _eventTransformer._startChunkedConversion(); | |
77 return super.listen(onData, onError: onError, onDone: onDone, | |
78 cancelOnError: cancelOnError); | |
79 } | |
80 } | 57 } |
81 | 58 |
82 /** | 59 /** |
83 * This class converts implements the logic for a chunked conversion as a | 60 * This class converts implements the logic for a chunked conversion as a |
84 * stream transformer. | 61 * stream transformer. |
85 * | 62 * |
86 * It is used as strategy in the [EventTransformStream]. | 63 * It is used as strategy in the [EventTransformStream]. |
87 * | 64 * |
88 * It also implements the [ChunkedConversionSink] interface so that it | 65 * It also implements the [ChunkedConversionSink] interface so that it |
89 * can be used as output sink in a chunked conversion. | 66 * can be used as output sink in a chunked conversion. |
90 */ | 67 */ |
91 class _ConverterStreamEventTransformer<S, T> | 68 class _ConverterStreamEventSink<S, T> implements EventSink<S> { |
92 implements ChunkedConversionSink<T>, StreamEventTransformer<S, T> { | 69 /** The output sink for the converter. */ |
93 final Converter _converter; | 70 final EventSink<T> _eventSink; |
94 | |
95 /** At every [handleData] this field is updated with the new event sink. */ | |
96 EventSink<T> _eventSink; | |
97 | 71 |
98 /** | 72 /** |
99 * The input sink for new data. All data that is received with | 73 * The input sink for new data. All data that is received with |
100 * [handleData] is added into this sink. | 74 * [handleData] is added into this sink. |
101 */ | 75 */ |
102 ChunkedConversionSink _chunkedSink; | 76 ChunkedConversionSink _chunkedSink; |
103 | 77 |
104 _ConverterStreamEventTransformer(this._converter); | 78 _ConverterStreamEventSink(Converter converter, EventSink<T> sink) |
| 79 : this._eventSink = sink, |
| 80 _chunkedSink = |
| 81 converter.startChunkedConversion(new _EventSinkAdapter(sink)); |
105 | 82 |
106 /** | 83 void add(T o) => _chunkedSink.add(o); |
107 * Starts the chunked conversion. | 84 void addError(Object error, [StackTrace stackTrace]) { |
108 */ | 85 _eventSink.addError(error, stackTrace); |
109 void _startChunkedConversion() { | |
110 _chunkedSink = _converter.startChunkedConversion(this); | |
111 } | 86 } |
112 | 87 void close() => _chunkedSink.close(); |
113 /** | |
114 * Not supported. | |
115 */ | |
116 Stream bind(Stream otherStream) { | |
117 throw new UnsupportedError("Converter streams must not call bind"); | |
118 } | |
119 | |
120 void add(T o) => _eventSink.add(o); | |
121 void close() => _eventSink.close(); | |
122 | |
123 void handleData(S event, EventSink<T> eventSink) { | |
124 _eventSink = eventSink; | |
125 try { | |
126 _chunkedSink.add(event); | |
127 } catch(e) { | |
128 // TODO(floitsch): capture stack trace. | |
129 eventSink.addError(e); | |
130 } finally { | |
131 _eventSink = null; | |
132 } | |
133 } | |
134 | |
135 void handleDone(EventSink<T> eventSink) { | |
136 _eventSink = eventSink; | |
137 try { | |
138 _chunkedSink.close(); | |
139 } catch(e) { | |
140 // TODO(floitsch): capture stack trace. | |
141 eventSink.addError(e); | |
142 } finally { | |
143 _eventSink = null; | |
144 } | |
145 } | |
146 | |
147 void handleError(var errorEvent, EventSink<T> eventSink) { | |
148 // TODO(floitsch): capture stack trace. | |
149 eventSink.addError(errorEvent); | |
150 } | |
151 } | 88 } |
OLD | NEW |