Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(132)

Side by Side Diff: sdk/lib/async/stream_transformers.dart

Issue 25354003: Redo StreamTransformers so they work with Stack traces. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Fix two more tests. Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 part of dart.async;
6
7 /**
8 * Wraps an [_EventSink] so it exposes only the [EventSink] interface.
9 */
10 class _EventSinkAdapter<T> implements EventSink<T> {
11 _EventSink _sink;
12 _EventSinkAdapter(this._sink);
13
14 void add(T data) { _sink._add(data); }
15 void addError(error, [StackTrace stackTrace]) {
16 _sink._addError(error, stackTrace);
17 }
18 void close() { _sink._close(); }
19 }
20
21 /**
22 * A StreamSubscription that pipes data through a sink.
23 *
24 * The constructor of this class takes a [_SinkMapper] which maps from
25 * [EventSink] to [EventSink]. The input to the mapper is the output of
26 * the transformation. The returned sink is the transformation's input.
27 */
28 class _SinkTransformerStreamSubscription<S, T>
29 extends _BufferingStreamSubscription<T> {
30 /// EventSink wrapper for this class.
31 _EventSinkAdapter<T> _sink;
32
33 /// The transformer's input sink.
34 EventSink _transformerSink;
35
36 /// The subscription to the input stream.
37 StreamSubscription<S> _subscription;
38
39 _SinkTransformerStreamSubscription(Stream<S> source,
40 _SinkMapper mapper,
41 void onData(T data),
42 Function onError,
43 void onDone(),
44 bool cancelOnError)
45 // We set the adapter's target only when the user is allowed to
46 // send data.
47 : super(onData, onError, onDone, cancelOnError) {
Lasse Reichstein Nielsen 2013/10/04 14:35:53 Indent by four only.
floitsch 2013/10/05 18:47:40 Done.
48 _sink = new _EventSinkAdapter<T>(this);
Lasse Reichstein Nielsen 2013/10/04 14:35:53 I don't think _sink is used anywhere else, so coul
floitsch 2013/10/05 18:47:40 Done.
49 _transformerSink = mapper(_sink);
50 _subscription = source.listen(_handleData,
51 onError: _handleError,
52 onDone: _handleDone);
53 }
54
55 /** Whether this subscription is still subscribed to its source. */
56 bool get _isSubscribed => _subscription != null;
57
58 void _onPause() {
59 if (_isSubscribed) _subscription.pause();
60 }
61
62 void _onResume() {
63 if (_isSubscribed) _subscription.resume();
64 }
65
66 void _onCancel() {
67 if (_isSubscribed) {
68 StreamSubscription subscription = _subscription;
69 _subscription = null;
70 subscription.cancel();
71 }
72 }
73
74 void _handleData(S data) {
75 try {
76 _transformerSink.add(data);
77 } catch (e, s) {
78 _addError(_asyncError(e, s), s);
79 }
80 }
81
82 void _handleError(error, [stackTrace]) {
83 try {
84 _transformerSink.addError(error, stackTrace);
85 } catch (e, s) {
86 if (identical(e, error)) {
87 _addError(error, stackTrace);
88 } else {
89 _addError(_asyncError(e, s), s);
90 }
91 }
92 }
93
94 void _handleDone() {
95 try {
96 _subscription = null;
97 _transformerSink.close();
98 } catch (e, s) {
99 _addError(_asyncError(e, s), s);
100 }
101 }
102 }
103
104
105 typedef EventSink<S> _SinkMapper<S, T>(EventSink<T> output);
106
107 /**
108 * A StreamTransformer for Sink-mappers.
109 *
110 * A Sink-mapper takes an [EventSink] (its output) and returns another
111 * EventSink (its input).
112 *
113 * Note that this class can be `const`.
114 */
115 class _StreamSinkTransformer<S, T> implements StreamTransformer<S, T> {
116 final _SinkMapper<S, T> _sinkMapper;
117 const _StreamSinkTransformer(this._sinkMapper);
118
119 Stream<T> bind(Stream<S> stream)
120 => new _BoundSinkStream<S, T>(stream, _sinkMapper);
121 }
122
123 /**
124 * The result of binding a StreamTransformer for Sink-mappers.
125 *
126 * It contains the bound Stream and the sink-mapper. Only, when the user starts
Lasse Reichstein Nielsen 2013/10/04 14:35:53 Only when the user starts listening to this stream
floitsch 2013/10/05 18:47:40 Done.
127 * listening to this stream, the sink-mapper is invoked. The result is used
128 * to create a StreamSubscription that transforms events.
129 */
130 class _BoundSinkStream<S, T> extends Stream<T> {
131 final _SinkMapper<S, T> _sinkMapper;
132 final Stream<S> _stream;
133
134 _BoundSinkStream(this._stream, this._sinkMapper);
135
136 StreamSubscription<T> listen(void onData(T event),
137 { Function onError,
138 void onDone(),
139 bool cancelOnError }) {
140 cancelOnError = identical(true, cancelOnError);
141 return new _SinkTransformerStreamSubscription(
142 _stream, _sinkMapper, onData, onError, onDone, cancelOnError);
143 }
144 }
145
146 /// Data-handler coming from [StreamTransformer.fromHandlers].
147 typedef void _TransformDataHandler<S, T>(S data, EventSink<T> sink);
148 /// Error-handler coming from [StreamTransformer.fromHandlers].
149 typedef void _TransformErrorHandler<T>(
150 Object error, StackTrace stackTrace, EventSink<T> sink);
151 /// Done-handler coming from [StreamTransformer.fromHandlers].
152 typedef void _TransformDoneHandler<T>(EventSink<T> sink);
153
154 /**
155 * Wraps handlers (from [StreamTransformer.fromHandlers]) into an EventSink.
Lasse Reichstein Nielsen 2013/10/04 14:35:53 `EventSink`
floitsch 2013/10/05 18:47:40 Done.
156 *
157 * This way we can reuse the code from [_StreamSinkTransformer].
158 */
159 class _HandlerEventSink<S, T> implements EventSink<T> {
160 final _TransformDataHandler<S, T> _handleData;
161 final _TransformErrorHandler<T> _handleError;
162 final _TransformDoneHandler<T> _handleDone;
163
164 /// The output sink where the handlers should send their data into.
165 final EventSink<T> _sink;
166
167 _HandlerEventSink(this._handleData, this._handleError, this._handleDone,
168 this._sink);
169
170 void add(T data) => _handleData(data, _sink);
171 void addError(Object error, [StackTrace stackTrace])
172 => _handleError(error, stackTrace, _sink);
173 void close() => _handleDone(_sink);
174 }
175
176 /**
177 * A StreamTransformer that transformers events with the given handlers.
178 *
179 * Note that this transformer can only be used once.
180 */
181 class _StreamHandlerTransformer<S, T> extends _StreamSinkTransformer<S, T> {
182 bool _hasBeenUsed = false;
183
184 _StreamHandlerTransformer({
185 void handleData(S data, EventSink<T> sink),
186 void handleError(Object error, StackTrace stackTrace, EventSink<T> sink),
187 void handleDone(EventSink<T> sink)})
188 : super((EventSink<S> outputSink) {
189 if (handleData == null) handleData = _defaultHandleData;
190 if (handleError == null) handleError = _defaultHandleError;
191 if (handleDone == null) handleDone = _defaultHandleDone;
192 return new _HandlerEventSink<S, T>(
193 handleData, handleError, handleDone, outputSink);
194 });
195
196 Stream<T> bind(Stream<S> stream) {
197 if (_hasBeenUsed) {
198 throw new StateError("Transformer has already been used.");
199 }
200 _hasBeenUsed = true;
201 return super.bind(stream);
202 }
203
204 /** Default data handler forwards all data. */
205 static void _defaultHandleData(var data, EventSink sink) {
206 sink.add(data);
207 }
208
209 /** Default error handler forwards all errors. */
210 static void _defaultHandleError(error, StackTrace stackTrace, EventSink sink) {
Lasse Reichstein Nielsen 2013/10/04 14:35:53 long line.
floitsch 2013/10/05 18:47:40 Done.
211 sink.addError(error);
212 }
213
214 /** Default done handler forwards done. */
215 static void _defaultHandleDone(EventSink sink) {
216 sink.close();
217 }
218 }
219
220 /// A closure mapping a stream and cancelOnError to a StreamSubscription.
221 typedef StreamSubscription<T> _SubscriptionTransformer<S, T>(
222 Stream<S> stream, bool cancelOnError);
223
224 /**
225 * This class is the most powerful [StreamTransformer].
Lasse Reichstein Nielsen 2013/10/04 14:35:53 powerful -> generic? "the most" sounds like it sho
floitsch 2013/10/05 18:47:40 "fully generic". done.
226 *
227 * The given transformer closure maps from Stream, cancelOnError to a
228 * StreamSubscription. This allows it to do everything it needs. It can also
Lasse Reichstein Nielsen 2013/10/04 14:35:53 do everything it needs -> interact with the entire
floitsch 2013/10/05 18:47:40 Completely rewrote the comment.
229 * watch for cancels, which the other transformers above cannot do.
230 */
231 class _StreamSubscriptionTransformer<S, T> implements StreamTransformer<S, T> {
232 final _SubscriptionTransformer<S, T> _transformer;
233
234 const _StreamSubscriptionTransformer(this._transformer);
235
236 Stream<T> bind(Stream<S> stream)
237 => new _BoundSubscriptionStream<S, T>(stream, _transformer);
238 }
239
240 /**
241 * The bound version of [_StreamSubscriptionTransformer].
242 *
243 * This class is a [Stream] that has already an input stream. The transformer
244 * closure is, however, only invoked when this stream is listened to.
245 */
246 class _BoundSubscriptionStream<S, T> extends Stream<T> {
247 final _SubscriptionTransformer<S, T> _transformer;
248 final Stream<S> _stream;
249
250 _BoundSubscriptionStream(this._stream, this._transformer);
251
252 StreamSubscription<T> listen(void onData(T event),
253 { Function onError,
254 void onDone(),
255 bool cancelOnError }) {
256 cancelOnError = identical(true, cancelOnError);
257 StreamSubscription<T> result = _transformer(_stream, cancelOnError);
258 result.onData(onData);
259 result.onError(onError);
260 result.onDone(onDone);
261 return result;
262 }
263 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698