Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(445)

Side by Side Diff: sdk/lib/async/stream_pipe.dart

Issue 11783009: Big merge from experimental to bleeding edge. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 7 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « sdk/lib/async/stream_impl.dart ('k') | sdk/lib/async/string_transform.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 // part of dart.async;
6
7 /**
8 * A pipe between two streams.
9 *
10 * The default pipe subscribes to the [source] and sends on the
11 * [stream].
12 *
13 * The events are passed through the [_handleData], [_handleError] and
14 * [_handleDone] methods. Subclasses are supposed to add handling of some of
15 * the events by overriding these methods.
16 *
17 * This class is intended for internal use only. Users can use the [PipeStream]
18 * to configure similar behavior.
19 */
20 abstract class _ForwardingStream<S, T> extends _MultiStreamImpl<T>
21 implements StreamTransformer<S, T> {
22 Stream<S> _source = null;
23 StreamSubscription _subscription = null;
24
25 StreamController<T> _createController() {
26 return new _BaseForwardingController<T>(this);
27 }
28
29 void _subscribeToSource() {
30 _subscription = _source.listen(this._handleData,
31 onError: this._handleError,
32 onDone: this._handleDone);
33 if (_isPaused) {
34 _subscription.pause();
35 }
36 }
37
38 Stream<T> bind(Stream<S> source) {
39 assert(_source == null);
40 _source = source;
41 if (_hasSubscribers) {
42 _subscribeToSource();
43 }
44 return this;
45 }
46
47 /**
48 * Subscribe or unsubscribe on [source] depending on whether
49 * [stream] has subscribers.
50 */
51 void _onSubscriptionStateChange() {
52 if (_hasSubscribers) {
53 assert(_subscription == null);
54 if (_source != null) {
55 _subscribeToSource();
56 }
57 } else {
58 if (_subscription != null) {
59 _subscription.cancel();
60 _subscription = null;
61 }
62 }
63 }
64
65 void _onPauseStateChange() {
66 if (_subscription == null) return;
67 if (isPaused) {
68 _subscription.pause();
69 } else {
70 _subscription.resume();
71 }
72 }
73
74 void _handleData(S inputEvent) {
75 var outputEvent = inputEvent;
76 _add(outputEvent);
77 }
78
79 void _handleError(AsyncError error) {
80 _signalError(error);
81 }
82
83 void _handleDone() {
84 _close();
85 }
86 }
87
88
89 // -------------------------------------------------------------------
90 // Stream pipes used by the default Stream implementation.
91 // -------------------------------------------------------------------
92
93 typedef bool _Predicate<T>(T value);
94
95 class WhereStream<T> extends _ForwardingStream<T, T> {
96 final _Predicate<T> _test;
97
98 WhereStream(bool test(T value))
99 : this._test = test;
100
101 void _handleData(T inputEvent) {
102 bool satisfies;
103 try {
104 satisfies = _test(inputEvent);
105 } catch (e, s) {
106 _signalError(new AsyncError(e, s));
107 return;
108 }
109 if (satisfies) {
110 _add(inputEvent);
111 }
112 }
113 }
114
115
116 typedef T _Transformation<S, T>(S value);
117
118 /**
119 * A stream pipe that converts data events before passing them on.
120 */
121 class MapStream<S, T> extends _ForwardingStream<S, T> {
122 final _Transformation _transform;
123
124 MapStream(T transform(S event))
125 : this._transform = transform;
126
127 void _handleData(S inputEvent) {
128 T outputEvent;
129 try {
130 outputEvent = _transform(inputEvent);
131 } catch (e, s) {
132 _signalError(new AsyncError(e, s));
133 return;
134 }
135 _add(outputEvent);
136 }
137 }
138
139 /**
140 * A stream pipe that converts data events before passing them on.
141 */
142 class ExpandStream<S, T> extends _ForwardingStream<S, T> {
143 final _Transformation<S, Iterable<T>> _expand;
144
145 ExpandStream(Iterable<T> expand(S event))
146 : this._expand = expand;
147
148 void _handleData(S inputEvent) {
149 try {
150 for (T value in _expand(inputEvent)) {
151 _add(value);
152 }
153 } catch (e, s) {
154 // If either _expand or iterating the generated iterator throws,
155 // we abort the iteration.
156 _signalError(new AsyncError(e, s));
157 }
158 }
159 }
160
161
162 typedef AsyncError _ErrorTransformation(AsyncError error);
163
164 /**
165 * A stream pipe that converts or disposes error events
166 * before passing them on.
167 */
168 class HandleErrorStream<T> extends _ForwardingStream<T, T> {
169 final _ErrorTransformation _transform;
170
171 HandleErrorStream(AsyncError transform(AsyncError event))
172 : this._transform = transform;
173
174 void _handleError(AsyncError error) {
175 try {
176 error = _transform(error);
177 if (error == null) return;
178 } catch (e, s) {
179 error = new AsyncError.withCause(e, s, error);
180 }
181 _signalError(error);
182 }
183 }
184
185
186 typedef void _TransformDataHandler<S, T>(S data, StreamSink<T> sink);
187 typedef void _TransformErrorHandler<T>(AsyncError data, StreamSink<T> sink);
188 typedef void _TransformDoneHandler<T>(StreamSink<T> sink);
189
190 /**
191 * A stream pipe that intercepts all events and can generate any event as
192 * output.
193 *
194 * Each incoming event on this [StreamSink] is passed to the corresponding
195 * provided event handler, along with a [StreamSink] linked to the [output] of
196 * this pipe.
197 * The handler can then decide which events to send to the output
198 */
199 class PipeStream<S, T> extends _ForwardingStream<S, T> {
200 final _TransformDataHandler<S, T> _onData;
201 final _TransformErrorHandler<T> _onError;
202 final _TransformDoneHandler<T> _onDone;
203 StreamSink<T> _sink;
204
205 PipeStream({void onData(S data, StreamSink<T> sink),
206 void onError(AsyncError data, StreamSink<T> sink),
207 void onDone(StreamSink<T> sink)})
208 : this._onData = (onData == null ? _defaultHandleData : onData),
209 this._onError = (onError == null ? _defaultHandleError : onError),
210 this._onDone = (onDone == null ? _defaultHandleDone : onDone) {
211 // Cache the sink wrapper to avoid creating a new one for each event.
212 this._sink = new _StreamImplSink(this);
213 }
214
215 void _handleData(S data) {
216 try {
217 return _onData(data, _sink);
218 } catch (e, s) {
219 _signalError(new AsyncError(e, s));
220 }
221 }
222
223 void _handleError(AsyncError error) {
224 try {
225 _onError(error, _sink);
226 } catch (e, s) {
227 _signalError(new AsyncError.withCause(e, s, error));
228 }
229 }
230
231 void _handleDone() {
232 try {
233 _onDone(_sink);
234 } catch (e, s) {
235 _signalError(new AsyncError(e, s));
236 }
237 }
238
239 /** Default data handler forwards all data. */
240 static void _defaultHandleData(dynamic data, StreamSink sink) {
241 sink.add(data);
242 }
243 /** Default error handler forwards all errors. */
244 static void _defaultHandleError(AsyncError error, StreamSink sink) {
245 sink.signalError(error);
246 }
247 /** Default done handler forwards done. */
248 static void _defaultHandleDone(StreamSink sink) {
249 sink.close();
250 }
251 }
252
253 /** Creates a [StreamSink] from a [_StreamImpl]'s input methods. */
254 class _StreamImplSink<T> implements StreamSink<T> {
255 _StreamImpl<T> _target;
256 _StreamImplSink(this._target);
257 void add(T data) { _target._add(data); }
258 void signalError(AsyncError error) { _target._signalError(error); }
259 void close() { _target._close(); }
260 }
261
262 /**
263 * A stream pipe that intercepts all events and can generate any event as
264 * output.
265 *
266 * Each incoming event on this [StreamSink] is passed to the corresponding
267 * method on [transform], along with a [StreamSink] linked to the [output] of
268 * this pipe.
269 * The handler can then decide which events to send to the output
270 */
271 class TransformStream<S, T> extends _ForwardingStream<S, T> {
272 final StreamTransformer<S, T> _transform;
273 StreamSink<T> _sink;
274
275 TransformStream(StreamTransformer<S, T> transform)
276 : this._transform = transform {
277 // Cache the sink wrapper to avoid creating a new one for each event.
278 this._sink = new _StreamImplSink(this);
279 }
280
281 void _handleData(S data) {
282 try {
283 return _transform.handleData(data, _sink);
284 } catch (e, s) {
285 _controller.signalError(new AsyncError(e, s));
286 }
287 }
288
289 void _handleError(AsyncError error) {
290 try {
291 _transform.handleError(error, _sink);
292 } catch (e, s) {
293 _controller.signalError(new AsyncError.withCause(e, s, error));
294 }
295 }
296
297 void _handleDone() {
298 try {
299 _transform.handleDone(_sink);
300 } catch (e, s) {
301 _controller.signalError(new AsyncError(e, s));
302 }
303 }
304 }
305
306
307 /** Helper class for transforming three functions into a StreamTransformer. */
308 class _StreamTransformerFunctionWrapper<S, T>
309 extends _StreamTransformer<S, T> {
310 final _TransformDataHandler<S, T> _handleData;
311 final _TransformErrorHandler<T> _handleError;
312 final _TransformDoneHandler<T> _handleDone;
313
314 _StreamTransformerFunctionWrapper({
315 void onData(S data, StreamSink<T> sink),
316 void onError(AsyncError data, StreamSink<T> sink),
317 void onDone(StreamSink<T> sink)})
318 : _handleData = onData != null ? onData : PipeStream._defaultHandleData,
319 _handleError = onError != null ? onError
320 : PipeStream._defaultHandleError,
321 _handleDone = onDone != null ? onDone : PipeStream._defaultHandleDone;
322
323 void handleData(S data, StreamSink<T> sink) {
324 return _handleData(data, sink);
325 }
326
327 void handleError(AsyncError error, StreamSink<T> sink) {
328 _handleError(error, sink);
329 }
330
331 void handleDone(StreamSink<T> sink) {
332 _handleDone(sink);
333 }
334 }
335
336
337 class TakeStream<T> extends _ForwardingStream<T, T> {
338 int _remaining;
339
340 TakeStream(int count)
341 : this._remaining = count {
342 if (count is! int) throw new ArgumentError(count);
343 }
344
345 void _handleData(T inputEvent) {
346 if (_remaining > 0) {
347 _add(inputEvent);
348 _remaining -= 1;
349 if (_remaining == 0) {
350 // Closing also unsubscribes all subscribers, which unsubscribes
351 // this from source.
352 _close();
353 }
354 }
355 }
356 }
357
358
359 class TakeWhileStream<T> extends _ForwardingStream<T, T> {
360 final _Predicate<T> _test;
361
362 TakeWhileStream(bool test(T value))
363 : this._test = test;
364
365 void _handleData(T inputEvent) {
366 bool satisfies;
367 try {
368 satisfies = _test(inputEvent);
369 } catch (e, s) {
370 _signalError(new AsyncError(e, s));
371 // The test didn't say true. Didn't say false either, but we stop anyway.
372 _close();
373 return;
374 }
375 if (satisfies) {
376 _add(inputEvent);
377 } else {
378 _close();
379 }
380 }
381 }
382
383 class SkipStream<T> extends _ForwardingStream<T, T> {
384 int _remaining;
385
386 SkipStream(int count)
387 : this._remaining = count{
388 if (count is! int) throw new ArgumentError(count);
389 }
390
391 void _handleData(T inputEvent) {
392 if (_remaining > 0) {
393 _remaining--;
394 return;
395 }
396 return _add(inputEvent);
397 }
398 }
399
400 class SkipWhileStream<T> extends _ForwardingStream<T, T> {
401 final _Predicate<T> _test;
402 bool _hasFailed = false;
403
404 SkipWhileStream(bool test(T value))
405 : this._test = test;
406
407 void _handleData(T inputEvent) {
408 if (_hasFailed) {
409 _add(inputEvent);
410 }
411 bool satisfies;
412 try {
413 satisfies = _test(inputEvent);
414 } catch (e, s) {
415 _signalError(new AsyncError(e, s));
416 // A failure to return a boolean is considered "not matching".
417 _hasFailed = true;
418 return;
419 }
420 if (!satisfies) {
421 _hasFailed = true;
422 _add(inputEvent);
423 }
424 }
425 }
426
427 typedef bool _Equality<T>(T a, T b);
428
429 class DistinctStream<T> extends _ForwardingStream<T, T> {
430 static var _SENTINEL = new Object();
431
432 _Equality<T> _equals;
433 var _previous = _SENTINEL;
434
435 DistinctStream(bool equals(T a, T b))
436 : _equals = equals;
437
438 void _handleData(T inputEvent) {
439 if (identical(_previous, _SENTINEL)) {
440 _previous = inputEvent;
441 return _add(inputEvent);
442 } else {
443 bool isEqual;
444 try {
445 if (_equals == null) {
446 isEqual = (_previous == inputEvent);
447 } else {
448 isEqual = _equals(_previous, inputEvent);
449 }
450 } catch (e, s) {
451 _signalError(new AsyncError(e, s));
452 return null;
453 }
454 if (!isEqual) {
455 _add(inputEvent);
456 _previous = inputEvent;
457 }
458 }
459 }
460 }
OLDNEW
« no previous file with comments | « sdk/lib/async/stream_impl.dart ('k') | sdk/lib/async/string_transform.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698