Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(165)

Side by Side Diff: sdk/lib/async/stream_transformers.dart

Issue 25354003: Redo StreamTransformers so they work with Stack traces. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Small fixes and tests. Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 part of dart.async;
6
7 /**
8 * Wraps an [_EventSink] so it exposes only the [EventSink] interface.
9 */
10 class _EventSinkAdapter<T> implements EventSink<T> {
Lasse Reichstein Nielsen 2013/10/07 11:47:00 Just to be pedantic (with myself, since I named th
floitsch 2013/10/10 15:39:57 Renamed to _EventSinkWrapper. Yey for dartEditor's
11 _EventSink _sink;
12 _EventSinkAdapter(this._sink);
13
14 void add(T data) { _sink._add(data); }
15 void addError(error, [StackTrace stackTrace]) {
16 _sink._addError(error, stackTrace);
17 }
18 void close() { _sink._close(); }
19 }
20
21 /**
22 * A StreamSubscription that pipes data through a sink.
23 *
24 * The constructor of this class takes a [_SinkMapper] which maps from
25 * [EventSink] to [EventSink]. The input to the mapper is the output of
26 * the transformation. The returned sink is the transformation's input.
27 */
28 class _SinkTransformerStreamSubscription<S, T>
29 extends _BufferingStreamSubscription<T> {
30 /// The transformer's input sink.
31 EventSink _transformerSink;
32
33 /// The subscription to the input stream.
34 StreamSubscription<S> _subscription;
35
36 _SinkTransformerStreamSubscription(Stream<S> source,
37 _SinkMapper mapper,
38 void onData(T data),
39 Function onError,
40 void onDone(),
41 bool cancelOnError)
42 // We set the adapter's target only when the user is allowed to send data.
43 : super(onData, onError, onDone, cancelOnError) {
44 _EventSinkAdapter<T> eventSink = new _EventSinkAdapter<T>(this);
45 _transformerSink = mapper(eventSink);
46 _subscription = source.listen(_handleData,
47 onError: _handleError,
48 onDone: _handleDone);
49 }
50
51 /** Whether this subscription is still subscribed to its source. */
52 bool get _isSubscribed => _subscription != null;
53
54 // _EventSink interface.
55
56 /**
57 * Adds an event to this subscriptions.
58 *
59 * Contrary to normal [_BufferingStreamSubscription]s we may receive
60 * events when the stream is already closed. Report them as state
61 * error.
62 */
63 void _add(T data) {
64 if (_isClosed) {
65 throw new StateError("Stream is already closed");
66 }
67 super._add(data);
68 }
69
70 /**
71 * Adds an error event to this subscriptions.
72 *
73 * Contrary to normal [_BufferingStreamSubscription]s we may receive
74 * events when the stream is already closed. Report them as state
75 * error.
76 */
77 void _addError(Object error, StackTrace stackTrace) {
78 if (_isClosed) {
79 throw new StateError("Stream is already closed");
80 }
81 super._addError(error, stackTrace);
82 }
83
84 /**
85 * Adds a close event to this subscriptions.
86 *
87 * Contrary to normal [_BufferingStreamSubscription]s we may receive
88 * events when the stream is already closed. Report them as state
89 * error.
90 */
91 void _close() {
92 if (_isClosed) {
93 throw new StateError("Stream is already closed");
94 }
95 super._close();
96 }
97
98 // _BufferingStreamSubscription hooks.
99
100 void _onPause() {
101 if (_isSubscribed) _subscription.pause();
102 }
103
104 void _onResume() {
105 if (_isSubscribed) _subscription.resume();
106 }
107
108 void _onCancel() {
109 if (_isSubscribed) {
110 StreamSubscription subscription = _subscription;
111 _subscription = null;
112 subscription.cancel();
113 }
114 }
115
116 void _handleData(S data) {
117 try {
118 _transformerSink.add(data);
119 } catch (e, s) {
120 _addError(_asyncError(e, s), s);
121 }
122 }
123
124 void _handleError(error, [stackTrace]) {
125 try {
126 _transformerSink.addError(error, stackTrace);
127 } catch (e, s) {
128 if (identical(e, error)) {
129 _addError(error, stackTrace);
130 } else {
131 _addError(_asyncError(e, s), s);
132 }
133 }
134 }
135
136 void _handleDone() {
137 try {
138 _subscription = null;
139 _transformerSink.close();
140 } catch (e, s) {
141 _addError(_asyncError(e, s), s);
142 }
143 }
144 }
145
146
147 typedef EventSink<S> _SinkMapper<S, T>(EventSink<T> output);
148
149 /**
150 * A StreamTransformer for Sink-mappers.
151 *
152 * A Sink-mapper takes an [EventSink] (its output) and returns another
153 * EventSink (its input).
154 *
155 * Note that this class can be `const`.
156 */
157 class _StreamSinkTransformer<S, T> implements StreamTransformer<S, T> {
158 final _SinkMapper<S, T> _sinkMapper;
159 const _StreamSinkTransformer(this._sinkMapper);
160
161 Stream<T> bind(Stream<S> stream)
162 => new _BoundSinkStream<S, T>(stream, _sinkMapper);
163 }
164
165 /**
166 * The result of binding a StreamTransformer for Sink-mappers.
167 *
168 * It contains the bound Stream and the sink-mapper. Only when the user starts
169 * listening to this stream is the sink-mapper invoked. The result is used
170 * to create a StreamSubscription that transforms events.
171 */
172 class _BoundSinkStream<S, T> extends Stream<T> {
173 final _SinkMapper<S, T> _sinkMapper;
174 final Stream<S> _stream;
175
176 _BoundSinkStream(this._stream, this._sinkMapper);
177
178 StreamSubscription<T> listen(void onData(T event),
179 { Function onError,
180 void onDone(),
181 bool cancelOnError }) {
182 cancelOnError = identical(true, cancelOnError);
183 return new _SinkTransformerStreamSubscription(
184 _stream, _sinkMapper, onData, onError, onDone, cancelOnError);
185 }
186 }
187
188 /// Data-handler coming from [StreamTransformer.fromHandlers].
189 typedef void _TransformDataHandler<S, T>(S data, EventSink<T> sink);
190 /// Error-handler coming from [StreamTransformer.fromHandlers].
191 typedef void _TransformErrorHandler<T>(
192 Object error, StackTrace stackTrace, EventSink<T> sink);
193 /// Done-handler coming from [StreamTransformer.fromHandlers].
194 typedef void _TransformDoneHandler<T>(EventSink<T> sink);
195
196 /**
197 * Wraps handlers (from [StreamTransformer.fromHandlers]) into an `EventSink`.
198 *
199 * This way we can reuse the code from [_StreamSinkTransformer].
200 */
201 class _HandlerEventSink<S, T> implements EventSink<T> {
202 final _TransformDataHandler<S, T> _handleData;
203 final _TransformErrorHandler<T> _handleError;
204 final _TransformDoneHandler<T> _handleDone;
205
206 /// The output sink where the handlers should send their data into.
207 final EventSink<T> _sink;
208
209 _HandlerEventSink(this._handleData, this._handleError, this._handleDone,
210 this._sink);
211
212 void add(S data) => _handleData(data, _sink);
213 void addError(Object error, [StackTrace stackTrace])
214 => _handleError(error, stackTrace, _sink);
215 void close() => _handleDone(_sink);
216 }
217
218 /**
219 * A StreamTransformer that transformers events with the given handlers.
220 *
221 * Note that this transformer can only be used once.
222 */
223 class _StreamHandlerTransformer<S, T> extends _StreamSinkTransformer<S, T> {
224 bool _hasBeenUsed = false;
225
226 _StreamHandlerTransformer({
227 void handleData(S data, EventSink<T> sink),
228 void handleError(Object error, StackTrace stackTrace, EventSink<T> sink),
229 void handleDone(EventSink<T> sink)})
230 : super((EventSink<T> outputSink) {
231 if (handleData == null) handleData = _defaultHandleData;
232 if (handleError == null) handleError = _defaultHandleError;
233 if (handleDone == null) handleDone = _defaultHandleDone;
234 return new _HandlerEventSink<S, T>(
235 handleData, handleError, handleDone, outputSink);
236 });
237
238 Stream<T> bind(Stream<S> stream) {
239 if (_hasBeenUsed) {
240 throw new StateError("Transformer has already been used.");
241 }
242 _hasBeenUsed = true;
Lasse Reichstein Nielsen 2013/10/07 11:47:00 This was the reason it could only be used once? C
floitsch 2013/10/10 15:39:57 Done.
243 return super.bind(stream);
244 }
245
246 /** Default data handler forwards all data. */
247 static void _defaultHandleData(var data, EventSink sink) {
248 sink.add(data);
249 }
250
251 /** Default error handler forwards all errors. */
252 static void _defaultHandleError(error, StackTrace stackTrace,
253 EventSink sink) {
254 sink.addError(error);
255 }
256
257 /** Default done handler forwards done. */
258 static void _defaultHandleDone(EventSink sink) {
259 sink.close();
260 }
261 }
262
263 /// A closure mapping a stream and cancelOnError to a StreamSubscription.
264 typedef StreamSubscription<T> _SubscriptionTransformer<S, T>(
265 Stream<S> stream, bool cancelOnError);
266
267 /**
268 * This class is a fully generic [StreamTransformer].
269 *
270 * Instead of implementing three classes: a [StreamTransformer], a [Stream]
271 * (as the result of a `bind` call) and a [StreamSubscription] (which does the
272 * actual work), this class only requires a closure that is invoked when the
Lasse Reichstein Nielsen 2013/10/07 11:47:00 closure -> function. Using "closure" suggests that
floitsch 2013/10/10 15:39:57 Done.
273 * last bit (the subscription) of the transformer-workflow is needed.
274 *
275 * The given transformer closure maps from Stream, cancelOnError to a
Lasse Reichstein Nielsen 2013/10/07 11:47:00 "Stream, cancelOnError" -> "Stream and cancelOnErr
floitsch 2013/10/10 15:39:57 Done.
276 * StreamSubscription. As such it can also act on `cancel` events, making it
Lasse Reichstein Nielsen 2013/10/07 11:47:00 `StreamSubscription`.
floitsch 2013/10/10 15:39:57 Done.
277 * fully generic.
Lasse Reichstein Nielsen 2013/10/07 11:47:00 "fully generic" -> "fully general". "Generic" soun
floitsch 2013/10/10 15:39:57 Done.
278 */
279 class _StreamSubscriptionTransformer<S, T> implements StreamTransformer<S, T> {
280 final _SubscriptionTransformer<S, T> _transformer;
281
282 const _StreamSubscriptionTransformer(this._transformer);
283
284 Stream<T> bind(Stream<S> stream)
285 => new _BoundSubscriptionStream<S, T>(stream, _transformer);
Lasse Reichstein Nielsen 2013/10/07 11:47:00 Style guide wants '=>' on the previous line.
floitsch 2013/10/10 15:39:57 Done.
286 }
287
288 /**
289 * The bound version of [_StreamSubscriptionTransformer].
Lasse Reichstein Nielsen 2013/10/07 11:47:00 Not obvious what "bound" means. I guess it refers
floitsch 2013/10/10 15:39:57 Done.
290 *
291 * This class is a [Stream] that has already an input stream. The transformer
Lasse Reichstein Nielsen 2013/10/07 11:47:00 "has already" -> "already has". Maybe drop "alread
floitsch 2013/10/10 15:39:57 Rewritten.
292 * closure is, however, only invoked when this stream is listened to.
Lasse Reichstein Nielsen 2013/10/07 11:47:00 Drop ", however,". It suggests that this is surpri
floitsch 2013/10/10 15:39:57 Rewritten.
293 */
294 class _BoundSubscriptionStream<S, T> extends Stream<T> {
295 final _SubscriptionTransformer<S, T> _transformer;
296 final Stream<S> _stream;
297
298 _BoundSubscriptionStream(this._stream, this._transformer);
299
300 StreamSubscription<T> listen(void onData(T event),
301 { Function onError,
302 void onDone(),
303 bool cancelOnError }) {
304 cancelOnError = identical(true, cancelOnError);
305 StreamSubscription<T> result = _transformer(_stream, cancelOnError);
306 result.onData(onData);
307 result.onError(onError);
308 result.onDone(onDone);
309 return result;
310 }
311 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698